This commit adds the Auto-Dubbing feature for Podcast Maker with support for translating podcast audio to different languages with optional voice cloning to preserve the original speaker's voice. New Features: - Translation Service (common module): DeepL integration for low-cost translation, WaveSpeed integration for high-quality translation - Audio Dubbing Service: STT -> Translate -> TTS pipeline with voice cloning support - 9 new API endpoints for dubbing and voice cloning - Support for 34+ languages - Cost estimation utilities - Comprehensive documentation Files Added: - services/translation/ (5 files): Translation service module - services/dubbing/: Audio dubbing service - api/podcast/handlers/dubbing.py: API endpoints - docs/AUTO_DUBBING.md: Feature documentation - CHANGELOG.md: Change log Files Modified: - api/podcast/models.py: Added dubbing request/response models - api/podcast/router.py: Added dubbing routes - services/__init__.py: Export translation and dubbing services - scene_animation.py: Fixed missing Path import
560 lines
20 KiB
Python
560 lines
20 KiB
Python
"""
Audio Dubbing Service for ALwrity.

Provides audio dubbing functionality:
- STT: Speech-to-text using Whisper/Gemini
- Translate: Text translation using DeepL
- TTS: Text-to-speech using WaveSpeed

This is a COMMON module that can be used across the application:
- Podcast Maker: Dub podcast audio to different languages
- Video Studio: Add translated voiceovers
- Content Creation: Multilingual audio content

Usage:
    from services.dubbing import AudioDubbingService

    service = AudioDubbingService()
    result = service.dub_audio(
        source_audio="/path/to/audio.mp3",
        target_language="Spanish",
        voice_id="Wise_Woman"
    )
"""
|
|
|
|
import os
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Any, List, Callable
|
|
|
|
from loguru import logger
|
|
from utils.logger_utils import get_service_logger
|
|
|
|
from services.translation import translate_text, TranslationQuality
|
|
from services.llm_providers.main_audio_generation import generate_audio, AudioGenerationResult
|
|
|
|
logger = get_service_logger("dubbing.audio")
|
|
|
|
AUDIO_EXTENSIONS = {".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac"}
|
|
|
|
|
|
@dataclass
class DubbingResult:
    """Outcome of a single dubbing run produced by ``AudioDubbingService``."""

    # Filesystem path of the saved dubbed audio file (stringified Path).
    dubbed_audio_path: str
    # API-relative URL under which the saved file is referenced.
    dubbed_audio_url: str
    # Text transcribed from the source audio.
    original_transcript: str
    # Transcript after translation into the target language.
    translated_transcript: str
    # Source language passed by the caller or filled in by the service
    # ("auto" when none is known).
    source_language: str
    # Language the audio was dubbed into.
    target_language: str
    # Voice used for the generated speech.
    voice_id: str
    # NOTE(review): both pipelines in this module currently always pass 0 here.
    duration_seconds: int
    # Size of the dubbed audio in bytes.
    file_size: int
    # Estimated cost reported by the translation step, plus a flat voice-clone
    # fee when cloning was used.
    cost: float
    # Quality label supplied by the caller (e.g. "low" / "high").
    quality: str
    # True when the speech was generated with a voice cloned from the source.
    voice_clone_used: bool = False
    # Identifier of the cloned voice; None when no cloning took place.
    cloned_voice_id: Optional[str] = None
|
|
|
|
|
|
@dataclass
class VoiceCloneInfo:
    """Details of a voice cloned by ``AudioDubbingService.clone_voice_from_audio``."""

    # Identifier under which the cloned voice was registered.
    voice_id: str
    # API-relative URL of the stored preview audio for the cloned voice.
    voice_url: str
    # Language boost used for cloning, or "auto" when none was given.
    source_language: str
    # Cloning accuracy that was requested.
    accuracy: float
    # File size reported by the cloning call's result.
    file_size: int
