AI Image and Audio Generation Improvements.

AI Video Generation Pre-Flight Checklist. Cost Estimate Improvements.
This commit is contained in:
ajaysi
2025-12-25 16:26:08 +05:30
parent 59913bffa9
commit 7512933c65
163 changed files with 8938 additions and 37401 deletions

View File

@@ -14,7 +14,7 @@ import uuid
from services.database import get_db
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
from api.story_writer.utils.auth import require_authenticated_user
from services.llm_providers.main_image_generation import generate_image
from services.llm_providers.main_image_generation import generate_image, generate_character_image
from utils.asset_tracker import save_asset_to_library
from loguru import logger
from ..constants import PODCAST_IMAGES_DIR
@@ -139,10 +139,7 @@ async def generate_podcast_scene_image(
logger.info(f"[Podcast] Using Ideogram Character for scene {request.scene_id} with base avatar")
logger.info(f"[Podcast] Scene prompt: {image_prompt[:150]}...")
# Use Ideogram Character API via WaveSpeed client
from services.wavespeed.client import WaveSpeedClient
wavespeed_client = WaveSpeedClient()
# Use centralized character image generation with subscription checks and tracking
# Use custom settings if provided, otherwise use defaults
style = request.style or "Realistic" # Default to Realistic for professional podcast presenters
rendering_speed = request.rendering_speed or "Quality" # Default to Quality for podcast videos
@@ -163,9 +160,10 @@ async def generate_podcast_scene_image(
logger.info(f"[Podcast] Ideogram Character settings: style={style}, rendering_speed={rendering_speed}, aspect_ratio={aspect_ratio}")
try:
image_bytes = wavespeed_client.generate_character_image(
image_bytes = generate_character_image(
prompt=image_prompt,
reference_image_bytes=base_avatar_bytes,
user_id=user_id,
style=style,
aspect_ratio=aspect_ratio,
rendering_speed=rendering_speed,
@@ -308,39 +306,9 @@ async def generate_podcast_scene_image(
# Create image URL (served via API endpoint)
image_url = f"/api/podcast/images/{image_filename}"
# Estimate cost (rough estimate: ~$0.04 per image for most providers, ~$0.08 for Ideogram Character Quality)
cost = 0.08 if result.provider == "wavespeed" and result.model == "ideogram-ai/ideogram-character" else 0.04
# TRACK USAGE after successful image generation
try:
from models.subscription_models import UsageSummary, APIProvider
from sqlalchemy import text as sql_text
from datetime import datetime
current_period = pricing_service.get_current_billing_period(user_id) or datetime.now().strftime("%Y-%m")
# Update stability_calls and stability_cost (used for all image generation)
# Note: stability_calls is used for all image generation providers, not just Stability AI
update_query = sql_text("""
UPDATE usage_summaries
SET stability_calls = COALESCE(stability_calls, 0) + 1,
stability_cost = COALESCE(stability_cost, 0) + :cost,
total_calls = COALESCE(total_calls, 0) + 1,
total_cost = COALESCE(total_cost, 0) + :cost
WHERE user_id = :user_id AND billing_period = :period
""")
db.execute(update_query, {
'cost': cost,
'user_id': user_id,
'period': current_period
})
db.commit()
logger.info(f"[Podcast] ✅ Tracked image generation usage: user={user_id}, cost=${cost:.4f}, provider={result.provider}")
except Exception as usage_error:
logger.error(f"[Podcast] Failed to track image generation usage: {usage_error}")
db.rollback()
# Don't fail the request if usage tracking fails
# Estimate cost (rough estimate: ~$0.04 per image for most providers, ~$0.10 for Ideogram Character)
# Note: Actual usage tracking is handled by centralized generate_image()/generate_character_image() functions
cost = 0.10 if result.provider == "wavespeed" and result.model == "ideogram-ai/ideogram-character" else 0.04
# Save to asset library
try:

View File

@@ -0,0 +1,376 @@
"""YouTube Creator scene audio generation handlers."""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from typing import Dict, Any, Optional
from pydantic import BaseModel
from services.database import get_db
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
from api.story_writer.utils.auth import require_authenticated_user
from utils.asset_tracker import save_asset_to_library
from models.story_models import StoryAudioResult
from services.story_writer.audio_generation_service import StoryAudioGenerationService
from pathlib import Path
from utils.logger_utils import get_service_logger
# Router for the YouTube Creator audio endpoints defined in this module.
router = APIRouter(tags=["youtube-audio"])
logger = get_service_logger("api.youtube.audio")

# Audio output directory: "youtube_audio" under the directory four levels
# above this file. It is created at import time if it does not exist yet.
base_dir = Path(__file__).parent.parent.parent.parent
YOUTUBE_AUDIO_DIR = base_dir / "youtube_audio"
YOUTUBE_AUDIO_DIR.mkdir(parents=True, exist_ok=True)

# Module-level audio service instance, constructed once with the YouTube
# audio directory as its output_dir and reused by the endpoint below.
audio_service = StoryAudioGenerationService(output_dir=str(YOUTUBE_AUDIO_DIR))
def select_optimal_emotion(scene_title: str, narration: str, video_plan_context: Optional[Dict[str, Any]] = None) -> str:
    """
    Select an emotion for YouTube narration using keyword heuristics.

    Available emotions: "happy", "sad", "angry", "fearful", "disgusted",
    "surprised", "neutral". Only "happy", "surprised" and "neutral" are ever
    chosen by the rules below.

    Args:
        scene_title: Scene title; a title containing "hook" or "cta" marks
            the scene as a hook / call-to-action scene.
        narration: Narration text that will be spoken.
        video_plan_context: Optional video plan. Its "tone" entry, when it
            mentions serious/professional or fun/entertaining, overrides the
            scene-level choice.

    Returns:
        The selected emotion string ("happy" when no rule matches).
    """
    import re  # Local import: this module does not import ``re`` at file level.

    def mentions(text: str, keywords) -> bool:
        # Keywords must begin a word (the preceding character is not a letter
        # or digit), so "asterisk" no longer triggers "risk" and "reaction"
        # no longer triggers "action". Plurals such as "problems" still match.
        pattern = r"(?<![^\W_])(?:" + "|".join(re.escape(k) for k in keywords) + r")"
        return re.search(pattern, text) is not None

    title = (scene_title or "").lower()
    scene_text = f"{scene_title or ''} {narration or ''}".lower()

    # Default to happy for engaging YouTube content
    selected_emotion = "happy"

    # Rules are evaluated in order; the first match wins.
    if "hook" in title or mentions(scene_text, ("exciting", "amazing", "unbelievable", "shocking", "wow")):
        # Hook scenes need excitement and energy
        selected_emotion = "surprised"
    elif mentions(scene_text, ("emotional", "touching", "heartwarming", "inspiring", "motivational")):
        # Emotional stories or inspirational content: warm and uplifting
        selected_emotion = "happy"
    elif mentions(scene_text, ("important", "critical", "serious", "professional", "expert")):
        # Serious or professional content
        selected_emotion = "neutral"
    elif mentions(scene_text, ("problem", "solution", "fix", "help", "guide")):
        # Problem-solving or tutorial content: helpful and encouraging
        selected_emotion = "happy"
    elif "cta" in title or mentions(scene_text, ("subscribe", "like", "comment", "share", "action")):
        # Call-to-action scenes: confident and encouraging
        selected_emotion = "happy"
    elif mentions(scene_text, ("warning", "danger", "risk", "problem", "issue")):
        # Negative or concerning topics: serious but not alarming.
        # ("problem" is already consumed by the tutorial rule above.)
        selected_emotion = "neutral"

    # The overall tone of the video plan overrides the scene-level choice.
    if video_plan_context:
        # ``or ""`` guards against an explicit null ("tone": None), which
        # previously raised AttributeError on ``.lower()``.
        tone = str(video_plan_context.get("tone") or "").lower()
        if "serious" in tone or "professional" in tone:
            selected_emotion = "neutral"
        elif "fun" in tone or "entertaining" in tone:
            selected_emotion = "happy"

    return selected_emotion
def select_optimal_voice(scene_title: str, narration: str, video_plan_context: Optional[Dict[str, Any]] = None) -> str:
    """
    Select a voice for YouTube narration using keyword heuristics.

    The video plan context (video type, target audience, tone) picks a
    baseline voice; scene-level cues in the title/narration then override it.

    Available voices: Wise_Woman, Friendly_Person, Inspirational_girl, Deep_Voice_Man,
    Calm_Woman, Casual_Guy, Lively_Girl, Patient_Man, Young_Knight, Determined_Man,
    Lovely_Girl, Decent_Boy, Imposing_Manner, Elegant_Man, Abbess, Sweet_Girl_2, Exuberant_Girl

    Args:
        scene_title: Scene title; a title containing "hook" or "cta" marks
            the scene as a hook / call-to-action scene.
        narration: Narration text that will be spoken.
        video_plan_context: Optional video plan; the "video_type",
            "target_audience" and "tone" entries are read when present.

    Returns:
        The selected voice_id string ("Casual_Guy" when no rule matches).
    """
    import re  # Local import: this module does not import ``re`` at file level.

    def mentions(text: str, keywords) -> bool:
        # Keywords must begin a word (the preceding character is not a letter
        # or digit), so "know" no longer triggers "now" and "history" no
        # longer triggers "story". Plurals such as "comments" still match.
        pattern = r"(?<![^\W_])(?:" + "|".join(re.escape(k) for k in keywords) + r")"
        return re.search(pattern, text) is not None

    # Default to Casual_Guy for engaging YouTube content
    selected_voice = "Casual_Guy"

    # Baseline voice from the video plan context
    if video_plan_context:
        # ``or ""`` guards against explicit nulls (e.g. "tone": None), which
        # previously raised AttributeError on ``.lower()``.
        video_type = str(video_plan_context.get("video_type") or "").lower()
        target_audience = str(video_plan_context.get("target_audience") or "").lower()
        tone = str(video_plan_context.get("tone") or "").lower()

        if any(keyword in video_type for keyword in ("tutorial", "educational", "how-to", "guide", "course")):
            # Educational/Professional content
            if "professional" in tone or "expert" in target_audience:
                selected_voice = "Wise_Woman"  # Authoritative and trustworthy
            else:
                selected_voice = "Patient_Man"  # Clear and instructional
        elif any(keyword in video_type for keyword in ("entertainment", "vlog", "lifestyle", "story", "review")):
            # Entertainment/Casual content
            if "young" in target_audience or "millennial" in target_audience:
                selected_voice = "Casual_Guy"  # Friendly and relatable
            elif "female" in target_audience or "women" in target_audience:
                selected_voice = "Lively_Girl"  # Energetic and engaging
            else:
                selected_voice = "Friendly_Person"  # Approachable
        elif any(keyword in video_type for keyword in ("motivational", "inspirational", "success", "mindset")):
            # Motivational/Inspirational content
            selected_voice = "Inspirational_girl"  # Uplifting and motivational
        elif any(keyword in video_type for keyword in ("business", "corporate", "finance", "marketing")):
            # Business/Corporate content
            selected_voice = "Elegant_Man"  # Professional and sophisticated
        elif any(keyword in video_type for keyword in ("tech", "gaming", "software", "app")):
            # Tech/Gaming content
            selected_voice = "Young_Knight"  # Energetic and modern

    # Scene-level cues override the baseline; the first matching rule wins.
    title = (scene_title or "").lower()
    scene_text = f"{scene_title or ''} {narration or ''}".lower()

    if "hook" in title or mentions(scene_text, ("attention", "grab", "exciting", "amazing", "unbelievable")):
        # Hook scenes need energetic, attention-grabbing voices
        selected_voice = "Exuberant_Girl"
    elif mentions(scene_text, ("story", "emotional", "heartwarming", "touching", "inspiring")):
        # Emotional/stories need more expressive voices
        selected_voice = "Inspirational_girl"
    elif mentions(scene_text, ("technical", "explain", "step-by-step", "process", "how-to")):
        # Technical explanations need clear, precise voices
        selected_voice = "Calm_Woman"
    elif "cta" in title or mentions(scene_text, ("subscribe", "like", "comment", "share", "now", "today")):
        # Call-to-action scenes need confident, persuasive voices
        selected_voice = "Determined_Man"

    logger.info(f"[VoiceSelection] Selected '{selected_voice}' for scene: {(scene_title or '')[:50]}...")
    return selected_voice
class YouTubeAudioRequest(BaseModel):
    """Request body for generating narration audio for one YouTube scene."""

    # Scene identity and the narration text to synthesize
    scene_id: str
    scene_title: str
    text: str
    voice_id: Optional[str] = None  # Will auto-select based on content if not provided
    # Basic voice controls
    speed: float = 1.0
    volume: float = 1.0
    pitch: float = 0.0
    emotion: str = "happy"  # More engaging for YouTube content
    english_normalization: bool = False
    # Enhanced defaults for high-quality YouTube audio using Minimax Speech 02 HD
    # Higher quality settings for professional YouTube content
    sample_rate: Optional[int] = 44100  # CD quality: 44100 Hz (valid values: 8000, 16000, 22050, 24000, 32000, 44100)
    bitrate: int = 256000  # Highest quality: 256kbps (valid values: 32000, 64000, 128000, 256000)
    channel: Optional[str] = "2"  # Stereo for richer audio (valid values: "1" or "2")
    format: Optional[str] = "mp3"  # Universal format for web
    language_boost: Optional[str] = "English"  # Optimize for English content
    enable_sync_mode: bool = True
    # Context for intelligent voice/emotion selection
    video_plan_context: Optional[Dict[str, Any]] = None  # Optional video plan for context-aware voice selection
class YouTubeAudioResponse(BaseModel):
    """Response body returned by POST /audio after a scene's audio is generated."""

    # Echoed from the request
    scene_id: str
    scene_title: str
    # Location of the generated file; audio_url is what clients fetch
    audio_filename: str
    audio_url: str
    # Generation details taken from the audio service result
    provider: str
    model: str
    voice_id: str
    text_length: int
    file_size: int
    cost: float
@router.post("/audio", response_model=YouTubeAudioResponse)
async def generate_youtube_scene_audio(
    request: YouTubeAudioRequest,
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Generate AI audio for a YouTube scene using shared audio service.
    Similar to Podcast's audio generation endpoint.

    Steps: strip non-spoken markers from the text, pick voice and emotion,
    call the shared audio service, rewrite the returned URL to the YouTube
    audio route, then record the asset (best effort).

    Raises:
        HTTPException: 400 when the text is empty (before or after removing
            instructional markers); any HTTPException raised while
            generating is passed through unchanged; 500 for any other
            failure during generation.
    """
    import re

    user_id = require_authenticated_user(current_user)
    if not request.text or not request.text.strip():
        raise HTTPException(status_code=400, detail="Text is required")

    try:
        # Preprocess text to remove instructional markers that shouldn't be spoken
        # Remove patterns like [Pacing: slow], [Instructions: ...], etc.
        processed_text = request.text.strip()

        # Remove instructional markers that contain pacing, timing, or other non-spoken content
        instructional_patterns = [
            r'\[Pacing:\s*[^\]]+\]',  # [Pacing: slow]
            r'\[Instructions?:\s*[^\]]+\]',  # [Instructions: ...]
            r'\[Timing:\s*[^\]]+\]',  # [Timing: ...]
            r'\[Note:\s*[^\]]+\]',  # [Note: ...]
            r'\[Internal:\s*[^\]]+\]',  # [Internal: ...]
        ]
        for pattern in instructional_patterns:
            processed_text = re.sub(pattern, '', processed_text, flags=re.IGNORECASE)

        # Clean up extra whitespace and normalize
        processed_text = re.sub(r'\s+', ' ', processed_text).strip()
        if not processed_text:
            raise HTTPException(status_code=400, detail="Text became empty after removing instructions. Please provide clean narration text.")

        logger.info(f"[YouTubeAudio] Text preprocessing: {len(request.text)} -> {len(processed_text)} characters")

        # Intelligent voice and emotion selection based on content analysis
        if not request.voice_id:
            selected_voice = select_optimal_voice(
                request.scene_title,
                processed_text,
                request.video_plan_context
            )
        else:
            selected_voice = request.voice_id

        # Auto-select emotion when the request carries the default value.
        # NOTE(review): "happy" doubles as the "not set" sentinel, so a client
        # that explicitly asks for "happy" is also auto-selected. Confirm with
        # the client contract before changing this.
        if request.emotion == "happy":
            selected_emotion = select_optimal_emotion(
                request.scene_title,
                processed_text,
                request.video_plan_context
            )
        else:
            selected_emotion = request.emotion

        logger.info(f"[YouTubeAudio] Voice selection: {selected_voice}, Emotion: {selected_emotion}")

        # Build kwargs for optional parameters; None values are omitted so the
        # provider applies its own defaults instead of receiving None.
        optional_kwargs = {}

        # DEBUG: Log what values we received
        logger.info(f"[YouTubeAudio] Request parameters: sample_rate={request.sample_rate}, bitrate={request.bitrate}, channel={request.channel}, format={request.format}, language_boost={request.language_boost}")

        # sample_rate: Use provided value or omit (WaveSpeed will use default)
        if request.sample_rate is not None:
            optional_kwargs["sample_rate"] = request.sample_rate

        # bitrate: Always provided. The request model declares it as a plain
        # int with a default of 256000, so it is never None here.
        # Valid values: 32000, 64000, 128000, 256000
        optional_kwargs["bitrate"] = request.bitrate

        # channel: Only include if valid (WaveSpeed only accepts "1" or "2" as strings)
        # If None, empty string, or invalid, omit it and WaveSpeed will use default
        if request.channel is not None and str(request.channel).strip() in ["1", "2"]:
            optional_kwargs["channel"] = str(request.channel).strip()
            logger.info(f"[YouTubeAudio] Including valid channel: {optional_kwargs['channel']}")
        else:
            logger.info(f"[YouTubeAudio] Omitting invalid channel: {request.channel}")

        # format: Use provided value or omit (WaveSpeed will use default)
        if request.format is not None:
            optional_kwargs["format"] = request.format

        # language_boost: Use provided value or omit (WaveSpeed will use default)
        if request.language_boost is not None:
            optional_kwargs["language_boost"] = request.language_boost

        logger.info(f"[YouTubeAudio] Final optional_kwargs: {optional_kwargs}")

        result: StoryAudioResult = audio_service.generate_ai_audio(
            scene_number=0,
            scene_title=request.scene_title,
            text=processed_text,
            user_id=user_id,
            voice_id=selected_voice,
            speed=request.speed or 1.0,
            volume=request.volume or 1.0,
            pitch=request.pitch or 0.0,
            emotion=selected_emotion,
            english_normalization=request.english_normalization or False,
            enable_sync_mode=request.enable_sync_mode,
            **optional_kwargs,
        )

        # Override URL to use YouTube endpoint instead of story endpoint
        if result.get("audio_url") and "/api/story/audio/" in result.get("audio_url", ""):
            audio_filename = result.get("audio_filename", "")
            result["audio_url"] = f"/api/youtube/audio/{audio_filename}"
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 above) intact; without
        # this clause the generic handler below turned them into 500s.
        raise
    except Exception as exc:
        logger.error(f"[YouTube] Audio generation failed: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Audio generation failed: {exc}")

    # Save to asset library (youtube_creator module). Best effort: a failure
    # here is logged and does not fail the request.
    try:
        if result.get("audio_url"):
            save_asset_to_library(
                db=db,
                user_id=user_id,
                asset_type="audio",
                source_module="youtube_creator",
                filename=result.get("audio_filename", ""),
                file_url=result.get("audio_url", ""),
                file_path=result.get("audio_path"),
                file_size=result.get("file_size"),
                mime_type="audio/mpeg",
                title=f"{request.scene_title} - YouTube",
                description="YouTube scene narration",
                tags=["youtube_creator", "audio", request.scene_id],
                provider=result.get("provider"),
                model=result.get("model"),
                cost=result.get("cost"),
                asset_metadata={
                    "scene_id": request.scene_id,
                    "scene_title": request.scene_title,
                    "status": "completed",
                },
            )
    except Exception as e:
        logger.warning(f"[YouTube] Failed to save audio asset: {e}")

    return YouTubeAudioResponse(
        scene_id=request.scene_id,
        scene_title=request.scene_title,
        audio_filename=result.get("audio_filename", ""),
        audio_url=result.get("audio_url", ""),
        provider=result.get("provider", "wavespeed"),
        model=result.get("model", "minimax/speech-02-hd"),
        voice_id=result.get("voice_id", selected_voice),
        # Fall back to the length of the text that was actually synthesized.
        text_length=result.get("text_length", len(processed_text)),
        file_size=result.get("file_size", 0),
        cost=result.get("cost", 0.0),
    )
@router.get("/audio/{filename}")
async def serve_youtube_audio(
    filename: str,
    current_user: Dict[str, Any] = Depends(get_current_user_with_query_token),
):
    """Serve generated YouTube scene audio files.

    Supports authentication via Authorization header or token query parameter.
    Query parameter is useful for HTML elements like <audio> that cannot send custom headers.

    Raises:
        HTTPException: 400 for a filename containing path separators or "..",
            403 when the resolved path falls outside the audio directory,
            404 when no regular file exists at that path.
    """
    require_authenticated_user(current_user)

    # Security check: ensure filename doesn't contain path traversal
    if ".." in filename or "/" in filename or "\\" in filename:
        raise HTTPException(status_code=400, detail="Invalid filename")

    # Resolve both sides before comparing. The original compared a resolved
    # path against the unresolved directory with a string prefix, which can
    # reject valid files when the directory path contains symlinks or is
    # relative, and does not respect path-component boundaries.
    audio_root = YOUTUBE_AUDIO_DIR.resolve()
    audio_path = (audio_root / filename).resolve()

    # Security check: ensure path is within YOUTUBE_AUDIO_DIR
    try:
        audio_path.relative_to(audio_root)
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied") from None

    # is_file() rather than exists(): a directory must not be served.
    if not audio_path.is_file():
        raise HTTPException(status_code=404, detail="Audio file not found")

    return FileResponse(audio_path, media_type="audio/mpeg")

View File

@@ -3,8 +3,9 @@
from pathlib import Path
from typing import Dict, Any, Optional
import uuid
from concurrent.futures import ThreadPoolExecutor
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
@@ -13,10 +14,10 @@ from middleware.auth_middleware import get_current_user
from services.database import get_db
from services.subscription import PricingService
from services.subscription.preflight_validator import validate_image_generation_operations
from services.llm_providers.main_image_generation import generate_image
from services.wavespeed.client import WaveSpeedClient
from services.llm_providers.main_image_generation import generate_image, generate_character_image
from utils.asset_tracker import save_asset_to_library
from utils.logger_utils import get_service_logger
from ..task_manager import task_manager
router = APIRouter(tags=["youtube-image"])
logger = get_service_logger("api.youtube.image")
@@ -27,6 +28,9 @@ YOUTUBE_IMAGES_DIR = base_dir / "youtube_images"
YOUTUBE_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
YOUTUBE_AVATARS_DIR = base_dir / "youtube_avatars"
# Thread pool for background image generation
_image_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="youtube_image")
class YouTubeImageRequest(BaseModel):
scene_id: str
@@ -40,6 +44,7 @@ class YouTubeImageRequest(BaseModel):
style: Optional[str] = None # e.g., "Realistic", "Fiction"
rendering_speed: Optional[str] = None # e.g., "Quality", "Turbo"
aspect_ratio: Optional[str] = None # e.g., "16:9"
model: Optional[str] = None # e.g., "ideogram-v3-turbo", "qwen-image"
def require_authenticated_user(current_user: Dict[str, Any]) -> str:
@@ -50,13 +55,38 @@ def require_authenticated_user(current_user: Dict[str, Any]) -> str:
return str(user_id)
def _load_base_avatar_bytes(avatar_url: str) -> Optional[bytes]:
    """Load base avatar bytes for character consistency.

    The file name is taken from the last URL path segment (query string
    removed). URLs under /api/youtube/avatars/ are looked up in the YouTube
    avatars directory, URLs under /api/podcast/avatars/ in the podcast
    avatars directory; any other URL tries YouTube first, then podcast.

    Returns:
        The avatar file contents, or None when the file is missing or any
        error occurs (the problem is logged, never raised).
    """
    try:
        filename = avatar_url.split("/")[-1].split("?")[0]

        # Use the module-level ``Path``. The previous function-local
        # ``from pathlib import Path`` inside one branch made ``Path`` a local
        # name for the whole function, so the fallback branch below failed
        # with UnboundLocalError and silently returned None.
        podcast_avatars_dir = Path(__file__).parent.parent.parent.parent / "podcast_avatars"

        # Handle different avatar URL formats
        if avatar_url.startswith("/api/youtube/avatars/"):
            # YouTube avatar
            avatar_path = YOUTUBE_AVATARS_DIR / filename
        elif avatar_url.startswith("/api/podcast/avatars/"):
            # Podcast avatar (cross-module usage)
            avatar_path = podcast_avatars_dir / filename
        else:
            # Unknown prefix: check YouTube avatars first
            avatar_path = YOUTUBE_AVATARS_DIR / filename
            if not avatar_path.exists():
                # Fallback to podcast avatars
                avatar_path = podcast_avatars_dir / filename

        if not avatar_path.exists() or not avatar_path.is_file():
            logger.warning(f"[YouTube] Avatar file not found: {avatar_path}")
            return None

        # Read first, then report success, so the log cannot claim a load
        # that subsequently failed.
        avatar_bytes = avatar_path.read_bytes()
        logger.info(f"[YouTube] Successfully loaded avatar: {avatar_path}")
        return avatar_bytes
    except Exception as e:
        logger.error(f"[YouTube] Error loading avatar from {avatar_url}: {e}")
        return None
def _save_scene_image(image_bytes: bytes, scene_id: str) -> Dict[str, str]:
@@ -75,14 +105,22 @@ def _save_scene_image(image_bytes: bytes, scene_id: str) -> Dict[str, str]:
}
@router.post("/image")
class YouTubeImageTaskResponse(BaseModel):
    """Response returned when an image generation task is submitted."""

    success: bool  # False when the task could not be created or started
    task_id: str  # Task identifier; an empty string is sent on failure
    message: str  # Human-readable description of the outcome
@router.post("/image", response_model=YouTubeImageTaskResponse)
async def generate_youtube_scene_image(
background_tasks: BackgroundTasks,
request: YouTubeImageRequest,
current_user: Dict[str, Any] = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""Generate a YouTube scene image, with optional avatar consistency."""
"""Generate a YouTube scene image with background task processing."""
logger.info(f"[YouTube] Image generation request received: scene='{request.scene_title}', user={current_user.get('id')}")
user_id = require_authenticated_user(current_user)
logger.info(f"[YouTube] User authenticated: {user_id}")
if not request.scene_title:
raise HTTPException(status_code=400, detail="Scene title is required")
@@ -97,25 +135,94 @@ async def generate_youtube_scene_image(
)
logger.info(f"[YouTube] ✅ Pre-flight validation passed for user {user_id}")
# Create background task
logger.info(f"[YouTube] Creating task for user {user_id}")
task_id = task_manager.create_task("youtube_image_generation")
logger.info(
f"[YouTube] Created image generation task {task_id} for user {user_id}, "
f"scene='{request.scene_title}'"
)
# Verify task was created
initial_status = task_manager.get_task_status(task_id)
if not initial_status:
logger.error(f"[YouTube] Failed to create task {task_id} - task not found immediately after creation")
return YouTubeImageTaskResponse(
success=False,
task_id="",
message="Failed to create image generation task. Please try again."
)
# Add background task (pass request data, not database session)
try:
background_tasks.add_task(
_execute_image_generation_task,
task_id=task_id,
request_data=request.dict(), # Convert to dict for background task
user_id=user_id,
)
logger.info(f"[YouTube] Background image generation task added for task {task_id}")
except Exception as bg_error:
logger.error(f"[YouTube] Failed to add background task for {task_id}: {bg_error}", exc_info=True)
# Mark task as failed
task_manager.update_task_status(
task_id,
"failed",
error=str(bg_error),
message="Failed to start image generation task"
)
return YouTubeImageTaskResponse(
success=False,
task_id="",
message=f"Failed to start image generation task: {str(bg_error)}"
)
logger.info(f"[YouTube] Returning success response for task {task_id}")
return YouTubeImageTaskResponse(
success=True,
task_id=task_id,
message=f"Image generation started for '{request.scene_title}'"
)
except HTTPException:
raise
except Exception as exc:
logger.error(f"[YouTube] Failed to create image generation task: {exc}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to start image generation: {str(exc)}")
def _execute_image_generation_task(task_id: str, request_data: dict, user_id: str):
"""Background task to generate YouTube scene image."""
# Reconstruct request object from dict
request = YouTubeImageRequest(**request_data)
logger.info(
f"[YouTubeImageGen] Background task started for task {task_id}, "
f"scene='{request.scene_title}', user={user_id}"
)
db = None
try:
# Update task status to processing
task_manager.update_task_status(
task_id, "processing", progress=10.0, message="Preparing image generation..."
)
# Get database session for this background task
from services.database import get_db
db = next(get_db())
logger.info(f"[YouTubeImageGen] Database session acquired for task {task_id}")
# Load avatar if provided
base_avatar_bytes = None
if request.base_avatar_url:
try:
base_avatar_bytes = _load_base_avatar_bytes(request.base_avatar_url)
logger.info(f"[YouTube] Loaded base avatar for scene {request.scene_id}")
except HTTPException:
raise
except Exception as e:
logger.error(f"[YouTube] Failed to load base avatar: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail={
"error": "Failed to load base avatar",
"message": f"Could not load the base avatar image: {str(e)}",
},
)
base_avatar_bytes = _load_base_avatar_bytes(request.base_avatar_url)
if base_avatar_bytes:
logger.info(f"[YouTubeImageGen] Loaded base avatar for task {task_id}")
else:
logger.warning(f"[YouTubeImageGen] Could not load base avatar for task {task_id}")
# Build prompt
image_prompt = ""
# Build prompt (same logic as before)
if base_avatar_bytes:
prompt_parts = []
if request.scene_title:
@@ -143,32 +250,58 @@ async def generate_youtube_scene_image(
prompt_parts.append("video-optimized composition, 16:9 aspect ratio")
image_prompt = ", ".join(prompt_parts)
# Generate image
task_manager.update_task_status(
task_id, "processing", progress=30.0, message="Generating image..."
)
logger.info(f"[YouTubeImageGen] Starting image generation for task {task_id}")
# Generate image (same logic as before)
provider = "wavespeed"
model = "ideogram-v3-turbo"
if base_avatar_bytes:
logger.info(f"[YouTube] Using character-consistent generation for scene {request.scene_id}")
logger.info(f"[YouTubeImageGen] Using character-consistent generation for task {task_id}")
style = request.style or "Realistic"
rendering_speed = request.rendering_speed or "Quality"
aspect_ratio = request.aspect_ratio or "16:9"
width = request.width or 1024
height = request.height or 576
wavespeed_client = WaveSpeedClient()
image_bytes = wavespeed_client.generate_character_image(
prompt=image_prompt,
reference_image_bytes=base_avatar_bytes,
style=style,
aspect_ratio=aspect_ratio,
rendering_speed=rendering_speed,
timeout=None,
)
model = "ideogram-character"
try:
# Use centralized character image generation with subscription checks and tracking
image_bytes = generate_character_image(
prompt=image_prompt,
reference_image_bytes=base_avatar_bytes,
user_id=user_id,
style=style,
aspect_ratio=aspect_ratio,
rendering_speed=rendering_speed,
timeout=60,
)
model = "ideogram-character"
logger.info(f"[YouTubeImageGen] Character image generation successful for task {task_id}")
except Exception as char_error:
logger.warning(f"[YouTubeImageGen] Character generation failed for task {task_id}: {char_error}")
logger.info(f"[YouTubeImageGen] Falling back to regular image generation for task {task_id}")
# Fall back to regular image generation with subscription tracking
image_options = {
"provider": "wavespeed",
"model": request.model or "ideogram-v3-turbo",
"width": width,
"height": height,
}
result = generate_image(
prompt=image_prompt,
options=image_options,
user_id=user_id,
)
image_bytes = result.image_bytes
else:
logger.info(f"[YouTube] Generating scene {request.scene_id} from scratch")
logger.info(f"[YouTubeImageGen] Generating scene from scratch for task {task_id}")
# Use centralized image generation with subscription tracking
image_options = {
"provider": "wavespeed",
"model": "ideogram-v3-turbo",
"model": request.model or "ideogram-v3-turbo",
"width": request.width or 1024,
"height": request.height or 576,
}
@@ -178,11 +311,34 @@ async def generate_youtube_scene_image(
user_id=user_id,
)
image_bytes = result.image_bytes
provider = result.provider
model = result.model
# Save image
saved = _save_scene_image(image_bytes, request.scene_id)
# Validate image bytes before saving
if not image_bytes or len(image_bytes) == 0:
raise ValueError("Image generation returned empty bytes")
# Basic validation: check if it's a valid image (PNG/JPEG header)
if not (image_bytes.startswith(b'\x89PNG') or image_bytes.startswith(b'\xff\xd8\xff')):
logger.warning(f"[YouTubeImageGen] Generated image may not be valid PNG/JPEG for task {task_id}")
# Don't fail - some formats might be valid, but log warning
task_manager.update_task_status(
task_id, "processing", progress=80.0, message="Saving image..."
)
# Save image with validation
try:
image_metadata = _save_scene_image(image_bytes, request.scene_id)
# Verify file was saved correctly
from pathlib import Path
saved_path = Path(image_metadata["image_path"])
if not saved_path.exists() or saved_path.stat().st_size == 0:
raise IOError(f"Image file was not saved correctly: {saved_path}")
logger.info(f"[YouTubeImageGen] Image saved successfully: {saved_path} ({saved_path.stat().st_size} bytes)")
except Exception as save_error:
logger.error(f"[YouTubeImageGen] Failed to save image for task {task_id}: {save_error}", exc_info=True)
raise
# Save to asset library
try:
@@ -191,41 +347,96 @@ async def generate_youtube_scene_image(
user_id=user_id,
asset_type="image",
source_module="youtube_creator",
filename=saved["image_filename"],
file_url=saved["image_url"],
file_path=saved["image_path"],
filename=image_metadata["image_filename"],
file_url=image_metadata["image_url"],
file_path=image_metadata["image_path"],
file_size=len(image_bytes),
mime_type="image/png",
title=f"YouTube Scene: {request.scene_title or request.scene_id}",
description=request.scene_content or f"Scene image for {request.scene_id}",
prompt=image_prompt,
tags=["youtube_creator", "scene", request.scene_id],
title=f"{request.scene_title} - YouTube Scene",
description=f"YouTube scene image for: {request.scene_title}",
tags=["youtube_creator", "scene_image", f"scene_{request.scene_id}"],
provider=provider,
model=model,
cost=0.10 if model == "ideogram-v3-turbo" else 0.05,
asset_metadata={
"scene_id": request.scene_id,
"scene_title": request.scene_title,
"has_base_avatar": bool(base_avatar_bytes),
"generation_type": "character" if base_avatar_bytes else "scene",
"width": request.width or 1024,
"height": request.height or 576,
},
)
except Exception as e:
logger.warning(f"[YouTube] Failed to save scene image to asset library: {e}")
logger.warning(f"[YouTubeImageGen] Failed to save image asset to library: {e}")
# Success!
task_manager.update_task_status(
task_id,
"completed",
progress=100.0,
message=f"Image generated successfully for '{request.scene_title}'",
result={
"scene_id": request.scene_id,
"scene_title": request.scene_title,
"image_filename": image_metadata["image_filename"],
"image_url": image_metadata["image_url"],
"provider": provider,
"model": model,
"width": request.width or 1024,
"height": request.height or 576,
"file_size": len(image_bytes),
"cost": 0.10 if model == "ideogram-v3-turbo" else 0.05,
}
)
logger.info(f"[YouTubeImageGen] ✅ Task {task_id} completed successfully")
return {
"scene_id": request.scene_id,
"scene_title": request.scene_title,
"image_filename": saved["image_filename"],
"image_url": saved["image_url"],
"width": request.width or 1024,
"height": request.height or 576,
}
except HTTPException:
raise
except Exception as exc:
logger.error(f"[YouTube] Scene image generation failed: {exc}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Failed to generate scene image: {str(exc)}")
error_msg = str(exc)
logger.error(f"[YouTubeImageGen] Task {task_id} failed: {error_msg}", exc_info=True)
task_manager.update_task_status(
task_id,
"failed",
error=error_msg,
message=f"Image generation failed: {error_msg}"
)
finally:
if db:
db.close()
logger.info(f"[YouTubeImageGen] Database session closed for task {task_id}")
@router.get("/image/status/{task_id}")
async def get_image_generation_status(
    task_id: str,
    current_user: Dict[str, Any] = Depends(get_current_user),
):
    """
    Get the status of an image generation task.
    Returns current progress, status, and result when complete.
    """
    # The docstring above is kept verbatim: FastAPI publishes endpoint
    # docstrings as the operation description in the generated API docs.
    #
    # task_id:      identifier passed to task_manager.get_task_status().
    # current_user: injected by get_current_user and checked by
    #               require_authenticated_user before anything else runs.
    # Returns the record from task_manager unchanged; raises a 404
    # HTTPException with a structured detail payload when there is none.
    require_authenticated_user(current_user)

    logger.info(f"[YouTubeAPI] Getting image generation status for task: {task_id}")
    task_status = task_manager.get_task_status(task_id)

    # Guard clause: no record for this id -> tell the client how to recover.
    if not task_status:
        logger.warning(
            f"[YouTubeAPI] Image generation task {task_id} not found."
        )
        not_found_detail = {
            "error": "Task not found",
            "message": "The image generation task was not found. It may have expired, been cleaned up, or the server may have restarted.",
            "task_id": task_id,
            "user_action": "Please try generating the image again."
        }
        raise HTTPException(status_code=404, detail=not_found_detail)

    logger.info(f"[YouTubeAPI] Task {task_id} status: {task_status.get('status', 'unknown')}, progress: {task_status.get('progress', 0)}, has_result: {'result' in task_status}")
    return task_status
@router.get("/images/{category}/{filename}")

View File

@@ -20,11 +20,15 @@ from services.youtube.renderer import YouTubeVideoRendererService
from services.persona_data_service import PersonaDataService
from services.subscription import PricingService
from services.subscription.preflight_validator import validate_scene_animation_operation
from services.content_asset_service import ContentAssetService
from models.content_asset_models import AssetType, AssetSource
from utils.logger_utils import get_service_logger
from utils.asset_tracker import save_asset_to_library
from services.story_writer.video_generation_service import StoryVideoGenerationService
from .task_manager import task_manager
from .handlers import avatar as avatar_handlers
from .handlers import images as image_handlers
from .handlers import audio as audio_handlers
router = APIRouter(prefix="/youtube", tags=["youtube"])
logger = get_service_logger("api.youtube")
@@ -38,9 +42,10 @@ YOUTUBE_AVATARS_DIR.mkdir(parents=True, exist_ok=True)
YOUTUBE_IMAGES_DIR = base_dir / "youtube_images"
YOUTUBE_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
# Include sub-routers for avatar and images
# Include sub-routers for avatar, images, and audio
router.include_router(avatar_handlers.router)
router.include_router(image_handlers.router)
router.include_router(audio_handlers.router)
# Request/Response Models
@@ -140,6 +145,52 @@ class VideoRenderRequest(BaseModel):
voice_id: str = Field("Wise_Woman", description="Voice ID for narration")
class SceneVideoRenderRequest(BaseModel):
    """Request model for rendering a single scene video."""

    # Consumed by POST /render/scene (render_single_scene_video).

    # The one scene to render; the endpoint reads keys such as
    # "scene_number", "visual_prompt", "imageUrl" and "audioUrl" from it.
    scene: Dict[str, Any] = Field(
        ...,
        description="Single scene data to render",
    )
    # Plan the scene belongs to; forwarded to the renderer as context.
    video_plan: Dict[str, Any] = Field(
        ...,
        description="Original video plan (context)",
    )
    # Constrained by the pattern to 480p, 720p or 1080p.
    resolution: str = Field(
        "720p",
        pattern="^(480p|720p|1080p)$",
        description="Video resolution",
    )
    voice_id: str = Field(
        "Wise_Woman",
        description="Voice ID for narration",
    )
    # When False, the endpoint rejects a scene that has no "audioUrl".
    generate_audio_enabled: bool = Field(
        False,
        description="Whether to auto-generate audio if missing (default false)",
    )
class SceneVideoRenderResponse(BaseModel):
    """Response model for single scene video rendering."""

    # True when a render task was queued; False for validation or startup
    # failures (the reason is carried in ``message``).
    success: bool
    # Identifier to poll for progress; None whenever ``success`` is False.
    task_id: Optional[str] = None
    # Human-readable outcome or error text.
    message: str
    # Echo of the scene's "scene_number", when one could be read.
    scene_number: Optional[int] = None
class CombineVideosRequest(BaseModel):
    """Request model for combining multiple scene videos."""

    # NOTE(review): a second ``CombineVideosRequest`` is declared further down
    # in this module (with ``scene_video_urls`` instead of ``video_urls``).
    # That later class rebinds the name, so this definition is shadowed —
    # confirm which shape the client sends and drop the other one.

    # Ordered list of scene video URLs; order is the playback order.
    video_urls: List[str] = Field(
        ...,
        description="List of scene video URLs to combine in order",
    )
    video_plan: Optional[Dict[str, Any]] = Field(
        None,
        description="Original video plan (for metadata)",
    )
    # Constrained by the pattern to 480p, 720p or 1080p.
    resolution: str = Field(
        "720p",
        pattern="^(480p|720p|1080p)$",
        description="Target resolution for output",
    )
    title: Optional[str] = Field(
        None,
        description="Optional title for the final video",
    )
class CombineVideosResponse(BaseModel):
    """Response model for combine videos request."""

    # True when a combine task was queued; False when the request was
    # rejected or the task could not be started.
    success: bool
    # Identifier to poll for progress; None whenever ``success`` is False.
    task_id: Optional[str] = None
    # Human-readable outcome or error text.
    message: str
class VideoListResponse(BaseModel):
    """Response model for listing user videos."""

    # One dict per stored video; list_videos fills in "scene_number",
    # "video_url", "filename", "created_at" and "resolution".
    videos: List[Dict[str, Any]]
    # Defaults describe the successful case, so callers only pass ``videos``.
    success: bool = True
    message: str = "Videos fetched successfully"
class CombineVideosRequest(BaseModel):
    """Request model for combining multiple scene videos."""

    # NOTE(review): this is the second class named ``CombineVideosRequest`` in
    # this module; it rebinds the name and therefore replaces the earlier
    # definition (which used ``video_urls`` and carried ``video_plan``).
    # Confirm which request shape is intended and remove the duplicate.

    scene_video_urls: List[str] = Field(
        ...,
        description="List of scene video URLs to combine",
    )
    # Constrained by the pattern to 480p, 720p or 1080p.
    resolution: str = Field(
        "720p",
        pattern="^(480p|720p|1080p)$",
        description="Output video resolution",
    )
    title: Optional[str] = Field(
        None,
        description="Optional title for the combined video",
    )
class VideoRenderResponse(BaseModel):
"""Response model for video rendering."""
success: bool
@@ -151,6 +202,7 @@ class CostEstimateRequest(BaseModel):
"""Request model for cost estimation."""
scenes: List[Dict[str, Any]] = Field(..., description="List of scenes to estimate")
resolution: str = Field("720p", pattern="^(480p|720p|1080p)$", description="Video resolution")
image_model: Optional[str] = Field("ideogram-v3-turbo", description="Image generation model")
class CostEstimateResponse(BaseModel):
@@ -438,6 +490,12 @@ async def start_video_render(
duration = scene.get("duration_estimate", 5)
if duration < 1 or duration > 10:
validation_errors.append(f"Scene {scene_num}: Invalid duration ({duration}s, must be 1-10 seconds)")
# VALIDATION: Check for required assets (image and audio)
if not scene.get("imageUrl"):
validation_errors.append(f"Scene {scene_num}: Missing image. Please generate an image for this scene first.")
if not scene.get("audioUrl"):
validation_errors.append(f"Scene {scene_num}: Missing audio. Please generate audio narration for this scene first.")
if validation_errors:
error_msg = "Validation failed: " + "; ".join(validation_errors)
@@ -511,6 +569,118 @@ async def start_video_render(
)
@router.post("/render/scene", response_model=SceneVideoRenderResponse)
async def render_single_scene_video(
    request: SceneVideoRenderRequest,
    background_tasks: BackgroundTasks,
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> SceneVideoRenderResponse:
    """
    Render a single scene video (scene-wise generation).
    Returns a task_id for polling.
    """
    # The docstring above is kept verbatim: FastAPI publishes endpoint
    # docstrings as the operation description in the generated API docs.
    #
    # Flow: authenticate -> subscription pre-flight -> validate the scene ->
    # create a task -> queue _execute_scene_video_render_task.
    # Validation and startup problems are reported as a response with
    # success=False; HTTPExceptions (auth / subscription) propagate unchanged.
    try:
        user_id = require_authenticated_user(current_user)

        # Subscription validation (same as full render)
        pricing_service = PricingService(db)
        validate_scene_animation_operation(
            pricing_service=pricing_service,
            user_id=user_id
        )

        scene = request.scene
        scene_num = scene.get("scene_number", 0)

        # Pre-validation to avoid wasted calls
        validation_errors = []

        # The trailing ``or ""`` covers a scene that carries an explicit null
        # prompt; without it ``None.strip()`` would raise and the caller would
        # only see the generic "Failed to start scene render" message.
        visual_prompt = (
            scene.get("enhanced_visual_prompt")
            or scene.get("visual_prompt")
            or ""
        ).strip()
        duration = scene.get("duration_estimate", 5)

        if not visual_prompt:
            validation_errors.append(f"Scene {scene_num}: Missing visual prompt")
        elif len(visual_prompt) < 5:
            validation_errors.append(f"Scene {scene_num}: Visual prompt too short ({len(visual_prompt)} chars, minimum 5)")

        # A null or non-numeric duration cannot be range-compared; report it
        # as an invalid duration instead of letting the comparison raise.
        duration_is_number = isinstance(duration, (int, float))
        if not duration_is_number or duration < 1 or duration > 10:
            validation_errors.append(f"Scene {scene_num}: Invalid duration ({duration}s, must be 1-10 seconds)")

        if not scene.get("imageUrl"):
            validation_errors.append(f"Scene {scene_num}: Missing image. Please generate an image first.")

        # Missing audio is acceptable only when the caller opted in to
        # automatic audio generation.
        if not scene.get("audioUrl") and not request.generate_audio_enabled:
            validation_errors.append(f"Scene {scene_num}: Missing audio. Please generate audio first or enable generate_audio_enabled.")

        if validation_errors:
            error_msg = "Validation failed: " + "; ".join(validation_errors)
            logger.warning(f"[YouTubeAPI] {error_msg}")
            return SceneVideoRenderResponse(
                success=False,
                task_id=None,
                message=error_msg,
                scene_number=scene_num
            )

        # Create task
        task_id = task_manager.create_task("youtube_scene_video_render")
        logger.info(
            f"[YouTubeAPI] Created single-scene render task {task_id} for user {user_id}, scene={scene_num}, resolution={request.resolution}"
        )

        # Read the task back so a failed creation is caught before the
        # client starts polling an id that does not exist.
        initial_status = task_manager.get_task_status(task_id)
        if not initial_status:
            logger.error(f"[YouTubeAPI] Failed to create task {task_id} - task not found immediately after creation")
            return SceneVideoRenderResponse(
                success=False,
                task_id=None,
                message="Failed to create render task. Please try again.",
                scene_number=scene_num
            )

        # Add background task
        try:
            background_tasks.add_task(
                _execute_scene_video_render_task,
                task_id=task_id,
                scene=scene,
                video_plan=request.video_plan,
                user_id=user_id,
                resolution=request.resolution,
                generate_audio_enabled=request.generate_audio_enabled,
                voice_id=request.voice_id,
            )
            logger.info(f"[YouTubeAPI] Background task added for single scene {task_id}")
        except Exception as bg_error:
            logger.error(f"[YouTubeAPI] Failed to add background task for {task_id}: {bg_error}", exc_info=True)
            # Mark the already-created task as failed so it is not left
            # looking pending.
            task_manager.update_task_status(
                task_id,
                "failed",
                error=str(bg_error),
                message="Failed to start background render task"
            )
            return SceneVideoRenderResponse(
                success=False,
                task_id=None,
                message=f"Failed to start render task: {str(bg_error)}",
                scene_number=scene_num
            )

        return SceneVideoRenderResponse(
            success=True,
            task_id=task_id,
            message=f"Scene {scene_num} rendering started.",
            scene_number=scene_num
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[YouTubeAPI] Error starting single-scene render: {e}", exc_info=True)
        return SceneVideoRenderResponse(
            success=False,
            task_id=None,
            message=f"Failed to start scene render: {str(e)}",
            scene_number=request.scene.get("scene_number") if request and request.scene else None
        )
@router.get("/render/{task_id}")
async def get_render_status(
task_id: str,
@@ -553,6 +723,85 @@ async def get_render_status(
)
@router.post("/render/combine", response_model=VideoRenderResponse)
async def combine_videos(
    request: CombineVideosRequest,
    background_tasks: BackgroundTasks,
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> VideoRenderResponse:
    """
    Combine multiple scene videos into a final video.
    Returns task_id for polling.
    """
    # The docstring above is kept verbatim: FastAPI publishes endpoint
    # docstrings as the operation description in the generated API docs.
    #
    # NOTE(review): ``combine_scene_videos`` further down in this module is
    # registered on the same POST /render/combine path. Routes are expected to
    # be matched in registration order, so only one of the two should ever
    # receive traffic — confirm and remove the duplicate.

    def rejected(message: str) -> VideoRenderResponse:
        # Every non-HTTP failure path answers with success=False + a message.
        return VideoRenderResponse(success=False, message=message)

    try:
        user_id = require_authenticated_user(current_user)

        # Subscription validation
        validate_scene_animation_operation(
            pricing_service=PricingService(db),
            user_id=user_id
        )

        scene_urls = request.scene_video_urls
        if len(scene_urls or ()) < 2:
            return rejected("At least two scene videos are required to combine.")

        task_id = task_manager.create_task("youtube_combine_video")
        logger.info(
            f"[YouTubeAPI] Created combine task {task_id} for user {user_id}, videos={len(scene_urls)}, resolution={request.resolution}"
        )

        # Read the task back so a failed creation is caught before the
        # client starts polling.
        if not task_manager.get_task_status(task_id):
            logger.error(f"[YouTubeAPI] Failed to create combine task {task_id} - task not found immediately after creation")
            return rejected("Failed to create combine task. Please try again.")

        try:
            background_tasks.add_task(
                _execute_combine_video_task,
                task_id=task_id,
                scene_video_urls=scene_urls,
                user_id=user_id,
                resolution=request.resolution,
                title=request.title,
            )
        except Exception as bg_error:
            logger.error(f"[YouTubeAPI] Failed to add combine background task for {task_id}: {bg_error}", exc_info=True)
            # Mark the already-created task as failed so it is not left
            # looking pending.
            task_manager.update_task_status(
                task_id,
                "failed",
                error=str(bg_error),
                message="Failed to start combine task"
            )
            return rejected(f"Failed to start combine task: {str(bg_error)}")

        logger.info(f"[YouTubeAPI] Background combine task added for {task_id}")
        return VideoRenderResponse(
            success=True,
            task_id=task_id,
            message="Video combination started."
        )
    except HTTPException:
        # Auth / subscription errors keep their status code.
        raise
    except Exception as e:
        logger.error(f"[YouTubeAPI] Error starting combine: {e}", exc_info=True)
        return rejected(f"Failed to start combine: {str(e)}")
def _execute_video_render_task(
task_id: str,
scenes: List[Dict[str, Any]],
@@ -891,6 +1140,374 @@ def _execute_video_render_task(
)
def _execute_scene_video_render_task(
    task_id: str,
    scene: Dict[str, Any],
    video_plan: Dict[str, Any],
    user_id: str,
    resolution: str,
    generate_audio_enabled: bool,
    voice_id: str,
):
    """Render one scene in the background and record the outcome on the task.

    Args:
        task_id: Task created by the endpoint; all progress is written to it.
        scene: Scene data handed to the renderer ("scene_number" is read here
            for log and status messages).
        video_plan: Plan the scene belongs to, forwarded to the renderer.
        user_id: Owner of the render, forwarded to the renderer.
        resolution: Target resolution, forwarded and echoed in the result.
        generate_audio_enabled: Forwarded to the renderer.
        voice_id: Forwarded to the renderer.

    Returns:
        None. Success and failure are both reported through
        ``task_manager.update_task_status``; exceptions are not re-raised.
    """
    scene_num = scene.get("scene_number", 0)
    logger.info(
        f"[YouTubeRenderer] Background single-scene task started for task {task_id}, scene={scene_num}, user={user_id}"
    )

    # Without a task record there is nowhere to report progress: bail out.
    if not task_manager.get_task_status(task_id):
        logger.error(
            f"[YouTubeRenderer] Task {task_id} not found when single-scene task started."
        )
        return

    try:
        task_manager.update_task_status(
            task_id, "processing", progress=5.0, message=f"Rendering scene {scene_num}..."
        )

        scene_result = YouTubeVideoRendererService().render_scene_video(
            scene=scene,
            video_plan=video_plan,
            user_id=user_id,
            resolution=resolution,
            generate_audio_enabled=generate_audio_enabled,
            voice_id=voice_id,
        )

        # A missing or null cost counts as zero.
        total_cost = scene_result.get("cost", 0.0) or 0.0
        video_url = scene_result.get("video_url")

        # The result carries both multi-scene style keys ("scene_results",
        # "num_successful", ...) and single-scene keys ("video_url", ...).
        result = {
            "scene_results": [scene_result],
            "failed_scenes": [],
            "total_cost": total_cost,
            "final_video_url": video_url,
            "num_successful": 1,
            "num_failed": 0,
            "resolution": resolution,
            "partial_success": False,
            "scene_number": scene_num,
            "video_url": video_url,
            "video_filename": scene_result.get("video_filename"),
        }

        task_manager.update_task_status(
            task_id,
            "completed",
            progress=100.0,
            message=f"Scene {scene_num} rendered successfully",
            result=result,
        )
        logger.info(
            f"[YouTubeRenderer] ✅ Single-scene render {task_id} completed (scene {scene_num}), cost=${total_cost:.2f}"
        )
    except HTTPException as exc:
        # ``detail`` may be a plain string or a dict with an "error" entry;
        # anything else falls back to a generic message.
        detail = exc.detail
        if isinstance(detail, str):
            error_msg = str(detail)
        elif isinstance(detail, dict):
            error_msg = detail.get("error", "Render failed")
        else:
            error_msg = "Render failed"

        logger.error(f"[YouTubeRenderer] Single-scene task {task_id} failed: {error_msg}")
        task_manager.update_task_status(
            task_id,
            "failed",
            error=error_msg,
            message=f"Scene {scene_num} rendering failed: {error_msg}",
        )
    except Exception as exc:
        error_msg = str(exc)
        logger.error(f"[YouTubeRenderer] Single-scene task {task_id} error: {error_msg}", exc_info=True)
        task_manager.update_task_status(
            task_id,
            "failed",
            error=error_msg,
            message=f"Scene {scene_num} rendering error: {error_msg}",
        )
@router.post("/render/combine", response_model=CombineVideosResponse)
async def combine_scene_videos(
    request: CombineVideosRequest,
    background_tasks: BackgroundTasks,
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> CombineVideosResponse:
    """
    Combine multiple scene videos into a final video.
    Returns task_id for polling.
    """
    # The docstring above is kept verbatim: FastAPI publishes endpoint
    # docstrings as the operation description in the generated API docs.
    #
    # NOTE(review): ``combine_videos`` earlier in this module is registered on
    # the same POST /render/combine path; routes are expected to be matched in
    # registration order, so confirm which handler is meant to serve it and
    # remove the other.
    from urllib.parse import urlparse

    try:
        user_id = require_authenticated_user(current_user)

        # Subscription validation (reuse scene animation check)
        pricing_service = PricingService(db)
        validate_scene_animation_operation(
            pricing_service=pricing_service,
            user_id=user_id
        )

        # ``CombineVideosRequest`` is declared twice in this module, once with
        # ``video_urls`` and once with ``scene_video_urls``. Read whichever
        # field the bound class actually has instead of raising AttributeError.
        video_urls = (
            getattr(request, "video_urls", None)
            or getattr(request, "scene_video_urls", None)
            or []
        )

        if len(video_urls) < 2:
            return CombineVideosResponse(
                success=False,
                task_id=None,
                message="At least two videos are required to combine."
            )

        # Pre-validate that referenced video files exist and are within youtube_videos dir
        base_dir = Path(__file__).parent.parent.parent.parent
        youtube_video_dir = base_dir / "youtube_videos"

        resolved_filenames = []
        missing_files = []
        for url in video_urls:
            # Take the last component of the URL *path* so a query string or
            # fragment (e.g. "?token=...") is not mistaken for part of the
            # file name; ``Path(url).name`` alone would keep it.
            filename = Path(urlparse(url).path).name

            # prevent directory traversal; an empty name is rejected too,
            # because it would resolve to the videos directory itself
            if not filename or ".." in filename or "/" in filename or "\\" in filename:
                return CombineVideosResponse(
                    success=False,
                    task_id=None,
                    message=f"Invalid video filename: {filename}"
                )

            # is_file() rather than exists(): a directory is not a video.
            if not (youtube_video_dir / filename).is_file():
                missing_files.append(filename)
            resolved_filenames.append(filename)

        if missing_files:
            return CombineVideosResponse(
                success=False,
                task_id=None,
                message=f"Video files not found for combine: {', '.join(missing_files)}"
            )

        # Create task
        task_id = task_manager.create_task("youtube_video_combine")
        logger.info(
            f"[YouTubeAPI] Created combine task {task_id} for user {user_id}, videos={len(video_urls)}, resolution={request.resolution}"
        )

        # Read the task back so a failed creation is caught before the
        # client starts polling.
        initial_status = task_manager.get_task_status(task_id)
        if not initial_status:
            logger.error(f"[YouTubeAPI] Failed to create combine task {task_id} - task not found immediately after creation")
            return CombineVideosResponse(
                success=False,
                task_id=None,
                message="Failed to create combine task. Please try again."
            )

        # Background combine task. The validated bare file names are passed
        # on: the task only uses the final path component of each entry, so
        # it resolves exactly the files that were checked above.
        try:
            background_tasks.add_task(
                _execute_combine_video_task,
                task_id=task_id,
                scene_video_urls=resolved_filenames,
                user_id=user_id,
                resolution=request.resolution,
                title=request.title,
            )
            logger.info(f"[YouTubeAPI] Background combine task added for task {task_id}")
        except Exception as bg_error:
            logger.error(f"[YouTubeAPI] Failed to add combine task {task_id}: {bg_error}", exc_info=True)
            # Mark the already-created task as failed so it is not left
            # looking pending.
            task_manager.update_task_status(
                task_id,
                "failed",
                error=str(bg_error),
                message="Failed to start video combination task"
            )
            return CombineVideosResponse(
                success=False,
                task_id=None,
                message=f"Failed to start combination task: {str(bg_error)}"
            )

        return CombineVideosResponse(
            success=True,
            task_id=task_id,
            message=f"Combining {len(video_urls)} videos...",
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"[YouTubeAPI] Error combining videos: {e}", exc_info=True)
        return CombineVideosResponse(
            success=False,
            task_id=None,
            message=f"Failed to start video combination: {str(e)}"
        )
@router.get("/videos", response_model=VideoListResponse)
async def list_videos(
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> VideoListResponse:
    """
    List videos for the current user from the asset library (source: youtube_creator).
    Used to rescue/persist scene videos after reloads.
    """
    # The docstring above is kept verbatim: FastAPI publishes endpoint
    # docstrings as the operation description in the generated API docs.
    user_id = require_authenticated_user(current_user)

    # At most 100 video assets from the YouTube creator module are fetched.
    assets = ContentAssetService(db).get_assets(
        user_id=user_id,
        asset_type=AssetType.VIDEO,
        source_module=AssetSource.YOUTUBE_CREATOR,
        limit=100,
    )

    def describe(asset) -> Dict[str, Any]:
        # Assets stored without metadata report None for the metadata fields.
        metadata = asset.asset_metadata
        return {
            "scene_number": metadata.get("scene_number") if metadata else None,
            "video_url": asset.file_url,
            "filename": asset.filename,
            "created_at": asset.created_at,
            "resolution": metadata.get("resolution") if metadata else None,
        }

    return VideoListResponse(videos=[describe(asset) for asset in assets])
def _execute_combine_video_task(
    task_id: str,
    scene_video_urls: List[str],
    user_id: str,
    resolution: str,
    title: Optional[str],
):
    """Background task to combine multiple scene videos into one final video.

    Args:
        task_id: Task created by the endpoint; all progress is written to it.
        scene_video_urls: URLs (or bare file names) of the scene videos, in
            playback order. Only the final path component of each entry is
            used; it is looked up inside the ``youtube_videos`` directory.
        user_id: Owner of the videos; forwarded to the video service and used
            when saving the asset.
        resolution: Recorded in the asset tags/metadata and in the result.
        title: Title for the combined video; "YouTube Video" when falsy.

    Returns:
        None. Success and failure are both reported through
        ``task_manager.update_task_status``; exceptions are not re-raised.
    """
    from urllib.parse import urlparse

    logger.info(
        f"[YouTubeRenderer] Background combine task started for task {task_id}, videos={len(scene_video_urls)}, user={user_id}"
    )

    # Without a task record there is nowhere to report progress: bail out.
    task_status = task_manager.get_task_status(task_id)
    if not task_status:
        logger.error(f"[YouTubeRenderer] Task {task_id} not found when combine task started.")
        return

    base_dir = Path(__file__).parent.parent.parent.parent
    youtube_video_dir = base_dir / "youtube_videos"

    try:
        task_manager.update_task_status(
            task_id, "processing", progress=5.0, message="Preparing to combine videos..."
        )

        # Resolve video paths from URLs
        video_paths: List[Path] = []
        for url in scene_video_urls:
            # Use the URL *path* so a query string or fragment does not end
            # up in the file name (``Path(url).name`` alone would keep it).
            filename = Path(urlparse(url).path).name
            video_path = youtube_video_dir / filename
            # is_file() also rejects an empty name, which would otherwise
            # resolve to the (existing) videos directory itself.
            if not video_path.is_file():
                logger.error(f"[YouTubeRenderer] Video file not found for combine: {video_path}")
                # Handled by the ``except HTTPException`` branch below, which
                # turns it into a failed task status.
                raise HTTPException(
                    status_code=404,
                    detail=f"Video file not found: {filename}",
                )
            video_paths.append(video_path)

        if len(video_paths) < 2:
            raise HTTPException(status_code=400, detail="Need at least two videos to combine.")

        task_manager.update_task_status(
            task_id, "processing", progress=25.0, message="Combining scene videos..."
        )

        # The story video service is given pre-rendered clips via
        # ``video_paths``; images are passed as None placeholders and no
        # audio paths are supplied.
        video_service = StoryVideoGenerationService(output_dir=str(youtube_video_dir))
        combined_result = video_service.generate_story_video(
            scenes=[
                {"scene_number": idx + 1, "title": f"Scene {idx + 1}"}
                for idx in range(len(video_paths))
            ],
            image_paths=[None] * len(video_paths),
            audio_paths=[],
            video_paths=[str(p) for p in video_paths],
            user_id=user_id,
            story_title=title or "YouTube Video",
            fps=24,
        )

        task_manager.update_task_status(
            task_id, "processing", progress=90.0, message="Finalizing combined video..."
        )

        final_path = combined_result["video_path"]
        final_url = combined_result["video_url"]
        file_size = combined_result.get("file_size", 0)

        # Save to asset library. Best effort: a failure here is logged and
        # does not fail the combine task.
        try:
            db = next(get_db())
            try:
                save_asset_to_library(
                    db=db,
                    user_id=user_id,
                    asset_type="video",
                    source_module="youtube_creator",
                    filename=Path(final_path).name,
                    file_url=final_url,
                    file_path=str(final_path),
                    file_size=file_size,
                    mime_type="video/mp4",
                    title=title or "YouTube Video",
                    description="Combined YouTube creator video",
                    tags=["youtube_creator", "video", "combined", resolution],
                    provider="wavespeed",
                    model="alibaba/wan-2.5/text-to-video",
                    cost=0.0,
                    asset_metadata={
                        "resolution": resolution,
                        "status": "completed",
                        "scene_count": len(video_paths),
                    },
                )
            finally:
                db.close()
        except Exception as e:
            logger.warning(f"[YouTubeRenderer] Failed to save combined video to asset library: {e}")

        result = {
            "video_url": final_url,
            "video_path": final_path,
            "resolution": resolution,
            "scene_count": len(video_paths),
        }

        task_manager.update_task_status(
            task_id,
            "completed",
            progress=100.0,
            message="Combined video generated successfully",
            result=result,
        )
        logger.info(
            f"[YouTubeRenderer] ✅ Combine task {task_id} completed, scenes={len(video_paths)}"
        )
    except HTTPException as exc:
        error_msg = exc.detail if isinstance(exc.detail, str) else str(exc.detail)
        logger.error(f"[YouTubeRenderer] Combine task {task_id} failed: {error_msg}")
        task_manager.update_task_status(
            task_id,
            "failed",
            error=error_msg,
            message=f"Combine failed: {error_msg}",
        )
    except Exception as exc:
        error_msg = str(exc)
        logger.error(f"[YouTubeRenderer] Combine task {task_id} error: {error_msg}", exc_info=True)
        task_manager.update_task_status(
            task_id,
            "failed",
            error=error_msg,
            message=f"Combine error: {error_msg}",
        )
@router.post("/estimate-cost", response_model=CostEstimateResponse)
async def estimate_render_cost(
request: CostEstimateRequest,
@@ -918,6 +1535,7 @@ async def estimate_render_cost(
estimate = renderer.estimate_render_cost(
scenes=request.scenes,
resolution=request.resolution,
image_model=request.image_model,
)
return CostEstimateResponse(

View File

@@ -140,6 +140,10 @@ def generate_audio(
# Avoid passing duplicate enable_sync_mode; allow override via kwargs
enable_sync_mode = kwargs.pop("enable_sync_mode", True)
# Filter out None values from kwargs to prevent WaveSpeed validation errors
filtered_kwargs = {k: v for k, v in kwargs.items() if v is not None}
logger.info(f"[audio_gen] Filtered kwargs (removed None values): {filtered_kwargs}")
client = WaveSpeedClient()
audio_bytes = client.generate_speech(
text=text,
@@ -149,7 +153,7 @@ def generate_audio(
pitch=pitch,
emotion=emotion,
enable_sync_mode=enable_sync_mode,
**kwargs
**filtered_kwargs
)
logger.info(f"[audio_gen] ✅ API call successful, generated {len(audio_bytes)} bytes")

View File

@@ -1,6 +1,8 @@
from __future__ import annotations
import os
import sys
from datetime import datetime
from typing import Optional, Dict, Any
from .image_generation import (
@@ -110,6 +112,367 @@ def generate_image(prompt: str, options: Optional[Dict[str, Any]] = None, user_i
logger.info("Generating image via provider=%s model=%s", provider_name, image_options.model)
provider = _get_provider(provider_name)
return provider.generate(image_options)
result = provider.generate(image_options)
# TRACK USAGE after successful API call
has_image_bytes = bool(result.image_bytes) if result else False
image_bytes_len = len(result.image_bytes) if (result and result.image_bytes) else 0
logger.info(f"[Image Generation] Checking tracking conditions: user_id={user_id}, has_result={bool(result)}, has_image_bytes={has_image_bytes}, image_bytes_len={image_bytes_len}")
if user_id and result and result.image_bytes:
logger.info(f"[Image Generation] ✅ API call successful, tracking usage for user {user_id}")
try:
from services.database import get_db as get_db_track
db_track = next(get_db_track())
try:
from models.subscription_models import UsageSummary, APIUsageLog, APIProvider
from services.subscription import PricingService
pricing = PricingService(db_track)
current_period = pricing.get_current_billing_period(user_id) or datetime.now().strftime("%Y-%m")
# Get or create usage summary
summary = db_track.query(UsageSummary).filter(
UsageSummary.user_id == user_id,
UsageSummary.billing_period == current_period
).first()
if not summary:
summary = UsageSummary(
user_id=user_id,
billing_period=current_period
)
db_track.add(summary)
db_track.flush()
# Get cost from result metadata or calculate
estimated_cost = 0.0
if result.metadata and "estimated_cost" in result.metadata:
estimated_cost = float(result.metadata["estimated_cost"])
else:
# Fallback: estimate based on provider/model
if provider_name == "wavespeed":
if result.model and "qwen" in result.model.lower():
estimated_cost = 0.05
else:
estimated_cost = 0.10 # ideogram-v3-turbo default
elif provider_name == "stability":
estimated_cost = 0.04
else:
estimated_cost = 0.05 # Default estimate
# Get current values before update
current_calls_before = getattr(summary, "stability_calls", 0) or 0
current_cost_before = getattr(summary, "stability_cost", 0.0) or 0.0
# Update image calls and cost
new_calls = current_calls_before + 1
new_cost = current_cost_before + estimated_cost
# Use direct SQL UPDATE for dynamic attributes
from sqlalchemy import text as sql_text
update_query = sql_text("""
UPDATE usage_summaries
SET stability_calls = :new_calls,
stability_cost = :new_cost
WHERE user_id = :user_id AND billing_period = :period
""")
db_track.execute(update_query, {
'new_calls': new_calls,
'new_cost': new_cost,
'user_id': user_id,
'period': current_period
})
# Update total cost
summary.total_cost = (summary.total_cost or 0.0) + estimated_cost
summary.total_calls = (summary.total_calls or 0) + 1
summary.updated_at = datetime.utcnow()
# Determine API provider based on actual provider
api_provider = APIProvider.STABILITY # Default for image generation
# Create usage log
usage_log = APIUsageLog(
user_id=user_id,
provider=api_provider,
endpoint="/image-generation",
method="POST",
model_used=result.model or "unknown",
tokens_input=0,
tokens_output=0,
tokens_total=0,
cost_input=0.0,
cost_output=0.0,
cost_total=estimated_cost,
response_time=0.0,
status_code=200,
request_size=len(prompt.encode("utf-8")),
response_size=len(result.image_bytes),
billing_period=current_period,
)
db_track.add(usage_log)
# Get plan details for unified log
limits = pricing.get_user_limits(user_id)
plan_name = limits.get('plan_name', 'unknown') if limits else 'unknown'
tier = limits.get('tier', 'unknown') if limits else 'unknown'
image_limit = limits['limits'].get("stability_calls", 0) if limits else 0
# Only show ∞ for Enterprise tier when limit is 0 (unlimited)
image_limit_display = image_limit if (image_limit > 0 or tier != 'enterprise') else ''
# Get related stats for unified log
current_audio_calls = getattr(summary, "audio_calls", 0) or 0
audio_limit = limits['limits'].get("audio_calls", 0) if limits else 0
current_image_edit_calls = getattr(summary, "image_edit_calls", 0) or 0
image_edit_limit = limits['limits'].get("image_edit_calls", 0) if limits else 0
current_video_calls = getattr(summary, "video_calls", 0) or 0
video_limit = limits['limits'].get("video_calls", 0) if limits else 0
db_track.commit()
logger.info(f"[Image Generation] ✅ Successfully tracked usage: user {user_id} -> image -> {new_calls} calls, ${estimated_cost:.4f}")
# UNIFIED SUBSCRIPTION LOG - Shows before/after state in one message
print(f"""
[SUBSCRIPTION] Image Generation
├─ User: {user_id}
├─ Plan: {plan_name} ({tier})
├─ Provider: {provider_name}
├─ Actual Provider: {provider_name}
├─ Model: {result.model or 'unknown'}
├─ Calls: {current_calls_before}{new_calls} / {image_limit_display}
├─ Cost: ${current_cost_before:.4f} → ${new_cost:.4f}
├─ Audio: {current_audio_calls} / {audio_limit if audio_limit > 0 else ''}
├─ Image Editing: {current_image_edit_calls} / {image_edit_limit if image_edit_limit > 0 else ''}
├─ Videos: {current_video_calls} / {video_limit if video_limit > 0 else ''}
└─ Status: ✅ Allowed & Tracked
""", flush=True)
sys.stdout.flush()
except Exception as track_error:
logger.error(f"[Image Generation] ❌ Error tracking usage (non-blocking): {track_error}", exc_info=True)
import traceback
logger.error(f"[Image Generation] Full traceback: {traceback.format_exc()}")
db_track.rollback()
finally:
db_track.close()
except Exception as usage_error:
logger.error(f"[Image Generation] ❌ Failed to track usage: {usage_error}", exc_info=True)
import traceback
logger.error(f"[Image Generation] Full traceback: {traceback.format_exc()}")
else:
logger.warning(f"[Image Generation] ⚠️ Skipping usage tracking: user_id={user_id}, image_bytes={len(result.image_bytes) if result.image_bytes else 0} bytes")
return result
def generate_character_image(
prompt: str,
reference_image_bytes: bytes,
user_id: Optional[str] = None,
style: str = "Realistic",
aspect_ratio: str = "16:9",
rendering_speed: str = "Quality",
timeout: Optional[int] = None,
) -> bytes:
"""Generate character-consistent image with pre-flight validation and usage tracking.
Uses Ideogram Character API via WaveSpeed to maintain character consistency.
Args:
prompt: Text prompt describing the scene/context for the character
reference_image_bytes: Reference image bytes (base avatar)
user_id: User ID for subscription checking (required)
style: Character style type ("Auto", "Fiction", or "Realistic")
aspect_ratio: Aspect ratio ("1:1", "16:9", "9:16", "4:3", "3:4")
rendering_speed: Rendering speed ("Default", "Turbo", "Quality")
timeout: Total timeout in seconds for submission + polling (default: 180)
Returns:
bytes: Generated image bytes with consistent character
"""
# PRE-FLIGHT VALIDATION: Validate image generation before API call
if user_id:
from services.database import get_db
from services.subscription import PricingService
from services.subscription.preflight_validator import validate_image_generation_operations
from fastapi import HTTPException
logger.info(f"[Character Image Generation] 🔍 Starting pre-flight validation for user_id={user_id}")
db = next(get_db())
try:
pricing_service = PricingService(db)
# Raises HTTPException immediately if validation fails
validate_image_generation_operations(
pricing_service=pricing_service,
user_id=user_id,
num_images=1,
)
logger.info(f"[Character Image Generation] ✅ Pre-flight validation passed for user_id={user_id} - proceeding with character image generation")
except HTTPException as http_ex:
# Re-raise immediately - don't proceed with API call
logger.error(f"[Character Image Generation] ❌ Pre-flight validation failed for user_id={user_id} - blocking API call: {http_ex.detail}")
raise
finally:
db.close()
else:
logger.warning(f"[Character Image Generation] ⚠️ No user_id provided - skipping pre-flight validation (this should not happen in production)")
# Generate character image via WaveSpeed
from services.wavespeed.client import WaveSpeedClient
from fastapi import HTTPException
try:
wavespeed_client = WaveSpeedClient()
image_bytes = wavespeed_client.generate_character_image(
prompt=prompt,
reference_image_bytes=reference_image_bytes,
style=style,
aspect_ratio=aspect_ratio,
rendering_speed=rendering_speed,
timeout=timeout,
)
# TRACK USAGE after successful API call
has_image_bytes = bool(image_bytes) if image_bytes else False
image_bytes_len = len(image_bytes) if image_bytes else 0
logger.info(f"[Character Image Generation] Checking tracking conditions: user_id={user_id}, has_image_bytes={has_image_bytes}, image_bytes_len={image_bytes_len}")
if user_id and image_bytes:
logger.info(f"[Character Image Generation] ✅ API call successful, tracking usage for user {user_id}")
try:
from services.database import get_db as get_db_track
db_track = next(get_db_track())
try:
from models.subscription_models import UsageSummary, APIUsageLog, APIProvider
from services.subscription import PricingService
pricing = PricingService(db_track)
current_period = pricing.get_current_billing_period(user_id) or datetime.now().strftime("%Y-%m")
# Get or create usage summary
summary = db_track.query(UsageSummary).filter(
UsageSummary.user_id == user_id,
UsageSummary.billing_period == current_period
).first()
if not summary:
summary = UsageSummary(
user_id=user_id,
billing_period=current_period
)
db_track.add(summary)
db_track.flush()
# Character image cost (same as ideogram-v3-turbo)
estimated_cost = 0.10
current_calls_before = getattr(summary, "stability_calls", 0) or 0
current_cost_before = getattr(summary, "stability_cost", 0.0) or 0.0
new_calls = current_calls_before + 1
new_cost = current_cost_before + estimated_cost
# Use direct SQL UPDATE for dynamic attributes
from sqlalchemy import text as sql_text
update_query = sql_text("""
UPDATE usage_summaries
SET stability_calls = :new_calls,
stability_cost = :new_cost
WHERE user_id = :user_id AND billing_period = :period
""")
db_track.execute(update_query, {
'new_calls': new_calls,
'new_cost': new_cost,
'user_id': user_id,
'period': current_period
})
# Update total cost
summary.total_cost = (summary.total_cost or 0.0) + estimated_cost
summary.total_calls = (summary.total_calls or 0) + 1
summary.updated_at = datetime.utcnow()
# Create usage log
usage_log = APIUsageLog(
user_id=user_id,
provider=APIProvider.STABILITY, # Image generation uses STABILITY provider
endpoint="/image-generation/character",
method="POST",
model_used="ideogram-character",
tokens_input=0,
tokens_output=0,
tokens_total=0,
cost_input=0.0,
cost_output=0.0,
cost_total=estimated_cost,
response_time=0.0,
status_code=200,
request_size=len(prompt.encode("utf-8")),
response_size=len(image_bytes),
billing_period=current_period,
)
db_track.add(usage_log)
# Get plan details for unified log
limits = pricing.get_user_limits(user_id)
plan_name = limits.get('plan_name', 'unknown') if limits else 'unknown'
tier = limits.get('tier', 'unknown') if limits else 'unknown'
image_limit = limits['limits'].get("stability_calls", 0) if limits else 0
image_limit_display = image_limit if (image_limit > 0 or tier != 'enterprise') else '∞'
# Get related stats
current_audio_calls = getattr(summary, "audio_calls", 0) or 0
audio_limit = limits['limits'].get("audio_calls", 0) if limits else 0
current_image_edit_calls = getattr(summary, "image_edit_calls", 0) or 0
image_edit_limit = limits['limits'].get("image_edit_calls", 0) if limits else 0
current_video_calls = getattr(summary, "video_calls", 0) or 0
video_limit = limits['limits'].get("video_calls", 0) if limits else 0
db_track.commit()
# UNIFIED SUBSCRIPTION LOG
print(f"""
[SUBSCRIPTION] Image Generation (Character)
├─ User: {user_id}
├─ Plan: {plan_name} ({tier})
├─ Provider: wavespeed
├─ Actual Provider: wavespeed
├─ Model: ideogram-character
├─ Calls: {current_calls_before} → {new_calls} / {image_limit_display}
├─ Cost: ${current_cost_before:.4f} → ${new_cost:.4f}
├─ Audio: {current_audio_calls} / {audio_limit if audio_limit > 0 else '∞'}
├─ Image Editing: {current_image_edit_calls} / {image_edit_limit if image_edit_limit > 0 else '∞'}
├─ Videos: {current_video_calls} / {video_limit if video_limit > 0 else '∞'}
└─ Status: ✅ Allowed & Tracked
""", flush=True)
sys.stdout.flush()
logger.info(f"[Character Image Generation] ✅ Successfully tracked usage: user {user_id} -> {new_calls} calls, ${estimated_cost:.4f}")
except Exception as track_error:
logger.error(f"[Character Image Generation] ❌ Error tracking usage (non-blocking): {track_error}", exc_info=True)
import traceback
logger.error(f"[Character Image Generation] Full traceback: {traceback.format_exc()}")
db_track.rollback()
finally:
db_track.close()
except Exception as usage_error:
logger.error(f"[Character Image Generation] ❌ Failed to track usage: {usage_error}", exc_info=True)
import traceback
logger.error(f"[Character Image Generation] Full traceback: {traceback.format_exc()}")
else:
logger.warning(f"[Character Image Generation] ⚠️ Skipping usage tracking: user_id={user_id}, image_bytes={len(image_bytes) if image_bytes else 0} bytes")
return image_bytes
except HTTPException:
raise
except Exception as api_error:
logger.error(f"[Character Image Generation] Character image generation API failed: {api_error}")
raise HTTPException(
status_code=502,
detail={
"error": "Character image generation failed",
"message": str(api_error)
}
)

View File

@@ -88,14 +88,49 @@ class YouTubeVideoRendererService:
# Clamp duration to valid WAN 2.5 values (5 or 10 seconds)
duration = 5 if duration_estimate <= 7 else 10
# Log asset usage status
has_existing_image = bool(scene.get("imageUrl"))
has_existing_audio = bool(scene.get("audioUrl"))
logger.info(
f"[YouTubeRenderer] Rendering scene {scene_number}: "
f"resolution={resolution}, duration={duration}s, prompt_length={len(visual_prompt)}"
f"resolution={resolution}, duration={duration}s, prompt_length={len(visual_prompt)}, "
f"has_existing_image={has_existing_image}, has_existing_audio={has_existing_audio}"
)
# Generate audio if requested - only if narration is not empty
# Use existing audio if available, otherwise generate if requested
audio_base64 = None
if generate_audio_enabled and narration and len(narration.strip()) > 0:
scene_audio_url = scene.get("audioUrl")
if scene_audio_url:
# Load existing audio from URL
try:
from pathlib import Path
from urllib.parse import urlparse
# Extract filename from URL (e.g., /api/youtube/audio/filename.mp3)
parsed_url = urlparse(scene_audio_url)
audio_filename = Path(parsed_url.path).name
# Load audio file
base_dir = Path(__file__).parent.parent.parent.parent
youtube_audio_dir = base_dir / "youtube_audio"
audio_path = youtube_audio_dir / audio_filename
if audio_path.exists():
with open(audio_path, "rb") as f:
audio_bytes = f.read()
audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')
logger.info(f"[YouTubeRenderer] Using existing audio for scene {scene_number} from {audio_filename}")
else:
logger.warning(f"[YouTubeRenderer] Audio file not found: {audio_path}, will generate new audio")
raise FileNotFoundError(f"Audio file not found: {audio_path}")
except Exception as e:
logger.warning(f"[YouTubeRenderer] Failed to load existing audio: {e}, will generate new audio")
scene_audio_url = None # Fall back to generation
# Generate audio if not available and generation is enabled
if not audio_base64 and generate_audio_enabled and narration and len(narration.strip()) > 0:
try:
audio_result = generate_audio(
text=narration,
@@ -106,7 +141,7 @@ class YouTubeVideoRendererService:
audio_bytes = audio_result.audio_bytes if hasattr(audio_result, "audio_bytes") else audio_result
# Convert to base64 (just the base64 string, not data URI)
audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')
logger.info(f"[YouTubeRenderer] Generated audio for scene {scene_number}")
logger.info(f"[YouTubeRenderer] Generated new audio for scene {scene_number}")
except Exception as e:
logger.warning(f"[YouTubeRenderer] Audio generation failed: {e}, continuing without audio")
@@ -352,6 +387,7 @@ class YouTubeVideoRendererService:
self,
scenes: List[Dict[str, Any]],
resolution: str = "720p",
image_model: str = "ideogram-v3-turbo",
) -> Dict[str, Any]:
"""
Estimate the cost of rendering a video before actually rendering it.
@@ -369,8 +405,16 @@ class YouTubeVideoRendererService:
"720p": 0.10,
"1080p": 0.15,
}
price_per_second = pricing.get(resolution, 0.10)
# Image generation pricing
image_pricing = {
"ideogram-v3-turbo": 0.10,
"qwen-image": 0.05,
}
image_cost_per_scene = image_pricing.get(image_model, 0.10)
# Filter enabled scenes
enabled_scenes = [s for s in scenes if s.get("enabled", True)]
@@ -378,7 +422,8 @@ class YouTubeVideoRendererService:
scene_costs = []
total_cost = 0.0
total_duration = 0.0
total_image_cost = len(enabled_scenes) * image_cost_per_scene
for scene in enabled_scenes:
scene_number = scene.get("scene_number", 0)
duration_estimate = scene.get("duration_estimate", 5)
@@ -396,7 +441,10 @@ class YouTubeVideoRendererService:
total_cost += scene_cost
total_duration += duration
# Add image costs to total
total_cost += total_image_cost
return {
"resolution": resolution,
"price_per_second": price_per_second,
@@ -408,5 +456,8 @@ class YouTubeVideoRendererService:
"min": round(total_cost * 0.9, 2), # 10% buffer
"max": round(total_cost * 1.1, 2), # 10% buffer
},
"image_model": image_model,
"image_cost_per_scene": image_cost_per_scene,
"total_image_cost": round(total_image_cost, 2),
}

View File

@@ -140,61 +140,87 @@ class YouTubeSceneBuilderService:
scene_duration_range = duration_metadata.get("scene_duration_range", (5, 15))
scene_generation_prompt = f"""You are an expert video scriptwriter. Create detailed scenes for a YouTube video based on this plan.
scene_generation_prompt = f"""You are a top YouTube scriptwriter specializing in engaging, viral content. Create compelling scenes that captivate viewers and maximize watch time.
**Video Plan:**
- Summary: {video_plan.get('video_summary', '')}
- Goal: {video_plan.get('video_goal', '')}
- Key Message: {video_plan.get('key_message', '')}
- Visual Style: {visual_style}
- Tone: {tone}
**VIDEO PLAN:**
📝 Summary: {video_plan.get('video_summary', '')}
🎯 Goal: {video_plan.get('video_goal', '')}
💡 Key Message: {video_plan.get('key_message', '')}
🎨 Visual Style: {visual_style}
🎭 Tone: {tone}
**Hook Strategy:**
**🎣 HOOK STRATEGY:**
{hook_strategy}
**Content Outline:**
{chr(10).join([f"- {section.get('section', '')}: {section.get('description', '')} ({section.get('duration_estimate', 0)}s)" for section in content_outline])}
**📋 CONTENT STRUCTURE:**
{chr(10).join([f"• {section.get('section', '')}: {section.get('description', '')} ({section.get('duration_estimate', 0)}s)" for section in content_outline])}
**Call-to-Action:**
**🚀 CALL-TO-ACTION:**
{call_to_action}
**Duration Constraints:**
- Scene duration: {scene_duration_range[0]}-{scene_duration_range[1]} seconds each
- Total target: {duration_metadata.get('target_seconds', 150)} seconds
**⏱️ TIMING CONSTRAINTS:**
• Scene duration: {scene_duration_range[0]}-{scene_duration_range[1]} seconds each
• Total target: {duration_metadata.get('target_seconds', 150)} seconds
**Your Task:**
Create detailed scenes that include:
1. Scene number and title
2. Narration text (what will be spoken)
3. Visual description (what viewers will see)
4. Duration estimate
5. Emphasis tags (hook, main_content, transition, cta)
**🎬 YOUR MISSION - CREATE VIRAL-WORTHY SCENES:**
**Format as JSON array:**
Write narration that:
✨ **HOOKS IMMEDIATELY** - First {duration_metadata.get('hook_seconds', 10)}s must GRAB attention
🎭 **TELLS A STORY** - Each scene advances the narrative with emotional engagement
💡 **DELIVERS VALUE** - Provide insights, tips, or "aha!" moments in every scene
🔥 **BUILDS EXCITEMENT** - Use power words, questions, and cliffhangers
👥 **CONNECTS PERSONALLY** - Speak directly to the viewer's needs and desires
⚡ **MAINTAINS PACE** - Vary sentence length for natural rhythm
🎯 **DRIVES ACTION** - Build toward the CTA with increasing urgency
**REQUIRED SCENE ELEMENTS:**
1. **scene_number**: Sequential numbering
2. **title**: Catchy, descriptive title (5-8 words max)
3. **narration**: ENGAGING spoken script with:
- Conversational language ("you know what I mean?")
- Rhetorical questions ("Have you ever wondered...?")
- Power transitions ("But here's the game-changer...")
- Emotional hooks ("Imagine this...")
- Action-oriented language ("Let's dive in...")
4. **visual_description**: Cinematic, professional YouTube visuals
5. **duration_estimate**: Realistic speaking time
6. **emphasis**: hook/main_content/transition/cta
7. **visual_cues**: ["dramatic_zoom", "text_overlay", "fast_cuts"]
**🎯 YOUTUBE OPTIMIZATION RULES:**
• **Hook Power**: First 3 seconds = make them stay or lose them
• **Value Density**: Every 10 seconds must deliver new insight
• **Emotional Arc**: Build curiosity → teach → inspire → convert
• **Natural Flow**: Scenes must connect seamlessly
• **CTA Momentum**: Final scene creates irresistible urge to act
**📊 FORMAT AS JSON ARRAY:**
[
{{
"scene_number": 1,
"title": "Hook - Attention Grabber",
"narration": "The spoken text for this scene...",
"visual_description": "Detailed description of what viewers see...",
"duration_estimate": 5,
"title": "The Shocking Truth They Hide",
"narration": "You won't believe what just happened in my latest discovery! I was scrolling through the usual content when BAM - this completely changed everything I thought about [topic]. And get this - it could transform YOUR results too!",
"visual_description": "Dynamic opening shot with shocking text overlay, fast cuts of social media feeds, energetic music swell, close-up of surprised reaction",
"duration_estimate": 8,
"emphasis": "hook",
"visual_cues": ["close-up", "dynamic", "bright"]
"visual_cues": ["shocking_text", "fast_cuts", "music_swell", "reaction_shot"]
}},
...
]
Make sure:
- First scene is a strong hook ({duration_metadata.get('hook_seconds', 10)}s)
- Last scene includes the CTA ({duration_metadata.get('cta_seconds', 10)}s)
- Each scene has clear narration and visual description
- Total duration fits within {duration_metadata.get('target_seconds', 150)} seconds
- Scenes flow naturally from one to the next
"""
**🔥 SUCCESS CRITERIA:**
✅ First scene hooks in 3 seconds
✅ Each scene delivers 1-2 key insights
✅ Narration feels like talking to a friend
✅ Total story arc creates emotional journey
✅ CTA feels like the natural next step
✅ Scenes fit duration perfectly"""
system_prompt = (
"You are an expert video scriptwriter specializing in YouTube content. "
"Your scenes are engaging, well-paced, and optimized for viewer retention."
"You are a master YouTube scriptwriter who creates viral, engaging content that "
"keeps viewers watching until the end. You understand YouTube algorithm optimization, "
"emotional storytelling, and creating irresistible hooks that make viewers hit 'like' and 'subscribe'. "
"Your scripts are conversational, valuable, and conversion-focused."
)
response = llm_text_gen(