Merge PR #390: Add degraded-mode workflow regeneration criteria and endpoint

- Add POST /api/today-workflow/regenerate endpoint for on-demand plan regeneration
- Implement rate limiting (3 requests per 60 seconds) to prevent abuse
- Add regeneration quality score tracking and onboarding completion status
- Compute task hashes for deduplication and change detection
- Extract plan metadata from plan_json for cleaner API responses
- Integrate onboarding progress service to track completion status
- Return quality_score and generated_with_agents metadata in responses
- Enable manual workflow refresh in degraded mode scenarios
- Maintain backward compatibility with simplified schema
This commit is contained in:
ajaysi
2026-03-08 18:12:43 +05:30
2 changed files with 303 additions and 249 deletions

View File

@@ -1,13 +1,14 @@
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from datetime import datetime from datetime import datetime, timezone
from collections import defaultdict, deque
from loguru import logger from loguru import logger
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from middleware.auth_middleware import get_current_user from middleware.auth_middleware import get_current_user
from services.database import get_db from services.database import get_db
from services.today_workflow_service import coerce_dependencies, get_or_create_daily_workflow_plan, update_task_status from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
import asyncio import asyncio
from services.intelligence.txtai_service import TxtaiIntelligenceService from services.intelligence.txtai_service import TxtaiIntelligenceService
@@ -15,6 +16,27 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
REGENERATE_WINDOW_SECONDS = 60
REGENERATE_MAX_REQUESTS_PER_WINDOW = 3
_regen_request_log: dict[str, deque[float]] = defaultdict(deque)
def _check_regenerate_rate_limit(user_id: str) -> None:
import time
now = time.time()
window_start = now - REGENERATE_WINDOW_SECONDS
history = _regen_request_log[user_id]
while history and history[0] < window_start:
history.popleft()
if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW:
raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded")
history.append(now)
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
svc = TxtaiIntelligenceService(user_id) svc = TxtaiIntelligenceService(user_id)
items = [] items = []
@@ -42,22 +64,6 @@ async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label:
return return
def _build_provenance_summary(plan: DailyWorkflowPlan, tasks: list[DailyWorkflowTask]) -> Dict[str, Any]:
source_counts: Dict[str, int] = {}
for task in tasks:
metadata = task.metadata_json if isinstance(task.metadata_json, dict) else {}
source = metadata.get("source") if metadata.get("source") in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation"
source_counts[source] = source_counts.get(source, 0) + 1
generation_mode = plan.generation_mode if plan.generation_mode in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation"
return {
"generationMode": generation_mode,
"committeeAgentCount": int(plan.committee_agent_count or 0),
"fallbackUsed": bool(plan.fallback_used),
"taskSourceBreakdown": source_counts,
}
@router.get("") @router.get("")
async def get_today_workflow( async def get_today_workflow(
date: Optional[str] = None, date: Optional[str] = None,
@@ -77,20 +83,6 @@ async def get_today_workflow(
) )
tasks = await run_in_threadpool(_fetch_tasks) tasks = await run_in_threadpool(_fetch_tasks)
provenance_summary = _build_provenance_summary(plan, tasks)
def _normalize_legacy_dependencies(task_rows):
rows_updated = False
for row in task_rows:
normalized_dependencies = coerce_dependencies(row.dependencies)
if row.dependencies != normalized_dependencies:
row.dependencies = normalized_dependencies
db.add(row)
rows_updated = True
if rows_updated:
db.commit()
await run_in_threadpool(_normalize_legacy_dependencies, tasks)
response_tasks = [] response_tasks = []
for t in tasks: for t in tasks:
@@ -103,7 +95,7 @@ async def get_today_workflow(
"status": "skipped" if t.status == "dismissed" else t.status, "status": "skipped" if t.status == "dismissed" else t.status,
"priority": t.priority, "priority": t.priority,
"estimatedTime": t.estimated_time, "estimatedTime": t.estimated_time,
"dependencies": coerce_dependencies(t.dependencies), "dependencies": t.dependencies or [],
"actionUrl": t.action_url, "actionUrl": t.action_url,
"actionType": t.action_type, "actionType": t.action_type,
"metadata": t.metadata_json or {}, "metadata": t.metadata_json or {},
@@ -183,7 +175,6 @@ async def get_today_workflow(
"workflowStatus": workflow_status, "workflowStatus": workflow_status,
"totalEstimatedTime": total_estimated, "totalEstimatedTime": total_estimated,
"actualTimeSpent": 0, "actualTimeSpent": 0,
"provenanceSummary": provenance_summary,
}, },
"plan": { "plan": {
"id": plan.id, "id": plan.id,
@@ -191,10 +182,9 @@ async def get_today_workflow(
"source": plan.source, "source": plan.source,
"created_at": plan.created_at.isoformat() if plan.created_at else None, "created_at": plan.created_at.isoformat() if plan.created_at else None,
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
"generation_mode": plan.generation_mode, "generation_mode": (plan.plan_json or {}).get("generation_mode"),
"committee_agent_count": plan.committee_agent_count, "quality_score": (plan.plan_json or {}).get("quality_score"),
"fallback_used": plan.fallback_used, "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
"provenance_summary": provenance_summary,
}, },
}, },
"timestamp": datetime.utcnow().isoformat(), "timestamp": datetime.utcnow().isoformat(),
@@ -202,6 +192,67 @@ async def get_today_workflow(
} }
@router.post("/regenerate")
async def regenerate_today_workflow(
date: Optional[str] = None,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
from starlette.concurrency import run_in_threadpool
user_id = str(current_user.get("id"))
_check_regenerate_rate_limit(user_id)
plan = await regenerate_daily_workflow_plan(db, user_id, date=date)
tasks = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id)
.order_by(DailyWorkflowTask.created_at.asc())
.all()
)
)
response_tasks = [
{
"id": str(t.id),
"pillarId": t.pillar_id,
"title": t.title,
"description": t.description,
"status": "skipped" if t.status == "dismissed" else t.status,
"priority": t.priority,
"estimatedTime": t.estimated_time,
"dependencies": t.dependencies or [],
"actionUrl": t.action_url,
"actionType": t.action_type,
"metadata": t.metadata_json or {},
"enabled": bool(t.enabled),
}
for t in tasks
]
asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated"))
return {
"success": True,
"data": {
"plan": {
"id": plan.id,
"date": plan.date,
"source": plan.source,
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
"quality_score": (plan.plan_json or {}).get("quality_score"),
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
"regenerated_at": datetime.now(timezone.utc).isoformat(),
},
"tasks": response_tasks,
},
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
}
from services.task_memory_service import TaskMemoryService from services.task_memory_service import TaskMemoryService
@router.post("/tasks/{task_id}/status") @router.post("/tasks/{task_id}/status")
@@ -226,7 +277,7 @@ async def set_task_status(
memory = TaskMemoryService(user_id, db) memory = TaskMemoryService(user_id, db)
await memory.record_task_outcome( await memory.record_task_outcome(
task, task,
feedback_score=1 if status == "completed" else -1 if status in ("dismissed", "skipped") else 0, feedback_score=1 if status == "completed" else -1 if status == "dismissed" else 0,
feedback_text=completion_notes feedback_text=completion_notes
) )
except Exception as e: except Exception as e:
@@ -245,7 +296,7 @@ async def set_task_status(
"pillarId": task.pillar_id, "pillarId": task.pillar_id,
"title": task.title, "title": task.title,
"description": task.description, "description": task.description,
"status": "skipped" if task.status in ("dismissed", "skipped") else task.status, "status": "skipped" if task.status == "dismissed" else task.status,
} }
asyncio.create_task(_index_tasks_to_sif(user_id, plan_date, [task_payload], label="today")) asyncio.create_task(_index_tasks_to_sif(user_id, plan_date, [task_payload], label="today"))
@@ -255,7 +306,7 @@ async def set_task_status(
"task": { "task": {
"id": str(task.id), "id": str(task.id),
"pillarId": task.pillar_id, "pillarId": task.pillar_id,
"status": "skipped" if task.status in ("dismissed", "skipped") else task.status, "status": "skipped" if task.status == "dismissed" else task.status,
"decided_at": task.decided_at.isoformat() if task.decided_at else None, "decided_at": task.decided_at.isoformat() if task.decided_at else None,
} }
}, },

