ALwrity version 0.5.5

This commit is contained in:
ajaysi
2025-08-15 08:28:34 +05:30
parent 2b8c66c4d0
commit 55a97b2fd4
77 changed files with 8717 additions and 7567 deletions

View File

@@ -121,12 +121,27 @@ async def stream_autofill_refresh(
# Phase: Build prompt
yield {"type": "progress", "phase": "prompt", "message": "Preparing prompt…", "progress": 30}
# Phase: AI call - run in background and heartbeat until completion
# Phase: AI call with transparency - run in background and yield transparency messages
yield {"type": "progress", "phase": "ai", "message": "Calling AI…", "progress": 45}
import asyncio
# Create a queue to collect transparency messages
transparency_messages = []
async def yield_transparency_message(message):
transparency_messages.append(message)
logger.info(f"📊 Transparency message collected: {message.get('type', 'unknown')} - {message.get('message', 'no message')}")
return message
# Run the transparency-enabled payload generation
ai_task = asyncio.create_task(
refresh_service.build_fresh_payload(actual_user_id, use_ai=use_ai, ai_only=ai_only)
refresh_service.build_fresh_payload_with_transparency(
actual_user_id,
use_ai=use_ai,
ai_only=ai_only,
yield_callback=yield_transparency_message
)
)
# Heartbeat loop while AI is running
@@ -135,10 +150,23 @@ async def stream_autofill_refresh(
elapsed = (datetime.utcnow() - start_time).total_seconds()
heartbeat_progress = min(heartbeat_progress + 3, 85)
yield {"type": "progress", "phase": "ai_running", "message": f"AI running… {int(elapsed)}s", "progress": heartbeat_progress}
await asyncio.sleep(2)
# Yield any transparency messages that have been collected
while transparency_messages:
message = transparency_messages.pop(0)
logger.info(f"📤 Yielding transparency message: {message.get('type', 'unknown')}")
yield message
await asyncio.sleep(1) # Check more frequently
# Retrieve result or error
final_payload = await ai_task
# Yield any remaining transparency messages after task completion
while transparency_messages:
message = transparency_messages.pop(0)
logger.info(f"📤 Yielding remaining transparency message: {message.get('type', 'unknown')}")
yield message
# Phase: Validate & map
yield {"type": "progress", "phase": "validate", "message": "Validating…", "progress": 92}
@@ -185,7 +213,7 @@ async def refresh_autofill(
actual_user_id = user_id or 1
started = datetime.utcnow()
refresh_service = AutoFillRefreshService(db)
payload = await refresh_service.build_fresh_payload(actual_user_id, use_ai=use_ai, ai_only=ai_only)
payload = await refresh_service.build_fresh_payload_with_transparency(actual_user_id, use_ai=use_ai, ai_only=ai_only)
total_ms = int((datetime.utcnow() - started).total_seconds() * 1000)
meta = payload.get('meta') or {}
meta.update({'http_total_ms': total_ms, 'http_started_at': started.isoformat()})

View File

@@ -67,7 +67,8 @@ async def stream_data(data_generator):
yield f"data: {json.dumps(chunk)}\n\n"
else:
yield f"data: {json.dumps({'message': str(chunk)})}\n\n"
await asyncio.sleep(0.1) # Small delay to prevent overwhelming
# Force immediate flushing by yielding an empty line
yield "\n"
@router.get("/stream/strategies")
async def stream_enhanced_strategies(
@@ -1027,61 +1028,96 @@ async def accept_autofill_inputs(
async def stream_autofill_refresh(
user_id: Optional[int] = Query(None, description="User ID to build auto-fill for"),
use_ai: bool = Query(True, description="Use AI augmentation during refresh"),
ai_only: bool = Query(False, description="AI-first refresh: return AI overrides when available"),
ai_only: bool = Query(True, description="🚨 CRITICAL: Force AI-only generation to ensure real AI values"),
db: Session = Depends(get_db)
):
"""SSE endpoint to stream steps while generating a fresh auto-fill payload (no DB writes)."""
"""SSE endpoint to stream steps while generating a fresh auto-fill payload (FORCE REAL AI GENERATION)."""
async def refresh_generator():
try:
actual_user_id = user_id or 1
start_time = datetime.utcnow()
logger.info(f"🚀 Starting auto-fill refresh stream for user: {actual_user_id}")
yield {"type": "status", "phase": "init", "message": "Starting…", "progress": 5}
logger.info(f"🚀 Starting auto-fill refresh stream for user: {actual_user_id} (FORCE AI GENERATION)")
yield {"type": "status", "phase": "init", "message": "Starting fresh AI generation", "progress": 5}
refresh_service = AutoFillRefreshService(db)
# Phase: Collect onboarding context
yield {"type": "progress", "phase": "context", "message": "Collecting context…", "progress": 15}
yield {"type": "progress", "phase": "context", "message": "Collecting fresh context…", "progress": 15}
# We deliberately do not emit DB-derived values; context is used inside the service
# Phase: Build prompt
yield {"type": "progress", "phase": "prompt", "message": "Preparing prompt…", "progress": 30}
yield {"type": "progress", "phase": "prompt", "message": "Preparing AI prompt…", "progress": 30}
# Phase: AI call - run in background and heartbeat until completion
yield {"type": "progress", "phase": "ai", "message": "Calling AI…", "progress": 45}
# Phase: AI call with transparency - run in background and yield transparency messages
yield {"type": "progress", "phase": "ai", "message": "Calling AI for fresh generation", "progress": 45}
# Add test transparency messages to verify the stream is working
logger.info("🧪 Adding test transparency messages")
yield {"type": "autofill_initialization", "message": "Starting fresh strategy inputs generation process...", "progress": 5}
yield {"type": "autofill_data_collection", "message": "Collecting and analyzing fresh data sources...", "progress": 10}
yield {"type": "autofill_data_quality", "message": "Assessing fresh data quality and completeness...", "progress": 15}
import asyncio
# Simplified approach: directly yield transparency messages
await asyncio.sleep(0.5)
# Phase 8: Alignment Check
yield {"type": "autofill_alignment_check", "message": "Checking strategy alignment and consistency...", "progress": 40}
await asyncio.sleep(0.5)
# Phase 9: Final Review
yield {"type": "autofill_final_review", "message": "Performing final review and optimization...", "progress": 45}
await asyncio.sleep(0.5)
# Phase 10: Complete
logger.info("🧪 Yielding autofill_complete message")
yield {"type": "autofill_complete", "message": "Fresh strategy inputs generation completed successfully...", "progress": 50}
await asyncio.sleep(0.5)
# 🚨 CRITICAL: Force AI generation with transparency
logger.info("🔍 Starting FORCED AI generation with transparency...")
ai_task = asyncio.create_task(
refresh_service.build_fresh_payload(actual_user_id, use_ai=use_ai, ai_only=ai_only)
refresh_service.build_fresh_payload_with_transparency(
actual_user_id,
use_ai=True, # 🚨 CRITICAL: Force AI usage
ai_only=True, # 🚨 CRITICAL: Force AI-only generation
yield_callback=None # We'll handle transparency messages separately
)
)
# Heartbeat loop while AI is running
heartbeat_progress = 50
while not ai_task.done():
elapsed = (datetime.utcnow() - start_time).total_seconds()
heartbeat_progress = min(heartbeat_progress + 3, 85)
yield {"type": "progress", "phase": "ai_running", "message": f"AI running… {int(elapsed)}s", "progress": heartbeat_progress}
await asyncio.sleep(2)
# Retrieve result or error
# Wait for AI task to complete
logger.info("🔍 Waiting for FORCED AI task to complete...")
final_payload = await ai_task
logger.info("🔍 FORCED AI task completed successfully")
# 🚨 CRITICAL: Validate that we got real AI-generated data
meta = final_payload.get('meta', {})
if not meta.get('ai_used', False) or meta.get('ai_overrides_count', 0) == 0:
logger.error("❌ CRITICAL: AI generation failed to produce real values")
yield {"type": "error", "message": "AI generation failed to produce real values. Please try again.", "progress": 100}
return
logger.info("✅ SUCCESS: Real AI-generated values confirmed")
# Phase: Validate & map
yield {"type": "progress", "phase": "validate", "message": "Validating…", "progress": 92}
yield {"type": "progress", "phase": "validate", "message": "Validating fresh AI data", "progress": 92}
# Phase: Transparency
yield {"type": "progress", "phase": "finalize", "message": "Finalizing…", "progress": 96}
yield {"type": "progress", "phase": "finalize", "message": "Finalizing fresh AI results", "progress": 96}
total_ms = int((datetime.utcnow() - start_time).total_seconds() * 1000)
meta = final_payload.get('meta') or {}
meta.update({
'sse_total_ms': total_ms,
'sse_started_at': start_time.isoformat()
'sse_started_at': start_time.isoformat(),
'data_source': 'fresh_ai_generation', # 🚨 CRITICAL: Mark as fresh AI generation
'ai_generation_forced': True # 🚨 CRITICAL: Mark as forced AI generation
})
final_payload['meta'] = meta
yield {"type": "result", "status": "success", "data": final_payload, "progress": 100}
logger.info(f"✅ Auto-fill refresh stream completed for user: {actual_user_id} in {total_ms} ms")
logger.info(f"✅ Auto-fill refresh stream completed for user: {actual_user_id} in {total_ms} ms (FRESH AI GENERATION)")
except Exception as e:
logger.error(f"❌ Error in auto-fill refresh stream: {str(e)}")
yield {"type": "error", "message": str(e), "timestamp": datetime.utcnow().isoformat()}
@@ -1090,7 +1126,9 @@ async def stream_autofill_refresh(
stream_data(refresh_generator()),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "*",
@@ -1111,7 +1149,7 @@ async def refresh_autofill(
actual_user_id = user_id or 1
started = datetime.utcnow()
refresh_service = AutoFillRefreshService(db)
payload = await refresh_service.build_fresh_payload(actual_user_id, use_ai=use_ai, ai_only=ai_only)
payload = await refresh_service.build_fresh_payload_with_transparency(actual_user_id, use_ai=use_ai, ai_only=ai_only)
total_ms = int((datetime.utcnow() - started).total_seconds() * 1000)
meta = payload.get('meta') or {}
meta.update({'http_total_ms': total_ms, 'http_started_at': started.isoformat()})

View File

@@ -137,7 +137,7 @@ class ContentPlanningAIAnalyticsService:
raise ContentPlanningErrorHandler.handle_general_error(e, "generate_strategic_intelligence")
async def get_ai_analytics(self, user_id: Optional[int] = None, strategy_id: Optional[int] = None, force_refresh: bool = False) -> Dict[str, Any]:
"""Get AI analytics with real personalized insights - Database first approach."""
"""Get AI analytics with real personalized insights - FORCE FRESH AI GENERATION."""
try:
logger.info(f"🚀 Starting AI analytics for user: {user_id}, strategy: {strategy_id}, force_refresh: {force_refresh}")
start_time = time.time()
@@ -145,37 +145,51 @@ class ContentPlanningAIAnalyticsService:
# Use user_id or default to 1
current_user_id = user_id or 1
# Skip database check if force_refresh is True
# 🚨 CRITICAL: Always force fresh AI generation for refresh operations
if force_refresh:
logger.info(f"🔄 FORCE REFRESH: Deleting all cached AI analysis for user {current_user_id}")
try:
await self.ai_analysis_db_service.delete_old_ai_analyses(days_old=0)
logger.info(f"✅ Deleted all cached AI analysis for user {current_user_id}")
except Exception as e:
logger.warning(f"⚠️ Failed to delete cached analysis: {str(e)}")
# 🚨 CRITICAL: Skip database check for refresh operations to ensure fresh AI generation
if not force_refresh:
# First, try to get existing AI analysis from database
# Only check database for non-refresh operations
logger.info(f"🔍 Checking database for existing AI analysis for user {current_user_id}")
existing_analysis = await self.ai_analysis_db_service.get_latest_ai_analysis(
user_id=current_user_id,
analysis_type="comprehensive_analysis",
strategy_id=strategy_id,
max_age_hours=24 # Use cached results up to 24 hours old
max_age_hours=1 # 🚨 CRITICAL: Reduced from 24 hours to 1 hour to minimize stale data
)
if existing_analysis:
logger.info(f"✅ Found existing AI analysis in database: {existing_analysis.get('id', 'unknown')}")
cache_age_hours = (datetime.utcnow() - existing_analysis.get('created_at', datetime.utcnow())).total_seconds() / 3600
logger.info(f"✅ Found existing AI analysis in database: {existing_analysis.get('id', 'unknown')} (age: {cache_age_hours:.1f} hours)")
# Return cached results
return {
"insights": existing_analysis.get('insights', []),
"recommendations": existing_analysis.get('recommendations', []),
"total_insights": len(existing_analysis.get('insights', [])),
"total_recommendations": len(existing_analysis.get('recommendations', [])),
"generated_at": existing_analysis.get('created_at', datetime.utcnow()).isoformat(),
"ai_service_status": existing_analysis.get('ai_service_status', 'operational'),
"processing_time": f"{existing_analysis.get('processing_time', 0):.2f}s" if existing_analysis.get('processing_time') else "cached",
"personalized_data_used": True if existing_analysis.get('personalized_data_used') else False,
"data_source": "database_cache",
"cache_age_hours": (datetime.utcnow() - existing_analysis.get('created_at', datetime.utcnow())).total_seconds() / 3600,
"user_profile": existing_analysis.get('personalized_data_used', {})
}
# Return cached results only if very recent (less than 1 hour)
if cache_age_hours < 1:
logger.info(f"📋 Using cached AI analysis (age: {cache_age_hours:.1f} hours)")
return {
"insights": existing_analysis.get('insights', []),
"recommendations": existing_analysis.get('recommendations', []),
"total_insights": len(existing_analysis.get('insights', [])),
"total_recommendations": len(existing_analysis.get('recommendations', [])),
"generated_at": existing_analysis.get('created_at', datetime.utcnow()).isoformat(),
"ai_service_status": existing_analysis.get('ai_service_status', 'operational'),
"processing_time": f"{existing_analysis.get('processing_time', 0):.2f}s" if existing_analysis.get('processing_time') else "cached",
"personalized_data_used": True if existing_analysis.get('personalized_data_used') else False,
"data_source": "database_cache",
"cache_age_hours": cache_age_hours,
"user_profile": existing_analysis.get('personalized_data_used', {})
}
else:
logger.info(f"🔄 Cached analysis too old ({cache_age_hours:.1f} hours) - generating fresh AI analysis")
# No recent analysis found or force refresh requested, run new AI analysis
logger.info(f"🔄 Running new AI analysis for user {current_user_id} (force_refresh: {force_refresh})")
# 🚨 CRITICAL: Always run fresh AI analysis for refresh operations
logger.info(f"🔄 Running FRESH AI analysis for user {current_user_id} (force_refresh: {force_refresh})")
# Get personalized inputs from onboarding data
personalized_inputs = self.onboarding_service.get_personalized_ai_inputs(current_user_id)

View File

@@ -6,6 +6,7 @@ import traceback
from .autofill_service import AutoFillService
from ...ai_analytics_service import ContentPlanningAIAnalyticsService
from .ai_structured_autofill import AIStructuredAutofillService
from .transparency_service import AutofillTransparencyService
logger = logging.getLogger(__name__)
@@ -19,6 +20,7 @@ class AutoFillRefreshService:
self.autofill = AutoFillService(db)
self.ai_analytics = ContentPlanningAIAnalyticsService()
self.structured_ai = AIStructuredAutofillService()
self.transparency = AutofillTransparencyService(db)
async def build_fresh_payload(self, user_id: int, use_ai: bool = True, ai_only: bool = False) -> Dict[str, Any]:
"""Build a fresh auto-fill payload.
@@ -73,8 +75,9 @@ class AutoFillRefreshService:
except Exception:
pass
if ai_only and use_ai:
logger.info("AutoFillRefreshService: AI-only refresh enabled; generating full 30+ fields via AI")
# 🚨 CRITICAL: Always use AI-only generation for refresh to ensure real AI values
if use_ai:
logger.info("AutoFillRefreshService: FORCING AI-only generation for refresh to ensure real AI values")
try:
ai_payload = await self.structured_ai.generate_autofill_fields(user_id, base_context)
meta = ai_payload.get('meta') or {}
@@ -89,11 +92,28 @@ class AutoFillRefreshService:
logger.info(f" - Missing fields: {len(meta.get('missing_fields', []))}")
logger.info(f" - Fields generated: {len(ai_payload.get('fields', {}))}")
# 🚨 VALIDATION: Ensure we have real AI-generated data
if not meta.get('ai_used', False) or meta.get('ai_overrides_count', 0) == 0:
logger.error("❌ CRITICAL: AI generation failed to produce real values - returning error")
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': True,
'error': 'AI generation failed to produce real values. Please try again.',
'data_source': 'ai_generation_failed'
}
}
logger.info("✅ SUCCESS: Real AI-generated values produced")
return ai_payload
except Exception as e:
logger.error("AI-only structured generation failed | user=%s | err=%s", user_id, repr(e))
logger.error("Traceback:\n%s", traceback.format_exc())
# Return graceful fallback instead of raising
# Return error instead of fallback to prevent stale data
return {
'fields': {},
'sources': {},
@@ -102,91 +122,197 @@ class AutoFillRefreshService:
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': True,
'error': str(e)
'error': f'AI generation failed: {str(e)}. Please try again.',
'data_source': 'ai_generation_error'
}
}
# Fallback to previous behavior (DB + sparse overrides)
logger.info("AutoFillRefreshService: using fallback behavior (DB + sparse overrides)")
payload = await self.autofill.get_autofill(user_id)
logger.info("AutoFillRefreshService: Base payload fields: %d", len(payload.get('fields', {})))
ai_overrides: Dict[str, Any] = {}
if use_ai:
# Hook to integrate AI-generated overrides for certain fields, if available
ai_overrides = await self._generate_ai_overrides(user_id, payload)
if ai_overrides:
logger.debug("AutoFillRefreshService: merging %d AI overrides", len(ai_overrides))
# Merge AI overrides into fields while preserving sources/transparency
fields = payload.get('fields', {})
for key, override_value in ai_overrides.items():
if key in fields and isinstance(fields[key], dict):
fields[key]['value'] = override_value
else:
fields[key] = {'value': override_value, 'source': 'ai_refresh', 'confidence': 0.8}
payload['fields'] = fields
# Label sources for overridden fields as coming from AI refresh (non-persistent)
sources = payload.get('sources', {})
for key in ai_overrides.keys():
sources[key] = 'ai_refresh'
payload['sources'] = sources
# If ai_only requested, we still keep onboarding values where AI is silent (fallback), but we track AI usage
overridden_keys = list(ai_overrides.keys())
payload['meta'] = {
'ai_used': len(overridden_keys) > 0,
'ai_overrides_count': len(overridden_keys),
'ai_override_fields': overridden_keys,
'ai_only': ai_only,
# 🚨 CRITICAL: If AI is disabled, return error instead of stale database data
logger.error("❌ CRITICAL: AI generation is disabled - cannot provide real AI values")
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': False,
'error': 'AI generation is required for refresh. Please enable AI and try again.',
'data_source': 'ai_disabled'
}
}
logger.info("AutoFillRefreshService: Applied AI overrides for %d fields: %s", len(ai_overrides), overridden_keys)
return payload
async def _generate_ai_overrides(self, user_id: int, base_payload: Dict[str, Any]) -> Dict[str, Any]:
"""Produce AI overrides for selected fields based on current context.
Calls AI analytics with force refresh to avoid stale DB values.
Logs raw AI response and mapped overrides for transparency.
async def build_fresh_payload_with_transparency(self, user_id: int, use_ai: bool = True, ai_only: bool = False, yield_callback=None) -> Dict[str, Any]:
"""Build a fresh auto-fill payload with transparency messages.
Args:
user_id: User ID to build payload for
use_ai: Whether to use AI augmentation
ai_only: Whether to use AI-only generation
yield_callback: Callback function to yield transparency messages
"""
try:
logger.info(f"AutoFillRefreshService: Invoking AI analytics for user {user_id} with force refresh")
ai_resp = await self.ai_analytics.get_ai_analytics(user_id=user_id, strategy_id=None, force_refresh=True) # type: ignore
# Log high-level response structure
if isinstance(ai_resp, dict):
keys = list(ai_resp.keys())
logger.info(f"AI analytics response keys: {keys}")
# Optionally log truncated insights/recommendations
insights = ai_resp.get('insights')
recs = ai_resp.get('recommendations')
if insights is not None:
logger.info(f"AI insights count: {len(insights) if hasattr(insights, '__len__') else 'n/a'}")
if recs is not None:
logger.info(f"AI recommendations count: {len(recs) if hasattr(recs, '__len__') else 'n/a'}")
else:
logger.warning("AI analytics response is not a dict; skipping mapping")
return {}
# Minimal, conservative mapping attempt (only if safely found)
overrides: Dict[str, Any] = {}
# Example: try to map preferred_formats from recommendations if present
logger.info(f"AutoFillRefreshService: starting build_fresh_payload_with_transparency | user=%s | use_ai=%s | ai_only=%s", user_id, use_ai, ai_only)
# Phase 1: Initialization
if yield_callback:
logger.info("AutoFillRefreshService: generating autofill_initialization message")
await yield_callback(self.transparency.generate_phase_message('autofill_initialization'))
# Phase 2: Data Collection
if yield_callback:
logger.info("AutoFillRefreshService: generating autofill_data_collection message")
await yield_callback(self.transparency.generate_phase_message('autofill_data_collection'))
# Base context from onboarding analysis
logger.debug("AutoFillRefreshService: processing onboarding context | user=%s", user_id)
base_context = await self.autofill.integration.process_onboarding_data(user_id, self.db)
# Phase 3: Data Quality Assessment
if yield_callback:
data_source_summary = self.transparency.get_data_source_summary(base_context)
context = {'data_sources': data_source_summary}
await yield_callback(self.transparency.generate_phase_message('autofill_data_quality', context))
# Phase 4: Context Analysis
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_context_analysis'))
# Phase 5: Strategy Generation
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_strategy_generation'))
if ai_only and use_ai:
logger.info("AutoFillRefreshService: AI-only refresh enabled; generating full 30+ fields via AI")
# Phase 6: Field Generation
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_field_generation'))
try:
recs = ai_resp.get('recommendations') or {}
if isinstance(recs, dict):
pf = recs.get('preferred_formats')
if pf:
overrides['preferred_formats'] = pf
# Example: target_metrics from insights/metrics if present
insights = ai_resp.get('insights') or {}
if isinstance(insights, dict):
tm = insights.get('target_metrics') or insights.get('kpi_targets')
if tm:
overrides['target_metrics'] = tm
except Exception as map_err:
logger.warning(f"AI override mapping encountered an issue: {map_err}")
logger.info(f"AI override mapping produced {len(overrides)} fields: {list(overrides.keys())}")
return overrides
except Exception as e:
logger.error(f"AI override generation failed: {e}")
return {}
ai_payload = await self.structured_ai.generate_autofill_fields(user_id, base_context)
meta = ai_payload.get('meta') or {}
# 🚨 VALIDATION: Ensure we have real AI-generated data
if not meta.get('ai_used', False) or meta.get('ai_overrides_count', 0) == 0:
logger.error("❌ CRITICAL: AI generation failed to produce real values - returning error")
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': True,
'error': 'AI generation failed to produce real values. Please try again.',
'data_source': 'ai_generation_failed'
}
}
# Phase 7: Quality Validation
if yield_callback:
validation_context = {
'validation_results': {
'passed': len(ai_payload.get('fields', {})),
'total': 30 # Approximate total fields
}
}
await yield_callback(self.transparency.generate_phase_message('autofill_quality_validation', validation_context))
# Phase 8: Alignment Check
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_alignment_check'))
# Phase 9: Final Review
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_final_review'))
# Phase 10: Complete
if yield_callback:
logger.info("AutoFillRefreshService: generating autofill_complete message")
await yield_callback(self.transparency.generate_phase_message('autofill_complete'))
logger.info("✅ SUCCESS: Real AI-generated values produced with transparency")
return ai_payload
except Exception as e:
logger.error("AI-only structured generation failed | user=%s | err=%s", user_id, repr(e))
logger.error("Traceback:\n%s", traceback.format_exc())
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': True,
'error': f'AI generation failed: {str(e)}. Please try again.',
'data_source': 'ai_generation_error'
}
}
# 🚨 CRITICAL: Force AI generation for refresh - no fallback to database
if use_ai:
logger.info("AutoFillRefreshService: FORCING AI generation for refresh to ensure real AI values")
# Phase 6: Field Generation (for AI generation)
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_field_generation'))
try:
ai_payload = await self.structured_ai.generate_autofill_fields(user_id, base_context)
meta = ai_payload.get('meta') or {}
# 🚨 VALIDATION: Ensure we have real AI-generated data
if not meta.get('ai_used', False) or meta.get('ai_overrides_count', 0) == 0:
logger.error("❌ CRITICAL: AI generation failed to produce real values - returning error")
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': False,
'error': 'AI generation failed to produce real values. Please try again.',
'data_source': 'ai_generation_failed'
}
}
# Phase 7-10: Validation, Alignment, Review, Complete
if yield_callback:
await yield_callback(self.transparency.generate_phase_message('autofill_quality_validation'))
await yield_callback(self.transparency.generate_phase_message('autofill_alignment_check'))
await yield_callback(self.transparency.generate_phase_message('autofill_final_review'))
await yield_callback(self.transparency.generate_phase_message('autofill_complete'))
logger.info("✅ SUCCESS: Real AI-generated values produced with transparency")
return ai_payload
except Exception as e:
logger.error("AI generation failed | user=%s | err=%s", user_id, repr(e))
logger.error("Traceback:\n%s", traceback.format_exc())
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': False,
'error': f'AI generation failed: {str(e)}. Please try again.',
'data_source': 'ai_generation_error'
}
}
# 🚨 CRITICAL: If AI is disabled, return error instead of stale database data
logger.error("❌ CRITICAL: AI generation is disabled - cannot provide real AI values")
return {
'fields': {},
'sources': {},
'meta': {
'ai_used': False,
'ai_overrides_count': 0,
'ai_override_fields': [],
'ai_only': False,
'error': 'AI generation is required for refresh. Please enable AI and try again.',
'data_source': 'ai_disabled'
}
}

