diff --git a/PR_MERGE_SUMMARY.md b/PR_MERGE_SUMMARY.md deleted file mode 100644 index c84e2a17..00000000 --- a/PR_MERGE_SUMMARY.md +++ /dev/null @@ -1,316 +0,0 @@ -# ALwrity Daily Workflow PR Merge Summary -**Date:** March 9, 2026 -**Session Goal:** Review and integrate workflow enhancement PRs (#388-397) -**Status:** ✅ COMPLETED - 9 PRs successfully merged - ---- - -## Successfully Merged PRs (9 Total) - -### Core Workflow Enhancement Series - -| # | Title | Commit | Key Improvements | -|---|-------|--------|-----------------| -| #388 | Daily Workflow Integration & Enhanced Reliability | 8f6ed3a | Agent committee orchestration, robust task proposal handling, metadata normalization | -| #389 | Committee Health Precheck & Simplified Architecture | 3558131 | Simplified schema, health precheck, removed complex dependency coercion | -| #390 | Degraded-mode Workflow Regeneration Criteria | 56854df | Rate-limited `/regenerate` endpoint (3 req/60s), quality score tracking | -| #391 | Workflow Provenance Quality Metrics | 2d4c83e | Provenance classification (agent vs fallback), quality ratio calculation | -| #392 | Contextuality Validation & Low-context Status | 74b788a | Evidence-link grounding, plan contextuality scoring (65% threshold) | -| #394 | Task Memory Feedback Scoring | 38444f4 | Proper self-learning: uses persisted task.status, handles all negative cases | -| #395 | Dependencies Normalization | 0aaaf07 | Robust `_normalize_dependencies()` helper for consistent data types | -| #396 | Date Validation & Error Handling | 9271566 | ISO date validation before yesterday indexing, narrower SQLAlchemyError handling | -| #397 | Typed Request Model for Task Status | 39bc3e3 | Pydantic `TaskStatusEnum` & `TaskStatusUpdateRequest`, FastAPI auto-validation | - ---- - -## System Architecture Evolution - -### From Simple to Sophisticated -``` -PR #388 ─→ Agent Committee Orchestration -PR #389 ─→ Clean Architecture -PR #390 ─→ Regeneration Control -PR #391 ─→ Quality Awareness -PR #392 ─→ Evidence-Based Grounding -PR #394 ─→ Proper Memory Learning -PR #395 ─→ Data Consistency -PR #396 ─→ Production Observability -PR #397 ─→ API Type Safety -``` - ---- - -## Key Features Implemented - -### 1. **Agent Committee (PR #388)** -- Multi-agent orchestration with 5 specialized agents: - - ContentStrategyAgent - - StrategyArchitectAgent - - SEOOptimizationAgent - - SocialAmplificationAgent - - CompetitorResponseAgent -- Parallel proposal gathering with exception safety -- Deduplication by priority and semantic ordering - -### 2. **Contextuality Validation (PR #392)** -- Evidence-link framework: - - `onboarding:{field_name}` references - - `alert:{alert_id}` references -- Task contextuality scoring: minimum 1 evidence link -- Plan contextuality threshold: 65% of tasks must meet threshold -- Automatic strict regeneration for low-context plans -- Response fields: `quality_status`, `contextuality_validation` - -### 3. **Self-Learning Memory (PR #394)** -- Uses canonical `task.status` from database (not request param) -- Proper feedback scoring: - - `completed` → +1 (positive learning) - - `skipped`, `dismissed`, `rejected` → -1 (negative learning) - - Other statuses → 0 (neutral) -- Prevents inconsistent memory behavior from status normalization mismatches - -### 4. **Data Consistency (PR #395)** -- `_normalize_dependencies()` helper handles all type variations: - - `None` → `[]` - - List → returned as-is - - JSON string → parsed and validated - - Invalid types → `[]` -- Applied to today and yesterday task payloads -- Ensures indexing pipeline receives consistent types - -### 5. **Production Observability (PR #396)** -- Date validation: - - ISO format check before computing yesterday - - Clear warning logs (plan_id, user_id, plan_date, reason) - - Graceful skip on parse failure -- Narrower exception handling: - - `SQLAlchemyError` instead of silent `except Exception: pass` - - Detailed error logs with context - - Non-fatal failures preserve today's indexing - -### 6. **API Type Safety (PR #397)** -- `TaskStatusEnum` enumeration: - - Constrains valid status values at type level - - FastAPI auto-validation in OpenAPI -- `TaskStatusUpdateRequest` Pydantic model: - - `status: TaskStatusEnum` (auto-validated) - - `completion_notes: Optional[str]` (max 4000 chars enforced) - - Eliminates manual validation code - ---- - -## Technical Highlights - -### Backend Services -- **today_workflow_service.py**: - - `generate_agent_enhanced_plan()` with agent committee + LLM fallback - - `validate_plan_contextuality()` for evidence-link scoring - - `_ensure_pillar_coverage()` with LLM backfill + controlled fallback - - `update_task_status()` with memory integration - -- **API (today_workflow.py)**: - - Type-safe endpoint handlers - - Pydantic request/response validation - - Comprehensive error handling - - Normalized dependencies throughout - - Detailed logging for observability - -### Database & ORM -- Efficient schema after simplification (PR #389) -- `plan_json` BLOB stores complete workflow metadata -- Proper foreign key relationships -- Transaction safety with SQLAlchemy - -### Frontend (TypeScript) -- Zustand store for workflow state -- Error boundary handling -- Fallback logic for degraded mode -- Type-safe API calls - ---- - -## Quality Metrics - -### Code Quality -- ✅ Type safety throughout (Pydantic, TypeScript) -- ✅ Comprehensive error handling (narrower scopes) -- ✅ Detailed observability logging -- ✅ Non-fatal failure modes -- ✅ Data consistency guarantees - -### Testing Coverage -- ✅ Python static compile checks (all PRs) -- ✅ Backend unit tests (scheduler, onboarding, database) -- ✅ Frontend builds without errors (linting auto-fixed) - -### Production Readiness -- ✅ Rate limiting for regeneration endpoint -- ✅ Evidence-link grounding prevents hallucinations -- ✅ Self-learning memory improves task proposals -- ✅ Graceful degradation with fallback tasks -- ✅ Detailed error logging for operations - ---- - -## Skipped PRs & Rationale - -### PR #393: Improve indexing observability logs -- **Status:** ❌ CLOSED (user decision) -- **Reason:** Contextuality validation too important to remove -- **Contains:** Good logging improvements, but removes core validation - -### PR #398: Resolve canonical user IDs in scheduler -- **Status:** ⏸️ SKIPPED -- **Reason:** - - Codex flagged P1 concern: User ID filtering could drop legacy tasks - - Codex flagged P2 concern: DB initialization as side effect in discovery - - Causes regressions in API layer (removes Pydantic models, error handling) - - Built from older main version -- **Recommendation:** Await rebase on current main + Codex concerns addressed - -### PR #399: Centralize onboarding SEO task health -- **Status:** ⏸️ SKIPPED -- **Reason:** - - Same regressions as PR #398 (removes API improvements) - - Built from older main version - - SEO dashboard improvements are solid but not worth losing workflow API enhancements -- **Recommendation:** Rebase on current main when #398 is fixed - ---- - -## Current State Summary - -### What We Have -✅ **Agent Committee System** -- 5 specialized agents with parallel proposal gathering -- Semantic deduplication -- Self-learning memory integration -- Graceful fallback to LLM generation - -✅ **Evidence-Link Grounding** -- Tasks reference onboarding data and system alerts -- Contextuality scoring prevents hallucinations -- Automatic strict regeneration for low-context workflows -- Response metadata for monitoring - -✅ **Self-Learning Memory** -- Proper feedback scoring from database state -- Handles all task status outcomes -- Prevents inconsistent learning from normalized statuses - -✅ **Data Consistency** -- Normalized dependencies across all payloads -- Type-safe API endpoints -- Consistent data handling in indexing - -✅ **Production Observability** -- Date validation before yesterday indexing -- Narrower exception handling with detailed logs -- Non-fatal error modes -- Clear operational visibility - -✅ **API Type Safety** -- Pydantic validation -- OpenAPI documentation -- No manual validation code needed -- Better IDE support with TypeScript - -### System Capabilities -- Daily workflow generation with 6 lifecycle pillars -- Rate-limited on-demand regeneration -- Evidence-based contextuality validation -- Self-improving task proposals through memory -- Graceful degradation with fallback tasks -- Comprehensive logging and error handling -- Type-safe endpoints with auto-validation - ---- - -## Lessons Learned - -### PR Review Patterns -1. **Check for regressions:** Several PRs removed recent improvements -2. **Verify git history:** PRs #398-399 were built from older main -3. **Surgical merges work:** Combining good parts while preserving improvements -4. **Documentation matters:** Clear merge commit messages help understand evolution - -### Code Quality -1. **Type safety prevents bugs:** Pydantic models caught issues early -2. **Narrow exception scopes:** Better observability than broad catches -3. **Evidence-based design:** Grounding prevents hallucination -4. **Data consistency:** Normalization functions prevent downstream bugs - -### Architecture Decisions -1. **Committee approach:** Multiple agents > single LLM -2. **Evidence links:** Better than quality ratios for grounding -3. **Memory learning:** Use DB state, not request params -4. **Graceful degradation:** Fallback tasks > error states - ---- - -## Next Steps (Future Work) - -### High Priority -1. **PR #398 Rebase**: Wait for: - - Rebase on current main - - Codex P1 concern: Address user ID filtering for legacy tasks - - Codex P2 concern: Avoid DB initialization in discovery - -2. **PR #399 Rebase**: Depends on #398 - - SEO dashboard improvements once #398 is fixed - -### Medium Priority -1. **Performance Tuning**: Monitor agent committee query times -2. **Memory Optimization**: Cache agent proposals for repeated patterns -3. **Dashboard Enhancement**: Add contextuality metrics to UI - -### Low Priority -1. **Documentation**: Update API docs with new models -2. **Logging**: Expand observability for edge cases -3. **Testing**: Add integration tests for committee scenarios - ---- - -## Session Statistics - -| Metric | Value | -|--------|-------| -| **PRs Reviewed** | 12 (#388-397, #398-399) | -| **PRs Merged** | 9 (#388-397, excluding #393) | -| **PRs Skipped** | 3 (#393 closed by user, #398-399 due to regressions) | -| **Merge Conflicts Resolved** | 11 | -| **Surgical Merges** | 4 (#394-397) | -| **Git Commits** | 9 merge commits | -| **Files Modified** | 30+ across backend/frontend | -| **Lines Added** | 1000+ | -| **Lines Removed** | 1500+ | -| **Time Span** | March 8-9, 2026 | - ---- - -## Recommendation for Future Sessions - -1. **Before merging PRs:** - - Check that PR is based on current main - - Review for regressions in dependent code - - Look for Codex review comments (P1/P2 flags) - -2. **When PRs conflict with improvements:** - - Use surgical merge to extract good parts - - Preserve working system over incomplete features - -3. **For architectural changes:** - - Validate against existing patterns - - Ensure data consistency maintained - - Test against real workflows - -4. **Documentation:** - - Update this file when significant changes occur - - Keep git history clean with descriptive commits - - Tag versions for major milestones - ---- - -**Session Completed:** ✅ -**System State:** Production-ready with advanced features -**Next Review:** When PR #398 is rebased on current main diff --git a/backend/api/podcast/handlers/video.py b/backend/api/podcast/handlers/video.py index 3f56dd63..b7cc69af 100644 --- a/backend/api/podcast/handlers/video.py +++ b/backend/api/podcast/handlers/video.py @@ -45,11 +45,17 @@ def _extract_error_message(exc: Exception) -> str: """ Extract user-friendly error message from exception. Handles HTTPException with nested error details from WaveSpeed API. + Preserves subscription modal flags for frontend. """ if isinstance(exc, HTTPException): detail = exc.detail # If detail is a dict (from WaveSpeed client) if isinstance(detail, dict): + # Check if this is a subscription/credit error + if detail.get("error_type") == "insufficient_credits" or detail.get("show_subscription_modal"): + # Return the error message with subscription modal flag + return detail.get("message", "Insufficient credits. Please top up your account.") + # Try to extract message from nested response JSON response_str = detail.get("response", "") if response_str: @@ -86,6 +92,27 @@ def _extract_error_message(exc: Exception) -> str: return error_str +def _extract_error_metadata(exc: Exception) -> Dict[str, Any]: + """Extract structured error metadata for task polling clients.""" + if isinstance(exc, HTTPException): + detail = exc.detail + if isinstance(detail, dict): + return { + "error_status": exc.status_code, + "error_data": detail, + } + if isinstance(detail, str): + return { + "error_status": exc.status_code, + "error_data": { + "error": detail, + "message": detail, + }, + } + + return {} + + def _execute_podcast_video_task( task_id: str, request: PodcastVideoGenerationRequest, @@ -229,9 +256,15 @@ def _execute_podcast_video_task( # Extract user-friendly error message from exception error_msg = _extract_error_message(exc) + error_meta = _extract_error_metadata(exc) task_manager.update_task_status( - task_id, "failed", error=error_msg, message=f"Video generation failed: {error_msg}" + task_id, + "failed", + error=error_msg, + message=f"Video generation failed: {error_msg}", + error_status=error_meta.get("error_status"), + error_data=error_meta.get("error_data"), ) @@ -257,7 +290,7 @@ async def generate_podcast_video( try: if hasattr(request, 'headers') and hasattr(request.headers, 'get'): auth_header = request.headers.get("Authorization") - except: + except Exception: pass if auth_header and auth_header.startswith("Bearer "): diff --git a/backend/api/story_writer/task_manager.py b/backend/api/story_writer/task_manager.py index 2e6d2e40..42415269 100644 --- a/backend/api/story_writer/task_manager.py +++ b/backend/api/story_writer/task_manager.py @@ -76,6 +76,10 @@ class TaskManager: if task["status"] == "failed" and task.get("error"): response["error"] = task["error"] + if task.get("error_status") is not None: + response["error_status"] = task["error_status"] + if task.get("error_data") is not None: + response["error_data"] = task["error_data"] return response @@ -86,7 +90,9 @@ class TaskManager: progress: Optional[float] = None, message: Optional[str] = None, result: Optional[Dict[str, Any]] = None, - error: Optional[str] = None + error: Optional[str] = None, + error_status: Optional[int] = None, + error_data: Optional[Dict[str, Any]] = None, ): """Update the status of a task.""" if task_id not in self.task_storage: @@ -112,6 +118,10 @@ class TaskManager: if error is not None: task["error"] = error logger.error(f"[StoryWriter] Task {task_id} error: {error}") + if error_status is not None: + task["error_status"] = error_status + if error_data is not None: + task["error_data"] = error_data async def execute_story_generation_task( self, diff --git a/backend/api/today_workflow.py b/backend/api/today_workflow.py index b6efea6d..94ffb949 100644 --- a/backend/api/today_workflow.py +++ b/backend/api/today_workflow.py @@ -11,7 +11,7 @@ from sqlalchemy.exc import SQLAlchemyError from middleware.auth_middleware import get_current_user from services.database import get_db -from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status +from services.today_workflow_service import get_or_create_daily_workflow_plan, update_task_status, _today_date_str from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask import asyncio from services.intelligence.txtai_service import TxtaiIntelligenceService @@ -81,26 +81,7 @@ async def _index_tasks_to_sif(user_id: str, date: str, tasks: list[dict], label: logger.debug(f"Background indexing failed for user {user_id}: {e}") -@router.get("") -async def get_today_workflow( - date: Optional[str] = None, - current_user: dict = Depends(get_current_user), - db: Session = Depends(get_db), -) -> Dict[str, Any]: - from starlette.concurrency import run_in_threadpool - user_id = str(current_user.get("id")) - plan, created = await get_or_create_daily_workflow_plan(db, user_id, date=date) - - def _fetch_tasks(): - return ( - db.query(DailyWorkflowTask) - .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) - .order_by(DailyWorkflowTask.created_at.asc()) - .all() - ) - - tasks = await run_in_threadpool(_fetch_tasks) - +def _build_workflow_payload(user_id: str, plan: DailyWorkflowPlan, tasks: list[DailyWorkflowTask]) -> Dict[str, Any]: response_tasks = [] for t in tasks: response_tasks.append( @@ -136,8 +117,156 @@ async def get_today_workflow( workflow_status = "completed" total_estimated = int(sum(int(t.get("estimatedTime") or 0) for t in response_tasks)) + plan_json = plan.plan_json or {} + + return { + "workflow": { + "id": f"daily-{user_id}-{plan.date}", + "date": plan.date, + "userId": user_id, + "tasks": response_tasks, + "currentTaskIndex": current_index, + "completedTasks": completed, + "totalTasks": total, + "workflowStatus": workflow_status, + "totalEstimatedTime": total_estimated, + "actualTimeSpent": 0, + }, + "plan": { + "id": plan.id, + "date": plan.date, + "source": plan.source, + "generation_mode": plan.generation_mode, + "committee_agent_count": plan.committee_agent_count, + "fallback_used": bool(plan.fallback_used), + "quality_status": plan_json.get("quality_status", "contextual"), + "contextuality_validation": plan_json.get("contextuality_validation"), + "provenance_summary": { + "generationMode": plan.generation_mode, + "committeeAgentCount": plan.committee_agent_count, + "fallbackUsed": bool(plan.fallback_used), + "taskSourceBreakdown": {}, + }, + "created_at": plan.created_at.isoformat() if plan.created_at else None, + "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, + }, + "schedule_status": { + "date": plan.date, + "generated": True, + "scheduled_run_completed": plan.source == "scheduled", + "source": plan.source, + "created_at": plan.created_at.isoformat() if plan.created_at else None, + }, + } + + +@router.get("") +async def get_today_workflow( + date: Optional[str] = None, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + """Get existing daily workflow for the specified date. + Returns 404 if no workflow exists for the date. + Workflow should only be created via explicit user action or scheduled job. + """ + from starlette.concurrency import run_in_threadpool + user_id = str(current_user.get("id")) + date_str = date or _today_date_str() + + def _get_existing(): + return ( + db.query(DailyWorkflowPlan) + .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) + .first() + ) + + plan = await run_in_threadpool(_get_existing) + + if not plan: + raise HTTPException( + status_code=404, + detail=f"No workflow found for date {date_str}. Workflow should be generated via explicit user action or scheduled job." + ) + + def _fetch_tasks(): + return ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) + .order_by(DailyWorkflowTask.created_at.asc()) + .all() + ) + + tasks = await run_in_threadpool(_fetch_tasks) + + return { + "success": True, + "data": _build_workflow_payload(user_id, plan, tasks), + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + + +@router.get("/status") +async def get_today_workflow_status( + date: Optional[str] = None, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + from starlette.concurrency import run_in_threadpool + + user_id = str(current_user.get("id")) + date_str = date or _today_date_str() + + def _get_existing(): + return ( + db.query(DailyWorkflowPlan) + .filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str) + .first() + ) + + plan = await run_in_threadpool(_get_existing) + + return { + "success": True, + "data": { + "date": date_str, + "generated": plan is not None, + "scheduled_run_completed": bool(plan and plan.source == "scheduled"), + "source": plan.source if plan else None, + "created_at": plan.created_at.isoformat() if plan and plan.created_at else None, + }, + "timestamp": datetime.utcnow().isoformat(), + "user_id": user_id, + } + + +@router.post("/generate") +async def generate_workflow( + date: Optional[str] = None, + current_user: dict = Depends(get_current_user), + db: Session = Depends(get_db), +) -> Dict[str, Any]: + """Explicitly generate a new daily workflow for the specified date. + This should only be called when the user explicitly requests workflow generation + or via a scheduled job at night. + """ + from starlette.concurrency import run_in_threadpool + user_id = str(current_user.get("id")) + plan, created = await get_or_create_daily_workflow_plan(db, user_id, date=date, creation_source="manual") + + def _fetch_tasks(): + return ( + db.query(DailyWorkflowTask) + .filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id) + .order_by(DailyWorkflowTask.created_at.asc()) + .all() + ) + + tasks = await run_in_threadpool(_fetch_tasks) if created: + response_tasks = _build_workflow_payload(user_id, plan, tasks)["workflow"]["tasks"] asyncio.create_task(_index_tasks_to_sif(user_id, plan.date, response_tasks, label="today")) from datetime import date as date_type, timedelta @@ -200,29 +329,7 @@ async def get_today_workflow( return { "success": True, - "data": { - "workflow": { - "id": f"daily-{user_id}-{plan.date}", - "date": plan.date, - "userId": user_id, - "tasks": response_tasks, - "currentTaskIndex": current_index, - "completedTasks": completed, - "totalTasks": total, - "workflowStatus": workflow_status, - "totalEstimatedTime": total_estimated, - "actualTimeSpent": 0, - }, - "plan": { - "id": plan.id, - "date": plan.date, - "source": plan.source, - "quality_status": (plan.plan_json or {}).get("quality_status", "contextual"), - "contextuality_validation": (plan.plan_json or {}).get("contextuality_validation"), - "created_at": plan.created_at.isoformat() if plan.created_at else None, - "updated_at": plan.updated_at.isoformat() if plan.updated_at else None, - }, - }, + "data": _build_workflow_payload(user_id, plan, tasks), "timestamp": datetime.utcnow().isoformat(), "user_id": user_id, } diff --git a/backend/api/video_studio/handlers/avatar.py b/backend/api/video_studio/handlers/avatar.py index f7570d2d..555a95a4 100644 --- a/backend/api/video_studio/handlers/avatar.py +++ b/backend/api/video_studio/handlers/avatar.py @@ -18,6 +18,26 @@ router = APIRouter() UPLOAD_DIR = Path("backend/data/video_studio/uploads") UPLOAD_DIR.mkdir(parents=True, exist_ok=True) + +def _extract_error_metadata(exc: Exception) -> Dict[str, Any]: + """Extract structured HTTP error metadata for polling clients.""" + if isinstance(exc, HTTPException): + detail = exc.detail + if isinstance(detail, dict): + return { + "error_status": exc.status_code, + "error_data": detail, + } + if isinstance(detail, str): + return { + "error_status": exc.status_code, + "error_data": { + "error": detail, + "message": detail, + }, + } + return {} + def _process_avatar_generation(task_id: str, image_path: Path, audio_path: Path, user_id: str, resolution: str, model: str): """ Background task to process avatar generation using shared InfiniteTalk service. @@ -94,7 +114,15 @@ def _process_avatar_generation(task_id: str, image_path: Path, audio_path: Path, except Exception as e: logger.error(f"[VideoStudio] Avatar generation failed for task {task_id}: {e}", exc_info=True) - task_manager.update_task(task_id, "failed", error=str(e), user_id=user_id) + error_meta = _extract_error_metadata(e) + task_manager.update_task( + task_id, + "failed", + error=str(e), + user_id=user_id, + error_status=error_meta.get("error_status"), + error_data=error_meta.get("error_data"), + ) finally: # Cleanup temp upload files try: diff --git a/backend/api/video_studio/task_manager.py b/backend/api/video_studio/task_manager.py index 378b10b3..c4fabeac 100644 --- a/backend/api/video_studio/task_manager.py +++ b/backend/api/video_studio/task_manager.py @@ -39,7 +39,18 @@ class TaskManager: logger.error(f"[VideoStudio] Failed to create task: {e}") raise - def update_task(self, task_id: str, status: str, result: Optional[Dict] = None, error: Optional[str] = None, user_id: str = None, progress: float = None, message: str = None): + def update_task( + self, + task_id: str, + status: str, + result: Optional[Dict] = None, + error: Optional[str] = None, + user_id: str = None, + progress: float = None, + message: str = None, + error_status: Optional[int] = None, + error_data: Optional[Dict[str, Any]] = None, + ): """Update an existing task.""" if not user_id: logger.error(f"[VideoStudio] Cannot update task {task_id} without user_id") @@ -74,6 +85,13 @@ class TaskManager: task.result = result if error: task.error = error + if error_status is not None or error_data is not None: + result_payload = task.result if isinstance(task.result, dict) else {} + if error_status is not None: + result_payload["error_status"] = error_status + if error_data is not None: + result_payload["error_data"] = error_data + task.result = result_payload if progress is not None: task.progress = progress if message: @@ -107,7 +125,7 @@ class TaskManager: if status_val == "processing": status_val = "running" - return { + response = { "task_id": task.task_id, "status": status_val, "result": task.result, @@ -117,6 +135,12 @@ class TaskManager: "created_at": task.created_at, "updated_at": task.updated_at } + if isinstance(task.result, dict): + if task.result.get("error_status") is not None: + response["error_status"] = task.result.get("error_status") + if task.result.get("error_data") is not None: + response["error_data"] = task.result.get("error_data") + return response finally: db.close() except Exception as e: diff --git a/backend/migrate_schema.py b/backend/migrate_schema.py index 4c109db1..307fe0fc 100644 --- a/backend/migrate_schema.py +++ b/backend/migrate_schema.py @@ -4,6 +4,10 @@ import sqlite3 import os from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parent.parent +WORKSPACE_DIR = ROOT_DIR / "workspace" + def migrate_database(db_path): """Add missing columns to daily_workflow_plans table.""" if not os.path.exists(db_path): @@ -46,14 +50,14 @@ def migrate_database(db_path): def find_and_migrate_databases(): """Find all databases and apply migrations.""" - workspace_dir = r'c:\Users\diksha rawat\Desktop\ALwrity\workspace' + workspace_dir = WORKSPACE_DIR - if not os.path.exists(workspace_dir): + if not workspace_dir.exists(): print(f"Workspace directory not found: {workspace_dir}") return # Find all .db files - db_files = list(Path(workspace_dir).glob('**/db/*.db')) + db_files = list(workspace_dir.glob('**/db/*.db')) if not db_files: print("No databases found to migrate") diff --git a/backend/services/database.py b/backend/services/database.py index 39de47ba..0c31d54c 100644 --- a/backend/services/database.py +++ b/backend/services/database.py @@ -53,6 +53,39 @@ WORKSPACE_DIR = os.path.join(ROOT_DIR, 'workspace') # Engine cache for multi-tenant support _user_engines = {} + +def _ensure_daily_workflow_schema(engine, user_id: str) -> None: + """Backfill required daily_workflow_plans columns for legacy tenant DBs.""" + required_columns = { + "generation_mode": "VARCHAR(30) NOT NULL DEFAULT 'llm_generation'", + "committee_agent_count": "INTEGER NOT NULL DEFAULT 0", + "fallback_used": "BOOLEAN NOT NULL DEFAULT 0", + "generation_run_id": "INTEGER", + } + + try: + with engine.begin() as conn: + table_check = conn.exec_driver_sql( + "SELECT name FROM sqlite_master WHERE type='table' AND name='daily_workflow_plans'" + ).fetchone() + if not table_check: + return + + existing_cols = { + row[1] for row in conn.exec_driver_sql("PRAGMA table_info(daily_workflow_plans)").fetchall() + } + + for col_name, col_def in required_columns.items(): + if col_name not in existing_cols: + conn.exec_driver_sql( + f"ALTER TABLE daily_workflow_plans ADD COLUMN {col_name} {col_def}" + ) + logger.warning( + f"Auto-migrated daily_workflow_plans column '{col_name}' for user {user_id}" + ) + except Exception as e: + logger.error(f"Failed daily_workflow_plans schema compatibility check for user {user_id}: {e}") + def get_user_db_path(user_id: str) -> str: """Get the database path for a specific user.""" # Sanitize user_id to be safe for filesystem @@ -192,6 +225,7 @@ def init_user_database(user_id: str): UserBusinessInfoBase.metadata.create_all(bind=engine) ContentAssetBase.metadata.create_all(bind=engine) BingAnalyticsBase.metadata.create_all(bind=engine) + _ensure_daily_workflow_schema(engine, user_id) # Initialize default data for new databases try: diff --git a/backend/services/scheduler/__init__.py b/backend/services/scheduler/__init__.py index 974b452d..5d7d1983 100644 --- a/backend/services/scheduler/__init__.py +++ b/backend/services/scheduler/__init__.py @@ -3,7 +3,10 @@ Task Scheduler Package Modular, pluggable scheduler for ALwrity tasks. """ +import os + from sqlalchemy.orm import Session +from apscheduler.triggers.cron import CronTrigger from .core.scheduler import TaskScheduler from .core.executor_interface import TaskExecutor, TaskExecutionResult @@ -32,6 +35,7 @@ from .utils.platform_insights_task_loader import load_due_platform_insights_task from .utils.advertools_task_loader import load_due_advertools_tasks from .utils.sif_indexing_task_loader import load_due_sif_indexing_tasks from .utils.market_trends_task_loader import load_due_market_trends_tasks +from services.today_workflow_service import generate_scheduled_daily_workflows # Global scheduler instance (initialized on first access) _scheduler_instance: TaskScheduler = None @@ -143,6 +147,18 @@ def get_scheduler() -> TaskScheduler: market_trends_executor, load_due_market_trends_tasks ) + + today_workflow_hour_utc = int(os.getenv('TODAY_WORKFLOW_SCHEDULE_HOUR_UTC', '2')) + today_workflow_minute_utc = int(os.getenv('TODAY_WORKFLOW_SCHEDULE_MINUTE_UTC', '0')) + _scheduler_instance.scheduler.add_job( + generate_scheduled_daily_workflows, + trigger=CronTrigger(hour=today_workflow_hour_utc, minute=today_workflow_minute_utc, timezone='UTC'), + id='generate_daily_workflows', + replace_existing=True, + max_instances=1, + coalesce=True, + misfire_grace_time=3600, + ) return _scheduler_instance diff --git a/backend/services/today_workflow_service.py b/backend/services/today_workflow_service.py index 9883767f..e48d9faf 100644 --- a/backend/services/today_workflow_service.py +++ b/backend/services/today_workflow_service.py @@ -8,6 +8,7 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask from models.agent_activity_models import AgentAlert from services.agent_activity_service import AgentActivityService, build_agent_event_payload from services.llm_providers.main_text_generation import llm_text_gen +from services.database import get_all_user_ids, get_session_for_user from loguru import logger PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"] @@ -604,7 +605,12 @@ async def generate_agent_enhanced_plan( return result -async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]: +async def get_or_create_daily_workflow_plan( + db: Session, + user_id: str, + date: Optional[str] = None, + creation_source: str = "manual", +) -> tuple[DailyWorkflowPlan, bool]: from starlette.concurrency import run_in_threadpool date_str = date or _today_date_str() @@ -646,7 +652,10 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt plan = DailyWorkflowPlan( user_id=user_id, date=date_str, - source="agent", + source=creation_source, + generation_mode=_derive_generation_mode(plan_data), + committee_agent_count=_count_committee_agents(tasks), + fallback_used=_plan_uses_fallback(tasks), plan_json=plan_data, created_at=datetime.utcnow(), updated_at=datetime.utcnow(), @@ -685,6 +694,80 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt return plan, True +def _derive_generation_mode(plan_data: Dict[str, Any]) -> str: + tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else [] + source_modes = set() + for task in tasks: + metadata = task.get("metadata") if isinstance(task, dict) else {} + metadata = metadata if isinstance(metadata, dict) else {} + source_agent = str(metadata.get("source_agent") or "").strip() + source = str(metadata.get("source") or "").strip() + if source_agent: + source_modes.add("agent_committee") + elif source in {"controlled_fallback", "llm_pillar_backfill"}: + source_modes.add(source) + + if "agent_committee" in source_modes: + return "agent_committee" + if "controlled_fallback" in source_modes: + return "controlled_fallback" + if "llm_pillar_backfill" in source_modes: + return "llm_pillar_backfill" + return "llm_generation" + + +def _count_committee_agents(tasks: List[Dict[str, Any]]) -> int: + agents = set() + for task in tasks: + metadata = task.get("metadata") if isinstance(task, dict) else {} + metadata = metadata if isinstance(metadata, dict) else {} + source_agent = str(metadata.get("source_agent") or "").strip() + if source_agent: + agents.add(source_agent) + return len(agents) + + +def _plan_uses_fallback(tasks: List[Dict[str, Any]]) -> bool: + for task in tasks: + metadata = task.get("metadata") if isinstance(task, dict) else {} + metadata = metadata if isinstance(metadata, dict) else {} + source = str(metadata.get("source") or "").strip() + if source in {"controlled_fallback", "llm_pillar_backfill"}: + return True + return False + + +async def generate_scheduled_daily_workflows() -> Dict[str, int]: + user_ids = get_all_user_ids() + stats = {"users_seen": 0, "created": 0, "existing": 0, "failed": 0} + + for user_id in user_ids: + stats["users_seen"] += 1 + db = None + try: + db = get_session_for_user(user_id) + plan, created = await get_or_create_daily_workflow_plan( + db, + user_id, + creation_source="scheduled", + ) + if created: + stats["created"] += 1 + logger.info("Scheduled daily workflow created for user {} date {}", user_id, plan.date) + else: + stats["existing"] += 1 + logger.info("Scheduled daily workflow already exists for user {} date {}", user_id, plan.date) + except Exception as e: + stats["failed"] += 1 + logger.error("Scheduled daily workflow generation failed for user {}: {}", user_id, e) + finally: + if db: + db.close() + + logger.info("Scheduled daily workflow run complete: {}", stats) + return stats + + def update_task_status( db: Session, user_id: str, diff --git a/backend/services/wavespeed/generators/video/generation.py b/backend/services/wavespeed/generators/video/generation.py index b6f2af66..9c353e42 100644 --- a/backend/services/wavespeed/generators/video/generation.py +++ b/backend/services/wavespeed/generators/video/generation.py @@ -3,6 +3,7 @@ Video generation operations (text-to-video and image-to-video). """ import requests +import json from typing import Any, Dict, Optional from fastapi import HTTPException @@ -12,6 +13,19 @@ from .base import VideoBase logger = get_service_logger("wavespeed.generators.video.generation") +def _extract_wavespeed_message(response_text: str) -> str: + """Best-effort extraction of WaveSpeed error message from response payload.""" + if not response_text: + return "" + try: + parsed = json.loads(response_text) + if isinstance(parsed, dict): + return str(parsed.get("message") or parsed.get("error") or "") + except (json.JSONDecodeError, TypeError, ValueError): + return "" + return "" + + class VideoGeneration(VideoBase): """Video generation operations.""" @@ -31,6 +45,25 @@ class VideoGeneration(VideoBase): response = requests.post(url, headers=self._get_headers(), json=payload, timeout=timeout) if response.status_code != 200: logger.error(f"[WaveSpeed] Submission failed: {response.status_code} {response.text}") + + error_message = _extract_wavespeed_message(response.text) + if "insufficient credits" in error_message.lower() or "credit" in error_message.lower(): + raise HTTPException( + status_code=429, + detail={ + "error": "Insufficient WaveSpeed credits", + "message": "Insufficient credits. Please top up to continue video generation.", + "provider": "wavespeed", + "usage_info": { + "provider": "wavespeed", + "type": "credits", + "limit_type": "provider_credits", + "operation_type": "scene_animation", + "action_required": "top_up", + }, + }, + ) + raise HTTPException( status_code=502, detail={ @@ -75,6 +108,25 @@ class VideoGeneration(VideoBase): if response.status_code != 200: logger.error(f"[WaveSpeed] Text-to-video submission failed: {response.status_code} {response.text}") + + error_message = _extract_wavespeed_message(response.text) + if "insufficient credits" in error_message.lower() or "credit" in error_message.lower(): + raise HTTPException( + status_code=429, + detail={ + "error": "Insufficient WaveSpeed credits", + "message": "Insufficient credits. Please top up to continue video generation.", + "provider": "wavespeed", + "usage_info": { + "provider": "wavespeed", + "type": "credits", + "limit_type": "provider_credits", + "operation_type": "video_generation", + "action_required": "top_up", + }, + }, + ) + raise HTTPException( status_code=502, detail={ @@ -174,6 +226,25 @@ class VideoGeneration(VideoBase): if response.status_code != 200: logger.error(f"[WaveSpeed] Text-to-video submission failed: {response.status_code} {response.text}") + + error_message = _extract_wavespeed_message(response.text) + if "insufficient credits" in error_message.lower() or "credit" in error_message.lower(): + raise HTTPException( + status_code=429, + detail={ + "error": "Insufficient WaveSpeed credits", + "message": "Insufficient credits. Please top up to continue video generation.", + "provider": "wavespeed", + "usage_info": { + "provider": "wavespeed", + "type": "credits", + "limit_type": "provider_credits", + "operation_type": "video_generation", + "action_required": "top_up", + }, + }, + ) + raise HTTPException( status_code=502, detail={ diff --git a/docs/BACKEND_LOG_RCA_TRACKER.md b/docs/BACKEND_LOG_RCA_TRACKER.md new file mode 100644 index 00000000..3671d2a0 --- /dev/null +++ b/docs/BACKEND_LOG_RCA_TRACKER.md @@ -0,0 +1,361 @@ +# Backend Log RCA Tracker + +## Purpose + +This document is the working catalog for backend issues observed in runtime logs. + +For each issue, capture: +- error signature +- observed symptoms +- likely root cause analysis +- confidence level +- files to inspect/edit +- fix strategy notes +- validation steps +- status + +## Triage Rules + +- Do not fix directly from logs alone unless root cause is confirmed. +- Prefer grouping repeated log lines under one issue. +- Track the first failing subsystem, then downstream effects. +- Separate configuration problems from code defects. +- Keep this document updated before and after each fix. + +## Issue 1: Clerk token verification failures on authenticated endpoints + +- **Status**: Open +- **Severity**: High +- **Subsystem**: Authentication / request pipeline +- **Error signatures**: + - `Unverified token rejected (production).` + - `AUTHENTICATION ERROR: Token verification failed for endpoint: GET /api/...` +- **Observed endpoints in logs**: + - `/api/content-planning/monitoring/lightweight-stats` + - `/api/content-planning/monitoring/health` + - `/api/subscription/dashboard/...` + - `/api/subscription/alerts/...` + - `/api/subscription/status/...` +- **Observed behavior**: + - Requests reach authenticated endpoints. + - Clerk verification fails. + - Fallback unverified decode path is attempted. + - Production mode rejects the token. +- **Primary RCA hypothesis**: + - The backend is receiving bearer tokens that do not successfully validate against the resolved Clerk JWKS/issuer configuration. + - The middleware then falls back to unverified decode, but production mode explicitly rejects that path. +- **Secondary RCA hypotheses**: + - Frontend token/audience/issuer mismatch. + - Wrong Clerk environment variables loaded in backend. + - Issuer-derived JWKS URL resolution is inconsistent with actual Clerk instance. + - Requests may be sent before a valid session token is available. +- **Evidence in code**: + - `backend/middleware/auth_middleware.py` + - `ClerkAuthMiddleware.__init__` + - `ClerkAuthMiddleware.verify_token` + - `get_current_user` + - Relevant logic: + - derives JWKS URL from token issuer or cached publishable key instance + - falls back to `jwt.decode(..., verify_signature=False)` + - rejects unverified tokens when `ALLOW_UNVERIFIED_JWT_DEV` is false +- **Likely files to inspect/edit later**: + - `backend/middleware/auth_middleware.py` + - possibly frontend auth/session request layer if token attachment is inconsistent +- **Confidence**: Medium +- **Root-cause questions to answer**: + - Are `CLERK_SECRET_KEY` and publishable key values from the same Clerk instance? + - Is the token issuer exactly matching the intended Clerk environment? + - Are failing requests sent with stale, dev, or cross-environment tokens? + - Are these requests triggered before Clerk session hydration on the frontend? +- **Validation after fix**: + - Authenticated endpoints return 200 with verified user context. + - No `Unverified token rejected (production)` log spam for healthy requests. + +## Issue 2: Hugging Face structured JSON generation failing with model not found + +- **Status**: Open +- **Severity**: High +- **Subsystem**: LLM provider / workflow generation +- **Error signatures**: + - `HF structured model not found: %s. Trying fallback model.` + - `Hugging Face API call failed: Not Found` + - `HF structured model not found (no response_format path): %s` + - `Hugging Face structured JSON generation failed: NotFoundError: Not Found` + - `[llm_text_gen] Provider huggingface failed: RetryError[...]` +- **Observed behavior**: + - Structured JSON call tries primary model. + - Fallback model sequence also fails. + - Retry without `response_format` still fails with `NotFound`. + - Upstream caller falls through to another provider or fallback path. +- **Primary RCA hypothesis**: + - The configured Hugging Face model identifier is invalid, unavailable to the account/provider, or incompatible with the current OpenAI-compatible Hugging Face endpoint. +- **Secondary RCA hypotheses**: + - Base URL/API key/provider configuration is wrong. + - Fallback model list contains provider-specific model ids not available in the current account/region. + - Structured generation path assumes chat completions support for models that only exist on a different inference route. +- **Evidence in code**: + - `backend/services/llm_providers/huggingface_provider.py` + - `_fallback_model_sequence` + - `huggingface_structured_json_response` + - The code retries: + - with `response_format={"type": "json_object"}` + - then again without `response_format` + - Both paths still fail with `NotFoundError`, which points more strongly to model/base-url availability than schema formatting. +- **Likely files to inspect/edit later**: + - `backend/services/llm_providers/huggingface_provider.py` + - provider selection/orchestration file calling Hugging Face as primary for structured JSON + - environment/config file for HF model names and API base URL +- **Confidence**: High +- **Root-cause questions to answer**: + - Which exact model string is being passed as the primary model in the failing call? + - What base URL and API key are being used for the OpenAI client? + - Are the fallback model ids valid for the currently configured Hugging Face inference provider? +- **Validation after fix**: + - A structured JSON test request succeeds with the intended model or a verified fallback. + - No `NotFoundError` for the chosen model list. + +## Issue 3: txtai indexing attempted before service initialization completes + +- **Status**: Open +- **Severity**: Medium +- **Subsystem**: Semantic indexing / background tasks +- **Error signatures**: + - `Cannot index content - service not initialized for user ...` +- **Observed behavior**: + - Background indexing is triggered. + - `TxtaiIntelligenceService.index_content` calls `_ensure_initialized()`. + - `_ensure_initialized()` starts background initialization and returns immediately. + - `index_content` then checks `_initialized`, sees false, and fails fast. +- **Primary RCA hypothesis**: + - There is a race condition between lazy background initialization and immediate indexing/search calls. + - `SIF_FAIL_FAST=true` (default) causes operations to raise RuntimeError instead of gracefully deferring. +- **Evidence in code**: + - `backend/services/intelligence/txtai_service.py`: + - Line 57: `self.fail_fast = str(os.getenv("SIF_FAIL_FAST", "true")).lower() in {"1", "true", "yes", "on"}` + - Lines 234-235: `index_content` raises RuntimeError if `fail_fast` and not initialized + - Lines 284-285: `search` raises RuntimeError if `fail_fast` and not initialized + - Lines 319-320: `get_similarity` raises RuntimeError if `fail_fast` and not initialized + - `_ensure_initialized` is intentionally non-blocking (starts background thread) + - `backend/api/today_workflow.py`: + - `_index_tasks_to_sif` triggers indexing in background after workflow actions +- **Likely files to inspect/edit later**: + - `backend/services/intelligence/txtai_service.py` + - `backend/api/today_workflow.py` + - any other callers that assume initialization is synchronous +- **Confidence**: High +- **Potential downstream impact**: + - workflow/task indexing silently fails + - semantic search quality degrades + - noisy logs obscure higher-priority failures +- **Root-cause questions to answer**: + - Should `index_content` await `_ensure_initialized_async()` instead of using the non-blocking path? + - Should callers tolerate deferred indexing instead of fail-fast behavior? + - Is `SIF_FAIL_FAST=true` appropriate for background indexing operations? + - Should `SIF_FAIL_FAST` default to `false` for background operations? +- **Validation after fix**: + - First indexing call after startup succeeds or is gracefully deferred without error spam. + +## Issue 4: Today workflow endpoint reload observed during active debugging + +- **Status**: Observed +- **Severity**: Low +- **Subsystem**: Development reload / workflow API +- **Log signature**: + - `StatReload detected changes in 'api\today_workflow.py'. Reloading...` +- **Observed behavior**: + - Development server reloads due to file edits. +- **RCA**: + - Expected dev-server behavior, not itself a product bug. +- **Files involved**: + - `backend/api/today_workflow.py` +- **Confidence**: High +- **Action**: + - No fix needed; keep separate from actual runtime defects. + +## Cross-Issue Notes + +- The auth failures and the workflow/indexing issues may be independent. +- The Hugging Face failure may trigger fallback task generation, which can still create workflows while hiding the upstream provider problem. +- txtai indexing failures appear to be a post-generation side effect, not the root cause of generation failure. +- **LiteLLM was investigated and dropped as a false herring** – no project-level SIF/txtai wiring to LiteLLM was found. +- The SIF agent local-model path is **separate** from txtai embeddings and may be the source of the "local model used to work" feedback. + +## Candidate Investigation Order + +1. Authentication verification mismatch +2. Hugging Face model/provider availability mismatch +3. txtai initialization race (with `SIF_FAIL_FAST` behavior) +4. SIF agent local-model defaults (Qwen 1.5B vs lighter alternatives) +5. Any downstream workflow symptoms after the above are stabilized + +## Minimal Fix Paths (Pre-Implementation) + +### For Issue 3 (txtai init race): +- **Option A**: Change `SIF_FAIL_FAST` default to `false` for background operations + - Allows graceful deferral instead of RuntimeError + - Minimal code change, no logic changes +- **Option B**: Use `_ensure_initialized_async()` in `index_content`/`search`/`get_similarity` + - Awaits initialization before proceeding + - More robust but requires async refactoring +- **Option C**: Add initialization state callbacks to callers + - More complex, may not be necessary + +### For Issue 5 (SIF agent local-model drift): +- **Option A**: Change default `model_name` in `SIFBaseAgent.__init__` to lighter model + - Example: `Qwen/Qwen2.5-0.5B-Instruct` or `TinyLlama/TinyLlama-1.1B-Chat-v1.0` + - Single-line change, immediate effect +- **Option B**: Add env/config override for default agent local model + - More flexible, requires config wiring + - Allows runtime tuning without code changes +- **Option C**: Keep current default and rely on existing fallback chain + - The fallback chain already tries lighter models if memory fails + - May be sufficient if memory detection works correctly + +## Current Evidence Sources + +- Runtime logs from terminal `python` process `22056` +- `backend/middleware/auth_middleware.py` +- `backend/services/llm_providers/huggingface_provider.py` +- `backend/services/intelligence/txtai_service.py` +- `backend/api/today_workflow.py` +- `backend/services/today_workflow_service.py` + +## Issue 5: SIF agent local-model drift (distinct from txtai embeddings) + +- **Status**: Open +- **Severity**: Medium +- **Subsystem**: SIF agents / local LLM wrappers +- **Error signatures**: + - (No direct log signature yet; this is a hypothesis from user feedback that "local model used to work") +- **Observed behavior**: + - User reports that a local model used to work for SIF agents now seems heavier or less responsive. + - The SIF agent path is **separate** from txtai embeddings. +- **Primary RCA hypothesis**: + - The SIF agent local LLM wrapper path uses a 1.5B parameter model by default, which may be heavier than the previous local model. + - This is distinct from txtai embeddings, which still use `sentence-transformers/all-MiniLM-L6-v2`. +- **Evidence in code**: + - `backend/services/intelligence/sif_agents.py`: + - Lines 47-51: `LOCAL_LLM_FALLBACKS = ["Qwen/Qwen2.5-1.5B-Instruct", "Qwen/Qwen2.5-0.5B-Instruct", "TinyLlama/TinyLlama-1.1B-Chat-v1.0"]` + - Lines 53-139: `LocalLLMWrapper` tries models in order, with memory issue detection and automatic fallback to smaller models + - Line 141: `SIFBaseAgent.__init__` default `model_name="Qwen/Qwen2.5-1.5B-Instruct"` + - `backend/services/intelligence/txtai_service.py`: + - Line 48: Still uses `sentence-transformers/all-MiniLM-L6-v2` for embeddings +- **Likely files to inspect/edit later**: + - `backend/services/intelligence/sif_agents.py` + - `backend/services/intelligence/agents/specialized/base.py` + - any config/env that controls default agent local model +- **Confidence**: Medium +- **Root-cause questions to answer**: + - What was the previous local model default for SIF agents? + - Is `Qwen/Qwen2.5-1.5B-Instruct` actually too heavy for the user’s laptop? + - Should the default be changed to `Qwen/Qwen2.5-0.5B-Instruct` or `TinyLlama/TinyLlama-1.1B-Chat-v1.0`? + - Are there any env/config overrides that could make this configurable? +- **Validation after fix**: + - SIF agents use a CPU-friendly local model (e.g., smaller Qwen variant or TinyLlama). + - Agent generation completes without excessive CPU/memory pressure. + +## Issue 6: Model initialization blocking and module unification + +- **Status**: Open +- **Severity**: High +- **Subsystem**: Startup / model loading / module architecture +- **Error signatures**: + - (No direct log signature; architectural issue) +- **Observed behavior**: + - `start_alwrity_backend.py` pre-downloads `Qwen/Qwen2.5-3B-Instruct` **synchronously** before server starts (line 122). + - `sif_agents.py` defaults to `Qwen/Qwen2.5-1.5B-Instruct` and uses lazy loading via `LocalLLMWrapper`. + - `txtai_service.py` uses `sentence-transformers/all-MiniLM-L6-v2` for embeddings. + - Three separate modules handle model loading, creating confusion. + - User wants fail-fast semantics (catch bugs, avoid silent failures) AND proper fallback. + - User wants non-blocking model downloads for SIF/agents. +- **Primary RCA hypothesis**: + - Startup script blocks on model download, contradicting non-blocking requirement. + - Model size mismatch: startup downloads 3B, agents default to 1.5B. + - Fail-fast in `txtai_service.py` prevents fallback from working. + - Module separation (`txtai_service.py`, `sif_agents.py`, `start_alwrity_backend.py`) creates confusion. +- **Evidence in code**: + - `start_alwrity_backend.py`: + - Line 122: `target_model = "Qwen/Qwen2.5-3B-Instruct"` + - Lines 127-131: `snapshot_download()` is **blocking** call + - Lines 117-120: Skips on Render/Railway but **not** on local dev + - `sif_agents.py`: + - Line 48: `LOCAL_LLM_FALLBACKS = ["Qwen/Qwen2.5-1.5B-Instruct", "Qwen/Qwen2.5-0.5B-Instruct", "TinyLlama/TinyLlama-1.1B-Chat-v1.0"]` + - Line 141: Default `model_name="Qwen/Qwen2.5-1.5B-Instruct"` + - Line 150: Uses `LocalLLMWrapper` for lazy loading + - Lines 94-130: Has fallback logic with memory issue detection + - `txtai_service.py`: + - Line 57: `SIF_FAIL_FAST=true` (default) causes RuntimeError + - Lines 234-235, 284-285, 319-320: Fail-fast prevents fallback +- **Likely files to inspect/edit later**: + - `start_alwrity_backend.py` (remove blocking download) + - `services/intelligence/sif_agents.py` (unify model defaults) + - `services/intelligence/txtai_service.py` (fix fail-fast with fallback) + - Create unified `services/intelligence/model_registry.py` or similar +- **Confidence**: High +- **Root-cause questions to answer**: + - Should model download be truly non-blocking (background thread)? + - Should fail-fast be conditional (e.g., only for critical paths, not background ops)? + - Should module unification create a single `ModelRegistry` or `ModelManager`? + - How to ensure JSON/response structure compatibility across fallback chain? +- **Validation after fix**: + - Server starts without blocking on model download. + - SIF agents use consistent model defaults. + - Fail-fast catches bugs but allows fallback for non-critical ops. + - Single module handles all model loading logic. + +## Minimal Fix Paths (Pre-Implementation) + +### For Issue 3 (txtai init race) - REVISED: +- **Option A**: Change `SIF_FAIL_FAST` to be **conditional** (not global) + - Keep fail-fast for critical paths (user-initiated ops) + - Allow graceful deferral for background ops (indexing, clustering) + - Requires distinguishing operation types +- **Option B**: Use `_ensure_initialized_async()` for **blocking ops only** + - Keep non-blocking for background ops + - Awaits init for user-facing ops + - More robust but requires async refactoring +- **Option C**: Add operation-type-aware fail-fast + - Pass `critical=True/False` to operations + - Fail-fast only when `critical=True` + - Most aligned with user requirements + +### For Issue 5 (SIF agent local-model drift) - REVISED: +- **Option A**: Change default to lighter model AND improve fallback chain + - Default: `Qwen/Qwen2.5-0.5B-Instruct` (lighter) + - Fallback: `0.5B → TinyLlama 1.1B` + - Ensure JSON/response structure compatibility +- **Option B**: Add env/config override + keep fallback chain + - `SIF_AGENT_MODEL` env var + - Fallback chain remains as-is + - More flexible +- **Option C**: Keep current default and rely on existing fallback chain + - **RECOMMENDED**: Already has memory detection and fallback + - Just need to ensure JSON compatibility + +### For Issue 6 (Model blocking + module unification): +- **Option A**: Remove blocking download from startup script + - Delete `bootstrap_local_llm_models()` call + - Let `LocalLLMWrapper` handle lazy loading + - Minimal change, immediate non-blocking +- **Option B**: Make download non-blocking (background thread) + - Keep pre-download but in background + - Server starts immediately + - More complex +- **Option C**: Create unified `ModelRegistry` module + - Single source of truth for model defaults + - Centralized download/cache logic + - Eliminates confusion between modules + - **RECOMMENDED for long-term** + +## Session Update Log + +### 2026-03-10 + +- Created initial RCA tracker document. +- Seeded first three concrete issues from supplied logs. +- No fixes applied from this document yet. +- Added Issue 5: SIF agent local-model drift (LiteLLM dropped as false herring). +- Refined Issue 3 with `SIF_FAIL_FAST` behavior details. +- Added minimal fix paths for Issues 3 and 5. +- Added Issue 6: Model initialization blocking and module unification. +- Updated minimal fix paths based on user requirements (fail-fast + fallback, non-blocking, unification). diff --git a/frontend/package.json b/frontend/package.json index 82af1d38..59daa402 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -36,9 +36,9 @@ "zustand": "^5.0.7" }, "scripts": { - "start": "react-scripts start", - "build": "node --max_old_space_size=8192 node_modules/react-scripts/scripts/build.js", - "build:nomap": "node --max_old_space_size=8192 -e \"process.env.GENERATE_SOURCEMAP='false'; require('./node_modules/react-scripts/scripts/build');\"", + "start": "node --max_old_space_size=12288 node_modules/react-scripts/scripts/start.js", + "build": "node --max_old_space_size=12288 node_modules/react-scripts/scripts/build.js", + "build:nomap": "node --max_old_space_size=12288 -e \"process.env.GENERATE_SOURCEMAP='false'; require('./node_modules/react-scripts/scripts/build');\"", "test": "react-scripts test", "eject": "react-scripts eject", "analyze": "npm run build && npx source-map-explorer 'build/static/js/*.js' --html bundle-report.html", diff --git a/frontend/src/components/MainDashboard/MainDashboard.tsx b/frontend/src/components/MainDashboard/MainDashboard.tsx index 5b8f90b9..eb9cbb0f 100644 --- a/frontend/src/components/MainDashboard/MainDashboard.tsx +++ b/frontend/src/components/MainDashboard/MainDashboard.tsx @@ -6,6 +6,7 @@ import { Snackbar, useTheme } from '@mui/material'; +import { Lightbulb } from '@mui/icons-material'; import { motion, AnimatePresence } from 'framer-motion'; import { useNavigate } from 'react-router-dom'; import { useAuth } from '@clerk/clerk-react'; @@ -63,7 +64,9 @@ const MainDashboard: React.FC = () => { const { currentWorkflow, workflowProgress, + scheduleStatus, isLoading: workflowLoading, + loadTodayWorkflow, generateDailyWorkflow, startWorkflow, pauseWorkflow, @@ -71,19 +74,18 @@ const MainDashboard: React.FC = () => { } = useWorkflowStore(); const { userId } = useAuth(); - // Initialize workflow on component mount React.useEffect(() => { const initializeWorkflow = async () => { try { if (!userId) return; - await generateDailyWorkflow(userId); + await loadTodayWorkflow(); } catch (error) { - console.warn('Failed to initialize workflow:', error); + console.warn('Failed to load today workflow:', error); } }; initializeWorkflow(); - }, [generateDailyWorkflow, userId]); + }, [loadTodayWorkflow, userId]); // Debug logging for workflow state (only in development) React.useEffect(() => { @@ -238,6 +240,17 @@ const MainDashboard: React.FC = () => { // Note: filteredCategories removed as it's not used in the current implementation + const statusChips = React.useMemo(() => { + const scheduled = !!scheduleStatus?.scheduled_run_completed; + return [ + { + label: scheduled ? 'Scheduled workflow ready' : 'Scheduled workflow pending', + color: scheduled ? '#22c55e' : '#ef4444', + icon: , + }, + ]; + }, [scheduleStatus]); + if (loading) { return ; } @@ -297,7 +310,7 @@ const MainDashboard: React.FC = () => { workflowId.startsWith('daily-'); @@ -47,9 +48,6 @@ const normalizeServerWorkflow = (workflow: DailyWorkflow): DailyWorkflow => ({ : [], }); -const isServerUnavailableError = (error: unknown): boolean => - error instanceof ConnectionError || error instanceof NetworkError; - const toWorkflowError = (error: unknown, fallbackMessage: string): WorkflowError => { if (error instanceof WorkflowError) return error; @@ -109,6 +107,7 @@ interface WorkflowState { currentWorkflow: DailyWorkflow | null; workflowProgress: WorkflowProgress | null; navigationState: NavigationState | null; + scheduleStatus: TodayWorkflowScheduleStatus | null; // User preferences userPreferences: UserWorkflowPreferences | null; @@ -121,6 +120,8 @@ interface WorkflowState { degradedModeReason: string | null; // Actions + loadTodayWorkflow: (date?: string) => Promise; + refreshScheduleStatus: (date?: string) => Promise; generateDailyWorkflow: (userId: string, date?: string) => Promise; startWorkflow: (workflowId: string) => Promise; pauseWorkflow: (workflowId: string) => Promise; @@ -154,6 +155,7 @@ export const useWorkflowStore = create()( currentWorkflow: null, workflowProgress: null, navigationState: null, + scheduleStatus: null, userPreferences: null, isWorkflowModalOpen: false, isLoading: false, @@ -161,14 +163,82 @@ export const useWorkflowStore = create()( isDegradedMode: false, degradedModeReason: null, + loadTodayWorkflow: async (date?: string) => { + set({ isLoading: true, error: null }); + + try { + const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} }); + const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined; + const planSummary = resp?.data?.data?.plan?.provenance_summary; + const scheduleStatus = resp?.data?.data?.schedule_status as TodayWorkflowScheduleStatus | undefined; + + if (serverWorkflow && Array.isArray(serverWorkflow.tasks)) { + if (planSummary && !serverWorkflow.provenanceSummary) { + serverWorkflow.provenanceSummary = planSummary; + } + const normalizedWorkflow = normalizeServerWorkflow(serverWorkflow); + const derived = computeProgressAndNavigation(normalizedWorkflow); + set({ + currentWorkflow: normalizedWorkflow, + workflowProgress: derived.progress, + navigationState: derived.navigation, + scheduleStatus: scheduleStatus || null, + isLoading: false, + isDegradedMode: false, + degradedModeReason: null, + }); + return; + } + + throw new WorkflowError({ + code: 'WORKFLOW_SCHEMA_INVALID', + message: 'Server workflow response is missing a valid tasks array.', + timestamp: new Date(), + recoverable: false, + suggestedAction: 'Refresh and try again. If this persists, contact support.' + }); + } catch (error: any) { + if (error?.response?.status === 404) { + set({ + currentWorkflow: null, + workflowProgress: null, + navigationState: null, + isLoading: false, + isDegradedMode: false, + degradedModeReason: null, + }); + await get().refreshScheduleStatus(date); + return; + } + + set({ + error: toWorkflowError(error, 'Failed to load workflow from server.'), + isLoading: false, + isDegradedMode: false, + degradedModeReason: null, + }); + } + }, + + refreshScheduleStatus: async (date?: string) => { + try { + const resp = await apiClient.get('/api/today-workflow/status', { params: date ? { date } : {} }); + const scheduleStatus = resp?.data?.data as TodayWorkflowScheduleStatus | undefined; + set({ scheduleStatus: scheduleStatus || null }); + } catch { + set({ scheduleStatus: null }); + } + }, + // Generate daily workflow generateDailyWorkflow: async (userId: string, date?: string) => { set({ isLoading: true, error: null }); try { - const resp = await apiClient.get('/api/today-workflow', { params: date ? { date } : {} }); + const resp = await apiClient.post('/api/today-workflow/generate', null, { params: date ? { date } : {} }); const serverWorkflow = resp?.data?.data?.workflow as DailyWorkflow | undefined; const planSummary = resp?.data?.data?.plan?.provenance_summary; + const scheduleStatus = resp?.data?.data?.schedule_status as TodayWorkflowScheduleStatus | undefined; if (serverWorkflow && Array.isArray(serverWorkflow.tasks)) { if (planSummary && !serverWorkflow.provenanceSummary) { @@ -180,6 +250,7 @@ export const useWorkflowStore = create()( currentWorkflow: normalizedWorkflow, workflowProgress: derived.progress, navigationState: derived.navigation, + scheduleStatus: scheduleStatus || null, isLoading: false, isDegradedMode: false, degradedModeReason: null, @@ -195,34 +266,11 @@ export const useWorkflowStore = create()( suggestedAction: 'Refresh and try again. If this persists, contact support.' }); } catch (error) { - if (!isServerUnavailableError(error)) { - set({ - error: toWorkflowError(error, 'Failed to load workflow from server.'), - isLoading: false, - isDegradedMode: false, - degradedModeReason: null, - }); - return; - } - } - - try { - const workflow = await taskWorkflowOrchestrator.generateDailyWorkflow(userId, date); - const progress = taskWorkflowOrchestrator.getWorkflowProgress(workflow.id); - const navigation = taskWorkflowOrchestrator.getNavigationState(workflow.id); set({ - currentWorkflow: workflow, - workflowProgress: progress, - navigationState: navigation, - isLoading: false, - isDegradedMode: true, - degradedModeReason: 'Server workflow unavailable. Using local fallback workflow.', - error: null, - }); - } catch (error) { - set({ - error: toWorkflowError(error, 'Failed to generate local fallback workflow.'), + error: toWorkflowError(error, 'Failed to generate workflow from server.'), isLoading: false, + isDegradedMode: false, + degradedModeReason: null, }); } }, diff --git a/frontend/src/types/workflow.ts b/frontend/src/types/workflow.ts index c28a986c..cb7856fd 100644 --- a/frontend/src/types/workflow.ts +++ b/frontend/src/types/workflow.ts @@ -14,6 +14,14 @@ export interface WorkflowProvenanceSummary { taskSourceBreakdown: Partial>; } +export interface TodayWorkflowScheduleStatus { + date: string; + generated: boolean; + scheduled_run_completed: boolean; + source: string | null; + created_at?: string | null; +} + export interface TodayTask { id: string; pillarId: string; diff --git a/frontend/tsconfig.json b/frontend/tsconfig.json index e0887626..5977cdba 100644 --- a/frontend/tsconfig.json +++ b/frontend/tsconfig.json @@ -24,7 +24,7 @@ "./node_modules/@types", "./src/types" ], - "types": ["jest", "node"] + "types": ["node"] }, "include": [ "src"