Unify story media path resolution across services and routes

This commit is contained in:
ي
2026-03-12 14:59:45 +05:30
parent bc49329ed6
commit 22df52f9d6
6 changed files with 235 additions and 216 deletions

View File

@@ -1,99 +1,190 @@
from __future__ import annotations
from pathlib import Path
from typing import Optional
from typing import Iterable, List, Optional
from urllib.parse import urlparse
from fastapi import HTTPException, status
from loguru import logger
from sqlalchemy.orm import Session
from services.database import get_db
from services.user_workspace_manager import UserWorkspaceManager
# NOTE(review): BASE_DIR and DATA_MEDIA_DIR are each assigned twice below; the
# later assignment is the one in effect once the module has been imported.
BASE_DIR = Path(__file__).resolve().parents[4] # root/
# Default global media directory matches story image/audio services (root/data/media)
DATA_MEDIA_DIR = BASE_DIR / "data" / "media"
BASE_DIR = Path(__file__).resolve().parents[4] # repository root
# Global (non user-specific) media root, stored fully resolved.
DATA_MEDIA_DIR = (BASE_DIR / "data" / "media").resolve()
# Legacy locations that are still consulted when looking up video files.
LEGACY_STORY_VIDEOS_DIR = (BASE_DIR / "story_videos").resolve()
LEGACY_WORKSPACE_MEDIA_DIR = (BASE_DIR / "workspace" / "media").resolve()
# Global image/audio directories underneath DATA_MEDIA_DIR.
STORY_IMAGES_DIR = (DATA_MEDIA_DIR / "story_images").resolve()
STORY_AUDIO_DIR = (DATA_MEDIA_DIR / "story_audio").resolve()
# Maps a media type to the sub-directory name used under both the per-workspace
# "media" folder and the global DATA_MEDIA_DIR. Its keys are the only media
# types accepted by the read/write directory helpers in this module.
STORY_MEDIA_SUBDIRS = {
"image": "story_images",
"audio": "story_audio",
"video": "story_videos",
}
def _get_user_media_path(user_id: str, media_type: str) -> Optional[Path]:
"""Resolve user-specific media directory."""
# Authoritative policy:
# - New reads/writes should use workspace/workspace_<id>/media/story_*.
# Compatibility fallback order for reads:
# 1) workspace/workspace_<id>/media/story_*
# 2) legacy per-workspace paths (audio: content/story_audio; video: media/story_videos)
# 3) global data/media/story_*
# 4) legacy global video paths: root-level workspace/media/story_videos, then root-level story_videos
def _workspace_story_media_dir(workspace_path: Path, media_type: str) -> Path:
    """Return the resolved ``media/<subdir>`` directory for ``media_type`` inside a workspace.

    Raises:
        KeyError: If ``media_type`` is not a key of ``STORY_MEDIA_SUBDIRS``.
    """
    media_root = workspace_path / "media"
    subdir_name = STORY_MEDIA_SUBDIRS[media_type]
    return media_root.joinpath(subdir_name).resolve()
def _workspace_legacy_dirs(workspace_path: Path, media_type: str) -> List[Path]:
    """List the legacy per-workspace directories to search for ``media_type``.

    Audio maps to ``content/story_audio`` and video to ``media/story_videos``
    under ``workspace_path``; any other media type has no legacy directory.
    """
    if media_type == "audio":
        relative_parts = ("content", "story_audio")
    elif media_type == "video":
        relative_parts = ("media", "story_videos")
    else:
        return []
    return [workspace_path.joinpath(*relative_parts).resolve()]
def _global_story_media_dir(media_type: str) -> Path:
    """Return the resolved global directory for ``media_type`` under ``DATA_MEDIA_DIR``.

    Raises:
        KeyError: If ``media_type`` is not a key of ``STORY_MEDIA_SUBDIRS``.
    """
    subdir_name = STORY_MEDIA_SUBDIRS[media_type]
    return DATA_MEDIA_DIR.joinpath(subdir_name).resolve()
def _global_legacy_dirs(media_type: str) -> List[Path]:
    """List the legacy global directories to search for ``media_type``.

    Only video has legacy global locations: ``story_videos`` under
    ``LEGACY_WORKSPACE_MEDIA_DIR`` first, then ``LEGACY_STORY_VIDEOS_DIR``.
    """
    if media_type != "video":
        return []
    legacy_workspace_videos = LEGACY_WORKSPACE_MEDIA_DIR.joinpath("story_videos").resolve()
    return [legacy_workspace_videos, LEGACY_STORY_VIDEOS_DIR]
