"Add_structured_routing_logs_to_text_generation_modular"

This commit is contained in:
ajaysi
2026-03-12 17:12:15 +05:30
parent e85c7d442e
commit 482a600e14
3 changed files with 127 additions and 2 deletions

View File

@@ -55,7 +55,7 @@ from functools import lru_cache
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
from loguru import logger from loguru import logger
from utils.logger_utils import get_service_logger from utils.logger_utils import get_service_logger, emit_routing_event
from .routing_policy import PREMIUM_DEFAULT_MODEL, SIF_LOW_COST_MODEL_DEFAULTS from .routing_policy import PREMIUM_DEFAULT_MODEL, SIF_LOW_COST_MODEL_DEFAULTS
# Use service-specific logger to avoid conflicts # Use service-specific logger to avoid conflicts
@@ -192,6 +192,7 @@ def huggingface_text_response(
top_p: float = 0.9, top_p: float = 0.9,
system_prompt: Optional[str] = None, system_prompt: Optional[str] = None,
api_key: Optional[str] = None, api_key: Optional[str] = None,
tenant_user_id: Optional[str] = None,
) -> str: ) -> str:
""" """
Generate text response using Hugging Face Inference Providers API. Generate text response using Hugging Face Inference Providers API.
@@ -264,6 +265,18 @@ def huggingface_text_response(
response = None response = None
last_error = None last_error = None
for candidate_model in _fallback_model_sequence(model, fallback_models): for candidate_model in _fallback_model_sequence(model, fallback_models):
# Emit routing event for each model attempt
route_intent = "primary" if candidate_model == model else "fallback"
emit_routing_event(
logger,
flow_type="huggingface_text",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
tenant_user_id=tenant_user_id,
extra={"original_model": model, "api_call": True}
)
try: try:
response = client.chat.completions.create( response = client.chat.completions.create(
model=candidate_model, model=candidate_model,
@@ -324,6 +337,7 @@ def huggingface_structured_json_response(
max_tokens: int = 8192, max_tokens: int = 8192,
system_prompt: Optional[str] = None, system_prompt: Optional[str] = None,
api_key: Optional[str] = None, api_key: Optional[str] = None,
tenant_user_id: Optional[str] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
Generate structured JSON response using Hugging Face Inference Providers API. Generate structured JSON response using Hugging Face Inference Providers API.
@@ -403,6 +417,18 @@ def huggingface_structured_json_response(
last_error = None last_error = None
for candidate_model in _fallback_model_sequence(model, fallback_models): for candidate_model in _fallback_model_sequence(model, fallback_models):
# Emit routing event for each model attempt
route_intent = "primary" if candidate_model == model else "fallback"
emit_routing_event(
logger,
flow_type="huggingface_structured",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
tenant_user_id=tenant_user_id,
extra={"original_model": model, "api_call": True, "response_format": "json_object"}
)
try: try:
response = client.chat.completions.create( response = client.chat.completions.create(
model=candidate_model, model=candidate_model,

View File

@@ -19,6 +19,7 @@ from ..routing_policy import (
SIF_LOW_COST_MODEL_DEFAULTS, SIF_LOW_COST_MODEL_DEFAULTS,
resolve_text_provider_alias, resolve_text_provider_alias,
) )
from ...utils.logger_utils import emit_routing_event
PREMIUM_HF_MINIMAL_FALLBACK_MODELS = [ PREMIUM_HF_MINIMAL_FALLBACK_MODELS = [
@@ -58,6 +59,10 @@ def llm_text_gen(
resolved_flow_type = flow_type or ("sif_agent" if preferred_hf_models else "premium_tool") resolved_flow_type = flow_type or ("sif_agent" if preferred_hf_models else "premium_tool")
flow_tag = f"flow_type={resolved_flow_type}" flow_tag = f"flow_type={resolved_flow_type}"
subscription_preflight_completed = False subscription_preflight_completed = False
# Initialize routing state for structured logging
fallback_count = 0
fallback_models_tried = []
logger.info(f"[llm_text_gen][{flow_tag}] Starting text generation") logger.info(f"[llm_text_gen][{flow_tag}] Starting text generation")
logger.debug(f"[llm_text_gen] Prompt length: {len(prompt)} characters") logger.debug(f"[llm_text_gen] Prompt length: {len(prompt)} characters")
@@ -138,6 +143,20 @@ def llm_text_gen(
os.environ["HF_TOKEN"] = resolved_key os.environ["HF_TOKEN"] = resolved_key
logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}") logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}")
# Emit routing event for primary selection
emit_routing_event(
logger,
flow_type=resolved_flow_type,
route_intent="primary",
provider_selected=gpt_provider,
model_selected=model,
preferred_provider=preferred_provider,
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=user_id,
extra={"available_providers": available_providers}
)
# Map provider name to APIProvider enum (define at function scope for usage tracking) # Map provider name to APIProvider enum (define at function scope for usage tracking)
from models.subscription_models import APIProvider from models.subscription_models import APIProvider
@@ -303,6 +322,7 @@ def llm_text_gen(
max_tokens=max_tokens, max_tokens=max_tokens,
system_prompt=system_instructions, system_prompt=system_instructions,
allow_model_variant_fallback=hf_allow_model_variant_fallback, allow_model_variant_fallback=hf_allow_model_variant_fallback,
tenant_user_id=user_id
) )
else: else:
response_text = huggingface_text_response( response_text = huggingface_text_response(
@@ -312,7 +332,8 @@ def llm_text_gen(
temperature=temperature, temperature=temperature,
max_tokens=max_tokens, max_tokens=max_tokens,
top_p=top_p, top_p=top_p,
system_prompt=system_instructions system_prompt=system_instructions,
tenant_user_id=user_id
) )
else: else:
logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}") logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}")
@@ -360,16 +381,34 @@ def llm_text_gen(
try: try:
logger.info(f"[llm_text_gen][{flow_tag}] Trying SINGLE fallback provider: {fallback_provider}") logger.info(f"[llm_text_gen][{flow_tag}] Trying SINGLE fallback provider: {fallback_provider}")
actual_provider_used = fallback_provider actual_provider_used = fallback_provider
fallback_count += 1
route_intent = "fallback"
# Update provider enum for fallback # Update provider enum for fallback
if fallback_provider == "google": if fallback_provider == "google":
provider_enum = APIProvider.GEMINI provider_enum = APIProvider.GEMINI
actual_provider_name = "gemini" actual_provider_name = "gemini"
fallback_model = "gemini-2.0-flash-lite" fallback_model = "gemini-2.0-flash-lite"
fallback_models_tried.append(fallback_model)
elif fallback_provider == "huggingface": elif fallback_provider == "huggingface":
provider_enum = APIProvider.MISTRAL provider_enum = APIProvider.MISTRAL
actual_provider_name = "huggingface" actual_provider_name = "huggingface"
fallback_model = preferred_hf_models[0] if preferred_hf_models else PREMIUM_DEFAULT_MODEL fallback_model = preferred_hf_models[0] if preferred_hf_models else PREMIUM_DEFAULT_MODEL
fallback_models_tried.append(fallback_model)
# Emit routing event for fallback attempt
emit_routing_event(
logger,
flow_type=resolved_flow_type,
route_intent=route_intent,
provider_selected=fallback_provider,
model_selected=fallback_model,
preferred_provider=preferred_provider,
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=user_id,
extra={"available_providers": available_providers}
)
if fallback_provider == "google": if fallback_provider == "google":
if json_struct: if json_struct:
@@ -402,6 +441,7 @@ def llm_text_gen(
system_prompt=system_instructions, system_prompt=system_instructions,
fallback_models=PREMIUM_HF_MINIMAL_FALLBACK_MODELS, fallback_models=PREMIUM_HF_MINIMAL_FALLBACK_MODELS,
allow_model_variant_fallback=True, allow_model_variant_fallback=True,
tenant_user_id=user_id
) )
else: else:
response_text = huggingface_text_response( response_text = huggingface_text_response(
@@ -413,6 +453,7 @@ def llm_text_gen(
system_prompt=system_instructions, system_prompt=system_instructions,
fallback_models=PREMIUM_HF_MINIMAL_FALLBACK_MODELS, fallback_models=PREMIUM_HF_MINIMAL_FALLBACK_MODELS,
allow_model_variant_fallback=True, allow_model_variant_fallback=True,
tenant_user_id=user_id
) )
# TRACK USAGE after successful fallback call # TRACK USAGE after successful fallback call

View File

@@ -4,6 +4,9 @@ Logger utilities to prevent conflicts between different logging configurations.
from loguru import logger from loguru import logger
import sys import sys
import hashlib
import json
from typing import Any, Dict, Optional
def safe_logger_config(format_string: str, level: str = "INFO"): def safe_logger_config(format_string: str, level: str = "INFO"):
@@ -51,3 +54,58 @@ def get_service_logger(service_name: str, format_string: str = None):
safe_logger_config(format_string) safe_logger_config(format_string)
return logger.bind(service=service_name) return logger.bind(service=service_name)
def _mask_user_id(user_id: Optional[str]) -> str:
"""Mask user ID for privacy in logs."""
if not user_id:
return "anonymous"
return hashlib.sha256(str(user_id).encode("utf-8")).hexdigest()[:12]
def emit_routing_event(
logger_instance,
flow_type: str,
*,
route_intent: str = "primary",
provider_selected: str,
model_selected: str,
preferred_provider: Optional[str] = None,
fallback_count: int = 0,
fallback_models_tried: Optional[list] = None,
tenant_user_id: Optional[str] = None,
extra: Optional[Dict[str, Any]] = None,
level: str = "info"
) -> None:
"""
Emit structured routing event for LLM provider selection.
Args:
logger_instance: Logger instance to use
flow_type: Type of flow (e.g., "sif_agent", "premium_tool")
route_intent: Route intent ("primary" or "fallback")
provider_selected: Selected provider name
model_selected: Selected model name
preferred_provider: Preferred provider (if any)
fallback_count: Number of fallback attempts made
fallback_models_tried: List of models tried as fallbacks
tenant_user_id: Tenant user ID (will be hashed)
extra: Additional fields to include
level: Log level to use
"""
payload: Dict[str, Any] = {
"flow_type": flow_type,
"route_intent": route_intent,
"provider_selected": provider_selected,
"model_selected": model_selected,
"preferred_provider": preferred_provider,
"fallback_count": fallback_count,
"fallback_models_tried": fallback_models_tried or [],
"tenant": _mask_user_id(tenant_user_id),
}
if extra:
payload.update(extra)
log_method = getattr(logger_instance, level.lower(), logger_instance.info)
log_method("[llm_routing] {}", json.dumps(payload, sort_keys=True, default=str))