diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index f56a8316..cbde7cf0 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -1,14 +1,13 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Any, Dict, Optional -from datetime import datetime, timezone -from collections import defaultdict, deque +from datetime import datetime from loguru import logger from sqlalchemy.orm import Session from middleware.auth_middleware import get_current_user from services.database import get_db -from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status +from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask import asyncio from services.intelligence.txtai_service import TxtaiIntelligenceService @@ -16,27 +15,6 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) -REGENERATE_WINDOW_SECONDS = 60 -REGENERATE_MAX_REQUESTS_PER_WINDOW = 3 -_regen_request_log: dict[str, deque[float]] = defaultdict(deque) - - -def _check_regenerate_rate_limit(user_id: str) -> None: - import time - - now = time.time() - window_start = now - REGENERATE_WINDOW_SECONDS - history = _regen_request_log[user_id] - - while history and history[0] < window_start: - history.popleft() - - if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW: - raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded") - - history.append(now) - - async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): svc = TxtaiIntelligenceService(user_id) items = [] @@ -161,9 +139,6 @@ async def get_today_workflow( except Exception: pass - plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {} - quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None - return { "success": True, "data": { @@ -183,12 +158,10 @@ async def get_today_workflow( "id": plan.id, "date": plan.date, "source": plan.source, - "quality": quality, + "quality_status": (plan.plan_json or {}).get("quality_status", "contextual"), + "contextuality_validation": (plan.plan_json or {}).get("contextuality_validation"), "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, - "generation_mode": (plan.plan_json or {}).get("generation_mode"), - "quality_score": (plan.plan_json or {}).get("quality_score"), - "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), }, }, "timestamp": datetime.utcnow().isoformat(), @@ -196,67 +169,6 @@ async def get_today_workflow( } -@router.post("/regenerate") -async def regenerate_today_workflow( - date: Optional[str] = None, - current_user: dict = Depends(get_current_user), - db: Session = Depends(get_db), -) -> Dict[str, Any]: - from starlette.concurrency import run_in_threadpool - - user_id = str(current_user.get("id")) - _check_regenerate_rate_limit(user_id) - - plan = await regenerate_daily_workflow_plan(db, user_id, date=date) - - tasks = await run_in_threadpool( - lambda: ( - db.query(DailyWorkflowTask) - .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) - .order_by(DailyWorkflowTask.created_at.asc()) - .all() - ) - ) - - response_tasks = [ - { - "id": str(t.id), - "pillarId": t.pillar_id, - "title": t.title, - "description": t.description, - "status": "skipped" if t.status == "dismissed" else t.status, - "priority": t.priority, - "estimatedTime": t.estimated_time, - "dependencies": t.dependencies or [], - "actionUrl": t.action_url, - "actionType": t.action_type, - "metadata": t.metadata_json or {}, - "enabled": bool(t.enabled), - } - for t in tasks - ] - - asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated")) - - return { - "success": True, - "data": { - "plan": { - "id": plan.id, - "date": plan.date, - "source": plan.source, - "generation_mode": (plan.plan_json or {}).get("generation_mode"), - "quality_score": (plan.plan_json or {}).get("quality_score"), - "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), - "regenerated_at": datetime.now(timezone.utc).isoformat(), - }, - "tasks": response_tasks, - }, - "timestamp": datetime.utcnow().isoformat(), - "user_id": user_id, - } - - from services.task_memory_service import TaskMemoryService @router.post("/tasks/{task_id}/status") diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index e8a27a82..9883767f 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -11,10 +11,8 @@ from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] -TASK_PROVENANCE_AGENT = "agent_proposal" -TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill" -TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback" -DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35 +MIN_TASK_EVIDENCE_LINKS = 1 +PLAN_CONTEXT_THRESHOLD = 0.65 def _today_date_str() -> str: @@ -140,74 +138,116 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["enabled"] = bool(task.get("enabled", True)) - metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} - provenance = str(metadata.get("provenance") or "").strip().lower() - if provenance not in { - TASK_PROVENANCE_AGENT, - TASK_PROVENANCE_LLM_BACKFILL, - TASK_PROVENANCE_CONTROLLED_FALLBACK, - }: - if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK: - provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK - elif metadata.get("source") == "llm_pillar_backfill": - provenance = TASK_PROVENANCE_LLM_BACKFILL - elif metadata.get("source_agent"): - provenance = TASK_PROVENANCE_AGENT - else: - provenance = TASK_PROVENANCE_LLM_BACKFILL - metadata["provenance"] = provenance - sanitized["metadata"] = metadata return sanitized -def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float: - workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} - configured = None - if isinstance(workflow_config, dict): - configured = workflow_config.get("min_agent_origin_ratio") - try: - value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD - except (TypeError, ValueError): - value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD - return max(0.0, min(1.0, value)) +def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]: + if not isinstance(onboarding_data, dict): + return [] + + links: List[str] = [] + for key, value in onboarding_data.items(): + if key == "workflow_config": + continue + if value in (None, "", [], {}): + continue + links.append(f"onboarding:{key}") + if len(links) >= limit: + break + return links -def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: - total_tasks = len(tasks) - agent_origin_tasks = [ - task for task in tasks - if isinstance(task.get("metadata"), dict) - and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT - ] - fallback_tasks = [ - task for task in tasks - if isinstance(task.get("metadata"), dict) - and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK - ] - agent_pillars = { - str(task.get("pillarId") or "").lower().strip() - for task in agent_origin_tasks - if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS +def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]: + if not isinstance(evidence_links, list): + return [] + + onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {} + if not isinstance(onboarding_data, dict): + onboarding_data = {} + valid_onboarding_keys = {str(k) for k in onboarding_data.keys()} + + recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else [] + valid_alert_ids = { + str(a.get("alert_id")) + for a in recent_alerts + if isinstance(a, dict) and a.get("alert_id") is not None } - agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0 - fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0 - threshold = _agent_personalization_threshold(grounding or {}) - classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline" + valid_links: List[str] = [] + for raw in evidence_links: + link = str(raw or "").strip() + if not link: + continue + if link.startswith("onboarding:"): + key = link.split(":", 1)[1].strip() + if key and key in valid_onboarding_keys: + valid_links.append(link) + elif link.startswith("alert:"): + alert_id = link.split(":", 1)[1].strip() + if alert_id and alert_id in valid_alert_ids: + valid_links.append(link) + + return valid_links + + +def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]: + tasks = plan.get("tasks") if isinstance(plan, dict) else None + if not isinstance(tasks, list) or not tasks: + return { + "score": 0.0, + "threshold": PLAN_CONTEXT_THRESHOLD, + "is_contextual": False, + "task_scores": [], + "tasks_below_min_evidence": 0, + "min_evidence_links": MIN_TASK_EVIDENCE_LINKS, + } + + task_scores = [] + below_min_evidence = 0 + + for idx, task in enumerate(tasks): + metadata = task.get("metadata") if isinstance(task, dict) else {} + metadata = metadata if isinstance(metadata, dict) else {} + evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding) + has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS + if not has_min_evidence: + below_min_evidence += 1 + + reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower() + onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:")) + alert_hits = sum(1 for l in evidence_links if l.startswith("alert:")) + + score = 0.0 + if has_min_evidence: + score += 0.6 + if onboarding_hits > 0: + score += 0.2 + if alert_hits > 0: + score += 0.2 + elif "alert" in reasoning_text: + score += 0.1 + + task_scores.append( + { + "task_index": idx, + "pillarId": task.get("pillarId"), + "title": task.get("title"), + "score": min(score, 1.0), + "evidence_links": evidence_links, + "has_min_evidence": has_min_evidence, + } + ) + + plan_score = sum(t["score"] for t in task_scores) / len(task_scores) + is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0 return { - "classification": classification, - "agentOriginRatio": round(agent_origin_ratio, 4), - "agentOriginPercent": round(agent_origin_ratio * 100, 2), - "agentOriginTaskCount": len(agent_origin_tasks), - "agentOriginPillars": len(agent_pillars), - "fallbackRatio": round(fallback_ratio, 4), - "fallbackPercent": round(fallback_ratio * 100, 2), - "fallbackTaskCount": len(fallback_tasks), - "totalTaskCount": total_tasks, - "thresholds": { - "minAgentOriginRatio": threshold, - }, + "score": round(plan_score, 3), + "threshold": PLAN_CONTEXT_THRESHOLD, + "is_contextual": is_contextual, + "task_scores": task_scores, + "tasks_below_min_evidence": below_min_evidence, + "min_evidence_links": MIN_TASK_EVIDENCE_LINKS, } @@ -280,9 +320,6 @@ def _ensure_pillar_coverage( generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding) if generated: - metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {} - metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL - generated["metadata"] = metadata sanitized_tasks.append(generated) covered_pillars.add(pillar_id) continue @@ -291,7 +328,6 @@ def _ensure_pillar_coverage( if controlled_fallback: metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {} metadata["source"] = "controlled_fallback" - metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK controlled_fallback["metadata"] = metadata sanitized_tasks.append(controlled_fallback) covered_pillars.add(pillar_id) @@ -329,6 +365,7 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A return { "recent_agent_alerts": [ { + "alert_id": a.id, "title": a.title, "message": a.message, "created_at": a.created_at.isoformat(), @@ -348,9 +385,15 @@ from services.task_memory_service import TaskMemoryService # Initialize orchestration service (singleton) orchestration_service = AgentOrchestrationService() -async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]: +async def generate_agent_enhanced_plan( + db: Session, + user_id: str, + date: str, + grounding: Optional[Dict[str, Any]] = None, + strict_contextuality: bool = False, +) -> Dict[str, Any]: activity = AgentActivityService(db, user_id) - grounding = build_grounding_context(db, user_id, date) + grounding = grounding or build_grounding_context(db, user_id, date) memory_service = TaskMemoryService(user_id, db) # 1. Get Orchestrator @@ -427,7 +470,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> # 4. Final Selection # If we have agent tasks, use them. Otherwise fall back to LLM generation. - if agent_tasks: + if agent_tasks and not strict_contextuality: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") # Convert TaskProposal objects to dicts for frontend @@ -445,16 +488,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "metadata": { "source_agent": prop.source_agent, "reasoning": prop.reasoning, - "context_data": prop.context_data + "context_data": prop.context_data, + "evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2), } }) final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) - quality = _compute_plan_quality(final_tasks, grounding) return { "date": date, - "tasks": final_tasks, - "quality": quality, + "tasks": final_tasks } # Fallback to original LLM generation if agents returned nothing @@ -503,6 +545,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n" ) + if strict_contextuality: + prompt += ( + "\nStrict contextuality mode (must follow):\n" + f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n" + "- evidence_links entries must use either 'onboarding:' or 'alert:' format.\n" + "- Include metadata.reasoning that explains how the evidence applies to the task.\n" + "- Reject generic tasks without explicit ties to onboarding data or active alerts.\n" + ) + run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000]) activity.log_event( event_type="plan", @@ -540,7 +591,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), } - result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding) activity.log_event( event_type="final_summary", @@ -569,22 +619,27 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt existing = await run_in_threadpool(_get_existing) if existing: - existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} - if not isinstance(existing_json.get("quality"), dict): - def _backfill_quality_for_existing(): - plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} - tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else [] - plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={}) - existing.plan_json = plan_json - existing.updated_at = datetime.utcnow() - db.add(existing) - db.commit() - db.refresh(existing) - return existing - existing = await run_in_threadpool(_backfill_quality_for_existing) return existing, False - plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) + grounding = build_grounding_context(db, user_id, date_str) + plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding) + validation = validate_plan_contextuality(plan_data, grounding) + + if not validation.get("is_contextual"): + logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id) + regenerated_plan = await generate_agent_enhanced_plan( + db, + user_id, + date_str, + grounding=grounding, + strict_contextuality=True, + ) + regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding) + plan_data = regenerated_plan + validation = regenerated_validation + + plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context" + plan_data["contextuality_validation"] = validation tasks = plan_data.get("tasks", []) def _create_plan(): diff --git a/backend/sif_release_readiness_checks.py b/backend/sif_release_readiness_checks.py index 9d79e14b..d0ef7700 100644 --- a/backend/sif_release_readiness_checks.py +++ b/backend/sif_release_readiness_checks.py @@ -8,7 +8,7 @@ if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric -from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS +from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS, validate_plan_contextuality from services.intelligence.sif_agents import ContentGuardianAgent as SifGuardian from services.intelligence.agents.specialized_agents import ContentGuardianAgent as SpecializedGuardian @@ -74,6 +74,52 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase): self.assertIn("warning", result) self.assertEqual(result["method"], "competitor_index_search") + + def test_validate_plan_contextuality_passes_with_evidence_links(self): + plan = { + "tasks": [ + { + "pillarId": "plan", + "title": "Review strategy", + "description": "Use onboarding goals", + "metadata": { + "evidence_links": ["onboarding:business_goals", "alert:101"], + "reasoning": "Based on onboarding and alert", + }, + } + ] + } + grounding = { + "onboarding_data": {"business_goals": ["awareness"]}, + "recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}], + } + + validation = validate_plan_contextuality(plan, grounding) + + self.assertTrue(validation["is_contextual"]) + self.assertEqual(validation["tasks_below_min_evidence"], 0) + + def test_validate_plan_contextuality_flags_missing_evidence_links(self): + plan = { + "tasks": [ + { + "pillarId": "generate", + "title": "Write generic post", + "description": "Create a post", + "metadata": {"reasoning": "General best practice"}, + } + ] + } + grounding = { + "onboarding_data": {"business_goals": ["awareness"]}, + "recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}], + } + + validation = validate_plan_contextuality(plan, grounding) + + self.assertFalse(validation["is_contextual"]) + self.assertEqual(validation["tasks_below_min_evidence"], 1) + def test_pillar_coverage_guardrail_backfills_missing(self): tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}] grounding = {"workflow_config": {"enforce_pillar_coverage": True}}