Add standardized structured routing logs for text generation

This commit is contained in:
ي
2026-03-12 15:05:07 +05:30
parent b410ece4ca
commit d01d4af62f
3 changed files with 146 additions and 7 deletions

View File

@@ -69,7 +69,7 @@ else:
print(f"No .env found at {env_path}, using current directory")
from loguru import logger
from utils.logger_utils import get_service_logger
from utils.logger_utils import get_service_logger, emit_routing_event
# Use service-specific logger to avoid conflicts
logger = get_service_logger("huggingface_provider")
@@ -144,7 +144,8 @@ def huggingface_text_response(
temperature: float = 0.7,
max_tokens: int = 2048,
top_p: float = 0.9,
system_prompt: Optional[str] = None
system_prompt: Optional[str] = None,
tenant_user_id: Optional[str] = None
) -> str:
"""
Generate text response using Hugging Face Inference Providers API.
@@ -233,7 +234,23 @@ def huggingface_text_response(
response = None
last_error = None
fallback_models_tried = []
fallback_count = 0
for candidate_model in _fallback_model_sequence(model):
fallback_models_tried.append(candidate_model)
route_intent = "primary" if fallback_count == 0 else "fallback"
emit_routing_event(
logger,
flow_type="text_generation",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
preferred_provider="huggingface",
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=tenant_user_id,
extra={"hf_request_type": "text"},
)
try:
response = client.chat.completions.create(
model=candidate_model,
@@ -247,6 +264,7 @@ def huggingface_text_response(
break
except NotFoundError as nf_err:
last_error = nf_err
fallback_count += 1
logger.warning("HF model not found: {}. Trying fallback model.", candidate_model)
continue
@@ -277,7 +295,8 @@ def huggingface_structured_json_response(
model: str = "openai/gpt-oss-120b:groq",
temperature: float = 0.7,
max_tokens: int = 8192,
system_prompt: Optional[str] = None
system_prompt: Optional[str] = None,
tenant_user_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Generate structured JSON response using Hugging Face Inference Providers API.
@@ -387,7 +406,23 @@ def huggingface_structured_json_response(
try:
response = None
last_error = None
fallback_models_tried = []
fallback_count = 0
for candidate_model in _fallback_model_sequence(model):
fallback_models_tried.append(candidate_model)
route_intent = "primary" if fallback_count == 0 else "fallback"
emit_routing_event(
logger,
flow_type="text_generation",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
preferred_provider="huggingface",
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=tenant_user_id,
extra={"hf_request_type": "structured_json"},
)
try:
response = client.chat.completions.create(
model=candidate_model,
@@ -401,6 +436,7 @@ def huggingface_structured_json_response(
break
except NotFoundError as nf_err:
last_error = nf_err
fallback_count += 1
logger.warning("HF structured model not found: {}. Trying fallback model.", candidate_model)
continue
@@ -445,6 +481,20 @@ def huggingface_structured_json_response(
response = None
last_error = None
for candidate_model in _fallback_model_sequence(model):
fallback_models_tried.append(candidate_model)
route_intent = "primary" if fallback_count == 0 else "fallback"
emit_routing_event(
logger,
flow_type="text_generation",
route_intent=route_intent,
provider_selected="huggingface",
model_selected=candidate_model,
preferred_provider="huggingface",
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=tenant_user_id,
extra={"hf_request_type": "structured_json_no_response_format"},
)
try:
response = client.chat.completions.create(
model=candidate_model,
@@ -457,6 +507,7 @@ def huggingface_structured_json_response(
break
except NotFoundError as nf_err:
last_error = nf_err
fallback_count += 1
logger.warning("HF structured model not found (no response_format path): {}", candidate_model)
continue

View File

@@ -14,6 +14,7 @@ from ..onboarding.api_key_manager import APIKeyManager
from .gemini_provider import gemini_text_response, gemini_structured_json_response
from .huggingface_provider import huggingface_text_response, huggingface_structured_json_response
from ...utils.logger_utils import emit_routing_event
def llm_text_gen(
@@ -77,6 +78,12 @@ def llm_text_gen(
available_providers.append("google")
if api_key_manager.get_api_key("hf_token"):
available_providers.append("huggingface")
preferred_provider = env_provider or None
flow_type = "text_generation"
route_intent = "primary"
fallback_count = 0
fallback_models_tried = []
# If no environment variable set, auto-detect based on available keys
if not env_provider:
@@ -106,8 +113,22 @@ def llm_text_gen(
if gpt_provider == "huggingface" and preferred_hf_models:
model = preferred_hf_models[0]
logger.info(f"[llm_text_gen] Using preferred low-cost HF model: {model}")
fallback_models_tried.append(model)
logger.debug(f"[llm_text_gen] Using provider: {gpt_provider}, model: {model}")
emit_routing_event(
logger,
flow_type=flow_type,
route_intent=route_intent,
provider_selected=gpt_provider,
model_selected=model,
preferred_provider=preferred_provider,
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=user_id,
extra={"available_providers": available_providers},
)
# Map provider name to APIProvider enum (define at function scope for usage tracking)
from models.subscription_models import APIProvider
@@ -251,7 +272,8 @@ def llm_text_gen(
model=model,
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
system_prompt=system_instructions,
tenant_user_id=user_id
)
else:
response_text = huggingface_text_response(
@@ -260,7 +282,8 @@ def llm_text_gen(
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
system_prompt=system_instructions
system_prompt=system_instructions,
tenant_user_id=user_id
)
else:
logger.error(f"[llm_text_gen] Unknown provider: {gpt_provider}")
@@ -304,17 +327,34 @@ def llm_text_gen(
try:
logger.info(f"[llm_text_gen] Trying SINGLE fallback provider: {fallback_provider}")
actual_provider_used = fallback_provider
fallback_count += 1
route_intent = "fallback"
# Update provider enum for fallback
if fallback_provider == "google":
provider_enum = APIProvider.GEMINI
actual_provider_name = "gemini"
fallback_model = "gemini-2.0-flash-lite"
fallback_models_tried.append(fallback_model)
elif fallback_provider == "huggingface":
provider_enum = APIProvider.MISTRAL
actual_provider_name = "huggingface"
fallback_model = "mistralai/Mistral-7B-Instruct-v0.3:groq"
fallback_models_tried.append(fallback_model)
emit_routing_event(
logger,
flow_type=flow_type,
route_intent=route_intent,
provider_selected=fallback_provider,
model_selected=fallback_model,
preferred_provider=preferred_provider,
fallback_count=fallback_count,
fallback_models_tried=fallback_models_tried,
tenant_user_id=user_id,
extra={"available_providers": available_providers},
)
if fallback_provider == "google":
if json_struct:
response_text = gemini_structured_json_response(
@@ -343,7 +383,8 @@ def llm_text_gen(
model="mistralai/Mistral-7B-Instruct-v0.3:groq",
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
system_prompt=system_instructions,
tenant_user_id=user_id
)
else:
response_text = huggingface_text_response(
@@ -352,7 +393,8 @@ def llm_text_gen(
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
system_prompt=system_instructions
system_prompt=system_instructions,
tenant_user_id=user_id
)
# TRACK USAGE after successful fallback call

View File

@@ -2,8 +2,11 @@
Logger utilities to prevent conflicts between different logging configurations.
"""
import hashlib
import json
from loguru import logger
import sys
from typing import Any, Dict, List, Optional
def safe_logger_config(format_string: str, level: str = "INFO"):
@@ -51,3 +54,46 @@ def get_service_logger(service_name: str, format_string: str = None):
safe_logger_config(format_string)
return logger.bind(service=service_name)
def _mask_tenant_user_id(tenant_user_id: Optional[str]) -> Optional[str]:
"""Return a stable hash for a tenant user id so logs avoid exposing raw IDs."""
if not tenant_user_id:
return None
return hashlib.sha256(tenant_user_id.encode("utf-8")).hexdigest()[:12]
def emit_routing_event(
service_logger,
*,
flow_type: str,
route_intent: str,
provider_selected: Optional[str],
model_selected: Optional[str],
preferred_provider: Optional[str],
fallback_count: int = 0,
fallback_models_tried: Optional[List[str]] = None,
tenant_user_id: Optional[str] = None,
event_name: str = "llm_routing_event",
level: str = "INFO",
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Emit a standardized structured model-routing event for AI facades."""
payload: Dict[str, Any] = {
"event_name": event_name,
"flow_type": flow_type,
"route_intent": route_intent,
"flow_type/route_intent": f"{flow_type}/{route_intent}",
"provider_selected": provider_selected,
"model_selected": model_selected,
"preferred_provider": preferred_provider,
"fallback_count": fallback_count,
"fallback_models_tried": fallback_models_tried or [],
"tenant_user_id": _mask_tenant_user_id(tenant_user_id),
}
if extra:
payload.update(extra)
log_method = getattr(service_logger, level.lower(), service_logger.info)
log_method("{}", json.dumps(payload, sort_keys=True))
return payload