Fix: Step 6 Data Retrieval Issue
This commit is contained in:
@@ -8,6 +8,7 @@ from typing import Dict, Any, List, Optional, Union
|
||||
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
|
||||
from pydantic import BaseModel
|
||||
from loguru import logger
|
||||
import os
|
||||
|
||||
# Rate limiting configuration
|
||||
RATE_LIMIT_DELAY_SECONDS = 2.0 # Delay between API calls to prevent quota exhaustion
|
||||
@@ -20,6 +21,7 @@ from services.persona.core_persona.core_persona_service import CorePersonaServic
|
||||
from services.persona.enhanced_linguistic_analyzer import EnhancedLinguisticAnalyzer
|
||||
from services.persona.persona_quality_improver import PersonaQualityImprover
|
||||
from middleware.auth_middleware import get_current_user
|
||||
from services.user_api_key_context import user_api_keys
|
||||
|
||||
# In-memory task storage (in production, use Redis or database)
|
||||
persona_tasks: Dict[str, Dict[str, Any]] = {}
|
||||
@@ -495,76 +497,107 @@ async def execute_persona_generation_task(task_id: str, persona_request: Persona
|
||||
update_task_status(task_id, "running", 10, "Starting persona generation...")
|
||||
logger.info(f"Task {task_id}: Status updated to running")
|
||||
|
||||
# Step 1: Generate core persona (1 API call)
|
||||
update_task_status(task_id, "running", 20, "Generating core persona...")
|
||||
logger.info(f"Task {task_id}: Step 1 - Generating core persona...")
|
||||
|
||||
core_persona = await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
core_persona_service.generate_core_persona,
|
||||
persona_request.onboarding_data
|
||||
)
|
||||
|
||||
if "error" in core_persona:
|
||||
update_task_status(task_id, "failed", 0, f"Core persona generation failed: {core_persona['error']}")
|
||||
return
|
||||
|
||||
update_task_status(task_id, "running", 40, "Core persona generated successfully")
|
||||
|
||||
# Add small delay after core persona generation
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
# Step 2: Generate platform adaptations with rate limiting (N API calls with delays)
|
||||
update_task_status(task_id, "running", 50, f"Generating platform adaptations for: {persona_request.selected_platforms}")
|
||||
platform_personas = {}
|
||||
|
||||
total_platforms = len(persona_request.selected_platforms)
|
||||
|
||||
# Process platforms sequentially with small delays to avoid rate limits
|
||||
for i, platform in enumerate(persona_request.selected_platforms):
|
||||
# Inject user-specific API keys into environment for the duration of this background task
|
||||
user_id = _extract_user_id(current_user)
|
||||
env_mapping = {
|
||||
'gemini': 'GEMINI_API_KEY',
|
||||
'exa': 'EXA_API_KEY',
|
||||
'openai': 'OPENAI_API_KEY',
|
||||
'anthropic': 'ANTHROPIC_API_KEY',
|
||||
'mistral': 'MISTRAL_API_KEY',
|
||||
'copilotkit': 'COPILOTKIT_API_KEY',
|
||||
'tavily': 'TAVILY_API_KEY',
|
||||
'serper': 'SERPER_API_KEY',
|
||||
'firecrawl': 'FIRECRAWL_API_KEY',
|
||||
}
|
||||
original_env: Dict[str, Optional[str]] = {}
|
||||
with user_api_keys(user_id) as keys:
|
||||
try:
|
||||
progress = 50 + (i * 40 // total_platforms)
|
||||
update_task_status(task_id, "running", progress, f"Generating {platform} persona ({i+1}/{total_platforms})")
|
||||
for provider, env_var in env_mapping.items():
|
||||
value = keys.get(provider)
|
||||
if value:
|
||||
original_env[env_var] = os.environ.get(env_var)
|
||||
os.environ[env_var] = value
|
||||
logger.debug(f"[BG TASK] Injected {env_var} for user {user_id}")
|
||||
|
||||
# Step 1: Generate core persona (1 API call)
|
||||
update_task_status(task_id, "running", 20, "Generating core persona...")
|
||||
logger.info(f"Task {task_id}: Step 1 - Generating core persona...")
|
||||
|
||||
# Add delay between API calls to prevent rate limiting
|
||||
if i > 0: # Skip delay for first platform
|
||||
update_task_status(task_id, "running", progress, f"Rate limiting: Waiting {RATE_LIMIT_DELAY_SECONDS}s before next API call...")
|
||||
await asyncio.sleep(RATE_LIMIT_DELAY_SECONDS)
|
||||
|
||||
# Generate platform persona
|
||||
result = await generate_single_platform_persona_async(
|
||||
core_persona,
|
||||
platform,
|
||||
core_persona = await asyncio.get_event_loop().run_in_executor(
|
||||
None,
|
||||
core_persona_service.generate_core_persona,
|
||||
persona_request.onboarding_data
|
||||
)
|
||||
|
||||
if isinstance(result, Exception):
|
||||
error_msg = str(result)
|
||||
logger.error(f"Platform {platform} generation failed: {error_msg}")
|
||||
platform_personas[platform] = {"error": error_msg}
|
||||
elif "error" in result:
|
||||
error_msg = result['error']
|
||||
logger.error(f"Platform {platform} generation failed: {error_msg}")
|
||||
platform_personas[platform] = result
|
||||
|
||||
# Check for rate limit errors and suggest retry
|
||||
if "429" in error_msg or "quota" in error_msg.lower() or "rate limit" in error_msg.lower():
|
||||
logger.warning(f"⚠️ Rate limit detected for {platform}. Consider increasing RATE_LIMIT_DELAY_SECONDS")
|
||||
else:
|
||||
platform_personas[platform] = result
|
||||
logger.info(f"✅ {platform} persona generated successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Platform {platform} generation error: {str(e)}")
|
||||
platform_personas[platform] = {"error": str(e)}
|
||||
|
||||
# Step 3: Assess quality (no additional API calls - uses existing data)
|
||||
update_task_status(task_id, "running", 90, "Assessing persona quality...")
|
||||
quality_metrics = await assess_persona_quality_internal(
|
||||
core_persona,
|
||||
platform_personas,
|
||||
persona_request.user_preferences
|
||||
)
|
||||
if "error" in core_persona:
|
||||
update_task_status(task_id, "failed", 0, f"Core persona generation failed: {core_persona['error']}")
|
||||
return
|
||||
|
||||
update_task_status(task_id, "running", 40, "Core persona generated successfully")
|
||||
|
||||
# Add small delay after core persona generation
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
# Step 2: Generate platform adaptations with rate limiting (N API calls with delays)
|
||||
update_task_status(task_id, "running", 50, f"Generating platform adaptations for: {persona_request.selected_platforms}")
|
||||
platform_personas = {}
|
||||
|
||||
total_platforms = len(persona_request.selected_platforms)
|
||||
|
||||
# Process platforms sequentially with small delays to avoid rate limits
|
||||
for i, platform in enumerate(persona_request.selected_platforms):
|
||||
try:
|
||||
progress = 50 + (i * 40 // total_platforms)
|
||||
update_task_status(task_id, "running", progress, f"Generating {platform} persona ({i+1}/{total_platforms})")
|
||||
|
||||
# Add delay between API calls to prevent rate limiting
|
||||
if i > 0: # Skip delay for first platform
|
||||
update_task_status(task_id, "running", progress, f"Rate limiting: Waiting {RATE_LIMIT_DELAY_SECONDS}s before next API call...")
|
||||
await asyncio.sleep(RATE_LIMIT_DELAY_SECONDS)
|
||||
|
||||
# Generate platform persona
|
||||
result = await generate_single_platform_persona_async(
|
||||
core_persona,
|
||||
platform,
|
||||
persona_request.onboarding_data
|
||||
)
|
||||
|
||||
if isinstance(result, Exception):
|
||||
error_msg = str(result)
|
||||
logger.error(f"Platform {platform} generation failed: {error_msg}")
|
||||
platform_personas[platform] = {"error": error_msg}
|
||||
elif "error" in result:
|
||||
error_msg = result['error']
|
||||
logger.error(f"Platform {platform} generation failed: {error_msg}")
|
||||
platform_personas[platform] = result
|
||||
|
||||
# Check for rate limit errors and suggest retry
|
||||
if "429" in error_msg or "quota" in error_msg.lower() or "rate limit" in error_msg.lower():
|
||||
logger.warning(f"⚠️ Rate limit detected for {platform}. Consider increasing RATE_LIMIT_DELAY_SECONDS")
|
||||
else:
|
||||
platform_personas[platform] = result
|
||||
logger.info(f"✅ {platform} persona generated successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Platform {platform} generation error: {str(e)}")
|
||||
platform_personas[platform] = {"error": str(e)}
|
||||
|
||||
# Step 3: Assess quality (no additional API calls - uses existing data)
|
||||
update_task_status(task_id, "running", 90, "Assessing persona quality...")
|
||||
quality_metrics = await assess_persona_quality_internal(
|
||||
core_persona,
|
||||
platform_personas,
|
||||
persona_request.user_preferences
|
||||
)
|
||||
finally:
|
||||
# Restore environment
|
||||
for env_var, original_value in original_env.items():
|
||||
if original_value is None:
|
||||
os.environ.pop(env_var, None)
|
||||
else:
|
||||
os.environ[env_var] = original_value
|
||||
logger.debug(f"[BG TASK] Restored environment for user {user_id}")
|
||||
|
||||
# Log performance metrics
|
||||
successful_platforms = len([p for p in platform_personas.values() if "error" not in p])
|
||||
|
||||
Reference in New Issue
Block a user