Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
120 lines
4.6 KiB
Python
120 lines
4.6 KiB
Python
"""
|
|
Podcast API Utility Functions
|
|
|
|
Helper functions for loading media files and other utilities.
|
|
"""
|
|
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
from fastapi import HTTPException
|
|
from loguru import logger
|
|
|
|
from .constants import get_podcast_media_read_dirs
|
|
from utils.media_utils import load_media_bytes
|
|
|
|
|
|
def _resolve_podcast_media_file(
|
|
filename: str,
|
|
media_type: str,
|
|
user_id: str | None = None,
|
|
*,
|
|
subdir: Path | None = None,
|
|
) -> Path:
|
|
"""Resolve podcast media file path from tenant workspace first, then legacy global dir."""
|
|
clean_filename = filename.split("?", 1)[0].strip()
|
|
if not clean_filename:
|
|
raise HTTPException(status_code=400, detail="Invalid filename")
|
|
|
|
# Filename must be a basename only (no path separators / traversal)
|
|
filename_path = Path(clean_filename)
|
|
if filename_path.name != clean_filename or clean_filename in {".", ".."}:
|
|
raise HTTPException(status_code=400, detail="Invalid filename")
|
|
|
|
for base_dir in get_podcast_media_read_dirs(media_type, user_id):
|
|
target_dir = (base_dir / subdir).resolve() if subdir else base_dir.resolve()
|
|
candidate = (target_dir / clean_filename).resolve()
|
|
try:
|
|
candidate.relative_to(target_dir)
|
|
except ValueError:
|
|
logger.error(f"[Podcast] Attempted path traversal for {media_type}: {filename}")
|
|
raise HTTPException(status_code=403, detail="Invalid media path")
|
|
if candidate.exists():
|
|
return candidate
|
|
|
|
raise HTTPException(status_code=404, detail=f"{media_type.capitalize()} file not found: {clean_filename}")
|
|
|
|
|
|
def load_podcast_audio_bytes(audio_url: str, user_id: str | None = None) -> bytes:
|
|
"""Load podcast audio bytes from URL. Only handles /api/podcast/audio/ URLs."""
|
|
if not audio_url:
|
|
raise HTTPException(status_code=400, detail="Audio URL is required")
|
|
|
|
try:
|
|
parsed = urlparse(audio_url)
|
|
path = parsed.path if parsed.scheme else audio_url
|
|
|
|
# Only handle /api/podcast/audio/ URLs
|
|
prefix = "/api/podcast/audio/"
|
|
if prefix not in path:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Unsupported audio URL format: {audio_url}. Only /api/podcast/audio/ URLs are supported."
|
|
)
|
|
|
|
filename = path.split(prefix, 1)[1].split("?", 1)[0].strip()
|
|
if not filename:
|
|
raise HTTPException(status_code=400, detail=f"Could not extract filename from URL: {audio_url}")
|
|
|
|
audio_path = _resolve_podcast_media_file(filename, "audio", user_id)
|
|
return audio_path.read_bytes()
|
|
except HTTPException:
|
|
raise
|
|
except Exception as exc:
|
|
logger.error(f"[Podcast] Failed to load audio: {exc}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to load audio: {str(exc)}")
|
|
|
|
|
|
def load_podcast_image_bytes(image_url: str, user_id: str | None = None) -> bytes:
|
|
"""Load podcast image bytes from URL. Resolves from workspace first."""
|
|
if not image_url:
|
|
raise HTTPException(status_code=400, detail="Image URL is required")
|
|
|
|
logger.info(f"[Podcast] Loading image from URL: {image_url}")
|
|
|
|
try:
|
|
parsed = urlparse(image_url)
|
|
path = parsed.path if parsed.scheme else image_url
|
|
|
|
# Extract filename from API image path
|
|
prefix = "/api/podcast/images/"
|
|
if path.startswith(prefix):
|
|
filename = path[len(prefix):].split("?", 1)[0].strip()
|
|
# Handle subdirectories like avatars/
|
|
subdir = None
|
|
if "/" in filename:
|
|
subdir_part = filename.rsplit("/", 1)[0]
|
|
subdir = Path(subdir_part)
|
|
filename = filename.rsplit("/", 1)[1]
|
|
|
|
try:
|
|
image_path = _resolve_podcast_media_file(filename, "image", user_id, subdir=subdir)
|
|
return image_path.read_bytes()
|
|
except HTTPException:
|
|
pass # Fall through to centralized loader
|
|
|
|
# Fall back to centralized media loader
|
|
image_bytes = load_media_bytes(image_url)
|
|
|
|
if not image_bytes:
|
|
logger.error(f"[Podcast] Image file not found for URL: {image_url}")
|
|
raise HTTPException(status_code=404, detail=f"Image file not found: {image_url}")
|
|
|
|
logger.info(f"[Podcast] ✅ Successfully loaded image: {len(image_bytes)} bytes")
|
|
return image_bytes
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as exc:
|
|
logger.error(f"[Podcast] Failed to load image: {exc}")
|
|
raise HTTPException(status_code=500, detail=f"Failed to load image: {str(exc)}")
|