fix: voice clone preview audio not playing + avatar upload 500 + asset serving
- Fix voice clone preview saved as .wav regardless of actual format (MP3/WebM content from WaveSpeed was saved with .wav extension causing NotSupportedError) - Add detect_audio_format() and ensure_audio_extension() to media_utils - Fix assets_serving.py: use storage_paths for root resolution, add proper MIME types to FileResponse, add auth via query token for <audio> elements - Fix assets_serving.py: add path traversal security check - Fix step4_asset_routes.py: use get_user_workspace() instead of WORKSPACE_DIR, detect actual audio format before saving preview - Fix get_db() in database.py: raise HTTPException(401) instead of raw Exception, catch engine creation failures with HTTPException(503) - Fix avatar.py: add auth error handling, diagnostic logging for path resolution, graceful DB save degradation
This commit is contained in:
@@ -1,52 +1,111 @@
|
|||||||
from fastapi import APIRouter, HTTPException
|
"""
|
||||||
from fastapi.responses import FileResponse
|
Assets Serving Router
|
||||||
|
|
||||||
|
Serves user-uploaded assets (avatars, voice samples) from workspace storage.
|
||||||
|
Uses authenticated or query-token access for security.
|
||||||
|
"""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from services.database import WORKSPACE_DIR, get_user_db_path
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
|
from fastapi.responses import FileResponse
|
||||||
|
from loguru import logger
|
||||||
|
from typing import Dict, Any
|
||||||
|
|
||||||
|
from middleware.auth_middleware import get_current_user_with_query_token, get_current_user
|
||||||
|
from api.story_writer.utils.auth import require_authenticated_user
|
||||||
|
from utils.storage_paths import get_repo_root
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/assets", tags=["Assets Serving"])
|
router = APIRouter(prefix="/api/assets", tags=["Assets Serving"])
|
||||||
|
|
||||||
|
# MIME type map for common audio/image formats (by file extension)
|
||||||
|
MIME_MAP = {
|
||||||
|
".wav": "audio/wav",
|
||||||
|
".mp3": "audio/mpeg",
|
||||||
|
".ogg": "audio/ogg",
|
||||||
|
".opus": "audio/opus",
|
||||||
|
".webm": "audio/webm",
|
||||||
|
".m4a": "audio/mp4",
|
||||||
|
".aac": "audio/aac",
|
||||||
|
".flac": "audio/flac",
|
||||||
|
".png": "image/png",
|
||||||
|
".jpg": "image/jpeg",
|
||||||
|
".jpeg": "image/jpeg",
|
||||||
|
".gif": "image/gif",
|
||||||
|
".webp": "image/webp",
|
||||||
|
".svg": "image/svg+xml",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_asset_path(user_id: str, category: str, filename: str) -> Path:
|
||||||
|
"""Resolve an asset file path in the user's workspace.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user_id: Clerk user ID (already validated)
|
||||||
|
category: Subdirectory under assets/ (e.g. 'avatars', 'voice_samples')
|
||||||
|
filename: The file name (already sanitized)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Resolved absolute Path to the asset file.
|
||||||
|
"""
|
||||||
|
from utils.storage_paths import sanitize_user_id
|
||||||
|
|
||||||
|
safe_user_id = sanitize_user_id(user_id)
|
||||||
|
repo_root = get_repo_root()
|
||||||
|
|
||||||
|
# Primary path: workspace/workspace_{user_id}/assets/{category}/{filename}
|
||||||
|
primary = (repo_root / "workspace" / f"workspace_{safe_user_id}" / "assets" / category / filename).resolve()
|
||||||
|
|
||||||
|
# Security: ensure resolved path doesn't escape the workspace
|
||||||
|
workspace_dir = (repo_root / "workspace" / f"workspace_{safe_user_id}").resolve()
|
||||||
|
if not str(primary).startswith(str(workspace_dir)):
|
||||||
|
raise HTTPException(status_code=403, detail="Access denied")
|
||||||
|
|
||||||
|
return primary
|
||||||
|
|
||||||
|
|
||||||
|
def _get_media_type(filename: str) -> str:
|
||||||
|
"""Determine MIME type from file extension, with a default fallback."""
|
||||||
|
ext = Path(filename).suffix.lower()
|
||||||
|
return MIME_MAP.get(ext, "application/octet-stream")
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{user_id}/avatars/{filename}")
|
@router.get("/{user_id}/avatars/{filename}")
|
||||||
async def serve_avatar(user_id: str, filename: str):
|
async def serve_avatar(
|
||||||
"""
|
user_id: str,
|
||||||
Serve avatar images directly.
|
filename: str,
|
||||||
Public endpoint relying on unguessable filenames.
|
current_user: Dict[str, Any] = Depends(get_current_user_with_query_token),
|
||||||
"""
|
):
|
||||||
# Sanitize user_id (simple check to prevent directory traversal)
|
"""Serve avatar images. Supports auth via header or query token for <img> elements."""
|
||||||
safe_user_id = "".join(c for c in user_id if c.isalnum() or c in ('-', '_'))
|
require_authenticated_user(current_user)
|
||||||
if safe_user_id != user_id:
|
|
||||||
raise HTTPException(status_code=400, detail="Invalid user ID")
|
|
||||||
|
|
||||||
# Sanitize filename
|
|
||||||
safe_filename = os.path.basename(filename)
|
safe_filename = os.path.basename(filename)
|
||||||
|
file_path = _resolve_asset_path(user_id, "avatars", safe_filename)
|
||||||
# Construct path
|
|
||||||
# workspace/workspace_{user_id}/assets/avatars/{filename}
|
|
||||||
file_path = Path(WORKSPACE_DIR) / f"workspace_{safe_user_id}" / "assets" / "avatars" / safe_filename
|
|
||||||
|
|
||||||
if not file_path.exists():
|
if not file_path.exists():
|
||||||
|
logger.debug(f"[Assets] Avatar not found: {file_path}")
|
||||||
raise HTTPException(status_code=404, detail="Asset not found")
|
raise HTTPException(status_code=404, detail="Asset not found")
|
||||||
|
|
||||||
return FileResponse(file_path)
|
media_type = _get_media_type(safe_filename)
|
||||||
|
return FileResponse(file_path, media_type=media_type)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{user_id}/voice_samples/{filename}")
|
@router.get("/{user_id}/voice_samples/{filename}")
|
||||||
async def serve_voice_sample(user_id: str, filename: str):
|
async def serve_voice_sample(
|
||||||
"""
|
user_id: str,
|
||||||
Serve voice sample audio files directly.
|
filename: str,
|
||||||
"""
|
current_user: Dict[str, Any] = Depends(get_current_user_with_query_token),
|
||||||
# Sanitize user_id
|
):
|
||||||
safe_user_id = "".join(c for c in user_id if c.isalnum() or c in ('-', '_'))
|
"""Serve voice sample audio files. Supports auth via header or query token for <audio> elements."""
|
||||||
if safe_user_id != user_id:
|
require_authenticated_user(current_user)
|
||||||
raise HTTPException(status_code=400, detail="Invalid user ID")
|
|
||||||
|
|
||||||
# Sanitize filename
|
|
||||||
safe_filename = os.path.basename(filename)
|
safe_filename = os.path.basename(filename)
|
||||||
|
file_path = _resolve_asset_path(user_id, "voice_samples", safe_filename)
|
||||||
# Construct path
|
|
||||||
# workspace/workspace_{user_id}/assets/voice_samples/{filename}
|
|
||||||
file_path = Path(WORKSPACE_DIR) / f"workspace_{safe_user_id}" / "assets" / "voice_samples" / safe_filename
|
|
||||||
|
|
||||||
if not file_path.exists():
|
if not file_path.exists():
|
||||||
|
logger.debug(f"[Assets] Voice sample not found: {file_path}")
|
||||||
raise HTTPException(status_code=404, detail="Asset not found")
|
raise HTTPException(status_code=404, detail="Asset not found")
|
||||||
|
|
||||||
return FileResponse(file_path)
|
media_type = _get_media_type(safe_filename)
|
||||||
|
logger.debug(f"[Assets] Serving voice sample: {file_path} ({media_type}, {file_path.stat().st_size} bytes)")
|
||||||
|
return FileResponse(file_path, media_type=media_type)
|
||||||
@@ -28,7 +28,8 @@ import base64
|
|||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from utils.file_storage import save_file_safely, generate_unique_filename
|
from utils.file_storage import save_file_safely, generate_unique_filename
|
||||||
from services.database import get_db, WORKSPACE_DIR
|
from services.database import get_db
|
||||||
|
from utils.storage_paths import get_user_workspace, sanitize_user_id
|
||||||
from utils.asset_tracker import save_asset_to_library
|
from utils.asset_tracker import save_asset_to_library
|
||||||
from models.content_asset_models import ContentAsset, AssetType, AssetSource
|
from models.content_asset_models import ContentAsset, AssetType, AssetSource
|
||||||
from sqlalchemy import desc
|
from sqlalchemy import desc
|
||||||
@@ -234,7 +235,7 @@ async def generate_avatar(
|
|||||||
content_to_save = base64.b64decode(image_data) if isinstance(image_data, str) else image_data
|
content_to_save = base64.b64decode(image_data) if isinstance(image_data, str) else image_data
|
||||||
|
|
||||||
# Construct user assets directory
|
# Construct user assets directory
|
||||||
user_assets_dir = Path(WORKSPACE_DIR) / f"workspace_{user_id}" / "assets" / "avatars"
|
user_assets_dir = get_user_workspace(user_id) / "assets" / "avatars"
|
||||||
|
|
||||||
saved_path, error = save_file_safely(
|
saved_path, error = save_file_safely(
|
||||||
content_to_save,
|
content_to_save,
|
||||||
@@ -332,7 +333,7 @@ async def create_variation_route(
|
|||||||
content_to_save = base64.b64decode(image_data)
|
content_to_save = base64.b64decode(image_data)
|
||||||
|
|
||||||
# Construct user assets directory
|
# Construct user assets directory
|
||||||
user_assets_dir = Path(WORKSPACE_DIR) / f"workspace_{user_id}" / "assets" / "avatars"
|
user_assets_dir = get_user_workspace(user_id) / "assets" / "avatars"
|
||||||
|
|
||||||
saved_path, error = save_file_safely(
|
saved_path, error = save_file_safely(
|
||||||
content_to_save,
|
content_to_save,
|
||||||
@@ -406,7 +407,7 @@ async def enhance_avatar_route(
|
|||||||
content_to_save = base64.b64decode(image_data)
|
content_to_save = base64.b64decode(image_data)
|
||||||
|
|
||||||
# Construct user assets directory
|
# Construct user assets directory
|
||||||
user_assets_dir = Path(WORKSPACE_DIR) / f"workspace_{user_id}" / "assets" / "avatars"
|
user_assets_dir = get_user_workspace(user_id) / "assets" / "avatars"
|
||||||
|
|
||||||
saved_path, error = save_file_safely(
|
saved_path, error = save_file_safely(
|
||||||
content_to_save,
|
content_to_save,
|
||||||
@@ -469,7 +470,7 @@ async def create_voice_clone(
|
|||||||
file_content = await file.read()
|
file_content = await file.read()
|
||||||
filename = generate_unique_filename("voice_sample", Path(file.filename).suffix.lstrip("."))
|
filename = generate_unique_filename("voice_sample", Path(file.filename).suffix.lstrip("."))
|
||||||
|
|
||||||
user_voice_dir = Path(WORKSPACE_DIR) / f"workspace_{user_id}" / "assets" / "voice_samples"
|
user_voice_dir = get_user_workspace(user_id) / "assets" / "voice_samples"
|
||||||
saved_path, error = save_file_safely(file_content, user_voice_dir, filename)
|
saved_path, error = save_file_safely(file_content, user_voice_dir, filename)
|
||||||
|
|
||||||
if error or not saved_path:
|
if error or not saved_path:
|
||||||
@@ -537,16 +538,21 @@ async def create_voice_clone(
|
|||||||
|
|
||||||
# 3. Save Preview Audio (if generated)
|
# 3. Save Preview Audio (if generated)
|
||||||
preview_url = None
|
preview_url = None
|
||||||
|
preview_mime_type = "audio/wav"
|
||||||
if preview_audio_bytes:
|
if preview_audio_bytes:
|
||||||
preview_filename = f"preview_{filename}"
|
from utils.media_utils import detect_audio_format, ensure_audio_extension
|
||||||
# Ensure it ends with .wav
|
detected_fmt, preview_mime_type = detect_audio_format(preview_audio_bytes)
|
||||||
if not preview_filename.endswith(".wav"):
|
logger.info(f"[VoiceClone] Detected preview audio format: {detected_fmt} ({preview_mime_type}), {len(preview_audio_bytes)} bytes")
|
||||||
preview_filename = str(Path(preview_filename).with_suffix('.wav'))
|
|
||||||
|
# Build filename with correct extension based on actual content format
|
||||||
|
preview_filename = f"preview_{Path(filename).stem}"
|
||||||
|
preview_filename = ensure_audio_extension(preview_filename, preview_audio_bytes)
|
||||||
|
logger.info(f"[VoiceClone] Preview filename (corrected ext): {preview_filename}")
|
||||||
|
|
||||||
user_voice_dir = Path(WORKSPACE_DIR) / f"workspace_{user_id}" / "assets" / "voice_samples"
|
user_voice_dir = get_user_workspace(user_id) / "assets" / "voice_samples"
|
||||||
logger.warning(f"[VoiceClone] user_id: {user_id}")
|
logger.info(f"[VoiceClone] user_id: {user_id}")
|
||||||
logger.warning(f"[VoiceClone] user_voice_dir: {user_voice_dir}")
|
logger.info(f"[VoiceClone] user_voice_dir: {user_voice_dir}")
|
||||||
logger.warning(f"[VoiceClone] directory exists: {user_voice_dir.exists()}")
|
logger.info(f"[VoiceClone] directory exists: {user_voice_dir.exists()}")
|
||||||
saved_preview_path, error = save_file_safely(preview_audio_bytes, user_voice_dir, preview_filename)
|
saved_preview_path, error = save_file_safely(preview_audio_bytes, user_voice_dir, preview_filename)
|
||||||
|
|
||||||
if not error and saved_preview_path:
|
if not error and saved_preview_path:
|
||||||
@@ -623,9 +629,15 @@ async def create_voice_design(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Save the result to a temporary file
|
# Save the result to a file with correct extension based on content
|
||||||
filename = generate_unique_filename("voice_design_preview", "wav")
|
from utils.media_utils import detect_audio_format, ensure_audio_extension
|
||||||
user_voice_dir = Path(WORKSPACE_DIR) / f"workspace_{user_id}" / "assets" / "voice_samples"
|
detected_fmt, mime_type = detect_audio_format(result.preview_audio_bytes)
|
||||||
|
logger.info(f"[VoiceDesign] Detected audio format: {detected_fmt} ({mime_type})")
|
||||||
|
|
||||||
|
filename = generate_unique_filename("voice_design_preview", detected_fmt)
|
||||||
|
filename = ensure_audio_extension(filename, result.preview_audio_bytes)
|
||||||
|
|
||||||
|
user_voice_dir = get_user_workspace(user_id) / "assets" / "voice_samples"
|
||||||
saved_path, error = save_file_safely(result.preview_audio_bytes, user_voice_dir, filename)
|
saved_path, error = save_file_safely(result.preview_audio_bytes, user_voice_dir, filename)
|
||||||
|
|
||||||
if error or not saved_path:
|
if error or not saved_path:
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from pathlib import Path
|
|||||||
import uuid
|
import uuid
|
||||||
import hashlib
|
import hashlib
|
||||||
|
|
||||||
from services.database import get_db
|
from services.database import get_db, get_session_for_user
|
||||||
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
|
from middleware.auth_middleware import get_current_user, get_current_user_with_query_token
|
||||||
from api.story_writer.utils.auth import require_authenticated_user
|
from api.story_writer.utils.auth import require_authenticated_user
|
||||||
from services.llm_providers.main_image_generation import generate_image
|
from services.llm_providers.main_image_generation import generate_image
|
||||||
@@ -28,6 +28,18 @@ router = APIRouter()
|
|||||||
AVATAR_SUBDIR = PODCAST_AVATARS_SUBDIR
|
AVATAR_SUBDIR = PODCAST_AVATARS_SUBDIR
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_db_or_none(current_user: Dict[str, Any]):
|
||||||
|
"""Try to get a database session, returning None on failure (non-fatal for uploads)."""
|
||||||
|
try:
|
||||||
|
user_id = current_user.get('id') or current_user.get('clerk_user_id')
|
||||||
|
if not user_id:
|
||||||
|
return None
|
||||||
|
return get_session_for_user(user_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"[Podcast] DB session unavailable (non-fatal): {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _get_podcast_avatars_dir(user_id: str) -> Path:
|
def _get_podcast_avatars_dir(user_id: str) -> Path:
|
||||||
"""Get podcast avatars directory for a user (workspace-aware)."""
|
"""Get podcast avatars directory for a user (workspace-aware)."""
|
||||||
return get_podcast_media_dir("image", user_id, ensure_exists=True) / AVATAR_SUBDIR
|
return get_podcast_media_dir("image", user_id, ensure_exists=True) / AVATAR_SUBDIR
|
||||||
@@ -44,8 +56,16 @@ async def upload_podcast_avatar(
|
|||||||
Upload a presenter avatar image for a podcast project.
|
Upload a presenter avatar image for a podcast project.
|
||||||
Returns the avatar URL for use in scene image generation.
|
Returns the avatar URL for use in scene image generation.
|
||||||
"""
|
"""
|
||||||
user_id = require_authenticated_user(current_user)
|
try:
|
||||||
|
user_id = require_authenticated_user(current_user)
|
||||||
|
except HTTPException:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[Podcast] Avatar upload auth failed: {e}", exc_info=True)
|
||||||
|
raise HTTPException(status_code=401, detail="Authentication failed")
|
||||||
|
|
||||||
|
logger.info(f"[Podcast] Avatar upload request - user_id={user_id}, project_id={project_id}, content_type={file.content_type}")
|
||||||
|
|
||||||
# Validate file type
|
# Validate file type
|
||||||
if not file.content_type or not file.content_type.startswith('image/'):
|
if not file.content_type or not file.content_type.startswith('image/'):
|
||||||
raise HTTPException(status_code=400, detail="File must be an image")
|
raise HTTPException(status_code=400, detail="File must be an image")
|
||||||
@@ -61,19 +81,20 @@ async def upload_podcast_avatar(
|
|||||||
unique_id = str(uuid.uuid4())[:8]
|
unique_id = str(uuid.uuid4())[:8]
|
||||||
avatar_filename = f"avatar_{project_id or 'temp'}_{unique_id}{file_ext}"
|
avatar_filename = f"avatar_{project_id or 'temp'}_{unique_id}{file_ext}"
|
||||||
avatars_dir = _get_podcast_avatars_dir(user_id)
|
avatars_dir = _get_podcast_avatars_dir(user_id)
|
||||||
|
logger.info(f"[Podcast] Saving avatar to: {avatars_dir / avatar_filename}")
|
||||||
avatar_path = avatars_dir / avatar_filename
|
avatar_path = avatars_dir / avatar_filename
|
||||||
|
|
||||||
# Save file
|
# Save file
|
||||||
with open(avatar_path, "wb") as f:
|
with open(avatar_path, "wb") as f:
|
||||||
f.write(file_content)
|
f.write(file_content)
|
||||||
|
|
||||||
logger.info(f"[Podcast] Avatar uploaded: {avatar_path}")
|
logger.info(f"[Podcast] Avatar uploaded successfully: {avatar_path}")
|
||||||
|
|
||||||
# Create avatar URL
|
# Create avatar URL
|
||||||
avatar_url = f"/api/podcast/images/{AVATAR_SUBDIR}/{avatar_filename}"
|
avatar_url = f"/api/podcast/images/{AVATAR_SUBDIR}/{avatar_filename}"
|
||||||
|
|
||||||
# Save to asset library if project_id provided
|
# Save to asset library if project_id provided and DB session available
|
||||||
if project_id:
|
if project_id and db:
|
||||||
try:
|
try:
|
||||||
save_asset_to_library(
|
save_asset_to_library(
|
||||||
db=db,
|
db=db,
|
||||||
@@ -95,13 +116,17 @@ async def upload_podcast_avatar(
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"[Podcast] Failed to save avatar asset: {e}")
|
logger.warning(f"[Podcast] Failed to save avatar asset (non-fatal): {e}")
|
||||||
|
elif project_id and not db:
|
||||||
|
logger.warning(f"[Podcast] DB session unavailable, skipping asset library save for avatar")
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"avatar_url": avatar_url,
|
"avatar_url": avatar_url,
|
||||||
"avatar_filename": avatar_filename,
|
"avatar_filename": avatar_filename,
|
||||||
"message": "Avatar uploaded successfully"
|
"message": "Avatar uploaded successfully"
|
||||||
}
|
}
|
||||||
|
except HTTPException:
|
||||||
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error(f"[Podcast] Avatar upload failed: {exc}", exc_info=True)
|
logger.error(f"[Podcast] Avatar upload failed: {exc}", exc_info=True)
|
||||||
raise HTTPException(status_code=500, detail=f"Avatar upload failed: {str(exc)}")
|
raise HTTPException(status_code=500, detail=f"Avatar upload failed: {str(exc)}")
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import os
|
|||||||
from sqlalchemy import create_engine
|
from sqlalchemy import create_engine
|
||||||
from sqlalchemy.orm import sessionmaker, Session
|
from sqlalchemy.orm import sessionmaker, Session
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
from fastapi import HTTPException
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from typing import Optional, List
|
from typing import Optional, List
|
||||||
|
|
||||||
@@ -386,12 +387,15 @@ def get_db(current_user: dict = Depends(get_current_user)):
|
|||||||
"""
|
"""
|
||||||
user_id = current_user.get('id') or current_user.get('clerk_user_id')
|
user_id = current_user.get('id') or current_user.get('clerk_user_id')
|
||||||
if not user_id:
|
if not user_id:
|
||||||
# Fallback or error? For now log error
|
|
||||||
logger.error("No user ID found in context for DB connection")
|
logger.error("No user ID found in context for DB connection")
|
||||||
# Could raise exception, but let's try to be safe
|
raise HTTPException(status_code=401, detail="User ID required for database access")
|
||||||
raise Exception("User ID required for database access")
|
|
||||||
|
|
||||||
engine = get_engine_for_user(user_id)
|
try:
|
||||||
|
engine = get_engine_for_user(user_id)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[DB] Failed to create engine for user {user_id}: {e}", exc_info=True)
|
||||||
|
raise HTTPException(status_code=503, detail="Database temporarily unavailable")
|
||||||
|
|
||||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||||
db = SessionLocal()
|
db = SessionLocal()
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -154,3 +154,89 @@ def load_media_bytes(media_url_or_path: str) -> Optional[bytes]:
|
|||||||
logger.error(f"[MediaUtils] Error reading file {path}: {e}")
|
logger.error(f"[MediaUtils] Error reading file {path}: {e}")
|
||||||
return None
|
return None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Audio format magic bytes signatures
|
||||||
|
_AUDIO_SIGNATURES = [
|
||||||
|
(b"\xff\xfb", "mp3"), # MP3 (MPEG-1 Layer 3, common)
|
||||||
|
(b"\xff\xf3", "mp3"), # MP3 (MPEG-2.5 Layer 3)
|
||||||
|
(b"\xff\xf2", "mp3"), # MP3 (MPEG-2 Layer 3)
|
||||||
|
(b"\xff\xfa", "mp3"), # MP3 (MPEG-2 Layer 3 variant)
|
||||||
|
(b"ID3", "mp3"), # MP3 with ID3 tag
|
||||||
|
(b"RIFF", "wav"), # WAV (RIFF header)
|
||||||
|
(b"OggS", "ogg"), # OGG
|
||||||
|
(b"fLaC", "flac"), # FLAC
|
||||||
|
(b"\x1a\x45\xdf\xa3", "webm"), # WebM / Matroska
|
||||||
|
(b"ftyp", "m4a"), # MP4/M4A (ftyp box follows offset 4)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def detect_audio_format(audio_bytes: bytes) -> tuple[str, str]:
|
||||||
|
"""Detect the actual audio format from content magic bytes.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (format_name, mime_type).
|
||||||
|
Falls back to ('wav', 'audio/wav') if no signature matches.
|
||||||
|
"""
|
||||||
|
if not audio_bytes or len(audio_bytes) < 4:
|
||||||
|
return "wav", "audio/wav"
|
||||||
|
|
||||||
|
for signature, fmt in _AUDIO_SIGNATURES:
|
||||||
|
if signature == b"ftyp":
|
||||||
|
# M4A/MP4: 'ftyp' appears at offset 4
|
||||||
|
if len(audio_bytes) > 8 and audio_bytes[4:8] == b"ftyp":
|
||||||
|
return "m4a", "audio/mp4"
|
||||||
|
elif audio_bytes[:len(signature)] == signature:
|
||||||
|
mime_map = {
|
||||||
|
"mp3": "audio/mpeg",
|
||||||
|
"wav": "audio/wav",
|
||||||
|
"ogg": "audio/ogg",
|
||||||
|
"flac": "audio/flac",
|
||||||
|
"webm": "audio/webm",
|
||||||
|
"m4a": "audio/mp4",
|
||||||
|
}
|
||||||
|
return fmt, mime_map.get(fmt, "audio/wav")
|
||||||
|
|
||||||
|
# Check for Opus-in-OGG (Opus magic after OGG pages)
|
||||||
|
if b"OpusHead" in audio_bytes[:100]:
|
||||||
|
return "ogg", "audio/ogg"
|
||||||
|
|
||||||
|
# Check for MP4/M4A container (atoms starting with size + type)
|
||||||
|
if len(audio_bytes) > 8:
|
||||||
|
atom_type = audio_bytes[4:8]
|
||||||
|
if atom_type in (b"moov", b"mdat", b"free", b"skip"):
|
||||||
|
return "m4a", "audio/mp4"
|
||||||
|
|
||||||
|
return "wav", "audio/wav"
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_audio_extension(filename: str, audio_bytes: bytes) -> str:
|
||||||
|
"""Adjust filename extension to match the actual audio format in audio_bytes.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filename: Original filename (may have wrong extension like .wav for mp3 data)
|
||||||
|
audio_bytes: The actual audio data bytes
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Filename with corrected extension based on content format.
|
||||||
|
"""
|
||||||
|
fmt, _ = detect_audio_format(audio_bytes)
|
||||||
|
ext_map = {
|
||||||
|
"mp3": ".mp3",
|
||||||
|
"wav": ".wav",
|
||||||
|
"ogg": ".ogg",
|
||||||
|
"flac": ".flac",
|
||||||
|
"webm": ".webm",
|
||||||
|
"m4a": ".m4a",
|
||||||
|
"opus": ".ogg",
|
||||||
|
}
|
||||||
|
|
||||||
|
correct_ext = ext_map.get(fmt, ".wav")
|
||||||
|
path = Path(filename)
|
||||||
|
current_ext = path.suffix.lower()
|
||||||
|
|
||||||
|
if current_ext != correct_ext:
|
||||||
|
logger.info(f"[MediaUtils] Correcting audio extension: {filename} -> {path.stem}{correct_ext} (detected format: {fmt})")
|
||||||
|
return f"{path.stem}{correct_ext}"
|
||||||
|
|
||||||
|
return filename
|
||||||
|
|||||||
Reference in New Issue
Block a user