Add per-agent timeout handling for daily committee proposals
This commit is contained in:
@@ -42,6 +42,19 @@ def _proposal_order_key(proposal: Any) -> tuple:
|
||||
)
|
||||
|
||||
|
||||
def _get_agent_proposal_timeout_seconds(grounding: Dict[str, Any]) -> float:
|
||||
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
||||
if not isinstance(workflow_config, dict):
|
||||
return 4.0
|
||||
|
||||
raw_timeout = workflow_config.get("agent_proposal_timeout_seconds", 4.0)
|
||||
try:
|
||||
timeout_seconds = float(raw_timeout)
|
||||
except (TypeError, ValueError):
|
||||
return 4.0
|
||||
return max(1.0, timeout_seconds)
|
||||
|
||||
|
||||
def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
|
||||
return [
|
||||
{
|
||||
@@ -288,32 +301,82 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
||||
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
||||
|
||||
agent_tasks = []
|
||||
committee_total_failure = False
|
||||
try:
|
||||
# Define agents to poll
|
||||
agents_to_poll = [
|
||||
orchestrator.agents.get('content'), # ContentStrategyAgent
|
||||
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
|
||||
orchestrator.agents.get('seo'), # SEOOptimizationAgent
|
||||
orchestrator.agents.get('social'), # SocialAmplificationAgent
|
||||
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
|
||||
]
|
||||
|
||||
agent_timeout_seconds = _get_agent_proposal_timeout_seconds(grounding)
|
||||
|
||||
# Define agents to poll (keyed for logging/metrics)
|
||||
agents_to_poll = {
|
||||
"content": orchestrator.agents.get('content'),
|
||||
"strategy": orchestrator.agents.get('strategy'),
|
||||
"seo": orchestrator.agents.get('seo'),
|
||||
"social": orchestrator.agents.get('social'),
|
||||
"competitor": orchestrator.agents.get('competitor'),
|
||||
}
|
||||
|
||||
# Filter out None agents (disabled/failed init)
|
||||
active_agents = [a for a in agents_to_poll if a]
|
||||
|
||||
# Execute propose_daily_tasks in parallel
|
||||
results = await asyncio.gather(
|
||||
*[a.propose_daily_tasks(grounding) for a in active_agents],
|
||||
return_exceptions=True
|
||||
active_agents = {key: agent for key, agent in agents_to_poll.items() if agent}
|
||||
|
||||
async def _collect_agent_proposals(agent_key: str, agent: Any) -> Dict[str, Any]:
|
||||
started_at = datetime.now(timezone.utc)
|
||||
try:
|
||||
proposals = await asyncio.wait_for(agent.propose_daily_tasks(grounding), timeout=agent_timeout_seconds)
|
||||
elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
|
||||
return {
|
||||
"agent_key": agent_key,
|
||||
"status": "ok",
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"proposals": proposals if isinstance(proposals, list) else [],
|
||||
}
|
||||
except asyncio.TimeoutError:
|
||||
elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
|
||||
return {
|
||||
"agent_key": agent_key,
|
||||
"status": "timeout",
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"proposals": [],
|
||||
"error": f"Timed out after {agent_timeout_seconds:.2f}s",
|
||||
}
|
||||
except Exception as agent_error:
|
||||
elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
|
||||
return {
|
||||
"agent_key": agent_key,
|
||||
"status": "error",
|
||||
"elapsed_ms": elapsed_ms,
|
||||
"proposals": [],
|
||||
"error": str(agent_error),
|
||||
}
|
||||
|
||||
# Execute propose_daily_tasks in parallel with per-agent timeout
|
||||
committee_results = await asyncio.gather(
|
||||
*[_collect_agent_proposals(agent_key, agent) for agent_key, agent in active_agents.items()]
|
||||
)
|
||||
|
||||
# Collect successful proposals
|
||||
|
||||
successful_agent_count = 0
|
||||
raw_proposals = []
|
||||
for res in results:
|
||||
if isinstance(res, list):
|
||||
raw_proposals.extend(res)
|
||||
elif isinstance(res, Exception):
|
||||
logger.warning(f"Agent proposal failed: {res}")
|
||||
for res in committee_results:
|
||||
agent_key = res.get("agent_key")
|
||||
status = res.get("status")
|
||||
elapsed_ms = res.get("elapsed_ms")
|
||||
|
||||
logger.info(
|
||||
"Agent committee proposal metric | agent={} status={} elapsed_ms={:.2f} timeout_s={:.2f} proposal_count={}",
|
||||
agent_key,
|
||||
status,
|
||||
elapsed_ms,
|
||||
agent_timeout_seconds,
|
||||
len(res.get("proposals") or []),
|
||||
)
|
||||
|
||||
if status == "ok":
|
||||
successful_agent_count += 1
|
||||
raw_proposals.extend(res.get("proposals") or [])
|
||||
elif status == "timeout":
|
||||
logger.warning(f"Agent proposal timed out for {agent_key}: {res.get('error')}")
|
||||
else:
|
||||
logger.warning(f"Agent proposal failed for {agent_key}: {res.get('error')}")
|
||||
|
||||
committee_total_failure = successful_agent_count == 0
|
||||
|
||||
# 3. Filter Redundant Proposals (Self-Learning)
|
||||
# Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago
|
||||
@@ -347,13 +410,14 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Committee proposal phase failed: {e}")
|
||||
committee_total_failure = True
|
||||
# Continue to fallback or LLM generation if committee fails
|
||||
|
||||
# 4. Final Selection
|
||||
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
|
||||
if agent_tasks:
|
||||
# Use committee outcomes whenever committee partially succeeds, even with sparse proposals.
|
||||
if not committee_total_failure:
|
||||
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
|
||||
|
||||
|
||||
# Convert TaskProposal objects to dicts for frontend
|
||||
final_tasks = []
|
||||
for prop in agent_tasks:
|
||||
@@ -372,7 +436,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
||||
"context_data": prop.context_data
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
||||
return {
|
||||
"date": date,
|
||||
|
||||
Reference in New Issue
Block a user