|
|
|
|
|
|
class AudioDubbingService:
|
|
|
|
def __init__(
    self,
    output_dir: Optional[Path] = None,
    default_voice_id: str = "Wise_Woman",
):
    """Configure the service and make sure its output directory exists.

    Args:
        output_dir: Directory for generated audio files. A falsy value
            selects the location returned by ``_get_default_output_dir``.
        default_voice_id: Voice used by ``dub_audio`` when the caller does
            not name one.
    """
    if not output_dir:
        output_dir = self._get_default_output_dir()

    self.default_voice_id = default_voice_id
    self.output_dir = output_dir

    # Create the directory up front so later writes cannot fail on a missing path.
    self._ensure_output_dir()

    logger.info(f"[AudioDubbingService] Initialized with output dir: {self.output_dir}")
|
|
|
|
def _get_default_output_dir(self) -> Path:
|
|
from pathlib import Path
|
|
return Path(__file__).resolve().parents[3] / "data" / "media" / "dubbed_audio"
|
|
|
|
def _ensure_output_dir(self) -> None:
|
|
self.output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
def _download_audio(self, source: str) -> tuple[bytes, str]:
|
|
if source.startswith(("http://", "https://")):
|
|
import httpx
|
|
with httpx.Client(timeout=60.0) as client:
|
|
response = client.get(source)
|
|
response.raise_for_status()
|
|
content_type = response.headers.get("content-type", "audio/mpeg")
|
|
return response.content, content_type
|
|
else:
|
|
path = Path(source)
|
|
if not path.exists():
|
|
raise FileNotFoundError(f"Audio file not found: {source}")
|
|
return path.read_bytes(), self._get_mime_type(path)
|
|
|
|
def _get_mime_type(self, path: Path) -> str:
|
|
ext = path.suffix.lower()
|
|
mime_types = {
|
|
".mp3": "audio/mpeg",
|
|
".wav": "audio/wav",
|
|
".m4a": "audio/mp4",
|
|
".aac": "audio/aac",
|
|
".ogg": "audio/ogg",
|
|
".flac": "audio/flac",
|
|
}
|
|
return mime_types.get(ext, "audio/mpeg")
|
|
|
|
def _transcribe_audio(self, audio_path: str, audio_bytes: Optional[bytes] = None) -> str:
    """Transcribe audio to text.

    When ``audio_bytes`` is given, the bytes are written to a temporary file
    which is transcribed instead of ``audio_path`` and deleted afterwards.
    In that case ``audio_path`` is only used to choose the temporary file's
    extension.

    Args:
        audio_path: Path or URL of the source audio.
        audio_bytes: Already-loaded audio content, if available.

    Returns:
        The transcript text.

    Raises:
        RuntimeError: If the transcriber returns an empty result.
    """
    from services.llm_providers.audio_to_text_generation.gemini_audio_text import transcribe_audio

    temp_path = None
    try:
        if audio_bytes:
            import tempfile
            from urllib.parse import urlparse

            # Name the temp file after the real format instead of always
            # ".mp3", so WAV/FLAC/... bytes are not mislabelled.
            # (Presumably the transcriber infers the format from the
            # extension - TODO confirm.)
            known_suffixes = (".mp3", ".wav", ".m4a", ".aac", ".ogg", ".flac")
            if audio_path.startswith(("http://", "https://")):
                # Drop query string / fragment before reading the extension.
                name_part = urlparse(audio_path).path
            else:
                name_part = audio_path
            suffix = Path(name_part).suffix.lower()
            if suffix not in known_suffixes:
                suffix = ".mp3"

            # delete=False: the file must outlive this ``with`` so the
            # transcriber can open it by name; it is removed in ``finally``.
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
                f.write(audio_bytes)
                temp_path = f.name
            audio_path = temp_path

        transcript = transcribe_audio(audio_path)

        if not transcript:
            raise RuntimeError("Failed to transcribe audio")

        logger.info(f"[AudioDubbing] Transcribed {len(transcript)} characters")
        return transcript

    finally:
        if temp_path and os.path.exists(temp_path):
            os.unlink(temp_path)
