Added YouTube Creator scene building flow documentation

This commit is contained in:
ajaysi
2025-12-21 17:15:23 +05:30
parent 1d745c9bc8
commit 59913bffa9
51 changed files with 7478 additions and 631 deletions

View File

@@ -69,7 +69,8 @@ async def get_assets(
):
"""Get user's content assets with optional filtering."""
try:
user_id = current_user.get("user_id") or current_user.get("id")
# Auth middleware returns 'id' as the primary key
user_id = current_user.get("id") or current_user.get("user_id") or current_user.get("clerk_user_id")
if not user_id:
raise HTTPException(status_code=401, detail="User ID not found")

View File

@@ -0,0 +1,11 @@
"""
YouTube Creator handler package.
Contains endpoints for avatar upload/optimization and scene image generation.
"""
# Nothing is re-exported from the package itself: a star-import yields no
# names, so the handler modules are imported explicitly by their users.
__all__ = []
# NOTE(review): the string below is a bare expression statement, not a
# docstring (the module docstring is the first statement of the file); it has
# no runtime effect and looks like a leftover duplicate.
"""YouTube Creator handlers package."""

View File

@@ -0,0 +1,557 @@
"""YouTube Creator avatar upload and AI optimization handlers."""
from pathlib import Path
import uuid
from typing import Dict, Any, Optional
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from middleware.auth_middleware import get_current_user
from services.database import get_db
from services.llm_providers.main_image_generation import generate_image
from services.llm_providers.main_image_editing import edit_image
from utils.asset_tracker import save_asset_to_library
from utils.logger_utils import get_service_logger
# Sub-router for avatar endpoints; every route below is served under "/avatar".
router = APIRouter(prefix="/avatar", tags=["youtube-avatar"])
logger = get_service_logger("api.youtube.avatar")
# Directories
# Storage root is four directory levels above this file.
base_dir = Path(__file__).parent.parent.parent.parent
YOUTUBE_AVATARS_DIR = base_dir / "youtube_avatars"
# Created at import time so handlers can write avatars without checking first.
YOUTUBE_AVATARS_DIR.mkdir(parents=True, exist_ok=True)
def require_authenticated_user(current_user: Dict[str, Any]) -> str:
    """Return the caller's user ID as a string, or reject the request.

    Args:
        current_user: Mapping produced by the auth dependency; its ``id``
            entry is used as the user identifier.

    Returns:
        The ``id`` value converted with ``str``.

    Raises:
        HTTPException: 401 when ``current_user`` is empty/None or carries no
            truthy ``id``.
    """
    if current_user:
        raw_id = current_user.get("id")
        if raw_id:
            return str(raw_id)
    raise HTTPException(status_code=401, detail="Authentication required")
def _load_youtube_image_bytes(image_url: str) -> bytes:
    """Load avatar bytes from a stored YouTube avatar URL.

    Only the final path segment of ``image_url`` (with any query string
    removed) is used; it is looked up inside ``YOUTUBE_AVATARS_DIR``.

    Args:
        image_url: URL or path whose last segment names a stored avatar file.

    Returns:
        The raw bytes of the avatar file.

    Raises:
        HTTPException: 404 when the derived name is empty or unsafe, or when
            no regular file with that name exists.
    """
    filename = image_url.split("/")[-1].split("?")[0]
    # The URL is client-supplied. Splitting on "/" already drops forward
    # slashes; additionally refuse ".." and backslashes so the lookup cannot
    # leave the avatars directory on platforms where "\\" separates paths.
    # Reported as "not found" so callers see the same error as before.
    if not filename or ".." in filename or "\\" in filename:
        raise HTTPException(status_code=404, detail="Avatar image not found")
    image_path = YOUTUBE_AVATARS_DIR / filename
    # is_file() is already False for missing paths and for directories.
    if not image_path.is_file():
        raise HTTPException(status_code=404, detail="Avatar image not found")
    return image_path.read_bytes()
# Prompt fragments for video types matched by their exact lower-cased name.
_AVATAR_VIDEO_TYPE_PROMPTS = {
    "tutorial": "approachable instructor, professional yet friendly, clear presentation style",
    "review": "trustworthy reviewer, confident, credible appearance",
    "educational": "knowledgeable educator, professional, warm and engaging",
    "entertainment": "energetic creator, expressive, fun and relatable",
    "vlog": "authentic person, approachable, real and relatable",
    "product_demo": "professional presenter, polished, confident and enthusiastic",
    "reaction": "expressive creator, authentic reactions, engaging",
    "storytelling": "storyteller, warm, engaging narrator",
}


def _plan_text(value: Any) -> str:
    """Coerce a user/plan field to text; None and other falsy values become ''."""
    if not value:
        return ""
    return value if isinstance(value, str) else str(value)


def _parse_avatar_plan(video_plan_json: Optional[str]) -> Dict[str, Any]:
    """Decode the video plan JSON; return {} when missing, malformed or not a JSON object."""
    if not video_plan_json:
        return {}
    import json

    try:
        plan_data = json.loads(video_plan_json)
    except (TypeError, ValueError) as e:
        logger.warning(f"[YouTube] Failed to parse video plan JSON: {e}")
        return {}
    # Valid JSON such as `null` or `[]` has no .get(); treat it as "no plan"
    # instead of letting later lookups fail with AttributeError.
    if not isinstance(plan_data, dict):
        logger.warning("[YouTube] Failed to parse video plan JSON: expected a JSON object")
        return {}
    return plan_data


def _build_generated_avatar_prompt(
    video_type: str,
    target_audience: str,
    visual_style: str,
    tone: str,
    recommendations: Dict[str, Any],
) -> str:
    """Assemble the comma-separated image prompt from the resolved context."""
    parts = []

    # Base avatar description - the plan's recommendation wins over the default.
    description = _plan_text(recommendations.get("description"))
    parts.append(
        description
        or "Half-length portrait of a professional YouTube creator (25-35 years old)"
    )

    # Video type: exact names first, then keyword fallbacks for free-form types.
    if video_type:
        type_key = video_type.lower()
        if type_key in _AVATAR_VIDEO_TYPE_PROMPTS:
            parts.append(_AVATAR_VIDEO_TYPE_PROMPTS[type_key])
        elif "tech" in type_key:
            parts.append("tech-forward style")
        elif "travel" in type_key:
            parts.append("travel vlogger aesthetic")
        elif "education" in type_key or "learn" in type_key:
            parts.append("educational creator, clean and clear presentation")
        else:
            parts.append("modern creator style")

    # Audience: only the first matching group contributes.
    if target_audience:
        audience_key = target_audience.lower()
        if "young" in audience_key or "gen z" in audience_key or "millennial" in audience_key:
            parts.append("youthful, vibrant, modern vibe")
        elif "executive" in audience_key or "professional" in audience_key or "business" in audience_key:
            parts.append("polished, credible, authoritative presence")
        elif "creative" in audience_key:
            parts.append("artistic, expressive, creative professional")
        elif "parents" in audience_key or "family" in audience_key:
            parts.append("warm, approachable, trustworthy presence")

    # Visual style: every matching keyword contributes (checks are independent).
    if visual_style:
        visual_key = visual_style.lower()
        if "minimal" in visual_key or "minimalist" in visual_key:
            parts.append("clean, minimalist aesthetic")
        if "tech" in visual_key or "modern" in visual_key:
            parts.append("tech-forward, modern style")
        if "energetic" in visual_key or "colorful" in visual_key or "vibrant" in visual_key:
            parts.append("vibrant, energetic appearance")
        if "cinematic" in visual_key:
            parts.append("cinematic, polished presentation")
        if "professional" in visual_key:
            parts.append("professional, polished aesthetic")

    # Tone: independent keyword checks as well.
    if tone:
        tone_key = tone.lower()
        if "casual" in tone_key:
            parts.append("casual, approachable style")
        if "professional" in tone_key:
            parts.append("professional attire and presentation")
        if "energetic" in tone_key or "fun" in tone_key:
            parts.append("energetic, lively expression")
        if "warm" in tone_key:
            parts.append("warm, friendly expression")

    # Extra hints from the plan's avatar recommendations.
    for key in ("style", "energy"):
        hint = _plan_text(recommendations.get(key))
        if hint:
            parts.append(hint)

    # Base technical requirements
    parts.extend([
        "photo-realistic, professional photography",
        "confident, engaging expression",
        "professional studio lighting, clean background",
        "suitable for video generation and thumbnails",
        "ultra realistic, 4k quality, 85mm lens",
        "looking at camera, center-focused composition",
    ])
    return ", ".join(parts)


async def _generate_avatar_from_context(
    user_id: str,
    project_id: Optional[str],
    audience: Optional[str] = None,
    content_type: Optional[str] = None,
    video_plan_json: Optional[str] = None,
    brand_style: Optional[str] = None,
    db: Optional[Session] = None,
) -> Dict[str, Any]:
    """
    Internal function to generate avatar from context.
    Can be called from route handler or directly from router.

    Explicit user inputs take priority over values found in the video plan,
    which in turn take priority over built-in defaults.

    Args:
        user_id: ID of the requesting user, forwarded to the image generator
            and the asset library.
        project_id: Optional project identifier; used in the file name and,
            together with ``db``, enables saving to the asset library.
        audience: Target audience text supplied by the user.
        content_type: Video type supplied by the user.
        video_plan_json: JSON-encoded video plan; ignored (with a warning)
            when it is malformed or not a JSON object.
        brand_style: Brand/visual style text supplied by the user.
        db: Database session used for the asset library entry.

    Returns:
        Dict with ``avatar_url``, ``avatar_filename``, ``avatar_prompt`` and
        ``message``.

    Raises:
        Exceptions from image generation or from writing the image file
        propagate to the caller; asset-library failures are only logged.
    """
    plan_data = _parse_avatar_plan(video_plan_json)
    avatar_recommendations = plan_data.get("avatar_recommendations") or {}
    if not isinstance(avatar_recommendations, dict):
        avatar_recommendations = {}

    # Priority: user input > plan data > defaults
    plan_target_audience = _plan_text(audience or plan_data.get("target_audience", ""))
    plan_video_type = _plan_text(content_type or plan_data.get("video_type", ""))
    plan_visual_style = _plan_text(brand_style or plan_data.get("visual_style", ""))
    plan_tone = _plan_text(plan_data.get("tone", ""))

    logger.info(
        f"[YouTube] Avatar generation context: "
        f"video_type={plan_video_type}, audience={plan_target_audience[:50] if plan_target_audience else 'none'}, "
        f"brand_style={plan_visual_style[:50] if plan_visual_style else 'none'}"
    )

    prompt = _build_generated_avatar_prompt(
        video_type=plan_video_type,
        target_audience=plan_target_audience,
        visual_style=plan_visual_style,
        tone=plan_tone,
        recommendations=avatar_recommendations,
    )

    # A fresh random 32-bit seed on every call gives a different image for
    # the same prompt.
    seed = int(uuid.uuid4().int % (2**32))
    image_options = {
        "provider": "wavespeed",
        "model": "ideogram-v3-turbo",
        "width": 1024,
        "height": 1024,
        "seed": seed,
    }
    result = generate_image(
        prompt=prompt,
        options=image_options,
        user_id=user_id,
    )

    unique_id = str(uuid.uuid4())[:8]
    avatar_filename = f"yt_generated_{project_id or 'temp'}_{unique_id}.png"
    avatar_path = YOUTUBE_AVATARS_DIR / avatar_filename
    with open(avatar_path, "wb") as f:
        f.write(result.image_bytes)
    avatar_url = f"/api/youtube/images/avatars/{avatar_filename}"
    logger.info(f"[YouTube] Generated creator avatar: {avatar_path}")

    if project_id and db:
        # Best effort: a failed library entry must not fail the generation.
        try:
            save_asset_to_library(
                db=db,
                user_id=user_id,
                asset_type="image",
                source_module="youtube_creator",
                filename=avatar_filename,
                file_url=avatar_url,
                file_path=str(avatar_path),
                file_size=len(result.image_bytes),
                mime_type="image/png",
                title=f"YouTube Creator Avatar (Generated) - {project_id}",
                description="AI-generated YouTube creator avatar",
                prompt=prompt,
                tags=["youtube", "avatar", "generated", project_id],
                provider=result.provider,
                model=result.model,
                asset_metadata={
                    "project_id": project_id,
                    "type": "generated_presenter",
                    "status": "completed",
                },
            )
        except Exception as e:
            logger.warning(f"[YouTube] Failed to save generated avatar asset: {e}")

    return {
        "avatar_url": avatar_url,
        "avatar_filename": avatar_filename,
        "avatar_prompt": prompt,
        "message": "Avatar generated successfully",
    }
