ALwrity LinkedIn Writer: Billing Dashboard: Compact View, Billing Overview, System Health Indicator, Cost Breakdown, Usage Trends, Usage Alerts, Comprehensive API Breakdown

This commit is contained in:
ajaysi
2025-09-11 11:09:10 +05:30
parent b156298e82
commit 1b65a9487b
84 changed files with 10143 additions and 156 deletions

View File

@@ -8,6 +8,7 @@ from sqlalchemy.orm import Session
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from loguru import logger
from functools import lru_cache
from services.database import get_db
from services.usage_tracking_service import UsageTrackingService
@@ -19,6 +20,12 @@ from models.subscription_models import (
router = APIRouter(prefix="/api/subscription", tags=["subscription"])
# Simple in-process cache for dashboard responses to smooth bursts
# Cache key: (user_id). TTL-like behavior implemented via timestamp check
_dashboard_cache: Dict[str, Dict[str, Any]] = {}
_dashboard_cache_ts: Dict[str, float] = {}
_DASHBOARD_CACHE_TTL_SEC = 2.0
@router.get("/usage/{user_id}")
async def get_user_usage(
user_id: str,
@@ -336,6 +343,12 @@ async def get_dashboard_data(
"""Get comprehensive dashboard data for usage monitoring."""
try:
# Serve from short TTL cache to avoid hammering DB on bursts
import time
now = time.time()
if user_id in _dashboard_cache and (now - _dashboard_cache_ts.get(user_id, 0)) < _DASHBOARD_CACHE_TTL_SEC:
return _dashboard_cache[user_id]
usage_service = UsageTrackingService(db)
pricing_service = PricingService(db)
@@ -372,7 +385,7 @@ async def get_dashboard_data(
current_day = datetime.now().day
projected_cost = (current_cost / current_day) * days_in_period if current_day > 0 else 0
return {
response_payload = {
"success": True,
"data": {
"current_usage": current_usage,
@@ -392,6 +405,9 @@ async def get_dashboard_data(
}
}
}
_dashboard_cache[user_id] = response_payload
_dashboard_cache_ts[user_id] = now
return response_payload
except Exception as e:
logger.error(f"Error getting dashboard data: {e}")

View File

@@ -383,7 +383,7 @@ def should_monitor_endpoint(path: str) -> bool:
"""Check if an endpoint should be monitored."""
return not any(path.endswith(excluded) for excluded in EXCLUDED_ENDPOINTS)
async def check_usage_limits_middleware(request: Request, user_id: str) -> Optional[JSONResponse]:
async def check_usage_limits_middleware(request: Request, user_id: str, request_body: str = None) -> Optional[JSONResponse]:
"""Check usage limits before processing request."""
if not user_id:
return None
@@ -397,17 +397,17 @@ async def check_usage_limits_middleware(request: Request, user_id: str) -> Optio
if not api_provider:
return None
# Get request body to estimate tokens
request_body = None
try:
if hasattr(request, '_body'):
request_body = request._body
else:
# Try to read body (this might not work in all cases)
body = await request.body()
request_body = body.decode('utf-8') if body else None
except:
pass
# Use provided request body or read it if not provided
if request_body is None:
try:
if hasattr(request, '_body'):
request_body = request._body
else:
# Try to read body (this might not work in all cases)
body = await request.body()
request_body = body.decode('utf-8') if body else None
except:
pass
# Estimate tokens needed
tokens_requested = 0
@@ -474,12 +474,7 @@ async def monitoring_middleware(request: Request, call_next):
except:
pass
# Check usage limits before processing
limit_response = await check_usage_limits_middleware(request, user_id)
if limit_response:
return limit_response
# Capture request body for usage tracking
# Capture request body for usage tracking (read once)
request_body = None
try:
if hasattr(request, '_body'):
@@ -490,6 +485,11 @@ async def monitoring_middleware(request: Request, call_next):
except:
pass
# Check usage limits before processing
limit_response = await check_usage_limits_middleware(request, user_id, request_body)
if limit_response:
return limit_response
# Get database session
db = next(get_db())

View File

@@ -23,12 +23,23 @@ from models.subscription_models import Base as SubscriptionBase
# Database configuration
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///./alwrity.db')
# Create engine
# Create engine with safer pooling defaults and SQLite-friendly settings
engine_kwargs = {
"echo": False, # Set to True for SQL debugging
"pool_pre_ping": True, # Detect stale connections
"pool_recycle": 300, # Recycle connections to avoid timeouts
"pool_size": int(os.getenv("DB_POOL_SIZE", "20")),
"max_overflow": int(os.getenv("DB_MAX_OVERFLOW", "40")),
"pool_timeout": int(os.getenv("DB_POOL_TIMEOUT", "30")),
}
# SQLite needs special handling for multithreaded FastAPI
if DATABASE_URL.startswith("sqlite"):
engine_kwargs["connect_args"] = {"check_same_thread": False}
engine = create_engine(
DATABASE_URL,
echo=False, # Set to True for SQL debugging
pool_pre_ping=True,
pool_recycle=300,
**engine_kwargs,
)
# Create session factory

View File

@@ -25,28 +25,115 @@ class PricingService:
def initialize_default_pricing(self):
"""Initialize default pricing for all API providers."""
# Gemini API Pricing (as of January 2025)
# Gemini API Pricing (Updated as of September 2025 - Official Google AI Pricing)
# Source: https://ai.google.dev/gemini-api/docs/pricing
gemini_pricing = [
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.0-flash-lite",
"cost_per_input_token": 0.000000375, # $0.075 per 1M input tokens (up to 128k context)
"cost_per_output_token": 0.0000003, # $0.30 per 1M output tokens
"description": "Gemini 2.0 Flash Lite - Fast and efficient model"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-flash",
"cost_per_input_token": 0.000000625, # $0.125 per 1M input tokens (up to 1M context)
"cost_per_output_token": 0.000000375, # $0.375 per 1M output tokens
"description": "Gemini 2.5 Flash - Balanced performance and cost"
},
# Gemini 2.5 Pro - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-pro",
"cost_per_input_token": 0.00000125, # $1.25 per 1M input tokens (up to 200k context)
"cost_per_output_token": 0.00001, # $10.00 per 1M output tokens
"description": "Gemini 2.5 Pro - Most capable model"
"cost_per_input_token": 0.00000125, # $1.25 per 1M input tokens (prompts <= 200k tokens)
"cost_per_output_token": 0.00001, # $10.00 per 1M output tokens (prompts <= 200k tokens)
"description": "Gemini 2.5 Pro - State-of-the-art multipurpose model for coding and complex reasoning"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-pro-large",
"cost_per_input_token": 0.0000025, # $2.50 per 1M input tokens (prompts > 200k tokens)
"cost_per_output_token": 0.000015, # $15.00 per 1M output tokens (prompts > 200k tokens)
"description": "Gemini 2.5 Pro - Large context model for prompts > 200k tokens"
},
# Gemini 2.5 Flash - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-flash",
"cost_per_input_token": 0.0000003, # $0.30 per 1M input tokens (text/image/video)
"cost_per_output_token": 0.0000025, # $2.50 per 1M output tokens
"description": "Gemini 2.5 Flash - Hybrid reasoning model with 1M token context window"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-flash-audio",
"cost_per_input_token": 0.000001, # $1.00 per 1M input tokens (audio)
"cost_per_output_token": 0.0000025, # $2.50 per 1M output tokens
"description": "Gemini 2.5 Flash - Audio input model"
},
# Gemini 2.5 Flash-Lite - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-flash-lite",
"cost_per_input_token": 0.0000001, # $0.10 per 1M input tokens (text/image/video)
"cost_per_output_token": 0.0000004, # $0.40 per 1M output tokens
"description": "Gemini 2.5 Flash-Lite - Smallest and most cost-effective model for at-scale usage"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-2.5-flash-lite-audio",
"cost_per_input_token": 0.0000003, # $0.30 per 1M input tokens (audio)
"cost_per_output_token": 0.0000004, # $0.40 per 1M output tokens
"description": "Gemini 2.5 Flash-Lite - Audio input model"
},
# Gemini 1.5 Flash - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-1.5-flash",
"cost_per_input_token": 0.000000075, # $0.075 per 1M input tokens (prompts <= 128k tokens)
"cost_per_output_token": 0.0000003, # $0.30 per 1M output tokens (prompts <= 128k tokens)
"description": "Gemini 1.5 Flash - Fast multimodal model with 1M token context window"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-1.5-flash-large",
"cost_per_input_token": 0.00000015, # $0.15 per 1M input tokens (prompts > 128k tokens)
"cost_per_output_token": 0.0000006, # $0.60 per 1M output tokens (prompts > 128k tokens)
"description": "Gemini 1.5 Flash - Large context model for prompts > 128k tokens"
},
# Gemini 1.5 Flash-8B - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-1.5-flash-8b",
"cost_per_input_token": 0.0000000375, # $0.0375 per 1M input tokens (prompts <= 128k tokens)
"cost_per_output_token": 0.00000015, # $0.15 per 1M output tokens (prompts <= 128k tokens)
"description": "Gemini 1.5 Flash-8B - Smallest model for lower intelligence use cases"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-1.5-flash-8b-large",
"cost_per_input_token": 0.000000075, # $0.075 per 1M input tokens (prompts > 128k tokens)
"cost_per_output_token": 0.0000003, # $0.30 per 1M output tokens (prompts > 128k tokens)
"description": "Gemini 1.5 Flash-8B - Large context model for prompts > 128k tokens"
},
# Gemini 1.5 Pro - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-1.5-pro",
"cost_per_input_token": 0.00000125, # $1.25 per 1M input tokens (prompts <= 128k tokens)
"cost_per_output_token": 0.000005, # $5.00 per 1M output tokens (prompts <= 128k tokens)
"description": "Gemini 1.5 Pro - Highest intelligence model with 2M token context window"
},
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-1.5-pro-large",
"cost_per_input_token": 0.0000025, # $2.50 per 1M input tokens (prompts > 128k tokens)
"cost_per_output_token": 0.00001, # $10.00 per 1M output tokens (prompts > 128k tokens)
"description": "Gemini 1.5 Pro - Large context model for prompts > 128k tokens"
},
# Gemini Embedding - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-embedding",
"cost_per_input_token": 0.00000015, # $0.15 per 1M input tokens
"cost_per_output_token": 0.0, # No output tokens for embeddings
"description": "Gemini Embedding - Newest embeddings model with higher rate limits"
},
# Grounding with Google Search - Standard Tier
{
"provider": APIProvider.GEMINI,
"model_name": "gemini-grounding-search",
"cost_per_request": 0.035, # $35 per 1,000 requests (after free tier)
"cost_per_input_token": 0.0, # No additional token cost for grounding
"cost_per_output_token": 0.0, # No additional token cost for grounding
"description": "Grounding with Google Search - 1,500 RPD free, then $35/1K requests"
}
]

