"""Daily workflow plan generation and persistence.

A plan is a list of task dicts, each assigned to one of the lifecycle pillars
in ``PILLAR_IDS``. Tasks are gathered from an agent committee first; when the
committee fails entirely, a single LLM call is used, with a static task list as
the last resort. Plans and their tasks are stored through SQLAlchemy models.
"""
import asyncio
import json
import math
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from sqlalchemy.orm import Session

from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
from models.agent_activity_models import AgentAlert
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger

PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]

# Defaults shared by the coercion helpers below.
_DEFAULT_AGENT_PROPOSAL_TIMEOUT_SECONDS = 4.0
_DEFAULT_ESTIMATED_TIME = 15


def _today_date_str() -> str:
    """Return today's UTC date as an ISO ``YYYY-MM-DD`` string."""
    return datetime.now(timezone.utc).date().isoformat()


def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same kind of value as the deprecated ``datetime.utcnow()``
    without calling it.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _json_for_prompt(value: Any) -> str:
    """Serialize *value* as indented JSON for embedding in an LLM prompt.

    Values the JSON encoder does not know (for example ``datetime``) are
    stringified instead of raising; if encoding still fails, the ``str()`` of
    the whole value is emitted as a JSON string so prompt building never
    aborts plan generation.
    """
    try:
        return json.dumps(value, indent=2, default=str)
    except (TypeError, ValueError) as e:
        logger.warning(f"Could not JSON-encode prompt context: {e}")
        return json.dumps(str(value))


def _coerce_priority(value: Any) -> str:
    """Normalize *value* to ``"high"``, ``"medium"`` or ``"low"`` (default ``"medium"``)."""
    v = str(value or "medium").lower().strip()
    return v if v in {"high", "medium", "low"} else "medium"


def _coerce_status(value: Any) -> str:
    """Normalize *value* to a known task status (default ``"pending"``)."""
    v = str(value or "pending").lower().strip()
    if v in {"pending", "in_progress", "completed", "skipped", "dismissed"}:
        # Canonicalize 'dismissed' to 'skipped' for consistency
        return "skipped" if v == "dismissed" else v
    return "pending"


def _coerce_estimated_time(
    value: Any,
    default: int = _DEFAULT_ESTIMATED_TIME,
    minimum: Optional[int] = None,
) -> int:
    """Convert *value* to an ``int`` estimate without ever raising.

    Falsy values (``None``, ``0``, ``""``) and values that cannot be parsed as
    a number yield *default*. Numeric strings such as ``"15.5"`` are truncated
    like floats. When *minimum* is given the result is clamped from below.
    """
    estimate = default
    if value:
        try:
            estimate = int(value)
        except (TypeError, ValueError, OverflowError):
            try:
                estimate = int(float(value))
            except (TypeError, ValueError, OverflowError):
                estimate = default
    return estimate if minimum is None else max(minimum, estimate)


def coerce_dependencies(value: Any) -> List[str]:
    """Normalize a dependency specification to a list of non-empty strings.

    Accepts a list, a JSON-encoded list, or a plain string (treated as a
    single dependency). Anything else yields an empty list.
    """

    def _clean(items: List[Any]) -> List[str]:
        # ``None`` entries are skipped rather than becoming the text "None".
        stripped = (str(dep).strip() for dep in items if dep is not None)
        return [dep for dep in stripped if dep]

    if isinstance(value, list):
        return _clean(value)
    if isinstance(value, str):
        raw = value.strip()
        if not raw:
            return []
        try:
            parsed = json.loads(raw)
        except (ValueError, RecursionError):
            # Not JSON: fall through and treat the text as one dependency.
            parsed = None
        if isinstance(parsed, list):
            return _clean(parsed)
        return [raw]
    return []


def _proposal_value(proposal: Any, *names: str) -> Any:
    """Return the first non-``None`` value stored under any of *names*.

    Works for dict proposals (key lookup) and object proposals (attribute
    lookup); returns ``None`` when nothing is found.
    """
    for name in names:
        if isinstance(proposal, dict):
            value = proposal.get(name)
        else:
            value = getattr(proposal, name, None)
        if value is not None:
            return value
    return None


def _proposal_priority_rank(priority: str) -> int:
    """Map a priority label to a sortable rank (unknown labels rank as medium)."""
    return {"low": 0, "medium": 1, "high": 2}.get(str(priority or "").lower(), 1)


