diff --git a/backend/services/llm_providers/huggingface_provider.py b/backend/services/llm_providers/huggingface_provider.py index 3cbfd50a..5b23e47d 100644 --- a/backend/services/llm_providers/huggingface_provider.py +++ b/backend/services/llm_providers/huggingface_provider.py @@ -56,7 +56,10 @@ from typing import Optional, Dict, Any, List from loguru import logger from utils.logger_utils import get_service_logger, emit_routing_event +<<<<<<< HEAD from .routing_policy import PREMIUM_DEFAULT_MODEL, SIF_LOW_COST_MODEL_DEFAULTS +======= +>>>>>>> pr-421 # Use service-specific logger to avoid conflicts logger = get_service_logger("huggingface_provider") @@ -191,8 +194,12 @@ def huggingface_text_response( max_tokens: int = 2048, top_p: float = 0.9, system_prompt: Optional[str] = None, +<<<<<<< HEAD api_key: Optional[str] = None, tenant_user_id: Optional[str] = None, +======= + tenant_user_id: Optional[str] = None +>>>>>>> pr-421 ) -> str: """ Generate text response using Hugging Face Inference Providers API. @@ -264,6 +271,7 @@ def huggingface_text_response( response = None last_error = None +<<<<<<< HEAD for candidate_model in _fallback_model_sequence(model, fallback_models): # Emit routing event for each model attempt route_intent = "primary" if candidate_model == model else "fallback" @@ -277,6 +285,25 @@ def huggingface_text_response( extra={"original_model": model, "api_call": True} ) +======= + fallback_models_tried = [] + fallback_count = 0 + for candidate_model in _fallback_model_sequence(model): + fallback_models_tried.append(candidate_model) + route_intent = "primary" if fallback_count == 0 else "fallback" + emit_routing_event( + logger, + flow_type="text_generation", + route_intent=route_intent, + provider_selected="huggingface", + model_selected=candidate_model, + preferred_provider="huggingface", + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=tenant_user_id, + extra={"hf_request_type": "text"}, + ) +>>>>>>> pr-421 try: response = client.chat.completions.create( model=candidate_model, @@ -290,11 +317,16 @@ def huggingface_text_response( break except NotFoundError as nf_err: last_error = nf_err +<<<<<<< HEAD logger.warning("HF text model not found: {}", candidate_model) continue except Exception as call_err: last_error = call_err logger.warning("HF text call failed for model {}: {}", candidate_model, _error_details(call_err)) +======= + fallback_count += 1 + logger.warning("HF model not found: {}. Trying fallback model.", candidate_model) +>>>>>>> pr-421 continue if response is None: @@ -336,8 +368,12 @@ def huggingface_structured_json_response( temperature: float = 0.7, max_tokens: int = 8192, system_prompt: Optional[str] = None, +<<<<<<< HEAD api_key: Optional[str] = None, tenant_user_id: Optional[str] = None, +======= + tenant_user_id: Optional[str] = None +>>>>>>> pr-421 ) -> Dict[str, Any]: """ Generate structured JSON response using Hugging Face Inference Providers API. @@ -413,8 +449,51 @@ def huggingface_structured_json_response( json_schema_str = json.dumps(schema, indent=2) messages[-1]["content"] += f"\n\nJSON Schema:\n{json_schema_str}" +<<<<<<< HEAD response = None last_error = None +======= + # Add rate limiting to prevent expensive API calls + import time + time.sleep(1) # 1 second delay between API calls + + try: + response = None + last_error = None + fallback_models_tried = [] + fallback_count = 0 + for candidate_model in _fallback_model_sequence(model): + fallback_models_tried.append(candidate_model) + route_intent = "primary" if fallback_count == 0 else "fallback" + emit_routing_event( + logger, + flow_type="text_generation", + route_intent=route_intent, + provider_selected="huggingface", + model_selected=candidate_model, + preferred_provider="huggingface", + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=tenant_user_id, + extra={"hf_request_type": "structured_json"}, + ) + try: + response = client.chat.completions.create( + model=candidate_model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + response_format={"type": "json_object"} # Try to enforce JSON mode if supported + ) + if candidate_model != model: + logger.warning("HF structured generation switched to fallback model: {}", candidate_model) + break + except NotFoundError as nf_err: + last_error = nf_err + fallback_count += 1 + logger.warning("HF structured model not found: {}. Trying fallback model.", candidate_model) + continue +>>>>>>> pr-421 for candidate_model in _fallback_model_sequence(model, fallback_models): # Emit routing event for each model attempt @@ -430,6 +509,7 @@ def huggingface_structured_json_response( ) try: +<<<<<<< HEAD response = client.chat.completions.create( model=candidate_model, messages=messages, @@ -448,6 +528,49 @@ def huggingface_structured_json_response( msg = str(err).lower() if "422" in msg or "not supported" in msg: +======= + parsed_json = json.loads(response_text) + logger.info("✅ Hugging Face structured JSON response parsed successfully") + return parsed_json + except json.JSONDecodeError as json_err: + logger.error(f"❌ JSON parsing failed: {json_err}") + logger.error(f"Raw response: {response_text}") + + # Try to extract JSON from the response using regex + json_match = re.search(r'\{.*\}', response_text, re.DOTALL) + if json_match: + try: + extracted_json = json.loads(json_match.group()) + logger.info("✅ JSON extracted using regex fallback") + return extracted_json + except json.JSONDecodeError: + pass + + return {"error": "Failed to parse JSON response", "raw_response": response_text} + + except Exception as e: + logger.error(f"❌ Hugging Face API call failed: {e}") + # If 422 Unprocessable Entity (often due to response_format not supported), retry without it + if "422" in str(e) or "not supported" in str(e).lower() or isinstance(e, NotFoundError): + logger.info("Retrying without response_format...") + response = None + last_error = None + for candidate_model in _fallback_model_sequence(model): + fallback_models_tried.append(candidate_model) + route_intent = "primary" if fallback_count == 0 else "fallback" + emit_routing_event( + logger, + flow_type="text_generation", + route_intent=route_intent, + provider_selected="huggingface", + model_selected=candidate_model, + preferred_provider="huggingface", + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=tenant_user_id, + extra={"hf_request_type": "structured_json_no_response_format"}, + ) +>>>>>>> pr-421 try: response = client.chat.completions.create( model=candidate_model, @@ -458,8 +581,15 @@ def huggingface_structured_json_response( if candidate_model != model: logger.warning("HF structured fallback(no response_format) model: {}", candidate_model) break +<<<<<<< HEAD except Exception as second_err: last_error = second_err +======= + except NotFoundError as nf_err: + last_error = nf_err + fallback_count += 1 + logger.warning("HF structured model not found (no response_format path): {}", candidate_model) +>>>>>>> pr-421 continue if response is None: diff --git a/backend/services/llm_providers/main_text_generation.py b/backend/services/llm_providers/main_text_generation.py index db529981..f6731ebb 100644 --- a/backend/services/llm_providers/main_text_generation.py +++ b/backend/services/llm_providers/main_text_generation.py @@ -45,9 +45,445 @@ PREMIUM_HF_MINIMAL_FALLBACK_MODELS = [ # Legacy compatibility - any imports that other modules might expect from .gemini_provider import gemini_text_response, gemini_structured_json_response from .huggingface_provider import huggingface_text_response, huggingface_structured_json_response +<<<<<<< HEAD from .tenant_provider_config import tenant_provider_config_resolver from .routing_policy import ( PREMIUM_DEFAULT_MODEL, SIF_LOW_COST_MODEL_DEFAULTS, resolve_text_provider_alias, ) +======= +from ...utils.logger_utils import emit_routing_event + + +def llm_text_gen( + prompt: str, + system_prompt: Optional[str] = None, + json_struct: Optional[Dict[str, Any]] = None, + user_id: str = None, + preferred_hf_models: Optional[List[str]] = None, +) -> str: + """ + Generate text using Language Model (LLM) based on the provided prompt. + + Args: + prompt (str): The prompt to generate text from. + system_prompt (str, optional): Custom system prompt to use instead of the default one. + json_struct (dict, optional): JSON schema structure for structured responses. + user_id (str): Clerk user ID for subscription checking (required). + + Returns: + str: Generated text based on the prompt. + + Raises: + RuntimeError: If subscription limits are exceeded or user_id is missing. + """ + try: + logger.info("[llm_text_gen] Starting text generation") + logger.debug(f"[llm_text_gen] Prompt length: {len(prompt)} characters") + + # Set default values for LLM parameters + gpt_provider = "google" # Default to Google Gemini + model = "gemini-2.0-flash-001" + temperature = 0.7 + max_tokens = 4000 + top_p = 0.9 + n = 1 + fp = 16 + frequency_penalty = 0.0 + presence_penalty = 0.0 + + # Check for GPT_PROVIDER environment variable + env_provider = os.getenv('GPT_PROVIDER', '').lower() + if env_provider in ['gemini', 'google']: + gpt_provider = "google" + model = "gemini-2.0-flash-001" + elif env_provider in ['hf_response_api', 'huggingface', 'hf']: + gpt_provider = "huggingface" + model = "mistralai/Mistral-7B-Instruct-v0.3:groq" + + # Default blog characteristics + blog_tone = "Professional" + blog_demographic = "Professional" + blog_type = "Informational" + blog_language = "English" + blog_output_format = "markdown" + blog_length = 2000 + + # Check which providers have API keys available using APIKeyManager + api_key_manager = APIKeyManager() + available_providers = [] + if api_key_manager.get_api_key("gemini"): + available_providers.append("google") + if api_key_manager.get_api_key("hf_token"): + available_providers.append("huggingface") + + preferred_provider = env_provider or None + flow_type = "text_generation" + route_intent = "primary" + fallback_count = 0 + fallback_models_tried = [] + + # If no environment variable set, auto-detect based on available keys + if not env_provider: + # Prefer Google Gemini if available, otherwise use Hugging Face + if "google" in available_providers: + gpt_provider = "google" + model = "gemini-2.0-flash-001" + elif "huggingface" in available_providers: + gpt_provider = "huggingface" + model = "mistralai/Mistral-7B-Instruct-v0.3:groq" + else: + logger.error("[llm_text_gen] No API keys found for supported providers.") + raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.") + else: + # Environment variable was set, validate it's supported + if gpt_provider not in available_providers: + logger.warning(f"[llm_text_gen] Provider {gpt_provider} not available, falling back to available providers") + if "google" in available_providers: + gpt_provider = "google" + model = "gemini-2.0-flash-001" + elif "huggingface" in available_providers: + gpt_provider = "huggingface" + model = "mistralai/Mistral-7B-Instruct-v0.3:groq" + else: + raise RuntimeError("No supported providers available.") + + if gpt_provider == "huggingface" and preferred_hf_models: + model = preferred_hf_models[0] + logger.info(f"[llm_text_gen] Using preferred low-cost HF model: {model}") + + fallback_models_tried.append(model) + + logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}") + emit_routing_event( + logger, + flow_type=flow_type, + route_intent=route_intent, + provider_selected=gpt_provider, + model_selected=model, + preferred_provider=preferred_provider, + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=user_id, + extra={"available_providers": available_providers}, + ) + + # Map provider name to APIProvider enum (define at function scope for usage tracking) + from models.subscription_models import APIProvider + provider_enum = None + # Store actual provider name for logging (e.g., "huggingface", "gemini") + actual_provider_name = None + if gpt_provider == "google": + provider_enum = APIProvider.GEMINI + actual_provider_name = "gemini" # Use "gemini" for consistency in logs + elif gpt_provider == "huggingface": + provider_enum = APIProvider.MISTRAL # HuggingFace maps to Mistral enum for usage tracking + actual_provider_name = "huggingface" # Keep actual provider name for logs + + if not provider_enum: + raise RuntimeError(f"Unknown provider {gpt_provider} for subscription checking") + + # SUBSCRIPTION CHECK - Required and strict enforcement + if not user_id: + raise RuntimeError("user_id is required for subscription checking. Please provide Clerk user ID.") + + try: + from services.database import get_session_for_user + from services.subscription import UsageTrackingService, PricingService + from models.subscription_models import UsageSummary + + db = get_session_for_user(user_id) + if not db: + logger.error(f"[llm_text_gen] Could not get database session for user {user_id}") + raise RuntimeError("Database connection failed") + try: + + usage_service = UsageTrackingService(db) + pricing_service = PricingService(db) + + # Estimate tokens from prompt (input tokens) + # CRITICAL: Use worst-case scenario (input + max_tokens) for validation to prevent abuse + # This ensures we block requests that would exceed limits even if response is longer than expected + input_tokens = int(len(prompt.split()) * 1.3) + # Worst-case estimate: assume maximum possible output tokens (max_tokens if specified) + # This prevents abuse where actual response tokens exceed the estimate + if max_tokens: + estimated_output_tokens = max_tokens # Use maximum allowed output tokens + else: + # If max_tokens not specified, use conservative estimate (input * 1.5) + estimated_output_tokens = int(input_tokens * 1.5) + estimated_total_tokens = input_tokens + estimated_output_tokens + + # Check limits using sync method from pricing service (strict enforcement) + can_proceed, message, usage_info = pricing_service.check_usage_limits( + user_id=user_id, + provider=provider_enum, + tokens_requested=estimated_total_tokens, + actual_provider_name=actual_provider_name # Pass actual provider name for correct error messages + ) + + if not can_proceed: + logger.warning(f"[llm_text_gen] Subscription limit exceeded for user {user_id}: {message}") + # Raise HTTPException(429) with usage info so frontend can display subscription modal + error_detail = { + 'error': message, + 'message': message, + 'provider': actual_provider_name or provider_enum.value, + 'usage_info': usage_info if usage_info else {} + } + raise HTTPException(status_code=429, detail=error_detail) + + # Get current usage for limit checking only + current_period = pricing_service.get_current_billing_period(user_id) or datetime.now().strftime("%Y-%m") + usage = db.query(UsageSummary).filter( + UsageSummary.user_id == user_id, + UsageSummary.billing_period == current_period + ).first() + + # No separate log here - we'll create unified log after API call and usage tracking + + finally: + db.close() + except HTTPException: + # Re-raise HTTPExceptions (e.g., 429 subscription limit) - preserve error details + raise + except RuntimeError: + # Re-raise subscription limit errors + raise + except Exception as sub_error: + # STRICT: Fail on subscription check errors + logger.error(f"[llm_text_gen] Subscription check failed for user {user_id}: {sub_error}") + raise RuntimeError(f"Subscription check failed: {str(sub_error)}") + + # Construct the system prompt if not provided + if system_prompt is None: + system_instructions = f"""You are a highly skilled content writer with a knack for creating engaging and informative content. + Your expertise spans various writing styles and formats. + + Writing Style Guidelines: + - Tone: {blog_tone} + - Target Audience: {blog_demographic} + - Content Type: {blog_type} + - Language: {blog_language} + - Output Format: {blog_output_format} + - Target Length: {blog_length} words + + Please provide responses that are: + - Well-structured and easy to read + - Engaging and informative + - Tailored to the specified tone and audience + - Professional yet accessible + - Optimized for the target content type + """ + else: + system_instructions = system_prompt + + # Generate response based on provider + response_text = None + actual_provider_used = gpt_provider + try: + if gpt_provider == "google": + if json_struct: + response_text = gemini_structured_json_response( + prompt=prompt, + schema=json_struct, + temperature=temperature, + top_p=top_p, + top_k=n, + max_tokens=max_tokens, + system_prompt=system_instructions + ) + else: + response_text = gemini_text_response( + prompt=prompt, + temperature=temperature, + top_p=top_p, + n=n, + max_tokens=max_tokens, + system_prompt=system_instructions + ) + elif gpt_provider == "huggingface": + if json_struct: + response_text = huggingface_structured_json_response( + prompt=prompt, + schema=json_struct, + model=model, + temperature=temperature, + max_tokens=max_tokens, + system_prompt=system_instructions, + tenant_user_id=user_id + ) + else: + response_text = huggingface_text_response( + prompt=prompt, + model=model, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + system_prompt=system_instructions, + tenant_user_id=user_id + ) + else: + logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}") + raise RuntimeError("Unknown LLM provider. Supported providers: google, huggingface") + + # TRACK USAGE after successful API call + if response_text: + logger.info(f"[llm_text_gen] ✅ API call successful, tracking usage for user {user_id}, provider {provider_enum.value}") + try: + from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync + + # Estimate tokens + tokens_input = int(len(prompt.split()) * 1.3) + + # Calculate duration (mocking it since we didn't track start time explicitly in this function) + # Ideally we should track start_time at beginning of function + duration = 0.5 + + track_agent_usage_sync( + user_id=user_id, + model_name=model, + prompt=prompt, + response_text=response_text, + duration=duration + ) + + except Exception as usage_error: + # Non-blocking: log error but don't fail the request + logger.error(f"[llm_text_gen] ❌ Failed to track usage: {usage_error}", exc_info=True) + + return response_text + except Exception as provider_error: + logger.error(f"[llm_text_gen] Provider {gpt_provider} failed: {str(provider_error)}") + + # CIRCUIT BREAKER: Only try ONE fallback to prevent expensive API calls + fallback_providers = ["google", "huggingface"] + fallback_providers = [p for p in fallback_providers if p in available_providers and p != gpt_provider] + + if fallback_providers: + fallback_provider = fallback_providers[0] # Only try the first available + try: + logger.info(f"[llm_text_gen] Trying SINGLE fallback provider: {fallback_provider}") + actual_provider_used = fallback_provider + fallback_count += 1 + route_intent = "fallback" + + # Update provider enum for fallback + if fallback_provider == "google": + provider_enum = APIProvider.GEMINI + actual_provider_name = "gemini" + fallback_model = "gemini-2.0-flash-lite" + fallback_models_tried.append(fallback_model) + elif fallback_provider == "huggingface": + provider_enum = APIProvider.MISTRAL + actual_provider_name = "huggingface" + fallback_model = "mistralai/Mistral-7B-Instruct-v0.3:groq" + fallback_models_tried.append(fallback_model) + + emit_routing_event( + logger, + flow_type=flow_type, + route_intent=route_intent, + provider_selected=fallback_provider, + model_selected=fallback_model, + preferred_provider=preferred_provider, + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=user_id, + extra={"available_providers": available_providers}, + ) + + if fallback_provider == "google": + if json_struct: + response_text = gemini_structured_json_response( + prompt=prompt, + schema=json_struct, + temperature=temperature, + top_p=top_p, + top_k=n, + max_tokens=max_tokens, + system_prompt=system_instructions + ) + else: + response_text = gemini_text_response( + prompt=prompt, + temperature=temperature, + top_p=top_p, + n=n, + max_tokens=max_tokens, + system_prompt=system_instructions + ) + elif fallback_provider == "huggingface": + if json_struct: + response_text = huggingface_structured_json_response( + prompt=prompt, + schema=json_struct, + model="mistralai/Mistral-7B-Instruct-v0.3:groq", + temperature=temperature, + max_tokens=max_tokens, + system_prompt=system_instructions, + tenant_user_id=user_id + ) + else: + response_text = huggingface_text_response( + prompt=prompt, + model="mistralai/Mistral-7B-Instruct-v0.3:groq", + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + system_prompt=system_instructions, + tenant_user_id=user_id + ) + + # TRACK USAGE after successful fallback call + if response_text: + logger.info(f"[llm_text_gen] ✅ Fallback API call successful, tracking usage for user {user_id}, provider {provider_enum.value}") + try: + from services.intelligence.agents.agent_usage_tracking import track_agent_usage_sync + + # Estimate tokens + tokens_input = int(len(prompt.split()) * 1.3) + + track_agent_usage_sync( + user_id=user_id, + model_name=fallback_model, + prompt=prompt, + response_text=response_text, + duration=0.5 # Approximate duration + ) + except Exception as usage_error: + logger.error(f"[llm_text_gen] ❌ Failed to track fallback usage: {usage_error}", exc_info=True) + + return response_text + except Exception as fallback_error: + logger.error(f"[llm_text_gen] Fallback provider {fallback_provider} also failed: {str(fallback_error)}") + + # CIRCUIT BREAKER: Stop immediately to prevent expensive API calls + logger.error("[llm_text_gen] CIRCUIT BREAKER: Stopping to prevent expensive API calls.") + raise RuntimeError("All LLM providers failed to generate a response.") + + except Exception as e: + logger.error(f"[llm_text_gen] Error during text generation: {str(e)}") + raise + +def check_gpt_provider(gpt_provider: str) -> bool: + """Check if the specified GPT provider is supported.""" + supported_providers = ["google", "huggingface"] + return gpt_provider in supported_providers + +def get_api_key(gpt_provider: str) -> Optional[str]: + """Get API key for the specified provider.""" + try: + api_key_manager = APIKeyManager() + provider_mapping = { + "google": "gemini", + "huggingface": "hf_token" + } + + mapped_provider = provider_mapping.get(gpt_provider, gpt_provider) + return api_key_manager.get_api_key(mapped_provider) + except Exception as e: + logger.error(f"[get_api_key] Error getting API key for {gpt_provider}: {str(e)}") + return None +>>>>>>> pr-421 diff --git a/backend/utils/logger_utils.py b/backend/utils/logger_utils.py index 9cca5693..d569fa27 100644 --- a/backend/utils/logger_utils.py +++ b/backend/utils/logger_utils.py @@ -2,11 +2,17 @@ Logger utilities to prevent conflicts between different logging configurations. """ +import hashlib +import json from loguru import logger import sys +<<<<<<< HEAD import hashlib import json from typing import Any, Dict, Optional +======= +from typing import Any, Dict, List, Optional +>>>>>>> pr-421 def safe_logger_config(format_string: str, level: str = "INFO"): @@ -56,6 +62,7 @@ def get_service_logger(service_name: str, format_string: str = None): return logger.bind(service=service_name) +<<<<<<< HEAD def _mask_user_id(user_id: Optional[str]) -> str: """Mask user ID for privacy in logs.""" if not user_id: @@ -96,11 +103,42 @@ def emit_routing_event( payload: Dict[str, Any] = { "flow_type": flow_type, "route_intent": route_intent, +======= +def _mask_tenant_user_id(tenant_user_id: Optional[str]) -> Optional[str]: + """Return a stable hash for a tenant user id so logs avoid exposing raw IDs.""" + if not tenant_user_id: + return None + return hashlib.sha256(tenant_user_id.encode("utf-8")).hexdigest()[:12] + + +def emit_routing_event( + service_logger, + *, + flow_type: str, + route_intent: str, + provider_selected: Optional[str], + model_selected: Optional[str], + preferred_provider: Optional[str], + fallback_count: int = 0, + fallback_models_tried: Optional[List[str]] = None, + tenant_user_id: Optional[str] = None, + event_name: str = "llm_routing_event", + level: str = "INFO", + extra: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Emit a standardized structured model-routing event for AI facades.""" + payload: Dict[str, Any] = { + "event_name": event_name, + "flow_type": flow_type, + "route_intent": route_intent, + "flow_type/route_intent": f"{flow_type}/{route_intent}", +>>>>>>> pr-421 "provider_selected": provider_selected, "model_selected": model_selected, "preferred_provider": preferred_provider, "fallback_count": fallback_count, "fallback_models_tried": fallback_models_tried or [], +<<<<<<< HEAD "tenant": _mask_user_id(tenant_user_id), } @@ -109,3 +147,13 @@ def emit_routing_event( log_method = getattr(logger_instance, level.lower(), logger_instance.info) log_method("[llm_routing] {}", json.dumps(payload, sort_keys=True, default=str)) +======= + "tenant_user_id": _mask_tenant_user_id(tenant_user_id), + } + if extra: + payload.update(extra) + + log_method = getattr(service_logger, level.lower(), service_logger.info) + log_method("{}", json.dumps(payload, sort_keys=True)) + return payload +>>>>>>> pr-421