Research Wizard and CopilotKit mitigation review

This commit is contained in:
ajaysi
2025-11-04 08:11:57 +05:30
parent e69107b07c
commit 55087c4f37
27 changed files with 2167 additions and 277 deletions

View File

@@ -13,6 +13,7 @@ from functools import lru_cache
from services.database import get_db
from services.subscription import UsageTrackingService, PricingService
from services.subscription.schema_utils import ensure_subscription_plan_columns
import sqlite3
from middleware.auth_middleware import get_current_user
from models.subscription_models import (
APIProvider, SubscriptionPlan, UserSubscription, UsageSummary,
@@ -80,8 +81,11 @@ async def get_subscription_plans(
"""Get all available subscription plans."""
try:
# Ensure required columns exist (handles environments without migrations applied yet)
ensure_subscription_plan_columns(db)
except Exception as schema_err:
logger.warning(f"Schema check failed, will retry on query: {schema_err}")
try:
plans = db.query(SubscriptionPlan).filter(
SubscriptionPlan.is_active == True
).order_by(SubscriptionPlan.price_monthly).all()
@@ -123,7 +127,60 @@ async def get_subscription_plans(
}
}
except Exception as e:
except (sqlite3.OperationalError, Exception) as e:
error_str = str(e).lower()
if 'no such column' in error_str and 'exa_calls_limit' in error_str:
logger.warning("Missing column detected in subscription plans query, attempting schema fix...")
try:
import services.subscription.schema_utils as schema_utils
schema_utils._checked_subscription_plan_columns = False
ensure_subscription_plan_columns(db)
db.expire_all()
# Retry the query
plans = db.query(SubscriptionPlan).filter(
SubscriptionPlan.is_active == True
).order_by(SubscriptionPlan.price_monthly).all()
plans_data = []
for plan in plans:
plans_data.append({
"id": plan.id,
"name": plan.name,
"tier": plan.tier.value,
"price_monthly": plan.price_monthly,
"price_yearly": plan.price_yearly,
"description": plan.description,
"features": plan.features or [],
"limits": {
"ai_text_generation_calls": getattr(plan, 'ai_text_generation_calls_limit', None) or 0,
"gemini_calls": plan.gemini_calls_limit,
"openai_calls": plan.openai_calls_limit,
"anthropic_calls": plan.anthropic_calls_limit,
"mistral_calls": plan.mistral_calls_limit,
"tavily_calls": plan.tavily_calls_limit,
"serper_calls": plan.serper_calls_limit,
"metaphor_calls": plan.metaphor_calls_limit,
"firecrawl_calls": plan.firecrawl_calls_limit,
"stability_calls": plan.stability_calls_limit,
"gemini_tokens": plan.gemini_tokens_limit,
"openai_tokens": plan.openai_tokens_limit,
"anthropic_tokens": plan.anthropic_tokens_limit,
"mistral_tokens": plan.mistral_tokens_limit,
"monthly_cost": plan.monthly_cost_limit
}
})
return {
"success": True,
"data": {
"plans": plans_data,
"total": len(plans_data)
}
}
except Exception as retry_err:
logger.error(f"Schema fix and retry failed: {retry_err}")
raise HTTPException(status_code=500, detail=f"Database schema error: {str(e)}")
logger.error(f"Error getting subscription plans: {e}")
raise HTTPException(status_code=500, detail=str(e))
@@ -239,6 +296,10 @@ async def get_subscription_status(
try:
ensure_subscription_plan_columns(db)
except Exception as schema_err:
logger.warning(f"Schema check failed, will retry on query: {schema_err}")
try:
subscription = db.query(UserSubscription).filter(
UserSubscription.user_id == user_id,
UserSubscription.is_active == True
@@ -333,7 +394,97 @@ async def get_subscription_status(
}
}
except Exception as e:
except (sqlite3.OperationalError, Exception) as e:
error_str = str(e).lower()
if 'no such column' in error_str and 'exa_calls_limit' in error_str:
# Try to fix schema and retry once
logger.warning("Missing column detected in subscription status query, attempting schema fix...")
try:
import services.subscription.schema_utils as schema_utils
schema_utils._checked_subscription_plan_columns = False
ensure_subscription_plan_columns(db)
db.expire_all()
# Retry the query
subscription = db.query(UserSubscription).filter(
UserSubscription.user_id == user_id,
UserSubscription.is_active == True
).first()
if not subscription:
free_plan = db.query(SubscriptionPlan).filter(
SubscriptionPlan.tier == SubscriptionTier.FREE,
SubscriptionPlan.is_active == True
).first()
if free_plan:
return {
"success": True,
"data": {
"active": True,
"plan": "free",
"tier": "free",
"can_use_api": True,
"limits": {
"ai_text_generation_calls": getattr(free_plan, 'ai_text_generation_calls_limit', None) or 0,
"gemini_calls": free_plan.gemini_calls_limit,
"openai_calls": free_plan.openai_calls_limit,
"anthropic_calls": free_plan.anthropic_calls_limit,
"mistral_calls": free_plan.mistral_calls_limit,
"tavily_calls": free_plan.tavily_calls_limit,
"serper_calls": free_plan.serper_calls_limit,
"metaphor_calls": free_plan.metaphor_calls_limit,
"firecrawl_calls": free_plan.firecrawl_calls_limit,
"stability_calls": free_plan.stability_calls_limit,
"monthly_cost": free_plan.monthly_cost_limit
}
}
}
elif subscription:
now = datetime.utcnow()
if subscription.current_period_end < now:
if getattr(subscription, 'auto_renew', False):
try:
from services.pricing_service import PricingService
pricing = PricingService(db)
pricing._ensure_subscription_current(subscription)
except Exception as e2:
logger.error(f"Failed to auto-advance subscription: {e2}")
else:
return {
"success": True,
"data": {
"active": False,
"plan": subscription.plan.tier.value,
"tier": subscription.plan.tier.value,
"can_use_api": False,
"reason": "Subscription expired"
}
}
return {
"success": True,
"data": {
"active": True,
"plan": subscription.plan.tier.value,
"tier": subscription.plan.tier.value,
"can_use_api": True,
"limits": {
"ai_text_generation_calls": getattr(subscription.plan, 'ai_text_generation_calls_limit', None) or 0,
"gemini_calls": subscription.plan.gemini_calls_limit,
"openai_calls": subscription.plan.openai_calls_limit,
"anthropic_calls": subscription.plan.anthropic_calls_limit,
"mistral_calls": subscription.plan.mistral_calls_limit,
"tavily_calls": subscription.plan.tavily_calls_limit,
"serper_calls": subscription.plan.serper_calls_limit,
"metaphor_calls": subscription.plan.metaphor_calls_limit,
"firecrawl_calls": subscription.plan.firecrawl_calls_limit,
"stability_calls": subscription.plan.stability_calls_limit,
"monthly_cost": subscription.plan.monthly_cost_limit
}
}
}
except Exception as retry_err:
logger.error(f"Schema fix and retry failed: {retry_err}")
raise HTTPException(status_code=500, detail=f"Database schema error: {str(e)}")
logger.error(f"Error getting subscription status: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -94,6 +94,12 @@ class ResearchConfig(BaseModel):
include_expert_quotes: bool = True
include_competitors: bool = True
include_trends: bool = True
# Exa-specific options
exa_category: Optional[str] = None # company, research paper, news, linkedin profile, github, tweet, movie, song, personal site, pdf, financial report
exa_include_domains: List[str] = [] # Domain whitelist
exa_exclude_domains: List[str] = [] # Domain blacklist
exa_search_type: Optional[str] = "auto" # "auto", "keyword", "neural"
class BlogResearchRequest(BaseModel):

View File

@@ -26,18 +26,14 @@ class ExaResearchProvider(BaseProvider):
# Build Exa query
query = f"{topic} {industry} {target_audience}"
# Map source types to Exa categories
category = self._map_source_type_to_category(config.source_types)
# Determine category: use exa_category if set, otherwise map from source_types
category = config.exa_category if config.exa_category else self._map_source_type_to_category(config.source_types)
logger.info(f"[Exa Research] Executing search: {query}")
# Execute Exa search
results = self.exa.search_and_contents(
query,
type="auto",
category=category,
num_results=min(config.max_sources, 25),
contents={
# Build search kwargs
search_kwargs = {
'type': config.exa_search_type or "auto",
'num_results': min(config.max_sources, 25),
'contents': {
'text': {'max_characters': 1000},
'summary': {'query': f"Key insights about {topic}"},
'highlights': {
@@ -45,7 +41,20 @@ class ExaResearchProvider(BaseProvider):
'highlights_per_url': 3
}
}
)
}
# Add optional filters
if category:
search_kwargs['category'] = category
if config.exa_include_domains:
search_kwargs['include_domains'] = config.exa_include_domains
if config.exa_exclude_domains:
search_kwargs['exclude_domains'] = config.exa_exclude_domains
logger.info(f"[Exa Research] Executing search: {query}")
# Execute Exa search
results = self.exa.search_and_contents(query, **search_kwargs)
# Transform to standardized format
sources = self._transform_sources(results.results)

View File

@@ -340,12 +340,8 @@ def create_blog_post(
logger.warning("All tag IDs were invalid, not including tagIds in payload")
# Build SEO data from metadata if provided
# TESTING: Skip SEO data temporarily to confirm richContent fix
test_skip_seo = True
if test_skip_seo:
logger.warning("🧪 TESTING: Skipping SEO data to isolate richContent vs seoData issue")
seo_data = None
elif seo_metadata:
seo_data = None
if seo_metadata:
logger.warning(f"📊 Building SEO data from metadata. Keys: {list(seo_metadata.keys())}")
seo_data = build_seo_data(seo_metadata, title)
if seo_data:
@@ -371,13 +367,10 @@ def create_blog_post(
logger.warning("⚠️ SEO data was empty after building - check build_seo_data function")
# Add SEO slug if provided (separate field from seoData)
if seo_metadata and seo_metadata.get('url_slug'):
if seo_metadata.get('url_slug'):
blog_data['draftPost']['seoSlug'] = str(seo_metadata.get('url_slug')).strip()
logger.warning(f"✅ Added SEO slug: {blog_data['draftPost']['seoSlug']}")
if test_skip_seo:
logger.warning("⚠️ SEO data skipped for testing - will add back once richContent is confirmed working")
elif not seo_metadata:
else:
logger.warning("⚠️ No SEO metadata provided to create_blog_post")
# Log the payload structure for debugging (without sensitive data)

View File

@@ -390,17 +390,44 @@ class LimitValidator:
logger.info(f"[Pre-flight Check] 📅 Billing Period: {current_period} (for user {user_id})")
# Ensure schema columns exist before querying
try:
from services.subscription.schema_utils import ensure_usage_summaries_columns
ensure_usage_summaries_columns(self.db)
except Exception as schema_err:
logger.warning(f"Schema check failed, will retry on query error: {schema_err}")
# Explicitly expire any cached objects and refresh from DB to ensure fresh data
self.db.expire_all()
usage = self.db.query(UsageSummary).filter(
UsageSummary.user_id == user_id,
UsageSummary.billing_period == current_period
).first()
try:
usage = self.db.query(UsageSummary).filter(
UsageSummary.user_id == user_id,
UsageSummary.billing_period == current_period
).first()
# CRITICAL: Explicitly refresh from database to get latest values (clears SQLAlchemy cache)
if usage:
self.db.refresh(usage)
# CRITICAL: Explicitly refresh from database to get latest values (clears SQLAlchemy cache)
if usage:
self.db.refresh(usage)
except Exception as query_err:
error_str = str(query_err).lower()
if 'no such column' in error_str and 'exa_calls' in error_str:
logger.warning("Missing column detected in usage query, fixing schema and retrying...")
import sqlite3
import services.subscription.schema_utils as schema_utils
schema_utils._checked_usage_summaries_columns = False
from services.subscription.schema_utils import ensure_usage_summaries_columns
ensure_usage_summaries_columns(self.db)
self.db.expire_all()
# Retry the query
usage = self.db.query(UsageSummary).filter(
UsageSummary.user_id == user_id,
UsageSummary.billing_period == current_period
).first()
if usage:
self.db.refresh(usage)
else:
raise
# Log what we actually read from database
if usage:
@@ -718,8 +745,40 @@ class LimitValidator:
except Exception as e:
error_type = type(e).__name__
error_message = str(e)
logger.error(f"[Pre-flight Check] ❌ Error during comprehensive limit check: {error_type}: {error_message}", exc_info=True)
error_message = str(e).lower()
# Handle missing column errors with schema fix and retry
if 'operationalerror' in error_type.lower() or 'operationalerror' in error_message:
if 'no such column' in error_message and 'exa_calls' in error_message:
logger.warning("Missing column detected in limit check, attempting schema fix...")
try:
import sqlite3
import services.subscription.schema_utils as schema_utils
schema_utils._checked_usage_summaries_columns = False
from services.subscription.schema_utils import ensure_usage_summaries_columns
ensure_usage_summaries_columns(self.db)
self.db.expire_all()
# Retry the query
usage = self.db.query(UsageSummary).filter(
UsageSummary.user_id == user_id,
UsageSummary.billing_period == current_period
).first()
if usage:
self.db.refresh(usage)
# Continue with the rest of the validation using the retried usage
# (The rest of the function logic continues from here)
# For now, we'll let it fall through to return the error since we'd need to duplicate the entire validation logic
# Instead, we'll just log and return, but the next call should succeed
logger.info(f"[Pre-flight Check] Schema fixed, but need to retry validation on next call")
return False, f"Schema updated, please retry: Database schema was updated. Please try again.", {'error_type': 'schema_update', 'retry': True}
except Exception as retry_err:
logger.error(f"Schema fix and retry failed: {retry_err}")
return False, f"Failed to validate limits: {error_type}: {str(e)}", {}
logger.error(f"[Pre-flight Check] ❌ Error during comprehensive limit check: {error_type}: {str(e)}", exc_info=True)
logger.error(f"[Pre-flight Check] ❌ User: {user_id}, Operations count: {len(operations) if operations else 0}")
return False, f"Failed to validate limits: {error_type}: {error_message}", {}
return False, f"Failed to validate limits: {error_type}: {str(e)}", {}

View File

@@ -1,8 +1,11 @@
from typing import Set
from sqlalchemy.orm import Session
from sqlalchemy import text
from loguru import logger
_checked_subscription_plan_columns: bool = False
_checked_usage_summaries_columns: bool = False
def ensure_subscription_plan_columns(db: Session) -> None:
@@ -17,9 +20,11 @@ def ensure_subscription_plan_columns(db: Session) -> None:
return
try:
# Discover existing columns
result = db.execute("PRAGMA table_info(subscription_plans)")
# Discover existing columns using PRAGMA
result = db.execute(text("PRAGMA table_info(subscription_plans)"))
cols: Set[str] = {row[1] for row in result}
logger.debug(f"Schema check: Found {len(cols)} columns in subscription_plans table")
# Columns we may reference in models but might be missing in older DBs
required_columns = {
@@ -28,12 +33,81 @@ def ensure_subscription_plan_columns(db: Session) -> None:
for col_name, ddl in required_columns.items():
if col_name not in cols:
db.execute(f"ALTER TABLE subscription_plans ADD COLUMN {col_name} {ddl}")
db.commit()
except Exception:
# Do not block app if pragma/alter fails; let normal errors surface
db.rollback()
finally:
logger.info(f"Adding missing column {col_name} to subscription_plans table")
try:
db.execute(text(f"ALTER TABLE subscription_plans ADD COLUMN {col_name} {ddl}"))
db.commit()
logger.info(f"Successfully added column {col_name}")
except Exception as alter_err:
logger.error(f"Failed to add column {col_name}: {alter_err}")
db.rollback()
# Don't set flag on error - allow retry
raise
else:
logger.debug(f"Column {col_name} already exists")
# Only set flag if we successfully completed the check
_checked_subscription_plan_columns = True
except Exception as e:
logger.error(f"Error ensuring subscription_plan columns: {e}", exc_info=True)
db.rollback()
# Don't set the flag if there was an error, so we retry next time
_checked_subscription_plan_columns = False
raise
def ensure_usage_summaries_columns(db: Session) -> None:
"""Ensure required columns exist on usage_summaries for runtime safety.
This is a defensive guard for environments where migrations have not yet
been applied. If columns are missing (e.g., exa_calls, exa_cost), we add them
with a safe default so ORM queries do not fail.
"""
global _checked_usage_summaries_columns
if _checked_usage_summaries_columns:
return
try:
# Discover existing columns using PRAGMA
result = db.execute(text("PRAGMA table_info(usage_summaries)"))
cols: Set[str] = {row[1] for row in result}
logger.debug(f"Schema check: Found {len(cols)} columns in usage_summaries table")
# Columns we may reference in models but might be missing in older DBs
required_columns = {
"exa_calls": "INTEGER DEFAULT 0",
"exa_cost": "REAL DEFAULT 0.0",
}
for col_name, ddl in required_columns.items():
if col_name not in cols:
logger.info(f"Adding missing column {col_name} to usage_summaries table")
try:
db.execute(text(f"ALTER TABLE usage_summaries ADD COLUMN {col_name} {ddl}"))
db.commit()
logger.info(f"Successfully added column {col_name}")
except Exception as alter_err:
logger.error(f"Failed to add column {col_name}: {alter_err}")
db.rollback()
# Don't set flag on error - allow retry
raise
else:
logger.debug(f"Column {col_name} already exists")
# Only set flag if we successfully completed the check
_checked_usage_summaries_columns = True
except Exception as e:
logger.error(f"Error ensuring usage_summaries columns: {e}", exc_info=True)
db.rollback()
# Don't set the flag if there was an error, so we retry next time
_checked_usage_summaries_columns = False
raise
def ensure_all_schema_columns(db: Session) -> None:
"""Ensure all required columns exist in subscription-related tables."""
ensure_subscription_plan_columns(db)
ensure_usage_summaries_columns(db)