Merge PR #388: Daily Workflow Integration & Enhanced Reliability
- Resolve merge conflicts in backend/services/today_workflow_service.py and frontend/src/stores/workflowStore.ts - Backend: Keep robust handling for both dict and object types in TaskProposal conversion - Backend: Combine dependencies coercion with task metadata normalization - Frontend: Implement graceful fallback pattern (try server first, then local generation on unavailability) - Add provenanceSummary integration from server responses - Ensure degraded mode handling with appropriate messaging
This commit is contained in:
@@ -42,6 +42,22 @@ async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label:
|
|||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def _build_provenance_summary(plan: DailyWorkflowPlan, tasks: list[DailyWorkflowTask]) -> Dict[str, Any]:
|
||||||
|
source_counts: Dict[str, int] = {}
|
||||||
|
for task in tasks:
|
||||||
|
metadata = task.metadata_json if isinstance(task.metadata_json, dict) else {}
|
||||||
|
source = metadata.get("source") if metadata.get("source") in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation"
|
||||||
|
source_counts[source] = source_counts.get(source, 0) + 1
|
||||||
|
|
||||||
|
generation_mode = plan.generation_mode if plan.generation_mode in {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"} else "llm_generation"
|
||||||
|
|
||||||
|
return {
|
||||||
|
"generationMode": generation_mode,
|
||||||
|
"committeeAgentCount": int(plan.committee_agent_count or 0),
|
||||||
|
"fallbackUsed": bool(plan.fallback_used),
|
||||||
|
"taskSourceBreakdown": source_counts,
|
||||||
|
}
|
||||||
|
|
||||||
@router.get("")
|
@router.get("")
|
||||||
async def get_today_workflow(
|
async def get_today_workflow(
|
||||||
date: Optional[str] = None,
|
date: Optional[str] = None,
|
||||||
@@ -61,6 +77,7 @@ async def get_today_workflow(
|
|||||||
)
|
)
|
||||||
|
|
||||||
tasks = await run_in_threadpool(_fetch_tasks)
|
tasks = await run_in_threadpool(_fetch_tasks)
|
||||||
|
provenance_summary = _build_provenance_summary(plan, tasks)
|
||||||
|
|
||||||
def _normalize_legacy_dependencies(task_rows):
|
def _normalize_legacy_dependencies(task_rows):
|
||||||
rows_updated = False
|
rows_updated = False
|
||||||
@@ -166,6 +183,7 @@ async def get_today_workflow(
|
|||||||
"workflowStatus": workflow_status,
|
"workflowStatus": workflow_status,
|
||||||
"totalEstimatedTime": total_estimated,
|
"totalEstimatedTime": total_estimated,
|
||||||
"actualTimeSpent": 0,
|
"actualTimeSpent": 0,
|
||||||
|
"provenanceSummary": provenance_summary,
|
||||||
},
|
},
|
||||||
"plan": {
|
"plan": {
|
||||||
"id": plan.id,
|
"id": plan.id,
|
||||||
@@ -173,6 +191,10 @@ async def get_today_workflow(
|
|||||||
"source": plan.source,
|
"source": plan.source,
|
||||||
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
"created_at": plan.created_at.isoformat() if plan.created_at else None,
|
||||||
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
"updated_at": plan.updated_at.isoformat() if plan.updated_at else None,
|
||||||
|
"generation_mode": plan.generation_mode,
|
||||||
|
"committee_agent_count": plan.committee_agent_count,
|
||||||
|
"fallback_used": plan.fallback_used,
|
||||||
|
"provenance_summary": provenance_summary,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"timestamp": datetime.utcnow().isoformat(),
|
"timestamp": datetime.utcnow().isoformat(),
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ class DailyWorkflowPlan(Base):
|
|||||||
user_id = Column(String(255), nullable=False, index=True)
|
user_id = Column(String(255), nullable=False, index=True)
|
||||||
date = Column(String(10), nullable=False, index=True)
|
date = Column(String(10), nullable=False, index=True)
|
||||||
source = Column(String(30), nullable=False, default="agent")
|
source = Column(String(30), nullable=False, default="agent")
|
||||||
|
generation_mode = Column(String(30), nullable=False, default="llm_generation", index=True)
|
||||||
|
committee_agent_count = Column(Integer, nullable=False, default=0)
|
||||||
|
fallback_used = Column(Boolean, nullable=False, default=False)
|
||||||
plan_json = Column(JSON, nullable=True)
|
plan_json = Column(JSON, nullable=True)
|
||||||
generation_run_id = Column(Integer, nullable=True, index=True)
|
generation_run_id = Column(Integer, nullable=True, index=True)
|
||||||
created_at = Column(DateTime, default=datetime.utcnow, index=True)
|
created_at = Column(DateTime, default=datetime.utcnow, index=True)
|
||||||
|
|||||||
@@ -11,6 +11,15 @@ from services.llm_providers.main_text_generation import llm_text_gen
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
|
||||||
|
TASK_SOURCE_ENUM = {"agent_committee", "llm_generation", "llm_pillar_backfill", "controlled_fallback"}
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_task_metadata(task: Dict[str, Any], default_source: str) -> Dict[str, Any]:
|
||||||
|
metadata = task.get("metadata") if isinstance(task.get("metadata"), dict) else {}
|
||||||
|
source = metadata.get("source")
|
||||||
|
if source not in TASK_SOURCE_ENUM:
|
||||||
|
metadata["source"] = default_source
|
||||||
|
return metadata
|
||||||
|
|
||||||
|
|
||||||
def _today_date_str() -> str:
|
def _today_date_str() -> str:
|
||||||
@@ -177,6 +186,7 @@ def _sanitize_task(task: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|||||||
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
sanitized["actionType"] = str(task.get("actionType") or "navigate").strip() or "navigate"
|
||||||
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
sanitized["actionUrl"] = str(task.get("actionUrl") or "").strip() or None
|
||||||
sanitized["enabled"] = bool(task.get("enabled", True))
|
sanitized["enabled"] = bool(task.get("enabled", True))
|
||||||
|
sanitized["metadata"] = _normalize_task_metadata(task, default_source="llm_generation")
|
||||||
return sanitized
|
return sanitized
|
||||||
|
|
||||||
|
|
||||||
@@ -323,7 +333,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
|
orchestrator = await orchestration_service.get_or_create_orchestrator(user_id)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to get orchestrator: {e}")
|
logger.error(f"Failed to get orchestrator: {e}")
|
||||||
return {"date": date, "tasks": _fallback_tasks(date)}
|
return {
|
||||||
|
"date": date,
|
||||||
|
"tasks": _fallback_tasks(date),
|
||||||
|
"provenance": {"generation_mode": "controlled_fallback", "committee_agent_count": 0, "fallback_used": True},
|
||||||
|
}
|
||||||
|
|
||||||
# 2. Parallel "Committee" Proposal Gathering
|
# 2. Parallel "Committee" Proposal Gathering
|
||||||
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
logger.info(f"Gathering daily task proposals from agent committee for user {user_id}")
|
||||||
@@ -468,6 +482,7 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
"actionUrl": prop.get("actionUrl"),
|
"actionUrl": prop.get("actionUrl"),
|
||||||
"enabled": True,
|
"enabled": True,
|
||||||
"metadata": {
|
"metadata": {
|
||||||
|
"source": "agent_committee",
|
||||||
"source_agent": prop.get("source_agent"),
|
"source_agent": prop.get("source_agent"),
|
||||||
"reasoning": prop.get("reasoning"),
|
"reasoning": prop.get("reasoning"),
|
||||||
"context_data": prop.get("context_data")
|
"context_data": prop.get("context_data")
|
||||||
@@ -484,16 +499,17 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
"actionUrl": prop.action_url,
|
"actionUrl": prop.action_url,
|
||||||
"enabled": True,
|
"enabled": True,
|
||||||
"metadata": {
|
"metadata": {
|
||||||
|
"source": "agent_committee",
|
||||||
"source_agent": prop.source_agent,
|
"source_agent": prop.source_agent,
|
||||||
"reasoning": prop.reasoning,
|
"reasoning": prop.reasoning,
|
||||||
"context_data": prop.context_data
|
"context_data": prop.context_data
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
final_tasks = _ensure_pillar_coverage(final_tasks, user_id, date, grounding)
|
||||||
return {
|
return {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": final_tasks
|
"tasks": final_tasks,
|
||||||
|
"provenance": {"generation_mode": "agent_committee", "committee_agent_count": len(active_agents), "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in final_tasks)},
|
||||||
}
|
}
|
||||||
|
|
||||||
# Fallback to original LLM generation if agents returned nothing
|
# Fallback to original LLM generation if agents returned nothing
|
||||||
@@ -575,9 +591,11 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
|
|||||||
tasks = result.get("tasks") if isinstance(result, dict) else None
|
tasks = result.get("tasks") if isinstance(result, dict) else None
|
||||||
if not isinstance(tasks, list) or not tasks:
|
if not isinstance(tasks, list) or not tasks:
|
||||||
tasks = _fallback_tasks(date)
|
tasks = _fallback_tasks(date)
|
||||||
|
enriched_tasks = _ensure_pillar_coverage(tasks, user_id, date, grounding)
|
||||||
result = {
|
result = {
|
||||||
"date": date,
|
"date": date,
|
||||||
"tasks": _ensure_pillar_coverage(tasks, user_id, date, grounding),
|
"tasks": enriched_tasks,
|
||||||
|
"provenance": {"generation_mode": "llm_generation", "committee_agent_count": len(active_agents) if "active_agents" in locals() else 0, "fallback_used": any((t.get("metadata") or {}).get("source") in {"llm_pillar_backfill", "controlled_fallback"} for t in enriched_tasks)},
|
||||||
}
|
}
|
||||||
|
|
||||||
activity.log_event(
|
activity.log_event(
|
||||||
@@ -611,12 +629,19 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
|
|||||||
|
|
||||||
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
|
||||||
tasks = plan_data.get("tasks", [])
|
tasks = plan_data.get("tasks", [])
|
||||||
|
provenance = plan_data.get("provenance") if isinstance(plan_data.get("provenance"), dict) else {}
|
||||||
|
|
||||||
def _create_plan():
|
def _create_plan():
|
||||||
|
generation_mode = provenance.get("generation_mode") if provenance.get("generation_mode") in TASK_SOURCE_ENUM else "llm_generation"
|
||||||
|
committee_agent_count = int(provenance.get("committee_agent_count") or 0)
|
||||||
|
fallback_used = bool(provenance.get("fallback_used", False))
|
||||||
plan = DailyWorkflowPlan(
|
plan = DailyWorkflowPlan(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
date=date_str,
|
date=date_str,
|
||||||
source="agent",
|
source="agent",
|
||||||
|
generation_mode=generation_mode,
|
||||||
|
committee_agent_count=max(0, committee_agent_count),
|
||||||
|
fallback_used=fallback_used,
|
||||||
plan_json=plan_data,
|
plan_json=plan_data,
|
||||||
created_at=datetime.utcnow(),
|
created_at=datetime.utcnow(),
|
||||||
updated_at=datetime.utcnow(),
|
updated_at=datetime.utcnow(),
|
||||||
@@ -641,7 +666,7 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
|
|||||||
action_type=str(t.get("actionType") or "navigate").strip()[:20],
|
action_type=str(t.get("actionType") or "navigate").strip()[:20],
|
||||||
action_url=str(t.get("actionUrl") or "").strip(),
|
action_url=str(t.get("actionUrl") or "").strip(),
|
||||||
dependencies=coerce_dependencies(t.get("dependencies")),
|
dependencies=coerce_dependencies(t.get("dependencies")),
|
||||||
metadata_json=t.get("metadata") or {},
|
metadata_json=_normalize_task_metadata(t, default_source=generation_mode),
|
||||||
enabled=bool(t.get("enabled", True)),
|
enabled=bool(t.get("enabled", True)),
|
||||||
created_at=datetime.utcnow(),
|
created_at=datetime.utcnow(),
|
||||||
updated_at=datetime.utcnow(),
|
updated_at=datetime.utcnow(),
|
||||||
|
|||||||
@@ -82,6 +82,15 @@ const WorkflowProgressBar: React.FC<WorkflowProgressBarProps> = ({
|
|||||||
return 'Ready to Start';
|
return 'Ready to Start';
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const getProvenanceLabel = () => {
|
||||||
|
const summary = currentWorkflow?.provenanceSummary;
|
||||||
|
if (!summary) return 'Daily Workflow';
|
||||||
|
if (summary.generationMode === 'agent_committee') return 'Personalized by Agents';
|
||||||
|
if (summary.generationMode === 'llm_generation' && !summary.fallbackUsed) return 'AI Personalized Guide';
|
||||||
|
if (summary.fallbackUsed || summary.generationMode === 'controlled_fallback') return 'Baseline Daily Guide';
|
||||||
|
return 'Daily Workflow';
|
||||||
|
};
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<motion.div
|
<motion.div
|
||||||
initial={{ opacity: 0, y: -20 }}
|
initial={{ opacity: 0, y: -20 }}
|
||||||
@@ -128,6 +137,16 @@ const WorkflowProgressBar: React.FC<WorkflowProgressBarProps> = ({
|
|||||||
fontWeight: 600
|
fontWeight: 600
|
||||||
}}
|
}}
|
||||||
/>
|
/>
|
||||||
|
<Chip
|
||||||
|
label={getProvenanceLabel()}
|
||||||
|
size="small"
|
||||||
|
sx={{
|
||||||
|
background: 'rgba(255,255,255,0.08)',
|
||||||
|
color: 'rgba(255,255,255,0.9)',
|
||||||
|
border: '1px solid rgba(255,255,255,0.2)',
|
||||||
|
fontWeight: 600
|
||||||
|
}}
|
||||||
|
/>
|
||||||
</Box>
|
</Box>
|
||||||
|
|
||||||
{/* Controls */}
|
{/* Controls */}
|
||||||
|
|||||||
@@ -168,28 +168,32 @@ export const useWorkflowStore = create<WorkflowState>()(
|
|||||||
try {
|
try {
|
||||||
const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} });
|
const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} });
|
||||||
const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined;
|
const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined;
|
||||||
|
const planSummary = resp?.data?.data?.plan?.provenance_summary;
|
||||||
|
|
||||||
if (!serverWorkflow || !Array.isArray(serverWorkflow.tasks)) {
|
if (serverWorkflow && Array.isArray(serverWorkflow.tasks)) {
|
||||||
throw new WorkflowError({
|
if (planSummary && !serverWorkflow.provenanceSummary) {
|
||||||
code: 'WORKFLOW_SCHEMA_INVALID',
|
serverWorkflow.provenanceSummary = planSummary;
|
||||||
message: 'Server workflow response is missing a valid tasks array.',
|
}
|
||||||
timestamp: new Date(),
|
const normalizedWorkflow = normalizeServerWorkflow(serverWorkflow);
|
||||||
recoverable: false,
|
const derived = computeProgressAndNavigation(normalizedWorkflow);
|
||||||
suggestedAction: 'Refresh and try again. If this persists, contact support.'
|
set({
|
||||||
|
currentWorkflow: normalizedWorkflow,
|
||||||
|
workflowProgress: derived.progress,
|
||||||
|
navigationState: derived.navigation,
|
||||||
|
isLoading: false,
|
||||||
|
isDegradedMode: false,
|
||||||
|
degradedModeReason: null,
|
||||||
});
|
});
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const normalizedWorkflow = normalizeServerWorkflow(serverWorkflow);
|
throw new WorkflowError({
|
||||||
const derived = computeProgressAndNavigation(normalizedWorkflow);
|
code: 'WORKFLOW_SCHEMA_INVALID',
|
||||||
set({
|
message: 'Server workflow response is missing a valid tasks array.',
|
||||||
currentWorkflow: normalizedWorkflow,
|
timestamp: new Date(),
|
||||||
workflowProgress: derived.progress,
|
recoverable: false,
|
||||||
navigationState: derived.navigation,
|
suggestedAction: 'Refresh and try again. If this persists, contact support.'
|
||||||
isLoading: false,
|
|
||||||
isDegradedMode: false,
|
|
||||||
degradedModeReason: null,
|
|
||||||
});
|
});
|
||||||
return;
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (!isServerUnavailableError(error)) {
|
if (!isServerUnavailableError(error)) {
|
||||||
set({
|
set({
|
||||||
|
|||||||
@@ -5,6 +5,14 @@ export type TaskStatus = 'pending' | 'in_progress' | 'completed' | 'skipped';
|
|||||||
export type TaskPriority = 'high' | 'medium' | 'low';
|
export type TaskPriority = 'high' | 'medium' | 'low';
|
||||||
export type ActionType = 'navigate' | 'modal' | 'external';
|
export type ActionType = 'navigate' | 'modal' | 'external';
|
||||||
export type WorkflowStatus = 'not_started' | 'in_progress' | 'completed' | 'paused' | 'stopped';
|
export type WorkflowStatus = 'not_started' | 'in_progress' | 'completed' | 'paused' | 'stopped';
|
||||||
|
export type WorkflowGenerationMode = 'agent_committee' | 'llm_generation' | 'llm_pillar_backfill' | 'controlled_fallback';
|
||||||
|
|
||||||
|
export interface WorkflowProvenanceSummary {
|
||||||
|
generationMode: WorkflowGenerationMode;
|
||||||
|
committeeAgentCount: number;
|
||||||
|
fallbackUsed: boolean;
|
||||||
|
taskSourceBreakdown: Partial<Record<WorkflowGenerationMode, number>>;
|
||||||
|
}
|
||||||
|
|
||||||
export interface TodayTask {
|
export interface TodayTask {
|
||||||
id: string;
|
id: string;
|
||||||
@@ -44,6 +52,7 @@ export interface DailyWorkflow {
|
|||||||
completedAt?: Date;
|
completedAt?: Date;
|
||||||
totalEstimatedTime: number; // in minutes
|
totalEstimatedTime: number; // in minutes
|
||||||
actualTimeSpent: number; // in minutes
|
actualTimeSpent: number; // in minutes
|
||||||
|
provenanceSummary?: WorkflowProvenanceSummary;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface WorkflowProgress {
|
export interface WorkflowProgress {
|
||||||
@@ -54,6 +63,7 @@ export interface WorkflowProgress {
|
|||||||
nextTask?: TodayTask;
|
nextTask?: TodayTask;
|
||||||
estimatedTimeRemaining: number; // in minutes
|
estimatedTimeRemaining: number; // in minutes
|
||||||
actualTimeSpent: number; // in minutes
|
actualTimeSpent: number; // in minutes
|
||||||
|
provenanceSummary?: WorkflowProvenanceSummary;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface TaskCompletionData {
|
export interface TaskCompletionData {
|
||||||
|
|||||||
Reference in New Issue
Block a user