ALwrity onboarding final step

This commit is contained in:
ajaysi
2025-10-10 23:19:28 +05:30
parent e3daebec16
commit b1ebe1034e
38 changed files with 4867 additions and 770 deletions

View File

@@ -35,14 +35,31 @@ class StepData:
class OnboardingProgress:
"""Manages onboarding progress with persistence and validation."""
def __init__(self, progress_file: Optional[str] = None):
def __init__(self, progress_file: Optional[str] = None, user_id: Optional[str] = None):
self.steps = self._initialize_steps()
self.current_step = 1
self.started_at = datetime.now().isoformat()
self.last_updated = datetime.now().isoformat()
self.is_completed = False
self.completed_at = None
self.progress_file = progress_file or ".onboarding_progress.json"
self.user_id = user_id # Add user_id for database isolation
# Use user-specific file for backward compatibility
if user_id:
self.progress_file = progress_file or f".onboarding_progress_{user_id}.json"
else:
self.progress_file = progress_file or ".onboarding_progress.json"
# Initialize database service for dual persistence
try:
from services.onboarding_database_service import OnboardingDatabaseService
self.db_service = OnboardingDatabaseService()
self.use_database = True
logger.info(f"Database service initialized for user {user_id}")
except Exception as e:
logger.warning(f"Database service not available, using file only: {e}")
self.db_service = None
self.use_database = False
# Load existing progress if available
self.load_progress()
@@ -192,8 +209,9 @@ class OnboardingProgress:
logger.info("Onboarding completed successfully")
def save_progress(self):
"""Save progress to file."""
"""Save progress to both file and database (dual persistence)."""
try:
# Save to JSON file (backward compatibility)
progress_data = {
"steps": [{
"step_number": step.step_number,
@@ -215,6 +233,65 @@ class OnboardingProgress:
json.dump(progress_data, f, indent=2)
logger.debug(f"Progress saved to {self.progress_file}")
# Also save to database if available and user_id is set
if self.use_database and self.db_service and self.user_id:
try:
from services.database import SessionLocal
db = SessionLocal()
try:
# Update session progress
self.db_service.update_step(self.user_id, self.current_step, db)
# Calculate progress percentage
completed_count = sum(1 for s in self.steps if s.status == StepStatus.COMPLETED)
progress_pct = (completed_count / len(self.steps)) * 100
self.db_service.update_progress(self.user_id, progress_pct, db)
# Save step-specific data to appropriate tables
for step in self.steps:
if step.status == StepStatus.COMPLETED and step.data:
if step.step_number == 1: # API Keys
api_keys = step.data.get('api_keys', {})
for provider, key in api_keys.items():
if key:
# Save to database (for user isolation in production)
self.db_service.save_api_key(self.user_id, provider, key, db)
# Also save to .env file ONLY in local development
# This allows local developers to have keys in .env for convenience
# In production, keys are fetched from database per user
is_local = os.getenv('DEPLOY_ENV', 'local') == 'local'
if is_local:
try:
from services.api_key_manager import APIKeyManager
api_key_manager = APIKeyManager()
api_key_manager.save_api_key(provider, key)
logger.info(f"[LOCAL] API key for {provider} saved to .env file")
except Exception as env_error:
logger.warning(f"[LOCAL] Failed to save {provider} API key to .env file: {env_error}")
else:
logger.info(f"[PRODUCTION] API key for {provider} saved to database only (user: {self.user_id})")
# Log database save confirmation
logger.info(f"✅ DATABASE: API key for {provider} saved to database for user {self.user_id}")
elif step.step_number == 2: # Website Analysis
self.db_service.save_website_analysis(self.user_id, step.data, db)
logger.info(f"✅ DATABASE: Website analysis saved to database for user {self.user_id}")
elif step.step_number == 3: # Research Preferences
self.db_service.save_research_preferences(self.user_id, step.data, db)
logger.info(f"✅ DATABASE: Research preferences saved to database for user {self.user_id}")
elif step.step_number == 4: # Persona Generation
self.db_service.save_persona_data(self.user_id, step.data, db)
logger.info(f"✅ DATABASE: Persona data saved to database for user {self.user_id}")
logger.info(f"Progress also saved to database for user {self.user_id}")
finally:
db.close()
except Exception as db_error:
logger.warning(f"Failed to save to database, JSON file still saved: {db_error}")
# Don't fail if database save fails - JSON is still working
except Exception as e:
logger.error(f"Error saving progress: {str(e)}")
@@ -423,8 +500,34 @@ class APIKeyManager:
try:
if provider in self.api_keys:
self.api_keys[provider] = api_key
self._save_to_env_file(provider, api_key)
logger.info(f"API key saved for {provider}")
# Save to database if available and user_id is set
if hasattr(self, 'use_database') and self.use_database and hasattr(self, 'db_service') and self.db_service and hasattr(self, 'user_id') and self.user_id:
try:
from services.database import SessionLocal
db = SessionLocal()
try:
self.db_service.save_api_key(self.user_id, provider, api_key, db)
logger.info(f"✅ DATABASE: API key for {provider} saved to database for user {self.user_id}")
finally:
db.close()
except Exception as db_error:
logger.warning(f"Failed to save {provider} API key to database: {db_error}")
# Also save to .env file in local mode
is_local = os.getenv('DEPLOY_ENV', 'local') == 'local'
if is_local:
# Special handling for CopilotKit - save to frontend/.env
if provider == 'copilotkit':
self._save_to_frontend_env(api_key)
logger.info(f"[LOCAL] CopilotKit API key saved to frontend/.env file")
else:
# Save other keys to backend/.env
self._save_to_env_file(provider, api_key)
logger.info(f"[LOCAL] API key for {provider} saved to backend/.env file")
else:
logger.info(f"[PRODUCTION] API key for {provider} saved to memory only (database handles persistence)")
return True
else:
logger.error(f"Unknown provider: {provider}")
@@ -490,8 +593,50 @@ class APIKeyManager:
"total_providers": len(self.api_keys)
}
def _save_to_frontend_env(self, api_key: str):
"""Save CopilotKit API key to frontend/.env file."""
try:
# Get the frontend directory path
backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
frontend_dir = os.path.join(os.path.dirname(backend_dir), "frontend")
env_path = os.path.join(frontend_dir, ".env")
# Read existing .env file
if os.path.exists(env_path):
with open(env_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
else:
lines = []
# Update or add REACT_APP_COPILOTKIT_API_KEY
key_found = False
updated_lines = []
env_var = "REACT_APP_COPILOTKIT_API_KEY"
for line in lines:
if line.startswith(f"{env_var}="):
updated_lines.append(f"{env_var}={api_key}\n")
key_found = True
else:
updated_lines.append(line)
if not key_found:
# Ensure the file ends with a newline before adding new key
if updated_lines and not updated_lines[-1].endswith('\n'):
updated_lines[-1] += '\n'
updated_lines.append(f"{env_var}={api_key}\n")
# Write back to frontend .env file
with open(env_path, 'w', encoding='utf-8') as f:
f.writelines(updated_lines)
logger.debug(f"CopilotKit API key saved to frontend .env file")
except Exception as e:
logger.error(f"Error saving to frontend .env file: {str(e)}")
def _save_to_env_file(self, provider: str, api_key: str):
"""Save API key to .env file."""
"""Save API key to backend .env file."""
try:
env_mapping = {
"openai": "OPENAI_API_KEY",
@@ -513,11 +658,10 @@ class APIKeyManager:
os.environ[env_var] = api_key
# Update .env file - use backend directory path
import os
backend_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
env_path = os.path.join(backend_dir, ".env")
if os.path.exists(env_path):
with open(env_path, 'r') as f:
with open(env_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
else:
lines = []
@@ -532,13 +676,23 @@ class APIKeyManager:
updated_lines.append(line)
if not key_found:
# Ensure the file ends with a newline before adding new key
if updated_lines and not updated_lines[-1].endswith('\n'):
updated_lines[-1] += '\n'
updated_lines.append(f"{env_var}={api_key}\n")
with open(env_path, 'w') as f:
with open(env_path, 'w', encoding='utf-8') as f:
f.writelines(updated_lines)
# Reload environment variables
load_dotenv(override=True)
# Reload environment variables into current process
load_dotenv(env_path, override=True)
# Verify the key is now in environment
loaded_key = os.environ.get(env_var)
if loaded_key == api_key:
logger.info(f"{env_var} loaded into environment (available for immediate use)")
else:
logger.warning(f"⚠️ {env_var} written to .env but not in environment yet")
logger.debug(f"API key saved to .env file for {provider}")
except Exception as e:
@@ -555,13 +709,17 @@ def get_onboarding_progress() -> OnboardingProgress:
return get_onboarding_progress._instance
def get_onboarding_progress_for_user(user_id: str) -> OnboardingProgress:
"""Get or create a per-user onboarding progress instance persisted to a user-specific file."""
"""Get or create a per-user onboarding progress instance with database persistence."""
global _user_onboarding_progress_cache
safe_user_id = ''.join([c if c.isalnum() or c in ('-', '_') else '_' for c in str(user_id)])
if safe_user_id in _user_onboarding_progress_cache:
return _user_onboarding_progress_cache[safe_user_id]
# Create user-specific progress file for backward compatibility
progress_file = f".onboarding_progress_{safe_user_id}.json"
instance = OnboardingProgress(progress_file=progress_file)
# Pass user_id to enable database persistence
instance = OnboardingProgress(progress_file=progress_file, user_id=user_id)
_user_onboarding_progress_cache[safe_user_id] = instance
return instance

View File

@@ -0,0 +1,418 @@
"""
Onboarding Database Service
Provides database-backed storage for onboarding progress with user isolation.
This replaces the JSON file-based storage with proper database persistence.
"""
from typing import Dict, Any, Optional, List
from datetime import datetime
from loguru import logger
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from models.onboarding import OnboardingSession, APIKey, WebsiteAnalysis, ResearchPreferences, PersonaData
from services.database import get_db
class OnboardingDatabaseService:
"""Database service for onboarding with user isolation."""
def __init__(self, db: Session = None):
"""Initialize with optional database session."""
self.db = db
def get_or_create_session(self, user_id: str, db: Session = None) -> OnboardingSession:
"""Get existing onboarding session or create new one for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
# Try to get existing session for this user
session = session_db.query(OnboardingSession).filter(
OnboardingSession.user_id == user_id
).first()
if session:
logger.info(f"Found existing onboarding session for user {user_id}")
return session
# Create new session
session = OnboardingSession(
user_id=user_id,
current_step=1,
progress=0.0,
started_at=datetime.now()
)
session_db.add(session)
session_db.commit()
session_db.refresh(session)
logger.info(f"Created new onboarding session for user {user_id}")
return session
except SQLAlchemyError as e:
logger.error(f"Database error in get_or_create_session: {e}")
session_db.rollback()
raise
def get_session_by_user(self, user_id: str, db: Session = None) -> Optional[OnboardingSession]:
"""Get onboarding session for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
return session_db.query(OnboardingSession).filter(
OnboardingSession.user_id == user_id
).first()
except SQLAlchemyError as e:
logger.error(f"Error getting session: {e}")
return None
def update_step(self, user_id: str, step_number: int, db: Session = None) -> bool:
"""Update current step for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_or_create_session(user_id, session_db)
session.current_step = step_number
session.updated_at = datetime.now()
session_db.commit()
logger.info(f"Updated user {user_id} to step {step_number}")
return True
except SQLAlchemyError as e:
logger.error(f"Error updating step: {e}")
session_db.rollback()
return False
def update_progress(self, user_id: str, progress: float, db: Session = None) -> bool:
"""Update progress percentage for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_or_create_session(user_id, session_db)
session.progress = progress
session.updated_at = datetime.now()
session_db.commit()
logger.info(f"Updated user {user_id} progress to {progress}%")
return True
except SQLAlchemyError as e:
logger.error(f"Error updating progress: {e}")
session_db.rollback()
return False
def save_api_key(self, user_id: str, provider: str, api_key: str, db: Session = None) -> bool:
"""Save API key for user with isolation."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
# Get user's onboarding session
session = self.get_or_create_session(user_id, session_db)
# Check if key already exists for this provider and session
existing_key = session_db.query(APIKey).filter(
APIKey.session_id == session.id,
APIKey.provider == provider
).first()
if existing_key:
# Update existing key
existing_key.key = api_key
existing_key.updated_at = datetime.now()
logger.info(f"Updated {provider} API key for user {user_id}")
else:
# Create new key
new_key = APIKey(
session_id=session.id,
provider=provider,
key=api_key
)
session_db.add(new_key)
logger.info(f"Created new {provider} API key for user {user_id}")
session_db.commit()
return True
except SQLAlchemyError as e:
logger.error(f"Error saving API key: {e}")
session_db.rollback()
return False
def get_api_keys(self, user_id: str, db: Session = None) -> Dict[str, str]:
"""Get all API keys for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_session_by_user(user_id, session_db)
if not session:
return {}
keys = session_db.query(APIKey).filter(
APIKey.session_id == session.id
).all()
return {key.provider: key.key for key in keys}
except SQLAlchemyError as e:
logger.error(f"Error getting API keys: {e}")
return {}
def save_website_analysis(self, user_id: str, analysis_data: Dict[str, Any], db: Session = None) -> bool:
"""Save website analysis for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_or_create_session(user_id, session_db)
# Check if analysis already exists
existing = session_db.query(WebsiteAnalysis).filter(
WebsiteAnalysis.session_id == session.id
).first()
if existing:
# Update existing
existing.website_url = analysis_data.get('website_url', existing.website_url)
existing.writing_style = analysis_data.get('writing_style')
existing.content_characteristics = analysis_data.get('content_characteristics')
existing.target_audience = analysis_data.get('target_audience')
existing.content_type = analysis_data.get('content_type')
existing.recommended_settings = analysis_data.get('recommended_settings')
existing.crawl_result = analysis_data.get('crawl_result')
existing.style_patterns = analysis_data.get('style_patterns')
existing.style_guidelines = analysis_data.get('style_guidelines')
existing.status = analysis_data.get('status', 'completed')
existing.updated_at = datetime.now()
logger.info(f"Updated website analysis for user {user_id}")
else:
# Create new
analysis = WebsiteAnalysis(
session_id=session.id,
website_url=analysis_data.get('website_url', ''),
writing_style=analysis_data.get('writing_style'),
content_characteristics=analysis_data.get('content_characteristics'),
target_audience=analysis_data.get('target_audience'),
content_type=analysis_data.get('content_type'),
recommended_settings=analysis_data.get('recommended_settings'),
crawl_result=analysis_data.get('crawl_result'),
style_patterns=analysis_data.get('style_patterns'),
style_guidelines=analysis_data.get('style_guidelines'),
status=analysis_data.get('status', 'completed')
)
session_db.add(analysis)
logger.info(f"Created website analysis for user {user_id}")
session_db.commit()
return True
except SQLAlchemyError as e:
logger.error(f"Error saving website analysis: {e}")
session_db.rollback()
return False
def get_website_analysis(self, user_id: str, db: Session = None) -> Optional[Dict[str, Any]]:
"""Get website analysis for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_session_by_user(user_id, session_db)
if not session:
return None
analysis = session_db.query(WebsiteAnalysis).filter(
WebsiteAnalysis.session_id == session.id
).first()
return analysis.to_dict() if analysis else None
except SQLAlchemyError as e:
logger.error(f"Error getting website analysis: {e}")
return None
def save_research_preferences(self, user_id: str, preferences: Dict[str, Any], db: Session = None) -> bool:
"""Save research preferences for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_or_create_session(user_id, session_db)
# Check if preferences already exist
existing = session_db.query(ResearchPreferences).filter(
ResearchPreferences.session_id == session.id
).first()
if existing:
# Update existing
existing.research_depth = preferences.get('research_depth', existing.research_depth)
existing.content_types = preferences.get('content_types', existing.content_types)
existing.auto_research = preferences.get('auto_research', existing.auto_research)
existing.factual_content = preferences.get('factual_content', existing.factual_content)
existing.writing_style = preferences.get('writing_style')
existing.content_characteristics = preferences.get('content_characteristics')
existing.target_audience = preferences.get('target_audience')
existing.recommended_settings = preferences.get('recommended_settings')
existing.updated_at = datetime.now()
logger.info(f"Updated research preferences for user {user_id}")
else:
# Create new
prefs = ResearchPreferences(
session_id=session.id,
research_depth=preferences.get('research_depth', 'standard'),
content_types=preferences.get('content_types', []),
auto_research=preferences.get('auto_research', True),
factual_content=preferences.get('factual_content', True),
writing_style=preferences.get('writing_style'),
content_characteristics=preferences.get('content_characteristics'),
target_audience=preferences.get('target_audience'),
recommended_settings=preferences.get('recommended_settings')
)
session_db.add(prefs)
logger.info(f"Created research preferences for user {user_id}")
session_db.commit()
return True
except SQLAlchemyError as e:
logger.error(f"Error saving research preferences: {e}")
session_db.rollback()
return False
def save_persona_data(self, user_id: str, persona_data: Dict[str, Any], db: Session = None) -> bool:
"""Save persona data for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_or_create_session(user_id, session_db)
# Check if persona data already exists for this user
existing = session_db.query(PersonaData).filter(
PersonaData.session_id == session.id
).first()
if existing:
# Update existing persona data
existing.core_persona = persona_data.get('corePersona')
existing.platform_personas = persona_data.get('platformPersonas')
existing.quality_metrics = persona_data.get('qualityMetrics')
existing.selected_platforms = persona_data.get('selectedPlatforms', [])
existing.updated_at = datetime.utcnow()
logger.info(f"Updated persona data for user {user_id}")
else:
# Create new persona data record
persona = PersonaData(
session_id=session.id,
core_persona=persona_data.get('corePersona'),
platform_personas=persona_data.get('platformPersonas'),
quality_metrics=persona_data.get('qualityMetrics'),
selected_platforms=persona_data.get('selectedPlatforms', [])
)
session_db.add(persona)
logger.info(f"Created persona data for user {user_id}")
session_db.commit()
return True
except SQLAlchemyError as e:
logger.error(f"Error saving persona data: {e}")
session_db.rollback()
return False
def get_research_preferences(self, user_id: str, db: Session = None) -> Optional[Dict[str, Any]]:
"""Get research preferences for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_session_by_user(user_id, session_db)
if not session:
return None
prefs = session_db.query(ResearchPreferences).filter(
ResearchPreferences.session_id == session.id
).first()
return prefs.to_dict() if prefs else None
except SQLAlchemyError as e:
logger.error(f"Error getting research preferences: {e}")
return None
def mark_onboarding_complete(self, user_id: str, db: Session = None) -> bool:
"""Mark onboarding as complete for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_or_create_session(user_id, session_db)
session.current_step = 6 # Final step
session.progress = 100.0
session.updated_at = datetime.now()
session_db.commit()
logger.info(f"Marked onboarding complete for user {user_id}")
return True
except SQLAlchemyError as e:
logger.error(f"Error marking onboarding complete: {e}")
session_db.rollback()
return False
def get_onboarding_status(self, user_id: str, db: Session = None) -> Dict[str, Any]:
"""Get comprehensive onboarding status for user."""
session_db = db or self.db
if not session_db:
raise ValueError("Database session required")
try:
session = self.get_session_by_user(user_id, session_db)
if not session:
# User hasn't started onboarding yet
return {
"is_completed": False,
"current_step": 1,
"progress": 0.0,
"started_at": None,
"updated_at": None
}
return {
"is_completed": session.current_step >= 6 and session.progress >= 100.0,
"current_step": session.current_step,
"progress": session.progress,
"started_at": session.started_at.isoformat() if session.started_at else None,
"updated_at": session.updated_at.isoformat() if session.updated_at else None
}
except SQLAlchemyError as e:
logger.error(f"Error getting onboarding status: {e}")
return {
"is_completed": False,
"current_step": 1,
"progress": 0.0,
"started_at": None,
"updated_at": None
}

View File

@@ -0,0 +1,150 @@
"""
User API Key Context Manager
Provides user-specific API keys to backend services.
In development: Uses .env file
In production: Fetches from database per user
"""
import os
from typing import Optional, Dict
from loguru import logger
from contextlib import contextmanager
class UserAPIKeyContext:
"""
Context manager for user-specific API keys.
Usage:
with UserAPIKeyContext(user_id) as api_keys:
gemini_key = api_keys.get('gemini')
exa_key = api_keys.get('exa')
# Use keys for this specific user
"""
def __init__(self, user_id: Optional[str] = None):
"""
Initialize with optional user_id.
Args:
user_id: User ID to fetch keys for. If None, uses .env keys (local mode)
"""
self.user_id = user_id
self.keys: Dict[str, str] = {}
self._is_local = os.getenv('DEPLOY_ENV', 'local') == 'local'
def __enter__(self):
"""Load API keys when entering context."""
if self._is_local:
# Local mode: Use .env file
self.keys = self._load_from_env()
logger.debug(f"[LOCAL] Loaded API keys from .env file")
elif self.user_id:
# Production mode: Fetch from database
self.keys = self._load_from_database(self.user_id)
logger.debug(f"[PRODUCTION] Loaded API keys from database for user {self.user_id}")
else:
logger.warning("No user_id provided in production mode - using empty keys")
self.keys = {}
return self.keys
def __exit__(self, exc_type, exc_val, exc_tb):
"""Clean up when exiting context."""
self.keys.clear()
return False # Don't suppress exceptions
def _load_from_env(self) -> Dict[str, str]:
"""Load API keys from environment variables (.env file)."""
return {
'gemini': os.getenv('GEMINI_API_KEY', ''),
'exa': os.getenv('EXA_API_KEY', ''),
'copilotkit': os.getenv('COPILOTKIT_API_KEY', ''),
'openai': os.getenv('OPENAI_API_KEY', ''),
'anthropic': os.getenv('ANTHROPIC_API_KEY', ''),
'tavily': os.getenv('TAVILY_API_KEY', ''),
'serper': os.getenv('SERPER_API_KEY', ''),
'firecrawl': os.getenv('FIRECRAWL_API_KEY', ''),
}
def _load_from_database(self, user_id: str) -> Dict[str, str]:
"""Load API keys from database for specific user."""
try:
from services.onboarding_database_service import OnboardingDatabaseService
from services.database import SessionLocal
db_service = OnboardingDatabaseService()
db = SessionLocal()
try:
keys = db_service.get_api_keys(user_id, db)
logger.info(f"Loaded {len(keys)} API keys from database for user {user_id}")
return keys
finally:
db.close()
except Exception as e:
logger.error(f"Failed to load API keys from database for user {user_id}: {e}")
return {}
@staticmethod
def get_user_key(user_id: Optional[str], provider: str) -> Optional[str]:
"""
Convenience method to get a single API key for a user.
Args:
user_id: User ID (None for development mode)
provider: Provider name (e.g., 'gemini', 'exa')
Returns:
API key string or None
"""
with UserAPIKeyContext(user_id) as keys:
return keys.get(provider)
@contextmanager
def user_api_keys(user_id: Optional[str] = None):
"""
Context manager function for easier usage.
Usage:
from services.user_api_key_context import user_api_keys
with user_api_keys(user_id) as keys:
gemini_key = keys.get('gemini')
"""
context = UserAPIKeyContext(user_id)
try:
yield context.__enter__()
finally:
context.__exit__(None, None, None)
# Convenience function for FastAPI dependency injection
def get_user_api_keys(user_id: str) -> Dict[str, str]:
"""
Get user-specific API keys for use in FastAPI endpoints.
Args:
user_id: User ID from current_user
Returns:
Dictionary of API keys for this user
"""
with UserAPIKeyContext(user_id) as keys:
return keys
def get_gemini_key(user_id: Optional[str] = None) -> Optional[str]:
"""Get Gemini API key for user."""
return UserAPIKeyContext.get_user_key(user_id, 'gemini')
def get_exa_key(user_id: Optional[str] = None) -> Optional[str]:
"""Get Exa API key for user."""
return UserAPIKeyContext.get_user_key(user_id, 'exa')
def get_copilotkit_key(user_id: Optional[str] = None) -> Optional[str]:
"""Get CopilotKit API key for user."""
return UserAPIKeyContext.get_user_key(user_id, 'copilotkit')