192 lines
8.5 KiB
Python
192 lines
8.5 KiB
Python
"""
|
|
Trend Surfer Agent
|
|
Agent for identifying and capitalizing on emerging market trends.
|
|
"""
|
|
|
|
import traceback
|
|
from typing import List, Dict, Any, Optional
|
|
from loguru import logger
|
|
|
|
from services.intelligence.agents.specialized_agents import SIFBaseAgent
|
|
from services.intelligence.agents.market_signal_detector import MarketSignalDetector, MarketSignal, UrgencyLevel, SignalType
|
|
from services.intelligence.txtai_service import TxtaiIntelligenceService
|
|
from services.research.trends.google_trends_service import GoogleTrendsService
|
|
|
|
class TrendSurferAgent(SIFBaseAgent):
|
|
"""
|
|
Agent for identifying and capitalizing on emerging market trends.
|
|
"Surfs" the trends detected by MarketSignalDetector to propose timely content.
|
|
"""
|
|
|
|
def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs):
|
|
super().__init__(intelligence_service, user_id, agent_type="trend_surfer", **kwargs)
|
|
self.user_id = user_id
|
|
self.signal_detector = MarketSignalDetector(user_id)
|
|
self.trends_service = GoogleTrendsService()
|
|
|
|
async def surf_trends(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Identify high-potential trends and suggest content angles.
|
|
Integrates real-time Google Trends data with MarketSignalDetector signals.
|
|
"""
|
|
self._log_agent_operation("Surfing market trends")
|
|
|
|
try:
|
|
# 1. Get real-time trending searches from Google Trends
|
|
realtime_trends = await self.trends_service.get_trending_searches(user_id=self.user_id)
|
|
logger.info(f"[{self.__class__.__name__}] Found {len(realtime_trends)} real-time trends")
|
|
|
|
# 2. Detect internal market signals (competitors, SERP, etc.)
|
|
signals = await self.signal_detector.detect_market_signals()
|
|
|
|
# 3. Analyze real-time trends and convert to signals if actionable
|
|
trend_signals = await self._analyze_realtime_trends(realtime_trends)
|
|
signals.extend(trend_signals)
|
|
|
|
if not signals:
|
|
logger.info(f"[{self.__class__.__name__}] No active market signals found")
|
|
return []
|
|
|
|
# Filter for actionable trends (High/Critical urgency or High impact)
|
|
actionable_trends = [
|
|
s for s in signals
|
|
if s.urgency_level.value in ['high', 'critical'] or s.impact_score > 0.7
|
|
]
|
|
|
|
logger.info(f"[{self.__class__.__name__}] Found {len(actionable_trends)} actionable trends")
|
|
|
|
opportunities = []
|
|
for trend in actionable_trends:
|
|
opp = await self._analyze_opportunity(trend)
|
|
if opp:
|
|
opportunities.append(opp)
|
|
|
|
return opportunities
|
|
|
|
except Exception as e:
|
|
logger.error(f"[{self.__class__.__name__}] Trend surfing failed: {e}")
|
|
logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}")
|
|
return []
|
|
|
|
async def _analyze_realtime_trends(self, trends: List[str]) -> List[MarketSignal]:
|
|
"""
|
|
Analyze raw trend keywords and convert actionable ones to MarketSignals.
|
|
Uses pytrends (via GoogleTrendsService) to validate interest.
|
|
"""
|
|
signals = []
|
|
# Limit to top 5 for detailed analysis to avoid rate limits
|
|
top_trends = trends[:5]
|
|
|
|
for trend_kw in top_trends:
|
|
try:
|
|
# Get detailed data for the keyword
|
|
trend_data = await self.trends_service.analyze_trends(
|
|
keywords=[trend_kw],
|
|
timeframe="now 7-d", # Last 7 days to see immediate trajectory
|
|
geo="US" # Default to US for now, could be user-configured
|
|
)
|
|
|
|
# Check if rising
|
|
interest_over_time = trend_data.get("interest_over_time", [])
|
|
if not interest_over_time:
|
|
continue
|
|
|
|
# Simple logic: is the last point higher than the average?
|
|
values = [float(point.get(trend_kw, 0)) for point in interest_over_time if trend_kw in point]
|
|
if not values:
|
|
continue
|
|
|
|
avg_interest = sum(values) / len(values)
|
|
last_interest = values[-1]
|
|
|
|
# Calculate impact/urgency
|
|
impact_score = min(last_interest / 100.0, 1.0) # Normalized
|
|
urgency = UrgencyLevel.MEDIUM
|
|
if last_interest > 80:
|
|
urgency = UrgencyLevel.CRITICAL
|
|
elif last_interest > 50:
|
|
urgency = UrgencyLevel.HIGH
|
|
|
|
# Create Signal
|
|
signal = MarketSignal(
|
|
signal_id=f"trend_{trend_kw.replace(' ', '_')}_{int(values[-1])}",
|
|
signal_type=SignalType.SOCIAL_TREND, # Using SOCIAL_TREND as proxy for general search trend
|
|
source="google_trends",
|
|
description=f"Surging interest in '{trend_kw}'",
|
|
impact_score=impact_score,
|
|
urgency_level=urgency,
|
|
confidence_score=0.9,
|
|
related_topics=[t.get("topic_title", "") for t in trend_data.get("related_topics", {}).get("top", [])[:3]],
|
|
suggested_actions=["Create timely content", "Update social media"],
|
|
metadata=trend_data
|
|
)
|
|
signals.append(signal)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"[{self.__class__.__name__}] Failed to analyze trend '{trend_kw}': {e}")
|
|
continue
|
|
|
|
return signals
|
|
|
|
async def _analyze_opportunity(self, trend: MarketSignal) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Analyze a specific trend signal to generate a content opportunity.
|
|
"""
|
|
try:
|
|
# Use semantic search to find if we already have content covering this
|
|
query = f"{trend.description} {' '.join(trend.related_topics)}"
|
|
existing_content = await self.intelligence.search(query, limit=3)
|
|
|
|
coverage_score = 0.0
|
|
if existing_content:
|
|
# If top result has high score, we might already cover it
|
|
coverage_score = existing_content[0].get('score', 0.0)
|
|
|
|
# If already well-covered, might skip or suggest update
|
|
if coverage_score > 0.8:
|
|
recommendation = "Update existing content"
|
|
else:
|
|
recommendation = "Create new content"
|
|
|
|
# Use LLM to generate creative angle
|
|
headline = f"Trend: {trend.description}"
|
|
angle = f"Leverage {trend.source} trend on {trend.related_topics[0] if trend.related_topics else 'topic'}"
|
|
|
|
try:
|
|
prompt = f"""
|
|
Analyze this market trend signal and propose a content angle:
|
|
Trend: {trend.description}
|
|
Related Topics: {', '.join(trend.related_topics)}
|
|
Impact Score: {trend.impact_score}
|
|
Recommendation: {recommendation}
|
|
|
|
Provide a catchy headline and a 1-sentence strategic angle.
|
|
Format: Headline | Angle
|
|
"""
|
|
response = await self._generate_llm_response(prompt)
|
|
if response and "|" in response:
|
|
parts = response.split('|')
|
|
headline = parts[0].strip()
|
|
angle = parts[1].strip()
|
|
elif response:
|
|
angle = response.strip()
|
|
except Exception as e:
|
|
logger.warning(f"[{self.__class__.__name__}] LLM generation failed for opportunity: {e}")
|
|
|
|
return {
|
|
"trend_id": trend.signal_id,
|
|
"topic": trend.description,
|
|
"headline": headline,
|
|
"source": trend.source,
|
|
"urgency": trend.urgency_level.value,
|
|
"impact_score": trend.impact_score,
|
|
"current_coverage": coverage_score,
|
|
"recommendation": recommendation,
|
|
"suggested_angle": angle,
|
|
"detected_at": trend.detected_at
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"[{self.__class__.__name__}] Failed to analyze opportunity for signal {trend.signal_id}: {e}")
|
|
return None
|