Merge branch 'pr-375'
This commit is contained in:
@@ -11,6 +11,7 @@ from fastapi import Request, Response
|
|||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Dict, List, Any
|
from typing import Dict, List, Any
|
||||||
from collections import defaultdict, deque
|
from collections import defaultdict, deque
|
||||||
@@ -29,6 +30,51 @@ from .pricing_service import PricingService
|
|||||||
|
|
||||||
from services.database import get_session_for_user, init_user_database
|
from services.database import get_session_for_user, init_user_database
|
||||||
|
|
||||||
|
|
||||||
|
USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV = "USAGE_LIMITS_EMERGENCY_FAIL_OPEN"
|
||||||
|
USAGE_LIMIT_ENFORCEMENT_ERROR_METRICS = defaultdict(int)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_usage_limits_emergency_fail_open_enabled() -> bool:
|
||||||
|
"""Allow temporary fail-open behavior during an active incident."""
|
||||||
|
return os.getenv(USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV, "false").strip().lower() in {"1", "true", "yes", "on"}
|
||||||
|
|
||||||
|
|
||||||
|
def _record_usage_limit_enforcement_error(
|
||||||
|
*,
|
||||||
|
reason: str,
|
||||||
|
user_id: str,
|
||||||
|
path: str,
|
||||||
|
provider: Optional[APIProvider],
|
||||||
|
fail_open_enabled: bool,
|
||||||
|
):
|
||||||
|
"""Capture structured logs + lightweight counters for enforcement infrastructure failures."""
|
||||||
|
provider_value = provider.value if provider else "unknown"
|
||||||
|
metric_key = f"{reason}:{provider_value}"
|
||||||
|
USAGE_LIMIT_ENFORCEMENT_ERROR_METRICS[metric_key] += 1
|
||||||
|
|
||||||
|
logger.bind(
|
||||||
|
event="usage_limit_enforcement_error",
|
||||||
|
reason=reason,
|
||||||
|
user_id=user_id,
|
||||||
|
path=path,
|
||||||
|
provider=provider_value,
|
||||||
|
fail_open_enabled=fail_open_enabled,
|
||||||
|
metric_key=metric_key,
|
||||||
|
metric_count=USAGE_LIMIT_ENFORCEMENT_ERROR_METRICS[metric_key],
|
||||||
|
).error("Usage limit enforcement infrastructure failure")
|
||||||
|
|
||||||
|
|
||||||
|
def _build_usage_enforcement_unavailable_response() -> JSONResponse:
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=503,
|
||||||
|
content={
|
||||||
|
"error": "Usage limit enforcement unavailable",
|
||||||
|
"message": "Unable to validate usage limits right now. Please retry shortly.",
|
||||||
|
"code": "USAGE_LIMIT_ENFORCEMENT_UNAVAILABLE",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
class DatabaseAPIMonitor:
|
class DatabaseAPIMonitor:
|
||||||
"""Database-backed API monitoring with usage tracking and subscription management."""
|
"""Database-backed API monitoring with usage tracking and subscription management."""
|
||||||
|
|
||||||
@@ -152,10 +198,10 @@ async def check_usage_limits_middleware(request: Request, user_id: str, request_
|
|||||||
path = ""
|
path = ""
|
||||||
|
|
||||||
db = None
|
db = None
|
||||||
|
fail_open_enabled = _is_usage_limits_emergency_fail_open_enabled()
|
||||||
|
api_provider = None
|
||||||
try:
|
try:
|
||||||
db = get_session_for_user(user_id)
|
db = get_session_for_user(user_id)
|
||||||
if not db:
|
|
||||||
return None
|
|
||||||
|
|
||||||
api_monitor = DatabaseAPIMonitor()
|
api_monitor = DatabaseAPIMonitor()
|
||||||
|
|
||||||
@@ -172,6 +218,22 @@ async def check_usage_limits_middleware(request: Request, user_id: str, request_
|
|||||||
if not api_provider:
|
if not api_provider:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Protected route with provider metering must not silently bypass enforcement.
|
||||||
|
if not db:
|
||||||
|
_record_usage_limit_enforcement_error(
|
||||||
|
reason="database_session_unavailable",
|
||||||
|
user_id=user_id,
|
||||||
|
path=path,
|
||||||
|
provider=api_provider,
|
||||||
|
fail_open_enabled=fail_open_enabled,
|
||||||
|
)
|
||||||
|
if fail_open_enabled:
|
||||||
|
logger.warning(
|
||||||
|
f"Emergency fail-open active ({USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV}); bypassing usage limit enforcement for {user_id}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
return _build_usage_enforcement_unavailable_response()
|
||||||
|
|
||||||
# Use provided request body or read it if not provided
|
# Use provided request body or read it if not provided
|
||||||
if request_body is None:
|
if request_body is None:
|
||||||
try:
|
try:
|
||||||
@@ -220,18 +282,66 @@ async def check_usage_limits_middleware(request: Request, user_id: str, request_
|
|||||||
logger.warning(f"Tables missing for user {user_id}, attempting initialization...")
|
logger.warning(f"Tables missing for user {user_id}, attempting initialization...")
|
||||||
try:
|
try:
|
||||||
init_user_database(user_id)
|
init_user_database(user_id)
|
||||||
# Don't retry immediately to avoid loops, just let this request pass
|
_record_usage_limit_enforcement_error(
|
||||||
|
reason="missing_usage_tables",
|
||||||
|
user_id=user_id,
|
||||||
|
path=path,
|
||||||
|
provider=api_provider,
|
||||||
|
fail_open_enabled=fail_open_enabled,
|
||||||
|
)
|
||||||
|
if fail_open_enabled:
|
||||||
|
logger.warning(
|
||||||
|
f"Emergency fail-open active ({USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV}); bypassing usage limit enforcement after table initialization for {user_id}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
return _build_usage_enforcement_unavailable_response()
|
||||||
except Exception as init_error:
|
except Exception as init_error:
|
||||||
logger.error(f"Failed to initialize database for user {user_id}: {init_error}")
|
logger.error(f"Failed to initialize database for user {user_id}: {init_error}")
|
||||||
|
_record_usage_limit_enforcement_error(
|
||||||
|
reason="database_init_failed",
|
||||||
|
user_id=user_id,
|
||||||
|
path=path,
|
||||||
|
provider=api_provider,
|
||||||
|
fail_open_enabled=fail_open_enabled,
|
||||||
|
)
|
||||||
|
if fail_open_enabled:
|
||||||
|
logger.warning(
|
||||||
|
f"Emergency fail-open active ({USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV}); bypassing usage limit enforcement after failed database init for {user_id}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
return _build_usage_enforcement_unavailable_response()
|
||||||
else:
|
else:
|
||||||
raise e
|
_record_usage_limit_enforcement_error(
|
||||||
|
reason="operational_error",
|
||||||
|
user_id=user_id,
|
||||||
|
path=path,
|
||||||
|
provider=api_provider,
|
||||||
|
fail_open_enabled=fail_open_enabled,
|
||||||
|
)
|
||||||
|
if fail_open_enabled:
|
||||||
|
logger.warning(
|
||||||
|
f"Emergency fail-open active ({USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV}); bypassing usage limit enforcement after operational error for {user_id}: {e}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
return _build_usage_enforcement_unavailable_response()
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
_record_usage_limit_enforcement_error(
|
||||||
|
reason="unexpected_enforcement_error",
|
||||||
|
user_id=user_id,
|
||||||
|
path=path,
|
||||||
|
provider=api_provider,
|
||||||
|
fail_open_enabled=fail_open_enabled,
|
||||||
|
)
|
||||||
logger.error(f"Error checking usage limits: {e}")
|
logger.error(f"Error checking usage limits: {e}")
|
||||||
# Don't block requests if usage checking fails
|
if fail_open_enabled:
|
||||||
return None
|
logger.warning(
|
||||||
|
f"Emergency fail-open active ({USAGE_LIMITS_EMERGENCY_FAIL_OPEN_ENV}); bypassing usage limit enforcement for {user_id}"
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
return _build_usage_enforcement_unavailable_response()
|
||||||
finally:
|
finally:
|
||||||
if db is not None:
|
if db is not None:
|
||||||
db.close()
|
db.close()
|
||||||
|
|||||||
Reference in New Issue
Block a user