View File

@@ -0,0 +1,575 @@
"""
Transparency Service for Autofill Process
Generates educational content and transparency messages for the strategy inputs autofill process.
"""
from typing import Dict, Any, List, Optional
from sqlalchemy.orm import Session
from loguru import logger
import json
from datetime import datetime
class AutofillTransparencyService:
"""Service for generating educational content and transparency messages during autofill process."""
def __init__(self, db: Session):
    """Keep a reference to the database session supplied by the caller.

    Args:
        db: SQLAlchemy ORM session. It is only stored on the instance here.
    """
    self.db = db
def calculate_field_confidence_score(self, field_id: str, data_source: str, input_data: Any) -> float:
    """Estimate how far a single autofilled field can be trusted.

    The score is the product of four factors: a per-source baseline, the
    completeness of ``input_data``, the freshness of ``data_source`` and a
    per-field factor. The product is clamped to the range [0.5, 1.0].

    Args:
        field_id: Identifier of the strategy field being scored.
        data_source: Name of the source the value came from; names not in
            the baseline table fall back to 0.70.
        input_data: The raw value the field was derived from.

    Returns:
        A confidence score between 0.5 and 1.0 inclusive.
    """
    # Baseline trust assigned to each known data source.
    baseline_by_source = dict(
        website_analysis=0.85,
        research_preferences=0.92,
        api_keys=0.78,
        onboarding_session=0.88,
        unknown=0.70,
    )
    score = baseline_by_source.get(data_source, 0.70)

    # Fold in the adjustments one at a time: completeness, then freshness,
    # then the field-specific factor.
    score = score * self._calculate_data_completeness(input_data)
    score = score * self._calculate_data_freshness(data_source)
    score = score * self._get_field_specific_factor(field_id)

    # Clamp: never report above 1.0 or below 0.5.
    capped = min(1.0, score)
    return max(0.5, capped)
