fix: centralize ROOT_DIR resolution, fix workspace path on Render.com, cleanup legacy paths

- Upgrade utils/storage_paths.py with robust find_repo_root() (env var override + validation + fallback)
- Remove broken _find_root() from podcast/constants.py, import from storage_paths instead
- Fix ROOT_DIR resolving to backend/ instead of project root (caused avatar upload 500s on Render.com)
- Fix video_combination_service.py default output dir (was writing to data/media instead of workspace)
- Add deprecation comments to global data/media constants in media_utils.py
- Pass user_id through resolve_media_path for tenant-scoped podcast resolution
- Add ALWRITY_ROOT_DIR env var support for explicit production overrides
- Log warning when get_podcast_media_dir called without user_id
- Use OperationButton with cost display for scene action buttons
This commit is contained in:
ajaysi
2026-04-22 06:28:45 +05:30
parent 6e9c11744c
commit c5d625945f
5 changed files with 173 additions and 91 deletions

View File

@@ -2,31 +2,17 @@
Podcast API Constants
Centralized constants and directory configuration for podcast module.
All workspace paths use utils.storage_paths for root resolution.
"""
import os
from pathlib import Path
from typing import Literal
from loguru import logger
from services.story_writer.audio_generation_service import StoryAudioGenerationService
from utils.storage_paths import get_repo_root, sanitize_user_id as _sanitize_user_id
# Directory paths
# Find root by looking for 'data' or 'backend' folder
def _find_root() -> Path:
"""Find project root by searching up for data directory."""
current = Path(__file__).resolve()
for _ in range(10): # max 10 levels up
if (current / "data").exists() and (current / "data" / "media").exists():
return current
if (current / "backend").exists():
return current / "backend"
parent = current.parent
if parent == current:
break
current = parent
# Fallback: assume backend is root
return Path(__file__).resolve().parents[1]
ROOT_DIR = _find_root()
ROOT_DIR = get_repo_root()
# Video subdirectory (relative to workspace media dir)
AI_VIDEO_SUBDIR = Path("AI_Videos")
@@ -38,10 +24,6 @@ PODCAST_AVATARS_SUBDIR = Path("avatars")
MediaType = Literal["audio", "image", "video", "chart"]
def _sanitize_user_id(user_id: str) -> str:
return "".join(c for c in user_id if c.isalnum() or c in ("-", "_"))
def get_podcast_media_dir(
media_type: MediaType,
user_id: str | None = None,
@@ -50,9 +32,10 @@ def get_podcast_media_dir(
) -> Path:
"""
Resolve podcast media directory (workspace-only for multi-tenant isolation).
Always requires user_id for tenant isolation. Falls back to default workspace
Requires user_id for tenant isolation. Falls back to default workspace
only if no user_id provided (for backward compat in development).
Logs a warning in production when user_id is missing.
"""
media_subdir = {
"audio": "podcast_audio",
@@ -67,13 +50,11 @@ def get_podcast_media_dir(
ROOT_DIR / "workspace" / f"workspace_{sanitized}" / "media" / media_subdir
).resolve()
else:
# Development fallback: use a default workspace
logger.warning(f"[Podcast] get_podcast_media_dir called without user_id for {media_type} — using default workspace. This should not happen in production.")
resolved_dir = (
ROOT_DIR / "workspace" / "workspace_alwrity" / "media" / media_subdir
).resolve()
logger.warning(f"[Podcast] get_podcast_media_dir: type={media_type}, user_id={user_id}, resolved={resolved_dir}")
if ensure_exists:
resolved_dir.mkdir(parents=True, exist_ok=True)

View File

@@ -17,20 +17,26 @@ from loguru import logger
class PodcastVideoCombinationService:
"""Service for combining podcast scene videos into final episodes."""
def __init__(self, output_dir: Optional[str] = None):
def __init__(self, output_dir: Optional[str] = None, user_id: Optional[str] = None):
"""
Initialize the podcast video combination service.
Parameters:
output_dir (str, optional): Directory to save combined videos.
Defaults to 'backend/podcast_videos/Final_Videos' if not provided.
user_id (str, optional): User ID for workspace-scoped output.
Either output_dir or user_id must be provided for workspace isolation.
"""
if output_dir:
self.output_dir = Path(output_dir)
elif user_id:
from api.podcast.constants import get_podcast_media_dir
self.output_dir = get_podcast_media_dir("video", user_id, ensure_exists=True) / "Final_Videos"
else:
# Default to root/data/media/podcast_videos/Final_Videos directory
base_dir = Path(__file__).resolve().parents[3]
self.output_dir = base_dir / "data" / "media" / "podcast_videos" / "Final_Videos"
from utils.storage_paths import get_user_workspace, sanitize_user_id
logger.warning("[PodcastVideoCombination] No output_dir or user_id provided — using default workspace. This should not happen in production.")
default_user = sanitize_user_id("alwrity")
self.output_dir = get_user_workspace(default_user) / "media" / "podcast_videos" / "Final_Videos"
self.output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"[PodcastVideoCombination] Initialized with output directory: {self.output_dir}")

View File

@@ -3,6 +3,10 @@ Media Utility Functions
Centralized helper functions for loading and managing media assets across modules.
Promotes reuse between Podcast, YouTube, and other media-heavy modules.
DEPRECATED: The global DATA_MEDIA_DIR paths below are legacy and will be removed.
New code should use workspace-scoped paths via utils.storage_paths or module-specific
resolvers (e.g., api.podcast.constants.get_podcast_media_dir).
"""
import logging
@@ -12,16 +16,19 @@ from typing import Optional, List
from urllib.parse import urlparse
from services.database import WORKSPACE_DIR
from utils.storage_paths import get_repo_root
# Configure logging
logger = logging.getLogger(__name__)
# Base Directories
# backend/utils/media_utils.py -> parents[2] = backend/.. = root
ROOT_DIR = Path(__file__).resolve().parents[2]
# Base Directories — use get_repo_root() for consistent resolution
ROOT_DIR = get_repo_root()
# DEPRECATED: Global data/media paths — kept for backward-compat read fallback only.
# New writes must go to workspace-scoped paths. Do NOT add new consumers.
DATA_MEDIA_DIR = ROOT_DIR / "data" / "media"
# Module-specific directories
# Module-specific directories (DEPRECATED — use workspace-scoped resolvers instead)
YOUTUBE_AVATARS_DIR = DATA_MEDIA_DIR / "youtube_avatars"
YOUTUBE_IMAGES_DIR = DATA_MEDIA_DIR / "youtube_images"
PODCAST_IMAGES_DIR = DATA_MEDIA_DIR / "podcast_images"
@@ -33,7 +40,7 @@ def ensure_media_dirs() -> None:
directory.mkdir(parents=True, exist_ok=True)
def resolve_media_path(media_url_or_path: str) -> Optional[Path]:
def resolve_media_path(media_url_or_path: str, user_id: Optional[str] = None) -> Optional[Path]:
"""
Resolve a media URL or filename to a concrete file path on disk.
@@ -41,6 +48,7 @@ def resolve_media_path(media_url_or_path: str) -> Optional[Path]:
Args:
media_url_or_path: URL path (e.g. /api/youtube/avatars/foo.png) or filename
user_id: Optional user ID for tenant-scoped resolution (recommended)
Returns:
Path object if found, None otherwise
@@ -70,9 +78,9 @@ def resolve_media_path(media_url_or_path: str) -> Optional[Path]:
parsed_path = urlparse(media_url_or_path).path
parts = parsed_path.split("/")
if len(parts) >= 6:
user_id = parts[3]
safe_user_id = "".join(c for c in user_id if c.isalnum() or c in ("-", "_"))
if safe_user_id == user_id:
asset_user_id = parts[3]
safe_user_id = "".join(c for c in asset_user_id if c.isalnum() or c in ("-", "_"))
if safe_user_id == asset_user_id:
safe_filename = os.path.basename(filename)
assets_path = Path(WORKSPACE_DIR) / f"workspace_{safe_user_id}" / "assets" / "avatars" / safe_filename
if assets_path.exists() and assets_path.is_file():
@@ -82,7 +90,7 @@ def resolve_media_path(media_url_or_path: str) -> Optional[Path]:
logger.error(f"[MediaUtils] Error resolving assets avatar path: {exc}")
# Define search paths in order of likelihood
# We search all avatar/image directories
# We search all avatar/image directories (DEPRECATED: global paths — kept for backward-compat reads)
search_paths: List[Path] = [
YOUTUBE_AVATARS_DIR / filename,
PODCAST_AVATARS_DIR / filename,
@@ -101,7 +109,7 @@ def resolve_media_path(media_url_or_path: str) -> Optional[Path]:
try:
# Import the centralized function that checks tenant workspace first
from api.podcast.constants import get_podcast_media_read_dirs
podcast_dirs = get_podcast_media_read_dirs("image")
podcast_dirs = get_podcast_media_read_dirs("image", user_id=user_id)
search_paths = []
for pod_dir in podcast_dirs:
# Add both avatar and image subdirectories

View File

@@ -1,8 +1,11 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Iterable
from loguru import logger
_SAFE_CHARS = {"-", "_"}
@@ -16,9 +19,46 @@ def _sanitize_segment(value: str, fallback: str) -> str:
return cleaned or fallback
def find_repo_root() -> Path:
"""Find the project repository root directory.
Resolution order:
1. ALWRITY_ROOT_DIR environment variable (explicit override for production)
2. Deterministic path from this file (storage_paths.py is at utils/)
3. Walk-up fallback looking for a 'backend/' directory at project root
Returns an absolute, resolved Path.
"""
env_root = os.environ.get("ALWRITY_ROOT_DIR")
if env_root:
root = Path(env_root).resolve()
if root.is_dir():
return root
# storage_paths.py is at backend/utils/storage_paths.py
# project root is parents[2] (utils -> backend -> root)
this_file = Path(__file__).resolve()
candidate = this_file.parents[2]
if (candidate / "backend").is_dir():
return candidate
# Walk-up fallback for unusual deployments
current = this_file.parent
for _ in range(10):
if (current / "backend").is_dir():
return current
parent = current.parent
if parent == current:
break
current = parent
return this_file.parents[2]
def get_repo_root() -> Path:
"""Return repository root as an absolute canonical path."""
return Path(__file__).resolve().parents[2]
return find_repo_root()
def get_workspace_root() -> Path:
@@ -67,3 +107,7 @@ def get_legacy_video_studio_upload_dirs() -> list[Path]:
(repo_root / "backend" / "data" / "video_studio" / "uploads").resolve(),
(repo_root / "backend" / "backend" / "data" / "video_studio" / "uploads").resolve(),
]
# Log resolved root at import time for production debugging
logger.info(f"[StoragePaths] Repository root resolved to: {get_repo_root()}")