- Fix voice clone preview saved as .wav regardless of actual format (MP3/WebM content from WaveSpeed was saved with .wav extension causing NotSupportedError) - Add detect_audio_format() and ensure_audio_extension() to media_utils - Fix assets_serving.py: use storage_paths for root resolution, add proper MIME types to FileResponse, add auth via query token for <audio> elements - Fix assets_serving.py: add path traversal security check - Fix step4_asset_routes.py: use get_user_workspace() instead of WORKSPACE_DIR, detect actual audio format before saving preview - Fix get_db() in database.py: raise HTTPException(401) instead of raw Exception, catch engine creation failures with HTTPException(503) - Fix avatar.py: add auth error handling, diagnostic logging for path resolution, graceful DB save degradation
243 lines
9.2 KiB
Python
243 lines
9.2 KiB
Python
"""
|
|
Media Utility Functions
|
|
|
|
Centralized helper functions for loading and managing media assets across modules.
|
|
Promotes reuse between Podcast, YouTube, and other media-heavy modules.
|
|
|
|
DEPRECATED: The global DATA_MEDIA_DIR paths below are legacy and will be removed.
|
|
New code should use workspace-scoped paths via utils.storage_paths or module-specific
|
|
resolvers (e.g., api.podcast.constants.get_podcast_media_dir).
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
from urllib.parse import urlparse
|
|
|
|
from services.database import WORKSPACE_DIR
|
|
from utils.storage_paths import get_repo_root
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Repository root, obtained from get_repo_root() so path resolution is consistent.
ROOT_DIR = get_repo_root()

# DEPRECATED: global data/media tree, retained only as a backward-compatible
# read fallback. New writes must target workspace-scoped paths; do NOT add
# new consumers of this constant.
DATA_MEDIA_DIR = ROOT_DIR.joinpath("data", "media")

# DEPRECATED: per-module directories under the global media tree. Prefer the
# workspace-scoped resolvers instead of these constants.
YOUTUBE_AVATARS_DIR = DATA_MEDIA_DIR.joinpath("youtube_avatars")
YOUTUBE_IMAGES_DIR = DATA_MEDIA_DIR.joinpath("youtube_images")
PODCAST_IMAGES_DIR = DATA_MEDIA_DIR.joinpath("podcast_images")
# Podcast avatars live in an "avatars" folder nested inside the podcast images folder.
PODCAST_AVATARS_DIR = PODCAST_IMAGES_DIR.joinpath("avatars")
|
|
|
|
def ensure_media_dirs() -> None:
    """Make sure every shared media directory exists on disk.

    Each directory is created together with any missing parents; directories
    that already exist are left untouched.
    """
    shared_dirs = (
        YOUTUBE_AVATARS_DIR,
        YOUTUBE_IMAGES_DIR,
        PODCAST_IMAGES_DIR,
        PODCAST_AVATARS_DIR,
    )
    for media_dir in shared_dirs:
        media_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def _extract_media_filename(media_url_or_path: str) -> str:
    """Return the final filename component of a media URL, filesystem path, or bare name."""
    if "/" in media_url_or_path or "\\" in media_url_or_path:
        # It's a URL or path. For real URLs use only the path component so the
        # host and query string never leak into the filename.
        parsed = urlparse(media_url_or_path)
        path = parsed.path if parsed.scheme else media_url_or_path
        # Treat Windows separators like "/" so the last component is isolated.
        # Without this a value such as "dir\\file.png" would be used verbatim
        # as the "filename" and later joined onto the search directories.
        path = path.replace("\\", "/")
        return path.split("/")[-1].split("?")[0]
    # It's just a filename (possibly with a query string attached).
    return media_url_or_path.split("?")[0]


def _resolve_workspace_avatar(media_url_or_path: str, filename: str) -> Optional[Path]:
    """Resolve ``/api/assets/{user_id}/avatars/{filename}`` inside that user's workspace.

    Returns the file path when it exists, otherwise None. Any error is logged
    and treated as "not found" so the caller can continue with other locations.
    """
    try:
        parts = urlparse(media_url_or_path).path.split("/")
        # Expected shape: ["", "api", "assets", user_id, "avatars", filename]
        if len(parts) < 6:
            return None
        asset_user_id = parts[3]
        safe_user_id = "".join(c for c in asset_user_id if c.isalnum() or c in ("-", "_"))
        # Refuse IDs containing anything outside [A-Za-z0-9_-] rather than
        # silently rewriting them into a different workspace name.
        if safe_user_id != asset_user_id:
            return None
        # NOTE(review): the workspace is chosen from the URL, not from the
        # caller-supplied user_id — confirm callers authorise access upstream.
        safe_filename = os.path.basename(filename)
        assets_path = (
            Path(WORKSPACE_DIR) / f"workspace_{safe_user_id}" / "assets" / "avatars" / safe_filename
        )
        if assets_path.is_file():
            logger.debug(f"[MediaUtils] Resolved assets avatar {media_url_or_path} to {assets_path}")
            return assets_path
    except Exception as exc:
        logger.error(f"[MediaUtils] Error resolving assets avatar path: {exc}")
    return None


def resolve_media_path(media_url_or_path: str, user_id: Optional[str] = None) -> Optional[Path]:
    """
    Resolve a media URL or filename to a concrete file path on disk.

    Handles cross-module lookups (e.g. checking podcast avatars if not found in youtube).

    Lookup order:
      1. Workspace avatar assets, for URLs containing ``/api/assets/`` and ``/avatars/``.
      2. For ``/api/podcast/`` URLs, the directories returned by
         ``get_podcast_media_read_dirs("image", user_id=user_id)`` (falling back to the
         global podcast directories if that module cannot be imported).
      3. Otherwise the deprecated global YouTube/podcast avatar and image directories.

    Args:
        media_url_or_path: URL path (e.g. /api/youtube/avatars/foo.png), filesystem
            path (forward or back slashes), or bare filename.
        user_id: Optional user ID for tenant-scoped resolution (recommended).
            Only the podcast lookup consumes it.

    Returns:
        Path object if found, None otherwise. Errors are logged and reported as None.
    """
    if not media_url_or_path:
        return None

    ensure_media_dirs()

    try:
        filename = _extract_media_filename(media_url_or_path)
        if not filename:
            return None

        # Handle workspace avatar assets: /api/assets/{user_id}/avatars/{filename}
        if "/api/assets/" in media_url_or_path and "/avatars/" in media_url_or_path:
            assets_path = _resolve_workspace_avatar(media_url_or_path, filename)
            if assets_path is not None:
                return assets_path

        # Default search order (DEPRECATED global paths — kept for backward-compat
        # reads). YouTube locations come first, so "/api/youtube/" URLs need no
        # special handling. PODCAST_AVATARS_DIR is PODCAST_IMAGES_DIR / "avatars",
        # so that nested location is listed only once.
        search_paths: List[Path] = [
            YOUTUBE_AVATARS_DIR / filename,
            PODCAST_AVATARS_DIR / filename,
            YOUTUBE_IMAGES_DIR / filename,
            PODCAST_IMAGES_DIR / filename,
        ]

        is_youtube_url = "/api/youtube/" in media_url_or_path
        if not is_youtube_url and "/api/podcast/" in media_url_or_path:
            # Prioritize Podcast paths via the centralized podcast media resolution.
            try:
                # Imported lazily; an ImportError falls back to the global dirs below.
                from api.podcast.constants import get_podcast_media_read_dirs

                search_paths = [
                    candidate
                    for pod_dir in get_podcast_media_read_dirs("image", user_id=user_id)
                    # Check both the avatar subdirectory and the directory itself.
                    for candidate in (pod_dir / "avatars" / filename, pod_dir / filename)
                ]
            except ImportError:
                # Fallback if podcast constants not available
                search_paths = [
                    PODCAST_AVATARS_DIR / filename,
                    PODCAST_IMAGES_DIR / filename,
                ]

        # Return the first candidate that is an existing regular file.
        for candidate in search_paths:
            if candidate.is_file():
                logger.debug(f"[MediaUtils] Resolved {media_url_or_path} to {candidate}")
                return candidate

        logger.warning(f"[MediaUtils] Could not resolve media path for: {media_url_or_path}")
        return None

    except Exception as e:
        # Best-effort resolver: callers expect None rather than an exception.
        logger.error(f"[MediaUtils] Error resolving media path: {e}")
        return None
|
|
|
|
|
|
def load_media_bytes(media_url_or_path: str, user_id: Optional[str] = None) -> Optional[bytes]:
    """
    Load media bytes from a URL or path with cross-module fallback.

    Args:
        media_url_or_path: URL path or filename
        user_id: Optional user ID forwarded to resolve_media_path for
            tenant-scoped resolution. Defaults to None, which matches the
            previous behaviour of this function.

    Returns:
        File bytes if found, None otherwise (including when the file cannot be
        read; the error is logged).
    """
    path = resolve_media_path(media_url_or_path, user_id=user_id)
    if path is None:
        return None

    try:
        return path.read_bytes()
    except Exception as e:
        # Best-effort loader: report unreadable files as "not found".
        logger.error(f"[MediaUtils] Error reading file {path}: {e}")
        return None
|
|
|
|
|
|
# Audio format magic-byte signatures, compared against the start of the data
# (except "ftyp", which is compared at offset 4).
# MP3 frames start with an 11-bit sync word followed by version/layer/CRC bits,
# which is why several two-byte prefixes map to "mp3".
_AUDIO_SIGNATURES = [
    (b"\xff\xfb", "mp3"),  # MPEG-1 Layer III, no CRC (most common)
    (b"\xff\xfa", "mp3"),  # MPEG-1 Layer III, CRC-protected
    (b"\xff\xf3", "mp3"),  # MPEG-2 Layer III, no CRC
    (b"\xff\xf2", "mp3"),  # MPEG-2 Layer III, CRC-protected
    (b"\xff\xe3", "mp3"),  # MPEG-2.5 Layer III, no CRC
    (b"\xff\xe2", "mp3"),  # MPEG-2.5 Layer III, CRC-protected
    (b"ID3", "mp3"),  # MP3 with ID3 tag
    (b"RIFF", "wav"),  # WAV (RIFF header)
    (b"OggS", "ogg"),  # OGG
    (b"fLaC", "flac"),  # FLAC
    (b"\x1a\x45\xdf\xa3", "webm"),  # WebM / Matroska (EBML header)
    (b"ftyp", "m4a"),  # MP4/M4A (ftyp box type sits at offset 4)
]

# MIME type reported for each detected format name.
_AUDIO_MIME_TYPES = {
    "mp3": "audio/mpeg",
    "wav": "audio/wav",
    "ogg": "audio/ogg",
    "flac": "audio/flac",
    "webm": "audio/webm",
    "m4a": "audio/mp4",
}

# Result returned when the data is too short or no signature matches.
_DEFAULT_AUDIO_FORMAT = ("wav", "audio/wav")


def detect_audio_format(audio_bytes: bytes) -> tuple[str, str]:
    """Detect the actual audio format from content magic bytes.

    Only the first 100 bytes are inspected.

    Args:
        audio_bytes: Raw audio data (any bytes-like object).

    Returns:
        Tuple of (format_name, mime_type).
        Falls back to ('wav', 'audio/wav') if the input is empty, shorter than
        4 bytes, or no signature matches.
    """
    if not audio_bytes or len(audio_bytes) < 4:
        return _DEFAULT_AUDIO_FORMAT

    # Work on a small immutable copy so bytearray/memoryview inputs behave
    # the same as bytes.
    header = bytes(audio_bytes[:100])
    # ISO-BMFF (MP4/M4A) files start with a 4-byte size followed by a 4-byte
    # box type; this is shorter than 4 bytes when fewer than 8 bytes are given.
    box_type = header[4:8]

    for signature, fmt in _AUDIO_SIGNATURES:
        if signature == b"ftyp":
            # M4A/MP4: 'ftyp' appears at offset 4, not at the start.
            if box_type == signature:
                return fmt, _AUDIO_MIME_TYPES.get(fmt, "audio/wav")
        elif header.startswith(signature):
            return fmt, _AUDIO_MIME_TYPES.get(fmt, "audio/wav")

    # Opus identification header present without a leading OggS capture pattern.
    if b"OpusHead" in header:
        return "ogg", "audio/ogg"

    # MP4/M4A container whose first box is not 'ftyp'.
    if box_type in (b"moov", b"mdat", b"free", b"skip"):
        return "m4a", "audio/mp4"

    return _DEFAULT_AUDIO_FORMAT
|
|
|
|
|
|
def ensure_audio_extension(filename: str, audio_bytes: bytes) -> str:
    """Adjust filename extension to match the actual audio format in audio_bytes.

    The format is determined by detect_audio_format(); when the current
    extension (compared case-insensitively) already matches, the input is
    returned unchanged. Otherwise the last suffix is replaced, and any
    directory component of ``filename`` is preserved.

    Args:
        filename: Original filename or path (may have wrong extension like
            .wav for mp3 data)
        audio_bytes: The actual audio data bytes

    Returns:
        Filename with corrected extension based on content format.
    """
    fmt, _ = detect_audio_format(audio_bytes)
    ext_map = {
        "mp3": ".mp3",
        "wav": ".wav",
        "ogg": ".ogg",
        "flac": ".flac",
        "webm": ".webm",
        "m4a": ".m4a",
        "opus": ".ogg",
    }
    correct_ext = ext_map.get(fmt, ".wav")

    path = Path(filename)
    if path.suffix.lower() == correct_ext:
        return filename

    corrected_name = f"{path.stem}{correct_ext}"
    # Keep the directory part so both return paths agree: the unchanged branch
    # returns the full input, so the corrected branch must not collapse
    # "dir/clip.wav" into a bare "clip.mp3".
    parent = path.parent
    if str(parent) in ("", "."):
        corrected = corrected_name
    else:
        corrected = str(parent / corrected_name)

    logger.info(
        f"[MediaUtils] Correcting audio extension: {filename} -> {corrected} (detected format: {fmt})"
    )
    return corrected
|