import json import logging from typing import Dict, Any, List from datetime import datetime from services.llm_providers.gemini_provider import gemini_structured_json_response from services.strategy_service import StrategyService logger = logging.getLogger(__name__) class MonitoringPlanGenerator: def __init__(self): self.strategy_service = StrategyService() async def generate_monitoring_plan(self, strategy_id: int) -> Dict[str, Any]: """Generate comprehensive monitoring plan for a strategy""" try: # Get strategy data strategy_data = await self.strategy_service.get_strategy_by_id(strategy_id) if not strategy_data: raise Exception(f"Strategy with ID {strategy_id} not found") # Prepare prompt context prompt_context = self._prepare_prompt_context(strategy_data) logger.debug( "MonitoringPlanGenerator: Prepared prompt context | strategy_id=%s | keys=%s", strategy_id, list(prompt_context.keys()) ) # Generate monitoring plan using AI monitoring_plan = await self._generate_plan_with_ai(prompt_context) # Validate the plan structure if not self._validate_monitoring_plan(monitoring_plan): raise Exception("Generated monitoring plan has invalid structure") # Validate and enhance the plan enhanced_plan = await self._enhance_monitoring_plan(monitoring_plan, strategy_data) # Save monitoring plan to database await self._save_monitoring_plan(strategy_id, enhanced_plan) logger.info(f"Successfully generated monitoring plan for strategy {strategy_id}") return enhanced_plan except Exception as e: logger.error(f"Error generating monitoring plan for strategy {strategy_id}: {e}") # Don't mark as success if there's an error raise Exception(f"Failed to generate monitoring plan: {str(e)}") def _prepare_prompt_context(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]: """Prepare context for AI prompt""" # Extract strategy components strategic_insights = strategy_data.get('strategic_insights', {}) competitive_analysis = strategy_data.get('competitive_analysis', {}) performance_predictions = strategy_data.get('performance_predictions', {}) implementation_roadmap = strategy_data.get('implementation_roadmap', {}) risk_assessment = strategy_data.get('risk_assessment', {}) return { "strategy_name": strategy_data.get('name', 'Content Strategy'), "industry": strategy_data.get('industry', 'General'), "business_goals": strategy_data.get('business_goals', []), "content_pillars": strategy_data.get('content_pillars', []), "target_audience": strategy_data.get('target_audience', {}), "competitive_landscape": competitive_analysis.get('competitors', []), "strategic_insights": strategic_insights, "performance_predictions": performance_predictions, "implementation_roadmap": implementation_roadmap, "risk_assessment": risk_assessment } async def _generate_plan_with_ai(self, prompt_context: Dict[str, Any]) -> Dict[str, Any]: """Generate monitoring plan using AI""" prompt = self._build_monitoring_prompt(prompt_context) logger.debug( "MonitoringPlanGenerator: Built prompt | length=%s | preview=%s...", len(prompt), (prompt[:240].replace("\n", " ") if isinstance(prompt, str) else "") ) # Define schema for 8 tasks (2 per component) to avoid truncation monitoring_plan_schema = { "type": "object", "properties": { "monitoringTasks": { "type": "array", "items": { "type": "object", "properties": { "component": {"type": "string"}, "title": {"type": "string"}, "description": {"type": "string"}, "assignee": {"type": "string"}, "frequency": {"type": "string"}, "metric": {"type": "string"}, "measurementMethod": {"type": "string"}, "successCriteria": {"type": "string"}, "alertThreshold": {"type": "string"}, "actionableInsights": {"type": "string"} } } } } } logger.debug( "MonitoringPlanGenerator: Schema prepared | schema_type=%s", type(monitoring_plan_schema).__name__ ) try: # Structured response only (no fallback) logger.info("MonitoringPlanGenerator: Invoking Gemini structured JSON response") response = gemini_structured_json_response( prompt=prompt, schema=monitoring_plan_schema, temperature=0.1, max_tokens=8192 ) logger.debug( "MonitoringPlanGenerator: Received AI response | type=%s", type(response) ) # Handle response - gemini_structured_json_response returns dict directly if isinstance(response, dict): if "error" in response: logger.error("MonitoringPlanGenerator: Gemini returned error dict | error=%s", response.get("error")) raise Exception(f"Gemini error: {response.get('error')}") logger.debug( "MonitoringPlanGenerator: Parsed response dict keys=%s", list(response.keys()) ) monitoring_plan = response elif isinstance(response, str): # If it's a string, try to parse as JSON try: monitoring_plan = json.loads(response) except json.JSONDecodeError as e: logger.error("MonitoringPlanGenerator: Failed to parse AI response as JSON: %s", e) raise Exception(f"Invalid AI response format: {str(e)}") else: logger.error("MonitoringPlanGenerator: Unexpected response type from AI service: %s", type(response)) raise Exception(f"Unexpected response type from AI service: {type(response)}") logger.info( "MonitoringPlanGenerator: AI monitoring plan generated | has_tasks=%s", isinstance(monitoring_plan.get("monitoringTasks"), list) ) # Compute totals from the returned tasks monitoring_tasks = monitoring_plan.get("monitoringTasks", []) total_tasks = len(monitoring_tasks) alwrity_tasks = sum(1 for task in monitoring_tasks if task.get("assignee") == "ALwrity") human_tasks = sum(1 for task in monitoring_tasks if task.get("assignee") == "Human") # Add computed totals to the plan monitoring_plan["totalTasks"] = total_tasks monitoring_plan["alwrityTasks"] = alwrity_tasks monitoring_plan["humanTasks"] = human_tasks monitoring_plan["metricsCount"] = total_tasks logger.info( "MonitoringPlanGenerator: Computed totals | total=%s | alwrity=%s | human=%s", total_tasks, alwrity_tasks, human_tasks ) return monitoring_plan except Exception as e: logger.error(f"Error calling AI service: {e}") raise Exception(f"AI service error: {str(e)}") def _build_monitoring_prompt(self, context: Dict[str, Any]) -> str: """Build the AI prompt for monitoring plan generation""" return f"""Generate a monitoring plan for content strategy: {context['strategy_name']} in {context['industry']} industry. Create exactly 8 monitoring tasks (2 per component) across 5 strategy components: 1. Strategic Insights 2. Competitive Analysis 3. Performance Predictions 4. Implementation Roadmap 5. Risk Assessment Each task must include: component, title, description, assignee (ALwrity or Human), frequency (Daily, Weekly, Monthly, or Quarterly), metric, measurement method, success criteria, alert threshold, and actionable insights. Return a JSON object with monitoringTasks array containing 8 task objects.""" def _generate_default_plan(self, context: Dict[str, Any]) -> Dict[str, Any]: """Generate a default monitoring plan if AI fails""" return { "totalTasks": 15, "alwrityTasks": 10, "humanTasks": 5, "metricsCount": 15, "components": [ { "name": "Strategic Insights", "icon": "TrendingUpIcon", "tasks": [ { "title": "Monitor Market Positioning Effectiveness", "description": "Track how well the strategic positioning is performing in the market", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Market Position Score", "measurementMethod": "Competitive analysis and brand mention tracking", "successCriteria": "Maintain top 3 market position", "alertThreshold": "Drop below top 5 position" }, { "title": "Track Strategic Goal Achievement", "description": "Monitor progress toward defined business objectives", "assignee": "Human", "frequency": "Monthly", "metric": "Goal Achievement Rate", "measurementMethod": "KPI tracking and business metrics analysis", "successCriteria": "Achieve 80% of strategic goals", "alertThreshold": "Drop below 60% achievement" }, { "title": "Analyze Strategic Insights Performance", "description": "Evaluate the effectiveness of strategic insights and recommendations", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Insight Effectiveness Score", "measurementMethod": "Performance data analysis and trend identification", "successCriteria": "Maintain 85%+ effectiveness score", "alertThreshold": "Drop below 70% effectiveness" } ] }, { "name": "Competitive Analysis", "icon": "EmojiEventsIcon", "tasks": [ { "title": "Monitor Competitor Activities", "description": "Track competitor content strategies and market activities", "assignee": "ALwrity", "frequency": "Daily", "metric": "Competitor Activity Score", "measurementMethod": "Automated competitor monitoring and analysis", "successCriteria": "Stay ahead of competitor activities", "alertThreshold": "Competitor gains significant advantage" }, { "title": "Track Competitive Positioning", "description": "Monitor our competitive position in the market", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Competitive Position Rank", "measurementMethod": "Market share and positioning analysis", "successCriteria": "Maintain top 3 competitive position", "alertThreshold": "Drop below top 5 position" }, { "title": "Validate Competitive Intelligence", "description": "Review and validate competitive analysis insights", "assignee": "Human", "frequency": "Monthly", "metric": "Intelligence Accuracy Score", "measurementMethod": "Manual review and validation", "successCriteria": "Maintain 90%+ accuracy", "alertThreshold": "Drop below 80% accuracy" } ] }, { "name": "Performance Predictions", "icon": "AssessmentIcon", "tasks": [ { "title": "Monitor Prediction Accuracy", "description": "Track the accuracy of performance predictions", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Prediction Accuracy Rate", "measurementMethod": "Compare predictions with actual performance", "successCriteria": "Maintain 85%+ prediction accuracy", "alertThreshold": "Drop below 70% accuracy" }, { "title": "Update Prediction Models", "description": "Refine prediction models based on new data", "assignee": "ALwrity", "frequency": "Monthly", "metric": "Model Performance Score", "measurementMethod": "Model validation and performance testing", "successCriteria": "Improve model performance by 5%+", "alertThreshold": "Model performance degrades" }, { "title": "Review Prediction Insights", "description": "Analyze prediction insights and business implications", "assignee": "Human", "frequency": "Monthly", "metric": "Insight Actionability Score", "measurementMethod": "Manual review and business analysis", "successCriteria": "Generate actionable insights", "alertThreshold": "Insights become less actionable" } ] }, { "name": "Implementation Roadmap", "icon": "CheckCircleIcon", "tasks": [ { "title": "Track Implementation Progress", "description": "Monitor progress on implementation roadmap milestones", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Implementation Progress Rate", "measurementMethod": "Milestone tracking and progress analysis", "successCriteria": "Achieve 90%+ of milestones on time", "alertThreshold": "Fall behind by more than 2 weeks" }, { "title": "Monitor Resource Utilization", "description": "Track resource allocation and utilization efficiency", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Resource Efficiency Score", "measurementMethod": "Resource tracking and efficiency analysis", "successCriteria": "Maintain 85%+ resource efficiency", "alertThreshold": "Drop below 70% efficiency" }, { "title": "Review Implementation Effectiveness", "description": "Evaluate the effectiveness of implementation strategies", "assignee": "Human", "frequency": "Monthly", "metric": "Implementation Success Rate", "measurementMethod": "Manual review and effectiveness assessment", "successCriteria": "Achieve 80%+ implementation success", "alertThreshold": "Drop below 60% success rate" } ] }, { "name": "Risk Assessment", "icon": "StarIcon", "tasks": [ { "title": "Monitor Risk Indicators", "description": "Track identified risk factors and their status", "assignee": "ALwrity", "frequency": "Daily", "metric": "Risk Level Score", "measurementMethod": "Risk factor monitoring and analysis", "successCriteria": "Maintain low risk level (score < 30)", "alertThreshold": "Risk level increases above 50" }, { "title": "Track Risk Mitigation Effectiveness", "description": "Monitor the effectiveness of risk mitigation strategies", "assignee": "ALwrity", "frequency": "Weekly", "metric": "Mitigation Effectiveness Rate", "measurementMethod": "Risk reduction tracking and analysis", "successCriteria": "Achieve 80%+ risk mitigation success", "alertThreshold": "Drop below 60% mitigation success" }, { "title": "Review Risk Management Decisions", "description": "Evaluate risk management decisions and their outcomes", "assignee": "Human", "frequency": "Monthly", "metric": "Risk Management Score", "measurementMethod": "Manual review and decision analysis", "successCriteria": "Maintain 85%+ risk management effectiveness", "alertThreshold": "Drop below 70% effectiveness" } ] } ] } async def _enhance_monitoring_plan(self, plan: Dict[str, Any], strategy_data: Dict[str, Any]) -> Dict[str, Any]: """Enhance AI-generated plan with additional context and validation""" enhanced_plan = plan.copy() # Add monitoring schedule enhanced_plan["monitoringSchedule"] = { "dailyChecks": ["Performance metrics", "Alert monitoring", "Risk indicators"], "weeklyReviews": ["Trend analysis", "Competitive updates", "Implementation progress"], "monthlyAssessments": ["Strategy effectiveness", "Goal progress", "Risk management"], "quarterlyPlanning": ["Strategy optimization", "Goal refinement", "Resource allocation"] } # Add success metrics enhanced_plan["successMetrics"] = { "trafficGrowth": {"target": "25%+", "current": "0%"}, "engagementRate": {"target": "15%+", "current": "0%"}, "conversionRate": {"target": "10%+", "current": "0%"}, "roi": {"target": "3:1+", "current": "0:1"}, "strategyAdoption": {"target": "90%+", "current": "0%"}, "contentQuality": {"target": "85%+", "current": "0%"}, "competitivePosition": {"target": "Top 3", "current": "Unknown"}, "audienceGrowth": {"target": "20%+", "current": "0%"} } # Add metadata enhanced_plan["metadata"] = { "generatedAt": datetime.now().isoformat(), "strategyId": strategy_data.get('id'), "strategyName": strategy_data.get('name'), "version": "1.0" } return enhanced_plan async def _save_monitoring_plan(self, strategy_id: int, plan: Dict[str, Any]): """Save monitoring plan to database""" try: # Use the strategy service to save the monitoring plan success = await self.strategy_service.save_monitoring_plan(strategy_id, plan) if success: logger.info(f"Monitoring plan saved to database for strategy {strategy_id}") else: logger.warning(f"Failed to save monitoring plan to database for strategy {strategy_id}") except Exception as e: logger.error(f"Error saving monitoring plan: {e}") # Don't raise the error as the plan generation was successful def _validate_monitoring_plan(self, plan: Dict[str, Any]) -> bool: """Validate the structure of the generated monitoring plan""" try: # Check that monitoringTasks is a list and has content monitoring_tasks = plan.get("monitoringTasks", []) if not isinstance(monitoring_tasks, list): logger.error("monitoringTasks must be a list") return False if len(monitoring_tasks) == 0: logger.error("No monitoring tasks generated") return False # Validate we have the expected number of tasks (8) if len(monitoring_tasks) != 8: logger.warning(f"Expected 8 tasks, got {len(monitoring_tasks)}") # Validate each task structure required_task_fields = [ "component", "title", "description", "assignee", "frequency", "metric", "measurementMethod", "successCriteria", "alertThreshold", "actionableInsights" ] for i, task in enumerate(monitoring_tasks): for field in required_task_fields: if field not in task: logger.error(f"Task {i} missing required field: {field}") return False # Validate assignee is either "ALwrity" or "Human" if task.get("assignee") not in ["ALwrity", "Human"]: logger.error(f"Task {i} has invalid assignee: {task.get('assignee')}") return False # Validate computed totals are present (added after AI response) computed_fields = ["totalTasks", "alwrityTasks", "humanTasks", "metricsCount"] for field in computed_fields: if field not in plan: logger.error(f"Missing computed field in monitoring plan: {field}") return False return True except Exception as e: logger.error(f"Error validating monitoring plan: {e}") return False