From 482a600e145756d79b2cf9f336d6fcc9a7a270e3 Mon Sep 17 00:00:00 2001 From: ajaysi Date: Thu, 12 Mar 2026 17:12:15 +0530 Subject: [PATCH] "Add_structured_routing_logs_to_text_generation_modular" --- .../llm_providers/huggingface_provider.py | 28 ++++++++- .../textgen_utils/llm_text_generator.py | 43 +++++++++++++- backend/utils/logger_utils.py | 58 +++++++++++++++++++ 3 files changed, 127 insertions(+), 2 deletions(-) diff --git a/backend/services/llm_providers/huggingface_provider.py b/backend/services/llm_providers/huggingface_provider.py index 20e28bdd..3cbfd50a 100644 --- a/backend/services/llm_providers/huggingface_provider.py +++ b/backend/services/llm_providers/huggingface_provider.py @@ -55,7 +55,7 @@ from functools import lru_cache from typing import Optional, Dict, Any, List from loguru import logger -from utils.logger_utils import get_service_logger +from utils.logger_utils import get_service_logger, emit_routing_event from .routing_policy import PREMIUM_DEFAULT_MODEL, SIF_LOW_COST_MODEL_DEFAULTS # Use service-specific logger to avoid conflicts @@ -192,6 +192,7 @@ def huggingface_text_response( top_p: float = 0.9, system_prompt: Optional[str] = None, api_key: Optional[str] = None, + tenant_user_id: Optional[str] = None, ) -> str: """ Generate text response using Hugging Face Inference Providers API. @@ -264,6 +265,18 @@ def huggingface_text_response( response = None last_error = None for candidate_model in _fallback_model_sequence(model, fallback_models): + # Emit routing event for each model attempt + route_intent = "primary" if candidate_model == model else "fallback" + emit_routing_event( + logger, + flow_type="huggingface_text", + route_intent=route_intent, + provider_selected="huggingface", + model_selected=candidate_model, + tenant_user_id=tenant_user_id, + extra={"original_model": model, "api_call": True} + ) + try: response = client.chat.completions.create( model=candidate_model, @@ -324,6 +337,7 @@ def huggingface_structured_json_response( max_tokens: int = 8192, system_prompt: Optional[str] = None, api_key: Optional[str] = None, + tenant_user_id: Optional[str] = None, ) -> Dict[str, Any]: """ Generate structured JSON response using Hugging Face Inference Providers API. @@ -403,6 +417,18 @@ def huggingface_structured_json_response( last_error = None for candidate_model in _fallback_model_sequence(model, fallback_models): + # Emit routing event for each model attempt + route_intent = "primary" if candidate_model == model else "fallback" + emit_routing_event( + logger, + flow_type="huggingface_structured", + route_intent=route_intent, + provider_selected="huggingface", + model_selected=candidate_model, + tenant_user_id=tenant_user_id, + extra={"original_model": model, "api_call": True, "response_format": "json_object"} + ) + try: response = client.chat.completions.create( model=candidate_model, diff --git a/backend/services/llm_providers/textgen_utils/llm_text_generator.py b/backend/services/llm_providers/textgen_utils/llm_text_generator.py index ef96e347..228b4285 100644 --- a/backend/services/llm_providers/textgen_utils/llm_text_generator.py +++ b/backend/services/llm_providers/textgen_utils/llm_text_generator.py @@ -19,6 +19,7 @@ from ..routing_policy import ( SIF_LOW_COST_MODEL_DEFAULTS, resolve_text_provider_alias, ) +from ...utils.logger_utils import emit_routing_event PREMIUM_HF_MINIMAL_FALLBACK_MODELS = [ @@ -58,6 +59,10 @@ def llm_text_gen( resolved_flow_type = flow_type or ("sif_agent" if preferred_hf_models else "premium_tool") flow_tag = f"flow_type={resolved_flow_type}" subscription_preflight_completed = False + + # Initialize routing state for structured logging + fallback_count = 0 + fallback_models_tried = [] logger.info(f"[llm_text_gen][{flow_tag}] Starting text generation") logger.debug(f"[llm_text_gen] Prompt length: {len(prompt)} characters") @@ -138,6 +143,20 @@ def llm_text_gen( os.environ["HF_TOKEN"] = resolved_key logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}") + + # Emit routing event for primary selection + emit_routing_event( + logger, + flow_type=resolved_flow_type, + route_intent="primary", + provider_selected=gpt_provider, + model_selected=model, + preferred_provider=preferred_provider, + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=user_id, + extra={"available_providers": available_providers} + ) # Map provider name to APIProvider enum (define at function scope for usage tracking) from models.subscription_models import APIProvider @@ -303,6 +322,7 @@ def llm_text_gen( max_tokens=max_tokens, system_prompt=system_instructions, allow_model_variant_fallback=hf_allow_model_variant_fallback, + tenant_user_id=user_id ) else: response_text = huggingface_text_response( @@ -312,7 +332,8 @@ def llm_text_gen( temperature=temperature, max_tokens=max_tokens, top_p=top_p, - system_prompt=system_instructions + system_prompt=system_instructions, + tenant_user_id=user_id ) else: logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}") @@ -360,16 +381,34 @@ def llm_text_gen( try: logger.info(f"[llm_text_gen][{flow_tag}] Trying SINGLE fallback provider: {fallback_provider}") actual_provider_used = fallback_provider + fallback_count += 1 + route_intent = "fallback" # Update provider enum for fallback if fallback_provider == "google": provider_enum = APIProvider.GEMINI actual_provider_name = "gemini" fallback_model = "gemini-2.0-flash-lite" + fallback_models_tried.append(fallback_model) elif fallback_provider == "huggingface": provider_enum = APIProvider.MISTRAL actual_provider_name = "huggingface" fallback_model = preferred_hf_models[0] if preferred_hf_models else PREMIUM_DEFAULT_MODEL + fallback_models_tried.append(fallback_model) + + # Emit routing event for fallback attempt + emit_routing_event( + logger, + flow_type=resolved_flow_type, + route_intent=route_intent, + provider_selected=fallback_provider, + model_selected=fallback_model, + preferred_provider=preferred_provider, + fallback_count=fallback_count, + fallback_models_tried=fallback_models_tried, + tenant_user_id=user_id, + extra={"available_providers": available_providers} + ) if fallback_provider == "google": if json_struct: @@ -402,6 +441,7 @@ def llm_text_gen( system_prompt=system_instructions, fallback_models=PREMIUM_HF_MINIMAL_FALLBACK_MODELS, allow_model_variant_fallback=True, + tenant_user_id=user_id ) else: response_text = huggingface_text_response( @@ -413,6 +453,7 @@ def llm_text_gen( system_prompt=system_instructions, fallback_models=PREMIUM_HF_MINIMAL_FALLBACK_MODELS, allow_model_variant_fallback=True, + tenant_user_id=user_id ) # TRACK USAGE after successful fallback call diff --git a/backend/utils/logger_utils.py b/backend/utils/logger_utils.py index 04752820..9cca5693 100644 --- a/backend/utils/logger_utils.py +++ b/backend/utils/logger_utils.py @@ -4,6 +4,9 @@ Logger utilities to prevent conflicts between different logging configurations. from loguru import logger import sys +import hashlib +import json +from typing import Any, Dict, Optional def safe_logger_config(format_string: str, level: str = "INFO"): @@ -51,3 +54,58 @@ def get_service_logger(service_name: str, format_string: str = None): safe_logger_config(format_string) return logger.bind(service=service_name) + + +def _mask_user_id(user_id: Optional[str]) -> str: + """Mask user ID for privacy in logs.""" + if not user_id: + return "anonymous" + return hashlib.sha256(str(user_id).encode("utf-8")).hexdigest()[:12] + + +def emit_routing_event( + logger_instance, + flow_type: str, + *, + route_intent: str = "primary", + provider_selected: str, + model_selected: str, + preferred_provider: Optional[str] = None, + fallback_count: int = 0, + fallback_models_tried: Optional[list] = None, + tenant_user_id: Optional[str] = None, + extra: Optional[Dict[str, Any]] = None, + level: str = "info" +) -> None: + """ + Emit structured routing event for LLM provider selection. + + Args: + logger_instance: Logger instance to use + flow_type: Type of flow (e.g., "sif_agent", "premium_tool") + route_intent: Route intent ("primary" or "fallback") + provider_selected: Selected provider name + model_selected: Selected model name + preferred_provider: Preferred provider (if any) + fallback_count: Number of fallback attempts made + fallback_models_tried: List of models tried as fallbacks + tenant_user_id: Tenant user ID (will be hashed) + extra: Additional fields to include + level: Log level to use + """ + payload: Dict[str, Any] = { + "flow_type": flow_type, + "route_intent": route_intent, + "provider_selected": provider_selected, + "model_selected": model_selected, + "preferred_provider": preferred_provider, + "fallback_count": fallback_count, + "fallback_models_tried": fallback_models_tried or [], + "tenant": _mask_user_id(tenant_user_id), + } + + if extra: + payload.update(extra) + + log_method = getattr(logger_instance, level.lower(), logger_instance.info) + log_method("[llm_routing] {}", json.dumps(payload, sort_keys=True, default=str))