def _proposal_order_key(proposal: Any) -> tuple:
    """Return a deterministic sort key used to break ties between proposals.

    Handles both object and dict access for compatibility. The action URL is
    read from ``action_url`` or ``actionUrl`` because both spellings are used
    for proposals in this module.
    """
    field_names = (
        ("source_agent",),
        ("title",),
        ("description",),
        ("action_url", "actionUrl"),
    )
    return tuple(
        str(_proposal_value(proposal, *names) or "").lower() for names in field_names
    )


def _get_agent_proposal_timeout_seconds(grounding: Dict[str, Any]) -> float:
    """Return the per-agent proposal timeout from ``workflow_config``.

    Falls back to 4.0 seconds when the setting is missing, unparsable or not a
    finite number, and never returns less than 1.0 second.
    """
    default = _DEFAULT_AGENT_PROPOSAL_TIMEOUT_SECONDS
    workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
    if not isinstance(workflow_config, dict):
        return default
    raw_timeout = workflow_config.get("agent_proposal_timeout_seconds", default)
    try:
        timeout_seconds = float(raw_timeout)
    except (TypeError, ValueError):
        return default
    if not math.isfinite(timeout_seconds):
        # inf/nan would otherwise reach asyncio.wait_for as the timeout.
        return default
    return max(1.0, timeout_seconds)


def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
    """Return the static task list (one task per pillar) used as a last resort.

    *date* is accepted for signature compatibility and is not used.
    """
    return [
        {
            "pillarId": "plan",
            "title": "Review today’s plan",
            "description": "Confirm priorities and adjust the content calendar for today.",
            "priority": "high",
            "estimatedTime": 15,
            "actionType": "navigate",
            "actionUrl": "/content-planning-dashboard",
            "enabled": True,
        },
        {
            "pillarId": "generate",
            "title": "Generate one core content asset",
            "description": "Create a draft aligned with your current strategy and voice.",
            "priority": "high",
            "estimatedTime": 45,
            "actionType": "navigate",
            "actionUrl": "/blog-writer",
            "enabled": True,
        },
        {
            "pillarId": "publish",
            "title": "Publish or schedule today’s content",
            "description": "Publish or schedule content across the selected channel(s).",
            "priority": "medium",
            "estimatedTime": 20,
            "actionType": "navigate",
            "actionUrl": "/content-planning-dashboard",
            "enabled": True,
        },
        {
            "pillarId": "analyze",
            "title": "Check semantic health and performance",
            "description": "Review semantic health metrics and key performance indicators.",
            "priority": "medium",
            "estimatedTime": 15,
            "actionType": "navigate",
            "actionUrl": "/seo-dashboard",
            "enabled": True,
        },
        {
            "pillarId": "engage",
            "title": "Engage on one channel",
            "description": "Respond to comments and share one post to keep momentum.",
            "priority": "medium",
            "estimatedTime": 15,
            "actionType": "navigate",
            "actionUrl": "/linkedin-writer",
            "enabled": True,
        },
        {
            "pillarId": "remarket",
            "title": "Repurpose and remarket content",
            "description": "Create one repurposed snippet and distribute it to increase reach.",
            "priority": "low",
            "estimatedTime": 20,
            "actionType": "navigate",
            "actionUrl": "/facebook-writer",
            "enabled": True,
        },
    ]


def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
    """Return whether every pillar must end up with at least one task.

    Enabled unless ``workflow_config`` sets
    ``disable_pillar_coverage_guardrail`` to ``True`` or
    ``enforce_pillar_coverage`` to ``False``.
    """
    workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
    if not isinstance(workflow_config, dict):
        return True
    if workflow_config.get("disable_pillar_coverage_guardrail") is True:
        return False
    if workflow_config.get("enforce_pillar_coverage") is False:
        return False
    return True


