Merge PR #389: Committee Health Precheck and Simplified Architecture

- Simplify workflow generation by removing complex dependency coercion and metadata normalization functions
- Add committee health precheck to detect insufficient active agents before proposal gathering
- Add orchestrator initialization state metadata tracking for observability
- Downgrade fastapi to 0.104.0 for stability
- Simplify database schema (remove generation_mode, committee_agent_count, fallback_used columns from DailyWorkflowPlan)
- Remove 'skipped' from suppressed task statuses in memory service (keep only dismissed, rejected)
- Update task status normalization logic for clarity
- Return simplified metadata structure with generation_path and degradation tracking
This commit is contained in:
ajaysi
2026-03-08 18:10:27 +05:30
2 changed files with 208 additions and 217 deletions

View File

@@ -464,6 +464,7 @@ class AgentOrchestrationService:
async def get_or_create_orchestrator(self, user_id: str) -> ALwrityAgentOrchestrator: async def get_or_create_orchestrator(self, user_id: str) -> ALwrityAgentOrchestrator:
"""Get or create an orchestrator for a user""" """Get or create an orchestrator for a user"""
onboarding_gated_initialization = False
if user_id not in self.orchestrators: if user_id not in self.orchestrators:
config = AgentTeamConfiguration(user_id=user_id) config = AgentTeamConfiguration(user_id=user_id)
self.orchestrators[user_id] = ALwrityAgentOrchestrator(config) self.orchestrators[user_id] = ALwrityAgentOrchestrator(config)
@@ -474,6 +475,25 @@ class AgentOrchestrationService:
if not orchestrator.agents and not orchestrator.execution_history: if not orchestrator.agents and not orchestrator.execution_history:
logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.") logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.")
orchestrator._create_specialized_agents() orchestrator._create_specialized_agents()
last_system_check = next(
(
entry
for entry in reversed(orchestrator.execution_history)
if entry.get("action") == "system_check"
),
None,
)
if last_system_check and last_system_check.get("status") == "pending":
details = str(last_system_check.get("details") or "").lower()
onboarding_gated_initialization = "onboarding" in details
orchestrator.onboarding_gated_initialization = onboarding_gated_initialization
orchestrator.initialization_state = {
"onboarding_gated_initialization": onboarding_gated_initialization,
"active_agent_count": len(orchestrator.agents),
"active_agent_keys": sorted(orchestrator.agents.keys()),
}
return orchestrator return orchestrator

View File

