diff --git a/backend/api/video_studio/handlers/avatar.py b/backend/api/video_studio/handlers/avatar.py index f7570d2d..51fedb36 100644 --- a/backend/api/video_studio/handlers/avatar.py +++ b/backend/api/video_studio/handlers/avatar.py @@ -1,8 +1,6 @@ from fastapi import APIRouter, UploadFile, File, Form, BackgroundTasks, HTTPException, Depends from fastapi.responses import FileResponse -from typing import Optional, Dict, Any import shutil -import os from pathlib import Path from services.wavespeed.infinitetalk import animate_scene_with_voiceover from ..task_manager import task_manager @@ -11,13 +9,10 @@ from loguru import logger from services.database import get_engine_for_user from sqlalchemy.orm import sessionmaker from utils.asset_tracker import save_asset_to_library +from utils.storage_paths import resolve_user_media_path, get_workspace_root, get_legacy_video_studio_upload_dirs router = APIRouter() -# Define storage directory -UPLOAD_DIR = Path("backend/data/video_studio/uploads") -UPLOAD_DIR.mkdir(parents=True, exist_ok=True) - def _process_avatar_generation(task_id: str, image_path: Path, audio_path: Path, user_id: str, resolution: str, model: str): """ Background task to process avatar generation using shared InfiniteTalk service. @@ -51,7 +46,8 @@ def _process_avatar_generation(task_id: str, image_path: Path, audio_path: Path, # Save the resulting video bytes to a file video_filename = f"video_{task_id}.mp4" - video_path = UPLOAD_DIR / video_filename + output_dir = resolve_user_media_path(user_id, "video_studio", "videos", create=True) + video_path = output_dir / video_filename with open(video_path, "wb") as f: f.write(result["video_bytes"]) @@ -127,8 +123,9 @@ async def create_avatar_video( # Generate temp paths image_ext = Path(image.filename).suffix or ".png" audio_ext = Path(audio.filename).suffix or ".mp3" - image_path = UPLOAD_DIR / f"img_{task_id}{image_ext}" - audio_path = UPLOAD_DIR / f"aud_{task_id}{audio_ext}" + upload_dir = resolve_user_media_path(user_id, "video_studio", "uploads", create=True) + image_path = upload_dir / f"img_{task_id}{image_ext}" + audio_path = upload_dir / f"aud_{task_id}{audio_ext}" try: # Save uploaded files @@ -167,7 +164,23 @@ async def get_task_status(task_id: str, current_user: dict = Depends(get_current @router.get("/download/{filename}") async def download_video(filename: str): - file_path = UPLOAD_DIR / filename - if not file_path.exists(): - raise HTTPException(status_code=404, detail="File not found") - return FileResponse(file_path) + candidate_paths = [] + + workspace_root = get_workspace_root() + candidate_paths.extend([ + workspace_root / f"workspace_*" / "media" / "video_studio" / "videos" / filename, + workspace_root / f"workspace_*" / "media" / "video_studio" / "uploads" / filename, + ]) + + for legacy_dir in get_legacy_video_studio_upload_dirs(): + candidate_paths.append(legacy_dir / filename) + + for candidate in candidate_paths: + if "*" in str(candidate): + for matched in workspace_root.glob(str(candidate.relative_to(workspace_root))): + if matched.exists() and matched.is_file(): + return FileResponse(matched) + elif candidate.exists() and candidate.is_file(): + return FileResponse(candidate) + + raise HTTPException(status_code=404, detail="File not found") diff --git a/backend/scripts/migrate_video_studio_uploads_to_workspace.py b/backend/scripts/migrate_video_studio_uploads_to_workspace.py new file mode 100644 index 00000000..84b6f0b0 --- /dev/null +++ b/backend/scripts/migrate_video_studio_uploads_to_workspace.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +"""Migrate legacy Video Studio upload files into tenant workspace media folders.""" + +from __future__ import annotations + +import argparse +import json +import re +import shutil +import sqlite3 +from pathlib import Path + +from services.database import WORKSPACE_DIR +from utils.storage_paths import resolve_user_media_path, get_legacy_video_studio_upload_dirs + +TASK_FILE_PATTERN = re.compile(r"^(img|aud|video)_([0-9a-fA-F-]{36})") + + +def _read_task_mappings() -> dict[str, str]: + """Build task_id -> user_id mapping from available per-user databases.""" + mapping: dict[str, str] = {} + workspace_root = Path(WORKSPACE_DIR) + + if not workspace_root.exists(): + return mapping + + for workspace_dir in workspace_root.glob("workspace_*"): + safe_workspace_id = workspace_dir.name.removeprefix("workspace_") + db_candidates = [ + workspace_dir / "db" / f"alwrity_{safe_workspace_id}.db", + workspace_dir / "db" / "alwrity.db", + ] + + db_path = next((p for p in db_candidates if p.exists()), None) + if not db_path: + continue + + conn = None + try: + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='video_generation_tasks'" + ) + if not cursor.fetchone(): + continue + + cursor.execute("SELECT task_id, user_id, result FROM video_generation_tasks") + for task_id, db_user_id, raw_result in cursor.fetchall(): + if not task_id: + continue + + resolved_user = db_user_id or safe_workspace_id + mapping[str(task_id)] = str(resolved_user) + + if raw_result: + try: + parsed = json.loads(raw_result) if isinstance(raw_result, str) else raw_result + if isinstance(parsed, dict): + video_url = parsed.get("video_url", "") + if video_url: + filename = Path(video_url).name + match = TASK_FILE_PATTERN.match(filename) + if match: + mapping.setdefault(match.group(2), str(resolved_user)) + except Exception: + pass + finally: + if conn: + conn.close() + + return mapping + + +def migrate_legacy_uploads(apply: bool = False) -> tuple[int, int]: + task_to_user = _read_task_mappings() + moved = 0 + skipped = 0 + + for legacy_dir in get_legacy_video_studio_upload_dirs(): + if not legacy_dir.exists(): + continue + + for file_path in legacy_dir.iterdir(): + if not file_path.is_file(): + continue + + match = TASK_FILE_PATTERN.match(file_path.name) + if not match: + skipped += 1 + continue + + prefix, task_id = match.groups() + user_id = task_to_user.get(task_id) + if not user_id: + skipped += 1 + continue + + media_type = "videos" if prefix == "video" else "uploads" + target_dir = resolve_user_media_path(user_id, "video_studio", media_type, create=True) + target_path = target_dir / file_path.name + + if target_path.exists(): + skipped += 1 + continue + + print(f"{'[DRY RUN] Would move' if not apply else 'Moving'} {file_path} -> {target_path}") + if apply: + target_path.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(file_path), str(target_path)) + moved += 1 + + return moved, skipped + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--apply", action="store_true", help="Apply changes (default is dry-run)") + args = parser.parse_args() + + moved, skipped = migrate_legacy_uploads(apply=args.apply) + mode = "APPLY" if args.apply else "DRY-RUN" + print(f"[{mode}] Completed. moved={moved}, skipped={skipped}") + + +if __name__ == "__main__": + main() diff --git a/backend/utils/storage_paths.py b/backend/utils/storage_paths.py new file mode 100644 index 00000000..f0fbabeb --- /dev/null +++ b/backend/utils/storage_paths.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Iterable + +_SAFE_CHARS = {"-", "_"} + + +def sanitize_user_id(user_id: str) -> str: + """Return filesystem-safe user id used in workspace folder names.""" + return "".join(c for c in str(user_id) if c.isalnum() or c in _SAFE_CHARS) + + +def _sanitize_segment(value: str, fallback: str) -> str: + cleaned = "".join(c for c in str(value) if c.isalnum() or c in _SAFE_CHARS) + return cleaned or fallback + + +def get_repo_root() -> Path: + """Return repository root as an absolute canonical path.""" + return Path(__file__).resolve().parents[2] + + +def get_workspace_root() -> Path: + """Return absolute canonical workspace root.""" + return (get_repo_root() / "workspace").resolve() + + +def get_user_workspace(user_id: str) -> Path: + """Return absolute canonical workspace path for a given user.""" + safe_user_id = sanitize_user_id(user_id) or "anonymous" + return (get_workspace_root() / f"workspace_{safe_user_id}").resolve() + + +def resolve_user_media_path( + user_id: str, + module_name: str, + media_type: str, + *subpaths: Iterable[str], + create: bool = False, +) -> Path: + """ + Resolve a canonical absolute media path under + workspace/workspace_/media///... + """ + safe_module = _sanitize_segment(module_name, "module") + safe_media_type = _sanitize_segment(media_type, "files") + + base = get_user_workspace(user_id) / "media" / safe_module / safe_media_type + path = (base.joinpath(*map(str, subpaths))).resolve() + + # Prevent escaping outside user workspace through crafted segments. + workspace_root = get_user_workspace(user_id) + if workspace_root not in path.parents and path != workspace_root: + raise ValueError(f"Resolved path escapes workspace: {path}") + + if create: + path.mkdir(parents=True, exist_ok=True) + + return path + + +def get_legacy_video_studio_upload_dirs() -> list[Path]: + """Known historical global upload directories for Video Studio avatar files.""" + repo_root = get_repo_root() + return [ + (repo_root / "backend" / "data" / "video_studio" / "uploads").resolve(), + (repo_root / "backend" / "backend" / "data" / "video_studio" / "uploads").resolve(), + ]