From 84babd0407ea907f67bc92790f57ddcbebacdd29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:42:14 +0530 Subject: [PATCH] Add workflow provenance quality metrics and classification --- backend/api/today_workflow.py | 4 + backend/services/today_workflow_service.py | 94 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..0c126a80 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -139,6 +139,9 @@ async def get_today_workflow( except Exception: pass + plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {} + quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None + return { "success": True, "data": { @@ -158,6 +161,7 @@ async def get_today_workflow( "id": plan.id, "date": plan.date, "source": plan.source, + "quality": quality, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, }, diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..e8a27a82 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -11,6 +11,10 @@ from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +TASK_PROVENANCE_AGENT = "agent_proposal" +TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill" +TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback" +DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35 def _today_date_str() -> str: @@ -136,9 +140,77 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["enabled"] = bool(task.get("enabled", True)) + metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} + provenance = str(metadata.get("provenance") or "").strip().lower() + if provenance not in { + TASK_PROVENANCE_AGENT, + TASK_PROVENANCE_LLM_BACKFILL, + TASK_PROVENANCE_CONTROLLED_FALLBACK, + }: + if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK: + provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK + elif metadata.get("source") == "llm_pillar_backfill": + provenance = TASK_PROVENANCE_LLM_BACKFILL + elif metadata.get("source_agent"): + provenance = TASK_PROVENANCE_AGENT + else: + provenance = TASK_PROVENANCE_LLM_BACKFILL + metadata["provenance"] = provenance + sanitized["metadata"] = metadata return sanitized +def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float: + workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} + configured = None + if isinstance(workflow_config, dict): + configured = workflow_config.get("min_agent_origin_ratio") + try: + value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD + except (TypeError, ValueError): + value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD + return max(0.0, min(1.0, value)) + + +def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + total_tasks = len(tasks) + agent_origin_tasks = [ + task for task in tasks + if isinstance(task.get("metadata"), dict) + and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT + ] + fallback_tasks = [ + task for task in tasks + if isinstance(task.get("metadata"), dict) + and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK + ] + agent_pillars = { + str(task.get("pillarId") or "").lower().strip() + for task in agent_origin_tasks + if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS + } + + agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0 + fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0 + threshold = _agent_personalization_threshold(grounding or {}) + classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline" + + return { + "classification": classification, + "agentOriginRatio": round(agent_origin_ratio, 4), + "agentOriginPercent": round(agent_origin_ratio * 100, 2), + "agentOriginTaskCount": len(agent_origin_tasks), + "agentOriginPillars": len(agent_pillars), + "fallbackRatio": round(fallback_ratio, 4), + "fallbackPercent": round(fallback_ratio * 100, 2), + "fallbackTaskCount": len(fallback_tasks), + "totalTaskCount": total_tasks, + "thresholds": { + "minAgentOriginRatio": threshold, + }, + } + + def _build_single_task_for_missing_pillar( user_id: str, date: str, @@ -208,6 +280,9 @@ def _ensure_pillar_coverage( generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding) if generated: + metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {} + metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL + generated["metadata"] = metadata sanitized_tasks.append(generated) covered_pillars.add(pillar_id) continue @@ -216,6 +291,7 @@ def _ensure_pillar_coverage( if controlled_fallback: metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {} metadata["source"] = "controlled_fallback" + metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK controlled_fallback["metadata"] = metadata sanitized_tasks.append(controlled_fallback) covered_pillars.add(pillar_id) @@ -374,9 +450,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> }) final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) + quality = _compute_plan_quality(final_tasks, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "quality": quality, } # Fallback to original LLM generation if agents returned nothing @@ -462,6 +540,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), } + result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding) activity.log_event( event_type="final_summary", @@ -490,6 +569,19 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt existing = await run_in_threadpool(_get_existing) if existing: + existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} + if not isinstance(existing_json.get("quality"), dict): + def _backfill_quality_for_existing(): + plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} + tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else [] + plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={}) + existing.plan_json = plan_json + existing.updated_at = datetime.utcnow() + db.add(existing) + db.commit() + db.refresh(existing) + return existing + existing = await run_in_threadpool(_backfill_quality_for_existing) return existing, False plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)