def _get_workspace_path(user_id: str, db: Optional[Session] = None) -> Optional[Path]:
    """Look up the resolved workspace directory recorded for ``user_id``.

    Args:
        user_id: Identifier passed to ``UserWorkspaceManager.get_user_workspace``.
        db: Optional caller-owned session. When omitted, a temporary session is
            taken from ``get_db()`` and released before this function returns.

    Returns:
        The resolved ``workspace_path`` of the user's workspace, or ``None`` when
        ``user_id`` is empty, the workspace has no ``workspace_path``, or the
        lookup raised (the failure is logged as a warning, not propagated).
    """
    if not user_id:
        return None

    session = db
    db_gen = None
    try:
        if session is None:
            db_gen = get_db()
            session = next(db_gen)
        workspace = UserWorkspaceManager(session).get_user_workspace(user_id)
        if workspace and workspace.get("workspace_path"):
            return Path(workspace["workspace_path"]).resolve()
    except Exception as exc:
        logger.warning(f"[StoryWriter] Failed to resolve workspace for {user_id}: {exc}")
    finally:
        # Release only what was created here; a caller-supplied session stays open.
        if db is None and session is not None:
            try:
                session.close()
            except Exception as close_exc:
                # Cleanup stays best-effort, but the failure is no longer invisible.
                logger.debug(f"[StoryWriter] Failed to close temporary session: {close_exc}")
        if db_gen is not None:
            try:
                db_gen.close()
            except Exception as close_exc:
                logger.debug(f"[StoryWriter] Failed to close get_db generator: {close_exc}")
    return None
def resolve_story_media_path(filename: str, media_type: str, user_id: Optional[str] = None) -> Path:
"""
Resolve a story media file path, checking user workspace first then global directory.
media_type: 'image' or 'audio'
"""
filename = filename.split("?")[0].strip()
# 1. Try user workspace
if user_id:
user_path = _get_user_media_path(user_id, media_type)
if user_path:
file_path = (user_path / filename).resolve()
# Guard against traversal
if str(file_path).startswith(str(user_path)) and file_path.exists():
return file_path
def get_story_media_write_dir(media_type: str, user_id: Optional[str] = None, db: Optional[Session] = None) -> Path:
    """Return (creating it if needed) the directory where new story media is written.

    The user's workspace media directory is used when ``user_id`` is given and
    its workspace can be resolved; otherwise the global media directory is used.

    Raises:
        ValueError: If ``media_type`` is not a key of ``STORY_MEDIA_SUBDIRS``.
    """
    if media_type not in STORY_MEDIA_SUBDIRS:
        raise ValueError(f"Unsupported media type: {media_type}")

    workspace_root = _get_workspace_path(user_id, db) if user_id else None
    if workspace_root:
        target_dir = _workspace_story_media_dir(workspace_root, media_type)
    else:
        target_dir = _global_story_media_dir(media_type)

    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir
def _safe_candidate(base_dir: Path, filename: str) -> Optional[Path]:
    """Join ``filename`` onto ``base_dir`` and return it only if it stays inside.

    Both paths are resolved before the containment check. ``None`` is returned
    when the resolved target lies outside ``base_dir`` or when resolving or
    joining raises.
    """
    try:
        root = base_dir.resolve()
        target = (root / filename).resolve()
        # relative_to() raises when target is not located under root.
        target.relative_to(root)
    except Exception:
        return None
    return target
def _iter_read_dirs(media_type: str, user_id: Optional[str], db: Optional[Session]) -> Iterable[Path]:
    """Yield the directories to search for ``media_type``, in lookup priority order.

    Order: the user's workspace media directory, that workspace's legacy
    directories, the global media directory, then the global legacy
    directories. Workspace entries are only produced when ``user_id`` is given
    and its workspace can be resolved. Each distinct directory is yielded once.

    Raises:
        ValueError: If ``media_type`` is not a key of ``STORY_MEDIA_SUBDIRS``
            (raised when iteration starts, because this is a generator).
    """
    if media_type not in STORY_MEDIA_SUBDIRS:
        raise ValueError(f"Unsupported media type: {media_type}")

    ordered_dirs: List[Path] = []
    workspace_path = _get_workspace_path(user_id, db) if user_id else None
    if workspace_path:
        ordered_dirs.append(_workspace_story_media_dir(workspace_path, media_type))
        ordered_dirs.extend(_workspace_legacy_dirs(workspace_path, media_type))
    ordered_dirs.append(_global_story_media_dir(media_type))
    ordered_dirs.extend(_global_legacy_dirs(media_type))

    # For video the workspace canonical and legacy directories are the same
    # path; skip repeats so a directory is never scanned twice.
    already_yielded = set()
    for directory in ordered_dirs:
        if directory in already_yielded:
            continue
        already_yielded.add(directory)
        yield directory
