484 lines
24 KiB
Python
484 lines
24 KiB
Python
import json
|
|
import logging
|
|
from typing import Dict, Any, List
|
|
from datetime import datetime
|
|
|
|
from services.llm_providers.gemini_provider import gemini_structured_json_response
|
|
from services.strategy_service import StrategyService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MonitoringPlanGenerator:
|
|
def __init__(self):
|
|
self.strategy_service = StrategyService()
|
|
|
|
async def generate_monitoring_plan(self, strategy_id: int) -> Dict[str, Any]:
|
|
"""Generate comprehensive monitoring plan for a strategy"""
|
|
|
|
try:
|
|
# Get strategy data
|
|
strategy_data = await self.strategy_service.get_strategy_by_id(strategy_id)
|
|
|
|
if not strategy_data:
|
|
raise Exception(f"Strategy with ID {strategy_id} not found")
|
|
|
|
# Prepare prompt context
|
|
prompt_context = self._prepare_prompt_context(strategy_data)
|
|
logger.debug(
|
|
"MonitoringPlanGenerator: Prepared prompt context | strategy_id=%s | keys=%s",
|
|
strategy_id,
|
|
list(prompt_context.keys())
|
|
)
|
|
|
|
# Generate monitoring plan using AI
|
|
monitoring_plan = await self._generate_plan_with_ai(prompt_context)
|
|
|
|
# Validate the plan structure
|
|
if not self._validate_monitoring_plan(monitoring_plan):
|
|
raise Exception("Generated monitoring plan has invalid structure")
|
|
|
|
# Validate and enhance the plan
|
|
enhanced_plan = await self._enhance_monitoring_plan(monitoring_plan, strategy_data)
|
|
|
|
# Save monitoring plan to database
|
|
await self._save_monitoring_plan(strategy_id, enhanced_plan)
|
|
|
|
logger.info(f"Successfully generated monitoring plan for strategy {strategy_id}")
|
|
return enhanced_plan
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error generating monitoring plan for strategy {strategy_id}: {e}")
|
|
# Don't mark as success if there's an error
|
|
raise Exception(f"Failed to generate monitoring plan: {str(e)}")
|
|
|
|
def _prepare_prompt_context(self, strategy_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Prepare context for AI prompt"""
|
|
|
|
# Extract strategy components
|
|
strategic_insights = strategy_data.get('strategic_insights', {})
|
|
competitive_analysis = strategy_data.get('competitive_analysis', {})
|
|
performance_predictions = strategy_data.get('performance_predictions', {})
|
|
implementation_roadmap = strategy_data.get('implementation_roadmap', {})
|
|
risk_assessment = strategy_data.get('risk_assessment', {})
|
|
|
|
return {
|
|
"strategy_name": strategy_data.get('name', 'Content Strategy'),
|
|
"industry": strategy_data.get('industry', 'General'),
|
|
"business_goals": strategy_data.get('business_goals', []),
|
|
"content_pillars": strategy_data.get('content_pillars', []),
|
|
"target_audience": strategy_data.get('target_audience', {}),
|
|
"competitive_landscape": competitive_analysis.get('competitors', []),
|
|
"strategic_insights": strategic_insights,
|
|
"performance_predictions": performance_predictions,
|
|
"implementation_roadmap": implementation_roadmap,
|
|
"risk_assessment": risk_assessment
|
|
}
|
|
|
|
async def _generate_plan_with_ai(self, prompt_context: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Generate monitoring plan using AI"""
|
|
|
|
prompt = self._build_monitoring_prompt(prompt_context)
|
|
logger.debug(
|
|
"MonitoringPlanGenerator: Built prompt | length=%s | preview=%s...",
|
|
len(prompt),
|
|
(prompt[:240].replace("\n", " ") if isinstance(prompt, str) else "<non-str>")
|
|
)
|
|
|
|
# Define schema for 8 tasks (2 per component) to avoid truncation
|
|
monitoring_plan_schema = {
|
|
"type": "object",
|
|
"properties": {
|
|
"monitoringTasks": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"component": {"type": "string"},
|
|
"title": {"type": "string"},
|
|
"description": {"type": "string"},
|
|
"assignee": {"type": "string"},
|
|
"frequency": {"type": "string"},
|
|
"metric": {"type": "string"},
|
|
"measurementMethod": {"type": "string"},
|
|
"successCriteria": {"type": "string"},
|
|
"alertThreshold": {"type": "string"},
|
|
"actionableInsights": {"type": "string"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
logger.debug(
|
|
"MonitoringPlanGenerator: Schema prepared | schema_type=%s",
|
|
type(monitoring_plan_schema).__name__
|
|
)
|
|
|
|
try:
|
|
# Structured response only (no fallback)
|
|
logger.info("MonitoringPlanGenerator: Invoking Gemini structured JSON response")
|
|
response = gemini_structured_json_response(
|
|
prompt=prompt,
|
|
schema=monitoring_plan_schema,
|
|
temperature=0.1,
|
|
max_tokens=8192
|
|
)
|
|
|
|
logger.debug(
|
|
"MonitoringPlanGenerator: Received AI response | type=%s",
|
|
type(response)
|
|
)
|
|
|
|
# Handle response - gemini_structured_json_response returns dict directly
|
|
if isinstance(response, dict):
|
|
if "error" in response:
|
|
logger.error("MonitoringPlanGenerator: Gemini returned error dict | error=%s", response.get("error"))
|
|
raise Exception(f"Gemini error: {response.get('error')}")
|
|
logger.debug(
|
|
"MonitoringPlanGenerator: Parsed response dict keys=%s",
|
|
list(response.keys())
|
|
)
|
|
monitoring_plan = response
|
|
elif isinstance(response, str):
|
|
# If it's a string, try to parse as JSON
|
|
try:
|
|
monitoring_plan = json.loads(response)
|
|
except json.JSONDecodeError as e:
|
|
logger.error("MonitoringPlanGenerator: Failed to parse AI response as JSON: %s", e)
|
|
raise Exception(f"Invalid AI response format: {str(e)}")
|
|
else:
|
|
logger.error("MonitoringPlanGenerator: Unexpected response type from AI service: %s", type(response))
|
|
raise Exception(f"Unexpected response type from AI service: {type(response)}")
|
|
|
|
logger.info(
|
|
"MonitoringPlanGenerator: AI monitoring plan generated | has_tasks=%s",
|
|
isinstance(monitoring_plan.get("monitoringTasks"), list)
|
|
)
|
|
|
|
# Compute totals from the returned tasks
|
|
monitoring_tasks = monitoring_plan.get("monitoringTasks", [])
|
|
total_tasks = len(monitoring_tasks)
|
|
alwrity_tasks = sum(1 for task in monitoring_tasks if task.get("assignee") == "ALwrity")
|
|
human_tasks = sum(1 for task in monitoring_tasks if task.get("assignee") == "Human")
|
|
|
|
# Add computed totals to the plan
|
|
monitoring_plan["totalTasks"] = total_tasks
|
|
monitoring_plan["alwrityTasks"] = alwrity_tasks
|
|
monitoring_plan["humanTasks"] = human_tasks
|
|
monitoring_plan["metricsCount"] = total_tasks
|
|
|
|
logger.info(
|
|
"MonitoringPlanGenerator: Computed totals | total=%s | alwrity=%s | human=%s",
|
|
total_tasks, alwrity_tasks, human_tasks
|
|
)
|
|
|
|
return monitoring_plan
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error calling AI service: {e}")
|
|
raise Exception(f"AI service error: {str(e)}")
|
|
|
|
def _build_monitoring_prompt(self, context: Dict[str, Any]) -> str:
|
|
"""Build the AI prompt for monitoring plan generation"""
|
|
|
|
return f"""Generate a monitoring plan for content strategy: {context['strategy_name']} in {context['industry']} industry.
|
|
|
|
Create exactly 8 monitoring tasks (2 per component) across 5 strategy components:
|
|
1. Strategic Insights
|
|
2. Competitive Analysis
|
|
3. Performance Predictions
|
|
4. Implementation Roadmap
|
|
5. Risk Assessment
|
|
|
|
Each task must include: component, title, description, assignee (ALwrity or Human), frequency (Daily, Weekly, Monthly, or Quarterly), metric, measurement method, success criteria, alert threshold, and actionable insights.
|
|
|
|
Return a JSON object with monitoringTasks array containing 8 task objects."""
|
|
|
|
def _generate_default_plan(self, context: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Generate a default monitoring plan if AI fails"""
|
|
|
|
return {
|
|
"totalTasks": 15,
|
|
"alwrityTasks": 10,
|
|
"humanTasks": 5,
|
|
"metricsCount": 15,
|
|
"components": [
|
|
{
|
|
"name": "Strategic Insights",
|
|
"icon": "TrendingUpIcon",
|
|
"tasks": [
|
|
{
|
|
"title": "Monitor Market Positioning Effectiveness",
|
|
"description": "Track how well the strategic positioning is performing in the market",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Market Position Score",
|
|
"measurementMethod": "Competitive analysis and brand mention tracking",
|
|
"successCriteria": "Maintain top 3 market position",
|
|
"alertThreshold": "Drop below top 5 position"
|
|
},
|
|
{
|
|
"title": "Track Strategic Goal Achievement",
|
|
"description": "Monitor progress toward defined business objectives",
|
|
"assignee": "Human",
|
|
"frequency": "Monthly",
|
|
"metric": "Goal Achievement Rate",
|
|
"measurementMethod": "KPI tracking and business metrics analysis",
|
|
"successCriteria": "Achieve 80% of strategic goals",
|
|
"alertThreshold": "Drop below 60% achievement"
|
|
},
|
|
{
|
|
"title": "Analyze Strategic Insights Performance",
|
|
"description": "Evaluate the effectiveness of strategic insights and recommendations",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Insight Effectiveness Score",
|
|
"measurementMethod": "Performance data analysis and trend identification",
|
|
"successCriteria": "Maintain 85%+ effectiveness score",
|
|
"alertThreshold": "Drop below 70% effectiveness"
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Competitive Analysis",
|
|
"icon": "EmojiEventsIcon",
|
|
"tasks": [
|
|
{
|
|
"title": "Monitor Competitor Activities",
|
|
"description": "Track competitor content strategies and market activities",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Daily",
|
|
"metric": "Competitor Activity Score",
|
|
"measurementMethod": "Automated competitor monitoring and analysis",
|
|
"successCriteria": "Stay ahead of competitor activities",
|
|
"alertThreshold": "Competitor gains significant advantage"
|
|
},
|
|
{
|
|
"title": "Track Competitive Positioning",
|
|
"description": "Monitor our competitive position in the market",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Competitive Position Rank",
|
|
"measurementMethod": "Market share and positioning analysis",
|
|
"successCriteria": "Maintain top 3 competitive position",
|
|
"alertThreshold": "Drop below top 5 position"
|
|
},
|
|
{
|
|
"title": "Validate Competitive Intelligence",
|
|
"description": "Review and validate competitive analysis insights",
|
|
"assignee": "Human",
|
|
"frequency": "Monthly",
|
|
"metric": "Intelligence Accuracy Score",
|
|
"measurementMethod": "Manual review and validation",
|
|
"successCriteria": "Maintain 90%+ accuracy",
|
|
"alertThreshold": "Drop below 80% accuracy"
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Performance Predictions",
|
|
"icon": "AssessmentIcon",
|
|
"tasks": [
|
|
{
|
|
"title": "Monitor Prediction Accuracy",
|
|
"description": "Track the accuracy of performance predictions",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Prediction Accuracy Rate",
|
|
"measurementMethod": "Compare predictions with actual performance",
|
|
"successCriteria": "Maintain 85%+ prediction accuracy",
|
|
"alertThreshold": "Drop below 70% accuracy"
|
|
},
|
|
{
|
|
"title": "Update Prediction Models",
|
|
"description": "Refine prediction models based on new data",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Monthly",
|
|
"metric": "Model Performance Score",
|
|
"measurementMethod": "Model validation and performance testing",
|
|
"successCriteria": "Improve model performance by 5%+",
|
|
"alertThreshold": "Model performance degrades"
|
|
},
|
|
{
|
|
"title": "Review Prediction Insights",
|
|
"description": "Analyze prediction insights and business implications",
|
|
"assignee": "Human",
|
|
"frequency": "Monthly",
|
|
"metric": "Insight Actionability Score",
|
|
"measurementMethod": "Manual review and business analysis",
|
|
"successCriteria": "Generate actionable insights",
|
|
"alertThreshold": "Insights become less actionable"
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Implementation Roadmap",
|
|
"icon": "CheckCircleIcon",
|
|
"tasks": [
|
|
{
|
|
"title": "Track Implementation Progress",
|
|
"description": "Monitor progress on implementation roadmap milestones",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Implementation Progress Rate",
|
|
"measurementMethod": "Milestone tracking and progress analysis",
|
|
"successCriteria": "Achieve 90%+ of milestones on time",
|
|
"alertThreshold": "Fall behind by more than 2 weeks"
|
|
},
|
|
{
|
|
"title": "Monitor Resource Utilization",
|
|
"description": "Track resource allocation and utilization efficiency",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Resource Efficiency Score",
|
|
"measurementMethod": "Resource tracking and efficiency analysis",
|
|
"successCriteria": "Maintain 85%+ resource efficiency",
|
|
"alertThreshold": "Drop below 70% efficiency"
|
|
},
|
|
{
|
|
"title": "Review Implementation Effectiveness",
|
|
"description": "Evaluate the effectiveness of implementation strategies",
|
|
"assignee": "Human",
|
|
"frequency": "Monthly",
|
|
"metric": "Implementation Success Rate",
|
|
"measurementMethod": "Manual review and effectiveness assessment",
|
|
"successCriteria": "Achieve 80%+ implementation success",
|
|
"alertThreshold": "Drop below 60% success rate"
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Risk Assessment",
|
|
"icon": "StarIcon",
|
|
"tasks": [
|
|
{
|
|
"title": "Monitor Risk Indicators",
|
|
"description": "Track identified risk factors and their status",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Daily",
|
|
"metric": "Risk Level Score",
|
|
"measurementMethod": "Risk factor monitoring and analysis",
|
|
"successCriteria": "Maintain low risk level (score < 30)",
|
|
"alertThreshold": "Risk level increases above 50"
|
|
},
|
|
{
|
|
"title": "Track Risk Mitigation Effectiveness",
|
|
"description": "Monitor the effectiveness of risk mitigation strategies",
|
|
"assignee": "ALwrity",
|
|
"frequency": "Weekly",
|
|
"metric": "Mitigation Effectiveness Rate",
|
|
"measurementMethod": "Risk reduction tracking and analysis",
|
|
"successCriteria": "Achieve 80%+ risk mitigation success",
|
|
"alertThreshold": "Drop below 60% mitigation success"
|
|
},
|
|
{
|
|
"title": "Review Risk Management Decisions",
|
|
"description": "Evaluate risk management decisions and their outcomes",
|
|
"assignee": "Human",
|
|
"frequency": "Monthly",
|
|
"metric": "Risk Management Score",
|
|
"measurementMethod": "Manual review and decision analysis",
|
|
"successCriteria": "Maintain 85%+ risk management effectiveness",
|
|
"alertThreshold": "Drop below 70% effectiveness"
|
|
}
|
|
]
|
|
}
|
|
]
|
|
}
|
|
|
|
async def _enhance_monitoring_plan(self, plan: Dict[str, Any], strategy_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Enhance AI-generated plan with additional context and validation"""
|
|
|
|
enhanced_plan = plan.copy()
|
|
|
|
# Add monitoring schedule
|
|
enhanced_plan["monitoringSchedule"] = {
|
|
"dailyChecks": ["Performance metrics", "Alert monitoring", "Risk indicators"],
|
|
"weeklyReviews": ["Trend analysis", "Competitive updates", "Implementation progress"],
|
|
"monthlyAssessments": ["Strategy effectiveness", "Goal progress", "Risk management"],
|
|
"quarterlyPlanning": ["Strategy optimization", "Goal refinement", "Resource allocation"]
|
|
}
|
|
|
|
# Add success metrics
|
|
enhanced_plan["successMetrics"] = {
|
|
"trafficGrowth": {"target": "25%+", "current": "0%"},
|
|
"engagementRate": {"target": "15%+", "current": "0%"},
|
|
"conversionRate": {"target": "10%+", "current": "0%"},
|
|
"roi": {"target": "3:1+", "current": "0:1"},
|
|
"strategyAdoption": {"target": "90%+", "current": "0%"},
|
|
"contentQuality": {"target": "85%+", "current": "0%"},
|
|
"competitivePosition": {"target": "Top 3", "current": "Unknown"},
|
|
"audienceGrowth": {"target": "20%+", "current": "0%"}
|
|
}
|
|
|
|
# Add metadata
|
|
enhanced_plan["metadata"] = {
|
|
"generatedAt": datetime.now().isoformat(),
|
|
"strategyId": strategy_data.get('id'),
|
|
"strategyName": strategy_data.get('name'),
|
|
"version": "1.0"
|
|
}
|
|
|
|
return enhanced_plan
|
|
|
|
async def _save_monitoring_plan(self, strategy_id: int, plan: Dict[str, Any]):
|
|
"""Save monitoring plan to database"""
|
|
try:
|
|
# Use the strategy service to save the monitoring plan
|
|
success = await self.strategy_service.save_monitoring_plan(strategy_id, plan)
|
|
|
|
if success:
|
|
logger.info(f"Monitoring plan saved to database for strategy {strategy_id}")
|
|
else:
|
|
logger.warning(f"Failed to save monitoring plan to database for strategy {strategy_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error saving monitoring plan: {e}")
|
|
# Don't raise the error as the plan generation was successful
|
|
|
|
def _validate_monitoring_plan(self, plan: Dict[str, Any]) -> bool:
|
|
"""Validate the structure of the generated monitoring plan"""
|
|
try:
|
|
# Check that monitoringTasks is a list and has content
|
|
monitoring_tasks = plan.get("monitoringTasks", [])
|
|
if not isinstance(monitoring_tasks, list):
|
|
logger.error("monitoringTasks must be a list")
|
|
return False
|
|
|
|
if len(monitoring_tasks) == 0:
|
|
logger.error("No monitoring tasks generated")
|
|
return False
|
|
|
|
# Validate we have the expected number of tasks (8)
|
|
if len(monitoring_tasks) != 8:
|
|
logger.warning(f"Expected 8 tasks, got {len(monitoring_tasks)}")
|
|
|
|
# Validate each task structure
|
|
required_task_fields = [
|
|
"component", "title", "description", "assignee", "frequency",
|
|
"metric", "measurementMethod", "successCriteria", "alertThreshold", "actionableInsights"
|
|
]
|
|
|
|
for i, task in enumerate(monitoring_tasks):
|
|
for field in required_task_fields:
|
|
if field not in task:
|
|
logger.error(f"Task {i} missing required field: {field}")
|
|
return False
|
|
|
|
# Validate assignee is either "ALwrity" or "Human"
|
|
if task.get("assignee") not in ["ALwrity", "Human"]:
|
|
logger.error(f"Task {i} has invalid assignee: {task.get('assignee')}")
|
|
return False
|
|
|
|
# Validate computed totals are present (added after AI response)
|
|
computed_fields = ["totalTasks", "alwrityTasks", "humanTasks", "metricsCount"]
|
|
for field in computed_fields:
|
|
if field not in plan:
|
|
logger.error(f"Missing computed field in monitoring plan: {field}")
|
|
return False
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error validating monitoring plan: {e}")
|
|
return False
|