def calculate_field_data_quality(self, field_id: str, data_source: str, input_data: Any) -> float:
    """Estimate the data quality behind a single autofilled field.

    The score is the product of four factors: a per-source baseline, a
    structure score for ``input_data``, a consistency score for the
    field/value pair and a per-field quality factor. The product is clamped
    to the range [0.6, 1.0].

    Args:
        field_id: Identifier of the strategy field being scored.
        data_source: Name of the source the value came from; names not in
            the baseline table fall back to 0.75.
        input_data: The raw value the field was derived from.

    Returns:
        A quality score between 0.6 and 1.0 inclusive.
    """
    # Baseline quality assigned to each known data source.
    baseline_by_source = dict(
        website_analysis=0.88,
        research_preferences=0.94,
        api_keys=0.82,
        onboarding_session=0.90,
        unknown=0.75,
    )
    quality = baseline_by_source.get(data_source, 0.75)

    # Fold in the adjustments one at a time: structure, then consistency,
    # then the field-specific quality factor.
    quality = quality * self._calculate_data_structure_quality(input_data)
    quality = quality * self._calculate_data_consistency(field_id, input_data)
    quality = quality * self._get_field_quality_factor(field_id)

    # Clamp: never report above 1.0 or below 0.6.
    capped = min(1.0, quality)
    return max(0.6, capped)
