Alwrity monitoring data service

This commit is contained in:
ajaysi
2025-08-28 11:11:55 +05:30
parent be88e931ea
commit f76381030b
14 changed files with 1000 additions and 320 deletions

View File

@@ -20,6 +20,9 @@ from .content_strategy.routes import router as content_strategy_router
# Import quality analysis routes
from ..quality_analysis_routes import router as quality_analysis_router
# Import monitoring routes
from ..monitoring_routes import router as monitoring_routes_router
# Create main router
router = APIRouter(prefix="/api/content-planning", tags=["content-planning"])
@@ -41,6 +44,9 @@ router.include_router(content_strategy_router)
# Include quality analysis routes
router.include_router(quality_analysis_router)
# Include monitoring routes
router.include_router(monitoring_routes_router)
# Add health check endpoint
@router.get("/health")
async def content_planning_health_check():

View File

@@ -1,4 +1,4 @@
from fastapi import APIRouter, HTTPException, Depends, Query
from fastapi import APIRouter, HTTPException, Depends, Query, Body
from typing import Dict, Any
import logging
from datetime import datetime, timedelta
@@ -8,6 +8,7 @@ import json
from services.monitoring_plan_generator import MonitoringPlanGenerator
from services.strategy_service import StrategyService
from services.monitoring_data_service import MonitoringDataService
from services.database import get_db
from models.monitoring_models import (
StrategyMonitoringPlan, MonitoringTask, TaskExecutionLog,
@@ -42,11 +43,13 @@ async def generate_monitoring_plan(strategy_id: int):
@router.post("/{strategy_id}/activate-with-monitoring")
async def activate_strategy_with_monitoring(
strategy_id: int,
monitoring_plan: Dict[str, Any]
monitoring_plan: Dict[str, Any] = Body(...),
db: Session = Depends(get_db)
):
"""Activate strategy with monitoring plan"""
try:
strategy_service = StrategyService()
monitoring_service = MonitoringDataService(db)
# Activate strategy
activation_success = await strategy_service.activate_strategy(strategy_id)
@@ -56,10 +59,10 @@ async def activate_strategy_with_monitoring(
detail=f"Failed to activate strategy {strategy_id}"
)
# Save monitoring plan
plan_success = await strategy_service.save_monitoring_plan(strategy_id, monitoring_plan)
if not plan_success:
logger.warning(f"Failed to save monitoring plan for strategy {strategy_id}")
# Save monitoring data to database
monitoring_success = await monitoring_service.save_monitoring_data(strategy_id, monitoring_plan)
if not monitoring_success:
logger.warning(f"Failed to save monitoring data for strategy {strategy_id}")
logger.info(f"Successfully activated strategy {strategy_id} with monitoring")
return {
@@ -77,16 +80,16 @@ async def activate_strategy_with_monitoring(
)
@router.get("/{strategy_id}/monitoring-plan")
async def get_monitoring_plan(strategy_id: int):
async def get_monitoring_plan(strategy_id: int, db: Session = Depends(get_db)):
"""Get monitoring plan for a strategy"""
try:
strategy_service = StrategyService()
monitoring_plan = await strategy_service.get_monitoring_plan(strategy_id)
monitoring_service = MonitoringDataService(db)
monitoring_data = await monitoring_service.get_monitoring_data(strategy_id)
if monitoring_plan:
if monitoring_data:
return {
"success": True,
"data": monitoring_plan
"data": monitoring_data
}
else:
raise HTTPException(
@@ -102,6 +105,25 @@ async def get_monitoring_plan(strategy_id: int):
detail=f"Failed to get monitoring plan: {str(e)}"
)
@router.get("/{strategy_id}/analytics-data")
async def get_analytics_data(strategy_id: int, db: Session = Depends(get_db)):
"""Get analytics data from monitoring data (no external API calls)"""
try:
monitoring_service = MonitoringDataService(db)
analytics_data = await monitoring_service.get_analytics_data(strategy_id)
return {
"success": True,
"data": analytics_data,
"message": "Analytics data retrieved from monitoring database"
}
except Exception as e:
logger.error(f"Error getting analytics data for strategy {strategy_id}: {e}")
raise HTTPException(
status_code=500,
detail=f"Failed to get analytics data: {str(e)}"
)
@router.get("/{strategy_id}/performance-history")
async def get_strategy_performance_history(strategy_id: int, days: int = 30):
"""Get performance history for a strategy"""
@@ -496,194 +518,105 @@ async def get_transparency_data(
StrategyPerformanceMetrics.strategy_id == strategy_id
).order_by(desc(StrategyPerformanceMetrics.created_at)).first()
# Build transparency data
# Build transparency data from actual monitoring tasks
transparency_data = []
# Traffic Growth Metric
traffic_growth_data = {
"metricName": "Traffic Growth",
"currentValue": 15.7, # This would come from actual analytics
"unit": "%",
"dataFreshness": {
"lastUpdated": task_logs[0].execution_date.isoformat() if task_logs else datetime.now().isoformat(),
"updateFrequency": "Every 4 hours",
"dataSource": "Google Analytics + AI Analysis",
"confidence": 92
},
"measurementMethodology": {
"description": "Organic traffic growth compared to previous period",
"calculationMethod": "Percentage change in organic sessions over 30-day rolling period, weighted by content performance and user engagement",
"dataPoints": ["Organic Sessions", "Page Views", "Bounce Rate", "Time on Site", "Content Performance"],
"validationProcess": "Cross-validated with Google Search Console data and AI-powered content performance analysis"
},
"monitoringTasks": [],
"strategyMapping": {
"relatedComponents": ["Strategic Insights", "Content Strategy", "Audience Analysis"],
"impactAreas": ["Brand Awareness", "Lead Generation", "Market Reach"],
"dependencies": ["SEO Optimization", "Content Quality", "User Experience"]
},
"aiInsights": {
"trendAnalysis": "Traffic growth shows strong upward trend with 15.7% increase. Top-performing content categories are educational blog posts and case studies.",
"recommendations": [
"Increase content production in educational blog category by 25%",
"Optimize case study content for better search visibility",
"Implement A/B testing for content headlines",
"Focus on long-form content (2000+ words) which shows 40% higher engagement"
],
"riskFactors": ["Seasonal traffic fluctuations", "Competitor content strategy changes", "Algorithm updates"],
"opportunities": ["Video content expansion", "Guest posting opportunities", "Social media amplification"]
}
}
# Add real monitoring tasks - map based on task content and purpose
# Group tasks by component for better organization
tasks_by_component = {}
for task in monitoring_tasks:
task_title_lower = task.task_title.lower()
task_description_lower = task.task_description.lower()
# Traffic Growth related tasks
if any(keyword in task_title_lower or keyword in task_description_lower
for keyword in ['traffic', 'organic', 'goal', 'strategic', 'performance', 'prediction']):
task_data = {
"title": task.task_title,
"description": task.task_description,
"assignee": task.assignee,
"frequency": task.frequency,
"metric": task.metric,
"measurementMethod": task.measurement_method,
"successCriteria": task.success_criteria,
"alertThreshold": task.alert_threshold,
"actionableInsights": getattr(task, 'actionable_insights', None),
"status": "active",
"lastExecuted": task_logs[0].execution_date.isoformat() if task_logs else None
}
traffic_growth_data["monitoringTasks"].append(task_data)
transparency_data.append(traffic_growth_data)
# Engagement Rate Metric
engagement_data = {
"metricName": "Engagement Rate",
"currentValue": 8.3,
"unit": "%",
"dataFreshness": {
"lastUpdated": task_logs[0].execution_date.isoformat() if task_logs else datetime.now().isoformat(),
"updateFrequency": "Every 2 hours",
"dataSource": "Social Media Analytics + Website Analytics",
"confidence": 88
},
"measurementMethodology": {
"description": "Average engagement rate across all content and social media",
"calculationMethod": "Weighted average of likes, shares, comments, and time spent across all platforms",
"dataPoints": ["Social Media Engagement", "Website Comments", "Time on Page", "Social Shares", "Email Engagement"],
"validationProcess": "Cross-platform validation using multiple analytics tools and AI sentiment analysis"
},
"monitoringTasks": [],
"strategyMapping": {
"relatedComponents": ["Audience Analysis", "Content Strategy", "Social Media Strategy"],
"impactAreas": ["Brand Engagement", "Community Building", "Customer Loyalty"],
"dependencies": ["Content Quality", "Social Media Presence", "Community Management"]
},
"aiInsights": {
"trendAnalysis": "Engagement rate is stable at 8.3% with peak engagement during lunch hours and early evenings.",
"recommendations": [
"Increase video content production by 50%",
"Optimize posting times for peak engagement hours",
"Implement interactive content elements",
"Focus on community-building content"
component = task.component_name or 'General'
if component not in tasks_by_component:
tasks_by_component[component] = []
tasks_by_component[component].append(task)
# Create transparency data for each component
for component, tasks in tasks_by_component.items():
component_data = {
"metricName": component,
"currentValue": len(tasks),
"unit": "tasks",
"dataFreshness": {
"lastUpdated": task_logs[0].execution_date.isoformat() if task_logs else datetime.now().isoformat(),
"updateFrequency": "Real-time",
"dataSource": "Monitoring System",
"confidence": 95
},
"measurementMethodology": {
"description": f"AI-powered monitoring for {component} with {len(tasks)} active tasks",
"calculationMethod": "Automated monitoring with real-time data collection and analysis",
"dataPoints": [task.metric for task in tasks if task.metric],
"validationProcess": "Cross-validated with multiple data sources and AI analysis"
},
"monitoringTasks": [
{
"title": task.task_title,
"description": task.task_description,
"assignee": task.assignee,
"frequency": task.frequency,
"metric": task.metric,
"measurementMethod": task.measurement_method,
"successCriteria": task.success_criteria,
"alertThreshold": task.alert_threshold,
"status": task.status,
"lastExecuted": task.last_executed.isoformat() if task.last_executed else None
}
for task in tasks
],
"riskFactors": ["Platform algorithm changes", "Content fatigue", "Competition for attention"],
"opportunities": ["Live streaming opportunities", "User-generated content campaigns", "Influencer collaborations"]
}
}
# Add engagement-related tasks
for task in monitoring_tasks:
task_title_lower = task.task_title.lower()
task_description_lower = task.task_description.lower()
if any(keyword in task_title_lower or keyword in task_description_lower
for keyword in ['engagement', 'social', 'community', 'audience', 'insight', 'competitive']):
task_data = {
"title": task.task_title,
"description": task.task_description,
"assignee": task.assignee,
"frequency": task.frequency,
"metric": task.metric,
"measurementMethod": task.measurement_method,
"successCriteria": task.success_criteria,
"alertThreshold": task.alert_threshold,
"actionableInsights": getattr(task, 'actionable_insights', None),
"status": "active",
"lastExecuted": task_logs[0].execution_date.isoformat() if task_logs else None
"strategyMapping": {
"relatedComponents": [component],
"impactAreas": ["Performance Monitoring", "Strategy Optimization", "Risk Management"],
"dependencies": ["Data Collection", "AI Analysis", "Alert System"]
},
"aiInsights": {
"trendAnalysis": f"Active monitoring for {component} with {len(tasks)} configured tasks",
"recommendations": [
"Monitor task execution status regularly",
"Review performance metrics weekly",
"Adjust thresholds based on performance trends"
],
"riskFactors": ["Task execution failures", "Data collection issues", "System downtime"],
"opportunities": ["Automated optimization", "Predictive analytics", "Enhanced monitoring"]
}
engagement_data["monitoringTasks"].append(task_data)
transparency_data.append(engagement_data)
# Conversion Rate Metric
conversion_data = {
"metricName": "Conversion Rate",
"currentValue": 2.1,
"unit": "%",
"dataFreshness": {
"lastUpdated": task_logs[0].execution_date.isoformat() if task_logs else datetime.now().isoformat(),
"updateFrequency": "Every 6 hours",
"dataSource": "Google Analytics + CRM Data",
"confidence": 85
},
"measurementMethodology": {
"description": "Content-driven conversion rate across all touchpoints",
"calculationMethod": "Conversions divided by total visitors, weighted by content attribution and customer journey analysis",
"dataPoints": ["Website Conversions", "Email Signups", "Lead Form Submissions", "Content Downloads", "Sales Attribution"],
"validationProcess": "CRM integration validation and conversion funnel analysis"
},
"monitoringTasks": [],
"strategyMapping": {
"relatedComponents": ["Performance Predictions", "Implementation Roadmap", "Risk Assessment"],
"impactAreas": ["Revenue Generation", "Lead Quality", "Customer Acquisition"],
"dependencies": ["Content Quality", "User Experience", "Lead Nurturing"]
},
"aiInsights": {
"trendAnalysis": "Conversion rate is improving steadily with 2.1% current rate. Top-converting content includes case studies and product demos.",
"recommendations": [
"Increase case study and demo content production",
"Optimize mobile user experience further",
"Implement personalized content recommendations",
"A/B test call-to-action buttons and forms"
],
"riskFactors": ["Market competition", "Economic factors", "Technology changes"],
"opportunities": ["Personalization opportunities", "Automation implementation", "Cross-selling strategies"]
}
}
# Add conversion-related tasks
for task in monitoring_tasks:
task_title_lower = task.task_title.lower()
task_description_lower = task.task_description.lower()
if any(keyword in task_title_lower or keyword in task_description_lower
for keyword in ['conversion', 'funnel', 'implementation', 'resource', 'risk', 'mitigation']):
task_data = {
"title": task.task_title,
"description": task.task_description,
"assignee": task.assignee,
"frequency": task.frequency,
"metric": task.metric,
"measurementMethod": task.measurement_method,
"successCriteria": task.success_criteria,
"alertThreshold": task.alert_threshold,
"actionableInsights": getattr(task, 'actionable_insights', None),
"status": "active",
"lastExecuted": task_logs[0].execution_date.isoformat() if task_logs else None
transparency_data.append(component_data)
# If no monitoring tasks found, create a default transparency entry
if not transparency_data:
transparency_data = [{
"metricName": "Strategy Monitoring",
"currentValue": 0,
"unit": "tasks",
"dataFreshness": {
"lastUpdated": datetime.now().isoformat(),
"updateFrequency": "Real-time",
"dataSource": "Monitoring System",
"confidence": 0
},
"measurementMethodology": {
"description": "No monitoring tasks configured yet",
"calculationMethod": "Manual setup required",
"dataPoints": [],
"validationProcess": "Not applicable"
},
"monitoringTasks": [],
"strategyMapping": {
"relatedComponents": ["Strategy"],
"impactAreas": ["Monitoring"],
"dependencies": ["Setup"]
},
"aiInsights": {
"trendAnalysis": "No monitoring data available",
"recommendations": ["Set up monitoring tasks", "Configure alerts", "Enable data collection"],
"riskFactors": ["No monitoring in place"],
"opportunities": ["Implement comprehensive monitoring"]
}
conversion_data["monitoringTasks"].append(task_data)
transparency_data.append(conversion_data)
}]
# Return the transparency data
return {
"success": True,
"data": transparency_data,
"message": "Transparency data retrieved successfully"
"message": f"Transparency data retrieved successfully for strategy {strategy_id}"
}
except Exception as e:

View File

@@ -177,7 +177,7 @@ class AIQualityAnalysisService:
Focus on strategic depth, clarity, and measurability.
"""
ai_response = await gemini_structured_json_response(
ai_response = gemini_structured_json_response(
prompt=prompt,
schema=QUALITY_ANALYSIS_SCHEMA,
temperature=0.3,

View File

@@ -0,0 +1,359 @@
"""
Monitoring Data Service
Handles saving and retrieving monitoring data from database and cache.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy import and_, desc
from models.monitoring_models import (
StrategyMonitoringPlan, MonitoringTask, TaskExecutionLog,
StrategyPerformanceMetrics, StrategyActivationStatus
)
from models.enhanced_strategy_models import EnhancedContentStrategy
logger = logging.getLogger(__name__)
class MonitoringDataService:
"""Service for managing monitoring data in database and cache."""
def __init__(self, db_session: Session):
self.db = db_session
async def save_monitoring_data(self, strategy_id: int, monitoring_plan: Dict[str, Any]) -> bool:
"""Save monitoring plan and tasks to database."""
try:
logger.info(f"Saving monitoring data for strategy {strategy_id}")
logger.info(f"Monitoring plan received: {monitoring_plan}")
# Save the complete monitoring plan
monitoring_plan_record = StrategyMonitoringPlan(
strategy_id=strategy_id,
plan_data=monitoring_plan
)
self.db.add(monitoring_plan_record)
# Save individual monitoring tasks
monitoring_tasks = monitoring_plan.get('monitoringTasks', [])
logger.info(f"Found {len(monitoring_tasks)} monitoring tasks to save")
for i, task_data in enumerate(monitoring_tasks):
logger.info(f"Saving task {i+1}: {task_data.get('title', 'Unknown')}")
task = MonitoringTask(
strategy_id=strategy_id,
component_name=task_data.get('component', ''),
task_title=task_data.get('title', ''),
task_description=task_data.get('description', ''),
assignee=task_data.get('assignee', 'ALwrity'),
frequency=task_data.get('frequency', 'Monthly'),
metric=task_data.get('metric', ''),
measurement_method=task_data.get('measurementMethod', ''),
success_criteria=task_data.get('successCriteria', ''),
alert_threshold=task_data.get('alertThreshold', ''),
status='active'
)
self.db.add(task)
# Save activation status
activation_status = StrategyActivationStatus(
strategy_id=strategy_id,
user_id=1, # Default user ID
activation_date=datetime.utcnow(),
status='active'
)
self.db.add(activation_status)
# Save initial performance metrics
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
metric_date=datetime.utcnow(),
data_source='monitoring_plan',
confidence_score=85 # High confidence for monitoring plan data
)
self.db.add(performance_metrics)
self.db.commit()
logger.info(f"Successfully saved monitoring data for strategy {strategy_id}")
return True
except Exception as e:
logger.error(f"Error saving monitoring data for strategy {strategy_id}: {e}")
self.db.rollback()
return False
async def get_monitoring_data(self, strategy_id: int) -> Optional[Dict[str, Any]]:
"""Get monitoring data from database."""
try:
logger.info(f"Retrieving monitoring data for strategy {strategy_id}")
# Get the monitoring plan
monitoring_plan = self.db.query(StrategyMonitoringPlan).filter(
StrategyMonitoringPlan.strategy_id == strategy_id
).order_by(desc(StrategyMonitoringPlan.created_at)).first()
if not monitoring_plan:
logger.warning(f"No monitoring plan found for strategy {strategy_id}")
return None
# Get monitoring tasks
tasks = self.db.query(MonitoringTask).filter(
MonitoringTask.strategy_id == strategy_id
).all()
# Get activation status
activation_status = self.db.query(StrategyActivationStatus).filter(
StrategyActivationStatus.strategy_id == strategy_id
).first()
# Get performance metrics
performance_metrics = self.db.query(StrategyPerformanceMetrics).filter(
StrategyPerformanceMetrics.strategy_id == strategy_id
).order_by(desc(StrategyPerformanceMetrics.metric_date)).first()
# Build comprehensive monitoring data
monitoring_data = {
'strategy_id': strategy_id,
'monitoring_plan': monitoring_plan.plan_data,
'monitoring_tasks': [
{
'id': task.id,
'component': task.component_name,
'title': task.task_title,
'description': task.task_description,
'assignee': task.assignee,
'frequency': task.frequency,
'metric': task.metric,
'measurementMethod': task.measurement_method,
'successCriteria': task.success_criteria,
'alertThreshold': task.alert_threshold,
'status': task.status,
'last_executed': task.last_executed.isoformat() if task.last_executed else None,
'next_execution': task.next_execution.isoformat() if task.next_execution else None
}
for task in tasks
],
'activation_status': {
'activation_date': activation_status.activation_date.isoformat() if activation_status else None,
'status': activation_status.status if activation_status else 'unknown',
'performance_score': activation_status.performance_score if activation_status else None
},
'performance_metrics': {
'traffic_growth': performance_metrics.traffic_growth_percentage if performance_metrics else None,
'engagement_rate': performance_metrics.engagement_rate_percentage if performance_metrics else None,
'conversion_rate': performance_metrics.conversion_rate_percentage if performance_metrics else None,
'roi_ratio': performance_metrics.roi_ratio if performance_metrics else None,
'content_quality_score': performance_metrics.content_quality_score if performance_metrics else None,
'data_source': performance_metrics.data_source if performance_metrics else None,
'confidence_score': performance_metrics.confidence_score if performance_metrics else None
},
'created_at': monitoring_plan.created_at.isoformat(),
'updated_at': monitoring_plan.updated_at.isoformat()
}
logger.info(f"Successfully retrieved monitoring data for strategy {strategy_id}")
return monitoring_data
except Exception as e:
logger.error(f"Error retrieving monitoring data for strategy {strategy_id}: {e}")
return None
async def get_analytics_data(self, strategy_id: int) -> Dict[str, Any]:
"""Get analytics data from monitoring data (no external API calls)."""
try:
logger.info(f"Generating analytics data for strategy {strategy_id}")
# Get monitoring data from database
monitoring_data = await self.get_monitoring_data(strategy_id)
if not monitoring_data:
logger.warning(f"No monitoring data found for strategy {strategy_id}")
return self._get_empty_analytics_data()
# Extract analytics from monitoring data
monitoring_plan = monitoring_data['monitoring_plan']
tasks = monitoring_data['monitoring_tasks']
performance_metrics = monitoring_data['performance_metrics']
# Always use monitoring tasks from the plan for rich data, fallback to database tasks
monitoring_tasks = []
if monitoring_plan.get('monitoringTasks'):
# Use rich data from monitoring plan
monitoring_tasks = [
{
'id': i + 1,
'component': task.get('component', ''),
'title': task.get('title', ''),
'description': task.get('description', ''),
'assignee': task.get('assignee', 'ALwrity'),
'frequency': task.get('frequency', 'Monthly'),
'metric': task.get('metric', ''),
'measurementMethod': task.get('measurementMethod', ''),
'successCriteria': task.get('successCriteria', ''),
'alertThreshold': task.get('alertThreshold', ''),
'actionableInsights': task.get('actionableInsights', ''),
'status': 'active',
'last_executed': None,
'next_execution': None
}
for i, task in enumerate(monitoring_plan.get('monitoringTasks', []))
]
elif tasks:
# Fallback to database tasks if plan doesn't have them
monitoring_tasks = [
{
'id': task.id,
'component': task.component_name,
'title': task.task_title,
'description': task.task_description,
'assignee': task.assignee,
'frequency': task.frequency,
'metric': task.metric,
'measurementMethod': task.measurement_method,
'successCriteria': task.success_criteria,
'alertThreshold': task.alert_threshold,
'actionableInsights': '',
'status': task.status,
'last_executed': task.last_executed.isoformat() if task.last_executed else None,
'next_execution': task.next_execution.isoformat() if task.next_execution else None
}
for task in tasks
]
# Always use performance metrics from success metrics for rich data
extracted_metrics = {}
if monitoring_plan.get('successMetrics'):
success_metrics = monitoring_plan['successMetrics']
extracted_metrics = {
'traffic_growth': self._extract_percentage(success_metrics.get('trafficGrowth', {}).get('current', '0%')),
'engagement_rate': self._extract_percentage(success_metrics.get('engagementRate', {}).get('current', '0%')),
'conversion_rate': self._extract_percentage(success_metrics.get('conversionRate', {}).get('current', '0%')),
'roi_ratio': self._extract_ratio(success_metrics.get('roi', {}).get('current', '0:1')),
'content_quality_score': self._extract_percentage(success_metrics.get('contentQuality', {}).get('current', '0%')),
'data_source': 'monitoring_plan',
'confidence_score': 85
}
else:
# Fallback to database metrics if plan doesn't have them
extracted_metrics = {
'traffic_growth': performance_metrics.get('traffic_growth', 0),
'engagement_rate': performance_metrics.get('engagement_rate', 0),
'conversion_rate': performance_metrics.get('conversion_rate', 0),
'roi_ratio': performance_metrics.get('roi_ratio', 0),
'content_quality_score': performance_metrics.get('content_quality_score', 0),
'data_source': performance_metrics.get('data_source', 'database'),
'confidence_score': performance_metrics.get('confidence_score', 70)
}
# Build analytics data from monitoring plan
analytics_data = {
'performance_trends': {
'traffic_growth': extracted_metrics.get('traffic_growth', 0),
'engagement_rate': extracted_metrics.get('engagement_rate', 0),
'conversion_rate': extracted_metrics.get('conversion_rate', 0),
'roi_ratio': extracted_metrics.get('roi_ratio', 0),
'content_quality_score': extracted_metrics.get('content_quality_score', 0)
},
'content_evolution': {
'content_pillars': monitoring_plan.get('contentPillars', []),
'content_mix': monitoring_plan.get('contentMix', {}),
'publishing_frequency': monitoring_plan.get('publishingFrequency', ''),
'quality_metrics': monitoring_plan.get('qualityMetrics', [])
},
'engagement_patterns': {
'audience_segments': monitoring_plan.get('audienceSegments', []),
'engagement_metrics': monitoring_plan.get('engagementMetrics', {}),
'optimal_timing': monitoring_plan.get('optimalTiming', {}),
'platform_performance': monitoring_plan.get('platformPerformance', {})
},
'recommendations': monitoring_plan.get('recommendations', []),
'insights': monitoring_plan.get('insights', []),
'monitoring_data': monitoring_data,
'monitoring_tasks': monitoring_tasks,
'monitoring_plan': monitoring_plan, # Include full monitoring plan for rich data
'success_metrics': monitoring_plan.get('successMetrics', {}), # Include success metrics
'monitoring_schedule': monitoring_plan.get('monitoringSchedule', {}), # Include monitoring schedule
'_source': 'database_monitoring',
'data_freshness': monitoring_data['updated_at'],
'confidence_score': extracted_metrics.get('confidence_score', 85)
}
logger.info(f"Successfully generated analytics data for strategy {strategy_id}")
return analytics_data
except Exception as e:
logger.error(f"Error generating analytics data for strategy {strategy_id}: {e}")
return self._get_empty_analytics_data()
def _get_empty_analytics_data(self) -> Dict[str, Any]:
"""Return empty analytics data structure."""
return {
'performance_trends': {},
'content_evolution': {},
'engagement_patterns': {},
'recommendations': [],
'insights': [],
'monitoring_data': None,
'monitoring_tasks': [],
'_source': 'empty',
'data_freshness': datetime.utcnow().isoformat(),
'confidence_score': 0
}
def _extract_percentage(self, value: str) -> float:
"""Extract percentage value from string like '15%'."""
try:
if isinstance(value, str) and '%' in value:
return float(value.replace('%', ''))
elif isinstance(value, (int, float)):
return float(value)
else:
return 0.0
except (ValueError, TypeError):
return 0.0
def _extract_ratio(self, value: str) -> float:
"""Extract ratio value from string like '3:1'."""
try:
if isinstance(value, str) and ':' in value:
parts = value.split(':')
if len(parts) == 2:
return float(parts[0]) / float(parts[1])
elif isinstance(value, (int, float)):
return float(value)
else:
return 0.0
except (ValueError, TypeError):
return 0.0
async def update_performance_metrics(self, strategy_id: int, metrics: Dict[str, Any]) -> bool:
"""Update performance metrics for a strategy."""
try:
logger.info(f"Updating performance metrics for strategy {strategy_id}")
performance_metrics = StrategyPerformanceMetrics(
strategy_id=strategy_id,
user_id=1, # Default user ID
metric_date=datetime.utcnow(),
traffic_growth_percentage=metrics.get('traffic_growth'),
engagement_rate_percentage=metrics.get('engagement_rate'),
conversion_rate_percentage=metrics.get('conversion_rate'),
roi_ratio=metrics.get('roi_ratio'),
content_quality_score=metrics.get('content_quality_score'),
data_source='manual_update',
confidence_score=metrics.get('confidence_score', 70)
)
self.db.add(performance_metrics)
self.db.commit()
logger.info(f"Successfully updated performance metrics for strategy {strategy_id}")
return True
except Exception as e:
logger.error(f"Error updating performance metrics for strategy {strategy_id}: {e}")
self.db.rollback()
return False

View File

@@ -9,6 +9,7 @@ import os
import sys
import subprocess
import time
import argparse
from pathlib import Path
def install_requirements():
@@ -213,18 +214,25 @@ def setup_environment():
print("✅ Environment setup complete")
def start_backend():
def start_backend(enable_reload=False):
"""Start the backend server."""
print("🚀 Starting ALwrity Backend...")
# Set environment variables
os.environ.setdefault("HOST", "0.0.0.0")
os.environ.setdefault("PORT", "8000")
os.environ.setdefault("RELOAD", "true")
# Set reload based on argument or environment variable
if enable_reload:
os.environ.setdefault("RELOAD", "true")
print(" 🔄 Development mode: Auto-reload enabled")
else:
os.environ.setdefault("RELOAD", "false")
print(" 🏭 Production mode: Auto-reload disabled")
host = os.getenv("HOST", "0.0.0.0")
port = int(os.getenv("PORT", "8000"))
reload = os.getenv("RELOAD", "true").lower() == "true"
reload = os.getenv("RELOAD", "false").lower() == "true"
print(f" 📍 Host: {host}")
print(f" 🔌 Port: {port}")
@@ -242,12 +250,48 @@ def start_backend():
print(" 📈 API Monitoring: http://localhost:8000/api/content-planning/monitoring/health")
print("\n⏹️ Press Ctrl+C to stop the server")
print("=" * 60)
print("\n💡 Usage:")
print(" Production mode (default): python start_alwrity_backend.py")
print(" Development mode: python start_alwrity_backend.py --dev")
print(" With auto-reload: python start_alwrity_backend.py --reload")
print("=" * 60)
uvicorn.run(
"app:app",
host=host,
port=port,
reload=reload,
reload_excludes=[
"*.pyc",
"*.pyo",
"*.pyd",
"__pycache__",
"*.log",
"*.sqlite",
"*.db",
"*.tmp",
"*.temp",
"test_*.py",
"temp_*.py",
"monitoring_data_service.py",
"test_monitoring_save.py",
"*.json",
"*.yaml",
"*.yml",
".env*",
"logs/*",
"cache/*",
"tmp/*",
"temp/*",
"middleware/*",
"models/*",
"scripts/*"
],
reload_includes=[
"app.py",
"api/**/*.py",
"services/**/*.py"
],
log_level="info"
)
@@ -261,6 +305,12 @@ def start_backend():
def main():
"""Main function to set up and start the backend."""
# Parse command line arguments
parser = argparse.ArgumentParser(description="ALwrity Backend Server")
parser.add_argument("--reload", action="store_true", help="Enable auto-reload for development")
parser.add_argument("--dev", action="store_true", help="Enable development mode (auto-reload)")
args = parser.parse_args()
print("🎯 ALwrity Backend Server")
print("=" * 40)
@@ -279,8 +329,9 @@ def main():
# Setup environment
setup_environment()
# Start backend
return start_backend()
# Start backend with reload option
enable_reload = args.reload or args.dev
return start_backend(enable_reload=enable_reload)
if __name__ == "__main__":
success = main()