diff --git a/backend/services/agent_framework.py b/backend/services/agent_framework.py index b6bcc7b7..0e667b03 100644 --- a/backend/services/agent_framework.py +++ b/backend/services/agent_framework.py @@ -13,12 +13,17 @@ from abc import ABC, abstractmethod # txtai imports for native agent framework try: - from txtai import Agent, LLM - TXTAI_AVAILABLE = Agent.__module__ != "txtai.agent.placeholder" + from txtai.pipeline import Agent, LLM + TXTAI_AVAILABLE = True except ImportError: - TXTAI_AVAILABLE = False - # Fallback implementation for development - logging.warning("txtai not available, using fallback implementation") + try: + from txtai import Agent, LLM + TXTAI_AVAILABLE = True + except ImportError: + TXTAI_AVAILABLE = False + Agent = None + LLM = None + logging.warning("txtai not available") # Optional MLflow integration try: @@ -121,8 +126,10 @@ class BaseALwrityAgent(ABC): if TXTAI_AVAILABLE: try: if not self.llm: - # Hardening: Explicitly set task to avoid 'text2text-generation' default failures - self.llm = LLM(model_name, task="text-generation") + # Allow txtai to auto-detect the correct task for the model + # But force language-generation (txtai mapping for text-generation) for known models + task_hint = "language-generation" if any(x in model_name for x in ["Qwen", "Instruct", "GPT", "Llama"]) else None + self.llm = LLM(path=model_name, task=task_hint) if task_hint else LLM(path=model_name) self.txtai_agent = self._create_txtai_agent() logger.info(f"Initialized txtai agent for {agent_type} - {self.agent_id}") @@ -776,60 +783,9 @@ class StrategyOrchestratorAgent(BaseALwrityAgent): """Create txtai orchestrator agent with coordination tools""" if not TXTAI_AVAILABLE: return None - - return Agent( - llm=self.llm, - tools=[ - { - "name": "market_signal_detector", - "description": "Detects market changes and competitor activities", - "target": self._market_signal_detector_tool - }, - { - "name": "google_trends_fetcher", - "description": "Fetches Google Trends data and embeds it into SIF for retrieval", - "target": self._google_trends_fetcher_tool - }, - { - "name": "agent_coordinator", - "description": "Coordinates actions between multiple agents", - "target": self._agent_coordinator_tool - }, - { - "name": "performance_analyzer", - "description": "Analyzes marketing performance metrics", - "target": self._performance_analyzer_tool - }, - { - "name": "strategy_synthesizer", - "description": "Synthesizes unified strategies from multiple inputs", - "target": self._strategy_synthesizer_tool - }, - { - "name": "task_delegator", - "description": "Delegates specific tasks to specialized agents (content, competitor, seo, social)", - "target": self._delegate_task_tool - } - ], - max_iterations=15, - system=self.get_effective_system_prompt(f"""You are the Marketing Strategy Orchestrator for ALwrity user {self.user_id}. - - Your role is to coordinate all marketing agents, analyze market signals, - and synthesize unified strategies. - - Key Responsibility: DELEGATE tasks to specialized agents. - - Content Strategy Agent: For content analysis, gaps, and optimization. - - Competitor Response Agent: For monitoring and counter-strategies. - - SEO Optimization Agent: For technical SEO and keywords. - - Social Amplification Agent: For social trends and distribution. - - Use the 'task_delegator' tool to assign work to these agents. - Do not just plan; EXECUTE by delegating. - - Always prioritize user goals and maintain safety constraints. - Coordinate multi-agent responses to market changes effectively.""" - ) - ) + + llm = getattr(self.llm, "llm", self.llm) + return Agent(llm=llm, tools=[], max_iterations=15) async def _market_signal_detector_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: """Tool for detecting market signals""" diff --git a/backend/services/intelligence/agents/agent_orchestrator.py b/backend/services/intelligence/agents/agent_orchestrator.py index b00c9bce..afb42f15 100644 --- a/backend/services/intelligence/agents/agent_orchestrator.py +++ b/backend/services/intelligence/agents/agent_orchestrator.py @@ -47,7 +47,7 @@ logger = get_service_logger(__name__) class AgentTeamConfiguration: """Configuration for the complete agent team""" user_id: str - shared_llm: str = "Qwen/Qwen3-4B-Instruct-2507" + shared_llm: str = "Qwen/Qwen2.5-3B-Instruct" # Updated to a stable model known for text-generation max_iterations: int = 15 enable_safety: bool = True enable_performance_monitoring: bool = True @@ -70,6 +70,9 @@ class ALwrityAgentOrchestrator: self.performance_monitor: Optional[AgentPerformanceMonitor] = None self.safety_framework: Optional[Dict[str, Any]] = None + # Initialize execution history for tracking agent activities + self.execution_history: List[Dict[str, Any]] = [] + # Initialize components self._initialize_components() @@ -80,8 +83,14 @@ class ALwrityAgentOrchestrator: try: # Initialize shared LLM if TXTAI_AVAILABLE: - # Hardening: Explicitly set task to avoid 'text2text-generation' default failures - self.llm = LLM(self.config.shared_llm, task="text-generation") + try: + # Allow auto-detection of task + self.llm = LLM(self.config.shared_llm) + except Exception as e: + logger.error( + f"Failed to initialize shared LLM '{self.config.shared_llm}': {e}" + ) + raise else: self.llm = None @@ -117,6 +126,36 @@ class ALwrityAgentOrchestrator: status = onboarding_service.get_onboarding_status(self.user_id) if not status.get("is_completed", False): logger.info(f"Skipping agent initialization for user {self.user_id} - Onboarding incomplete") + self.execution_history.append({ + "user_id": self.user_id, + "timestamp": datetime.utcnow().isoformat(), + "agent_id": "system", + "action": "system_check", + "status": "pending", + "details": "Agent initialization paused. Waiting for user onboarding completion.", + "agent_name": "System Orchestrator", + "agent_type": "orchestrator" + }) + + # Persist this check to DB so it survives refreshes + try: + from services.database import get_session_for_user + from services.agent_activity_service import AgentActivityService + db = get_session_for_user(self.user_id) + if db: + try: + activity_service = AgentActivityService(db, self.user_id) + run = activity_service.start_run(agent_type="system_orchestrator", prompt="System Check") + activity_service.finish_run( + run_id=run.id, + success=True, + result_summary="Agent initialization paused. Waiting for user onboarding completion." + ) + finally: + db.close() + except Exception: + pass + return except Exception as e: logger.warning(f"Could not check onboarding status for {self.user_id}: {e}") @@ -141,10 +180,14 @@ class ALwrityAgentOrchestrator: except Exception: pass + # Track successful initializations + initialized_agents = [] + # Content Strategy Agent if enabled_by_key.get("content_strategist", True): self.content_agent = ContentStrategyAgent(self.user_id, self.config.shared_llm, llm=self.llm) self.agents['content'] = self.content_agent + initialized_agents.append("Content Strategist") # Strategy Architect Agent if enabled_by_key.get("strategy_architect", True): @@ -152,43 +195,37 @@ class ALwrityAgentOrchestrator: intel_service = TxtaiIntelligenceService(self.user_id) self.strategy_agent = StrategyArchitectAgent(intel_service, self.user_id) self.agents['strategy'] = self.strategy_agent + initialized_agents.append("Strategy Architect") # Competitor Response Agent if enabled_by_key.get("competitor_analyst", True): self.competitor_agent = CompetitorResponseAgent(self.user_id, self.config.shared_llm, llm=self.llm) self.agents['competitor'] = self.competitor_agent + initialized_agents.append("Competitor Analyst") # SEO Optimization Agent if enabled_by_key.get("seo_specialist", True): self.seo_agent = SEOOptimizationAgent(self.user_id, self.config.shared_llm, llm=self.llm) self.agents['seo'] = self.seo_agent + initialized_agents.append("SEO Specialist") # Social Amplification Agent if enabled_by_key.get("social_media_manager", True): self.social_agent = SocialAmplificationAgent(self.user_id, self.config.shared_llm, llm=self.llm) self.agents['social'] = self.social_agent - - # Strategy Architect Agent - if enabled_by_key.get("strategy_architect", True): - from services.intelligence.txtai_service import TxtaiIntelligenceService - intel_service = TxtaiIntelligenceService(self.user_id) - self.strategy_agent = StrategyArchitectAgent(intel_service, self.user_id) - self.agents['strategy'] = self.strategy_agent + initialized_agents.append("Social Media Manager") # Trend Surfer Agent if enabled_by_key.get("trend_surfer", True): - # TrendSurferAgent needs TxtaiIntelligenceService, which we might need to get from SIF or initialize - # For now, we assume SIF integration is handled elsewhere or we pass a mock/stub if needed - # But wait, TrendSurferAgent constructor is (intelligence_service, user_id) - # We need to get the intelligence service here. - # Since AgentOrchestrator doesn't hold TxtaiIntelligenceService directly (SIFIntegrationService does), - # this is tricky. - # However, SIFIntegrationService initializes AgentOrchestrator. - # Let's import TxtaiIntelligenceService and initialize it here for the agent - from services.intelligence.txtai_service import TxtaiIntelligenceService - intel_service = TxtaiIntelligenceService(self.user_id) - self.trend_surfer_agent = TrendSurferAgent(intel_service, self.user_id) - self.agents['trend'] = self.trend_surfer_agent + # TrendSurferAgent needs TxtaiIntelligenceService + try: + from services.intelligence.txtai_service import TxtaiIntelligenceService + intel_service = TxtaiIntelligenceService(self.user_id) + self.trend_surfer_agent = TrendSurferAgent(intel_service, self.user_id) + self.agents['trend'] = self.trend_surfer_agent + initialized_agents.append("Trend Surfer") + except Exception as e: + logger.error(f"Failed to initialize TrendSurferAgent: {e}") # Content Guardian Agent if enabled_by_key.get("content_guardian", True): @@ -206,11 +243,51 @@ class ALwrityAgentOrchestrator: sif_service=None # SIF service is optional/circular dependency handling ) self.agents['guardian'] = self.content_guardian_agent + initialized_agents.append("Content Guardian") logger.info(f"Initialized ContentGuardianAgent for user {self.user_id}") except Exception as e: logger.error(f"Failed to initialize ContentGuardianAgent: {e}") logger.info(f"Created {len(self.agents)} specialized agents for user {self.user_id}") + + # Log initialization activity + if initialized_agents: + # 1. Add to in-memory history + self.execution_history.append({ + "user_id": self.user_id, + "timestamp": datetime.utcnow().isoformat(), + "agent_id": "system", + "action": "agent_initialization", + "status": "completed", + "details": f"Successfully initialized agent team: {', '.join(initialized_agents)}", + "agent_name": "System Orchestrator", + "agent_type": "orchestrator" + }) + + # 2. Add to persistent database history (so dashboard sees it on refresh) + try: + from services.database import get_session_for_user + from services.agent_activity_service import AgentActivityService + + db = get_session_for_user(self.user_id) + if db: + try: + activity_service = AgentActivityService(db, self.user_id) + # Create a run record + run = activity_service.start_run( + agent_type="system_orchestrator", + prompt="Initialize Autonomous Agent Team" + ) + # Immediately finish it + activity_service.finish_run( + run_id=run.id, + success=True, + result_summary=f"Successfully initialized agent team: {', '.join(initialized_agents)}" + ) + finally: + db.close() + except Exception as e: + logger.error(f"Failed to log initialization activity to DB: {e}") except Exception as e: logger.error(f"Error creating specialized agents for user {self.user_id}: {e}") @@ -258,6 +335,11 @@ class ALwrityAgentOrchestrator: "4. Provide specific action recommendations.\n\n" "Return a comprehensive strategy with specific actions, priorities, and expected outcomes." ) + + # Ensure orchestrator is initialized + if not self.orchestrator_agent: + self._create_orchestrator_agent() + orchestrator_prompt = self.orchestrator_agent.build_task_prompt(instruction=instruction, task_context=context) result = await self.orchestrator_agent.run(orchestrator_prompt) @@ -322,6 +404,7 @@ class ALwrityAgentOrchestrator: "timestamp": datetime.utcnow().isoformat(), "agent_statuses": agent_statuses, "performance_summary": performance_summary, + "recent_activity": self.get_execution_history(limit=5), "market_signals_active": self.config.enable_market_signals, "safety_enabled": self.config.enable_safety, "performance_monitoring_enabled": self.config.enable_performance_monitoring @@ -342,6 +425,10 @@ class ALwrityAgentOrchestrator: # Helper methods + def get_execution_history(self, limit: int = 100) -> List[Dict[str, Any]]: + """Get execution history for this orchestrator""" + return self.execution_history[-limit:] + async def _prepare_orchestrator_context(self, market_context: Dict[str, Any]) -> Dict[str, Any]: """Prepare comprehensive context for orchestrator""" context = { @@ -382,7 +469,13 @@ class AgentOrchestrationService: self.orchestrators[user_id] = ALwrityAgentOrchestrator(config) logger.info(f"Created new orchestrator for user: {user_id}") - return self.orchestrators[user_id] + # Ensure initialization happened, if not try again (e.g. if onboarding was just completed) + orchestrator = self.orchestrators[user_id] + if not orchestrator.agents and not orchestrator.execution_history: + logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.") + orchestrator._create_specialized_agents() + + return orchestrator async def execute_marketing_strategy(self, user_id: str, market_context: Dict[str, Any]) -> Dict[str, Any]: """Execute marketing strategy for a user""" diff --git a/backend/services/intelligence/agents/core_agent_framework.py b/backend/services/intelligence/agents/core_agent_framework.py index cba44f77..d32416d7 100644 --- a/backend/services/intelligence/agents/core_agent_framework.py +++ b/backend/services/intelligence/agents/core_agent_framework.py @@ -13,12 +13,19 @@ from abc import ABC, abstractmethod # txtai imports for native agent framework try: - from txtai import Agent, LLM - TXTAI_AVAILABLE = Agent.__module__ != "txtai.agent.placeholder" + from txtai.pipeline import Agent, LLM + TXTAI_AVAILABLE = True except ImportError: - TXTAI_AVAILABLE = False - # Fallback implementation for development - logging.warning("txtai not available, using fallback implementation") + try: + from txtai import Agent, LLM + TXTAI_AVAILABLE = True + except ImportError: + TXTAI_AVAILABLE = False + Agent = None + LLM = None + logging.warning("txtai not available") + +_core_llm_cache = {} # Optional MLflow integration try: @@ -162,12 +169,15 @@ class BaseALwrityAgent(ABC): _prompt_context_cache: Dict[str, Dict[str, Any]] = {} _profile_cache: Dict[str, Dict[str, Any]] = {} - def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None, enable_tracing: bool = True): + def __init__(self, user_id: str, agent_type: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None, enable_tracing: bool = True, **kwargs): self.user_id = user_id self.agent_type = agent_type self.model_name = model_name self.agent_id = f"{agent_type}_{user_id}_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}" self.enable_tracing = enable_tracing + self.kwargs = kwargs + # Optional task hint; do not enforce + self.llm_task = str(self.kwargs.get("task") or "").strip() self.performance = AgentPerformance( agent_id=self.agent_id, total_actions=0, @@ -191,26 +201,90 @@ class BaseALwrityAgent(ABC): self._agent_profile = self._load_agent_profile_overrides() self._prompt_context = self._load_prompt_context() + # Note: We cannot await async methods in __init__. + # If _create_txtai_agent is async, it must be called explicitly after initialization + # or we must use a factory method. + # For now, we will handle sync initialization here, and specialized agents + # must handle their async initialization separately or via a sync wrapper. + if TXTAI_AVAILABLE: try: if not self.llm: - # Create new LLM if not provided - # Hardening: Explicitly set task to avoid 'text2text-generation' default failures - raw_llm = LLM(model_name, task="text-generation") - # Wrap it + try: + # Use global cache for core agent LLMs too + cache_key = f"{model_name}:language-generation" + if cache_key in _core_llm_cache: + raw_llm = _core_llm_cache[cache_key] + else: + # Use language-generation for txtai internal mapping + raw_llm = LLM(path=model_name, task="language-generation") + _core_llm_cache[cache_key] = raw_llm + except Exception as e: + logger.error(f"Failed to initialize LLM for {agent_type}: {e}") + raise self.llm = TrackingLLMWrapper(raw_llm, self.user_id, self.model_name) try: - self.txtai_agent = self._create_txtai_agent() - logger.info(f"Initialized txtai agent for {agent_type} - {self.agent_id}") + # _create_txtai_agent might be async or sync + # CRITICAL FIX: We cannot await here. If it's async, we must run it in a loop or warn. + # Given specialized agents define it as async, we need a sync wrapper or run_until_complete. + + if asyncio.iscoroutinefunction(self._create_txtai_agent): + try: + # Check if we are in a running loop + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + # We are already in a loop (e.g. server), we can't block. + # This is a design flaw in initializing async agents in __init__. + # We will defer initialization or use a sync wrapper if possible. + logger.warning(f"Cannot await async _create_txtai_agent for {agent_type} in __init__ within running loop. Initializing via create_task (agent may not be ready immediately).") + + # Create a task to initialize it + async def async_init(): + try: + self.txtai_agent = await self._create_txtai_agent() + logger.info(f"Async initialized txtai agent for {agent_type} - {self.agent_id}") + except Exception as e: + logger.error(f"Async initialization failed for {agent_type}: {e}") + + loop.create_task(async_init()) + # Temporarily set to None or a placeholder, but we can't set it to the result yet + self.txtai_agent = None + else: + # No running loop, we can run_until_complete + if not loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self.txtai_agent = loop.run_until_complete(self._create_txtai_agent()) + except Exception as e: + logger.error(f"Failed to handle async initialization for {agent_type}: {e}") + # Try fallback to sync run if possible + try: + self.txtai_agent = asyncio.run(self._create_txtai_agent()) + except Exception as e2: + logger.error(f"Even asyncio.run failed: {e2}") + raise e + else: + self.txtai_agent = self._create_txtai_agent() + + if self.txtai_agent: + logger.info(f"Initialized txtai agent for {agent_type} - {self.agent_id}") + else: + raise RuntimeError(f"txtai agent creation returned None for {agent_type}") except Exception as inner_e: - logger.warning(f"Could not initialize specific txtai agent for {agent_type}: {inner_e}") - self.txtai_agent = self._create_fallback_agent() + logger.error(f"Could not initialize specific txtai agent for {agent_type}: {inner_e}") + # Fail fast: Re-raise exception + raise inner_e except Exception as e: logger.error(f"Failed to initialize txtai agent for {agent_type}: {e}") - self.txtai_agent = self._create_fallback_agent() + # Fail fast: Re-raise exception + raise e else: - self.txtai_agent = self._create_fallback_agent() + raise RuntimeError(f"txtai not available for {agent_type}") # Initialize safety framework self.safety_framework = get_safety_framework(user_id) @@ -253,6 +327,10 @@ class BaseALwrityAgent(ABC): return "strategy_orchestrator" return value + def _resolve_llm_task(self, requested_task: Optional[str]) -> str: + # No enforcement; return provided value or empty string + return str(requested_task or "").strip() + def _load_agent_profile_overrides(self) -> Dict[str, Any]: cache_key = f"{self.user_id}:{self.agent_key}" cached = BaseALwrityAgent._profile_cache.get(cache_key) @@ -880,8 +958,8 @@ class BaseALwrityAgent(ABC): class StrategyOrchestratorAgent(BaseALwrityAgent): """Central orchestrator agent that coordinates all marketing agents""" - def __init__(self, user_id: str, market_detector: Any = None, performance_monitor: Any = None, llm: Any = None): - super().__init__(user_id, "StrategyOrchestrator", llm=llm) + def __init__(self, user_id: str, market_detector: Any = None, performance_monitor: Any = None, llm: Any = None, **kwargs): + super().__init__(user_id, "StrategyOrchestrator", llm=llm, **kwargs) self.market_detector = market_detector self.performance_monitor = performance_monitor self.sub_agents = {} @@ -895,67 +973,11 @@ class StrategyOrchestratorAgent(BaseALwrityAgent): """Create txtai orchestrator agent with coordination tools""" if not TXTAI_AVAILABLE: return None - - return Agent( - llm=self.llm, - tools=[ - { - "name": "market_signal_detector", - "description": "Detects market changes and competitor activities", - "target": self._market_signal_detector_tool - }, - { - "name": "google_trends_fetcher", - "description": "Fetches Google Trends data and embeds it into SIF for retrieval", - "target": self._google_trends_fetcher_tool - }, - { - "name": "agent_coordinator", - "description": "Coordinates actions between multiple agents", - "target": self._agent_coordinator_tool - }, - { - "name": "performance_analyzer", - "description": "Analyzes marketing performance metrics", - "target": self._performance_analyzer_tool - }, - { - "name": "strategy_synthesizer", - "description": "Synthesizes unified strategies from multiple inputs", - "target": self._strategy_synthesizer_tool - }, - { - "name": "task_delegator", - "description": "Delegates specific tasks to specialized agents (content, competitor, seo, social)", - "target": self._delegate_task_tool - }, - { - "name": "kickoff_gsc_first_pass", - "description": "Kicks off first-pass execution by invoking SEO/Content default GSC plans", - "target": self._kickoff_gsc_first_pass_tool - } - ], - max_iterations=15, - system=self.get_effective_system_prompt(f"""You are the Marketing Strategy Orchestrator for ALwrity user {self.user_id}. - - Your role is to coordinate all marketing agents, analyze market signals, - and synthesize unified strategies. - - Key Responsibility: DELEGATE tasks to specialized agents. - - Content Strategy Agent: For content analysis, gaps, and optimization. - - Competitor Response Agent: For monitoring and counter-strategies. - - SEO Optimization Agent: For technical SEO and keywords. - - Social Amplification Agent: For social trends and distribution. - - Use the 'task_delegator' tool to assign work to these agents. - Do not just plan; EXECUTE by delegating. - - Always prioritize user goals and maintain safety constraints. - Coordinate multi-agent responses to market changes effectively. - - First, call 'kickoff_gsc_first_pass' to ground the plan on live GSC signals.""" - ) - ) + + _llm_for_agent = self.llm + for _ in range(3): + _llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent) + return Agent(llm=_llm_for_agent, tools=[], max_iterations=15) async def _market_signal_detector_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: """Tool for detecting market signals""" diff --git a/backend/services/intelligence/agents/specialized_agents.py b/backend/services/intelligence/agents/specialized_agents.py index 333799b0..f1064fd2 100644 --- a/backend/services/intelligence/agents/specialized_agents.py +++ b/backend/services/intelligence/agents/specialized_agents.py @@ -1,2661 +1,28 @@ """ -SIF Agent Interfaces -Defines the specialized agents for digital marketing and SEO. -Each agent leverages TxtaiIntelligenceService for semantic operations. +DEPRECATED: This module is being refactored. +Please import from 'services.intelligence.agents.specialized' instead. """ - -import traceback -import json -import asyncio -import re -from collections import Counter -from typing import List, Dict, Any, Optional -from datetime import datetime -from loguru import logger -from ..txtai_service import TxtaiIntelligenceService -from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, AgentAction, TaskProposal -from services.seo_tools.content_strategy_service import ContentStrategyService -from services.analytics import PlatformAnalyticsService -from services.intelligence.sif_agents import SharedLLMWrapper, LocalLLMWrapper -try: - from services.intelligence.sif_integration import SIFIntegrationService - SIF_AVAILABLE = True -except ImportError: - SIF_AVAILABLE = False - -try: - # Try importing from pipeline first (standard location) - from txtai.pipeline import Agent, LLM - TXTAI_AVAILABLE = True -except ImportError: - try: - # Fallback to top-level import - from txtai import Agent, LLM - TXTAI_AVAILABLE = True - except ImportError: - TXTAI_AVAILABLE = False - Agent = None - LLM = None - logger.warning("txtai not available, using fallback implementation") - -class SIFBaseAgent(BaseALwrityAgent): - def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, agent_type: str = "sif_agent", model_name: str = "Qwen/Qwen2.5-3B-Instruct", llm: Any = None): - # Hybrid LLM Strategy: - # 1. Shared LLM for external/high-quality generation - self.shared_llm = SharedLLMWrapper(user_id) - - # 2. Local LLM for internal agent work (default for SIF agents) - if llm is None: - if TXTAI_AVAILABLE: - # Use Lazy Local LLM - llm = LocalLLMWrapper(model_name) - else: - # Fallback to Shared if txtai not available - llm = self.shared_llm - - super().__init__(user_id, agent_type, model_name, llm) - self.intelligence = intelligence_service - - def _log_agent_operation(self, operation: str, **kwargs): - """Standardized logging for agent operations.""" - logger.info(f"[{self.__class__.__name__}] {operation}") - if kwargs: - logger.debug(f"[{self.__class__.__name__}] Parameters: {kwargs}") - - def _create_txtai_agent(self): - """ - SIF agents use the intelligence service directly, but we can expose - capabilities via a standard agent interface if needed. - """ - if not TXTAI_AVAILABLE or Agent is None: - return None - - # Return a simple agent that can use the LLM - try: - return Agent(llm=self.llm, tools=[]) - except Exception as e: - logger.warning(f"Failed to create txtai Agent: {e}") - return None - -class StrategyArchitectAgent(SIFBaseAgent): - """Agent for discovering content pillars and identifying strategic gaps.""" - - def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str): - super().__init__(intelligence_service, user_id, agent_type="strategy_architect") - - async def discover_pillars(self) -> List[Dict[str, Any]]: - """Identify content pillars through semantic clustering.""" - self._log_agent_operation("Discovering content pillars") - - try: - # Check if intelligence service is initialized - if not self.intelligence.is_initialized(): - logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") - return [] - - clusters = await self.intelligence.cluster(min_score=0.6) - - if not clusters: - logger.warning(f"[{self.__class__.__name__}] No clusters found") - return [] - - # Create pillar objects with metadata - pillars = [] - for i, cluster_indices in enumerate(clusters): - pillar = { - "pillar_id": f"pillar_{i}", - "indices": cluster_indices, - "size": len(cluster_indices), - "confidence": self._calculate_cluster_confidence(cluster_indices) - } - pillars.append(pillar) - logger.debug(f"[{self.__class__.__name__}] Created pillar {pillar['pillar_id']} with {pillar['size']} items") - - logger.info(f"[{self.__class__.__name__}] Discovered {len(pillars)} content pillars") - return pillars - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to discover pillars: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return [] - - def _calculate_cluster_confidence(self, cluster_indices: List[int]) -> float: - """Calculate confidence score for a cluster based on its size and coherence.""" - # Simple confidence based on cluster size - larger clusters are more reliable - return min(1.0, len(cluster_indices) / 10.0) - - async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: - """Propose PLAN pillar tasks based on semantic analysis.""" - proposals = [] - - # 1. Pillar Health Check - try: - # We use a shorter timeout or cached check if possible, but discover_pillars is fairly fast - pillars = await self.discover_pillars() - if not pillars: - proposals.append(TaskProposal( - title="Establish Content Pillars", - description="Your content strategy lacks defined pillars. Let's analyze your niche to find core topics.", - pillar_id="plan", - priority="high", - estimated_time=15, - source_agent="StrategyArchitectAgent", - reasoning="No content pillars detected via SIF clustering.", - action_type="navigate", - action_url="/content-planning-dashboard" - )) - elif len(pillars) < 3: - proposals.append(TaskProposal( - title="Expand Content Pillars", - description=f"You only have {len(pillars)} active pillars. Consider diversifying your strategy.", - pillar_id="plan", - priority="medium", - estimated_time=20, - source_agent="StrategyArchitectAgent", - reasoning=f"Low pillar diversity ({len(pillars)} detected).", - action_type="navigate", - action_url="/content-planning-dashboard" - )) - except Exception as e: - logger.warning(f"[{self.__class__.__name__}] Error checking pillars for proposals: {e}") - - # 2. Strategy Review (Generic fallback) - proposals.append(TaskProposal( - title="Review Strategic Goals", - description="Ensure your content output aligns with your quarterly business goals.", - pillar_id="plan", - priority="low", - estimated_time=10, - source_agent="StrategyArchitectAgent", - reasoning="Routine strategy maintenance.", - action_type="navigate", - action_url="/content-planning-dashboard" - )) - - return proposals - - async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]: - """Compare user content vs competitor content to find missing topics.""" - self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices)) - - try: - documents = await self._fetch_index_documents() - if not documents: - logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection") - return [] - - competitor_docs, user_docs = [], [] - allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None - if allowed_competitor_ids: - for idx in competitor_indices: - if isinstance(idx, int) and 0 <= idx < len(documents): - allowed_competitor_ids.add(str(documents[idx].get("id", ""))) - - for doc in documents: - metadata = doc.get("metadata", {}) - role = self._infer_document_role(metadata) - if role == "competitor": - if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids: - continue - competitor_docs.append(doc) - elif role == "user": - user_docs.append(doc) - - if not competitor_docs or not user_docs: - logger.info( - f"[{self.__class__.__name__}] Insufficient split for gap analysis: " - f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}" - ) - return [] - - competitor_topics = self._extract_topic_density(competitor_docs) - user_topics = self._extract_topic_density(user_docs) - competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs) - user_topic_docs = self._map_topic_to_doc_titles(user_docs) - - gaps = [] - for topic, competitor_density in competitor_topics.items(): - user_density = user_topics.get(topic, 0.0) - coverage_delta = competitor_density - user_density - if coverage_delta <= 0.08: - continue - - competitor_support = len(competitor_topic_docs.get(topic, [])) - user_support = len(user_topic_docs.get(topic, [])) - confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35))) - severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3))) - priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low" - gaps.append({ - "topic": topic, - "priority": priority, - "reason": ( - f"Competitors mention '{topic}' substantially more often " - f"(density {competitor_density:.2f} vs {user_density:.2f})." - ), - "confidence": round(confidence, 3), - "severity_score": round(severity_score, 3), - "coverage_delta": round(coverage_delta, 4), - "topic_density": { - "competitor": round(competitor_density, 4), - "user": round(user_density, 4), - "gap": round(coverage_delta, 4) - }, - "evidence": { - "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic), - "user_sample_titles": self._sample_titles_for_topic(user_docs, topic), - "competitor_supporting_docs": competitor_support, - "user_supporting_docs": user_support, - "competitor_doc_count": len(competitor_docs), - "user_doc_count": len(user_docs) - } - }) - - gaps.sort( - key=lambda item: ( - item.get("severity_score", 0), - item.get("confidence", 0), - item.get("topic_density", {}).get("gap", 0) - ), - reverse=True - ) - return gaps[:12] - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return [] - - async def _fetch_index_documents(self) -> List[Dict[str, Any]]: - """Fetch indexed documents and normalize metadata from txtai result objects.""" - if not self.intelligence.is_initialized() or not self.intelligence.embeddings: - return [] - - embeddings = self.intelligence.embeddings - limit = 0 - if hasattr(embeddings, "count"): - try: - limit = int(embeddings.count()) - except Exception: - limit = 0 - - documents = [] - candidate_queries = [] - if limit > 0: - candidate_queries.extend([ - f"select id, text, object from txtai limit {limit}", - f"select id, text, tags from txtai limit {limit}" - ]) - candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"]) - - seen_ids = set() - for query in candidate_queries: - try: - query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50) - rows = embeddings.search(query, limit=query_limit) - except Exception: - continue - - for row in rows or []: - doc_id = str(row.get("id", "")) - dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}")) - if dedupe_key in seen_ids: - continue - seen_ids.add(dedupe_key) - documents.append({ - "id": doc_id, - "text": row.get("text", "") or "", - "metadata": self._normalize_metadata(row) - }) - - if limit > 0 and len(documents) >= limit: - break - - return documents - - def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]: - """Normalize metadata payloads from txtai search rows.""" - for key in ("object", "tags", "metadata", "meta"): - payload = row.get(key) - if isinstance(payload, dict): - return payload - if isinstance(payload, str): - try: - parsed = json.loads(payload) - if isinstance(parsed, dict): - return parsed - except Exception: - continue - return {} - - def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]: - """Extract topic density from document metadata and titles.""" - topic_counter: Counter = Counter() - - for doc in documents: - for topic in self._extract_topics_from_document(doc): - topic_counter[topic] += 1 - - total_docs = max(1, len(documents)) - return { - topic: count / total_docs - for topic, count in topic_counter.items() - if count >= 2 - } - - def _infer_document_role(self, metadata: Dict[str, Any]) -> str: - """Infer whether a document belongs to user content or competitor content.""" - signals = [ - metadata.get("type", ""), - metadata.get("doc_type", ""), - metadata.get("content_type", ""), - metadata.get("source", ""), - metadata.get("origin", "") - ] - signal_blob = " ".join(str(item).lower() for item in signals if item) - - if any(token in signal_blob for token in ("competitor", "rival", "market_peer")): - return "competitor" - if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")): - return "user" - return "unknown" - - def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]: - """Extract normalized topic labels from metadata and lightweight text fields.""" - metadata = doc.get("metadata", {}) - candidates: List[str] = [] - - for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"): - value = metadata.get(key) - if isinstance(value, list): - candidates.extend([str(v) for v in value if v]) - elif isinstance(value, str) and value.strip(): - candidates.extend(re.split(r"[,|/]", value)) - - title = metadata.get("title") or doc.get("text", "")[:160] - if title: - candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())) - - stopwords = { - "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best", - "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025" - } - normalized = { - item.strip().lower() - for item in candidates - if item - and len(item.strip()) >= 4 - and not item.strip().isdigit() - and item.strip().lower() not in stopwords - } - return sorted(normalized) - - def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]: - """Map each topic to a list of document titles that support it.""" - mapping: Dict[str, List[str]] = {} - for doc in documents: - metadata = doc.get("metadata", {}) - title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled") - for topic in self._extract_topics_from_document(doc): - mapping.setdefault(topic, []).append(title) - return mapping - - def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]: - """Return sample titles for a topic.""" - samples = [] - topic_lower = topic.lower() - for doc in documents: - metadata = doc.get("metadata", {}) - title = metadata.get("title") or doc.get("text", "")[:100] - if not title: - continue - - haystack = f"{title} {json.dumps(metadata, default=str)}".lower() - if topic_lower in haystack: - samples.append(str(title)) - if len(samples) >= limit: - break - - return samples - -class ContentGuardianAgent(SIFBaseAgent): - """Agent for preventing cannibalization and ensuring content originality.""" - - CANNIBALIZATION_THRESHOLD = 0.85 # Similarity threshold for cannibalization warning - ORIGINALITY_THRESHOLD = 0.75 # Minimum originality score - - def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, sif_service: Any = None): - super().__init__(intelligence_service, user_id, agent_type="content_guardian") - self.sif_service = sif_service - - # Lazy initialization of SIF service if not provided - if self.sif_service is None and SIF_AVAILABLE: - try: - self.sif_service = SIFIntegrationService(user_id) - logger.info(f"[{self.__class__.__name__}] Lazily initialized SIFIntegrationService") - except Exception as e: - logger.warning(f"[{self.__class__.__name__}] Failed to lazily initialize SIF service: {e}") - - async def assess_content_quality(self, content: str) -> Dict[str, Any]: - """ - Assess content quality based on originality, readability, and cannibalization risks. - """ - self._log_agent_operation("Assessing content quality", content_length=len(content)) - - try: - # 1. Check for cannibalization - cannibalization_result = await self.check_cannibalization(content) - - # 2. Check originality (if not cannibalized) - originality_score = 1.0 - if not cannibalization_result.get("warning"): - originality_result = await self.verify_originality(content, None) - originality_score = originality_result.get("originality_score", 1.0) - - # 3. Check Style Compliance - style_result = await self.style_enforcer(content) - style_score = style_result.get("compliance_score", 1.0) - - # 4. Basic Readability (Flesch-Kincaid proxy via sentence length/word complexity) - # Simple heuristic for now - words = content.split() - sentences = content.split('.') - avg_sentence_length = len(words) / max(1, len(sentences)) - readability_score = 1.0 if avg_sentence_length < 20 else max(0.5, 1.0 - (avg_sentence_length - 20) * 0.05) - - # Weighted Score: Originality (40%) + Style (30%) + Readability (30%) - quality_score = (originality_score * 0.4) + (style_score * 0.3) + (readability_score * 0.3) - - return { - "quality_score": quality_score, - "originality_score": originality_score, - "readability_score": readability_score, - "style_score": style_score, - "cannibalization_risk": cannibalization_result, - "style_compliance": style_result, - "is_acceptable": quality_score > 0.7 and not cannibalization_result.get("warning", False) - } - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to assess content quality: {e}") - return {"error": str(e), "quality_score": 0.0} - - async def check_cannibalization(self, new_draft: str) -> Dict[str, Any]: - """Check if a new draft competes semantically with existing pages.""" - self._log_agent_operation("Checking for semantic cannibalization", draft_length=len(new_draft)) - - try: - if not self.intelligence.is_initialized(): - logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") - return {"warning": False, "error": "Service not initialized"} - - if not new_draft or len(new_draft.strip()) < 50: - logger.warning(f"[{self.__class__.__name__}] Draft too short for meaningful analysis") - return {"warning": False, "reason": "Draft too short"} - - results = await self.intelligence.search(new_draft, limit=1) - - if not results: - logger.info(f"[{self.__class__.__name__}] No similar content found - draft is unique") - return {"warning": False, "uniqueness_score": 1.0} - - top_result = results[0] - similarity_score = top_result.get('score', 0.0) - - logger.debug(f"[{self.__class__.__name__}] Top similarity score: {similarity_score:.4f}") - - if similarity_score > self.CANNIBALIZATION_THRESHOLD: - warning_data = { - "warning": True, - "similar_to": top_result.get('id', 'unknown'), - "score": similarity_score, - "threshold": self.CANNIBALIZATION_THRESHOLD, - "recommendation": "Consider revising the draft to target a different angle or merge with existing content" - } - logger.warning(f"[{self.__class__.__name__}] Cannibalization detected: {warning_data}") - return warning_data - - logger.info(f"[{self.__class__.__name__}] No cannibalization detected. Draft is sufficiently unique.") - return {"warning": False, "uniqueness_score": 1.0 - similarity_score} - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to check cannibalization: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return {"warning": False, "error": str(e)} - - async def verify_originality(self, text: str, competitor_index: Any) -> Dict[str, Any]: - """Verify originality against competitor content index.""" - self._log_agent_operation("Verifying originality against competitors", text_length=len(text)) - - try: - if not text or len(text.strip()) < 50: - logger.warning(f"[{self.__class__.__name__}] Text too short for meaningful originality check") - return {"originality_score": 0.0, "reason": "Text too short"} - - # STUB: Implement cross-index search against competitor content - # This would search the text against a competitor-specific index - - logger.info(f"[{self.__class__.__name__}] Originality verification stub completed") - return { - "originality_score": 0.95, # Placeholder - "confidence": 0.8, - "method": "semantic_comparison", - "notes": "Competitor index integration pending" - } - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to verify originality: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return {"originality_score": 0.0, "error": str(e)} - - async def style_enforcer(self, text: str, style_guidelines: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: - """ - Tool: Ensures content adheres to brand voice and style guidelines. - """ - self._log_agent_operation("Enforcing style guidelines", text_length=len(text)) - - try: - if not text: - return {"compliance_score": 0.0, "issues": ["No text provided"]} - - # 1. Fetch Style Guidelines from SIF if not provided - if not style_guidelines and self.sif_service: - try: - # Use central SIF service to get robust context - seo_context = await self.sif_service.get_seo_context() - - if seo_context and "error" not in seo_context: - # Extract brand voice/style from the context - # The context structure is normalized in get_seo_context - - # Note: get_seo_context returns a flattened dict. - # We need to dig into the original structure if available, or rely on what's mapped. - # However, get_seo_context maps 'seo_audit', 'sitemap_analysis', etc. - # Brand info is usually in 'brand_analysis' col of WebsiteAnalysis, which might not be fully exposed - # in the simplified get_seo_context return. - # Let's check if we can get the full object or if we need to expand get_seo_context. - # For now, we'll try to use what's there or fall back to a specific search if needed. - - # Actually, looking at get_seo_context implementation: - # It returns 'seo_audit', 'crawl_result'. - # Brand analysis is often stored in WebsiteAnalysis.brand_analysis. - # We might need to extend get_seo_context or do a specific retrieval here. - # But wait! I saw get_seo_context implementation earlier: - # It retrieves the "full_report" from the SIF metadata. - # If the SIF index contains the full WebsiteAnalysis object, we are good. - - # Let's try to get it from the full report if we can access it, - # but get_seo_context returns a filtered dict. - - # Alternative: Use the robust retrieval logic but specifically for brand info if get_seo_context is too narrow. - # But get_seo_context logic includes "website analysis seo audit" query. - - # Let's assume for now we use the same retrieval logic but locally adapted, - # OR better, trust get_seo_context to be the single point of truth. - # If get_seo_context doesn't return brand info, we should update IT, not hack here. - # But I can't update SIFIntegrationService right now without context switch. - - # Let's stick to the previous manual search pattern BUT use the SIF service helper if possible. - # Actually, the previous code was: - # results = await self.intelligence.search("website analysis brand voice style", limit=1) - - # Let's keep it simple and robust: - # Try to get it from SIF service if possible. - # Since get_seo_context might not return brand_voice directly, let's try to see if we can use it. - - # Actually, let's use the manual search but with better error handling, - # mirroring get_seo_context's robustness (e.g. parsing). - - results = await self.intelligence.search("website analysis brand voice style", limit=1) - if results: - res = results[0] - metadata_str = res.get('object') - metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else (metadata_str or res) - - if metadata.get('type') == 'website_analysis': - report = metadata.get('full_report', {}) - # Support both flat and nested structures - brand_analysis = report.get('brand_analysis') or report.get('brand_voice', {}) - if isinstance(brand_analysis, str): - # Handle case where it might be a JSON string - try: brand_analysis = json.loads(brand_analysis) - except: brand_analysis = {"brand_voice": brand_analysis} - - style_guidelines = { - "tone": brand_analysis.get('brand_voice', 'neutral') if isinstance(brand_analysis, dict) else 'neutral', - "style_patterns": report.get('style_patterns', {}), - "writing_style": report.get('writing_style', {}) - } - logger.info(f"[{self.__class__.__name__}] Retrieved style guidelines from SIF index") - except Exception as e: - logger.warning(f"[{self.__class__.__name__}] Failed to retrieve style guidelines: {e}") - - issues = [] - score = 1.0 - - # Basic Heuristic Checks (Placeholder for LLM-based style analysis) - - # 1. Tone Check (e.g., formal vs casual) - # If guidelines specify 'formal', check for contractions - tone = style_guidelines.get('tone', '').lower() if style_guidelines else '' - if 'formal' in tone or 'professional' in tone: - contractions = ["can't", "won't", "don't", "it's"] - found_contractions = [c for c in contractions if c in text.lower()] - if found_contractions: - issues.append(f"Found contractions in formal text: {', '.join(found_contractions[:3])}...") - score -= 0.1 - - # 2. Length/Sentence Structure (simple metric) - sentences = text.split('.') - avg_len = sum(len(s.split()) for s in sentences if s) / max(1, len(sentences)) - if avg_len > 25: - issues.append("Average sentence length is too high (>25 words). Consider shortening.") - score -= 0.1 - - return { - "compliance_score": max(0.0, score), - "issues": issues, - "is_compliant": score > 0.8, - "guidelines_source": "sif_index" if not style_guidelines and self.sif_service else "provided" - } - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Style enforcement failed: {e}") - return {"error": str(e)} - - async def perform_site_audit(self, website_url: str, limit: int = 10) -> Dict[str, Any]: - """ - Perform a quality audit on the user's website content. - """ - self._log_agent_operation("Performing site audit", website_url=website_url) - - try: - # 1. Retrieve recent content for the site from SIF - # We search for everything with the website_url in metadata - # Note: This depends on how data is indexed. - results = await self.intelligence.search(f"site:{website_url}", limit=limit) - - if not results: - logger.info(f"[{self.__class__.__name__}] No content found for site audit") - return {"error": "No content found"} - - audit_results = [] - total_quality = 0.0 - - for res in results: - text = res.get('text', '') - if not text or len(text) < 100: - continue - - quality = await self.assess_content_quality(text) - audit_results.append({ - "id": res.get('id'), - "title": res.get('title', 'Unknown'), - "quality": quality - }) - total_quality += quality.get('quality_score', 0.0) - - avg_quality = total_quality / len(audit_results) if audit_results else 0.0 - - report = { - "website_url": website_url, - "pages_audited": len(audit_results), - "average_quality_score": avg_quality, - "details": audit_results, - "timestamp": datetime.utcnow().isoformat() - } - - logger.info(f"[{self.__class__.__name__}] Site audit completed. Avg Quality: {avg_quality:.2f}") - return report - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Site audit failed: {e}") - return {"error": str(e)} - - async def safety_filter(self, text: str) -> Dict[str, Any]: - """ - Tool: Flags potentially harmful, offensive, or sensitive content. - """ - self._log_agent_operation("Running safety filter", text_length=len(text)) - - try: - # Basic Keyword Blocklist (Placeholder for LLM/Safety Model) - # In production, this should call a dedicated safety API (e.g., OpenAI Moderation, Llama Guard) - unsafe_keywords = [ - "hate", "kill", "murder", "attack", "destroy", # Violent - "scam", "fraud", "steal", # Illegal - "explicit", "adult" # NSFW - ] - - found_flags = [] - text_lower = text.lower() - - for keyword in unsafe_keywords: - if f" {keyword} " in text_lower: # Simple word boundary check - found_flags.append(keyword) - - is_safe = len(found_flags) == 0 - - return { - "is_safe": is_safe, - "flags": found_flags, - "safety_score": 1.0 if is_safe else 0.0, - "action": "approve" if is_safe else "flag_for_review" - } - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Safety filter failed: {e}") - return {"error": str(e)} - -class LinkGraphAgent(SIFBaseAgent): - """ - Agent for internal link suggestions, graph management, and authority analysis. - Implements the semantic link graph using SIF and GSC/Bing data. - """ - - RELEVANCE_THRESHOLD = 0.6 # Minimum relevance score for link suggestions - MAX_SUGGESTIONS = 10 # Maximum number of link suggestions - - def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, sif_service: Any = None): - super().__init__(intelligence_service, user_id, agent_type="link_graph") - self.sif_service = sif_service - - async def suggest_internal_links(self, draft: str) -> List[Dict[str, Any]]: - """Suggest internal links based on semantic proximity and authority.""" - return await self.link_suggester(draft) - - async def link_suggester(self, draft: str) -> List[Dict[str, Any]]: - """ - Tool: Suggests internal links. - Analyzes draft content and finds semantically relevant pages, boosted by authority. - """ - self._log_agent_operation("Suggesting internal links", draft_length=len(draft)) - - try: - if not self.intelligence.is_initialized(): - logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") - return [] - - if not draft or len(draft.strip()) < 50: # Reduced threshold for testing - logger.warning(f"[{self.__class__.__name__}] Draft too short for meaningful link suggestions") - return [] - - # 1. Get Semantic Candidates - results = await self.intelligence.search(draft, limit=self.MAX_SUGGESTIONS) - - if not results: - logger.info(f"[{self.__class__.__name__}] No relevant internal pages found") - return [] - - # 2. Get Authority Data (if available) - authority_map = {} - if self.sif_service: - try: - # Fetch dashboard context to get top performing content - # Note: This relies on what's available in the SIF index/dashboard summary - dashboard_context = await self.sif_service.get_seo_dashboard_context() - - if "error" not in dashboard_context: - # Extract top queries/pages if available in summary - # Ideally, we'd have a map of URL -> Authority Score - # For now, we'll try to extract what we can - data = dashboard_context.get("dashboard_data", {}) - summary = data.get("summary", {}) - - # Example: Boost if site health is good (general confidence) - site_health = data.get("health_score", {}).get("score", 0) - - # If we had top pages in the summary, we'd use them. - # For now, we'll use a placeholder authority map or just the site health - pass - except Exception as e: - logger.warning(f"Failed to fetch authority data: {e}") - - suggestions = [] - for result in results: - relevance_score = result.get('score', 0.0) - url = result.get('id', 'unknown') - - # Apply authority boost (placeholder logic) - # In a full implementation, we'd look up 'url' in authority_map - authority_boost = 1.0 - - final_score = relevance_score * authority_boost - - if final_score >= self.RELEVANCE_THRESHOLD: - suggestion = { - "url": url, - "relevance": relevance_score, - "final_score": final_score, - "confidence": self._calculate_link_confidence(final_score), - "reason": f"Semantic similarity: {relevance_score:.3f}" - } - suggestions.append(suggestion) - logger.debug(f"[{self.__class__.__name__}] Added link suggestion: {url} (score: {final_score:.3f})") - - # Sort by final score - suggestions.sort(key=lambda x: x['final_score'], reverse=True) - - logger.info(f"[{self.__class__.__name__}] Generated {len(suggestions)} internal link suggestions") - return suggestions - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to suggest internal links: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return [] - - async def graph_builder(self) -> Dict[str, Any]: - """ - Tool: Builds/Visualizes the semantic link graph. - Returns the structure of the graph (nodes and edges) for visualization or analysis. - """ - self._log_agent_operation("Building semantic link graph") - - try: - if not self.intelligence.is_initialized(): - return {"error": "Intelligence service not initialized"} - - # This is a resource-intensive operation in a real vector DB. - # Here we simulate the graph structure based on recent content or clusters. - - # 1. Get Clusters (Nodes) - clusters = await self.intelligence.cluster(min_score=0.5) - - nodes = [] - edges = [] - - for i, cluster in enumerate(clusters): - cluster_id = f"cluster_{i}" - nodes.append({ - "id": cluster_id, - "type": "topic_cluster", - "size": len(cluster) - }) - - # Add content items as nodes linked to cluster - for item_idx in cluster: - # We need to retrieve item metadata. - # txtai cluster returns indices. We might need to query by index or ID. - # For this implementation, we'll return a simplified view. - pass - - return { - "graph_stats": { - "total_clusters": len(clusters), - "total_nodes": sum(len(c) for c in clusters) - }, - "structure": "hierarchical", # vs flat - "timestamp": datetime.utcnow().isoformat() - } - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to build graph: {e}") - return {"error": str(e)} - - - - async def authority_analyzer(self, target_url: Optional[str] = None) -> Dict[str, Any]: - """ - Tool: Analyzes the authority of the site or specific pages using GSC/Bing data. - """ - self._log_agent_operation("Analyzing authority", target_url=target_url) - - if not self.sif_service: - return {"error": "SIF Service unavailable for authority analysis"} - - try: - # 1. Get Dashboard Context - context = await self.sif_service.get_seo_dashboard_context() - - if "error" in context: - return context - - data = context.get("dashboard_data", {}) - summary = data.get("summary", {}) - health = data.get("health_score", {}) - - # 2. Extract Authority Metrics - authority_report = { - "domain_authority_proxy": { - "health_score": health.get("score"), - "total_clicks": summary.get("clicks"), - "avg_position": summary.get("position") - }, - "page_authority": "Page-level authority requires granular GSC data (Planned)", # Placeholder - "timestamp": datetime.utcnow().isoformat() - } - - return authority_report - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Authority analysis failed: {e}") - return {"error": str(e)} - - def _calculate_link_confidence(self, relevance_score: float) -> float: - """Calculate confidence score for a link suggestion.""" - # Simple confidence based on relevance score - return min(1.0, relevance_score * 1.5) - - async def optimize_anchor_text(self, target_url: str, context: str) -> str: - """Suggest the best anchor text for a given link based on target page context.""" - self._log_agent_operation("Optimizing anchor text", target_url=target_url, context_length=len(context)) - - try: - # In a real implementation, we would fetch the target page content via SIF - # and use an LLM to generate the anchor text. - - # Placeholder for LLM call - # if self.llm: ... - - logger.info(f"[{self.__class__.__name__}] Anchor text optimization stub completed") - return "relevant anchor text" # Placeholder - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to optimize anchor text: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return "click here" # Fallback anchor text - -class CitationExpert(SIFBaseAgent): - """ - Agent for fact-checking, citation generation, and evidence verification. - """ - - EVIDENCE_THRESHOLD = 0.7 # Minimum relevance score for evidence - MAX_EVIDENCE = 5 # Maximum number of evidence pieces to return - - async def fact_checker(self, claim: str) -> List[Dict[str, Any]]: - """ - Tool: Verifies facts against trusted research data. - Returns supporting or contradicting evidence. - """ - return await self.verify_facts(claim) - - async def citation_finder(self, topic: str) -> List[Dict[str, Any]]: - """ - Tool: Suggests authoritative citations for a given topic. - """ - self._log_agent_operation("Finding citations", topic=topic) - - try: - if not self.intelligence.is_initialized(): - return [] - - # Search for highly relevant content - results = await self.intelligence.search(topic, limit=self.MAX_EVIDENCE) - - citations = [] - for result in results: - relevance = result.get('score', 0.0) - if relevance > 0.6: - citations.append({ - "source": result.get('id'), - "title": result.get('text', '')[:100] + "...", - "relevance": relevance, - "citation_text": f"Source: {result.get('id')} (Relevance: {relevance:.2f})" - }) - - return citations - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Citation finder failed: {e}") - return [] - - async def claim_verifier(self, content: str) -> Dict[str, Any]: - """ - Tool: Detects unsupported statements and hallucinations. - """ - self._log_agent_operation("Verifying claims in content", content_length=len(content)) - - # 1. Extract potential claims (heuristic: numbers, 'research shows', etc.) - # This is a simplified extraction. A real implementation would use NLP/LLM. - claims = [] - sentences = content.split('.') - for sent in sentences: - if any(char.isdigit() for char in sent) or "show" in sent.lower() or "study" in sent.lower(): - if len(sent.strip()) > 20: - claims.append(sent.strip()) - - if not claims: - return {"status": "no_claims_detected", "verified_claims": []} - - verified_results = [] - for claim in claims[:5]: # Limit to top 5 claims for performance - evidence = await self.verify_facts(claim) - status = "supported" if evidence else "unsupported" - verified_results.append({ - "claim": claim, - "status": status, - "evidence_count": len(evidence), - "top_evidence": evidence[0]['source'] if evidence else None - }) - - return { - "status": "verification_complete", - "total_claims": len(claims), - "verified_claims": verified_results, - "unsupported_count": len([c for c in verified_results if c['status'] == 'unsupported']), - "timestamp": datetime.utcnow().isoformat() - } - - async def verify_facts(self, claim: str) -> List[Dict[str, Any]]: - """Find supporting or contradicting evidence in the indexed research.""" - self._log_agent_operation("Verifying facts", claim_length=len(claim)) - - try: - if not self.intelligence.is_initialized(): - logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") - return [] - - if not claim or len(claim.strip()) < 20: - logger.warning(f"[{self.__class__.__name__}] Claim too short for meaningful verification") - return [] - - results = await self.intelligence.search(claim, limit=self.MAX_EVIDENCE) - - if not results: - logger.info(f"[{self.__class__.__name__}] No evidence found for claim") - return [] - - evidence = [] - for result in results: - relevance_score = result.get('score', 0.0) - - if relevance_score >= self.EVIDENCE_THRESHOLD: - evidence_piece = { - "source": result.get('id', 'unknown'), - "relevance": relevance_score, - "confidence": self._calculate_evidence_confidence(relevance_score), - "type": "supporting" if relevance_score > 0.8 else "related", - "excerpt": result.get('text', '')[:200] + "..." if len(result.get('text', '')) > 200 else result.get('text', '') - } - evidence.append(evidence_piece) - logger.debug(f"[{self.__class__.__name__}] Found evidence: {evidence_piece['source']} (score: {relevance_score:.3f})") - - logger.info(f"[{self.__class__.__name__}] Found {len(evidence)} pieces of evidence for claim") - return evidence - - except Exception as e: - logger.error(f"[{self.__class__.__name__}] Failed to verify facts: {e}") - logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") - return [] - - def _calculate_evidence_confidence(self, relevance_score: float) -> float: - """Calculate confidence score for evidence.""" - # Simple confidence based on relevance score - return min(1.0, relevance_score * 1.2) - -""" -Specialized ALwrity Autonomous Agents -Defines specific agent implementations for Content Strategy, Competitor Response, -SEO Optimization, and Social Amplification. -""" -import json -import logging -import asyncio -from typing import Dict, Any, List, Optional -from datetime import datetime - -# txtai imports -try: - from txtai import Agent, LLM - TXTAI_AVAILABLE = True -except ImportError: - TXTAI_AVAILABLE = False - logging.warning("txtai not available, using fallback implementation") - -from utils.logger_utils import get_service_logger -from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, AgentAction -from services.seo_tools.content_strategy_service import ContentStrategyService - -# Import SIF Integration for real tool capabilities -try: - from services.intelligence.sif_integration import SIFIntegrationService - SIF_AVAILABLE = True -except ImportError: - SIF_AVAILABLE = False - -logger = get_service_logger(__name__) - -class ContentStrategyAgent(BaseALwrityAgent): - """ - Agent responsible for content strategy, gap analysis, and optimization. - """ - - def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): - super().__init__(user_id, "content_strategist", model_name, llm) - self.sif_service = None - self.content_strategy_service = ContentStrategyService() - if SIF_AVAILABLE: - try: - self.sif_service = SIFIntegrationService(user_id) - except Exception as e: - logger.warning(f"Failed to initialize SIF service for ContentStrategyAgent: {e}") - - async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: - """Propose GENERATE pillar tasks.""" - proposals = [] - - # 1. Content Gap Analysis - proposals.append(TaskProposal( - title="Analyze Content Gaps", - description="Identify missing topics in your strategy compared to competitors.", - pillar_id="generate", - priority="high", - estimated_time=30, - source_agent="ContentStrategyAgent", - reasoning="Regular gap analysis ensures competitive relevance.", - action_type="navigate", - action_url="/content-planning-dashboard" - )) - - # 2. Draft New Content - proposals.append(TaskProposal( - title="Draft New Blog Post", - description="Create a new article targeting your primary keywords.", - pillar_id="generate", - priority="medium", - estimated_time=45, - source_agent="ContentStrategyAgent", - reasoning="Maintain publishing consistency.", - action_type="navigate", - action_url="/blog-writer" - )) - - return proposals - - def _create_txtai_agent(self) -> Agent: - """Create Content Strategy Agent using txtai native framework""" - if not TXTAI_AVAILABLE: - return None - - return Agent( - llm=self.llm, - tools=[ - { - "name": "content_analyzer", - "description": "Analyzes content performance and engagement metrics", - "target": self._content_analyzer_tool - }, - { - "name": "semantic_gap_detector", - "description": "Identifies content gaps using semantic analysis", - "target": self._semantic_gap_detector_tool - }, - { - "name": "content_optimizer", - "description": "Optimizes content for better performance", - "target": self._content_optimizer_tool - }, - { - "name": "performance_tracker", - "description": "Tracks content performance over time", - "target": self._content_performance_tracker_tool - }, - { - "name": "sitemap_analyzer", - "description": "Analyzes website structure and publishing velocity via sitemap", - "target": self._sitemap_analyzer_tool - }, - { - "name": "gsc_low_ctr_queries", - "description": "Returns low-CTR queries with evidence from cached GSC metrics", - "target": self._cs_gsc_low_ctr_queries_tool - }, - { - "name": "gsc_striking_distance_queries", - "description": "Returns striking-distance queries (positions ~8–20) with evidence", - "target": self._cs_gsc_striking_distance_tool - }, - { - "name": "gsc_declining_queries", - "description": "Returns period-over-period declining queries with evidence", - "target": self._cs_gsc_declining_queries_tool - }, - { - "name": "gsc_low_ctr_pages", - "description": "Returns low-CTR pages with top contributing queries", - "target": self._cs_gsc_low_ctr_pages_tool - }, - { - "name": "gsc_cannibalization_candidates", - "description": "Returns query→multiple-pages cannibalization candidates with target recommendation", - "target": self._cs_gsc_cannibalization_candidates_tool - }, - { - "name": "default_content_gsc_plan", - "description": "Runs a default first-pass plan using GSC signals (titles/meta, consolidation, refreshes)", - "target": self._default_content_gsc_plan_tool - }, - ], - max_iterations=8, - system=self.get_effective_system_prompt(f"""You are the Content Strategy Agent for ALwrity user {self.user_id}. - - Your mission is to analyze content performance, identify optimization opportunities, - and execute content improvements autonomously. - - Focus on: - - Content gap identification (semantic and structural) - - Topic cluster optimization - - SEO strategy adaptation - - Performance-based content improvements - - Use semantic analysis (SIF) and sitemap analysis to understand content context. - Always prioritize user goals and maintain brand consistency. - - In your first pass, call 'default_content_gsc_plan' to ground your actions on live GSC signals.""" - ) - ) - - # Tool Implementations - - async def _cs_fetch_gsc_analytics(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, Any]: - svc = PlatformAnalyticsService() - data = await svc.get_comprehensive_analytics(self.user_id, platforms=["gsc"], start_date=start_date, end_date=end_date) - gsc = data.get("gsc") - if not gsc or gsc.status != "success": - err = getattr(gsc, "error_message", None) if gsc else "No data" - raise RuntimeError(f"GSC analytics unavailable: {err}") - return {"metrics": gsc.metrics, "date_range": gsc.date_range} - - async def _cs_gsc_low_ctr_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)); min_impr = int(context.get("min_impressions", 100)); min_clicks = int(context.get("min_clicks", 10)); ctr_threshold = float(context.get("ctr_threshold", 1.5)) - start_date = context.get("start_date"); end_date = context.get("end_date") - try: - result = await self._cs_fetch_gsc_analytics(start_date, end_date) - tq = result["metrics"].get("top_queries", []) or [] - items = [ - {"query": r.get("query"), "clicks": r.get("clicks", 0), "impressions": r.get("impressions", 0), "ctr": r.get("ctr", 0.0), "position": r.get("position")} - for r in tq - if (r.get("impressions", 0) >= min_impr and r.get("clicks", 0) >= min_clicks and float(r.get("ctr", 0.0)) < ctr_threshold) - ] - items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) - return {"items": items[:limit], "range": result["date_range"], "source": "gsc_cache"} - except Exception as e: - logger.error(f"cs low_ctr_queries failed: {e}"); return {"error": str(e)} - - async def _cs_gsc_striking_distance_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)); min_impr = int(context.get("min_impressions", 100)); start_date = context.get("start_date"); end_date = context.get("end_date") - try: - result = await self._cs_fetch_gsc_analytics(start_date, end_date) - tq = result["metrics"].get("top_queries", []) or [] - items = [ - {"query": r.get("query"), "clicks": r.get("clicks", 0), "impressions": r.get("impressions", 0), "ctr": r.get("ctr", 0.0), "position": r.get("position")} - for r in tq - if (r.get("impressions", 0) >= min_impr and r.get("position") is not None and 8.0 <= float(r.get("position")) <= 20.0) - ] - items.sort(key=lambda x: (x.get("position") if x.get("position") is not None else 999, -x.get("impressions", 0))) - return {"items": items[:limit], "range": result["date_range"], "source": "gsc_cache"} - except Exception as e: - logger.error(f"cs striking_distance failed: {e}"); return {"error": str(e)} - - async def _cs_gsc_declining_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)); min_prev_clicks = int(context.get("min_prev_clicks", 10)); min_drop_pct = float(context.get("min_drop_pct", 30.0)) - start_date = context.get("start_date"); end_date = context.get("end_date") - try: - curr = await self._cs_fetch_gsc_analytics(start_date, end_date) - curr_range = curr["date_range"]; s = curr_range.get("start"); e = curr_range.get("end") - from datetime import datetime, timedelta; fmt = "%Y-%m-%d" - sd = datetime.strptime(s, fmt) if s else datetime.utcnow() - timedelta(days=30); ed = datetime.strptime(e, fmt) if e else datetime.utcnow() - days = max((ed - sd).days + 1, 1); prev_end = sd - timedelta(days=1); prev_start = prev_end - timedelta(days=days - 1) - prev = await self._cs_fetch_gsc_analytics(prev_start.strftime(fmt), prev_end.strftime(fmt)) - curr_queries = {r.get("query"): r for r in (curr["metrics"].get("top_queries", []) or [])} - prev_queries = {r.get("query"): r for r in (prev["metrics"].get("top_queries", []) or [])} - items = [] - for q, prev_row in prev_queries.items(): - curr_row = curr_queries.get(q); - if not curr_row: continue - prev_clicks = int(prev_row.get("clicks", 0) or 0); curr_clicks = int(curr_row.get("clicks", 0) or 0) - if prev_clicks >= min_prev_clicks and curr_clicks < prev_clicks: - drop_pct = ((prev_clicks - curr_clicks) / prev_clicks) * 100.0 - if drop_pct >= min_drop_pct: - items.append({"query": q, "prev_clicks": prev_clicks, "curr_clicks": curr_clicks, "drop_pct": round(drop_pct, 2)}) - items.sort(key=lambda x: (x.get("drop_pct", 0), x.get("prev_clicks", 0)), reverse=True) - return {"items": items[:limit], "range": curr_range, "previous_range": prev["date_range"], "source": "gsc_cache"} - except Exception as e: - logger.error(f"cs declining_queries failed: {e}"); return {"error": str(e)} - - async def _cs_gsc_low_ctr_pages_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)); min_impr = int(context.get("min_impressions", 200)); ctr_threshold = float(context.get("ctr_threshold", 1.5)) - start_date = context.get("start_date"); end_date = context.get("end_date") - try: - result = await self._cs_fetch_gsc_analytics(start_date, end_date) - tp = result["metrics"].get("top_pages", []) or [] - items = [] - for r in tp: - if (r.get("impressions", 0) >= min_impr and float(r.get("ctr", 0.0)) < ctr_threshold): - items.append({"page": r.get("page"), "clicks": r.get("clicks", 0), "impressions": r.get("impressions", 0), "ctr": r.get("ctr", 0.0), "position": r.get("position"), "evidence_queries": r.get("queries", [])[:5]}) - items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) - return {"items": items[:limit], "range": result["date_range"], "source": "gsc_cache"} - except Exception as e: - logger.error(f"cs low_ctr_pages failed: {e}"); return {"error": str(e)} - - async def _cs_gsc_cannibalization_candidates_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)); start_date = context.get("start_date"); end_date = context.get("end_date") - try: - result = await self._cs_fetch_gsc_analytics(start_date, end_date) - candidates = result["metrics"].get("cannibalization", []) or [] - return {"items": candidates[:limit], "range": result["date_range"], "source": "gsc_cache"} - except Exception as e: - logger.error(f"cs cannibalization_candidates failed: {e}"); return {"error": str(e)} - - async def _default_content_gsc_plan_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - start_date = context.get("start_date"); end_date = context.get("end_date") - try: - low_ctr_pages = await self._cs_gsc_low_ctr_pages_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - cannibals = await self._cs_gsc_cannibalization_candidates_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - striking = await self._cs_gsc_striking_distance_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - declining = await self._cs_gsc_declining_queries_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - - actions = [] - for p in low_ctr_pages.get("items", []): - actions.append({ - "type": "improve_titles_meta", - "target": p.get("page"), - "reason": f"Low CTR {p.get('ctr')}% with {p.get('impressions')} impressions", - "evidence": p.get("evidence_queries", []) - }) - for c in cannibals.get("items", []): - actions.append({ - "type": "consolidate/internal_link", - "target": c.get("recommended_target_page"), - "reason": f"Cannibalization on query '{c.get('query')}'", - "pages": c.get("pages", []) - }) - for q in striking.get("items", []): - actions.append({ - "type": "refresh_content", - "target": "query", - "query": q.get("query"), - "reason": f"Striking distance at position {q.get('position')} with {q.get('impressions')} impressions" - }) - for q in declining.get("items", []): - actions.append({ - "type": "refresh_content", - "target": "query", - "query": q.get("query"), - "reason": f"Clicks decline {q.get('prev_clicks')}→{q.get('curr_clicks')} ({q.get('drop_pct')}%)" - }) - - return { - "plan_name": "Default Content Plan from GSC", - "range": {"current": {"start": start_date, "end": end_date}}, - "actions": actions, - "source": "gsc_cache", - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"default_content_gsc_plan failed: {e}") - return {"error": str(e)} - - async def _sitemap_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Sitemap analysis tool using ContentStrategyService""" - website_url = context.get('website_url') - competitors = context.get('competitors', []) - - if not website_url: - return {"error": "Website URL required for sitemap analysis"} - - try: - result = await self.content_strategy_service.analyze_content_strategy( - website_url=website_url, - competitors=competitors, - user_id=self.user_id - ) - return { - "sitemap_insights": result.get("deterministic_insights", {}), - "ai_strategy": result.get("ai_strategy", {}), - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Sitemap analysis failed: {e}") - return {"error": str(e)} - - async def _content_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Content analysis tool with GSC integration. - Analyzes content performance using SIF insights and Google Search Console data. - """ - website_data = context.get('website_data', {}) - - # 1. SIF Semantic Analysis - sif_insights = {} - if self.sif_service: - try: - result = await self.sif_service.get_semantic_insights(website_data) - sif_insights = result.get('insights', {}) - except Exception as e: - logger.error(f"SIF content analysis failed: {e}") - - # 2. GSC Data Integration (Mock/Placeholder as per Phase 3A.2) - # In a real implementation, this would call a GSCService - gsc_data = { - "clicks": 1250, - "impressions": 45000, - "ctr": 2.8, - "position": 14.5, - "top_queries": [ - {"query": "ai content strategy", "clicks": 150, "position": 3.2}, - {"query": "seo automation", "clicks": 120, "position": 4.1} - ], - "underperforming_pages": [ - {"url": "/blog/old-post-1", "issue": "High impressions, low CTR"}, - {"url": "/blog/weak-content", "issue": "Declining traffic"} - ] - } - - # 3. Correlate Semantic Topics with GSC Performance - content_gaps = [] - if sif_insights and gsc_data: - # Example correlation logic - semantic_topics = sif_insights.get('content_pillars', []) - gsc_queries = [q['query'] for q in gsc_data['top_queries']] - - # Simple set difference to find topics with no traffic - for topic in semantic_topics: - if not any(topic.lower() in q.lower() for q in gsc_queries): - content_gaps.append(f"Topic '{topic}' has content but low search visibility") - - return { - "content_analysis": "Completed via SIF + GSC Integration", - "sif_insights": sif_insights, - "gsc_performance": gsc_data, - "identified_gaps": content_gaps, - "strategic_recommendations": sif_insights.get('strategic_recommendations', []) + ["Optimize meta descriptions for underperforming pages"], - "timestamp": datetime.utcnow().isoformat() - } - - async def _content_optimizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Content optimization tool using LLM-based rewriting and semantic analysis. - Generates specific diffs/rewrites for content. - """ - content = context.get('content') - target_keywords = context.get('target_keywords', []) - focus_topic = context.get('focus_topic') - optimization_goal = context.get('goal', 'readability') # readability, seo, conversion - - if not content: - return {"error": "No content provided for optimization"} - - try: - # System prompt optimized for specific rewrites - system_prompt = f"""You are an expert Content Editor. - Task: Optimize the following text for '{focus_topic}' with goal: {optimization_goal}. - Keywords to include: {', '.join(target_keywords)}. - - Return a JSON object with: - 1. "original_snippet": A short snippet of the original text. - 2. "optimized_version": The fully rewritten version. - 3. "changes_explained": A list of specific changes made (e.g., "Added keyword 'X' in first sentence"). - 4. "diff_summary": A brief summary of why this version is better. - - Maintain the original meaning and tone. - """ - - if self.llm: - # We assume the LLM returns JSON-like text or we parse it - response = await self._generate_llm_response(f"{system_prompt}\n\nText to rewrite:\n{content}") - - # Simple parsing fallback if LLM returns raw text - if isinstance(response, str) and not response.strip().startswith("{"): - optimized_content = response - changes = ["Rewrote for clarity", "Integrated keywords"] - else: - # Try to parse JSON if model is good - try: - import json - data = json.loads(response) - optimized_content = data.get("optimized_version", response) - changes = data.get("changes_explained", []) - except: - optimized_content = response - changes = ["Optimization applied"] - else: - optimized_content = f"[Mock Rewrite]: Optimized '{content[:30]}...' for {focus_topic}" - changes = ["Mock optimization applied"] - - return { - "original_content_snippet": content[:50] + "...", - "optimized_content": optimized_content, - "changes_made": changes, - "expected_impact": "Higher relevance score and better readability", - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Content optimization failed: {e}") - return {"error": str(e)} - - async def _content_performance_tracker_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Content performance tracking tool. - Persists metrics to monitor content health over time. - """ - content_id = context.get('content_id') or context.get('url') - metrics = context.get('metrics', {}) - - if not content_id: - return {"error": "Content ID or URL required for tracking"} - - # Simulate persistence (In real app, save to DB table 'content_performance_history') - # We can use the AgentPerformanceMonitor for generic metrics, but this is content-specific. - - tracking_record = { - "content_id": content_id, - "date": datetime.utcnow().date().isoformat(), - "metrics": metrics, - "health_score": self._calculate_content_health(metrics) - } - - # Log it for now as "persistence" - logger.info(f"Persisting content performance for {content_id}: {tracking_record}") - - return { - "status": "recorded", - "tracking_record": tracking_record, - "trend": "stable", # Would calculate based on history - "timestamp": datetime.utcnow().isoformat() - } - - def _calculate_content_health(self, metrics: Dict[str, Any]) -> float: - """Calculate a 0-100 health score based on metrics""" - # Simple heuristic - views = metrics.get('views', 0) - engagement = metrics.get('engagement_rate', 0) - - score = min(100, (views / 1000) * 10 + (engagement * 100)) - return round(score, 2) - - -class CompetitorResponseAgent(BaseALwrityAgent): - """ - Agent responsible for monitoring competitors and generating counter-strategies. - """ - - def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): - super().__init__(user_id, "competitor_analyst", model_name, llm) - - self.sif_service = None - if SIF_AVAILABLE: - try: - self.sif_service = SIFIntegrationService(user_id) - except Exception as e: - logger.warning(f"Failed to initialize SIF service for CompetitorResponseAgent: {e}") - - async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: - """Propose REMARKET pillar tasks.""" - proposals = [] - - # 1. Competitor Monitoring - proposals.append(TaskProposal( - title="Monitor Competitor Activity", - description="Check for new moves from your key competitors.", - pillar_id="remarket", - priority="medium", - estimated_time=15, - source_agent="CompetitorResponseAgent", - reasoning="Stay ahead of market changes.", - action_type="navigate", - action_url="/seo-dashboard" - )) - - return proposals - - def _create_txtai_agent(self) -> Agent: - """Create Competitor Response Agent using txtai native framework""" - if not TXTAI_AVAILABLE: - return None - - return Agent( - llm=self.llm, - tools=[ - { - "name": "competitor_monitor", - "description": "Monitors competitor content and strategy changes via SIF", - "target": self._competitor_monitor_tool - }, - { - "name": "threat_analyzer", - "description": "Analyzes competitive threats and opportunities based on SIF data", - "target": self._threat_analyzer_tool - }, - { - "name": "response_generator", - "description": "Generates counter-strategy recommendations", - "target": self._response_generator_tool - }, - { - "name": "strategy_executor", - "description": "Executes competitive response strategies", - "target": self._strategy_executor_tool - } - ], - max_iterations=12, - system=self.get_effective_system_prompt(f"""You are the Competitor Response Agent for ALwrity user {self.user_id}. - - Your mission is to monitor competitor activities, assess competitive threats, - and generate counter-strategies autonomously using SIF insights. - - Responsibilities: - - Real-time competitor content monitoring via SIF - - Competitive threat assessment - - Counter-strategy generation - - Rapid response deployment - - Use semantic analysis to understand competitor positioning and identify gaps. - Respond quickly to competitive threats while maintaining strategic advantage.""" - ) - ) - - # Tool Implementations - - async def _competitor_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Competitor monitoring tool that retrieves data via SIF. - Expects 'competitor_url' (optional) in context to filter. - """ - competitor_url = context.get('competitor_url') - - if not self.sif_service: - return {"error": "SIF Service unavailable, cannot retrieve competitor data."} - - try: - logger.info(f"Retrieving Competitor data via SIF for user {self.user_id}") - result = await self.sif_service.get_competitor_context(competitor_url) - - if "error" in result and result.get("source") == "empty": - return {"error": "No competitor data found. Please complete onboarding Step 3."} - - competitors = result.get("competitors", []) - changes = [] - - for comp in competitors: - # In a real scenario, we would compare with previous snapshots. - # Here we extract highlights from the current analysis. - summary = comp.get("summary", "") - highlights = comp.get("highlights", []) - changes.append({ - "url": comp.get("competitor_url"), # Or however it's stored in analysis_data - "summary_snippet": summary[:100] + "...", - "highlights": highlights[:3] - }) - - return { - "competitor_changes": changes, - "data_source": "sif_index", - "count": len(competitors), - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Competitor monitoring failed: {e}") - return {"error": str(e)} - - async def _threat_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Threat analysis tool using SIF data. - """ - # Get data from monitor tool or context - competitor_data = context.get('competitor_changes', []) - - # If not provided, fetch it - if not competitor_data and self.sif_service: - monitor_result = await self._competitor_monitor_tool(context) - competitor_data = monitor_result.get("competitor_changes", []) - - if not competitor_data: - return {"threat_assessment": "No data available for analysis", "level": "unknown"} - - # Simple rule-based or LLM-based analysis - # For now, we simulate a threat assessment based on highlights - threats = [] - overall_level = "low" - - for comp in competitor_data: - highlights = comp.get("highlights", []) - # Heuristic: If highlights mention "launch", "new", "pricing" -> Higher threat - level = "low" - risk_factors = [] - - full_text = " ".join(highlights).lower() - if "new feature" in full_text or "launch" in full_text: - level = "high" - risk_factors.append("New Product Launch") - elif "pricing" in full_text: - level = "medium" - risk_factors.append("Pricing Change") - - if level == "high": overall_level = "high" - elif level == "medium" and overall_level != "high": overall_level = "medium" - - threats.append({ - "competitor": comp.get("url"), - "level": level, - "risk_factors": risk_factors - }) - - return { - "threat_assessment": f"{overall_level.title()} threat level detected.", - "threat_level": overall_level, - "details": threats, - "timestamp": datetime.utcnow().isoformat() - } - - async def _response_generator_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Response generation tool""" - threat_level = context.get("threat_level", "low") - threat_details = context.get("details", []) - - strategies = [] - if threat_level == "high": - strategies = ["Launch counter-campaign immediately", "Highlight USP differentiation"] - elif threat_level == "medium": - strategies = ["Monitor closely", "Prepare comparison content"] - else: - strategies = ["Continue current strategy", "Look for gap opportunities"] - - return { - "counter_strategies": strategies, - "priority": "high" if threat_level == "high" else "medium", - "timestamp": datetime.utcnow().isoformat() - } - - async def _strategy_executor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Strategy execution tool""" - # In a real agent, this might trigger the Content Agent to write a post. - strategies = context.get("counter_strategies", []) - - execution_log = [] - for strategy in strategies: - execution_log.append(f"Scheduled: {strategy}") - - return { - "execution_status": "completed", - "actions_taken": execution_log, - "timestamp": datetime.utcnow().isoformat() - } - - -class SEOOptimizationAgent(BaseALwrityAgent): - """ - Agent responsible for technical SEO, keyword strategy, and performance optimization. - """ - - def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): - super().__init__(user_id, "seo_specialist", model_name, llm) - - self.sif_service = None - if SIF_AVAILABLE: - try: - self.sif_service = SIFIntegrationService(user_id) - except Exception as e: - logger.warning(f"Failed to initialize SIF service for SEOOptimizationAgent: {e}") - - async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: - """Propose ANALYZE pillar tasks.""" - proposals = [] - - # 1. Technical Audit - proposals.append(TaskProposal( - title="Review SEO Health", - description="Check for critical technical issues affecting your search visibility.", - pillar_id="analyze", - priority="high", - estimated_time=20, - source_agent="SEOOptimizationAgent", - reasoning="Regular health checks prevent traffic drops.", - action_type="navigate", - action_url="/seo-dashboard" - )) - - # 2. Keyword Opportunities - proposals.append(TaskProposal( - title="Optimize Underperforming Keywords", - description="Identify keywords where you rank on page 2 and optimize content to boost them.", - pillar_id="analyze", - priority="medium", - estimated_time=40, - source_agent="SEOOptimizationAgent", - reasoning="Low-hanging fruit for traffic growth.", - action_type="navigate", - action_url="/seo-dashboard" - )) - - return proposals - - def _create_txtai_agent(self) -> Agent: - """Create SEO Optimization Agent using txtai native framework""" - if not TXTAI_AVAILABLE: - return None - - return Agent( - llm=self.llm, - tools=[ - { - "name": "seo_auditor", - "description": "Performs comprehensive SEO audits", - "target": self._seo_auditor_tool - }, - { - "name": "issue_prioritizer", - "description": "Prioritizes SEO issues by impact and effort", - "target": self._issue_prioritizer_tool - }, - { - "name": "auto_fix_executor", - "description": "Automatically fixes high-impact SEO issues", - "target": self._auto_fix_executor_tool - }, - { - "name": "strategy_generator", - "description": "Generates SEO improvement strategies", - "target": self._strategy_generator_tool - }, - { - "name": "query_seo_knowledge_base", - "description": "Queries the SIF knowledge base for SEO dashboard data, GSC/Bing metrics, and semantic insights", - "target": self._query_seo_knowledge_base_tool - }, - { - "name": "gsc_low_ctr_queries", - "description": "Returns low-CTR queries with evidence from cached GSC metrics", - "target": self._gsc_low_ctr_queries_tool - }, - { - "name": "gsc_striking_distance_queries", - "description": "Returns striking-distance queries (positions ~8–20) with evidence", - "target": self._gsc_striking_distance_tool - }, - { - "name": "gsc_declining_queries", - "description": "Returns period-over-period declining queries with evidence", - "target": self._gsc_declining_queries_tool - }, - { - "name": "gsc_low_ctr_pages", - "description": "Returns low-CTR pages with top contributing queries", - "target": self._gsc_low_ctr_pages_tool - }, - { - "name": "gsc_cannibalization_candidates", - "description": "Returns query→multiple-pages cannibalization candidates with target recommendation", - "target": self._gsc_cannibalization_candidates_tool - }, - { - "name": "default_seo_gsc_plan", - "description": "Runs a default first-pass SEO plan using GSC signals (titles/meta, consolidation, refreshes)", - "target": self._default_seo_gsc_plan_tool - }, - ], - max_iterations=15, - system=self.get_effective_system_prompt(f"""You are the SEO Optimization Agent for ALwrity user {self.user_id}. - - Your mission is to perform continuous SEO audits, prioritize fixes by impact, - and execute optimizations autonomously. - - Capabilities: - - Technical SEO issue detection and fixing - - Keyword strategy dynamic adjustment - - SERP position optimization - - Backlink opportunity identification - - Deep semantic search of SEO data (GSC, Bing, Audits) - - Focus on high-impact, low-effort optimizations first. - In your first pass, call 'default_seo_gsc_plan' to ground your actions on live GSC signals. - Always maintain SEO best practices and user experience.""" - ) - ) - - # Tool Implementations - - async def _query_seo_knowledge_base_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Queries the SIF knowledge base for SEO insights. - Combines website analysis, competitor data, and SEO dashboard metrics. - """ - query = context.get('query') - website_url = context.get('website_url') - - if not query: - return {"error": "Query required for knowledge base search"} - - if not self.sif_service: - return {"error": "SIF Service unavailable"} - - try: - logger.info(f"Querying SEO knowledge base: {query}") - - # 1. Search General Context (Website Analysis) - seo_context = await self.sif_service.get_seo_context(website_url) - - # 2. Search Dashboard Context (GSC/Bing) - dashboard_context = await self.sif_service.get_seo_dashboard_context() - - # 3. Perform specific semantic search for the query - search_results = await self.sif_service.intelligence_service.search(query, limit=3) - - # Combine all contexts - combined_context = { - "query": query, - "seo_audit_context": seo_context.get("seo_audit", {}), - "dashboard_metrics": dashboard_context.get("dashboard_data", {}).get("summary", {}), - "dashboard_insights": dashboard_context.get("dashboard_data", {}).get("ai_insights", []), - "semantic_search_results": [r.get('text', '') for r in search_results], - "timestamp": datetime.utcnow().isoformat() - } - - return combined_context - - except Exception as e: - logger.error(f"SEO knowledge base query failed: {e}") - return {"error": str(e)} - - async def _seo_auditor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - SEO audit tool that retrieves existing SEO data via SIF. - Expects 'website_url' in context. - """ - website_url = context.get('website_url') - - # Lightweight Crawler (Fallback/Supplement) - crawler_data = {} - try: - # We import here to avoid dependency issues if not installed - import aiohttp - from bs4 import BeautifulSoup - - async with aiohttp.ClientSession() as session: - async with session.get(website_url, timeout=10) as resp: - if resp.status == 200: - html = await resp.text() - soup = BeautifulSoup(html, 'html.parser') - - # Basic Checks - title = soup.title.string if soup.title else None - desc = soup.find('meta', attrs={'name': 'description'}) - desc_content = desc['content'] if desc else None - h1s = [h.get_text().strip() for h in soup.find_all('h1')] - links = [a.get('href') for a in soup.find_all('a', href=True)] - - # Enhanced Image Analysis for Auto-fix - images_missing_alt = [] - for img in soup.find_all('img'): - if not img.get('alt'): - # Get surrounding context (parent text) - parent_text = img.parent.get_text().strip()[:200] if img.parent else "" - images_missing_alt.append({ - "src": img.get('src', ''), - "context": parent_text - }) - - crawler_data = { - "title_tag": title, - "meta_description": desc_content, - "h1_count": len(h1s), - "h1_content": h1s, - "internal_links_count": len([l for l in links if l.startswith('/') or website_url in l]), - "images_missing_alt_count": len(images_missing_alt), - "images_missing_alt_details": images_missing_alt - } - except Exception as e: - logger.warning(f"Lightweight crawler failed: {e}") - - if not self.sif_service: - # If SIF is down, return crawler data if available - if crawler_data: - return { - "audit_summary": {"technical_health": crawler_data}, - "data_source": "lightweight_crawler", - "timestamp": datetime.utcnow().isoformat() - } - return {"error": "SIF Service unavailable, cannot retrieve SEO data."} - - try: - logger.info(f"Retrieving SEO data via SIF for {website_url}") - result = await self.sif_service.get_seo_context(website_url) - - if "error" in result and result.get("source") == "empty": - # Fallback to crawler data if SIF is empty - if crawler_data: - return { - "audit_summary": {"technical_health": crawler_data, "note": "SIF empty, using live crawl"}, - "data_source": "lightweight_crawler_fallback", - "timestamp": datetime.utcnow().isoformat() - } - return {"error": "No SEO data found. Please ensure onboarding is complete or wait for scheduled analysis."} - - # Format the data for the agent - audit_report = { - "technical_health": result.get("seo_audit", {}).get("technical_issues", []), - "live_crawl_check": crawler_data, # Enrich with live data - "crawl_stats": result.get("crawl_result", {}).get("crawl_summary", {}), - "sitemap_status": "Available" if result.get("sitemap_analysis") else "Unknown", - "core_web_vitals": result.get("pagespeed_data", {}).get("core_web_vitals", {}), - "timestamp": result.get("analysis_date", datetime.utcnow().isoformat()) - } - - return { - "audit_summary": audit_report, - "data_source": "database_via_sif", - "full_context": result # Provide full context for deep analysis if needed - } - - except Exception as e: - logger.error(f"SEO audit retrieval failed: {e}") - return {"error": str(e)} - - async def _issue_prioritizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """Issue prioritization tool""" - # In a real scenario, this would take the raw_results from the auditor and rank them. - # For now, we simulate this based on the audit summary if available. - - audit_summary = context.get('audit_summary', {}) - issues = [] - - # Extract issues from audit summary if available - if audit_summary: - tech_issues = audit_summary.get('technical_health', []) - # Handle SIF structured issues - if isinstance(tech_issues, list): - for issue in tech_issues: - if isinstance(issue, dict): - issues.append({"issue": issue.get('type', 'Unknown Issue'), "impact": "High" if issue.get('severity') == "High" else "Medium"}) - - # Handle Live Crawl issues - live_crawl = audit_summary.get('live_crawl_check', {}) - if live_crawl: - if not live_crawl.get('title_tag'): - issues.append({"issue": "Missing Title Tag", "impact": "Critical"}) - if not live_crawl.get('meta_description'): - issues.append({"issue": "Missing Meta Description", "impact": "High"}) - if live_crawl.get('h1_count', 0) == 0: - issues.append({"issue": "Missing H1 Tag", "impact": "High"}) - if live_crawl.get('h1_count', 0) > 1: - issues.append({"issue": "Multiple H1 Tags", "impact": "Medium"}) - - missing_alt_count = live_crawl.get('images_missing_alt_count', live_crawl.get('images_missing_alt', 0)) - if missing_alt_count > 0: - issues.append({ - "issue": f"{missing_alt_count} Images Missing Alt Text", - "impact": "Medium", - "details": live_crawl.get('images_missing_alt_details', []) - }) - - perf_score = audit_summary.get('performance_score', 100) - if perf_score < 50: - issues.append({"issue": "Critical Performance Issues", "impact": "High"}) - elif perf_score < 80: - issues.append({"issue": "Performance Optimization Needed", "impact": "Medium"}) - else: - issues = [{"issue": "Missing meta tags", "impact": "Medium"}, {"issue": "Slow loading", "impact": "High"}] - - return { - "prioritized_issues": [i["issue"] for i in issues], - "details": issues, - "timestamp": datetime.utcnow().isoformat() - } - - async def _auto_fix_executor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Auto-fix execution tool. - Generates ready-to-apply patches for identified issues using LLM. - """ - issues = context.get('issues', []) - page_content = context.get('page_content', '') # Content to analyze for generating tags - - # If page_content is missing/empty, try to infer or fallback - if not page_content: - logger.warning("No page_content provided to auto_fix_executor. Fixes may be generic.") - page_content = "Content unavailable for analysis." - - patches = [] - - for issue in issues: - issue_data = issue if isinstance(issue, dict) else {"issue": issue} - issue_name = issue_data.get('issue', str(issue)) - - try: - if "Missing Meta Description" in issue_name: - prompt = f"""Generate a concise, SEO-optimized meta description (max 160 chars) for a page with this content: - - {page_content[:1500]} - - Return ONLY the meta description text. - """ - description = await self._generate_llm_response(prompt) - - patches.append({ - "type": "meta_description", - "action": "insert", - "content": description.strip('"'), - "target": "" - }) - - elif "Missing Title Tag" in issue_name: - prompt = f"""Generate a compelling, SEO-optimized title tag (max 60 chars) for a page with this content: - - {page_content[:1500]} - - Return ONLY the title tag text. - """ - title = await self._generate_llm_response(prompt) - - patches.append({ - "type": "title_tag", - "action": "insert", - "content": title.strip('"'), - "target": "" - }) - - elif "Missing Alt Text" in issue_name: - # Handle multiple images if details are provided - details = issue_data.get('details', []) - if not details: - # Fallback for generic issue without details - patches.append({ - "type": "alt_text", - "action": "update", - "details": "Manual review required: Missing image details for auto-fix.", - "count": 1 - }) - else: - for img in details: - context_text = img.get('context', '') - src = img.get('src', '') - - prompt = f"""Generate a short, descriptive alt text for an image on a webpage. - - Surrounding Text Context: "{context_text}" - Image Filename/Source: "{src}" - - Return ONLY the alt text. - """ - alt_text = await self._generate_llm_response(prompt) - - patches.append({ - "type": "alt_text", - "action": "update", - "target_src": src, - "content": alt_text.strip('"') - }) - - elif "Missing H1 Tag" in issue_name: - prompt = f"""Generate a main H1 heading for this page content: - - {page_content[:1500]} - - Return ONLY the H1 text. - """ - h1_text = await self._generate_llm_response(prompt) - - patches.append({ - "type": "h1_tag", - "action": "insert", - "content": h1_text.strip('"'), - "target": "" - }) - - except Exception as e: - logger.error(f"Failed to generate fix for issue '{issue_name}': {e}") - patches.append({ - "issue": issue_name, - "status": "failed", - "error": str(e) - }) - - return { - "fixes_generated": len(patches), - "patches": patches, - "status": "ready_for_review", - "timestamp": datetime.utcnow().isoformat() - } - - - async def _strategy_generator_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """SEO strategy generation tool""" - audit_results = context.get("audit_results", {}) - prioritized_issues = context.get("prioritized_issues", []) - - strategies = [] - if prioritized_issues: - strategies.append(f"Focus on fixing top 3 critical issues: {[i['issue'] for i in prioritized_issues[:3]]}") - - strategies.append("Optimize for long-tail keywords based on gap analysis") - - return { - "seo_strategy": strategies, - "next_steps": ["Execute auto-fixes", "Review content gaps"], - "timestamp": datetime.utcnow().isoformat() - } - - # GSC Insights Tools (Option B) - async def _fetch_gsc_analytics(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, Any]: - svc = PlatformAnalyticsService() - data = await svc.get_comprehensive_analytics(self.user_id, platforms=["gsc"], start_date=start_date, end_date=end_date) - gsc = data.get("gsc") - if not gsc or gsc.status != "success": - err = getattr(gsc, "error_message", None) if gsc else "No data" - raise RuntimeError(f"GSC analytics unavailable: {err}") - return { - "metrics": gsc.metrics, - "date_range": gsc.date_range - } - - async def _gsc_low_ctr_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)) - min_impr = int(context.get("min_impressions", 100)) - min_clicks = int(context.get("min_clicks", 10)) - ctr_threshold = float(context.get("ctr_threshold", 1.5)) - start_date = context.get("start_date") - end_date = context.get("end_date") - try: - result = await self._fetch_gsc_analytics(start_date, end_date) - tq = result["metrics"].get("top_queries", []) or [] - items = [ - { - "query": r.get("query"), - "clicks": r.get("clicks", 0), - "impressions": r.get("impressions", 0), - "ctr": r.get("ctr", 0.0), - "position": r.get("position") - } - for r in tq - if (r.get("impressions", 0) >= min_impr and r.get("clicks", 0) >= min_clicks and float(r.get("ctr", 0.0)) < ctr_threshold) - ] - items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) - return { - "items": items[:limit], - "range": result["date_range"], - "source": "gsc_cache" - } - except Exception as e: - logger.error(f"low_ctr_queries tool failed: {e}") - return {"error": str(e)} - - async def _gsc_striking_distance_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)) - min_impr = int(context.get("min_impressions", 100)) - start_date = context.get("start_date") - end_date = context.get("end_date") - try: - result = await self._fetch_gsc_analytics(start_date, end_date) - tq = result["metrics"].get("top_queries", []) or [] - items = [ - { - "query": r.get("query"), - "clicks": r.get("clicks", 0), - "impressions": r.get("impressions", 0), - "ctr": r.get("ctr", 0.0), - "position": r.get("position") - } - for r in tq - if (r.get("impressions", 0) >= min_impr and r.get("position") is not None and 8.0 <= float(r.get("position")) <= 20.0) - ] - items.sort(key=lambda x: (x.get("position") if x.get("position") is not None else 999, -x.get("impressions", 0))) - return { - "items": items[:limit], - "range": result["date_range"], - "source": "gsc_cache" - } - except Exception as e: - logger.error(f"striking_distance tool failed: {e}") - return {"error": str(e)} - - async def _gsc_declining_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)) - min_prev_clicks = int(context.get("min_prev_clicks", 10)) - min_drop_pct = float(context.get("min_drop_pct", 30.0)) - start_date = context.get("start_date") - end_date = context.get("end_date") - try: - curr = await self._fetch_gsc_analytics(start_date, end_date) - curr_range = curr["date_range"] - s = curr_range.get("start") - e = curr_range.get("end") - from datetime import datetime, timedelta - fmt = "%Y-%m-%d" - sd = datetime.strptime(s, fmt) if s else datetime.utcnow() - timedelta(days=30) - ed = datetime.strptime(e, fmt) if e else datetime.utcnow() - days = max((ed - sd).days + 1, 1) - prev_end = sd - timedelta(days=1) - prev_start = prev_end - timedelta(days=days - 1) - prev = await self._fetch_gsc_analytics(prev_start.strftime(fmt), prev_end.strftime(fmt)) - curr_queries = {r.get("query"): r for r in (curr["metrics"].get("top_queries", []) or [])} - prev_queries = {r.get("query"): r for r in (prev["metrics"].get("top_queries", []) or [])} - items = [] - for q, prev_row in prev_queries.items(): - curr_row = curr_queries.get(q) - if not curr_row: - continue - prev_clicks = int(prev_row.get("clicks", 0) or 0) - curr_clicks = int(curr_row.get("clicks", 0) or 0) - if prev_clicks >= min_prev_clicks and curr_clicks < prev_clicks: - drop_pct = ((prev_clicks - curr_clicks) / prev_clicks) * 100.0 - if drop_pct >= min_drop_pct: - items.append({ - "query": q, - "prev_clicks": prev_clicks, - "curr_clicks": curr_clicks, - "drop_pct": round(drop_pct, 2) - }) - items.sort(key=lambda x: (x.get("drop_pct", 0), x.get("prev_clicks", 0)), reverse=True) - return { - "items": items[:limit], - "range": curr_range, - "previous_range": prev["date_range"], - "source": "gsc_cache" - } - except Exception as e: - logger.error(f"declining_queries tool failed: {e}") - return {"error": str(e)} - - async def _gsc_low_ctr_pages_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)) - min_impr = int(context.get("min_impressions", 200)) - ctr_threshold = float(context.get("ctr_threshold", 1.5)) - start_date = context.get("start_date") - end_date = context.get("end_date") - try: - result = await self._fetch_gsc_analytics(start_date, end_date) - tp = result["metrics"].get("top_pages", []) or [] - items = [] - for r in tp: - if (r.get("impressions", 0) >= min_impr and float(r.get("ctr", 0.0)) < ctr_threshold): - items.append({ - "page": r.get("page"), - "clicks": r.get("clicks", 0), - "impressions": r.get("impressions", 0), - "ctr": r.get("ctr", 0.0), - "position": r.get("position"), - "evidence_queries": r.get("queries", [])[:5] - }) - items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) - return { - "items": items[:limit], - "range": result["date_range"], - "source": "gsc_cache" - } - except Exception as e: - logger.error(f"low_ctr_pages tool failed: {e}") - return {"error": str(e)} - - async def _gsc_cannibalization_candidates_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - limit = int(context.get("limit", 10)) - start_date = context.get("start_date") - end_date = context.get("end_date") - try: - result = await self._fetch_gsc_analytics(start_date, end_date) - candidates = result["metrics"].get("cannibalization", []) or [] - return { - "items": candidates[:limit], - "range": result["date_range"], - "source": "gsc_cache" - } - except Exception as e: - logger.error(f"cannibalization_candidates tool failed: {e}") - return {"error": str(e)} - - async def _default_seo_gsc_plan_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - start_date = context.get("start_date") - end_date = context.get("end_date") - try: - low_ctr_pages = await self._gsc_low_ctr_pages_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - cannibals = await self._gsc_cannibalization_candidates_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - striking = await self._gsc_striking_distance_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - declining = await self._gsc_declining_queries_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) - - actions = [] - for p in low_ctr_pages.get("items", []): - actions.append({ - "type": "update_titles_meta", - "target_page": p.get("page"), - "justification": f"Low CTR {p.get('ctr')}% with {p.get('impressions')} impressions", - "evidence": p.get("evidence_queries", []) - }) - for c in cannibals.get("items", []): - actions.append({ - "type": "consolidate/internal_link", - "target_page": c.get("recommended_target_page"), - "justification": f"Cannibalization on query '{c.get('query')}'", - "pages": c.get("pages", []) - }) - for q in striking.get("items", []): - actions.append({ - "type": "refresh_content", - "target": "query", - "query": q.get("query"), - "justification": f"Striking distance at position {q.get('position')} with {q.get('impressions')} impressions" - }) - for q in declining.get("items", []): - actions.append({ - "type": "refresh_content", - "target": "query", - "query": q.get("query"), - "justification": f"Clicks decline {q.get('prev_clicks')}→{q.get('curr_clicks')} ({q.get('drop_pct')}%)" - }) - - return { - "plan_name": "Default SEO Plan from GSC", - "range": {"current": {"start": start_date, "end": end_date}}, - "actions": actions, - "source": "gsc_cache", - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"default_seo_gsc_plan failed: {e}") - return {"error": str(e)} - - -class SocialAmplificationAgent(BaseALwrityAgent): - """ - Agent responsible for social media monitoring, content adaptation, and distribution. - """ - - def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): - super().__init__(user_id, "social_media_manager", model_name, llm) - - self.sif_service = None - if SIF_AVAILABLE: - try: - self.sif_service = SIFIntegrationService(user_id) - except Exception as e: - logger.warning(f"Failed to initialize SIF service for SocialAmplificationAgent: {e}") - - async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: - """Propose PUBLISH and ENGAGE pillar tasks.""" - proposals = [] - - # 1. Publish Task - proposals.append(TaskProposal( - title="Schedule Social Content", - description="Plan and schedule your posts for the week to maintain consistent presence.", - pillar_id="publish", - priority="high", - estimated_time=20, - source_agent="SocialAmplificationAgent", - reasoning="Consistency is key for algorithm growth.", - action_type="navigate", - action_url="/scheduler-dashboard" - )) - - # 2. Engage Task - proposals.append(TaskProposal( - title="Engage with Community", - description="Respond to comments and interact with industry leaders' posts.", - pillar_id="engage", - priority="medium", - estimated_time=15, - source_agent="SocialAmplificationAgent", - reasoning="Community building increases reach.", - action_type="navigate", - action_url="/social-dashboard" - )) - - return proposals - - def _create_txtai_agent(self) -> Agent: - """Create Social Amplification Agent using txtai native framework""" - if not TXTAI_AVAILABLE: - return None - - return Agent( - llm=self.llm, - tools=[ - { - "name": "social_monitor", - "description": "Monitors social media trends and engagement", - "target": self._social_monitor_tool - }, - { - "name": "content_adapter", - "description": "Adapts content for different social platforms", - "target": self._content_adapter_tool - }, - { - "name": "engagement_optimizer", - "description": "Optimizes content for maximum engagement", - "target": self._engagement_optimizer_tool - }, - { - "name": "distribution_manager", - "description": "Manages content distribution across platforms", - "target": self._distribution_manager_tool - } - ], - max_iterations=10, - system=self.get_effective_system_prompt(f"""You are the Social Media Amplification Agent for ALwrity user {self.user_id}. - - Your mission is to optimize content distribution, monitor social signals, - and amplify content reach across platforms. - - Responsibilities: - - Monitor social trends and brand mentions via SIF - - Adapt content for specific platforms (LinkedIn, Twitter, etc.) - - Optimize posts for engagement (hashtags, timing, tone) - - Plan and execute distribution strategies - - Use semantic insights to align social content with overall content strategy. - Focus on maximizing engagement and driving traffic back to the main content.""" - ) - ) - - # Tool Implementations - - async def _social_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Social monitoring tool using SIF. - """ - # In a real scenario, this would search for trends or mentions. - # For now, we search for social-related context in SIF. - query = context.get('query', 'social media trends') - - if self.sif_service: - try: - # Search for social media related insights in the index - results = await self.sif_service.search(query, limit=5) - - # Extract relevant info - trends = [] - for res in results: - text = res.get('text', '') - if 'social' in text.lower() or 'trend' in text.lower(): - trends.append(text[:100] + "...") - - return { - "trends": trends, - "mentions": [], # Placeholder - "sentiment": "neutral", - "source": "sif_index", - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Social monitoring failed: {e}") - return {"error": str(e)} - - return { - "trends": ["AI in marketing", "Content automation"], - "source": "mock_data", - "timestamp": datetime.utcnow().isoformat() - } - - async def _content_adapter_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Adapts content for specific platforms. - Expects 'content' and 'platform' (e.g., 'linkedin', 'twitter'). - """ - content = context.get('content') - platform = context.get('platform', 'general') - - if not content: - return {"error": "No content provided for adaptation"} - - try: - # Use LLM to adapt content - prompt = f"""Adapt the following content for {platform}. - - Original Content: - {content} - - Requirements for {platform}: - - LinkedIn: Professional tone, use bullet points, engaging question at end. - - Twitter: Short, punchy, under 280 chars, use relevant hashtags. - - Instagram: Visual focus description, many hashtags. - - General: Balanced tone. - - Return ONLY the adapted content. - """ - - if self.llm: - adapted_content = await self._generate_llm_response(prompt) - else: - adapted_content = f"[Mock {platform}]: {content[:50]}... #adapted" - - return { - "original_content_snippet": content[:50] + "...", - "platform": platform, - "adapted_content": adapted_content, - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Content adaptation failed: {e}") - return {"error": str(e)} - - async def _engagement_optimizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Optimizes content for engagement (hashtags, timing, hook). - """ - content = context.get('content') - platform = context.get('platform', 'general') - - if not content: - return {"error": "No content provided for optimization"} - - # Mock optimization logic or use LLM - suggestions = [ - "Add a call-to-action (CTA)", - "Use trending hashtags for the niche", - "Post during peak hours (Tue-Thu 10am)" - ] - - return { - "optimization_suggestions": suggestions, - "estimated_engagement_score": 8.5, - "timestamp": datetime.utcnow().isoformat() - } - - async def _distribution_manager_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: - """ - Manages distribution (scheduling/posting). - """ - posts = context.get('posts', []) - - schedule = [] - for i, post in enumerate(posts): - schedule.append({ - "post_id": i, - "platform": post.get('platform', 'unknown'), - "scheduled_time": "Tomorrow 10:00 AM" # Mock - }) - - return { - "distribution_plan": schedule, - "status": "scheduled", - "timestamp": datetime.utcnow().isoformat() - } +from services.intelligence.agents.specialized import ( + SIFBaseAgent, + StrategyArchitectAgent, + ContentGuardianAgent, + LinkGraphAgent, + CitationExpert, + ContentStrategyAgent, + CompetitorResponseAgent, + SEOOptimizationAgent, + SocialAmplificationAgent +) + +# Re-export for backward compatibility +__all__ = [ + "SIFBaseAgent", + "StrategyArchitectAgent", + "ContentGuardianAgent", + "LinkGraphAgent", + "CitationExpert", + "ContentStrategyAgent", + "CompetitorResponseAgent", + "SEOOptimizationAgent", + "SocialAmplificationAgent" +] diff --git a/backend/services/intelligence/agents/trend_surfer_agent.py b/backend/services/intelligence/agents/trend_surfer_agent.py index f7895559..d4330048 100644 --- a/backend/services/intelligence/agents/trend_surfer_agent.py +++ b/backend/services/intelligence/agents/trend_surfer_agent.py @@ -18,8 +18,8 @@ class TrendSurferAgent(SIFBaseAgent): "Surfs" the trends detected by MarketSignalDetector to propose timely content. """ - def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str): - super().__init__(intelligence_service, user_id, agent_type="trend_surfer") + def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, **kwargs): + super().__init__(intelligence_service, user_id, agent_type="trend_surfer", **kwargs) self.user_id = user_id self.signal_detector = MarketSignalDetector(user_id) self.trends_service = GoogleTrendsService() diff --git a/backend/services/intelligence/sif_agents.py b/backend/services/intelligence/sif_agents.py index 6bc697c0..4bec6120 100644 --- a/backend/services/intelligence/sif_agents.py +++ b/backend/services/intelligence/sif_agents.py @@ -25,8 +25,9 @@ except ImportError: class SharedLLMWrapper: """Wraps the shared ALwrity LLM service to look like a txtai LLM.""" - def __init__(self, user_id: str): + def __init__(self, user_id: str, task: Optional[str] = None): self.user_id = user_id + self.task = task def generate(self, prompt: str, **kwargs) -> str: """Generate text using the shared LLM provider.""" @@ -41,25 +42,54 @@ class SharedLLMWrapper: def __call__(self, prompt: str, **kwargs) -> str: return self.generate(prompt, **kwargs) +_local_llm_cache = {} + class LocalLLMWrapper: """ - Lazily loads a local LLM via txtai. - This prevents blocking server startup with heavy model loads. + Lazily loads a local LLM via txtai and caches it globally. + This prevents blocking server startup and redundant model loads. """ - def __init__(self, model_path: str, task: str = "text-generation"): + def __init__(self, model_path: str, task: str = None): self.model_path = model_path self.task = task - self._llm = None + # No self._llm here, we use the global cache @property def llm(self): - if self._llm is None: - if LLM is None: - raise ImportError("txtai.pipeline.LLM is not available") - logger.info(f"Loading local LLM: {self.model_path} with task: {self.task}") - # Explicitly set task to avoid 'text2text-generation' default failures - self._llm = LLM(path=self.model_path, task=self.task) - return self._llm + # Create a cache key based on model path and task + cache_key = f"{self.model_path}:{self.task}" + + if cache_key in _local_llm_cache: + return _local_llm_cache[cache_key] + + if LLM is None: + raise ImportError("txtai.pipeline.LLM is not available") + + task_to_use = (self.task or "language-generation").strip() + # Explicitly force language-generation for known models if auto-detect fails + if any(x in self.model_path for x in ["Qwen", "Instruct", "GPT", "Llama"]): + task_to_use = "language-generation" + if task_to_use == "text-generation": + task_to_use = "language-generation" + + logger.info(f"Loading local LLM (singleton): {self.model_path} (task={task_to_use})") + try: + _local_llm_cache[cache_key] = LLM(path=self.model_path, task=task_to_use) + except Exception as e: + try: + import transformers + from transformers.pipelines import SUPPORTED_TASKS + logger.error( + f"LocalLLMWrapper init failed (model={self.model_path}, requested_task={task_to_use}, " + f"transformers={getattr(transformers, '__version__', 'unknown')}, " + f"supported_tasks={sorted(list(SUPPORTED_TASKS.keys()))[:50]})" + ) + except Exception: + pass + logger.error(f"Failed to initialize LocalLLMWrapper: {e}") + raise e + + return _local_llm_cache[cache_key] def __call__(self, prompt: str, **kwargs) -> str: return self.llm(prompt, **kwargs) @@ -75,13 +105,9 @@ class SIFBaseAgent(BaseALwrityAgent): # 2. Local LLM for internal agent work (default for SIF agents) if llm is None: - if TXTAI_AVAILABLE and LLM is not None: - # Use Lazy Local LLM when txtai LLM is available - # Hardening: Specify 'text-generation' task to avoid text2text defaults - llm = LocalLLMWrapper(model_name, task="text-generation") - else: - # Fallback to Shared if txtai or LLM is not available - llm = self.shared_llm + if not (TXTAI_AVAILABLE and LLM is not None): + raise RuntimeError("txtai LLM is required for SIF agents but is not available") + llm = LocalLLMWrapper(model_name, task="text-generation") super().__init__(user_id, agent_type, model_name, llm) self.intelligence = intelligence_service @@ -98,14 +124,16 @@ class SIFBaseAgent(BaseALwrityAgent): capabilities via a standard agent interface if available. """ if not TXTAI_AVAILABLE or Agent is None: - logger.debug(f"[{self.__class__.__name__}] txtai Agent not available, using fallback agent") - return self._create_fallback_agent() + raise RuntimeError(f"[{self.__class__.__name__}] txtai Agent not available") try: - return Agent(llm=self.llm, tools=[]) + _llm_for_agent = self.llm + for _ in range(3): + _llm_for_agent = getattr(_llm_for_agent, "llm", _llm_for_agent) + return Agent(llm=_llm_for_agent, tools=[]) except Exception as e: - logger.warning(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}") - return self._create_fallback_agent() + logger.error(f"[{self.__class__.__name__}] Failed to create txtai Agent: {e}") + raise class StrategyArchitectAgent(SIFBaseAgent): """Agent for discovering content pillars and identifying strategic gaps."""