def resolve_story_media_path(
    filename: str,
    media_type: str,
    user_id: Optional[str] = None,
    db: Optional[Session] = None,
    extra_subdir: Optional[str] = None,
) -> Path:
    """Resolve story media with canonical-first lookup and legacy fallbacks.

    Args:
        filename: Requested file name; anything from the first ``?`` on is dropped.
        media_type: A key of ``STORY_MEDIA_SUBDIRS``.
        user_id: When given, the user's workspace directories are searched first.
        db: Optional session forwarded to the workspace lookup.
        extra_subdir: Optional sub-directory appended to every search directory.

    Returns:
        The first existing match, or the alternate file reported by
        ``_find_alternate_media_file`` for the first directory that has one.

    Raises:
        HTTPException: 404 when the file name is empty or nothing matches.
        ValueError: If ``media_type`` is unsupported (from ``_iter_read_dirs``).
    """
    filename = filename.split("?")[0].strip()
    if not filename:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")

    for base_dir in _iter_read_dirs(media_type, user_id, db):
        if extra_subdir:
            # Apply the same containment check to the sub-directory so it
            # cannot move the search root outside base_dir (e.g. via "..").
            search_dir = _safe_candidate(base_dir, extra_subdir)
            if not search_dir:
                continue
        else:
            search_dir = base_dir

        candidate = _safe_candidate(search_dir, filename)
        if not candidate:
            continue
        if candidate.exists():
            return candidate

        alternate = _find_alternate_media_file(search_dir, filename)
        if alternate:
            # loguru formats with str.format-style "{}" placeholders; "%s"
            # would be emitted literally and the arguments dropped.
            logger.warning(
                "[StoryWriter] Requested media '{}' missing in '{}'; serving '{}'",
                filename,
                search_dir,
                alternate.name,
            )
            return alternate

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"File not found: {filename}")
def resolve_media_file(base_dir: Path, filename: str) -> Path:
    """Backwards-compatible helper for existing route handlers.

    Resolves ``filename`` (with anything from the first ``?`` on removed)
    inside ``base_dir``. If the exact file is missing, falls back to the
    alternate reported by ``_find_alternate_media_file``.

    Raises:
        HTTPException: 403 when the name does not resolve to a path inside
            ``base_dir``; 404 when neither the file nor an alternate exists.
    """
    filename = filename.split("?")[0].strip()
    resolved = _safe_candidate(base_dir, filename)
    if not resolved:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
    if resolved.exists():
        return resolved

    alternate = _find_alternate_media_file(base_dir, filename)
    if alternate:
        # loguru formats with str.format-style "{}" placeholders; "%s" would
        # be emitted literally and the arguments dropped.
        logger.warning(
            "[StoryWriter] Requested media file '{}' missing; serving closest match '{}'",
            filename,
            alternate.name,
        )
        return alternate
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"File not found: {filename}")
def load_story_image_bytes(image_url: str, user_id: Optional[str] = None) -> Optional[bytes]:
"""
Resolve an authenticated story image URL (e.g., /api/story/images/<file>) to raw bytes.
Returns None if the file cannot be located.
"""
if not image_url:
return None
@@ -109,25 +200,18 @@ def load_story_image_bytes(image_url: str, user_id: Optional[str] = None) -> Opt
if not filename:
return None
# Try to resolve path using helper
try:
file_path = resolve_story_media_path(filename, "image", user_id)
return file_path.read_bytes()
except HTTPException:
# Not found
logger.warning(f"[StoryWriter] Referenced scene image not found: {filename}")
return None
except Exception as exc:
logger.error(f"[StoryWriter] Failed to load reference image for video gen: {exc}")
return None
def load_story_audio_bytes(audio_url: str, user_id: Optional[str] = None) -> Optional[bytes]:
"""
Resolve an authenticated story audio URL (e.g., /api/story/audio/<file>) to raw bytes.
Returns None if the file cannot be located.
"""
if not audio_url:
return None
@@ -143,54 +227,18 @@ def load_story_audio_bytes(audio_url: str, user_id: Optional[str] = None) -> Opt
if not filename:
return None
# Try to resolve path using helper
try:
file_path = resolve_story_media_path(filename, "audio", user_id)
return file_path.read_bytes()
except HTTPException:
# Not found
logger.warning(f"[StoryWriter] Referenced scene audio not found: {filename}")
return None
except Exception as exc:
logger.error(f"[StoryWriter] Failed to load reference audio for video gen: {exc}")
return None
def resolve_media_file(base_dir: Path, filename: str) -> Path:
    """Return a safe resolved path for a media file stored under ``base_dir``.

    Anything from the first ``?`` on is removed from ``filename``. If the exact
    file is missing, the alternate reported by ``_find_alternate_media_file``
    is served instead.

    Raises:
        HTTPException: 403 when the resolved path falls outside ``base_dir``;
            404 when neither the file nor an alternate exists.
    """
    filename = filename.split("?")[0].strip()
    resolved = (base_dir / filename).resolve()
    try:
        # Directory-traversal guard: raises ValueError when outside base_dir.
        resolved.relative_to(base_dir.resolve())
    except ValueError:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")

    if resolved.exists():
        return resolved

    alternate = _find_alternate_media_file(base_dir, filename)
    if alternate:
        # loguru formats with str.format-style "{}" placeholders; "%s" would
        # be emitted literally and the arguments dropped.
        logger.warning(
            "[StoryWriter] Requested media file '{}' missing; serving closest match '{}'",
            filename,
            alternate.name,
        )
        return alternate
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"File not found: {filename}")
def _find_alternate_media_file(base_dir: Path, filename: str) -> Optional[Path]:
"""
Attempt to find the most recent media file that matches the original name prefix.
This helps when files are regenerated with new UUID/hash suffixes but the frontend still
references an older filename.
"""
try:
base_dir = base_dir.resolve()
except Exception:
@@ -216,5 +264,3 @@ def _find_alternate_media_file(base_dir: Path, filename: str) -> Optional[Path]:
return None
return candidates[0] if candidates else None