diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index c6a09f97..0aef12cb 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -8,7 +8,6 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert from services.agent_activity_service import AgentActivityService from services.llm_providers.main_text_generation import llm_text_gen -from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] @@ -95,6 +94,122 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]: ] +def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool: + workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} + if not isinstance(workflow_config, dict): + return True + if workflow_config.get("disable_pillar_coverage_guardrail") is True: + return False + if workflow_config.get("enforce_pillar_coverage") is False: + return False + return True + + +def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: + if not isinstance(task, dict): + return None + + pillar_id = str(task.get("pillarId") or "").lower().strip() + title = str(task.get("title") or "").strip() + if pillar_id not in PILLAR_IDS or not title: + return None + + sanitized = dict(task) + sanitized["pillarId"] = pillar_id + sanitized["title"] = title + sanitized["description"] = str(task.get("description") or "").strip() + sanitized["priority"] = _coerce_priority(task.get("priority")) + sanitized["estimatedTime"] = max(5, int(task.get("estimatedTime") or 15)) + sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" + sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None + sanitized["enabled"] = bool(task.get("enabled", True)) + return sanitized + + +def _build_single_task_for_missing_pillar( + user_id: str, + date: str, + pillar_id: str, + grounding: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + schema = { + "type": "object", + "properties": { + "pillarId": {"type": "string"}, + "title": {"type": "string"}, + "description": {"type": "string"}, + "priority": {"type": "string"}, + "estimatedTime": {"type": "number"}, + "actionType": {"type": "string"}, + "actionUrl": {"type": "string"}, + "enabled": {"type": "boolean"}, + "metadata": {"type": "object"}, + }, + "required": ["pillarId", "title", "description", "priority", "estimatedTime", "actionType", "enabled"], + } + prompt = ( + "Generate exactly one actionable JSON task for today's workflow.\n" + f"Date: {date}\n" + f"Required pillarId: {pillar_id}\n" + "Constraints:\n" + "- Return a single JSON object only.\n" + "- Keep title concise and practical.\n" + "- Task must be completable today.\n" + "- Use actionType='navigate' and a valid ALwrity route when possible.\n" + f"User context: {json.dumps(grounding.get('onboarding_data', {}), indent=2)}\n" + ) + try: + raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id) + candidate = raw if isinstance(raw, dict) else json.loads(raw) + except Exception as e: + logger.warning(f"Failed to generate pillar backfill task for {pillar_id}: {e}") + return None + + candidate = _sanitize_task(candidate) + if candidate: + candidate["pillarId"] = pillar_id + metadata = candidate.get("metadata") if isinstance(candidate.get("metadata"), dict) else {} + metadata["source"] = "llm_pillar_backfill" + candidate["metadata"] = metadata + return candidate + + +def _ensure_pillar_coverage( + tasks: List[Dict[str, Any]], + user_id: str, + date: str, + grounding: Dict[str, Any], +) -> List[Dict[str, Any]]: + sanitized_tasks = [t for t in (_sanitize_task(task) for task in tasks) if t] + if not _is_coverage_guardrail_enabled(grounding): + return sanitized_tasks + + covered_pillars = {task["pillarId"] for task in sanitized_tasks} + fallback_by_pillar = { + task["pillarId"]: task for task in (_sanitize_task(t) for t in _fallback_tasks(date)) if task + } + + for pillar_id in PILLAR_IDS: + if pillar_id in covered_pillars: + continue + + generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding) + if generated: + sanitized_tasks.append(generated) + covered_pillars.add(pillar_id) + continue + + controlled_fallback = fallback_by_pillar.get(pillar_id) + if controlled_fallback: + metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {} + metadata["source"] = "controlled_fallback" + controlled_fallback["metadata"] = metadata + sanitized_tasks.append(controlled_fallback) + covered_pillars.add(pillar_id) + + return sanitized_tasks + + def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, Any]: # 1. Fetch unread alerts unread_agent_alerts = ( @@ -234,8 +349,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> } }) - # Ensure we have coverage for all pillars (fill gaps with fallback/LLM if needed) - # For now, let's just return what the agents proposed + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, "tasks": final_tasks @@ -319,7 +433,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: - result = {"date": date, "tasks": _fallback_tasks(date)} + tasks = _fallback_tasks(date) + result = { + "date": date, + "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + } activity.log_event( event_type="final_summary", diff --git a/backend/tests/test_today_workflow_pillar_coverage.py b/backend/tests/test_today_workflow_pillar_coverage.py new file mode 100644 index 00000000..e8fe3838 --- /dev/null +++ b/backend/tests/test_today_workflow_pillar_coverage.py @@ -0,0 +1,105 @@ +from types import SimpleNamespace + +import pytest + +from services.intelligence.agents.core_agent_framework import TaskProposal +from services import today_workflow_service as svc + + +class DummyActivity: + def __init__(self, *args, **kwargs): + pass + + def start_run(self, *args, **kwargs): + return SimpleNamespace(id="run-1") + + def log_event(self, *args, **kwargs): + return None + + def finish_run(self, *args, **kwargs): + return None + + +class DummyAgent: + def __init__(self, proposals): + self._proposals = proposals + + async def propose_daily_tasks(self, grounding): + return self._proposals + + +def _mock_orchestrator_with_agents(proposals): + return SimpleNamespace( + agents={ + "content": DummyAgent(proposals), + "seo": None, + "social": None, + "competitor": None, + } + ) + + +def _covered_pillars(result): + return {task["pillarId"] for task in result["tasks"]} + + +@pytest.mark.asyncio +async def test_generate_agent_enhanced_plan_preserves_full_committee_coverage(monkeypatch): + proposals = [ + TaskProposal("P", "desc", "plan", "high", 10, "content", "why", {}, "navigate", "/content-planning-dashboard"), + TaskProposal("G", "desc", "generate", "high", 10, "content", "why", {}, "navigate", "/blog-writer"), + TaskProposal("Pu", "desc", "publish", "high", 10, "content", "why", {}, "navigate", "/scheduler-dashboard"), + TaskProposal("A", "desc", "analyze", "high", 10, "content", "why", {}, "navigate", "/seo-dashboard"), + TaskProposal("E", "desc", "engage", "high", 10, "content", "why", {}, "navigate", "/linkedin-writer"), + TaskProposal("R", "desc", "remarket", "high", 10, "content", "why", {}, "navigate", "/facebook-writer"), + ] + + async def _get_orchestrator(user_id): + return _mock_orchestrator_with_agents(proposals) + + monkeypatch.setattr(svc, "build_grounding_context", lambda db, user_id, date: {}) + monkeypatch.setattr(svc.orchestration_service, "get_or_create_orchestrator", _get_orchestrator) + + result = await svc.generate_agent_enhanced_plan(db=None, user_id="u1", date="2026-01-01") + + assert _covered_pillars(result) == set(svc.PILLAR_IDS) + assert len(result["tasks"]) == len(proposals) + + +@pytest.mark.asyncio +async def test_generate_agent_enhanced_plan_backfills_missing_committee_pillars(monkeypatch): + proposals = [ + TaskProposal("P", "desc", "plan", "high", 10, "content", "why", {}, "navigate", "/content-planning-dashboard"), + TaskProposal("G", "desc", "generate", "high", 10, "content", "why", {}, "navigate", "/blog-writer"), + ] + + async def _get_orchestrator(user_id): + return _mock_orchestrator_with_agents(proposals) + + monkeypatch.setattr(svc, "build_grounding_context", lambda db, user_id, date: {}) + monkeypatch.setattr(svc.orchestration_service, "get_or_create_orchestrator", _get_orchestrator) + + result = await svc.generate_agent_enhanced_plan(db=None, user_id="u1", date="2026-01-01") + + assert _covered_pillars(result) == set(svc.PILLAR_IDS) + assert {"P", "G"}.issubset({task["title"] for task in result["tasks"]}) + + +@pytest.mark.asyncio +async def test_generate_agent_enhanced_plan_full_fallback_path_still_covers_all_pillars(monkeypatch): + async def _get_orchestrator(user_id): + return _mock_orchestrator_with_agents([]) + + monkeypatch.setattr(svc, "build_grounding_context", lambda db, user_id, date: {}) + monkeypatch.setattr(svc.orchestration_service, "get_or_create_orchestrator", _get_orchestrator) + monkeypatch.setattr(svc, "AgentActivityService", DummyActivity) + + def _raise_llm(*args, **kwargs): + raise RuntimeError("LLM down") + + monkeypatch.setattr(svc, "llm_text_gen", _raise_llm) + + result = await svc.generate_agent_enhanced_plan(db=None, user_id="u1", date="2026-01-01") + + assert _covered_pillars(result) == set(svc.PILLAR_IDS) + assert len(result["tasks"]) >= len(svc.PILLAR_IDS)