|
|
|
|
def _save_audio(self, audio_bytes: bytes, suffix: str = ".mp3") -> tuple[Path, str, int]:
    """Write dubbed audio into the output directory under a fresh name.

    Args:
        audio_bytes: Audio content to store.
        suffix: File extension (with leading dot) for the new file.

    Returns:
        ``(filepath, audio_url, file_size)``: the written path, the
        API-relative URL for the file, and the byte count.
    """
    # First 8 hex digits of a random UUID keep names short but distinct.
    unique_id = uuid.uuid4().hex[:8]
    filename = f"dubbed_{unique_id}{suffix}"
    file_size = len(audio_bytes)

    filepath = self.output_dir / filename
    filepath.write_bytes(audio_bytes)

    logger.info(f"[AudioDubbing] Saved dubbed audio: {filepath} ({file_size} bytes)")

    return filepath, f"/api/podcast/dub/audio/{filename}", file_size
|
|
|
|
def _detect_source_language(self, transcript: str) -> str:
|
|
try:
|
|
from services.llm_providers.audio_to_text_generation.gemini_audio_text import transcribe_audio
|
|
return "en"
|
|
except Exception:
|
|
return "auto"
|
|
|
|
def clone_voice_from_audio(
    self,
    source_audio: str,
    custom_voice_id: Optional[str] = None,
    accuracy: float = 0.7,
    language_boost: Optional[str] = None,
    user_id: Optional[str] = None,
) -> VoiceCloneInfo:
    """
    Clone voice from source audio file.

    The preview audio returned by the cloning call is stored in the output
    directory as ``voice_<id>.mp3``. Characters in the id that are not word
    characters or ``-`` are replaced by ``_`` in that file name only; the
    voice id itself is passed on and returned unchanged.

    Args:
        source_audio: Path or URL to source audio
        custom_voice_id: Custom name for the cloned voice (a random
            ``cloned_voice_<8 hex>`` name is generated when omitted)
        accuracy: Cloning accuracy (0.1-1.0, default: 0.7)
        language_boost: Language to boost (e.g., "Spanish")
        user_id: User ID for tracking

    Returns:
        VoiceCloneInfo with cloned voice details
    """
    import re

    # Only the bytes are needed here; the MIME type is discarded.
    audio_bytes, _ = self._download_audio(source_audio)

    if not custom_voice_id:
        unique_suffix = str(uuid.uuid4())[:8]
        custom_voice_id = f"cloned_voice_{unique_suffix}"

    from services.llm_providers.main_audio_generation import clone_voice

    result = clone_voice(
        audio_bytes=audio_bytes,
        custom_voice_id=custom_voice_id,
        accuracy=accuracy,
        language_boost=language_boost,
        user_id=user_id,
    )

    self._ensure_output_dir()

    # custom_voice_id is caller-supplied; strip path separators and other
    # special characters so it cannot steer the write outside output_dir.
    safe_name = re.sub(r"[^\w-]", "_", custom_voice_id)
    voice_filename = f"voice_{safe_name}.mp3"
    voice_path = self.output_dir / voice_filename
    voice_path.write_bytes(result.preview_audio_bytes)

    voice_url = f"/api/podcast/dub/voices/{voice_filename}"

    logger.info(f"[AudioDubbing] Voice cloned: {custom_voice_id}")

    return VoiceCloneInfo(
        voice_id=custom_voice_id,
        voice_url=voice_url,
        source_language=language_boost or "auto",
        accuracy=accuracy,
        file_size=result.file_size,
    )
