Merge PR #392: Add contextuality validation and low-context workflow status

- Replace provenance-based quality with contextuality validation framework
- Add evidence link tracking system (onboarding:key and alert:id formats)
- Implement plan contextuality validation function with configurable thresholds
- Calculate task-level context scores based on evidence link density
- Define contextual workflows (>65% threshold) vs low-context workflows (<65%)
- Add validation in plan persistence layer before database commit
- Integrate contextuality metrics into release readiness checks
- Add recovery strategies for low-context workflows (regeneration with guidance)
- Track evidence link validity against grounding context (onboarding data, alerts)
- Provide detailed contextuality reports in quality assessments
- Maintain backward compatibility while enabling contextual workflow detection
This commit is contained in:
ajaysi
2026-03-08 18:18:43 +05:30
3 changed files with 195 additions and 182 deletions

View File

@@ -1,14 +1,13 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import Any, Dict, Optional
from datetime import datetime, timezone
from collections import defaultdict, deque
from datetime import datetime
from loguru import logger
from sqlalchemy.orm import Session
from middleware.auth_middleware import get_current_user
from services.database import get_db
from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status
from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
import asyncio
from services.intelligence.txtai_service import TxtaiIntelligenceService
@@ -16,27 +15,6 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
REGENERATE_WINDOW_SECONDS = 60
REGENERATE_MAX_REQUESTS_PER_WINDOW = 3
_regen_request_log: dict[str, deque[float]] = defaultdict(deque)
def _check_regenerate_rate_limit(user_id: str) -> None:
import time
now = time.time()
window_start = now - REGENERATE_WINDOW_SECONDS
history = _regen_request_log[user_id]
while history and history[0] < window_start:
history.popleft()
if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW:
raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded")
history.append(now)
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
svc = TxtaiIntelligenceService(user_id)
items = []
@@ -161,9 +139,6 @@ async def get_today_workflow(
except Exception:
pass
plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {}
quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None
return {
"success": True,
"data": {
@@ -183,12 +158,10 @@ async def get_today_workflow(
"id": plan.id,
"date": plan.date,
"source": plan.source,
"quality": quality,
"quality_status": (plan.plan_json or {}).get("quality_status", "contextual"),
"contextuality_validation": (plan.plan_json or {}).get("contextuality_validation"),
"created_at": plan.created_at.isoformat() if plan.created_at else None,
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
"quality_score": (plan.plan_json or {}).get("quality_score"),
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
},
},
"timestamp": datetime.utcnow().isoformat(),
@@ -196,67 +169,6 @@ async def get_today_workflow(
}
@router.post("/regenerate")
async def regenerate_today_workflow(
date: Optional[str] = None,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
from starlette.concurrency import run_in_threadpool
user_id = str(current_user.get("id"))
_check_regenerate_rate_limit(user_id)
plan = await regenerate_daily_workflow_plan(db, user_id, date=date)
tasks = await run_in_threadpool(
lambda: (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id)
.order_by(DailyWorkflowTask.created_at.asc())
.all()
)
)
response_tasks = [
{
"id": str(t.id),
"pillarId": t.pillar_id,
"title": t.title,
"description": t.description,
"status": "skipped" if t.status == "dismissed" else t.status,
"priority": t.priority,
"estimatedTime": t.estimated_time,
"dependencies": t.dependencies or [],
"actionUrl": t.action_url,
"actionType": t.action_type,
"metadata": t.metadata_json or {},
"enabled": bool(t.enabled),
}
for t in tasks
]
asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated"))
return {
"success": True,
"data": {
"plan": {
"id": plan.id,
"date": plan.date,
"source": plan.source,
"generation_mode": (plan.plan_json or {}).get("generation_mode"),
"quality_score": (plan.plan_json or {}).get("quality_score"),
"generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"),
"regenerated_at": datetime.now(timezone.utc).isoformat(),
},
"tasks": response_tasks,
},
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
}
from services.task_memory_service import TaskMemoryService
@router.post("/tasks/{task_id}/status")

View File

@@ -11,10 +11,8 @@ from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
TASK_PROVENANCE_AGENT = "agent_proposal"
TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill"
TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback"
DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35
MIN_TASK_EVIDENCE_LINKS = 1
PLAN_CONTEXT_THRESHOLD = 0.65
def _today_date_str() -> str:
@@ -140,74 +138,116 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
sanitized["enabled"] = bool(task.get("enabled", True))
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
provenance = str(metadata.get("provenance") or "").strip().lower()
if provenance not in {
TASK_PROVENANCE_AGENT,
TASK_PROVENANCE_LLM_BACKFILL,
TASK_PROVENANCE_CONTROLLED_FALLBACK,
}:
if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK:
provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK
elif metadata.get("source") == "llm_pillar_backfill":
provenance = TASK_PROVENANCE_LLM_BACKFILL
elif metadata.get("source_agent"):
provenance = TASK_PROVENANCE_AGENT
else:
provenance = TASK_PROVENANCE_LLM_BACKFILL
metadata["provenance"] = provenance
sanitized["metadata"] = metadata
return sanitized
def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
configured = None
if isinstance(workflow_config, dict):
configured = workflow_config.get("min_agent_origin_ratio")
try:
value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
except (TypeError, ValueError):
value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
return max(0.0, min(1.0, value))
def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]:
if not isinstance(onboarding_data, dict):
return []
links: List[str] = []
for key, value in onboarding_data.items():
if key == "workflow_config":
continue
if value in (None, "", [], {}):
continue
links.append(f"onboarding:{key}")
if len(links) >= limit:
break
return links
def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
total_tasks = len(tasks)
agent_origin_tasks = [
task for task in tasks
if isinstance(task.get("metadata"), dict)
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT
]
fallback_tasks = [
task for task in tasks
if isinstance(task.get("metadata"), dict)
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK
]
agent_pillars = {
str(task.get("pillarId") or "").lower().strip()
for task in agent_origin_tasks
if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS
def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]:
if not isinstance(evidence_links, list):
return []
onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {}
if not isinstance(onboarding_data, dict):
onboarding_data = {}
valid_onboarding_keys = {str(k) for k in onboarding_data.keys()}
recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else []
valid_alert_ids = {
str(a.get("alert_id"))
for a in recent_alerts
if isinstance(a, dict) and a.get("alert_id") is not None
}
agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0
fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0
threshold = _agent_personalization_threshold(grounding or {})
classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline"
valid_links: List[str] = []
for raw in evidence_links:
link = str(raw or "").strip()
if not link:
continue
if link.startswith("onboarding:"):
key = link.split(":", 1)[1].strip()
if key and key in valid_onboarding_keys:
valid_links.append(link)
elif link.startswith("alert:"):
alert_id = link.split(":", 1)[1].strip()
if alert_id and alert_id in valid_alert_ids:
valid_links.append(link)
return valid_links
def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]:
tasks = plan.get("tasks") if isinstance(plan, dict) else None
if not isinstance(tasks, list) or not tasks:
return {
"score": 0.0,
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": False,
"task_scores": [],
"tasks_below_min_evidence": 0,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
task_scores = []
below_min_evidence = 0
for idx, task in enumerate(tasks):
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding)
has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS
if not has_min_evidence:
below_min_evidence += 1
reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower()
onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:"))
alert_hits = sum(1 for l in evidence_links if l.startswith("alert:"))
score = 0.0
if has_min_evidence:
score += 0.6
if onboarding_hits > 0:
score += 0.2
if alert_hits > 0:
score += 0.2
elif "alert" in reasoning_text:
score += 0.1
task_scores.append(
{
"task_index": idx,
"pillarId": task.get("pillarId"),
"title": task.get("title"),
"score": min(score, 1.0),
"evidence_links": evidence_links,
"has_min_evidence": has_min_evidence,
}
)
plan_score = sum(t["score"] for t in task_scores) / len(task_scores)
is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0
return {
"classification": classification,
"agentOriginRatio": round(agent_origin_ratio, 4),
"agentOriginPercent": round(agent_origin_ratio * 100, 2),
"agentOriginTaskCount": len(agent_origin_tasks),
"agentOriginPillars": len(agent_pillars),
"fallbackRatio": round(fallback_ratio, 4),
"fallbackPercent": round(fallback_ratio * 100, 2),
"fallbackTaskCount": len(fallback_tasks),
"totalTaskCount": total_tasks,
"thresholds": {
"minAgentOriginRatio": threshold,
},
"score": round(plan_score, 3),
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": is_contextual,
"task_scores": task_scores,
"tasks_below_min_evidence": below_min_evidence,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
@@ -280,9 +320,6 @@ def _ensure_pillar_coverage(
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
if generated:
metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {}
metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL
generated["metadata"] = metadata
sanitized_tasks.append(generated)
covered_pillars.add(pillar_id)
continue
@@ -291,7 +328,6 @@ def _ensure_pillar_coverage(
if controlled_fallback:
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
metadata["source"] = "controlled_fallback"
metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK
controlled_fallback["metadata"] = metadata
sanitized_tasks.append(controlled_fallback)
covered_pillars.add(pillar_id)
@@ -329,6 +365,7 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A
return {
"recent_agent_alerts": [
{
"alert_id": a.id,
"title": a.title,
"message": a.message,
"created_at": a.created_at.isoformat(),
@@ -348,9 +385,15 @@ from services.task_memory_service import TaskMemoryService
# Initialize orchestration service (singleton)
orchestration_service = AgentOrchestrationService()
async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]:
async def generate_agent_enhanced_plan(
db: Session,
user_id: str,
date: str,
grounding: Optional[Dict[str, Any]] = None,
strict_contextuality: bool = False,
) -> Dict[str, Any]:
activity = AgentActivityService(db, user_id)
grounding = build_grounding_context(db, user_id, date)
grounding = grounding or build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db)
# 1. Get Orchestrator
@@ -427,7 +470,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
# 4. Final Selection
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
if agent_tasks:
if agent_tasks and not strict_contextuality:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
@@ -445,16 +488,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"metadata": {
"source_agent": prop.source_agent,
"reasoning": prop.reasoning,
"context_data": prop.context_data
"context_data": prop.context_data,
"evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2),
}
})
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
quality = _compute_plan_quality(final_tasks, grounding)
return {
"date": date,
"tasks": final_tasks,
"quality": quality,
"tasks": final_tasks
}
# Fallback to original LLM generation if agents returned nothing
@@ -503,6 +545,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n"
)
if strict_contextuality:
prompt += (
"\nStrict contextuality mode (must follow):\n"
f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n"
"- evidence_links entries must use either 'onboarding:<field_name>' or 'alert:<alert_id>' format.\n"
"- Include metadata.reasoning that explains how the evidence applies to the task.\n"
"- Reject generic tasks without explicit ties to onboarding data or active alerts.\n"
)
run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
activity.log_event(
event_type="plan",
@@ -540,7 +591,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"date": date,
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
}
result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding)
activity.log_event(
event_type="final_summary",
@@ -569,22 +619,27 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
existing = await run_in_threadpool(_get_existing)
if existing:
existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
if not isinstance(existing_json.get("quality"), dict):
def _backfill_quality_for_existing():
plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else []
plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={})
existing.plan_json = plan_json
existing.updated_at = datetime.utcnow()
db.add(existing)
db.commit()
db.refresh(existing)
return existing
existing = await run_in_threadpool(_backfill_quality_for_existing)
return existing, False
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
grounding = build_grounding_context(db, user_id, date_str)
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding)
validation = validate_plan_contextuality(plan_data, grounding)
if not validation.get("is_contextual"):
logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id)
regenerated_plan = await generate_agent_enhanced_plan(
db,
user_id,
date_str,
grounding=grounding,
strict_contextuality=True,
)
regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding)
plan_data = regenerated_plan
validation = regenerated_validation
plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context"
plan_data["contextuality_validation"] = validation
tasks = plan_data.get("tasks", [])
def _create_plan():

