Merge PR #392: Add contextuality validation and low-context workflow status
- Replace provenance-based quality with contextuality validation framework
- Add evidence link tracking system (onboarding:key and alert:id formats)
- Implement plan contextuality validation function with configurable thresholds
- Calculate task-level context scores based on evidence link density
- Define contextual workflows (>=65% threshold) vs low-context workflows (<65%)
- Add validation in plan persistence layer before database commit
- Integrate contextuality metrics into release readiness checks
- Add recovery strategies for low-context workflows (regeneration with guidance)
- Track evidence link validity against grounding context (onboarding data, alerts)
- Provide detailed contextuality reports in quality assessments
- Maintain backward compatibility while enabling contextual workflow detection
This commit is contained in:
@@ -1,14 +1,13 @@
|
|||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime
|
||||||
from collections import defaultdict, deque
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from middleware.auth_middleware import get_current_user
|
from middleware.auth_middleware import get_current_user
|
||||||
from services.database import get_db
|
from services.database import get_db
|
||||||
from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status
|
from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status
|
||||||
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
||||||
import asyncio
|
import asyncio
|
||||||
from services.intelligence.txtai_service import TxtaiIntelligenceService
|
from services.intelligence.txtai_service import TxtaiIntelligenceService
|
||||||
@@ -16,27 +15,6 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService
|
|||||||
|
|
||||||
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
|
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
|
||||||
|
|
||||||
REGENERATE_WINDOW_SECONDS = 60
|
|
||||||
REGENERATE_MAX_REQUESTS_PER_WINDOW = 3
|
|
||||||
_regen_request_log: dict[str, deque[float]] = defaultdict(deque)
|
|
||||||
|
|
||||||
|
|
||||||
def _check_regenerate_rate_limit(user_id: str) -> None:
|
|
||||||
import time
|
|
||||||
|
|
||||||
now = time.time()
|
|
||||||
window_start = now - REGENERATE_WINDOW_SECONDS
|
|
||||||
history = _regen_request_log[user_id]
|
|
||||||
|
|
||||||
while history and history[0] < window_start:
|
|
||||||
history.popleft()
|
|
||||||
|
|
||||||
if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW:
|
|
||||||
raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded")
|
|
||||||
|
|
||||||
history.append(now)
|
|
||||||
|
|
||||||
|
|
||||||
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
|
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
|
||||||
svc = TxtaiIntelligenceService(user_id)
|
svc = TxtaiIntelligenceService(user_id)
|
||||||
items = []
|
items = []
|
||||||
@@ -161,9 +139,6 @@ async def get_today_workflow(
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {}
|
|
||||||
quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"success": True,
|
"success": True,
|
||||||
"data": {
|
"data": {
|
||||||
@@ -183,12 +158,10 @@ async def get_today_workflow(
|
|||||||
"id": plan.id,
|
"id": plan.id,
|
||||||
"date": plan.date,
|
"date": plan.date,
|
||||||
"source": plan.source,
|
"source": plan.source,
|
||||||
"quality": quality,
|
"quality_status": (plan.plan_json or {}).get("quality_status", "contextual"),
|
||||||
|
"contextuality_validation": (plan.plan_json or {}).get("contextuality_validation"),
|
||||||
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
||||||
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
||||||
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
|
|
||||||
"quality_score": (plan.plan_json or {}).get("quality_score"),
|
|
||||||
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"timestamp": datetime.utcnow().isoformat(),
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
@@ -196,67 +169,6 @@ async def get_today_workflow(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@router.post("/regenerate")
|
|
||||||
async def regenerate_today_workflow(
|
|
||||||
date: Optional[str] = None,
|
|
||||||
current_user: dict = Depends(get_current_user),
|
|
||||||
db: Session = Depends(get_db),
|
|
||||||
) -> Dict[str, Any]:
|
|
||||||
from starlette.concurrency import run_in_threadpool
|
|
||||||
|
|
||||||
user_id = str(current_user.get("id"))
|
|
||||||
_check_regenerate_rate_limit(user_id)
|
|
||||||
|
|
||||||
plan = await regenerate_daily_workflow_plan(db, user_id, date=date)
|
|
||||||
|
|
||||||
tasks = await run_in_threadpool(
|
|
||||||
lambda: (
|
|
||||||
db.query(DailyWorkflowTask)
|
|
||||||
.filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id)
|
|
||||||
.order_by(DailyWorkflowTask.created_at.asc())
|
|
||||||
.all()
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
response_tasks = [
|
|
||||||
{
|
|
||||||
"id": str(t.id),
|
|
||||||
"pillarId": t.pillar_id,
|
|
||||||
"title": t.title,
|
|
||||||
"description": t.description,
|
|
||||||
"status": "skipped" if t.status == "dismissed" else t.status,
|
|
||||||
"priority": t.priority,
|
|
||||||
"estimatedTime": t.estimated_time,
|
|
||||||
"dependencies": t.dependencies or [],
|
|
||||||
"actionUrl": t.action_url,
|
|
||||||
"actionType": t.action_type,
|
|
||||||
"metadata": t.metadata_json or {},
|
|
||||||
"enabled": bool(t.enabled),
|
|
||||||
}
|
|
||||||
for t in tasks
|
|
||||||
]
|
|
||||||
|
|
||||||
asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated"))
|
|
||||||
|
|
||||||
return {
|
|
||||||
"success": True,
|
|
||||||
"data": {
|
|
||||||
"plan": {
|
|
||||||
"id": plan.id,
|
|
||||||
"date": plan.date,
|
|
||||||
"source": plan.source,
|
|
||||||
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
|
|
||||||
"quality_score": (plan.plan_json or {}).get("quality_score"),
|
|
||||||
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
|
|
||||||
"regenerated_at": datetime.now(timezone.utc).isoformat(),
|
|
||||||
},
|
|
||||||
"tasks": response_tasks,
|
|
||||||
},
|
|
||||||
"timestamp": datetime.utcnow().isoformat(),
|
|
||||||
"user_id": user_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
from services.task_memory_service import TaskMemoryService
|
from services.task_memory_service import TaskMemoryService
|
||||||
|
|
||||||
@router.post("/tasks/{task_id}/status")
|
@router.post("/tasks/{task_id}/status")
|
||||||
|
|||||||
@@ -11,10 +11,8 @@ from services.llm_providers.main_text_generation import llm_text_gen
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
||||||
TASK_PROVENANCE_AGENT = "agent_proposal"
|
MIN_TASK_EVIDENCE_LINKS = 1
|
||||||
TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill"
|
PLAN_CONTEXT_THRESHOLD = 0.65
|
||||||
TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback"
|
|
||||||
DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35
|
|
||||||
|
|
||||||
|
|
||||||
def _today_date_str() -> str:
|
def _today_date_str() -> str:
|
||||||
@@ -140,74 +138,116 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|||||||
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
||||||
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
||||||
sanitized["enabled"] = bool(task.get("enabled", True))
|
sanitized["enabled"] = bool(task.get("enabled", True))
|
||||||
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
|
|
||||||
provenance = str(metadata.get("provenance") or "").strip().lower()
|
|
||||||
if provenance not in {
|
|
||||||
TASK_PROVENANCE_AGENT,
|
|
||||||
TASK_PROVENANCE_LLM_BACKFILL,
|
|
||||||
TASK_PROVENANCE_CONTROLLED_FALLBACK,
|
|
||||||
}:
|
|
||||||
if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK:
|
|
||||||
provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK
|
|
||||||
elif metadata.get("source") == "llm_pillar_backfill":
|
|
||||||
provenance = TASK_PROVENANCE_LLM_BACKFILL
|
|
||||||
elif metadata.get("source_agent"):
|
|
||||||
provenance = TASK_PROVENANCE_AGENT
|
|
||||||
else:
|
|
||||||
provenance = TASK_PROVENANCE_LLM_BACKFILL
|
|
||||||
metadata["provenance"] = provenance
|
|
||||||
sanitized["metadata"] = metadata
|
|
||||||
return sanitized
|
return sanitized
|
||||||
|
|
||||||
|
|
||||||
def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float:
|
def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]:
|
||||||
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
if not isinstance(onboarding_data, dict):
|
||||||
configured = None
|
return []
|
||||||
if isinstance(workflow_config, dict):
|
|
||||||
configured = workflow_config.get("min_agent_origin_ratio")
|
links: List[str] = []
|
||||||
try:
|
for key, value in onboarding_data.items():
|
||||||
value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
|
if key == "workflow_config":
|
||||||
except (TypeError, ValueError):
|
continue
|
||||||
value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
|
if value in (None, "", [], {}):
|
||||||
return max(0.0, min(1.0, value))
|
continue
|
||||||
|
links.append(f"onboarding:{key}")
|
||||||
|
if len(links) >= limit:
|
||||||
|
break
|
||||||
|
return links
|
||||||
|
|
||||||
|
|
||||||
def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]:
|
||||||
total_tasks = len(tasks)
|
if not isinstance(evidence_links, list):
|
||||||
agent_origin_tasks = [
|
return []
|
||||||
task for task in tasks
|
|
||||||
if isinstance(task.get("metadata"), dict)
|
onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {}
|
||||||
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT
|
if not isinstance(onboarding_data, dict):
|
||||||
]
|
onboarding_data = {}
|
||||||
fallback_tasks = [
|
valid_onboarding_keys = {str(k) for k in onboarding_data.keys()}
|
||||||
task for task in tasks
|
|
||||||
if isinstance(task.get("metadata"), dict)
|
recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else []
|
||||||
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK
|
valid_alert_ids = {
|
||||||
]
|
str(a.get("alert_id"))
|
||||||
agent_pillars = {
|
for a in recent_alerts
|
||||||
str(task.get("pillarId") or "").lower().strip()
|
if isinstance(a, dict) and a.get("alert_id") is not None
|
||||||
for task in agent_origin_tasks
|
|
||||||
if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS
|
|
||||||
}
|
}
|
||||||
|
|
||||||
agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0
|
valid_links: List[str] = []
|
||||||
fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0
|
for raw in evidence_links:
|
||||||
threshold = _agent_personalization_threshold(grounding or {})
|
link = str(raw or "").strip()
|
||||||
classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline"
|
if not link:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if link.startswith("onboarding:"):
|
||||||
|
key = link.split(":", 1)[1].strip()
|
||||||
|
if key and key in valid_onboarding_keys:
|
||||||
|
valid_links.append(link)
|
||||||
|
elif link.startswith("alert:"):
|
||||||
|
alert_id = link.split(":", 1)[1].strip()
|
||||||
|
if alert_id and alert_id in valid_alert_ids:
|
||||||
|
valid_links.append(link)
|
||||||
|
|
||||||
|
return valid_links
|
||||||
|
|
||||||
|
|
||||||
|
def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
tasks = plan.get("tasks") if isinstance(plan, dict) else None
|
||||||
|
if not isinstance(tasks, list) or not tasks:
|
||||||
|
return {
|
||||||
|
"score": 0.0,
|
||||||
|
"threshold": PLAN_CONTEXT_THRESHOLD,
|
||||||
|
"is_contextual": False,
|
||||||
|
"task_scores": [],
|
||||||
|
"tasks_below_min_evidence": 0,
|
||||||
|
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
|
||||||
|
}
|
||||||
|
|
||||||
|
task_scores = []
|
||||||
|
below_min_evidence = 0
|
||||||
|
|
||||||
|
for idx, task in enumerate(tasks):
|
||||||
|
metadata = task.get("metadata") if isinstance(task, dict) else {}
|
||||||
|
metadata = metadata if isinstance(metadata, dict) else {}
|
||||||
|
evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding)
|
||||||
|
has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS
|
||||||
|
if not has_min_evidence:
|
||||||
|
below_min_evidence += 1
|
||||||
|
|
||||||
|
reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower()
|
||||||
|
onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:"))
|
||||||
|
alert_hits = sum(1 for l in evidence_links if l.startswith("alert:"))
|
||||||
|
|
||||||
|
score = 0.0
|
||||||
|
if has_min_evidence:
|
||||||
|
score += 0.6
|
||||||
|
if onboarding_hits > 0:
|
||||||
|
score += 0.2
|
||||||
|
if alert_hits > 0:
|
||||||
|
score += 0.2
|
||||||
|
elif "alert" in reasoning_text:
|
||||||
|
score += 0.1
|
||||||
|
|
||||||
|
task_scores.append(
|
||||||
|
{
|
||||||
|
"task_index": idx,
|
||||||
|
"pillarId": task.get("pillarId"),
|
||||||
|
"title": task.get("title"),
|
||||||
|
"score": min(score, 1.0),
|
||||||
|
"evidence_links": evidence_links,
|
||||||
|
"has_min_evidence": has_min_evidence,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
plan_score = sum(t["score"] for t in task_scores) / len(task_scores)
|
||||||
|
is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0
|
||||||
return {
|
return {
|
||||||
"classification": classification,
|
"score": round(plan_score, 3),
|
||||||
"agentOriginRatio": round(agent_origin_ratio, 4),
|
"threshold": PLAN_CONTEXT_THRESHOLD,
|
||||||
"agentOriginPercent": round(agent_origin_ratio * 100, 2),
|
"is_contextual": is_contextual,
|
||||||
"agentOriginTaskCount": len(agent_origin_tasks),
|
"task_scores": task_scores,
|
||||||
"agentOriginPillars": len(agent_pillars),
|
"tasks_below_min_evidence": below_min_evidence,
|
||||||
"fallbackRatio": round(fallback_ratio, 4),
|
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
|
||||||
"fallbackPercent": round(fallback_ratio * 100, 2),
|
|
||||||
"fallbackTaskCount": len(fallback_tasks),
|
|
||||||
"totalTaskCount": total_tasks,
|
|
||||||
"thresholds": {
|
|
||||||
"minAgentOriginRatio": threshold,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -280,9 +320,6 @@ def _ensure_pillar_coverage(
|
|||||||
|
|
||||||
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
|
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
|
||||||
if generated:
|
if generated:
|
||||||
metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {}
|
|
||||||
metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL
|
|
||||||
generated["metadata"] = metadata
|
|
||||||
sanitized_tasks.append(generated)
|
sanitized_tasks.append(generated)
|
||||||
covered_pillars.add(pillar_id)
|
covered_pillars.add(pillar_id)
|
||||||
continue
|
continue
|
||||||
@@ -291,7 +328,6 @@ def _ensure_pillar_coverage(
|
|||||||
if controlled_fallback:
|
if controlled_fallback:
|
||||||
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
|
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
|
||||||
metadata["source"] = "controlled_fallback"
|
metadata["source"] = "controlled_fallback"
|
||||||
metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK
|
|
||||||
controlled_fallback["metadata"] = metadata
|
controlled_fallback["metadata"] = metadata
|
||||||
sanitized_tasks.append(controlled_fallback)
|
sanitized_tasks.append(controlled_fallback)
|
||||||
covered_pillars.add(pillar_id)
|
covered_pillars.add(pillar_id)
|
||||||
@@ -329,6 +365,7 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A
|
|||||||
return {
|
return {
|
||||||
"recent_agent_alerts": [
|
"recent_agent_alerts": [
|
||||||
{
|
{
|
||||||
|
"alert_id": a.id,
|
||||||
"title": a.title,
|
"title": a.title,
|
||||||
"message": a.message,
|
"message": a.message,
|
||||||
"created_at": a.created_at.isoformat(),
|
"created_at": a.created_at.isoformat(),
|
||||||
@@ -348,9 +385,15 @@ from services.task_memory_service import TaskMemoryService
|
|||||||
# Initialize orchestration service (singleton)
|
# Initialize orchestration service (singleton)
|
||||||
orchestration_service = AgentOrchestrationService()
|
orchestration_service = AgentOrchestrationService()
|
||||||
|
|
||||||
async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]:
|
async def generate_agent_enhanced_plan(
|
||||||
|
db: Session,
|
||||||
|
user_id: str,
|
||||||
|
date: str,
|
||||||
|
grounding: Optional[Dict[str, Any]] = None,
|
||||||
|
strict_contextuality: bool = False,
|
||||||
|
) -> Dict[str, Any]:
|
||||||
activity = AgentActivityService(db, user_id)
|
activity = AgentActivityService(db, user_id)
|
||||||
grounding = build_grounding_context(db, user_id, date)
|
grounding = grounding or build_grounding_context(db, user_id, date)
|
||||||
memory_service = TaskMemoryService(user_id, db)
|
memory_service = TaskMemoryService(user_id, db)
|
||||||
|
|
||||||
# 1. Get Orchestrator
|
# 1. Get Orchestrator
|
||||||
@@ -427,7 +470,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
|
|
||||||
# 4. Final Selection
|
# 4. Final Selection
|
||||||
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
|
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
|
||||||
if agent_tasks:
|
if agent_tasks and not strict_contextuality:
|
||||||
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
|
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
|
||||||
|
|
||||||
# Convert TaskProposal objects to dicts for frontend
|
# Convert TaskProposal objects to dicts for frontend
|
||||||
@@ -445,16 +488,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
"metadata": {
|
"metadata": {
|
||||||
"source_agent": prop.source_agent,
|
"source_agent": prop.source_agent,
|
||||||
"reasoning": prop.reasoning,
|
"reasoning": prop.reasoning,
|
||||||
"context_data": prop.context_data
|
"context_data": prop.context_data,
|
||||||
|
"evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2),
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
||||||
quality = _compute_plan_quality(final_tasks, grounding)
|
|
||||||
return {
|
return {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": final_tasks,
|
"tasks": final_tasks
|
||||||
"quality": quality,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# Fallback to original LLM generation if agents returned nothing
|
# Fallback to original LLM generation if agents returned nothing
|
||||||
@@ -503,6 +545,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n"
|
f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if strict_contextuality:
|
||||||
|
prompt += (
|
||||||
|
"\nStrict contextuality mode (must follow):\n"
|
||||||
|
f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n"
|
||||||
|
"- evidence_links entries must use either 'onboarding:<field_name>' or 'alert:<alert_id>' format.\n"
|
||||||
|
"- Include metadata.reasoning that explains how the evidence applies to the task.\n"
|
||||||
|
"- Reject generic tasks without explicit ties to onboarding data or active alerts.\n"
|
||||||
|
)
|
||||||
|
|
||||||
run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
|
run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
event_type="plan",
|
event_type="plan",
|
||||||
@@ -540,7 +591,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
"date": date,
|
"date": date,
|
||||||
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
||||||
}
|
}
|
||||||
result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding)
|
|
||||||
|
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
event_type="final_summary",
|
event_type="final_summary",
|
||||||
@@ -569,22 +619,27 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
|
|||||||
existing = await run_in_threadpool(_get_existing)
|
existing = await run_in_threadpool(_get_existing)
|
||||||
|
|
||||||
if existing:
|
if existing:
|
||||||
existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
|
|
||||||
if not isinstance(existing_json.get("quality"), dict):
|
|
||||||
def _backfill_quality_for_existing():
|
|
||||||
plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
|
|
||||||
tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else []
|
|
||||||
plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={})
|
|
||||||
existing.plan_json = plan_json
|
|
||||||
existing.updated_at = datetime.utcnow()
|
|
||||||
db.add(existing)
|
|
||||||
db.commit()
|
|
||||||
db.refresh(existing)
|
|
||||||
return existing
|
|
||||||
existing = await run_in_threadpool(_backfill_quality_for_existing)
|
|
||||||
return existing, False
|
return existing, False
|
||||||
|
|
||||||
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
grounding = build_grounding_context(db, user_id, date_str)
|
||||||
|
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding)
|
||||||
|
validation = validate_plan_contextuality(plan_data, grounding)
|
||||||
|
|
||||||
|
if not validation.get("is_contextual"):
|
||||||
|
logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id)
|
||||||
|
regenerated_plan = await generate_agent_enhanced_plan(
|
||||||
|
db,
|
||||||
|
user_id,
|
||||||
|
date_str,
|
||||||
|
grounding=grounding,
|
||||||
|
strict_contextuality=True,
|
||||||
|
)
|
||||||
|
regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding)
|
||||||
|
plan_data = regenerated_plan
|
||||||
|
validation = regenerated_validation
|
||||||
|
|
||||||
|
plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context"
|
||||||
|
plan_data["contextuality_validation"] = validation
|
||||||
tasks = plan_data.get("tasks", [])
|
tasks = plan_data.get("tasks", [])
|
||||||
|
|
||||||
def _create_plan():
|
def _create_plan():
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ if str(ROOT) not in sys.path:
|
|||||||
sys.path.insert(0, str(ROOT))
|
sys.path.insert(0, str(ROOT))
|
||||||
|
|
||||||
from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric
|
from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric
|
||||||
from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS
|
from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS, validate_plan_contextuality
|
||||||
from services.intelligence.sif_agents import ContentGuardianAgent as SifGuardian
|
from services.intelligence.sif_agents import ContentGuardianAgent as SifGuardian
|
||||||
from services.intelligence.agents.specialized_agents import ContentGuardianAgent as SpecializedGuardian
|
from services.intelligence.agents.specialized_agents import ContentGuardianAgent as SpecializedGuardian
|
||||||
|
|
||||||
@@ -74,6 +74,52 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase):
|
|||||||
self.assertIn("warning", result)
|
self.assertIn("warning", result)
|
||||||
self.assertEqual(result["method"], "competitor_index_search")
|
self.assertEqual(result["method"], "competitor_index_search")
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_plan_contextuality_passes_with_evidence_links(self):
|
||||||
|
plan = {
|
||||||
|
"tasks": [
|
||||||
|
{
|
||||||
|
"pillarId": "plan",
|
||||||
|
"title": "Review strategy",
|
||||||
|
"description": "Use onboarding goals",
|
||||||
|
"metadata": {
|
||||||
|
"evidence_links": ["onboarding:business_goals", "alert:101"],
|
||||||
|
"reasoning": "Based on onboarding and alert",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
grounding = {
|
||||||
|
"onboarding_data": {"business_goals": ["awareness"]},
|
||||||
|
"recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}],
|
||||||
|
}
|
||||||
|
|
||||||
|
validation = validate_plan_contextuality(plan, grounding)
|
||||||
|
|
||||||
|
self.assertTrue(validation["is_contextual"])
|
||||||
|
self.assertEqual(validation["tasks_below_min_evidence"], 0)
|
||||||
|
|
||||||
|
def test_validate_plan_contextuality_flags_missing_evidence_links(self):
|
||||||
|
plan = {
|
||||||
|
"tasks": [
|
||||||
|
{
|
||||||
|
"pillarId": "generate",
|
||||||
|
"title": "Write generic post",
|
||||||
|
"description": "Create a post",
|
||||||
|
"metadata": {"reasoning": "General best practice"},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
grounding = {
|
||||||
|
"onboarding_data": {"business_goals": ["awareness"]},
|
||||||
|
"recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}],
|
||||||
|
}
|
||||||
|
|
||||||
|
validation = validate_plan_contextuality(plan, grounding)
|
||||||
|
|
||||||
|
self.assertFalse(validation["is_contextual"])
|
||||||
|
self.assertEqual(validation["tasks_below_min_evidence"], 1)
|
||||||
|
|
||||||
def test_pillar_coverage_guardrail_backfills_missing(self):
|
def test_pillar_coverage_guardrail_backfills_missing(self):
|
||||||
tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}]
|
tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}]
|
||||||
grounding = {"workflow_config": {"enforce_pillar_coverage": True}}
|
grounding = {"workflow_config": {"enforce_pillar_coverage": True}}
|
||||||
|
|||||||
Reference in New Issue
Block a user