diff --git a/backend/services/intelligence/agents/agent_orchestrator.py b/backend/services/intelligence/agents/agent_orchestrator.py index afb42f15..6f0deff2 100644 --- a/backend/services/intelligence/agents/agent_orchestrator.py +++ b/backend/services/intelligence/agents/agent_orchestrator.py @@ -464,6 +464,7 @@ class AgentOrchestrationService: async def get_or_create_orchestrator(self, user_id: str) -> ALwrityAgentOrchestrator: """Get or create an orchestrator for a user""" + onboarding_gated_initialization = False if user_id not in self.orchestrators: config = AgentTeamConfiguration(user_id=user_id) self.orchestrators[user_id] = ALwrityAgentOrchestrator(config) @@ -474,6 +475,25 @@ class AgentOrchestrationService: if not orchestrator.agents and not orchestrator.execution_history: logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.") orchestrator._create_specialized_agents() + + last_system_check = next( + ( + entry + for entry in reversed(orchestrator.execution_history) + if entry.get("action") == "system_check" + ), + None, + ) + if last_system_check and last_system_check.get("status") == "pending": + details = str(last_system_check.get("details") or "").lower() + onboarding_gated_initialization = "onboarding" in details + + orchestrator.onboarding_gated_initialization = onboarding_gated_initialization + orchestrator.initialization_state = { + "onboarding_gated_initialization": onboarding_gated_initialization, + "active_agent_count": len(orchestrator.agents), + "active_agent_keys": sorted(orchestrator.agents.keys()), + } return orchestrator diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 7b096893..0c2db96f 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -11,15 +11,6 @@ from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] -TASK_SOURCE_ENUM = {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} - - -def _normalize_task_metadata(task: Dict[str, Any], default_source: str) -> Dict[str, Any]: - metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} - source = metadata.get("source") - if source not in TASK_SOURCE_ENUM: - metadata["source"] = default_source - return metadata def _today_date_str() -> str: @@ -34,43 +25,15 @@ def _coerce_priority(value: Any) -> str: def _coerce_status(value: Any) -> str: v = str(value or "pending").lower().strip() if v in {"pending", "in_progress", "completed", "skipped", "dismissed"}: - # Canonicalize 'dismissed' to 'skipped' for consistency return "skipped" if v == "dismissed" else v return "pending" -def coerce_dependencies(value: Any) -> List[str]: - if isinstance(value, list): - return [str(dep).strip() for dep in value if str(dep).strip()] - - if isinstance(value, str): - raw = value.strip() - if not raw: - return [] - try: - parsed = json.loads(raw) - if isinstance(parsed, list): - return [str(dep).strip() for dep in parsed if str(dep).strip()] - except Exception: - pass - return [raw] - - return [] - - def _proposal_priority_rank(priority: str) -> int: return {"low": 0, "medium": 1, "high": 2}.get(str(priority or "").lower(), 1) def _proposal_order_key(proposal: Any) -> tuple: - # Handle both object and dict access for compatibility - if isinstance(proposal, dict): - return ( - str(proposal.get("source_agent") or "").lower(), - str(proposal.get("title") or "").lower(), - str(proposal.get("description") or "").lower(), - str(proposal.get("action_url") or "").lower(), - ) return ( str(getattr(proposal, "source_agent", "") or "").lower(), str(getattr(proposal, "title", "") or "").lower(), @@ -79,19 +42,6 @@ def _proposal_order_key(proposal: Any) -> tuple: ) -def _get_agent_proposal_timeout_seconds(grounding: Dict[str, Any]) -> float: - workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} - if not isinstance(workflow_config, dict): - return 4.0 - - raw_timeout = workflow_config.get("agent_proposal_timeout_seconds", 4.0) - try: - timeout_seconds = float(raw_timeout) - except (TypeError, ValueError): - return 4.0 - return max(1.0, timeout_seconds) - - def _fallback_tasks(date: str) -> List[Dict[str, Any]]: return [ { @@ -186,7 +136,6 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["enabled"] = bool(task.get("enabled", True)) - sanitized["metadata"] = _normalize_task_metadata(task, default_source="llm_generation") return sanitized @@ -327,192 +276,208 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> activity = AgentActivityService(db, user_id) grounding = build_grounding_context(db, user_id, date) memory_service = TaskMemoryService(user_id, db) + min_active_agents = 2 + generation_path = "committee" # 1. Get Orchestrator try: orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") + fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) return { "date": date, - "tasks": _fallback_tasks(date), - "provenance": {"generation_mode": "controlled_fallback", "committee_agent_count": 0, "fallback_used": True}, + "tasks": fallback_tasks, + "metadata": { + "generation_path": "controlled_fallback", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": {"count": 0, "names": []}, + }, + "degraded": { + "is_degraded": True, + "reason": "orchestrator_unavailable", + "missing_agents": [], + }, + }, } + expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"] + active_agent_names = sorted(orchestrator.agents.keys()) + active_agents_count = len(active_agent_names) + missing_agents = [name for name in expected_committee_agents if name not in active_agent_names] + onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False)) + initialization_state = ( + getattr(orchestrator, "initialization_state", None) + if isinstance(getattr(orchestrator, "initialization_state", None), dict) + else {} + ) + + degraded_metadata = { + "is_degraded": False, + "reason": None, + "missing_agents": [], + } + + if active_agents_count < min_active_agents: + generation_path = "controlled_fallback" + degraded_metadata = { + "is_degraded": True, + "reason": "insufficient_active_agents", + "missing_agents": missing_agents, + } + activity.log_event( + event_type="committee_health", + severity="warning", + message="Committee degraded: insufficient active agents", + payload=build_agent_event_payload( + phase="planning", + step="committee_health_precheck", + progress_percent=5, + output_summary=f"Only {active_agents_count} active committee agents", + decision_reason="Agent committee below configured minimum", + evidence_refs=active_agent_names, + safe_debug=True, + metadata={ + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "missing_agents": missing_agents, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + ), + agent_type="TodayWorkflowGenerator", + ) + # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") - + agent_tasks = [] - committee_total_failure = False - try: - agent_timeout_seconds = _get_agent_proposal_timeout_seconds(grounding) + if generation_path == "committee": + try: + # Define agents to poll + agents_to_poll = [ + orchestrator.agents.get('content'), # ContentStrategyAgent + orchestrator.agents.get('strategy'), # StrategyArchitectAgent + orchestrator.agents.get('seo'), # SEOOptimizationAgent + orchestrator.agents.get('social'), # SocialAmplificationAgent + orchestrator.agents.get('competitor'), # CompetitorResponseAgent + ] - # Define agents to poll (keyed for logging/metrics) - agents_to_poll = { - "content": orchestrator.agents.get('content'), - "strategy": orchestrator.agents.get('strategy'), - "seo": orchestrator.agents.get('seo'), - "social": orchestrator.agents.get('social'), - "competitor": orchestrator.agents.get('competitor'), - } + # Filter out None agents (disabled/failed init) + active_agents = [a for a in agents_to_poll if a] - # Filter out None agents (disabled/failed init) - active_agents = {key: agent for key, agent in agents_to_poll.items() if agent} - - async def _collect_agent_proposals(agent_key: str, agent: Any) -> Dict[str, Any]: - started_at = datetime.now(timezone.utc) - try: - proposals = await asyncio.wait_for(agent.propose_daily_tasks(grounding), timeout=agent_timeout_seconds) - elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 - return { - "agent_key": agent_key, - "status": "ok", - "elapsed_ms": elapsed_ms, - "proposals": proposals if isinstance(proposals, list) else [], - } - except asyncio.TimeoutError: - elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 - return { - "agent_key": agent_key, - "status": "timeout", - "elapsed_ms": elapsed_ms, - "proposals": [], - "error": f"Timed out after {agent_timeout_seconds:.2f}s", - } - except Exception as agent_error: - elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 - return { - "agent_key": agent_key, - "status": "error", - "elapsed_ms": elapsed_ms, - "proposals": [], - "error": str(agent_error), - } - - # Execute propose_daily_tasks in parallel with per-agent timeout - committee_results = await asyncio.gather( - *[_collect_agent_proposals(agent_key, agent) for agent_key, agent in active_agents.items()] - ) - - successful_agent_count = 0 - raw_proposals = [] - for res in committee_results: - agent_key = res.get("agent_key") - status = res.get("status") - elapsed_ms = res.get("elapsed_ms") - - logger.info( - "Agent committee proposal metric | agent={} status={} elapsed_ms={:.2f} timeout_s={:.2f} proposal_count={}", - agent_key, - status, - elapsed_ms, - agent_timeout_seconds, - len(res.get("proposals") or []), + # Execute propose_daily_tasks in parallel + results = await asyncio.gather( + *[a.propose_daily_tasks(grounding) for a in active_agents], + return_exceptions=True ) - if status == "ok": - successful_agent_count += 1 - raw_proposals.extend(res.get("proposals") or []) - elif status == "timeout": - logger.warning(f"Agent proposal timed out for {agent_key}: {res.get('error')}") - else: - logger.warning(f"Agent proposal failed for {agent_key}: {res.get('error')}") + # Collect successful proposals + raw_proposals = [] + for res in results: + if isinstance(res, list): + raw_proposals.extend(res) + elif isinstance(res, Exception): + logger.warning(f"Agent proposal failed: {res}") - committee_total_failure = successful_agent_count == 0 + # Simple deduplication based on title+pillar + unique_map = {} + for p in raw_proposals: + key = f"{p.pillar_id}:{p.title}" + if key not in unique_map: + unique_map[key] = p + continue - # 3. Filter Redundant Proposals (Self-Learning) - # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago - # But for now, we filter exact duplicates from recent history (last 7 days) - # We can implement semantic filtering later - - # Simple deduplication based on title+pillar - unique_map = {} - for p in raw_proposals: - # Normalize pillar IDs and filter invalid proposals (re-apply from PR #383) - pillar_id = str(p.get("pillarId") or "").lower().strip() - if pillar_id not in PILLAR_IDS: - logger.warning(f"Skipping proposal with invalid pillarId: {pillar_id}. Proposal: {p}") - continue - p["pillarId"] = pillar_id + existing = unique_map[key] + if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): + unique_map[key] = p + continue - key = f"{p.get('pillarId')}:{p.get('title')}" - if key not in unique_map: - unique_map[key] = p - continue + # Deterministic tie-breaker for equal priority proposals. + if ( + _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) + and _proposal_order_key(p) < _proposal_order_key(existing) + ): + unique_map[key] = p - existing = unique_map[key] - if _proposal_priority_rank(p.get('priority')) > _proposal_priority_rank(existing.get('priority')): - unique_map[key] = p - continue + agent_tasks = list(unique_map.values()) - # Deterministic tie-breaker for equal priority proposals. - if ( - _proposal_priority_rank(p.get('priority')) == _proposal_priority_rank(existing.get('priority')) - and _proposal_order_key(p) < _proposal_order_key(existing) - ): - unique_map[key] = p - - agent_tasks = list(unique_map.values()) - - # Phase 3: Check memory for rejections (Semantic Filter) - agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) - - except Exception as e: - logger.error(f"Committee proposal phase failed: {e}") - committee_total_failure = True - # Continue to fallback or LLM generation if committee fails + # Check memory for rejections (semantic filter) + agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) - # 4. Final Selection - # Use committee outcomes whenever committee partially succeeds, even with sparse proposals. - if not committee_total_failure: + except Exception as e: + logger.error(f"Committee proposal phase failed: {e}") + generation_path = "llm_fallback" + + # 3. Final Selection + if generation_path == "committee" and agent_tasks: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") - # Convert TaskProposal objects to dicts for frontend final_tasks = [] for prop in agent_tasks: - # Handle both object and dict types - if isinstance(prop, dict): - final_tasks.append({ - "pillarId": prop.get("pillarId"), - "title": prop.get("title"), - "description": prop.get("description"), - "priority": prop.get("priority"), - "estimatedTime": prop.get("estimatedTime"), - "actionType": prop.get("actionType"), - "actionUrl": prop.get("actionUrl"), - "enabled": True, - "metadata": { - "source": "agent_committee", - "source_agent": prop.get("source_agent"), - "reasoning": prop.get("reasoning"), - "context_data": prop.get("context_data") - } - }) - else: - final_tasks.append({ - "pillarId": prop.pillar_id, - "title": prop.title, - "description": prop.description, - "priority": prop.priority, - "estimatedTime": prop.estimated_time, - "actionType": prop.action_type, - "actionUrl": prop.action_url, - "enabled": True, - "metadata": { - "source": "agent_committee", - "source_agent": prop.source_agent, - "reasoning": prop.reasoning, - "context_data": prop.context_data - } - }) + final_tasks.append({ + "pillarId": prop.pillar_id, + "title": prop.title, + "description": prop.description, + "priority": prop.priority, + "estimatedTime": prop.estimated_time, + "actionType": prop.action_type, + "actionUrl": prop.action_url, + "enabled": True, + "metadata": { + "source_agent": prop.source_agent, + "reasoning": prop.reasoning, + "context_data": prop.context_data + } + }) + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, "tasks": final_tasks, - "provenance": {"generation_mode": "agent_committee", "committee_agent_count": len(active_agents), "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in final_tasks)}, + "metadata": { + "generation_path": "committee", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, } - # Fallback to original LLM generation if agents returned nothing + if generation_path != "controlled_fallback": + generation_path = "llm_fallback" + + if generation_path == "controlled_fallback": + fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) + return { + "date": date, + "tasks": fallback_tasks, + "metadata": { + "generation_path": "controlled_fallback", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, + } + + # Fallback to original LLM generation if committee returned nothing logger.info("Agent committee returned no tasks, falling back to LLM generation") schema = { @@ -590,19 +555,32 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: + generation_path = "controlled_fallback" tasks = _fallback_tasks(date) - enriched_tasks = _ensure_pillar_coverage(tasks, user_id, date, grounding) + result = { "date": date, - "tasks": enriched_tasks, - "provenance": {"generation_mode": "llm_generation", "committee_agent_count": len(active_agents) if "active_agents" in locals() else 0, "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in enriched_tasks)}, + "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + "metadata": { + "generation_path": generation_path, + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, } activity.log_event( event_type="final_summary", severity="info", message="Daily workflow plan generated", - payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}), + payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}), run_id=run.id, agent_type="TodayWorkflowGenerator", ) @@ -629,19 +607,12 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) tasks = plan_data.get("tasks", []) - provenance = plan_data.get("provenance") if isinstance(plan_data.get("provenance"), dict) else {} def _create_plan(): - generation_mode = provenance.get("generation_mode") if provenance.get("generation_mode") in TASK_SOURCE_ENUM else "llm_generation" - committee_agent_count = int(provenance.get("committee_agent_count") or 0) - fallback_used = bool(provenance.get("fallback_used", False)) plan = DailyWorkflowPlan( user_id=user_id, date=date_str, source="agent", - generation_mode=generation_mode, - committee_agent_count=max(0, committee_agent_count), - fallback_used=fallback_used, plan_json=plan_data, created_at=datetime.utcnow(), updated_at=datetime.utcnow(), @@ -665,8 +636,8 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt estimated_time=int(t.get("estimatedTime") or 15), action_type=str(t.get("actionType") or "navigate").strip()[:20], action_url=str(t.get("actionUrl") or "").strip(), - dependencies=coerce_dependencies(t.get("dependencies")), - metadata_json=_normalize_task_metadata(t, default_source=generation_mode), + dependencies=json.dumps(t.get("dependencies") or []), + metadata_json=t.get("metadata") or {}, enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), updated_at=datetime.utcnow(),