Files
ALwrity/backend/api/story_writer/utils/media_utils.py

267 lines
8.8 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import Iterable, List, Optional
from urllib.parse import urlparse
from fastapi import HTTPException, status
from loguru import logger
from sqlalchemy.orm import Session
from services.database import get_db
from services.user_workspace_manager import UserWorkspaceManager
BASE_DIR = Path(__file__).resolve().parents[4] # repository root
DATA_MEDIA_DIR = (BASE_DIR / "data" / "media").resolve()
LEGACY_STORY_VIDEOS_DIR = (BASE_DIR / "story_videos").resolve()
LEGACY_WORKSPACE_MEDIA_DIR = (BASE_DIR / "workspace" / "media").resolve()
STORY_MEDIA_SUBDIRS = {
"image": "story_images",
"audio": "story_audio",
"video": "story_videos",
}
# Authoritative policy:
# - New reads/writes should use workspace/workspace_<id>/media/story_*.
# Compatibility fallback order for reads:
# 1) workspace/workspace_<id>/media/story_*
# 2) legacy workspace paths (e.g. content/story_audio, workspace/media/story_videos)
# 3) global data/media/story_*
# 4) old root-level story_videos/*
def _workspace_story_media_dir(workspace_path: Path, media_type: str) -> Path:
return (workspace_path / "media" / STORY_MEDIA_SUBDIRS[media_type]).resolve()
def _workspace_legacy_dirs(workspace_path: Path, media_type: str) -> List[Path]:
if media_type == "audio":
return [(workspace_path / "content" / "story_audio").resolve()]
if media_type == "video":
return [(workspace_path / "media" / "story_videos").resolve()]
return []
def _global_story_media_dir(media_type: str) -> Path:
return (DATA_MEDIA_DIR / STORY_MEDIA_SUBDIRS[media_type]).resolve()
def _global_legacy_dirs(media_type: str) -> List[Path]:
if media_type == "video":
return [
(LEGACY_WORKSPACE_MEDIA_DIR / "story_videos").resolve(),
LEGACY_STORY_VIDEOS_DIR,
]
return []
def _get_workspace_path(user_id: str, db: Optional[Session] = None) -> Optional[Path]:
if not user_id:
return None
session = db
db_gen = None
try:
if session is None:
db_gen = get_db()
session = next(db_gen)
workspace_manager = UserWorkspaceManager(session)
workspace = workspace_manager.get_user_workspace(user_id)
if workspace and workspace.get("workspace_path"):
return Path(workspace["workspace_path"]).resolve()
except Exception as exc:
logger.warning(f"[StoryWriter] Failed to resolve workspace for {user_id}: {exc}")
finally:
if db is None and session is not None:
try:
session.close()
except Exception:
pass
if db_gen is not None:
try:
db_gen.close()
except Exception:
pass
return None
def get_story_media_write_dir(media_type: str, user_id: Optional[str] = None, db: Optional[Session] = None) -> Path:
"""Return the canonical directory used for newly generated story media files."""
if media_type not in STORY_MEDIA_SUBDIRS:
raise ValueError(f"Unsupported media type: {media_type}")
if user_id:
workspace_path = _get_workspace_path(user_id, db)
if workspace_path:
canonical = _workspace_story_media_dir(workspace_path, media_type)
canonical.mkdir(parents=True, exist_ok=True)
return canonical
fallback = _global_story_media_dir(media_type)
fallback.mkdir(parents=True, exist_ok=True)
return fallback
def _safe_candidate(base_dir: Path, filename: str) -> Optional[Path]:
try:
base_resolved = base_dir.resolve()
candidate = (base_resolved / filename).resolve()
candidate.relative_to(base_resolved)
return candidate
except Exception:
return None
def _iter_read_dirs(media_type: str, user_id: Optional[str], db: Optional[Session]) -> Iterable[Path]:
if media_type not in STORY_MEDIA_SUBDIRS:
raise ValueError(f"Unsupported media type: {media_type}")
workspace_path = _get_workspace_path(user_id, db) if user_id else None
if workspace_path:
yield _workspace_story_media_dir(workspace_path, media_type)
for legacy in _workspace_legacy_dirs(workspace_path, media_type):
yield legacy
yield _global_story_media_dir(media_type)
for legacy in _global_legacy_dirs(media_type):
yield legacy
def resolve_story_media_path(
filename: str,
media_type: str,
user_id: Optional[str] = None,
db: Optional[Session] = None,
extra_subdir: Optional[str] = None,
) -> Path:
"""Resolve story media with canonical-first lookup and legacy fallbacks."""
filename = filename.split("?")[0].strip()
if not filename:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")
for base_dir in _iter_read_dirs(media_type, user_id, db):
search_dir = base_dir / extra_subdir if extra_subdir else base_dir
candidate = _safe_candidate(search_dir, filename)
if not candidate:
continue
if candidate.exists():
return candidate
alternate = _find_alternate_media_file(search_dir, filename)
if alternate:
logger.warning(
"[StoryWriter] Requested media '%s' missing in '%s'; serving '%s'",
filename,
search_dir,
alternate.name,
)
return alternate
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"File not found: {filename}")
def resolve_media_file(base_dir: Path, filename: str) -> Path:
"""Backwards-compatible helper for existing route handlers."""
filename = filename.split("?")[0].strip()
resolved = _safe_candidate(base_dir, filename)
if not resolved:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
if resolved.exists():
return resolved
alternate = _find_alternate_media_file(base_dir, filename)
if alternate:
logger.warning(
"[StoryWriter] Requested media file '%s' missing; serving closest match '%s'",
filename,
alternate.name,
)
return alternate
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"File not found: {filename}")
def load_story_image_bytes(image_url: str, user_id: Optional[str] = None) -> Optional[bytes]:
if not image_url:
return None
try:
parsed = urlparse(image_url)
path = parsed.path if parsed.scheme else image_url
prefix = "/api/story/images/"
if prefix not in path:
logger.warning(f"[StoryWriter] Unsupported image URL for video reference: {image_url}")
return None
filename = path.split(prefix, 1)[1].split("?", 1)[0].strip()
if not filename:
return None
try:
file_path = resolve_story_media_path(filename, "image", user_id)
return file_path.read_bytes()
except HTTPException:
logger.warning(f"[StoryWriter] Referenced scene image not found: {filename}")
return None
except Exception as exc:
logger.error(f"[StoryWriter] Failed to load reference image for video gen: {exc}")
return None
def load_story_audio_bytes(audio_url: str, user_id: Optional[str] = None) -> Optional[bytes]:
if not audio_url:
return None
try:
parsed = urlparse(audio_url)
path = parsed.path if parsed.scheme else audio_url
prefix = "/api/story/audio/"
if prefix not in path:
logger.warning(f"[StoryWriter] Unsupported audio URL for video reference: {audio_url}")
return None
filename = path.split(prefix, 1)[1].split("?", 1)[0].strip()
if not filename:
return None
try:
file_path = resolve_story_media_path(filename, "audio", user_id)
return file_path.read_bytes()
except HTTPException:
logger.warning(f"[StoryWriter] Referenced scene audio not found: {filename}")
return None
except Exception as exc:
logger.error(f"[StoryWriter] Failed to load reference audio for video gen: {exc}")
return None
def _find_alternate_media_file(base_dir: Path, filename: str) -> Optional[Path]:
try:
base_dir = base_dir.resolve()
except Exception:
return None
stem = Path(filename).stem
suffix = Path(filename).suffix
if not suffix or "_" not in stem:
return None
prefix = stem.rsplit("_", 1)[0]
pattern = f"{prefix}_*{suffix}"
try:
candidates = sorted(
(p for p in base_dir.glob(pattern) if p.is_file()),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
except Exception as exc:
logger.debug(f"[StoryWriter] Failed to search alternate media files for {filename}: {exc}")
return None
return candidates[0] if candidates else None