View File

@@ -54,7 +54,7 @@ class UsageTrackingService:
model_used=model_used,
tokens_input=tokens_input,
tokens_output=tokens_output,
tokens_total=tokens_input + tokens_output,
tokens_total=(tokens_input or 0) + (tokens_output or 0),
cost_input=cost_data['cost_input'],
cost_output=cost_data['cost_output'],
cost_total=cost_data['cost_total'],
@@ -75,7 +75,7 @@ class UsageTrackingService:
await self._update_usage_summary(
user_id=user_id,
provider=provider,
tokens_used=tokens_input + tokens_output,
tokens_used=(tokens_input or 0) + (tokens_output or 0),
cost=cost_data['cost_total'],
billing_period=billing_period,
response_time=response_time,
@@ -92,7 +92,7 @@ class UsageTrackingService:
return {
'usage_logged': True,
'cost': cost_data['cost_total'],
'tokens_used': tokens_input + tokens_output,
'tokens_used': (tokens_input or 0) + (tokens_output or 0),
'billing_period': billing_period
}
@@ -304,17 +304,35 @@ class UsageTrackingService:
).order_by(UsageAlert.created_at.desc()).limit(10).all()
if not summary:
# No usage this period
# No usage this period - return complete structure with zeros
provider_breakdown = {}
usage_percentages = {}
# Initialize provider breakdown with zeros
for provider in APIProvider:
provider_name = provider.value
provider_breakdown[provider_name] = {
'calls': 0,
'tokens': 0,
'cost': 0.0
}
usage_percentages[f"{provider_name}_calls"] = 0
usage_percentages['cost'] = 0
return {
'billing_period': billing_period,
'usage_status': 'active',
'total_calls': 0,
'total_tokens': 0,
'total_cost': 0.0,
'avg_response_time': 0.0,
'error_rate': 0.0,
'last_updated': datetime.now().isoformat(),
'limits': limits,
'provider_breakdown': {},
'provider_breakdown': provider_breakdown,
'alerts': [],
'usage_percentages': {}
'usage_percentages': usage_percentages
}
# Calculate usage percentages
@@ -322,8 +340,8 @@ class UsageTrackingService:
if limits:
for provider in APIProvider:
provider_name = provider.value
current_calls = getattr(summary, f"{provider_name}_calls", 0)
call_limit = limits['limits'].get(f"{provider_name}_calls", 0)
current_calls = getattr(summary, f"{provider_name}_calls", 0) or 0
call_limit = limits['limits'].get(f"{provider_name}_calls", 0) or 0
if call_limit > 0:
usage_percentages[f"{provider_name}_calls"] = (current_calls / call_limit) * 100
@@ -331,9 +349,10 @@ class UsageTrackingService:
usage_percentages[f"{provider_name}_calls"] = 0
# Cost usage percentage
cost_limit = limits['limits'].get('monthly_cost', 0)
cost_limit = limits['limits'].get('monthly_cost', 0) or 0
total_cost = summary.total_cost or 0
if cost_limit > 0:
usage_percentages['cost'] = (summary.total_cost / cost_limit) * 100
usage_percentages['cost'] = (total_cost / cost_limit) * 100
else:
usage_percentages['cost'] = 0
@@ -342,19 +361,19 @@ class UsageTrackingService:
for provider in APIProvider:
provider_name = provider.value
provider_breakdown[provider_name] = {
'calls': getattr(summary, f"{provider_name}_calls", 0),
'tokens': getattr(summary, f"{provider_name}_tokens", 0),
'cost': getattr(summary, f"{provider_name}_cost", 0.0)
'calls': getattr(summary, f"{provider_name}_calls", 0) or 0,
'tokens': getattr(summary, f"{provider_name}_tokens", 0) or 0,
'cost': getattr(summary, f"{provider_name}_cost", 0.0) or 0.0
}
return {
'billing_period': billing_period,
'usage_status': summary.usage_status.value,
'total_calls': summary.total_calls,
'total_tokens': summary.total_tokens,
'total_cost': summary.total_cost,
'avg_response_time': summary.avg_response_time,
'error_rate': summary.error_rate,
'usage_status': summary.usage_status.value if hasattr(summary.usage_status, 'value') else str(summary.usage_status),
'total_calls': summary.total_calls or 0,
'total_tokens': summary.total_tokens or 0,
'total_cost': summary.total_cost or 0.0,
'avg_response_time': summary.avg_response_time or 0.0,
'error_rate': summary.error_rate or 0.0,
'limits': limits,
'provider_breakdown': provider_breakdown,
'alerts': [
@@ -405,9 +424,9 @@ class UsageTrackingService:
summary = summary_dict.get(period)
if summary:
trends['total_calls'].append(summary.total_calls)
trends['total_cost'].append(summary.total_cost)
trends['total_tokens'].append(summary.total_tokens)
trends['total_calls'].append(summary.total_calls or 0)
trends['total_cost'].append(summary.total_cost or 0.0)
trends['total_tokens'].append(summary.total_tokens or 0)
# Provider-specific trends
for provider in APIProvider:
@@ -420,13 +439,13 @@ class UsageTrackingService:
}
trends['provider_trends'][provider_name]['calls'].append(
getattr(summary, f"{provider_name}_calls", 0)
getattr(summary, f"{provider_name}_calls", 0) or 0
)
trends['provider_trends'][provider_name]['cost'].append(
getattr(summary, f"{provider_name}_cost", 0.0)
getattr(summary, f"{provider_name}_cost", 0.0) or 0.0
)
trends['provider_trends'][provider_name]['tokens'].append(
getattr(summary, f"{provider_name}_tokens", 0)
getattr(summary, f"{provider_name}_tokens", 0) or 0
)
else:
# No data for this period

View File

@@ -166,7 +166,8 @@ def test_database_tables():
WHERE type='table' AND (
name LIKE '%subscription%' OR
name LIKE '%usage%' OR
name LIKE '%pricing%'
name LIKE '%pricing%' OR
name LIKE '%billing%'
)
ORDER BY name
""")

View File

@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""
Database validation script for billing system
"""
import sqlite3
from datetime import datetime
def validate_database():
conn = sqlite3.connect('alwrity.db')
cursor = conn.cursor()
print('=== BILLING DATABASE VALIDATION ===')
print(f'Validation timestamp: {datetime.now()}')
print()
# Check subscription-related tables
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND (
name LIKE '%subscription%' OR
name LIKE '%usage%' OR
name LIKE '%billing%' OR
name LIKE '%pricing%' OR
name LIKE '%alert%'
)
ORDER BY name
""")
tables = cursor.fetchall()
print('=== SUBSCRIPTION TABLES ===')
for table in tables:
table_name = table[0]
print(f'\nTable: {table_name}')
# Get table schema
cursor.execute(f'PRAGMA table_info({table_name})')
columns = cursor.fetchall()
print(' Schema:')
for col in columns:
col_id, name, type_name, not_null, default, pk = col
constraints = []
if pk:
constraints.append('PRIMARY KEY')
if not_null:
constraints.append('NOT NULL')
if default:
constraints.append(f'DEFAULT {default}')
constraint_str = f' ({", ".join(constraints)})' if constraints else ''
print(f' {name}: {type_name}{constraint_str}')
# Get row count
cursor.execute(f'SELECT COUNT(*) FROM {table_name}')
count = cursor.fetchone()[0]
print(f' Row count: {count}')
# Sample data for non-empty tables
if count > 0 and count <= 10:
cursor.execute(f'SELECT * FROM {table_name} LIMIT 3')
rows = cursor.fetchall()
print(' Sample data:')
for i, row in enumerate(rows):
print(f' Row {i+1}: {row}')
# Check for user-specific data
print('\n=== USER DATA VALIDATION ===')
# Check if we have user-specific usage data
cursor.execute("SELECT DISTINCT user_id FROM usage_summary LIMIT 5")
users = cursor.fetchall()
print(f'Users with usage data: {[u[0] for u in users]}')
# Check user subscriptions
cursor.execute("SELECT DISTINCT user_id FROM user_subscriptions LIMIT 5")
user_subs = cursor.fetchall()
print(f'Users with subscriptions: {[u[0] for u in user_subs]}')
# Check API usage logs
cursor.execute("SELECT COUNT(*) FROM api_usage_logs")
api_logs_count = cursor.fetchone()[0]
print(f'Total API usage logs: {api_logs_count}')
if api_logs_count > 0:
cursor.execute("SELECT DISTINCT user_id FROM api_usage_logs LIMIT 5")
api_users = cursor.fetchall()
print(f'Users with API usage logs: {[u[0] for u in api_users]}')
conn.close()
print('\n=== VALIDATION COMPLETE ===')
if __name__ == '__main__':
validate_database()