"feat:enhance-podcast-topic-ai"

This commit is contained in:
ajaysi
2026-03-11 19:09:27 +05:30
parent e472861967
commit 01881bb405
51 changed files with 3627 additions and 218 deletions

View File

@@ -83,6 +83,7 @@ from utils.logger_utils import get_service_logger
logger = get_service_logger("gemini_provider")
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_random_exponential,
)
@@ -114,7 +115,27 @@ def get_gemini_api_key() -> str:
return api_key
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def _is_non_retryable_gemini_error(exc: Exception) -> bool:
"""Skip retries for deterministic quota exhaustion and auth errors."""
msg = str(exc).lower()
return (
"resource_exhausted" in msg
or "quota exceeded" in msg
or "free_tier" in msg
or "requestsperday" in msg
or "authentication" in msg
or "permission denied" in msg
or "invalid api key" in msg
)
def _should_retry_gemini_error(exc: Exception) -> bool:
return not _is_non_retryable_gemini_error(exc)
@retry(
retry=retry_if_exception(_should_retry_gemini_error),
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
def gemini_text_response(prompt, temperature, top_p, n, max_tokens, system_prompt):
"""
Generate text response using Google's Gemini Pro model.
@@ -182,7 +203,7 @@ def gemini_text_response(prompt, temperature, top_p, n, max_tokens, system_promp
#logger.info(f"Number of Token in Prompt Sent: {model.count_tokens(prompt)}")
return response.text
except Exception as err:
logger.error(f"Failed to get response from Gemini: {err}. Retrying.")
logger.error(f"Failed to get response from Gemini: {err}")
raise

View File

@@ -51,7 +51,7 @@ import sys
from pathlib import Path
import json
import re
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, List
from dotenv import load_dotenv
@@ -76,6 +76,7 @@ logger = get_service_logger("huggingface_provider")
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_random_exponential,
)
@@ -90,10 +91,10 @@ except ImportError:
logger.warn("OpenAI library not available. Install with: pip install openai")
HF_FALLBACK_MODELS = [
"openai/gpt-oss-120b:groq",
"moonshotai/Kimi-K2-Instruct-0905:groq",
"meta-llama/Llama-3.1-8B-Instruct:groq",
"mistralai/Mistral-7B-Instruct-v0.3:groq",
"openai/gpt-oss-120b:cerebras",
"moonshotai/Kimi-K2-Instruct-0905:cerebras",
"meta-llama/Llama-3.1-8B-Instruct:cerebras",
"mistralai/Mistral-7B-Instruct-v0.3:cerebras",
]
@@ -102,7 +103,7 @@ def _candidate_model_variants(model: str):
if not model:
return
# Try configured model first (supports provider suffixes like ":groq")
# Try configured model first (supports provider suffixes like ":cerebras")
yield model
# Fallback to base repo id when provider suffix is not recognized by the router
@@ -112,8 +113,13 @@ def _candidate_model_variants(model: str):
yield base_model
def _fallback_model_sequence(model: str):
sequence = [model] + HF_FALLBACK_MODELS
def _fallback_model_sequence(model: str, fallback_models: Optional[List[str]] = None):
# IMPORTANT: Do not apply implicit global fallback chains.
# Callers must explicitly provide fallback_models when they want multi-model retries.
if fallback_models:
sequence = [model] + fallback_models
else:
sequence = [model]
seen = set()
for preferred_model in sequence:
for candidate in _candidate_model_variants(preferred_model):
@@ -121,6 +127,57 @@ def _fallback_model_sequence(model: str):
seen.add(candidate)
yield candidate
def _is_non_retryable_hf_error(exc: Exception) -> bool:
"""Skip retries for deterministic HF failures (e.g., unknown model ids, billing)."""
msg = str(exc).lower()
status = getattr(exc, "status_code", None)
# Non-retryable errors
if isinstance(exc, NotFoundError) or "not found" in msg or "404" in msg:
return True
if status == 402 or "402" in msg or "depleted" in msg or "credits" in msg:
return True
if status == 401 or "unauthorized" in msg or "401" in msg:
return True
if status == 403 or "forbidden" in msg or "403" in msg:
return True
return False
def _should_retry_hf_error(exc: Exception) -> bool:
return not _is_non_retryable_hf_error(exc)
def _classify_hf_error(exc: Exception) -> str:
"""Classify HF failures for actionable logs."""
msg = str(exc).lower()
if any(token in msg for token in ["insufficient", "balance", "quota", "billing", "payment", "402"]):
return "billing_or_quota"
if "unauthorized" in msg or "forbidden" in msg or "401" in msg or "403" in msg:
return "auth_or_permission"
if "not found" in msg or "404" in msg:
return "model_not_found"
return "unknown"
def _hf_error_details(exc: Exception) -> str:
"""Return compact, actionable exception details for logs."""
status = getattr(exc, "status_code", None)
err_type = type(exc).__name__
message = str(exc)
raw_body = getattr(exc, "body", None)
details = f"type={err_type}"
if status is not None:
details += f", status={status}"
if message:
details += f", message={message}"
if raw_body:
details += f", body={raw_body}"
details += f", repr={repr(exc)}"
return details
def get_huggingface_api_key() -> str:
"""Get Hugging Face API key with proper error handling."""
api_key = os.getenv('HF_TOKEN')
@@ -137,10 +194,15 @@ def get_huggingface_api_key() -> str:
return api_key
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
@retry(
retry=retry_if_exception(_should_retry_hf_error),
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
def huggingface_text_response(
prompt: str,
model: str = "openai/gpt-oss-120b:groq",
model: str = "openai/gpt-oss-120b:cerebras",
fallback_models: Optional[List[str]] = None,
temperature: float = 0.7,
max_tokens: int = 2048,
top_p: float = 0.9,
@@ -175,7 +237,7 @@ def huggingface_text_response(
Example:
result = huggingface_text_response(
prompt="Write a blog post about AI",
model="openai/gpt-oss-120b:groq",
model="openai/gpt-oss-120b:cerebras",
temperature=0.7,
max_tokens=2048,
system_prompt="You are a professional content writer."
@@ -194,7 +256,7 @@ def huggingface_text_response(
# Initialize Hugging Face client
client = OpenAI(
base_url=f"https://router.huggingface.co/hf/v1",
base_url="https://router.huggingface.co/v1",
api_key=api_key,
)
logger.info("✅ Hugging Face client initialized for text response")
@@ -231,27 +293,14 @@ def huggingface_text_response(
import time
time.sleep(1) # 1 second delay between API calls
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model):
try:
response = client.chat.completions.create(
model=candidate_model,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens
)
if candidate_model != model:
logger.warning("HF text generation switched to fallback model: {}", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
logger.warning("HF model not found: {}. Trying fallback model.", candidate_model)
continue
if response is None:
raise last_error or Exception("Hugging Face text generation failed: all fallback models failed")
# Call exactly the requested model; no retries, no fallbacks, no variants
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens
)
# Extract text from response
generated_text = response.choices[0].message.content
@@ -267,14 +316,31 @@ def huggingface_text_response(
return generated_text
except Exception as e:
logger.error(f"❌ Hugging Face text generation failed: {str(e)}")
error_class = _classify_hf_error(e)
error_details = _hf_error_details(e)
logger.error(f"❌ Hugging Face text generation failed: {error_details}")
# Extra diagnostics: try to capture raw response if available
if hasattr(e, 'response') and e.response is not None:
logger.error(f"🔍 HF Error Diagnostics:")
logger.error(f" - Status: {e.response.status_code}")
logger.error(f" - Headers: {dict(e.response.headers)}")
try:
body_json = e.response.json()
logger.error(f" - Body JSON: {json.dumps(body_json, indent=2)}")
except Exception:
logger.error(f" - Body Raw: {e.response.text[:1000]}")
else:
logger.error(f"🔍 No HTTP response attached to exception object.")
raise Exception(f"Hugging Face text generation failed: {str(e)}")
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def huggingface_structured_json_response(
prompt: str,
schema: Dict[str, Any],
model: str = "openai/gpt-oss-120b:groq",
model: str = "openai/gpt-oss-120b:cerebras",
fallback_models: Optional[List[str]] = None,
temperature: float = 0.7,
max_tokens: int = 8192,
system_prompt: Optional[str] = None
@@ -338,7 +404,7 @@ def huggingface_structured_json_response(
# Initialize OpenAI client with Hugging Face base URL
# Use standard Inference API endpoint
client = OpenAI(
base_url=f"https://router.huggingface.co/hf/v1",
base_url="https://router.huggingface.co/v1",
api_key=api_key,
)
logger.info("✅ Hugging Face client initialized for structured JSON response")
@@ -387,7 +453,7 @@ def huggingface_structured_json_response(
try:
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model):
for candidate_model in _fallback_model_sequence(model, fallback_models):
try:
response = client.chat.completions.create(
model=candidate_model,
@@ -444,7 +510,7 @@ def huggingface_structured_json_response(
logger.info("Retrying without response_format...")
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model):
for candidate_model in _fallback_model_sequence(model, fallback_models):
try:
response = client.chat.completions.create(
model=candidate_model,

View File

@@ -22,6 +22,8 @@ def llm_text_gen(
json_struct: Optional[Dict[str, Any]] = None,
user_id: str = None,
preferred_hf_models: Optional[List[str]] = None,
preferred_provider: Optional[str] = None,
flow_type: Optional[str] = None,
) -> str:
"""
Generate text using Language Model (LLM) based on the provided prompt.
@@ -39,12 +41,16 @@ def llm_text_gen(
RuntimeError: If subscription limits are exceeded or user_id is missing.
"""
try:
logger.info("[llm_text_gen] Starting text generation")
resolved_flow_type = flow_type or ("sif_agent" if preferred_hf_models else "premium_tool")
flow_tag = f"flow_type={resolved_flow_type}"
subscription_preflight_completed = False
logger.info(f"[llm_text_gen][{flow_tag}] Starting text generation")
logger.debug(f"[llm_text_gen] Prompt length: {len(prompt)} characters")
# Set default values for LLM parameters
gpt_provider = "google" # Default to Google Gemini
model = "gemini-2.0-flash-001"
gpt_provider = "huggingface" # Default to premium HF route for ALwrity AI tools
model = "openai/gpt-oss-120b:cerebras"
temperature = 0.7
max_tokens = 4000
top_p = 0.9
@@ -55,12 +61,87 @@ def llm_text_gen(
# Check for GPT_PROVIDER environment variable
env_provider = os.getenv('GPT_PROVIDER', '').lower()
if env_provider in ['gemini', 'google']:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif env_provider in ['hf_response_api', 'huggingface', 'hf']:
gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
provider_list = [p.strip() for p in env_provider.split(',') if p.strip()]
# Determine if we're in strict mode (single provider) or fallback mode (multiple providers)
strict_provider_mode = len(provider_list) == 1
if provider_list:
# Use first provider as primary
primary_provider = provider_list[0]
if primary_provider in ['gemini', 'google']:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif primary_provider in ['hf_response_api', 'huggingface', 'hf']:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:cerebras"
elif primary_provider == 'wavespeed':
gpt_provider = "wavespeed"
model = "openai/gpt-oss-120b"
else:
# Auto-detect mode
strict_provider_mode = False # Auto-detect allows fallbacks
gpt_provider = None
model = None
# Explicit per-call provider override (used by tool-specific flows like podcast maker)
if preferred_provider:
preferred_providers = [p.strip() for p in preferred_provider.split(',') if p.strip()]
# If explicit provider is set, it's strict mode (no cross-provider fallbacks)
strict_provider_mode = len(preferred_providers) == 1
primary_provider = preferred_providers[0]
if primary_provider in ['gemini', 'google']:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif primary_provider in ['hf_response_api', 'huggingface', 'hf']:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:cerebras"
elif primary_provider == 'wavespeed':
gpt_provider = "wavespeed"
model = "openai/gpt-oss-120b"
# Handle TEXTGEN_AI_MODELS for model selection
textgen_models_env = os.getenv('TEXTGEN_AI_MODELS', '').strip()
model_list = [m.strip() for m in textgen_models_env.split(',') if m.strip()] if textgen_models_env else []
strict_model_mode = len(model_list) == 1
# Map model names to actual provider models
if model_list:
if gpt_provider == "huggingface":
# Handle both short names and full model names
model_mapping = {
"gpt-oss": "openai/gpt-oss-120b:cerebras",
"gpt-oss-120b": "openai/gpt-oss-120b:cerebras",
"mistral": "mistralai/Mistral-7B-Instruct-v0.3:cerebras",
"mistral-7b": "mistralai/Mistral-7B-Instruct-v0.3:cerebras",
"llama": "meta-llama/Llama-3.1-8B-Instruct:cerebras",
"llama-8b": "meta-llama/Llama-3.1-8B-Instruct:cerebras",
"llama-70b": "meta-llama/Llama-3.1-70B-Instruct:cerebras"
}
# If model name contains "/", assume it's already a full model name
if "/" in model_list[0]:
model = model_list[0]
else:
model = model_mapping.get(model_list[0], model_list[0])
elif gpt_provider == "wavespeed":
# Handle both short names and full model names
model_mapping = {
"gpt-oss": "openai/gpt-oss-120b",
"gpt-oss-120b": "openai/gpt-oss-120b",
"mistral": "mistralai/Mistral-7B-Instruct-v0.3",
"mistral-7b": "mistralai/Mistral-7B-Instruct-v0.3",
"llama": "meta-llama/Llama-3.1-8B-Instruct",
"llama-8b": "meta-llama/Llama-3.1-8B-Instruct",
"llama-70b": "meta-llama/Llama-3.1-70B-Instruct"
}
# If model name contains "/", assume it's already a full model name
if "/" in model_list[0]:
model = model_list[0]
else:
model = model_mapping.get(model_list[0], model_list[0])
elif gpt_provider == "google":
model = "gemini-2.0-flash-001" # Google has fewer options
# Default blog characteristics
blog_tone = "Professional"
@@ -77,42 +158,89 @@ def llm_text_gen(
available_providers.append("google")
if api_key_manager.get_api_key("hf_token"):
available_providers.append("huggingface")
if api_key_manager.get_api_key("wavespeed"):
available_providers.append("wavespeed")
logger.info(
f"[llm_text_gen][{flow_tag}] Provider preflight: env_provider='{env_provider or 'auto'}', "
f"provider_list={provider_list}, strict_provider_mode={strict_provider_mode}, "
f"available_providers={available_providers}, preferred_provider={preferred_provider or 'none'}"
)
if model_list:
logger.info(
f"[llm_text_gen][{flow_tag}] Model configuration: model_list={model_list}, "
f"strict_model_mode={strict_model_mode}"
)
# If no environment variable set, auto-detect based on available keys
if not env_provider:
# Prefer Google Gemini if available, otherwise use Hugging Face
if "google" in available_providers:
if preferred_provider:
# Respect explicit per-call preference if the provider key exists
if gpt_provider not in available_providers:
logger.warning(
f"[llm_text_gen] Preferred provider {gpt_provider} unavailable, falling back to available providers"
)
if "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:cerebras"
elif "wavespeed" in available_providers:
gpt_provider = "wavespeed"
model = "openai/gpt-oss-120b"
elif "google" in available_providers:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
else:
logger.error("[llm_text_gen] No API keys found for supported providers.")
raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.")
elif preferred_hf_models and "huggingface" in available_providers:
# Low-cost SIF/agent flows pass preferred_hf_models; route directly to HF.
gpt_provider = "huggingface"
model = preferred_hf_models[0]
logger.info(f"[llm_text_gen] Using preferred low-cost HF model: {model}")
elif "google" in available_providers:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
model = "openai/gpt-oss-120b:cerebras"
elif "wavespeed" in available_providers:
gpt_provider = "wavespeed"
model = "openai/gpt-oss-120b"
else:
logger.error("[llm_text_gen] No API keys found for supported providers.")
raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.")
else:
# Environment variable was set, validate it's supported
if gpt_provider not in available_providers:
logger.warning(f"[llm_text_gen] Provider {gpt_provider} not available, falling back to available providers")
if "google" in available_providers:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
if strict_provider_mode:
# Strict mode: fail if specified provider not available
raise RuntimeError(f"Provider {gpt_provider} not available. Available: {available_providers}")
else:
raise RuntimeError("No supported providers available.")
# Fallback mode: try other providers
logger.warning(f"[llm_text_gen] Provider {gpt_provider} not available, falling back to available providers")
if "google" in available_providers:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:cerebras"
elif "wavespeed" in available_providers:
gpt_provider = "wavespeed"
model = "openai/gpt-oss-120b"
else:
raise RuntimeError("No supported providers available.")
if gpt_provider == "huggingface" and preferred_hf_models:
model = preferred_hf_models[0]
logger.info(f"[llm_text_gen] Using preferred low-cost HF model: {model}")
logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}")
logger.info(f"[llm_text_gen][{flow_tag}] Using provider={gpt_provider}, model={model}")
# Map provider name to APIProvider enum (define at function scope for usage tracking)
from models.subscription_models import APIProvider
provider_enum = None
# Store actual provider name for logging (e.g., "huggingface", "gemini")
# Store actual provider name for logging (e.g., "huggingface", "gemini", "wavespeed")
actual_provider_name = None
if gpt_provider == "google":
provider_enum = APIProvider.GEMINI
@@ -120,6 +248,9 @@ def llm_text_gen(
elif gpt_provider == "huggingface":
provider_enum = APIProvider.MISTRAL # HuggingFace maps to Mistral enum for usage tracking
actual_provider_name = "huggingface" # Keep actual provider name for logs
elif gpt_provider == "wavespeed":
provider_enum = APIProvider.WAVESPEED
actual_provider_name = "wavespeed"
if not provider_enum:
raise RuntimeError(f"Unknown provider {gpt_provider} for subscription checking")
@@ -132,6 +263,11 @@ def llm_text_gen(
from services.database import get_session_for_user
from services.subscription import UsageTrackingService, PricingService
from models.subscription_models import UsageSummary
logger.info(
f"[llm_text_gen][{flow_tag}] Starting subscription preflight for user={user_id}, "
f"provider={actual_provider_name}, model={model}"
)
db = get_session_for_user(user_id)
if not db:
@@ -162,6 +298,12 @@ def llm_text_gen(
tokens_requested=estimated_total_tokens,
actual_provider_name=actual_provider_name # Pass actual provider name for correct error messages
)
subscription_preflight_completed = True
logger.info(
f"[llm_text_gen][{flow_tag}] Subscription preflight complete: can_proceed={can_proceed}, "
f"estimated_tokens={estimated_total_tokens}, provider={actual_provider_name}"
)
if not can_proceed:
logger.warning(f"[llm_text_gen] Subscription limit exceeded for user {user_id}: {message}")
@@ -219,6 +361,32 @@ def llm_text_gen(
else:
system_instructions = system_prompt
# HF behavior: fail fast on selected model; no intra-provider model fallback chain.
hf_fallback_models: List[str] = []
# Set up model fallbacks based on strict_model_mode
if not strict_model_mode and model_list and len(model_list) > 1:
# Multi-model mode: create fallback list from TEXTGEN_AI_MODELS
if gpt_provider == "huggingface":
model_mapping = {
"gpt-oss": "openai/gpt-oss-120b:cerebras",
"gpt-oss-120b": "openai/gpt-oss-120b:cerebras",
"mistral": "mistralai/Mistral-7B-Instruct-v0.3:cerebras",
"mistral-7b": "mistralai/Mistral-7B-Instruct-v0.3:cerebras",
"llama": "meta-llama/Llama-3.1-8B-Instruct:cerebras",
"llama-8b": "meta-llama/Llama-3.1-8B-Instruct:cerebras",
"llama-70b": "meta-llama/Llama-3.1-70B-Instruct:cerebras"
}
hf_fallback_models = []
for model_name in model_list[1:]:
if "/" in model_name:
# Full model name, use as-is
hf_fallback_models.append(model_name)
else:
# Short name, map it
mapped_model = model_mapping.get(model_name, model_name)
hf_fallback_models.append(mapped_model)
# Generate response based on provider
response_text = None
actual_provider_used = gpt_provider
@@ -249,6 +417,7 @@ def llm_text_gen(
prompt=prompt,
schema=json_struct,
model=model,
fallback_models=hf_fallback_models,
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
@@ -257,6 +426,29 @@ def llm_text_gen(
response_text = huggingface_text_response(
prompt=prompt,
model=model,
fallback_models=hf_fallback_models,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
system_prompt=system_instructions
)
elif gpt_provider == "wavespeed":
from .wavespeed_provider import wavespeed_text_response, wavespeed_structured_json_response
if json_struct:
response_text = wavespeed_structured_json_response(
prompt=prompt,
schema=json_struct,
model=model,
fallback_models=None, # No fallbacks for WaveSpeed initially
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
)
else:
response_text = wavespeed_text_response(
prompt=prompt,
model=model,
fallback_models=None, # No fallbacks for WaveSpeed initially
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
@@ -264,11 +456,13 @@ def llm_text_gen(
)
else:
logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}")
raise RuntimeError("Unknown LLM provider. Supported providers: google, huggingface")
raise RuntimeError("Unknown LLM provider. Supported providers: google, huggingface, wavespeed")
# TRACK USAGE after successful API call
if response_text:
logger.info(f"[llm_text_gen] ✅ API call successful, tracking usage for user {user_id}, provider {provider_enum.value}")
logger.info(
f"[llm_text_gen][{flow_tag}] ✅ API call successful, tracking usage for user {user_id}, provider {provider_enum.value}"
)
try:
from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync
@@ -293,16 +487,37 @@ def llm_text_gen(
return response_text
except Exception as provider_error:
logger.error(f"[llm_text_gen] Provider {gpt_provider} failed: {str(provider_error)}")
logger.error(
f"[llm_text_gen][{flow_tag}] Provider {gpt_provider} failed: {str(provider_error)} | "
f"subscription_preflight_completed={subscription_preflight_completed} | model={model}"
)
# CIRCUIT BREAKER: Only try ONE fallback to prevent expensive API calls
fallback_providers = ["google", "huggingface"]
# Use provider list from environment if available, otherwise default
if provider_list and len(provider_list) > 1:
# Use the specified fallback providers from GPT_PROVIDER
fallback_providers = provider_list[1:] # Skip the primary (already tried)
else:
# Default fallback order
fallback_providers = ["google", "huggingface", "wavespeed"]
# Filter to available providers and exclude current failed provider
fallback_providers = [p for p in fallback_providers if p in available_providers and p != gpt_provider]
# Skip fallbacks if in strict provider mode
if strict_provider_mode:
logger.info(f"[llm_text_gen][{flow_tag}] Strict provider mode enabled; skipping cross-provider fallback")
fallback_providers = []
if preferred_provider:
# Caller explicitly pinned provider (e.g. podcast premium HF). Avoid cross-provider fallback noise.
logger.info(f"[llm_text_gen][{flow_tag}] preferred_provider is set; skipping cross-provider fallback")
fallback_providers = []
if fallback_providers:
fallback_provider = fallback_providers[0] # Only try the first available
try:
logger.info(f"[llm_text_gen] Trying SINGLE fallback provider: {fallback_provider}")
logger.info(f"[llm_text_gen][{flow_tag}] Trying SINGLE fallback provider: {fallback_provider}")
actual_provider_used = fallback_provider
# Update provider enum for fallback
@@ -313,7 +528,11 @@ def llm_text_gen(
elif fallback_provider == "huggingface":
provider_enum = APIProvider.MISTRAL
actual_provider_name = "huggingface"
fallback_model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
fallback_model = preferred_hf_models[0] if preferred_hf_models else "openai/gpt-oss-120b:cerebras"
elif fallback_provider == "wavespeed":
provider_enum = APIProvider.WAVESPEED
actual_provider_name = "wavespeed"
fallback_model = "openai/gpt-oss-120b"
if fallback_provider == "google":
if json_struct:
@@ -340,7 +559,8 @@ def llm_text_gen(
response_text = huggingface_structured_json_response(
prompt=prompt,
schema=json_struct,
model="mistralai/Mistral-7B-Instruct-v0.3:groq",
model=fallback_model,
fallback_models=hf_fallback_models,
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
@@ -348,7 +568,30 @@ def llm_text_gen(
else:
response_text = huggingface_text_response(
prompt=prompt,
model="mistralai/Mistral-7B-Instruct-v0.3:groq",
model=fallback_model,
fallback_models=hf_fallback_models,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
system_prompt=system_instructions
)
elif fallback_provider == "wavespeed":
from .wavespeed_provider import wavespeed_text_response, wavespeed_structured_json_response
if json_struct:
response_text = wavespeed_structured_json_response(
prompt=prompt,
schema=json_struct,
model=fallback_model,
fallback_models=None,
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
)
else:
response_text = wavespeed_text_response(
prompt=prompt,
model=fallback_model,
fallback_models=None,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
@@ -357,7 +600,9 @@ def llm_text_gen(
# TRACK USAGE after successful fallback call
if response_text:
logger.info(f"[llm_text_gen] ✅ Fallback API call successful, tracking usage for user {user_id}, provider {provider_enum.value}")
logger.info(
f"[llm_text_gen][{flow_tag}] ✅ Fallback API call successful, tracking usage for user {user_id}, provider {provider_enum.value}"
)
try:
from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync
@@ -376,19 +621,19 @@ def llm_text_gen(
return response_text
except Exception as fallback_error:
logger.error(f"[llm_text_gen] Fallback provider {fallback_provider} also failed: {str(fallback_error)}")
logger.error(f"[llm_text_gen][{flow_tag}] Fallback provider {fallback_provider} also failed: {str(fallback_error)}")
# CIRCUIT BREAKER: Stop immediately to prevent expensive API calls
logger.error("[llm_text_gen] CIRCUIT BREAKER: Stopping to prevent expensive API calls.")
logger.error(f"[llm_text_gen][{flow_tag}] CIRCUIT BREAKER: Stopping to prevent expensive API calls.")
raise RuntimeError("All LLM providers failed to generate a response.")
except Exception as e:
logger.error(f"[llm_text_gen] Error during text generation: {str(e)}")
logger.error(f"[llm_text_gen][{flow_tag}] Error during text generation: {str(e)}")
raise
def check_gpt_provider(gpt_provider: str) -> bool:
"""Check if the specified GPT provider is supported."""
supported_providers = ["google", "huggingface"]
supported_providers = ["google", "huggingface", "wavespeed"]
return gpt_provider in supported_providers
def get_api_key(gpt_provider: str) -> Optional[str]:
@@ -397,7 +642,8 @@ def get_api_key(gpt_provider: str) -> Optional[str]:
api_key_manager = APIKeyManager()
provider_mapping = {
"google": "gemini",
"huggingface": "hf_token"
"huggingface": "hf_token",
"wavespeed": "wavespeed"
}
mapped_provider = provider_mapping.get(gpt_provider, gpt_provider)

View File

@@ -0,0 +1,527 @@
"""
WaveSpeed LLM Provider Module for ALwrity
This module provides functions for interacting with WaveSpeed's LLM API
using the OpenAI-compatible interface for text generation.
Key Features:
- Text response generation with retry logic
- Comprehensive error handling and logging
- Automatic API key management
- Support for gpt-oss and other WaveSpeed models
- Integration with subscription/preflight checks
Best Practices:
1. Use appropriate temperature for your use case (0.7 for creative, 0.1-0.3 for factual)
2. Set max_tokens based on expected response length
3. Use system_prompt to guide model behavior
4. Handle errors gracefully in calling functions
Usage Examples:
# Text response
result = wavespeed_text_response(prompt, temperature=0.7, max_tokens=2048)
# Structured JSON response
schema = {"type": "object", "properties": {"title": {"type": "string"}}}
result = wavespeed_structured_json_response(prompt, schema, temperature=0.2, max_tokens=8192)
Dependencies:
- openai (for WaveSpeed OpenAI-compatible API)
- tenacity (for retry logic)
- logging (for debugging)
- json (for fallback parsing)
Author: ALwrity Team
Version: 1.0
Last Updated: March 2026
"""
import os
import sys
from pathlib import Path
import json
import re
from typing import Optional, Dict, Any, List
from dotenv import load_dotenv
# Fix the environment loading path - load from backend directory
current_dir = Path(__file__).parent.parent # services directory
backend_dir = current_dir.parent # backend directory
env_path = backend_dir / '.env'
if env_path.exists():
load_dotenv(env_path)
print(f"Loaded .env from: {env_path}")
else:
# Fallback to current directory
load_dotenv()
print(f"No .env found at {env_path}, using current directory")
from loguru import logger
from utils.logger_utils import get_service_logger
# Use service-specific logger to avoid conflicts
logger = get_service_logger("wavespeed_provider")
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_random_exponential,
)
try:
from openai import OpenAI
from openai import NotFoundError
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
NotFoundError = Exception
logger.warn("OpenAI library not available. Install with: pip install openai")
# Default WaveSpeed models for fallback
WAVESPEED_FALLBACK_MODELS = [
"openai/gpt-oss-120b",
"meta-llama/Llama-3.1-8B-Instruct",
"mistralai/Mistral-7B-Instruct-v0.3",
"google/gemma-7b-it",
]
def _candidate_model_variants(model: str):
"""Yield model ids to try for a single logical model preference."""
if not model:
return
# Try configured model first
yield model
# Fallback to base repo id when provider suffix is not recognized by the router
if ":" in model:
base_model = model.split(":", 1)[0]
if base_model:
yield base_model
def _fallback_model_sequence(model: str, fallback_models: Optional[List[str]] = None):
# IMPORTANT: Do not apply implicit global fallback chains.
# Callers must explicitly provide fallback_models when they want multi-model retries.
if fallback_models:
sequence = [model] + fallback_models
else:
sequence = [model]
seen = set()
for preferred_model in sequence:
for candidate in _candidate_model_variants(preferred_model):
if candidate and candidate not in seen:
seen.add(candidate)
yield candidate
def _is_non_retryable_wavespeed_error(exc: Exception) -> bool:
"""Skip retries for deterministic WaveSpeed failures (e.g., unknown model ids, billing)."""
msg = str(exc).lower()
status = getattr(exc, "status_code", None)
# Non-retryable errors
if isinstance(exc, NotFoundError) or "not found" in msg or "404" in msg:
return True
if status == 402 or "402" in msg or "depleted" in msg or "credits" in msg:
return True
if status == 401 or "unauthorized" in msg or "401" in msg:
return True
if status == 403 or "forbidden" in msg or "403" in msg:
return True
return False
def _should_retry_wavespeed_error(exc: Exception) -> bool:
return not _is_non_retryable_wavespeed_error(exc)
def _classify_wavespeed_error(exc: Exception) -> str:
"""Classify WaveSpeed failures for actionable logs."""
msg = str(exc).lower()
if any(token in msg for token in ["insufficient", "balance", "quota", "billing", "payment", "402"]):
return "billing_or_quota"
if "unauthorized" in msg or "forbidden" in msg or "401" in msg or "403" in msg:
return "auth_or_permission"
if "not found" in msg or "404" in msg:
return "model_not_found"
return "unknown"
def _wavespeed_error_details(exc: Exception) -> str:
"""Return compact, actionable exception details for logs."""
status = getattr(exc, "status_code", None)
err_type = type(exc).__name__
message = str(exc)
raw_body = getattr(exc, "body", None)
details = f"type={err_type}"
if status is not None:
details += f", status={status}"
if message:
details += f", message={message}"
if raw_body:
details += f", body={raw_body}"
details += f", repr={repr(exc)}"
return details
def get_wavespeed_api_key() -> str:
"""Get WaveSpeed API key with proper error handling."""
api_key = os.getenv('WAVESPEED_API_KEY')
if not api_key:
error_msg = "WAVESPEED_API_KEY environment variable is not set. Please set it in your .env file."
logger.error(error_msg)
raise ValueError(error_msg)
# Validate API key format (basic check)
if not api_key or len(api_key) < 10:
error_msg = "WAVESPEED_API_KEY appears to be invalid."
logger.error(error_msg)
raise ValueError(error_msg)
return api_key
@retry(
retry=retry_if_exception(_should_retry_wavespeed_error),
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
def wavespeed_text_response(
prompt: str,
model: str = "openai/gpt-oss-120b",
fallback_models: Optional[List[str]] = None,
temperature: float = 0.7,
max_tokens: int = 2048,
top_p: float = 0.9,
system_prompt: Optional[str] = None
) -> str:
"""
Generate text response using WaveSpeed LLM API.
This function uses the WaveSpeed OpenAI-compatible API for text generation
with built-in retry logic and error handling.
Args:
prompt (str): The input prompt for the AI model
model (str): WaveSpeed model identifier (default: "openai/gpt-oss-120b")
temperature (float): Controls randomness (0.0-1.0)
max_tokens (int): Maximum tokens in response
top_p (float): Nucleus sampling parameter (0.0-1.0)
system_prompt (str, optional): System instruction for the model
Returns:
str: Generated text response
Raises:
Exception: If API key is missing or API call fails
Best Practices:
- Use appropriate temperature for your use case (0.7 for creative, 0.1-0.3 for factual)
- Set max_tokens based on expected response length
- Use system_prompt to guide model behavior
- Handle errors gracefully in calling functions
Example:
result = wavespeed_text_response(
prompt="Write a blog post about AI",
model="openai/gpt-oss-120b",
temperature=0.7,
max_tokens=2048,
system_prompt="You are a professional content writer."
)
"""
try:
if not OPENAI_AVAILABLE:
raise ImportError("OpenAI library not available. Install with: pip install openai")
# Get API key with proper error handling
api_key = get_wavespeed_api_key()
logger.info(f"🔑 WaveSpeed API key loaded: {bool(api_key)} (length: {len(api_key) if api_key else 0})")
if not api_key:
raise Exception("WAVESPEED_API_KEY not found in environment variables")
# Initialize WaveSpeed client
client = OpenAI(
base_url="https://llm.wavespeed.ai/v1",
api_key=api_key,
)
logger.info("✅ WaveSpeed client initialized for text response")
# Prepare input for the API
messages = []
# Add system prompt if provided
if system_prompt:
messages.append({
"role": "system",
"content": system_prompt
})
# Add user prompt
messages.append({
"role": "user",
"content": prompt
})
# Add debugging for API call
logger.info(
"WaveSpeed text call | model={} | prompt_len={} | temp={} | top_p={} | max_tokens={}",
model,
len(prompt) if isinstance(prompt, str) else '<non-str>',
temperature,
top_p,
max_tokens,
)
logger.info("🚀 Making WaveSpeed API call (chat completion)...")
# Add rate limiting to prevent expensive API calls
import time
time.sleep(1) # 1 second delay between API calls
# Call exactly the requested model; no retries, no fallbacks, no variants
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens
)
# Extract text from response
generated_text = response.choices[0].message.content
# Clean up the response
if generated_text:
# Remove any markdown formatting if present
generated_text = re.sub(r'```[a-zA-Z]*\n?', '', generated_text)
generated_text = re.sub(r'```\n?', '', generated_text)
generated_text = generated_text.strip()
logger.info(f"✅ WaveSpeed text response generated successfully (length: {len(generated_text)})")
return generated_text
except Exception as e:
error_class = _classify_wavespeed_error(e)
error_details = _wavespeed_error_details(e)
logger.error(f"❌ WaveSpeed text generation failed: {error_details}")
# Extra diagnostics: try to capture raw response if available
if hasattr(e, 'response') and e.response is not None:
logger.error(f"🔍 WaveSpeed Error Diagnostics:")
logger.error(f" - Status: {e.response.status_code}")
logger.error(f" - Headers: {dict(e.response.headers)}")
try:
body_json = e.response.json()
logger.error(f" - Body JSON: {json.dumps(body_json, indent=2)}")
except Exception:
logger.error(f" - Body Raw: {e.response.text[:1000]}")
else:
logger.error(f"🔍 No HTTP response attached to exception object.")
raise Exception(f"WaveSpeed text generation failed: {str(e)}")
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def wavespeed_structured_json_response(
prompt: str,
schema: Dict[str, Any],
model: str = "openai/gpt-oss-120b",
fallback_models: Optional[List[str]] = None,
temperature: float = 0.7,
max_tokens: int = 8192,
system_prompt: Optional[str] = None
) -> Dict[str, Any]:
"""
Generate structured JSON response using WaveSpeed LLM API.
This function uses the WaveSpeed OpenAI-compatible API with structured output support
to generate JSON responses that match a provided schema.
Args:
prompt (str): The input prompt for the AI model
schema (dict): JSON schema defining the expected output structure
model (str): WaveSpeed model identifier (default: "openai/gpt-oss-120b")
temperature (float): Controls randomness (0.0-1.0). Use 0.1-0.3 for structured output
max_tokens (int): Maximum tokens in response. Use 8192 for complex outputs
system_prompt (str, optional): System instruction for the model
Returns:
dict: Parsed JSON response matching the provided schema
Raises:
Exception: If API key is missing or API call fails
Best Practices:
- Keep schemas simple and flat to avoid truncation
- Use low temperature (0.1-0.3) for consistent structured output
- Set max_tokens to 8192 for complex multi-field responses
- Avoid deeply nested schemas with many required fields
- Test with smaller outputs first, then scale up
Example:
schema = {
"type": "object",
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"}
}
}
}
}
}
result = wavespeed_structured_json_response(prompt, schema, temperature=0.2, max_tokens=8192)
"""
try:
if not OPENAI_AVAILABLE:
raise ImportError("OpenAI library not available. Install with: pip install openai")
# Get API key with proper error handling
api_key = get_wavespeed_api_key()
logger.info(f"🔑 WaveSpeed API key loaded: {bool(api_key)} (length: {len(api_key) if api_key else 0})")
if not api_key:
raise Exception("WAVESPEED_API_KEY not found in environment variables")
# Initialize OpenAI client with WaveSpeed base URL
client = OpenAI(
base_url="https://llm.wavespeed.ai/v1",
api_key=api_key,
)
logger.info("✅ WaveSpeed client initialized for structured JSON response")
# Prepare input for the API
messages = []
# Add system prompt if provided
if system_prompt:
messages.append({
"role": "system",
"content": system_prompt
})
# Add user prompt with JSON instruction
json_instruction = "Please respond with valid JSON that matches the provided schema."
messages.append({
"role": "user",
"content": f"{prompt}\n\n{json_instruction}"
})
# Add debugging for API call
logger.info(
"WaveSpeed structured call | model={} | prompt_len={} | schema_kind={} | temp={} | max_tokens={}",
model,
len(prompt) if isinstance(prompt, str) else '<non-str>',
type(schema).__name__,
temperature,
max_tokens,
)
logger.info("🚀 Making WaveSpeed structured API call...")
# Add JSON schema to prompt for guidance
json_schema_str = json.dumps(schema, indent=2)
messages[-1]["content"] += f"\n\nJSON Schema:\n{json_schema_str}"
# Add rate limiting to prevent expensive API calls
import time
time.sleep(1) # 1 second delay between API calls
try:
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model, fallback_models):
try:
response = client.chat.completions.create(
model=candidate_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
response_format={"type": "json_object"} # Try to enforce JSON mode if supported
)
if candidate_model != model:
logger.warning("WaveSpeed structured generation switched to fallback model: {}", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
logger.warning("WaveSpeed structured model not found: {}. Trying fallback model.", candidate_model)
continue
if response is None:
raise last_error or Exception("WaveSpeed structured generation failed: all fallback models failed")
response_text = response.choices[0].message.content
# Clean up response text if needed
response_text = response_text.strip()
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
try:
parsed_json = json.loads(response_text)
logger.info("✅ WaveSpeed structured JSON response parsed successfully")
return parsed_json
except json.JSONDecodeError as json_err:
logger.error(f"❌ JSON parsing failed: {json_err}")
logger.error(f"Raw response: {response_text}")
# Try to extract JSON from the response using regex
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
try:
extracted_json = json.loads(json_match.group())
logger.info("✅ JSON extracted using regex fallback")
return extracted_json
except json.JSONDecodeError:
pass
return {"error": "Failed to parse JSON response", "raw_response": response_text}
except Exception as e:
logger.error(f"❌ WaveSpeed API call failed: {e}")
# If 422 Unprocessable Entity (often due to response_format not supported), retry without it
if "422" in str(e) or "not supported" in str(e).lower() or isinstance(e, NotFoundError):
logger.info("Retrying without response_format...")
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model, fallback_models):
try:
response = client.chat.completions.create(
model=candidate_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
if candidate_model != model:
logger.warning("WaveSpeed structured no-response-format fallback model: {}", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
logger.warning("WaveSpeed structured model not found (no response_format path): {}", candidate_model)
continue
if response is None:
raise last_error or e
response_text = response.choices[0].message.content
# ... (same parsing logic would apply, simplified here for brevity)
try:
return json.loads(response_text)
except:
# Regex fallback
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {"error": "Failed to parse JSON response", "raw_response": response_text}
raise e
except Exception as e:
error_msg = str(e) if str(e) else repr(e)
error_type = type(e).__name__
logger.error(f"❌ WaveSpeed structured JSON generation failed [{error_type}]: {error_msg}")
raise Exception(f"WaveSpeed structured JSON generation failed: {error_msg}")