Add tenant-aware video studio storage path resolver

This commit is contained in:
ي
2026-03-12 14:58:27 +05:30
parent bc49329ed6
commit 29c268dda8
3 changed files with 222 additions and 13 deletions

View File

@@ -1,8 +1,6 @@
from fastapi import APIRouter, UploadFile, File, Form, BackgroundTasks, HTTPException, Depends
from fastapi.responses import FileResponse
from typing import Optional, Dict, Any
import shutil
import os
from pathlib import Path
from services.wavespeed.infinitetalk import animate_scene_with_voiceover
from ..task_manager import task_manager
@@ -11,13 +9,10 @@ from loguru import logger
from services.database import get_engine_for_user
from sqlalchemy.orm import sessionmaker
from utils.asset_tracker import save_asset_to_library
from utils.storage_paths import resolve_user_media_path, get_workspace_root, get_legacy_video_studio_upload_dirs
router = APIRouter()
# Define storage directory
UPLOAD_DIR = Path("backend/data/video_studio/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
def _process_avatar_generation(task_id: str, image_path: Path, audio_path: Path, user_id: str, resolution: str, model: str):
"""
Background task to process avatar generation using shared InfiniteTalk service.
@@ -51,7 +46,8 @@ def _process_avatar_generation(task_id: str, image_path: Path, audio_path: Path,
# Save the resulting video bytes to a file
video_filename = f"video_{task_id}.mp4"
video_path = UPLOAD_DIR / video_filename
output_dir = resolve_user_media_path(user_id, "video_studio", "videos", create=True)
video_path = output_dir / video_filename
with open(video_path, "wb") as f:
f.write(result["video_bytes"])
@@ -127,8 +123,9 @@ async def create_avatar_video(
# Generate temp paths
image_ext = Path(image.filename).suffix or ".png"
audio_ext = Path(audio.filename).suffix or ".mp3"
image_path = UPLOAD_DIR / f"img_{task_id}{image_ext}"
audio_path = UPLOAD_DIR / f"aud_{task_id}{audio_ext}"
upload_dir = resolve_user_media_path(user_id, "video_studio", "uploads", create=True)
image_path = upload_dir / f"img_{task_id}{image_ext}"
audio_path = upload_dir / f"aud_{task_id}{audio_ext}"
try:
# Save uploaded files
@@ -167,7 +164,23 @@ async def get_task_status(task_id: str, current_user: dict = Depends(get_current
@router.get("/download/{filename}")
async def download_video(filename: str):
file_path = UPLOAD_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path)
candidate_paths = []
workspace_root = get_workspace_root()
candidate_paths.extend([
workspace_root / f"workspace_*" / "media" / "video_studio" / "videos" / filename,
workspace_root / f"workspace_*" / "media" / "video_studio" / "uploads" / filename,
])
for legacy_dir in get_legacy_video_studio_upload_dirs():
candidate_paths.append(legacy_dir / filename)
for candidate in candidate_paths:
if "*" in str(candidate):
for matched in workspace_root.glob(str(candidate.relative_to(workspace_root))):
if matched.exists() and matched.is_file():
return FileResponse(matched)
elif candidate.exists() and candidate.is_file():
return FileResponse(candidate)
raise HTTPException(status_code=404, detail="File not found")

View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""Migrate legacy Video Studio upload files into tenant workspace media folders."""
from __future__ import annotations
import argparse
import json
import re
import shutil
import sqlite3
from pathlib import Path
from services.database import WORKSPACE_DIR
from utils.storage_paths import resolve_user_media_path, get_legacy_video_studio_upload_dirs
TASK_FILE_PATTERN = re.compile(r"^(img|aud|video)_([0-9a-fA-F-]{36})")
def _read_task_mappings() -> dict[str, str]:
"""Build task_id -> user_id mapping from available per-user databases."""
mapping: dict[str, str] = {}
workspace_root = Path(WORKSPACE_DIR)
if not workspace_root.exists():
return mapping
for workspace_dir in workspace_root.glob("workspace_*"):
safe_workspace_id = workspace_dir.name.removeprefix("workspace_")
db_candidates = [
workspace_dir / "db" / f"alwrity_{safe_workspace_id}.db",
workspace_dir / "db" / "alwrity.db",
]
db_path = next((p for p in db_candidates if p.exists()), None)
if not db_path:
continue
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='video_generation_tasks'"
)
if not cursor.fetchone():
continue
cursor.execute("SELECT task_id, user_id, result FROM video_generation_tasks")
for task_id, db_user_id, raw_result in cursor.fetchall():
if not task_id:
continue
resolved_user = db_user_id or safe_workspace_id
mapping[str(task_id)] = str(resolved_user)
if raw_result:
try:
parsed = json.loads(raw_result) if isinstance(raw_result, str) else raw_result
if isinstance(parsed, dict):
video_url = parsed.get("video_url", "")
if video_url:
filename = Path(video_url).name
match = TASK_FILE_PATTERN.match(filename)
if match:
mapping.setdefault(match.group(2), str(resolved_user))
except Exception:
pass
finally:
if conn:
conn.close()
return mapping
def migrate_legacy_uploads(apply: bool = False) -> tuple[int, int]:
task_to_user = _read_task_mappings()
moved = 0
skipped = 0
for legacy_dir in get_legacy_video_studio_upload_dirs():
if not legacy_dir.exists():
continue
for file_path in legacy_dir.iterdir():
if not file_path.is_file():
continue
match = TASK_FILE_PATTERN.match(file_path.name)
if not match:
skipped += 1
continue
prefix, task_id = match.groups()
user_id = task_to_user.get(task_id)
if not user_id:
skipped += 1
continue
media_type = "videos" if prefix == "video" else "uploads"
target_dir = resolve_user_media_path(user_id, "video_studio", media_type, create=True)
target_path = target_dir / file_path.name
if target_path.exists():
skipped += 1
continue
print(f"{'[DRY RUN] Would move' if not apply else 'Moving'} {file_path} -> {target_path}")
if apply:
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(file_path), str(target_path))
moved += 1
return moved, skipped
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--apply", action="store_true", help="Apply changes (default is dry-run)")
args = parser.parse_args()
moved, skipped = migrate_legacy_uploads(apply=args.apply)
mode = "APPLY" if args.apply else "DRY-RUN"
print(f"[{mode}] Completed. moved={moved}, skipped={skipped}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
from pathlib import Path
from typing import Iterable
_SAFE_CHARS = {"-", "_"}
def sanitize_user_id(user_id: str) -> str:
"""Return filesystem-safe user id used in workspace folder names."""
return "".join(c for c in str(user_id) if c.isalnum() or c in _SAFE_CHARS)
def _sanitize_segment(value: str, fallback: str) -> str:
cleaned = "".join(c for c in str(value) if c.isalnum() or c in _SAFE_CHARS)
return cleaned or fallback
def get_repo_root() -> Path:
"""Return repository root as an absolute canonical path."""
return Path(__file__).resolve().parents[2]
def get_workspace_root() -> Path:
"""Return absolute canonical workspace root."""
return (get_repo_root() / "workspace").resolve()
def get_user_workspace(user_id: str) -> Path:
"""Return absolute canonical workspace path for a given user."""
safe_user_id = sanitize_user_id(user_id) or "anonymous"
return (get_workspace_root() / f"workspace_{safe_user_id}").resolve()
def resolve_user_media_path(
user_id: str,
module_name: str,
media_type: str,
*subpaths: Iterable[str],
create: bool = False,
) -> Path:
"""
Resolve a canonical absolute media path under
workspace/workspace_<safe_user_id>/media/<module>/<media_type>/...
"""
safe_module = _sanitize_segment(module_name, "module")
safe_media_type = _sanitize_segment(media_type, "files")
base = get_user_workspace(user_id) / "media" / safe_module / safe_media_type
path = (base.joinpath(*map(str, subpaths))).resolve()
# Prevent escaping outside user workspace through crafted segments.
workspace_root = get_user_workspace(user_id)
if workspace_root not in path.parents and path != workspace_root:
raise ValueError(f"Resolved path escapes workspace: {path}")
if create:
path.mkdir(parents=True, exist_ok=True)
return path
def get_legacy_video_studio_upload_dirs() -> list[Path]:
"""Known historical global upload directories for Video Studio avatar files."""
repo_root = get_repo_root()
return [
(repo_root / "backend" / "data" / "video_studio" / "uploads").resolve(),
(repo_root / "backend" / "backend" / "data" / "video_studio" / "uploads").resolve(),
]