Merge PR #391: Add workflow provenance quality metrics and classification
- Introduce task provenance tracking: agent_proposal, llm_backfill, controlled_fallback - Add quality computation function to classify workflows as 'AI-personalized' or 'guided baseline' - Calculate agent origin ratio, fallback ratio, and per-pillar coverage metrics - Implement configurable agent personalization threshold (default 35%) - Enhance plan metadata with comprehensive quality dimensions: - agentOriginRatio, agentOriginPercent, agentOriginTaskCount - agentOriginPillars, fallbackRatio, fallbackPercent, fallbackTaskCount - totalTaskCount and configurable thresholds - Simplify task provenance metadata handling in sanitization - Add backfill logic for existing plans to populate missing quality metrics - Maintain backward compatibility with existing plan storage
This commit is contained in:
@@ -161,6 +161,9 @@ async def get_today_workflow(
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {}
|
||||||
|
quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"success": True,
|
"success": True,
|
||||||
"data": {
|
"data": {
|
||||||
@@ -180,6 +183,7 @@ async def get_today_workflow(
|
|||||||
"id": plan.id,
|
"id": plan.id,
|
||||||
"date": plan.date,
|
"date": plan.date,
|
||||||
"source": plan.source,
|
"source": plan.source,
|
||||||
|
"quality": quality,
|
||||||
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
||||||
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
||||||
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
|
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import hashlib
|
|
||||||
import json
|
import json
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
@@ -9,11 +8,13 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
|||||||
from models.agent_activity_models import AgentAlert
|
from models.agent_activity_models import AgentAlert
|
||||||
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
|
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
|
||||||
from services.llm_providers.main_text_generation import llm_text_gen
|
from services.llm_providers.main_text_generation import llm_text_gen
|
||||||
from services.onboarding.progress_service import OnboardingProgressService
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
||||||
FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6
|
TASK_PROVENANCE_AGENT = "agent_proposal"
|
||||||
|
TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill"
|
||||||
|
TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback"
|
||||||
|
DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35
|
||||||
|
|
||||||
|
|
||||||
def _today_date_str() -> str:
|
def _today_date_str() -> str:
|
||||||
@@ -110,37 +111,6 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def _compute_task_hash(title: str, description: str) -> str:
|
|
||||||
text = f"{title.strip().lower()}|{description.strip().lower()}"
|
|
||||||
return hashlib.sha256(text.encode()).hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]:
|
|
||||||
raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {}
|
|
||||||
return {
|
|
||||||
"generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown",
|
|
||||||
"quality_score": float(raw.get("quality_score") or 0.0),
|
|
||||||
"generated_with_agents": bool(raw.get("generated_with_agents", False)),
|
|
||||||
"onboarding_completed": bool(raw.get("onboarding_completed", False)),
|
|
||||||
"onboarding_completed_at": raw.get("onboarding_completed_at"),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _get_onboarding_status(user_id: str) -> Dict[str, Any]:
|
|
||||||
status = OnboardingProgressService().get_onboarding_status(user_id) or {}
|
|
||||||
completed_at_raw = status.get("completed_at")
|
|
||||||
completed_at = None
|
|
||||||
if completed_at_raw:
|
|
||||||
try:
|
|
||||||
completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00"))
|
|
||||||
except Exception:
|
|
||||||
completed_at = None
|
|
||||||
return {
|
|
||||||
"is_completed": bool(status.get("is_completed", False)),
|
|
||||||
"completed_at": completed_at,
|
|
||||||
}
|
|
||||||
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
|
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
|
||||||
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
||||||
if not isinstance(workflow_config, dict):
|
if not isinstance(workflow_config, dict):
|
||||||
@@ -170,9 +140,77 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|||||||
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
||||||
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
||||||
sanitized["enabled"] = bool(task.get("enabled", True))
|
sanitized["enabled"] = bool(task.get("enabled", True))
|
||||||
|
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
|
||||||
|
provenance = str(metadata.get("provenance") or "").strip().lower()
|
||||||
|
if provenance not in {
|
||||||
|
TASK_PROVENANCE_AGENT,
|
||||||
|
TASK_PROVENANCE_LLM_BACKFILL,
|
||||||
|
TASK_PROVENANCE_CONTROLLED_FALLBACK,
|
||||||
|
}:
|
||||||
|
if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK:
|
||||||
|
provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK
|
||||||
|
elif metadata.get("source") == "llm_pillar_backfill":
|
||||||
|
provenance = TASK_PROVENANCE_LLM_BACKFILL
|
||||||
|
elif metadata.get("source_agent"):
|
||||||
|
provenance = TASK_PROVENANCE_AGENT
|
||||||
|
else:
|
||||||
|
provenance = TASK_PROVENANCE_LLM_BACKFILL
|
||||||
|
metadata["provenance"] = provenance
|
||||||
|
sanitized["metadata"] = metadata
|
||||||
return sanitized
|
return sanitized
|
||||||
|
|
||||||
|
|
||||||
|
def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float:
|
||||||
|
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
||||||
|
configured = None
|
||||||
|
if isinstance(workflow_config, dict):
|
||||||
|
configured = workflow_config.get("min_agent_origin_ratio")
|
||||||
|
try:
|
||||||
|
value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
|
||||||
|
return max(0.0, min(1.0, value))
|
||||||
|
|
||||||
|
|
||||||
|
def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
||||||
|
total_tasks = len(tasks)
|
||||||
|
agent_origin_tasks = [
|
||||||
|
task for task in tasks
|
||||||
|
if isinstance(task.get("metadata"), dict)
|
||||||
|
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT
|
||||||
|
]
|
||||||
|
fallback_tasks = [
|
||||||
|
task for task in tasks
|
||||||
|
if isinstance(task.get("metadata"), dict)
|
||||||
|
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK
|
||||||
|
]
|
||||||
|
agent_pillars = {
|
||||||
|
str(task.get("pillarId") or "").lower().strip()
|
||||||
|
for task in agent_origin_tasks
|
||||||
|
if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS
|
||||||
|
}
|
||||||
|
|
||||||
|
agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0
|
||||||
|
fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0
|
||||||
|
threshold = _agent_personalization_threshold(grounding or {})
|
||||||
|
classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline"
|
||||||
|
|
||||||
|
return {
|
||||||
|
"classification": classification,
|
||||||
|
"agentOriginRatio": round(agent_origin_ratio, 4),
|
||||||
|
"agentOriginPercent": round(agent_origin_ratio * 100, 2),
|
||||||
|
"agentOriginTaskCount": len(agent_origin_tasks),
|
||||||
|
"agentOriginPillars": len(agent_pillars),
|
||||||
|
"fallbackRatio": round(fallback_ratio, 4),
|
||||||
|
"fallbackPercent": round(fallback_ratio * 100, 2),
|
||||||
|
"fallbackTaskCount": len(fallback_tasks),
|
||||||
|
"totalTaskCount": total_tasks,
|
||||||
|
"thresholds": {
|
||||||
|
"minAgentOriginRatio": threshold,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _build_single_task_for_missing_pillar(
|
def _build_single_task_for_missing_pillar(
|
||||||
user_id: str,
|
user_id: str,
|
||||||
date: str,
|
date: str,
|
||||||
@@ -242,6 +280,9 @@ def _ensure_pillar_coverage(
|
|||||||
|
|
||||||
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
|
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
|
||||||
if generated:
|
if generated:
|
||||||
|
metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {}
|
||||||
|
metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL
|
||||||
|
generated["metadata"] = metadata
|
||||||
sanitized_tasks.append(generated)
|
sanitized_tasks.append(generated)
|
||||||
covered_pillars.add(pillar_id)
|
covered_pillars.add(pillar_id)
|
||||||
continue
|
continue
|
||||||
@@ -250,6 +291,7 @@ def _ensure_pillar_coverage(
|
|||||||
if controlled_fallback:
|
if controlled_fallback:
|
||||||
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
|
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
|
||||||
metadata["source"] = "controlled_fallback"
|
metadata["source"] = "controlled_fallback"
|
||||||
|
metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK
|
||||||
controlled_fallback["metadata"] = metadata
|
controlled_fallback["metadata"] = metadata
|
||||||
sanitized_tasks.append(controlled_fallback)
|
sanitized_tasks.append(controlled_fallback)
|
||||||
covered_pillars.add(pillar_id)
|
covered_pillars.add(pillar_id)
|
||||||
@@ -316,7 +358,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
|
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to get orchestrator: {e}")
|
logger.error(f"Failed to get orchestrator: {e}")
|
||||||
return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False}
|
return {"date": date, "tasks": _fallback_tasks(date)}
|
||||||
|
|
||||||
# 2. Parallel "Committee" Proposal Gathering
|
# 2. Parallel "Committee" Proposal Gathering
|
||||||
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
||||||
@@ -408,12 +450,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
})
|
})
|
||||||
|
|
||||||
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
||||||
|
quality = _compute_plan_quality(final_tasks, grounding)
|
||||||
return {
|
return {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": final_tasks,
|
"tasks": final_tasks,
|
||||||
"generation_mode": "agent_committee",
|
"quality": quality,
|
||||||
"quality_score": 0.9,
|
|
||||||
"generated_with_agents": True,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Fallback to original LLM generation if agents returned nothing
|
# Fallback to original LLM generation if agents returned nothing
|
||||||
@@ -472,7 +513,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
agent_type="TodayWorkflowGenerator",
|
agent_type="TodayWorkflowGenerator",
|
||||||
)
|
)
|
||||||
|
|
||||||
used_fallback = False
|
|
||||||
try:
|
try:
|
||||||
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
|
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
|
||||||
if isinstance(raw, dict):
|
if isinstance(raw, dict):
|
||||||
@@ -481,7 +521,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
try:
|
try:
|
||||||
result = json.loads(raw)
|
result = json.loads(raw)
|
||||||
except Exception:
|
except Exception:
|
||||||
used_fallback = True
|
|
||||||
result = {"date": date, "tasks": _fallback_tasks(date)}
|
result = {"date": date, "tasks": _fallback_tasks(date)}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
@@ -492,20 +531,16 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
run_id=run.id,
|
run_id=run.id,
|
||||||
agent_type="TodayWorkflowGenerator",
|
agent_type="TodayWorkflowGenerator",
|
||||||
)
|
)
|
||||||
used_fallback = True
|
|
||||||
result = {"date": date, "tasks": _fallback_tasks(date)}
|
result = {"date": date, "tasks": _fallback_tasks(date)}
|
||||||
|
|
||||||
tasks = result.get("tasks") if isinstance(result, dict) else None
|
tasks = result.get("tasks") if isinstance(result, dict) else None
|
||||||
if not isinstance(tasks, list) or not tasks:
|
if not isinstance(tasks, list) or not tasks:
|
||||||
used_fallback = True
|
|
||||||
tasks = _fallback_tasks(date)
|
tasks = _fallback_tasks(date)
|
||||||
result = {
|
result = {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
||||||
"generation_mode": "fallback" if used_fallback else "llm",
|
|
||||||
"quality_score": 0.4 if used_fallback else 0.75,
|
|
||||||
"generated_with_agents": False,
|
|
||||||
}
|
}
|
||||||
|
result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding)
|
||||||
|
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
event_type="final_summary",
|
event_type="final_summary",
|
||||||
@@ -519,83 +554,63 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan:
|
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
|
||||||
from starlette.concurrency import run_in_threadpool
|
from starlette.concurrency import run_in_threadpool
|
||||||
|
|
||||||
date_str = date or _today_date_str()
|
date_str = date or _today_date_str()
|
||||||
onboarding_status = _get_onboarding_status(user_id)
|
|
||||||
|
def _get_existing():
|
||||||
existing = await run_in_threadpool(
|
return (
|
||||||
lambda: (
|
|
||||||
db.query(DailyWorkflowPlan)
|
db.query(DailyWorkflowPlan)
|
||||||
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
|
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
existing = await run_in_threadpool(_get_existing)
|
||||||
existing_hash_status = {}
|
|
||||||
if existing:
|
if existing:
|
||||||
existing_tasks = await run_in_threadpool(
|
existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
|
||||||
lambda: (
|
if not isinstance(existing_json.get("quality"), dict):
|
||||||
db.query(DailyWorkflowTask)
|
def _backfill_quality_for_existing():
|
||||||
.filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id)
|
plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
|
||||||
.all()
|
tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else []
|
||||||
)
|
plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={})
|
||||||
)
|
existing.plan_json = plan_json
|
||||||
for task in existing_tasks:
|
existing.updated_at = datetime.utcnow()
|
||||||
task_hash = _compute_task_hash(task.title, task.description)
|
db.add(existing)
|
||||||
existing_hash_status[task_hash] = {
|
db.commit()
|
||||||
"status": task.status,
|
db.refresh(existing)
|
||||||
"decided_at": task.decided_at,
|
return existing
|
||||||
"completion_notes": task.completion_notes,
|
existing = await run_in_threadpool(_backfill_quality_for_existing)
|
||||||
}
|
return existing, False
|
||||||
|
|
||||||
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
||||||
plan_data["onboarding_completed"] = onboarding_status["is_completed"]
|
tasks = plan_data.get("tasks", [])
|
||||||
plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None
|
|
||||||
|
|
||||||
tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else []
|
def _create_plan():
|
||||||
|
plan = DailyWorkflowPlan(
|
||||||
def _replace_plan() -> DailyWorkflowPlan:
|
user_id=user_id,
|
||||||
if existing:
|
date=date_str,
|
||||||
db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False)
|
source="agent",
|
||||||
plan = existing
|
plan_json=plan_data,
|
||||||
plan.source = "agent"
|
created_at=datetime.utcnow(),
|
||||||
plan.plan_json = plan_data
|
updated_at=datetime.utcnow(),
|
||||||
plan.updated_at = datetime.utcnow()
|
)
|
||||||
db.add(plan)
|
db.add(plan)
|
||||||
db.commit()
|
db.commit()
|
||||||
db.refresh(plan)
|
db.refresh(plan)
|
||||||
else:
|
|
||||||
plan = DailyWorkflowPlan(
|
|
||||||
user_id=user_id,
|
|
||||||
date=date_str,
|
|
||||||
source="agent",
|
|
||||||
plan_json=plan_data,
|
|
||||||
created_at=datetime.utcnow(),
|
|
||||||
updated_at=datetime.utcnow(),
|
|
||||||
)
|
|
||||||
db.add(plan)
|
|
||||||
db.commit()
|
|
||||||
db.refresh(plan)
|
|
||||||
|
|
||||||
for t in tasks:
|
for t in tasks:
|
||||||
pillar_id = str(t.get("pillarId") or "").lower().strip()
|
pillar_id = str(t.get("pillarId") or "").lower().strip()
|
||||||
if pillar_id not in PILLAR_IDS:
|
if pillar_id not in PILLAR_IDS:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
title = str(t.get("title") or "Task").strip()[:255]
|
|
||||||
description = str(t.get("description") or "").strip()
|
|
||||||
task_hash = _compute_task_hash(title, description)
|
|
||||||
preserved = existing_hash_status.get(task_hash) or {}
|
|
||||||
|
|
||||||
task = DailyWorkflowTask(
|
task = DailyWorkflowTask(
|
||||||
plan_id=plan.id,
|
plan_id=plan.id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
pillar_id=pillar_id,
|
pillar_id=pillar_id,
|
||||||
title=title,
|
title=str(t.get("title") or "Task").strip()[:255],
|
||||||
description=description,
|
description=str(t.get("description") or "").strip(),
|
||||||
status=preserved.get("status") or _coerce_status(t.get("status")),
|
status=_coerce_status(t.get("status")),
|
||||||
priority=_coerce_priority(t.get("priority")),
|
priority=_coerce_priority(t.get("priority")),
|
||||||
estimated_time=int(t.get("estimatedTime") or 15),
|
estimated_time=int(t.get("estimatedTime") or 15),
|
||||||
action_type=str(t.get("actionType") or "navigate").strip()[:20],
|
action_type=str(t.get("actionType") or "navigate").strip()[:20],
|
||||||
@@ -605,53 +620,14 @@ async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Option
|
|||||||
enabled=bool(t.get("enabled", True)),
|
enabled=bool(t.get("enabled", True)),
|
||||||
created_at=datetime.utcnow(),
|
created_at=datetime.utcnow(),
|
||||||
updated_at=datetime.utcnow(),
|
updated_at=datetime.utcnow(),
|
||||||
decided_at=preserved.get("decided_at"),
|
|
||||||
completion_notes=preserved.get("completion_notes"),
|
|
||||||
)
|
)
|
||||||
db.add(task)
|
db.add(task)
|
||||||
|
|
||||||
db.commit()
|
db.commit()
|
||||||
db.refresh(plan)
|
|
||||||
return plan
|
return plan
|
||||||
|
|
||||||
return await run_in_threadpool(_replace_plan)
|
plan = await run_in_threadpool(_create_plan)
|
||||||
|
return plan, True
|
||||||
|
|
||||||
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
|
|
||||||
from starlette.concurrency import run_in_threadpool
|
|
||||||
|
|
||||||
date_str = date or _today_date_str()
|
|
||||||
|
|
||||||
existing = await run_in_threadpool(
|
|
||||||
lambda: (
|
|
||||||
db.query(DailyWorkflowPlan)
|
|
||||||
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
|
|
||||||
.first()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
if existing:
|
|
||||||
metadata = _extract_plan_metadata(existing)
|
|
||||||
onboarding_status = _get_onboarding_status(user_id)
|
|
||||||
|
|
||||||
should_regenerate = False
|
|
||||||
if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD:
|
|
||||||
should_regenerate = True
|
|
||||||
|
|
||||||
if (
|
|
||||||
onboarding_status["is_completed"]
|
|
||||||
and not metadata["onboarding_completed"]
|
|
||||||
):
|
|
||||||
should_regenerate = True
|
|
||||||
|
|
||||||
if should_regenerate:
|
|
||||||
regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str)
|
|
||||||
return regenerated, True
|
|
||||||
|
|
||||||
return existing, False
|
|
||||||
|
|
||||||
created = await regenerate_daily_workflow_plan(db, user_id, date=date_str)
|
|
||||||
return created, True
|
|
||||||
|
|
||||||
|
|
||||||
def update_task_status(
|
def update_task_status(
|
||||||
|
|||||||
Reference in New Issue
Block a user