Commit_all_local_changes_after_PR_406_merge

This commit is contained in:
ajaysi
2026-03-10 17:01:36 +05:30
parent f78b5f1e04
commit 8c2d88efb9
17 changed files with 936 additions and 412 deletions

View File

@@ -53,6 +53,39 @@ WORKSPACE_DIR = os.path.join(ROOT_DIR, 'workspace')
# Engine cache for multi-tenant support
_user_engines = {}
def _ensure_daily_workflow_schema(engine, user_id: str) -> None:
"""Backfill required daily_workflow_plans columns for legacy tenant DBs."""
required_columns = {
"generation_mode": "VARCHAR(30) NOT NULL DEFAULT 'llm_generation'",
"committee_agent_count": "INTEGER NOT NULL DEFAULT 0",
"fallback_used": "BOOLEAN NOT NULL DEFAULT 0",
"generation_run_id": "INTEGER",
}
try:
with engine.begin() as conn:
table_check = conn.exec_driver_sql(
"SELECT name FROM sqlite_master WHERE type='table' AND name='daily_workflow_plans'"
).fetchone()
if not table_check:
return
existing_cols = {
row[1] for row in conn.exec_driver_sql("PRAGMA table_info(daily_workflow_plans)").fetchall()
}
for col_name, col_def in required_columns.items():
if col_name not in existing_cols:
conn.exec_driver_sql(
f"ALTER TABLE daily_workflow_plans ADD COLUMN {col_name} {col_def}"
)
logger.warning(
f"Auto-migrated daily_workflow_plans column '{col_name}' for user {user_id}"
)
except Exception as e:
logger.error(f"Failed daily_workflow_plans schema compatibility check for user {user_id}: {e}")
def get_user_db_path(user_id: str) -> str:
"""Get the database path for a specific user."""
# Sanitize user_id to be safe for filesystem
@@ -192,6 +225,7 @@ def init_user_database(user_id: str):
UserBusinessInfoBase.metadata.create_all(bind=engine)
ContentAssetBase.metadata.create_all(bind=engine)
BingAnalyticsBase.metadata.create_all(bind=engine)
_ensure_daily_workflow_schema(engine, user_id)
# Initialize default data for new databases
try:

View File

