371 lines
14 KiB
Python
371 lines
14 KiB
Python
"""Transform Studio service for image-to-video and talking avatar generation."""
|
|
|
|
import os
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
from dataclasses import dataclass
|
|
from fastapi import HTTPException
|
|
from loguru import logger
|
|
|
|
from .wan25_service import WAN25Service
|
|
from .infinitetalk_adapter import InfiniteTalkService
|
|
from services.llm_providers.main_video_generation import ai_video_generate
|
|
from utils.logger_utils import get_service_logger
|
|
from utils.file_storage import save_file_safely, sanitize_filename
|
|
|
|
logger = get_service_logger("image_studio.transform")
|
|
|
|
|
|
@dataclass
class TransformImageToVideoRequest:
    """Parameters for a WAN 2.5 image-to-video generation.

    Only ``image_base64`` and ``prompt`` are required; every other field
    carries a default. Field order is part of the constructor signature.
    """

    # Source image, base64-encoded.
    image_base64: str
    # Text prompt passed to the video generator.
    prompt: str
    # Optional base64-encoded audio forwarded to the generator.
    audio_base64: Optional[str] = None
    # Output resolution: 480p, 720p, 1080p
    resolution: str = "720p"
    # Clip length in seconds: 5 or 10
    duration: int = 5
    negative_prompt: Optional[str] = None
    seed: Optional[int] = None
    enable_prompt_expansion: bool = True
|
|
|
|
|
|
@dataclass
class TalkingAvatarRequest:
    """Parameters for an InfiniteTalk talking-avatar generation.

    ``image_base64`` and ``audio_base64`` are required; the remaining
    fields are optional. Field order is part of the constructor signature.
    """

    # Avatar image, base64-encoded.
    image_base64: str
    # Speech audio, base64-encoded.
    audio_base64: str
    # Output resolution: 480p or 720p
    resolution: str = "720p"
    prompt: Optional[str] = None
    mask_image_base64: Optional[str] = None
    seed: Optional[int] = None