@router.post("/upload")
async def upload_youtube_avatar(
    file: UploadFile = File(...),
    project_id: Optional[str] = Form(None),
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Upload a YouTube creator avatar image.

    The file is stored in ``YOUTUBE_AVATARS_DIR`` under a generated name that
    keeps the uploaded file's extension (``.png`` when it has none). When a
    ``project_id`` is given the image is also registered in the asset library
    on a best-effort basis.

    Returns:
        Dict with ``avatar_url``, ``avatar_filename`` and ``message``.

    Raises:
        HTTPException: 401 without an authenticated user, 400 for a missing
            or oversized (> 5MB) file, 500 when storing the file fails.
    """
    user_id = require_authenticated_user(current_user)
    if not file:
        raise HTTPException(status_code=400, detail="No file uploaded")
    file_content = await file.read()
    # Validate size (max 5MB)
    if len(file_content) > 5 * 1024 * 1024:
        raise HTTPException(status_code=400, detail="Image file size must be less than 5MB")
    try:
        # filename may be absent on the upload; fall back to ".png" rather
        # than failing inside Path().
        file_ext = Path(file.filename or "").suffix or ".png"
        unique_id = str(uuid.uuid4())[:8]
        avatar_filename = f"yt_avatar_{project_id or 'temp'}_{unique_id}{file_ext}"
        avatar_path = YOUTUBE_AVATARS_DIR / avatar_filename
        with open(avatar_path, "wb") as f:
            f.write(file_content)
        avatar_url = f"/api/youtube/images/avatars/{avatar_filename}"
        logger.info(f"[YouTube] Avatar uploaded: {avatar_path}")
        if project_id:
            # Best effort: the upload succeeds even if the library entry fails.
            try:
                save_asset_to_library(
                    db=db,
                    user_id=user_id,
                    asset_type="image",
                    source_module="youtube_creator",
                    filename=avatar_filename,
                    file_url=avatar_url,
                    file_path=str(avatar_path),
                    file_size=len(file_content),
                    mime_type=file.content_type or "image/png",
                    title=f"YouTube Creator Avatar - {project_id}",
                    description="YouTube creator avatar image",
                    tags=["youtube", "avatar", project_id],
                    asset_metadata={
                        "project_id": project_id,
                        "type": "creator_avatar",
                        "status": "completed",
                    },
                )
            except Exception as e:
                logger.warning(f"[YouTube] Failed to save avatar asset: {e}")
        return {
            "avatar_url": avatar_url,
            "avatar_filename": avatar_filename,
            "message": "Avatar uploaded successfully",
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as exc:
        logger.error(f"[YouTube] Avatar upload failed: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Avatar upload failed: {str(exc)}")
@router.post("/make-presentable")
async def make_avatar_presentable(
    avatar_url: str = Form(...),
    project_id: Optional[str] = Form(None),
    video_type: Optional[str] = Form(None),
    target_audience: Optional[str] = Form(None),
    video_goal: Optional[str] = Form(None),
    brand_style: Optional[str] = Form(None),
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Transform an uploaded avatar image into a YouTube-appropriate creator.
    Uses AI image editing with enhanced prompts to optimize the uploaded photo.

    The stored avatar named by ``avatar_url`` is loaded, a transformation
    prompt is built from the optional context fields, the prompt is passed
    through the WaveSpeed prompt optimizer (falling back to the unoptimized
    prompt on any failure), and the edited image is written next to the
    original. ``video_goal`` is accepted but not used by this handler.

    Returns:
        Dict with ``avatar_url``, ``avatar_filename`` and ``message``.

    Raises:
        HTTPException: 401 without an authenticated user; HTTP errors raised
            while loading or editing the image (e.g. 404 for an unknown
            avatar) are passed through; anything else becomes a 500.
    """
    user_id = require_authenticated_user(current_user)
    try:
        avatar_bytes = _load_youtube_image_bytes(avatar_url)
        logger.info(f"[YouTube] 🔍 Starting avatar transformation for user_id={user_id}, project={project_id}")
        # Build context-aware transformation prompt using user inputs
        prompt_parts = [
            "Transform this photo into a professional YouTube creator avatar:",
            "Significantly enhance and optimize the image for YouTube video production;",
            "Apply professional photo editing: improve lighting, color grading, and composition;",
            "Enhance facial features: brighten eyes, smooth skin, add professional makeup if needed;",
            "Improve background: replace with clean, professional studio background or subtle gradient;",
            "Adjust clothing: ensure professional, YouTube-appropriate attire;",
            "Optimize for video: ensure the person looks natural and engaging on camera;",
            "Half-length portrait format, person looking directly at camera with confident, engaging expression;",
            "Professional studio lighting with soft shadows, high-quality photography;",
            "Maintain the person's core appearance and identity while making significant improvements;",
            "Ultra realistic, 4k quality, professional photography style;",
            "Suitable for video generation, thumbnails, and YouTube channel branding.",
        ]
        # Add context from user inputs to make transformation more targeted
        if video_type:
            # Only exact (case-insensitive) type names add a fragment here.
            video_type_prompts = {
                "tutorial": "Approachable instructor style, professional yet friendly appearance",
                "review": "Trustworthy reviewer style, confident and credible appearance",
                "educational": "Knowledgeable educator style, professional and warm appearance",
                "entertainment": "Energetic creator style, expressive and fun appearance",
                "vlog": "Authentic vlogger style, approachable and relatable appearance",
                "product_demo": "Professional presenter style, polished and enthusiastic appearance",
                "reaction": "Expressive creator style, authentic and engaging appearance",
                "storytelling": "Storyteller style, warm and engaging narrator appearance",
            }
            type_fragment = video_type_prompts.get(video_type.lower())
            if type_fragment:
                prompt_parts.append(type_fragment)
        if target_audience:
            # First matching audience group wins.
            audience_lower = target_audience.lower()
            if "young" in audience_lower or "gen z" in audience_lower or "millennial" in audience_lower:
                prompt_parts.append("Modern, youthful, vibrant aesthetic")
            elif "executive" in audience_lower or "professional" in audience_lower or "business" in audience_lower:
                prompt_parts.append("Polished, credible, authoritative professional appearance")
            elif "creative" in audience_lower:
                prompt_parts.append("Artistic, expressive, creative professional style")
        if brand_style:
            # Style keywords are independent; several may apply at once.
            style_lower = brand_style.lower()
            if "minimal" in style_lower or "minimalist" in style_lower:
                prompt_parts.append("Clean, minimalist aesthetic")
            if "tech" in style_lower or "modern" in style_lower:
                prompt_parts.append("Tech-forward, modern style")
            if "energetic" in style_lower or "colorful" in style_lower:
                prompt_parts.append("Vibrant, energetic appearance")
        base_prompt = " ".join(prompt_parts)
        # Optimize the prompt using WaveSpeed prompt optimizer for better results
        try:
            from services.wavespeed.client import WaveSpeedClient

            wavespeed_client = WaveSpeedClient()
            logger.info("[YouTube] Optimizing transformation prompt using WaveSpeed prompt optimizer")
            transformation_prompt = wavespeed_client.optimize_prompt(
                text=base_prompt,
                mode="image",
                style="realistic",  # Use realistic style for photo editing
                enable_sync_mode=True,
                timeout=30,
            )
            logger.info(f"[YouTube] Prompt optimized successfully (length: {len(transformation_prompt)} chars)")
        except Exception as opt_error:
            # Optimization is optional; any failure falls back to the base prompt.
            logger.warning(f"[YouTube] Prompt optimization failed, using base prompt: {opt_error}")
            transformation_prompt = base_prompt
        # Use HuggingFace for image editing (only available option)
        # Note: This uses async processing with polling (~30 seconds expected)
        image_options = {
            "provider": "huggingface",  # Explicitly use HuggingFace (only option for image editing)
            "model": None,  # Use default model (Qwen/Qwen-Image-Edit)
        }
        logger.info("[YouTube] Starting avatar transformation (this may take ~30 seconds due to async processing)")
        result = edit_image(
            input_image_bytes=avatar_bytes,
            prompt=transformation_prompt,
            options=image_options,
            user_id=user_id,
        )
        logger.info("[YouTube] ✅ Avatar transformation completed successfully")
        unique_id = str(uuid.uuid4())[:8]
        transformed_filename = f"yt_presenter_{project_id or 'temp'}_{unique_id}.png"
        transformed_path = YOUTUBE_AVATARS_DIR / transformed_filename
        with open(transformed_path, "wb") as f:
            f.write(result.image_bytes)
        transformed_url = f"/api/youtube/images/avatars/{transformed_filename}"
        logger.info(f"[YouTube] Transformed avatar saved to: {transformed_path}")
        if project_id:
            # Best effort: the transformation succeeds even if the library entry fails.
            try:
                save_asset_to_library(
                    db=db,
                    user_id=user_id,
                    asset_type="image",
                    source_module="youtube_creator",
                    filename=transformed_filename,
                    file_url=transformed_url,
                    file_path=str(transformed_path),
                    file_size=len(result.image_bytes),
                    mime_type="image/png",
                    title=f"YouTube Creator (Transformed) - {project_id}",
                    description="AI-transformed YouTube creator avatar from uploaded photo",
                    prompt=transformation_prompt,
                    tags=["youtube", "avatar", "presenter", project_id],
                    provider=result.provider,
                    model=result.model,
                    asset_metadata={
                        "project_id": project_id,
                        "type": "transformed_presenter",
                        "original_avatar_url": avatar_url,
                        "status": "completed",
                    },
                )
            except Exception as e:
                logger.warning(f"[YouTube] Failed to save transformed avatar asset: {e}")
        return {
            "avatar_url": transformed_url,
            "avatar_filename": transformed_filename,
            "message": "Avatar transformed successfully",
        }
    except HTTPException:
        # Preserve intentional statuses (e.g. the loader's 404) instead of
        # rewrapping them as a generic 500.
        raise
    except Exception as exc:
        logger.error(f"[YouTube] Avatar transformation failed: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Avatar transformation failed: {str(exc)}")
@router.post("/generate")
async def generate_creator_avatar(
    project_id: Optional[str] = Form(None),
    audience: Optional[str] = Form(None),
    content_type: Optional[str] = Form(None),
    video_plan_json: Optional[str] = Form(None),
    brand_style: Optional[str] = Form(None),
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Auto-generate a YouTube creator avatar optimized from video plan context.
    Uses video plan data (if provided) and user inputs to generate an avatar that matches
    the video type, audience, tone, and brand style.

    Returns:
        The dict produced by ``_generate_avatar_from_context``.

    Raises:
        HTTPException: 401 without an authenticated user; HTTP errors raised
            during generation are passed through unchanged; any other
            failure becomes a 500.
    """
    user_id = require_authenticated_user(current_user)
    try:
        return await _generate_avatar_from_context(
            user_id=user_id,
            project_id=project_id,
            audience=audience,
            content_type=content_type,
            video_plan_json=video_plan_json,
            brand_style=brand_style,
            db=db,
        )
    except HTTPException:
        # Do not mask deliberate HTTP statuses from downstream services as 500.
        raise
    except Exception as exc:
        logger.error(f"[YouTube] Avatar generation failed: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Avatar generation failed: {str(exc)}")
@router.post("/regenerate")
async def regenerate_creator_avatar(
    video_plan_json: str = Form(...),
    project_id: Optional[str] = Form(None),
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """
    Regenerate a YouTube creator avatar using the same video plan context.
    Takes the video plan JSON and regenerates an avatar with a different seed
    to provide variation while maintaining the same optimization based on plan data.

    Returns:
        Dict with ``avatar_url``, ``avatar_filename``, ``avatar_prompt`` and
        ``message``.

    Raises:
        HTTPException: 401 without an authenticated user; 400 when
            ``video_plan_json`` is not a JSON object; HTTP errors raised
            during generation are passed through; anything else becomes 500.
    """
    user_id = require_authenticated_user(current_user)
    import json

    # A malformed plan is a client error, so report it as 400 rather than
    # letting it surface as a 500 from the generic handler below.
    try:
        plan_data = json.loads(video_plan_json)
    except (TypeError, ValueError) as exc:
        logger.warning(f"[YouTube] Avatar regeneration rejected, invalid video plan JSON: {exc}")
        raise HTTPException(status_code=400, detail="video_plan_json must be valid JSON")
    if not isinstance(plan_data, dict):
        raise HTTPException(status_code=400, detail="video_plan_json must be a JSON object")

    try:
        # Extract context from plan data
        audience = plan_data.get("target_audience", "")
        content_type = plan_data.get("video_type", "")
        brand_style = plan_data.get("visual_style", "")
        logger.info(
            f"[YouTube] Regenerating avatar for project {project_id}: "
            f"video_type={content_type}, audience={str(audience)[:50] if audience else 'none'}"
        )
        avatar_response = await _generate_avatar_from_context(
            user_id=user_id,
            project_id=project_id,
            audience=audience,
            content_type=content_type,
            video_plan_json=video_plan_json,
            brand_style=brand_style,
            db=db,
        )
        # Return the avatar prompt along with the URL for the frontend
        return {
            "avatar_url": avatar_response.get("avatar_url"),
            "avatar_filename": avatar_response.get("avatar_filename"),
            "avatar_prompt": avatar_response.get("avatar_prompt"),
            "message": "Avatar regenerated successfully",
        }
    except HTTPException:
        # Do not mask deliberate HTTP statuses from downstream services as 500.
        raise
    except Exception as exc:
        logger.error(f"[YouTube] Avatar regeneration failed: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Avatar regeneration failed: {str(exc)}")

View File

@@ -0,0 +1,259 @@
"""YouTube Creator scene image generation handlers."""
from pathlib import Path
from typing import Dict, Any, Optional
import uuid
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy.orm import Session
from middleware.auth_middleware import get_current_user
from services.database import get_db
from services.subscription import PricingService
from services.subscription.preflight_validator import validate_image_generation_operations
from services.llm_providers.main_image_generation import generate_image
from services.wavespeed.client import WaveSpeedClient
from utils.asset_tracker import save_asset_to_library
from utils.logger_utils import get_service_logger
# Sub-router for scene image generation and image serving (no path prefix).
router = APIRouter(tags=["youtube-image"])
logger = get_service_logger("api.youtube.image")
# Directories
# Storage root is four directory levels above this file.
base_dir = Path(__file__).parent.parent.parent.parent
YOUTUBE_IMAGES_DIR = base_dir / "youtube_images"
# Created at import time so scene images can be written without checking first.
YOUTUBE_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
# Avatars are only read from / served out of this directory in this module;
# it is not created here.
YOUTUBE_AVATARS_DIR = base_dir / "youtube_avatars"
class YouTubeImageRequest(BaseModel):
    # Request body for generating one scene image.
    # Only scene_id is required by the model; every other field is optional.
    scene_id: str  # used in the stored file name, tags and asset metadata
    scene_title: Optional[str] = None
    scene_content: Optional[str] = None
    # When set, names a stored avatar used as the character reference image.
    base_avatar_url: Optional[str] = None
    idea: Optional[str] = None
    width: Optional[int] = 1024
    height: Optional[int] = 1024
    # Replaces the auto-built prompt when provided.
    custom_prompt: Optional[str] = None
    style: Optional[str] = None  # e.g., "Realistic", "Fiction"
    rendering_speed: Optional[str] = None  # e.g., "Quality", "Turbo"
    aspect_ratio: Optional[str] = None  # e.g., "16:9"
def require_authenticated_user(current_user: Dict[str, Any]) -> str:
    """Resolve the authenticated user's ID as a string.

    Args:
        current_user: Mapping produced by the auth dependency; may be empty
            or None when no user is attached.

    Returns:
        ``str`` of the mapping's ``id`` entry.

    Raises:
        HTTPException: 401 when no truthy ``id`` is available.
    """
    user_id = (current_user or {}).get("id")
    if user_id:
        return str(user_id)
    raise HTTPException(status_code=401, detail="Authentication required")
def _load_base_avatar_bytes(avatar_url: str) -> bytes:
    """Load base avatar bytes for character consistency.

    Only the final path segment of ``avatar_url`` (with any query string
    removed) is used; it is looked up inside ``YOUTUBE_AVATARS_DIR``.

    Args:
        avatar_url: URL or path whose last segment names a stored avatar.

    Returns:
        The raw bytes of the avatar file.

    Raises:
        HTTPException: 404 when the derived name is empty or unsafe, or when
            no regular file with that name exists.
    """
    filename = avatar_url.split("/")[-1].split("?")[0]
    # The URL comes from the request body. Splitting on "/" already drops
    # forward slashes; additionally refuse ".." and backslashes so the lookup
    # cannot leave the avatars directory on platforms where "\\" separates
    # paths. Reported as "not found" so callers see the same error as before.
    if not filename or ".." in filename or "\\" in filename:
        raise HTTPException(status_code=404, detail="Base avatar image not found")
    avatar_path = YOUTUBE_AVATARS_DIR / filename
    # is_file() is already False for missing paths and for directories.
    if not avatar_path.is_file():
        raise HTTPException(status_code=404, detail="Base avatar image not found")
    return avatar_path.read_bytes()
def _save_scene_image(image_bytes: bytes, scene_id: str) -> Dict[str, str]:
    """Write a generated scene image to disk and describe where it landed.

    The file name combines ``scene_id`` with the first eight characters of a
    random UUID, so repeated generations for one scene do not overwrite each
    other.

    Returns:
        Dict with ``image_filename``, ``image_path`` (string path on disk)
        and ``image_url`` (API path under ``/api/youtube/images/scenes/``).
    """
    short_token = str(uuid.uuid4())[:8]
    stored_name = f"yt_scene_{scene_id}_{short_token}.png"
    destination = YOUTUBE_IMAGES_DIR / stored_name
    with open(destination, "wb") as out_file:
        out_file.write(image_bytes)
    return {
        "image_filename": stored_name,
        "image_path": str(destination),
        "image_url": f"/api/youtube/images/scenes/{stored_name}",
    }
@router.post("/image")
async def generate_youtube_scene_image(
    request: YouTubeImageRequest,
    current_user: Dict[str, Any] = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Generate a YouTube scene image, with optional avatar consistency.

    When ``base_avatar_url`` is given, the stored avatar is used as a
    reference image for character-consistent generation; otherwise the image
    is generated from the text prompt alone. The result is written to disk
    and registered in the asset library on a best-effort basis.

    Returns:
        Dict with ``scene_id``, ``scene_title``, ``image_filename``,
        ``image_url``, ``width`` and ``height``.

    Raises:
        HTTPException: 401 without an authenticated user; 400 when
            ``scene_title`` is missing; HTTP errors raised by validation,
            avatar loading or generation are passed through; any other
            failure becomes a 500.
    """
    user_id = require_authenticated_user(current_user)
    if not request.scene_title:
        raise HTTPException(status_code=400, detail="Scene title is required")
    # Resolve the reported/requested size once so the generator options, the
    # asset metadata and the response always agree.
    width = request.width or 1024
    height = request.height or 576
    try:
        # Pre-flight subscription validation
        pricing_service = PricingService(db)
        validate_image_generation_operations(
            pricing_service=pricing_service,
            user_id=user_id,
            num_images=1,
        )
        logger.info(f"[YouTube] ✅ Pre-flight validation passed for user {user_id}")

        base_avatar_bytes = None
        if request.base_avatar_url:
            try:
                base_avatar_bytes = _load_base_avatar_bytes(request.base_avatar_url)
                logger.info(f"[YouTube] Loaded base avatar for scene {request.scene_id}")
            except HTTPException:
                raise
            except Exception as e:
                logger.error(f"[YouTube] Failed to load base avatar: {e}", exc_info=True)
                raise HTTPException(
                    status_code=500,
                    detail={
                        "error": "Failed to load base avatar",
                        "message": f"Could not load the base avatar image: {str(e)}",
                    },
                )

        # Build prompt
        if base_avatar_bytes:
            prompt_parts = []
            if request.scene_title:
                prompt_parts.append(f"Scene: {request.scene_title}")
            if request.scene_content:
                content_preview = request.scene_content[:200].replace("\n", " ").strip()
                prompt_parts.append(f"Context: {content_preview}")
            if request.idea:
                prompt_parts.append(f"Video idea: {request.idea[:80].strip()}")
            prompt_parts.append("YouTube creator on camera, engaging and dynamic framing")
            prompt_parts.append("Clean background, good lighting, thumbnail-friendly composition")
        else:
            prompt_parts = [
                "YouTube creator scene",
                "clean, modern background",
                "good lighting, high contrast for thumbnail clarity",
            ]
            if request.scene_title:
                prompt_parts.append(f"Scene theme: {request.scene_title}")
            if request.scene_content:
                prompt_parts.append(f"Context: {request.scene_content[:120].replace(chr(10), ' ')}")
            if request.idea:
                prompt_parts.append(f"Topic: {request.idea[:80]}")
            prompt_parts.append("video-optimized composition, 16:9 aspect ratio")
        image_prompt = ", ".join(prompt_parts)

        # Generate image
        provider = "wavespeed"
        if base_avatar_bytes:
            logger.info(f"[YouTube] Using character-consistent generation for scene {request.scene_id}")
            # NOTE(review): this path always sends the auto-built prompt;
            # request.custom_prompt is only honored without a base avatar.
            # Confirm whether that is intended.
            used_prompt = image_prompt
            wavespeed_client = WaveSpeedClient()
            # Output shape is driven by aspect_ratio here; width/height are
            # not passed to the character generator.
            image_bytes = wavespeed_client.generate_character_image(
                prompt=used_prompt,
                reference_image_bytes=base_avatar_bytes,
                style=request.style or "Realistic",
                aspect_ratio=request.aspect_ratio or "16:9",
                rendering_speed=request.rendering_speed or "Quality",
                timeout=None,
            )
            model = "ideogram-character"
        else:
            logger.info(f"[YouTube] Generating scene {request.scene_id} from scratch")
            used_prompt = request.custom_prompt or image_prompt
            image_options = {
                "provider": "wavespeed",
                "model": "ideogram-v3-turbo",
                "width": width,
                "height": height,
            }
            result = generate_image(
                prompt=used_prompt,
                options=image_options,
                user_id=user_id,
            )
            image_bytes = result.image_bytes
            provider = result.provider
            model = result.model

        # Save image
        saved = _save_scene_image(image_bytes, request.scene_id)

        # Save to asset library (best effort: failures are only logged)
        try:
            save_asset_to_library(
                db=db,
                user_id=user_id,
                asset_type="image",
                source_module="youtube_creator",
                filename=saved["image_filename"],
                file_url=saved["image_url"],
                file_path=saved["image_path"],
                file_size=len(image_bytes),
                mime_type="image/png",
                title=f"YouTube Scene: {request.scene_title or request.scene_id}",
                description=request.scene_content or f"Scene image for {request.scene_id}",
                # Record the prompt that was actually sent to the generator.
                prompt=used_prompt,
                tags=["youtube_creator", "scene", request.scene_id],
                provider=provider,
                model=model,
                asset_metadata={
                    "scene_id": request.scene_id,
                    "scene_title": request.scene_title,
                    "has_base_avatar": bool(base_avatar_bytes),
                    "width": width,
                    "height": height,
                },
            )
        except Exception as e:
            logger.warning(f"[YouTube] Failed to save scene image to asset library: {e}")

        return {
            "scene_id": request.scene_id,
            "scene_title": request.scene_title,
            "image_filename": saved["image_filename"],
            "image_url": saved["image_url"],
            "width": width,
            "height": height,
        }
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"[YouTube] Scene image generation failed: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Failed to generate scene image: {str(exc)}")
@router.get("/images/{category}/{filename}")
async def serve_youtube_image(
    category: str,
    filename: str,
    current_user: Dict[str, Any] = Depends(get_current_user),
):
    """
    Serve stored YouTube images (avatars or scenes).
    Unified endpoint for both avatar and scene images.

    Args:
        category: ``"avatars"`` or ``"scenes"``; selects the storage directory.
        filename: Plain file name inside that directory.

    Returns:
        A ``FileResponse`` for the stored image.

    Raises:
        HTTPException: 401 without an authenticated user; 400 for an unknown
            category or a file name containing ``..``, ``/`` or ``\\``;
            404 when the file does not exist.
    """
    import mimetypes

    require_authenticated_user(current_user)
    if category not in {"avatars", "scenes"}:
        raise HTTPException(status_code=400, detail="Invalid image category. Must be 'avatars' or 'scenes'")
    # Reject anything that could address a path outside the chosen directory.
    if ".." in filename or "/" in filename or "\\" in filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    directory = YOUTUBE_AVATARS_DIR if category == "avatars" else YOUTUBE_IMAGES_DIR
    image_path = directory / filename
    if not image_path.exists() or not image_path.is_file():
        raise HTTPException(status_code=404, detail="Image not found")
    # Uploaded avatars keep their original extension (e.g. .jpg), so derive
    # the content type from the file name; default to PNG, which is what all
    # generated images use.
    guessed_type, _ = mimetypes.guess_type(filename)
    if guessed_type and guessed_type.startswith("image/"):
        media_type = guessed_type
    else:
        media_type = "image/png"
    return FileResponse(
        path=str(image_path),
        media_type=media_type,
        filename=filename,
    )

View File

@@ -23,14 +23,24 @@ from services.subscription.preflight_validator import validate_scene_animation_o
from utils.logger_utils import get_service_logger
from utils.asset_tracker import save_asset_to_library
from .task_manager import task_manager
from .handlers import avatar as avatar_handlers
from .handlers import images as image_handlers
router = APIRouter(prefix="/youtube", tags=["youtube"])
logger = get_service_logger("api.youtube")
# Video output directory
# Video output and image directories
base_dir = Path(__file__).parent.parent.parent.parent
YOUTUBE_VIDEO_DIR = base_dir / "youtube_videos"
YOUTUBE_VIDEO_DIR.mkdir(parents=True, exist_ok=True)
YOUTUBE_AVATARS_DIR = base_dir / "youtube_avatars"
YOUTUBE_AVATARS_DIR.mkdir(parents=True, exist_ok=True)
YOUTUBE_IMAGES_DIR = base_dir / "youtube_images"
YOUTUBE_IMAGES_DIR.mkdir(parents=True, exist_ok=True)
# Include sub-routers for avatar and images
router.include_router(avatar_handlers.router)
router.include_router(image_handlers.router)
# Request/Response Models
@@ -42,6 +52,23 @@ class VideoPlanRequest(BaseModel):
pattern="^(shorts|medium|long)$",
description="Video duration type: shorts (≤60s), medium (1-4min), long (4-10min)"
)
video_type: Optional[str] = Field(
None,
pattern="^(tutorial|review|educational|entertainment|vlog|product_demo|reaction|storytelling)$",
description="Video format type: tutorial, review, educational, entertainment, vlog, product_demo, reaction, storytelling"
)
target_audience: Optional[str] = Field(
None,
description="Target audience description (helps optimize tone, pace, and style)"
)
video_goal: Optional[str] = Field(
None,
description="Primary goal of the video (educate, sell, entertain, etc.)"
)
brand_style: Optional[str] = Field(
None,
description="Brand visual aesthetic and style preferences"
)
reference_image_description: Optional[str] = Field(
None,
description="Optional description of reference image for visual inspiration"
@@ -55,6 +82,14 @@ class VideoPlanRequest(BaseModel):
pattern="^(blog|story)$",
description="Type of source content: blog or story"
)
avatar_url: Optional[str] = Field(
None,
description="Optional avatar URL if user uploaded one before plan generation"
)
enable_research: Optional[bool] = Field(
True,
description="Enable Exa research to enhance plan with current information, trends, and better SEO keywords (default: True)"
)
class VideoPlanResponse(BaseModel):
@@ -158,6 +193,12 @@ async def create_video_plan(
f"duration={request.duration_type}, user={user_id}"
)
# Note: Research subscription checks are handled by ResearchService internally
# ResearchService validates limits before making API calls and raises HTTPException(429) if exceeded
# Note: Subscription checks for LLM are handled by llm_text_gen internally
# It validates limits before making API calls and raises HTTPException(429) if exceeded
# Get persona data if available
persona_data = None
try:
@@ -168,17 +209,75 @@ async def create_video_plan(
# Generate plan (optimized: for shorts, combine plan + scenes in one call)
planner = YouTubePlannerService()
plan = planner.generate_video_plan(
plan = await planner.generate_video_plan(
user_idea=request.user_idea,
duration_type=request.duration_type,
video_type=request.video_type,
target_audience=request.target_audience,
video_goal=request.video_goal,
brand_style=request.brand_style,
persona_data=persona_data,
reference_image_description=request.reference_image_description,
source_content_id=request.source_content_id,
source_content_type=request.source_content_type,
user_id=user_id,
include_scenes=(request.duration_type == "shorts"), # Optimize shorts
enable_research=getattr(request, 'enable_research', True), # Research enabled by default
)
# Auto-generate avatar if user didn't upload one
# Try to reuse existing avatar from asset library first to save on AI calls during testing
auto_avatar_url = None
if not request.avatar_url:
try:
from services.content_asset_service import ContentAssetService
from models.content_asset_models import AssetType, AssetSource
# Check for existing YouTube creator avatar in asset library
asset_service = ContentAssetService(db)
existing_avatars = asset_service.get_assets(
user_id=user_id,
asset_type=AssetType.IMAGE,
source_module=AssetSource.YOUTUBE_CREATOR,
limit=1, # Get most recent one
)
if existing_avatars and len(existing_avatars) > 0:
# Reuse the most recent avatar
existing_avatar = existing_avatars[0]
auto_avatar_url = existing_avatar.file_url
plan["auto_generated_avatar_url"] = auto_avatar_url
plan["avatar_reused"] = True # Flag to indicate avatar was reused
logger.info(
f"[YouTubeAPI] ♻️ Reusing existing avatar from asset library to save AI call: {auto_avatar_url} "
f"(asset_id: {existing_avatar.id}, created: {existing_avatar.created_at})"
)
else:
# No existing avatar found, generate new one
import uuid
import json
from .handlers.avatar import _generate_avatar_from_context
# Pass both original user inputs AND plan data for better avatar generation
logger.info(f"[YouTubeAPI] 🎨 No existing avatar found, generating new avatar...")
avatar_response = await _generate_avatar_from_context(
user_id=user_id,
project_id=f"plan_{user_id}_{uuid.uuid4().hex[:8]}",
audience=request.target_audience or plan.get("target_audience"), # Prefer user input
content_type=request.video_type, # User's video type selection
video_plan_json=json.dumps(plan),
brand_style=request.brand_style, # User's brand style preference
db=db,
)
auto_avatar_url = avatar_response.get("avatar_url")
avatar_prompt = avatar_response.get("avatar_prompt")
plan["auto_generated_avatar_url"] = auto_avatar_url
plan["avatar_prompt"] = avatar_prompt # Store the AI prompt used for generation
plan["avatar_reused"] = False # Flag to indicate avatar was newly generated
logger.info(f"[YouTubeAPI] ✅ Auto-generated new avatar based on user inputs and plan: {auto_avatar_url}")
except Exception as e:
logger.warning(f"[YouTubeAPI] Avatar generation/reuse failed (non-critical): {e}")
# Non-critical, continue without avatar
return VideoPlanResponse(
success=True,
plan=plan,
@@ -212,12 +311,17 @@ async def build_scenes(
try:
user_id = require_authenticated_user(current_user)
duration_type = request.video_plan.get('duration_type', 'medium')
has_existing_scenes = bool(request.video_plan.get("scenes")) and request.video_plan.get("_scenes_included")
logger.info(
f"[YouTubeAPI] Building scenes: duration={request.video_plan.get('duration_type')}, "
f"custom_script={bool(request.custom_script)}, user={user_id}"
f"[YouTubeAPI] Building scenes: duration={duration_type}, "
f"custom_script={bool(request.custom_script)}, "
f"has_existing_scenes={has_existing_scenes}, "
f"user={user_id}"
)
# Build scenes
# Build scenes (optimized to reuse existing scenes if available)
scene_builder = YouTubeSceneBuilderService()
scenes = scene_builder.build_scenes_from_plan(
video_plan=request.video_plan,

View File

@@ -22,6 +22,7 @@ class AssetType(enum.Enum):
class AssetSource(enum.Enum):
# Add youtube_creator to the enum
"""Source module/tool that generated the asset."""
# Core Content Generation
STORY_WRITER = "story_writer"
@@ -50,6 +51,9 @@ class AssetSource(enum.Enum):
# Podcast Maker
PODCAST_MAKER = "podcast_maker"
# YouTube Creator
YOUTUBE_CREATOR = "youtube_creator"
class ContentAsset(Base):

View File

@@ -85,6 +85,7 @@ def edit_image(
from services.subscription.preflight_validator import validate_image_editing_operations
from fastapi import HTTPException
logger.info(f"[Image Editing] 🔍 Starting pre-flight validation for user_id={user_id}")
db = next(get_db())
try:
pricing_service = PricingService(db)
@@ -93,14 +94,15 @@ def edit_image(
pricing_service=pricing_service,
user_id=user_id
)
logger.info(f"[Image Editing] ✅ Pre-flight validation passed for user_id={user_id} - proceeding with image editing")
except HTTPException as http_ex:
# Re-raise immediately - don't proceed with API call
logger.error(f"[Image Editing] ❌ Pre-flight validation failed - blocking API call")
logger.error(f"[Image Editing] ❌ Pre-flight validation failed for user_id={user_id} - blocking API call: {http_ex.detail}")
raise
finally:
db.close()
logger.info(f"[Image Editing] ✅ Pre-flight validation passed - proceeding with image editing")
else:
logger.warning(f"[Image Editing] ⚠️ No user_id provided - skipping pre-flight validation (this should not happen in production)")
# Validate input
if not input_image_bytes:

View File

@@ -9,6 +9,7 @@ from .image_generation import (
HuggingFaceImageProvider,
GeminiImageProvider,
StabilityImageProvider,
WaveSpeedImageProvider,
)
from utils.logger_utils import get_service_logger
@@ -26,6 +27,8 @@ def _select_provider(explicit: Optional[str]) -> str:
return "huggingface"
if os.getenv("STABILITY_API_KEY"):
return "stability"
if os.getenv("WAVESPEED_API_KEY"):
return "wavespeed"
# Fallback to huggingface to enable a path if configured
return "huggingface"
@@ -37,6 +40,8 @@ def _get_provider(provider_name: str):
return GeminiImageProvider()
if provider_name == "stability":
return StabilityImageProvider()
if provider_name == "wavespeed":
return WaveSpeedImageProvider()
raise ValueError(f"Unknown image provider: {provider_name}")
@@ -56,6 +61,7 @@ def generate_image(prompt: str, options: Optional[Dict[str, Any]] = None, user_i
from services.subscription.preflight_validator import validate_image_generation_operations
from fastapi import HTTPException
logger.info(f"[Image Generation] 🔍 Starting pre-flight validation for user_id={user_id}")
db = next(get_db())
try:
pricing_service = PricingService(db)
@@ -64,14 +70,15 @@ def generate_image(prompt: str, options: Optional[Dict[str, Any]] = None, user_i
pricing_service=pricing_service,
user_id=user_id
)
logger.info(f"[Image Generation] ✅ Pre-flight validation passed for user_id={user_id} - proceeding with image generation")
except HTTPException as http_ex:
# Re-raise immediately - don't proceed with API call
logger.error(f"[Image Generation] ❌ Pre-flight validation failed - blocking API call")
logger.error(f"[Image Generation] ❌ Pre-flight validation failed for user_id={user_id} - blocking API call: {http_ex.detail}")
raise
finally:
db.close()
logger.info(f"[Image Generation] ✅ Pre-flight validation passed - proceeding with image generation")
else:
logger.warning(f"[Image Generation] ⚠️ No user_id provided - skipping pre-flight validation (this should not happen in production)")
opts = options or {}
provider_name = _select_provider(opts.get("provider"))
@@ -96,6 +103,10 @@ def generate_image(prompt: str, options: Optional[Dict[str, Any]] = None, user_i
if provider_name == "huggingface" and not image_options.model:
# Provide a sensible default HF model if none specified
image_options.model = "black-forest-labs/FLUX.1-Krea-dev"
if provider_name == "wavespeed" and not image_options.model:
# Provide a sensible default WaveSpeed model if none specified
image_options.model = "ideogram-v3-turbo"
logger.info("Generating image via provider=%s model=%s", provider_name, image_options.model)
provider = _get_provider(provider_name)

View File

@@ -336,6 +336,8 @@ class StoryVideoGenerationService:
# Match duration to audio if needed
if video_clip.duration > audio_duration:
video_clip = video_clip.subclip(0, audio_duration)
# Re-attach audio after subclip (subclip loses audio)
video_clip = video_clip.with_audio(audio_clip)
elif video_clip.duration < audio_duration:
# Loop the video if it's shorter than audio
loops_needed = int(audio_duration / video_clip.duration) + 1

View File

@@ -177,7 +177,7 @@ class WaveSpeedClient:
f"[WaveSpeed] Too many polling errors ({consecutive_errors}) for {prediction_id}, "
f"status_code={status_code}. Giving up."
)
raise HTTPException(status_code=exc.status_code, detail=detail) from exc
raise HTTPException(status_code=exc.status_code, detail=detail) from exc
backoff = min(30.0, interval_seconds * (2 ** (consecutive_errors - 1)))
logger.warning(
@@ -464,16 +464,17 @@ class WaveSpeedClient:
response_json = response.json()
data = response_json.get("data") or response_json
# Check status - if "created" or "processing", we need to poll even in sync mode
status = data.get("status", "").lower()
outputs = data.get("outputs") or []
prediction_id = data.get("id")
# Handle sync mode - result should be directly in outputs
# BUT: If status is "created" or "processing" with no outputs, fall back to polling
if enable_sync_mode:
outputs = data.get("outputs") or []
if not outputs:
logger.error(f"[WaveSpeed] No outputs in sync mode response: {response.text}")
raise HTTPException(
status_code=502,
detail="WaveSpeed image generator returned no outputs",
)
# If we have outputs and status is "completed", use them directly
if outputs and status == "completed":
logger.info(f"[WaveSpeed] Got immediate results from sync mode (status: {status})")
# Extract image URL from outputs
image_url = None
if isinstance(outputs, list) and len(outputs) > 0:
@@ -504,16 +505,30 @@ class WaveSpeedClient:
detail="Failed to fetch generated image from WaveSpeed URL",
)
# Async mode - poll for result
prediction_id = data.get("id")
# Sync mode returned "created" or "processing" status - need to poll
if not prediction_id:
logger.error(f"[WaveSpeed] No prediction ID in async response: {response.text}")
logger.error(f"[WaveSpeed] Sync mode returned status '{status}' but no prediction ID: {response.text}")
raise HTTPException(
status_code=502,
detail="WaveSpeed response missing prediction id for async mode",
detail="WaveSpeed sync mode returned async response without prediction ID",
)
logger.info(
f"[WaveSpeed] Sync mode returned status '{status}' with no outputs. "
f"Falling back to polling (prediction_id: {prediction_id})"
)
# Fall through to async polling logic below
# Async mode OR sync mode that returned "created"/"processing" - poll for result
if not prediction_id:
logger.error(f"[WaveSpeed] No prediction ID in response: {response.text}")
raise HTTPException(
status_code=502,
detail="WaveSpeed response missing prediction id",
)
# Poll for result
# Poll for result (use longer timeout for image generation)
logger.info(f"[WaveSpeed] Polling for image generation result (prediction_id: {prediction_id}, status: {status})")
result = self.poll_until_complete(prediction_id, timeout_seconds=240, interval_seconds=1.0)
outputs = result.get("outputs") or []

View File

@@ -2,17 +2,95 @@
YouTube Video Planner Service
Generates video plans, outlines, and insights using AI with persona integration.
Supports optional Exa research for enhanced, data-driven plans.
"""
from typing import Dict, Any, Optional, List
from loguru import logger
from fastapi import HTTPException
import os
from services.llm_providers.main_text_generation import llm_text_gen
from utils.logger_utils import get_service_logger
logger = get_service_logger("youtube.planner")
# Per-video-type planning presets.
#
# One entry per supported video type (tutorial, review, educational,
# entertainment, vlog, product_demo, reaction, storytelling). Every entry
# carries the same seven string fields:
#   hook_strategy  - how the opening seconds should grab attention
#   structure      - the section flow of the video, written as "A → B → C"
#   visual_style   - recommended look of the footage
#   tone           - recommended voice / delivery
#   optimal_scenes - suggested scene-count range for this format
#   avatar_style   - description of a fitting presenter avatar
#   cta_focus      - what the call-to-action should push
#
# The values are free-form text that is interpolated into LLM prompts, so
# changing the wording changes what the model is asked to produce.
VIDEO_TYPE_CONFIGS = {
    "tutorial": {
        "hook_strategy": "Problem statement or quick preview of solution",
        "structure": "Problem → Steps → Result → Key Takeaways",
        "visual_style": "Clean, instructional, screen-recordings or clear demonstrations",
        "tone": "Clear, patient, instructional",
        "optimal_scenes": "2-6 scenes showing sequential steps",
        "avatar_style": "Approachable instructor, professional yet friendly",
        "cta_focus": "Subscribe for more tutorials, try it yourself",
    },
    "review": {
        "hook_strategy": "Product reveal or strong opinion statement",
        "structure": "Hook → Overview → Pros/Cons → Verdict → CTA",
        "visual_style": "Product-focused, close-ups, comparison shots",
        "tone": "Honest, engaging, opinionated but fair",
        "optimal_scenes": "4-8 scenes covering different aspects",
        "avatar_style": "Trustworthy reviewer, confident, credible",
        "cta_focus": "Check links in description, subscribe for reviews",
    },
    "educational": {
        "hook_strategy": "Intriguing question or surprising fact",
        "structure": "Question → Explanation → Examples → Conclusion",
        "visual_style": "Illustrative, concept visualization, animations",
        "tone": "Authoritative yet accessible, engaging",
        "optimal_scenes": "3-10 scenes breaking down concepts",
        "avatar_style": "Knowledgeable educator, professional, warm",
        "cta_focus": "Learn more, subscribe for educational content",
    },
    "entertainment": {
        "hook_strategy": "Grab attention immediately with energy/humor",
        "structure": "Hook → Setup → Payoff → Share/Subscribe",
        "visual_style": "Dynamic, energetic, varied angles, transitions",
        "tone": "High energy, funny, engaging, personality-driven",
        "optimal_scenes": "3-8 scenes with varied pacing",
        "avatar_style": "Energetic creator, expressive, relatable",
        "cta_focus": "Like, share, subscribe for more fun content",
    },
    "vlog": {
        "hook_strategy": "Preview of day/event or personal moment",
        "structure": "Introduction → Journey/Experience → Reflection → CTA",
        "visual_style": "Natural, personal, authentic moments",
        "tone": "Conversational, authentic, relatable",
        "optimal_scenes": "5-15 scenes following narrative",
        "avatar_style": "Authentic person, approachable, real",
        "cta_focus": "Follow my journey, subscribe for daily updates",
    },
    "product_demo": {
        "hook_strategy": "Product benefit or transformation",
        "structure": "Benefit → Features → Use Cases → CTA",
        "visual_style": "Product-focused, polished, commercial quality",
        "tone": "Enthusiastic, persuasive, benefit-focused",
        "optimal_scenes": "3-7 scenes highlighting features",
        "avatar_style": "Professional presenter, polished, confident",
        "cta_focus": "Get it now, learn more, special offer",
    },
    "reaction": {
        "hook_strategy": "Preview of reaction or content being reacted to",
        "structure": "Setup → Reaction → Commentary → CTA",
        "visual_style": "Split-screen or picture-in-picture, expressive",
        "tone": "Authentic reactions, engaging commentary",
        "optimal_scenes": "4-10 scenes with reactions",
        "avatar_style": "Expressive creator, authentic reactions",
        "cta_focus": "Watch full video, subscribe for reactions",
    },
    "storytelling": {
        "hook_strategy": "Intriguing opening or compelling question",
        "structure": "Hook → Setup → Conflict → Resolution → CTA",
        "visual_style": "Cinematic, narrative-driven, emotional",
        "tone": "Engaging, immersive, story-focused",
        "optimal_scenes": "6-15 scenes following narrative arc",
        "avatar_style": "Storyteller, warm, engaging narrator",
        "cta_focus": "Subscribe for more stories, share your thoughts",
    },
}
class YouTubePlannerService:
"""Service for planning YouTube videos with AI assistance."""
@@ -21,16 +99,21 @@ class YouTubePlannerService:
"""Initialize the planner service."""
logger.info("[YouTubePlanner] Service initialized")
def generate_video_plan(
async def generate_video_plan(
self,
user_idea: str,
duration_type: str, # "shorts", "medium", "long"
video_type: Optional[str] = None, # "tutorial", "review", etc.
target_audience: Optional[str] = None,
video_goal: Optional[str] = None,
brand_style: Optional[str] = None,
persona_data: Optional[Dict[str, Any]] = None,
reference_image_description: Optional[str] = None,
source_content_id: Optional[str] = None, # For blog/story conversion
source_content_type: Optional[str] = None, # "blog", "story"
user_id: str = None,
include_scenes: bool = False, # For shorts: combine plan + scenes in one call
enable_research: bool = True, # Always enable research by default for enhanced plans
) -> Dict[str, Any]:
"""
Generate a comprehensive video plan from user input.
@@ -38,6 +121,10 @@ class YouTubePlannerService:
Args:
user_idea: User's video idea or topic
duration_type: "shorts" (≤60s), "medium" (1-4min), "long" (4-10min)
video_type: Optional video format type (tutorial, review, etc.)
target_audience: Optional target audience description
video_goal: Optional primary goal of the video
brand_style: Optional brand aesthetic preferences
persona_data: Optional persona data for tone/style
reference_image_description: Optional description of reference image
source_content_id: Optional ID of source content (blog/story)
@@ -50,9 +137,14 @@ class YouTubePlannerService:
try:
logger.info(
f"[YouTubePlanner] Generating plan: idea={user_idea[:50]}..., "
f"duration={duration_type}, user={user_id}"
f"duration={duration_type}, video_type={video_type}, user={user_id}"
)
# Get video type config
video_type_config = {}
if video_type and video_type in VIDEO_TYPE_CONFIGS:
video_type_config = VIDEO_TYPE_CONFIGS[video_type]
# Build persona context
persona_context = self._build_persona_context(persona_data)
@@ -78,43 +170,108 @@ class YouTubePlannerService:
- Use this as visual inspiration for the video
"""
# Generate smart defaults based on video type if selected
# When video_type is selected, use its config for defaults; otherwise use user inputs or generic defaults
if video_type_config:
default_tone = video_type_config.get('tone', 'Professional and engaging')
default_visual_style = video_type_config.get('visual_style', 'Professional and engaging')
default_goal = video_goal or f"Create engaging {video_type} content"
default_audience = target_audience or f"Viewers interested in {video_type} content"
else:
# No video type selected - use user inputs or generic defaults
default_tone = 'Professional and engaging'
default_visual_style = 'Professional and engaging'
default_goal = video_goal or 'Engage and inform viewers'
default_audience = target_audience or 'General YouTube audience'
# Perform Exa research if enabled (after defaults are set)
research_context = ""
research_sources = []
research_enabled = False
if enable_research:
logger.info(f"[YouTubePlanner] 🔍 Starting Exa research for plan generation (idea: {user_idea[:50]}...)")
research_enabled = True
try:
research_context, research_sources = await self._perform_exa_research(
user_idea=user_idea,
video_type=video_type,
target_audience=default_audience,
user_id=user_id
)
if research_sources:
logger.info(
f"[YouTubePlanner] ✅ Exa research completed successfully: "
f"{len(research_sources)} sources found. Research context length: {len(research_context)} chars"
)
else:
logger.warning(f"[YouTubePlanner] ⚠️ Exa research completed but no sources returned")
except HTTPException as http_ex:
# Subscription limit exceeded or other HTTP errors
error_detail = http_ex.detail
if isinstance(error_detail, dict):
error_msg = error_detail.get("message", error_detail.get("error", str(http_ex)))
else:
error_msg = str(error_detail)
logger.warning(
f"[YouTubePlanner] ⚠️ Exa research skipped due to subscription limits or error: {error_msg} "
f"(status={http_ex.status_code}). Continuing without research."
)
# Continue without research - non-critical failure
except Exception as e:
error_msg = str(e)
logger.warning(
f"[YouTubePlanner] ⚠️ Exa research failed (non-critical): {error_msg}. "
f"Continuing without research."
)
# Continue without research - non-critical failure
else:
logger.info(f"[YouTubePlanner] Exa research disabled for this plan generation")
# Generate comprehensive video plan
planning_prompt = f"""You are an expert YouTube content strategist. Create a comprehensive video plan based on the user's idea.
video_type_context = ""
if video_type_config:
video_type_context = f"""
**Video Type: {video_type}**
Follow these guidelines:
- Structure: {video_type_config.get('structure', '')}
- Hook: {video_type_config.get('hook_strategy', '')}
- Visual: {video_type_config.get('visual_style', '')}
- Tone: {video_type_config.get('tone', '')}
- CTA: {video_type_config.get('cta_focus', '')}
"""
planning_prompt = f"""Create a YouTube video plan for: "{user_idea}"
**User's Video Idea:**
{user_idea}
**Video Format:** {video_type or 'General'} | **Duration:** {duration_type} ({duration_context['target_seconds']}s target)
**Audience:** {default_audience}
**Goal:** {default_goal}
**Style:** {brand_style or default_visual_style}
**Video Duration Type:**
{duration_type} ({duration_context['description']})
{video_type_context}
**Duration Guidelines:**
- Target length: {duration_context['target_seconds']} seconds
- Hook duration: {duration_context['hook_seconds']} seconds
- Main content: {duration_context['main_seconds']} seconds
- CTA duration: {duration_context['cta_seconds']} seconds
- Maximum scenes: {duration_context['max_scenes']} (for shorts, keep 2-4 scenes total)
**Constraints:**
- Duration: {duration_context['target_seconds']}s (Hook: {duration_context['hook_seconds']}s, Main: {duration_context['main_seconds']}s, CTA: {duration_context['cta_seconds']}s)
- Max scenes: {duration_context['max_scenes']}
{persona_context}
{persona_context if persona_data else ""}
{source_context if source_content_id else ""}
{image_context if reference_image_description else ""}
{research_context if research_context else ""}
{source_context}
**Generate a plan with:**
1. **Video Summary**: 2-3 sentences capturing the essence
2. **Target Audience**: {f"Match: {target_audience}" if target_audience else f"Infer from video idea and {video_type or 'content type'}"}
3. **Video Goal**: {f"Align with: {video_goal}" if video_goal else f"Infer appropriate goal for {video_type or 'this'} content"}
4. **Key Message**: Single memorable takeaway
5. **Hook Strategy**: Engaging opening for first {duration_context['hook_seconds']}s{f" ({video_type_config.get('hook_strategy', '')})" if video_type_config else ""}
6. **Content Outline**: 3-5 sections totaling {duration_context['target_seconds']}s{f" following: {video_type_config.get('structure', '')}" if video_type_config else ""}
7. **Call-to-Action**: Actionable CTA{f" ({video_type_config.get('cta_focus', '')})" if video_type_config else ""}
8. **Visual Style**: Match {brand_style or default_visual_style}
9. **Tone**: {default_tone}
10. **SEO Keywords**: 5-7 relevant terms based on video idea
11. **Avatar Recommendations**: {f"{video_type_config.get('avatar_style', '')} " if video_type_config else ""}matching audience and style
{image_context}
**Your Task:**
Create a detailed video plan that includes:
1. **Video Summary**: A 2-3 sentence overview of what the video will cover
2. **Target Audience**: Who this video is for
3. **Video Goal**: Primary objective (educate, entertain, sell, inspire, etc.)
4. **Key Message**: The main takeaway viewers should remember
5. **Hook Strategy**: Attention-grabbing opening (first {duration_context['hook_seconds']} seconds)
6. **Content Outline**: High-level structure with 3-5 main sections
7. **Call-to-Action**: Clear CTA that fits the video goal
8. **Visual Style**: Recommended visual approach (cinematic, tutorial, vlog, etc.)
9. **Tone**: Recommended tone (professional, casual, energetic, etc.)
10. **SEO Keywords**: 5-7 relevant keywords for YouTube SEO
**Format your response as JSON:**
**Response Format (JSON):**
{{
"video_summary": "...",
"target_audience": "...",
@@ -122,22 +279,27 @@ Create a detailed video plan that includes:
"key_message": "...",
"hook_strategy": "...",
"content_outline": [
{{"section": "Section 1", "description": "...", "duration_estimate": 30}},
{{"section": "Section 2", "description": "...", "duration_estimate": 45}}
{{"section": "...", "description": "...", "duration_estimate": 30}},
{{"section": "...", "description": "...", "duration_estimate": 45}}
],
"call_to_action": "...",
"visual_style": "...",
"tone": "...",
"seo_keywords": ["keyword1", "keyword2", ...]
"seo_keywords": ["keyword1", "keyword2", ...],
"avatar_recommendations": {{
"description": "...",
"style": "...",
"energy": "..."
}}
}}
Make sure the content outline fits within the {duration_type} duration constraints.
**Critical:** Content outline durations must sum to {duration_context['target_seconds']}s (±20%).
"""
system_prompt = (
"You are an expert YouTube content strategist specializing in creating "
"engaging, well-structured video plans. Your plans are data-driven, "
"audience-focused, and optimized for YouTube's algorithm."
"You are an expert YouTube content strategist. Create clear, actionable video plans "
"that are optimized for the specified video type and audience. Focus on accuracy and "
"specificity - these plans will be used to generate actual video content."
)
# For shorts, combine plan + scenes in one call to save API calls
@@ -157,8 +319,8 @@ Create detailed scenes (up to {duration_context['max_scenes']} scenes) that incl
**Scene Format:**
Each scene should be detailed enough for video generation. Total duration must fit within {duration_context['target_seconds']} seconds.
**Update JSON structure to include "scenes" array:**
Add a "scenes" field with the complete scene breakdown.
**Update JSON structure to include "scenes" array and "avatar_recommendations":**
Add a "scenes" field with the complete scene breakdown, and include "avatar_recommendations" with ideal presenter appearance, style, and energy.
"""
json_struct = {
@@ -208,12 +370,20 @@ Add a "scenes" field with the complete scene breakdown.
"duration_estimate", "emphasis"
]
}
},
"avatar_recommendations": {
"type": "object",
"properties": {
"description": {"type": "string"},
"style": {"type": "string"},
"energy": {"type": "string"}
}
}
},
"required": [
"video_summary", "target_audience", "video_goal", "key_message",
"hook_strategy", "content_outline", "call_to_action",
"visual_style", "tone", "seo_keywords", "scenes"
"visual_style", "tone", "seo_keywords", "scenes", "avatar_recommendations"
]
}
else:
@@ -242,16 +412,26 @@ Add a "scenes" field with the complete scene breakdown.
"seo_keywords": {
"type": "array",
"items": {"type": "string"}
},
"avatar_recommendations": {
"type": "object",
"properties": {
"description": {"type": "string"},
"style": {"type": "string"},
"energy": {"type": "string"}
}
}
},
"required": [
"video_summary", "target_audience", "video_goal", "key_message",
"hook_strategy", "content_outline", "call_to_action",
"visual_style", "tone", "seo_keywords"
"visual_style", "tone", "seo_keywords", "avatar_recommendations"
]
}
# Generate plan using LLM
# Generate plan using LLM with structured JSON response
# llm_text_gen handles subscription checks and provider selection automatically
# json_struct ensures deterministic structured response (returns dict, not string)
response = llm_text_gen(
prompt=planning_prompt,
system_prompt=system_prompt,
@@ -259,34 +439,89 @@ Add a "scenes" field with the complete scene breakdown.
json_struct=json_struct
)
# Parse response (handle both dict and JSON string)
# Parse response (structured responses return dict, text responses return string)
if isinstance(response, dict):
plan_data = response
else:
import json
plan_data = json.loads(response)
try:
plan_data = json.loads(response)
except json.JSONDecodeError as e:
logger.error(f"[YouTubePlanner] Failed to parse JSON response: {e}")
logger.debug(f"[YouTubePlanner] Raw response: {response[:500]}")
raise HTTPException(
status_code=500,
detail="Failed to parse video plan response. Please try again."
)
# Validate and enhance plan quality
plan_data = self._validate_and_enhance_plan(
plan_data, duration_context, video_type, video_type_config
)
# Add metadata
plan_data["duration_type"] = duration_type
plan_data["duration_metadata"] = duration_context
plan_data["user_idea"] = user_idea
# If scenes were included, mark them for scene builder
if include_scenes and duration_type == "shorts" and "scenes" in plan_data:
plan_data["_scenes_included"] = True
logger.info(
f"[YouTubePlanner] ✅ Plan + {len(plan_data.get('scenes', []))} scenes "
f"generated in 1 AI call (optimized for shorts)"
)
# Add research metadata to plan
plan_data["research_enabled"] = research_enabled
if research_sources:
plan_data["research_sources"] = research_sources
plan_data["research_sources_count"] = len(research_sources)
else:
if include_scenes and duration_type == "shorts":
plan_data["research_sources"] = []
plan_data["research_sources_count"] = 0
# Log research status in plan metadata for debugging
if research_enabled:
logger.info(
f"[YouTubePlanner] 📊 Plan metadata: research_enabled=True, "
f"research_sources_count={plan_data.get('research_sources_count', 0)}, "
f"research_context_length={len(research_context)} chars"
)
# Validate and process scenes if included (for shorts)
if include_scenes and duration_type == "shorts":
if "scenes" in plan_data and plan_data["scenes"]:
# Validate scenes count and duration
scenes = plan_data["scenes"]
scene_count = len(scenes)
total_scene_duration = sum(
scene.get("duration_estimate", 0) for scene in scenes
)
max_scenes = duration_context["max_scenes"]
target_duration = duration_context["target_seconds"]
if scene_count > max_scenes:
logger.warning(
f"[YouTubePlanner] Scene count ({scene_count}) exceeds max ({max_scenes}). "
f"Truncating to first {max_scenes} scenes."
)
plan_data["scenes"] = scenes[:max_scenes]
# Warn if total duration is off
if abs(total_scene_duration - target_duration) > target_duration * 0.3:
logger.warning(
f"[YouTubePlanner] Total scene duration ({total_scene_duration}s) "
f"differs significantly from target ({target_duration}s)"
)
plan_data["_scenes_included"] = True
logger.info(
f"[YouTubePlanner] ✅ Plan + {len(plan_data['scenes'])} scenes "
f"generated in 1 AI call (optimized for shorts)"
)
else:
# LLM did not return scenes; downstream will regenerate
plan_data["_scenes_included"] = False
logger.warning(
"[YouTubePlanner] Shorts optimization requested but no scenes returned; "
"scene builder will generate scenes separately."
)
logger.info(f"[YouTubePlanner] ✅ Plan generated successfully")
logger.info(f"[YouTubePlanner] ✅ Plan generated successfully")
return plan_data
@@ -355,4 +590,264 @@ Add a "scenes" field with the complete scene breakdown.
}
return contexts.get(duration_type, contexts["medium"])
def _validate_and_enhance_plan(
    self,
    plan_data: Dict[str, Any],
    duration_context: Dict[str, Any],
    video_type: Optional[str],
    video_type_config: Dict[str, Any],
) -> Dict[str, Any]:
    """
    Validate and enhance plan quality before returning.

    Performs quality checks:
    - Validates required fields (missing ones get placeholder defaults)
    - Validates content outline duration matches target (rescales if not)
    - Ensures SEO keywords are present (warning only)
    - Validates avatar recommendations (fills missing parts with defaults)
    - Adds quality metadata under ``_quality_checks``

    Values that do not have the expected shape are tolerated instead of
    raising: an outline entry that is not a dict, a ``duration_estimate``
    that is not a number, or an ``avatar_recommendations`` value that is
    not a dict.

    Args:
        plan_data: Plan dictionary to validate; it is modified in place.
        duration_context: Duration settings; ``target_seconds`` is read
            (150 is used when the key is absent).
        video_type: Not used by the checks; kept for interface stability.
        video_type_config: Video-type settings; ``avatar_style`` supplies
            the default avatar description.

    Returns:
        The same ``plan_data`` object that was passed in.
    """
    # Ensure required fields exist
    required_fields = [
        "video_summary", "target_audience", "video_goal", "key_message",
        "hook_strategy", "content_outline", "call_to_action",
        "visual_style", "tone", "seo_keywords"
    ]
    missing_fields = [field for field in required_fields if not plan_data.get(field)]
    if missing_fields:
        logger.warning(f"[YouTubePlanner] Missing required fields: {missing_fields}")
        # Fill with defaults to prevent errors
        for field in missing_fields:
            if field in ("seo_keywords", "content_outline"):
                plan_data[field] = []
            else:
                plan_data[field] = f"[{field} not generated]"

    def _duration_of(section):
        """Return the section's numeric duration, or 0 if absent/malformed."""
        if not isinstance(section, dict):
            return 0
        value = section.get("duration_estimate", 0)
        return value if isinstance(value, (int, float)) else 0

    # Validate content outline duration
    outline = plan_data.get("content_outline")
    if outline:
        total_duration = sum(_duration_of(section) for section in outline)
        target_duration = duration_context.get("target_seconds", 150)
        # Allow 20% variance
        tolerance = target_duration * 0.2
        if abs(total_duration - target_duration) > tolerance:
            logger.warning(
                f"[YouTubePlanner] Content outline duration ({total_duration}s) "
                f"doesn't match target ({target_duration}s). Adjusting..."
            )
            # Normalize durations proportionally
            if total_duration > 0:
                scale_factor = target_duration / total_duration
                for section in outline:
                    if not isinstance(section, dict):
                        continue
                    value = section.get("duration_estimate")
                    # Only numeric estimates can be rescaled; None or text
                    # values are left exactly as they were returned.
                    if isinstance(value, (int, float)):
                        section["duration_estimate"] = round(value * scale_factor, 1)

    # Validate SEO keywords
    if not plan_data.get("seo_keywords") or len(plan_data["seo_keywords"]) < 3:
        logger.warning(
            f"[YouTubePlanner] Insufficient SEO keywords ({len(plan_data.get('seo_keywords', []))}). "
            f"Plan may need enhancement."
        )

    # Validate avatar recommendations
    avatar_rec = plan_data.get("avatar_recommendations")
    if not avatar_rec or not isinstance(avatar_rec, dict):
        if avatar_rec:
            # Truthy but not a dict (e.g. a bare string): calling .get() on
            # it would raise, so it is replaced by the default structure.
            logger.warning(
                f"[YouTubePlanner] Avatar recommendations have unexpected type "
                f"({type(avatar_rec).__name__}). Generating defaults..."
            )
        else:
            logger.warning("[YouTubePlanner] Avatar recommendations missing. Generating defaults...")
        plan_data["avatar_recommendations"] = {
            "description": video_type_config.get("avatar_style", "Professional YouTube creator"),
            "style": plan_data.get("visual_style", "Professional"),
            "energy": plan_data.get("tone", "Engaging")
        }
    else:
        # Ensure all avatar recommendation fields exist
        if not avatar_rec.get("description"):
            avatar_rec["description"] = video_type_config.get("avatar_style", "Professional YouTube creator")
        if not avatar_rec.get("style"):
            avatar_rec["style"] = plan_data.get("visual_style", "Professional")
        if not avatar_rec.get("energy"):
            avatar_rec["energy"] = plan_data.get("tone", "Engaging")

    # Add quality metadata
    plan_data["_quality_checks"] = {
        "content_outline_validated": bool(plan_data.get("content_outline")),
        "seo_keywords_count": len(plan_data.get("seo_keywords", [])),
        "avatar_recommendations_present": bool(plan_data.get("avatar_recommendations")),
        # Reflects the state before placeholders were filled in above.
        "all_required_fields_present": len(missing_fields) == 0,
    }
    logger.info(
        f"[YouTubePlanner] Plan quality validated: "
        f"outline_sections={len(plan_data.get('content_outline', []))}, "
        f"seo_keywords={len(plan_data.get('seo_keywords', []))}, "
        f"avatar_recs={'yes' if plan_data.get('avatar_recommendations') else 'no'}"
    )
    return plan_data
async def _perform_exa_research(
self,
user_idea: str,
video_type: Optional[str],
target_audience: str,
user_id: str
) -> tuple[str, List[Dict[str, Any]]]:
"""
Perform Exa research directly using ExaResearchProvider (common module).
Uses the same pattern as podcast research with proper subscription checks.
Returns:
Tuple of (research_context_string, research_sources_list)
"""
# Error contract, as implemented below:
# - HTTPException (the 429 raised when the limit check rejects the call) is
#   propagated to the caller unchanged.
# - Every other exception is logged and turned into the empty result ("", []).
try:
# Pre-flight validation for Exa search only (not full blog writer workflow)
# We only need to validate Exa API calls, not LLM operations
from services.database import get_db
from services.subscription import PricingService
from models.subscription_models import APIProvider
# Take one session from the get_db generator; it is closed in the finally below.
db = next(get_db())
try:
pricing_service = PricingService(db)
# Only validate Exa API call, not the full research workflow
operations_to_validate = [
{
'provider': APIProvider.EXA,
'tokens_requested': 0,
'actual_provider_name': 'exa',
'operation_type': 'exa_neural_search'
}
]
can_proceed, message, error_details = pricing_service.check_comprehensive_limits(
user_id=user_id,
operations=operations_to_validate
)
if not can_proceed:
# error_details may be None/empty, hence the guarded lookup.
usage_info = error_details.get('usage_info', {}) if error_details else {}
logger.warning(
f"[YouTubePlanner] Exa search blocked for user {user_id}: {message}"
)
# Reported as HTTP 429; when usage_info is empty the whole
# error_details value is sent in its place.
raise HTTPException(
status_code=429,
detail={
'error': message,
'message': message,
'provider': 'exa',
'usage_info': usage_info if usage_info else error_details
}
)
logger.info(f"[YouTubePlanner] Exa search pre-flight validation passed for user {user_id}")
except HTTPException:
raise
except Exception as e:
logger.warning(f"[YouTubePlanner] Exa search pre-flight validation failed: {e}")
# Re-raised here, but the outermost `except Exception` catches it, so a
# failing pre-flight check ends in the empty-research return.
raise
finally:
db.close()
# Use ExaResearchProvider directly (common module, same as podcast)
from services.blog_writer.research.exa_provider import ExaResearchProvider
from types import SimpleNamespace
# Build research query
query_parts = [user_idea]
if video_type:
query_parts.append(f"{video_type} video")
# The generic audience label is skipped so it does not dilute the query.
if target_audience and target_audience != "General YouTube audience":
query_parts.append(target_audience)
research_query = " ".join(query_parts)
# Configure Exa research (same pattern as podcast)
cfg = SimpleNamespace(
exa_search_type="neural",
exa_category="web", # Focus on web content for YouTube
exa_include_domains=[],
exa_exclude_domains=[],
max_sources=10, # Limit sources for cost efficiency
source_types=[],
)
# Perform research
provider = ExaResearchProvider()
result = await provider.search(
prompt=research_query,
topic=user_idea,
industry="",
target_audience=target_audience,
config=cfg,
user_id=user_id,
)
# Track usage
cost_total = 0.0
if isinstance(result, dict):
# 0.005 is the fallback when the result carries no usable cost entry
# (presumably a default per-search cost -- TODO confirm the unit).
cost_total = result.get("cost", {}).get("total", 0.005) if result.get("cost") else 0.005
provider.track_exa_usage(user_id, cost_total)
# Extract sources and content
# `or []` also covers an explicit None value for "sources".
sources = result.get("sources", []) or []
research_content = result.get("content", "")
# Build research context for prompt
# Stays an empty string unless both content and at least one source came back.
research_context = ""
if research_content and sources:
# Limit content to 2000 chars to avoid token bloat
limited_content = research_content[:2000]
research_context = f"""
**Research & Current Information:**
Based on current web research, here are relevant insights and trends:
{limited_content}
**Key Research Sources ({len(sources)} sources):**
"""
# Add top 5 sources for context
for idx, source in enumerate(sources[:5], 1):
title = source.get("title", "Untitled") or "Untitled"
url = source.get("url", "") or ""
excerpt = (source.get("excerpt", "") or "")[:200]
# Fall back to the source summary when there is no excerpt.
if not excerpt:
excerpt = (source.get("summary", "") or "")[:200]
research_context += f"\n{idx}. {title}\n {excerpt}\n Source: {url}\n"
research_context += "\n**Use this research to:**\n"
research_context += "- Identify current trends and popular angles\n"
research_context += "- Enhance SEO keywords with real search data\n"
research_context += "- Ensure content is relevant and up-to-date\n"
research_context += "- Reference credible sources in the plan\n"
research_context += "- Identify gaps or unique angles not covered by competitors\n"
# Format sources for response
# Unlike the prompt context above, every source is returned (not only the
# first 5), excerpts are cut at 300 chars and there is no summary fallback.
formatted_sources = []
for source in sources:
formatted_sources.append({
"title": source.get("title", "") or "",
"url": source.get("url", "") or "",
"excerpt": (source.get("excerpt", "") or "")[:300],
"published_at": source.get("published_at"),
# A missing or falsy score (including 0) is reported as 0.85.
"credibility_score": source.get("credibility_score", 0.85) or 0.85,
})
logger.info(f"[YouTubePlanner] Exa research completed: {len(formatted_sources)} sources found")
return research_context, formatted_sources
except HTTPException:
# Re-raise HTTPException (subscription limits, etc.)
raise
except Exception as e:
logger.error(f"[YouTubePlanner] Research error: {e}", exc_info=True)
# Non-critical failure - return empty research
return "", []

View File

@@ -32,6 +32,11 @@ class YouTubeSceneBuilderService:
"""
Build structured scenes from a video plan.
This method is optimized to minimize AI calls:
- For shorts: Reuses scenes if already generated in plan (0 AI calls)
- For medium/long: Generates scenes + batch enhances (1-3 AI calls total)
- Custom script: Parses script without AI calls (0 AI calls)
Args:
video_plan: Video plan from planner service
user_id: Clerk user ID for subscription checking
@@ -41,22 +46,38 @@ class YouTubeSceneBuilderService:
List of scene dictionaries with narration, visual prompts, timing, etc.
"""
try:
duration_type = video_plan.get('duration_type', 'medium')
logger.info(
f"[YouTubeSceneBuilder] Building scenes from plan: "
f"duration={video_plan.get('duration_type')}, "
f"sections={len(video_plan.get('content_outline', []))}"
f"duration={duration_type}, "
f"sections={len(video_plan.get('content_outline', []))}, "
f"user={user_id}"
)
duration_metadata = video_plan.get("duration_metadata", {})
max_scenes = duration_metadata.get("max_scenes", 10)
# If custom script provided, parse it into scenes
if custom_script:
# Optimization: Check if scenes already exist in plan (prevents duplicate generation)
# This can happen if plan was generated with include_scenes=True for shorts
existing_scenes = video_plan.get("scenes", [])
if existing_scenes and video_plan.get("_scenes_included"):
# Scenes already generated in plan - reuse them (0 AI calls)
logger.info(
f"[YouTubeSceneBuilder] ♻️ Reusing {len(existing_scenes)} scenes from plan "
f"(duration={duration_type}) - skipping generation to save AI calls"
)
scenes = self._normalize_scenes_from_plan(video_plan, duration_metadata)
# If custom script provided, parse it into scenes (0 AI calls for parsing)
elif custom_script:
logger.info(
f"[YouTubeSceneBuilder] Parsing custom script for scene generation "
f"(0 AI calls required)"
)
scenes = self._parse_custom_script(
custom_script, video_plan, duration_metadata, user_id
)
# For shorts, check if scenes were already generated in plan (optimization)
elif video_plan.get("_scenes_included") and video_plan.get("duration_type") == "shorts":
elif video_plan.get("_scenes_included") and duration_type == "shorts":
prebuilt = video_plan.get("scenes") or []
if prebuilt:
logger.info(