def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a normalized copy of *task*, or ``None`` if it is unusable.

    A task is unusable when it is not a dict, has no title, or names a pillar
    outside ``PILLAR_IDS``. Extra keys on the input are carried over.
    """
    if not isinstance(task, dict):
        return None
    pillar_id = str(task.get("pillarId") or "").lower().strip()
    title = str(task.get("title") or "").strip()
    if pillar_id not in PILLAR_IDS or not title:
        return None

    sanitized = dict(task)
    sanitized["pillarId"] = pillar_id
    sanitized["title"] = title
    sanitized["description"] = str(task.get("description") or "").strip()
    sanitized["priority"] = _coerce_priority(task.get("priority"))
    # Tolerant coercion: one malformed estimate must not abort the whole plan.
    sanitized["estimatedTime"] = _coerce_estimated_time(task.get("estimatedTime"), minimum=5)
    sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
    sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
    sanitized["enabled"] = bool(task.get("enabled", True))
    return sanitized


def _build_single_task_for_missing_pillar(
    user_id: str,
    date: str,
    pillar_id: str,
    grounding: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """Ask the LLM for one task for *pillar_id*.

    Returns the sanitized task tagged with ``metadata["source"] ==
    "llm_pillar_backfill"``, or ``None`` when generation or parsing fails or
    the result is unusable.
    """
    schema = {
        "type": "object",
        "properties": {
            "pillarId": {"type": "string"},
            "title": {"type": "string"},
            "description": {"type": "string"},
            "priority": {"type": "string"},
            "estimatedTime": {"type": "number"},
            "actionType": {"type": "string"},
            "actionUrl": {"type": "string"},
            "enabled": {"type": "boolean"},
            "metadata": {"type": "object"},
        },
        "required": ["pillarId", "title", "description", "priority", "estimatedTime", "actionType", "enabled"],
    }
    onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {}
    prompt = (
        "Generate exactly one actionable JSON task for today's workflow.\n"
        f"Date: {date}\n"
        f"Required pillarId: {pillar_id}\n"
        "Constraints:\n"
        "- Return a single JSON object only.\n"
        "- Keep title concise and practical.\n"
        "- Task must be completable today.\n"
        "- Use actionType='navigate' and a valid ALwrity route when possible.\n"
        f"User context: {_json_for_prompt(onboarding_data)}\n"
    )
    try:
        raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
        candidate = raw if isinstance(raw, dict) else json.loads(raw)
    except Exception as e:
        logger.warning(f"Failed to generate pillar backfill task for {pillar_id}: {e}")
        return None

    if not isinstance(candidate, dict):
        return None
    # Force the requested pillar BEFORE sanitizing; otherwise a usable task
    # whose pillarId is missing or malformed would be rejected outright.
    candidate = _sanitize_task({**candidate, "pillarId": pillar_id})
    if candidate is None:
        return None
    metadata = candidate.get("metadata")
    metadata = dict(metadata) if isinstance(metadata, dict) else {}
    metadata["source"] = "llm_pillar_backfill"
    candidate["metadata"] = metadata
    return candidate


def _ensure_pillar_coverage(
    tasks: List[Dict[str, Any]],
    user_id: str,
    date: str,
    grounding: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Sanitize *tasks* and, if the guardrail is on, fill uncovered pillars.

    Each missing pillar is first backfilled with an LLM-generated task; when
    that fails, the static fallback task for the pillar is appended and tagged
    with ``metadata["source"] == "controlled_fallback"``.
    """
    sanitized_tasks = [t for t in (_sanitize_task(task) for task in tasks) if t]
    if not _is_coverage_guardrail_enabled(grounding):
        return sanitized_tasks

    covered_pillars = {task["pillarId"] for task in sanitized_tasks}
    fallback_by_pillar: Optional[Dict[str, Dict[str, Any]]] = None

    for pillar_id in PILLAR_IDS:
        if pillar_id in covered_pillars:
            continue

        generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
        if generated:
            sanitized_tasks.append(generated)
            covered_pillars.add(pillar_id)
            continue

        if fallback_by_pillar is None:
            # Built only when needed; each entry is a fresh sanitized copy.
            fallback_by_pillar = {
                task["pillarId"]: task
                for task in (_sanitize_task(t) for t in _fallback_tasks(date))
                if task
            }
        controlled_fallback = fallback_by_pillar.get(pillar_id)
        if controlled_fallback:
            metadata = controlled_fallback.get("metadata")
            metadata = dict(metadata) if isinstance(metadata, dict) else {}
            metadata["source"] = "controlled_fallback"
            controlled_fallback["metadata"] = metadata
            sanitized_tasks.append(controlled_fallback)
            covered_pillars.add(pillar_id)

    return sanitized_tasks


