diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 2e1efc3f..f56a8316 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -161,6 +161,9 @@ async def get_today_workflow( except Exception: pass + plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {} + quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None + return { "success": True, "data": { @@ -180,6 +183,7 @@ async def get_today_workflow( "id": plan.id, "date": plan.date, "source": plan.source, + "quality": quality, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, "generation_mode": (plan.plan_json or {}).get("generation_mode"), diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index e9862d70..e8a27a82 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -1,4 +1,3 @@ -import hashlib import json from datetime import datetime, timezone from typing import Any, Dict, List, Optional @@ -9,11 +8,13 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.llm_providers.main_text_generation import llm_text_gen -from services.onboarding.progress_service import OnboardingProgressService from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] -FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6 +TASK_PROVENANCE_AGENT = "agent_proposal" +TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill" +TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback" +DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35 def _today_date_str() -> str: @@ -110,37 +111,6 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]: ] - - -def _compute_task_hash(title: str, description: str) -> str: - text = f"{title.strip().lower()}|{description.strip().lower()}" - return hashlib.sha256(text.encode()).hexdigest() - - -def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]: - raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {} - return { - "generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown", - "quality_score": float(raw.get("quality_score") or 0.0), - "generated_with_agents": bool(raw.get("generated_with_agents", False)), - "onboarding_completed": bool(raw.get("onboarding_completed", False)), - "onboarding_completed_at": raw.get("onboarding_completed_at"), - } - - -def _get_onboarding_status(user_id: str) -> Dict[str, Any]: - status = OnboardingProgressService().get_onboarding_status(user_id) or {} - completed_at_raw = status.get("completed_at") - completed_at = None - if completed_at_raw: - try: - completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00")) - except Exception: - completed_at = None - return { - "is_completed": bool(status.get("is_completed", False)), - "completed_at": completed_at, - } def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool: workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} if not isinstance(workflow_config, dict): @@ -170,9 +140,77 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["enabled"] = bool(task.get("enabled", True)) + metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} + provenance = str(metadata.get("provenance") or "").strip().lower() + if provenance not in { + TASK_PROVENANCE_AGENT, + TASK_PROVENANCE_LLM_BACKFILL, + TASK_PROVENANCE_CONTROLLED_FALLBACK, + }: + if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK: + provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK + elif metadata.get("source") == "llm_pillar_backfill": + provenance = TASK_PROVENANCE_LLM_BACKFILL + elif metadata.get("source_agent"): + provenance = TASK_PROVENANCE_AGENT + else: + provenance = TASK_PROVENANCE_LLM_BACKFILL + metadata["provenance"] = provenance + sanitized["metadata"] = metadata return sanitized +def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float: + workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} + configured = None + if isinstance(workflow_config, dict): + configured = workflow_config.get("min_agent_origin_ratio") + try: + value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD + except (TypeError, ValueError): + value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD + return max(0.0, min(1.0, value)) + + +def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + total_tasks = len(tasks) + agent_origin_tasks = [ + task for task in tasks + if isinstance(task.get("metadata"), dict) + and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT + ] + fallback_tasks = [ + task for task in tasks + if isinstance(task.get("metadata"), dict) + and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK + ] + agent_pillars = { + str(task.get("pillarId") or "").lower().strip() + for task in agent_origin_tasks + if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS + } + + agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0 + fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0 + threshold = _agent_personalization_threshold(grounding or {}) + classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline" + + return { + "classification": classification, + "agentOriginRatio": round(agent_origin_ratio, 4), + "agentOriginPercent": round(agent_origin_ratio * 100, 2), + "agentOriginTaskCount": len(agent_origin_tasks), + "agentOriginPillars": len(agent_pillars), + "fallbackRatio": round(fallback_ratio, 4), + "fallbackPercent": round(fallback_ratio * 100, 2), + "fallbackTaskCount": len(fallback_tasks), + "totalTaskCount": total_tasks, + "thresholds": { + "minAgentOriginRatio": threshold, + }, + } + + def _build_single_task_for_missing_pillar( user_id: str, date: str, @@ -242,6 +280,9 @@ def _ensure_pillar_coverage( generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding) if generated: + metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {} + metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL + generated["metadata"] = metadata sanitized_tasks.append(generated) covered_pillars.add(pillar_id) continue @@ -250,6 +291,7 @@ def _ensure_pillar_coverage( if controlled_fallback: metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {} metadata["source"] = "controlled_fallback" + metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK controlled_fallback["metadata"] = metadata sanitized_tasks.append(controlled_fallback) covered_pillars.add(pillar_id) @@ -316,7 +358,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False} + return {"date": date, "tasks": _fallback_tasks(date)} # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") @@ -408,12 +450,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> }) final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) + quality = _compute_plan_quality(final_tasks, grounding) return { "date": date, "tasks": final_tasks, - "generation_mode": "agent_committee", - "quality_score": 0.9, - "generated_with_agents": True, + "quality": quality, } # Fallback to original LLM generation if agents returned nothing @@ -472,7 +513,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> agent_type="TodayWorkflowGenerator", ) - used_fallback = False try: raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id) if isinstance(raw, dict): @@ -481,7 +521,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> try: result = json.loads(raw) except Exception: - used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} except Exception as e: activity.log_event( @@ -492,20 +531,16 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> run_id=run.id, agent_type="TodayWorkflowGenerator", ) - used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: - used_fallback = True tasks = _fallback_tasks(date) result = { "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), - "generation_mode": "fallback" if used_fallback else "llm", - "quality_score": 0.4 if used_fallback else 0.75, - "generated_with_agents": False, } + result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding) activity.log_event( event_type="final_summary", @@ -519,83 +554,63 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> return result -async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan: +async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: from starlette.concurrency import run_in_threadpool - + date_str = date or _today_date_str() - onboarding_status = _get_onboarding_status(user_id) - - existing = await run_in_threadpool( - lambda: ( + + def _get_existing(): + return ( db.query(DailyWorkflowPlan) .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) .first() ) - ) - - existing_hash_status = {} + + existing = await run_in_threadpool(_get_existing) + if existing: - existing_tasks = await run_in_threadpool( - lambda: ( - db.query(DailyWorkflowTask) - .filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id) - .all() - ) - ) - for task in existing_tasks: - task_hash = _compute_task_hash(task.title, task.description) - existing_hash_status[task_hash] = { - "status": task.status, - "decided_at": task.decided_at, - "completion_notes": task.completion_notes, - } + existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} + if not isinstance(existing_json.get("quality"), dict): + def _backfill_quality_for_existing(): + plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} + tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else [] + plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={}) + existing.plan_json = plan_json + existing.updated_at = datetime.utcnow() + db.add(existing) + db.commit() + db.refresh(existing) + return existing + existing = await run_in_threadpool(_backfill_quality_for_existing) + return existing, False plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) - plan_data["onboarding_completed"] = onboarding_status["is_completed"] - plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None + tasks = plan_data.get("tasks", []) - tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else [] - - def _replace_plan() -> DailyWorkflowPlan: - if existing: - db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False) - plan = existing - plan.source = "agent" - plan.plan_json = plan_data - plan.updated_at = datetime.utcnow() - db.add(plan) - db.commit() - db.refresh(plan) - else: - plan = DailyWorkflowPlan( - user_id=user_id, - date=date_str, - source="agent", - plan_json=plan_data, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), - ) - db.add(plan) - db.commit() - db.refresh(plan) + def _create_plan(): + plan = DailyWorkflowPlan( + user_id=user_id, + date=date_str, + source="agent", + plan_json=plan_data, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + db.add(plan) + db.commit() + db.refresh(plan) for t in tasks: pillar_id = str(t.get("pillarId") or "").lower().strip() if pillar_id not in PILLAR_IDS: continue - - title = str(t.get("title") or "Task").strip()[:255] - description = str(t.get("description") or "").strip() - task_hash = _compute_task_hash(title, description) - preserved = existing_hash_status.get(task_hash) or {} - task = DailyWorkflowTask( plan_id=plan.id, user_id=user_id, pillar_id=pillar_id, - title=title, - description=description, - status=preserved.get("status") or _coerce_status(t.get("status")), + title=str(t.get("title") or "Task").strip()[:255], + description=str(t.get("description") or "").strip(), + status=_coerce_status(t.get("status")), priority=_coerce_priority(t.get("priority")), estimated_time=int(t.get("estimatedTime") or 15), action_type=str(t.get("actionType") or "navigate").strip()[:20], @@ -605,53 +620,14 @@ async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Option enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), updated_at=datetime.utcnow(), - decided_at=preserved.get("decided_at"), - completion_notes=preserved.get("completion_notes"), ) db.add(task) - + db.commit() - db.refresh(plan) return plan - return await run_in_threadpool(_replace_plan) - - -async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: - from starlette.concurrency import run_in_threadpool - - date_str = date or _today_date_str() - - existing = await run_in_threadpool( - lambda: ( - db.query(DailyWorkflowPlan) - .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) - .first() - ) - ) - - if existing: - metadata = _extract_plan_metadata(existing) - onboarding_status = _get_onboarding_status(user_id) - - should_regenerate = False - if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD: - should_regenerate = True - - if ( - onboarding_status["is_completed"] - and not metadata["onboarding_completed"] - ): - should_regenerate = True - - if should_regenerate: - regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str) - return regenerated, True - - return existing, False - - created = await regenerate_daily_workflow_plan(db, user_id, date=date_str) - return created, True + plan = await run_in_threadpool(_create_plan) + return plan, True def update_task_status(