@@ -3,7 +3,10 @@ Task Scheduler Package
Modular, pluggable scheduler for ALwrity tasks.
"""
import os
from sqlalchemy.orm import Session
from apscheduler.triggers.cron import CronTrigger
from .core.scheduler import TaskScheduler
from .core.executor_interface import TaskExecutor, TaskExecutionResult
@@ -32,6 +35,7 @@ from .utils.platform_insights_task_loader import load_due_platform_insights_task
from .utils.advertools_task_loader import load_due_advertools_tasks
from .utils.sif_indexing_task_loader import load_due_sif_indexing_tasks
from .utils.market_trends_task_loader import load_due_market_trends_tasks
from services.today_workflow_service import generate_scheduled_daily_workflows
# Global scheduler instance (initialized on first access)
_scheduler_instance: TaskScheduler = None
@@ -143,6 +147,18 @@ def get_scheduler() -> TaskScheduler:
market_trends_executor,
load_due_market_trends_tasks
)
today_workflow_hour_utc = int(os.getenv('TODAY_WORKFLOW_SCHEDULE_HOUR_UTC', '2'))
today_workflow_minute_utc = int(os.getenv('TODAY_WORKFLOW_SCHEDULE_MINUTE_UTC', '0'))
_scheduler_instance.scheduler.add_job(
generate_scheduled_daily_workflows,
trigger=CronTrigger(hour=today_workflow_hour_utc, minute=today_workflow_minute_utc, timezone='UTC'),
id='generate_daily_workflows',
replace_existing=True,
max_instances=1,
coalesce=True,
misfire_grace_time=3600,
)
return _scheduler_instance

View File

@@ -8,6 +8,7 @@ from models.daily_workflow_models import DailyWorkflowPlan, DailyWorkflowTask
from models.agent_activity_models import AgentAlert
from services.agent_activity_service import AgentActivityService, build_agent_event_payload
from services.llm_providers.main_text_generation import llm_text_gen
from services.database import get_all_user_ids, get_session_for_user
from loguru import logger
PILLAR_IDS = ["plan", "generate", "publish", "analyze", "engage", "remarket"]
@@ -604,7 +605,12 @@ async def generate_agent_enhanced_plan(
return result
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
async def get_or_create_daily_workflow_plan(
db: Session,
user_id: str,
date: Optional[str] = None,
creation_source: str = "manual",
) -> tuple[DailyWorkflowPlan, bool]:
from starlette.concurrency import run_in_threadpool
date_str = date or _today_date_str()
@@ -646,7 +652,10 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
plan = DailyWorkflowPlan(
user_id=user_id,
date=date_str,
source="agent",
source=creation_source,
generation_mode=_derive_generation_mode(plan_data),
committee_agent_count=_count_committee_agents(tasks),
fallback_used=_plan_uses_fallback(tasks),
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
@@ -685,6 +694,80 @@ async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Opt
return plan, True
def _derive_generation_mode(plan_data: Dict[str, Any]) -> str:
tasks = plan_data.get("tasks", []) if isinstance(plan_data, dict) else []
source_modes = set()
for task in tasks:
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
source_agent = str(metadata.get("source_agent") or "").strip()
source = str(metadata.get("source") or "").strip()
if source_agent:
source_modes.add("agent_committee")
elif source in {"controlled_fallback", "llm_pillar_backfill"}:
source_modes.add(source)
if "agent_committee" in source_modes:
return "agent_committee"
if "controlled_fallback" in source_modes:
return "controlled_fallback"
if "llm_pillar_backfill" in source_modes:
return "llm_pillar_backfill"
return "llm_generation"
def _count_committee_agents(tasks: List[Dict[str, Any]]) -> int:
agents = set()
for task in tasks:
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
source_agent = str(metadata.get("source_agent") or "").strip()
if source_agent:
agents.add(source_agent)
return len(agents)
def _plan_uses_fallback(tasks: List[Dict[str, Any]]) -> bool:
for task in tasks:
metadata = task.get("metadata") if isinstance(task, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
source = str(metadata.get("source") or "").strip()
if source in {"controlled_fallback", "llm_pillar_backfill"}:
return True
return False
async def generate_scheduled_daily_workflows() -> Dict[str, int]:
user_ids = get_all_user_ids()
stats = {"users_seen": 0, "created": 0, "existing": 0, "failed": 0}
for user_id in user_ids:
stats["users_seen"] += 1
db = None
try:
db = get_session_for_user(user_id)
plan, created = await get_or_create_daily_workflow_plan(
db,
user_id,
creation_source="scheduled",
)
if created:
stats["created"] += 1
logger.info("Scheduled daily workflow created for user {} date {}", user_id, plan.date)
else:
stats["existing"] += 1
logger.info("Scheduled daily workflow already exists for user {} date {}", user_id, plan.date)
except Exception as e:
stats["failed"] += 1
logger.error("Scheduled daily workflow generation failed for user {}: {}", user_id, e)
finally:
if db:
db.close()
logger.info("Scheduled daily workflow run complete: {}", stats)
return stats
def update_task_status(
db: Session,
user_id: str,

View File

@@ -3,6 +3,7 @@ Video generation operations (text-to-video and image-to-video).
"""
import requests
import json
from typing import Any, Dict, Optional
from fastapi import HTTPException
@@ -12,6 +13,19 @@ from .base import VideoBase
logger = get_service_logger("wavespeed.generators.video.generation")
def _extract_wavespeed_message(response_text: str) -> str:
"""Best-effort extraction of WaveSpeed error message from response payload."""
if not response_text:
return ""
try:
parsed = json.loads(response_text)
if isinstance(parsed, dict):
return str(parsed.get("message") or parsed.get("error") or "")
except (json.JSONDecodeError, TypeError, ValueError):
return ""
return ""
class VideoGeneration(VideoBase):
"""Video generation operations."""
@@ -31,6 +45,25 @@ class VideoGeneration(VideoBase):
response = requests.post(url, headers=self._get_headers(), json=payload, timeout=timeout)
if response.status_code != 200:
logger.error(f"[WaveSpeed] Submission failed: {response.status_code} {response.text}")
error_message = _extract_wavespeed_message(response.text)
if "insufficient credits" in error_message.lower() or "credit" in error_message.lower():
raise HTTPException(
status_code=429,
detail={
"error": "Insufficient WaveSpeed credits",
"message": "Insufficient credits. Please top up to continue video generation.",
"provider": "wavespeed",
"usage_info": {
"provider": "wavespeed",
"type": "credits",
"limit_type": "provider_credits",
"operation_type": "scene_animation",
"action_required": "top_up",
},
},
)
raise HTTPException(
status_code=502,
detail={
@@ -75,6 +108,25 @@ class VideoGeneration(VideoBase):
if response.status_code != 200:
logger.error(f"[WaveSpeed] Text-to-video submission failed: {response.status_code} {response.text}")
error_message = _extract_wavespeed_message(response.text)
if "insufficient credits" in error_message.lower() or "credit" in error_message.lower():
raise HTTPException(
status_code=429,
detail={
"error": "Insufficient WaveSpeed credits",
"message": "Insufficient credits. Please top up to continue video generation.",
"provider": "wavespeed",
"usage_info": {
"provider": "wavespeed",
"type": "credits",
"limit_type": "provider_credits",
"operation_type": "video_generation",
"action_required": "top_up",
},
},
)
raise HTTPException(
status_code=502,
detail={
@@ -174,6 +226,25 @@ class VideoGeneration(VideoBase):
if response.status_code != 200:
logger.error(f"[WaveSpeed] Text-to-video submission failed: {response.status_code} {response.text}")
error_message = _extract_wavespeed_message(response.text)
if "insufficient credits" in error_message.lower() or "credit" in error_message.lower():
raise HTTPException(
status_code=429,
detail={
"error": "Insufficient WaveSpeed credits",
"message": "Insufficient credits. Please top up to continue video generation.",
"provider": "wavespeed",
"usage_info": {
"provider": "wavespeed",
"type": "credits",
"limit_type": "provider_credits",
"operation_type": "video_generation",
"action_required": "top_up",
},
},
)
raise HTTPException(
status_code=502,
detail={