|
|
|
|
def dub_audio_with_voice_clone(
    self,
    source_audio: str,
    target_language: str,
    source_language: Optional[str] = None,
    custom_voice_id: Optional[str] = None,
    accuracy: float = 0.7,
    speed: float = 1.0,
    emotion: str = "happy",
    quality: str = "high",
    user_id: Optional[str] = None,
    progress_callback: Optional[Callable[[float, str], None]] = None,
) -> DubbingResult:
    """
    Dub audio to target language while preserving original voice.

    Pipeline: Source Audio → Voice Clone → STT → Translate → TTS (cloned voice) → Dubbed Audio

    Args:
        source_audio: Path or URL to source audio file
        target_language: Target language for dubbing
        source_language: Source language (auto-detected if None)
        custom_voice_id: Custom name for the cloned voice
        accuracy: Voice cloning accuracy (0.1-1.0)
        speed: Speech speed (0.5-2.0)
        emotion: Emotion for TTS voice
        quality: Translation quality ("high" recommended for voice clone).
            Exactly "high" selects high-quality translation; any other
            value selects the low-cost tier, as in ``dub_audio``.
        user_id: User ID for tracking
        progress_callback: Optional callback for progress updates, called
            with ``(fraction_done, message)``

    Returns:
        DubbingResult with dubbed audio details

    Raises:
        Exception: Any error from a pipeline step is logged and re-raised.
    """

    def report(fraction: float, message: str) -> None:
        # Progress reporting is optional; do nothing without a callback.
        if progress_callback:
            progress_callback(fraction, message)

    # Honour the requested quality so the translation tier actually used
    # matches the ``quality`` value reported in the result.
    translation_quality = TranslationQuality.HIGH if quality == "high" else TranslationQuality.LOW

    try:
        report(0.05, "Cloning source voice...")

        voice_info = self.clone_voice_from_audio(
            source_audio=source_audio,
            custom_voice_id=custom_voice_id,
            accuracy=accuracy,
            language_boost=target_language,
            user_id=user_id,
        )

        report(0.15, "Voice cloned. Downloading audio...")

        # NOTE(review): clone_voice_from_audio() already loaded the source
        # once; it does not hand the bytes back, so they are loaded again.
        audio_bytes, _ = self._download_audio(source_audio)

        report(0.20, "Transcribing audio...")

        transcript = self._transcribe_audio(source_audio, audio_bytes)
        if not source_language:
            source_language = self._detect_source_language(transcript)

        logger.info(f"[AudioDubbing] Transcript: {transcript[:100]}...")

        report(0.40, "Translating text...")

        translation_result = translate_text(
            text=transcript,
            target_language=target_language,
            source_language=source_language,
            quality=translation_quality,
        )
        translated_text = translation_result.translated_text

        logger.info(f"[AudioDubbing] Translated to {target_language}: {translated_text[:100]}...")

        report(0.65, "Generating dubbed audio with cloned voice...")

        audio_result = generate_audio(
            text=translated_text,
            voice_id=voice_info.voice_id,
            speed=speed,
            emotion=emotion,
            user_id=user_id,
            language_boost=target_language,
        )

        report(0.90, "Saving dubbed audio...")

        filepath, audio_url, file_size = self._save_audio(audio_result.audio_bytes, ".mp3")

        report(1.0, "Dubbing with voice clone complete!")

        # Flat voice-clone fee plus the translation step's own estimate.
        voice_clone_cost = 0.05
        total_cost = voice_clone_cost + translation_result.metadata.get("estimated_cost", 0.0)

        logger.info(f"[AudioDubbing] Voice clone dubbing complete! Output: {filepath}")

        return DubbingResult(
            dubbed_audio_path=str(filepath),
            dubbed_audio_url=audio_url,
            original_transcript=transcript,
            translated_transcript=translated_text,
            source_language=source_language or "auto",
            target_language=target_language,
            voice_id=voice_info.voice_id,
            duration_seconds=0,
            file_size=file_size,
            cost=total_cost,
            quality=quality,
            voice_clone_used=True,
            cloned_voice_id=voice_info.voice_id,
        )

    except Exception as e:
        logger.error(f"[AudioDubbing] Voice clone dubbing error: {str(e)}")
        raise
