merge: resolve conflicts favoring local for frontend packages

This commit is contained in:
ajaysi
2025-08-30 16:16:13 +05:30
29 changed files with 4681 additions and 467 deletions

View File

@@ -177,7 +177,7 @@ class AIQualityAnalysisService:
Focus on strategic depth, clarity, and measurability.
"""
ai_response = await gemini_structured_json_response(
ai_response = gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,

View File

@@ -78,9 +78,15 @@ class PromptChainOrchestrator:
self.comprehensive_user_processor = ComprehensiveUserDataProcessor()
# Inject database service if available
if hasattr(self.comprehensive_user_processor, 'content_planning_db_service') and db_session:
self.comprehensive_user_processor.content_planning_db_service = db_session
logger.info("✅ Database service injected into comprehensive user processor")
if db_session:
try:
from services.content_planning_db import ContentPlanningDBService
db_service = ContentPlanningDBService(db_session)
self.comprehensive_user_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into comprehensive user processor")
except Exception as e:
logger.error(f"❌ Failed to inject database service: {e}")
self.comprehensive_user_processor.content_planning_db_service = None
# 12-step configuration
self.steps = self._initialize_steps()
@@ -92,21 +98,78 @@ class PromptChainOrchestrator:
"""Initialize all 12 steps of the prompt chain."""
steps = {}
# Create database service if available
db_service = None
if self.db_session:
try:
from services.content_planning_db import ContentPlanningDBService
db_service = ContentPlanningDBService(self.db_session)
logger.info("✅ Database service created for step injection")
except Exception as e:
logger.error(f"❌ Failed to create database service for steps: {e}")
# Phase 1: Foundation (Steps 1-3) - REAL IMPLEMENTATIONS
steps["step_01"] = ContentStrategyAnalysisStep()
steps["step_02"] = GapAnalysisStep()
steps["step_03"] = AudiencePlatformStrategyStep()
# Inject database service into Phase 1 steps
if db_service:
# Step 1: Content Strategy Analysis
if hasattr(steps["step_01"], 'strategy_processor'):
steps["step_01"].strategy_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 1 strategy processor")
# Step 2: Gap Analysis
if hasattr(steps["step_02"], 'gap_processor'):
steps["step_02"].gap_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 2 gap processor")
# Step 3: Audience Platform Strategy
if hasattr(steps["step_03"], 'comprehensive_processor'):
steps["step_03"].comprehensive_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 3 comprehensive processor")
# Phase 2: Structure (Steps 4-6) - REAL IMPLEMENTATIONS
steps["step_04"] = CalendarFrameworkStep()
steps["step_05"] = ContentPillarDistributionStep()
steps["step_06"] = PlatformSpecificStrategyStep()
# Inject database service into Phase 2 steps
if db_service:
# Step 4: Calendar Framework
if hasattr(steps["step_04"], 'comprehensive_user_processor'):
steps["step_04"].comprehensive_user_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 4 comprehensive processor")
# Step 5: Content Pillar Distribution
if hasattr(steps["step_05"], 'comprehensive_user_processor'):
steps["step_05"].comprehensive_user_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 5 comprehensive processor")
# Step 6: Platform Specific Strategy
if hasattr(steps["step_06"], 'comprehensive_user_processor'):
steps["step_06"].comprehensive_user_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 6 comprehensive processor")
# Phase 3: Content (Steps 7-9) - REAL IMPLEMENTATIONS
steps["step_07"] = WeeklyThemeDevelopmentStep()
steps["step_08"] = DailyContentPlanningStep()
steps["step_09"] = ContentRecommendationsStep()
# Inject database service into Phase 3 steps
if db_service:
# Step 7: Weekly Theme Development
if hasattr(steps["step_07"], 'comprehensive_user_processor'):
steps["step_07"].comprehensive_user_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 7 comprehensive processor")
if hasattr(steps["step_07"], 'strategy_processor'):
steps["step_07"].strategy_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 7 strategy processor")
if hasattr(steps["step_07"], 'gap_analysis_processor'):
steps["step_07"].gap_analysis_processor.content_planning_db_service = db_service
logger.info("✅ Database service injected into Step 7 gap analysis processor")
# Phase 4: Optimization (Steps 10-12) - REAL IMPLEMENTATIONS
steps["step_10"] = PerformanceOptimizationStep()
steps["step_11"] = StrategyAlignmentValidationStep()

View File

@@ -169,7 +169,7 @@ def gemini_text_response(prompt, temperature, top_p, n, max_tokens, system_promp
# FIXME: Expose model_name in main_config
try:
response = client.models.generate_content(
model='gemini-2.5-pro',
model='gemini-2.0-flash-lite',
contents=prompt,
config=types.GenerateContentConfig(
system_instruction=system_prompt,

View File

@@ -0,0 +1,359 @@
"""
Monitoring Data Service
Handles saving and retrieving monitoring data from database and cache.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy import and_, desc
from models.monitoring_models import (
StrategyMonitoringPlan, MonitoringTask, TaskExecutionLog,
StrategyPerformanceMetrics, StrategyActivationStatus
)
from models.enhanced_strategy_models import EnhancedContentStrategy
logger = logging.getLogger(__name__)
class MonitoringDataService:
"""Service for managing monitoring data in database and cache."""
def __init__(self, db_session: Session):
self.db = db_session
async def save_monitoring_data(self, strategy_id: int, monitoring_plan: Dict[str, Any]) -> bool:
"""Save monitoring plan and tasks to database."""
try:
logger.info(f"Saving monitoring data for strategy {strategy_id}")
logger.info(f"Monitoring plan received: {monitoring_plan}")
# Save the complete monitoring plan
monitoring_plan_record = StrategyMonitoringPlan(
strategy_id=strategy_id,
plan_data=monitoring_plan
)
self.db.add(monitoring_plan_record)
# Save individual monitoring tasks
monitoring_tasks = monitoring_plan.get('monitoringTasks', [])
logger.info(f"Found {len(monitoring_tasks)} monitoring tasks to save")
for i, task_data in enumerate(monitoring_tasks):
logger.info(f"Saving task {i+1}: {task_data.get('title', 'Unknown')}")
task = MonitoringTask(
strategy_id=strategy_id,
component_name=task_data.get('component', ''),
task_title=task_data.get('title', ''),
task_description=task_data.get('description', ''),
assignee=task_data.get('assignee', 'ALwrity'),
frequency=task_data.get('frequency', 'Monthly'),
metric=task_data.get('metric', ''),
measurement_method=task_data.get('measurementMethod', ''),
success_criteria=task_data.get('successCriteria', ''),
alert_threshold=task_data.get('alertThreshold', ''),
status='active'
)
self.db.add(task)
# Save activation status
activation_status = StrategyActivationStatus(
strategy_id=strategy_id,
user_id=1, # Default user ID
activation_date=datetime.utcnow(),
status='active'
)
self.db.add(activation_status)
# Save initial performance metrics
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
metric_date=datetime.utcnow(),
data_source='monitoring_plan',
confidence_score=85 # High confidence for monitoring plan data
)
self.db.add(performance_metrics)
self.db.commit()
logger.info(f"Successfully saved monitoring data for strategy {strategy_id}")
return True
except Exception as e:
logger.error(f"Error saving monitoring data for strategy {strategy_id}: {e}")
self.db.rollback()
return False
async def get_monitoring_data(self, strategy_id: int) -> Optional[Dict[str, Any]]:
"""Get monitoring data from database."""
try:
logger.info(f"Retrieving monitoring data for strategy {strategy_id}")
# Get the monitoring plan
monitoring_plan = self.db.query(StrategyMonitoringPlan).filter(
StrategyMonitoringPlan.strategy_id == strategy_id
).order_by(desc(StrategyMonitoringPlan.created_at)).first()
if not monitoring_plan:
logger.warning(f"No monitoring plan found for strategy {strategy_id}")
return None
# Get monitoring tasks
tasks = self.db.query(MonitoringTask).filter(
MonitoringTask.strategy_id == strategy_id
).all()
# Get activation status
activation_status = self.db.query(StrategyActivationStatus).filter(
StrategyActivationStatus.strategy_id == strategy_id
).first()
# Get performance metrics
performance_metrics = self.db.query(StrategyPerformanceMetrics).filter(
StrategyPerformanceMetrics.strategy_id == strategy_id
).order_by(desc(StrategyPerformanceMetrics.metric_date)).first()
# Build comprehensive monitoring data
monitoring_data = {
'strategy_id': strategy_id,
'monitoring_plan': monitoring_plan.plan_data,
'monitoring_tasks': [
{
'id': task.id,
'component': task.component_name,
'title': task.task_title,
'description': task.task_description,
'assignee': task.assignee,
'frequency': task.frequency,
'metric': task.metric,
'measurementMethod': task.measurement_method,
'successCriteria': task.success_criteria,
'alertThreshold': task.alert_threshold,
'status': task.status,
'last_executed': task.last_executed.isoformat() if task.last_executed else None,
'next_execution': task.next_execution.isoformat() if task.next_execution else None
}
for task in tasks
],
'activation_status': {
'activation_date': activation_status.activation_date.isoformat() if activation_status else None,
'status': activation_status.status if activation_status else 'unknown',
'performance_score': activation_status.performance_score if activation_status else None
},
'performance_metrics': {
'traffic_growth': performance_metrics.traffic_growth_percentage if performance_metrics else None,
'engagement_rate': performance_metrics.engagement_rate_percentage if performance_metrics else None,
'conversion_rate': performance_metrics.conversion_rate_percentage if performance_metrics else None,
'roi_ratio': performance_metrics.roi_ratio if performance_metrics else None,
'content_quality_score': performance_metrics.content_quality_score if performance_metrics else None,
'data_source': performance_metrics.data_source if performance_metrics else None,
'confidence_score': performance_metrics.confidence_score if performance_metrics else None
},
'created_at': monitoring_plan.created_at.isoformat(),
'updated_at': monitoring_plan.updated_at.isoformat()
}
logger.info(f"Successfully retrieved monitoring data for strategy {strategy_id}")
return monitoring_data
except Exception as e:
logger.error(f"Error retrieving monitoring data for strategy {strategy_id}: {e}")
return None
async def get_analytics_data(self, strategy_id: int) -> Dict[str, Any]:
"""Get analytics data from monitoring data (no external API calls)."""
try:
logger.info(f"Generating analytics data for strategy {strategy_id}")
# Get monitoring data from database
monitoring_data = await self.get_monitoring_data(strategy_id)
if not monitoring_data:
logger.warning(f"No monitoring data found for strategy {strategy_id}")
return self._get_empty_analytics_data()
# Extract analytics from monitoring data
monitoring_plan = monitoring_data['monitoring_plan']
tasks = monitoring_data['monitoring_tasks']
performance_metrics = monitoring_data['performance_metrics']
# Always use monitoring tasks from the plan for rich data, fallback to database tasks
monitoring_tasks = []
if monitoring_plan.get('monitoringTasks'):
# Use rich data from monitoring plan
monitoring_tasks = [
{
'id': i + 1,
'component': task.get('component', ''),
'title': task.get('title', ''),
'description': task.get('description', ''),
'assignee': task.get('assignee', 'ALwrity'),
'frequency': task.get('frequency', 'Monthly'),
'metric': task.get('metric', ''),
'measurementMethod': task.get('measurementMethod', ''),
'successCriteria': task.get('successCriteria', ''),
'alertThreshold': task.get('alertThreshold', ''),
'actionableInsights': task.get('actionableInsights', ''),
'status': 'active',
'last_executed': None,
'next_execution': None
}
for i, task in enumerate(monitoring_plan.get('monitoringTasks', []))
]
elif tasks:
# Fallback to database tasks if plan doesn't have them
monitoring_tasks = [
{
'id': task.id,
'component': task.component_name,
'title': task.task_title,
'description': task.task_description,
'assignee': task.assignee,
'frequency': task.frequency,
'metric': task.metric,
'measurementMethod': task.measurement_method,
'successCriteria': task.success_criteria,
'alertThreshold': task.alert_threshold,
'actionableInsights': '',
'status': task.status,
'last_executed': task.last_executed.isoformat() if task.last_executed else None,
'next_execution': task.next_execution.isoformat() if task.next_execution else None
}
for task in tasks
]
# Always use performance metrics from success metrics for rich data
extracted_metrics = {}
if monitoring_plan.get('successMetrics'):
success_metrics = monitoring_plan['successMetrics']
extracted_metrics = {
'traffic_growth': self._extract_percentage(success_metrics.get('trafficGrowth', {}).get('current', '0%')),
'engagement_rate': self._extract_percentage(success_metrics.get('engagementRate', {}).get('current', '0%')),
'conversion_rate': self._extract_percentage(success_metrics.get('conversionRate', {}).get('current', '0%')),
'roi_ratio': self._extract_ratio(success_metrics.get('roi', {}).get('current', '0:1')),
'content_quality_score': self._extract_percentage(success_metrics.get('contentQuality', {}).get('current', '0%')),
'data_source': 'monitoring_plan',
'confidence_score': 85
}
else:
# Fallback to database metrics if plan doesn't have them
extracted_metrics = {
'traffic_growth': performance_metrics.get('traffic_growth', 0),
'engagement_rate': performance_metrics.get('engagement_rate', 0),
'conversion_rate': performance_metrics.get('conversion_rate', 0),
'roi_ratio': performance_metrics.get('roi_ratio', 0),
'content_quality_score': performance_metrics.get('content_quality_score', 0),
'data_source': performance_metrics.get('data_source', 'database'),
'confidence_score': performance_metrics.get('confidence_score', 70)
}
# Build analytics data from monitoring plan
analytics_data = {
'performance_trends': {
'traffic_growth': extracted_metrics.get('traffic_growth', 0),
'engagement_rate': extracted_metrics.get('engagement_rate', 0),
'conversion_rate': extracted_metrics.get('conversion_rate', 0),
'roi_ratio': extracted_metrics.get('roi_ratio', 0),
'content_quality_score': extracted_metrics.get('content_quality_score', 0)
},
'content_evolution': {
'content_pillars': monitoring_plan.get('contentPillars', []),
'content_mix': monitoring_plan.get('contentMix', {}),
'publishing_frequency': monitoring_plan.get('publishingFrequency', ''),
'quality_metrics': monitoring_plan.get('qualityMetrics', [])
},
'engagement_patterns': {
'audience_segments': monitoring_plan.get('audienceSegments', []),
'engagement_metrics': monitoring_plan.get('engagementMetrics', {}),
'optimal_timing': monitoring_plan.get('optimalTiming', {}),
'platform_performance': monitoring_plan.get('platformPerformance', {})
},
'recommendations': monitoring_plan.get('recommendations', []),
'insights': monitoring_plan.get('insights', []),
'monitoring_data': monitoring_data,
'monitoring_tasks': monitoring_tasks,
'monitoring_plan': monitoring_plan, # Include full monitoring plan for rich data
'success_metrics': monitoring_plan.get('successMetrics', {}), # Include success metrics
'monitoring_schedule': monitoring_plan.get('monitoringSchedule', {}), # Include monitoring schedule
'_source': 'database_monitoring',
'data_freshness': monitoring_data['updated_at'],
'confidence_score': extracted_metrics.get('confidence_score', 85)
}
logger.info(f"Successfully generated analytics data for strategy {strategy_id}")
return analytics_data
except Exception as e:
logger.error(f"Error generating analytics data for strategy {strategy_id}: {e}")
return self._get_empty_analytics_data()
def _get_empty_analytics_data(self) -> Dict[str, Any]:
"""Return empty analytics data structure."""
return {
'performance_trends': {},
'content_evolution': {},
'engagement_patterns': {},
'recommendations': [],
'insights': [],
'monitoring_data': None,
'monitoring_tasks': [],
'_source': 'empty',
'data_freshness': datetime.utcnow().isoformat(),
'confidence_score': 0
}
def _extract_percentage(self, value: str) -> float:
"""Extract percentage value from string like '15%'."""
try:
if isinstance(value, str) and '%' in value:
return float(value.replace('%', ''))
elif isinstance(value, (int, float)):
return float(value)
else:
return 0.0
except (ValueError, TypeError):
return 0.0
def _extract_ratio(self, value: str) -> float:
"""Extract ratio value from string like '3:1'."""
try:
if isinstance(value, str) and ':' in value:
parts = value.split(':')
if len(parts) == 2:
return float(parts[0]) / float(parts[1])
elif isinstance(value, (int, float)):
return float(value)
else:
return 0.0
except (ValueError, TypeError):
return 0.0
async def update_performance_metrics(self, strategy_id: int, metrics: Dict[str, Any]) -> bool:
"""Update performance metrics for a strategy."""
try:
logger.info(f"Updating performance metrics for strategy {strategy_id}")
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
metric_date=datetime.utcnow(),
traffic_growth_percentage=metrics.get('traffic_growth'),
engagement_rate_percentage=metrics.get('engagement_rate'),
conversion_rate_percentage=metrics.get('conversion_rate'),
roi_ratio=metrics.get('roi_ratio'),
content_quality_score=metrics.get('content_quality_score'),
data_source='manual_update',
confidence_score=metrics.get('confidence_score', 70)
)
self.db.add(performance_metrics)
self.db.commit()
logger.info(f"Successfully updated performance metrics for strategy {strategy_id}")
return True
except Exception as e:
logger.error(f"Error updating performance metrics for strategy {strategy_id}: {e}")
self.db.rollback()
return False

