Add workflow provenance quality metrics and classification

This commit is contained in:
ي
2026-03-06 21:42:14 +05:30
parent 5d49351c2d
commit 84babd0407
2 changed files with 97 additions and 1 deletions

View File

@@ -139,6 +139,9 @@ async def get_today_workflow(
except Exception:
pass
plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {}
quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None
return {
"success": True,
"data": {
@@ -158,6 +161,7 @@ async def get_today_workflow(
"id": plan.id,
"date": plan.date,
"source": plan.source,
"quality": quality,
"created_at": plan.created_at.isoformat() if plan.created_at else None,
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
},

View File

@@ -11,6 +11,10 @@ from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
TASK_PROVENANCE_AGENT = "agent_proposal"
TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill"
TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback"
DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35
def _today_date_str() -> str:
@@ -136,9 +140,77 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
sanitized["enabled"] = bool(task.get("enabled", True))
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
provenance = str(metadata.get("provenance") or "").strip().lower()
if provenance not in {
TASK_PROVENANCE_AGENT,
TASK_PROVENANCE_LLM_BACKFILL,
TASK_PROVENANCE_CONTROLLED_FALLBACK,
}:
if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK:
provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK
elif metadata.get("source") == "llm_pillar_backfill":
provenance = TASK_PROVENANCE_LLM_BACKFILL
elif metadata.get("source_agent"):
provenance = TASK_PROVENANCE_AGENT
else:
provenance = TASK_PROVENANCE_LLM_BACKFILL
metadata["provenance"] = provenance
sanitized["metadata"] = metadata
return sanitized
def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
configured = None
if isinstance(workflow_config, dict):
configured = workflow_config.get("min_agent_origin_ratio")
try:
value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
except (TypeError, ValueError):
value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
return max(0.0, min(1.0, value))
def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
total_tasks = len(tasks)
agent_origin_tasks = [
task for task in tasks
if isinstance(task.get("metadata"), dict)
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT
]
fallback_tasks = [
task for task in tasks
if isinstance(task.get("metadata"), dict)
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK
]
agent_pillars = {
str(task.get("pillarId") or "").lower().strip()
for task in agent_origin_tasks
if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS
}
agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0
fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0
threshold = _agent_personalization_threshold(grounding or {})
classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline"
return {
"classification": classification,
"agentOriginRatio": round(agent_origin_ratio, 4),
"agentOriginPercent": round(agent_origin_ratio * 100, 2),
"agentOriginTaskCount": len(agent_origin_tasks),
"agentOriginPillars": len(agent_pillars),
"fallbackRatio": round(fallback_ratio, 4),
"fallbackPercent": round(fallback_ratio * 100, 2),
"fallbackTaskCount": len(fallback_tasks),
"totalTaskCount": total_tasks,
"thresholds": {
"minAgentOriginRatio": threshold,
},
}
def _build_single_task_for_missing_pillar(
user_id: str,
date: str,
@@ -208,6 +280,9 @@ def _ensure_pillar_coverage(
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
if generated:
metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {}
metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL
generated["metadata"] = metadata
sanitized_tasks.append(generated)
covered_pillars.add(pillar_id)
continue
@@ -216,6 +291,7 @@ def _ensure_pillar_coverage(
if controlled_fallback:
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
metadata["source"] = "controlled_fallback"
metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK
controlled_fallback["metadata"] = metadata
sanitized_tasks.append(controlled_fallback)
covered_pillars.add(pillar_id)
@@ -374,9 +450,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
})
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
quality = _compute_plan_quality(final_tasks, grounding)
return {
"date": date,
"tasks": final_tasks
"tasks": final_tasks,
"quality": quality,
}
# Fallback to original LLM generation if agents returned nothing
@@ -462,6 +540,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"date": date,
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
}
result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding)
activity.log_event(
event_type="final_summary",
@@ -490,6 +569,19 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
existing = await run_in_threadpool(_get_existing)
if existing:
existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
if not isinstance(existing_json.get("quality"), dict):
def _backfill_quality_for_existing():
plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else []
plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={})
existing.plan_json = plan_json
existing.updated_at = datetime.utcnow()
db.add(existing)
db.commit()
db.refresh(existing)
return existing
existing = await run_in_threadpool(_backfill_quality_for_existing)
return existing, False
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)