Merge PR #390: Add degraded-mode workflow regeneration criteria and endpoint
- Add POST /api/today-workflow/regenerate endpoint for on-demand plan regeneration - Implement rate limiting (3 requests per 60 seconds) to prevent abuse - Add regeneration quality score tracking and onboarding completion status - Compute task hashes for deduplication and change detection - Extract plan metadata from plan_json for cleaner API responses - Integrate onboarding progress service to track completion status - Return quality_score and generated_with_agents metadata in responses - Enable manual workflow refresh in degraded mode scenarios - Maintain backward compatibility with simplified schema
This commit is contained in:
@@ -1,13 +1,14 @@
|
|||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime, timezone
|
||||||
|
from collections import defaultdict, deque
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from middleware.auth_middleware import get_current_user
|
from middleware.auth_middleware import get_current_user
|
||||||
from services.database import get_db
|
from services.database import get_db
|
||||||
from services.today_workflow_service import coerce_dependencies, get_or_create_daily_workflow_plan, update_task_status
|
from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status
|
||||||
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
||||||
import asyncio
|
import asyncio
|
||||||
from services.intelligence.txtai_service import TxtaiIntelligenceService
|
from services.intelligence.txtai_service import TxtaiIntelligenceService
|
||||||
@@ -15,6 +16,27 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService
|
|||||||
|
|
||||||
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
|
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
|
||||||
|
|
||||||
|
REGENERATE_WINDOW_SECONDS = 60
|
||||||
|
REGENERATE_MAX_REQUESTS_PER_WINDOW = 3
|
||||||
|
_regen_request_log: dict[str, deque[float]] = defaultdict(deque)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_regenerate_rate_limit(user_id: str) -> None:
|
||||||
|
import time
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
window_start = now - REGENERATE_WINDOW_SECONDS
|
||||||
|
history = _regen_request_log[user_id]
|
||||||
|
|
||||||
|
while history and history[0] < window_start:
|
||||||
|
history.popleft()
|
||||||
|
|
||||||
|
if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW:
|
||||||
|
raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded")
|
||||||
|
|
||||||
|
history.append(now)
|
||||||
|
|
||||||
|
|
||||||
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
|
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
|
||||||
svc = TxtaiIntelligenceService(user_id)
|
svc = TxtaiIntelligenceService(user_id)
|
||||||
items = []
|
items = []
|
||||||
@@ -42,22 +64,6 @@ async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label:
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def _build_provenance_summary(plan: DailyWorkflowPlan, tasks: list[DailyWorkflowTask]) -> Dict[str, Any]:
|
|
||||||
source_counts: Dict[str, int] = {}
|
|
||||||
for task in tasks:
|
|
||||||
metadata = task.metadata_json if isinstance(task.metadata_json, dict) else {}
|
|
||||||
source = metadata.get("source") if metadata.get("source") in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation"
|
|
||||||
source_counts[source] = source_counts.get(source, 0) + 1
|
|
||||||
|
|
||||||
generation_mode = plan.generation_mode if plan.generation_mode in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation"
|
|
||||||
|
|
||||||
return {
|
|
||||||
"generationMode": generation_mode,
|
|
||||||
"committeeAgentCount": int(plan.committee_agent_count or 0),
|
|
||||||
"fallbackUsed": bool(plan.fallback_used),
|
|
||||||
"taskSourceBreakdown": source_counts,
|
|
||||||
}
|
|
||||||
|
|
||||||
@router.get("")
|
@router.get("")
|
||||||
async def get_today_workflow(
|
async def get_today_workflow(
|
||||||
date: Optional[str] = None,
|
date: Optional[str] = None,
|
||||||
@@ -77,20 +83,6 @@ async def get_today_workflow(
|
|||||||
)
|
)
|
||||||
|
|
||||||
tasks = await run_in_threadpool(_fetch_tasks)
|
tasks = await run_in_threadpool(_fetch_tasks)
|
||||||
provenance_summary = _build_provenance_summary(plan, tasks)
|
|
||||||
|
|
||||||
def _normalize_legacy_dependencies(task_rows):
|
|
||||||
rows_updated = False
|
|
||||||
for row in task_rows:
|
|
||||||
normalized_dependencies = coerce_dependencies(row.dependencies)
|
|
||||||
if row.dependencies != normalized_dependencies:
|
|
||||||
row.dependencies = normalized_dependencies
|
|
||||||
db.add(row)
|
|
||||||
rows_updated = True
|
|
||||||
if rows_updated:
|
|
||||||
db.commit()
|
|
||||||
|
|
||||||
await run_in_threadpool(_normalize_legacy_dependencies, tasks)
|
|
||||||
|
|
||||||
response_tasks = []
|
response_tasks = []
|
||||||
for t in tasks:
|
for t in tasks:
|
||||||
@@ -103,7 +95,7 @@ async def get_today_workflow(
|
|||||||
"status": "skipped" if t.status == "dismissed" else t.status,
|
"status": "skipped" if t.status == "dismissed" else t.status,
|
||||||
"priority": t.priority,
|
"priority": t.priority,
|
||||||
"estimatedTime": t.estimated_time,
|
"estimatedTime": t.estimated_time,
|
||||||
"dependencies": coerce_dependencies(t.dependencies),
|
"dependencies": t.dependencies or [],
|
||||||
"actionUrl": t.action_url,
|
"actionUrl": t.action_url,
|
||||||
"actionType": t.action_type,
|
"actionType": t.action_type,
|
||||||
"metadata": t.metadata_json or {},
|
"metadata": t.metadata_json or {},
|
||||||
@@ -183,7 +175,6 @@ async def get_today_workflow(
|
|||||||
"workflowStatus": workflow_status,
|
"workflowStatus": workflow_status,
|
||||||
"totalEstimatedTime": total_estimated,
|
"totalEstimatedTime": total_estimated,
|
||||||
"actualTimeSpent": 0,
|
"actualTimeSpent": 0,
|
||||||
"provenanceSummary": provenance_summary,
|
|
||||||
},
|
},
|
||||||
"plan": {
|
"plan": {
|
||||||
"id": plan.id,
|
"id": plan.id,
|
||||||
@@ -191,10 +182,9 @@ async def get_today_workflow(
|
|||||||
"source": plan.source,
|
"source": plan.source,
|
||||||
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
||||||
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
||||||
"generation_mode": plan.generation_mode,
|
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
|
||||||
"committee_agent_count": plan.committee_agent_count,
|
"quality_score": (plan.plan_json or {}).get("quality_score"),
|
||||||
"fallback_used": plan.fallback_used,
|
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
|
||||||
"provenance_summary": provenance_summary,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"timestamp": datetime.utcnow().isoformat(),
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
@@ -202,6 +192,67 @@ async def get_today_workflow(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/regenerate")
|
||||||
|
async def regenerate_today_workflow(
|
||||||
|
date: Optional[str] = None,
|
||||||
|
current_user: dict = Depends(get_current_user),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
) -> Dict[str, Any]:
|
||||||
|
from starlette.concurrency import run_in_threadpool
|
||||||
|
|
||||||
|
user_id = str(current_user.get("id"))
|
||||||
|
_check_regenerate_rate_limit(user_id)
|
||||||
|
|
||||||
|
plan = await regenerate_daily_workflow_plan(db, user_id, date=date)
|
||||||
|
|
||||||
|
tasks = await run_in_threadpool(
|
||||||
|
lambda: (
|
||||||
|
db.query(DailyWorkflowTask)
|
||||||
|
.filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id)
|
||||||
|
.order_by(DailyWorkflowTask.created_at.asc())
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
response_tasks = [
|
||||||
|
{
|
||||||
|
"id": str(t.id),
|
||||||
|
"pillarId": t.pillar_id,
|
||||||
|
"title": t.title,
|
||||||
|
"description": t.description,
|
||||||
|
"status": "skipped" if t.status == "dismissed" else t.status,
|
||||||
|
"priority": t.priority,
|
||||||
|
"estimatedTime": t.estimated_time,
|
||||||
|
"dependencies": t.dependencies or [],
|
||||||
|
"actionUrl": t.action_url,
|
||||||
|
"actionType": t.action_type,
|
||||||
|
"metadata": t.metadata_json or {},
|
||||||
|
"enabled": bool(t.enabled),
|
||||||
|
}
|
||||||
|
for t in tasks
|
||||||
|
]
|
||||||
|
|
||||||
|
asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated"))
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"data": {
|
||||||
|
"plan": {
|
||||||
|
"id": plan.id,
|
||||||
|
"date": plan.date,
|
||||||
|
"source": plan.source,
|
||||||
|
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
|
||||||
|
"quality_score": (plan.plan_json or {}).get("quality_score"),
|
||||||
|
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
|
||||||
|
"regenerated_at": datetime.now(timezone.utc).isoformat(),
|
||||||
|
},
|
||||||
|
"tasks": response_tasks,
|
||||||
|
},
|
||||||
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
|
"user_id": user_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
from services.task_memory_service import TaskMemoryService
|
from services.task_memory_service import TaskMemoryService
|
||||||
|
|
||||||
@router.post("/tasks/{task_id}/status")
|
@router.post("/tasks/{task_id}/status")
|
||||||
@@ -226,7 +277,7 @@ async def set_task_status(
|
|||||||
memory = TaskMemoryService(user_id, db)
|
memory = TaskMemoryService(user_id, db)
|
||||||
await memory.record_task_outcome(
|
await memory.record_task_outcome(
|
||||||
task,
|
task,
|
||||||
feedback_score=1 if status == "completed" else -1 if status in ("dismissed", "skipped") else 0,
|
feedback_score=1 if status == "completed" else -1 if status == "dismissed" else 0,
|
||||||
feedback_text=completion_notes
|
feedback_text=completion_notes
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -245,7 +296,7 @@ async def set_task_status(
|
|||||||
"pillarId": task.pillar_id,
|
"pillarId": task.pillar_id,
|
||||||
"title": task.title,
|
"title": task.title,
|
||||||
"description": task.description,
|
"description": task.description,
|
||||||
"status": "skipped" if task.status in ("dismissed", "skipped") else task.status,
|
"status": "skipped" if task.status == "dismissed" else task.status,
|
||||||
}
|
}
|
||||||
asyncio.create_task(_index_tasks_to_sif(user_id, plan_date, [task_payload], label="today"))
|
asyncio.create_task(_index_tasks_to_sif(user_id, plan_date, [task_payload], label="today"))
|
||||||
|
|
||||||
@@ -255,7 +306,7 @@ async def set_task_status(
|
|||||||
"task": {
|
"task": {
|
||||||
"id": str(task.id),
|
"id": str(task.id),
|
||||||
"pillarId": task.pillar_id,
|
"pillarId": task.pillar_id,
|
||||||
"status": "skipped" if task.status in ("dismissed", "skipped") else task.status,
|
"status": "skipped" if task.status == "dismissed" else task.status,
|
||||||
"decided_at": task.decided_at.isoformat() if task.decided_at else None,
|
"decided_at": task.decided_at.isoformat() if task.decided_at else None,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import hashlib
|
||||||
import json
|
import json
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
@@ -8,9 +9,11 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
|||||||
from models.agent_activity_models import AgentAlert
|
from models.agent_activity_models import AgentAlert
|
||||||
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
|
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
|
||||||
from services.llm_providers.main_text_generation import llm_text_gen
|
from services.llm_providers.main_text_generation import llm_text_gen
|
||||||
|
from services.onboarding.progress_service import OnboardingProgressService
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
||||||
|
FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6
|
||||||
|
|
||||||
|
|
||||||
def _today_date_str() -> str:
|
def _today_date_str() -> str:
|
||||||
@@ -107,6 +110,37 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_task_hash(title: str, description: str) -> str:
|
||||||
|
text = f"{title.strip().lower()}|{description.strip().lower()}"
|
||||||
|
return hashlib.sha256(text.encode()).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]:
|
||||||
|
raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {}
|
||||||
|
return {
|
||||||
|
"generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown",
|
||||||
|
"quality_score": float(raw.get("quality_score") or 0.0),
|
||||||
|
"generated_with_agents": bool(raw.get("generated_with_agents", False)),
|
||||||
|
"onboarding_completed": bool(raw.get("onboarding_completed", False)),
|
||||||
|
"onboarding_completed_at": raw.get("onboarding_completed_at"),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_onboarding_status(user_id: str) -> Dict[str, Any]:
|
||||||
|
status = OnboardingProgressService().get_onboarding_status(user_id) or {}
|
||||||
|
completed_at_raw = status.get("completed_at")
|
||||||
|
completed_at = None
|
||||||
|
if completed_at_raw:
|
||||||
|
try:
|
||||||
|
completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00"))
|
||||||
|
except Exception:
|
||||||
|
completed_at = None
|
||||||
|
return {
|
||||||
|
"is_completed": bool(status.get("is_completed", False)),
|
||||||
|
"completed_at": completed_at,
|
||||||
|
}
|
||||||
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
|
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
|
||||||
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
||||||
if not isinstance(workflow_config, dict):
|
if not isinstance(workflow_config, dict):
|
||||||
@@ -276,147 +310,85 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
activity = AgentActivityService(db, user_id)
|
activity = AgentActivityService(db, user_id)
|
||||||
grounding = build_grounding_context(db, user_id, date)
|
grounding = build_grounding_context(db, user_id, date)
|
||||||
memory_service = TaskMemoryService(user_id, db)
|
memory_service = TaskMemoryService(user_id, db)
|
||||||
min_active_agents = 2
|
|
||||||
generation_path = "committee"
|
|
||||||
|
|
||||||
# 1. Get Orchestrator
|
# 1. Get Orchestrator
|
||||||
try:
|
try:
|
||||||
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
|
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to get orchestrator: {e}")
|
logger.error(f"Failed to get orchestrator: {e}")
|
||||||
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
|
return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False}
|
||||||
return {
|
|
||||||
"date": date,
|
|
||||||
"tasks": fallback_tasks,
|
|
||||||
"metadata": {
|
|
||||||
"generation_path": "controlled_fallback",
|
|
||||||
"committee": {
|
|
||||||
"minimum_active_agents": min_active_agents,
|
|
||||||
"active_agents": {"count": 0, "names": []},
|
|
||||||
},
|
|
||||||
"degraded": {
|
|
||||||
"is_degraded": True,
|
|
||||||
"reason": "orchestrator_unavailable",
|
|
||||||
"missing_agents": [],
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"]
|
|
||||||
active_agent_names = sorted(orchestrator.agents.keys())
|
|
||||||
active_agents_count = len(active_agent_names)
|
|
||||||
missing_agents = [name for name in expected_committee_agents if name not in active_agent_names]
|
|
||||||
onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False))
|
|
||||||
initialization_state = (
|
|
||||||
getattr(orchestrator, "initialization_state", None)
|
|
||||||
if isinstance(getattr(orchestrator, "initialization_state", None), dict)
|
|
||||||
else {}
|
|
||||||
)
|
|
||||||
|
|
||||||
degraded_metadata = {
|
|
||||||
"is_degraded": False,
|
|
||||||
"reason": None,
|
|
||||||
"missing_agents": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
if active_agents_count < min_active_agents:
|
|
||||||
generation_path = "controlled_fallback"
|
|
||||||
degraded_metadata = {
|
|
||||||
"is_degraded": True,
|
|
||||||
"reason": "insufficient_active_agents",
|
|
||||||
"missing_agents": missing_agents,
|
|
||||||
}
|
|
||||||
activity.log_event(
|
|
||||||
event_type="committee_health",
|
|
||||||
severity="warning",
|
|
||||||
message="Committee degraded: insufficient active agents",
|
|
||||||
payload=build_agent_event_payload(
|
|
||||||
phase="planning",
|
|
||||||
step="committee_health_precheck",
|
|
||||||
progress_percent=5,
|
|
||||||
output_summary=f"Only {active_agents_count} active committee agents",
|
|
||||||
decision_reason="Agent committee below configured minimum",
|
|
||||||
evidence_refs=active_agent_names,
|
|
||||||
safe_debug=True,
|
|
||||||
metadata={
|
|
||||||
"minimum_active_agents": min_active_agents,
|
|
||||||
"active_agents": {
|
|
||||||
"count": active_agents_count,
|
|
||||||
"names": active_agent_names,
|
|
||||||
},
|
|
||||||
"missing_agents": missing_agents,
|
|
||||||
"onboarding_gated_initialization": onboarding_gated_initialization,
|
|
||||||
"orchestrator_initialization_state": initialization_state,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
agent_type="TodayWorkflowGenerator",
|
|
||||||
)
|
|
||||||
|
|
||||||
# 2. Parallel "Committee" Proposal Gathering
|
# 2. Parallel "Committee" Proposal Gathering
|
||||||
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
||||||
|
|
||||||
agent_tasks = []
|
agent_tasks = []
|
||||||
if generation_path == "committee":
|
try:
|
||||||
try:
|
# Define agents to poll
|
||||||
# Define agents to poll
|
agents_to_poll = [
|
||||||
agents_to_poll = [
|
orchestrator.agents.get('content'), # ContentStrategyAgent
|
||||||
orchestrator.agents.get('content'), # ContentStrategyAgent
|
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
|
||||||
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
|
orchestrator.agents.get('seo'), # SEOOptimizationAgent
|
||||||
orchestrator.agents.get('seo'), # SEOOptimizationAgent
|
orchestrator.agents.get('social'), # SocialAmplificationAgent
|
||||||
orchestrator.agents.get('social'), # SocialAmplificationAgent
|
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
|
||||||
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
|
]
|
||||||
]
|
|
||||||
|
|
||||||
# Filter out None agents (disabled/failed init)
|
# Filter out None agents (disabled/failed init)
|
||||||
active_agents = [a for a in agents_to_poll if a]
|
active_agents = [a for a in agents_to_poll if a]
|
||||||
|
|
||||||
# Execute propose_daily_tasks in parallel
|
# Execute propose_daily_tasks in parallel
|
||||||
results = await asyncio.gather(
|
results = await asyncio.gather(
|
||||||
*[a.propose_daily_tasks(grounding) for a in active_agents],
|
*[a.propose_daily_tasks(grounding) for a in active_agents],
|
||||||
return_exceptions=True
|
return_exceptions=True
|
||||||
)
|
)
|
||||||
|
|
||||||
# Collect successful proposals
|
# Collect successful proposals
|
||||||
raw_proposals = []
|
raw_proposals = []
|
||||||
for res in results:
|
for res in results:
|
||||||
if isinstance(res, list):
|
if isinstance(res, list):
|
||||||
raw_proposals.extend(res)
|
raw_proposals.extend(res)
|
||||||
elif isinstance(res, Exception):
|
elif isinstance(res, Exception):
|
||||||
logger.warning(f"Agent proposal failed: {res}")
|
logger.warning(f"Agent proposal failed: {res}")
|
||||||
|
|
||||||
# Simple deduplication based on title+pillar
|
# 3. Filter Redundant Proposals (Self-Learning)
|
||||||
unique_map = {}
|
# Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago
|
||||||
for p in raw_proposals:
|
# But for now, we filter exact duplicates from recent history (last 7 days)
|
||||||
key = f"{p.pillar_id}:{p.title}"
|
# We can implement semantic filtering later
|
||||||
if key not in unique_map:
|
|
||||||
unique_map[key] = p
|
|
||||||
continue
|
|
||||||
|
|
||||||
existing = unique_map[key]
|
# Simple deduplication based on title+pillar
|
||||||
if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
|
unique_map = {}
|
||||||
unique_map[key] = p
|
for p in raw_proposals:
|
||||||
continue
|
key = f"{p.pillar_id}:{p.title}"
|
||||||
|
if key not in unique_map:
|
||||||
|
unique_map[key] = p
|
||||||
|
continue
|
||||||
|
|
||||||
# Deterministic tie-breaker for equal priority proposals.
|
existing = unique_map[key]
|
||||||
if (
|
if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
|
||||||
_proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
|
unique_map[key] = p
|
||||||
and _proposal_order_key(p) < _proposal_order_key(existing)
|
continue
|
||||||
):
|
|
||||||
unique_map[key] = p
|
|
||||||
|
|
||||||
agent_tasks = list(unique_map.values())
|
# Deterministic tie-breaker for equal priority proposals.
|
||||||
|
if (
|
||||||
|
_proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
|
||||||
|
and _proposal_order_key(p) < _proposal_order_key(existing)
|
||||||
|
):
|
||||||
|
unique_map[key] = p
|
||||||
|
|
||||||
# Check memory for rejections (semantic filter)
|
agent_tasks = list(unique_map.values())
|
||||||
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
|
|
||||||
|
|
||||||
except Exception as e:
|
# Phase 3: Check memory for rejections (Semantic Filter)
|
||||||
logger.error(f"Committee proposal phase failed: {e}")
|
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
|
||||||
generation_path = "llm_fallback"
|
|
||||||
|
|
||||||
# 3. Final Selection
|
except Exception as e:
|
||||||
if generation_path == "committee" and agent_tasks:
|
logger.error(f"Committee proposal phase failed: {e}")
|
||||||
|
# Continue to fallback or LLM generation if committee fails
|
||||||
|
|
||||||
|
# 4. Final Selection
|
||||||
|
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
|
||||||
|
if agent_tasks:
|
||||||
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
|
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
|
||||||
|
|
||||||
|
# Convert TaskProposal objects to dicts for frontend
|
||||||
final_tasks = []
|
final_tasks = []
|
||||||
for prop in agent_tasks:
|
for prop in agent_tasks:
|
||||||
final_tasks.append({
|
final_tasks.append({
|
||||||
@@ -439,45 +411,12 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
return {
|
return {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": final_tasks,
|
"tasks": final_tasks,
|
||||||
"metadata": {
|
"generation_mode": "agent_committee",
|
||||||
"generation_path": "committee",
|
"quality_score": 0.9,
|
||||||
"committee": {
|
"generated_with_agents": True,
|
||||||
"minimum_active_agents": min_active_agents,
|
|
||||||
"active_agents": {
|
|
||||||
"count": active_agents_count,
|
|
||||||
"names": active_agent_names,
|
|
||||||
},
|
|
||||||
"onboarding_gated_initialization": onboarding_gated_initialization,
|
|
||||||
"orchestrator_initialization_state": initialization_state,
|
|
||||||
},
|
|
||||||
"degraded": degraded_metadata,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if generation_path != "controlled_fallback":
|
# Fallback to original LLM generation if agents returned nothing
|
||||||
generation_path = "llm_fallback"
|
|
||||||
|
|
||||||
if generation_path == "controlled_fallback":
|
|
||||||
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
|
|
||||||
return {
|
|
||||||
"date": date,
|
|
||||||
"tasks": fallback_tasks,
|
|
||||||
"metadata": {
|
|
||||||
"generation_path": "controlled_fallback",
|
|
||||||
"committee": {
|
|
||||||
"minimum_active_agents": min_active_agents,
|
|
||||||
"active_agents": {
|
|
||||||
"count": active_agents_count,
|
|
||||||
"names": active_agent_names,
|
|
||||||
},
|
|
||||||
"onboarding_gated_initialization": onboarding_gated_initialization,
|
|
||||||
"orchestrator_initialization_state": initialization_state,
|
|
||||||
},
|
|
||||||
"degraded": degraded_metadata,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
# Fallback to original LLM generation if committee returned nothing
|
|
||||||
logger.info("Agent committee returned no tasks, falling back to LLM generation")
|
logger.info("Agent committee returned no tasks, falling back to LLM generation")
|
||||||
|
|
||||||
schema = {
|
schema = {
|
||||||
@@ -533,6 +472,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
agent_type="TodayWorkflowGenerator",
|
agent_type="TodayWorkflowGenerator",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
used_fallback = False
|
||||||
try:
|
try:
|
||||||
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
|
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
|
||||||
if isinstance(raw, dict):
|
if isinstance(raw, dict):
|
||||||
@@ -541,6 +481,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
try:
|
try:
|
||||||
result = json.loads(raw)
|
result = json.loads(raw)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
used_fallback = True
|
||||||
result = {"date": date, "tasks": _fallback_tasks(date)}
|
result = {"date": date, "tasks": _fallback_tasks(date)}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
@@ -551,36 +492,26 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
run_id=run.id,
|
run_id=run.id,
|
||||||
agent_type="TodayWorkflowGenerator",
|
agent_type="TodayWorkflowGenerator",
|
||||||
)
|
)
|
||||||
|
used_fallback = True
|
||||||
result = {"date": date, "tasks": _fallback_tasks(date)}
|
result = {"date": date, "tasks": _fallback_tasks(date)}
|
||||||
|
|
||||||
tasks = result.get("tasks") if isinstance(result, dict) else None
|
tasks = result.get("tasks") if isinstance(result, dict) else None
|
||||||
if not isinstance(tasks, list) or not tasks:
|
if not isinstance(tasks, list) or not tasks:
|
||||||
generation_path = "controlled_fallback"
|
used_fallback = True
|
||||||
tasks = _fallback_tasks(date)
|
tasks = _fallback_tasks(date)
|
||||||
|
|
||||||
result = {
|
result = {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
||||||
"metadata": {
|
"generation_mode": "fallback" if used_fallback else "llm",
|
||||||
"generation_path": generation_path,
|
"quality_score": 0.4 if used_fallback else 0.75,
|
||||||
"committee": {
|
"generated_with_agents": False,
|
||||||
"minimum_active_agents": min_active_agents,
|
|
||||||
"active_agents": {
|
|
||||||
"count": active_agents_count,
|
|
||||||
"names": active_agent_names,
|
|
||||||
},
|
|
||||||
"onboarding_gated_initialization": onboarding_gated_initialization,
|
|
||||||
"orchestrator_initialization_state": initialization_state,
|
|
||||||
},
|
|
||||||
"degraded": degraded_metadata,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
event_type="final_summary",
|
event_type="final_summary",
|
||||||
severity="info",
|
severity="info",
|
||||||
message="Daily workflow plan generated",
|
message="Daily workflow plan generated",
|
||||||
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}),
|
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}),
|
||||||
run_id=run.id,
|
run_id=run.id,
|
||||||
agent_type="TodayWorkflowGenerator",
|
agent_type="TodayWorkflowGenerator",
|
||||||
)
|
)
|
||||||
@@ -588,50 +519,83 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
|
async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan:
|
||||||
from starlette.concurrency import run_in_threadpool
|
from starlette.concurrency import run_in_threadpool
|
||||||
|
|
||||||
date_str = date or _today_date_str()
|
date_str = date or _today_date_str()
|
||||||
|
onboarding_status = _get_onboarding_status(user_id)
|
||||||
|
|
||||||
def _get_existing():
|
existing = await run_in_threadpool(
|
||||||
return (
|
lambda: (
|
||||||
db.query(DailyWorkflowPlan)
|
db.query(DailyWorkflowPlan)
|
||||||
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
|
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
existing = await run_in_threadpool(_get_existing)
|
existing_hash_status = {}
|
||||||
|
|
||||||
if existing:
|
if existing:
|
||||||
return existing, False
|
existing_tasks = await run_in_threadpool(
|
||||||
|
lambda: (
|
||||||
|
db.query(DailyWorkflowTask)
|
||||||
|
.filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id)
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
for task in existing_tasks:
|
||||||
|
task_hash = _compute_task_hash(task.title, task.description)
|
||||||
|
existing_hash_status[task_hash] = {
|
||||||
|
"status": task.status,
|
||||||
|
"decided_at": task.decided_at,
|
||||||
|
"completion_notes": task.completion_notes,
|
||||||
|
}
|
||||||
|
|
||||||
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
||||||
tasks = plan_data.get("tasks", [])
|
plan_data["onboarding_completed"] = onboarding_status["is_completed"]
|
||||||
|
plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None
|
||||||
|
|
||||||
def _create_plan():
|
tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else []
|
||||||
plan = DailyWorkflowPlan(
|
|
||||||
user_id=user_id,
|
def _replace_plan() -> DailyWorkflowPlan:
|
||||||
date=date_str,
|
if existing:
|
||||||
source="agent",
|
db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False)
|
||||||
plan_json=plan_data,
|
plan = existing
|
||||||
created_at=datetime.utcnow(),
|
plan.source = "agent"
|
||||||
updated_at=datetime.utcnow(),
|
plan.plan_json = plan_data
|
||||||
)
|
plan.updated_at = datetime.utcnow()
|
||||||
db.add(plan)
|
db.add(plan)
|
||||||
db.commit()
|
db.commit()
|
||||||
db.refresh(plan)
|
db.refresh(plan)
|
||||||
|
else:
|
||||||
|
plan = DailyWorkflowPlan(
|
||||||
|
user_id=user_id,
|
||||||
|
date=date_str,
|
||||||
|
source="agent",
|
||||||
|
plan_json=plan_data,
|
||||||
|
created_at=datetime.utcnow(),
|
||||||
|
updated_at=datetime.utcnow(),
|
||||||
|
)
|
||||||
|
db.add(plan)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(plan)
|
||||||
|
|
||||||
for t in tasks:
|
for t in tasks:
|
||||||
pillar_id = str(t.get("pillarId") or "").lower().strip()
|
pillar_id = str(t.get("pillarId") or "").lower().strip()
|
||||||
if pillar_id not in PILLAR_IDS:
|
if pillar_id not in PILLAR_IDS:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
title = str(t.get("title") or "Task").strip()[:255]
|
||||||
|
description = str(t.get("description") or "").strip()
|
||||||
|
task_hash = _compute_task_hash(title, description)
|
||||||
|
preserved = existing_hash_status.get(task_hash) or {}
|
||||||
|
|
||||||
task = DailyWorkflowTask(
|
task = DailyWorkflowTask(
|
||||||
plan_id=plan.id,
|
plan_id=plan.id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
pillar_id=pillar_id,
|
pillar_id=pillar_id,
|
||||||
title=str(t.get("title") or "Task").strip()[:255],
|
title=title,
|
||||||
description=str(t.get("description") or "").strip(),
|
description=description,
|
||||||
status=_coerce_status(t.get("status")),
|
status=preserved.get("status") or _coerce_status(t.get("status")),
|
||||||
priority=_coerce_priority(t.get("priority")),
|
priority=_coerce_priority(t.get("priority")),
|
||||||
estimated_time=int(t.get("estimatedTime") or 15),
|
estimated_time=int(t.get("estimatedTime") or 15),
|
||||||
action_type=str(t.get("actionType") or "navigate").strip()[:20],
|
action_type=str(t.get("actionType") or "navigate").strip()[:20],
|
||||||
@@ -641,14 +605,53 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
|
|||||||
enabled=bool(t.get("enabled", True)),
|
enabled=bool(t.get("enabled", True)),
|
||||||
created_at=datetime.utcnow(),
|
created_at=datetime.utcnow(),
|
||||||
updated_at=datetime.utcnow(),
|
updated_at=datetime.utcnow(),
|
||||||
|
decided_at=preserved.get("decided_at"),
|
||||||
|
completion_notes=preserved.get("completion_notes"),
|
||||||
)
|
)
|
||||||
db.add(task)
|
db.add(task)
|
||||||
|
|
||||||
db.commit()
|
db.commit()
|
||||||
|
db.refresh(plan)
|
||||||
return plan
|
return plan
|
||||||
|
|
||||||
plan = await run_in_threadpool(_create_plan)
|
return await run_in_threadpool(_replace_plan)
|
||||||
return plan, True
|
|
||||||
|
|
||||||
|
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
|
||||||
|
from starlette.concurrency import run_in_threadpool
|
||||||
|
|
||||||
|
date_str = date or _today_date_str()
|
||||||
|
|
||||||
|
existing = await run_in_threadpool(
|
||||||
|
lambda: (
|
||||||
|
db.query(DailyWorkflowPlan)
|
||||||
|
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
metadata = _extract_plan_metadata(existing)
|
||||||
|
onboarding_status = _get_onboarding_status(user_id)
|
||||||
|
|
||||||
|
should_regenerate = False
|
||||||
|
if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD:
|
||||||
|
should_regenerate = True
|
||||||
|
|
||||||
|
if (
|
||||||
|
onboarding_status["is_completed"]
|
||||||
|
and not metadata["onboarding_completed"]
|
||||||
|
):
|
||||||
|
should_regenerate = True
|
||||||
|
|
||||||
|
if should_regenerate:
|
||||||
|
regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str)
|
||||||
|
return regenerated, True
|
||||||
|
|
||||||
|
return existing, False
|
||||||
|
|
||||||
|
created = await regenerate_daily_workflow_plan(db, user_id, date=date_str)
|
||||||
|
return created, True
|
||||||
|
|
||||||
|
|
||||||
def update_task_status(
|
def update_task_status(
|
||||||
|
|||||||
Reference in New Issue
Block a user