diff --git a/backend/services/agent_activity_service.py b/backend/services/agent_activity_service.py index 4f67c9e1..e98e0252 100644 --- a/backend/services/agent_activity_service.py +++ b/backend/services/agent_activity_service.py @@ -1,7 +1,8 @@ from __future__ import annotations +from dataclasses import asdict, dataclass, field from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from sqlalchemy import func from sqlalchemy.orm import Session @@ -9,6 +10,73 @@ from sqlalchemy.orm import Session from models.agent_activity_models import AgentAlert, AgentApprovalRequest, AgentEvent, AgentRun +@dataclass +class AgentEventPayload: + """Shared schema for agent activity event payloads.""" + + phase: Optional[str] = None + step: Optional[str] = None + tool_name: Optional[str] = None + progress_percent: Optional[float] = None + input_summary: Optional[str] = None + output_summary: Optional[str] = None + decision_reason: Optional[str] = None + evidence_refs: List[str] = field(default_factory=list) + safe_debug: bool = True + metadata: Dict[str, Any] = field(default_factory=dict) + + +def build_agent_event_payload( + *, + phase: Optional[str] = None, + step: Optional[str] = None, + tool_name: Optional[str] = None, + progress_percent: Optional[float] = None, + input_summary: Optional[str] = None, + output_summary: Optional[str] = None, + decision_reason: Optional[str] = None, + evidence_refs: Optional[List[str]] = None, + safe_debug: bool = True, + metadata: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + return asdict( + AgentEventPayload( + phase=phase, + step=step, + tool_name=tool_name, + progress_percent=progress_percent, + input_summary=input_summary, + output_summary=output_summary, + decision_reason=decision_reason, + evidence_refs=list(evidence_refs or []), + safe_debug=bool(safe_debug), + metadata=dict(metadata or {}), + ) + ) + + +def _normalize_event_payload(payload: Optional[Union[Dict[str, Any], AgentEventPayload]]) -> Dict[str, Any]: + if payload is None: + return build_agent_event_payload() + if isinstance(payload, AgentEventPayload): + return asdict(payload) + if not isinstance(payload, dict): + return build_agent_event_payload(output_summary=str(payload)[:2000], safe_debug=False) + + return build_agent_event_payload( + phase=payload.get("phase"), + step=payload.get("step"), + tool_name=payload.get("tool_name"), + progress_percent=payload.get("progress_percent"), + input_summary=payload.get("input_summary"), + output_summary=payload.get("output_summary"), + decision_reason=payload.get("decision_reason"), + evidence_refs=payload.get("evidence_refs") if isinstance(payload.get("evidence_refs"), list) else [], + safe_debug=bool(payload.get("safe_debug", True)), + metadata=payload.get("metadata") if isinstance(payload.get("metadata"), dict) else {}, + ) + + class AgentActivityService: def __init__(self, db: Session, user_id: str): self.db = db @@ -51,10 +119,11 @@ class AgentActivityService: event_type: str, severity: str = "info", message: Optional[str] = None, - payload: Optional[Dict[str, Any]] = None, + payload: Optional[Union[Dict[str, Any], AgentEventPayload]] = None, run_id: Optional[int] = None, agent_type: Optional[str] = None, ) -> AgentEvent: + normalized_payload = _normalize_event_payload(payload) evt = AgentEvent( run_id=run_id, user_id=self.user_id, @@ -62,7 +131,7 @@ class AgentActivityService: event_type=event_type, severity=severity, message=message, - payload=payload, + payload=normalized_payload, created_at=datetime.utcnow(), ) self.db.add(evt) diff --git a/backend/services/intelligence/agents/core_agent_framework.py b/backend/services/intelligence/agents/core_agent_framework.py index 26896589..683de459 100644 --- a/backend/services/intelligence/agents/core_agent_framework.py +++ b/backend/services/intelligence/agents/core_agent_framework.py @@ -38,7 +38,7 @@ from utils.logger_utils import get_service_logger from services.database import get_session_for_user from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor from services.intelligence.agents.safety_framework import get_safety_framework -from services.agent_activity_service import AgentActivityService +from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync import time @@ -504,7 +504,7 @@ class BaseALwrityAgent(ABC): event_type="plan", severity="info", message=(prompt[:2000] if prompt else None), - payload={"kind": "prompt"}, + payload=build_agent_event_payload(phase="planning", step="run_started", tool_name="agent_run", progress_percent=0, input_summary=prompt[:250], output_summary="Agent run initialized", decision_reason="Received run prompt", safe_debug=False, metadata={"kind": "prompt"}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -531,7 +531,7 @@ class BaseALwrityAgent(ABC): event_type="final_summary", severity="info", message=(str(result)[:2000] if result is not None else None), - payload={"kind": "result"}, + payload=build_agent_event_payload(phase="execution", step="run_completed", tool_name="agent_run", progress_percent=100, output_summary=str(result)[:400] if result is not None else "No output", decision_reason="Run completed", safe_debug=True, metadata={"kind": "result"}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -545,7 +545,7 @@ class BaseALwrityAgent(ABC): event_type="error", severity="error", message=str(e)[:2000], - payload={"kind": "exception"}, + payload=build_agent_event_payload(phase="execution", step="run_error", tool_name="agent_runtime", output_summary=str(e)[:400], decision_reason="Unhandled exception during run", safe_debug=False, metadata={"kind": "exception"}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -591,7 +591,7 @@ class BaseALwrityAgent(ABC): event_type="plan", severity="info", message=f"{action.action_type} -> {action.target_resource}", - payload={"action": asdict(action)}, + payload=build_agent_event_payload(phase="planning", step="action_received", tool_name=action.action_type, progress_percent=5, input_summary=f"target={action.target_resource}", output_summary="Action accepted for execution", decision_reason="Start run lifecycle", safe_debug=True, metadata={"action": asdict(action)}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -606,7 +606,7 @@ class BaseALwrityAgent(ABC): event_type="decision", severity="warning", message="Action failed safety validation", - payload={"action_id": action.action_id, "action_type": action.action_type}, + payload=build_agent_event_payload(phase="validation", step="safety_blocked", tool_name="safety_framework", progress_percent=10, input_summary=action.action_type, output_summary="Action blocked by safety validation", decision_reason="Safety framework rejected action", safe_debug=True, metadata={"action_id": action.action_id, "action_type": action.action_type}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -646,7 +646,7 @@ class BaseALwrityAgent(ABC): event_type="decision", severity="info", message="Action requires approval", - payload={"approval_id": req.id, "action_id": action.action_id}, + payload=build_agent_event_payload(phase="approval", step="awaiting_user_decision", tool_name=action.action_type, progress_percent=20, input_summary=action.target_resource, output_summary="Approval request created", decision_reason="Action requires human approval", safe_debug=True, metadata={"approval_id": req.id, "action_id": action.action_id}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -671,7 +671,7 @@ class BaseALwrityAgent(ABC): event_type="progress", severity="info", message="Rollback checkpoint created", - payload={"checkpoint_id": checkpoint_id}, + payload=build_agent_event_payload(phase="safety", step="checkpoint_created", tool_name="rollback_manager", progress_percent=35, output_summary="Rollback checkpoint created", decision_reason="Prepare rollback safety net", safe_debug=True, metadata={"checkpoint_id": checkpoint_id}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -682,7 +682,7 @@ class BaseALwrityAgent(ABC): event_type="warning", severity="warning", message=str(e)[:2000], - payload={"checkpoint": "failed"}, + payload=build_agent_event_payload(phase="safety", step="checkpoint_failed", tool_name="rollback_manager", progress_percent=30, output_summary="Checkpoint creation failed", decision_reason="Proceeding without checkpoint", safe_debug=False, metadata={"checkpoint": "failed"}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -717,7 +717,7 @@ class BaseALwrityAgent(ABC): event_type="final_summary", severity="info", message=str(result)[:2000] if result is not None else None, - payload={"action_id": action.action_id}, + payload=build_agent_event_payload(phase="execution", step="completed", tool_name=action.action_type, progress_percent=100, output_summary=str(result)[:400] if result is not None else "No output", decision_reason="Action execution completed", safe_debug=True, metadata={"action_id": action.action_id}), run_id=run_record.id, agent_type=self.agent_type, ) @@ -768,7 +768,7 @@ class BaseALwrityAgent(ABC): event_type="error", severity="error", message=str(e)[:2000], - payload={"action_id": action.action_id, "checkpoint_id": checkpoint_id}, + payload=build_agent_event_payload(phase="execution", step="failed", tool_name=action.action_type, progress_percent=100, output_summary=str(e)[:400], decision_reason="Exception during action execution", safe_debug=False, metadata={"action_id": action.action_id, "checkpoint_id": checkpoint_id}), run_id=run_record.id, agent_type=self.agent_type, ) diff --git a/backend/services/intelligence/agents/specialized_agents.py b/backend/services/intelligence/agents/specialized_agents.py index f1064fd2..6b679beb 100644 --- a/backend/services/intelligence/agents/specialized_agents.py +++ b/backend/services/intelligence/agents/specialized_agents.py @@ -26,3 +26,2528 @@ __all__ = [ "SEOOptimizationAgent", "SocialAmplificationAgent" ] + + event_type="progress", + severity="info", + message=f"{self.__class__.__name__}: {operation}", + payload=build_agent_event_payload( + phase="specialized_agent", + step=operation.lower().replace(" ", "_"), + tool_name=self.__class__.__name__, + input_summary=str(kwargs)[:300] if kwargs else None, + output_summary="Operation invoked", + decision_reason="Agent method execution trace", + safe_debug=True, + metadata={"params": kwargs} if kwargs else {}, + ), + run_id=None, + agent_type=self.agent_type, + ) + except Exception: + pass + finally: + try: + + logger.warning(f"[{self.__class__.__name__}] Error checking pillars for proposals: {e}") + + # 2. Strategy Review (Generic fallback) + proposals.append(TaskProposal( + title="Review Strategic Goals", + description="Ensure your content output aligns with your quarterly business goals.", + pillar_id="plan", + priority="low", + estimated_time=10, + source_agent="StrategyArchitectAgent", + reasoning="Routine strategy maintenance.", + action_type="navigate", + action_url="/content-planning-dashboard" + )) + + return proposals + + async def find_semantic_gaps(self, competitor_indices: List[Any]) -> List[Dict[str, Any]]: + """Compare user content vs competitor content to find missing topics.""" + self._log_agent_operation("Finding semantic content gaps", competitor_count=len(competitor_indices)) + + try: + documents = await self._fetch_index_documents() + if not documents: + logger.info(f"[{self.__class__.__name__}] No indexed documents available for gap detection") + return [] + + competitor_docs, user_docs = [], [] + allowed_competitor_ids = set(str(idx) for idx in competitor_indices) if competitor_indices else None + if allowed_competitor_ids: + for idx in competitor_indices: + if isinstance(idx, int) and 0 <= idx < len(documents): + allowed_competitor_ids.add(str(documents[idx].get("id", ""))) + + for doc in documents: + metadata = doc.get("metadata", {}) + role = self._infer_document_role(metadata) + if role == "competitor": + if allowed_competitor_ids and str(doc.get("id")) not in allowed_competitor_ids: + continue + competitor_docs.append(doc) + elif role == "user": + user_docs.append(doc) + + if not competitor_docs or not user_docs: + logger.info( + f"[{self.__class__.__name__}] Insufficient split for gap analysis: " + f"user_docs={len(user_docs)}, competitor_docs={len(competitor_docs)}" + ) + return [] + + competitor_topics = self._extract_topic_density(competitor_docs) + user_topics = self._extract_topic_density(user_docs) + competitor_topic_docs = self._map_topic_to_doc_titles(competitor_docs) + user_topic_docs = self._map_topic_to_doc_titles(user_docs) + + gaps = [] + for topic, competitor_density in competitor_topics.items(): + user_density = user_topics.get(topic, 0.0) + coverage_delta = competitor_density - user_density + if coverage_delta <= 0.08: + continue + + competitor_support = len(competitor_topic_docs.get(topic, [])) + user_support = len(user_topic_docs.get(topic, [])) + confidence = max(0.0, min(1.0, (coverage_delta * 0.65) + (min(1.0, competitor_support / 4) * 0.35))) + severity_score = max(0.0, min(1.0, (coverage_delta * 0.7) + (confidence * 0.3))) + priority = "high" if severity_score >= 0.72 else "medium" if severity_score >= 0.45 else "low" + gaps.append({ + "topic": topic, + "priority": priority, + "reason": ( + f"Competitors mention '{topic}' substantially more often " + f"(density {competitor_density:.2f} vs {user_density:.2f})." + ), + "confidence": round(confidence, 3), + "severity_score": round(severity_score, 3), + "coverage_delta": round(coverage_delta, 4), + "topic_density": { + "competitor": round(competitor_density, 4), + "user": round(user_density, 4), + "gap": round(coverage_delta, 4) + }, + "evidence": { + "competitor_sample_titles": self._sample_titles_for_topic(competitor_docs, topic), + "user_sample_titles": self._sample_titles_for_topic(user_docs, topic), + "competitor_supporting_docs": competitor_support, + "user_supporting_docs": user_support, + "competitor_doc_count": len(competitor_docs), + "user_doc_count": len(user_docs) + } + }) + + gaps.sort( + key=lambda item: ( + item.get("severity_score", 0), + item.get("confidence", 0), + item.get("topic_density", {}).get("gap", 0) + ), + reverse=True + ) + return gaps[:12] + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to find semantic gaps: {e}") + logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") + return [] + + async def _fetch_index_documents(self) -> List[Dict[str, Any]]: + """Fetch indexed documents and normalize metadata from txtai result objects.""" + if not self.intelligence.is_initialized() or not self.intelligence.embeddings: + return [] + + embeddings = self.intelligence.embeddings + limit = 0 + if hasattr(embeddings, "count"): + try: + limit = int(embeddings.count()) + except Exception: + limit = 0 + + documents = [] + candidate_queries = [] + if limit > 0: + candidate_queries.extend([ + f"select id, text, object from txtai limit {limit}", + f"select id, text, tags from txtai limit {limit}" + ]) + candidate_queries.extend(["marketing", "content", "seo", "strategy", "social media"]) + + seen_ids = set() + for query in candidate_queries: + try: + query_limit = limit if query.startswith("select") and limit > 0 else max(10, limit or 50) + rows = embeddings.search(query, limit=query_limit) + except Exception: + continue + + for row in rows or []: + doc_id = str(row.get("id", "")) + dedupe_key = doc_id or str(hash(f"{row.get('text','')}::{row.get('score',0)}")) + if dedupe_key in seen_ids: + continue + seen_ids.add(dedupe_key) + documents.append({ + "id": doc_id, + "text": row.get("text", "") or "", + "metadata": self._normalize_metadata(row) + }) + + if limit > 0 and len(documents) >= limit: + break + + return documents + + def _normalize_metadata(self, row: Dict[str, Any]) -> Dict[str, Any]: + """Normalize metadata payloads from txtai search rows.""" + for key in ("object", "tags", "metadata", "meta"): + payload = row.get(key) + if isinstance(payload, dict): + return payload + if isinstance(payload, str): + try: + parsed = json.loads(payload) + if isinstance(parsed, dict): + return parsed + except Exception: + continue + return {} + + def _extract_topic_density(self, documents: List[Dict[str, Any]]) -> Dict[str, float]: + """Extract topic density from document metadata and titles.""" + topic_counter: Counter = Counter() + + for doc in documents: + for topic in self._extract_topics_from_document(doc): + topic_counter[topic] += 1 + + total_docs = max(1, len(documents)) + return { + topic: count / total_docs + for topic, count in topic_counter.items() + if count >= 2 + } + + def _infer_document_role(self, metadata: Dict[str, Any]) -> str: + """Infer whether a document belongs to user content or competitor content.""" + signals = [ + metadata.get("type", ""), + metadata.get("doc_type", ""), + metadata.get("content_type", ""), + metadata.get("source", ""), + metadata.get("origin", "") + ] + signal_blob = " ".join(str(item).lower() for item in signals if item) + + if any(token in signal_blob for token in ("competitor", "rival", "market_peer")): + return "competitor" + if any(token in signal_blob for token in ("user", "owned", "first_party", "customer_site")): + return "user" + return "unknown" + + def _extract_topics_from_document(self, doc: Dict[str, Any]) -> List[str]: + """Extract normalized topic labels from metadata and lightweight text fields.""" + metadata = doc.get("metadata", {}) + candidates: List[str] = [] + + for key in ("topics", "topic", "themes", "theme", "keywords", "keyword", "tags", "category", "categories"): + value = metadata.get(key) + if isinstance(value, list): + candidates.extend([str(v) for v in value if v]) + elif isinstance(value, str) and value.strip(): + candidates.extend(re.split(r"[,|/]", value)) + + title = metadata.get("title") or doc.get("text", "")[:160] + if title: + candidates.extend(re.findall(r"[a-zA-Z][a-zA-Z\-]{3,}", str(title).lower())) + + stopwords = { + "with", "from", "that", "this", "your", "about", "into", "using", "guide", "best", + "tips", "what", "when", "where", "how", "the", "and", "for", "2024", "2025" + } + normalized = { + item.strip().lower() + for item in candidates + if item + and len(item.strip()) >= 4 + and not item.strip().isdigit() + and item.strip().lower() not in stopwords + } + return sorted(normalized) + + def _map_topic_to_doc_titles(self, documents: List[Dict[str, Any]]) -> Dict[str, List[str]]: + """Map each topic to a list of document titles that support it.""" + mapping: Dict[str, List[str]] = {} + for doc in documents: + metadata = doc.get("metadata", {}) + title = str(metadata.get("title") or doc.get("text", "")[:100] or "Untitled") + for topic in self._extract_topics_from_document(doc): + mapping.setdefault(topic, []).append(title) + return mapping + + def _sample_titles_for_topic(self, documents: List[Dict[str, Any]], topic: str, limit: int = 3) -> List[str]: + """Return sample titles for a topic.""" + samples = [] + topic_lower = topic.lower() + for doc in documents: + metadata = doc.get("metadata", {}) + title = metadata.get("title") or doc.get("text", "")[:100] + if not title: + continue + + haystack = f"{title} {json.dumps(metadata, default=str)}".lower() + if topic_lower in haystack: + samples.append(str(title)) + if len(samples) >= limit: + break + + return samples + +class ContentGuardianAgent(SIFBaseAgent): + """Agent for preventing cannibalization and ensuring content originality.""" + + CANNIBALIZATION_THRESHOLD = 0.85 # Similarity threshold for cannibalization warning + ORIGINALITY_THRESHOLD = 0.75 # Minimum originality score + + def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, sif_service: Any = None): + super().__init__(intelligence_service, user_id, agent_type="content_guardian") + self.sif_service = sif_service + + # Lazy initialization of SIF service if not provided + if self.sif_service is None and SIF_AVAILABLE: + try: + self.sif_service = SIFIntegrationService(user_id) + logger.info(f"[{self.__class__.__name__}] Lazily initialized SIFIntegrationService") + except Exception as e: + logger.warning(f"[{self.__class__.__name__}] Failed to lazily initialize SIF service: {e}") + + async def assess_content_quality(self, content: str) -> Dict[str, Any]: + """ + Assess content quality based on originality, readability, and cannibalization risks. + """ + self._log_agent_operation("Assessing content quality", content_length=len(content)) + + try: + # 1. Check for cannibalization + cannibalization_result = await self.check_cannibalization(content) + + # 2. Check originality (if not cannibalized) + originality_score = 1.0 + if not cannibalization_result.get("warning"): + originality_result = await self.verify_originality(content, None) + originality_score = originality_result.get("originality_score", 1.0) + + # 3. Check Style Compliance + style_result = await self.style_enforcer(content) + style_score = style_result.get("compliance_score", 1.0) + + # 4. Basic Readability (Flesch-Kincaid proxy via sentence length/word complexity) + # Simple heuristic for now + words = content.split() + sentences = content.split('.') + avg_sentence_length = len(words) / max(1, len(sentences)) + readability_score = 1.0 if avg_sentence_length < 20 else max(0.5, 1.0 - (avg_sentence_length - 20) * 0.05) + + # Weighted Score: Originality (40%) + Style (30%) + Readability (30%) + quality_score = (originality_score * 0.4) + (style_score * 0.3) + (readability_score * 0.3) + + return { + "quality_score": quality_score, + "originality_score": originality_score, + "readability_score": readability_score, + "style_score": style_score, + "cannibalization_risk": cannibalization_result, + "style_compliance": style_result, + "is_acceptable": quality_score > 0.7 and not cannibalization_result.get("warning", False) + } + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to assess content quality: {e}") + return {"error": str(e), "quality_score": 0.0} + + async def check_cannibalization(self, new_draft: str) -> Dict[str, Any]: + """Check if a new draft competes semantically with existing pages.""" + self._log_agent_operation("Checking for semantic cannibalization", draft_length=len(new_draft)) + + try: + if not self.intelligence.is_initialized(): + logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") + return {"warning": False, "error": "Service not initialized"} + + if not new_draft or len(new_draft.strip()) < 50: + logger.warning(f"[{self.__class__.__name__}] Draft too short for meaningful analysis") + return {"warning": False, "reason": "Draft too short"} + + results = await self.intelligence.search(new_draft, limit=1) + + if not results: + logger.info(f"[{self.__class__.__name__}] No similar content found - draft is unique") + return {"warning": False, "uniqueness_score": 1.0} + + top_result = results[0] + similarity_score = top_result.get('score', 0.0) + + logger.debug(f"[{self.__class__.__name__}] Top similarity score: {similarity_score:.4f}") + + if similarity_score > self.CANNIBALIZATION_THRESHOLD: + warning_data = { + "warning": True, + "similar_to": top_result.get('id', 'unknown'), + "score": similarity_score, + "threshold": self.CANNIBALIZATION_THRESHOLD, + "recommendation": "Consider revising the draft to target a different angle or merge with existing content" + } + logger.warning(f"[{self.__class__.__name__}] Cannibalization detected: {warning_data}") + return warning_data + + logger.info(f"[{self.__class__.__name__}] No cannibalization detected. Draft is sufficiently unique.") + return {"warning": False, "uniqueness_score": 1.0 - similarity_score} + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to check cannibalization: {e}") + logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") + return {"warning": False, "error": str(e)} + + async def verify_originality(self, text: str, competitor_index: Any) -> Dict[str, Any]: + """Verify originality against competitor content index.""" + self._log_agent_operation("Verifying originality against competitors", text_length=len(text)) + + try: + if not text or len(text.strip()) < 50: + logger.warning(f"[{self.__class__.__name__}] Text too short for meaningful originality check") + return {"originality_score": 0.0, "reason": "Text too short"} + + # STUB: Implement cross-index search against competitor content + # This would search the text against a competitor-specific index + + logger.info(f"[{self.__class__.__name__}] Originality verification stub completed") + return { + "originality_score": 0.95, # Placeholder + "confidence": 0.8, + "method": "semantic_comparison", + "notes": "Competitor index integration pending" + } + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to verify originality: {e}") + logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") + return {"originality_score": 0.0, "error": str(e)} + + async def style_enforcer(self, text: str, style_guidelines: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """ + Tool: Ensures content adheres to brand voice and style guidelines. + """ + self._log_agent_operation("Enforcing style guidelines", text_length=len(text)) + + try: + if not text: + return {"compliance_score": 0.0, "issues": ["No text provided"]} + + # 1. Fetch Style Guidelines from SIF if not provided + if not style_guidelines and self.sif_service: + try: + # Use central SIF service to get robust context + seo_context = await self.sif_service.get_seo_context() + + if seo_context and "error" not in seo_context: + # Extract brand voice/style from the context + # The context structure is normalized in get_seo_context + + # Note: get_seo_context returns a flattened dict. + # We need to dig into the original structure if available, or rely on what's mapped. + # However, get_seo_context maps 'seo_audit', 'sitemap_analysis', etc. + # Brand info is usually in 'brand_analysis' col of WebsiteAnalysis, which might not be fully exposed + # in the simplified get_seo_context return. + # Let's check if we can get the full object or if we need to expand get_seo_context. + # For now, we'll try to use what's there or fall back to a specific search if needed. + + # Actually, looking at get_seo_context implementation: + # It returns 'seo_audit', 'crawl_result'. + # Brand analysis is often stored in WebsiteAnalysis.brand_analysis. + # We might need to extend get_seo_context or do a specific retrieval here. + # But wait! I saw get_seo_context implementation earlier: + # It retrieves the "full_report" from the SIF metadata. + # If the SIF index contains the full WebsiteAnalysis object, we are good. + + # Let's try to get it from the full report if we can access it, + # but get_seo_context returns a filtered dict. + + # Alternative: Use the robust retrieval logic but specifically for brand info if get_seo_context is too narrow. + # But get_seo_context logic includes "website analysis seo audit" query. + + # Let's assume for now we use the same retrieval logic but locally adapted, + # OR better, trust get_seo_context to be the single point of truth. + # If get_seo_context doesn't return brand info, we should update IT, not hack here. + # But I can't update SIFIntegrationService right now without context switch. + + # Let's stick to the previous manual search pattern BUT use the SIF service helper if possible. + # Actually, the previous code was: + # results = await self.intelligence.search("website analysis brand voice style", limit=1) + + # Let's keep it simple and robust: + # Try to get it from SIF service if possible. + # Since get_seo_context might not return brand_voice directly, let's try to see if we can use it. + + # Actually, let's use the manual search but with better error handling, + # mirroring get_seo_context's robustness (e.g. parsing). + + results = await self.intelligence.search("website analysis brand voice style", limit=1) + if results: + res = results[0] + metadata_str = res.get('object') + metadata = json.loads(metadata_str) if isinstance(metadata_str, str) else (metadata_str or res) + + if metadata.get('type') == 'website_analysis': + report = metadata.get('full_report', {}) + # Support both flat and nested structures + brand_analysis = report.get('brand_analysis') or report.get('brand_voice', {}) + if isinstance(brand_analysis, str): + # Handle case where it might be a JSON string + try: brand_analysis = json.loads(brand_analysis) + except: brand_analysis = {"brand_voice": brand_analysis} + + style_guidelines = { + "tone": brand_analysis.get('brand_voice', 'neutral') if isinstance(brand_analysis, dict) else 'neutral', + "style_patterns": report.get('style_patterns', {}), + "writing_style": report.get('writing_style', {}) + } + logger.info(f"[{self.__class__.__name__}] Retrieved style guidelines from SIF index") + except Exception as e: + logger.warning(f"[{self.__class__.__name__}] Failed to retrieve style guidelines: {e}") + + issues = [] + score = 1.0 + + # Basic Heuristic Checks (Placeholder for LLM-based style analysis) + + # 1. Tone Check (e.g., formal vs casual) + # If guidelines specify 'formal', check for contractions + tone = style_guidelines.get('tone', '').lower() if style_guidelines else '' + if 'formal' in tone or 'professional' in tone: + contractions = ["can't", "won't", "don't", "it's"] + found_contractions = [c for c in contractions if c in text.lower()] + if found_contractions: + issues.append(f"Found contractions in formal text: {', '.join(found_contractions[:3])}...") + score -= 0.1 + + # 2. Length/Sentence Structure (simple metric) + sentences = text.split('.') + avg_len = sum(len(s.split()) for s in sentences if s) / max(1, len(sentences)) + if avg_len > 25: + issues.append("Average sentence length is too high (>25 words). Consider shortening.") + score -= 0.1 + + return { + "compliance_score": max(0.0, score), + "issues": issues, + "is_compliant": score > 0.8, + "guidelines_source": "sif_index" if not style_guidelines and self.sif_service else "provided" + } + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Style enforcement failed: {e}") + return {"error": str(e)} + + async def perform_site_audit(self, website_url: str, limit: int = 10) -> Dict[str, Any]: + """ + Perform a quality audit on the user's website content. + """ + self._log_agent_operation("Performing site audit", website_url=website_url) + + try: + # 1. Retrieve recent content for the site from SIF + # We search for everything with the website_url in metadata + # Note: This depends on how data is indexed. + results = await self.intelligence.search(f"site:{website_url}", limit=limit) + + if not results: + logger.info(f"[{self.__class__.__name__}] No content found for site audit") + return {"error": "No content found"} + + audit_results = [] + total_quality = 0.0 + + for res in results: + text = res.get('text', '') + if not text or len(text) < 100: + continue + + quality = await self.assess_content_quality(text) + audit_results.append({ + "id": res.get('id'), + "title": res.get('title', 'Unknown'), + "quality": quality + }) + total_quality += quality.get('quality_score', 0.0) + + avg_quality = total_quality / len(audit_results) if audit_results else 0.0 + + report = { + "website_url": website_url, + "pages_audited": len(audit_results), + "average_quality_score": avg_quality, + "details": audit_results, + "timestamp": datetime.utcnow().isoformat() + } + + logger.info(f"[{self.__class__.__name__}] Site audit completed. Avg Quality: {avg_quality:.2f}") + return report + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Site audit failed: {e}") + return {"error": str(e)} + + async def safety_filter(self, text: str) -> Dict[str, Any]: + """ + Tool: Flags potentially harmful, offensive, or sensitive content. + """ + self._log_agent_operation("Running safety filter", text_length=len(text)) + + try: + # Basic Keyword Blocklist (Placeholder for LLM/Safety Model) + # In production, this should call a dedicated safety API (e.g., OpenAI Moderation, Llama Guard) + unsafe_keywords = [ + "hate", "kill", "murder", "attack", "destroy", # Violent + "scam", "fraud", "steal", # Illegal + "explicit", "adult" # NSFW + ] + + found_flags = [] + text_lower = text.lower() + + for keyword in unsafe_keywords: + if f" {keyword} " in text_lower: # Simple word boundary check + found_flags.append(keyword) + + is_safe = len(found_flags) == 0 + + return { + "is_safe": is_safe, + "flags": found_flags, + "safety_score": 1.0 if is_safe else 0.0, + "action": "approve" if is_safe else "flag_for_review" + } + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Safety filter failed: {e}") + return {"error": str(e)} + +class LinkGraphAgent(SIFBaseAgent): + """ + Agent for internal link suggestions, graph management, and authority analysis. + Implements the semantic link graph using SIF and GSC/Bing data. + """ + + RELEVANCE_THRESHOLD = 0.6 # Minimum relevance score for link suggestions + MAX_SUGGESTIONS = 10 # Maximum number of link suggestions + + def __init__(self, intelligence_service: TxtaiIntelligenceService, user_id: str, sif_service: Any = None): + super().__init__(intelligence_service, user_id, agent_type="link_graph") + self.sif_service = sif_service + + async def suggest_internal_links(self, draft: str) -> List[Dict[str, Any]]: + """Suggest internal links based on semantic proximity and authority.""" + return await self.link_suggester(draft) + + async def link_suggester(self, draft: str) -> List[Dict[str, Any]]: + """ + Tool: Suggests internal links. + Analyzes draft content and finds semantically relevant pages, boosted by authority. + """ + self._log_agent_operation("Suggesting internal links", draft_length=len(draft)) + + try: + if not self.intelligence.is_initialized(): + logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") + return [] + + if not draft or len(draft.strip()) < 50: # Reduced threshold for testing + logger.warning(f"[{self.__class__.__name__}] Draft too short for meaningful link suggestions") + return [] + + # 1. Get Semantic Candidates + results = await self.intelligence.search(draft, limit=self.MAX_SUGGESTIONS) + + if not results: + logger.info(f"[{self.__class__.__name__}] No relevant internal pages found") + return [] + + # 2. Get Authority Data (if available) + authority_map = {} + if self.sif_service: + try: + # Fetch dashboard context to get top performing content + # Note: This relies on what's available in the SIF index/dashboard summary + dashboard_context = await self.sif_service.get_seo_dashboard_context() + + if "error" not in dashboard_context: + # Extract top queries/pages if available in summary + # Ideally, we'd have a map of URL -> Authority Score + # For now, we'll try to extract what we can + data = dashboard_context.get("dashboard_data", {}) + summary = data.get("summary", {}) + + # Example: Boost if site health is good (general confidence) + site_health = data.get("health_score", {}).get("score", 0) + + # If we had top pages in the summary, we'd use them. + # For now, we'll use a placeholder authority map or just the site health + pass + except Exception as e: + logger.warning(f"Failed to fetch authority data: {e}") + + suggestions = [] + for result in results: + relevance_score = result.get('score', 0.0) + url = result.get('id', 'unknown') + + # Apply authority boost (placeholder logic) + # In a full implementation, we'd look up 'url' in authority_map + authority_boost = 1.0 + + final_score = relevance_score * authority_boost + + if final_score >= self.RELEVANCE_THRESHOLD: + suggestion = { + "url": url, + "relevance": relevance_score, + "final_score": final_score, + "confidence": self._calculate_link_confidence(final_score), + "reason": f"Semantic similarity: {relevance_score:.3f}" + } + suggestions.append(suggestion) + logger.debug(f"[{self.__class__.__name__}] Added link suggestion: {url} (score: {final_score:.3f})") + + # Sort by final score + suggestions.sort(key=lambda x: x['final_score'], reverse=True) + + logger.info(f"[{self.__class__.__name__}] Generated {len(suggestions)} internal link suggestions") + return suggestions + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to suggest internal links: {e}") + logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") + return [] + + async def graph_builder(self) -> Dict[str, Any]: + """ + Tool: Builds/Visualizes the semantic link graph. + Returns the structure of the graph (nodes and edges) for visualization or analysis. + """ + self._log_agent_operation("Building semantic link graph") + + try: + if not self.intelligence.is_initialized(): + return {"error": "Intelligence service not initialized"} + + # This is a resource-intensive operation in a real vector DB. + # Here we simulate the graph structure based on recent content or clusters. + + # 1. Get Clusters (Nodes) + clusters = await self.intelligence.cluster(min_score=0.5) + + nodes = [] + edges = [] + + for i, cluster in enumerate(clusters): + cluster_id = f"cluster_{i}" + nodes.append({ + "id": cluster_id, + "type": "topic_cluster", + "size": len(cluster) + }) + + # Add content items as nodes linked to cluster + for item_idx in cluster: + # We need to retrieve item metadata. + # txtai cluster returns indices. We might need to query by index or ID. + # For this implementation, we'll return a simplified view. + pass + + return { + "graph_stats": { + "total_clusters": len(clusters), + "total_nodes": sum(len(c) for c in clusters) + }, + "structure": "hierarchical", # vs flat + "timestamp": datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to build graph: {e}") + return {"error": str(e)} + + + + async def authority_analyzer(self, target_url: Optional[str] = None) -> Dict[str, Any]: + """ + Tool: Analyzes the authority of the site or specific pages using GSC/Bing data. + """ + self._log_agent_operation("Analyzing authority", target_url=target_url) + + if not self.sif_service: + return {"error": "SIF Service unavailable for authority analysis"} + + try: + # 1. Get Dashboard Context + context = await self.sif_service.get_seo_dashboard_context() + + if "error" in context: + return context + + data = context.get("dashboard_data", {}) + summary = data.get("summary", {}) + health = data.get("health_score", {}) + + # 2. Extract Authority Metrics + authority_report = { + "domain_authority_proxy": { + "health_score": health.get("score"), + "total_clicks": summary.get("clicks"), + "avg_position": summary.get("position") + }, + "page_authority": "Page-level authority requires granular GSC data (Planned)", # Placeholder + "timestamp": datetime.utcnow().isoformat() + } + + return authority_report + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Authority analysis failed: {e}") + return {"error": str(e)} + + def _calculate_link_confidence(self, relevance_score: float) -> float: + """Calculate confidence score for a link suggestion.""" + # Simple confidence based on relevance score + return min(1.0, relevance_score * 1.5) + + async def optimize_anchor_text(self, target_url: str, context: str) -> str: + """Suggest the best anchor text for a given link based on target page context.""" + self._log_agent_operation("Optimizing anchor text", target_url=target_url, context_length=len(context)) + + try: + # In a real implementation, we would fetch the target page content via SIF + # and use an LLM to generate the anchor text. + + # Placeholder for LLM call + # if self.llm: ... + + logger.info(f"[{self.__class__.__name__}] Anchor text optimization stub completed") + return "relevant anchor text" # Placeholder + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to optimize anchor text: {e}") + logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") + return "click here" # Fallback anchor text + +class CitationExpert(SIFBaseAgent): + """ + Agent for fact-checking, citation generation, and evidence verification. + """ + + EVIDENCE_THRESHOLD = 0.7 # Minimum relevance score for evidence + MAX_EVIDENCE = 5 # Maximum number of evidence pieces to return + + async def fact_checker(self, claim: str) -> List[Dict[str, Any]]: + """ + Tool: Verifies facts against trusted research data. + Returns supporting or contradicting evidence. + """ + return await self.verify_facts(claim) + + async def citation_finder(self, topic: str) -> List[Dict[str, Any]]: + """ + Tool: Suggests authoritative citations for a given topic. + """ + self._log_agent_operation("Finding citations", topic=topic) + + try: + if not self.intelligence.is_initialized(): + return [] + + # Search for highly relevant content + results = await self.intelligence.search(topic, limit=self.MAX_EVIDENCE) + + citations = [] + for result in results: + relevance = result.get('score', 0.0) + if relevance > 0.6: + citations.append({ + "source": result.get('id'), + "title": result.get('text', '')[:100] + "...", + "relevance": relevance, + "citation_text": f"Source: {result.get('id')} (Relevance: {relevance:.2f})" + }) + + return citations + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Citation finder failed: {e}") + return [] + + async def claim_verifier(self, content: str) -> Dict[str, Any]: + """ + Tool: Detects unsupported statements and hallucinations. + """ + self._log_agent_operation("Verifying claims in content", content_length=len(content)) + + # 1. Extract potential claims (heuristic: numbers, 'research shows', etc.) + # This is a simplified extraction. A real implementation would use NLP/LLM. + claims = [] + sentences = content.split('.') + for sent in sentences: + if any(char.isdigit() for char in sent) or "show" in sent.lower() or "study" in sent.lower(): + if len(sent.strip()) > 20: + claims.append(sent.strip()) + + if not claims: + return {"status": "no_claims_detected", "verified_claims": []} + + verified_results = [] + for claim in claims[:5]: # Limit to top 5 claims for performance + evidence = await self.verify_facts(claim) + status = "supported" if evidence else "unsupported" + verified_results.append({ + "claim": claim, + "status": status, + "evidence_count": len(evidence), + "top_evidence": evidence[0]['source'] if evidence else None + }) + + return { + "status": "verification_complete", + "total_claims": len(claims), + "verified_claims": verified_results, + "unsupported_count": len([c for c in verified_results if c['status'] == 'unsupported']), + "timestamp": datetime.utcnow().isoformat() + } + + async def verify_facts(self, claim: str) -> List[Dict[str, Any]]: + """Find supporting or contradicting evidence in the indexed research.""" + self._log_agent_operation("Verifying facts", claim_length=len(claim)) + + try: + if not self.intelligence.is_initialized(): + logger.error(f"[{self.__class__.__name__}] Intelligence service not initialized") + return [] + + if not claim or len(claim.strip()) < 20: + logger.warning(f"[{self.__class__.__name__}] Claim too short for meaningful verification") + return [] + + results = await self.intelligence.search(claim, limit=self.MAX_EVIDENCE) + + if not results: + logger.info(f"[{self.__class__.__name__}] No evidence found for claim") + return [] + + evidence = [] + for result in results: + relevance_score = result.get('score', 0.0) + + if relevance_score >= self.EVIDENCE_THRESHOLD: + evidence_piece = { + "source": result.get('id', 'unknown'), + "relevance": relevance_score, + "confidence": self._calculate_evidence_confidence(relevance_score), + "type": "supporting" if relevance_score > 0.8 else "related", + "excerpt": result.get('text', '')[:200] + "..." if len(result.get('text', '')) > 200 else result.get('text', '') + } + evidence.append(evidence_piece) + logger.debug(f"[{self.__class__.__name__}] Found evidence: {evidence_piece['source']} (score: {relevance_score:.3f})") + + logger.info(f"[{self.__class__.__name__}] Found {len(evidence)} pieces of evidence for claim") + return evidence + + except Exception as e: + logger.error(f"[{self.__class__.__name__}] Failed to verify facts: {e}") + logger.error(f"[{self.__class__.__name__}] Full traceback: {traceback.format_exc()}") + return [] + + def _calculate_evidence_confidence(self, relevance_score: float) -> float: + """Calculate confidence score for evidence.""" + # Simple confidence based on relevance score + return min(1.0, relevance_score * 1.2) + +""" +Specialized ALwrity Autonomous Agents +Defines specific agent implementations for Content Strategy, Competitor Response, +SEO Optimization, and Social Amplification. +""" +import json +import logging +import asyncio +from typing import Dict, Any, List, Optional +from datetime import datetime + +# txtai imports +try: + from txtai import Agent, LLM + TXTAI_AVAILABLE = True +except ImportError: + TXTAI_AVAILABLE = False + logging.warning("txtai not available, using fallback implementation") + +from utils.logger_utils import get_service_logger +from services.intelligence.agents.core_agent_framework import BaseALwrityAgent, AgentAction +from services.seo_tools.content_strategy_service import ContentStrategyService + +# Import SIF Integration for real tool capabilities +try: + from services.intelligence.sif_integration import SIFIntegrationService + SIF_AVAILABLE = True +except ImportError: + SIF_AVAILABLE = False + +logger = get_service_logger(__name__) + +class ContentStrategyAgent(BaseALwrityAgent): + """ + Agent responsible for content strategy, gap analysis, and optimization. + """ + + def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): + super().__init__(user_id, "content_strategist", model_name, llm) + self.sif_service = None + self.content_strategy_service = ContentStrategyService() + if SIF_AVAILABLE: + try: + self.sif_service = SIFIntegrationService(user_id) + except Exception as e: + logger.warning(f"Failed to initialize SIF service for ContentStrategyAgent: {e}") + + async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: + """Propose GENERATE pillar tasks.""" + proposals = [] + + # 1. Content Gap Analysis + proposals.append(TaskProposal( + title="Analyze Content Gaps", + description="Identify missing topics in your strategy compared to competitors.", + pillar_id="generate", + priority="high", + estimated_time=30, + source_agent="ContentStrategyAgent", + reasoning="Regular gap analysis ensures competitive relevance.", + action_type="navigate", + action_url="/content-planning-dashboard" + )) + + # 2. Draft New Content + proposals.append(TaskProposal( + title="Draft New Blog Post", + description="Create a new article targeting your primary keywords.", + pillar_id="generate", + priority="medium", + estimated_time=45, + source_agent="ContentStrategyAgent", + reasoning="Maintain publishing consistency.", + action_type="navigate", + action_url="/blog-writer" + )) + + return proposals + + def _create_txtai_agent(self) -> Agent: + """Create Content Strategy Agent using txtai native framework""" + if not TXTAI_AVAILABLE: + return None + + return Agent( + llm=self.llm, + tools=[ + { + "name": "content_analyzer", + "description": "Analyzes content performance and engagement metrics", + "target": self._content_analyzer_tool + }, + { + "name": "semantic_gap_detector", + "description": "Identifies content gaps using semantic analysis", + "target": self._semantic_gap_detector_tool + }, + { + "name": "content_optimizer", + "description": "Optimizes content for better performance", + "target": self._content_optimizer_tool + }, + { + "name": "performance_tracker", + "description": "Tracks content performance over time", + "target": self._content_performance_tracker_tool + }, + { + "name": "sitemap_analyzer", + "description": "Analyzes website structure and publishing velocity via sitemap", + "target": self._sitemap_analyzer_tool + }, + { + "name": "gsc_low_ctr_queries", + "description": "Returns low-CTR queries with evidence from cached GSC metrics", + "target": self._cs_gsc_low_ctr_queries_tool + }, + { + "name": "gsc_striking_distance_queries", + "description": "Returns striking-distance queries (positions ~8–20) with evidence", + "target": self._cs_gsc_striking_distance_tool + }, + { + "name": "gsc_declining_queries", + "description": "Returns period-over-period declining queries with evidence", + "target": self._cs_gsc_declining_queries_tool + }, + { + "name": "gsc_low_ctr_pages", + "description": "Returns low-CTR pages with top contributing queries", + "target": self._cs_gsc_low_ctr_pages_tool + }, + { + "name": "gsc_cannibalization_candidates", + "description": "Returns query→multiple-pages cannibalization candidates with target recommendation", + "target": self._cs_gsc_cannibalization_candidates_tool + }, + { + "name": "default_content_gsc_plan", + "description": "Runs a default first-pass plan using GSC signals (titles/meta, consolidation, refreshes)", + "target": self._default_content_gsc_plan_tool + }, + ], + max_iterations=8, + system=self.get_effective_system_prompt(f"""You are the Content Strategy Agent for ALwrity user {self.user_id}. + + Your mission is to analyze content performance, identify optimization opportunities, + and execute content improvements autonomously. + + Focus on: + - Content gap identification (semantic and structural) + - Topic cluster optimization + - SEO strategy adaptation + - Performance-based content improvements + + Use semantic analysis (SIF) and sitemap analysis to understand content context. + Always prioritize user goals and maintain brand consistency. + + In your first pass, call 'default_content_gsc_plan' to ground your actions on live GSC signals.""" + ) + ) + + # Tool Implementations + + async def _cs_fetch_gsc_analytics(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, Any]: + svc = PlatformAnalyticsService() + data = await svc.get_comprehensive_analytics(self.user_id, platforms=["gsc"], start_date=start_date, end_date=end_date) + gsc = data.get("gsc") + if not gsc or gsc.status != "success": + err = getattr(gsc, "error_message", None) if gsc else "No data" + raise RuntimeError(f"GSC analytics unavailable: {err}") + return {"metrics": gsc.metrics, "date_range": gsc.date_range} + + async def _cs_gsc_low_ctr_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)); min_impr = int(context.get("min_impressions", 100)); min_clicks = int(context.get("min_clicks", 10)); ctr_threshold = float(context.get("ctr_threshold", 1.5)) + start_date = context.get("start_date"); end_date = context.get("end_date") + try: + result = await self._cs_fetch_gsc_analytics(start_date, end_date) + tq = result["metrics"].get("top_queries", []) or [] + items = [ + {"query": r.get("query"), "clicks": r.get("clicks", 0), "impressions": r.get("impressions", 0), "ctr": r.get("ctr", 0.0), "position": r.get("position")} + for r in tq + if (r.get("impressions", 0) >= min_impr and r.get("clicks", 0) >= min_clicks and float(r.get("ctr", 0.0)) < ctr_threshold) + ] + items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) + return {"items": items[:limit], "range": result["date_range"], "source": "gsc_cache"} + except Exception as e: + logger.error(f"cs low_ctr_queries failed: {e}"); return {"error": str(e)} + + async def _cs_gsc_striking_distance_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)); min_impr = int(context.get("min_impressions", 100)); start_date = context.get("start_date"); end_date = context.get("end_date") + try: + result = await self._cs_fetch_gsc_analytics(start_date, end_date) + tq = result["metrics"].get("top_queries", []) or [] + items = [ + {"query": r.get("query"), "clicks": r.get("clicks", 0), "impressions": r.get("impressions", 0), "ctr": r.get("ctr", 0.0), "position": r.get("position")} + for r in tq + if (r.get("impressions", 0) >= min_impr and r.get("position") is not None and 8.0 <= float(r.get("position")) <= 20.0) + ] + items.sort(key=lambda x: (x.get("position") if x.get("position") is not None else 999, -x.get("impressions", 0))) + return {"items": items[:limit], "range": result["date_range"], "source": "gsc_cache"} + except Exception as e: + logger.error(f"cs striking_distance failed: {e}"); return {"error": str(e)} + + async def _cs_gsc_declining_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)); min_prev_clicks = int(context.get("min_prev_clicks", 10)); min_drop_pct = float(context.get("min_drop_pct", 30.0)) + start_date = context.get("start_date"); end_date = context.get("end_date") + try: + curr = await self._cs_fetch_gsc_analytics(start_date, end_date) + curr_range = curr["date_range"]; s = curr_range.get("start"); e = curr_range.get("end") + from datetime import datetime, timedelta; fmt = "%Y-%m-%d" + sd = datetime.strptime(s, fmt) if s else datetime.utcnow() - timedelta(days=30); ed = datetime.strptime(e, fmt) if e else datetime.utcnow() + days = max((ed - sd).days + 1, 1); prev_end = sd - timedelta(days=1); prev_start = prev_end - timedelta(days=days - 1) + prev = await self._cs_fetch_gsc_analytics(prev_start.strftime(fmt), prev_end.strftime(fmt)) + curr_queries = {r.get("query"): r for r in (curr["metrics"].get("top_queries", []) or [])} + prev_queries = {r.get("query"): r for r in (prev["metrics"].get("top_queries", []) or [])} + items = [] + for q, prev_row in prev_queries.items(): + curr_row = curr_queries.get(q); + if not curr_row: continue + prev_clicks = int(prev_row.get("clicks", 0) or 0); curr_clicks = int(curr_row.get("clicks", 0) or 0) + if prev_clicks >= min_prev_clicks and curr_clicks < prev_clicks: + drop_pct = ((prev_clicks - curr_clicks) / prev_clicks) * 100.0 + if drop_pct >= min_drop_pct: + items.append({"query": q, "prev_clicks": prev_clicks, "curr_clicks": curr_clicks, "drop_pct": round(drop_pct, 2)}) + items.sort(key=lambda x: (x.get("drop_pct", 0), x.get("prev_clicks", 0)), reverse=True) + return {"items": items[:limit], "range": curr_range, "previous_range": prev["date_range"], "source": "gsc_cache"} + except Exception as e: + logger.error(f"cs declining_queries failed: {e}"); return {"error": str(e)} + + async def _cs_gsc_low_ctr_pages_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)); min_impr = int(context.get("min_impressions", 200)); ctr_threshold = float(context.get("ctr_threshold", 1.5)) + start_date = context.get("start_date"); end_date = context.get("end_date") + try: + result = await self._cs_fetch_gsc_analytics(start_date, end_date) + tp = result["metrics"].get("top_pages", []) or [] + items = [] + for r in tp: + if (r.get("impressions", 0) >= min_impr and float(r.get("ctr", 0.0)) < ctr_threshold): + items.append({"page": r.get("page"), "clicks": r.get("clicks", 0), "impressions": r.get("impressions", 0), "ctr": r.get("ctr", 0.0), "position": r.get("position"), "evidence_queries": r.get("queries", [])[:5]}) + items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) + return {"items": items[:limit], "range": result["date_range"], "source": "gsc_cache"} + except Exception as e: + logger.error(f"cs low_ctr_pages failed: {e}"); return {"error": str(e)} + + async def _cs_gsc_cannibalization_candidates_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)); start_date = context.get("start_date"); end_date = context.get("end_date") + try: + result = await self._cs_fetch_gsc_analytics(start_date, end_date) + candidates = result["metrics"].get("cannibalization", []) or [] + return {"items": candidates[:limit], "range": result["date_range"], "source": "gsc_cache"} + except Exception as e: + logger.error(f"cs cannibalization_candidates failed: {e}"); return {"error": str(e)} + + async def _default_content_gsc_plan_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + start_date = context.get("start_date"); end_date = context.get("end_date") + try: + low_ctr_pages = await self._cs_gsc_low_ctr_pages_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + cannibals = await self._cs_gsc_cannibalization_candidates_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + striking = await self._cs_gsc_striking_distance_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + declining = await self._cs_gsc_declining_queries_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + + actions = [] + for p in low_ctr_pages.get("items", []): + actions.append({ + "type": "improve_titles_meta", + "target": p.get("page"), + "reason": f"Low CTR {p.get('ctr')}% with {p.get('impressions')} impressions", + "evidence": p.get("evidence_queries", []) + }) + for c in cannibals.get("items", []): + actions.append({ + "type": "consolidate/internal_link", + "target": c.get("recommended_target_page"), + "reason": f"Cannibalization on query '{c.get('query')}'", + "pages": c.get("pages", []) + }) + for q in striking.get("items", []): + actions.append({ + "type": "refresh_content", + "target": "query", + "query": q.get("query"), + "reason": f"Striking distance at position {q.get('position')} with {q.get('impressions')} impressions" + }) + for q in declining.get("items", []): + actions.append({ + "type": "refresh_content", + "target": "query", + "query": q.get("query"), + "reason": f"Clicks decline {q.get('prev_clicks')}→{q.get('curr_clicks')} ({q.get('drop_pct')}%)" + }) + + return { + "plan_name": "Default Content Plan from GSC", + "range": {"current": {"start": start_date, "end": end_date}}, + "actions": actions, + "source": "gsc_cache", + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"default_content_gsc_plan failed: {e}") + return {"error": str(e)} + + async def _sitemap_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Sitemap analysis tool using ContentStrategyService""" + website_url = context.get('website_url') + competitors = context.get('competitors', []) + + if not website_url: + return {"error": "Website URL required for sitemap analysis"} + + try: + result = await self.content_strategy_service.analyze_content_strategy( + website_url=website_url, + competitors=competitors, + user_id=self.user_id + ) + return { + "sitemap_insights": result.get("deterministic_insights", {}), + "ai_strategy": result.get("ai_strategy", {}), + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"Sitemap analysis failed: {e}") + return {"error": str(e)} + + async def _content_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Content analysis tool with GSC integration. + Analyzes content performance using SIF insights and Google Search Console data. + """ + website_data = context.get('website_data', {}) + + # 1. SIF Semantic Analysis + sif_insights = {} + if self.sif_service: + try: + result = await self.sif_service.get_semantic_insights(website_data) + sif_insights = result.get('insights', {}) + except Exception as e: + logger.error(f"SIF content analysis failed: {e}") + + # 2. GSC Data Integration (Mock/Placeholder as per Phase 3A.2) + # In a real implementation, this would call a GSCService + gsc_data = { + "clicks": 1250, + "impressions": 45000, + "ctr": 2.8, + "position": 14.5, + "top_queries": [ + {"query": "ai content strategy", "clicks": 150, "position": 3.2}, + {"query": "seo automation", "clicks": 120, "position": 4.1} + ], + "underperforming_pages": [ + {"url": "/blog/old-post-1", "issue": "High impressions, low CTR"}, + {"url": "/blog/weak-content", "issue": "Declining traffic"} + ] + } + + # 3. Correlate Semantic Topics with GSC Performance + content_gaps = [] + if sif_insights and gsc_data: + # Example correlation logic + semantic_topics = sif_insights.get('content_pillars', []) + gsc_queries = [q['query'] for q in gsc_data['top_queries']] + + # Simple set difference to find topics with no traffic + for topic in semantic_topics: + if not any(topic.lower() in q.lower() for q in gsc_queries): + content_gaps.append(f"Topic '{topic}' has content but low search visibility") + + return { + "content_analysis": "Completed via SIF + GSC Integration", + "sif_insights": sif_insights, + "gsc_performance": gsc_data, + "identified_gaps": content_gaps, + "strategic_recommendations": sif_insights.get('strategic_recommendations', []) + ["Optimize meta descriptions for underperforming pages"], + "timestamp": datetime.utcnow().isoformat() + } + + async def _content_optimizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Content optimization tool using LLM-based rewriting and semantic analysis. + Generates specific diffs/rewrites for content. + """ + content = context.get('content') + target_keywords = context.get('target_keywords', []) + focus_topic = context.get('focus_topic') + optimization_goal = context.get('goal', 'readability') # readability, seo, conversion + + if not content: + return {"error": "No content provided for optimization"} + + try: + # System prompt optimized for specific rewrites + system_prompt = f"""You are an expert Content Editor. + Task: Optimize the following text for '{focus_topic}' with goal: {optimization_goal}. + Keywords to include: {', '.join(target_keywords)}. + + Return a JSON object with: + 1. "original_snippet": A short snippet of the original text. + 2. "optimized_version": The fully rewritten version. + 3. "changes_explained": A list of specific changes made (e.g., "Added keyword 'X' in first sentence"). + 4. "diff_summary": A brief summary of why this version is better. + + Maintain the original meaning and tone. + """ + + if self.llm: + # We assume the LLM returns JSON-like text or we parse it + response = await self._generate_llm_response(f"{system_prompt}\n\nText to rewrite:\n{content}") + + # Simple parsing fallback if LLM returns raw text + if isinstance(response, str) and not response.strip().startswith("{"): + optimized_content = response + changes = ["Rewrote for clarity", "Integrated keywords"] + else: + # Try to parse JSON if model is good + try: + import json + data = json.loads(response) + optimized_content = data.get("optimized_version", response) + changes = data.get("changes_explained", []) + except: + optimized_content = response + changes = ["Optimization applied"] + else: + optimized_content = f"[Mock Rewrite]: Optimized '{content[:30]}...' for {focus_topic}" + changes = ["Mock optimization applied"] + + return { + "original_content_snippet": content[:50] + "...", + "optimized_content": optimized_content, + "changes_made": changes, + "expected_impact": "Higher relevance score and better readability", + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"Content optimization failed: {e}") + return {"error": str(e)} + + async def _content_performance_tracker_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Content performance tracking tool. + Persists metrics to monitor content health over time. + """ + content_id = context.get('content_id') or context.get('url') + metrics = context.get('metrics', {}) + + if not content_id: + return {"error": "Content ID or URL required for tracking"} + + # Simulate persistence (In real app, save to DB table 'content_performance_history') + # We can use the AgentPerformanceMonitor for generic metrics, but this is content-specific. + + tracking_record = { + "content_id": content_id, + "date": datetime.utcnow().date().isoformat(), + "metrics": metrics, + "health_score": self._calculate_content_health(metrics) + } + + # Log it for now as "persistence" + logger.info(f"Persisting content performance for {content_id}: {tracking_record}") + + return { + "status": "recorded", + "tracking_record": tracking_record, + "trend": "stable", # Would calculate based on history + "timestamp": datetime.utcnow().isoformat() + } + + def _calculate_content_health(self, metrics: Dict[str, Any]) -> float: + """Calculate a 0-100 health score based on metrics""" + # Simple heuristic + views = metrics.get('views', 0) + engagement = metrics.get('engagement_rate', 0) + + score = min(100, (views / 1000) * 10 + (engagement * 100)) + return round(score, 2) + + +class CompetitorResponseAgent(BaseALwrityAgent): + """ + Agent responsible for monitoring competitors and generating counter-strategies. + """ + + def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): + super().__init__(user_id, "competitor_analyst", model_name, llm) + + self.sif_service = None + if SIF_AVAILABLE: + try: + self.sif_service = SIFIntegrationService(user_id) + except Exception as e: + logger.warning(f"Failed to initialize SIF service for CompetitorResponseAgent: {e}") + + async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: + """Propose REMARKET pillar tasks.""" + proposals = [] + + # 1. Competitor Monitoring + proposals.append(TaskProposal( + title="Monitor Competitor Activity", + description="Check for new moves from your key competitors.", + pillar_id="remarket", + priority="medium", + estimated_time=15, + source_agent="CompetitorResponseAgent", + reasoning="Stay ahead of market changes.", + action_type="navigate", + action_url="/seo-dashboard" + )) + + return proposals + + def _create_txtai_agent(self) -> Agent: + """Create Competitor Response Agent using txtai native framework""" + if not TXTAI_AVAILABLE: + return None + + return Agent( + llm=self.llm, + tools=[ + { + "name": "competitor_monitor", + "description": "Monitors competitor content and strategy changes via SIF", + "target": self._competitor_monitor_tool + }, + { + "name": "threat_analyzer", + "description": "Analyzes competitive threats and opportunities based on SIF data", + "target": self._threat_analyzer_tool + }, + { + "name": "response_generator", + "description": "Generates counter-strategy recommendations", + "target": self._response_generator_tool + }, + { + "name": "strategy_executor", + "description": "Executes competitive response strategies", + "target": self._strategy_executor_tool + } + ], + max_iterations=12, + system=self.get_effective_system_prompt(f"""You are the Competitor Response Agent for ALwrity user {self.user_id}. + + Your mission is to monitor competitor activities, assess competitive threats, + and generate counter-strategies autonomously using SIF insights. + + Responsibilities: + - Real-time competitor content monitoring via SIF + - Competitive threat assessment + - Counter-strategy generation + - Rapid response deployment + + Use semantic analysis to understand competitor positioning and identify gaps. + Respond quickly to competitive threats while maintaining strategic advantage.""" + ) + ) + + # Tool Implementations + + async def _competitor_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Competitor monitoring tool that retrieves data via SIF. + Expects 'competitor_url' (optional) in context to filter. + """ + competitor_url = context.get('competitor_url') + + if not self.sif_service: + return {"error": "SIF Service unavailable, cannot retrieve competitor data."} + + try: + logger.info(f"Retrieving Competitor data via SIF for user {self.user_id}") + result = await self.sif_service.get_competitor_context(competitor_url) + + if "error" in result and result.get("source") == "empty": + return {"error": "No competitor data found. Please complete onboarding Step 3."} + + competitors = result.get("competitors", []) + changes = [] + + for comp in competitors: + # In a real scenario, we would compare with previous snapshots. + # Here we extract highlights from the current analysis. + summary = comp.get("summary", "") + highlights = comp.get("highlights", []) + changes.append({ + "url": comp.get("competitor_url"), # Or however it's stored in analysis_data + "summary_snippet": summary[:100] + "...", + "highlights": highlights[:3] + }) + + return { + "competitor_changes": changes, + "data_source": "sif_index", + "count": len(competitors), + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"Competitor monitoring failed: {e}") + return {"error": str(e)} + + async def _threat_analyzer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Threat analysis tool using SIF data. + """ + # Get data from monitor tool or context + competitor_data = context.get('competitor_changes', []) + + # If not provided, fetch it + if not competitor_data and self.sif_service: + monitor_result = await self._competitor_monitor_tool(context) + competitor_data = monitor_result.get("competitor_changes", []) + + if not competitor_data: + return {"threat_assessment": "No data available for analysis", "level": "unknown"} + + # Simple rule-based or LLM-based analysis + # For now, we simulate a threat assessment based on highlights + threats = [] + overall_level = "low" + + for comp in competitor_data: + highlights = comp.get("highlights", []) + # Heuristic: If highlights mention "launch", "new", "pricing" -> Higher threat + level = "low" + risk_factors = [] + + full_text = " ".join(highlights).lower() + if "new feature" in full_text or "launch" in full_text: + level = "high" + risk_factors.append("New Product Launch") + elif "pricing" in full_text: + level = "medium" + risk_factors.append("Pricing Change") + + if level == "high": overall_level = "high" + elif level == "medium" and overall_level != "high": overall_level = "medium" + + threats.append({ + "competitor": comp.get("url"), + "level": level, + "risk_factors": risk_factors + }) + + return { + "threat_assessment": f"{overall_level.title()} threat level detected.", + "threat_level": overall_level, + "details": threats, + "timestamp": datetime.utcnow().isoformat() + } + + async def _response_generator_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Response generation tool""" + threat_level = context.get("threat_level", "low") + threat_details = context.get("details", []) + + strategies = [] + if threat_level == "high": + strategies = ["Launch counter-campaign immediately", "Highlight USP differentiation"] + elif threat_level == "medium": + strategies = ["Monitor closely", "Prepare comparison content"] + else: + strategies = ["Continue current strategy", "Look for gap opportunities"] + + return { + "counter_strategies": strategies, + "priority": "high" if threat_level == "high" else "medium", + "timestamp": datetime.utcnow().isoformat() + } + + async def _strategy_executor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Strategy execution tool""" + # In a real agent, this might trigger the Content Agent to write a post. + strategies = context.get("counter_strategies", []) + + execution_log = [] + for strategy in strategies: + execution_log.append(f"Scheduled: {strategy}") + + return { + "execution_status": "completed", + "actions_taken": execution_log, + "timestamp": datetime.utcnow().isoformat() + } + + +class SEOOptimizationAgent(BaseALwrityAgent): + """ + Agent responsible for technical SEO, keyword strategy, and performance optimization. + """ + + def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): + super().__init__(user_id, "seo_specialist", model_name, llm) + + self.sif_service = None + if SIF_AVAILABLE: + try: + self.sif_service = SIFIntegrationService(user_id) + except Exception as e: + logger.warning(f"Failed to initialize SIF service for SEOOptimizationAgent: {e}") + + async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: + """Propose ANALYZE pillar tasks.""" + proposals = [] + + # 1. Technical Audit + proposals.append(TaskProposal( + title="Review SEO Health", + description="Check for critical technical issues affecting your search visibility.", + pillar_id="analyze", + priority="high", + estimated_time=20, + source_agent="SEOOptimizationAgent", + reasoning="Regular health checks prevent traffic drops.", + action_type="navigate", + action_url="/seo-dashboard" + )) + + # 2. Keyword Opportunities + proposals.append(TaskProposal( + title="Optimize Underperforming Keywords", + description="Identify keywords where you rank on page 2 and optimize content to boost them.", + pillar_id="analyze", + priority="medium", + estimated_time=40, + source_agent="SEOOptimizationAgent", + reasoning="Low-hanging fruit for traffic growth.", + action_type="navigate", + action_url="/seo-dashboard" + )) + + return proposals + + def _create_txtai_agent(self) -> Agent: + """Create SEO Optimization Agent using txtai native framework""" + if not TXTAI_AVAILABLE: + return None + + return Agent( + llm=self.llm, + tools=[ + { + "name": "seo_auditor", + "description": "Performs comprehensive SEO audits", + "target": self._seo_auditor_tool + }, + { + "name": "issue_prioritizer", + "description": "Prioritizes SEO issues by impact and effort", + "target": self._issue_prioritizer_tool + }, + { + "name": "auto_fix_executor", + "description": "Automatically fixes high-impact SEO issues", + "target": self._auto_fix_executor_tool + }, + { + "name": "strategy_generator", + "description": "Generates SEO improvement strategies", + "target": self._strategy_generator_tool + }, + { + "name": "query_seo_knowledge_base", + "description": "Queries the SIF knowledge base for SEO dashboard data, GSC/Bing metrics, and semantic insights", + "target": self._query_seo_knowledge_base_tool + }, + { + "name": "gsc_low_ctr_queries", + "description": "Returns low-CTR queries with evidence from cached GSC metrics", + "target": self._gsc_low_ctr_queries_tool + }, + { + "name": "gsc_striking_distance_queries", + "description": "Returns striking-distance queries (positions ~8–20) with evidence", + "target": self._gsc_striking_distance_tool + }, + { + "name": "gsc_declining_queries", + "description": "Returns period-over-period declining queries with evidence", + "target": self._gsc_declining_queries_tool + }, + { + "name": "gsc_low_ctr_pages", + "description": "Returns low-CTR pages with top contributing queries", + "target": self._gsc_low_ctr_pages_tool + }, + { + "name": "gsc_cannibalization_candidates", + "description": "Returns query→multiple-pages cannibalization candidates with target recommendation", + "target": self._gsc_cannibalization_candidates_tool + }, + { + "name": "default_seo_gsc_plan", + "description": "Runs a default first-pass SEO plan using GSC signals (titles/meta, consolidation, refreshes)", + "target": self._default_seo_gsc_plan_tool + }, + ], + max_iterations=15, + system=self.get_effective_system_prompt(f"""You are the SEO Optimization Agent for ALwrity user {self.user_id}. + + Your mission is to perform continuous SEO audits, prioritize fixes by impact, + and execute optimizations autonomously. + + Capabilities: + - Technical SEO issue detection and fixing + - Keyword strategy dynamic adjustment + - SERP position optimization + - Backlink opportunity identification + - Deep semantic search of SEO data (GSC, Bing, Audits) + + Focus on high-impact, low-effort optimizations first. + In your first pass, call 'default_seo_gsc_plan' to ground your actions on live GSC signals. + Always maintain SEO best practices and user experience.""" + ) + ) + + # Tool Implementations + + async def _query_seo_knowledge_base_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Queries the SIF knowledge base for SEO insights. + Combines website analysis, competitor data, and SEO dashboard metrics. + """ + query = context.get('query') + website_url = context.get('website_url') + + if not query: + return {"error": "Query required for knowledge base search"} + + if not self.sif_service: + return {"error": "SIF Service unavailable"} + + try: + logger.info(f"Querying SEO knowledge base: {query}") + + # 1. Search General Context (Website Analysis) + seo_context = await self.sif_service.get_seo_context(website_url) + + # 2. Search Dashboard Context (GSC/Bing) + dashboard_context = await self.sif_service.get_seo_dashboard_context() + + # 3. Perform specific semantic search for the query + search_results = await self.sif_service.intelligence_service.search(query, limit=3) + + # Combine all contexts + combined_context = { + "query": query, + "seo_audit_context": seo_context.get("seo_audit", {}), + "dashboard_metrics": dashboard_context.get("dashboard_data", {}).get("summary", {}), + "dashboard_insights": dashboard_context.get("dashboard_data", {}).get("ai_insights", []), + "semantic_search_results": [r.get('text', '') for r in search_results], + "timestamp": datetime.utcnow().isoformat() + } + + return combined_context + + except Exception as e: + logger.error(f"SEO knowledge base query failed: {e}") + return {"error": str(e)} + + async def _seo_auditor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + SEO audit tool that retrieves existing SEO data via SIF. + Expects 'website_url' in context. + """ + website_url = context.get('website_url') + + # Lightweight Crawler (Fallback/Supplement) + crawler_data = {} + try: + # We import here to avoid dependency issues if not installed + import aiohttp + from bs4 import BeautifulSoup + + async with aiohttp.ClientSession() as session: + async with session.get(website_url, timeout=10) as resp: + if resp.status == 200: + html = await resp.text() + soup = BeautifulSoup(html, 'html.parser') + + # Basic Checks + title = soup.title.string if soup.title else None + desc = soup.find('meta', attrs={'name': 'description'}) + desc_content = desc['content'] if desc else None + h1s = [h.get_text().strip() for h in soup.find_all('h1')] + links = [a.get('href') for a in soup.find_all('a', href=True)] + + # Enhanced Image Analysis for Auto-fix + images_missing_alt = [] + for img in soup.find_all('img'): + if not img.get('alt'): + # Get surrounding context (parent text) + parent_text = img.parent.get_text().strip()[:200] if img.parent else "" + images_missing_alt.append({ + "src": img.get('src', ''), + "context": parent_text + }) + + crawler_data = { + "title_tag": title, + "meta_description": desc_content, + "h1_count": len(h1s), + "h1_content": h1s, + "internal_links_count": len([l for l in links if l.startswith('/') or website_url in l]), + "images_missing_alt_count": len(images_missing_alt), + "images_missing_alt_details": images_missing_alt + } + except Exception as e: + logger.warning(f"Lightweight crawler failed: {e}") + + if not self.sif_service: + # If SIF is down, return crawler data if available + if crawler_data: + return { + "audit_summary": {"technical_health": crawler_data}, + "data_source": "lightweight_crawler", + "timestamp": datetime.utcnow().isoformat() + } + return {"error": "SIF Service unavailable, cannot retrieve SEO data."} + + try: + logger.info(f"Retrieving SEO data via SIF for {website_url}") + result = await self.sif_service.get_seo_context(website_url) + + if "error" in result and result.get("source") == "empty": + # Fallback to crawler data if SIF is empty + if crawler_data: + return { + "audit_summary": {"technical_health": crawler_data, "note": "SIF empty, using live crawl"}, + "data_source": "lightweight_crawler_fallback", + "timestamp": datetime.utcnow().isoformat() + } + return {"error": "No SEO data found. Please ensure onboarding is complete or wait for scheduled analysis."} + + # Format the data for the agent + audit_report = { + "technical_health": result.get("seo_audit", {}).get("technical_issues", []), + "live_crawl_check": crawler_data, # Enrich with live data + "crawl_stats": result.get("crawl_result", {}).get("crawl_summary", {}), + "sitemap_status": "Available" if result.get("sitemap_analysis") else "Unknown", + "core_web_vitals": result.get("pagespeed_data", {}).get("core_web_vitals", {}), + "timestamp": result.get("analysis_date", datetime.utcnow().isoformat()) + } + + return { + "audit_summary": audit_report, + "data_source": "database_via_sif", + "full_context": result # Provide full context for deep analysis if needed + } + + except Exception as e: + logger.error(f"SEO audit retrieval failed: {e}") + return {"error": str(e)} + + async def _issue_prioritizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """Issue prioritization tool""" + # In a real scenario, this would take the raw_results from the auditor and rank them. + # For now, we simulate this based on the audit summary if available. + + audit_summary = context.get('audit_summary', {}) + issues = [] + + # Extract issues from audit summary if available + if audit_summary: + tech_issues = audit_summary.get('technical_health', []) + # Handle SIF structured issues + if isinstance(tech_issues, list): + for issue in tech_issues: + if isinstance(issue, dict): + issues.append({"issue": issue.get('type', 'Unknown Issue'), "impact": "High" if issue.get('severity') == "High" else "Medium"}) + + # Handle Live Crawl issues + live_crawl = audit_summary.get('live_crawl_check', {}) + if live_crawl: + if not live_crawl.get('title_tag'): + issues.append({"issue": "Missing Title Tag", "impact": "Critical"}) + if not live_crawl.get('meta_description'): + issues.append({"issue": "Missing Meta Description", "impact": "High"}) + if live_crawl.get('h1_count', 0) == 0: + issues.append({"issue": "Missing H1 Tag", "impact": "High"}) + if live_crawl.get('h1_count', 0) > 1: + issues.append({"issue": "Multiple H1 Tags", "impact": "Medium"}) + + missing_alt_count = live_crawl.get('images_missing_alt_count', live_crawl.get('images_missing_alt', 0)) + if missing_alt_count > 0: + issues.append({ + "issue": f"{missing_alt_count} Images Missing Alt Text", + "impact": "Medium", + "details": live_crawl.get('images_missing_alt_details', []) + }) + + perf_score = audit_summary.get('performance_score', 100) + if perf_score < 50: + issues.append({"issue": "Critical Performance Issues", "impact": "High"}) + elif perf_score < 80: + issues.append({"issue": "Performance Optimization Needed", "impact": "Medium"}) + else: + issues = [{"issue": "Missing meta tags", "impact": "Medium"}, {"issue": "Slow loading", "impact": "High"}] + + return { + "prioritized_issues": [i["issue"] for i in issues], + "details": issues, + "timestamp": datetime.utcnow().isoformat() + } + + async def _auto_fix_executor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Auto-fix execution tool. + Generates ready-to-apply patches for identified issues using LLM. + """ + issues = context.get('issues', []) + page_content = context.get('page_content', '') # Content to analyze for generating tags + + # If page_content is missing/empty, try to infer or fallback + if not page_content: + logger.warning("No page_content provided to auto_fix_executor. Fixes may be generic.") + page_content = "Content unavailable for analysis." + + patches = [] + + for issue in issues: + issue_data = issue if isinstance(issue, dict) else {"issue": issue} + issue_name = issue_data.get('issue', str(issue)) + + try: + if "Missing Meta Description" in issue_name: + prompt = f"""Generate a concise, SEO-optimized meta description (max 160 chars) for a page with this content: + + {page_content[:1500]} + + Return ONLY the meta description text. + """ + description = await self._generate_llm_response(prompt) + + patches.append({ + "type": "meta_description", + "action": "insert", + "content": description.strip('"'), + "target": "
" + }) + + elif "Missing Title Tag" in issue_name: + prompt = f"""Generate a compelling, SEO-optimized title tag (max 60 chars) for a page with this content: + + {page_content[:1500]} + + Return ONLY the title tag text. + """ + title = await self._generate_llm_response(prompt) + + patches.append({ + "type": "title_tag", + "action": "insert", + "content": title.strip('"'), + "target": "" + }) + + elif "Missing Alt Text" in issue_name: + # Handle multiple images if details are provided + details = issue_data.get('details', []) + if not details: + # Fallback for generic issue without details + patches.append({ + "type": "alt_text", + "action": "update", + "details": "Manual review required: Missing image details for auto-fix.", + "count": 1 + }) + else: + for img in details: + context_text = img.get('context', '') + src = img.get('src', '') + + prompt = f"""Generate a short, descriptive alt text for an image on a webpage. + + Surrounding Text Context: "{context_text}" + Image Filename/Source: "{src}" + + Return ONLY the alt text. + """ + alt_text = await self._generate_llm_response(prompt) + + patches.append({ + "type": "alt_text", + "action": "update", + "target_src": src, + "content": alt_text.strip('"') + }) + + elif "Missing H1 Tag" in issue_name: + prompt = f"""Generate a main H1 heading for this page content: + + {page_content[:1500]} + + Return ONLY the H1 text. + """ + h1_text = await self._generate_llm_response(prompt) + + patches.append({ + "type": "h1_tag", + "action": "insert", + "content": h1_text.strip('"'), + "target": "" + }) + + except Exception as e: + logger.error(f"Failed to generate fix for issue '{issue_name}': {e}") + patches.append({ + "issue": issue_name, + "status": "failed", + "error": str(e) + }) + + return { + "fixes_generated": len(patches), + "patches": patches, + "status": "ready_for_review", + "timestamp": datetime.utcnow().isoformat() + } + + + async def _strategy_generator_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """SEO strategy generation tool""" + audit_results = context.get("audit_results", {}) + prioritized_issues = context.get("prioritized_issues", []) + + strategies = [] + if prioritized_issues: + strategies.append(f"Focus on fixing top 3 critical issues: {[i['issue'] for i in prioritized_issues[:3]]}") + + strategies.append("Optimize for long-tail keywords based on gap analysis") + + return { + "seo_strategy": strategies, + "next_steps": ["Execute auto-fixes", "Review content gaps"], + "timestamp": datetime.utcnow().isoformat() + } + + # GSC Insights Tools (Option B) + async def _fetch_gsc_analytics(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> Dict[str, Any]: + svc = PlatformAnalyticsService() + data = await svc.get_comprehensive_analytics(self.user_id, platforms=["gsc"], start_date=start_date, end_date=end_date) + gsc = data.get("gsc") + if not gsc or gsc.status != "success": + err = getattr(gsc, "error_message", None) if gsc else "No data" + raise RuntimeError(f"GSC analytics unavailable: {err}") + return { + "metrics": gsc.metrics, + "date_range": gsc.date_range + } + + async def _gsc_low_ctr_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)) + min_impr = int(context.get("min_impressions", 100)) + min_clicks = int(context.get("min_clicks", 10)) + ctr_threshold = float(context.get("ctr_threshold", 1.5)) + start_date = context.get("start_date") + end_date = context.get("end_date") + try: + result = await self._fetch_gsc_analytics(start_date, end_date) + tq = result["metrics"].get("top_queries", []) or [] + items = [ + { + "query": r.get("query"), + "clicks": r.get("clicks", 0), + "impressions": r.get("impressions", 0), + "ctr": r.get("ctr", 0.0), + "position": r.get("position") + } + for r in tq + if (r.get("impressions", 0) >= min_impr and r.get("clicks", 0) >= min_clicks and float(r.get("ctr", 0.0)) < ctr_threshold) + ] + items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) + return { + "items": items[:limit], + "range": result["date_range"], + "source": "gsc_cache" + } + except Exception as e: + logger.error(f"low_ctr_queries tool failed: {e}") + return {"error": str(e)} + + async def _gsc_striking_distance_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)) + min_impr = int(context.get("min_impressions", 100)) + start_date = context.get("start_date") + end_date = context.get("end_date") + try: + result = await self._fetch_gsc_analytics(start_date, end_date) + tq = result["metrics"].get("top_queries", []) or [] + items = [ + { + "query": r.get("query"), + "clicks": r.get("clicks", 0), + "impressions": r.get("impressions", 0), + "ctr": r.get("ctr", 0.0), + "position": r.get("position") + } + for r in tq + if (r.get("impressions", 0) >= min_impr and r.get("position") is not None and 8.0 <= float(r.get("position")) <= 20.0) + ] + items.sort(key=lambda x: (x.get("position") if x.get("position") is not None else 999, -x.get("impressions", 0))) + return { + "items": items[:limit], + "range": result["date_range"], + "source": "gsc_cache" + } + except Exception as e: + logger.error(f"striking_distance tool failed: {e}") + return {"error": str(e)} + + async def _gsc_declining_queries_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)) + min_prev_clicks = int(context.get("min_prev_clicks", 10)) + min_drop_pct = float(context.get("min_drop_pct", 30.0)) + start_date = context.get("start_date") + end_date = context.get("end_date") + try: + curr = await self._fetch_gsc_analytics(start_date, end_date) + curr_range = curr["date_range"] + s = curr_range.get("start") + e = curr_range.get("end") + from datetime import datetime, timedelta + fmt = "%Y-%m-%d" + sd = datetime.strptime(s, fmt) if s else datetime.utcnow() - timedelta(days=30) + ed = datetime.strptime(e, fmt) if e else datetime.utcnow() + days = max((ed - sd).days + 1, 1) + prev_end = sd - timedelta(days=1) + prev_start = prev_end - timedelta(days=days - 1) + prev = await self._fetch_gsc_analytics(prev_start.strftime(fmt), prev_end.strftime(fmt)) + curr_queries = {r.get("query"): r for r in (curr["metrics"].get("top_queries", []) or [])} + prev_queries = {r.get("query"): r for r in (prev["metrics"].get("top_queries", []) or [])} + items = [] + for q, prev_row in prev_queries.items(): + curr_row = curr_queries.get(q) + if not curr_row: + continue + prev_clicks = int(prev_row.get("clicks", 0) or 0) + curr_clicks = int(curr_row.get("clicks", 0) or 0) + if prev_clicks >= min_prev_clicks and curr_clicks < prev_clicks: + drop_pct = ((prev_clicks - curr_clicks) / prev_clicks) * 100.0 + if drop_pct >= min_drop_pct: + items.append({ + "query": q, + "prev_clicks": prev_clicks, + "curr_clicks": curr_clicks, + "drop_pct": round(drop_pct, 2) + }) + items.sort(key=lambda x: (x.get("drop_pct", 0), x.get("prev_clicks", 0)), reverse=True) + return { + "items": items[:limit], + "range": curr_range, + "previous_range": prev["date_range"], + "source": "gsc_cache" + } + except Exception as e: + logger.error(f"declining_queries tool failed: {e}") + return {"error": str(e)} + + async def _gsc_low_ctr_pages_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)) + min_impr = int(context.get("min_impressions", 200)) + ctr_threshold = float(context.get("ctr_threshold", 1.5)) + start_date = context.get("start_date") + end_date = context.get("end_date") + try: + result = await self._fetch_gsc_analytics(start_date, end_date) + tp = result["metrics"].get("top_pages", []) or [] + items = [] + for r in tp: + if (r.get("impressions", 0) >= min_impr and float(r.get("ctr", 0.0)) < ctr_threshold): + items.append({ + "page": r.get("page"), + "clicks": r.get("clicks", 0), + "impressions": r.get("impressions", 0), + "ctr": r.get("ctr", 0.0), + "position": r.get("position"), + "evidence_queries": r.get("queries", [])[:5] + }) + items.sort(key=lambda x: (x.get("impressions", 0), -x.get("ctr", 100.0)), reverse=True) + return { + "items": items[:limit], + "range": result["date_range"], + "source": "gsc_cache" + } + except Exception as e: + logger.error(f"low_ctr_pages tool failed: {e}") + return {"error": str(e)} + + async def _gsc_cannibalization_candidates_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + limit = int(context.get("limit", 10)) + start_date = context.get("start_date") + end_date = context.get("end_date") + try: + result = await self._fetch_gsc_analytics(start_date, end_date) + candidates = result["metrics"].get("cannibalization", []) or [] + return { + "items": candidates[:limit], + "range": result["date_range"], + "source": "gsc_cache" + } + except Exception as e: + logger.error(f"cannibalization_candidates tool failed: {e}") + return {"error": str(e)} + + async def _default_seo_gsc_plan_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + start_date = context.get("start_date") + end_date = context.get("end_date") + try: + low_ctr_pages = await self._gsc_low_ctr_pages_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + cannibals = await self._gsc_cannibalization_candidates_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + striking = await self._gsc_striking_distance_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + declining = await self._gsc_declining_queries_tool({"start_date": start_date, "end_date": end_date, "limit": 10}) + + actions = [] + for p in low_ctr_pages.get("items", []): + actions.append({ + "type": "update_titles_meta", + "target_page": p.get("page"), + "justification": f"Low CTR {p.get('ctr')}% with {p.get('impressions')} impressions", + "evidence": p.get("evidence_queries", []) + }) + for c in cannibals.get("items", []): + actions.append({ + "type": "consolidate/internal_link", + "target_page": c.get("recommended_target_page"), + "justification": f"Cannibalization on query '{c.get('query')}'", + "pages": c.get("pages", []) + }) + for q in striking.get("items", []): + actions.append({ + "type": "refresh_content", + "target": "query", + "query": q.get("query"), + "justification": f"Striking distance at position {q.get('position')} with {q.get('impressions')} impressions" + }) + for q in declining.get("items", []): + actions.append({ + "type": "refresh_content", + "target": "query", + "query": q.get("query"), + "justification": f"Clicks decline {q.get('prev_clicks')}→{q.get('curr_clicks')} ({q.get('drop_pct')}%)" + }) + + return { + "plan_name": "Default SEO Plan from GSC", + "range": {"current": {"start": start_date, "end": end_date}}, + "actions": actions, + "source": "gsc_cache", + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"default_seo_gsc_plan failed: {e}") + return {"error": str(e)} + + +class SocialAmplificationAgent(BaseALwrityAgent): + """ + Agent responsible for social media monitoring, content adaptation, and distribution. + """ + + def __init__(self, user_id: str, model_name: str = "Qwen/Qwen3-4B-Instruct-2507", llm: Any = None): + super().__init__(user_id, "social_media_manager", model_name, llm) + + self.sif_service = None + if SIF_AVAILABLE: + try: + self.sif_service = SIFIntegrationService(user_id) + except Exception as e: + logger.warning(f"Failed to initialize SIF service for SocialAmplificationAgent: {e}") + + async def propose_daily_tasks(self, context: Dict[str, Any]) -> List[TaskProposal]: + """Propose PUBLISH and ENGAGE pillar tasks.""" + proposals = [] + + # 1. Publish Task + proposals.append(TaskProposal( + title="Schedule Social Content", + description="Plan and schedule your posts for the week to maintain consistent presence.", + pillar_id="publish", + priority="high", + estimated_time=20, + source_agent="SocialAmplificationAgent", + reasoning="Consistency is key for algorithm growth.", + action_type="navigate", + action_url="/scheduler-dashboard" + )) + + # 2. Engage Task + proposals.append(TaskProposal( + title="Engage with Community", + description="Respond to comments and interact with industry leaders' posts.", + pillar_id="engage", + priority="medium", + estimated_time=15, + source_agent="SocialAmplificationAgent", + reasoning="Community building increases reach.", + action_type="navigate", + action_url="/social-dashboard" + )) + + return proposals + + def _create_txtai_agent(self) -> Agent: + """Create Social Amplification Agent using txtai native framework""" + if not TXTAI_AVAILABLE: + return None + + return Agent( + llm=self.llm, + tools=[ + { + "name": "social_monitor", + "description": "Monitors social media trends and engagement", + "target": self._social_monitor_tool + }, + { + "name": "content_adapter", + "description": "Adapts content for different social platforms", + "target": self._content_adapter_tool + }, + { + "name": "engagement_optimizer", + "description": "Optimizes content for maximum engagement", + "target": self._engagement_optimizer_tool + }, + { + "name": "distribution_manager", + "description": "Manages content distribution across platforms", + "target": self._distribution_manager_tool + } + ], + max_iterations=10, + system=self.get_effective_system_prompt(f"""You are the Social Media Amplification Agent for ALwrity user {self.user_id}. + + Your mission is to optimize content distribution, monitor social signals, + and amplify content reach across platforms. + + Responsibilities: + - Monitor social trends and brand mentions via SIF + - Adapt content for specific platforms (LinkedIn, Twitter, etc.) + - Optimize posts for engagement (hashtags, timing, tone) + - Plan and execute distribution strategies + + Use semantic insights to align social content with overall content strategy. + Focus on maximizing engagement and driving traffic back to the main content.""" + ) + ) + + # Tool Implementations + + async def _social_monitor_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Social monitoring tool using SIF. + """ + # In a real scenario, this would search for trends or mentions. + # For now, we search for social-related context in SIF. + query = context.get('query', 'social media trends') + + if self.sif_service: + try: + # Search for social media related insights in the index + results = await self.sif_service.search(query, limit=5) + + # Extract relevant info + trends = [] + for res in results: + text = res.get('text', '') + if 'social' in text.lower() or 'trend' in text.lower(): + trends.append(text[:100] + "...") + + return { + "trends": trends, + "mentions": [], # Placeholder + "sentiment": "neutral", + "source": "sif_index", + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"Social monitoring failed: {e}") + return {"error": str(e)} + + return { + "trends": ["AI in marketing", "Content automation"], + "source": "mock_data", + "timestamp": datetime.utcnow().isoformat() + } + + async def _content_adapter_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Adapts content for specific platforms. + Expects 'content' and 'platform' (e.g., 'linkedin', 'twitter'). + """ + content = context.get('content') + platform = context.get('platform', 'general') + + if not content: + return {"error": "No content provided for adaptation"} + + try: + # Use LLM to adapt content + prompt = f"""Adapt the following content for {platform}. + + Original Content: + {content} + + Requirements for {platform}: + - LinkedIn: Professional tone, use bullet points, engaging question at end. + - Twitter: Short, punchy, under 280 chars, use relevant hashtags. + - Instagram: Visual focus description, many hashtags. + - General: Balanced tone. + + Return ONLY the adapted content. + """ + + if self.llm: + adapted_content = await self._generate_llm_response(prompt) + else: + adapted_content = f"[Mock {platform}]: {content[:50]}... #adapted" + + return { + "original_content_snippet": content[:50] + "...", + "platform": platform, + "adapted_content": adapted_content, + "timestamp": datetime.utcnow().isoformat() + } + except Exception as e: + logger.error(f"Content adaptation failed: {e}") + return {"error": str(e)} + + async def _engagement_optimizer_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Optimizes content for engagement (hashtags, timing, hook). + """ + content = context.get('content') + platform = context.get('platform', 'general') + + if not content: + return {"error": "No content provided for optimization"} + + # Mock optimization logic or use LLM + suggestions = [ + "Add a call-to-action (CTA)", + "Use trending hashtags for the niche", + "Post during peak hours (Tue-Thu 10am)" + ] + + return { + "optimization_suggestions": suggestions, + "estimated_engagement_score": 8.5, + "timestamp": datetime.utcnow().isoformat() + } + + async def _distribution_manager_tool(self, context: Dict[str, Any]) -> Dict[str, Any]: + """ + Manages distribution (scheduling/posting). + """ + posts = context.get('posts', []) + + schedule = [] + for i, post in enumerate(posts): + schedule.append({ + "post_id": i, + "platform": post.get('platform', 'unknown'), + "scheduled_time": "Tomorrow 10:00 AM" # Mock + }) + + return { + "distribution_plan": schedule, + "status": "scheduled", + "timestamp": datetime.utcnow().isoformat() + } +>>>>>>> pr-369 diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index e027bc92..06b6bd27 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -6,7 +6,7 @@ from sqlalchemy.orm import Session from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert -from services.agent_activity_service import AgentActivityService +from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger @@ -430,7 +430,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> event_type="plan", severity="info", message="Building grounded daily workflow plan", - payload={"grounding": grounding}, + payload=build_agent_event_payload(phase="planning", step="build_grounded_plan", tool_name="llm_text_gen", progress_percent=10, input_summary="Grounding data assembled from onboarding + alerts", output_summary="Preparing daily workflow generation", decision_reason="Need context-aware workflow", evidence_refs=["onboarding_data","recent_agent_alerts"], safe_debug=True, metadata={"grounding": grounding}), run_id=run.id, agent_type="TodayWorkflowGenerator", ) @@ -449,7 +449,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> event_type="warning", severity="warning", message=str(e)[:2000], - payload={"fallback": True}, + payload=build_agent_event_payload(phase="generation", step="llm_failed_fallback", tool_name="llm_text_gen", progress_percent=70, output_summary="LLM generation failed, using fallback tasks", decision_reason="Exception during workflow generation", safe_debug=False, metadata={"fallback": True}), run_id=run.id, agent_type="TodayWorkflowGenerator", ) @@ -467,7 +467,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> event_type="final_summary", severity="info", message="Daily workflow plan generated", - payload={"date": date, "task_count": len(result.get("tasks", []))}, + payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}), run_id=run.id, agent_type="TodayWorkflowGenerator", ) diff --git a/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx b/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx index 60ccebb8..278f9bd2 100644 --- a/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx +++ b/frontend/src/components/MainDashboard/components/TeamHuddleWidget.tsx @@ -3,106 +3,161 @@ import { Box, Paper, Typography, - Avatar, Chip, List, ListItem, - ListItemAvatar, - ListItemText, Divider, IconButton, - Tooltip + Tooltip, + CircularProgress, + Accordion, + AccordionSummary, + AccordionDetails, + Stack, } from '@mui/material'; import { - Psychology as StrategyIcon, - Article as ContentIcon, - Search as SeoIcon, - Campaign as SocialIcon, - CompareArrows as CompetitorIcon, - Refresh as RefreshIcon + Refresh as RefreshIcon, + ExpandMore as ExpandMoreIcon, } from '@mui/icons-material'; -import { Link as RouterLink } from 'react-router-dom'; -import { useAgentHuddleFeed } from '../../../hooks/useAgentHuddleFeed'; +import { apiClient } from '../../../api/client'; -const ICON_BY_AGENT: Record