Add pillar coverage guardrails for today workflow plans
This commit is contained in:
@@ -8,7 +8,6 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
|
|||||||
from models.agent_activity_models import AgentAlert
|
from models.agent_activity_models import AgentAlert
|
||||||
from services.agent_activity_service import AgentActivityService
|
from services.agent_activity_service import AgentActivityService
|
||||||
from services.llm_providers.main_text_generation import llm_text_gen
|
from services.llm_providers.main_text_generation import llm_text_gen
|
||||||
from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
||||||
@@ -95,6 +94,122 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
|
||||||
|
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
|
||||||
|
if not isinstance(workflow_config, dict):
|
||||||
|
return True
|
||||||
|
if workflow_config.get("disable_pillar_coverage_guardrail") is True:
|
||||||
|
return False
|
||||||
|
if workflow_config.get("enforce_pillar_coverage") is False:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||||
|
if not isinstance(task, dict):
|
||||||
|
return None
|
||||||
|
|
||||||
|
pillar_id = str(task.get("pillarId") or "").lower().strip()
|
||||||
|
title = str(task.get("title") or "").strip()
|
||||||
|
if pillar_id not in PILLAR_IDS or not title:
|
||||||
|
return None
|
||||||
|
|
||||||
|
sanitized = dict(task)
|
||||||
|
sanitized["pillarId"] = pillar_id
|
||||||
|
sanitized["title"] = title
|
||||||
|
sanitized["description"] = str(task.get("description") or "").strip()
|
||||||
|
sanitized["priority"] = _coerce_priority(task.get("priority"))
|
||||||
|
sanitized["estimatedTime"] = max(5, int(task.get("estimatedTime") or 15))
|
||||||
|
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
||||||
|
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
||||||
|
sanitized["enabled"] = bool(task.get("enabled", True))
|
||||||
|
return sanitized
|
||||||
|
|
||||||
|
|
||||||
|
def _build_single_task_for_missing_pillar(
|
||||||
|
user_id: str,
|
||||||
|
date: str,
|
||||||
|
pillar_id: str,
|
||||||
|
grounding: Dict[str, Any],
|
||||||
|
) -> Optional[Dict[str, Any]]:
|
||||||
|
schema = {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"pillarId": {"type": "string"},
|
||||||
|
"title": {"type": "string"},
|
||||||
|
"description": {"type": "string"},
|
||||||
|
"priority": {"type": "string"},
|
||||||
|
"estimatedTime": {"type": "number"},
|
||||||
|
"actionType": {"type": "string"},
|
||||||
|
"actionUrl": {"type": "string"},
|
||||||
|
"enabled": {"type": "boolean"},
|
||||||
|
"metadata": {"type": "object"},
|
||||||
|
},
|
||||||
|
"required": ["pillarId", "title", "description", "priority", "estimatedTime", "actionType", "enabled"],
|
||||||
|
}
|
||||||
|
prompt = (
|
||||||
|
"Generate exactly one actionable JSON task for today's workflow.\n"
|
||||||
|
f"Date: {date}\n"
|
||||||
|
f"Required pillarId: {pillar_id}\n"
|
||||||
|
"Constraints:\n"
|
||||||
|
"- Return a single JSON object only.\n"
|
||||||
|
"- Keep title concise and practical.\n"
|
||||||
|
"- Task must be completable today.\n"
|
||||||
|
"- Use actionType='navigate' and a valid ALwrity route when possible.\n"
|
||||||
|
f"User context: {json.dumps(grounding.get('onboarding_data', {}), indent=2)}\n"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
|
||||||
|
candidate = raw if isinstance(raw, dict) else json.loads(raw)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to generate pillar backfill task for {pillar_id}: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
candidate = _sanitize_task(candidate)
|
||||||
|
if candidate:
|
||||||
|
candidate["pillarId"] = pillar_id
|
||||||
|
metadata = candidate.get("metadata") if isinstance(candidate.get("metadata"), dict) else {}
|
||||||
|
metadata["source"] = "llm_pillar_backfill"
|
||||||
|
candidate["metadata"] = metadata
|
||||||
|
return candidate
|
||||||
|
|
||||||
|
|
||||||
|
def _ensure_pillar_coverage(
|
||||||
|
tasks: List[Dict[str, Any]],
|
||||||
|
user_id: str,
|
||||||
|
date: str,
|
||||||
|
grounding: Dict[str, Any],
|
||||||
|
) -> List[Dict[str, Any]]:
|
||||||
|
sanitized_tasks = [t for t in (_sanitize_task(task) for task in tasks) if t]
|
||||||
|
if not _is_coverage_guardrail_enabled(grounding):
|
||||||
|
return sanitized_tasks
|
||||||
|
|
||||||
|
covered_pillars = {task["pillarId"] for task in sanitized_tasks}
|
||||||
|
fallback_by_pillar = {
|
||||||
|
task["pillarId"]: task for task in (_sanitize_task(t) for t in _fallback_tasks(date)) if task
|
||||||
|
}
|
||||||
|
|
||||||
|
for pillar_id in PILLAR_IDS:
|
||||||
|
if pillar_id in covered_pillars:
|
||||||
|
continue
|
||||||
|
|
||||||
|
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
|
||||||
|
if generated:
|
||||||
|
sanitized_tasks.append(generated)
|
||||||
|
covered_pillars.add(pillar_id)
|
||||||
|
continue
|
||||||
|
|
||||||
|
controlled_fallback = fallback_by_pillar.get(pillar_id)
|
||||||
|
if controlled_fallback:
|
||||||
|
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
|
||||||
|
metadata["source"] = "controlled_fallback"
|
||||||
|
controlled_fallback["metadata"] = metadata
|
||||||
|
sanitized_tasks.append(controlled_fallback)
|
||||||
|
covered_pillars.add(pillar_id)
|
||||||
|
|
||||||
|
return sanitized_tasks
|
||||||
|
|
||||||
|
|
||||||
def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, Any]:
|
def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, Any]:
|
||||||
# 1. Fetch unread alerts
|
# 1. Fetch unread alerts
|
||||||
unread_agent_alerts = (
|
unread_agent_alerts = (
|
||||||
@@ -108,6 +223,8 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A
|
|||||||
# 2. Fetch comprehensive onboarding data (SIF)
|
# 2. Fetch comprehensive onboarding data (SIF)
|
||||||
onboarding_context = {}
|
onboarding_context = {}
|
||||||
try:
|
try:
|
||||||
|
from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService
|
||||||
|
|
||||||
svc = OnboardingDataIntegrationService()
|
svc = OnboardingDataIntegrationService()
|
||||||
integrated = svc.get_integrated_data_sync(user_id, db) or {}
|
integrated = svc.get_integrated_data_sync(user_id, db) or {}
|
||||||
|
|
||||||
@@ -235,8 +352,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
# Ensure we have coverage for all pillars (fill gaps with fallback/LLM if needed)
|
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
||||||
# For now, let's just return what the agents proposed
|
|
||||||
return {
|
return {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": final_tasks
|
"tasks": final_tasks
|
||||||
@@ -320,7 +436,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
|
|
||||||
tasks = result.get("tasks") if isinstance(result, dict) else None
|
tasks = result.get("tasks") if isinstance(result, dict) else None
|
||||||
if not isinstance(tasks, list) or not tasks:
|
if not isinstance(tasks, list) or not tasks:
|
||||||
result = {"date": date, "tasks": _fallback_tasks(date)}
|
tasks = _fallback_tasks(date)
|
||||||
|
result = {
|
||||||
|
"date": date,
|
||||||
|
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
||||||
|
}
|
||||||
|
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
event_type="final_summary",
|
event_type="final_summary",
|
||||||
|
|||||||
105
backend/tests/test_today_workflow_pillar_coverage.py
Normal file
105
backend/tests/test_today_workflow_pillar_coverage.py
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from services.intelligence.agents.core_agent_framework import TaskProposal
|
||||||
|
from services import today_workflow_service as svc
|
||||||
|
|
||||||
|
|
||||||
|
class DummyActivity:
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def start_run(self, *args, **kwargs):
|
||||||
|
return SimpleNamespace(id="run-1")
|
||||||
|
|
||||||
|
def log_event(self, *args, **kwargs):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def finish_run(self, *args, **kwargs):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class DummyAgent:
|
||||||
|
def __init__(self, proposals):
|
||||||
|
self._proposals = proposals
|
||||||
|
|
||||||
|
async def propose_daily_tasks(self, grounding):
|
||||||
|
return self._proposals
|
||||||
|
|
||||||
|
|
||||||
|
def _mock_orchestrator_with_agents(proposals):
|
||||||
|
return SimpleNamespace(
|
||||||
|
agents={
|
||||||
|
"content": DummyAgent(proposals),
|
||||||
|
"seo": None,
|
||||||
|
"social": None,
|
||||||
|
"competitor": None,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _covered_pillars(result):
|
||||||
|
return {task["pillarId"] for task in result["tasks"]}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_generate_agent_enhanced_plan_preserves_full_committee_coverage(monkeypatch):
|
||||||
|
proposals = [
|
||||||
|
TaskProposal("P", "desc", "plan", "high", 10, "content", "why", {}, "navigate", "/content-planning-dashboard"),
|
||||||
|
TaskProposal("G", "desc", "generate", "high", 10, "content", "why", {}, "navigate", "/blog-writer"),
|
||||||
|
TaskProposal("Pu", "desc", "publish", "high", 10, "content", "why", {}, "navigate", "/scheduler-dashboard"),
|
||||||
|
TaskProposal("A", "desc", "analyze", "high", 10, "content", "why", {}, "navigate", "/seo-dashboard"),
|
||||||
|
TaskProposal("E", "desc", "engage", "high", 10, "content", "why", {}, "navigate", "/linkedin-writer"),
|
||||||
|
TaskProposal("R", "desc", "remarket", "high", 10, "content", "why", {}, "navigate", "/facebook-writer"),
|
||||||
|
]
|
||||||
|
|
||||||
|
async def _get_orchestrator(user_id):
|
||||||
|
return _mock_orchestrator_with_agents(proposals)
|
||||||
|
|
||||||
|
monkeypatch.setattr(svc, "build_grounding_context", lambda db, user_id, date: {})
|
||||||
|
monkeypatch.setattr(svc.orchestration_service, "get_or_create_orchestrator", _get_orchestrator)
|
||||||
|
|
||||||
|
result = await svc.generate_agent_enhanced_plan(db=None, user_id="u1", date="2026-01-01")
|
||||||
|
|
||||||
|
assert _covered_pillars(result) == set(svc.PILLAR_IDS)
|
||||||
|
assert len(result["tasks"]) == len(proposals)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_generate_agent_enhanced_plan_backfills_missing_committee_pillars(monkeypatch):
|
||||||
|
proposals = [
|
||||||
|
TaskProposal("P", "desc", "plan", "high", 10, "content", "why", {}, "navigate", "/content-planning-dashboard"),
|
||||||
|
TaskProposal("G", "desc", "generate", "high", 10, "content", "why", {}, "navigate", "/blog-writer"),
|
||||||
|
]
|
||||||
|
|
||||||
|
async def _get_orchestrator(user_id):
|
||||||
|
return _mock_orchestrator_with_agents(proposals)
|
||||||
|
|
||||||
|
monkeypatch.setattr(svc, "build_grounding_context", lambda db, user_id, date: {})
|
||||||
|
monkeypatch.setattr(svc.orchestration_service, "get_or_create_orchestrator", _get_orchestrator)
|
||||||
|
|
||||||
|
result = await svc.generate_agent_enhanced_plan(db=None, user_id="u1", date="2026-01-01")
|
||||||
|
|
||||||
|
assert _covered_pillars(result) == set(svc.PILLAR_IDS)
|
||||||
|
assert {"P", "G"}.issubset({task["title"] for task in result["tasks"]})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_generate_agent_enhanced_plan_full_fallback_path_still_covers_all_pillars(monkeypatch):
|
||||||
|
async def _get_orchestrator(user_id):
|
||||||
|
return _mock_orchestrator_with_agents([])
|
||||||
|
|
||||||
|
monkeypatch.setattr(svc, "build_grounding_context", lambda db, user_id, date: {})
|
||||||
|
monkeypatch.setattr(svc.orchestration_service, "get_or_create_orchestrator", _get_orchestrator)
|
||||||
|
monkeypatch.setattr(svc, "AgentActivityService", DummyActivity)
|
||||||
|
|
||||||
|
def _raise_llm(*args, **kwargs):
|
||||||
|
raise RuntimeError("LLM down")
|
||||||
|
|
||||||
|
monkeypatch.setattr(svc, "llm_text_gen", _raise_llm)
|
||||||
|
|
||||||
|
result = await svc.generate_agent_enhanced_plan(db=None, user_id="u1", date="2026-01-01")
|
||||||
|
|
||||||
|
assert _covered_pillars(result) == set(svc.PILLAR_IDS)
|
||||||
|
assert len(result["tasks"]) >= len(svc.PILLAR_IDS)
|
||||||
Reference in New Issue
Block a user