|
|
|
|
def dub_audio(
    self,
    source_audio: str,
    target_language: str,
    source_language: Optional[str] = None,
    voice_id: Optional[str] = None,
    speed: float = 1.0,
    emotion: str = "happy",
    quality: str = "low",
    use_voice_clone: bool = False,
    custom_voice_id: Optional[str] = None,
    accuracy: float = 0.7,
    user_id: Optional[str] = None,
    progress_callback: Optional[Callable[[float, str], None]] = None,
) -> DubbingResult:
    """
    Translate spoken audio into another language and re-voice it.

    Standard pipeline: Source Audio → STT → Translate → TTS → Dubbed Audio

    With ``use_voice_clone=True`` the whole request is handed to
    ``dub_audio_with_voice_clone`` and ``voice_id`` is not used.

    Args:
        source_audio: Path or URL to source audio file
        target_language: Target language for dubbing
        source_language: Source language (auto-detected if None)
        voice_id: Voice ID for TTS (falls back to the service default)
        speed: Speech speed (0.5-2.0)
        emotion: Emotion for TTS voice
        quality: Translation quality ("low" for DeepL, "high" for WaveSpeed)
        use_voice_clone: Use voice cloning to preserve the original voice
        custom_voice_id: Custom name for the cloned voice
        accuracy: Voice cloning accuracy (0.1-1.0) when use_voice_clone=True
        user_id: User ID for tracking
        progress_callback: Optional callback receiving
            ``(fraction_done, message)`` progress updates

    Returns:
        DubbingResult with dubbed audio details

    Raises:
        Exception: Any error from a pipeline step is logged and re-raised.
    """
    if use_voice_clone:
        return self.dub_audio_with_voice_clone(
            source_audio=source_audio,
            target_language=target_language,
            source_language=source_language,
            custom_voice_id=custom_voice_id,
            accuracy=accuracy,
            speed=speed,
            emotion=emotion,
            quality=quality,
            user_id=user_id,
            progress_callback=progress_callback,
        )

    def notify(fraction: float, message: str) -> None:
        # Progress reporting is optional; do nothing without a callback.
        if progress_callback:
            progress_callback(fraction, message)

    if quality == "high":
        translation_quality = TranslationQuality.HIGH
    else:
        translation_quality = TranslationQuality.LOW
    voice_id = voice_id or self.default_voice_id

    try:
        notify(0.1, "Downloading source audio...")
        audio_bytes, _ = self._download_audio(source_audio)
        logger.info(f"[AudioDubbing] Downloaded audio: {len(audio_bytes)} bytes")

        notify(0.2, "Transcribing audio...")
        transcript = self._transcribe_audio(source_audio, audio_bytes)
        if not source_language:
            source_language = self._detect_source_language(transcript)
        logger.info(f"[AudioDubbing] Transcript: {transcript[:100]}...")

        notify(0.4, "Translating text...")
        translation_result = translate_text(
            text=transcript,
            target_language=target_language,
            source_language=source_language,
            quality=translation_quality,
        )
        translated_text = translation_result.translated_text
        logger.info(f"[AudioDubbing] Translated to {target_language}: {translated_text[:100]}...")

        notify(0.6, "Generating dubbed audio...")
        # NOTE(review): unlike the voice-clone pipeline, no language_boost
        # is passed to generate_audio here - confirm that is intended.
        audio_result = generate_audio(
            text=translated_text,
            voice_id=voice_id,
            speed=speed,
            emotion=emotion,
            user_id=user_id,
        )

        notify(0.9, "Saving dubbed audio...")
        filepath, audio_url, file_size = self._save_audio(audio_result.audio_bytes, ".mp3")

        notify(1.0, "Dubbing complete!")

        # Only the translation step's estimate is reported as cost here.
        cost = translation_result.metadata.get("estimated_cost", 0.0)

        logger.info(f"[AudioDubbing] Complete! Output: {filepath}")

        return DubbingResult(
            dubbed_audio_path=str(filepath),
            dubbed_audio_url=audio_url,
            original_transcript=transcript,
            translated_transcript=translated_text,
            source_language=source_language or "auto",
            target_language=target_language,
            voice_id=voice_id,
            duration_seconds=0,
            file_size=file_size,
            cost=cost,
            quality=quality,
            voice_clone_used=False,
        )

    except Exception as e:
        logger.error(f"[AudioDubbing] Error: {str(e)}")
        raise