|
|
|
|
|
|
class TransformStudioService:
    """Service for Transform Studio operations.

    Generates videos (image-to-video through ``ai_video_generate`` and
    talking avatars through ``InfiniteTalkService``), writes the resulting
    MP4 under a per-user directory, and records it in the asset library on
    a best-effort basis.
    """

    def __init__(self):
        """Initialize Transform Studio service.

        Raises:
            RuntimeError: If the output directory does not exist after the
                ``mkdir`` call.
        """
        self.wan25_service = WAN25Service()
        self.infinitetalk_service = InfiniteTalkService()

        # Video output directory
        # __file__ is: backend/services/image_studio/transform_service.py
        # We need: backend/transform_videos
        # NOTE(review): for the path above, four ``.parent`` hops resolve to
        # the directory that *contains* ``backend``, not ``backend`` itself.
        # Left unchanged because whatever serves these files may rely on the
        # current location -- confirm which of comment/code is intended.
        base_dir = Path(__file__).parent.parent.parent.parent
        self.output_dir = base_dir / "transform_videos"
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Verify directory was created
        if not self.output_dir.exists():
            raise RuntimeError(f"Failed to create transform_videos directory: {self.output_dir}")

        logger.info(f"[Transform Studio] Initialized with output directory: {self.output_dir}")

    def _save_video_file(
        self,
        video_bytes: bytes,
        operation_type: str,
        user_id: str,
    ) -> Dict[str, Any]:
        """Save video file to disk.

        Args:
            video_bytes: Video content as bytes
            operation_type: Type of operation (e.g., "image-to-video", "talking-avatar")
            user_id: User ID for directory organization

        Returns:
            Dictionary with filename, file_path, file_url, and file_size

        Raises:
            HTTPException: 500 when ``save_file_safely`` reports an error.
        """
        # Create user-specific directory
        # NOTE(review): ``user_id`` is used verbatim as a path component;
        # presumably it comes from the authenticated session -- verify that
        # callers never pass a client-controlled value here.
        user_dir = self.output_dir / user_id
        user_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename (short random suffix keeps names unique per user)
        filename = sanitize_filename(f"{operation_type}_{uuid.uuid4().hex[:8]}.mp4")

        # Save file
        file_path, error = save_file_safely(
            content=video_bytes,
            directory=user_dir,
            filename=filename,
            max_file_size=500 * 1024 * 1024,  # 500MB max for videos
        )

        if error:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to save video file: {error}",
            )

        return {
            "filename": filename,
            "file_path": str(file_path),
            "file_url": f"/api/image-studio/videos/{user_id}/{filename}",
            "file_size": len(video_bytes),
        }

    def _record_asset(
        self,
        *,
        user_id: str,
        save_result: Dict[str, Any],
        title: str,
        description: str,
        prompt: str,
        tags: list,
        provider: str,
        model: str,
        cost: float,
        asset_metadata: Dict[str, Any],
    ) -> None:
        """Record a saved video in the asset library (best effort).

        Any failure -- import error, database error, tracker error -- is
        logged as a warning and swallowed so that a successfully generated
        video is still returned to the caller.

        Args:
            user_id: Owner of the asset
            save_result: Dictionary returned by ``_save_video_file``
            title: Asset title
            description: Asset description
            prompt: Prompt stored with the asset
            tags: Tags stored with the asset
            provider: Generation provider name
            model: Generation model name
            cost: Generation cost
            asset_metadata: Extra metadata stored with the asset
        """
        try:
            from services.database import get_db
            from utils.asset_tracker import save_asset_to_library

            db = next(get_db())
            try:
                save_asset_to_library(
                    db=db,
                    user_id=user_id,
                    asset_type="video",
                    source_module="image_studio",
                    filename=save_result["filename"],
                    file_url=save_result["file_url"],
                    file_path=save_result["file_path"],
                    file_size=save_result["file_size"],
                    mime_type="video/mp4",
                    title=title,
                    description=description,
                    prompt=prompt,
                    tags=tags,
                    provider=provider,
                    model=model,
                    cost=cost,
                    asset_metadata=asset_metadata,
                )
                logger.info("[Transform Studio] Video saved to asset library")
            finally:
                db.close()
        except Exception as e:
            logger.warning(f"[Transform Studio] Failed to save to asset library: {e}")

    async def transform_image_to_video(
        self,
        request: TransformImageToVideoRequest,
        user_id: str,
    ) -> Dict[str, Any]:
        """Transform image to video using unified video generation entry point.

        Args:
            request: Transform request
            user_id: User ID for tracking and file organization

        Returns:
            Dictionary with video URL, metadata, and cost

        Raises:
            HTTPException: 500 when the generated video cannot be written to disk.
        """
        logger.info(
            f"[Transform Studio] Image-to-video request from user {user_id}: "
            f"resolution={request.resolution}, duration={request.duration}s"
        )

        # Use unified video generation entry point
        # This handles pre-flight validation, generation, and usage tracking
        # Returns dict with video_bytes and full metadata
        # NOTE(review): this is a plain synchronous call inside a coroutine;
        # if it blocks on network I/O the event loop stalls for its duration.
        # Consider ``asyncio.to_thread`` once thread-safety is confirmed.
        result = ai_video_generate(
            image_base64=request.image_base64,
            prompt=request.prompt,
            operation_type="image-to-video",
            provider="wavespeed",
            user_id=user_id,
            duration=request.duration,
            resolution=request.resolution,
            negative_prompt=request.negative_prompt,
            seed=request.seed,
            audio_base64=request.audio_base64,
            enable_prompt_expansion=request.enable_prompt_expansion,
            model="alibaba/wan-2.5/image-to-video",
        )

        # Save video to disk
        save_result = self._save_video_file(
            video_bytes=result["video_bytes"],
            operation_type="image-to-video",
            user_id=user_id,
        )

        # Values shared by the asset record and the response, with the
        # request/known defaults used when the generator omits a field.
        duration = result.get("duration", float(request.duration))
        width = result.get("width", 1280)
        height = result.get("height", 720)
        provider = result.get("provider", "wavespeed")
        model = result.get("model_name", "alibaba/wan-2.5/image-to-video")
        cost = result.get("cost", 0.0)

        # Save to asset library
        self._record_asset(
            user_id=user_id,
            save_result=save_result,
            title=f"Transform: Image-to-Video ({request.resolution})",
            # ``or ""`` keeps the description build non-raising now that it
            # happens outside the best-effort try block.
            description=f"Generated video using WAN 2.5: {(request.prompt or '')[:100]}",
            prompt=result.get("prompt", request.prompt),
            tags=["image_studio", "transform", "video", "image-to-video", request.resolution],
            provider=provider,
            model=model,
            cost=cost,
            asset_metadata={
                "resolution": request.resolution,
                "duration": duration,
                "operation": "image-to-video",
                "width": width,
                "height": height,
            },
        )

        return {
            "success": True,
            "video_url": save_result["file_url"],
            "video_base64": None,  # Don't include base64 for large videos
            "duration": duration,
            "resolution": result.get("resolution", request.resolution),
            "width": width,
            "height": height,
            "file_size": save_result["file_size"],
            "cost": cost,
            "provider": provider,
            "model": model,
            "metadata": result.get("metadata", {}),
        }

    async def create_talking_avatar(
        self,
        request: TalkingAvatarRequest,
        user_id: str,
    ) -> Dict[str, Any]:
        """Create talking avatar using InfiniteTalk.

        Args:
            request: Talking avatar request
            user_id: User ID for tracking and file organization

        Returns:
            Dictionary with video URL, metadata, and cost

        Raises:
            HTTPException: 500 when the generated video cannot be written to disk.
            KeyError: If the InfiniteTalk result lacks ``video_bytes``,
                ``provider``, ``model_name`` or ``cost``.
        """
        logger.info(
            f"[Transform Studio] Talking avatar request from user {user_id}: "
            f"resolution={request.resolution}"
        )

        # Generate video using InfiniteTalk
        result = await self.infinitetalk_service.create_talking_avatar(
            image_base64=request.image_base64,
            audio_base64=request.audio_base64,
            resolution=request.resolution,
            prompt=request.prompt,
            mask_image_base64=request.mask_image_base64,
            seed=request.seed,
            user_id=user_id,
        )

        # Save video to disk
        save_result = self._save_video_file(
            video_bytes=result["video_bytes"],
            operation_type="talking-avatar",
            user_id=user_id,
        )

        # Track usage (best effort: a tracking failure must not fail the request)
        try:
            # ``track_video_usage`` was previously referenced without being
            # imported anywhere in this module, so every call raised a
            # NameError that the handler below swallowed and usage was never
            # recorded.
            # NOTE(review): assumed to live beside ``ai_video_generate`` --
            # confirm the module path. If it is wrong, the ImportError is
            # caught below exactly like the former NameError.
            from services.llm_providers.main_video_generation import track_video_usage

            usage_info = track_video_usage(
                user_id=user_id,
                provider=result["provider"],
                model_name=result["model_name"],
                prompt=result.get("prompt", ""),
                video_bytes=result["video_bytes"],
                cost_override=result["cost"],
            )
            logger.info(
                f"[Transform Studio] Usage tracked: {usage_info.get('current_calls', 0)} / "
                f"{usage_info.get('video_limit_display', '∞')} videos, "
                f"cost=${result['cost']:.2f}"
            )
        except Exception as e:
            logger.warning(f"[Transform Studio] Failed to track usage: {e}")

        # Values shared by the asset record and the response.
        duration = result.get("duration", 5.0)
        width = result.get("width", 1280)
        height = result.get("height", 720)
        provider = result["provider"]
        model = result["model_name"]
        cost = result["cost"]

        # Save to asset library
        self._record_asset(
            user_id=user_id,
            save_result=save_result,
            title=f"Transform: Talking Avatar ({request.resolution})",
            description="Generated talking avatar video using InfiniteTalk",
            prompt=result.get("prompt", ""),
            tags=["image_studio", "transform", "video", "talking-avatar", request.resolution],
            provider=provider,
            model=model,
            cost=cost,
            asset_metadata={
                "resolution": request.resolution,
                "duration": duration,
                "operation": "talking-avatar",
                "width": width,
                "height": height,
            },
        )

        return {
            "success": True,
            "video_url": save_result["file_url"],
            "video_base64": None,  # Don't include base64 for large videos
            "duration": duration,
            "resolution": result.get("resolution", request.resolution),
            "width": width,
            "height": height,
            "file_size": save_result["file_size"],
            "cost": cost,
            "provider": provider,
            "model": model,
            "metadata": result.get("metadata", {}),
        }

    def estimate_cost(
        self,
        operation: str,
        resolution: str,
        duration: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Estimate cost for transform operation.

        Args:
            operation: Operation type ("image-to-video" or "talking-avatar")
            resolution: Output resolution
            duration: Video duration in seconds; defaults to 5 when omitted

        Returns:
            Cost estimation details

        Raises:
            ValueError: If ``operation`` is not one of the two supported types.
        """
        if operation == "image-to-video":
            if duration is None:
                duration = 5
            cost = self.wan25_service.calculate_cost(resolution, duration)
            return {
                "estimated_cost": cost,
                "breakdown": {
                    "base_cost": 0.0,
                    # Per-second rate is obtained by pricing a 1-second clip.
                    "per_second": self.wan25_service.calculate_cost(resolution, 1),
                    "duration": duration,
                    "total": cost,
                },
                "currency": "USD",
                "provider": "wavespeed",
                "model": "alibaba/wan-2.5/image-to-video",
            }

        if operation == "talking-avatar":
            # InfiniteTalk minimum is 5 seconds
            estimated_duration = duration or 5.0
            cost = self.infinitetalk_service.calculate_cost(resolution, estimated_duration)
            return {
                "estimated_cost": cost,
                "breakdown": {
                    "base_cost": 0.0,
                    "per_second": self.infinitetalk_service.calculate_cost(resolution, 1.0),
                    "duration": estimated_duration,
                    "total": cost,
                },
                "currency": "USD",
                "provider": "wavespeed",
                "model": "wavespeed-ai/infinitetalk",
            }

        raise ValueError(f"Unknown operation: {operation}")
|
|
|