diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index f88730ef..02b59fe0 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -54,6 +54,14 @@ def _proposal_priority_rank(priority: str) -> int: def _proposal_order_key(proposal: Any) -> tuple: + # Handle both object and dict access for compatibility + if isinstance(proposal, dict): + return ( + str(proposal.get("source_agent") or "").lower(), + str(proposal.get("title") or "").lower(), + str(proposal.get("description") or "").lower(), + str(proposal.get("action_url") or "").lower(), + ) return ( str(getattr(proposal, "source_agent", "") or "").lower(), str(getattr(proposal, "title", "") or "").lower(), @@ -62,6 +70,19 @@ def _proposal_order_key(proposal: Any) -> tuple: ) +def _get_agent_proposal_timeout_seconds(grounding: Dict[str, Any]) -> float: + workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} + if not isinstance(workflow_config, dict): + return 4.0 + + raw_timeout = workflow_config.get("agent_proposal_timeout_seconds", 4.0) + try: + timeout_seconds = float(raw_timeout) + except (TypeError, ValueError): + return 4.0 + return max(1.0, timeout_seconds) + + def _fallback_tasks(date: str) -> List[Dict[str, Any]]: return [ { @@ -308,39 +329,82 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") agent_tasks = [] + committee_total_failure = False try: - # Define agents to poll - agents_to_poll = [ - orchestrator.agents.get('content'), # ContentStrategyAgent - orchestrator.agents.get('strategy'), # StrategyArchitectAgent - orchestrator.agents.get('seo'), # SEOOptimizationAgent - orchestrator.agents.get('social'), # SocialAmplificationAgent - orchestrator.agents.get('competitor'), # CompetitorResponseAgent - ] - + agent_timeout_seconds = _get_agent_proposal_timeout_seconds(grounding) + + # Define agents to poll (keyed for logging/metrics) + agents_to_poll = { + "content": orchestrator.agents.get('content'), + "strategy": orchestrator.agents.get('strategy'), + "seo": orchestrator.agents.get('seo'), + "social": orchestrator.agents.get('social'), + "competitor": orchestrator.agents.get('competitor'), + } + # Filter out None agents (disabled/failed init) - active_agents = [a for a in agents_to_poll if a] - - # Execute propose_daily_tasks in parallel - results = await asyncio.gather( - *[a.propose_daily_tasks(grounding) for a in active_agents], - return_exceptions=True + active_agents = {key: agent for key, agent in agents_to_poll.items() if agent} + + async def _collect_agent_proposals(agent_key: str, agent: Any) -> Dict[str, Any]: + started_at = datetime.now(timezone.utc) + try: + proposals = await asyncio.wait_for(agent.propose_daily_tasks(grounding), timeout=agent_timeout_seconds) + elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 + return { + "agent_key": agent_key, + "status": "ok", + "elapsed_ms": elapsed_ms, + "proposals": proposals if isinstance(proposals, list) else [], + } + except asyncio.TimeoutError: + elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 + return { + "agent_key": agent_key, + "status": "timeout", + "elapsed_ms": elapsed_ms, + "proposals": [], + "error": f"Timed out after {agent_timeout_seconds:.2f}s", + } + except Exception as agent_error: + elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 + return { + "agent_key": agent_key, + "status": "error", + "elapsed_ms": elapsed_ms, + "proposals": [], + "error": str(agent_error), + } + + # Execute propose_daily_tasks in parallel with per-agent timeout + committee_results = await asyncio.gather( + *[_collect_agent_proposals(agent_key, agent) for agent_key, agent in active_agents.items()] ) - - # Collect successful proposals + + successful_agent_count = 0 raw_proposals = [] - for res in results: - if isinstance(res, list): - # Normalize pillar IDs and filter invalid proposals - for proposal in res: - pillar_id = str(proposal.get("pillarId") or "").lower().strip() - if pillar_id not in PILLAR_IDS: - logger.warning(f"Skipping proposal with invalid pillarId: {pillar_id}. Proposal: {proposal}") - continue - proposal["pillarId"] = pillar_id - raw_proposals.append(proposal) - elif isinstance(res, Exception): - logger.warning(f"Agent proposal failed: {res}") + for res in committee_results: + agent_key = res.get("agent_key") + status = res.get("status") + elapsed_ms = res.get("elapsed_ms") + + logger.info( + "Agent committee proposal metric | agent={} status={} elapsed_ms={:.2f} timeout_s={:.2f} proposal_count={}", + agent_key, + status, + elapsed_ms, + agent_timeout_seconds, + len(res.get("proposals") or []), + ) + + if status == "ok": + successful_agent_count += 1 + raw_proposals.extend(res.get("proposals") or []) + elif status == "timeout": + logger.warning(f"Agent proposal timed out for {agent_key}: {res.get('error')}") + else: + logger.warning(f"Agent proposal failed for {agent_key}: {res.get('error')}") + + committee_total_failure = successful_agent_count == 0 # 3. Filter Redundant Proposals (Self-Learning) # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago @@ -350,19 +414,26 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> # Simple deduplication based on title+pillar unique_map = {} for p in raw_proposals: - key = f"{p.pillar_id}:{p.title}" + # Normalize pillar IDs and filter invalid proposals (re-apply from PR #383) + pillar_id = str(p.get("pillarId") or "").lower().strip() + if pillar_id not in PILLAR_IDS: + logger.warning(f"Skipping proposal with invalid pillarId: {pillar_id}. Proposal: {p}") + continue + p["pillarId"] = pillar_id + + key = f"{p.get('pillarId')}:{p.get('title')}" if key not in unique_map: unique_map[key] = p continue existing = unique_map[key] - if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): + if _proposal_priority_rank(p.get('priority')) > _proposal_priority_rank(existing.get('priority')): unique_map[key] = p continue # Deterministic tie-breaker for equal priority proposals. if ( - _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) + _proposal_priority_rank(p.get('priority')) == _proposal_priority_rank(existing.get('priority')) and _proposal_order_key(p) < _proposal_order_key(existing) ): unique_map[key] = p @@ -374,32 +445,51 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> except Exception as e: logger.error(f"Committee proposal phase failed: {e}") + committee_total_failure = True # Continue to fallback or LLM generation if committee fails # 4. Final Selection - # If we have agent tasks, use them. Otherwise fall back to LLM generation. - if agent_tasks: + # Use committee outcomes whenever committee partially succeeds, even with sparse proposals. + if not committee_total_failure: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") - + # Convert TaskProposal objects to dicts for frontend final_tasks = [] for prop in agent_tasks: - final_tasks.append({ - "pillarId": prop.pillar_id, - "title": prop.title, - "description": prop.description, - "priority": prop.priority, - "estimatedTime": prop.estimated_time, - "actionType": prop.action_type, - "actionUrl": prop.action_url, - "enabled": True, - "metadata": { - "source_agent": prop.source_agent, - "reasoning": prop.reasoning, - "context_data": prop.context_data - } - }) - + # Handle both object and dict types + if isinstance(prop, dict): + final_tasks.append({ + "pillarId": prop.get("pillarId"), + "title": prop.get("title"), + "description": prop.get("description"), + "priority": prop.get("priority"), + "estimatedTime": prop.get("estimatedTime"), + "actionType": prop.get("actionType"), + "actionUrl": prop.get("actionUrl"), + "enabled": True, + "metadata": { + "source_agent": prop.get("source_agent"), + "reasoning": prop.get("reasoning"), + "context_data": prop.get("context_data") + } + }) + else: + final_tasks.append({ + "pillarId": prop.pillar_id, + "title": prop.title, + "description": prop.description, + "priority": prop.priority, + "estimatedTime": prop.estimated_time, + "actionType": prop.action_type, + "actionUrl": prop.action_url, + "enabled": True, + "metadata": { + "source_agent": prop.source_agent, + "reasoning": prop.reasoning, + "context_data": prop.context_data + } + }) + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date,