View File

@@ -0,0 +1,389 @@
from typing import Dict, Any, List, Optional
from sqlalchemy.orm import Session
from loguru import logger
from services.onboarding_data_service import OnboardingDataService
from services.user_data_service import UserDataService
from services.llm_providers.gemini_provider import gemini_text_response, gemini_structured_json_response
class StrategyCopilotService:
"""Service for CopilotKit strategy assistance using Gemini."""
def __init__(self, db: Session):
self.db = db
self.onboarding_service = OnboardingDataService()
self.user_data_service = UserDataService(db)
async def generate_category_data(
self,
category: str,
user_description: str,
current_form_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Generate data for a specific category."""
try:
# Get user onboarding data
user_id = 1 # TODO: Get from auth context
onboarding_data = self.onboarding_service.get_personalized_ai_inputs(user_id)
# Build prompt for category generation
prompt = self._build_category_generation_prompt(
category, user_description, current_form_data, onboarding_data
)
# Generate response using Gemini
response = gemini_text_response(
prompt=prompt,
temperature=0.3,
top_p=0.9,
n=1,
max_tokens=2048,
system_prompt="You are ALwrity's Strategy Assistant. Generate appropriate values for strategy fields."
)
# Parse and validate response
generated_data = self._parse_category_response(response, category)
return generated_data
except Exception as e:
logger.error(f"Error generating category data: {str(e)}")
raise
async def validate_field(self, field_id: str, value: Any) -> Dict[str, Any]:
"""Validate a specific strategy field."""
try:
# Get field definition
field_definition = self._get_field_definition(field_id)
# Build validation prompt
prompt = self._build_validation_prompt(field_definition, value)
# Generate validation response using Gemini
response = gemini_text_response(
prompt=prompt,
temperature=0.2,
top_p=0.9,
n=1,
max_tokens=1024,
system_prompt="You are ALwrity's Strategy Assistant. Validate field values and provide suggestions."
)
# Parse validation result
validation_result = self._parse_validation_response(response)
return validation_result
except Exception as e:
logger.error(f"Error validating field {field_id}: {str(e)}")
raise
async def analyze_strategy(self, form_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze complete strategy for completeness and coherence."""
try:
# Get user data for context
user_id = 1 # TODO: Get from auth context
onboarding_data = self.onboarding_service.get_personalized_ai_inputs(user_id)
# Build analysis prompt
prompt = self._build_analysis_prompt(form_data, onboarding_data)
# Generate analysis using Gemini
response = gemini_text_response(
prompt=prompt,
temperature=0.3,
top_p=0.9,
n=1,
max_tokens=2048,
system_prompt="You are ALwrity's Strategy Assistant. Analyze strategies for completeness and coherence."
)
# Parse analysis result
analysis_result = self._parse_analysis_response(response)
return analysis_result
except Exception as e:
logger.error(f"Error analyzing strategy: {str(e)}")
raise
async def generate_field_suggestions(
self,
field_id: str,
current_form_data: Dict[str, Any]
) -> Dict[str, Any]:
"""Generate suggestions for a specific field."""
try:
# Get field definition
field_definition = self._get_field_definition(field_id)
# Get user data
user_id = 1 # TODO: Get from auth context
onboarding_data = self.onboarding_service.get_personalized_ai_inputs(user_id)
# Build suggestions prompt
prompt = self._build_suggestions_prompt(
field_definition, current_form_data, onboarding_data
)
# Generate suggestions using Gemini
response = gemini_text_response(
prompt=prompt,
temperature=0.4,
top_p=0.9,
n=1,
max_tokens=1024,
system_prompt="You are ALwrity's Strategy Assistant. Generate helpful suggestions for strategy fields."
)
# Parse suggestions
suggestions = self._parse_suggestions_response(response)
return suggestions
except Exception as e:
logger.error(f"Error generating suggestions for {field_id}: {str(e)}")
raise
def _build_category_generation_prompt(
self,
category: str,
user_description: str,
current_form_data: Dict[str, Any],
onboarding_data: Dict[str, Any]
) -> str:
"""Build prompt for category data generation."""
return f"""
You are ALwrity's Strategy Assistant. Generate data for the {category} category based on the user's description.
User Description: {user_description}
Current Form Data: {current_form_data}
Onboarding Data: {onboarding_data}
Category Fields: {self._get_category_fields(category)}
Generate appropriate values for all fields in the {category} category. Return only valid JSON with field IDs as keys.
Example response format:
{{
"field_id": "value",
"another_field": "value"
}}
"""
def _build_validation_prompt(self, field_definition: Dict[str, Any], value: Any) -> str:
"""Build prompt for field validation."""
return f"""
Validate the following field value:
Field: {field_definition['label']}
Description: {field_definition['description']}
Required: {field_definition['required']}
Type: {field_definition['type']}
Value: {value}
Return JSON with: {{"isValid": boolean, "suggestion": string, "confidence": number}}
Example response:
{{
"isValid": true,
"suggestion": "This looks good!",
"confidence": 0.95
}}
"""
def _build_analysis_prompt(
self,
form_data: Dict[str, Any],
onboarding_data: Dict[str, Any]
) -> str:
"""Build prompt for strategy analysis."""
return f"""
Analyze the following content strategy for completeness, coherence, and alignment:
Form Data: {form_data}
Onboarding Data: {onboarding_data}
Return JSON with: {{
"completeness": number,
"coherence": number,
"alignment": number,
"suggestions": [string],
"missingFields": [string],
"improvements": [string]
}}
Example response:
{{
"completeness": 85,
"coherence": 90,
"alignment": 88,
"suggestions": ["Consider adding more specific metrics"],
"missingFields": ["content_budget"],
"improvements": ["Add timeline details"]
}}
"""
def _build_suggestions_prompt(
self,
field_definition: Dict[str, Any],
current_form_data: Dict[str, Any],
onboarding_data: Dict[str, Any]
) -> str:
"""Build prompt for field suggestions."""
return f"""
Generate suggestions for the following field:
Field: {field_definition['label']}
Description: {field_definition['description']}
Required: {field_definition['required']}
Type: {field_definition['type']}
Current Form Data: {current_form_data}
Onboarding Data: {onboarding_data}
Return JSON with: {{
"suggestions": [string],
"reasoning": string,
"confidence": number
}}
Example response:
{{
"suggestions": ["Focus on measurable outcomes", "Align with business goals"],
"reasoning": "Based on your business context, measurable outcomes will be most effective",
"confidence": 0.92
}}
"""
def _get_field_definition(self, field_id: str) -> Dict[str, Any]:
"""Get field definition from STRATEGIC_INPUT_FIELDS."""
# This would be imported from the frontend field definitions
# For now, return a basic structure
return {
"id": field_id,
"label": field_id.replace("_", " ").title(),
"description": f"Description for {field_id}",
"required": True,
"type": "text"
}
def _get_category_fields(self, category: str) -> List[str]:
"""Get fields for a specific category."""
# This would be imported from the frontend field definitions
category_fields = {
"business_context": [
"business_objectives", "target_metrics", "content_budget", "team_size",
"implementation_timeline", "market_share", "competitive_position", "performance_metrics"
],
"audience_intelligence": [
"content_preferences", "consumption_patterns", "audience_pain_points",
"buying_journey", "seasonal_trends", "engagement_metrics"
],
"competitive_intelligence": [
"top_competitors", "competitor_content_strategies", "market_gaps",
"industry_trends", "emerging_trends"
],
"content_strategy": [
"preferred_formats", "content_mix", "content_frequency", "optimal_timing",
"quality_metrics", "editorial_guidelines", "brand_voice"
],
"performance_analytics": [
"traffic_sources", "conversion_rates", "content_roi_targets", "ab_testing_capabilities"
]
}
return category_fields.get(category, [])
def _parse_category_response(self, response: str, category: str) -> Dict[str, Any]:
"""Parse LLM response for category data."""
try:
import json
# Clean up the response to extract JSON
response = response.strip()
if response.startswith("```json"):
response = response[7:]
if response.endswith("```"):
response = response[:-3]
response = response.strip()
parsed_data = json.loads(response)
# Validate that we have actual data
if not isinstance(parsed_data, dict) or len(parsed_data) == 0:
raise Exception("Invalid or empty response data")
return parsed_data
except Exception as e:
logger.error(f"Error parsing category response: {str(e)}")
raise Exception(f"Failed to parse category response: {str(e)}")
def _parse_validation_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response for validation."""
try:
import json
# Clean up the response to extract JSON
response = response.strip()
if response.startswith("```json"):
response = response[7:]
if response.endswith("```"):
response = response[:-3]
response = response.strip()
parsed_data = json.loads(response)
# Validate required fields
if not isinstance(parsed_data, dict) or 'isValid' not in parsed_data:
raise Exception("Invalid validation response format")
return parsed_data
except Exception as e:
logger.error(f"Error parsing validation response: {str(e)}")
raise Exception(f"Failed to parse validation response: {str(e)}")
def _parse_analysis_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response for analysis."""
try:
import json
# Clean up the response to extract JSON
response = response.strip()
if response.startswith("```json"):
response = response[7:]
if response.endswith("```"):
response = response[:-3]
response = response.strip()
parsed_data = json.loads(response)
# Validate required fields
required_fields = ['completeness', 'coherence', 'alignment']
if not isinstance(parsed_data, dict) or not all(field in parsed_data for field in required_fields):
raise Exception("Invalid analysis response format")
return parsed_data
except Exception as e:
logger.error(f"Error parsing analysis response: {str(e)}")
raise Exception(f"Failed to parse analysis response: {str(e)}")
def _parse_suggestions_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM response for suggestions."""
try:
import json
# Clean up the response to extract JSON
response = response.strip()
if response.startswith("```json"):
response = response[7:]
if response.endswith("```"):
response = response[:-3]
response = response.strip()
parsed_data = json.loads(response)
# Validate required fields
if not isinstance(parsed_data, dict) or 'suggestions' not in parsed_data:
raise Exception("Invalid suggestions response format")
return parsed_data
except Exception as e:
logger.error(f"Error parsing suggestions response: {str(e)}")
raise Exception(f"Failed to parse suggestions response: {str(e)}")