feat: enhance billing dashboard with historical data & security hardening

- Fix usage tracking zero-value bug with self-healing logic
- Add month selector for historical usage views
- Implement start-of-month graceful initialization
- Merge PR #372: Harden user-scoped access in subscription routes
- Fix UI bugs in UsageDashboard component
This commit is contained in:
ajaysi
2026-03-05 10:21:56 +05:30
parent 261c224dca
commit 26131232c7
8 changed files with 234 additions and 161 deletions

View File

@@ -555,7 +555,10 @@ async def get_agent_huddle_feed_endpoint(
try:
user_id = str(current_user.get("id"))
service = AgentActivityService(db, user_id)
feed = service.get_huddle_feed(
# Use run_in_threadpool to execute the blocking service call in a separate thread
feed = await run_in_threadpool(
service.get_huddle_feed,
since=since,
cursor=cursor,
runs_limit=runs_limit,

View File

@@ -48,15 +48,19 @@ async def get_today_workflow(
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db),
) -> Dict[str, Any]:
from starlette.concurrency import run_in_threadpool
user_id = str(current_user.get("id"))
plan, created = await get_or_create_daily_workflow_plan(db, user_id, date=date)
tasks = (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id)
.order_by(DailyWorkflowTask.created_at.asc())
.all()
)
def _fetch_tasks():
return (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == plan.id, DailyWorkflowTask.user_id == user_id)
.order_by(DailyWorkflowTask.created_at.asc())
.all()
)
tasks = await run_in_threadpool(_fetch_tasks)
response_tasks = []
for t in tasks:
@@ -100,18 +104,26 @@ async def get_today_workflow(
from datetime import date as date_type, timedelta
y_str = (date_type.fromisoformat(plan.date) - timedelta(days=1)).isoformat()
y_plan = (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == y_str)
.first()
)
if y_plan:
y_tasks = (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == y_plan.id, DailyWorkflowTask.user_id == user_id)
.order_by(DailyWorkflowTask.created_at.asc())
.all()
def _fetch_yesterday():
y_plan = (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == y_str)
.first()
)
if y_plan:
y_tasks = (
db.query(DailyWorkflowTask)
.filter(DailyWorkflowTask.plan_id == y_plan.id, DailyWorkflowTask.user_id == user_id)
.order_by(DailyWorkflowTask.created_at.asc())
.all()
)
return y_tasks
return []
y_tasks = await run_in_threadpool(_fetch_yesterday)
if y_tasks:
y_response = []
for t in y_tasks:
y_response.append(

View File

@@ -158,25 +158,25 @@ def huggingface_text_response(
if not api_key:
raise Exception("HF_TOKEN not found in environment variables")
# Initialize Hugging Face client using Responses API
# Initialize Hugging Face client
client = OpenAI(
base_url="https://router.huggingface.co/v1",
base_url=f"https://router.huggingface.co/hf/v1",
api_key=api_key,
)
logger.info("✅ Hugging Face client initialized for text response")
# Prepare input for the API
input_content = []
messages = []
# Add system prompt if provided
if system_prompt:
input_content.append({
messages.append({
"role": "system",
"content": system_prompt
})
# Add user prompt
input_content.append({
messages.append({
"role": "user",
"content": prompt
})
@@ -191,31 +191,23 @@ def huggingface_text_response(
max_tokens,
)
logger.info("🚀 Making Hugging Face API call...")
logger.info("🚀 Making Hugging Face API call (chat completion)...")
# Add rate limiting to prevent expensive API calls
import time
time.sleep(1) # 1 second delay between API calls
# Make the API call using Responses API
response = client.responses.parse(
# Make the API call using Chat Completions
response = client.chat.completions.create(
model=model,
input=input_content,
messages=messages,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens
)
# Extract text from response
if hasattr(response, 'output_text') and response.output_text:
generated_text = response.output_text
elif hasattr(response, 'output') and response.output:
# Handle case where output is a list
if isinstance(response.output, list) and len(response.output) > 0:
generated_text = response.output[0].get('content', '')
else:
generated_text = str(response.output)
else:
generated_text = str(response)
generated_text = response.choices[0].message.content
# Clean up the response
if generated_text:
@@ -296,26 +288,28 @@ def huggingface_structured_json_response(
if not api_key:
raise Exception("HF_TOKEN not found in environment variables")
# Initialize Hugging Face client using Responses API
# Initialize OpenAI client with Hugging Face base URL
# Use standard Inference API endpoint
client = OpenAI(
base_url="https://router.huggingface.co/v1",
base_url=f"https://router.huggingface.co/hf/v1",
api_key=api_key,
)
logger.info("✅ Hugging Face client initialized for structured JSON response")
# Prepare input for the API
input_content = []
messages = []
# Add system prompt if provided
if system_prompt:
input_content.append({
messages.append({
"role": "system",
"content": system_prompt
})
# Add user prompt with JSON instruction
# For HF models, explicit JSON instruction in prompt is often better than response_format
json_instruction = "Please respond with valid JSON that matches the provided schema."
input_content.append({
messages.append({
"role": "user",
"content": f"{prompt}\n\n{json_instruction}"
})
@@ -332,52 +326,39 @@ def huggingface_structured_json_response(
logger.info("🚀 Making Hugging Face structured API call...")
# Make the API call using Responses API with structured output
# Use simple text generation and parse JSON manually to avoid API format issues
logger.info("🚀 Making Hugging Face API call (text mode with JSON parsing)...")
# Make the API call using standard Chat Completions
logger.info("🚀 Making Hugging Face API call (chat completion)...")
# Add JSON instruction to the prompt
json_instruction = "\n\nPlease respond with valid JSON that matches this exact structure:\n" + json.dumps(schema, indent=2)
input_content[-1]["content"] = input_content[-1]["content"] + json_instruction
# Add JSON schema to prompt for guidance
json_schema_str = json.dumps(schema, indent=2)
messages[-1]["content"] += f"\n\nJSON Schema:\n{json_schema_str}"
# Add rate limiting to prevent expensive API calls
import time
time.sleep(1) # 1 second delay between API calls
response = client.responses.parse(
model=model,
input=input_content,
temperature=temperature
)
# Extract structured data from response
if hasattr(response, 'output_parsed') and response.output_parsed:
# The new API returns parsed data directly (Pydantic model case)
logger.info("✅ Hugging Face structured JSON response parsed successfully")
# Convert Pydantic model to dict if needed
if hasattr(response.output_parsed, 'model_dump'):
return response.output_parsed.model_dump()
elif hasattr(response.output_parsed, 'dict'):
return response.output_parsed.dict()
else:
return response.output_parsed
elif hasattr(response, 'output_text') and response.output_text:
# Fallback to text parsing if output_parsed is not available
response_text = response.output_text
# Clean up the response text
response_text = re.sub(r'```json\n?', '', response_text)
response_text = re.sub(r'```\n?', '', response_text)
response_text = response_text.strip()
try:
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
response_format={"type": "json_object"} # Try to enforce JSON mode if supported
)
# Fix common markdown artefacts that break JSON, e.g. lines starting with **"key":
# **"narration": "text"
# becomes:
# "narration": "text"
response_text = re.sub(r'^\s*\*\*(?=\s*")', '', response_text, flags=re.MULTILINE)
response_text = response.choices[0].message.content
# Clean up response text if needed
response_text = response_text.strip()
if response_text.startswith("```json"):
response_text = response_text[7:]
if response_text.endswith("```"):
response_text = response_text[:-3]
response_text = response_text.strip()
try:
parsed_json = json.loads(response_text)
logger.info("✅ Hugging Face structured JSON response parsed from text")
logger.info("✅ Hugging Face structured JSON response parsed successfully")
return parsed_json
except json.JSONDecodeError as json_err:
logger.error(f"❌ JSON parsing failed: {json_err}")
@@ -393,20 +374,30 @@ def huggingface_structured_json_response(
except json.JSONDecodeError:
pass
# If all else fails, return a structured error response
logger.error("❌ All JSON parsing attempts failed")
return {
"error": "Failed to parse JSON response",
"raw_response": response_text,
"schema_expected": schema
}
else:
logger.error("❌ No valid response data found")
return {
"error": "No valid response data found",
"raw_response": str(response),
"schema_expected": schema
}
return {"error": "Failed to parse JSON response", "raw_response": response_text}
except Exception as e:
logger.error(f"❌ Hugging Face API call failed: {e}")
# If 422 Unprocessable Entity (often due to response_format not supported), retry without it
if "422" in str(e) or "not supported" in str(e).lower():
logger.info("Retrying without response_format...")
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
response_text = response.choices[0].message.content
# ... (same parsing logic would apply, simplified here for brevity)
try:
return json.loads(response_text)
except:
# Regex fallback
json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
if json_match:
return json.loads(json_match.group())
return {"error": "Failed to parse JSON response", "raw_response": response_text}
raise e
except Exception as e:
error_msg = str(e) if str(e) else repr(e)

View File

@@ -54,7 +54,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
model = "gemini-2.0-flash-001"
elif env_provider in ['hf_response_api', 'huggingface', 'hf']:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:groq"
model = "mistralai/Mistral-7B-Instruct-v0.3"
# Default blog characteristics
blog_tone = "Professional"
@@ -80,7 +80,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:groq"
model = "mistralai/Mistral-7B-Instruct-v0.3"
else:
logger.error("[llm_text_gen] No API keys found for supported providers.")
raise RuntimeError("No LLM API keys configured. Configure GEMINI_API_KEY or HF_TOKEN to enable AI responses.")
@@ -93,7 +93,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
model = "gemini-2.0-flash-001"
elif "huggingface" in available_providers:
gpt_provider = "huggingface"
model = "openai/gpt-oss-120b:groq"
model = "mistralai/Mistral-7B-Instruct-v0.3"
else:
raise RuntimeError("No supported providers available.")
@@ -303,7 +303,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
elif fallback_provider == "huggingface":
provider_enum = APIProvider.MISTRAL
actual_provider_name = "huggingface"
fallback_model = "openai/gpt-oss-120b:groq"
fallback_model = "mistralai/Mistral-7B-Instruct-v0.3"
if fallback_provider == "google":
if json_struct:
@@ -330,7 +330,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
response_text = huggingface_structured_json_response(
prompt=prompt,
schema=json_struct,
model="openai/gpt-oss-120b:groq",
model="mistralai/Mistral-7B-Instruct-v0.3",
temperature=temperature,
max_tokens=max_tokens,
system_prompt=system_instructions
@@ -338,7 +338,7 @@ def llm_text_gen(prompt: str, system_prompt: Optional[str] = None, json_struct:
else:
response_text = huggingface_text_response(
prompt=prompt,
model="openai/gpt-oss-120b:groq",
model="mistralai/Mistral-7B-Instruct-v0.3",
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,

View File

@@ -477,14 +477,20 @@ async def get_lightweight_stats(user_id: str) -> Dict[str, Any]:
# Optimized: Single query with conditional aggregation instead of two separate queries
# This is much faster as it only scans the table once
stats = db.query(
func.count(APIRequest.id).label('total_requests'),
func.sum(
case((APIRequest.status_code >= 400, 1), else_=0)
).label('total_errors')
).filter(
APIRequest.timestamp >= five_minutes_ago
).first()
# Use run_in_threadpool to avoid blocking the event loop with sync DB query
from starlette.concurrency import run_in_threadpool
def _fetch_stats():
return db.query(
func.count(APIRequest.id).label('total_requests'),
func.sum(
case((APIRequest.status_code >= 400, 1), else_=0)
).label('total_errors')
).filter(
APIRequest.timestamp >= five_minutes_ago
).first()
stats = await run_in_threadpool(_fetch_stats)
recent_requests = stats.total_requests or 0 if stats else 0
recent_errors = int(stats.total_errors or 0) if stats else 0

View File

@@ -476,54 +476,65 @@ async def generate_agent_enhanced_plan(db: Session, user_id: str, date: str) ->
async def get_or_create_daily_workflow_plan(db: Session, user_id: str, date: Optional[str] = None) -> tuple[DailyWorkflowPlan, bool]:
from starlette.concurrency import run_in_threadpool
date_str = date or _today_date_str()
existing = (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
.first()
)
def _get_existing():
return (
db.query(DailyWorkflowPlan)
.filter(DailyWorkflowPlan.user_id == user_id, DailyWorkflowPlan.date == date_str)
.first()
)
existing = await run_in_threadpool(_get_existing)
if existing:
return existing, False
plan_data = await generate_agent_enhanced_plan(db, user_id, date_str)
tasks = plan_data.get("tasks", [])
plan = DailyWorkflowPlan(
user_id=user_id,
date=date_str,
source="agent",
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(plan)
db.commit()
db.refresh(plan)
for t in tasks:
pillar_id = str(t.get("pillarId") or "").lower().strip()
if pillar_id not in PILLAR_IDS:
continue
task = DailyWorkflowTask(
plan_id=plan.id,
def _create_plan():
plan = DailyWorkflowPlan(
user_id=user_id,
pillar_id=pillar_id,
title=str(t.get("title") or "Task").strip()[:255],
description=str(t.get("description") or "").strip(),
status=_coerce_status(t.get("status")),
priority=_coerce_priority(t.get("priority")),
estimated_time=int(t.get("estimatedTime") or 15),
action_type=str(t.get("actionType") or "navigate").strip()[:20],
action_url=str(t.get("actionUrl") or "").strip() or None,
enabled=bool(t.get("enabled", True)),
dependencies=t.get("dependencies") if isinstance(t.get("dependencies"), list) else None,
metadata_json=t.get("metadata") if isinstance(t.get("metadata"), dict) else None,
date=date_str,
source="agent",
plan_json=plan_data,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(task)
db.commit()
db.refresh(plan)
db.add(plan)
db.commit()
db.refresh(plan)
for t in tasks:
pillar_id = str(t.get("pillarId") or "").lower().strip()
if pillar_id not in PILLAR_IDS:
continue
task = DailyWorkflowTask(
plan_id=plan.id,
user_id=user_id,
pillar_id=pillar_id,
title=str(t.get("title") or "Task").strip()[:255],
description=str(t.get("description") or "").strip(),
status=_coerce_status(t.get("status")),
priority=_coerce_priority(t.get("priority")),
estimated_time=int(t.get("estimatedTime") or 15),
action_type=str(t.get("actionType") or "navigate").strip()[:20],
action_url=str(t.get("actionUrl") or "").strip(),
dependencies=json.dumps(t.get("dependencies") or []),
metadata_json=t.get("metadata") or {},
enabled=bool(t.get("enabled", True)),
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
)
db.add(task)
db.commit()
return plan
plan = await run_in_threadpool(_create_plan)
return plan, True

View File

@@ -61,13 +61,16 @@ const EnhancedStrategyActivationButton: React.FC<EnhancedStrategyActivationButto
const handleSetupMonitoring = async (monitoringPlan: any) => {
setIsLoading(true);
// try {
setActivationProgress(10);
try {
console.log('🎯 EnhancedStrategyActivationButton: handleSetupMonitoring called');
// Get strategy ID
const strategyId = strategyData?.id || 1;
// Step 1: Generate monitoring plan if not provided
setActivationProgress(30);
let finalMonitoringPlan = monitoringPlan;
if (!finalMonitoringPlan) {
console.log('🎯 Generating monitoring plan...');
@@ -85,6 +88,8 @@ const EnhancedStrategyActivationButton: React.FC<EnhancedStrategyActivationButto
}
}
setActivationProgress(60);
// Step 2: Activate strategy with monitoring plan
console.log('🎯 Activating strategy with monitoring...');
try {
@@ -98,19 +103,31 @@ const EnhancedStrategyActivationButton: React.FC<EnhancedStrategyActivationButto
} catch (error) {
console.warn('Could not activate strategy with monitoring:', error);
// Continue with local activation only
} finally {
setIsLoading(false);
}
setActivationProgress(80);
// Step 3: Call the local confirmation function
console.log('🎯 EnhancedStrategyActivationButton: Calling onConfirmStrategy()');
await onConfirmStrategy();
console.log('🎯 EnhancedStrategyActivationButton: onConfirmStrategy() completed');
// } catch (error) {
// setIsLoading(false);
// console.error('Strategy activation failed:', error);
// throw error;
// }
setActivationProgress(100);
setIsSuccess(true);
setShowSuccessMessage(true);
// Reset success state after a delay
setTimeout(() => {
setIsSuccess(false);
}, 3000);
} catch (error) {
console.error('Strategy activation failed:', error);
// Optional: Show error message
} finally {
setIsLoading(false);
setActivationProgress(0);
}
};
/* const setupAnalyticsAndMonitoring = async (strategyId: number, monitoringPlan: any) => {
@@ -212,6 +229,8 @@ const EnhancedStrategyActivationButton: React.FC<EnhancedStrategyActivationButto
size={20}
sx={{ color: 'white' }}
/>
) : isSuccess ? (
<CheckIcon />
) : strategyConfirmed ? (
<AutoAwesomeIcon />
) : (
@@ -289,13 +308,24 @@ const EnhancedStrategyActivationButton: React.FC<EnhancedStrategyActivationButto
) : isSuccess ? (
<motion.div
key="success"
initial={{ opacity: 0, scale: 0.8 }}
animate={{ opacity: 1, scale: 1 }}
transition={{ duration: 0.3 }}
variants={successVariants}
initial="initial"
animate="animate"
exit="exit"
>
<Typography variant="button" sx={{ fontWeight: 600 }}>
Strategy Activated! 🎉
</Typography>
<Box sx={{ display: 'flex', alignItems: 'center', gap: 1 }}>
<CelebrationIcon />
<Typography variant="button" sx={{ fontWeight: 600 }}>
Strategy Activated!
</Typography>
<motion.span
variants={confettiVariants}
initial="initial"
animate="animate"
>
🎉
</motion.span>
</Box>
</motion.div>
) : strategyConfirmed ? (
<motion.div

View File

@@ -85,17 +85,37 @@ export const useAgentHuddleFeed = (options?: { detailTier?: 'summary' | 'detaile
useEffect(() => {
stopRef.current = false;
let pollingTimer: ReturnType<typeof setInterval> | null = null;
let backoffMs = BASE_BACKOFF_MS;
// If polling fails repeatedly, try to reconnect SSE
const handlePollingError = () => {
if (connectionMode === 'polling' && reconnectAttemptRef.current < 5) {
connect();
}
};
const startPolling = () => {
setConnectionMode('polling');
if (pollingTimer) clearInterval(pollingTimer);
pollingTimer = setInterval(async () => {
const poll = async () => {
if (document.hidden) return;
try {
await loadSnapshot(cursorRef.current);
} catch {
// no-op
backoffMs = BASE_BACKOFF_MS;
} catch (err: any) {
if (err?.response?.status === 429) {
// Exponential backoff for 429s
backoffMs = Math.min(backoffMs * 2, 60000);
clearInterval(pollingTimer!);
pollingTimer = setInterval(poll, backoffMs);
} else {
handlePollingError();
}
}
}, 7000);
};
pollingTimer = setInterval(poll, 10000);
};
const connect = async () => {