From c3f478a763b4b1a5609382228c081dcb08812a9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:37:36 +0530 Subject: [PATCH 01/15] Normalize today workflow task dependencies as arrays --- backend/api/today_workflow.py | 17 ++++++++-- backend/services/today_workflow_service.py | 21 +++++++++++- frontend/src/stores/workflowStore.ts | 38 ++++++++++++++++++++-- 3 files changed, 71 insertions(+), 5 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..2ca792cc 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -7,7 +7,7 @@ from sqlalchemy.orm import Session from middleware.auth_middleware import get_current_user from services.database import get_db -from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status +from services.today_workflow_service import coerce_dependencies, get_or_create_daily_workflow_plan, update_task_status from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask import asyncio from services.intelligence.txtai_service import TxtaiIntelligenceService @@ -62,6 +62,19 @@ async def get_today_workflow( tasks = await run_in_threadpool(_fetch_tasks) + def _normalize_legacy_dependencies(task_rows): + rows_updated = False + for row in task_rows: + normalized_dependencies = coerce_dependencies(row.dependencies) + if row.dependencies != normalized_dependencies: + row.dependencies = normalized_dependencies + db.add(row) + rows_updated = True + if rows_updated: + db.commit() + + await run_in_threadpool(_normalize_legacy_dependencies, tasks) + response_tasks = [] for t in tasks: response_tasks.append( @@ -73,7 +86,7 @@ async def get_today_workflow( "status": "skipped" if t.status == "dismissed" else t.status, "priority": t.priority, "estimatedTime": t.estimated_time, - "dependencies": t.dependencies or [], + "dependencies": coerce_dependencies(t.dependencies), "actionUrl": t.action_url, "actionType": t.action_type, "metadata": t.metadata_json or {}, diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..210e5aad 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -29,6 +29,25 @@ def _coerce_status(value: Any) -> str: return "pending" +def coerce_dependencies(value: Any) -> List[str]: + if isinstance(value, list): + return [str(dep).strip() for dep in value if str(dep).strip()] + + if isinstance(value, str): + raw = value.strip() + if not raw: + return [] + try: + parsed = json.loads(raw) + if isinstance(parsed, list): + return [str(dep).strip() for dep in parsed if str(dep).strip()] + except Exception: + pass + return [raw] + + return [] + + def _proposal_priority_rank(priority: str) -> int: return {"low": 0, "medium": 1, "high": 2}.get(str(priority or "").lower(), 1) @@ -523,7 +542,7 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt estimated_time=int(t.get("estimatedTime") or 15), action_type=str(t.get("actionType") or "navigate").strip()[:20], action_url=str(t.get("actionUrl") or "").strip(), - dependencies=json.dumps(t.get("dependencies") or []), + dependencies=coerce_dependencies(t.get("dependencies")), metadata_json=t.get("metadata") or {}, enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), diff --git a/frontend/src/stores/workflowStore.ts b/frontend/src/stores/workflowStore.ts index fb0c1031..d834b540 100644 --- a/frontend/src/stores/workflowStore.ts +++ b/frontend/src/stores/workflowStore.ts @@ -14,6 +14,39 @@ import { apiClient } from '../api/client'; const isServerWorkflowId = (workflowId: string) => workflowId.startsWith('daily-'); + +const normalizeDependencies = (dependencies: unknown): string[] => { + if (Array.isArray(dependencies)) { + return dependencies.map(dep => String(dep).trim()).filter(Boolean); + } + + if (typeof dependencies === 'string') { + const raw = dependencies.trim(); + if (!raw) return []; + + try { + const parsed = JSON.parse(raw); + if (Array.isArray(parsed)) { + return parsed.map(dep => String(dep).trim()).filter(Boolean); + } + } catch {} + + return [raw]; + } + + return []; +}; + +const normalizeServerWorkflow = (workflow: DailyWorkflow): DailyWorkflow => ({ + ...workflow, + tasks: Array.isArray(workflow.tasks) + ? workflow.tasks.map(task => ({ + ...task, + dependencies: normalizeDependencies(task.dependencies), + })) + : [], +}); + const computeProgressAndNavigation = (workflow: DailyWorkflow): { progress: WorkflowProgress; navigation: NavigationState } => { const tasks = Array.isArray(workflow.tasks) ? workflow.tasks : []; const totalTasks = tasks.length; @@ -118,9 +151,10 @@ export const useWorkflowStore = create()( const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} }); const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined; if (serverWorkflow && Array.isArray(serverWorkflow.tasks)) { - const derived = computeProgressAndNavigation(serverWorkflow); + const normalizedWorkflow = normalizeServerWorkflow(serverWorkflow); + const derived = computeProgressAndNavigation(normalizedWorkflow); set({ - currentWorkflow: serverWorkflow, + currentWorkflow: normalizedWorkflow, workflowProgress: derived.progress, navigationState: derived.navigation, isLoading: false From 198143e6cad444b1798ae4524c00f3a13f5fbd75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:38:04 +0530 Subject: [PATCH 02/15] Add per-agent timeout handling for daily committee proposals --- backend/services/today_workflow_service.py | 116 ++++++++++++++++----- 1 file changed, 90 insertions(+), 26 deletions(-) diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..4f8d74aa 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -42,6 +42,19 @@ def _proposal_order_key(proposal: Any) -> tuple: ) +def _get_agent_proposal_timeout_seconds(grounding: Dict[str, Any]) -> float: + workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} + if not isinstance(workflow_config, dict): + return 4.0 + + raw_timeout = workflow_config.get("agent_proposal_timeout_seconds", 4.0) + try: + timeout_seconds = float(raw_timeout) + except (TypeError, ValueError): + return 4.0 + return max(1.0, timeout_seconds) + + def _fallback_tasks(date: str) -> List[Dict[str, Any]]: return [ { @@ -288,32 +301,82 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") agent_tasks = [] + committee_total_failure = False try: - # Define agents to poll - agents_to_poll = [ - orchestrator.agents.get('content'), # ContentStrategyAgent - orchestrator.agents.get('strategy'), # StrategyArchitectAgent - orchestrator.agents.get('seo'), # SEOOptimizationAgent - orchestrator.agents.get('social'), # SocialAmplificationAgent - orchestrator.agents.get('competitor'), # CompetitorResponseAgent - ] - + agent_timeout_seconds = _get_agent_proposal_timeout_seconds(grounding) + + # Define agents to poll (keyed for logging/metrics) + agents_to_poll = { + "content": orchestrator.agents.get('content'), + "strategy": orchestrator.agents.get('strategy'), + "seo": orchestrator.agents.get('seo'), + "social": orchestrator.agents.get('social'), + "competitor": orchestrator.agents.get('competitor'), + } + # Filter out None agents (disabled/failed init) - active_agents = [a for a in agents_to_poll if a] - - # Execute propose_daily_tasks in parallel - results = await asyncio.gather( - *[a.propose_daily_tasks(grounding) for a in active_agents], - return_exceptions=True + active_agents = {key: agent for key, agent in agents_to_poll.items() if agent} + + async def _collect_agent_proposals(agent_key: str, agent: Any) -> Dict[str, Any]: + started_at = datetime.now(timezone.utc) + try: + proposals = await asyncio.wait_for(agent.propose_daily_tasks(grounding), timeout=agent_timeout_seconds) + elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 + return { + "agent_key": agent_key, + "status": "ok", + "elapsed_ms": elapsed_ms, + "proposals": proposals if isinstance(proposals, list) else [], + } + except asyncio.TimeoutError: + elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 + return { + "agent_key": agent_key, + "status": "timeout", + "elapsed_ms": elapsed_ms, + "proposals": [], + "error": f"Timed out after {agent_timeout_seconds:.2f}s", + } + except Exception as agent_error: + elapsed_ms = (datetime.now(timezone.utc) - started_at).total_seconds() * 1000 + return { + "agent_key": agent_key, + "status": "error", + "elapsed_ms": elapsed_ms, + "proposals": [], + "error": str(agent_error), + } + + # Execute propose_daily_tasks in parallel with per-agent timeout + committee_results = await asyncio.gather( + *[_collect_agent_proposals(agent_key, agent) for agent_key, agent in active_agents.items()] ) - - # Collect successful proposals + + successful_agent_count = 0 raw_proposals = [] - for res in results: - if isinstance(res, list): - raw_proposals.extend(res) - elif isinstance(res, Exception): - logger.warning(f"Agent proposal failed: {res}") + for res in committee_results: + agent_key = res.get("agent_key") + status = res.get("status") + elapsed_ms = res.get("elapsed_ms") + + logger.info( + "Agent committee proposal metric | agent={} status={} elapsed_ms={:.2f} timeout_s={:.2f} proposal_count={}", + agent_key, + status, + elapsed_ms, + agent_timeout_seconds, + len(res.get("proposals") or []), + ) + + if status == "ok": + successful_agent_count += 1 + raw_proposals.extend(res.get("proposals") or []) + elif status == "timeout": + logger.warning(f"Agent proposal timed out for {agent_key}: {res.get('error')}") + else: + logger.warning(f"Agent proposal failed for {agent_key}: {res.get('error')}") + + committee_total_failure = successful_agent_count == 0 # 3. Filter Redundant Proposals (Self-Learning) # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago @@ -347,13 +410,14 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> except Exception as e: logger.error(f"Committee proposal phase failed: {e}") + committee_total_failure = True # Continue to fallback or LLM generation if committee fails # 4. Final Selection - # If we have agent tasks, use them. Otherwise fall back to LLM generation. - if agent_tasks: + # Use committee outcomes whenever committee partially succeeds, even with sparse proposals. + if not committee_total_failure: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") - + # Convert TaskProposal objects to dicts for frontend final_tasks = [] for prop in agent_tasks: @@ -372,7 +436,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "context_data": prop.context_data } }) - + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, From ed625eae6175a13696ec2542c1b85abbb4eba123 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:38:39 +0530 Subject: [PATCH 03/15] Harden workflow fallback handling and degraded mode UI --- .../components/WorkflowProgressBar.tsx | 31 ++- .../src/services/TaskWorkflowOrchestrator.ts | 204 +++++++----------- frontend/src/stores/workflowStore.ts | 91 ++++++-- 3 files changed, 170 insertions(+), 156 deletions(-) diff --git a/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx b/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx index cfc344a2..961a7cad 100644 --- a/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx +++ b/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx @@ -14,7 +14,8 @@ import { Pause, CheckCircle, Schedule, - TrendingUp + TrendingUp, + CloudOff } from '@mui/icons-material'; import { useWorkflowStore } from '../../../stores/workflowStore'; @@ -42,7 +43,9 @@ const WorkflowProgressBar: React.FC = ({ startWorkflow, isWorkflowComplete, getCompletionPercentage, - generateDailyWorkflow + generateDailyWorkflow, + isDegradedMode, + degradedModeReason } = useWorkflowStore(); const completionPercentage = getCompletionPercentage(); @@ -169,6 +172,30 @@ const WorkflowProgressBar: React.FC = ({ )} + + {isDegradedMode && ( + + + + Degraded mode + + + {degradedModeReason || 'Server workflow is unavailable; local fallback is active.'} + + + )} + {/* Progress Bar */} diff --git a/frontend/src/services/TaskWorkflowOrchestrator.ts b/frontend/src/services/TaskWorkflowOrchestrator.ts index 6799d45a..4193f65c 100644 --- a/frontend/src/services/TaskWorkflowOrchestrator.ts +++ b/frontend/src/services/TaskWorkflowOrchestrator.ts @@ -311,18 +311,15 @@ class TaskWorkflowOrchestrator { date: string, context?: TaskGenerationContext ): Promise { - // This is a placeholder implementation - // In Phase 3, this will be replaced with AI-powered task generation - const defaultTasks: TodayTask[] = [ { - id: `${userId}-${date}-plan-1`, + id: `${userId}-${date}-plan`, pillarId: 'plan', - title: 'Review content strategy', - description: 'Check and update your content strategy for the week', + title: 'Review today\'s plan', + description: 'Confirm priorities and schedule for today\'s content work.', status: 'pending', priority: 'high', - estimatedTime: 15, + estimatedTime: 10, actionType: 'navigate', actionUrl: '/content-planning-dashboard', enabled: true, @@ -330,29 +327,14 @@ class TaskWorkflowOrchestrator { color: '#4CAF50' }, { - id: `${userId}-${date}-plan-2`, - pillarId: 'plan', - title: 'Update content calendar', - description: 'Review and update your content calendar', + id: `${userId}-${date}-generate`, + pillarId: 'generate', + title: 'Generate a draft', + description: 'Create one content draft using the content writer.', status: 'pending', priority: 'medium', - estimatedTime: 10, - dependencies: [`${userId}-${date}-plan-1`], - actionType: 'navigate', - actionUrl: '/content-planning-dashboard', - enabled: true, - icon: 'CalendarMonth', - color: '#4CAF50' - }, - { - id: `${userId}-${date}-generate-1`, - pillarId: 'generate', - title: 'Create social media content', - description: 'Generate content for your social media platforms', - status: 'pending', - priority: 'high', - estimatedTime: 30, - dependencies: [`${userId}-${date}-plan-1`, `${userId}-${date}-plan-2`], + estimatedTime: 20, + dependencies: [`${userId}-${date}-plan`], actionType: 'navigate', actionUrl: '/facebook-writer', enabled: true, @@ -360,29 +342,14 @@ class TaskWorkflowOrchestrator { color: '#2196F3' }, { - id: `${userId}-${date}-generate-2`, - pillarId: 'generate', - title: 'Create blog content', - description: 'Write blog posts for your website', - status: 'pending', - priority: 'medium', - estimatedTime: 45, - dependencies: [`${userId}-${date}-plan-1`], - actionType: 'navigate', - actionUrl: '/blog-writer', - enabled: true, - icon: 'Article', - color: '#2196F3' - }, - { - id: `${userId}-${date}-publish-1`, + id: `${userId}-${date}-publish`, pillarId: 'publish', - title: 'Publish social media content', - description: 'Publish your created content to social media', + title: 'Publish approved content', + description: 'Open publishing tools and publish today\'s approved draft.', status: 'pending', - priority: 'medium', + priority: 'high', estimatedTime: 10, - dependencies: [`${userId}-${date}-generate-1`], + dependencies: [`${userId}-${date}-generate`], actionType: 'navigate', actionUrl: '/facebook-writer', enabled: true, @@ -390,29 +357,14 @@ class TaskWorkflowOrchestrator { color: '#FF9800' }, { - id: `${userId}-${date}-publish-2`, - pillarId: 'publish', - title: 'Publish blog content', - description: 'Publish blog posts to your website', + id: `${userId}-${date}-analyze`, + pillarId: 'analyze', + title: 'Check performance snapshot', + description: 'Review key analytics to assess today\'s published content.', status: 'pending', priority: 'medium', - estimatedTime: 15, - dependencies: [`${userId}-${date}-generate-2`], - actionType: 'navigate', - actionUrl: '/blog-writer', - enabled: true, - icon: 'Publish', - color: '#FF9800' - }, - { - id: `${userId}-${date}-analyze-1`, - pillarId: 'analyze', - title: 'Review content performance', - description: 'Analyze performance of published content', - status: 'pending', - priority: 'low', - estimatedTime: 20, - dependencies: [`${userId}-${date}-publish-1`, `${userId}-${date}-publish-2`], + estimatedTime: 10, + dependencies: [`${userId}-${date}-publish`], actionType: 'navigate', actionUrl: '/analytics-dashboard', enabled: true, @@ -420,95 +372,50 @@ class TaskWorkflowOrchestrator { color: '#9C27B0' }, { - id: `${userId}-${date}-engage-1`, + id: `${userId}-${date}-engage`, pillarId: 'engage', - title: 'Respond to comments', - description: 'Engage with comments on your content', + title: 'Respond to audience activity', + description: 'Reply to new comments or mentions from today\'s posts.', status: 'pending', priority: 'low', - estimatedTime: 15, - dependencies: [`${userId}-${date}-publish-1`], + estimatedTime: 10, + dependencies: [`${userId}-${date}-publish`], actionType: 'navigate', actionUrl: '/engagement-dashboard', enabled: true, icon: 'ChatBubbleOutline', color: '#E91E63' }, - // Engage pillar tasks { - id: `${userId}-${date}-engage-1`, - pillarId: 'engage', - title: 'Reply to blog comment', - description: 'Respond to comments on your latest blog post', - status: 'pending', - priority: 'high', - estimatedTime: 10, - dependencies: [`${userId}-${date}-analyze-1`], - actionType: 'navigate', - actionUrl: '/engagement-dashboard', - enabled: true, - icon: 'Comment', - color: '#E91E63' - }, - { - id: `${userId}-${date}-engage-2`, - pillarId: 'engage', - title: 'Respond to Twitter mention', - description: 'Reply to Twitter mentions and engage with followers', - status: 'pending', - priority: 'medium', - estimatedTime: 5, - dependencies: [`${userId}-${date}-engage-1`], - actionType: 'navigate', - actionUrl: '/engagement-dashboard', - enabled: true, - icon: 'Twitter', - color: '#1DA1F2' - }, - // Remarket pillar tasks - { - id: `${userId}-${date}-remarket-1`, + id: `${userId}-${date}-remarket`, pillarId: 'remarket', - title: 'Launch Retargeting Campaign', - description: 'Create and launch targeted remarketing campaigns', + title: 'Prepare remarketing audience', + description: 'Open remarketing tools to refresh your retargeting audience.', status: 'pending', - priority: 'high', - estimatedTime: 35, - dependencies: [`${userId}-${date}-engage-2`], + priority: 'low', + estimatedTime: 15, + dependencies: [`${userId}-${date}-analyze`], actionType: 'navigate', actionUrl: '/remarketing-dashboard', enabled: true, icon: 'Psychology', color: '#00695C' - }, - { - id: `${userId}-${date}-remarket-2`, - pillarId: 'remarket', - title: 'Lead Nurturing Sequence', - description: 'Set up automated lead nurturing workflows', - status: 'pending', - priority: 'medium', - estimatedTime: 30, - dependencies: [`${userId}-${date}-remarket-1`], - actionType: 'navigate', - actionUrl: '/lead-nurturing', - enabled: true, - icon: 'Refresh', - color: '#4CAF50' } ]; + const uniqueTasks = this.ensureUniqueTaskIds(defaultTasks); + // Validate dependencies and get optimal execution order const tempWorkflow: DailyWorkflow = { id: `${userId}-${date}`, date, userId, - tasks: defaultTasks, + tasks: uniqueTasks, currentTaskIndex: 0, completedTasks: 0, - totalTasks: defaultTasks.length, + totalTasks: uniqueTasks.length, workflowStatus: 'not_started', - totalEstimatedTime: defaultTasks.reduce((sum, task) => sum + task.estimatedTime, 0), + totalEstimatedTime: uniqueTasks.reduce((sum, task) => sum + task.estimatedTime, 0), actualTimeSpent: 0 }; @@ -517,13 +424,46 @@ class TaskWorkflowOrchestrator { if (!validation.isValid) { console.warn('Dependency validation failed:', validation.errors); // Return tasks without dependencies if validation fails - return defaultTasks.map(task => ({ ...task, dependencies: [] })); + return uniqueTasks.map(task => ({ ...task, dependencies: [] })); } // Get optimal execution order const orderedTasks = taskDependencyManager.getOptimalExecutionOrder(tempWorkflow); - return orderedTasks; + return this.ensureUniqueTaskIds(orderedTasks); + } + + private ensureUniqueTaskIds(tasks: TodayTask[]): TodayTask[] { + const idOccurrences = new Map(); + const oldToNew = new Map(); + + const withUniqueIds = tasks.map(task => { + const count = idOccurrences.get(task.id) ?? 0; + idOccurrences.set(task.id, count + 1); + + if (count === 0) { + oldToNew.set(task.id, task.id); + return { ...task }; + } + + const uniqueId = `${task.id}-${count + 1}`; + oldToNew.set(`${task.id}#${count + 1}`, uniqueId); + return { ...task, id: uniqueId }; + }); + + const allTaskIds = new Set(withUniqueIds.map(task => task.id)); + + return withUniqueIds.map(task => { + const dependencies = (task.dependencies ?? []) + .map(dep => oldToNew.get(dep) || dep) + .filter((dep, index, arr) => arr.indexOf(dep) === index) + .filter(dep => allTaskIds.has(dep)); + + return { + ...task, + dependencies: dependencies.length > 0 ? dependencies : undefined + }; + }); } /** diff --git a/frontend/src/stores/workflowStore.ts b/frontend/src/stores/workflowStore.ts index fb0c1031..21c112bc 100644 --- a/frontend/src/stores/workflowStore.ts +++ b/frontend/src/stores/workflowStore.ts @@ -10,10 +10,25 @@ import { WorkflowError } from '../types/workflow'; import { taskWorkflowOrchestrator } from '../services/TaskWorkflowOrchestrator'; -import { apiClient } from '../api/client'; +import { apiClient, ConnectionError, NetworkError } from '../api/client'; const isServerWorkflowId = (workflowId: string) => workflowId.startsWith('daily-'); +const isServerUnavailableError = (error: unknown): boolean => + error instanceof ConnectionError || error instanceof NetworkError; + +const toWorkflowError = (error: unknown, fallbackMessage: string): WorkflowError => { + if (error instanceof WorkflowError) return error; + + const message = error instanceof Error ? error.message : fallbackMessage; + return { + code: 'WORKFLOW_ERROR', + message, + timestamp: new Date(), + recoverable: false, + }; +}; + const computeProgressAndNavigation = (workflow: DailyWorkflow): { progress: WorkflowProgress; navigation: NavigationState } => { const tasks = Array.isArray(workflow.tasks) ? workflow.tasks : []; const totalTasks = tasks.length; @@ -69,6 +84,8 @@ interface WorkflowState { isWorkflowModalOpen: boolean; isLoading: boolean; error: WorkflowError | null; + isDegradedMode: boolean; + degradedModeReason: string | null; // Actions generateDailyWorkflow: (userId: string, date?: string) => Promise; @@ -108,36 +125,66 @@ export const useWorkflowStore = create()( isWorkflowModalOpen: false, isLoading: false, error: null, + isDegradedMode: false, + degradedModeReason: null, // Generate daily workflow generateDailyWorkflow: async (userId: string, date?: string) => { set({ isLoading: true, error: null }); - - try { - try { - const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} }); - const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined; - if (serverWorkflow && Array.isArray(serverWorkflow.tasks)) { - const derived = computeProgressAndNavigation(serverWorkflow); - set({ - currentWorkflow: serverWorkflow, - workflowProgress: derived.progress, - navigationState: derived.navigation, - isLoading: false - }); - return; - } - } catch {} + try { + const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} }); + const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined; + + if (!serverWorkflow || !Array.isArray(serverWorkflow.tasks)) { + throw new WorkflowError({ + code: 'WORKFLOW_SCHEMA_INVALID', + message: 'Server workflow response is missing a valid tasks array.', + timestamp: new Date(), + recoverable: false, + suggestedAction: 'Refresh and try again. If this persists, contact support.' + }); + } + + const derived = computeProgressAndNavigation(serverWorkflow); + set({ + currentWorkflow: serverWorkflow, + workflowProgress: derived.progress, + navigationState: derived.navigation, + isLoading: false, + isDegradedMode: false, + degradedModeReason: null, + }); + return; + } catch (error) { + if (!isServerUnavailableError(error)) { + set({ + error: toWorkflowError(error, 'Failed to load workflow from server.'), + isLoading: false, + isDegradedMode: false, + degradedModeReason: null, + }); + return; + } + } + + try { const workflow = await taskWorkflowOrchestrator.generateDailyWorkflow(userId, date); const progress = taskWorkflowOrchestrator.getWorkflowProgress(workflow.id); const navigation = taskWorkflowOrchestrator.getNavigationState(workflow.id); - set({ currentWorkflow: workflow, workflowProgress: progress, navigationState: navigation, isLoading: false }); + set({ + currentWorkflow: workflow, + workflowProgress: progress, + navigationState: navigation, + isLoading: false, + isDegradedMode: true, + degradedModeReason: 'Server workflow unavailable. Using local fallback workflow.', + error: null, + }); } catch (error) { - const workflowError = error as WorkflowError; - set({ - error: workflowError, - isLoading: false + set({ + error: toWorkflowError(error, 'Failed to generate local fallback workflow.'), + isLoading: false, }); } }, From 81b29895b930ec80ad94b428d39812ca32fe7881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:39:32 +0530 Subject: [PATCH 04/15] Improve daily workflow provenance modeling and UI labels --- backend/api/today_workflow.py | 22 +++++++++++++ backend/models/daily_workflow_models.py | 3 ++ backend/services/today_workflow_service.py | 33 ++++++++++++++++--- .../components/WorkflowProgressBar.tsx | 19 +++++++++++ frontend/src/stores/workflowStore.ts | 4 +++ frontend/src/types/workflow.ts | 10 ++++++ 6 files changed, 87 insertions(+), 4 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..48604872 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -42,6 +42,22 @@ async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: return +def _build_provenance_summary(plan: DailyWorkflowPlan, tasks: list[DailyWorkflowTask]) -> Dict[str, Any]: + source_counts: Dict[str, int] = {} + for task in tasks: + metadata = task.metadata_json if isinstance(task.metadata_json, dict) else {} + source = metadata.get("source") if metadata.get("source") in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation" + source_counts[source] = source_counts.get(source, 0) + 1 + + generation_mode = plan.generation_mode if plan.generation_mode in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation" + + return { + "generationMode": generation_mode, + "committeeAgentCount": int(plan.committee_agent_count or 0), + "fallbackUsed": bool(plan.fallback_used), + "taskSourceBreakdown": source_counts, + } + @router.get("") async def get_today_workflow( date: Optional[str] = None, @@ -61,6 +77,7 @@ async def get_today_workflow( ) tasks = await run_in_threadpool(_fetch_tasks) + provenance_summary = _build_provenance_summary(plan, tasks) response_tasks = [] for t in tasks: @@ -153,6 +170,7 @@ async def get_today_workflow( "workflowStatus": workflow_status, "totalEstimatedTime": total_estimated, "actualTimeSpent": 0, + "provenanceSummary": provenance_summary, }, "plan": { "id": plan.id, @@ -160,6 +178,10 @@ async def get_today_workflow( "source": plan.source, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, + "generation_mode": plan.generation_mode, + "committee_agent_count": plan.committee_agent_count, + "fallback_used": plan.fallback_used, + "provenance_summary": provenance_summary, }, }, "timestamp": datetime.utcnow().isoformat(), diff --git a/backend/models/daily_workflow_models.py b/backend/models/daily_workflow_models.py index 237870f1..7a559a2d 100644 --- a/backend/models/daily_workflow_models.py +++ b/backend/models/daily_workflow_models.py @@ -13,6 +13,9 @@ class DailyWorkflowPlan(Base): user_id = Column(String(255), nullable=False, index=True) date = Column(String(10), nullable=False, index=True) source = Column(String(30), nullable=False, default="agent") + generation_mode = Column(String(30), nullable=False, default="llm_generation", index=True) + committee_agent_count = Column(Integer, nullable=False, default=0) + fallback_used = Column(Boolean, nullable=False, default=False) plan_json = Column(JSON, nullable=True) generation_run_id = Column(Integer, nullable=True, index=True) created_at = Column(DateTime, default=datetime.utcnow, index=True) diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..725e6191 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -11,6 +11,15 @@ from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +TASK_SOURCE_ENUM = {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} + + +def _normalize_task_metadata(task: Dict[str, Any], default_source: str) -> Dict[str, Any]: + metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} + source = metadata.get("source") + if source not in TASK_SOURCE_ENUM: + metadata["source"] = default_source + return metadata def _today_date_str() -> str: @@ -136,6 +145,7 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["enabled"] = bool(task.get("enabled", True)) + sanitized["metadata"] = _normalize_task_metadata(task, default_source="llm_generation") return sanitized @@ -282,7 +292,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - return {"date": date, "tasks": _fallback_tasks(date)} + return { + "date": date, + "tasks": _fallback_tasks(date), + "provenance": {"generation_mode": "controlled_fallback", "committee_agent_count": 0, "fallback_used": True}, + } # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") @@ -367,6 +381,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "actionUrl": prop.action_url, "enabled": True, "metadata": { + "source": "agent_committee", "source_agent": prop.source_agent, "reasoning": prop.reasoning, "context_data": prop.context_data @@ -376,7 +391,8 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "provenance": {"generation_mode": "agent_committee", "committee_agent_count": len(active_agents), "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in final_tasks)}, } # Fallback to original LLM generation if agents returned nothing @@ -458,9 +474,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: tasks = _fallback_tasks(date) + enriched_tasks = _ensure_pillar_coverage(tasks, user_id, date, grounding) result = { "date": date, - "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + "tasks": enriched_tasks, + "provenance": {"generation_mode": "llm_generation", "committee_agent_count": len(active_agents) if "active_agents" in locals() else 0, "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in enriched_tasks)}, } activity.log_event( @@ -494,12 +512,19 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) tasks = plan_data.get("tasks", []) + provenance = plan_data.get("provenance") if isinstance(plan_data.get("provenance"), dict) else {} def _create_plan(): + generation_mode = provenance.get("generation_mode") if provenance.get("generation_mode") in TASK_SOURCE_ENUM else "llm_generation" + committee_agent_count = int(provenance.get("committee_agent_count") or 0) + fallback_used = bool(provenance.get("fallback_used", False)) plan = DailyWorkflowPlan( user_id=user_id, date=date_str, source="agent", + generation_mode=generation_mode, + committee_agent_count=max(0, committee_agent_count), + fallback_used=fallback_used, plan_json=plan_data, created_at=datetime.utcnow(), updated_at=datetime.utcnow(), @@ -524,7 +549,7 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt action_type=str(t.get("actionType") or "navigate").strip()[:20], action_url=str(t.get("actionUrl") or "").strip(), dependencies=json.dumps(t.get("dependencies") or []), - metadata_json=t.get("metadata") or {}, + metadata_json=_normalize_task_metadata(t, default_source=generation_mode), enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), updated_at=datetime.utcnow(), diff --git a/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx b/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx index cfc344a2..a517df77 100644 --- a/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx +++ b/frontend/src/components/MainDashboard/components/WorkflowProgressBar.tsx @@ -79,6 +79,15 @@ const WorkflowProgressBar: React.FC = ({ return 'Ready to Start'; }; + const getProvenanceLabel = () => { + const summary = currentWorkflow?.provenanceSummary; + if (!summary) return 'Daily Workflow'; + if (summary.generationMode === 'agent_committee') return 'Personalized by Agents'; + if (summary.generationMode === 'llm_generation' && !summary.fallbackUsed) return 'AI Personalized Guide'; + if (summary.fallbackUsed || summary.generationMode === 'controlled_fallback') return 'Baseline Daily Guide'; + return 'Daily Workflow'; + }; + return ( = ({ fontWeight: 600 }} /> + {/* Controls */} diff --git a/frontend/src/stores/workflowStore.ts b/frontend/src/stores/workflowStore.ts index fb0c1031..8374f766 100644 --- a/frontend/src/stores/workflowStore.ts +++ b/frontend/src/stores/workflowStore.ts @@ -117,7 +117,11 @@ export const useWorkflowStore = create()( try { const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} }); const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined; + const planSummary = resp?.data?.data?.plan?.provenance_summary; if (serverWorkflow && Array.isArray(serverWorkflow.tasks)) { + if (planSummary && !serverWorkflow.provenanceSummary) { + serverWorkflow.provenanceSummary = planSummary; + } const derived = computeProgressAndNavigation(serverWorkflow); set({ currentWorkflow: serverWorkflow, diff --git a/frontend/src/types/workflow.ts b/frontend/src/types/workflow.ts index a3992426..c28a986c 100644 --- a/frontend/src/types/workflow.ts +++ b/frontend/src/types/workflow.ts @@ -5,6 +5,14 @@ export type TaskStatus = 'pending' | 'in_progress' | 'completed' | 'skipped'; export type TaskPriority = 'high' | 'medium' | 'low'; export type ActionType = 'navigate' | 'modal' | 'external'; export type WorkflowStatus = 'not_started' | 'in_progress' | 'completed' | 'paused' | 'stopped'; +export type WorkflowGenerationMode = 'agent_committee' | 'llm_generation' | 'llm_pillar_backfill' | 'controlled_fallback'; + +export interface WorkflowProvenanceSummary { + generationMode: WorkflowGenerationMode; + committeeAgentCount: number; + fallbackUsed: boolean; + taskSourceBreakdown: Partial>; +} export interface TodayTask { id: string; @@ -44,6 +52,7 @@ export interface DailyWorkflow { completedAt?: Date; totalEstimatedTime: number; // in minutes actualTimeSpent: number; // in minutes + provenanceSummary?: WorkflowProvenanceSummary; } export interface WorkflowProgress { @@ -54,6 +63,7 @@ export interface WorkflowProgress { nextTask?: TodayTask; estimatedTimeRemaining: number; // in minutes actualTimeSpent: number; // in minutes + provenanceSummary?: WorkflowProvenanceSummary; } export interface TaskCompletionData { From 15a9eaa9a0fcf56d4e6604b3b479766f320d01e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:40:03 +0530 Subject: [PATCH 05/15] Add committee health precheck and orchestrator init state metadata --- .../intelligence/agents/agent_orchestrator.py | 20 ++ backend/services/today_workflow_service.py | 249 +++++++++++++----- 2 files changed, 201 insertions(+), 68 deletions(-) diff --git a/backend/services/intelligence/agents/agent_orchestrator.py b/backend/services/intelligence/agents/agent_orchestrator.py index afb42f15..6f0deff2 100644 --- a/backend/services/intelligence/agents/agent_orchestrator.py +++ b/backend/services/intelligence/agents/agent_orchestrator.py @@ -464,6 +464,7 @@ class AgentOrchestrationService: async def get_or_create_orchestrator(self, user_id: str) -> ALwrityAgentOrchestrator: """Get or create an orchestrator for a user""" + onboarding_gated_initialization = False if user_id not in self.orchestrators: config = AgentTeamConfiguration(user_id=user_id) self.orchestrators[user_id] = ALwrityAgentOrchestrator(config) @@ -474,6 +475,25 @@ class AgentOrchestrationService: if not orchestrator.agents and not orchestrator.execution_history: logger.info(f"Orchestrator for {user_id} has no agents. Attempting re-initialization.") orchestrator._create_specialized_agents() + + last_system_check = next( + ( + entry + for entry in reversed(orchestrator.execution_history) + if entry.get("action") == "system_check" + ), + None, + ) + if last_system_check and last_system_check.get("status") == "pending": + details = str(last_system_check.get("details") or "").lower() + onboarding_gated_initialization = "onboarding" in details + + orchestrator.onboarding_gated_initialization = onboarding_gated_initialization + orchestrator.initialization_state = { + "onboarding_gated_initialization": onboarding_gated_initialization, + "active_agent_count": len(orchestrator.agents), + "active_agent_keys": sorted(orchestrator.agents.keys()), + } return orchestrator diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..0c2db96f 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -276,85 +276,147 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> activity = AgentActivityService(db, user_id) grounding = build_grounding_context(db, user_id, date) memory_service = TaskMemoryService(user_id, db) + min_active_agents = 2 + generation_path = "committee" # 1. Get Orchestrator try: orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - return {"date": date, "tasks": _fallback_tasks(date)} + fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) + return { + "date": date, + "tasks": fallback_tasks, + "metadata": { + "generation_path": "controlled_fallback", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": {"count": 0, "names": []}, + }, + "degraded": { + "is_degraded": True, + "reason": "orchestrator_unavailable", + "missing_agents": [], + }, + }, + } + + expected_committee_agents = ["content", "strategy", "seo", "social", "competitor"] + active_agent_names = sorted(orchestrator.agents.keys()) + active_agents_count = len(active_agent_names) + missing_agents = [name for name in expected_committee_agents if name not in active_agent_names] + onboarding_gated_initialization = bool(getattr(orchestrator, "onboarding_gated_initialization", False)) + initialization_state = ( + getattr(orchestrator, "initialization_state", None) + if isinstance(getattr(orchestrator, "initialization_state", None), dict) + else {} + ) + + degraded_metadata = { + "is_degraded": False, + "reason": None, + "missing_agents": [], + } + + if active_agents_count < min_active_agents: + generation_path = "controlled_fallback" + degraded_metadata = { + "is_degraded": True, + "reason": "insufficient_active_agents", + "missing_agents": missing_agents, + } + activity.log_event( + event_type="committee_health", + severity="warning", + message="Committee degraded: insufficient active agents", + payload=build_agent_event_payload( + phase="planning", + step="committee_health_precheck", + progress_percent=5, + output_summary=f"Only {active_agents_count} active committee agents", + decision_reason="Agent committee below configured minimum", + evidence_refs=active_agent_names, + safe_debug=True, + metadata={ + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "missing_agents": missing_agents, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + ), + agent_type="TodayWorkflowGenerator", + ) # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") - + agent_tasks = [] - try: - # Define agents to poll - agents_to_poll = [ - orchestrator.agents.get('content'), # ContentStrategyAgent - orchestrator.agents.get('strategy'), # StrategyArchitectAgent - orchestrator.agents.get('seo'), # SEOOptimizationAgent - orchestrator.agents.get('social'), # SocialAmplificationAgent - orchestrator.agents.get('competitor'), # CompetitorResponseAgent - ] - - # Filter out None agents (disabled/failed init) - active_agents = [a for a in agents_to_poll if a] - - # Execute propose_daily_tasks in parallel - results = await asyncio.gather( - *[a.propose_daily_tasks(grounding) for a in active_agents], - return_exceptions=True - ) - - # Collect successful proposals - raw_proposals = [] - for res in results: - if isinstance(res, list): - raw_proposals.extend(res) - elif isinstance(res, Exception): - logger.warning(f"Agent proposal failed: {res}") + if generation_path == "committee": + try: + # Define agents to poll + agents_to_poll = [ + orchestrator.agents.get('content'), # ContentStrategyAgent + orchestrator.agents.get('strategy'), # StrategyArchitectAgent + orchestrator.agents.get('seo'), # SEOOptimizationAgent + orchestrator.agents.get('social'), # SocialAmplificationAgent + orchestrator.agents.get('competitor'), # CompetitorResponseAgent + ] - # 3. Filter Redundant Proposals (Self-Learning) - # Note: We need to ensure we don't filter out essential recurring tasks if they were completed long ago - # But for now, we filter exact duplicates from recent history (last 7 days) - # We can implement semantic filtering later - - # Simple deduplication based on title+pillar - unique_map = {} - for p in raw_proposals: - key = f"{p.pillar_id}:{p.title}" - if key not in unique_map: - unique_map[key] = p - continue + # Filter out None agents (disabled/failed init) + active_agents = [a for a in agents_to_poll if a] - existing = unique_map[key] - if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): - unique_map[key] = p - continue + # Execute propose_daily_tasks in parallel + results = await asyncio.gather( + *[a.propose_daily_tasks(grounding) for a in active_agents], + return_exceptions=True + ) - # Deterministic tie-breaker for equal priority proposals. - if ( - _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) - and _proposal_order_key(p) < _proposal_order_key(existing) - ): - unique_map[key] = p - - agent_tasks = list(unique_map.values()) - - # Phase 3: Check memory for rejections (Semantic Filter) - agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) - - except Exception as e: - logger.error(f"Committee proposal phase failed: {e}") - # Continue to fallback or LLM generation if committee fails + # Collect successful proposals + raw_proposals = [] + for res in results: + if isinstance(res, list): + raw_proposals.extend(res) + elif isinstance(res, Exception): + logger.warning(f"Agent proposal failed: {res}") - # 4. Final Selection - # If we have agent tasks, use them. Otherwise fall back to LLM generation. - if agent_tasks: + # Simple deduplication based on title+pillar + unique_map = {} + for p in raw_proposals: + key = f"{p.pillar_id}:{p.title}" + if key not in unique_map: + unique_map[key] = p + continue + + existing = unique_map[key] + if _proposal_priority_rank(p.priority) > _proposal_priority_rank(existing.priority): + unique_map[key] = p + continue + + # Deterministic tie-breaker for equal priority proposals. + if ( + _proposal_priority_rank(p.priority) == _proposal_priority_rank(existing.priority) + and _proposal_order_key(p) < _proposal_order_key(existing) + ): + unique_map[key] = p + + agent_tasks = list(unique_map.values()) + + # Check memory for rejections (semantic filter) + agent_tasks = await memory_service.filter_redundant_proposals(agent_tasks) + + except Exception as e: + logger.error(f"Committee proposal phase failed: {e}") + generation_path = "llm_fallback" + + # 3. Final Selection + if generation_path == "committee" and agent_tasks: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") - - # Convert TaskProposal objects to dicts for frontend + final_tasks = [] for prop in agent_tasks: final_tasks.append({ @@ -372,14 +434,50 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "context_data": prop.context_data } }) - + final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "metadata": { + "generation_path": "committee", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, } - # Fallback to original LLM generation if agents returned nothing + if generation_path != "controlled_fallback": + generation_path = "llm_fallback" + + if generation_path == "controlled_fallback": + fallback_tasks = _ensure_pillar_coverage(_fallback_tasks(date), user_id, date, grounding) + return { + "date": date, + "tasks": fallback_tasks, + "metadata": { + "generation_path": "controlled_fallback", + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, + } + + # Fallback to original LLM generation if committee returned nothing logger.info("Agent committee returned no tasks, falling back to LLM generation") schema = { @@ -457,17 +555,32 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: + generation_path = "controlled_fallback" tasks = _fallback_tasks(date) + result = { "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + "metadata": { + "generation_path": generation_path, + "committee": { + "minimum_active_agents": min_active_agents, + "active_agents": { + "count": active_agents_count, + "names": active_agent_names, + }, + "onboarding_gated_initialization": onboarding_gated_initialization, + "orchestrator_initialization_state": initialization_state, + }, + "degraded": degraded_metadata, + }, } activity.log_event( event_type="final_summary", severity="info", message="Daily workflow plan generated", - payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", []))}), + payload=build_agent_event_payload(phase="generation", step="workflow_generated", tool_name="llm_text_gen", progress_percent=100, output_summary=f"Generated {len(result.get('tasks', []))} tasks", decision_reason="Workflow assembled successfully", evidence_refs=[date], safe_debug=True, metadata={"date": date, "task_count": len(result.get("tasks", [])), "generation_path": generation_path}), run_id=run.id, agent_type="TodayWorkflowGenerator", ) From 4621107988962d6ae8bb14bd517771b0164dc99c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:40:29 +0530 Subject: [PATCH 06/15] Add degraded-mode workflow regeneration criteria and endpoint --- backend/api/today_workflow.py | 90 ++++++++++- backend/services/today_workflow_service.py | 176 +++++++++++++++++---- 2 files changed, 234 insertions(+), 32 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..2e1efc3f 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -1,13 +1,14 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Any, Dict, Optional -from datetime import datetime +from datetime import datetime, timezone +from collections import defaultdict, deque from loguru import logger from sqlalchemy.orm import Session from middleware.auth_middleware import get_current_user from services.database import get_db -from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status +from services.today_workflow_service import get_or_create_daily_workflow_plan, regenerate_daily_workflow_plan, update_task_status from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask import asyncio from services.intelligence.txtai_service import TxtaiIntelligenceService @@ -15,6 +16,27 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) +REGENERATE_WINDOW_SECONDS = 60 +REGENERATE_MAX_REQUESTS_PER_WINDOW = 3 +_regen_request_log: dict[str, deque[float]] = defaultdict(deque) + + +def _check_regenerate_rate_limit(user_id: str) -> None: + import time + + now = time.time() + window_start = now - REGENERATE_WINDOW_SECONDS + history = _regen_request_log[user_id] + + while history and history[0] < window_start: + history.popleft() + + if len(history) >= REGENERATE_MAX_REQUESTS_PER_WINDOW: + raise HTTPException(status_code=429, detail="Regeneration rate limit exceeded") + + history.append(now) + + async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): svc = TxtaiIntelligenceService(user_id) items = [] @@ -160,6 +182,9 @@ async def get_today_workflow( "source": plan.source, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, + "generation_mode": (plan.plan_json or {}).get("generation_mode"), + "quality_score": (plan.plan_json or {}).get("quality_score"), + "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), }, }, "timestamp": datetime.utcnow().isoformat(), @@ -167,6 +192,67 @@ async def get_today_workflow( } +@router.post("/regenerate") +async def regenerate_today_workflow( + date: Optional[str] = None, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + from starlette.concurrency import run_in_threadpool + + user_id = str(current_user.get("id")) + _check_regenerate_rate_limit(user_id) + + plan = await regenerate_daily_workflow_plan(db, user_id, date=date) + + tasks = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) + .order_by(DailyWorkflowTask.created_at.asc()) + .all() + ) + ) + + response_tasks = [ + { + "id": str(t.id), + "pillarId": t.pillar_id, + "title": t.title, + "description": t.description, + "status": "skipped" if t.status == "dismissed" else t.status, + "priority": t.priority, + "estimatedTime": t.estimated_time, + "dependencies": t.dependencies or [], + "actionUrl": t.action_url, + "actionType": t.action_type, + "metadata": t.metadata_json or {}, + "enabled": bool(t.enabled), + } + for t in tasks + ] + + asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today_regenerated")) + + return { + "success": True, + "data": { + "plan": { + "id": plan.id, + "date": plan.date, + "source": plan.source, + "generation_mode": (plan.plan_json or {}).get("generation_mode"), + "quality_score": (plan.plan_json or {}).get("quality_score"), + "generated_with_agents": (plan.plan_json or {}).get("generated_with_agents"), + "regenerated_at": datetime.now(timezone.utc).isoformat(), + }, + "tasks": response_tasks, + }, + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + + from services.task_memory_service import TaskMemoryService @router.post("/tasks/{task_id}/status") diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..e9862d70 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -1,3 +1,4 @@ +import hashlib import json from datetime import datetime, timezone from typing import Any, Dict, List, Optional @@ -8,9 +9,11 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.llm_providers.main_text_generation import llm_text_gen +from services.onboarding.progress_service import OnboardingProgressService from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +FALLBACK_REGENERATION_QUALITY_THRESHOLD = 0.6 def _today_date_str() -> str: @@ -107,6 +110,37 @@ def _fallback_tasks(date: str) -> List[Dict[str, Any]]: ] + + +def _compute_task_hash(title: str, description: str) -> str: + text = f"{title.strip().lower()}|{description.strip().lower()}" + return hashlib.sha256(text.encode()).hexdigest() + + +def _extract_plan_metadata(plan: Optional[DailyWorkflowPlan]) -> Dict[str, Any]: + raw = plan.plan_json if plan and isinstance(plan.plan_json, dict) else {} + return { + "generation_mode": str(raw.get("generation_mode") or "").strip().lower() or "unknown", + "quality_score": float(raw.get("quality_score") or 0.0), + "generated_with_agents": bool(raw.get("generated_with_agents", False)), + "onboarding_completed": bool(raw.get("onboarding_completed", False)), + "onboarding_completed_at": raw.get("onboarding_completed_at"), + } + + +def _get_onboarding_status(user_id: str) -> Dict[str, Any]: + status = OnboardingProgressService().get_onboarding_status(user_id) or {} + completed_at_raw = status.get("completed_at") + completed_at = None + if completed_at_raw: + try: + completed_at = datetime.fromisoformat(str(completed_at_raw).replace("Z", "+00:00")) + except Exception: + completed_at = None + return { + "is_completed": bool(status.get("is_completed", False)), + "completed_at": completed_at, + } def _is_coverage_guardrail_enabled(grounding: Dict[str, Any]) -> bool: workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} if not isinstance(workflow_config, dict): @@ -282,7 +316,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> orchestrator = await orchestration_service.get_or_create_orchestrator(user_id) except Exception as e: logger.error(f"Failed to get orchestrator: {e}") - return {"date": date, "tasks": _fallback_tasks(date)} + return {"date": date, "tasks": _fallback_tasks(date), "generation_mode": "fallback", "quality_score": 0.3, "generated_with_agents": False} # 2. Parallel "Committee" Proposal Gathering logger.info(f"Gathering daily task proposals from agent committee for user {user_id}") @@ -376,7 +410,10 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "generation_mode": "agent_committee", + "quality_score": 0.9, + "generated_with_agents": True, } # Fallback to original LLM generation if agents returned nothing @@ -435,6 +472,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> agent_type="TodayWorkflowGenerator", ) + used_fallback = False try: raw = llm_text_gen(prompt=prompt, json_struct=schema, user_id=user_id) if isinstance(raw, dict): @@ -443,6 +481,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> try: result = json.loads(raw) except Exception: + used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} except Exception as e: activity.log_event( @@ -453,14 +492,19 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> run_id=run.id, agent_type="TodayWorkflowGenerator", ) + used_fallback = True result = {"date": date, "tasks": _fallback_tasks(date)} tasks = result.get("tasks") if isinstance(result, dict) else None if not isinstance(tasks, list) or not tasks: + used_fallback = True tasks = _fallback_tasks(date) result = { "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), + "generation_mode": "fallback" if used_fallback else "llm", + "quality_score": 0.4 if used_fallback else 0.75, + "generated_with_agents": False, } activity.log_event( @@ -475,50 +519,83 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> return result -async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: +async def regenerate_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> DailyWorkflowPlan: from starlette.concurrency import run_in_threadpool - + date_str = date or _today_date_str() - - def _get_existing(): - return ( + onboarding_status = _get_onboarding_status(user_id) + + existing = await run_in_threadpool( + lambda: ( db.query(DailyWorkflowPlan) .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) .first() ) - - existing = await run_in_threadpool(_get_existing) - + ) + + existing_hash_status = {} if existing: - return existing, False + existing_tasks = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == existing.id, DailyWorkflowTask.user_id == user_id) + .all() + ) + ) + for task in existing_tasks: + task_hash = _compute_task_hash(task.title, task.description) + existing_hash_status[task_hash] = { + "status": task.status, + "decided_at": task.decided_at, + "completion_notes": task.completion_notes, + } plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) - tasks = plan_data.get("tasks", []) + plan_data["onboarding_completed"] = onboarding_status["is_completed"] + plan_data["onboarding_completed_at"] = onboarding_status["completed_at"].isoformat() if onboarding_status["completed_at"] else None - def _create_plan(): - plan = DailyWorkflowPlan( - user_id=user_id, - date=date_str, - source="agent", - plan_json=plan_data, - created_at=datetime.utcnow(), - updated_at=datetime.utcnow(), - ) - db.add(plan) - db.commit() - db.refresh(plan) + tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else [] + + def _replace_plan() -> DailyWorkflowPlan: + if existing: + db.query(DailyWorkflowTask).filter(DailyWorkflowTask.plan_id == existing.id).delete(synchronize_session=False) + plan = existing + plan.source = "agent" + plan.plan_json = plan_data + plan.updated_at = datetime.utcnow() + db.add(plan) + db.commit() + db.refresh(plan) + else: + plan = DailyWorkflowPlan( + user_id=user_id, + date=date_str, + source="agent", + plan_json=plan_data, + created_at=datetime.utcnow(), + updated_at=datetime.utcnow(), + ) + db.add(plan) + db.commit() + db.refresh(plan) for t in tasks: pillar_id = str(t.get("pillarId") or "").lower().strip() if pillar_id not in PILLAR_IDS: continue + + title = str(t.get("title") or "Task").strip()[:255] + description = str(t.get("description") or "").strip() + task_hash = _compute_task_hash(title, description) + preserved = existing_hash_status.get(task_hash) or {} + task = DailyWorkflowTask( plan_id=plan.id, user_id=user_id, pillar_id=pillar_id, - title=str(t.get("title") or "Task").strip()[:255], - description=str(t.get("description") or "").strip(), - status=_coerce_status(t.get("status")), + title=title, + description=description, + status=preserved.get("status") or _coerce_status(t.get("status")), priority=_coerce_priority(t.get("priority")), estimated_time=int(t.get("estimatedTime") or 15), action_type=str(t.get("actionType") or "navigate").strip()[:20], @@ -528,14 +605,53 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt enabled=bool(t.get("enabled", True)), created_at=datetime.utcnow(), updated_at=datetime.utcnow(), + decided_at=preserved.get("decided_at"), + completion_notes=preserved.get("completion_notes"), ) db.add(task) - + db.commit() + db.refresh(plan) return plan - plan = await run_in_threadpool(_create_plan) - return plan, True + return await run_in_threadpool(_replace_plan) + + +async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: + from starlette.concurrency import run_in_threadpool + + date_str = date or _today_date_str() + + existing = await run_in_threadpool( + lambda: ( + db.query(DailyWorkflowPlan) + .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) + .first() + ) + ) + + if existing: + metadata = _extract_plan_metadata(existing) + onboarding_status = _get_onboarding_status(user_id) + + should_regenerate = False + if metadata["generation_mode"] == "fallback" and metadata["quality_score"] < FALLBACK_REGENERATION_QUALITY_THRESHOLD: + should_regenerate = True + + if ( + onboarding_status["is_completed"] + and not metadata["onboarding_completed"] + ): + should_regenerate = True + + if should_regenerate: + regenerated = await regenerate_daily_workflow_plan(db, user_id, date=date_str) + return regenerated, True + + return existing, False + + created = await regenerate_daily_workflow_plan(db, user_id, date=date_str) + return created, True def update_task_status( From 84babd0407ea907f67bc92790f57ddcbebacdd29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:42:14 +0530 Subject: [PATCH 07/15] Add workflow provenance quality metrics and classification --- backend/api/today_workflow.py | 4 + backend/services/today_workflow_service.py | 94 +++++++++++++++++++++- 2 files changed, 97 insertions(+), 1 deletion(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..0c126a80 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -139,6 +139,9 @@ async def get_today_workflow( except Exception: pass + plan_json = plan.plan_json if isinstance(plan.plan_json, dict) else {} + quality = plan_json.get("quality") if isinstance(plan_json.get("quality"), dict) else None + return { "success": True, "data": { @@ -158,6 +161,7 @@ async def get_today_workflow( "id": plan.id, "date": plan.date, "source": plan.source, + "quality": quality, "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, }, diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..e8a27a82 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -11,6 +11,10 @@ from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +TASK_PROVENANCE_AGENT = "agent_proposal" +TASK_PROVENANCE_LLM_BACKFILL = "llm_backfill" +TASK_PROVENANCE_CONTROLLED_FALLBACK = "controlled_fallback" +DEFAULT_AGENT_PERSONALIZATION_THRESHOLD = 0.35 def _today_date_str() -> str: @@ -136,9 +140,77 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate" sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None sanitized["enabled"] = bool(task.get("enabled", True)) + metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {} + provenance = str(metadata.get("provenance") or "").strip().lower() + if provenance not in { + TASK_PROVENANCE_AGENT, + TASK_PROVENANCE_LLM_BACKFILL, + TASK_PROVENANCE_CONTROLLED_FALLBACK, + }: + if metadata.get("source") == TASK_PROVENANCE_CONTROLLED_FALLBACK: + provenance = TASK_PROVENANCE_CONTROLLED_FALLBACK + elif metadata.get("source") == "llm_pillar_backfill": + provenance = TASK_PROVENANCE_LLM_BACKFILL + elif metadata.get("source_agent"): + provenance = TASK_PROVENANCE_AGENT + else: + provenance = TASK_PROVENANCE_LLM_BACKFILL + metadata["provenance"] = provenance + sanitized["metadata"] = metadata return sanitized +def _agent_personalization_threshold(grounding: Dict[str, Any]) -> float: + workflow_config = grounding.get("workflow_config", {}) if isinstance(grounding, dict) else {} + configured = None + if isinstance(workflow_config, dict): + configured = workflow_config.get("min_agent_origin_ratio") + try: + value = float(configured) if configured is not None else DEFAULT_AGENT_PERSONALIZATION_THRESHOLD + except (TypeError, ValueError): + value = DEFAULT_AGENT_PERSONALIZATION_THRESHOLD + return max(0.0, min(1.0, value)) + + +def _compute_plan_quality(tasks: List[Dict[str, Any]], grounding: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + total_tasks = len(tasks) + agent_origin_tasks = [ + task for task in tasks + if isinstance(task.get("metadata"), dict) + and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_AGENT + ] + fallback_tasks = [ + task for task in tasks + if isinstance(task.get("metadata"), dict) + and task.get("metadata", {}).get("provenance") == TASK_PROVENANCE_CONTROLLED_FALLBACK + ] + agent_pillars = { + str(task.get("pillarId") or "").lower().strip() + for task in agent_origin_tasks + if str(task.get("pillarId") or "").lower().strip() in PILLAR_IDS + } + + agent_origin_ratio = (len(agent_origin_tasks) / total_tasks) if total_tasks else 0.0 + fallback_ratio = (len(fallback_tasks) / total_tasks) if total_tasks else 0.0 + threshold = _agent_personalization_threshold(grounding or {}) + classification = "AI-personalized" if agent_origin_ratio >= threshold else "guided baseline" + + return { + "classification": classification, + "agentOriginRatio": round(agent_origin_ratio, 4), + "agentOriginPercent": round(agent_origin_ratio * 100, 2), + "agentOriginTaskCount": len(agent_origin_tasks), + "agentOriginPillars": len(agent_pillars), + "fallbackRatio": round(fallback_ratio, 4), + "fallbackPercent": round(fallback_ratio * 100, 2), + "fallbackTaskCount": len(fallback_tasks), + "totalTaskCount": total_tasks, + "thresholds": { + "minAgentOriginRatio": threshold, + }, + } + + def _build_single_task_for_missing_pillar( user_id: str, date: str, @@ -208,6 +280,9 @@ def _ensure_pillar_coverage( generated = _build_single_task_for_missing_pillar(user_id, date, pillar_id, grounding) if generated: + metadata = generated.get("metadata") if isinstance(generated.get("metadata"), dict) else {} + metadata["provenance"] = TASK_PROVENANCE_LLM_BACKFILL + generated["metadata"] = metadata sanitized_tasks.append(generated) covered_pillars.add(pillar_id) continue @@ -216,6 +291,7 @@ def _ensure_pillar_coverage( if controlled_fallback: metadata = controlled_fallback.get("metadata") if isinstance(controlled_fallback.get("metadata"), dict) else {} metadata["source"] = "controlled_fallback" + metadata["provenance"] = TASK_PROVENANCE_CONTROLLED_FALLBACK controlled_fallback["metadata"] = metadata sanitized_tasks.append(controlled_fallback) covered_pillars.add(pillar_id) @@ -374,9 +450,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> }) final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding) + quality = _compute_plan_quality(final_tasks, grounding) return { "date": date, - "tasks": final_tasks + "tasks": final_tasks, + "quality": quality, } # Fallback to original LLM generation if agents returned nothing @@ -462,6 +540,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "date": date, "tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding), } + result["quality"] = _compute_plan_quality(result.get("tasks", []), grounding) activity.log_event( event_type="final_summary", @@ -490,6 +569,19 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt existing = await run_in_threadpool(_get_existing) if existing: + existing_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} + if not isinstance(existing_json.get("quality"), dict): + def _backfill_quality_for_existing(): + plan_json = existing.plan_json if isinstance(existing.plan_json, dict) else {} + tasks_for_quality = plan_json.get("tasks") if isinstance(plan_json.get("tasks"), list) else [] + plan_json["quality"] = _compute_plan_quality(tasks_for_quality, grounding={}) + existing.plan_json = plan_json + existing.updated_at = datetime.utcnow() + db.add(existing) + db.commit() + db.refresh(existing) + return existing + existing = await run_in_threadpool(_backfill_quality_for_existing) return existing, False plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) From 7096f036230e410dd4b174bb8678183215ef6b6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:42:49 +0530 Subject: [PATCH 08/15] Add contextuality validation and low-context workflow status --- backend/api/today_workflow.py | 2 + backend/services/today_workflow_service.py | 157 ++++++++++++++++++++- backend/sif_release_readiness_checks.py | 48 ++++++- 3 files changed, 201 insertions(+), 6 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..cbde7cf0 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -158,6 +158,8 @@ async def get_today_workflow( "id": plan.id, "date": plan.date, "source": plan.source, + "quality_status": (plan.plan_json or {}).get("quality_status", "contextual"), + "contextuality_validation": (plan.plan_json or {}).get("contextuality_validation"), "created_at": plan.created_at.isoformat() if plan.created_at else None, "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, }, diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..9883767f 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -11,6 +11,8 @@ from services.llm_providers.main_text_generation import llm_text_gen from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] +MIN_TASK_EVIDENCE_LINKS = 1 +PLAN_CONTEXT_THRESHOLD = 0.65 def _today_date_str() -> str: @@ -139,6 +141,116 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]: return sanitized +def _derive_onboarding_evidence_links(onboarding_data: Dict[str, Any], limit: int = 2) -> List[str]: + if not isinstance(onboarding_data, dict): + return [] + + links: List[str] = [] + for key, value in onboarding_data.items(): + if key == "workflow_config": + continue + if value in (None, "", [], {}): + continue + links.append(f"onboarding:{key}") + if len(links) >= limit: + break + return links + + +def _valid_evidence_links(evidence_links: Any, grounding: Dict[str, Any]) -> List[str]: + if not isinstance(evidence_links, list): + return [] + + onboarding_data = grounding.get("onboarding_data", {}) if isinstance(grounding, dict) else {} + if not isinstance(onboarding_data, dict): + onboarding_data = {} + valid_onboarding_keys = {str(k) for k in onboarding_data.keys()} + + recent_alerts = grounding.get("recent_agent_alerts", []) if isinstance(grounding, dict) else [] + valid_alert_ids = { + str(a.get("alert_id")) + for a in recent_alerts + if isinstance(a, dict) and a.get("alert_id") is not None + } + + valid_links: List[str] = [] + for raw in evidence_links: + link = str(raw or "").strip() + if not link: + continue + + if link.startswith("onboarding:"): + key = link.split(":", 1)[1].strip() + if key and key in valid_onboarding_keys: + valid_links.append(link) + elif link.startswith("alert:"): + alert_id = link.split(":", 1)[1].strip() + if alert_id and alert_id in valid_alert_ids: + valid_links.append(link) + + return valid_links + + +def validate_plan_contextuality(plan: Dict[str, Any], grounding: Dict[str, Any]) -> Dict[str, Any]: + tasks = plan.get("tasks") if isinstance(plan, dict) else None + if not isinstance(tasks, list) or not tasks: + return { + "score": 0.0, + "threshold": PLAN_CONTEXT_THRESHOLD, + "is_contextual": False, + "task_scores": [], + "tasks_below_min_evidence": 0, + "min_evidence_links": MIN_TASK_EVIDENCE_LINKS, + } + + task_scores = [] + below_min_evidence = 0 + + for idx, task in enumerate(tasks): + metadata = task.get("metadata") if isinstance(task, dict) else {} + metadata = metadata if isinstance(metadata, dict) else {} + evidence_links = _valid_evidence_links(metadata.get("evidence_links"), grounding) + has_min_evidence = len(evidence_links) >= MIN_TASK_EVIDENCE_LINKS + if not has_min_evidence: + below_min_evidence += 1 + + reasoning_text = str(metadata.get("reasoning") or task.get("description") or "").lower() + onboarding_hits = sum(1 for l in evidence_links if l.startswith("onboarding:")) + alert_hits = sum(1 for l in evidence_links if l.startswith("alert:")) + + score = 0.0 + if has_min_evidence: + score += 0.6 + if onboarding_hits > 0: + score += 0.2 + if alert_hits > 0: + score += 0.2 + elif "alert" in reasoning_text: + score += 0.1 + + task_scores.append( + { + "task_index": idx, + "pillarId": task.get("pillarId"), + "title": task.get("title"), + "score": min(score, 1.0), + "evidence_links": evidence_links, + "has_min_evidence": has_min_evidence, + } + ) + + plan_score = sum(t["score"] for t in task_scores) / len(task_scores) + is_contextual = plan_score >= PLAN_CONTEXT_THRESHOLD and below_min_evidence == 0 + return { + "score": round(plan_score, 3), + "threshold": PLAN_CONTEXT_THRESHOLD, + "is_contextual": is_contextual, + "task_scores": task_scores, + "tasks_below_min_evidence": below_min_evidence, + "min_evidence_links": MIN_TASK_EVIDENCE_LINKS, + } + + def _build_single_task_for_missing_pillar( user_id: str, date: str, @@ -253,6 +365,7 @@ def build_grounding_context(db: Session, user_id: str, date: str) -> Dict[str, A return { "recent_agent_alerts": [ { + "alert_id": a.id, "title": a.title, "message": a.message, "created_at": a.created_at.isoformat(), @@ -272,9 +385,15 @@ from services.task_memory_service import TaskMemoryService # Initialize orchestration service (singleton) orchestration_service = AgentOrchestrationService() -async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> Dict[str, Any]: +async def generate_agent_enhanced_plan( + db: Session, + user_id: str, + date: str, + grounding: Optional[Dict[str, Any]] = None, + strict_contextuality: bool = False, +) -> Dict[str, Any]: activity = AgentActivityService(db, user_id) - grounding = build_grounding_context(db, user_id, date) + grounding = grounding or build_grounding_context(db, user_id, date) memory_service = TaskMemoryService(user_id, db) # 1. Get Orchestrator @@ -351,7 +470,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> # 4. Final Selection # If we have agent tasks, use them. Otherwise fall back to LLM generation. - if agent_tasks: + if agent_tasks and not strict_contextuality: logger.info(f"Generated {len(agent_tasks)} tasks via Agent Committee") # Convert TaskProposal objects to dicts for frontend @@ -369,7 +488,8 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> "metadata": { "source_agent": prop.source_agent, "reasoning": prop.reasoning, - "context_data": prop.context_data + "context_data": prop.context_data, + "evidence_links": _derive_onboarding_evidence_links(grounding.get("onboarding_data", {}), limit=2), } }) @@ -425,6 +545,15 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> f"Grounding context (Alerts):\n{json.dumps(grounding.get('recent_agent_alerts', []), indent=2)}\n" ) + if strict_contextuality: + prompt += ( + "\nStrict contextuality mode (must follow):\n" + f"- Every task.metadata must include evidence_links with at least {MIN_TASK_EVIDENCE_LINKS} entries.\n" + "- evidence_links entries must use either 'onboarding:' or 'alert:' format.\n" + "- Include metadata.reasoning that explains how the evidence applies to the task.\n" + "- Reject generic tasks without explicit ties to onboarding data or active alerts.\n" + ) + run = activity.start_run(agent_type="TodayWorkflowGenerator", prompt=prompt[:4000]) activity.log_event( event_type="plan", @@ -492,7 +621,25 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt if existing: return existing, False - plan_data = await generate_agent_enhanced_plan(db, user_id, date_str) + grounding = build_grounding_context(db, user_id, date_str) + plan_data = await generate_agent_enhanced_plan(db, user_id, date_str, grounding=grounding) + validation = validate_plan_contextuality(plan_data, grounding) + + if not validation.get("is_contextual"): + logger.info("Plan contextuality below threshold for user {}. Running strict regeneration.", user_id) + regenerated_plan = await generate_agent_enhanced_plan( + db, + user_id, + date_str, + grounding=grounding, + strict_contextuality=True, + ) + regenerated_validation = validate_plan_contextuality(regenerated_plan, grounding) + plan_data = regenerated_plan + validation = regenerated_validation + + plan_data["quality_status"] = "contextual" if validation.get("is_contextual") else "low_context" + plan_data["contextuality_validation"] = validation tasks = plan_data.get("tasks", []) def _create_plan(): diff --git a/backend/sif_release_readiness_checks.py b/backend/sif_release_readiness_checks.py index 9d79e14b..d0ef7700 100644 --- a/backend/sif_release_readiness_checks.py +++ b/backend/sif_release_readiness_checks.py @@ -8,7 +8,7 @@ if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from services.intelligence.monitoring.semantic_dashboard import RealTimeSemanticMonitor, SemanticHealthMetric -from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS +from services.today_workflow_service import _ensure_pillar_coverage, PILLAR_IDS, validate_plan_contextuality from services.intelligence.sif_agents import ContentGuardianAgent as SifGuardian from services.intelligence.agents.specialized_agents import ContentGuardianAgent as SpecializedGuardian @@ -74,6 +74,52 @@ class SIFReleaseReadinessTests(unittest.IsolatedAsyncioTestCase): self.assertIn("warning", result) self.assertEqual(result["method"], "competitor_index_search") + + def test_validate_plan_contextuality_passes_with_evidence_links(self): + plan = { + "tasks": [ + { + "pillarId": "plan", + "title": "Review strategy", + "description": "Use onboarding goals", + "metadata": { + "evidence_links": ["onboarding:business_goals", "alert:101"], + "reasoning": "Based on onboarding and alert", + }, + } + ] + } + grounding = { + "onboarding_data": {"business_goals": ["awareness"]}, + "recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}], + } + + validation = validate_plan_contextuality(plan, grounding) + + self.assertTrue(validation["is_contextual"]) + self.assertEqual(validation["tasks_below_min_evidence"], 0) + + def test_validate_plan_contextuality_flags_missing_evidence_links(self): + plan = { + "tasks": [ + { + "pillarId": "generate", + "title": "Write generic post", + "description": "Create a post", + "metadata": {"reasoning": "General best practice"}, + } + ] + } + grounding = { + "onboarding_data": {"business_goals": ["awareness"]}, + "recent_agent_alerts": [{"alert_id": 101, "title": "Drop in traffic"}], + } + + validation = validate_plan_contextuality(plan, grounding) + + self.assertFalse(validation["is_contextual"]) + self.assertEqual(validation["tasks_below_min_evidence"], 1) + def test_pillar_coverage_guardrail_backfills_missing(self): tasks = [{"pillarId": "plan", "title": "Plan", "description": "d", "priority": "high", "estimatedTime": 10, "actionType": "navigate", "enabled": True}] grounding = {"workflow_config": {"enforce_pillar_coverage": True}} From acecf2a3f4fc804f9443e486d28552c6d70b7721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:43:40 +0530 Subject: [PATCH 09/15] Fix task outcome feedback scoring to use normalized status --- backend/api/today_workflow.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..1047fef5 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -189,10 +189,18 @@ async def set_task_status( # Record outcome in memory for self-learning try: memory = TaskMemoryService(user_id, db) + normalized_status = (task.status or "").lower() + if normalized_status == "completed": + feedback_score = 1 + elif normalized_status in {"skipped", "dismissed", "rejected"}: + feedback_score = -1 + else: + feedback_score = 0 + await memory.record_task_outcome( - task, - feedback_score=1 if status == "completed" else -1 if status == "dismissed" else 0, - feedback_text=completion_notes + task, + feedback_score=feedback_score, + feedback_text=completion_notes, ) except Exception as e: logger.warning( From 2403d92f9d0f9c92cec7a66b5912d98668450cc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:44:23 +0530 Subject: [PATCH 10/15] Normalize today workflow task dependencies payload --- backend/api/today_workflow.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..bf48a947 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -1,6 +1,7 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Any, Dict, Optional from datetime import datetime +import json from loguru import logger from sqlalchemy.orm import Session @@ -15,6 +16,20 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) + +def _normalize_dependencies(dependencies: Any) -> list: + if dependencies is None: + return [] + if isinstance(dependencies, list): + return dependencies + if isinstance(dependencies, str): + try: + parsed = json.loads(dependencies) + return parsed if isinstance(parsed, list) else [] + except json.JSONDecodeError: + return [] + return [] + async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): svc = TxtaiIntelligenceService(user_id) items = [] @@ -73,7 +88,7 @@ async def get_today_workflow( "status": "skipped" if t.status == "dismissed" else t.status, "priority": t.priority, "estimatedTime": t.estimated_time, - "dependencies": t.dependencies or [], + "dependencies": _normalize_dependencies(t.dependencies), "actionUrl": t.action_url, "actionType": t.action_type, "metadata": t.metadata_json or {}, @@ -133,6 +148,7 @@ async def get_today_workflow( "title": t.title, "description": t.description, "status": "skipped" if t.status == "dismissed" else t.status, + "dependencies": _normalize_dependencies(t.dependencies), } ) asyncio.create_task(_index_tasks_to_sif(user_id, y_str, y_response, label="yesterday")) From e694e6172fd6c2667ae8620a70305907d262c8de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:45:25 +0530 Subject: [PATCH 11/15] Validate plan date before yesterday workflow indexing --- backend/api/today_workflow.py | 62 +++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..368d6a8b 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -4,6 +4,7 @@ from datetime import datetime from loguru import logger from sqlalchemy.orm import Session +from sqlalchemy.exc import SQLAlchemyError from middleware.auth_middleware import get_current_user from services.database import get_db @@ -100,11 +101,21 @@ async def get_today_workflow( if created: asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today")) - try: - from datetime import date as date_type, timedelta + from datetime import date as date_type, timedelta + + try: + parsed_plan_date = date_type.fromisoformat(plan.date) + except ValueError: + logger.warning( + "Invalid plan.date format; skipping yesterday indexing plan_id={} user_id={} plan_date={} reason={}", + plan.id, + user_id, + plan.date, + "plan.date is not in ISO format YYYY-MM-DD", + ) + else: + y_str = (parsed_plan_date - timedelta(days=1)).isoformat() - y_str = (date_type.fromisoformat(plan.date) - timedelta(days=1)).isoformat() - def _fetch_yesterday(): y_plan = ( db.query(DailyWorkflowPlan) @@ -121,23 +132,32 @@ async def get_today_workflow( return y_tasks return [] - y_tasks = await run_in_threadpool(_fetch_yesterday) - - if y_tasks: - y_response = [] - for t in y_tasks: - y_response.append( - { - "id": str(t.id), - "pillarId": t.pillar_id, - "title": t.title, - "description": t.description, - "status": "skipped" if t.status == "dismissed" else t.status, - } - ) - asyncio.create_task(_index_tasks_to_sif(user_id, y_str, y_response, label="yesterday")) - except Exception: - pass + try: + y_tasks = await run_in_threadpool(_fetch_yesterday) + except SQLAlchemyError as db_error: + logger.warning( + "Failed to fetch yesterday tasks; skipping yesterday indexing plan_id={} user_id={} plan_date={} yesterday_date={} error_class={} error_message={}", + plan.id, + user_id, + plan.date, + y_str, + type(db_error).__name__, + str(db_error), + ) + else: + if y_tasks: + y_response = [] + for t in y_tasks: + y_response.append( + { + "id": str(t.id), + "pillarId": t.pillar_id, + "title": t.title, + "description": t.description, + "status": "skipped" if t.status == "dismissed" else t.status, + } + ) + asyncio.create_task(_index_tasks_to_sif(user_id, y_str, y_response, label="yesterday")) return { "success": True, From 62d5cf773e254591e01f14b82b325c23658556b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Fri, 6 Mar 2026 21:45:48 +0530 Subject: [PATCH 12/15] Add typed request model for today workflow task status updates --- backend/api/today_workflow.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..e73dd5a7 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -1,7 +1,9 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Any, Dict, Optional from datetime import datetime +from enum import Enum from loguru import logger +from pydantic import BaseModel, Field from sqlalchemy.orm import Session @@ -15,6 +17,23 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService router = APIRouter(prefix="/api/today-workflow", tags=["Today Workflow"]) + +class TaskStatusEnum(str, Enum): + pending = "pending" + in_progress = "in_progress" + completed = "completed" + skipped = "skipped" + dismissed = "dismissed" + + +class TaskStatusUpdateRequest(BaseModel): + status: TaskStatusEnum = Field(..., description="New task status") + completion_notes: Optional[str] = Field( + None, + max_length=4000, + description="Optional notes about task completion or outcome", + ) + async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: str): svc = TxtaiIntelligenceService(user_id) items = [] @@ -172,15 +191,13 @@ from services.task_memory_service import TaskMemoryService @router.post("/tasks/{task_id}/status") async def set_task_status( task_id: int, - body: Dict[str, Any], + body: TaskStatusUpdateRequest, current_user: dict = Depends(get_current_user), db: Session = Depends(get_db), ) -> Dict[str, Any]: user_id = str(current_user.get("id")) - status = body.get("status") - if not status: - raise HTTPException(status_code=400, detail="status is required") - completion_notes = body.get("completion_notes") + status = body.status.value + completion_notes = body.completion_notes task = update_task_status(db, user_id, task_id, status=status, completion_notes=completion_notes) if not task: From 8b554a35c4dc7ea0dfbc0f5a8e7dde8e818efc55 Mon Sep 17 00:00:00 2001 From: ajaysi Date: Sat, 7 Mar 2026 11:51:59 +0530 Subject: [PATCH 13/15] fix: Resolve dependency conflicts, scheduler status error, and frontend config (Closes #382) --- backend/requirements.txt | 4 +++- .../executors/onboarding_full_website_analysis_executor.py | 2 +- frontend/src/App.tsx | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/requirements.txt b/backend/requirements.txt index 6f3cb35f..9d695301 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -1,5 +1,7 @@ # Core dependencies -fastapi>=0.104.0 +fastapi>=0.115.14 +starlette>=0.40.0,<0.47.0 +sse-starlette<3.0.0 uvicorn>=0.24.0 python-multipart>=0.0.6 python-dotenv>=1.0.0 diff --git a/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py b/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py index 47f9bc30..15efc4e7 100644 --- a/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py +++ b/backend/services/scheduler/executors/onboarding_full_website_analysis_executor.py @@ -81,7 +81,7 @@ class OnboardingFullWebsiteAnalysisExecutor(TaskExecutor): task.last_executed = datetime.utcnow() task.last_success = datetime.utcnow() - task.status = 'paused' + task.status = 'completed' # Explicitly mark as completed instead of paused task.next_execution = None task.consecutive_failures = 0 task.failure_pattern = None diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index fc168638..46579c85 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -544,6 +544,7 @@ const App: React.FC = () => { // Get environment variables with fallbacks const clerkPublishableKey = process.env.REACT_APP_CLERK_PUBLISHABLE_KEY || ''; + const clerkJSUrl = process.env.REACT_APP_CLERK_JS_URL; // Show error if required keys are missing if (!clerkPublishableKey) { @@ -654,7 +655,7 @@ const App: React.FC = () => { // TODO: Send to error tracking service (Sentry, LogRocket, etc.) }} > - + {renderApp()} From 5780deff2f30a1e30b3f807bd2b57cd455688880 Mon Sep 17 00:00:00 2001 From: ajaysi Date: Sat, 7 Mar 2026 11:57:26 +0530 Subject: [PATCH 14/15] fix: Normalize specialized agent pillar IDs and log invalid proposals (Closes #383) --- backend/services/today_workflow_service.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 1be7cb21..08eeee69 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -311,7 +311,14 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) -> raw_proposals = [] for res in results: if isinstance(res, list): - raw_proposals.extend(res) + # Normalize pillar IDs and filter invalid proposals + for proposal in res: + pillar_id = str(proposal.get("pillarId") or "").lower().strip() + if pillar_id not in PILLAR_IDS: + logger.warning(f"Skipping proposal with invalid pillarId: {pillar_id}. Proposal: {proposal}") + continue + proposal["pillarId"] = pillar_id + raw_proposals.append(proposal) elif isinstance(res, Exception): logger.warning(f"Agent proposal failed: {res}") From a00212ca4d17c49b4deaf6e73c8dcd45ce469867 Mon Sep 17 00:00:00 2001 From: ajaysi Date: Sat, 7 Mar 2026 12:00:04 +0530 Subject: [PATCH 15/15] refactor: Unify canonical task outcome statuses (completed, skipped) across workflow and memory services (Closes #384) --- backend/api/today_workflow.py | 6 +++--- backend/services/task_memory_service.py | 4 ++-- backend/services/today_workflow_service.py | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index 928840b3..ed89e7aa 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -191,7 +191,7 @@ async def set_task_status( memory = TaskMemoryService(user_id, db) await memory.record_task_outcome( task, - feedback_score=1 if status == "completed" else -1 if status == "dismissed" else 0, + feedback_score=1 if status == "completed" else -1 if status in ("dismissed", "skipped") else 0, feedback_text=completion_notes ) except Exception as e: @@ -210,7 +210,7 @@ async def set_task_status( "pillarId": task.pillar_id, "title": task.title, "description": task.description, - "status": "skipped" if task.status == "dismissed" else task.status, + "status": "skipped" if task.status in ("dismissed", "skipped") else task.status, } asyncio.create_task(_index_tasks_to_sif(user_id, plan_date, [task_payload], label="today")) @@ -220,7 +220,7 @@ async def set_task_status( "task": { "id": str(task.id), "pillarId": task.pillar_id, - "status": "skipped" if task.status == "dismissed" else task.status, + "status": "skipped" if task.status in ("dismissed", "skipped") else task.status, "decided_at": task.decided_at.isoformat() if task.decided_at else None, } }, diff --git a/backend/services/task_memory_service.py b/backend/services/task_memory_service.py index 94cab477..6c5eea90 100644 --- a/backend/services/task_memory_service.py +++ b/backend/services/task_memory_service.py @@ -15,7 +15,7 @@ from services.intelligence.txtai_service import TxtaiIntelligenceService EXACT_DUPLICATE_LOOKBACK_DAYS = 7 SEMANTIC_SUPPRESSION_SCORE_THRESHOLD = 0.85 -SUPPRESSED_STATUSES = {"dismissed", "rejected"} +SUPPRESSED_STATUSES = {"dismissed", "rejected", "skipped"} class TaskMemoryService: """ @@ -72,7 +72,7 @@ class TaskMemoryService: self.db.commit() # 2. Index into txtai (if status is meaningful) - if task.status in ["completed", "dismissed", "rejected"]: + if task.status in ["completed", "dismissed", "rejected", "skipped"]: # We index the task text with metadata about its outcome # This allows us to search: "Has the user rejected similar tasks?" doc = { diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 08eeee69..62ab39c3 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -25,6 +25,7 @@ def _coerce_priority(value: Any) -> str: def _coerce_status(value: Any) -> str: v = str(value or "pending").lower().strip() if v in {"pending", "in_progress", "completed", "skipped", "dismissed"}: + # Canonicalize 'dismissed' to 'skipped' for consistency return "skipped" if v == "dismissed" else v return "pending"