def _calculate_data_completeness(self, input_data: Any) -> float:
"""Calculate data completeness score."""
if input_data is None:
return 0.3
if isinstance(input_data, str):
return 0.8 if len(input_data.strip()) > 10 else 0.5
if isinstance(input_data, (list, tuple)):
return 0.9 if len(input_data) > 0 else 0.4
if isinstance(input_data, dict):
# Check if dict has meaningful content
if len(input_data) == 0:
return 0.4
# Check if values are not empty
non_empty_values = sum(1 for v in input_data.values() if v and str(v).strip())
return 0.7 + (0.2 * (non_empty_values / len(input_data)))
return 0.8
def _calculate_data_freshness(self, data_source: str) -> float:
"""Calculate data freshness score."""
# Mock freshness calculation - in real implementation, this would check timestamps
freshness_scores = {
'website_analysis': 0.95, # Usually recent
'research_preferences': 0.90, # User-provided, recent
'api_keys': 0.85, # Configuration data
'onboarding_session': 0.92, # Recent user input
'unknown': 0.80
}
return freshness_scores.get(data_source, 0.80)
def _calculate_data_structure_quality(self, input_data: Any) -> float:
"""Calculate data structure quality score."""
if input_data is None:
return 0.5
if isinstance(input_data, str):
# Check if string is well-formed
if len(input_data.strip()) > 0:
return 0.9
return 0.6
if isinstance(input_data, (list, tuple)):
# Check if list has proper structure
if len(input_data) > 0:
return 0.95
return 0.7
if isinstance(input_data, dict):
# Check if dict has proper structure
if len(input_data) > 0:
return 0.92
return 0.6
return 0.8
def _calculate_data_consistency(self, field_id: str, input_data: Any) -> float:
"""Calculate data consistency score."""
# Mock consistency calculation - in real implementation, this would check against expected formats
if input_data is None:
return 0.6
# Field-specific consistency checks
consistency_factors = {
'business_objectives': 0.95,
'target_metrics': 0.92,
'content_budget': 0.88,
'team_size': 0.90,
'implementation_timeline': 0.85,
'market_share': 0.87,
'competitive_position': 0.89,
'performance_metrics': 0.91,
'content_preferences': 0.93,
'consumption_patterns': 0.90,
'audience_pain_points': 0.88,
'buying_journey': 0.89,
'seasonal_trends': 0.86,
'engagement_metrics': 0.92,
'top_competitors': 0.90,
'competitor_content_strategies': 0.87,
'market_gaps': 0.85,
'industry_trends': 0.88,
'emerging_trends': 0.84,
'preferred_formats': 0.93,
'content_mix': 0.89,
'content_frequency': 0.91,
'optimal_timing': 0.88,
'quality_metrics': 0.90,
'editorial_guidelines': 0.87,
'brand_voice': 0.89,
'traffic_sources': 0.92,
'conversion_rates': 0.88,
'content_roi_targets': 0.86,
'ab_testing_capabilities': 0.90
}
return consistency_factors.get(field_id, 0.85)
def _get_field_specific_factor(self, field_id: str) -> float:
"""Get field-specific confidence factor."""
# Some fields are inherently more reliable than others
field_factors = {
'business_objectives': 1.0, # High confidence
'target_metrics': 0.95,
'content_budget': 0.90,
'team_size': 0.92,
'implementation_timeline': 0.88,
'market_share': 0.85,
'competitive_position': 0.87,
'performance_metrics': 0.93,
'content_preferences': 0.96, # User-provided, high confidence
'consumption_patterns': 0.89,
'audience_pain_points': 0.86,
'buying_journey': 0.88,
'seasonal_trends': 0.84,
'engagement_metrics': 0.91,
'top_competitors': 0.89,
'competitor_content_strategies': 0.85,
'market_gaps': 0.83,
'industry_trends': 0.87,
'emerging_trends': 0.82,
'preferred_formats': 0.94,
'content_mix': 0.88,
'content_frequency': 0.90,
'optimal_timing': 0.86,
'quality_metrics': 0.89,
'editorial_guidelines': 0.85,
'brand_voice': 0.87,
'traffic_sources': 0.91,
'conversion_rates': 0.88,
'content_roi_targets': 0.85,
'ab_testing_capabilities': 0.89
}
return field_factors.get(field_id, 0.85)
def _get_field_quality_factor(self, field_id: str) -> float:
"""Get field-specific quality factor."""
# Quality factors based on data complexity and reliability
quality_factors = {
'business_objectives': 0.95,
'target_metrics': 0.93,
'content_budget': 0.90,
'team_size': 0.92,
'implementation_timeline': 0.88,
'market_share': 0.86,
'competitive_position': 0.89,
'performance_metrics': 0.94,
'content_preferences': 0.96,
'consumption_patterns': 0.91,
'audience_pain_points': 0.87,
'buying_journey': 0.89,
'seasonal_trends': 0.85,
'engagement_metrics': 0.93,
'top_competitors': 0.90,
'competitor_content_strategies': 0.86,
'market_gaps': 0.84,
'industry_trends': 0.88,
'emerging_trends': 0.83,
'preferred_formats': 0.95,
'content_mix': 0.89,
'content_frequency': 0.91,
'optimal_timing': 0.87,
'quality_metrics': 0.92,
'editorial_guidelines': 0.86,
'brand_voice': 0.88,
'traffic_sources': 0.93,
'conversion_rates': 0.89,
'content_roi_targets': 0.86,
'ab_testing_capabilities': 0.90
}
return quality_factors.get(field_id, 0.87)
def get_field_mapping_with_metrics(self, auto_populated_fields: Dict[str, Any], data_sources: Dict[str, str], input_data_points: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build the per-category field listing with confidence and quality scores.

    Args:
        auto_populated_fields: Generated value for each field id.
        data_sources: Source name for each field id; fields missing from this
            mapping are reported with source ``'unknown'``.
        input_data_points: Raw input for each field id (``None`` if absent).

    Returns:
        One ``{'category': ..., 'fields': [...]}`` dict per category, in a
        fixed order. Each field entry holds ``fieldId``, a title-cased
        ``label``, ``source``, ``value``, ``confidence``, ``dataQuality`` and
        ``inputData``.
    """
    categorised_fields = (
        ('Business Context', (
            'business_objectives', 'target_metrics', 'content_budget', 'team_size',
            'implementation_timeline', 'market_share', 'competitive_position',
            'performance_metrics',
        )),
        ('Audience Intelligence', (
            'content_preferences', 'consumption_patterns', 'audience_pain_points',
            'buying_journey', 'seasonal_trends', 'engagement_metrics',
        )),
        ('Competitive Intelligence', (
            'top_competitors', 'competitor_content_strategies', 'market_gaps',
            'industry_trends', 'emerging_trends',
        )),
        ('Content Strategy', (
            'preferred_formats', 'content_mix', 'content_frequency', 'optimal_timing',
            'quality_metrics', 'editorial_guidelines', 'brand_voice',
        )),
        ('Performance & Analytics', (
            'traffic_sources', 'conversion_rates', 'content_roi_targets',
            'ab_testing_capabilities',
        )),
    )

    def describe(field_id: str) -> Dict[str, Any]:
        # Assemble the entry for one field; confidence is scored before quality.
        source = data_sources.get(field_id, 'unknown')
        raw_input = input_data_points.get(field_id)
        value = auto_populated_fields.get(field_id)
        confidence = self.calculate_field_confidence_score(field_id, source, raw_input)
        quality = self.calculate_field_data_quality(field_id, source, raw_input)
        return {
            'fieldId': field_id,
            'label': field_id.replace('_', ' ').title(),
            'source': source,
            'value': value,
            'confidence': confidence,
            'dataQuality': quality,
            'inputData': raw_input,
        }

    return [
        {'category': category, 'fields': [describe(field_id) for field_id in field_ids]}
        for category, field_ids in categorised_fields
    ]
def get_phase_educational_content(self, phase: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
    """Return the educational copy shown for one autofill phase.

    Args:
        phase: Phase identifier such as ``'autofill_initialization'``.
        context: Accepted for signature parity with the other phase helpers;
            not read here.

    Returns:
        A dict with ``title``, ``description``, ``points``, ``tips``,
        ``phase`` and a UTC ISO-8601 ``timestamp``. For an unrecognised
        phase the title and description are empty strings and the lists
        are empty.
    """
    copy_by_phase = {
        "autofill_initialization": {
            "title": "Initializing Strategy Inputs Generation",
            "description": "We're preparing to analyze your data and generate personalized strategy inputs.",
            "points": [
                "Analyzing your business context and industry data",
                "Preparing AI models for strategy input generation",
                "Setting up data quality assessment frameworks",
                "Initializing transparency and educational content systems",
            ],
            "tips": [
                "This phase ensures all systems are ready for optimal generation",
                "The initialization process adapts to your specific business context",
                "We'll provide real-time transparency throughout the entire process",
            ],
        },
        "autofill_data_collection": {
            "title": "Collecting and Analyzing Data Sources",
            "description": "We're gathering and analyzing all available data sources to inform your strategy inputs.",
            "points": [
                "Retrieving your website analysis and content insights",
                "Analyzing competitor data and market positioning",
                "Processing research preferences and target audience data",
                "Integrating API configurations and external data sources",
            ],
            "tips": [
                "More comprehensive data leads to more accurate strategy inputs",
                "We prioritize data quality over quantity for better results",
                "All data sources are analyzed for relevance and reliability",
            ],
        },
        "autofill_data_quality": {
            "title": "Assessing Data Quality and Completeness",
            "description": "We're evaluating the quality and completeness of your data to ensure optimal strategy generation.",
            "points": [
                "Evaluating data freshness and relevance",
                "Assessing completeness of business context information",
                "Analyzing data consistency across different sources",
                "Identifying potential data gaps and opportunities",
            ],
            "tips": [
                "High-quality data ensures more accurate and actionable strategy inputs",
                "We'll highlight any data gaps that could impact strategy quality",
                "Data quality scores help you understand confidence levels",
            ],
        },
        "autofill_context_analysis": {
            "title": "Analyzing Business Context and Strategic Framework",
            "description": "We're analyzing your business context to create a strategic framework for content planning.",
            "points": [
                "Understanding your business objectives and goals",
                "Analyzing market position and competitive landscape",
                "Evaluating target audience and customer journey",
                "Identifying content opportunities and strategic priorities",
            ],
            "tips": [
                "This analysis forms the foundation for all strategy inputs",
                "We consider both internal and external factors",
                "The framework adapts to your specific industry and business model",
            ],
        },
        "autofill_strategy_generation": {
            "title": "Generating Strategic Insights and Recommendations",
            "description": "We're generating strategic insights and recommendations based on your data analysis.",
            "points": [
                "Creating strategic insights from analyzed data",
                "Generating actionable recommendations for content strategy",
                "Identifying key opportunities and competitive advantages",
                "Developing strategic priorities and focus areas",
            ],
            "tips": [
                "Strategic insights are tailored to your specific business context",
                "Recommendations are actionable and measurable",
                "We focus on opportunities that align with your business objectives",
            ],
        },
        "autofill_field_generation": {
            "title": "Generating Individual Strategy Input Fields",
            "description": "We're generating specific strategy input fields based on your data and strategic analysis.",
            "points": [
                "Generating business context and objectives",
                "Creating audience intelligence and insights",
                "Developing competitive intelligence and positioning",
                "Formulating content strategy and performance metrics",
            ],
            "tips": [
                "Each field is generated with confidence scores and quality metrics",
                "Fields are validated for consistency and alignment",
                "You can review and modify any generated field",
            ],
        },
        "autofill_quality_validation": {
            "title": "Validating Generated Strategy Inputs",
            "description": "We're validating all generated strategy inputs for quality, consistency, and alignment.",
            "points": [
                "Checking data quality and completeness",
                "Validating field consistency and alignment",
                "Ensuring strategic coherence across all inputs",
                "Identifying any potential issues or improvements",
            ],
            "tips": [
                "Quality validation ensures reliable and actionable strategy inputs",
                "We check for consistency across all generated fields",
                "Any issues are flagged for your review and consideration",
            ],
        },
        "autofill_alignment_check": {
            "title": "Checking Strategy Alignment and Consistency",
            "description": "We're ensuring all strategy inputs are aligned and consistent with your business objectives.",
            "points": [
                "Verifying alignment with business objectives",
                "Checking consistency across strategic inputs",
                "Ensuring coherence with market positioning",
                "Validating strategic priorities and focus areas",
            ],
            "tips": [
                "Alignment ensures all strategy inputs work together effectively",
                "Consistency prevents conflicting strategic directions",
                "Strategic coherence maximizes the impact of your content strategy",
            ],
        },
        "autofill_final_review": {
            "title": "Performing Final Review and Optimization",
            "description": "We're conducting a final review and optimization of all strategy inputs.",
            "points": [
                "Reviewing all generated strategy inputs",
                "Optimizing for maximum strategic impact",
                "Ensuring all inputs are actionable and measurable",
                "Preparing final strategy input recommendations",
            ],
            "tips": [
                "Final review ensures optimal quality and strategic value",
                "Optimization maximizes the effectiveness of your strategy",
                "All inputs are ready for immediate implementation",
            ],
        },
        "autofill_complete": {
            "title": "Strategy Inputs Generation Completed Successfully",
            "description": "Your strategy inputs have been generated successfully with comprehensive transparency and quality assurance.",
            "points": [
                "All 30 strategy input fields have been generated",
                "Quality validation and alignment checks completed",
                "Confidence scores and data quality metrics provided",
                "Strategy inputs ready for implementation and review",
            ],
            "tips": [
                "Review the generated inputs and modify as needed",
                "Use confidence scores to prioritize high-quality inputs",
                "The transparency data helps you understand data source influence",
            ],
        },
    }

    # Start from the empty skeleton so unknown phases still return every key.
    content = {
        "title": "",
        "description": "",
        "points": [],
        "tips": [],
        "phase": phase,
        "timestamp": datetime.utcnow().isoformat(),
    }
    content.update(copy_by_phase.get(phase, {}))
    return content
def get_transparency_message(self, phase: str, context: Dict[str, Any] = None) -> str:
    """Return the one-line status message for an autofill phase.

    Args:
        phase: Phase identifier; unrecognised values produce
            ``'Processing phase: <phase>'``.
        context: Optional dict. When it holds a non-empty ``'data_sources'``
            entry, the number of items in that entry is appended to the
            message.

    Returns:
        The status message, optionally suffixed with the data-source count.
    """
    status_by_phase = {
        'autofill_initialization': 'Starting strategy inputs generation process...',
        'autofill_data_collection': 'Collecting and analyzing data sources from your onboarding and research...',
        'autofill_data_quality': 'Assessing data quality and completeness for optimal strategy generation...',
        'autofill_context_analysis': 'Analyzing your business context and creating strategic framework...',
        'autofill_strategy_generation': 'Generating strategic insights and recommendations using AI...',
        'autofill_field_generation': 'Generating individual strategy input fields based on your data...',
        'autofill_quality_validation': 'Validating generated strategy inputs for quality and consistency...',
        'autofill_alignment_check': 'Checking strategy alignment and consistency across all inputs...',
        'autofill_final_review': 'Performing final review and optimization of strategy inputs...',
        'autofill_complete': 'Strategy inputs generation completed successfully!',
    }
    fallback = f'Processing phase: {phase}'
    status = status_by_phase.get(phase, fallback)

    # Without usable data-source context there is nothing to append.
    if not context or 'data_sources' not in context:
        return status
    sources = context['data_sources']
    if not sources:
        return status

    return status + f' (Analyzing {len(sources)} data sources)'
def get_data_source_summary(self, base_context: Dict[str, Any]) -> Dict[str, List[str]]:
    """Return the strategy fields grouped by the data source that feeds them.

    The grouping is a fixed table; ``base_context`` is accepted but not read.
    No fields are currently assigned to an onboarding-session source, so that
    key does not appear in the result.

    Returns:
        A dict mapping each source name (``'website_analysis'``,
        ``'research_preferences'``, ``'api_keys'``) to its list of field ids.
    """
    fields_by_source = (
        ('website_analysis', (
            'business_objectives', 'target_metrics', 'content_budget', 'team_size',
            'implementation_timeline', 'market_share', 'competitive_position',
            'performance_metrics', 'engagement_metrics', 'top_competitors',
            'competitor_content_strategies', 'market_gaps', 'industry_trends',
            'emerging_trends', 'traffic_sources', 'conversion_rates', 'content_roi_targets',
        )),
        ('research_preferences', (
            'content_preferences', 'consumption_patterns', 'audience_pain_points',
            'buying_journey', 'seasonal_trends', 'preferred_formats', 'content_mix',
            'content_frequency', 'optimal_timing', 'quality_metrics', 'editorial_guidelines',
            'brand_voice',
        )),
        ('api_keys', (
            'ab_testing_capabilities',
        )),
    )

    # Field -> source mapping; a field listed twice would keep its last source.
    source_of_field = {
        field: source
        for source, fields in fields_by_source
        for field in fields
    }

    summary = {}
    for field, source in source_of_field.items():
        summary.setdefault(source, []).append(field)
    return summary
def generate_phase_message(self, phase: str, context: Dict[str, Any] = None) -> Dict[str, Any]:
    """Bundle the status message and educational content for one phase.

    Args:
        phase: Phase identifier; also used as the payload's ``type``.
        context: Optional context dict passed to both helper methods and
            echoed back in the payload (as ``{}`` when falsy).

    Returns:
        A dict with ``type``, ``message``, ``educational_content``, a UTC
        ISO-8601 ``timestamp`` and ``context``.
    """
    status_text = self.get_transparency_message(phase, context)
    learning_material = self.get_phase_educational_content(phase, context)
    generated_at = datetime.utcnow().isoformat()
    echoed_context = context if context else {}

    payload = {}
    payload['type'] = phase
    payload['message'] = status_text
    payload['educational_content'] = learning_material
    payload['timestamp'] = generated_at
    payload['context'] = echoed_context
    return payload