Fix merge conflicts and resolve circular import issues

- Resolve conflict markers in logging_config.py, main.py, app.py
- Fix circular imports in story_writer services (image/audio/video generation)
  by using lazy imports for get_story_media_write_dir
- Restore clean versions of:
  - sif_agents.py
  - tenant_provider_config.py
  - personalization_service.py
  - huggingface_provider.py
  - main_text_generation.py
  - logger_utils.py
- Use setup_clean_logging() consistently across app.py and main.py
- Restore verbose_mode handling in start_alwrity_backend.py
This commit is contained in:
ajaysi
2026-03-22 10:45:05 +05:30
parent d412275748
commit d557bd4918
13 changed files with 232 additions and 1179 deletions

View File

@@ -10,8 +10,6 @@ Key Features:
- Comprehensive error handling and logging
- Automatic API key management
- Support for various Hugging Face models via Inference Providers
- Explicit fallback model sequences
- Client caching for performance
Best Practices:
1. Use structured output for complex, multi-field responses
@@ -49,24 +47,35 @@ Last Updated: January 2025
"""
import os
import sys
from pathlib import Path
import json
import re
from functools import lru_cache
from typing import Optional, Dict, Any, List
from typing import Optional, Dict, Any
from dotenv import load_dotenv
# Fix the environment loading path - load from backend directory
current_dir = Path(__file__).parent.parent # services directory
backend_dir = current_dir.parent # backend directory
env_path = backend_dir / '.env'
if env_path.exists():
load_dotenv(env_path)
print(f"Loaded .env from: {env_path}")
else:
# Fallback to current directory
load_dotenv()
print(f"No .env found at {env_path}, using current directory")
from loguru import logger
from utils.logger_utils import get_service_logger, emit_routing_event
<<<<<<< HEAD
from .routing_policy import PREMIUM_DEFAULT_MODEL, SIF_LOW_COST_MODEL_DEFAULTS
=======
>>>>>>> pr-421
from utils.logger_utils import get_service_logger
# Use service-specific logger to avoid conflicts
logger = get_service_logger("huggingface_provider")
from tenacity import (
retry,
retry_if_exception,
stop_after_attempt,
wait_random_exponential,
)
@@ -81,57 +90,13 @@ except ImportError:
logger.warn("OpenAI library not available. Install with: pip install openai")
HF_FALLBACK_MODELS = [
PREMIUM_DEFAULT_MODEL,
"openai/gpt-oss-120b:groq",
"moonshotai/Kimi-K2-Instruct-0905:groq",
"meta-llama/Llama-3.1-8B-Instruct:groq",
SIF_LOW_COST_MODEL_DEFAULTS[0],
"mistralai/Mistral-7B-Instruct-v0.3:groq",
]
def _should_retry_hf_error(exc: Exception) -> bool:
"""Determine if an error should trigger a retry based on error type and message."""
if isinstance(exc, NotFoundError):
return False # Don't retry model not found errors
msg = str(exc).lower()
# Don't retry authentication errors
if any(keyword in msg for keyword in ["unauthorized", "forbidden", "401", "403", "invalid api key"]):
return False
# Don't retry billing/quota errors
if any(keyword in msg for keyword in ["insufficient", "quota", "billing", "payment", "credits", "balance"]):
return False
# Retry rate limiting and server errors
if any(keyword in msg for keyword in ["rate limit", "429", "500", "502", "503", "504", "timeout"]):
return True
# Default to retry for unknown errors
return True
def _classify_hf_error(exc: Exception) -> str:
"""Classify Hugging Face errors for better error reporting."""
msg = str(exc).lower()
if any(keyword in msg for keyword in ["insufficient", "quota", "billing", "payment", "credits", "balance"]):
return "billing_or_quota"
if any(keyword in msg for keyword in ["unauthorized", "forbidden", "401", "403"]):
return "auth_or_permission"
if "not found" in msg or "404" in msg:
return "model_not_found"
if any(keyword in msg for keyword in ["rate limit", "429"]):
return "rate_limit"
if any(keyword in msg for keyword in ["timeout", "500", "502", "503", "504"]):
return "server_error"
return "unknown"
def _error_details(exc: Exception) -> Dict[str, str]:
"""Extract error details for logging."""
return {
"type": type(exc).__name__,
"message": str(exc),
"repr": repr(exc),
}
def _candidate_model_variants(model: str):
"""Yield model ids to try for a single logical model preference."""
if not model:
@@ -147,9 +112,8 @@ def _candidate_model_variants(model: str):
yield base_model
def _fallback_model_sequence(model: str, fallback_models: Optional[List[str]] = None):
"""Generate a sequence of models to try as fallbacks."""
sequence = [model] + (fallback_models or HF_FALLBACK_MODELS)
def _fallback_model_sequence(model: str):
sequence = [model] + HF_FALLBACK_MODELS
seen = set()
for preferred_model in sequence:
for candidate in _candidate_model_variants(preferred_model):
@@ -157,10 +121,9 @@ def _fallback_model_sequence(model: str, fallback_models: Optional[List[str]] =
seen.add(candidate)
yield candidate
def get_huggingface_api_key(explicit_api_key: Optional[str] = None) -> str:
def get_huggingface_api_key() -> str:
"""Get Hugging Face API key with proper error handling."""
api_key = explicit_api_key or os.getenv('HF_TOKEN')
api_key = os.getenv('HF_TOKEN')
if not api_key:
error_msg = "HF_TOKEN environment variable is not set. Please set it in your .env file."
logger.error(error_msg)
@@ -174,32 +137,14 @@ def get_huggingface_api_key(explicit_api_key: Optional[str] = None) -> str:
return api_key
@lru_cache(maxsize=16)
def _get_hf_client(api_key: str):
"""Get cached Hugging Face client for better performance."""
return OpenAI(base_url="https://router.huggingface.co/v1", api_key=api_key)
@retry(
retry=retry_if_exception(_should_retry_hf_error),
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def huggingface_text_response(
prompt: str,
model: str = PREMIUM_DEFAULT_MODEL,
fallback_models: Optional[List[str]] = None,
model: str = "openai/gpt-oss-120b:groq",
temperature: float = 0.7,
max_tokens: int = 2048,
top_p: float = 0.9,
system_prompt: Optional[str] = None,
<<<<<<< HEAD
api_key: Optional[str] = None,
tenant_user_id: Optional[str] = None,
=======
tenant_user_id: Optional[str] = None
>>>>>>> pr-421
system_prompt: Optional[str] = None
) -> str:
"""
Generate text response using Hugging Face Inference Providers API.
@@ -209,13 +154,11 @@ def huggingface_text_response(
Args:
prompt (str): The input prompt for the AI model
model (str): Hugging Face model identifier (default: PREMIUM_DEFAULT_MODEL)
fallback_models (list, optional): Custom fallback models to try
model (str): Hugging Face model identifier (default: "openai/gpt-oss-120b:groq")
temperature (float): Controls randomness (0.0-1.0)
max_tokens (int): Maximum tokens in response
top_p (float): Nucleus sampling parameter (0.0-1.0)
system_prompt (str, optional): System instruction for the model
api_key (str, optional): Explicit API key override
Returns:
str: Generated text response
@@ -228,17 +171,32 @@ def huggingface_text_response(
- Set max_tokens based on expected response length
- Use system_prompt to guide model behavior
- Handle errors gracefully in calling functions
Example:
result = huggingface_text_response(
prompt="Write a blog post about AI",
model="openai/gpt-oss-120b:groq",
temperature=0.7,
max_tokens=2048,
system_prompt="You are a professional content writer."
)
"""
try:
if not OPENAI_AVAILABLE:
raise ImportError("OpenAI library not available. Install with: pip install openai")
# Get API key with proper error handling
hf_api_key = get_huggingface_api_key(api_key)
logger.info(f"🔑 Hugging Face API key loaded: {bool(hf_api_key)} (length: {len(hf_api_key) if hf_api_key else 0})")
api_key = get_huggingface_api_key()
logger.info(f"🔑 Hugging Face API key loaded: {bool(api_key)} (length: {len(api_key) if api_key else 0})")
if not api_key:
raise Exception("HF_TOKEN not found in environment variables")
# Initialize Hugging Face client
client = _get_hf_client(hf_api_key)
client = OpenAI(
base_url=f"https://router.huggingface.co/hf/v1",
api_key=api_key,
)
logger.info("✅ Hugging Face client initialized for text response")
# Prepare input for the API
@@ -269,41 +227,13 @@ def huggingface_text_response(
logger.info("🚀 Making Hugging Face API call (chat completion)...")
# Add rate limiting to prevent expensive API calls
import time
time.sleep(1) # 1 second delay between API calls
response = None
last_error = None
<<<<<<< HEAD
for candidate_model in _fallback_model_sequence(model, fallback_models):
# Emit routing event for each model attempt
route_intent = "primary" if candidate_model == model else "fallback"
emit_routing_event(
logger,
flow_type="huggingface_text",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
tenant_user_id=tenant_user_id,
extra={"original_model": model, "api_call": True}
)
=======
fallback_models_tried = []
fallback_count = 0
for candidate_model in _fallback_model_sequence(model):
fallback_models_tried.append(candidate_model)
route_intent = "primary" if fallback_count == 0 else "fallback"
emit_routing_event(
logger,
flow_type="text_generation",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
preferred_provider="huggingface",
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=tenant_user_id,
extra={"hf_request_type": "text"},
)
>>>>>>> pr-421
try:
response = client.chat.completions.create(
model=candidate_model,
@@ -313,67 +243,41 @@ def huggingface_text_response(
max_tokens=max_tokens
)
if candidate_model != model:
logger.warning("HF text fallback model used: {}", candidate_model)
logger.warning("HF text generation switched to fallback model: {}", candidate_model)
break
except NotFoundError as nf_err:
last_error = nf_err
<<<<<<< HEAD
logger.warning("HF text model not found: {}", candidate_model)
continue
except Exception as call_err:
last_error = call_err
logger.warning("HF text call failed for model {}: {}", candidate_model, _error_details(call_err))
=======
fallback_count += 1
logger.warning("HF model not found: {}. Trying fallback model.", candidate_model)
>>>>>>> pr-421
continue
if response is None:
raise last_error or RuntimeError("All fallback models failed")
raise last_error or Exception("Hugging Face text generation failed: all fallback models failed")
# Extract text from response
generated_text = response.choices[0].message.content or ""
generated_text = response.choices[0].message.content
# Clean up the response
generated_text = re.sub(r'```[a-zA-Z]*\n?', '', generated_text)
generated_text = re.sub(r'```\n?', '', generated_text)
generated_text = generated_text.strip()
if generated_text:
# Remove any markdown formatting if present
generated_text = re.sub(r'```[a-zA-Z]*\n?', '', generated_text)
generated_text = re.sub(r'```\n?', '', generated_text)
generated_text = generated_text.strip()
logger.info(f"✅ Hugging Face text response generated successfully (length: {len(generated_text)})")
return generated_text
except Exception as exc:
details = _error_details(exc)
logger.error(
"❌ Hugging Face text generation failed | error_class={} | type={} | message={} | repr={}",
_classify_hf_error(exc),
details["type"],
details["message"],
details["repr"],
)
raise Exception(f"Hugging Face text generation failed: {exc}") from exc
except Exception as e:
logger.error(f"❌ Hugging Face text generation failed: {str(e)}")
raise Exception(f"Hugging Face text generation failed: {str(e)}")
@retry(
retry=retry_if_exception(_should_retry_hf_error),
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def huggingface_structured_json_response(
prompt: str,
schema: Dict[str, Any],
model: str = PREMIUM_DEFAULT_MODEL,
fallback_models: Optional[List[str]] = None,
model: str = "openai/gpt-oss-120b:groq",
temperature: float = 0.7,
max_tokens: int = 8192,
system_prompt: Optional[str] = None,
<<<<<<< HEAD
api_key: Optional[str] = None,
tenant_user_id: Optional[str] = None,
=======
tenant_user_id: Optional[str] = None
>>>>>>> pr-421
system_prompt: Optional[str] = None
) -> Dict[str, Any]:
"""
Generate structured JSON response using Hugging Face Inference Providers API.
@@ -384,12 +288,10 @@ def huggingface_structured_json_response(
Args:
prompt (str): The input prompt for the AI model
schema (dict): JSON schema defining the expected output structure
model (str): Hugging Face model identifier (default: PREMIUM_DEFAULT_MODEL)
fallback_models (list, optional): Custom fallback models to try
model (str): Hugging Face model identifier (default: "openai/gpt-oss-120b:groq")
temperature (float): Controls randomness (0.0-1.0). Use 0.1-0.3 for structured output
max_tokens (int): Maximum tokens in response. Use 8192 for complex outputs
system_prompt (str, optional): System instruction for the model
api_key (str, optional): Explicit API key override
Returns:
dict: Parsed JSON response matching the provided schema
@@ -403,17 +305,42 @@ def huggingface_structured_json_response(
- Set max_tokens to 8192 for complex multi-field responses
- Avoid deeply nested schemas with many required fields
- Test with smaller outputs first, then scale up
Example:
schema = {
"type": "object",
"properties": {
"tasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"description": {"type": "string"}
}
}
}
}
}
result = huggingface_structured_json_response(prompt, schema, temperature=0.2, max_tokens=8192)
"""
try:
if not OPENAI_AVAILABLE:
raise ImportError("OpenAI library not available. Install with: pip install openai")
# Get API key with proper error handling
hf_api_key = get_huggingface_api_key(api_key)
logger.info(f"🔑 Hugging Face API key loaded: {bool(hf_api_key)} (length: {len(hf_api_key) if hf_api_key else 0})")
api_key = get_huggingface_api_key()
logger.info(f"🔑 Hugging Face API key loaded: {bool(api_key)} (length: {len(api_key) if api_key else 0})")
if not api_key:
raise Exception("HF_TOKEN not found in environment variables")
# Initialize OpenAI client with Hugging Face base URL
client = _get_hf_client(hf_api_key)
# Use standard Inference API endpoint
client = OpenAI(
base_url=f"https://router.huggingface.co/hf/v1",
api_key=api_key,
)
logger.info("✅ Hugging Face client initialized for structured JSON response")
# Prepare input for the API
@@ -427,6 +354,7 @@ def huggingface_structured_json_response(
})
# Add user prompt with JSON instruction
# For HF models, explicit JSON instruction in prompt is often better than response_format
json_instruction = "Please respond with valid JSON that matches the provided schema."
messages.append({
"role": "user",
@@ -445,14 +373,13 @@ def huggingface_structured_json_response(
logger.info("🚀 Making Hugging Face structured API call...")
# Make the API call using standard Chat Completions
logger.info("🚀 Making Hugging Face API call (chat completion)...")
# Add JSON schema to prompt for guidance
json_schema_str = json.dumps(schema, indent=2)
messages[-1]["content"] += f"\n\nJSON Schema:\n{json_schema_str}"
<<<<<<< HEAD
response = None
last_error = None
=======
# Add rate limiting to prevent expensive API calls
import time
time.sleep(1) # 1 second delay between API calls
@@ -460,23 +387,7 @@ def huggingface_structured_json_response(
try:
response = None
last_error = None
fallback_models_tried = []
fallback_count = 0
for candidate_model in _fallback_model_sequence(model):
fallback_models_tried.append(candidate_model)
route_intent = "primary" if fallback_count == 0 else "fallback"
emit_routing_event(
logger,
flow_type="text_generation",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
preferred_provider="huggingface",
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=tenant_user_id,
extra={"hf_request_type": "structured_json"},
)
try:
response = client.chat.completions.create(
model=candidate_model,
@@ -490,45 +401,23 @@ def huggingface_structured_json_response(
break
except NotFoundError as nf_err:
last_error = nf_err
fallback_count += 1
logger.warning("HF structured model not found: {}. Trying fallback model.", candidate_model)
continue
>>>>>>> pr-421
for candidate_model in _fallback_model_sequence(model, fallback_models):
# Emit routing event for each model attempt
route_intent = "primary" if candidate_model == model else "fallback"
emit_routing_event(
logger,
flow_type="huggingface_structured",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
tenant_user_id=tenant_user_id,
extra={"original_model": model, "api_call": True, "response_format": "json_object"}
)
if response is None:
raise last_error or Exception("Hugging Face structured generation failed: all fallback models failed")
response_text = response.choices[0].message.content
# Clean up response text if needed
response_text = response_text.strip()
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
try:
<<<<<<< HEAD
response = client.chat.completions.create(
model=candidate_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
response_format={"type": "json_object"}
)
if candidate_model != model:
logger.warning("HF structured fallback model used: {}", candidate_model)
break
except Exception as err:
last_error = err
if isinstance(err, NotFoundError):
logger.warning("HF structured model not found: {}", candidate_model)
continue
msg = str(err).lower()
if "422" in msg or "not supported" in msg:
=======
parsed_json = json.loads(response_text)
logger.info("✅ Hugging Face structured JSON response parsed successfully")
return parsed_json
@@ -556,75 +445,43 @@ def huggingface_structured_json_response(
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model):
fallback_models_tried.append(candidate_model)
route_intent = "primary" if fallback_count == 0 else "fallback"
emit_routing_event(
logger,
flow_type="text_generation",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
preferred_provider="huggingface",
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=tenant_user_id,
extra={"hf_request_type": "structured_json_no_response_format"},
)
>>>>>>> pr-421
try:
response = client.chat.completions.create(
model=candidate_model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
max_tokens=max_tokens
)
if candidate_model != model:
logger.warning("HF structured fallback(no response_format) model: {}", candidate_model)
logger.warning("HF structured no-response_format fallback model: {}", candidate_model)
break
<<<<<<< HEAD
except Exception as second_err:
last_error = second_err
=======
except NotFoundError as nf_err:
last_error = nf_err
fallback_count += 1
logger.warning("HF structured model not found (no response_format path): {}", candidate_model)
>>>>>>> pr-421
continue
if response is None:
raise last_error or RuntimeError("All fallback models failed")
if response is None:
raise last_error or e
response_text = response.choices[0].message.content
# ... (same parsing logic would apply, simplified here for brevity)
try:
return json.loads(response_text)
except:
# Regex fallback
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {"error": "Failed to parse JSON response", "raw_response": response_text}
raise e
response_text = (response.choices[0].message.content or "").strip()
# Clean up response text if needed
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
try:
parsed_json = json.loads(response_text)
logger.info("✅ Hugging Face structured JSON response parsed successfully")
return parsed_json
except json.JSONDecodeError:
json_match = re.search(r"\{.*\}", response_text, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {"error": "Failed to parse JSON response", "raw_response": response_text}
except Exception as exc:
details = _error_details(exc)
logger.error(
"❌ Hugging Face structured JSON generation failed | error_class={} | type={} | message={} | repr={}",
_classify_hf_error(exc),
details["type"],
details["message"],
details["repr"],
)
raise Exception(f"Hugging Face structured JSON generation failed: {exc}") from exc
except Exception as e:
error_msg = str(e) if str(e) else repr(e)
error_type = type(e).__name__
logger.error(f"❌ Hugging Face structured JSON generation failed: {error_type}: {error_msg}")
logger.error(f"❌ Full exception details: {repr(e)}")
import traceback
logger.error(f"❌ Traceback: {traceback.format_exc()}")
raise Exception(f"Hugging Face structured JSON generation failed: {error_type}: {error_msg}")
def get_available_models() -> list:
"""
@@ -634,15 +491,14 @@ def get_available_models() -> list:
list: List of available model identifiers
"""
return [
PREMIUM_DEFAULT_MODEL,
"openai/gpt-oss-120b:groq",
"moonshotai/Kimi-K2-Instruct-0905:groq",
"Qwen/Qwen2.5-VL-7B-Instruct",
"meta-llama/Llama-3.1-8B-Instruct:groq",
"microsoft/Phi-3-medium-4k-instruct:groq",
SIF_LOW_COST_MODEL_DEFAULTS[0]
"mistralai/Mistral-7B-Instruct-v0.3:groq"
]
def validate_model(model: str) -> bool:
"""
Validate if a model identifier is supported.

View File

@@ -2,8 +2,6 @@
This service provides the main LLM text generation functionality,
migrated from the legacy lib/gpt_providers/text_generation/main_text_generation.py
This is a clean version that imports from modular components to avoid merge conflicts.
"""
import os
@@ -13,47 +11,9 @@ from datetime import datetime
from loguru import logger
from fastapi import HTTPException
# Import all functionality from our modular textgen_utils package
from .textgen_utils import (
llm_text_gen,
check_gpt_provider,
get_api_key,
_normalize_provider,
_parse_csv_env,
_resolve_provider_sequence,
_map_logical_model_to_provider_model,
_resolve_model_sequence,
)
# Re-export all the main functions for backward compatibility
__all__ = [
"llm_text_gen",
"check_gpt_provider",
"get_api_key",
"_normalize_provider",
"_parse_csv_env",
"_resolve_provider_sequence",
"_map_logical_model_to_provider_model",
"_resolve_model_sequence",
]
# Maintain any additional constants or configurations that might be needed
PREMIUM_HF_MINIMAL_FALLBACK_MODELS = [
"openai/gpt-oss-120b:groq",
]
# Legacy compatibility - any imports that other modules might expect
from .gemini_provider import gemini_text_response, gemini_structured_json_response
from .huggingface_provider import huggingface_text_response, huggingface_structured_json_response
<<<<<<< HEAD
from .tenant_provider_config import tenant_provider_config_resolver
from .routing_policy import (
PREMIUM_DEFAULT_MODEL,
SIF_LOW_COST_MODEL_DEFAULTS,
resolve_text_provider_alias,
)
=======
from ...utils.logger_utils import emit_routing_event
def llm_text_gen(
@@ -93,14 +53,17 @@ def llm_text_gen(
frequency_penalty = 0.0
presence_penalty = 0.0
# Check for GPT_PROVIDER environment variable
env_provider = os.getenv('GPT_PROVIDER', '').lower()
if env_provider in ['gemini', 'google']:
provider_cfg = tenant_provider_config_resolver.resolve(
modality="text",
user_id=user_id,
)
selected_provider = (provider_cfg.selected_providers or [None])[0]
if selected_provider in ["gemini", "google"]:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif env_provider in ['hf_response_api', 'huggingface', 'hf']:
model = provider_cfg.model_policy.get("default_model") or "gemini-2.0-flash-001"
elif selected_provider == "huggingface":
gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
model = provider_cfg.model_policy.get("default_model") or "mistralai/Mistral-7B-Instruct-v0.3:groq"
# Default blog characteristics
blog_tone = "Professional"
@@ -110,64 +73,32 @@ def llm_text_gen(
blog_output_format = "markdown"
blog_length = 2000
# Check which providers have API keys available using APIKeyManager
api_key_manager = APIKeyManager()
available_providers = []
if api_key_manager.get_api_key("gemini"):
available_providers.append("google")
if api_key_manager.get_api_key("hf_token"):
available_providers.append("huggingface")
for provider in ("google", "huggingface"):
if get_api_key(provider, user_id=user_id):
available_providers.append(provider)
preferred_provider = env_provider or None
flow_type = "text_generation"
route_intent = "primary"
fallback_count = 0
fallback_models_tried = []
# If no environment variable set, auto-detect based on available keys
if not env_provider:
# Prefer Google Gemini if available, otherwise use Hugging Face
if "google" in available_providers:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
if gpt_provider not in available_providers:
logger.warning(f"[llm_text_gen] Provider {gpt_provider} unavailable for user {user_id}, falling back.")
if available_providers:
gpt_provider = available_providers[0]
else:
logger.error("[llm_text_gen] No API keys found for supported providers.")
raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.")
else:
# Environment variable was set, validate it's supported
if gpt_provider not in available_providers:
logger.warning(f"[llm_text_gen] Provider {gpt_provider} not available, falling back to available providers")
if "google" in available_providers:
gpt_provider = "google"
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
else:
raise RuntimeError("No supported providers available.")
raise RuntimeError("No LLM API keys configured for tenant or environment defaults.")
# Ensure downstream provider clients (currently env-based) receive resolved key
resolved_key = get_api_key(gpt_provider, user_id=user_id)
if gpt_provider == "google" and resolved_key:
os.environ["GEMINI_API_KEY"] = resolved_key
os.environ.setdefault("GOOGLE_API_KEY", resolved_key)
elif gpt_provider == "huggingface" and resolved_key:
os.environ["HF_TOKEN"] = resolved_key
if gpt_provider == "huggingface" and preferred_hf_models:
model = preferred_hf_models[0]
logger.info(f"[llm_text_gen] Using preferred low-cost HF model: {model}")
fallback_models_tried.append(model)
logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}")
emit_routing_event(
logger,
flow_type=flow_type,
route_intent=route_intent,
provider_selected=gpt_provider,
model_selected=model,
preferred_provider=preferred_provider,
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=user_id,
extra={"available_providers": available_providers},
)
# Map provider name to APIProvider enum (define at function scope for usage tracking)
from models.subscription_models import APIProvider
@@ -311,8 +242,7 @@ def llm_text_gen(
model=model,
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions,
tenant_user_id=user_id
system_prompt=system_instructions
)
else:
response_text = huggingface_text_response(
@@ -321,8 +251,7 @@ def llm_text_gen(
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
system_prompt=system_instructions,
tenant_user_id=user_id
system_prompt=system_instructions
)
else:
logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}")
@@ -366,34 +295,17 @@ def llm_text_gen(
try:
logger.info(f"[llm_text_gen] Trying SINGLE fallback provider: {fallback_provider}")
actual_provider_used = fallback_provider
fallback_count += 1
route_intent = "fallback"
# Update provider enum for fallback
if fallback_provider == "google":
provider_enum = APIProvider.GEMINI
actual_provider_name = "gemini"
fallback_model = "gemini-2.0-flash-lite"
fallback_models_tried.append(fallback_model)
elif fallback_provider == "huggingface":
provider_enum = APIProvider.MISTRAL
actual_provider_name = "huggingface"
fallback_model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
fallback_models_tried.append(fallback_model)
emit_routing_event(
logger,
flow_type=flow_type,
route_intent=route_intent,
provider_selected=fallback_provider,
model_selected=fallback_model,
preferred_provider=preferred_provider,
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=user_id,
extra={"available_providers": available_providers},
)
if fallback_provider == "google":
if json_struct:
response_text = gemini_structured_json_response(
@@ -422,8 +334,7 @@ def llm_text_gen(
model="mistralai/Mistral-7B-Instruct-v0.3:groq",
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions,
tenant_user_id=user_id
system_prompt=system_instructions
)
else:
response_text = huggingface_text_response(
@@ -432,8 +343,7 @@ def llm_text_gen(
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
system_prompt=system_instructions,
tenant_user_id=user_id
system_prompt=system_instructions
)
# TRACK USAGE after successful fallback call
@@ -472,18 +382,16 @@ def check_gpt_provider(gpt_provider: str) -> bool:
supported_providers = ["google", "huggingface"]
return gpt_provider in supported_providers
def get_api_key(gpt_provider: str) -> Optional[str]:
def get_api_key(gpt_provider: str, user_id: Optional[str] = None) -> Optional[str]:
"""Get API key for the specified provider."""
try:
api_key_manager = APIKeyManager()
provider_mapping = {
"google": "gemini",
"huggingface": "hf_token"
"huggingface": "huggingface"
}
mapped_provider = provider_mapping.get(gpt_provider, gpt_provider)
return api_key_manager.get_api_key(mapped_provider)
key, _source = tenant_provider_config_resolver.resolve_provider_key(mapped_provider, user_id=user_id)
return key
except Exception as e:
logger.error(f"[get_api_key] Error getting API key for {gpt_provider}: {str(e)}")
return None
>>>>>>> pr-421

View File

@@ -1,88 +1,3 @@
<<<<<<< HEAD
"""Tenant-aware provider configuration and API key resolution for LLM providers."""
from __future__ import annotations
import os
import time
from typing import Dict, Optional
from loguru import logger
from services.database import get_session_for_user
from models.onboarding import APIKey, OnboardingSession
_PROVIDER_KEY_MAP = {
"google": "gemini",
"gemini": "gemini",
"huggingface": "hf_token",
"hf": "hf_token",
"hf_response_api": "hf_token",
}
_PROVIDER_ENV_MAP = {
"gemini": "GEMINI_API_KEY",
"hf_token": "HF_TOKEN",
}
_CACHE_TTL_SECONDS = int(os.getenv("TENANT_PROVIDER_CACHE_TTL", "60"))
_cache: Dict[str, tuple[float, Optional[str]]] = {}
def _cache_key(user_id: Optional[str], provider_key: str) -> str:
return f"{user_id or 'global'}::{provider_key}"
def _normalize_provider(provider: str) -> str:
return _PROVIDER_KEY_MAP.get((provider or "").lower(), (provider or "").lower())
def get_tenant_api_key(user_id: Optional[str], provider: str) -> Optional[str]:
provider_key = _normalize_provider(provider)
ck = _cache_key(user_id, provider_key)
cached = _cache.get(ck)
now = time.time()
if cached and (now - cached[0]) < _CACHE_TTL_SECONDS:
return cached[1]
key: Optional[str] = None
if user_id:
db = None
try:
db = get_session_for_user(user_id)
if db:
record = (
db.query(APIKey.key)
.join(OnboardingSession, APIKey.session_id == OnboardingSession.id)
.filter(OnboardingSession.user_id == user_id, APIKey.provider == provider_key)
.order_by(APIKey.updated_at.desc())
.first()
)
if record and record[0]:
key = record[0]
except Exception as exc:
logger.debug("tenant api-key lookup failed for user={}, provider={}: {}", user_id, provider_key, exc)
finally:
if db:
db.close()
if not key:
env_var = _PROVIDER_ENV_MAP.get(provider_key)
if env_var:
key = os.getenv(env_var)
_cache[ck] = (now, key)
return key
def get_available_text_providers(user_id: Optional[str]) -> list[str]:
providers = []
if get_tenant_api_key(user_id, "gemini"):
providers.append("google")
if get_tenant_api_key(user_id, "huggingface"):
providers.append("huggingface")
return providers
=======
from __future__ import annotations
import os
@@ -251,4 +166,3 @@ class TenantProviderConfigResolver:
tenant_provider_config_resolver = TenantProviderConfigResolver()
>>>>>>> pr-420