Merge PR #392: Add contextuality validation and low-context workflow status

- Replace provenance-based quality with contextuality validation framework
- Add evidence link tracking system (onboarding:key and alert:id formats)
- Implement plan contextuality validation function with configurable thresholds
- Calculate task-level context scores based on evidence link density
- Define contextual workflows (>65% threshold) vs low-context workflows (<65%)
- Add validation in plan persistence layer before database commit
- Integrate contextuality metrics into release readiness checks
- Add recovery strategies for low-context workflows (regeneration with guidance)
- Track evidence link validity against grounding context (onboarding data, alerts)
- Provide detailed contextuality reports in quality assessments
- Maintain backward compatibility while enabling contextual workflow detection
This commit is contained in:
ajaysi
2026-03-08 18:18:43 +05:30
3 changed files with 195 additions and 182 deletions

View File

@@ -11,10 +11,8 @@ from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
TASK_PROVENANCE_AGENT = "agent_proposal"
TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill"
TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback"
DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35
MIN_TASK_EVIDENCE_LINKS = 1
PLAN_CONTEXT_THRESHOLD = 0.65
def _today_date_str() -> str:
@@ -140,74 +138,116 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
sanitized["enabled"] = bool(task.get("enabled", True))
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
provenance = str(metadata.get("provenance") or "").strip().lower()
if provenance not in {
TASK_PROVENANCE_AGENT,
TASK_PROVENANCE_LLM_BACKFILL,
TASK_PROVENANCE_CONTROLLED_FALLBACK,
}:
if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK:
provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK
elif metadata.get("source") == "llm_pillar_backfill":
provenance = TASK_PROVENANCE_LLM_BACKFILL
elif metadata.get("source_agent"):
provenance = TASK_PROVENANCE_AGENT
else:
provenance = TASK_PROVENANCE_LLM_BACKFILL
metadata["provenance"] = provenance
sanitized["metadata"] = metadata
return sanitized
def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
configured = None
if isinstance(workflow_config, dict):
configured = workflow_config.get("min_agent_origin_ratio")
try:
value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
except (TypeError, ValueError):
value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD
return max(0.0, min(1.0, value))
def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]:
if not isinstance(onboarding_data, dict):
return []
links: List[str] = []
for key, value in onboarding_data.items():
if key == "workflow_config":
continue
if value in (None, "", [], {}):
continue
links.append(f"onboarding:{key}")
if len(links) >= limit:
break
return links
def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
total_tasks = len(tasks)
agent_origin_tasks = [
task for task in tasks
if isinstance(task.get("metadata"), dict)
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT
]
fallback_tasks = [
task for task in tasks
if isinstance(task.get("metadata"), dict)
and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK
]
agent_pillars = {
str(task.get("pillarId") or "").lower().strip()
for task in agent_origin_tasks
if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS
def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]:
if not isinstance(evidence_links, list):
return []
onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {}
if not isinstance(onboarding_data, dict):
onboarding_data = {}
valid_onboarding_keys = {str(k) for k in onboarding_data.keys()}
recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else []
valid_alert_ids = {
str(a.get("alert_id"))
for a in recent_alerts
if isinstance(a, dict) and a.get("alert_id") is not None
}
agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0
fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0
threshold = _agent_personalization_threshold(grounding or {})
classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline"
valid_links: List[str] = []
for raw in evidence_links:
link = str(raw or "").strip()
if not link:
continue
if link.startswith("onboarding:"):
key = link.split(":", 1)[1].strip()
if key and key in valid_onboarding_keys:
valid_links.append(link)
elif link.startswith("alert:"):
alert_id = link.split(":", 1)[1].strip()
if alert_id and alert_id in valid_alert_ids:
valid_links.append(link)
return valid_links
def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]:
tasks = plan.get("tasks") if isinstance(plan, dict) else None
if not isinstance(tasks, list) or not tasks:
return {
"score": 0.0,
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": False,
"task_scores": [],
"tasks_below_min_evidence": 0,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
task_scores = []
below_min_evidence = 0
for idx, task in enumerate(tasks):
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding)
has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS
if not has_min_evidence:
below_min_evidence += 1
reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower()
onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:"))
alert_hits = sum(1 for l in evidence_links if l.startswith("alert:"))
score = 0.0
if has_min_evidence:
score += 0.6
if onboarding_hits > 0:
score += 0.2
if alert_hits > 0:
score += 0.2
elif "alert" in reasoning_text:
score += 0.1
task_scores.append(
{
"task_index": idx,
"pillarId": task.get("pillarId"),
"title": task.get("title"),
"score": min(score, 1.0),
"evidence_links": evidence_links,
"has_min_evidence": has_min_evidence,
}
)
plan_score = sum(t["score"] for t in task_scores) / len(task_scores)
is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0
return {
"classification": classification,
"agentOriginRatio": round(agent_origin_ratio, 4),
"agentOriginPercent": round(agent_origin_ratio * 100, 2),
"agentOriginTaskCount": len(agent_origin_tasks),
"agentOriginPillars": len(agent_pillars),
"fallbackRatio": round(fallback_ratio, 4),
"fallbackPercent": round(fallback_ratio * 100, 2),
"fallbackTaskCount": len(fallback_tasks),
"totalTaskCount": total_tasks,
"thresholds": {
"minAgentOriginRatio": threshold,
},
"score": round(plan_score, 3),
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": is_contextual,
"task_scores": task_scores,
"tasks_below_min_evidence": below_min_evidence,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
@@ -280,9 +320,6 @@ def _ensure_pillar_coverage(
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
if generated:
metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {}
metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL
generated["metadata"] = metadata
sanitized_tasks.append(generated)
covered_pillars.add(pillar_id)
continue
@@ -291,7 +328,6 @@ def _ensure_pillar_coverage(
if controlled_fallback:
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
metadata["source"] = "controlled_fallback"
metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK
controlled_fallback["metadata"] = metadata
sanitized_tasks.append(controlled_fallback)
covered_pillars.add(pillar_id)
@@ -329,6 +365,7 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A
return {
"recent_agent_alerts": [
{
"alert_id": a.id,
"title": a.title,
"message": a.message,
"created_at": a.created_at.isoformat(),
@@ -348,9 +385,15 @@ from services.task_memory_service import TaskMemoryService
# Initialize orchestration service (singleton)
orchestration_service = AgentOrchestrationService()
async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]:
async def generate_agent_enhanced_plan(
db: Session,
user_id: str,
date: str,
grounding: Optional[Dict[str, Any]] = None,
strict_contextuality: bool = False,
) -> Dict[str, Any]:
activity = AgentActivityService(db, user_id)
grounding = build_grounding_context(db, user_id, date)
grounding = grounding or build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db)
# 1. Get Orchestrator
@@ -427,7 +470,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
# 4. Final Selection
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
if agent_tasks:
if agent_tasks and not strict_contextuality:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
@@ -445,16 +488,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"metadata": {
"source_agent": prop.source_agent,
"reasoning": prop.reasoning,
"context_data": prop.context_data
"context_data": prop.context_data,
"evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2),
}
})
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
quality = _compute_plan_quality(final_tasks, grounding)
return {
"date": date,
"tasks": final_tasks,
"quality": quality,
"tasks": final_tasks
}
# Fallback to original LLM generation if agents returned nothing
@@ -503,6 +545,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n"
)
if strict_contextuality:
prompt += (
"\nStrict contextuality mode (must follow):\n"
f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n"
"- evidence_links entries must use either 'onboarding:<field_name>' or 'alert:<alert_id>' format.\n"
"- Include metadata.reasoning that explains how the evidence applies to the task.\n"
"- Reject generic tasks without explicit ties to onboarding data or active alerts.\n"
)
run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
activity.log_event(
event_type="plan",
@@ -540,7 +591,6 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"date": date,
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
}
result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding)
activity.log_event(
event_type="final_summary",
@@ -569,22 +619,27 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
existing = await run_in_threadpool(_get_existing)
if existing:
existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
if not isinstance(existing_json.get("quality"), dict):
def _backfill_quality_for_existing():
plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {}
tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else []
plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={})
existing.plan_json = plan_json
existing.updated_at = datetime.utcnow()
db.add(existing)
db.commit()
db.refresh(existing)
return existing
existing = await run_in_threadpool(_backfill_quality_for_existing)
return existing, False
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
grounding = build_grounding_context(db, user_id, date_str)
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding)
validation = validate_plan_contextuality(plan_data, grounding)
if not validation.get("is_contextual"):
logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id)
regenerated_plan = await generate_agent_enhanced_plan(
db,
user_id,
date_str,
grounding=grounding,
strict_contextuality=True,
)
regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding)
plan_data = regenerated_plan
validation = regenerated_validation
plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context"
plan_data["contextuality_validation"] = validation
tasks = plan_data.get("tasks", [])
def _create_plan():