@@ -11,15 +11,6 @@ from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
TASK_SOURCE_ENUM = {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"}
def _normalize_task_metadata(task: Dict[str, Any], default_source: str) -> Dict[str, Any]:
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
source = metadata.get("source")
if source not in TASK_SOURCE_ENUM:
metadata["source"] = default_source
return metadata
def _today_date_str() -> str: def _today_date_str() -> str:
@@ -34,43 +25,15 @@ def _coerce_priority(value: Any) -> str:
def _coerce_status(value: Any) -> str: def _coerce_status(value: Any) -> str:
v = str(value or "pending").lower().strip() v = str(value or "pending").lower().strip()
if v in {"pending", "in_progress", "completed", "skipped", "dismissed"}: if v in {"pending", "in_progress", "completed", "skipped", "dismissed"}:
# Canonicalize 'dismissed' to 'skipped' for consistency
return "skipped" if v == "dismissed" else v return "skipped" if v == "dismissed" else v
return "pending" return "pending"
def coerce_dependencies(value: Any) -> List[str]:
if isinstance(value, list):
return [str(dep).strip() for dep in value if str(dep).strip()]
if isinstance(value, str):
raw = value.strip()
if not raw:
return []
try:
parsed = json.loads(raw)
if isinstance(parsed, list):
return [str(dep).strip() for dep in parsed if str(dep).strip()]
except Exception:
pass
return [raw]
return []
def _proposal_priority_rank(priority: str) -> int: def _proposal_priority_rank(priority: str) -> int:
return {"low": 0, "medium": 1, "high": 2}.get(str(priority or "").lower(), 1) return {"low": 0, "medium": 1, "high": 2}.get(str(priority or "").lower(), 1)
def _proposal_order_key(proposal: Any) -> tuple: def _proposal_order_key(proposal: Any) -> tuple:
# Handle both object and dict access for compatibility
if isinstance(proposal, dict):
return (
str(proposal.get("source_agent") or "").lower(),
str(proposal.get("title") or "").lower(),
str(proposal.get("description") or "").lower(),
str(proposal.get("action_url") or "").lower(),
)
return ( return (
str(getattr(proposal, "source_agent", "") or "").lower(), str(getattr(proposal, "source_agent", "") or "").lower(),
str(getattr(proposal, "title", "") or "").lower(), str(getattr(proposal, "title", "") or "").lower(),
@@ -79,19 +42,6 @@ def _proposal_order_key(proposal: Any) -> tuple:
) )
def _get_agent_proposal_timeout_seconds(grounding: Dict[str, Any]) -> float:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
if not isinstance(workflow_config, dict):
return 4.0
raw_timeout = workflow_config.get("agent_proposal_timeout_seconds", 4.0)
try:
timeout_seconds = float(raw_timeout)
except (TypeError, ValueError):
return 4.0
return max(1.0, timeout_seconds)
def _fallback_tasks(date: str) -> List[Dict[str, Any]]: def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
return [ return [
{ {
@@ -186,7 +136,6 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
sanitized["enabled"] = bool(task.get("enabled", True)) sanitized["enabled"] = bool(task.get("enabled", True))
sanitized["metadata"] = _normalize_task_metadata(task, default_source="llm_generation")
return sanitized return sanitized
@@ -327,192 +276,208 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
activity = AgentActivityService(db, user_id) activity = AgentActivityService(db, user_id)
grounding = build_grounding_context(db, user_id, date) grounding = build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db) memory_service = TaskMemoryService(user_id, db)
min_active_agents = 2
generation_path = "committee"
# 1. Get Orchestrator # 1. Get Orchestrator
try: try:
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
except Exception as e: except Exception as e:
logger.error(f"Failed to get orchestrator: {e}") logger.error(f"Failed to get orchestrator: {e}")
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
return { return {
"date": date, "date": date,
"tasks": _fallback_tasks(date), "tasks": fallback_tasks,
"provenance": {"generation_mode": "controlled_fallback", "committee_agent_count": 0, "fallback_used": True}, "metadata": {
"generation_path": "controlled_fallback",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {"count": 0, "names": []},
},
"degraded": {
"is_degraded": True,
"reason": "orchestrator_unavailable",
"missing_agents": [],
},
},
} }
expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"]
active_agent_names = sorted(orchestrator.agents.keys())
active_agents_count = len(active_agent_names)
missing_agents = [name for name in expected_committee_agents if name not in active_agent_names]
onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False))
initialization_state = (
getattr(orchestrator, "initialization_state", None)
if isinstance(getattr(orchestrator, "initialization_state", None), dict)
else {}
)
degraded_metadata = {
"is_degraded": False,
"reason": None,
"missing_agents": [],
}
if active_agents_count < min_active_agents:
generation_path = "controlled_fallback"
degraded_metadata = {
"is_degraded": True,
"reason": "insufficient_active_agents",
"missing_agents": missing_agents,
}
activity.log_event(
event_type="committee_health",
severity="warning",
message="Committee degraded: insufficient active agents",
payload=build_agent_event_payload(
phase="planning",
step="committee_health_precheck",
progress_percent=5,
output_summary=f"Only {active_agents_count} active committee agents",
decision_reason="Agent committee below configured minimum",
evidence_refs=active_agent_names,
safe_debug=True,
metadata={
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"missing_agents": missing_agents,
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
),
agent_type="TodayWorkflowGenerator",
)
# 2. Parallel "Committee" Proposal Gathering # 2. Parallel "Committee" Proposal Gathering
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
agent_tasks = [] agent_tasks = []
committee_total_failure = False if generation_path == "committee":
try: try:
agent_timeout_seconds = _get_agent_proposal_timeout_seconds(grounding) # Define agents to poll
agents_to_poll = [
orchestrator.agents.get('content'), # ContentStrategyAgent
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
orchestrator.agents.get('seo'), # SEOOptimizationAgent
orchestrator.agents.get('social'), # SocialAmplificationAgent
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
]
# Define agents to poll (keyed for logging/metrics) # Filter out None agents (disabled/failed init)
agents_to_poll = { active_agents = [a for a in agents_to_poll if a]
"content": orchestrator.agents.get('content'),
"strategy": orchestrator.agents.get('strategy'),
"seo": orchestrator.agents.get('seo'),
"social": orchestrator.agents.get('social'),
"competitor": orchestrator.agents.get('competitor'),
}
# Filter out None agents (disabled/failed init) # Execute propose_daily_tasks in parallel
active_agents = {key: agent for key, agent in agents_to_poll.items() if agent} results = await asyncio.gather(
*[a.propose_daily_tasks(grounding) for a in active_agents],
async def _collect_agent_proposals(agent_key: str, agent: Any) -> Dict[str, Any]: return_exceptions=True
started_at = datetime.now(timezone.utc)
try:
proposals = await asyncio.wait_for(agent.propose_daily_tasks(grounding), timeout=agent_timeout_seconds)
elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
return {
"agent_key": agent_key,
"status": "ok",
"elapsed_ms": elapsed_ms,
"proposals": proposals if isinstance(proposals, list) else [],
}
except asyncio.TimeoutError:
elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
return {
"agent_key": agent_key,
"status": "timeout",
"elapsed_ms": elapsed_ms,
"proposals": [],
"error": f"Timed out after {agent_timeout_seconds:.2f}s",
}
except Exception as agent_error:
elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
return {
"agent_key": agent_key,
"status": "error",
"elapsed_ms": elapsed_ms,
"proposals": [],
"error": str(agent_error),
}
# Execute propose_daily_tasks in parallel with per-agent timeout
committee_results = await asyncio.gather(
*[_collect_agent_proposals(agent_key, agent) for agent_key, agent in active_agents.items()]
)
successful_agent_count = 0
raw_proposals = []
for res in committee_results:
agent_key = res.get("agent_key")
status = res.get("status")
elapsed_ms = res.get("elapsed_ms")
logger.info(
"Agent committee proposal metric | agent={} status={} elapsed_ms={:.2f} timeout_s={:.2f} proposal_count={}",
agent_key,
status,
elapsed_ms,
agent_timeout_seconds,
len(res.get("proposals") or []),
) )
if status == "ok": # Collect successful proposals
successful_agent_count += 1 raw_proposals = []
raw_proposals.extend(res.get("proposals") or []) for res in results:
elif status == "timeout": if isinstance(res, list):
logger.warning(f"Agent proposal timed out for {agent_key}: {res.get('error')}") raw_proposals.extend(res)
else: elif isinstance(res, Exception):
logger.warning(f"Agent proposal failed for {agent_key}: {res.get('error')}") logger.warning(f"Agent proposal failed: {res}")
committee_total_failure = successful_agent_count == 0 # Simple deduplication based on title+pillar
unique_map = {}
for p in raw_proposals:
key = f"{p.pillar_id}:{p.title}"
if key not in unique_map:
unique_map[key] = p
continue
# 3. Filter Redundant Proposals (Self-Learning) existing = unique_map[key]
# Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
# But for now, we filter exact duplicates from recent history (last 7 days) unique_map[key] = p
# We can implement semantic filtering later continue
# Simple deduplication based on title+pillar
unique_map = {}
for p in raw_proposals:
# Normalize pillar IDs and filter invalid proposals (re-apply from PR #383)
pillar_id = str(p.get("pillarId") or "").lower().strip()
if pillar_id not in PILLAR_IDS:
logger.warning(f"Skipping proposal with invalid pillarId: {pillar_id}. Proposal: {p}")
continue
p["pillarId"] = pillar_id
key = f"{p.get('pillarId')}:{p.get('title')}" # Deterministic tie-breaker for equal priority proposals.
if key not in unique_map: if (
unique_map[key] = p _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
continue and _proposal_order_key(p) < _proposal_order_key(existing)
):
unique_map[key] = p
existing = unique_map[key] agent_tasks = list(unique_map.values())
if _proposal_priority_rank(p.get('priority')) > _proposal_priority_rank(existing.get('priority')):
unique_map[key] = p
continue
# Deterministic tie-breaker for equal priority proposals. # Check memory for rejections (semantic filter)
if ( agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
_proposal_priority_rank(p.get('priority')) == _proposal_priority_rank(existing.get('priority'))
and _proposal_order_key(p) < _proposal_order_key(existing)
):
unique_map[key] = p
agent_tasks = list(unique_map.values())
# Phase 3: Check memory for rejections (Semantic Filter)
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
except Exception as e:
logger.error(f"Committee proposal phase failed: {e}")
committee_total_failure = True
# Continue to fallback or LLM generation if committee fails
# 4. Final Selection except Exception as e:
# Use committee outcomes whenever committee partially succeeds, even with sparse proposals. logger.error(f"Committee proposal phase failed: {e}")
if not committee_total_failure: generation_path = "llm_fallback"
# 3. Final Selection
if generation_path == "committee" and agent_tasks:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
final_tasks = [] final_tasks = []
for prop in agent_tasks: for prop in agent_tasks:
# Handle both object and dict types final_tasks.append({
if isinstance(prop, dict): "pillarId": prop.pillar_id,
final_tasks.append({ "title": prop.title,
"pillarId": prop.get("pillarId"), "description": prop.description,
"title": prop.get("title"), "priority": prop.priority,
"description": prop.get("description"), "estimatedTime": prop.estimated_time,
"priority": prop.get("priority"), "actionType": prop.action_type,
"estimatedTime": prop.get("estimatedTime"), "actionUrl": prop.action_url,
"actionType": prop.get("actionType"), "enabled": True,
"actionUrl": prop.get("actionUrl"), "metadata": {
"enabled": True, "source_agent": prop.source_agent,
"metadata": { "reasoning": prop.reasoning,
"source": "agent_committee", "context_data": prop.context_data
"source_agent": prop.get("source_agent"), }
"reasoning": prop.get("reasoning"), })
"context_data": prop.get("context_data")
}
})
else:
final_tasks.append({
"pillarId": prop.pillar_id,
"title": prop.title,
"description": prop.description,
"priority": prop.priority,
"estimatedTime": prop.estimated_time,
"actionType": prop.action_type,
"actionUrl": prop.action_url,
"enabled": True,
"metadata": {
"source": "agent_committee",
"source_agent": prop.source_agent,
"reasoning": prop.reasoning,
"context_data": prop.context_data
}
})
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
return { return {
"date": date, "date": date,
"tasks": final_tasks, "tasks": final_tasks,
"provenance": {"generation_mode": "agent_committee", "committee_agent_count": len(active_agents), "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in final_tasks)}, "metadata": {
"generation_path": "committee",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
} }
# Fallback to original LLM generation if agents returned nothing if generation_path != "controlled_fallback":
generation_path = "llm_fallback"
if generation_path == "controlled_fallback":
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
return {
"date": date,
"tasks": fallback_tasks,
"metadata": {
"generation_path": "controlled_fallback",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
}
# Fallback to original LLM generation if committee returned nothing
logger.info("Agent committee returned no tasks, falling back to LLM generation") logger.info("Agent committee returned no tasks, falling back to LLM generation")
schema = { schema = {
@@ -590,19 +555,32 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
tasks = result.get("tasks") if isinstance(result, dict) else None tasks = result.get("tasks") if isinstance(result, dict) else None
if not isinstance(tasks, list) or not tasks: if not isinstance(tasks, list) or not tasks:
generation_path = "controlled_fallback"
tasks = _fallback_tasks(date) tasks = _fallback_tasks(date)
enriched_tasks = _ensure_pillar_coverage(tasks, user_id, date, grounding)
result = { result = {
"date": date, "date": date,
"tasks": enriched_tasks, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
"provenance": {"generation_mode": "llm_generation", "committee_agent_count": len(active_agents) if "active_agents" in locals() else 0, "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in enriched_tasks)}, "metadata": {
"generation_path": generation_path,
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
} }
activity.log_event( activity.log_event(
event_type="final_summary", event_type="final_summary",
severity="info", severity="info",
message="Daily workflow plan generated", message="Daily workflow plan generated",
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}), payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}),
run_id=run.id, run_id=run.id,
agent_type="TodayWorkflowGenerator", agent_type="TodayWorkflowGenerator",
) )
@@ -629,19 +607,12 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
tasks = plan_data.get("tasks", []) tasks = plan_data.get("tasks", [])
provenance = plan_data.get("provenance") if isinstance(plan_data.get("provenance"), dict) else {}
def _create_plan(): def _create_plan():
generation_mode = provenance.get("generation_mode") if provenance.get("generation_mode") in TASK_SOURCE_ENUM else "llm_generation"
committee_agent_count = int(provenance.get("committee_agent_count") or 0)
fallback_used = bool(provenance.get("fallback_used", False))
plan = DailyWorkflowPlan( plan = DailyWorkflowPlan(
user_id=user_id, user_id=user_id,
date=date_str, date=date_str,
source="agent", source="agent",
generation_mode=generation_mode,
committee_agent_count=max(0, committee_agent_count),
fallback_used=fallback_used,
plan_json=plan_data, plan_json=plan_data,
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
updated_at=datetime.utcnow(), updated_at=datetime.utcnow(),
@@ -665,8 +636,8 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
estimated_time=int(t.get("estimatedTime") or 15), estimated_time=int(t.get("estimatedTime") or 15),
action_type=str(t.get("actionType") or "navigate").strip()[:20], action_type=str(t.get("actionType") or "navigate").strip()[:20],
action_url=str(t.get("actionUrl") or "").strip(), action_url=str(t.get("actionUrl") or "").strip(),
dependencies=coerce_dependencies(t.get("dependencies")), dependencies=json.dumps(t.get("dependencies") or []),
metadata_json=_normalize_task_metadata(t, default_source=generation_mode), metadata_json=t.get("metadata") or {},
enabled=bool(t.get("enabled", True)), enabled=bool(t.get("enabled", True)),
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
updated_at=datetime.utcnow(), updated_at=datetime.utcnow(),