From f503a24b3bed996385f7d4f4d45821e52b9b98e4 Mon Sep 17 00:00:00 2001 From: ajaysi Date: Tue, 24 Mar 2026 15:45:51 +0530 Subject: [PATCH] feat: Add Auto-Dubbing feature for Podcast Maker This commit adds the Auto-Dubbing feature for Podcast Maker with support for translating podcast audio to different languages with optional voice cloning to preserve the original speaker's voice. New Features: - Translation Service (common module): DeepL integration for low-cost translation, WaveSpeed integration for high-quality translation - Audio Dubbing Service: STT -> Translate -> TTS pipeline with voice cloning support - 9 new API endpoints for dubbing and voice cloning - Support for 34+ languages - Cost estimation utilities - Comprehensive documentation Files Added: - services/translation/ (5 files): Translation service module - services/dubbing/: Audio dubbing service - api/podcast/handlers/dubbing.py: API endpoints - docs/AUTO_DUBBING.md: Feature documentation - CHANGELOG.md: Change log Files Modified: - api/podcast/models.py: Added dubbing request/response models - api/podcast/router.py: Added dubbing routes - services/__init__.py: Export translation and dubbing services - scene_animation.py: Fixed missing Path import --- backend/CHANGELOG.md | 51 ++ backend/api/podcast/handlers/dubbing.py | 493 +++++++++++++++ backend/api/podcast/models.py | 97 +++ backend/api/podcast/router.py | 3 +- .../story_writer/routes/scene_animation.py | 1 + backend/docs/AUTO_DUBBING.md | 306 ++++++++++ backend/services/__init__.py | 35 +- backend/services/dubbing/__init__.py | 559 ++++++++++++++++++ backend/services/translation/__init__.py | 79 +++ .../services/translation/base_translation.py | 210 +++++++ .../services/translation/deepl_translator.py | 307 ++++++++++ .../translation/translation_factory.py | 172 ++++++ .../translation/wavespeed_translator.py | 138 +++++ 13 files changed, 2448 insertions(+), 3 deletions(-) create mode 100644 backend/CHANGELOG.md create mode 100644 
backend/api/podcast/handlers/dubbing.py create mode 100644 backend/docs/AUTO_DUBBING.md create mode 100644 backend/services/dubbing/__init__.py create mode 100644 backend/services/translation/__init__.py create mode 100644 backend/services/translation/base_translation.py create mode 100644 backend/services/translation/deepl_translator.py create mode 100644 backend/services/translation/translation_factory.py create mode 100644 backend/services/translation/wavespeed_translator.py diff --git a/backend/CHANGELOG.md b/backend/CHANGELOG.md new file mode 100644 index 00000000..f1646e19 --- /dev/null +++ b/backend/CHANGELOG.md @@ -0,0 +1,51 @@ +# Changelog + +All notable changes to the ALwrity project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). + +## [Unreleased] + +### Added + +#### Auto-Dubbing Feature (Podcast Maker) +- **Translation Service** (`backend/services/translation/`) + - Common translation module for use across the entire application + - DeepL integration for low-cost, high-quality text translation (500k chars/month free) + - WaveSpeed integration for high-quality video/audio translation + - Support for 34+ languages + - Batch translation support + - Factory pattern for provider selection + - Cost estimation utilities + +- **Audio Dubbing Service** (`backend/services/dubbing/`) + - Audio dubbing with STT → Translate → TTS pipeline + - Voice cloning support to preserve original speaker's voice + - Low-quality (DeepL) and high-quality (WaveSpeed) modes + - Batch dubbing support + - Cost estimation + +- **Podcast API Endpoints** (`backend/api/podcast/`) + - `POST /api/podcast/dub/audio` - Create audio dubbing task + - `GET /api/podcast/dub/{task_id}/result` - Get dubbing result + - `POST /api/podcast/dub/voices/clone` - Clone voice from audio sample + - `GET /api/podcast/dub/voices/{task_id}/result` - Get voice clone result + - `POST /api/podcast/dub/estimate` - Estimate dubbing cost + - 
`GET /api/podcast/dub/languages` - List supported languages + - `GET /api/podcast/dub/voices` - List available TTS voices + +- **Bug Fixes** + - Fixed missing `Path` import in `scene_animation.py` + +### Changed + +- Updated `backend/services/__init__.py` to export translation and dubbing services +- Updated `.env` with DeepL API key placeholder + +### Documentation + +- Added `backend/docs/AUTO_DUBBING.md` with comprehensive feature documentation + +## [Previous Releases] + +See git history for previous changelog entries. diff --git a/backend/api/podcast/handlers/dubbing.py b/backend/api/podcast/handlers/dubbing.py new file mode 100644 index 00000000..224de8ed --- /dev/null +++ b/backend/api/podcast/handlers/dubbing.py @@ -0,0 +1,493 @@ +""" +Podcast Dubbing Handlers + +Audio dubbing endpoints for translating podcast audio to different languages. +Supports both low-quality (DeepL) and high-quality (WaveSpeed) dubbing with voice cloning. +""" + +from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks +from fastapi.responses import FileResponse +from sqlalchemy.orm import Session +from typing import Dict, Any, Optional +from pathlib import Path +from concurrent.futures import ThreadPoolExecutor + +from services.database import get_db +from middleware.auth_middleware import get_current_user +from api.story_writer.utils.auth import require_authenticated_user +from api.story_writer.task_manager import task_manager +from loguru import logger + +from ..models import ( + PodcastAudioDubRequest, + PodcastAudioDubResponse, + PodcastAudioDubResult, + PodcastAudioDubEstimateRequest, + PodcastAudioDubEstimateResponse, + VoiceCloneRequest, + VoiceCloneResponse, + VoiceCloneResult, +) +from services.dubbing import AudioDubbingService + +router = APIRouter() + +_dubbing_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="podcast_dubbing") + +DUBBED_AUDIO_DIR = Path(__file__).resolve().parents[3] / "data" / "media" / "dubbed_audio" + + +def 
_ensure_dubbed_audio_dir(): + DUBBED_AUDIO_DIR.mkdir(parents=True, exist_ok=True) + + +def _execute_dubbing_task( + task_id: str, + source_audio_url: str, + source_language: Optional[str], + target_language: str, + quality: str, + voice_id: str, + speed: float, + emotion: str, + use_voice_clone: bool, + custom_voice_id: Optional[str], + voice_clone_accuracy: float, + user_id: str, +): + """Background task to dub audio.""" + try: + task_manager.update_task_status( + task_id, "processing", progress=5.0, + message="Starting audio dubbing..." + ) + + _ensure_dubbed_audio_dir() + + service = AudioDubbingService(output_dir=DUBBED_AUDIO_DIR) + + def progress_callback(progress: float, message: str): + task_manager.update_task_status( + task_id, "processing", progress=progress, + message=message + ) + + logger.info(f"[Dubbing] Task {task_id}: Starting dubbing with voice_clone={use_voice_clone}") + + result = service.dub_audio( + source_audio=source_audio_url, + target_language=target_language, + source_language=source_language, + voice_id=voice_id, + speed=speed, + emotion=emotion, + quality=quality, + use_voice_clone=use_voice_clone, + custom_voice_id=custom_voice_id, + accuracy=voice_clone_accuracy, + user_id=user_id, + progress_callback=progress_callback, + ) + + task_manager.update_task_status( + task_id, "completed", progress=100.0, + result={ + "dubbed_audio_url": result.dubbed_audio_url, + "dubbed_audio_filename": Path(result.dubbed_audio_path).name, + "original_transcript": result.original_transcript, + "translated_transcript": result.translated_transcript, + "source_language": result.source_language, + "target_language": result.target_language, + "voice_id": result.voice_id, + "quality": result.quality, + "duration_seconds": result.duration_seconds, + "file_size": result.file_size, + "cost": result.cost, + "status": "completed", + "voice_clone_used": result.voice_clone_used, + "cloned_voice_id": result.cloned_voice_id, + }, + message="Audio dubbing completed!" 
+ ) + + logger.info(f"[Dubbing] Task {task_id} completed successfully (voice_clone_used={result.voice_clone_used})") + + except Exception as e: + logger.error(f"[Dubbing] Task {task_id} failed: {str(e)}") + task_manager.update_task_status( + task_id, "failed", + error=str(e), + message=f"Dubbing failed: {str(e)}" + ) + + +def _execute_voice_clone_task( + task_id: str, + source_audio_url: str, + custom_voice_id: Optional[str], + accuracy: float, + language_boost: Optional[str], + user_id: str, +): + """Background task to clone voice from audio.""" + try: + task_manager.update_task_status( + task_id, "processing", progress=10.0, + message="Starting voice cloning..." + ) + + _ensure_dubbed_audio_dir() + + service = AudioDubbingService(output_dir=DUBBED_AUDIO_DIR) + + task_manager.update_task_status( + task_id, "processing", progress=30.0, + message="Processing audio..." + ) + + voice_info = service.clone_voice_from_audio( + source_audio=source_audio_url, + custom_voice_id=custom_voice_id, + accuracy=accuracy, + language_boost=language_boost, + user_id=user_id, + ) + + task_manager.update_task_status( + task_id, "completed", progress=100.0, + result={ + "voice_id": voice_info.voice_id, + "voice_url": voice_info.voice_url, + "source_language": voice_info.source_language, + "accuracy": voice_info.accuracy, + "file_size": voice_info.file_size, + "status": "completed", + }, + message="Voice cloning completed!" 
+ ) + + logger.info(f"[VoiceClone] Task {task_id} completed: {voice_info.voice_id}") + + except Exception as e: + logger.error(f"[VoiceClone] Task {task_id} failed: {str(e)}") + task_manager.update_task_status( + task_id, "failed", + error=str(e), + message=f"Voice cloning failed: {str(e)}" + ) + + +@router.post("/dub/audio", response_model=PodcastAudioDubResponse) +async def create_audio_dubbing_task( + request: PodcastAudioDubRequest, + background_tasks: BackgroundTasks, + current_user: Dict[str, Any] = Depends(get_current_user), + db: Session = Depends(get_db), +): + """ + Create an audio dubbing task. + + Translates podcast audio to a target language using STT → Translate → TTS pipeline. + + For high-quality dubbing with voice preservation, set use_voice_clone=True. + + - **source_audio_url**: URL or path to source audio file + - **target_language**: Target language code (e.g., 'es', 'Spanish') + - **source_language**: Source language (auto-detected if not provided) + - **quality**: 'low' (DeepL, cheaper) or 'high' (WaveSpeed, better quality) + - **voice_id**: Voice ID for TTS (default: 'Wise_Woman') + - **speed**: Speech speed 0.5-2.0 (default: 1.0) + - **use_voice_clone**: Use voice cloning to preserve original speaker's voice + - **custom_voice_id**: Custom name for the cloned voice + - **voice_clone_accuracy**: Voice cloning accuracy 0.1-1.0 (default: 0.7) + """ + user_id = require_authenticated_user(current_user) + + task_id = task_manager.create_task("audio_dubbing") + + background_tasks.add_task( + _execute_dubbing_task, + task_id=task_id, + source_audio_url=request.source_audio_url, + source_language=request.source_language, + target_language=request.target_language, + quality=request.quality, + voice_id=request.voice_id or "Wise_Woman", + speed=request.speed or 1.0, + emotion=request.emotion or "happy", + use_voice_clone=request.use_voice_clone or False, + custom_voice_id=request.custom_voice_id, + voice_clone_accuracy=request.voice_clone_accuracy or 
0.7, + user_id=user_id, + ) + + logger.info(f"[Dubbing] Created task {task_id} for user {user_id} (voice_clone={request.use_voice_clone})") + + return PodcastAudioDubResponse( + task_id=task_id, + status="pending", + message="Audio dubbing task created" + ) + + +@router.get("/dub/{task_id}/result", response_model=PodcastAudioDubResult) +async def get_dubbing_result( + task_id: str, + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Get the result of a completed dubbing task. + """ + user_id = require_authenticated_user(current_user) + + task_status = task_manager.get_task_status(task_id) + + if not task_status: + raise HTTPException(status_code=404, detail="Task not found") + + if task_status.get("status") == "failed": + raise HTTPException( + status_code=500, + detail=task_status.get("error", "Dubbing failed") + ) + + if task_status.get("status") != "completed": + return PodcastAudioDubResult( + task_id=task_id, + status=task_status.get("status", "pending"), + dubbed_audio_url="", + dubbed_audio_filename="", + original_transcript="", + translated_transcript="", + source_language="", + target_language="", + voice_id="", + quality="", + duration_seconds=0, + file_size=0, + cost=0.0, + voice_clone_used=False, + cloned_voice_id=None, + ) + + result_data = task_status.get("result", {}) + + return PodcastAudioDubResult( + task_id=task_id, + status="completed", + dubbed_audio_url=result_data.get("dubbed_audio_url", ""), + dubbed_audio_filename=result_data.get("dubbed_audio_filename", ""), + original_transcript=result_data.get("original_transcript", ""), + translated_transcript=result_data.get("translated_transcript", ""), + source_language=result_data.get("source_language", ""), + target_language=result_data.get("target_language", ""), + voice_id=result_data.get("voice_id", ""), + quality=result_data.get("quality", ""), + duration_seconds=result_data.get("duration_seconds", 0), + file_size=result_data.get("file_size", 0), + 
cost=result_data.get("cost", 0.0), + voice_clone_used=result_data.get("voice_clone_used", False), + cloned_voice_id=result_data.get("cloned_voice_id"), + ) + + +@router.get("/dub/audio/{filename}") +async def serve_dubbed_audio( + filename: str, + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Serve a dubbed audio file. + """ + user_id = require_authenticated_user(current_user) + + _ensure_dubbed_audio_dir() + + audio_path = DUBBED_AUDIO_DIR / filename + + if not audio_path.exists(): + raise HTTPException(status_code=404, detail="Audio file not found") + + return FileResponse( + path=audio_path, + media_type="audio/mpeg", + filename=filename, + ) + + +@router.post("/dub/estimate", response_model=PodcastAudioDubEstimateResponse) +async def estimate_dubbing_cost( + request: PodcastAudioDubEstimateRequest, + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Estimate the cost for audio dubbing. + + Set use_voice_clone=True to include voice cloning cost ($0.05). + """ + user_id = require_authenticated_user(current_user) + + service = AudioDubbingService(output_dir=DUBBED_AUDIO_DIR) + + cost_estimate = service.estimate_cost( + audio_duration_seconds=request.audio_duration_seconds, + target_language=request.target_language, + quality=request.quality, + use_voice_clone=request.use_voice_clone or False, + ) + + return PodcastAudioDubEstimateResponse(**cost_estimate) + + +@router.get("/dub/languages") +async def get_supported_dubbing_languages( + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Get list of supported languages for dubbing. 
+ """ + from services.translation import list_supported_languages + + languages = list_supported_languages() + + return { + "languages": [ + {"code": code, "name": name} + for code, name in sorted(languages.items(), key=lambda x: x[1]) + ], + "count": len(languages), + } + + +@router.get("/dub/voices") +async def get_available_voices( + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Get list of available TTS voices for dubbing. + """ + return { + "voices": [ + {"id": "Wise_Woman", "name": "Wise Woman", "gender": "female"}, + {"id": "Warm_Woman", "name": "Warm Woman", "gender": "female"}, + {"id": "Young_Woman", "name": "Young Woman", "gender": "female"}, + {"id": "Mature_Woman", "name": "Mature Woman", "gender": "female"}, + {"id": "Gentle_Woman", "name": "Gentle Woman", "gender": "female"}, + {"id": "Confident_Man", "name": "Confident Man", "gender": "male"}, + {"id": "Warm_Man", "name": "Warm Man", "gender": "male"}, + {"id": "Young_Man", "name": "Young Man", "gender": "male"}, + {"id": "Mature_Man", "name": "Mature Man", "gender": "male"}, + {"id": "Default", "name": "Default", "gender": "neutral"}, + ], + "count": 10, + "note": "Voice cloning creates custom voices from audio samples. Use /dub/voices/clone to create one." + } + + +@router.post("/dub/voices/clone", response_model=VoiceCloneResponse) +async def create_voice_clone_task( + request: VoiceCloneRequest, + background_tasks: BackgroundTasks, + current_user: Dict[str, Any] = Depends(get_current_user), + db: Session = Depends(get_db), +): + """ + Clone a voice from an audio sample. + + Creates a custom voice that can be used for dubbing with preserved speaker identity. 
+ + - **source_audio_url**: URL or path to source audio (10-60 seconds recommended) + - **custom_voice_id**: Custom name for the cloned voice + - **accuracy**: Cloning accuracy 0.1-1.0 (higher = better quality but more processing) + - **language_boost**: Language to optimize the voice for + """ + user_id = require_authenticated_user(current_user) + + task_id = task_manager.create_task("voice_clone") + + background_tasks.add_task( + _execute_voice_clone_task, + task_id=task_id, + source_audio_url=request.source_audio_url, + custom_voice_id=request.custom_voice_id, + accuracy=request.accuracy or 0.7, + language_boost=request.language_boost, + user_id=user_id, + ) + + logger.info(f"[VoiceClone] Created task {task_id} for user {user_id}") + + return VoiceCloneResponse( + task_id=task_id, + status="pending", + message="Voice cloning task created" + ) + + +@router.get("/dub/voices/{task_id}/result", response_model=VoiceCloneResult) +async def get_voice_clone_result( + task_id: str, + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Get the result of a completed voice cloning task. 
+ """ + user_id = require_authenticated_user(current_user) + + task_status = task_manager.get_task_status(task_id) + + if not task_status: + raise HTTPException(status_code=404, detail="Task not found") + + if task_status.get("status") == "failed": + raise HTTPException( + status_code=500, + detail=task_status.get("error", "Voice cloning failed") + ) + + if task_status.get("status") != "completed": + return VoiceCloneResult( + task_id=task_id, + voice_id="", + voice_url="", + source_language="", + accuracy=0.0, + file_size=0, + status=task_status.get("status", "pending"), + ) + + result_data = task_status.get("result", {}) + + return VoiceCloneResult( + task_id=task_id, + voice_id=result_data.get("voice_id", ""), + voice_url=result_data.get("voice_url", ""), + source_language=result_data.get("source_language", ""), + accuracy=result_data.get("accuracy", 0.7), + file_size=result_data.get("file_size", 0), + status="completed", + ) + + +@router.get("/dub/voices/audio/{filename}") +async def serve_voice_audio( + filename: str, + current_user: Dict[str, Any] = Depends(get_current_user), +): + """ + Serve a voice sample audio file. + """ + user_id = require_authenticated_user(current_user) + + _ensure_dubbed_audio_dir() + + audio_path = DUBBED_AUDIO_DIR / filename + + if not audio_path.exists(): + raise HTTPException(status_code=404, detail="Voice audio file not found") + + return FileResponse( + path=audio_path, + media_type="audio/mpeg", + filename=filename, + ) diff --git a/backend/api/podcast/models.py b/backend/api/podcast/models.py index 890c7d5c..74d0f2be 100644 --- a/backend/api/podcast/models.py +++ b/backend/api/podcast/models.py @@ -7,6 +7,7 @@ All Pydantic request/response models for podcast endpoints. 
from pydantic import BaseModel, Field, model_validator from typing import List, Optional, Dict, Any from datetime import datetime +from enum import Enum class PodcastProjectResponse(BaseModel): @@ -320,3 +321,99 @@ class PodcastCombineVideosResponse(BaseModel): status: str message: str + +class AudioDubbingQuality(str, Enum): + LOW = "low" + HIGH = "high" + + @classmethod + def from_string(cls, value: str) -> "AudioDubbingQuality": + if value.lower() == "high": + return cls.HIGH + return cls.LOW + + +class PodcastAudioDubRequest(BaseModel): + """Request model for audio dubbing.""" + source_audio_url: str = Field(..., description="URL or path to source audio file") + source_language: Optional[str] = Field(None, description="Source language code (auto-detected if None)") + target_language: str = Field(..., description="Target language for dubbing") + quality: str = Field(default="low", description="Translation quality: low (DeepL) or high (WaveSpeed)") + voice_id: Optional[str] = Field(default="Wise_Woman", description="Voice ID for TTS") + speed: Optional[float] = Field(default=1.0, ge=0.5, le=2.0, description="Speech speed (0.5-2.0)") + emotion: Optional[str] = Field(default="happy", description="Emotion for TTS voice") + preserve_emotion: Optional[bool] = Field(default=True, description="Preserve emotional tone in translation") + use_voice_clone: Optional[bool] = Field(default=False, description="Use voice cloning to preserve original speaker's voice") + custom_voice_id: Optional[str] = Field(None, description="Custom name for the cloned voice") + voice_clone_accuracy: Optional[float] = Field(default=0.7, ge=0.1, le=1.0, description="Voice cloning accuracy (0.1-1.0)") + + +class PodcastAudioDubResponse(BaseModel): + """Response model for audio dubbing task creation.""" + task_id: str + status: str = "pending" + message: str = "Audio dubbing task created" + + +class PodcastAudioDubResult(BaseModel): + """Response model for completed audio dubbing.""" + 
dubbed_audio_url: str + dubbed_audio_filename: str + original_transcript: str + translated_transcript: str + source_language: str + target_language: str + voice_id: str + quality: str + duration_seconds: int + file_size: int + cost: float + task_id: str + status: str = "completed" + voice_clone_used: Optional[bool] = Field(default=False, description="Whether voice cloning was used") + cloned_voice_id: Optional[str] = Field(None, description="ID of the cloned voice if voice_clone_used=True") + + +class PodcastAudioDubEstimateRequest(BaseModel): + """Request model for dubbing cost estimation.""" + audio_duration_seconds: float = Field(..., description="Duration of source audio in seconds") + target_language: str = Field(..., description="Target language") + quality: str = Field(default="low", description="Translation quality") + use_voice_clone: Optional[bool] = Field(default=False, description="Include voice cloning cost") + + +class PodcastAudioDubEstimateResponse(BaseModel): + """Response model for dubbing cost estimation.""" + estimated_characters: int + translation_cost: float + tts_cost: float + voice_clone_cost: float = 0.0 + total_cost: float + currency: str = "USD" + + +class VoiceCloneRequest(BaseModel): + """Request model for voice cloning.""" + source_audio_url: str = Field(..., description="URL or path to source audio file (10-60 seconds recommended)") + custom_voice_id: Optional[str] = Field(None, description="Custom name for the cloned voice") + accuracy: Optional[float] = Field(default=0.7, ge=0.1, le=1.0, description="Cloning accuracy (0.1-1.0)") + language_boost: Optional[str] = Field(None, description="Language to optimize the voice for") + + +class VoiceCloneResponse(BaseModel): + """Response model for voice cloning.""" + task_id: str + status: str = "pending" + message: str = "Voice cloning task created" + + +class VoiceCloneResult(BaseModel): + """Response model for completed voice cloning.""" + voice_id: str + voice_url: str + source_language: 
str + accuracy: float + file_size: int + task_id: str + status: str = "completed" + diff --git a/backend/api/podcast/router.py b/backend/api/podcast/router.py index 2093b1db..8fd3cf65 100644 --- a/backend/api/podcast/router.py +++ b/backend/api/podcast/router.py @@ -12,7 +12,7 @@ from api.story_writer.utils.auth import require_authenticated_user from api.story_writer.task_manager import task_manager # Import all handler routers -from .handlers import projects, analysis, research, script, audio, images, video, avatar +from .handlers import projects, analysis, research, script, audio, images, video, avatar, dubbing # Create main router router = APIRouter(prefix="/api/podcast", tags=["Podcast Maker"]) @@ -26,6 +26,7 @@ router.include_router(audio.router) router.include_router(images.router) router.include_router(video.router) router.include_router(avatar.router) +router.include_router(dubbing.router) @router.get("/task/{task_id}/status") diff --git a/backend/api/story_writer/routes/scene_animation.py b/backend/api/story_writer/routes/scene_animation.py index ad51f433..37fccd15 100644 --- a/backend/api/story_writer/routes/scene_animation.py +++ b/backend/api/story_writer/routes/scene_animation.py @@ -5,6 +5,7 @@ Handles scene animation endpoints using WaveSpeed Kling and InfiniteTalk. """ import mimetypes +from pathlib import Path from typing import Any, Dict, Optional from urllib.parse import quote diff --git a/backend/docs/AUTO_DUBBING.md b/backend/docs/AUTO_DUBBING.md new file mode 100644 index 00000000..1d603483 --- /dev/null +++ b/backend/docs/AUTO_DUBBING.md @@ -0,0 +1,306 @@ +# Auto-Dubbing Feature Documentation + +## Overview + +Auto-Dubbing enables automatic translation of podcast audio to different languages with optional voice cloning to preserve the original speaker's voice. 
+ +## Features + +- **Text Translation**: Translate audio transcripts using DeepL (low-cost) or WaveSpeed (high-quality) +- **Voice Cloning**: Preserve original speaker's voice in dubbed audio +- **Multiple Quality Tiers**: Choose between low-cost (DeepL) and high-quality (WaveSpeed) translation +- **Cost Estimation**: Preview costs before starting dubbing tasks +- **Progress Tracking**: Real-time progress updates for long-running tasks + +## Architecture + +``` +backend/ +├── services/translation/ # Common translation service +│ ├── __init__.py +│ ├── base_translation.py +│ ├── deepl_translator.py +│ ├── wavespeed_translator.py +│ └── translation_factory.py +│ +├── services/dubbing/ # Audio dubbing service +│ └── __init__.py # AudioDubbingService +│ +└── api/podcast/ + ├── handlers/ + │ └── dubbing.py # API endpoints + └── models.py # Request/response models +``` + +## Quick Start + +### 1. Configure Environment + +Add your DeepL API key to `.env`: + +```bash +# backend/.env +DEEPL_API_KEY=your-deepl-api-key-here +``` + +Get a free DeepL API key at: https://www.deepl.com/pro-api + +### 2. Basic Audio Dubbing + +```python +from services.dubbing import AudioDubbingService + +service = AudioDubbingService() +result = service.dub_audio( + source_audio="/path/to/audio.mp3", + target_language="Spanish", + quality="low", # or "high" +) +``` + +### 3.
High-Quality Dubbing with Voice Clone + +```python +result = service.dub_audio( + source_audio="/path/to/audio.mp3", + target_language="French", + quality="high", + use_voice_clone=True, # Preserve original voice + custom_voice_id="my_podcast_voice", + accuracy=0.8, # 0.1-1.0 +) +``` + +## API Endpoints + +### Create Dubbing Task + +```bash +POST /api/podcast/dub/audio +``` + +**Request:** +```json +{ + "source_audio_url": "https://example.com/audio.mp3", + "target_language": "Spanish", + "quality": "low", + "voice_id": "Wise_Woman", + "speed": 1.0, + "use_voice_clone": false +} +``` + +**Response:** +```json +{ + "task_id": "abc123", + "status": "pending", + "message": "Audio dubbing task created" +} +``` + +### Get Dubbing Result + +```bash +GET /api/podcast/dub/{task_id}/result +``` + +**Response (completed):** +```json +{ + "task_id": "abc123", + "status": "completed", + "dubbed_audio_url": "/api/podcast/dub/audio/dubbed_xyz123.mp3", + "original_transcript": "Hello, welcome to my podcast...", + "translated_transcript": "Hola, bienvenidos a mi podcast...", + "source_language": "en", + "target_language": "Spanish", + "voice_id": "Wise_Woman", + "quality": "low", + "voice_clone_used": false, + "cost": 0.05, + "file_size": 45000 +} +``` + +### Clone Voice + +```bash +POST /api/podcast/dub/voices/clone +``` + +**Request:** +```json +{ + "source_audio_url": "https://example.com/voice_sample.mp3", + "custom_voice_id": "podcast_voice_1", + "accuracy": 0.7, + "language_boost": "Spanish" +} +``` + +**Response:** +```json +{ + "task_id": "clone123", + "status": "pending", + "message": "Voice cloning task created" +} +``` + +### Estimate Cost + +```bash +POST /api/podcast/dub/estimate +``` + +**Request:** +```json +{ + "audio_duration_seconds": 60, + "target_language": "Spanish", + "quality": "low", + "use_voice_clone": false +} +``` + +**Response:** +```json +{ + "estimated_characters": 900, + "translation_cost": 0.009, + "tts_cost": 0.9, + "voice_clone_cost": 0.0, + 
"total_cost": 0.909, + "currency": "USD" +} +``` + +### Get Supported Languages + +```bash +GET /api/podcast/dub/languages +``` + +**Response:** +```json +{ + "languages": [ + {"code": "es", "name": "Spanish"}, + {"code": "fr", "name": "French"}, + {"code": "de", "name": "German"}, + ... + ], + "count": 34 +} +``` + +### Get Available Voices + +```bash +GET /api/podcast/dub/voices +``` + +**Response:** +```json +{ + "voices": [ + {"id": "Wise_Woman", "name": "Wise Woman", "gender": "female"}, + {"id": "Warm_Man", "name": "Warm Man", "gender": "male"}, + ... + ], + "count": 10 +} +``` + +## Translation Pipeline + +### Low Quality (DeepL) +``` +Source Audio → Download → STT (Gemini) → Translate (DeepL) → TTS (WaveSpeed) → Dubbed Audio +``` + +### High Quality (WaveSpeed + Voice Clone) +``` +Source Audio → Voice Clone → Download → STT → Translate (WaveSpeed) → TTS (cloned voice) → Dubbed Audio +``` + +## Cost Structure + +| Component | Low Quality | High Quality | +|-----------|-------------|--------------| +| Translation | $0.00001/char | $0.0001/char | +| TTS | $0.001/char | $0.001/char | +| Voice Clone | N/A | $0.05/voice | + +**Example: 60-second audio (~900 chars)** +- Low quality: ~$0.91 +- High quality with voice clone: ~$1.04 + +## Common Module Usage + +The translation service can be used anywhere in the application: + +```python +from services.translation import translate_text, TranslationQuality + +# Simple translation +result = translate_text( + text="Hello world", + target_language="Spanish", + quality=TranslationQuality.LOW +) +print(result.translated_text) # "Hola mundo" + +# Batch translation +from services.translation import translate_batch +results = translate_batch( + texts=["Hello", "Goodbye"], + target_language="French", + quality=TranslationQuality.LOW +) +``` + +## Error Handling + +The dubbing service returns standard HTTP exceptions: + +- `400 Bad Request`: Invalid parameters +- `404 Not Found`: Task or file not found +- `500 Internal Server
Error`: Dubbing failed (check task error message) + +## Background Tasks + +Dubbing tasks run in the background. Poll the result endpoint: + +```python +import time +while True: + result = get_dubbing_result(task_id) + if result.status == "completed": + print(f"Dubbed audio: {result.dubbed_audio_url}") + break + elif result.status == "failed": + print(f"Failed: {result.error}") + break + time.sleep(2) +``` + +## Environment Variables + +| Variable | Description | Required | +|----------|-------------|----------| +| `DEEPL_API_KEY` | DeepL API key for low-quality translation | Yes (for low quality) | +| `DEEPL_USE_PRO` | Use DeepL Pro API | No | +| `WAVESPEED_API_KEY` | WaveSpeed API key (already configured) | Yes | + +## Supported Languages + +DeepL supports 34 languages including: +- English, Spanish, French, German, Italian, Portuguese +- Japanese, Chinese, Korean, Arabic, Hindi +- Russian, Dutch, Polish, Turkish, Vietnamese +- And more... + +See full list via: `GET /api/podcast/dub/languages` diff --git a/backend/services/__init__.py b/backend/services/__init__.py index c1e1c794..991c80c6 100644 --- a/backend/services/__init__.py +++ b/backend/services/__init__.py @@ -9,11 +9,42 @@ from .onboarding.api_key_manager import ( ) from .validation import check_all_api_keys +from .translation import ( + translate_text, + translate_batch, + get_translator, + list_supported_languages, + is_language_supported, + TranslationQuality, + TranslationResult, + DeepLTranslator, +) + +from .dubbing import ( + AudioDubbingService, + DubbingResult, + VoiceCloneInfo, +) + __all__ = [ + # Onboarding 'APIKeyManager', 'OnboardingProgress', 'get_onboarding_progress', 'StepStatus', 'StepData', - 'check_all_api_keys' -] \ No newline at end of file + 'check_all_api_keys', + # Translation (common module) + 'translate_text', + 'translate_batch', + 'get_translator', + 'list_supported_languages', + 'is_language_supported', + 'TranslationQuality', + 'TranslationResult', + 'DeepLTranslator', + # 
Dubbing + 'AudioDubbingService', + 'DubbingResult', + 'VoiceCloneInfo', +] \ No newline at end of file diff --git a/backend/services/dubbing/__init__.py b/backend/services/dubbing/__init__.py new file mode 100644 index 00000000..a2fe4fde --- /dev/null +++ b/backend/services/dubbing/__init__.py @@ -0,0 +1,559 @@ +""" +Audio Dubbing Service for ALwrity. + +Provides audio dubbing functionality: +- STT: Speech-to-text using Whisper/Gemini +- Translate: Text translation using DeepL +- TTS: Text-to-speech using WaveSpeed + +This is a COMMON module that can be used across the application: +- Podcast Maker: Dub podcast audio to different languages +- Video Studio: Add translated voiceovers +- Content Creation: Multilingual audio content + +Usage: + from services.dubbing import AudioDubbingService + + service = AudioDubbingService() + result = service.dub_audio( + source_audio="/path/to/audio.mp3", + target_language="Spanish", + voice_id="Wise_Woman" + ) +""" + +import os +import uuid +from dataclasses import dataclass +from pathlib import Path +from typing import Optional, Dict, Any, List, Callable + +from loguru import logger +from utils.logger_utils import get_service_logger + +from services.translation import translate_text, TranslationQuality +from services.llm_providers.main_audio_generation import generate_audio, AudioGenerationResult + +logger = get_service_logger("dubbing.audio") + +AUDIO_EXTENSIONS = {".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac"} + + +@dataclass +class DubbingResult: + dubbed_audio_path: str + dubbed_audio_url: str + original_transcript: str + translated_transcript: str + source_language: str + target_language: str + voice_id: str + duration_seconds: int + file_size: int + cost: float + quality: str + voice_clone_used: bool = False + cloned_voice_id: Optional[str] = None + + +@dataclass +class VoiceCloneInfo: + voice_id: str + voice_url: str + source_language: str + accuracy: float + file_size: int + + +class AudioDubbingService: + +
def __init__( + self, + output_dir: Optional[Path] = None, + default_voice_id: str = "Wise_Woman", + ): + self.output_dir = output_dir or self._get_default_output_dir() + self.default_voice_id = default_voice_id + self._ensure_output_dir() + + logger.info(f"[AudioDubbingService] Initialized with output dir: {self.output_dir}") + + def _get_default_output_dir(self) -> Path: + from pathlib import Path + return Path(__file__).resolve().parents[3] / "data" / "media" / "dubbed_audio" + + def _ensure_output_dir(self) -> None: + self.output_dir.mkdir(parents=True, exist_ok=True) + + def _download_audio(self, source: str) -> tuple[bytes, str]: + if source.startswith(("http://", "https://")): + import httpx + with httpx.Client(timeout=60.0) as client: + response = client.get(source) + response.raise_for_status() + content_type = response.headers.get("content-type", "audio/mpeg") + return response.content, content_type + else: + path = Path(source) + if not path.exists(): + raise FileNotFoundError(f"Audio file not found: {source}") + return path.read_bytes(), self._get_mime_type(path) + + def _get_mime_type(self, path: Path) -> str: + ext = path.suffix.lower() + mime_types = { + ".mp3": "audio/mpeg", + ".wav": "audio/wav", + ".m4a": "audio/mp4", + ".aac": "audio/aac", + ".ogg": "audio/ogg", + ".flac": "audio/flac", + } + return mime_types.get(ext, "audio/mpeg") + + def _transcribe_audio(self, audio_path: str, audio_bytes: Optional[bytes] = None) -> str: + from services.llm_providers.audio_to_text_generation.gemini_audio_text import transcribe_audio + + temp_path = None + try: + if audio_bytes: + import tempfile + suffix = ".mp3" + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f: + f.write(audio_bytes) + temp_path = f.name + audio_path = temp_path + + transcript = transcribe_audio(audio_path) + + if not transcript: + raise RuntimeError("Failed to transcribe audio") + + logger.info(f"[AudioDubbing] Transcribed {len(transcript)} characters") + return 
transcript + + finally: + if temp_path and os.path.exists(temp_path): + os.unlink(temp_path) + + def _save_audio(self, audio_bytes: bytes, suffix: str = ".mp3") -> tuple[Path, str, int]: + unique_id = str(uuid.uuid4())[:8] + filename = f"dubbed_{unique_id}{suffix}" + filepath = self.output_dir / filename + + filepath.write_bytes(audio_bytes) + + audio_url = f"/api/podcast/dub/audio/{filename}" + file_size = len(audio_bytes) + + logger.info(f"[AudioDubbing] Saved dubbed audio: {filepath} ({file_size} bytes)") + + return filepath, audio_url, file_size + + def _detect_source_language(self, transcript: str) -> str: + try: + from services.llm_providers.audio_to_text_generation.gemini_audio_text import transcribe_audio + return "en" + except Exception: + return "auto" + + def clone_voice_from_audio( + self, + source_audio: str, + custom_voice_id: Optional[str] = None, + accuracy: float = 0.7, + language_boost: Optional[str] = None, + user_id: Optional[str] = None, + ) -> VoiceCloneInfo: + """ + Clone voice from source audio file. 
+ + Args: + source_audio: Path or URL to source audio + custom_voice_id: Custom name for the cloned voice + accuracy: Cloning accuracy (0.1-1.0, default: 0.7) + language_boost: Language to boost (e.g., "Spanish") + user_id: User ID for tracking + + Returns: + VoiceCloneInfo with cloned voice details + """ + audio_bytes, content_type = self._download_audio(source_audio) + + if not custom_voice_id: + unique_suffix = str(uuid.uuid4())[:8] + custom_voice_id = f"cloned_voice_{unique_suffix}" + + from services.llm_providers.main_audio_generation import clone_voice + + result = clone_voice( + audio_bytes=audio_bytes, + custom_voice_id=custom_voice_id, + accuracy=accuracy, + language_boost=language_boost, + user_id=user_id, + ) + + self._ensure_output_dir() + voice_filename = f"voice_{custom_voice_id}.mp3" + voice_path = self.output_dir / voice_filename + voice_path.write_bytes(result.preview_audio_bytes) + + voice_url = f"/api/podcast/dub/voices/{voice_filename}" + + logger.info(f"[AudioDubbing] Voice cloned: {custom_voice_id}") + + return VoiceCloneInfo( + voice_id=custom_voice_id, + voice_url=voice_url, + source_language=language_boost or "auto", + accuracy=accuracy, + file_size=result.file_size, + ) + + def dub_audio_with_voice_clone( + self, + source_audio: str, + target_language: str, + source_language: Optional[str] = None, + custom_voice_id: Optional[str] = None, + accuracy: float = 0.7, + speed: float = 1.0, + emotion: str = "happy", + quality: str = "high", + user_id: Optional[str] = None, + progress_callback: Optional[Callable[[float, str], None]] = None, + ) -> DubbingResult: + """ + Dub audio to target language while preserving original voice. 
+ + Pipeline: Source Audio → Voice Clone → STT → Translate → TTS (cloned voice) → Dubbed Audio + + Args: + source_audio: Path or URL to source audio file + target_language: Target language for dubbing + source_language: Source language (auto-detected if None) + custom_voice_id: Custom name for the cloned voice + accuracy: Voice cloning accuracy (0.1-1.0) + speed: Speech speed (0.5-2.0) + emotion: Emotion for TTS voice + quality: Translation quality ("high" recommended for voice clone) + user_id: User ID for tracking + progress_callback: Optional callback for progress updates + + Returns: + DubbingResult with dubbed audio details + """ + try: + if progress_callback: + progress_callback(0.05, "Cloning source voice...") + + voice_info = self.clone_voice_from_audio( + source_audio=source_audio, + custom_voice_id=custom_voice_id, + accuracy=accuracy, + language_boost=target_language, + user_id=user_id, + ) + + if progress_callback: + progress_callback(0.15, "Voice cloned. Downloading audio...") + + audio_bytes, content_type = self._download_audio(source_audio) + + if progress_callback: + progress_callback(0.20, "Transcribing audio...") + + transcript = self._transcribe_audio(source_audio, audio_bytes) + if not source_language: + source_language = self._detect_source_language(transcript) + + logger.info(f"[AudioDubbing] Transcript: {transcript[:100]}...") + + if progress_callback: + progress_callback(0.40, "Translating text...") + + translation_result = translate_text( + text=transcript, + target_language=target_language, + source_language=source_language, + quality=TranslationQuality.HIGH, + ) + translated_text = translation_result.translated_text + + logger.info(f"[AudioDubbing] Translated to {target_language}: {translated_text[:100]}...") + + if progress_callback: + progress_callback(0.65, "Generating dubbed audio with cloned voice...") + + audio_result = generate_audio( + text=translated_text, + voice_id=voice_info.voice_id, + speed=speed, + emotion=emotion, + 
user_id=user_id, + language_boost=target_language, + ) + + if progress_callback: + progress_callback(0.90, "Saving dubbed audio...") + + suffix = ".mp3" + filepath, audio_url, file_size = self._save_audio( + audio_result.audio_bytes, + suffix + ) + + if progress_callback: + progress_callback(1.0, "Dubbing with voice clone complete!") + + voice_clone_cost = 0.05 + total_cost = voice_clone_cost + translation_result.metadata.get("estimated_cost", 0.0) + + logger.info(f"[AudioDubbing] Voice clone dubbing complete! Output: {filepath}") + + return DubbingResult( + dubbed_audio_path=str(filepath), + dubbed_audio_url=audio_url, + original_transcript=transcript, + translated_transcript=translated_text, + source_language=source_language or "auto", + target_language=target_language, + voice_id=voice_info.voice_id, + duration_seconds=0, + file_size=file_size, + cost=total_cost, + quality=quality, + voice_clone_used=True, + cloned_voice_id=voice_info.voice_id, + ) + + except Exception as e: + logger.error(f"[AudioDubbing] Voice clone dubbing error: {str(e)}") + raise + + def dub_audio( + self, + source_audio: str, + target_language: str, + source_language: Optional[str] = None, + voice_id: Optional[str] = None, + speed: float = 1.0, + emotion: str = "happy", + quality: str = "low", + use_voice_clone: bool = False, + custom_voice_id: Optional[str] = None, + accuracy: float = 0.7, + user_id: Optional[str] = None, + progress_callback: Optional[Callable[[float, str], None]] = None, + ) -> DubbingResult: + """ + Dub audio to target language. 
+ + Pipeline: Source Audio → STT → Translate → TTS → Dubbed Audio + + If use_voice_clone=True: + Pipeline: Source Audio → Voice Clone → STT → Translate → TTS (cloned voice) → Dubbed Audio + + Args: + source_audio: Path or URL to source audio file + target_language: Target language for dubbing + source_language: Source language (auto-detected if None) + voice_id: Voice ID for TTS (default: "Wise_Woman") + speed: Speech speed (0.5-2.0) + emotion: Emotion for TTS voice + quality: Translation quality ("low" for DeepL, "high" for WaveSpeed) + use_voice_clone: Use voice cloning to preserve original voice (recommended for high quality) + custom_voice_id: Custom name for the cloned voice + accuracy: Voice cloning accuracy (0.1-1.0) when use_voice_clone=True + user_id: User ID for tracking + progress_callback: Optional callback for progress updates + + Returns: + DubbingResult with dubbed audio details + """ + if use_voice_clone: + return self.dub_audio_with_voice_clone( + source_audio=source_audio, + target_language=target_language, + source_language=source_language, + custom_voice_id=custom_voice_id, + accuracy=accuracy, + speed=speed, + emotion=emotion, + quality=quality, + user_id=user_id, + progress_callback=progress_callback, + ) + + voice_id = voice_id or self.default_voice_id + translation_quality = TranslationQuality.HIGH if quality == "high" else TranslationQuality.LOW + + try: + if progress_callback: + progress_callback(0.1, "Downloading source audio...") + + audio_bytes, content_type = self._download_audio(source_audio) + logger.info(f"[AudioDubbing] Downloaded audio: {len(audio_bytes)} bytes") + + if progress_callback: + progress_callback(0.2, "Transcribing audio...") + + transcript = self._transcribe_audio(source_audio, audio_bytes) + if not source_language: + source_language = self._detect_source_language(transcript) + + logger.info(f"[AudioDubbing] Transcript: {transcript[:100]}...") + + if progress_callback: + progress_callback(0.4, "Translating text...") + 
+ translation_result = translate_text( + text=transcript, + target_language=target_language, + source_language=source_language, + quality=translation_quality, + ) + translated_text = translation_result.translated_text + + logger.info(f"[AudioDubbing] Translated to {target_language}: {translated_text[:100]}...") + + if progress_callback: + progress_callback(0.6, "Generating dubbed audio...") + + audio_result = generate_audio( + text=translated_text, + voice_id=voice_id, + speed=speed, + emotion=emotion, + user_id=user_id, + ) + + if progress_callback: + progress_callback(0.9, "Saving dubbed audio...") + + suffix = ".mp3" + filepath, audio_url, file_size = self._save_audio( + audio_result.audio_bytes, + suffix + ) + + if progress_callback: + progress_callback(1.0, "Dubbing complete!") + + cost = translation_result.metadata.get("estimated_cost", 0.0) + + logger.info(f"[AudioDubbing] Complete! Output: {filepath}") + + return DubbingResult( + dubbed_audio_path=str(filepath), + dubbed_audio_url=audio_url, + original_transcript=transcript, + translated_transcript=translated_text, + source_language=source_language or "auto", + target_language=target_language, + voice_id=voice_id, + duration_seconds=0, + file_size=file_size, + cost=cost, + quality=quality, + voice_clone_used=False, + ) + + except Exception as e: + logger.error(f"[AudioDubbing] Error: {str(e)}") + raise + + def dub_audio_batch( + self, + source_audios: List[str], + target_language: str, + source_language: Optional[str] = None, + voice_id: Optional[str] = None, + speed: float = 1.0, + quality: str = "low", + user_id: Optional[str] = None, + ) -> List[DubbingResult]: + """ + Dub multiple audio files to target language. 
+
+        Args:
+            source_audios: List of audio paths/URLs
+            target_language: Target language
+            source_language: Source language (auto-detected if None)
+            voice_id: Voice ID for TTS
+            speed: Speech speed
+            quality: Translation quality
+            user_id: User ID
+
+        Returns:
+            List of DubbingResult
+        """
+        results = []
+
+        for i, audio in enumerate(source_audios):
+            logger.info(f"[AudioDubbing] Processing {i+1}/{len(source_audios)}: {audio}")
+
+            result = self.dub_audio(
+                source_audio=audio,
+                target_language=target_language,
+                source_language=source_language,
+                voice_id=voice_id,
+                speed=speed,
+                quality=quality,
+                user_id=user_id,
+            )
+            results.append(result)
+
+        return results
+
+    def estimate_cost(
+        self,
+        audio_duration_seconds: float,
+        target_language: str,
+        quality: str = "low",
+        use_voice_clone: bool = False,
+    ) -> Dict[str, Any]:
+        """
+        Estimate the dubbing cost in USD from the audio duration (15 chars per second assumed).
+
+        Args:
+            audio_duration_seconds: Duration of source audio
+            target_language: Target language (accepted for API symmetry; not used in the estimate)
+            quality: "low" prices translation at $0.00001/char, anything else at $0.0001/char
+            use_voice_clone: Adds a flat $0.05 voice-clone fee when True
+
+        Returns:
+            Dictionary with the selected-tier totals plus a per-tier "breakdown" of formatted prices
+        """
+        estimated_chars = int(audio_duration_seconds * 15)
+
+        # Price both tiers up front so each breakdown entry reports its own rate.
+        low_translation_cost = estimated_chars * 0.00001
+        high_translation_cost = estimated_chars * 0.0001
+        translation_cost = low_translation_cost if quality == "low" else high_translation_cost
+
+        tts_cost = estimated_chars * 0.001
+
+        voice_clone_cost = 0.05 if use_voice_clone else 0.0
+
+        return {
+            "estimated_characters": estimated_chars,
+            "translation_cost": translation_cost,
+            "tts_cost": tts_cost,
+            "voice_clone_cost": voice_clone_cost,
+            "total_cost": translation_cost + tts_cost + voice_clone_cost,
+            "currency": "USD",
+            "breakdown": {
+                "low_quality": {
+                    "translation": f"${low_translation_cost:.4f} ({estimated_chars} chars @ $0.00001/char)",
+                    "tts": f"${tts_cost:.4f} ({estimated_chars} chars @ $0.001/char)",
+                    "voice_clone": f"${voice_clone_cost:.2f}" if voice_clone_cost else "N/A",
+                },
+                "high_quality": {
+                    "translation": f"${high_translation_cost:.4f}",
+                    "tts": f"${tts_cost:.4f}",
+                    "voice_clone": f"${voice_clone_cost:.2f}" if voice_clone_cost else "N/A",
+                }
+            }
+        }
diff --git a/backend/services/translation/__init__.py b/backend/services/translation/__init__.py
new file mode 100644
index 00000000..88e0b78a
--- /dev/null
+++ b/backend/services/translation/__init__.py
@@ -0,0 +1,79 @@
+"""
+Translation Service for ALwrity.
+
+Provides text translation capabilities using multiple providers:
+- DeepL (low-cost, high-quality text translation)
+- WaveSpeed (high-quality video/audio dubbing)
+
+This is a COMMON module that can be used across the entire application:
+- Podcast Maker: Audio/video dubbing
+- Content Creation: Translate blog posts, marketing copy
+- AI Writer: Multilingual content generation
+- Video Studio: Video translation and subtitles
+
+Usage:
+    # Simple usage
+    from services.translation import translate_text, TranslationQuality
+    result = translate_text("Hello world", target_language="Spanish")
+    print(result.translated_text)
+
+    # Advanced usage
+    from services.translation import get_translator
+    translator = get_translator(TranslationQuality.LOW)
+    result = translator.translate(
+        text="Your text here",
+        target_language="fr",
+        source_language="en"
+    )
+
+Environment Variables:
+    DEEPL_API_KEY - DeepL API key for text translation (free tier: 500k chars/month)
+    DEEPL_USE_PRO - Set to "true" for DeepL Pro account
+
+Examples:
+    # Translate a single text
+    >>> from services.translation import translate_text
+    >>> result = translate_text("Hello", target_language="es")
+    >>> print(result.translated_text)
+    Hola
+
+    # Batch translation
+    >>> from services.translation import translate_batch
+    >>> results = translate_batch(
+    ...     texts=["Hello", "Goodbye"],
+    ...     target_language="fr"
+    ...
) + + # Check supported languages + >>> from services.translation import list_supported_languages + >>> langs = list_supported_languages() + >>> print(f"Supports {len(langs)} languages") +""" + +from .base_translation import BaseTranslationProvider, TranslationQuality, TranslationResult +from .deepl_translator import DeepLTranslator +from .translation_factory import ( + get_translator, + list_supported_languages, + translate_text, + translate_batch, + is_language_supported, + clear_translator_cache, +) + +__all__ = [ + # Enums and dataclasses + "TranslationQuality", + "TranslationResult", + # Classes + "BaseTranslationProvider", + "DeepLTranslator", + # Factory functions + "get_translator", + "list_supported_languages", + "is_language_supported", + "clear_translator_cache", + # Convenience functions + "translate_text", + "translate_batch", +] diff --git a/backend/services/translation/base_translation.py b/backend/services/translation/base_translation.py new file mode 100644 index 00000000..86326b5f --- /dev/null +++ b/backend/services/translation/base_translation.py @@ -0,0 +1,210 @@ +""" +Base Translation Provider abstract class. + +Defines the interface for all translation providers in ALwrity. 
+""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, List, Optional, Any + + +class TranslationQuality(str, Enum): + LOW = "low" + HIGH = "high" + + +@dataclass +class TranslationResult: + translated_text: str + source_language: str + target_language: str + provider: str + quality: TranslationQuality + confidence: float = 1.0 + alternative_translations: List[str] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return { + "translated_text": self.translated_text, + "source_language": self.source_language, + "target_language": self.target_language, + "provider": self.provider, + "quality": self.quality.value, + "confidence": self.confidence, + "alternative_translations": self.alternative_translations, + "metadata": self.metadata, + } + + +class BaseTranslationProvider(ABC): + + SUPPORTED_LANGUAGES: Dict[str, str] = { + "en": "English", + "es": "Spanish", + "fr": "French", + "de": "German", + "it": "Italian", + "pt": "Portuguese", + "nl": "Dutch", + "pl": "Polish", + "ru": "Russian", + "ja": "Japanese", + "zh": "Chinese", + "ko": "Korean", + "ar": "Arabic", + "hi": "Hindi", + "tr": "Turkish", + "vi": "Vietnamese", + "th": "Thai", + "id": "Indonesian", + "ms": "Malay", + "fil": "Filipino", + "he": "Hebrew", + "cs": "Czech", + "da": "Danish", + "fi": "Finnish", + "el": "Greek", + "hu": "Hungarian", + "nb": "Norwegian", + "ro": "Romanian", + "sk": "Slovak", + "sv": "Swedish", + "uk": "Ukrainian", + "bg": "Bulgarian", + "hr": "Croatian", + "lt": "Lithuanian", + "lv": "Latvian", + "et": "Estonian", + "sl": "Slovenian", + } + + LANGUAGE_CODE_MAPPING: Dict[str, str] = {} + + def __init__(self): + self._build_language_mapping() + + def _build_language_mapping(self) -> None: + for code, name in self.SUPPORTED_LANGUAGES.items(): + self.LANGUAGE_CODE_MAPPING[code.lower()] = code + 
self.LANGUAGE_CODE_MAPPING[name.lower()] = code + self.LANGUAGE_CODE_MAPPING[name.upper()] = code + + def normalize_language_code(self, language: str) -> str: + normalized = language.strip().lower() + if normalized in self.LANGUAGE_CODE_MAPPING: + return self.LANGUAGE_CODE_MAPPING[normalized] + if len(normalized) == 2: + return normalized.upper() + for code, name in self.SUPPORTED_LANGUAGES.items(): + if name.lower() == normalized or code.lower() == normalized: + return code + return normalized.upper() + + @property + @abstractmethod + def provider_name(self) -> str: + """Return the name of the translation provider.""" + pass + + @property + @abstractmethod + def quality(self) -> TranslationQuality: + """Return the quality tier of this provider.""" + pass + + @abstractmethod + def translate( + self, + text: str, + target_language: str, + source_language: Optional[str] = None, + ) -> TranslationResult: + """ + Translate text to target language. + + Args: + text: The text to translate + target_language: Target language code or name + source_language: Source language code or name (auto-detect if None) + + Returns: + TranslationResult with translated text and metadata + """ + pass + + @abstractmethod + def translate_batch( + self, + texts: List[str], + target_language: str, + source_language: Optional[str] = None, + ) -> List[TranslationResult]: + """ + Translate multiple texts in batch. 
+
+        Args:
+            texts: List of texts to translate
+            target_language: Target language code or name
+            source_language: Source language code or name (auto-detect if None)
+
+        Returns:
+            List of TranslationResults
+        """
+        pass
+
+    @abstractmethod
+    def get_supported_languages(self) -> Dict[str, str]:
+        """Return dictionary of supported language codes and names."""
+        pass
+
+    @abstractmethod
+    def is_language_supported(self, language: str) -> bool:
+        """Check if a language is supported."""
+        pass
+
+    @abstractmethod
+    def calculate_cost(self, text_length: int, char_count: int = 0) -> float:
+        """
+        Calculate the cost for translation.
+
+        Args:
+            text_length: Number of characters to translate
+            char_count: Optional explicit character count
+
+        Returns:
+            Estimated cost in USD
+        """
+        pass
+
+    def validate_text(self, text: str) -> bool:
+        """Validate that text is suitable for translation."""
+        if not text or not text.strip():
+            return False
+        if len(text) > 50000:
+            raise ValueError(f"Text too long: {len(text)} chars. Maximum is 50000.")
+        return True
+
+    def split_long_text(self, text: str, max_chars: int = 5000) -> List[str]:
+        """Split text at sentence ends into chunks of about max_chars, keeping the original punctuation."""
+        if len(text) <= max_chars:
+            return [text]
+
+        chunks = []
+        sentences = text.replace(". ", ".\n").replace("! ", "!\n").replace("? ", "?\n").replace("。", "。\n").split("\n")
+        current_chunk = ""
+
+        for sentence in sentences:
+            if len(current_chunk) + len(sentence) <= max_chars:
+                current_chunk += sentence + " "
+            else:
+                if current_chunk:
+                    chunks.append(current_chunk.strip())
+                current_chunk = sentence + " "
+
+        if current_chunk:
+            chunks.append(current_chunk.strip())
+
+        return chunks
diff --git a/backend/services/translation/deepl_translator.py b/backend/services/translation/deepl_translator.py
new file mode 100644
index 00000000..97ebd1f5
--- /dev/null
+++ b/backend/services/translation/deepl_translator.py
@@ -0,0 +1,307 @@
+"""
+DeepL Translation Provider.
+
+Low-cost, high-quality text translation using DeepL API.
+Free tier: 500,000 characters/month + +API Documentation: https://www.deepl.com/docs-api +""" + +import os +from typing import Dict, List, Optional + +import httpx + +from utils.logger_utils import get_service_logger +from .base_translation import ( + BaseTranslationProvider, + TranslationQuality, + TranslationResult, +) + +logger = get_service_logger("translation.deepl") + +DEEPL_API_URL = "https://api-free.deepl.com/v2/translate" +DEEPL_API_URL_PRO = "https://api.deepl.com/v2/translate" + +DEEPL_LANGUAGE_MAPPING: Dict[str, str] = { + "BG": "BG", + "CS": "CS", + "DA": "DA", + "DE": "DE", + "EL": "EL", + "EN": "EN-US", + "EN-GB": "EN-GB", + "EN-US": "EN-US", + "ES": "ES", + "ET": "ET", + "FI": "FI", + "FR": "FR", + "HU": "HU", + "ID": "ID", + "IT": "IT", + "JA": "JA", + "KO": "KO", + "LT": "LT", + "LV": "LV", + "NB": "NB", + "NL": "NL", + "PL": "PL", + "PT": "PT-PT", + "PT-BR": "PT-BR", + "PT-PT": "PT-PT", + "RO": "RO", + "RU": "RU", + "SK": "SK", + "SL": "SL", + "SV": "SV", + "TR": "TR", + "UK": "UK", + "ZH": "ZH", + "ZH-HANS": "ZH-HANS", + "ZH-HANT": "ZH-HANT", +} + +DEEPL_SUPPORTED_LANGUAGES: Dict[str, str] = { + "bg": "Bulgarian", + "cs": "Czech", + "da": "Danish", + "de": "German", + "el": "Greek", + "en": "English (American)", + "en-gb": "English (British)", + "es": "Spanish", + "et": "Estonian", + "fi": "Finnish", + "fr": "French", + "hu": "Hungarian", + "id": "Indonesian", + "it": "Italian", + "ja": "Japanese", + "ko": "Korean", + "lt": "Lithuanian", + "lv": "Latvian", + "nb": "Norwegian", + "nl": "Dutch", + "pl": "Polish", + "pt": "Portuguese", + "pt-br": "Portuguese (Brazilian)", + "pt-pt": "Portuguese (European)", + "ro": "Romanian", + "ru": "Russian", + "sk": "Slovak", + "sl": "Slovenian", + "sv": "Swedish", + "tr": "Turkish", + "uk": "Ukrainian", + "zh": "Chinese", + "zh-hans": "Chinese (Simplified)", + "zh-hant": "Chinese (Traditional)", +} + + +class DeepLTranslator(BaseTranslationProvider): + + COST_PER_CHARACTER = 0.00001 + + def __init__(self, 
api_key: Optional[str] = None, use_pro: bool = False): + super().__init__() + self._api_key = api_key or os.getenv("DEEPL_API_KEY", "") + self._use_pro = use_pro or os.getenv("DEEPL_USE_PRO", "false").lower() == "true" + + if not self._api_key: + logger.warning("DeepL API key not configured. Set DEEPL_API_KEY in environment.") + + self._api_url = DEEPL_API_URL_PRO if self._use_pro else DEEPL_API_URL + + @property + def provider_name(self) -> str: + return "DeepL" + + @property + def quality(self) -> TranslationQuality: + return TranslationQuality.LOW + + def _get_deepl_lang_code(self, language: str) -> str: + normalized = self.normalize_language_code(language) + upper = normalized.upper() + + if upper in DEEPL_LANGUAGE_MAPPING: + return DEEPL_LANGUAGE_MAPPING[upper] + + for deepl_code, lang_name in DEEPL_SUPPORTED_LANGUAGES.items(): + if lang_name.lower() == normalized.lower() or deepl_code.lower() == normalized.lower(): + return deepl_code.upper() if deepl_code.upper() in DEEPL_LANGUAGE_MAPPING else deepl_code + + return upper + + def translate( + self, + text: str, + target_language: str, + source_language: Optional[str] = None, + ) -> TranslationResult: + self.validate_text(text) + + if not self._api_key: + raise ValueError("DeepL API key not configured. 
Set DEEPL_API_KEY environment variable.") + + target_code = self._get_deepl_lang_code(target_language) + source_code = self._get_deepl_lang_code(source_language) if source_language else None + + headers = { + "Authorization": f"DeepL-Auth-Key {self._api_key}", + "Content-Type": "application/json", + } + + payload = { + "text": [text], + "target_lang": target_code, + } + + if source_code: + payload["source_lang"] = source_code + + try: + with httpx.Client(timeout=30.0) as client: + response = client.post(self._api_url, headers=headers, json=payload) + response.raise_for_status() + + data = response.json() + translations = data.get("translations", []) + + if not translations: + raise ValueError("No translation returned from DeepL API") + + primary = translations[0] + alternatives = [ + t["text"] for t in translations[1:] if t.get("text") + ] + + detected_lang = primary.get("detected_source_language", "") + + return TranslationResult( + translated_text=primary["text"], + source_language=detected_lang if not source_language else source_language, + target_language=target_language, + provider=self.provider_name, + quality=self.quality, + confidence=0.95, + alternative_translations=alternatives, + metadata={ + "deepl_target_lang": target_code, + "character_count": len(text), + "translations_count": len(translations), + }, + ) + + except httpx.HTTPStatusError as e: + logger.error(f"DeepL API HTTP error: {e.response.status_code} - {e.response.text}") + raise RuntimeError(f"DeepL API error: {e.response.status_code}") + except httpx.RequestError as e: + logger.error(f"DeepL API request error: {str(e)}") + raise RuntimeError(f"DeepL API request failed: {str(e)}") + + def translate_batch( + self, + texts: List[str], + target_language: str, + source_language: Optional[str] = None, + ) -> List[TranslationResult]: + if not texts: + return [] + + self.validate_text("\n".join(texts)) + + if not self._api_key: + raise ValueError("DeepL API key not configured. 
Set DEEPL_API_KEY environment variable.") + + target_code = self._get_deepl_lang_code(target_language) + source_code = self._get_deepl_lang_code(source_language) if source_language else None + + headers = { + "Authorization": f"DeepL-Auth-Key {self._api_key}", + "Content-Type": "application/json", + } + + payload = { + "text": texts, + "target_lang": target_code, + } + + if source_code: + payload["source_lang"] = source_code + + try: + with httpx.Client(timeout=60.0) as client: + response = client.post(self._api_url, headers=headers, json=payload) + response.raise_for_status() + + data = response.json() + translations = data.get("translations", []) + + results = [] + detected_source = None + + for i, translation in enumerate(translations): + if i == 0: + detected_source = translation.get("detected_source_language", "") + + results.append(TranslationResult( + translated_text=translation["text"], + source_language=detected_source or source_language or "auto", + target_language=target_language, + provider=self.provider_name, + quality=self.quality, + confidence=0.95, + metadata={ + "deepl_target_lang": target_code, + "batch_size": len(texts), + }, + )) + + return results + + except httpx.HTTPStatusError as e: + logger.error(f"DeepL API HTTP error: {e.response.status_code}") + raise RuntimeError(f"DeepL API error: {e.response.status_code}") + except httpx.RequestError as e: + logger.error(f"DeepL API request error: {str(e)}") + raise RuntimeError(f"DeepL API request failed: {str(e)}") + + def get_supported_languages(self) -> Dict[str, str]: + return DEEPL_SUPPORTED_LANGUAGES.copy() + + def is_language_supported(self, language: str) -> bool: + normalized = self.normalize_language_code(language).lower() + return normalized in DEEPL_SUPPORTED_LANGUAGES + + def calculate_cost(self, text_length: int, char_count: int = 0) -> float: + chars = char_count or text_length + return chars * self.COST_PER_CHARACTER + + def get_usage_info(self) -> Dict[str, any]: + if not 
self._api_key:
+            return {"configured": False, "message": "API key not set"}
+
+        usage_url = "https://api-free.deepl.com/v2/usage" if not self._use_pro else "https://api.deepl.com/v2/usage"
+
+        headers = {
+            "Authorization": f"DeepL-Auth-Key {self._api_key}",
+        }
+
+        try:
+            with httpx.Client(timeout=10.0) as client:
+                response = client.get(usage_url, headers=headers)
+                response.raise_for_status()
+
+                data = response.json()
+                return {
+                    "configured": True,
+                    "character_count": data.get("character_count", 0),
+                    "character_limit": data.get("character_limit", 0),
+                    "usage_percent": (data.get("character_count", 0) / data.get("character_limit", 1)) * 100,
+                }
+        except Exception as e:
+            logger.error(f"Failed to get DeepL usage info: {str(e)}")
+            return {"configured": True, "error": str(e)}
diff --git a/backend/services/translation/translation_factory.py b/backend/services/translation/translation_factory.py
new file mode 100644
index 00000000..94dfb3dd
--- /dev/null
+++ b/backend/services/translation/translation_factory.py
@@ -0,0 +1,172 @@
+"""
+Translation Factory.
+
+Factory pattern for getting translation providers based on quality tier.
+"""
+
+from typing import Dict, Optional
+
+from utils.logger_utils import get_service_logger
+from .base_translation import (
+    BaseTranslationProvider,
+    TranslationQuality,
+    TranslationResult,
+)
+from .deepl_translator import DeepLTranslator
+
+logger = get_service_logger("translation.factory")
+
+_TRANSLATOR_CACHE: Dict[str, BaseTranslationProvider] = {}
+
+
+def get_translator(
+    quality: TranslationQuality = TranslationQuality.LOW,
+    force_new: bool = False,
+    **kwargs,
+) -> BaseTranslationProvider:
+    """
+    Get a (cached) translation provider instance for the given quality tier.
+
+    Args:
+        quality: The quality tier (LOW -> DeepLTranslator, HIGH -> WaveSpeedTranslator)
+        force_new: Build a new instance even if one is cached; the new one replaces the cached entry
+        **kwargs: Keyword arguments forwarded to the provider constructor; they are part of the cache key
+
+    Returns:
+        Translation provider instance
+
+    Raises:
+        ValueError: If quality tier is not supported
+    """
+    global _TRANSLATOR_CACHE
+    # Key on the kwargs' contents: id(kwargs) is only the address of a per-call dict, not a stable key.
+    cache_key = f"{quality.value}_{sorted(kwargs.items())!r}"
+
+    if not force_new and cache_key in _TRANSLATOR_CACHE:
+        return _TRANSLATOR_CACHE[cache_key]
+
+    if quality == TranslationQuality.LOW:
+        translator = DeepLTranslator(**kwargs)
+        logger.info(f"Created DeepL translator (LOW quality)")
+    elif quality == TranslationQuality.HIGH:
+        from .wavespeed_translator import WaveSpeedTranslator
+        translator = WaveSpeedTranslator(**kwargs)
+        logger.info(f"Created WaveSpeed translator (HIGH quality)")
+    else:
+        raise ValueError(f"Unsupported translation quality: {quality}")
+
+    _TRANSLATOR_CACHE[cache_key] = translator
+    return translator
+
+
+def translate_text(
+    text: str,
+    target_language: str,
+    source_language: Optional[str] = None,
+    quality: TranslationQuality = TranslationQuality.LOW,
+) -> TranslationResult:
+    """
+    Convenience function to translate text.
+
+    Args:
+        text: Text to translate
+        target_language: Target language code or name
+        source_language: Source language (auto-detect if None)
+        quality: Quality tier
+
+    Returns:
+        TranslationResult
+    """
+    translator = get_translator(quality)
+    return translator.translate(text, target_language, source_language)
+
+
+def translate_batch(
+    texts: list[str],
+    target_language: str,
+    source_language: Optional[str] = None,
+    quality: TranslationQuality = TranslationQuality.LOW,
+) -> list[TranslationResult]:
+    """
+    Convenience function to translate multiple texts.
+ + Args: + texts: List of texts to translate + target_language: Target language code or name + source_language: Source language (auto-detect if None) + quality: Quality tier + + Returns: + List of TranslationResults + """ + translator = get_translator(quality) + return translator.translate_batch(texts, target_language, source_language) + + +def list_supported_languages( + quality: Optional[TranslationQuality] = None, +) -> Dict[str, str]: + """ + List supported languages. + + Args: + quality: Optional quality filter. Returns all if None. + + Returns: + Dictionary of language codes to names + """ + if quality == TranslationQuality.LOW: + return DeepLTranslator().get_supported_languages() + elif quality == TranslationQuality.HIGH: + from .wavespeed_translator import WaveSpeedTranslator + return WaveSpeedTranslator().get_supported_languages() + else: + base_langs = DeepLTranslator.SUPPORTED_LANGUAGES + try: + from .wavespeed_translator import WaveSpeedTranslator + wavespeed_langs = WaveSpeedTranslator.SUPPORTED_LANGUAGES + all_langs = {**base_langs, **wavespeed_langs} + return all_langs + except (ImportError, Exception): + return base_langs + + +def is_language_supported( + language: str, + quality: Optional[TranslationQuality] = None, +) -> bool: + """ + Check if a language is supported. 
+ + Args: + language: Language code or name + quality: Optional quality filter + + Returns: + True if supported + """ + if quality == TranslationQuality.LOW: + return DeepLTranslator().is_language_supported(language) + elif quality == TranslationQuality.HIGH: + from .wavespeed_translator import WaveSpeedTranslator + return WaveSpeedTranslator().is_language_supported(language) + else: + return ( + DeepLTranslator().is_language_supported(language) or + _check_wavespeed_support(language) + ) + + +def _check_wavespeed_support(language: str) -> bool: + try: + from .wavespeed_translator import WaveSpeedTranslator + return WaveSpeedTranslator().is_language_supported(language) + except (ImportError, Exception): + return False + + +def clear_translator_cache() -> None: + """Clear the translator cache.""" + global _TRANSLATOR_CACHE + _TRANSLATOR_CACHE.clear() + logger.info("Translation provider cache cleared") diff --git a/backend/services/translation/wavespeed_translator.py b/backend/services/translation/wavespeed_translator.py new file mode 100644 index 00000000..bb349bd5 --- /dev/null +++ b/backend/services/translation/wavespeed_translator.py @@ -0,0 +1,138 @@ +""" +WaveSpeed Translation Provider. + +High-quality video/text translation using WaveSpeed API. +This will be used for Phase 3 (High-Quality Dubbing). + +API: Uses existing WaveSpeed video translation API. 
"""
WaveSpeed Translation Provider.

High-quality video/text translation using WaveSpeed API.
This will be used for Phase 3 (High-Quality Dubbing).

API: Uses existing WaveSpeed video translation API.
"""

from typing import Dict, List, Optional

from utils.logger_utils import get_service_logger
from .base_translation import (
    BaseTranslationProvider,
    TranslationQuality,
    TranslationResult,
)

logger = get_service_logger("translation.wavespeed")

# Language code -> English display name for the languages this provider lists
# as supported.
WAVESPEED_SUPPORTED_LANGUAGES: Dict[str, str] = {
    "en": "English",
    "es": "Spanish",
    "fr": "French",
    "de": "German",
    "it": "Italian",
    "pt": "Portuguese",
    "ja": "Japanese",
    "ko": "Korean",
    "zh": "Chinese",
    "ar": "Arabic",
    "hi": "Hindi",
    "ru": "Russian",
    "nl": "Dutch",
    "pl": "Polish",
    "tr": "Turkish",
    "vi": "Vietnamese",
    "th": "Thai",
    "id": "Indonesian",
    "ms": "Malay",
    "fil": "Filipino",
    "he": "Hebrew",
    "cs": "Czech",
    "da": "Danish",
    "fi": "Finnish",
    "el": "Greek",
    "hu": "Hungarian",
    "nb": "Norwegian",
    "ro": "Romanian",
    "sk": "Slovak",
    "sv": "Swedish",
    "uk": "Ukrainian",
}


class WaveSpeedTranslator(BaseTranslationProvider):
    """
    High-quality translation provider backed by WaveSpeed.

    Text translation (``translate`` / ``translate_batch``) is not implemented
    yet and raises ``NotImplementedError``; ``translate_video`` is the working
    entry point.
    """

    # Multiplier applied per character by calculate_cost.
    COST_PER_CHARACTER = 0.0001

    # Class-level view of the language table, so callers that read
    # ``<Provider>.SUPPORTED_LANGUAGES`` without instantiating (as the
    # translation factory does) get the WaveSpeed table.
    SUPPORTED_LANGUAGES = WAVESPEED_SUPPORTED_LANGUAGES

    def __init__(self):
        """Initialise the base provider and log that high-quality mode is active."""
        super().__init__()
        logger.info("[WaveSpeedTranslator] Initialized (high-quality mode)")

    @property
    def provider_name(self) -> str:
        """Human-readable provider name."""
        return "WaveSpeed"

    @property
    def quality(self) -> TranslationQuality:
        """Quality tier served by this provider (always HIGH)."""
        return TranslationQuality.HIGH

    def translate(
        self,
        text: str,
        target_language: str,
        source_language: Optional[str] = None,
    ) -> TranslationResult:
        """
        Translate a single text (not implemented yet).

        The text is validated first, so invalid input is reported by
        ``validate_text`` before the not-implemented error.

        Raises:
            NotImplementedError: Always, once validation has passed.
        """
        self.validate_text(text)

        raise NotImplementedError(
            "WaveSpeed text translation not yet implemented. "
            "For high-quality translation, use the video translation API "
            "or fall back to DeepL for text translation."
        )

    def translate_batch(
        self,
        texts: List[str],
        target_language: str,
        source_language: Optional[str] = None,
    ) -> List[TranslationResult]:
        """
        Translate several texts (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "WaveSpeed batch translation not yet implemented."
        )

    def get_supported_languages(self) -> Dict[str, str]:
        """Return a copy of the supported-language table (code -> name)."""
        return WAVESPEED_SUPPORTED_LANGUAGES.copy()

    def is_language_supported(self, language: str) -> bool:
        """
        Check whether ``language`` is in the WaveSpeed language table.

        The value is first passed through ``normalize_language_code``
        (not defined in this class; presumably inherited from the base
        provider) and lower-cased before the lookup.
        """
        normalized = self.normalize_language_code(language).lower()
        return normalized in WAVESPEED_SUPPORTED_LANGUAGES

    def calculate_cost(self, text_length: int, char_count: int = 0) -> float:
        """
        Estimate the cost of translating a text.

        Args:
            text_length: Length of the text; used when ``char_count`` is 0
            char_count: Character count; takes precedence when non-zero

        Returns:
            Character count multiplied by ``COST_PER_CHARACTER``
        """
        chars = char_count or text_length
        return chars * self.COST_PER_CHARACTER

    def translate_video(
        self,
        video_path: str,
        target_language: str,
        source_language: Optional[str] = None,
    ) -> bytes:
        """
        Translate video using WaveSpeed video translation API.

        This is the primary use case for high-quality dubbing.

        Args:
            video_path: Path to video file; the whole file is read into memory
            target_language: Target language; normalized with
                ``normalize_language_code`` before being sent
            source_language: Source language. NOTE: accepted for interface
                symmetry but currently not forwarded to the API call.

        Returns:
            Translated video bytes
        """
        # Imported lazily so the WaveSpeed generator stack is only loaded
        # when video translation is actually requested.
        from ..wavespeed.generators.video.translation import VideoTranslation

        translator = VideoTranslation()
        target_lang = self.normalize_language_code(target_language)

        with open(video_path, "rb") as f:
            video_bytes = f.read()

        return translator.video_translate(
            video=video_bytes,
            output_language=target_lang,
            enable_sync_mode=True,
        )