|
|
|
|
def dub_audio_batch(
    self,
    source_audios: List[str],
    target_language: str,
    source_language: Optional[str] = None,
    voice_id: Optional[str] = None,
    speed: float = 1.0,
    quality: str = "low",
    user_id: Optional[str] = None,
) -> List[DubbingResult]:
    """
    Dub several audio files into the same target language, one after another.

    Each item goes through ``dub_audio`` with the shared settings. An
    exception from any item propagates and stops the batch.

    Args:
        source_audios: List of audio paths/URLs
        target_language: Target language
        source_language: Source language (auto-detected if None)
        voice_id: Voice ID for TTS
        speed: Speech speed
        quality: Translation quality
        user_id: User ID

    Returns:
        List of DubbingResult, in the same order as ``source_audios``
    """

    def dub_one(i: int, audio: str) -> DubbingResult:
        # Log the 1-based position before starting each item.
        logger.info(f"[AudioDubbing] Processing {i+1}/{len(source_audios)}: {audio}")
        return self.dub_audio(
            source_audio=audio,
            target_language=target_language,
            source_language=source_language,
            voice_id=voice_id,
            speed=speed,
            quality=quality,
            user_id=user_id,
        )

    return [dub_one(i, audio) for i, audio in enumerate(source_audios)]
|
|
|
|
def estimate_cost(
|
|
self,
|
|
audio_duration_seconds: float,
|
|
target_language: str,
|
|
quality: str = "low",
|
|
use_voice_clone: bool = False,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Estimate the cost for dubbing.
|
|
|
|
Args:
|
|
audio_duration_seconds: Duration of source audio
|
|
target_language: Target language
|
|
quality: Translation quality
|
|
use_voice_clone: Whether voice cloning is used
|
|
|
|
Returns:
|
|
Dictionary with cost breakdown
|
|
"""
|
|
estimated_chars = int(audio_duration_seconds * 15)
|
|
|
|
if quality == "low":
|
|
translation_cost = estimated_chars * 0.00001
|
|
else:
|
|
translation_cost = estimated_chars * 0.0001
|
|
|
|
tts_cost = estimated_chars * 0.001
|
|
|
|
voice_clone_cost = 0.05 if use_voice_clone else 0.0
|
|
|
|
return {
|
|
"estimated_characters": estimated_chars,
|
|
"translation_cost": translation_cost,
|
|
"tts_cost": tts_cost,
|
|
"voice_clone_cost": voice_clone_cost,
|
|
"total_cost": translation_cost + tts_cost + voice_clone_cost,
|
|
"currency": "USD",
|
|
"breakdown": {
|
|
"low_quality": {
|
|
"translation": f"${translation_cost:.4f} ({estimated_chars} chars @ $0.00001/char)",
|
|
"tts": f"${tts_cost:.4f} ({estimated_chars} chars @ $0.001/char)",
|
|
"voice_clone": f"${voice_clone_cost:.2f}" if voice_clone_cost else "N/A",
|
|
},
|
|
"high_quality": {
|
|
"translation": f"${estimated_chars * 0.0001:.4f}",
|
|
"tts": f"${tts_cost:.4f}",
|
|
"voice_clone": f"${voice_clone_cost:.2f}" if voice_clone_cost else "N/A",
|
|
}
|
|
}
|
|
}
|