Files
ALwrity/backend/services/today_workflow_service.py

706 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from sqlalchemy.orm import Session
from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
from models.agent_activity_models import AgentAlert
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
MIN_TASK_EVIDENCE_LINKS = 1
PLAN_CONTEXT_THRESHOLD = 0.65
def _today_date_str() -> str:
return datetime.now(timezone.utc).date().isoformat()
def _coerce_priority(value: Any) -> str:
v = str(value or "medium").lower().strip()
return v if v in {"high", "medium", "low"} else "medium"
def _coerce_status(value: Any) -> str:
v = str(value or "pending").lower().strip()
if v in {"pending", "in_progress", "completed", "skipped", "dismissed"}:
return "skipped" if v == "dismissed" else v
return "pending"
def _proposal_priority_rank(priority: str) -> int:
return {"low": 0, "medium": 1, "high": 2}.get(str(priority or "").lower(), 1)
def _proposal_order_key(proposal: Any) -> tuple:
return (
str(getattr(proposal, "source_agent", "") or "").lower(),
str(getattr(proposal, "title", "") or "").lower(),
str(getattr(proposal, "description", "") or "").lower(),
str(getattr(proposal, "action_url", "") or "").lower(),
)
def _fallback_tasks(date: str) -> List[Dict[str, Any]]:
return [
{
"pillarId": "plan",
"title": "Review todays plan",
"description": "Confirm priorities and adjust the content calendar for today.",
"priority": "high",
"estimatedTime": 15,
"actionType": "navigate",
"actionUrl": "/content-planning-dashboard",
"enabled": True,
},
{
"pillarId": "generate",
"title": "Generate one core content asset",
"description": "Create a draft aligned with your current strategy and voice.",
"priority": "high",
"estimatedTime": 45,
"actionType": "navigate",
"actionUrl": "/blog-writer",
"enabled": True,
},
{
"pillarId": "publish",
"title": "Publish or schedule todays content",
"description": "Publish or schedule content across the selected channel(s).",
"priority": "medium",
"estimatedTime": 20,
"actionType": "navigate",
"actionUrl": "/content-planning-dashboard",
"enabled": True,
},
{
"pillarId": "analyze",
"title": "Check semantic health and performance",
"description": "Review semantic health metrics and key performance indicators.",
"priority": "medium",
"estimatedTime": 15,
"actionType": "navigate",
"actionUrl": "/seo-dashboard",
"enabled": True,
},
{
"pillarId": "engage",
"title": "Engage on one channel",
"description": "Respond to comments and share one post to keep momentum.",
"priority": "medium",
"estimatedTime": 15,
"actionType": "navigate",
"actionUrl": "/linkedin-writer",
"enabled": True,
},
{
"pillarId": "remarket",
"title": "Repurpose and remarket content",
"description": "Create one repurposed snippet and distribute it to increase reach.",
"priority": "low",
"estimatedTime": 20,
"actionType": "navigate",
"actionUrl": "/facebook-writer",
"enabled": True,
},
]
def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool:
workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {}
if not isinstance(workflow_config, dict):
return True
if workflow_config.get("disable_pillar_coverage_guardrail") is True:
return False
if workflow_config.get("enforce_pillar_coverage") is False:
return False
return True
def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if not isinstance(task, dict):
return None
pillar_id = str(task.get("pillarId") or "").lower().strip()
title = str(task.get("title") or "").strip()
if pillar_id not in PILLAR_IDS or not title:
return None
sanitized = dict(task)
sanitized["pillarId"] = pillar_id
sanitized["title"] = title
sanitized["description"] = str(task.get("description") or "").strip()
sanitized["priority"] = _coerce_priority(task.get("priority"))
sanitized["estimatedTime"] = max(5, int(task.get("estimatedTime") or 15))
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
sanitized["enabled"] = bool(task.get("enabled", True))
return sanitized
def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]:
if not isinstance(onboarding_data, dict):
return []
links: List[str] = []
for key, value in onboarding_data.items():
if key == "workflow_config":
continue
if value in (None, "", [], {}):
continue
links.append(f"onboarding:{key}")
if len(links) >= limit:
break
return links
def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]:
if not isinstance(evidence_links, list):
return []
onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {}
if not isinstance(onboarding_data, dict):
onboarding_data = {}
valid_onboarding_keys = {str(k) for k in onboarding_data.keys()}
recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else []
valid_alert_ids = {
str(a.get("alert_id"))
for a in recent_alerts
if isinstance(a, dict) and a.get("alert_id") is not None
}
valid_links: List[str] = []
for raw in evidence_links:
link = str(raw or "").strip()
if not link:
continue
if link.startswith("onboarding:"):
key = link.split(":", 1)[1].strip()
if key and key in valid_onboarding_keys:
valid_links.append(link)
elif link.startswith("alert:"):
alert_id = link.split(":", 1)[1].strip()
if alert_id and alert_id in valid_alert_ids:
valid_links.append(link)
return valid_links
def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]:
tasks = plan.get("tasks") if isinstance(plan, dict) else None
if not isinstance(tasks, list) or not tasks:
return {
"score": 0.0,
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": False,
"task_scores": [],
"tasks_below_min_evidence": 0,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
task_scores = []
below_min_evidence = 0
for idx, task in enumerate(tasks):
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding)
has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS
if not has_min_evidence:
below_min_evidence += 1
reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower()
onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:"))
alert_hits = sum(1 for l in evidence_links if l.startswith("alert:"))
score = 0.0
if has_min_evidence:
score += 0.6
if onboarding_hits > 0:
score += 0.2
if alert_hits > 0:
score += 0.2
elif "alert" in reasoning_text:
score += 0.1
task_scores.append(
{
"task_index": idx,
"pillarId": task.get("pillarId"),
"title": task.get("title"),
"score": min(score, 1.0),
"evidence_links": evidence_links,
"has_min_evidence": has_min_evidence,
}
)
plan_score = sum(t["score"] for t in task_scores) / len(task_scores)
is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0
return {
"score": round(plan_score, 3),
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": is_contextual,
"task_scores": task_scores,
"tasks_below_min_evidence": below_min_evidence,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
def _build_single_task_for_missing_pillar(
user_id: str,
date: str,
pillar_id: str,
grounding: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
schema = {
"type": "object",
"properties": {
"pillarId": {"type": "string"},
"title": {"type": "string"},
"description": {"type": "string"},
"priority": {"type": "string"},
"estimatedTime": {"type": "number"},
"actionType": {"type": "string"},
"actionUrl": {"type": "string"},
"enabled": {"type": "boolean"},
"metadata": {"type": "object"},
},
"required": ["pillarId", "title", "description", "priority", "estimatedTime", "actionType", "enabled"],
}
prompt = (
"Generate exactly one actionable JSON task for today's workflow.\n"
f"Date: {date}\n"
f"Required pillarId: {pillar_id}\n"
"Constraints:\n"
"- Return a single JSON object only.\n"
"- Keep title concise and practical.\n"
"- Task must be completable today.\n"
"- Use actionType='navigate' and a valid ALwrity route when possible.\n"
f"User context: {json.dumps(grounding.get('onboarding_data', {}), indent=2)}\n"
)
try:
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
candidate = raw if isinstance(raw, dict) else json.loads(raw)
except Exception as e:
logger.warning(f"Failed to generate pillar backfill task for {pillar_id}: {e}")
return None
candidate = _sanitize_task(candidate)
if candidate:
candidate["pillarId"] = pillar_id
metadata = candidate.get("metadata") if isinstance(candidate.get("metadata"), dict) else {}
metadata["source"] = "llm_pillar_backfill"
candidate["metadata"] = metadata
return candidate
def _ensure_pillar_coverage(
tasks: List[Dict[str, Any]],
user_id: str,
date: str,
grounding: Dict[str, Any],
) -> List[Dict[str, Any]]:
sanitized_tasks = [t for t in (_sanitize_task(task) for task in tasks) if t]
if not _is_coverage_guardrail_enabled(grounding):
return sanitized_tasks
covered_pillars = {task["pillarId"] for task in sanitized_tasks}
fallback_by_pillar = {
task["pillarId"]: task for task in (_sanitize_task(t) for t in _fallback_tasks(date)) if task
}
for pillar_id in PILLAR_IDS:
if pillar_id in covered_pillars:
continue
generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding)
if generated:
sanitized_tasks.append(generated)
covered_pillars.add(pillar_id)
continue
controlled_fallback = fallback_by_pillar.get(pillar_id)
if controlled_fallback:
metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {}
metadata["source"] = "controlled_fallback"
controlled_fallback["metadata"] = metadata
sanitized_tasks.append(controlled_fallback)
covered_pillars.add(pillar_id)
return sanitized_tasks
def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, Any]:
# 1. Fetch unread alerts
unread_agent_alerts = (
db.query(AgentAlert)
.filter(AgentAlert.user_id == user_id, AgentAlert.read_at.is_(None))
.order_by(AgentAlert.created_at.desc())
.limit(10)
.all()
)
# 2. Fetch comprehensive onboarding data (SIF)
onboarding_context = {}
try:
from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService
svc = OnboardingDataIntegrationService()
integrated = svc.get_integrated_data_sync(user_id, db) or {}
# Populate key sections
onboarding_context = integrated
except Exception as e:
logger.warning(f"Failed to load full onboarding data for context: {e}")
# Ensure workflow_config exists
if "workflow_config" not in onboarding_context:
onboarding_context["workflow_config"] = {}
return {
"recent_agent_alerts": [
{
"alert_id": a.id,
"title": a.title,
"message": a.message,
"created_at": a.created_at.isoformat(),
"alert_type": a.alert_type,
}
for a in unread_agent_alerts
],
"onboarding_data": onboarding_context,
"workflow_config": onboarding_context.get("workflow_config", {})
}
import asyncio
from services.intelligence.agents.agent_orchestrator import AgentOrchestrationService
from services.task_memory_service import TaskMemoryService
# Initialize orchestration service (singleton)
orchestration_service = AgentOrchestrationService()
async def generate_agent_enhanced_plan(
db: Session,
user_id: str,
date: str,
grounding: Optional[Dict[str, Any]] = None,
strict_contextuality: bool = False,
) -> Dict[str, Any]:
activity = AgentActivityService(db, user_id)
grounding = grounding or build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db)
# 1. Get Orchestrator
try:
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
except Exception as e:
logger.error(f"Failed to get orchestrator: {e}")
return {"date": date, "tasks": _fallback_tasks(date)}
# 2. Parallel "Committee" Proposal Gathering
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
agent_tasks = []
try:
# Define agents to poll
agents_to_poll = [
orchestrator.agents.get('content'), # ContentStrategyAgent
orchestrator.agents.get('strategy'), # StrategyArchitectAgent
orchestrator.agents.get('seo'), # SEOOptimizationAgent
orchestrator.agents.get('social'), # SocialAmplificationAgent
orchestrator.agents.get('competitor'), # CompetitorResponseAgent
]
# Filter out None agents (disabled/failed init)
active_agents = [a for a in agents_to_poll if a]
# Execute propose_daily_tasks in parallel
results = await asyncio.gather(
*[a.propose_daily_tasks(grounding) for a in active_agents],
return_exceptions=True
)
# Collect successful proposals
raw_proposals = []
for res in results:
if isinstance(res, list):
raw_proposals.extend(res)
elif isinstance(res, Exception):
logger.warning(f"Agent proposal failed: {res}")
# 3. Filter Redundant Proposals (Self-Learning)
# Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago
# But for now, we filter exact duplicates from recent history (last 7 days)
# We can implement semantic filtering later
# Simple deduplication based on title+pillar
unique_map = {}
for p in raw_proposals:
key = f"{p.pillar_id}:{p.title}"
if key not in unique_map:
unique_map[key] = p
continue
existing = unique_map[key]
if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority):
unique_map[key] = p
continue
# Deterministic tie-breaker for equal priority proposals.
if (
_proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority)
and _proposal_order_key(p) < _proposal_order_key(existing)
):
unique_map[key] = p
agent_tasks = list(unique_map.values())
# Phase 3: Check memory for rejections (Semantic Filter)
agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks)
except Exception as e:
logger.error(f"Committee proposal phase failed: {e}")
# Continue to fallback or LLM generation if committee fails
# 4. Final Selection
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
if agent_tasks and not strict_contextuality:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
final_tasks = []
for prop in agent_tasks:
final_tasks.append({
"pillarId": prop.pillar_id,
"title": prop.title,
"description": prop.description,
"priority": prop.priority,
"estimatedTime": prop.estimated_time,
"actionType": prop.action_type,
"actionUrl": prop.action_url,
"enabled": True,
"metadata": {
"source_agent": prop.source_agent,
"reasoning": prop.reasoning,
"context_data": prop.context_data,
"evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2),
}
})
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
return {
"date": date,
"tasks": final_tasks
}
# Fallback to original LLM generation if agents returned nothing
logger.info("Agent committee returned no tasks, falling back to LLM generation")
schema = {
"type": "object",
"properties": {
"date": {"type": "string"},
"tasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"pillarId": {"type": "string"},
"title": {"type": "string"},
"description": {"type": "string"},
"priority": {"type": "string"},
"estimatedTime": {"type": "number"},
"actionType": {"type": "string"},
"actionUrl": {"type": "string"},
"enabled": {"type": "boolean"},
"dependencies": {"type": "array", "items": {"type": "string"}},
"metadata": {"type": "object"},
},
},
},
},
}
prompt = (
"Generate a personalized Today workflow plan for ALwrity with exactly 6 lifecycle pillars: "
"plan, generate, publish, analyze, engage, remarket.\n\n"
"User Context (Onboarding & Strategy):\n"
f"{json.dumps(grounding.get('onboarding_data', {}), indent=2)}\n\n"
"Rules:\n"
"- Produce JSON only that matches the schema.\n"
"- Include 1-3 tasks per pillar.\n"
"- Each task must have pillarId in {plan, generate, publish, analyze, engage, remarket}.\n"
"- Customize tasks based on the user's industry, business type, and content pillars found in User Context.\n"
"- If competitors are listed, include a task to analyze one of them.\n"
"- Prefer actionable tasks that can be completed today.\n"
"- Use these common actionUrl routes when relevant: "
"/content-planning-dashboard, /blog-writer, /linkedin-writer, /facebook-writer, /seo-dashboard, /scheduler-dashboard.\n"
"- Keep descriptions concise.\n\n"
f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n"
)
if strict_contextuality:
prompt += (
"\nStrict contextuality mode (must follow):\n"
f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n"
"- evidence_links entries must use either 'onboarding:<field_name>' or 'alert:<alert_id>' format.\n"
"- Include metadata.reasoning that explains how the evidence applies to the task.\n"
"- Reject generic tasks without explicit ties to onboarding data or active alerts.\n"
)
run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
activity.log_event(
event_type="plan",
severity="info",
message="Building grounded daily workflow plan",
payload=build_agent_event_payload(phase="planning", step="build_grounded_plan", tool_name="llm_text_gen", progress_percent=10, input_summary="Grounding data assembled from onboarding + alerts", output_summary="Preparing daily workflow generation", decision_reason="Need context-aware workflow", evidence_refs=["onboarding_data","recent_agent_alerts"], safe_debug=True, metadata={"grounding": grounding}),
run_id=run.id,
agent_type="TodayWorkflowGenerator",
)
try:
raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id)
if isinstance(raw, dict):
result = raw
else:
try:
result = json.loads(raw)
except Exception:
result = {"date": date, "tasks": _fallback_tasks(date)}
except Exception as e:
activity.log_event(
event_type="warning",
severity="warning",
message=str(e)[:2000],
payload=build_agent_event_payload(phase="generation", step="llm_failed_fallback", tool_name="llm_text_gen", progress_percent=70, output_summary="LLM generation failed, using fallback tasks", decision_reason="Exception during workflow generation", safe_debug=False, metadata={"fallback": True}),
run_id=run.id,
agent_type="TodayWorkflowGenerator",
)
result = {"date": date, "tasks": _fallback_tasks(date)}
tasks = result.get("tasks") if isinstance(result, dict) else None
if not isinstance(tasks, list) or not tasks:
tasks = _fallback_tasks(date)
result = {
"date": date,
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
}
activity.log_event(
event_type="final_summary",
severity="info",
message="Daily workflow plan generated",
payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}),
run_id=run.id,
agent_type="TodayWorkflowGenerator",
)
activity.finish_run(run.id, success=True, result_summary=json.dumps({"date": date, "tasks": result.get("tasks", [])})[:4000])
return result
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
from starlette.concurrency import run_in_threadpool
date_str = date or _today_date_str()
def _get_existing():
return (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
.first()
)
existing = await run_in_threadpool(_get_existing)
if existing:
return existing, False
grounding = build_grounding_context(db, user_id, date_str)
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding)
validation = validate_plan_contextuality(plan_data, grounding)
if not validation.get("is_contextual"):
logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id)
regenerated_plan = await generate_agent_enhanced_plan(
db,
user_id,
date_str,
grounding=grounding,
strict_contextuality=True,
)
regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding)
plan_data = regenerated_plan
validation = regenerated_validation
plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context"
plan_data["contextuality_validation"] = validation
tasks = plan_data.get("tasks", [])
def _create_plan():
plan = DailyWorkflowPlan(
user_id=user_id,
date=date_str,
source="agent",
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(plan)
db.commit()
db.refresh(plan)
for t in tasks:
pillar_id = str(t.get("pillarId") or "").lower().strip()
if pillar_id not in PILLAR_IDS:
continue
task = DailyWorkflowTask(
plan_id=plan.id,
user_id=user_id,
pillar_id=pillar_id,
title=str(t.get("title") or "Task").strip()[:255],
description=str(t.get("description") or "").strip(),
status=_coerce_status(t.get("status")),
priority=_coerce_priority(t.get("priority")),
estimated_time=int(t.get("estimatedTime") or 15),
action_type=str(t.get("actionType") or "navigate").strip()[:20],
action_url=str(t.get("actionUrl") or "").strip(),
dependencies=json.dumps(t.get("dependencies") or []),
metadata_json=t.get("metadata") or {},
enabled=bool(t.get("enabled", True)),
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(task)
db.commit()
return plan
plan = await run_in_threadpool(_create_plan)
return plan, True
def update_task_status(
db: Session,
user_id: str,
task_id: int,
status: str,
completion_notes: Optional[str] = None,
) -> Optional[DailyWorkflowTask]:
task = db.query(DailyWorkflowTask).filter(DailyWorkflowTask.id == task_id, DailyWorkflowTask.user_id == user_id).first()
if not task:
return None
task.status = _coerce_status(status)
task.decided_at = datetime.utcnow()
if completion_notes is not None:
task.completion_notes = completion_notes[:4000]
db.add(task)
db.commit()
db.refresh(task)
return task