def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, Any]:
    """Assemble the context handed to agents and LLM prompts.

    Returns a dict with the user's ten most recent unread alerts, the
    integrated onboarding data (empty when it cannot be loaded) and its
    ``workflow_config`` section. *date* is accepted but not used here.
    """
    # 1. Fetch unread alerts
    unread_agent_alerts = (
        db.query(AgentAlert)
        .filter(AgentAlert.user_id == user_id, AgentAlert.read_at.is_(None))
        .order_by(AgentAlert.created_at.desc())
        .limit(10)
        .all()
    )

    # 2. Fetch comprehensive onboarding data (SIF)
    onboarding_context: Dict[str, Any] = {}
    try:
        from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService

        svc = OnboardingDataIntegrationService()
        integrated = svc.get_integrated_data_sync(user_id, db) or {}
        if isinstance(integrated, dict):
            # Shallow copy so the key added below does not leak into the
            # object owned by the integration service.
            onboarding_context = dict(integrated)
        else:
            logger.warning(
                f"Ignoring onboarding data of unexpected type {type(integrated).__name__}"
            )
    except Exception as e:
        logger.warning(f"Failed to load full onboarding data for context: {e}")

    # Ensure workflow_config exists
    onboarding_context.setdefault("workflow_config", {})

    return {
        "recent_agent_alerts": [
            {
                "title": a.title,
                "message": a.message,
                "created_at": a.created_at.isoformat() if a.created_at else None,
                "alert_type": a.alert_type,
            }
            for a in unread_agent_alerts
        ],
        "onboarding_data": onboarding_context,
        "workflow_config": onboarding_context.get("workflow_config", {}),
    }


# NOTE(review): these imports sit below the helpers in the original file and
# are kept here in case the placement avoids a circular import - confirm
# before moving them to the top of the module.
from services.intelligence.agents.agent_orchestrator import AgentOrchestrationService
from services.task_memory_service import TaskMemoryService

# Initialize orchestration service (singleton)
orchestration_service = AgentOrchestrationService()


def _dedupe_proposals(raw_proposals: List[Any]) -> List[Any]:
    """Drop proposals with an invalid pillar and collapse pillar+title duplicates.

    Among duplicates the higher priority wins; equal priorities are resolved
    with ``_proposal_order_key`` so the outcome does not depend on the order
    in which agents answered. Dict and object proposals are both accepted.
    """
    unique_map: Dict[str, Any] = {}
    for p in raw_proposals:
        # Normalize pillar IDs and filter invalid proposals
        pillar_id = str(_proposal_value(p, "pillarId", "pillar_id") or "").lower().strip()
        if pillar_id not in PILLAR_IDS:
            logger.warning(f"Skipping proposal with invalid pillarId: {pillar_id}. Proposal: {p}")
            continue
        if isinstance(p, dict):
            p["pillarId"] = pillar_id

        key = f"{pillar_id}:{_proposal_value(p, 'title')}"
        existing = unique_map.get(key)
        if existing is None:
            unique_map[key] = p
            continue

        new_rank = _proposal_priority_rank(_proposal_value(p, "priority"))
        existing_rank = _proposal_priority_rank(_proposal_value(existing, "priority"))
        if new_rank > existing_rank:
            unique_map[key] = p
        elif new_rank == existing_rank and _proposal_order_key(p) < _proposal_order_key(existing):
            # Deterministic tie-breaker for equal priority proposals.
            unique_map[key] = p
    return list(unique_map.values())


def _proposal_to_task(prop: Any) -> Dict[str, Any]:
    """Convert a dict or object proposal into the task dict used by the frontend."""
    if isinstance(prop, dict):
        return {
            "pillarId": prop.get("pillarId"),
            "title": prop.get("title"),
            "description": prop.get("description"),
            "priority": prop.get("priority"),
            "estimatedTime": prop.get("estimatedTime"),
            "actionType": prop.get("actionType"),
            "actionUrl": prop.get("actionUrl"),
            "enabled": True,
            "metadata": {
                "source_agent": prop.get("source_agent"),
                "reasoning": prop.get("reasoning"),
                "context_data": prop.get("context_data"),
            },
        }
    # Object proposal: missing attributes become None and are dealt with by
    # _sanitize_task instead of raising AttributeError here.
    return {
        "pillarId": getattr(prop, "pillar_id", None),
        "title": getattr(prop, "title", None),
        "description": getattr(prop, "description", None),
        "priority": getattr(prop, "priority", None),
        "estimatedTime": getattr(prop, "estimated_time", None),
        "actionType": getattr(prop, "action_type", None),
        "actionUrl": getattr(prop, "action_url", None),
        "enabled": True,
        "metadata": {
            "source_agent": getattr(prop, "source_agent", None),
            "reasoning": getattr(prop, "reasoning", None),
            "context_data": getattr(prop, "context_data", None),
        },
    }


