Merge branch 'main' into codex/implement-central-visibility-for-seo-onboarding-tasks

This commit is contained in:
ي
2026-03-08 23:13:08 +05:30
committed by GitHub
13 changed files with 557 additions and 194 deletions

View File

@@ -1,9 +1,13 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import Any, Dict, Optional
from datetime import datetime
import json
from enum import Enum
from loguru import logger
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from middleware.auth_middleware import get_current_user
from services.database import get_db
@@ -15,6 +19,37 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService
router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"])
def _normalize_dependencies(dependencies: Any) -> list:
if dependencies is None:
return []
if isinstance(dependencies, list):
return dependencies
if isinstance(dependencies, str):
try:
parsed = json.loads(dependencies)
return parsed if isinstance(parsed, list) else []
except json.JSONDecodeError:
return []
return []
class TaskStatusEnum(str, Enum):
pending = "pending"
in_progress = "in_progress"
completed = "completed"
skipped = "skipped"
dismissed = "dismissed"
class TaskStatusUpdateRequest(BaseModel):
status: TaskStatusEnum = Field(..., description="New task status")
completion_notes: Optional[str] = Field(
None,
max_length=4000,
description="Optional notes about task completion or outcome",
)
async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str):
svc = TxtaiIntelligenceService(user_id)
items = []
@@ -73,7 +108,7 @@ async def get_today_workflow(
"status": "skipped" if t.status == "dismissed" else t.status,
"priority": t.priority,
"estimatedTime": t.estimated_time,
"dependencies": t.dependencies or [],
"dependencies": _normalize_dependencies(t.dependencies),
"actionUrl": t.action_url,
"actionType": t.action_type,
"metadata": t.metadata_json or {},
@@ -100,11 +135,21 @@ async def get_today_workflow(
if created:
asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today"))
try:
from datetime import date as date_type, timedelta
from datetime import date as date_type, timedelta
try:
parsed_plan_date = date_type.fromisoformat(plan.date)
except ValueError:
logger.warning(
"Invalid plan.date format; skipping yesterday indexing plan_id={} user_id={} plan_date={} reason={}",
plan.id,
user_id,
plan.date,
"plan.date is not in ISO format YYYY-MM-DD",
)
else:
y_str = (parsed_plan_date - timedelta(days=1)).isoformat()
y_str = (date_type.fromisoformat(plan.date) - timedelta(days=1)).isoformat()
def _fetch_yesterday():
y_plan = (
db.query(DailyWorkflowPlan)
@@ -121,23 +166,33 @@ async def get_today_workflow(
return y_tasks
return []
y_tasks = await run_in_threadpool(_fetch_yesterday)
if y_tasks:
y_response = []
for t in y_tasks:
y_response.append(
{
"id": str(t.id),
"pillarId": t.pillar_id,
"title": t.title,
"description": t.description,
"status": "skipped" if t.status == "dismissed" else t.status,
}
)
asyncio.create_task(_index_tasks_to_sif(user_id, y_str, y_response, label="yesterday"))
except Exception:
pass
try:
y_tasks = await run_in_threadpool(_fetch_yesterday)
except SQLAlchemyError as db_error:
logger.warning(
"Failed to fetch yesterday tasks; skipping yesterday indexing plan_id={} user_id={} plan_date={} yesterday_date={} error_class={} error_message={}",
plan.id,
user_id,
plan.date,
y_str,
type(db_error).__name__,
str(db_error),
)
else:
if y_tasks:
y_response = []
for t in y_tasks:
y_response.append(
{
"id": str(t.id),
"pillarId": t.pillar_id,
"title": t.title,
"description": t.description,
"status": "skipped" if t.status == "dismissed" else t.status,
"dependencies": _normalize_dependencies(t.dependencies),
}
)
asyncio.create_task(_index_tasks_to_sif(user_id, y_str, y_response, label="yesterday"))
return {
"success": True,
@@ -158,6 +213,8 @@ async def get_today_workflow(
"id": plan.id,
"date": plan.date,
"source": plan.source,
"quality_status": (plan.plan_json or {}).get("quality_status", "contextual"),
"contextuality_validation": (plan.plan_json or {}).get("contextuality_validation"),
"created_at": plan.created_at.isoformat() if plan.created_at else None,
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
},
@@ -172,15 +229,13 @@ from services.task_memory_service import TaskMemoryService
@router.post("/tasks/{task_id}/status")
async def set_task_status(
task_id: int,
body: Dict[str, Any],
body: TaskStatusUpdateRequest,
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
user_id = str(current_user.get("id"))
status = body.get("status")
if not status:
raise HTTPException(status_code=400, detail="status is required")
completion_notes = body.get("completion_notes")
status = body.status.value
completion_notes = body.completion_notes
task = update_task_status(db, user_id, task_id, status=status, completion_notes=completion_notes)
if not task:
@@ -189,10 +244,18 @@ async def set_task_status(
# Record outcome in memory for self-learning
try:
memory = TaskMemoryService(user_id, db)
normalized_status = (task.status or "").lower()
if normalized_status == "completed":
feedback_score = 1
elif normalized_status in {"skipped", "dismissed", "rejected"}:
feedback_score = -1
else:
feedback_score = 0
await memory.record_task_outcome(
task,
feedback_score=1 if status == "completed" else -1 if status == "dismissed" else 0,
feedback_text=completion_notes
task,
feedback_score=feedback_score,
feedback_text=completion_notes,
)
except Exception as e:
logger.warning(

View File

@@ -13,6 +13,9 @@ class DailyWorkflowPlan(Base):
user_id = Column(String(255), nullable=False, index=True)
date = Column(String(10), nullable=False, index=True)
source = Column(String(30), nullable=False, default="agent")
generation_mode = Column(String(30), nullable=False, default="llm_generation", index=True)
committee_agent_count = Column(Integer, nullable=False, default=0)
fallback_used = Column(Boolean, nullable=False, default=False)
plan_json = Column(JSON, nullable=True)
generation_run_id = Column(Integer, nullable=True, index=True)
created_at = Column(DateTime, default=datetime.utcnow, index=True)

View File

@@ -1,5 +1,7 @@
# Core dependencies
fastapi>=0.104.0
fastapi>=0.115.14
starlette>=0.40.0,<0.47.0
sse-starlette<3.0.0
uvicorn>=0.24.0
python-multipart>=0.0.6
python-dotenv>=1.0.0

View File

@@ -464,6 +464,7 @@ class AgentOrchestrationService:
async def get_or_create_orchestrator(self, user_id: str) -> ALwrityAgentOrchestrator:
"""Get or create an orchestrator for a user"""
onboarding_gated_initialization = False
if user_id not in self.orchestrators:
config = AgentTeamConfiguration(user_id=user_id)
self.orchestrators[user_id] = ALwrityAgentOrchestrator(config)
@@ -474,6 +475,25 @@ class AgentOrchestrationService:
if not orchestrator.agents and not orchestrator.execution_history:
logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.")
orchestrator._create_specialized_agents()
last_system_check = next(
(
entry
for entry in reversed(orchestrator.execution_history)
if entry.get("action") == "system_check"
),
None,
)
if last_system_check and last_system_check.get("status") == "pending":
details = str(last_system_check.get("details") or "").lower()
onboarding_gated_initialization = "onboarding" in details
orchestrator.onboarding_gated_initialization = onboarding_gated_initialization
orchestrator.initialization_state = {
"onboarding_gated_initialization": onboarding_gated_initialization,
"active_agent_count": len(orchestrator.agents),
"active_agent_keys": sorted(orchestrator.agents.keys()),
}
return orchestrator

View File

@@ -81,7 +81,7 @@ class OnboardingFullWebsiteAnalysisExecutor(TaskExecutor):
task.last_executed = datetime.utcnow()
task.last_success = datetime.utcnow()
task.status = 'paused'
task.status = 'completed' # Explicitly mark as completed instead of paused
task.next_execution = None
task.consecutive_failures = 0
task.failure_pattern = None

View File

@@ -15,7 +15,7 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService
EXACT_DUPLICATE_LOOKBACK_DAYS = 7
SEMANTIC_SUPPRESSION_SCORE_THRESHOLD = 0.85
SUPPRESSED_STATUSES = {"dismissed", "rejected"}
SUPPRESSED_STATUSES = {"dismissed", "rejected", "skipped"}
class TaskMemoryService:
"""
@@ -72,7 +72,7 @@ class TaskMemoryService:
self.db.commit()
# 2. Index into txtai (if status is meaningful)
if task.status in ["completed", "dismissed", "rejected"]:
if task.status in ["completed", "dismissed", "rejected", "skipped"]:
# We index the task text with metadata about its outcome
# This allows us to search: "Has the user rejected similar tasks?"
doc = {

View File

@@ -11,6 +11,8 @@ from services.llm_providers.main_text_generation import llm_text_gen
from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
MIN_TASK_EVIDENCE_LINKS = 1
PLAN_CONTEXT_THRESHOLD = 0.65
def _today_date_str() -> str:
@@ -139,6 +141,116 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
return sanitized
def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]:
if not isinstance(onboarding_data, dict):
return []
links: List[str] = []
for key, value in onboarding_data.items():
if key == "workflow_config":
continue
if value in (None, "", [], {}):
continue
links.append(f"onboarding:{key}")
if len(links) >= limit:
break
return links
def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]:
if not isinstance(evidence_links, list):
return []
onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {}
if not isinstance(onboarding_data, dict):
onboarding_data = {}
valid_onboarding_keys = {str(k) for k in onboarding_data.keys()}
recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else []
valid_alert_ids = {
str(a.get("alert_id"))
for a in recent_alerts
if isinstance(a, dict) and a.get("alert_id") is not None
}
valid_links: List[str] = []
for raw in evidence_links:
link = str(raw or "").strip()
if not link:
continue
if link.startswith("onboarding:"):
key = link.split(":", 1)[1].strip()
if key and key in valid_onboarding_keys:
valid_links.append(link)
elif link.startswith("alert:"):
alert_id = link.split(":", 1)[1].strip()
if alert_id and alert_id in valid_alert_ids:
valid_links.append(link)
return valid_links
def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]:
tasks = plan.get("tasks") if isinstance(plan, dict) else None
if not isinstance(tasks, list) or not tasks:
return {
"score": 0.0,
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": False,
"task_scores": [],
"tasks_below_min_evidence": 0,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
task_scores = []
below_min_evidence = 0
for idx, task in enumerate(tasks):
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding)
has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS
if not has_min_evidence:
below_min_evidence += 1
reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower()
onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:"))
alert_hits = sum(1 for l in evidence_links if l.startswith("alert:"))
score = 0.0
if has_min_evidence:
score += 0.6
if onboarding_hits > 0:
score += 0.2
if alert_hits > 0:
score += 0.2
elif "alert" in reasoning_text:
score += 0.1
task_scores.append(
{
"task_index": idx,
"pillarId": task.get("pillarId"),
"title": task.get("title"),
"score": min(score, 1.0),
"evidence_links": evidence_links,
"has_min_evidence": has_min_evidence,
}
)
plan_score = sum(t["score"] for t in task_scores) / len(task_scores)
is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0
return {
"score": round(plan_score, 3),
"threshold": PLAN_CONTEXT_THRESHOLD,
"is_contextual": is_contextual,
"task_scores": task_scores,
"tasks_below_min_evidence": below_min_evidence,
"min_evidence_links": MIN_TASK_EVIDENCE_LINKS,
}
def _build_single_task_for_missing_pillar(
user_id: str,
date: str,
@@ -253,6 +365,7 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A
return {
"recent_agent_alerts": [
{
"alert_id": a.id,
"title": a.title,
"message": a.message,
"created_at": a.created_at.isoformat(),
@@ -272,9 +385,15 @@ from services.task_memory_service import TaskMemoryService
# Initialize orchestration service (singleton)
orchestration_service = AgentOrchestrationService()
async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]:
async def generate_agent_enhanced_plan(
db: Session,
user_id: str,
date: str,
grounding: Optional[Dict[str, Any]] = None,
strict_contextuality: bool = False,
) -> Dict[str, Any]:
activity = AgentActivityService(db, user_id)
grounding = build_grounding_context(db, user_id, date)
grounding = grounding or build_grounding_context(db, user_id, date)
memory_service = TaskMemoryService(user_id, db)
# 1. Get Orchestrator
@@ -351,7 +470,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
# 4. Final Selection
# If we have agent tasks, use them. Otherwise fall back to LLM generation.
if agent_tasks:
if agent_tasks and not strict_contextuality:
logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee")
# Convert TaskProposal objects to dicts for frontend
@@ -369,7 +488,8 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
"metadata": {
"source_agent": prop.source_agent,
"reasoning": prop.reasoning,
"context_data": prop.context_data
"context_data": prop.context_data,
"evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2),
}
})
@@ -425,6 +545,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n"
)
if strict_contextuality:
prompt += (
"\nStrict contextuality mode (must follow):\n"
f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n"
"- evidence_links entries must use either 'onboarding:<field_name>' or 'alert:<alert_id>' format.\n"
"- Include metadata.reasoning that explains how the evidence applies to the task.\n"
"- Reject generic tasks without explicit ties to onboarding data or active alerts.\n"
)
run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000])
activity.log_event(
event_type="plan",
@@ -492,7 +621,25 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
if existing:
return existing, False
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
grounding = build_grounding_context(db, user_id, date_str)
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding)
validation = validate_plan_contextuality(plan_data, grounding)
if not validation.get("is_contextual"):
logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id)
regenerated_plan = await generate_agent_enhanced_plan(
db,
user_id,
date_str,
grounding=grounding,
strict_contextuality=True,
)
regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding)
plan_data = regenerated_plan
validation = regenerated_validation
plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context"
plan_data["contextuality_validation"] = validation
tasks = plan_data.get("tasks", [])
def _create_plan():

View File

@@ -8,7 +8,7 @@ if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric
from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS
from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS, validate_plan_contextuality
from services.intelligence.sif_agents import ContentGuardianAgent as SifGuardian
from services.intelligence.agents.specialized_agents import ContentGuardianAgent as SpecializedGuardian
@@ -74,6 +74,52 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase):
self.assertIn("warning", result)
self.assertEqual(result["method"], "competitor_index_search")
def test_validate_plan_contextuality_passes_with_evidence_links(self):
plan = {
"tasks": [
{
"pillarId": "plan",
"title": "Review strategy",
"description": "Use onboarding goals",
"metadata": {
"evidence_links": ["onboarding:business_goals", "alert:101"],
"reasoning": "Based on onboarding and alert",
},
}
]
}
grounding = {
"onboarding_data": {"business_goals": ["awareness"]},
"recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}],
}
validation = validate_plan_contextuality(plan, grounding)
self.assertTrue(validation["is_contextual"])
self.assertEqual(validation["tasks_below_min_evidence"], 0)
def test_validate_plan_contextuality_flags_missing_evidence_links(self):
plan = {
"tasks": [
{
"pillarId": "generate",
"title": "Write generic post",
"description": "Create a post",
"metadata": {"reasoning": "General best practice"},
}
]
}
grounding = {
"onboarding_data": {"business_goals": ["awareness"]},
"recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}],
}
validation = validate_plan_contextuality(plan, grounding)
self.assertFalse(validation["is_contextual"])
self.assertEqual(validation["tasks_below_min_evidence"], 1)
def test_pillar_coverage_guardrail_backfills_missing(self):
tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}]
grounding = {"workflow_config": {"enforce_pillar_coverage": True}}