View File

@@ -1,3 +1,4 @@
import hashlib
import json import json
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
@@ -8,9 +9,11 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
from models.agent_activity_models import AgentAlert from models.agent_activity_models import AgentAlert
from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.agent_activity_service import AgentActivityService, build_agent_event_payload
from services.llm_providers.main_text_generation import llm_text_gen from services.llm_providers.main_text_generation import llm_text_gen
from services.onboarding.progress_service import OnboardingProgressService
from loguru import logger from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6
def _today_date_str() -> str: def _today_date_str() -> str:
@@ -107,6 +110,37 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
] ]
def _compute_task_hash(title: str, description: str) -> str:
text = f"{title.strip().lower()}|{description.strip().lower()}"
return hashlib.sha256(text.encode()).hexdigest()
def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]:
raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {}
return {
"generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown",
"quality_score": float(raw.get("quality_score") or 0.0),
"generated_with_agents": bool(raw.get("generated_with_agents", False)),
"onboarding_completed": bool(raw.get("onboarding_completed", False)),
"onboarding_completed_at": raw.get("onboarding_completed_at"),
}
def _get_onboarding_status(user_id: str) -> Dict[str, Any]:
status = OnboardingProgressService().get_onboarding_status(user_id) or {}
completed_at_raw = status.get("completed_at")
completed_at = None
if completed_at_raw:
try:
completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00"))
except Exception:
completed_at = None
return {
"is_completed": bool(status.get("is_completed", False)),
"completed_at": completed_at,
}
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool: def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
if not isinstance(workflow_config, dict): if not isinstance(workflow_config, dict):
@@ -276,147 +310,85 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
activity = AgentActivityService(db, user_id) activity = AgentActivityService(db, user_id)
grounding = build_grounding_context(db, user_id, date) grounding = build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db) memory_service = TaskMemoryService(user_id, db)
min_active_agents = 2
generation_path = "committee"
# 1. Get Orchestrator # 1. Get Orchestrator
try: try:
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
except Exception as e: except Exception as e:
logger.error(f"Failed to get orchestrator: {e}") logger.error(f"Failed to get orchestrator: {e}")
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False}
return {
"date": date,
"tasks": fallback_tasks,
"metadata": {
"generation_path": "controlled_fallback",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {"count": 0, "names": []},
},
"degraded": {
"is_degraded": True,
"reason": "orchestrator_unavailable",
"missing_agents": [],
},
},
}
expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"]
active_agent_names = sorted(orchestrator.agents.keys())
active_agents_count = len(active_agent_names)
missing_agents = [name for name in expected_committee_agents if name not in active_agent_names]
onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False))
initialization_state = (
getattr(orchestrator, "initialization_state", None)
if isinstance(getattr(orchestrator, "initialization_state", None), dict)
else {}
)
degraded_metadata = {
"is_degraded": False,
"reason": None,
"missing_agents": [],
}
if active_agents_count < min_active_agents:
generation_path = "controlled_fallback"
degraded_metadata = {
"is_degraded": True,
"reason": "insufficient_active_agents",
"missing_agents": missing_agents,
}
activity.log_event(
event_type="committee_health",
severity="warning",
message="Committee degraded: insufficient active agents",
payload=build_agent_event_payload(
phase="planning",
step="committee_health_precheck",
progress_percent=5,
output_summary=f"Only {active_agents_count} active committee agents",
decision_reason="Agent committee below configured minimum",
evidence_refs=active_agent_names,
safe_debug=True,
metadata={
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"missing_agents": missing_agents,
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
),
agent_type="TodayWorkflowGenerator",
)
# 2. Parallel "Committee" Proposal Gathering # 2. Parallel "Committee" Proposal Gathering
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
agent_tasks = [] agent_tasks = []
if generation_path == "committee": try:
try: # Define agents to poll
# Define agents to poll agents_to_poll = [
agents_to_poll = [ orchestrator.agents.get('content'), # ContentStrategyAgent
orchestrator.agents.get('content'), # ContentStrategyAgent orchestrator.agents.get('strategy'), # StrategyArchitectAgent
orchestrator.agents.get('strategy'), # StrategyArchitectAgent orchestrator.agents.get('seo'), # SEOOptimizationAgent
orchestrator.agents.get('seo'), # SEOOptimizationAgent orchestrator.agents.get('social'), # SocialAmplificationAgent
orchestrator.agents.get('social'), # SocialAmplificationAgent orchestrator.agents.get('competitor'), # CompetitorResponseAgent
orchestrator.agents.get('competitor'), # CompetitorResponseAgent ]
]
# Filter out None agents (disabled/failed init)
active_agents = [a for a in agents_to_poll if a]
# Execute propose_daily_tasks in parallel
results = await asyncio.gather(
*[a.propose_daily_tasks(grounding) for a in active_agents],
return_exceptions=True
)
# Collect successful proposals
raw_proposals = []
for res in results:
if isinstance(res, list):
raw_proposals.extend(res)
elif isinstance(res, Exception):
logger.warning(f"Agent proposal failed: {res}")
# Filter out None agents (disabled/failed init) # 3. Filter Redundant Proposals (Self-Learning)
active_agents = [a for a in agents_to_poll if a] # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago
# But for now, we filter exact duplicates from recent history (last 7 days)
# We can implement semantic filtering later
# Simple deduplication based on title+pillar
unique_map = {}
for p in raw_proposals:
key = f"{p.pillar_id}:{p.title}"
if key not in unique_map:
unique_map[key] = p
continue
# Execute propose_daily_tasks in parallel existing = unique_map[key]
results = await asyncio.gather( if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
*[a.propose_daily_tasks(grounding) for a in active_agents], unique_map[key] = p
return_exceptions=True continue
)
# Collect successful proposals # Deterministic tie-breaker for equal priority proposals.
raw_proposals = [] if (
for res in results: _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
if isinstance(res, list): and _proposal_order_key(p) < _proposal_order_key(existing)
raw_proposals.extend(res) ):
elif isinstance(res, Exception): unique_map[key] = p
logger.warning(f"Agent proposal failed: {res}")
agent_tasks = list(unique_map.values())
# Phase 3: Check memory for rejections (Semantic Filter)
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
except Exception as e:
logger.error(f"Committee proposal phase failed: {e}")
# Continue to fallback or LLM generation if committee fails
# Simple deduplication based on title+pillar # 4. Final Selection
unique_map = {} # If we have agent tasks, use them. Otherwise fall back to LLM generation.
for p in raw_proposals: if agent_tasks:
key = f"{p.pillar_id}:{p.title}"
if key not in unique_map:
unique_map[key] = p
continue
existing = unique_map[key]
if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
unique_map[key] = p
continue
# Deterministic tie-breaker for equal priority proposals.
if (
_proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
and _proposal_order_key(p) < _proposal_order_key(existing)
):
unique_map[key] = p
agent_tasks = list(unique_map.values())
# Check memory for rejections (semantic filter)
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
except Exception as e:
logger.error(f"Committee proposal phase failed: {e}")
generation_path = "llm_fallback"
# 3. Final Selection
if generation_path == "committee" and agent_tasks:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
final_tasks = [] final_tasks = []
for prop in agent_tasks: for prop in agent_tasks:
final_tasks.append({ final_tasks.append({
@@ -434,50 +406,17 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"context_data": prop.context_data "context_data": prop.context_data
} }
}) })
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
return { return {
"date": date, "date": date,
"tasks": final_tasks, "tasks": final_tasks,
"metadata": { "generation_mode": "agent_committee",
"generation_path": "committee", "quality_score": 0.9,
"committee": { "generated_with_agents": True,
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
} }
if generation_path != "controlled_fallback": # Fallback to original LLM generation if agents returned nothing
generation_path = "llm_fallback"
if generation_path == "controlled_fallback":
fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding)
return {
"date": date,
"tasks": fallback_tasks,
"metadata": {
"generation_path": "controlled_fallback",
"committee": {
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
}
# Fallback to original LLM generation if committee returned nothing
logger.info("Agent committee returned no tasks, falling back to LLM generation") logger.info("Agent committee returned no tasks, falling back to LLM generation")
schema = { schema = {
@@ -533,6 +472,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
agent_type="TodayWorkflowGenerator", agent_type="TodayWorkflowGenerator",
) )
used_fallback = False
try: try:
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id) raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
if isinstance(raw, dict): if isinstance(raw, dict):
@@ -541,6 +481,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
try: try:
result = json.loads(raw) result = json.loads(raw)
except Exception: except Exception:
used_fallback = True
result = {"date": date, "tasks": _fallback_tasks(date)} result = {"date": date, "tasks": _fallback_tasks(date)}
except Exception as e: except Exception as e:
activity.log_event( activity.log_event(
@@ -551,36 +492,26 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
run_id=run.id, run_id=run.id,
agent_type="TodayWorkflowGenerator", agent_type="TodayWorkflowGenerator",
) )
used_fallback = True
result = {"date": date, "tasks": _fallback_tasks(date)} result = {"date": date, "tasks": _fallback_tasks(date)}
tasks = result.get("tasks") if isinstance(result, dict) else None tasks = result.get("tasks") if isinstance(result, dict) else None
if not isinstance(tasks, list) or not tasks: if not isinstance(tasks, list) or not tasks:
generation_path = "controlled_fallback" used_fallback = True
tasks = _fallback_tasks(date) tasks = _fallback_tasks(date)
result = { result = {
"date": date, "date": date,
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
"metadata": { "generation_mode": "fallback" if used_fallback else "llm",
"generation_path": generation_path, "quality_score": 0.4 if used_fallback else 0.75,
"committee": { "generated_with_agents": False,
"minimum_active_agents": min_active_agents,
"active_agents": {
"count": active_agents_count,
"names": active_agent_names,
},
"onboarding_gated_initialization": onboarding_gated_initialization,
"orchestrator_initialization_state": initialization_state,
},
"degraded": degraded_metadata,
},
} }
activity.log_event( activity.log_event(
event_type="final_summary", event_type="final_summary",
severity="info", severity="info",
message="Daily workflow plan generated", message="Daily workflow plan generated",
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}), payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}),
run_id=run.id, run_id=run.id,
agent_type="TodayWorkflowGenerator", agent_type="TodayWorkflowGenerator",
) )
@@ -588,50 +519,83 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
return result return result
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan:
from starlette.concurrency import run_in_threadpool from starlette.concurrency import run_in_threadpool
date_str = date or _today_date_str() date_str = date or _today_date_str()
onboarding_status = _get_onboarding_status(user_id)
def _get_existing():
return ( existing = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowPlan) db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
.first() .first()
) )
)
existing = await run_in_threadpool(_get_existing)
existing_hash_status = {}
if existing: if existing:
return existing, False existing_tasks = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id)
.all()
)
)
for task in existing_tasks:
task_hash = _compute_task_hash(task.title, task.description)
existing_hash_status[task_hash] = {
"status": task.status,
"decided_at": task.decided_at,
"completion_notes": task.completion_notes,
}
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
tasks = plan_data.get("tasks", []) plan_data["onboarding_completed"] = onboarding_status["is_completed"]
plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None
def _create_plan(): tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else []
plan = DailyWorkflowPlan(
user_id=user_id, def _replace_plan() -> DailyWorkflowPlan:
date=date_str, if existing:
source="agent", db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False)
plan_json=plan_data, plan = existing
created_at=datetime.utcnow(), plan.source = "agent"
updated_at=datetime.utcnow(), plan.plan_json = plan_data
) plan.updated_at = datetime.utcnow()
db.add(plan) db.add(plan)
db.commit() db.commit()
db.refresh(plan) db.refresh(plan)
else:
plan = DailyWorkflowPlan(
user_id=user_id,
date=date_str,
source="agent",
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(plan)
db.commit()
db.refresh(plan)
for t in tasks: for t in tasks:
pillar_id = str(t.get("pillarId") or "").lower().strip() pillar_id = str(t.get("pillarId") or "").lower().strip()
if pillar_id not in PILLAR_IDS: if pillar_id not in PILLAR_IDS:
continue continue
title = str(t.get("title") or "Task").strip()[:255]
description = str(t.get("description") or "").strip()
task_hash = _compute_task_hash(title, description)
preserved = existing_hash_status.get(task_hash) or {}
task = DailyWorkflowTask( task = DailyWorkflowTask(
plan_id=plan.id, plan_id=plan.id,
user_id=user_id, user_id=user_id,
pillar_id=pillar_id, pillar_id=pillar_id,
title=str(t.get("title") or "Task").strip()[:255], title=title,
description=str(t.get("description") or "").strip(), description=description,
status=_coerce_status(t.get("status")), status=preserved.get("status") or _coerce_status(t.get("status")),
priority=_coerce_priority(t.get("priority")), priority=_coerce_priority(t.get("priority")),
estimated_time=int(t.get("estimatedTime") or 15), estimated_time=int(t.get("estimatedTime") or 15),
action_type=str(t.get("actionType") or "navigate").strip()[:20], action_type=str(t.get("actionType") or "navigate").strip()[:20],
@@ -641,14 +605,53 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
enabled=bool(t.get("enabled", True)), enabled=bool(t.get("enabled", True)),
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
updated_at=datetime.utcnow(), updated_at=datetime.utcnow(),
decided_at=preserved.get("decided_at"),
completion_notes=preserved.get("completion_notes"),
) )
db.add(task) db.add(task)
db.commit() db.commit()
db.refresh(plan)
return plan return plan
plan = await run_in_threadpool(_create_plan) return await run_in_threadpool(_replace_plan)
return plan, True
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
from starlette.concurrency import run_in_threadpool
date_str = date or _today_date_str()
existing = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
.first()
)
)
if existing:
metadata = _extract_plan_metadata(existing)
onboarding_status = _get_onboarding_status(user_id)
should_regenerate = False
if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD:
should_regenerate = True
if (
onboarding_status["is_completed"]
and not metadata["onboarding_completed"]
):
should_regenerate = True
if should_regenerate:
regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str)
return regenerated, True
return existing, False
created = await regenerate_daily_workflow_plan(db, user_id, date=date_str)
return created, True
def update_task_status( def update_task_status(