async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]:
    """Build the daily plan for *user_id* as ``{"date": ..., "tasks": [...]}``.

    Order of strategies:
      1. If the orchestrator cannot be obtained, return the static fallback tasks.
      2. Poll the agent committee in parallel; if at least one agent answers
         and filtering succeeds, return its de-duplicated proposals with
         pillar coverage enforced.
      3. Otherwise generate the plan with one LLM call, falling back to the
         static tasks when the call or its parsing fails.
    """
    activity = AgentActivityService(db, user_id)
    grounding = build_grounding_context(db, user_id, date)
    memory_service = TaskMemoryService(user_id, db)

    # 1. Get Orchestrator
    try:
        orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
    except Exception as e:
        logger.error(f"Failed to get orchestrator: {e}")
        return {"date": date, "tasks": _fallback_tasks(date)}

    # 2. Parallel "Committee" Proposal Gathering
    logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")

    agent_tasks: List[Any] = []
    committee_total_failure = False
    try:
        agent_timeout_seconds = _get_agent_proposal_timeout_seconds(grounding)

        # Define agents to poll (keyed for logging/metrics)
        agents_to_poll = {
            "content": orchestrator.agents.get('content'),
            "strategy": orchestrator.agents.get('strategy'),
            "seo": orchestrator.agents.get('seo'),
            "social": orchestrator.agents.get('social'),
            "competitor": orchestrator.agents.get('competitor'),
        }

        # Filter out None agents (disabled/failed init)
        active_agents = {key: agent for key, agent in agents_to_poll.items() if agent}

        async def _collect_agent_proposals(agent_key: str, agent: Any) -> Dict[str, Any]:
            """Poll one agent; never raises, reports status and timing instead."""
            started_at = datetime.now(timezone.utc)
            outcome: Dict[str, Any] = {"agent_key": agent_key, "status": "ok", "proposals": []}
            try:
                proposals = await asyncio.wait_for(
                    agent.propose_daily_tasks(grounding), timeout=agent_timeout_seconds
                )
                if isinstance(proposals, list):
                    outcome["proposals"] = proposals
            except asyncio.TimeoutError:
                outcome["status"] = "timeout"
                outcome["error"] = f"Timed out after {agent_timeout_seconds:.2f}s"
            except Exception as agent_error:
                outcome["status"] = "error"
                outcome["error"] = str(agent_error)
            outcome["elapsed_ms"] = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000
            return outcome

        # Execute propose_daily_tasks in parallel with per-agent timeout
        committee_results = await asyncio.gather(
            *[_collect_agent_proposals(agent_key, agent) for agent_key, agent in active_agents.items()]
        )

        successful_agent_count = 0
        raw_proposals: List[Any] = []
        for res in committee_results:
            agent_key = res.get("agent_key")
            status = res.get("status")
            elapsed_ms = res.get("elapsed_ms")
            logger.info(
                "Agent committee proposal metric | agent={} status={} elapsed_ms={:.2f} timeout_s={:.2f} proposal_count={}",
                agent_key,
                status,
                elapsed_ms,
                agent_timeout_seconds,
                len(res.get("proposals") or []),
            )

            if status == "ok":
                successful_agent_count += 1
                raw_proposals.extend(res.get("proposals") or [])
            elif status == "timeout":
                logger.warning(f"Agent proposal timed out for {agent_key}: {res.get('error')}")
            else:
                logger.warning(f"Agent proposal failed for {agent_key}: {res.get('error')}")

        committee_total_failure = successful_agent_count == 0

        # 3. Filter Redundant Proposals (Self-Learning)
        # Simple deduplication based on title+pillar; semantic filtering against
        # task memory follows.
        agent_tasks = _dedupe_proposals(raw_proposals)

        # Phase 3: Check memory for rejections (Semantic Filter)
        agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)

    except Exception as e:
        logger.error(f"Committee proposal phase failed: {e}")
        committee_total_failure = True
        # Continue to fallback or LLM generation if committee fails

    # 4. Final Selection
    # Use committee outcomes whenever committee partially succeeds, even with sparse proposals.
    if not committee_total_failure:
        logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")

        # Convert TaskProposal objects to dicts for frontend
        final_tasks = [_proposal_to_task(prop) for prop in agent_tasks]
        final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)

        return {
            "date": date,
            "tasks": final_tasks,
        }

    # Fallback to LLM generation when the whole committee failed
    logger.info("Agent committee returned no tasks, falling back to LLM generation")

    schema = {
        "type": "object",
        "properties": {
            "date": {"type": "string"},
            "tasks": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "pillarId": {"type": "string"},
                        "title": {"type": "string"},
                        "description": {"type": "string"},
                        "priority": {"type": "string"},
                        "estimatedTime": {"type": "number"},
                        "actionType": {"type": "string"},
                        "actionUrl": {"type": "string"},
                        "enabled": {"type": "boolean"},
                        "dependencies": {"type": "array", "items": {"type": "string"}},
                        "metadata": {"type": "object"},
                    },
                },
            },
        },
    }

    prompt = (
        "Generate a personalized Today workflow plan for ALwrity with exactly 6 lifecycle pillars: "
        "plan, generate, publish, analyze, engage, remarket.\n\n"
        "User Context (Onboarding & Strategy):\n"
        f"{_json_for_prompt(grounding.get('onboarding_data', {}))}\n\n"
        "Rules:\n"
        "- Produce JSON only that matches the schema.\n"
        "- Include 1-3 tasks per pillar.\n"
        "- Each task must have pillarId in {plan, generate, publish, analyze, engage, remarket}.\n"
        "- Customize tasks based on the user's industry, business type, and content pillars found in User Context.\n"
        "- If competitors are listed, include a task to analyze one of them.\n"
        "- Prefer actionable tasks that can be completed today.\n"
        "- Use these common actionUrl routes when relevant: "
        "/content-planning-dashboard, /blog-writer, /linkedin-writer, /facebook-writer, /seo-dashboard, /scheduler-dashboard.\n"
        "- Keep descriptions concise.\n\n"
        f"Grounding context (Alerts):\n{_json_for_prompt(grounding.get('recent_agent_alerts', []))}\n"
    )

    run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
    activity.log_event(
        event_type="plan",
        severity="info",
        message="Building grounded daily workflow plan",
        payload=build_agent_event_payload(
            phase="planning",
            step="build_grounded_plan",
            tool_name="llm_text_gen",
            progress_percent=10,
            input_summary="Grounding data assembled from onboarding + alerts",
            output_summary="Preparing daily workflow generation",
            decision_reason="Need context-aware workflow",
            evidence_refs=["onboarding_data", "recent_agent_alerts"],
            safe_debug=True,
            metadata={"grounding": grounding},
        ),
        run_id=run.id,
        agent_type="TodayWorkflowGenerator",
    )

    try:
        raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
        if isinstance(raw, dict):
            result = raw
        else:
            try:
                result = json.loads(raw)
            except (TypeError, ValueError):
                # Unparsable LLM output: use the static tasks.
                result = {"date": date, "tasks": _fallback_tasks(date)}
    except Exception as e:
        activity.log_event(
            event_type="warning",
            severity="warning",
            message=str(e)[:2000],
            payload=build_agent_event_payload(
                phase="generation",
                step="llm_failed_fallback",
                tool_name="llm_text_gen",
                progress_percent=70,
                output_summary="LLM generation failed, using fallback tasks",
                decision_reason="Exception during workflow generation",
                safe_debug=False,
                metadata={"fallback": True},
            ),
            run_id=run.id,
            agent_type="TodayWorkflowGenerator",
        )
        result = {"date": date, "tasks": _fallback_tasks(date)}

    tasks = result.get("tasks") if isinstance(result, dict) else None
    if not isinstance(tasks, list) or not tasks:
        tasks = _fallback_tasks(date)

    result = {
        "date": date,
        "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
    }

    activity.log_event(
        event_type="final_summary",
        severity="info",
        message="Daily workflow plan generated",
        payload=build_agent_event_payload(
            phase="generation",
            step="workflow_generated",
            tool_name="llm_text_gen",
            progress_percent=100,
            output_summary=f"Generated {len(result.get('tasks', []))} tasks",
            decision_reason="Workflow assembled successfully",
            evidence_refs=[date],
            safe_debug=True,
            metadata={"date": date, "task_count": len(result.get("tasks", []))},
        ),
        run_id=run.id,
        agent_type="TodayWorkflowGenerator",
    )
    activity.finish_run(
        run.id,
        success=True,
        result_summary=json.dumps(
            {"date": date, "tasks": result.get("tasks", [])}, default=str
        )[:4000],
    )
    return result


