Merge PR #390: Add degraded-mode workflow regeneration criteria and endpoint

- Add POST /api/today-workflow/regenerate endpoint for on-demand plan regeneration
- Implement rate limiting (3 requests per 60 seconds) to prevent abuse
- Add regeneration quality score tracking and onboarding completion status
- Compute task hashes for deduplication and change detection
- Extract plan metadata from plan_json for cleaner API responses
- Integrate onboarding progress service to track completion status
- Return quality_score and generated_with_agents metadata in responses
- Enable manual workflow refresh in degraded mode scenarios
- Maintain backward compatibility with simplified schema
This commit is contained in:
ajaysi
2026-03-08 18:12:43 +05:30
2 changed files with 303 additions and 249 deletions

View File

@@ -1,3 +1,4 @@
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
@@ -8,9 +9,11 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
from models.agent_activity_models import AgentAlert
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
from services.llm_providers.main_text_generation import llm_text_gen
from services.onboarding.progress_service import OnboardingProgressService
from loguru import logger
# Pillar identifiers recognised for workflow tasks. When generated tasks are
# persisted, a task whose lower-cased "pillarId" is not in this list is skipped.
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
# A stored plan whose generation_mode is "fallback" and whose quality_score is
# below this value is regenerated the next time the plan is fetched.
FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6
def _today_date_str() -> str:
@@ -107,6 +110,37 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
]
def _compute_task_hash(title: Optional[str], description: Optional[str]) -> str:
    """Return a stable SHA-256 fingerprint for a task's title and description.

    Both fields are stripped and lower-cased before hashing, so two tasks that
    differ only in letter case or surrounding whitespace share a fingerprint.
    A missing value (``None``) is treated as an empty string, so callers that
    pass an unset field get a hash instead of an ``AttributeError``.

    Args:
        title: Task title, or ``None``.
        description: Task description, or ``None``.

    Returns:
        Hex-encoded SHA-256 digest of ``"<title>|<description>"`` after
        normalization.
    """
    normalized_title = (title or "").strip().lower()
    normalized_description = (description or "").strip().lower()
    text = f"{normalized_title}|{normalized_description}"
    return hashlib.sha256(text.encode("utf-8")).hexdigest()
def _extract_plan_metadata(plan: "Optional[DailyWorkflowPlan]") -> Dict[str, Any]:
    """Read generation metadata out of a stored plan's ``plan_json``.

    Missing plans, and plans whose ``plan_json`` is not a dict, yield the
    defaults. A ``quality_score`` that cannot be converted to ``float`` is
    reported as ``0.0`` rather than raising, so a malformed stored value does
    not make the plan unreadable.

    Args:
        plan: The stored plan, or ``None``.

    Returns:
        A dict with the keys ``generation_mode`` (lower-cased string,
        ``"unknown"`` when absent or blank), ``quality_score`` (float),
        ``generated_with_agents`` (bool), ``onboarding_completed`` (bool) and
        ``onboarding_completed_at`` (stored value, returned as-is).
    """
    raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {}

    try:
        quality_score = float(raw.get("quality_score") or 0.0)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric or out-of-range stored value: treat as lowest quality.
        quality_score = 0.0

    return {
        "generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown",
        "quality_score": quality_score,
        "generated_with_agents": bool(raw.get("generated_with_agents", False)),
        "onboarding_completed": bool(raw.get("onboarding_completed", False)),
        "onboarding_completed_at": raw.get("onboarding_completed_at"),
    }
