From 15a9eaa9a0fcf56d4e6604b3b479766f320d01e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:40:03 +0530 Subject: [PATCH] Add committee health precheck and orchestrator init state metadata --- .../intelligence/agents/agent_orchestrator.py | 20 ++ backend/services/today_workflow_service.py | 249 +++++++++++++----- 2 files changed, 201 insertions(+), 68 deletions(-) diff --git a/backend/services/intelligence/agents/agent_orchestrator.py b/backend/services/intelligence/agents/agent_orchestrator.py index afb42f15..6f0deff2 100644 --- a/backend/services/intelligence/agents/agent_orchestrator.py +++ b/backend/services/intelligence/agents/agent_orchestrator.py @@ -464,6 +464,7 @@ class AgentOrchestrationService: async def get_or_create_orchestrator(self, user_id: str) -> ALwrityAgentOrchestrator: """Get or create an orchestrator for a user""" + onboarding_gated_initialization = False if user_id not in self.orchestrators: config = AgentTeamConfiguration(user_id=user_id) self.orchestrators[user_id] = ALwrityAgentOrchestrator(config) @@ -474,6 +475,25 @@ class AgentOrchestrationService: if not orchestrator.agents and not orchestrator.execution_history: logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.") orchestrator._create_specialized_agents() + + last_system_check = next( + ( + entry + for entry in reversed(orchestrator.execution_history) + if entry.get("action") == "system_check" + ), + None, + ) + if last_system_check and last_system_check.get("status") == "pending": + details = str(last_system_check.get("details") or "").lower() + onboarding_gated_initialization = "onboarding" in details + + orchestrator.onboarding_gated_initialization = onboarding_gated_initialization + orchestrator.initialization_state = { + "onboarding_gated_initialization": onboarding_gated_initialization, + "active_agent_count": len(orchestrator.agents), + "active_agent_keys": sorted(orchestrator.agents.keys()), + } return orchestrator diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..0c2db96f 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -276,85 +276,147 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> activity = AgentActivityService(db, user_id) grounding = build_grounding_context(db, user_id, date) memory_service = TaskMemoryService(user_id, db) + min_active_agents = 2 + generation_path = "committee" # 1. Get Orchestrator try: orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - return {"date": date, "tasks": _fallback_tasks(date)} + fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) + return { + "date": date, + "tasks": fallback_tasks, + "metadata": { + "generation_path": "controlled_fallback", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": {"count": 0, "names": []}, + }, + "degraded": { + "is_degraded": True, + "reason": "orchestrator_unavailable", + "missing_agents": [], + }, + }, + } + + expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"] + active_agent_names = sorted(orchestrator.agents.keys()) + active_agents_count = len(active_agent_names) + missing_agents = [name for name in expected_committee_agents if name not in active_agent_names] + onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False)) + initialization_state = ( + getattr(orchestrator, "initialization_state", None) + if isinstance(getattr(orchestrator, "initialization_state", None), dict) + else {} + ) + + degraded_metadata = { + "is_degraded": False, + "reason": None, + "missing_agents": [], + } + + if active_agents_count < min_active_agents: + generation_path = "controlled_fallback" + degraded_metadata = { + "is_degraded": True, + "reason": "insufficient_active_agents", + "missing_agents": missing_agents, + } + activity.log_event( + event_type="committee_health", + severity="warning", + message="Committee degraded: insufficient active agents", + payload=build_agent_event_payload( + phase="planning", + step="committee_health_precheck", + progress_percent=5, + output_summary=f"Only {active_agents_count} active committee agents", + decision_reason="Agent committee below configured minimum", + evidence_refs=active_agent_names, + safe_debug=True, + metadata={ + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "missing_agents": missing_agents, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + ), + agent_type="TodayWorkflowGenerator", + ) # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") - + agent_tasks = [] - try: - # Define agents to poll - agents_to_poll = [ - orchestrator.agents.get('content'), # ContentStrategyAgent - orchestrator.agents.get('strategy'), # StrategyArchitectAgent - orchestrator.agents.get('seo'), # SEOOptimizationAgent - orchestrator.agents.get('social'), # SocialAmplificationAgent - orchestrator.agents.get('competitor'), # CompetitorResponseAgent - ] - - # Filter out None agents (disabled/failed init) - active_agents = [a for a in agents_to_poll if a] - - # Execute propose_daily_tasks in parallel - results = await asyncio.gather( - *[a.propose_daily_tasks(grounding) for a in active_agents], - return_exceptions=True - ) - - # Collect successful proposals - raw_proposals = [] - for res in results: - if isinstance(res, list): - raw_proposals.extend(res) - elif isinstance(res, Exception): - logger.warning(f"Agent proposal failed: {res}") + if generation_path == "committee": + try: + # Define agents to poll + agents_to_poll = [ + orchestrator.agents.get('content'), # ContentStrategyAgent + orchestrator.agents.get('strategy'), # StrategyArchitectAgent + orchestrator.agents.get('seo'), # SEOOptimizationAgent + orchestrator.agents.get('social'), # SocialAmplificationAgent + orchestrator.agents.get('competitor'), # CompetitorResponseAgent + ] - # 3. Filter Redundant Proposals (Self-Learning) - # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago - # But for now, we filter exact duplicates from recent history (last 7 days) - # We can implement semantic filtering later - - # Simple deduplication based on title+pillar - unique_map = {} - for p in raw_proposals: - key = f"{p.pillar_id}:{p.title}" - if key not in unique_map: - unique_map[key] = p - continue + # Filter out None agents (disabled/failed init) + active_agents = [a for a in agents_to_poll if a] - existing = unique_map[key] - if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): - unique_map[key] = p - continue + # Execute propose_daily_tasks in parallel + results = await asyncio.gather( + *[a.propose_daily_tasks(grounding) for a in active_agents], + return_exceptions=True + ) - # Deterministic tie-breaker for equal priority proposals. - if ( - _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) - and _proposal_order_key(p) < _proposal_order_key(existing) - ): - unique_map[key] = p - - agent_tasks = list(unique_map.values()) - - # Phase 3: Check memory for rejections (Semantic Filter) - agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) - - except Exception as e: - logger.error(f"Committee proposal phase failed: {e}") - # Continue to fallback or LLM generation if committee fails + # Collect successful proposals + raw_proposals = [] + for res in results: + if isinstance(res, list): + raw_proposals.extend(res) + elif isinstance(res, Exception): + logger.warning(f"Agent proposal failed: {res}") - # 4. Final Selection - # If we have agent tasks, use them. Otherwise fall back to LLM generation. - if agent_tasks: + # Simple deduplication based on title+pillar + unique_map = {} + for p in raw_proposals: + key = f"{p.pillar_id}:{p.title}" + if key not in unique_map: + unique_map[key] = p + continue + + existing = unique_map[key] + if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): + unique_map[key] = p + continue + + # Deterministic tie-breaker for equal priority proposals. + if ( + _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) + and _proposal_order_key(p) < _proposal_order_key(existing) + ): + unique_map[key] = p + + agent_tasks = list(unique_map.values()) + + # Check memory for rejections (semantic filter) + agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) + + except Exception as e: + logger.error(f"Committee proposal phase failed: {e}") + generation_path = "llm_fallback" + + # 3. Final Selection + if generation_path == "committee" and agent_tasks: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") - - # Convert TaskProposal objects to dicts for frontend + final_tasks = [] for prop in agent_tasks: final_tasks.append({ @@ -372,14 +434,50 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "context_data": prop.context_data } }) - + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "metadata": { + "generation_path": "committee", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, } - # Fallback to original LLM generation if agents returned nothing + if generation_path != "controlled_fallback": + generation_path = "llm_fallback" + + if generation_path == "controlled_fallback": + fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) + return { + "date": date, + "tasks": fallback_tasks, + "metadata": { + "generation_path": "controlled_fallback", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, + } + + # Fallback to original LLM generation if committee returned nothing logger.info("Agent committee returned no tasks, falling back to LLM generation") schema = { @@ -457,17 +555,32 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: + generation_path = "controlled_fallback" tasks = _fallback_tasks(date) + result = { "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + "metadata": { + "generation_path": generation_path, + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, } activity.log_event( event_type="final_summary", severity="info", message="Daily workflow plan generated", - payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}), + payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}), run_id=run.id, agent_type="TodayWorkflowGenerator", )