diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index d8ace76f..2e1efc3f 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -1,13 +1,14 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Any, Dict, Optional -from datetime import datetime +from datetime import datetime, timezone +from collections import defaultdict, deque from loguru import logger from sqlalchemy.orm import Session from middleware.auth_middleware import get_current_user from services.database import get_db -from services.today_workflow_service import coerce_dependencies, get_or_create_daily_workflow_plan, update_task_status +from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask import asyncio from services.intelligence.txtai_service import TxtaiIntelligenceService @@ -15,6 +16,27 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) +REGENERATE_WINDOW_SECONDS = 60 +REGENERATE_MAX_REQUESTS_PER_WINDOW = 3 +_regen_request_log: dict[str, deque[float]] = defaultdict(deque) + + +def _check_regenerate_rate_limit(user_id: str) -> None: + import time + + now = time.time() + window_start = now - REGENERATE_WINDOW_SECONDS + history = _regen_request_log[user_id] + + while history and history[0] < window_start: + history.popleft() + + if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW: + raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded") + + history.append(now) + + async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): svc = TxtaiIntelligenceService(user_id) items = [] @@ -42,22 +64,6 @@ async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: return -def _build_provenance_summary(plan: DailyWorkflowPlan, tasks: list[DailyWorkflowTask]) -> Dict[str, Any]: - source_counts: Dict[str, int] = {} - for task in tasks: - metadata = task.metadata_json if isinstance(task.metadata_json, dict) else {} - source = metadata.get("source") if metadata.get("source") in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation" - source_counts[source] = source_counts.get(source, 0) + 1 - - generation_mode = plan.generation_mode if plan.generation_mode in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation" - - return { - "generationMode": generation_mode, - "committeeAgentCount": int(plan.committee_agent_count or 0), - "fallbackUsed": bool(plan.fallback_used), - "taskSourceBreakdown": source_counts, - } - @router.get("") async def get_today_workflow( date: Optional[str] = None, @@ -77,20 +83,6 @@ async def get_today_workflow( ) tasks = await run_in_threadpool(_fetch_tasks) - provenance_summary = _build_provenance_summary(plan, tasks) - - def _normalize_legacy_dependencies(task_rows): - rows_updated = False - for row in task_rows: - normalized_dependencies = coerce_dependencies(row.dependencies) - if row.dependencies != normalized_dependencies: - row.dependencies = normalized_dependencies - db.add(row) - rows_updated = True - if rows_updated: - db.commit() - - await run_in_threadpool(_normalize_legacy_dependencies, tasks) response_tasks = [] for t in tasks: @@ -103,7 +95,7 @@ async def get_today_workflow( "status": "skipped" if t.status == "dismissed" else t.status, "priority": t.priority, "estimatedTime": t.estimated_time, - "dependencies": coerce_dependencies(t.dependencies), + "dependencies": t.dependencies or [], "actionUrl": t.action_url, "actionType": t.action_type, "metadata": t.metadata_json or {}, @@ -183,7 +175,6 @@ async def get_today_workflow( "workflowStatus": workflow_status, "totalEstimatedTime": total_estimated, "actualTimeSpent": 0, - "provenanceSummary": provenance_summary, }, "plan": { "id": plan.id, @@ -191,10 +182,9 @@ async def get_today_workflow( "source": plan.source, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, - "generation_mode": plan.generation_mode, - "committee_agent_count": plan.committee_agent_count, - "fallback_used": plan.fallback_used, - "provenance_summary": provenance_summary, + "generation_mode": (plan.plan_json or {}).get("generation_mode"), + "quality_score": (plan.plan_json or {}).get("quality_score"), + "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), }, }, "timestamp": datetime.utcnow().isoformat(), @@ -202,6 +192,67 @@ async def get_today_workflow( } +@router.post("/regenerate") +async def regenerate_today_workflow( + date: Optional[str] = None, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + from starlette.concurrency import run_in_threadpool + + user_id = str(current_user.get("id")) + _check_regenerate_rate_limit(user_id) + + plan = await regenerate_daily_workflow_plan(db, user_id, date=date) + + tasks = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) + .order_by(DailyWorkflowTask.created_at.asc()) + .all() + ) + ) + + response_tasks = [ + { + "id": str(t.id), + "pillarId": t.pillar_id, + "title": t.title, + "description": t.description, + "status": "skipped" if t.status == "dismissed" else t.status, + "priority": t.priority, + "estimatedTime": t.estimated_time, + "dependencies": t.dependencies or [], + "actionUrl": t.action_url, + "actionType": t.action_type, + "metadata": t.metadata_json or {}, + "enabled": bool(t.enabled), + } + for t in tasks + ] + + asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated")) + + return { + "success": True, + "data": { + "plan": { + "id": plan.id, + "date": plan.date, + "source": plan.source, + "generation_mode": (plan.plan_json or {}).get("generation_mode"), + "quality_score": (plan.plan_json or {}).get("quality_score"), + "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), + "regenerated_at": datetime.now(timezone.utc).isoformat(), + }, + "tasks": response_tasks, + }, + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + + from services.task_memory_service import TaskMemoryService @router.post("/tasks/{task_id}/status") @@ -226,7 +277,7 @@ async def set_task_status( memory = TaskMemoryService(user_id, db) await memory.record_task_outcome( task, - feedback_score=1 if status == "completed" else -1 if status in ("dismissed", "skipped") else 0, + feedback_score=1 if status == "completed" else -1 if status == "dismissed" else 0, feedback_text=completion_notes ) except Exception as e: @@ -245,7 +296,7 @@ async def set_task_status( "pillarId": task.pillar_id, "title": task.title, "description": task.description, - "status": "skipped" if task.status in ("dismissed", "skipped") else task.status, + "status": "skipped" if task.status == "dismissed" else task.status, } asyncio.create_task(_index_tasks_to_sif(user_id, plan_date, [task_payload], label="today")) @@ -255,7 +306,7 @@ async def set_task_status( "task": { "id": str(task.id), "pillarId": task.pillar_id, - "status": "skipped" if task.status in ("dismissed", "skipped") else task.status, + "status": "skipped" if task.status == "dismissed" else task.status, "decided_at": task.decided_at.isoformat() if task.decided_at else None, } }, diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 0c2db96f..e9862d70 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -1,3 +1,4 @@ +import hashlib import json from datetime import datetime, timezone from typing import Any, Dict, List, Optional @@ -8,9 +9,11 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.llm_providers.main_text_generation import llm_text_gen +from services.onboarding.progress_service import OnboardingProgressService from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6 def _today_date_str() -> str: @@ -107,6 +110,37 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]: ] + + +def _compute_task_hash(title: str, description: str) -> str: + text = f"{title.strip().lower()}|{description.strip().lower()}" + return hashlib.sha256(text.encode()).hexdigest() + + +def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]: + raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {} + return { + "generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown", + "quality_score": float(raw.get("quality_score") or 0.0), + "generated_with_agents": bool(raw.get("generated_with_agents", False)), + "onboarding_completed": bool(raw.get("onboarding_completed", False)), + "onboarding_completed_at": raw.get("onboarding_completed_at"), + } + + +def _get_onboarding_status(user_id: str) -> Dict[str, Any]: + status = OnboardingProgressService().get_onboarding_status(user_id) or {} + completed_at_raw = status.get("completed_at") + completed_at = None + if completed_at_raw: + try: + completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00")) + except Exception: + completed_at = None + return { + "is_completed": bool(status.get("is_completed", False)), + "completed_at": completed_at, + } def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool: workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} if not isinstance(workflow_config, dict): @@ -276,147 +310,85 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> activity = AgentActivityService(db, user_id) grounding = build_grounding_context(db, user_id, date) memory_service = TaskMemoryService(user_id, db) - min_active_agents = 2 - generation_path = "committee" # 1. Get Orchestrator try: orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) - return { - "date": date, - "tasks": fallback_tasks, - "metadata": { - "generation_path": "controlled_fallback", - "committee": { - "minimum_active_agents": min_active_agents, - "active_agents": {"count": 0, "names": []}, - }, - "degraded": { - "is_degraded": True, - "reason": "orchestrator_unavailable", - "missing_agents": [], - }, - }, - } - - expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"] - active_agent_names = sorted(orchestrator.agents.keys()) - active_agents_count = len(active_agent_names) - missing_agents = [name for name in expected_committee_agents if name not in active_agent_names] - onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False)) - initialization_state = ( - getattr(orchestrator, "initialization_state", None) - if isinstance(getattr(orchestrator, "initialization_state", None), dict) - else {} - ) - - degraded_metadata = { - "is_degraded": False, - "reason": None, - "missing_agents": [], - } - - if active_agents_count < min_active_agents: - generation_path = "controlled_fallback" - degraded_metadata = { - "is_degraded": True, - "reason": "insufficient_active_agents", - "missing_agents": missing_agents, - } - activity.log_event( - event_type="committee_health", - severity="warning", - message="Committee degraded: insufficient active agents", - payload=build_agent_event_payload( - phase="planning", - step="committee_health_precheck", - progress_percent=5, - output_summary=f"Only {active_agents_count} active committee agents", - decision_reason="Agent committee below configured minimum", - evidence_refs=active_agent_names, - safe_debug=True, - metadata={ - "minimum_active_agents": min_active_agents, - "active_agents": { - "count": active_agents_count, - "names": active_agent_names, - }, - "missing_agents": missing_agents, - "onboarding_gated_initialization": onboarding_gated_initialization, - "orchestrator_initialization_state": initialization_state, - }, - ), - agent_type="TodayWorkflowGenerator", - ) + return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False} # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") - + agent_tasks = [] - if generation_path == "committee": - try: - # Define agents to poll - agents_to_poll = [ - orchestrator.agents.get('content'), # ContentStrategyAgent - orchestrator.agents.get('strategy'), # StrategyArchitectAgent - orchestrator.agents.get('seo'), # SEOOptimizationAgent - orchestrator.agents.get('social'), # SocialAmplificationAgent - orchestrator.agents.get('competitor'), # CompetitorResponseAgent - ] + try: + # Define agents to poll + agents_to_poll = [ + orchestrator.agents.get('content'), # ContentStrategyAgent + orchestrator.agents.get('strategy'), # StrategyArchitectAgent + orchestrator.agents.get('seo'), # SEOOptimizationAgent + orchestrator.agents.get('social'), # SocialAmplificationAgent + orchestrator.agents.get('competitor'), # CompetitorResponseAgent + ] + + # Filter out None agents (disabled/failed init) + active_agents = [a for a in agents_to_poll if a] + + # Execute propose_daily_tasks in parallel + results = await asyncio.gather( + *[a.propose_daily_tasks(grounding) for a in active_agents], + return_exceptions=True + ) + + # Collect successful proposals + raw_proposals = [] + for res in results: + if isinstance(res, list): + raw_proposals.extend(res) + elif isinstance(res, Exception): + logger.warning(f"Agent proposal failed: {res}") - # Filter out None agents (disabled/failed init) - active_agents = [a for a in agents_to_poll if a] + # 3. Filter Redundant Proposals (Self-Learning) + # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago + # But for now, we filter exact duplicates from recent history (last 7 days) + # We can implement semantic filtering later + + # Simple deduplication based on title+pillar + unique_map = {} + for p in raw_proposals: + key = f"{p.pillar_id}:{p.title}" + if key not in unique_map: + unique_map[key] = p + continue - # Execute propose_daily_tasks in parallel - results = await asyncio.gather( - *[a.propose_daily_tasks(grounding) for a in active_agents], - return_exceptions=True - ) + existing = unique_map[key] + if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): + unique_map[key] = p + continue - # Collect successful proposals - raw_proposals = [] - for res in results: - if isinstance(res, list): - raw_proposals.extend(res) - elif isinstance(res, Exception): - logger.warning(f"Agent proposal failed: {res}") + # Deterministic tie-breaker for equal priority proposals. + if ( + _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) + and _proposal_order_key(p) < _proposal_order_key(existing) + ): + unique_map[key] = p + + agent_tasks = list(unique_map.values()) + + # Phase 3: Check memory for rejections (Semantic Filter) + agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) + + except Exception as e: + logger.error(f"Committee proposal phase failed: {e}") + # Continue to fallback or LLM generation if committee fails - # Simple deduplication based on title+pillar - unique_map = {} - for p in raw_proposals: - key = f"{p.pillar_id}:{p.title}" - if key not in unique_map: - unique_map[key] = p - continue - - existing = unique_map[key] - if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): - unique_map[key] = p - continue - - # Deterministic tie-breaker for equal priority proposals. - if ( - _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) - and _proposal_order_key(p) < _proposal_order_key(existing) - ): - unique_map[key] = p - - agent_tasks = list(unique_map.values()) - - # Check memory for rejections (semantic filter) - agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) - - except Exception as e: - logger.error(f"Committee proposal phase failed: {e}") - generation_path = "llm_fallback" - - # 3. Final Selection - if generation_path == "committee" and agent_tasks: + # 4. Final Selection + # If we have agent tasks, use them. Otherwise fall back to LLM generation. + if agent_tasks: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") - + + # Convert TaskProposal objects to dicts for frontend final_tasks = [] for prop in agent_tasks: final_tasks.append({ @@ -434,50 +406,17 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "context_data": prop.context_data } }) - + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, "tasks": final_tasks, - "metadata": { - "generation_path": "committee", - "committee": { - "minimum_active_agents": min_active_agents, - "active_agents": { - "count": active_agents_count, - "names": active_agent_names, - }, - "onboarding_gated_initialization": onboarding_gated_initialization, - "orchestrator_initialization_state": initialization_state, - }, - "degraded": degraded_metadata, - }, + "generation_mode": "agent_committee", + "quality_score": 0.9, + "generated_with_agents": True, } - if generation_path != "controlled_fallback": - generation_path = "llm_fallback" - - if generation_path == "controlled_fallback": - fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) - return { - "date": date, - "tasks": fallback_tasks, - "metadata": { - "generation_path": "controlled_fallback", - "committee": { - "minimum_active_agents": min_active_agents, - "active_agents": { - "count": active_agents_count, - "names": active_agent_names, - }, - "onboarding_gated_initialization": onboarding_gated_initialization, - "orchestrator_initialization_state": initialization_state, - }, - "degraded": degraded_metadata, - }, - } - - # Fallback to original LLM generation if committee returned nothing + # Fallback to original LLM generation if agents returned nothing logger.info("Agent committee returned no tasks, falling back to LLM generation") schema = { @@ -533,6 +472,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> agent_type="TodayWorkflowGenerator", ) + used_fallback = False try: raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id) if isinstance(raw, dict): @@ -541,6 +481,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> try: result = json.loads(raw) except Exception: + used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} except Exception as e: activity.log_event( @@ -551,36 +492,26 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> run_id=run.id, agent_type="TodayWorkflowGenerator", ) + used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: - generation_path = "controlled_fallback" + used_fallback = True tasks = _fallback_tasks(date) - result = { "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), - "metadata": { - "generation_path": generation_path, - "committee": { - "minimum_active_agents": min_active_agents, - "active_agents": { - "count": active_agents_count, - "names": active_agent_names, - }, - "onboarding_gated_initialization": onboarding_gated_initialization, - "orchestrator_initialization_state": initialization_state, - }, - "degraded": degraded_metadata, - }, + "generation_mode": "fallback" if used_fallback else "llm", + "quality_score": 0.4 if used_fallback else 0.75, + "generated_with_agents": False, } activity.log_event( event_type="final_summary", severity="info", message="Daily workflow plan generated", - payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}), + payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}), run_id=run.id, agent_type="TodayWorkflowGenerator", ) @@ -588,50 +519,83 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> return result -async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: +async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan: from starlette.concurrency import run_in_threadpool - + date_str = date or _today_date_str() - - def _get_existing(): - return ( + onboarding_status = _get_onboarding_status(user_id) + + existing = await run_in_threadpool( + lambda: ( db.query(DailyWorkflowPlan) .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) .first() ) - - existing = await run_in_threadpool(_get_existing) - + ) + + existing_hash_status = {} if existing: - return existing, False + existing_tasks = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id) + .all() + ) + ) + for task in existing_tasks: + task_hash = _compute_task_hash(task.title, task.description) + existing_hash_status[task_hash] = { + "status": task.status, + "decided_at": task.decided_at, + "completion_notes": task.completion_notes, + } plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) - tasks = plan_data.get("tasks", []) + plan_data["onboarding_completed"] = onboarding_status["is_completed"] + plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None - def _create_plan(): - plan = DailyWorkflowPlan( - user_id=user_id, - date=date_str, - source="agent", - plan_json=plan_data, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), - ) - db.add(plan) - db.commit() - db.refresh(plan) + tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else [] + + def _replace_plan() -> DailyWorkflowPlan: + if existing: + db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False) + plan = existing + plan.source = "agent" + plan.plan_json = plan_data + plan.updated_at = datetime.utcnow() + db.add(plan) + db.commit() + db.refresh(plan) + else: + plan = DailyWorkflowPlan( + user_id=user_id, + date=date_str, + source="agent", + plan_json=plan_data, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + db.add(plan) + db.commit() + db.refresh(plan) for t in tasks: pillar_id = str(t.get("pillarId") or "").lower().strip() if pillar_id not in PILLAR_IDS: continue + + title = str(t.get("title") or "Task").strip()[:255] + description = str(t.get("description") or "").strip() + task_hash = _compute_task_hash(title, description) + preserved = existing_hash_status.get(task_hash) or {} + task = DailyWorkflowTask( plan_id=plan.id, user_id=user_id, pillar_id=pillar_id, - title=str(t.get("title") or "Task").strip()[:255], - description=str(t.get("description") or "").strip(), - status=_coerce_status(t.get("status")), + title=title, + description=description, + status=preserved.get("status") or _coerce_status(t.get("status")), priority=_coerce_priority(t.get("priority")), estimated_time=int(t.get("estimatedTime") or 15), action_type=str(t.get("actionType") or "navigate").strip()[:20], @@ -641,14 +605,53 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), updated_at=datetime.utcnow(), + decided_at=preserved.get("decided_at"), + completion_notes=preserved.get("completion_notes"), ) db.add(task) - + db.commit() + db.refresh(plan) return plan - plan = await run_in_threadpool(_create_plan) - return plan, True + return await run_in_threadpool(_replace_plan) + + +async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: + from starlette.concurrency import run_in_threadpool + + date_str = date or _today_date_str() + + existing = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowPlan) + .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) + .first() + ) + ) + + if existing: + metadata = _extract_plan_metadata(existing) + onboarding_status = _get_onboarding_status(user_id) + + should_regenerate = False + if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD: + should_regenerate = True + + if ( + onboarding_status["is_completed"] + and not metadata["onboarding_completed"] + ): + should_regenerate = True + + if should_regenerate: + regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str) + return regenerated, True + + return existing, False + + created = await regenerate_daily_workflow_plan(db, user_id, date=date_str) + return created, True def update_task_status(