def _get_onboarding_status(user_id: str) -> Dict[str, Any]:
    """Fetch the user's onboarding status in a normalized shape.

    Args:
        user_id: Identifier passed to the onboarding progress service.

    Returns:
        A dict with ``is_completed`` (bool) and ``completed_at`` (a
        ``datetime`` parsed from the service's value, or ``None`` when the
        value is absent or cannot be parsed).
    """
    progress = OnboardingProgressService().get_onboarding_status(user_id) or {}
    raw_timestamp = progress.get("completed_at")

    parsed_timestamp = None
    if raw_timestamp:
        try:
            # "Z" is rewritten to an explicit "+00:00" offset before parsing.
            iso_text = str(raw_timestamp).replace("Z", "+00:00")
            parsed_timestamp = datetime.fromisoformat(iso_text)
        except Exception:
            # Best-effort: an unparsable timestamp is reported as missing.
            parsed_timestamp = None

    finished = bool(progress.get("is_completed", False))
    return {"is_completed": finished, "completed_at": parsed_timestamp}
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
if not isinstance(workflow_config, dict):
@@ -276,147 +310,85 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
activity = AgentActivityService(db, user_id)
grounding = build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db)
min_active_agents = 2
generation_path = "committee"
# 1. Get Orchestrator
try:
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
except Exception as e:
logger.error(f"Failed to get orchestrator: {e}")
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
return {
"date": date,
"tasks": fallback_tasks,
"metadata": {
"generation_path": "controlled_fallback",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {"count": 0, "names": []},
},
"degraded": {
"is_degraded": True,
"reason": "orchestrator_unavailable",
"missing_agents": [],
},
},
}
expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"]
active_agent_names = sorted(orchestrator.agents.keys())
active_agents_count = len(active_agent_names)
missing_agents = [name for name in expected_committee_agents if name not in active_agent_names]
onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False))
initialization_state = (
getattr(orchestrator, "initialization_state", None)
if isinstance(getattr(orchestrator, "initialization_state", None), dict)
else {}
)
degraded_metadata = {
"is_degraded": False,
"reason": None,
"missing_agents": [],
}
if active_agents_count < min_active_agents:
generation_path = "controlled_fallback"
degraded_metadata = {
"is_degraded": True,
"reason": "insufficient_active_agents",
"missing_agents": missing_agents,
}
activity.log_event(
event_type="committee_health",
severity="warning",
message="Committee degraded: insufficient active agents",
payload=build_agent_event_payload(
phase="planning",
step="committee_health_precheck",
progress_percent=5,
output_summary=f"Only {active_agents_count} active committee agents",
decision_reason="Agent committee below configured minimum",
evidence_refs=active_agent_names,
safe_debug=True,
metadata={
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"missing_agents": missing_agents,
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
),
agent_type="TodayWorkflowGenerator",
)
return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False}
# 2. Parallel "Committee" Proposal Gathering
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
agent_tasks = []
if generation_path == "committee":
try:
# Define agents to poll
agents_to_poll = [
orchestrator.agents.get('content'), # ContentStrategyAgent
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
orchestrator.agents.get('seo'), # SEOOptimizationAgent
orchestrator.agents.get('social'), # SocialAmplificationAgent
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
]
try:
# Define agents to poll
agents_to_poll = [
orchestrator.agents.get('content'), # ContentStrategyAgent
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
orchestrator.agents.get('seo'), # SEOOptimizationAgent
orchestrator.agents.get('social'), # SocialAmplificationAgent
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
]
# Filter out None agents (disabled/failed init)
active_agents = [a for a in agents_to_poll if a]
# Execute propose_daily_tasks in parallel
results = await asyncio.gather(
*[a.propose_daily_tasks(grounding) for a in active_agents],
return_exceptions=True
)
# Collect successful proposals
raw_proposals = []
for res in results:
if isinstance(res, list):
raw_proposals.extend(res)
elif isinstance(res, Exception):
logger.warning(f"Agent proposal failed: {res}")
# Filter out None agents (disabled/failed init)
active_agents = [a for a in agents_to_poll if a]
# 3. Filter Redundant Proposals (Self-Learning)
# Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago
# But for now, we filter exact duplicates from recent history (last 7 days)
# We can implement semantic filtering later
# Simple deduplication based on title+pillar
unique_map = {}
for p in raw_proposals:
key = f"{p.pillar_id}:{p.title}"
if key not in unique_map:
unique_map[key] = p
continue
# Execute propose_daily_tasks in parallel
results = await asyncio.gather(
*[a.propose_daily_tasks(grounding) for a in active_agents],
return_exceptions=True
)
existing = unique_map[key]
if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
unique_map[key] = p
continue
# Collect successful proposals
raw_proposals = []
for res in results:
if isinstance(res, list):
raw_proposals.extend(res)
elif isinstance(res, Exception):
logger.warning(f"Agent proposal failed: {res}")
# Deterministic tie-breaker for equal priority proposals.
if (
_proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
and _proposal_order_key(p) < _proposal_order_key(existing)
):
unique_map[key] = p
agent_tasks = list(unique_map.values())
# Phase 3: Check memory for rejections (Semantic Filter)
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
except Exception as e:
logger.error(f"Committee proposal phase failed: {e}")
# Continue to fallback or LLM generation if committee fails
# Simple deduplication based on title+pillar
unique_map = {}
for p in raw_proposals:
key = f"{p.pillar_id}:{p.title}"
if key not in unique_map:
unique_map[key] = p
continue
existing = unique_map[key]
if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
unique_map[key] = p
continue
# Deterministic tie-breaker for equal priority proposals.
if (
_proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
and _proposal_order_key(p) < _proposal_order_key(existing)
):
unique_map[key] = p
agent_tasks = list(unique_map.values())
# Check memory for rejections (semantic filter)
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
except Exception as e:
logger.error(f"Committee proposal phase failed: {e}")
generation_path = "llm_fallback"
# 3. Final Selection
if generation_path == "committee" and agent_tasks:
# 4. Final Selection
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
if agent_tasks:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
final_tasks = []
for prop in agent_tasks:
final_tasks.append({
@@ -434,50 +406,17 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"context_data": prop.context_data
}
})
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
return {
"date": date,
"tasks": final_tasks,
"metadata": {
"generation_path": "committee",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
"generation_mode": "agent_committee",
"quality_score": 0.9,
"generated_with_agents": True,
}
if generation_path != "controlled_fallback":
generation_path = "llm_fallback"
if generation_path == "controlled_fallback":
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
return {
"date": date,
"tasks": fallback_tasks,
"metadata": {
"generation_path": "controlled_fallback",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
}
# Fallback to original LLM generation if committee returned nothing
# Fallback to original LLM generation if agents returned nothing
logger.info("Agent committee returned no tasks, falling back to LLM generation")
schema = {
@@ -533,6 +472,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
agent_type="TodayWorkflowGenerator",
)
used_fallback = False
try:
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
if isinstance(raw, dict):
@@ -541,6 +481,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
try:
result = json.loads(raw)
except Exception:
used_fallback = True
result = {"date": date, "tasks": _fallback_tasks(date)}
except Exception as e:
activity.log_event(
@@ -551,36 +492,26 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
run_id=run.id,
agent_type="TodayWorkflowGenerator",
)
used_fallback = True
result = {"date": date, "tasks": _fallback_tasks(date)}
tasks = result.get("tasks") if isinstance(result, dict) else None
if not isinstance(tasks, list) or not tasks:
generation_path = "controlled_fallback"
used_fallback = True
tasks = _fallback_tasks(date)
result = {
"date": date,
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
"metadata": {
"generation_path": generation_path,
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
"generation_mode": "fallback" if used_fallback else "llm",
"quality_score": 0.4 if used_fallback else 0.75,
"generated_with_agents": False,
}
activity.log_event(
event_type="final_summary",
severity="info",
message="Daily workflow plan generated",
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}),
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}),
run_id=run.id,
agent_type="TodayWorkflowGenerator",
)
@@ -588,50 +519,83 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
return result
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan:
from starlette.concurrency import run_in_threadpool
date_str = date or _today_date_str()
def _get_existing():
return (
onboarding_status = _get_onboarding_status(user_id)
existing = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
.first()
)
existing = await run_in_threadpool(_get_existing)
)
existing_hash_status = {}
if existing:
return existing, False
existing_tasks = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id)
.all()
)
)
for task in existing_tasks:
task_hash = _compute_task_hash(task.title, task.description)
existing_hash_status[task_hash] = {
"status": task.status,
"decided_at": task.decided_at,
"completion_notes": task.completion_notes,
}
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
tasks = plan_data.get("tasks", [])
plan_data["onboarding_completed"] = onboarding_status["is_completed"]
plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None
def _create_plan():
plan = DailyWorkflowPlan(
user_id=user_id,
date=date_str,
source="agent",
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(plan)
db.commit()
db.refresh(plan)
tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else []
def _replace_plan() -> DailyWorkflowPlan:
if existing:
db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False)
plan = existing
plan.source = "agent"
plan.plan_json = plan_data
plan.updated_at = datetime.utcnow()
db.add(plan)
db.commit()
db.refresh(plan)
else:
plan = DailyWorkflowPlan(
user_id=user_id,
date=date_str,
source="agent",
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(plan)
db.commit()
db.refresh(plan)
for t in tasks:
pillar_id = str(t.get("pillarId") or "").lower().strip()
if pillar_id not in PILLAR_IDS:
continue
title = str(t.get("title") or "Task").strip()[:255]
description = str(t.get("description") or "").strip()
task_hash = _compute_task_hash(title, description)
preserved = existing_hash_status.get(task_hash) or {}
task = DailyWorkflowTask(
plan_id=plan.id,
user_id=user_id,
pillar_id=pillar_id,
title=str(t.get("title") or "Task").strip()[:255],
description=str(t.get("description") or "").strip(),
status=_coerce_status(t.get("status")),
title=title,
description=description,
status=preserved.get("status") or _coerce_status(t.get("status")),
priority=_coerce_priority(t.get("priority")),
estimated_time=int(t.get("estimatedTime") or 15),
action_type=str(t.get("actionType") or "navigate").strip()[:20],
@@ -641,14 +605,53 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
enabled=bool(t.get("enabled", True)),
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
decided_at=preserved.get("decided_at"),
completion_notes=preserved.get("completion_notes"),
)
db.add(task)
db.commit()
db.refresh(plan)
return plan
plan = await run_in_threadpool(_create_plan)
return plan, True
return await run_in_threadpool(_replace_plan)
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
    """Return the user's plan for a date, generating or refreshing it if needed.

    A plan is generated when none is stored for the date. A stored plan is
    regenerated when either of these holds:

    * its metadata says it was produced in ``"fallback"`` mode with a quality
      score below ``FALLBACK_REGENERATION_QUALITY_THRESHOLD``;
    * the user's onboarding is now completed but the stored plan was not
      recorded as generated after onboarding completion.

    Args:
        db: Database session used for the lookup and any regeneration.
        user_id: Owner of the plan.
        date: Plan date string; defaults to ``_today_date_str()``.

    Returns:
        ``(plan, generated)`` where ``generated`` is ``True`` when the plan
        was created or regenerated by this call and ``False`` when the stored
        plan was returned unchanged.
    """
    from starlette.concurrency import run_in_threadpool

    target_date = date or _today_date_str()

    def _load_plan():
        return (
            db.query(DailyWorkflowPlan)
            .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == target_date)
            .first()
        )

    # The blocking ORM query runs in a worker thread.
    current_plan = await run_in_threadpool(_load_plan)

    # Nothing stored yet: generate a fresh plan.
    if not current_plan:
        fresh_plan = await regenerate_daily_workflow_plan(db, user_id, date=target_date)
        return fresh_plan, True

    plan_meta = _extract_plan_metadata(current_plan)
    onboarding = _get_onboarding_status(user_id)

    low_quality_fallback = (
        plan_meta["generation_mode"] == "fallback"
        and plan_meta["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD
    )
    onboarding_completed_since = (
        onboarding["is_completed"] and not plan_meta["onboarding_completed"]
    )

    if not (low_quality_fallback or onboarding_completed_since):
        return current_plan, False

    refreshed_plan = await regenerate_daily_workflow_plan(db, user_id, date=target_date)
    return refreshed_plan, True
def update_task_status(