View File

@@ -8,7 +8,7 @@ if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric
from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS
from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS, validate_plan_contextuality
from services.intelligence.sif_agents import ContentGuardianAgent as SifGuardian
from services.intelligence.agents.specialized_agents import ContentGuardianAgent as SpecializedGuardian
@@ -74,6 +74,52 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("warning", result)
self.assertEqual(result["method"], "competitor_index_search")
def test_validate_plan_contextuality_passes_with_evidence_links(self):
plan = {
"tasks": [
{
"pillarId": "plan",
"title": "Review strategy",
"description": "Use onboarding goals",
"metadata": {
"evidence_links": ["onboarding:business_goals", "alert:101"],
"reasoning": "Based on onboarding and alert",
},
}
]
}
grounding = {
"onboarding_data": {"business_goals": ["awareness"]},
"recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}],
}
validation = validate_plan_contextuality(plan, grounding)
self.assertTrue(validation["is_contextual"])
self.assertEqual(validation["tasks_below_min_evidence"], 0)
def test_validate_plan_contextuality_flags_missing_evidence_links(self):
plan = {
"tasks": [
{
"pillarId": "generate",
"title": "Write generic post",
"description": "Create a post",
"metadata": {"reasoning": "General best practice"},
}
]
}
grounding = {
"onboarding_data": {"business_goals": ["awareness"]},
"recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}],
}
validation = validate_plan_contextuality(plan, grounding)
self.assertFalse(validation["is_contextual"])
self.assertEqual(validation["tasks_below_min_evidence"], 1)
def test_pillar_coverage_guardrail_backfills_missing(self):
tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}]
grounding = {"workflow_config": {"enforce_pillar_coverage": True}}