async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
    """Return the stored plan for *user_id* and *date*, creating it if absent.

    Args:
        db: SQLAlchemy session used for lookup and persistence.
        user_id: Owner of the plan.
        date: ISO date string; defaults to today's UTC date.

    Returns:
        ``(plan, created)`` where *created* is ``True`` only when a new plan
        was generated and stored by this call.

    Raises:
        Exception: Any persistence error is re-raised after the session has
            been rolled back, so no plan is left behind without its tasks.
    """
    from starlette.concurrency import run_in_threadpool

    date_str = date or _today_date_str()

    def _get_existing():
        return (
            db.query(DailyWorkflowPlan)
            .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
            .first()
        )

    existing = await run_in_threadpool(_get_existing)
    if existing:
        return existing, False

    plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
    tasks = plan_data.get("tasks") or []

    def _create_plan():
        now = _utcnow_naive()
        try:
            plan = DailyWorkflowPlan(
                user_id=user_id,
                date=date_str,
                source="agent",
                plan_json=plan_data,
                created_at=now,
                updated_at=now,
            )
            db.add(plan)
            # flush (not commit) assigns plan.id while keeping the plan and its
            # tasks in one transaction: a failure below must not leave an empty
            # plan that later calls would return as "existing".
            db.flush()

            for t in tasks:
                if not isinstance(t, dict):
                    continue
                pillar_id = str(t.get("pillarId") or "").lower().strip()
                if pillar_id not in PILLAR_IDS:
                    continue
                task = DailyWorkflowTask(
                    plan_id=plan.id,
                    user_id=user_id,
                    pillar_id=pillar_id,
                    title=str(t.get("title") or "Task").strip()[:255],
                    description=str(t.get("description") or "").strip(),
                    status=_coerce_status(t.get("status")),
                    priority=_coerce_priority(t.get("priority")),
                    estimated_time=_coerce_estimated_time(t.get("estimatedTime")),
                    action_type=str(t.get("actionType") or "navigate").strip()[:20],
                    action_url=str(t.get("actionUrl") or "").strip(),
                    dependencies=coerce_dependencies(t.get("dependencies")),
                    metadata_json=t.get("metadata") or {},
                    enabled=bool(t.get("enabled", True)),
                    created_at=now,
                    updated_at=now,
                )
                db.add(task)
            db.commit()
        except Exception:
            db.rollback()
            raise
        db.refresh(plan)
        return plan

    plan = await run_in_threadpool(_create_plan)
    return plan, True


def update_task_status(
    db: Session,
    user_id: str,
    task_id: int,
    status: str,
    completion_notes: Optional[str] = None,
) -> Optional[DailyWorkflowTask]:
    """Set the status of one of *user_id*'s tasks and record when it was decided.

    Args:
        db: SQLAlchemy session.
        user_id: Owner the task must belong to.
        task_id: Primary key of the task.
        status: New status; normalized with ``_coerce_status``.
        completion_notes: Optional notes, truncated to 4000 characters.

    Returns:
        The refreshed task, or ``None`` when no task with that id belongs to
        the user.

    Raises:
        Exception: Commit errors are re-raised after a rollback.
    """
    task = (
        db.query(DailyWorkflowTask)
        .filter(DailyWorkflowTask.id == task_id, DailyWorkflowTask.user_id == user_id)
        .first()
    )
    if not task:
        return None
    task.status = _coerce_status(status)
    task.decided_at = _utcnow_naive()
    if completion_notes is not None:
        task.completion_notes = completion_notes[:4000]
    db.add(task)
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise
    db.refresh(task)
    return task