From 4621107988962d6ae8bb14bd517771b0164dc99c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:40:29 +0530 Subject: [PATCH] Add degraded-mode workflow regeneration criteria and endpoint --- backend/api/today_workflow.py | 90 ++++++++++- backend/services/today_workflow_service.py | 176 +++++++++++++++++---- 2 files changed, 234 insertions(+), 32 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..2e1efc3f 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -1,13 +1,14 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Any, Dict, Optional -from datetime import datetime +from datetime import datetime, timezone +from collections import defaultdict, deque from loguru import logger from sqlalchemy.orm import Session from middleware.auth_middleware import get_current_user from services.database import get_db -from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status +from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask import asyncio from services.intelligence.txtai_service import TxtaiIntelligenceService @@ -15,6 +16,27 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) +REGENERATE_WINDOW_SECONDS = 60 +REGENERATE_MAX_REQUESTS_PER_WINDOW = 3 +_regen_request_log: dict[str, deque[float]] = defaultdict(deque) + + +def _check_regenerate_rate_limit(user_id: str) -> None: + import time + + now = time.time() + window_start = now - REGENERATE_WINDOW_SECONDS + history = _regen_request_log[user_id] + + while history and history[0] < window_start: + history.popleft() + + if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW: + raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded") + + history.append(now) + + async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): svc = TxtaiIntelligenceService(user_id) items = [] @@ -160,6 +182,9 @@ async def get_today_workflow( "source": plan.source, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, + "generation_mode": (plan.plan_json or {}).get("generation_mode"), + "quality_score": (plan.plan_json or {}).get("quality_score"), + "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), }, }, "timestamp": datetime.utcnow().isoformat(), @@ -167,6 +192,67 @@ async def get_today_workflow( } +@router.post("/regenerate") +async def regenerate_today_workflow( + date: Optional[str] = None, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + from starlette.concurrency import run_in_threadpool + + user_id = str(current_user.get("id")) + _check_regenerate_rate_limit(user_id) + + plan = await regenerate_daily_workflow_plan(db, user_id, date=date) + + tasks = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) + .order_by(DailyWorkflowTask.created_at.asc()) + .all() + ) + ) + + response_tasks = [ + { + "id": str(t.id), + "pillarId": t.pillar_id, + "title": t.title, + "description": t.description, + "status": "skipped" if t.status == "dismissed" else t.status, + "priority": t.priority, + "estimatedTime": t.estimated_time, + "dependencies": t.dependencies or [], + "actionUrl": t.action_url, + "actionType": t.action_type, + "metadata": t.metadata_json or {}, + "enabled": bool(t.enabled), + } + for t in tasks + ] + + asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated")) + + return { + "success": True, + "data": { + "plan": { + "id": plan.id, + "date": plan.date, + "source": plan.source, + "generation_mode": (plan.plan_json or {}).get("generation_mode"), + "quality_score": (plan.plan_json or {}).get("quality_score"), + "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), + "regenerated_at": datetime.now(timezone.utc).isoformat(), + }, + "tasks": response_tasks, + }, + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + + from services.task_memory_service import TaskMemoryService @router.post("/tasks/{task_id}/status") diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..e9862d70 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -1,3 +1,4 @@ +import hashlib import json from datetime import datetime, timezone from typing import Any, Dict, List, Optional @@ -8,9 +9,11 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.llm_providers.main_text_generation import llm_text_gen +from services.onboarding.progress_service import OnboardingProgressService from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6 def _today_date_str() -> str: @@ -107,6 +110,37 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]: ] + + +def _compute_task_hash(title: str, description: str) -> str: + text = f"{title.strip().lower()}|{description.strip().lower()}" + return hashlib.sha256(text.encode()).hexdigest() + + +def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]: + raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {} + return { + "generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown", + "quality_score": float(raw.get("quality_score") or 0.0), + "generated_with_agents": bool(raw.get("generated_with_agents", False)), + "onboarding_completed": bool(raw.get("onboarding_completed", False)), + "onboarding_completed_at": raw.get("onboarding_completed_at"), + } + + +def _get_onboarding_status(user_id: str) -> Dict[str, Any]: + status = OnboardingProgressService().get_onboarding_status(user_id) or {} + completed_at_raw = status.get("completed_at") + completed_at = None + if completed_at_raw: + try: + completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00")) + except Exception: + completed_at = None + return { + "is_completed": bool(status.get("is_completed", False)), + "completed_at": completed_at, + } def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool: workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} if not isinstance(workflow_config, dict): @@ -282,7 +316,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - return {"date": date, "tasks": _fallback_tasks(date)} + return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False} # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") @@ -376,7 +410,10 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "generation_mode": "agent_committee", + "quality_score": 0.9, + "generated_with_agents": True, } # Fallback to original LLM generation if agents returned nothing @@ -435,6 +472,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> agent_type="TodayWorkflowGenerator", ) + used_fallback = False try: raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id) if isinstance(raw, dict): @@ -443,6 +481,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> try: result = json.loads(raw) except Exception: + used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} except Exception as e: activity.log_event( @@ -453,14 +492,19 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> run_id=run.id, agent_type="TodayWorkflowGenerator", ) + used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: + used_fallback = True tasks = _fallback_tasks(date) result = { "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + "generation_mode": "fallback" if used_fallback else "llm", + "quality_score": 0.4 if used_fallback else 0.75, + "generated_with_agents": False, } activity.log_event( @@ -475,50 +519,83 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> return result -async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: +async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan: from starlette.concurrency import run_in_threadpool - + date_str = date or _today_date_str() - - def _get_existing(): - return ( + onboarding_status = _get_onboarding_status(user_id) + + existing = await run_in_threadpool( + lambda: ( db.query(DailyWorkflowPlan) .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) .first() ) - - existing = await run_in_threadpool(_get_existing) - + ) + + existing_hash_status = {} if existing: - return existing, False + existing_tasks = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id) + .all() + ) + ) + for task in existing_tasks: + task_hash = _compute_task_hash(task.title, task.description) + existing_hash_status[task_hash] = { + "status": task.status, + "decided_at": task.decided_at, + "completion_notes": task.completion_notes, + } plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) - tasks = plan_data.get("tasks", []) + plan_data["onboarding_completed"] = onboarding_status["is_completed"] + plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None - def _create_plan(): - plan = DailyWorkflowPlan( - user_id=user_id, - date=date_str, - source="agent", - plan_json=plan_data, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), - ) - db.add(plan) - db.commit() - db.refresh(plan) + tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else [] + + def _replace_plan() -> DailyWorkflowPlan: + if existing: + db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False) + plan = existing + plan.source = "agent" + plan.plan_json = plan_data + plan.updated_at = datetime.utcnow() + db.add(plan) + db.commit() + db.refresh(plan) + else: + plan = DailyWorkflowPlan( + user_id=user_id, + date=date_str, + source="agent", + plan_json=plan_data, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + db.add(plan) + db.commit() + db.refresh(plan) for t in tasks: pillar_id = str(t.get("pillarId") or "").lower().strip() if pillar_id not in PILLAR_IDS: continue + + title = str(t.get("title") or "Task").strip()[:255] + description = str(t.get("description") or "").strip() + task_hash = _compute_task_hash(title, description) + preserved = existing_hash_status.get(task_hash) or {} + task = DailyWorkflowTask( plan_id=plan.id, user_id=user_id, pillar_id=pillar_id, - title=str(t.get("title") or "Task").strip()[:255], - description=str(t.get("description") or "").strip(), - status=_coerce_status(t.get("status")), + title=title, + description=description, + status=preserved.get("status") or _coerce_status(t.get("status")), priority=_coerce_priority(t.get("priority")), estimated_time=int(t.get("estimatedTime") or 15), action_type=str(t.get("actionType") or "navigate").strip()[:20], @@ -528,14 +605,53 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), updated_at=datetime.utcnow(), + decided_at=preserved.get("decided_at"), + completion_notes=preserved.get("completion_notes"), ) db.add(task) - + db.commit() + db.refresh(plan) return plan - plan = await run_in_threadpool(_create_plan) - return plan, True + return await run_in_threadpool(_replace_plan) + + +async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: + from starlette.concurrency import run_in_threadpool + + date_str = date or _today_date_str() + + existing = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowPlan) + .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) + .first() + ) + ) + + if existing: + metadata = _extract_plan_metadata(existing) + onboarding_status = _get_onboarding_status(user_id) + + should_regenerate = False + if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD: + should_regenerate = True + + if ( + onboarding_status["is_completed"] + and not metadata["onboarding_completed"] + ): + should_regenerate = True + + if should_regenerate: + regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str) + return regenerated, True + + return existing, False + + created = await regenerate_daily_workflow_plan(db, user_id, date=date_str) + return created, True def update_task_status(