ALwrity version 0.5.5

This commit is contained in:
ajaysi
2025-08-13 17:38:54 +05:30
parent 66ece49705
commit 2b8c66c4d0
23 changed files with 3080 additions and 976 deletions

View File

@@ -332,7 +332,7 @@ async def generate_comprehensive_strategy_polling(
"onboarding_data": onboarding_data,
"user_id": user_id,
"generation_config": config or {}
}
}
# Create strategy generation config
generation_config = StrategyGenerationConfig(

View File

@@ -26,6 +26,8 @@ class AutoFillRefreshService:
- Optionally augments with AI overrides (hook, not persisted)
- Returns payload in the same shape as AutoFillService.get_autofill, plus meta
"""
logger.info(f"AutoFillRefreshService: starting build_fresh_payload | user=%s | use_ai=%s | ai_only=%s", user_id, use_ai, ai_only)
# Base context from onboarding analysis (used for AI context only when ai_only)
logger.debug("AutoFillRefreshService: processing onboarding context | user=%s", user_id)
base_context = await self.autofill.integration.process_onboarding_data(user_id, self.db)
@@ -37,6 +39,33 @@ class AutoFillRefreshService:
bool((base_context or {}).get('api_keys_data')),
bool((base_context or {}).get('onboarding_session')),
)
# Log detailed context analysis
logger.info(f"AutoFillRefreshService: detailed context analysis | user=%s", user_id)
if base_context:
website_analysis = base_context.get('website_analysis', {})
research_preferences = base_context.get('research_preferences', {})
api_keys_data = base_context.get('api_keys_data', {})
onboarding_session = base_context.get('onboarding_session', {})
logger.info(f" - Website analysis keys: {list(website_analysis.keys()) if website_analysis else 'None'}")
logger.info(f" - Research preferences keys: {list(research_preferences.keys()) if research_preferences else 'None'}")
logger.info(f" - API keys data keys: {list(api_keys_data.keys()) if api_keys_data else 'None'}")
logger.info(f" - Onboarding session keys: {list(onboarding_session.keys()) if onboarding_session else 'None'}")
# Log specific data points
if website_analysis:
logger.info(f" - Website URL: {website_analysis.get('website_url', 'Not found')}")
logger.info(f" - Website status: {website_analysis.get('status', 'Unknown')}")
if research_preferences:
logger.info(f" - Research depth: {research_preferences.get('research_depth', 'Not found')}")
logger.info(f" - Content types: {research_preferences.get('content_types', 'Not found')}")
if api_keys_data:
logger.info(f" - API providers: {api_keys_data.get('providers', [])}")
logger.info(f" - Total keys: {api_keys_data.get('total_keys', 0)}")
else:
logger.warning(f"AutoFillRefreshService: no base context available | user=%s", user_id)
try:
w = (base_context or {}).get('website_analysis') or {}
r = (base_context or {}).get('research_preferences') or {}
@@ -50,6 +79,16 @@ class AutoFillRefreshService:
ai_payload = await self.structured_ai.generate_autofill_fields(user_id, base_context)
meta = ai_payload.get('meta') or {}
logger.info("AI-only payload meta: ai_used=%s overrides=%s", meta.get('ai_used'), meta.get('ai_overrides_count'))
# Log detailed AI payload analysis
logger.info(f"AutoFillRefreshService: AI payload analysis | user=%s", user_id)
logger.info(f" - AI used: {meta.get('ai_used', False)}")
logger.info(f" - AI overrides count: {meta.get('ai_overrides_count', 0)}")
logger.info(f" - Success rate: {meta.get('success_rate', 0):.1f}%")
logger.info(f" - Attempts: {meta.get('attempts', 0)}")
logger.info(f" - Missing fields: {len(meta.get('missing_fields', []))}")
logger.info(f" - Fields generated: {len(ai_payload.get('fields', {}))}")
return ai_payload
except Exception as e:
logger.error("AI-only structured generation failed | user=%s | err=%s", user_id, repr(e))
@@ -68,6 +107,7 @@ class AutoFillRefreshService:
}
# Fallback to previous behavior (DB + sparse overrides)
logger.info("AutoFillRefreshService: using fallback behavior (DB + sparse overrides)")
payload = await self.autofill.get_autofill(user_id)
logger.info("AutoFillRefreshService: Base payload fields: %d", len(payload.get('fields', {})))

View File

@@ -496,10 +496,21 @@ Generate the complete JSON with all 30 fields personalized for {website_url}:
logger.info("AIStructuredAutofillService: generating %d fields | user=%s", len(CORE_FIELDS), user_id)
logger.debug("AIStructuredAutofillService: properties=%d", len(schema.get('properties', {})))
# Log context summary for debugging
logger.info("AIStructuredAutofillService: context summary | user=%s", user_id)
logger.info(" - Website analysis exists: %s", bool(context_summary.get('user_profile', {}).get('website_url')))
logger.info(" - Research config: %s", context_summary.get('research_config', {}).get('research_depth', 'None'))
logger.info(" - API capabilities: %s", len(context_summary.get('api_capabilities', {}).get('providers', [])))
logger.info(" - Content analysis: %s", bool(context_summary.get('content_analysis')))
logger.info(" - Audience insights: %s", bool(context_summary.get('audience_insights')))
# Log prompt length for debugging
logger.info("AIStructuredAutofillService: prompt length=%d chars | user=%s", len(prompt), user_id)
last_result = None
for attempt in range(self.max_retries + 1):
try:
logger.info(f"AI structured call attempt {attempt + 1}/{self.max_retries + 1}")
logger.info(f"AI structured call attempt {attempt + 1}/{self.max_retries + 1} | user=%s", user_id)
result = await self.ai.execute_structured_json_call(
service_type=AIServiceType.STRATEGIC_INTELLIGENCE,
prompt=prompt,
@@ -507,8 +518,34 @@ Generate the complete JSON with all 30 fields personalized for {website_url}:
)
last_result = result
# Log AI response details
logger.info(f"AI response received | attempt={attempt + 1} | user=%s", user_id)
if isinstance(result, dict):
logger.info(f" - Response keys: {list(result.keys())}")
logger.info(f" - Response type: dict with {len(result)} items")
# Handle wrapped response from AI service manager
if 'data' in result and 'success' in result:
# This is a wrapped response from AI service manager
if result.get('success'):
# Extract the actual AI response from the 'data' field
ai_response = result.get('data', {})
logger.info(f" - Extracted AI response from wrapped response")
logger.info(f" - AI response keys: {list(ai_response.keys()) if isinstance(ai_response, dict) else 'N/A'}")
last_result = ai_response
else:
# AI service failed
error_msg = result.get('error', 'Unknown AI service error')
logger.error(f" - AI service failed: {error_msg}")
last_result = {'error': error_msg}
elif 'error' in result:
logger.error(f" - AI returned error: {result['error']}")
else:
logger.warning(f" - Response type: {type(result)}")
# Check if we should retry
if not self._should_retry(result, attempt):
if not self._should_retry(last_result, attempt):
logger.info(f"Retry not needed | attempt={attempt + 1} | user=%s", user_id)
break
# Add a small delay before retry

View File

@@ -7,6 +7,7 @@ import logging
from typing import Dict, Any, Optional, List
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
import traceback
# Import database models
from models.enhanced_strategy_models import (
@@ -39,6 +40,13 @@ class OnboardingDataIntegrationService:
api_keys_data = self._get_api_keys_data(user_id, db)
onboarding_session = self._get_onboarding_session(user_id, db)
# Log data source status
logger.info(f"Data source status for user {user_id}:")
logger.info(f" - Website analysis: {'✅ Found' if website_analysis else '❌ Missing'}")
logger.info(f" - Research preferences: {'✅ Found' if research_preferences else '❌ Missing'}")
logger.info(f" - API keys data: {'✅ Found' if api_keys_data else '❌ Missing'}")
logger.info(f" - Onboarding session: {'✅ Found' if onboarding_session else '❌ Missing'}")
# Process and integrate data
integrated_data = {
'website_analysis': website_analysis,
@@ -49,6 +57,14 @@ class OnboardingDataIntegrationService:
'processing_timestamp': datetime.utcnow().isoformat()
}
# Log data quality assessment
data_quality = integrated_data['data_quality']
logger.info(f"Data quality assessment for user {user_id}:")
logger.info(f" - Completeness: {data_quality.get('completeness', 0):.2f}")
logger.info(f" - Freshness: {data_quality.get('freshness', 0):.2f}")
logger.info(f" - Relevance: {data_quality.get('relevance', 0):.2f}")
logger.info(f" - Confidence: {data_quality.get('confidence', 0):.2f}")
# Store integrated data
await self._store_integrated_data(user_id, integrated_data, db)
@@ -57,6 +73,7 @@ class OnboardingDataIntegrationService:
except Exception as e:
logger.error(f"Error processing onboarding data for user {user_id}: {str(e)}")
logger.error("Traceback:\n%s", traceback.format_exc())
return self._get_fallback_data()
def _get_website_analysis(self, user_id: int, db: Session) -> Dict[str, Any]:

View File

@@ -7,7 +7,20 @@ import google.genai as genai
from google.genai import types
from dotenv import load_dotenv
load_dotenv(Path('../../../.env'))
# Fix the environment loading path - load from backend directory
current_dir = Path(__file__).parent.parent # services directory
backend_dir = current_dir.parent # backend directory
env_path = backend_dir / '.env'
if env_path.exists():
load_dotenv(env_path)
print(f"Loaded .env from: {env_path}")
else:
# Fallback to current directory
load_dotenv()
print(f"No .env found at {env_path}, using current directory")
from loguru import logger
logger.remove()
logger.add(sys.stdout,
@@ -31,14 +44,33 @@ import logging
logging.basicConfig(level=logging.INFO, format='[%(asctime)s-%(levelname)s-%(module)s-%(lineno)d]- %(message)s')
logger = logging.getLogger(__name__)
def get_gemini_api_key() -> str:
    """Return the Gemini API key stored in ``GEMINI_API_KEY``.

    Returns:
        The key exactly as read from the environment.

    Raises:
        ValueError: If the variable is unset or empty, or if its value does
            not begin with ``'AIza'``. The same message is logged at error
            level before the exception is raised.
    """
    key = os.environ.get('GEMINI_API_KEY')

    # Decide which complaint applies, if any. The emptiness check comes first
    # so a blank value is reported as "not set" rather than "invalid".
    problem = None
    if not key:
        problem = "GEMINI_API_KEY environment variable is not set. Please set it in your .env file."
    elif not key.startswith('AIza'):
        problem = "GEMINI_API_KEY appears to be invalid. It should start with 'AIza'."

    if problem is not None:
        logger.error(problem)
        raise ValueError(problem)
    return key
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def gemini_text_response(prompt, temperature, top_p, n, max_tokens, system_prompt):
""" Common functiont to get response from gemini pro Text. """
#FIXME: Include : https://github.com/google-gemini/cookbook/blob/main/quickstarts/rest/System_instructions_REST.ipynb
try:
client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
api_key = get_gemini_api_key()
client = genai.Client(api_key=api_key)
logger.info("✅ Gemini client initialized successfully")
except Exception as err:
logger.error(f"Failed to configure Gemini: {err}")
raise
logger.info(f"Temp: {temperature}, MaxTokens: {max_tokens}, TopP: {top_p}, N: {n}")
# Set up AI model config
generation_config = {
@@ -121,20 +153,32 @@ async def test_gemini_api_key(api_key: str) -> tuple[bool, str]:
tuple[bool, str]: A tuple containing (is_valid, message)
"""
try:
# Validate API key format first
if not api_key:
return False, "API key is empty"
if not api_key.startswith('AIza'):
return False, "API key format appears invalid (should start with 'AIza')"
# Configure Gemini with the provided key
genai.configure(api_key=api_key)
client = genai.Client(api_key=api_key)
# Try to list models as a simple API test
models = genai.list_models()
models = client.models.list()
# Check if Gemini Pro is available
if any(model.name == "gemini-pro" for model in models):
model_names = [model.name for model in models]
logger.info(f"Available models: {model_names}")
if any("gemini" in model_name.lower() for model_name in model_names):
return True, "Gemini API key is valid"
else:
return False, "Gemini Pro model not available with this API key"
return False, "No Gemini models available with this API key"
except Exception as e:
return False, f"Error testing Gemini API key: {str(e)}"
error_msg = f"Error testing Gemini API key: {str(e)}"
logger.error(error_msg)
return False, error_msg
def gemini_pro_text_gen(prompt, temperature=0.7, top_p=0.9, top_k=40, max_tokens=2048):
"""
@@ -151,18 +195,20 @@ def gemini_pro_text_gen(prompt, temperature=0.7, top_p=0.9, top_k=40, max_tokens
str: The generated text completion
"""
try:
# Configure the model
model = genai.GenerativeModel('gemini-pro')
# Get API key with proper error handling
api_key = get_gemini_api_key()
client = genai.Client(api_key=api_key)
# Generate content
response = model.generate_content(
prompt,
generation_config=genai.types.GenerationConfig(
# Generate content using the new client
response = client.models.generate_content(
model='gemini-2.5-flash',
contents=prompt,
config=types.GenerateContentConfig(
max_output_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
top_k=top_k,
max_output_tokens=max_tokens,
)
),
)
# Return the generated text
@@ -210,7 +256,10 @@ def gemini_structured_json_response(prompt, schema, temperature=0.7, top_p=0.9,
Generate structured JSON response using Google's Gemini Pro model.
"""
try:
client = genai.Client(api_key=os.getenv('GEMINI_API_KEY'))
# Get API key with proper error handling
api_key = get_gemini_api_key()
client = genai.Client(api_key=api_key)
logger.info("✅ Gemini client initialized for structured JSON response")
# Build config using official SDK schema type
try:
@@ -329,6 +378,10 @@ def gemini_structured_json_response(prompt, schema, temperature=0.7, top_p=0.9,
logger.error(f"Error parsing structured response: {e}")
return {"error": f"Failed to parse JSON response: {e}", "raw_response": (response.text or '')}
except ValueError as e:
# API key related errors
logger.error(f"API key error in Gemini Pro structured JSON generation: {e}")
return {"error": str(e)}
except Exception as e:
logger.error(f"Error in Gemini Pro structured JSON generation: {e}")
return {"error": str(e)}

142
backend/test_env_check.py Normal file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Test script to check environment variables and API key loading.
"""
import os
import sys
from pathlib import Path
# Add the backend directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from dotenv import load_dotenv
def test_environment_loading():
    """Print a diagnostic report on ``.env`` discovery and key variables.

    The report covers the current working directory, which of several
    candidate ``.env`` paths exist, the first candidate that gets loaded
    (or a bare ``load_dotenv()`` call when none exists), and a truncated
    view of selected environment variables.

    Returns:
        bool: ``True`` only when ``GEMINI_API_KEY`` holds a non-empty value
        after loading; ``False`` when it is unset or empty.
    """
    print("🔍 Testing environment variable loading...")

    # Check current working directory
    print(f"Current working directory: {os.getcwd()}")

    # Candidate .env locations. They are relative paths, so they resolve
    # against the working directory printed above.
    possible_env_paths = [
        Path('.env'),  # Current directory
        Path('../.env'),  # Parent directory
        Path('../../.env'),  # Grandparent directory
        Path('../../../.env'),  # Great-grandparent directory
        Path('backend/.env'),  # Backend directory
    ]

    print("\n📁 Checking for .env files:")
    for env_path in possible_env_paths:
        if env_path.exists():
            print(f"✅ Found .env file: {env_path.absolute()}")
        else:
            print(f"❌ No .env file: {env_path.absolute()}")

    # Load only the first candidate that exists; the for/else branch runs
    # when the loop finishes without finding one.
    print("\n🔄 Attempting to load .env files:")
    for env_path in possible_env_paths:
        if env_path.exists():
            print(f"Loading .env from: {env_path.absolute()}")
            load_dotenv(env_path)
            break
    else:
        print("⚠️ No .env file found, trying to load from current directory")
        load_dotenv()

    # Check environment variables
    print("\n🔑 Checking environment variables:")
    env_vars_to_check = [
        'GEMINI_API_KEY',
        'GOOGLE_API_KEY',
        'OPENAI_API_KEY',
        'DATABASE_URL',
        'SECRET_KEY',
    ]
    for var in env_vars_to_check:
        value = os.getenv(var)
        if value:
            # Show first few characters for security
            masked_value = value[:8] + "..." if len(value) > 8 else "***"
            print(f"{var}: {masked_value}")
        else:
            print(f"{var}: Not set")

    # Test specific Gemini API key loading
    print("\n🤖 Testing Gemini API key loading:")
    gemini_key = os.getenv('GEMINI_API_KEY')
    if gemini_key:
        print(f"✅ GEMINI_API_KEY found: {gemini_key[:8]}...")
        # Test if the key looks valid
        if len(gemini_key) > 20:
            print("✅ API key length looks valid")
        else:
            print("⚠️ API key seems too short")
    else:
        print("❌ GEMINI_API_KEY not found")
        # Check alternative names
        alternative_keys = ['GOOGLE_API_KEY', 'GEMINI_KEY', 'GOOGLE_AI_API_KEY']
        for alt_key in alternative_keys:
            alt_value = os.getenv(alt_key)
            if alt_value:
                print(f"⚠️ Found alternative key {alt_key}: {alt_value[:8]}...")

    # An empty value counts as missing: the report above prints "not found"
    # for it, so the return value has to agree instead of reporting success.
    return bool(gemini_key)
def test_gemini_provider_import():
    """Check that the Gemini provider's structured-JSON helper can be imported.

    Returns:
        bool: ``True`` when the import succeeds; ``False`` when it raises any
        exception, in which case the error is printed.
    """
    print("\n🧪 Testing Gemini provider import...")
    imported = False
    try:
        # The import itself is the check; the imported name is not used.
        from services.llm_providers.gemini_provider import gemini_structured_json_response
        print("✅ Successfully imported gemini_structured_json_response")
        imported = True
    except Exception as exc:
        print(f"❌ Failed to import Gemini provider: {exc}")
    return imported
def test_ai_service_manager_import():
    """Check that ``AIServiceManager`` can be imported and instantiated.

    Returns:
        bool: ``True`` when both the import and the no-argument construction
        succeed; ``False`` when either raises, in which case the error is
        printed.
    """
    print("\n🧪 Testing AI service manager import...")
    succeeded = False
    try:
        from services.ai_service_manager import AIServiceManager
        print("✅ Successfully imported AIServiceManager")

        # Try to create an instance
        _instance = AIServiceManager()
        print("✅ Successfully created AIServiceManager instance")
        succeeded = True
    except Exception as exc:
        print(f"❌ Failed to import/create AI service manager: {exc}")
    return succeeded
if __name__ == "__main__":
    divider = "=" * 60

    print("🚀 Starting environment and API key validation tests")
    print(divider)

    # Run the checks in a fixed order: environment first, then the imports.
    env_ok = test_environment_loading()
    gemini_import_ok = test_gemini_provider_import()
    ai_manager_ok = test_ai_service_manager_import()

    print("\n" + divider)
    print("📊 Test Results Summary:")
    summary_rows = (
        ("Environment loading", env_ok),
        ("Gemini provider import", gemini_import_ok),
        ("AI service manager", ai_manager_ok),
    )
    for label, passed in summary_rows:
        verdict = '✅ PASS' if passed else '❌ FAIL'
        print(f"{label}: {verdict}")

    # Remediation hints are shown only when the environment check failed.
    if not env_ok:
        remediation_hints = (
            "\n💡 To fix environment issues:",
            "1. Create a .env file in the backend directory",
            "2. Add your GEMINI_API_KEY to the .env file",
            "3. Example: GEMINI_API_KEY=your_actual_api_key_here",
        )
        for hint in remediation_hints:
            print(hint)

    print("\n" + divider)

View File

@@ -0,0 +1,463 @@
#!/usr/bin/env python3
"""
Test script to validate onboarding data existence in the database.
This script checks if onboarding data exists for test users and validates the data flow.
"""
import sys
import os
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any, Optional
# Add the backend directory to the Python path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from sqlalchemy.orm import Session
from services.database import get_db_session
from models.onboarding import OnboardingSession, WebsiteAnalysis, ResearchPreferences, APIKey
from models.enhanced_strategy_models import OnboardingDataIntegration
from api.content_planning.services.content_strategy.onboarding.data_integration import OnboardingDataIntegrationService
from api.content_planning.services.content_strategy.autofill.ai_structured_autofill import AIStructuredAutofillService
from services.ai_service_manager import AIServiceManager
# Configure logging: every record at DEBUG or above goes both to stdout and
# to 'onboarding_test.log' (a relative path, so it lands in the working
# directory).
log_handlers = [
    logging.StreamHandler(sys.stdout),
    logging.FileHandler('onboarding_test.log'),
]
logging.basicConfig(
    handlers=log_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
class OnboardingDataValidator:
    """Validator for onboarding data existence and quality.

    Bundles a database session with the data-integration and AI services and
    exposes checks that each return a plain dictionary describing what was
    found, so the combined results can be handed to :meth:`print_summary`.
    """

    # User IDs checked when a caller does not supply its own list.
    DEFAULT_USER_IDS = (1, 2, 3)

    def __init__(self):
        """Open a database session and instantiate the services under test."""
        self.db_session = get_db_session()
        self.data_integration_service = OnboardingDataIntegrationService()
        self.ai_service = AIStructuredAutofillService()
        self.ai_manager = AIServiceManager()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _resolve_user_ids(self, user_ids: Optional[list]) -> list:
        """Return ``user_ids``, or the default test IDs when it is ``None``."""
        if user_ids is None:
            return list(self.DEFAULT_USER_IDS)
        return user_ids

    def _latest_session(self, user_id: int):
        """Return the most recently updated onboarding session, or ``None``."""
        return (
            self.db_session.query(OnboardingSession)
            .filter(OnboardingSession.user_id == user_id)
            .order_by(OnboardingSession.updated_at.desc())
            .first()
        )

    def _rollback_after_error(self) -> None:
        """Roll the session back so one failed check cannot block later ones.

        A failed statement can leave a SQLAlchemy session unusable until it
        is rolled back. A failure of the rollback itself is only logged,
        because the caller is already handling an error.
        """
        try:
            self.db_session.rollback()
        except Exception as rollback_error:
            logger.warning(f"⚠️ Session rollback failed: {rollback_error}")

    # ------------------------------------------------------------------
    # Database checks
    # ------------------------------------------------------------------
    def test_database_connection(self) -> bool:
        """Run ``SELECT 1`` and report whether the database answered.

        Returns:
            ``True`` when the statement executes, ``False`` on any exception
            (which is logged).
        """
        try:
            # Simple query to test connection
            from sqlalchemy import text
            self.db_session.execute(text("SELECT 1"))
            logger.info("✅ Database connection successful")
            return True
        except Exception as e:
            logger.error(f"❌ Database connection failed: {e}")
            return False

    def check_onboarding_sessions(self, user_ids: Optional[list] = None) -> Dict[int, Dict[str, Any]]:
        """Check onboarding sessions for given user IDs.

        Args:
            user_ids: IDs to look up; ``None`` means the default test IDs.

        Returns:
            A mapping of user ID to a result dict. ``session_exists`` is always
            present; it is accompanied by session details on success or by
            an ``error`` string otherwise.
        """
        results = {}
        for user_id in self._resolve_user_ids(user_ids):
            logger.info(f"🔍 Checking onboarding session for user {user_id}")
            try:
                session = self._latest_session(user_id)
                if session:
                    results[user_id] = {
                        'session_exists': True,
                        'session_id': session.id,
                        'status': session.status,
                        'progress': session.progress,
                        'created_at': session.created_at.isoformat(),
                        'updated_at': session.updated_at.isoformat(),
                        'data': session.to_dict() if hasattr(session, 'to_dict') else str(session)
                    }
                    logger.info(f"✅ Onboarding session found for user {user_id}: {session.status}")
                else:
                    results[user_id] = {
                        'session_exists': False,
                        'error': 'No onboarding session found'
                    }
                    logger.warning(f"❌ No onboarding session found for user {user_id}")
            except Exception as e:
                self._rollback_after_error()
                results[user_id] = {
                    'session_exists': False,
                    'error': str(e)
                }
                logger.error(f"❌ Error checking onboarding session for user {user_id}: {e}")
        return results

    def check_website_analysis(self, user_ids: Optional[list] = None) -> Dict[int, Dict[str, Any]]:
        """Check website analysis data for given user IDs.

        The analysis is looked up through the user's latest onboarding
        session; a user without a session is reported as having no analysis.

        Args:
            user_ids: IDs to look up; ``None`` means the default test IDs.

        Returns:
            A mapping of user ID to a result dict keyed by
            ``website_analysis_exists`` plus details or an ``error`` string.
        """
        results = {}
        for user_id in self._resolve_user_ids(user_ids):
            logger.info(f"🔍 Checking website analysis for user {user_id}")
            try:
                # Get onboarding session first
                session = self._latest_session(user_id)
                if not session:
                    results[user_id] = {
                        'website_analysis_exists': False,
                        'error': 'No onboarding session found'
                    }
                    continue

                # Get website analysis
                website_analysis = self.db_session.query(WebsiteAnalysis).filter(
                    WebsiteAnalysis.session_id == session.id
                ).order_by(WebsiteAnalysis.updated_at.desc()).first()

                if website_analysis:
                    results[user_id] = {
                        'website_analysis_exists': True,
                        'analysis_id': website_analysis.id,
                        'website_url': website_analysis.website_url,
                        'status': website_analysis.status,
                        'created_at': website_analysis.created_at.isoformat(),
                        'updated_at': website_analysis.updated_at.isoformat(),
                        'data_keys': list(website_analysis.to_dict().keys()) if hasattr(website_analysis, 'to_dict') else []
                    }
                    logger.info(f"✅ Website analysis found for user {user_id}: {website_analysis.website_url}")
                else:
                    results[user_id] = {
                        'website_analysis_exists': False,
                        'error': 'No website analysis found'
                    }
                    logger.warning(f"❌ No website analysis found for user {user_id}")
            except Exception as e:
                self._rollback_after_error()
                results[user_id] = {
                    'website_analysis_exists': False,
                    'error': str(e)
                }
                logger.error(f"❌ Error checking website analysis for user {user_id}: {e}")
        return results

    def check_research_preferences(self, user_ids: Optional[list] = None) -> Dict[int, Dict[str, Any]]:
        """Check research preferences data for given user IDs.

        Args:
            user_ids: IDs to look up; ``None`` means the default test IDs.

        Returns:
            A mapping of user ID to a result dict keyed by
            ``research_preferences_exists`` plus details or an ``error``
            string.
        """
        results = {}
        for user_id in self._resolve_user_ids(user_ids):
            logger.info(f"🔍 Checking research preferences for user {user_id}")
            try:
                # Get onboarding session first
                session = self._latest_session(user_id)
                if not session:
                    results[user_id] = {
                        'research_preferences_exists': False,
                        'error': 'No onboarding session found'
                    }
                    continue

                # Get research preferences
                research_prefs = self.db_session.query(ResearchPreferences).filter(
                    ResearchPreferences.session_id == session.id
                ).first()

                if research_prefs:
                    results[user_id] = {
                        'research_preferences_exists': True,
                        'prefs_id': research_prefs.id,
                        'research_depth': research_prefs.research_depth,
                        'content_types': research_prefs.content_types,
                        'created_at': research_prefs.created_at.isoformat(),
                        'updated_at': research_prefs.updated_at.isoformat(),
                        'data_keys': list(research_prefs.to_dict().keys()) if hasattr(research_prefs, 'to_dict') else []
                    }
                    logger.info(f"✅ Research preferences found for user {user_id}: {research_prefs.research_depth}")
                else:
                    results[user_id] = {
                        'research_preferences_exists': False,
                        'error': 'No research preferences found'
                    }
                    logger.warning(f"❌ No research preferences found for user {user_id}")
            except Exception as e:
                self._rollback_after_error()
                results[user_id] = {
                    'research_preferences_exists': False,
                    'error': str(e)
                }
                logger.error(f"❌ Error checking research preferences for user {user_id}: {e}")
        return results

    def check_api_keys(self, user_ids: Optional[list] = None) -> Dict[int, Dict[str, Any]]:
        """Check API keys data for given user IDs.

        Args:
            user_ids: IDs to look up; ``None`` means the default test IDs.

        Returns:
            A mapping of user ID to a result dict keyed by ``api_keys_exist``.
            On success it also carries the key count, the provider names and
            the timestamps of the first key returned by the query.
        """
        results = {}
        for user_id in self._resolve_user_ids(user_ids):
            logger.info(f"🔍 Checking API keys for user {user_id}")
            try:
                # Get onboarding session first
                session = self._latest_session(user_id)
                if not session:
                    results[user_id] = {
                        'api_keys_exist': False,
                        'error': 'No onboarding session found'
                    }
                    continue

                # Get API keys
                api_keys = self.db_session.query(APIKey).filter(
                    APIKey.session_id == session.id
                ).all()

                if api_keys:
                    first_key = api_keys[0]
                    results[user_id] = {
                        'api_keys_exist': True,
                        'count': len(api_keys),
                        'providers': [key.provider for key in api_keys],
                        'created_at': first_key.created_at.isoformat(),
                        'updated_at': first_key.updated_at.isoformat()
                    }
                    logger.info(f"✅ API keys found for user {user_id}: {len(api_keys)} keys")
                else:
                    results[user_id] = {
                        'api_keys_exist': False,
                        'error': 'No API keys found'
                    }
                    logger.warning(f"❌ No API keys found for user {user_id}")
            except Exception as e:
                self._rollback_after_error()
                results[user_id] = {
                    'api_keys_exist': False,
                    'error': str(e)
                }
                logger.error(f"❌ Error checking API keys for user {user_id}: {e}")
        return results

    # ------------------------------------------------------------------
    # Service checks
    # ------------------------------------------------------------------
    async def test_data_integration_service(self, user_id: int = 1) -> Dict[str, Any]:
        """Test the data integration service.

        Calls ``process_onboarding_data`` for ``user_id`` and summarises which
        sections of the returned data are populated.

        Returns:
            A dict with ``success`` set to ``True`` plus presence flags, or
            ``success`` set to ``False`` plus an ``error`` string.
        """
        logger.info(f"🔍 Testing data integration service for user {user_id}")
        try:
            # Test the process_onboarding_data method
            integrated_data = await self.data_integration_service.process_onboarding_data(user_id, self.db_session)
            if not integrated_data:
                logger.error(f"❌ Data integration returned None for user {user_id}")
                return {'success': False, 'error': 'No data returned'}

            result = {
                'success': True,
                'has_website_analysis': bool(integrated_data.get('website_analysis')),
                'has_research_preferences': bool(integrated_data.get('research_preferences')),
                'has_api_keys_data': bool(integrated_data.get('api_keys_data')),
                'has_onboarding_session': bool(integrated_data.get('onboarding_session')),
                'data_quality': integrated_data.get('data_quality', {}),
                'processing_timestamp': integrated_data.get('processing_timestamp'),
                'context_keys': list(integrated_data.keys())
            }
            logger.info(f"✅ Data integration successful for user {user_id}")
            logger.info(f" Website analysis: {result['has_website_analysis']}")
            logger.info(f" Research preferences: {result['has_research_preferences']}")
            logger.info(f" API keys: {result['has_api_keys_data']}")
            logger.info(f" Onboarding session: {result['has_onboarding_session']}")
            return result
        except Exception as e:
            logger.error(f"❌ Data integration failed for user {user_id}: {e}")
            return {'success': False, 'error': str(e)}

    async def test_ai_service_configuration(self) -> Dict[str, Any]:
        """Test AI service configuration.

        Sends a one-field structured-JSON request through the AI service
        manager and reports whether a response without an ``error`` entry
        came back.

        Returns:
            A dict with ``success`` and ``ai_service_working`` flags, plus
            ``test_response`` on success or ``error`` on failure.
        """
        logger.info("🔍 Testing AI service configuration")
        try:
            # Test basic AI service functionality
            test_prompt = "Generate a simple test response"
            test_schema = {
                "type": "OBJECT",
                "properties": {
                    "test_field": {"type": "STRING", "description": "A test field"}
                },
                "required": ["test_field"]
            }

            # Test the AI service manager
            # NOTE(review): service_type is passed as a plain string here;
            # confirm the manager accepts a string rather than an enum member.
            result = await self.ai_manager.execute_structured_json_call(
                service_type="STRATEGIC_INTELLIGENCE",
                prompt=test_prompt,
                schema=test_schema
            )

            if result and not result.get('error'):
                logger.info("✅ AI service configuration successful")
                return {
                    'success': True,
                    'ai_service_working': True,
                    'test_response': result
                }

            # A None/empty result has no .get(); fall back to an empty dict so
            # the failure is reported as such instead of as an AttributeError.
            error = (result or {}).get('error', 'Unknown error')
            logger.error(f"❌ AI service test failed: {error}")
            return {
                'success': False,
                'ai_service_working': False,
                'error': error
            }
        except Exception as e:
            logger.error(f"❌ AI service configuration test failed: {e}")
            return {
                'success': False,
                'ai_service_working': False,
                'error': str(e)
            }

    async def test_ai_structured_autofill(self, user_id: int = 1) -> Dict[str, Any]:
        """Test the AI structured autofill service.

        Builds the integrated onboarding data for ``user_id``, passes it to
        ``generate_autofill_fields`` and summarises the returned ``meta`` and
        ``fields`` entries.

        Returns:
            A dict with ``success`` set to ``True`` plus generation metrics,
            or ``success`` set to ``False`` plus an ``error`` string.
        """
        logger.info(f"🔍 Testing AI structured autofill for user {user_id}")
        try:
            # First get the context
            integrated_data = await self.data_integration_service.process_onboarding_data(user_id, self.db_session)
            if not integrated_data:
                logger.error(f"❌ No integrated data available for user {user_id}")
                return {'success': False, 'error': 'No integrated data available'}

            # Test the AI structured autofill
            result = await self.ai_service.generate_autofill_fields(user_id, integrated_data)
            if not result:
                logger.error(f"❌ AI structured autofill returned None for user {user_id}")
                return {'success': False, 'error': 'No result returned'}

            meta = result.get('meta', {})
            fields = result.get('fields', {})
            test_result = {
                'success': True,
                'ai_used': meta.get('ai_used', False),
                'ai_overrides_count': meta.get('ai_overrides_count', 0),
                'success_rate': meta.get('success_rate', 0),
                'attempts': meta.get('attempts', 0),
                'missing_fields': meta.get('missing_fields', []),
                'fields_generated': len(fields),
                'sample_fields': list(fields.keys())[:5] if fields else []
            }
            logger.info(f"✅ AI structured autofill test completed for user {user_id}")
            logger.info(f" AI used: {test_result['ai_used']}")
            logger.info(f" Fields generated: {test_result['fields_generated']}")
            logger.info(f" Success rate: {test_result['success_rate']:.1f}%")
            logger.info(f" Attempts: {test_result['attempts']}")
            return test_result
        except Exception as e:
            logger.error(f"❌ AI structured autofill test failed for user {user_id}: {e}")
            return {'success': False, 'error': str(e)}

    # ------------------------------------------------------------------
    # Reporting and teardown
    # ------------------------------------------------------------------
    def print_summary(self, results: Dict[str, Any]):
        """Print a summary of all test results.

        Dict results are expanded up to two levels deep; anything else is
        logged as a single line.
        """
        logger.info("\n" + "="*80)
        logger.info("📊 ONBOARDING DATA VALIDATION SUMMARY")
        logger.info("="*80)
        for test_name, result in results.items():
            logger.info(f"\n🔍 {test_name.upper()}:")
            if isinstance(result, dict):
                for key, value in result.items():
                    if isinstance(value, dict):
                        logger.info(f" {key}:")
                        for sub_key, sub_value in value.items():
                            logger.info(f" {sub_key}: {sub_value}")
                    else:
                        logger.info(f" {key}: {value}")
            else:
                logger.info(f" {result}")
        logger.info("\n" + "="*80)

    def cleanup(self):
        """Clean up database session."""
        if self.db_session:
            self.db_session.close()
async def main():
    """Run every validation check in sequence and log the overall verdict.

    The run stops early when the database connection test fails. Otherwise
    all checks run, their results are summarised, and the verdict is derived
    from four of them: the connection, at least one user having an
    onboarding session, data integration, and the AI service configuration.
    The validator's session is always cleaned up at the end.
    """
    logger.info("🚀 Starting onboarding data validation tests")
    validator = OnboardingDataValidator()
    try:
        # Test database connection
        db_connected = validator.test_database_connection()
        if not db_connected:
            logger.error("❌ Cannot proceed without database connection")
            return

        # Test user IDs to check
        test_user_ids = [1, 2, 3]

        # Run all tests, recording each result in execution order.
        results = {'database_connection': db_connected}
        results['onboarding_sessions'] = validator.check_onboarding_sessions(test_user_ids)
        results['website_analysis'] = validator.check_website_analysis(test_user_ids)
        results['research_preferences'] = validator.check_research_preferences(test_user_ids)
        results['api_keys'] = validator.check_api_keys(test_user_ids)
        results['data_integration'] = await validator.test_data_integration_service(1)
        results['ai_service_config'] = await validator.test_ai_service_configuration()
        results['ai_structured_autofill'] = await validator.test_ai_structured_autofill(1)

        # Print summary
        validator.print_summary(results)

        # Determine overall status from the critical checks only.
        some_session_found = any(
            entry.get('session_exists', False)
            for entry in results['onboarding_sessions'].values()
        )
        critical_checks = [
            results['database_connection'],
            some_session_found,
            results['data_integration']['success'],
            results['ai_service_config']['success'],
        ]
        if not all(critical_checks):
            logger.error("❌ Some critical tests failed!")
        else:
            logger.info("✅ All critical tests passed!")
    except Exception as e:
        logger.error(f"❌ Test execution failed: {e}")
    finally:
        validator.cleanup()


if __name__ == "__main__":
    asyncio.run(main())