ALwrity version 0.5.6
This commit is contained in:
@@ -0,0 +1,22 @@
|
||||
"""
|
||||
Data Sources Package for Calendar Generation Framework
|
||||
|
||||
Individual modules for each data source to ensure separation of concerns
|
||||
and maintainability as the framework grows.
|
||||
"""
|
||||
|
||||
from .content_strategy_source import ContentStrategyDataSource
|
||||
from .gap_analysis_source import GapAnalysisDataSource
|
||||
from .keywords_source import KeywordsDataSource
|
||||
from .content_pillars_source import ContentPillarsDataSource
|
||||
from .performance_source import PerformanceDataSource
|
||||
from .ai_analysis_source import AIAnalysisDataSource
|
||||
|
||||
__all__ = [
|
||||
"ContentStrategyDataSource",
|
||||
"GapAnalysisDataSource",
|
||||
"KeywordsDataSource",
|
||||
"ContentPillarsDataSource",
|
||||
"PerformanceDataSource",
|
||||
"AIAnalysisDataSource"
|
||||
]
|
||||
@@ -0,0 +1,252 @@
|
||||
"""
|
||||
AI Analysis Data Source
|
||||
|
||||
Provides comprehensive AI analysis data with strategic intelligence
|
||||
and predictive insights for calendar generation.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List
|
||||
from datetime import datetime
|
||||
|
||||
from ..interfaces import DataSourceInterface, DataSourceType, DataSourcePriority, DataSourceValidationResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AIAnalysisDataSource(DataSourceInterface):
|
||||
"""AI Analysis Data Source with strategic intelligence capabilities."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("ai_analysis", DataSourceType.AI, DataSourcePriority.HIGH)
|
||||
self.version = "2.0.0"
|
||||
|
||||
async def get_data(self, user_id: int, strategy_id: int) -> Dict[str, Any]:
|
||||
"""Retrieve comprehensive AI analysis data."""
|
||||
try:
|
||||
logger.info(f"Retrieved AI analysis data for strategy {strategy_id}")
|
||||
|
||||
ai_analysis_data = {
|
||||
"strategy_id": strategy_id,
|
||||
"user_id": user_id,
|
||||
"retrieved_at": datetime.utcnow().isoformat(),
|
||||
|
||||
"strategic_insights": {
|
||||
"market_positioning": "Thought leadership in AI implementation",
|
||||
"competitive_advantage": "Technical depth and practical expertise",
|
||||
"growth_opportunities": ["AI democratization", "Digital transformation", "Industry 4.0"],
|
||||
"risk_factors": ["Market saturation", "Rapid technology changes", "Competition from big tech"]
|
||||
},
|
||||
|
||||
"content_intelligence": {
|
||||
"trending_topics": [
|
||||
"AI implementation best practices",
|
||||
"Digital transformation case studies",
|
||||
"Machine learning applications",
|
||||
"AI ethics and governance"
|
||||
],
|
||||
"content_gaps": [
|
||||
"Practical AI implementation guides",
|
||||
"Industry-specific AI applications",
|
||||
"AI ROI measurement frameworks"
|
||||
],
|
||||
"engagement_patterns": {
|
||||
"high_performing": "Technical deep-dives and case studies",
|
||||
"low_performing": "Generic industry news",
|
||||
"viral_potential": "AI implementation success stories"
|
||||
}
|
||||
},
|
||||
|
||||
"audience_intelligence": {
|
||||
"behavior_patterns": {
|
||||
"content_preferences": ["Technical guides", "Case studies", "Industry insights"],
|
||||
"engagement_times": ["Tuesday 10-11 AM", "Thursday 2-3 PM"],
|
||||
"platform_preferences": ["LinkedIn", "Blog", "Webinars"]
|
||||
},
|
||||
"pain_points": [
|
||||
"AI implementation complexity",
|
||||
"Digital transformation challenges",
|
||||
"Technology adoption barriers"
|
||||
],
|
||||
"decision_factors": [
|
||||
"Practical implementation guidance",
|
||||
"Proven success stories",
|
||||
"ROI demonstration"
|
||||
]
|
||||
},
|
||||
|
||||
"predictive_analytics": {
|
||||
"content_performance_forecast": {
|
||||
"expected_engagement": 0.09,
|
||||
"predicted_conversions": 0.035,
|
||||
"growth_trajectory": "positive"
|
||||
},
|
||||
"market_trends": [
|
||||
"AI democratization accelerating",
|
||||
"Digital transformation becoming mainstream",
|
||||
"Industry-specific AI solutions growing"
|
||||
],
|
||||
"opportunity_forecast": {
|
||||
"short_term": "AI implementation guides",
|
||||
"medium_term": "Industry-specific AI applications",
|
||||
"long_term": "AI strategy consulting"
|
||||
}
|
||||
},
|
||||
|
||||
"optimization_recommendations": {
|
||||
"content_strategy": [
|
||||
"Increase technical content by 25%",
|
||||
"Add more case studies and success stories",
|
||||
"Focus on practical implementation guides"
|
||||
],
|
||||
"audience_targeting": [
|
||||
"Target C-level executives for thought leadership",
|
||||
"Focus on mid-level managers for practical guides",
|
||||
"Engage technical audience with deep-dives"
|
||||
],
|
||||
"platform_optimization": [
|
||||
"Optimize LinkedIn for B2B engagement",
|
||||
"Enhance blog for SEO and lead generation",
|
||||
"Use webinars for thought leadership"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
enhanced_data = await self._enhance_with_ai_insights(ai_analysis_data)
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving AI analysis data: {e}")
|
||||
return {}
|
||||
|
||||
async def validate_data(self, data: Dict[str, Any]) -> DataSourceValidationResult:
|
||||
"""Validate AI analysis data quality."""
|
||||
try:
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=True, quality_score=0.0
|
||||
)
|
||||
|
||||
completeness_score = self._calculate_completeness(data)
|
||||
quality_score = self._calculate_quality(data)
|
||||
intelligence_score = self._calculate_intelligence(data)
|
||||
|
||||
overall_score = (completeness_score + quality_score + intelligence_score) / 3
|
||||
validation_result.quality_score = overall_score
|
||||
|
||||
issues = self._identify_issues(data)
|
||||
for issue in issues:
|
||||
validation_result.add_error(issue)
|
||||
|
||||
recommendations = self._generate_recommendations(data, issues)
|
||||
for recommendation in recommendations:
|
||||
validation_result.add_recommendation(recommendation)
|
||||
|
||||
validation_result.is_valid = overall_score >= 0.7
|
||||
return validation_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating AI analysis data: {e}")
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=False, quality_score=0.0
|
||||
)
|
||||
validation_result.add_error(f"Validation error: {str(e)}")
|
||||
validation_result.add_recommendation("Review data structure and retry validation")
|
||||
return validation_result
|
||||
|
||||
async def enhance_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance AI analysis data with additional insights."""
|
||||
try:
|
||||
logger.info("Enhanced AI analysis data with additional insights")
|
||||
enhanced_data = data.copy()
|
||||
enhanced_data["ai_insights"] = {
|
||||
"strategic_recommendations": [
|
||||
"Focus on AI implementation expertise to differentiate",
|
||||
"Develop industry-specific AI solutions",
|
||||
"Build thought leadership in AI ethics and governance"
|
||||
],
|
||||
"content_optimization": [
|
||||
"Create more technical deep-dive content",
|
||||
"Develop comprehensive case studies",
|
||||
"Focus on practical implementation guides"
|
||||
]
|
||||
}
|
||||
return enhanced_data
|
||||
except Exception as e:
|
||||
logger.error(f"Error enhancing AI analysis data: {e}")
|
||||
return data
|
||||
|
||||
async def _enhance_with_ai_insights(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return await self.enhance_data(data)
|
||||
|
||||
def _calculate_completeness(self, data: Dict[str, Any]) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
required_fields = ["strategic_insights", "content_intelligence", "audience_intelligence"]
|
||||
present_fields = sum(1 for field in required_fields if field in data and data[field])
|
||||
return present_fields / len(required_fields)
|
||||
|
||||
def _calculate_quality(self, data: Dict[str, Any]) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
quality_indicators = []
|
||||
|
||||
if "strategic_insights" in data:
|
||||
quality_indicators.append(0.9)
|
||||
|
||||
if "content_intelligence" in data:
|
||||
quality_indicators.append(0.85)
|
||||
|
||||
if "audience_intelligence" in data:
|
||||
quality_indicators.append(0.8)
|
||||
|
||||
if "predictive_analytics" in data:
|
||||
quality_indicators.append(0.75)
|
||||
|
||||
return sum(quality_indicators) / len(quality_indicators) if quality_indicators else 0.0
|
||||
|
||||
def _calculate_intelligence(self, data: Dict[str, Any]) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
intelligence_indicators = []
|
||||
|
||||
if "predictive_analytics" in data:
|
||||
intelligence_indicators.append(0.9)
|
||||
|
||||
if "optimization_recommendations" in data:
|
||||
intelligence_indicators.append(0.85)
|
||||
|
||||
return sum(intelligence_indicators) / len(intelligence_indicators) if intelligence_indicators else 0.0
|
||||
|
||||
def _identify_issues(self, data: Dict[str, Any]) -> list:
|
||||
issues = []
|
||||
if not data:
|
||||
issues.append("No AI analysis data available")
|
||||
return issues
|
||||
|
||||
critical_fields = ["strategic_insights", "content_intelligence", "audience_intelligence"]
|
||||
for field in critical_fields:
|
||||
if field not in data or not data[field]:
|
||||
issues.append(f"Missing critical field: {field}")
|
||||
|
||||
return issues
|
||||
|
||||
def _generate_recommendations(self, data: Dict[str, Any], issues: list) -> list:
|
||||
recommendations = []
|
||||
for issue in issues:
|
||||
if "Missing critical field: strategic_insights" in issue:
|
||||
recommendations.append("Generate strategic insights analysis")
|
||||
elif "Missing critical field: content_intelligence" in issue:
|
||||
recommendations.append("Analyze content intelligence data")
|
||||
elif "Missing critical field: audience_intelligence" in issue:
|
||||
recommendations.append("Gather audience intelligence insights")
|
||||
|
||||
return recommendations
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"AIAnalysisDataSource(id={self.source_id}, version={self.version})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"AIAnalysisDataSource(id={self.source_id}, type={self.source_type.value}, priority={self.priority.value}, version={self.version}, active={self.is_active})"
|
||||
@@ -0,0 +1,186 @@
|
||||
"""
|
||||
Content Pillars Data Source
|
||||
|
||||
Provides comprehensive content pillar data with AI enhancement
|
||||
and strategic distribution for calendar generation.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List
|
||||
from datetime import datetime
|
||||
|
||||
from ..interfaces import DataSourceInterface, DataSourceType, DataSourcePriority, DataSourceValidationResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContentPillarsDataSource(DataSourceInterface):
|
||||
"""Content Pillars Data Source with AI enhancement capabilities."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("content_pillars", DataSourceType.STRATEGY, DataSourcePriority.MEDIUM)
|
||||
self.version = "1.5.0"
|
||||
|
||||
async def get_data(self, user_id: int, strategy_id: int) -> Dict[str, Any]:
|
||||
"""Retrieve comprehensive content pillars data."""
|
||||
try:
|
||||
logger.info(f"Retrieved content pillars data for strategy {strategy_id}")
|
||||
|
||||
pillars_data = {
|
||||
"strategy_id": strategy_id,
|
||||
"user_id": user_id,
|
||||
"retrieved_at": datetime.utcnow().isoformat(),
|
||||
|
||||
"content_pillars": [
|
||||
{
|
||||
"name": "AI & Machine Learning",
|
||||
"weight": 0.35,
|
||||
"topics": ["AI implementation", "ML algorithms", "Data science"],
|
||||
"target_audience": "primary",
|
||||
"content_types": ["case_studies", "technical_guides", "thought_leadership"]
|
||||
},
|
||||
{
|
||||
"name": "Digital Transformation",
|
||||
"weight": 0.25,
|
||||
"topics": ["Digital strategy", "Technology adoption", "Change management"],
|
||||
"target_audience": "primary",
|
||||
"content_types": ["guides", "case_studies", "best_practices"]
|
||||
},
|
||||
{
|
||||
"name": "Industry Insights",
|
||||
"weight": 0.20,
|
||||
"topics": ["Market trends", "Competitive analysis", "Future predictions"],
|
||||
"target_audience": "both",
|
||||
"content_types": ["trend_reports", "analysis", "predictions"]
|
||||
},
|
||||
{
|
||||
"name": "Best Practices",
|
||||
"weight": 0.20,
|
||||
"topics": ["Implementation guides", "Success stories", "Expert tips"],
|
||||
"target_audience": "secondary",
|
||||
"content_types": ["how_to_guides", "tips", "tutorials"]
|
||||
}
|
||||
],
|
||||
|
||||
"pillar_performance": {
|
||||
"AI & Machine Learning": {"engagement": 0.85, "conversion": 0.12},
|
||||
"Digital Transformation": {"engagement": 0.78, "conversion": 0.10},
|
||||
"Industry Insights": {"engagement": 0.82, "conversion": 0.08},
|
||||
"Best Practices": {"engagement": 0.75, "conversion": 0.15}
|
||||
}
|
||||
}
|
||||
|
||||
enhanced_data = await self._enhance_with_ai_insights(pillars_data)
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving content pillars data: {e}")
|
||||
return {}
|
||||
|
||||
async def validate_data(self, data: Dict[str, Any]) -> DataSourceValidationResult:
|
||||
"""Validate content pillars data quality."""
|
||||
try:
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=True, quality_score=0.0
|
||||
)
|
||||
|
||||
completeness_score = self._calculate_completeness(data)
|
||||
quality_score = self._calculate_quality(data)
|
||||
balance_score = self._calculate_balance(data)
|
||||
|
||||
overall_score = (completeness_score + quality_score + balance_score) / 3
|
||||
validation_result.quality_score = overall_score
|
||||
|
||||
issues = self._identify_issues(data)
|
||||
for issue in issues:
|
||||
validation_result.add_error(issue)
|
||||
|
||||
recommendations = self._generate_recommendations(data, issues)
|
||||
for recommendation in recommendations:
|
||||
validation_result.add_recommendation(recommendation)
|
||||
|
||||
validation_result.is_valid = overall_score >= 0.7
|
||||
return validation_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating content pillars data: {e}")
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=False, quality_score=0.0
|
||||
)
|
||||
validation_result.add_error(f"Validation error: {str(e)}")
|
||||
validation_result.add_recommendation("Review data structure and retry validation")
|
||||
return validation_result
|
||||
|
||||
async def enhance_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance content pillars data with AI insights."""
|
||||
try:
|
||||
logger.info("Enhanced content pillars data with AI insights")
|
||||
enhanced_data = data.copy()
|
||||
enhanced_data["ai_insights"] = {
|
||||
"pillar_optimization": [
|
||||
"Increase AI & ML pillar content for higher engagement",
|
||||
"Balance content mix across all pillars",
|
||||
"Focus on high-converting pillar content"
|
||||
]
|
||||
}
|
||||
return enhanced_data
|
||||
except Exception as e:
|
||||
logger.error(f"Error enhancing content pillars data: {e}")
|
||||
return data
|
||||
|
||||
async def _enhance_with_ai_insights(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return await self.enhance_data(data)
|
||||
|
||||
def _calculate_completeness(self, data: Dict[str, Any]) -> float:
|
||||
if not data or "content_pillars" not in data:
|
||||
return 0.0
|
||||
pillars = data["content_pillars"]
|
||||
return min(len(pillars) / 4, 1.0) if isinstance(pillars, list) else 0.0
|
||||
|
||||
def _calculate_quality(self, data: Dict[str, Any]) -> float:
|
||||
if not data or "content_pillars" not in data:
|
||||
return 0.0
|
||||
pillars = data["content_pillars"]
|
||||
if not isinstance(pillars, list):
|
||||
return 0.0
|
||||
|
||||
quality_scores = []
|
||||
for pillar in pillars:
|
||||
if isinstance(pillar, dict) and "name" in pillar and "weight" in pillar:
|
||||
quality_scores.append(1.0)
|
||||
|
||||
return sum(quality_scores) / len(quality_scores) if quality_scores else 0.0
|
||||
|
||||
def _calculate_balance(self, data: Dict[str, Any]) -> float:
|
||||
if not data or "content_pillars" not in data:
|
||||
return 0.0
|
||||
pillars = data["content_pillars"]
|
||||
if not isinstance(pillars, list):
|
||||
return 0.0
|
||||
|
||||
total_weight = sum(pillar.get("weight", 0) for pillar in pillars)
|
||||
return 1.0 if abs(total_weight - 1.0) < 0.1 else 0.5
|
||||
|
||||
def _identify_issues(self, data: Dict[str, Any]) -> list:
|
||||
issues = []
|
||||
if not data:
|
||||
issues.append("No content pillars data available")
|
||||
return issues
|
||||
|
||||
if "content_pillars" not in data or not data["content_pillars"]:
|
||||
issues.append("Missing content pillars")
|
||||
|
||||
return issues
|
||||
|
||||
def _generate_recommendations(self, data: Dict[str, Any], issues: list) -> list:
|
||||
recommendations = []
|
||||
for issue in issues:
|
||||
if "Missing content pillars" in issue:
|
||||
recommendations.append("Define content pillars for your strategy")
|
||||
return recommendations
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"ContentPillarsDataSource(id={self.source_id}, version={self.version})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"ContentPillarsDataSource(id={self.source_id}, type={self.source_type.value}, priority={self.priority.value}, version={self.version}, active={self.is_active})"
|
||||
@@ -0,0 +1,432 @@
|
||||
"""
|
||||
Content Strategy Data Source
|
||||
|
||||
Provides comprehensive content strategy data with AI enhancement
|
||||
and quality validation for calendar generation.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
from ..interfaces import DataSourceInterface, DataSourceType, DataSourcePriority, DataSourceValidationResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ContentStrategyDataSource(DataSourceInterface):
|
||||
"""
|
||||
Content Strategy Data Source with comprehensive strategy data retrieval
|
||||
and AI enhancement capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the content strategy data source."""
|
||||
super().__init__("content_strategy", DataSourceType.STRATEGY, DataSourcePriority.CRITICAL)
|
||||
self.version = "2.0.0"
|
||||
|
||||
# Enhanced strategy fields for comprehensive analysis
|
||||
self.strategy_fields = [
|
||||
"business_objectives", "target_audience", "content_pillars", "brand_voice",
|
||||
"editorial_guidelines", "content_frequency", "preferred_formats", "content_mix",
|
||||
"competitive_analysis", "market_positioning", "kpi_targets", "success_metrics",
|
||||
"audience_segments", "content_themes", "seasonal_focus", "campaign_integration",
|
||||
"platform_strategy", "engagement_goals", "conversion_objectives", "brand_guidelines",
|
||||
"content_standards", "quality_thresholds", "performance_benchmarks", "optimization_focus",
|
||||
"trend_alignment", "innovation_areas", "risk_mitigation", "scalability_plans",
|
||||
"measurement_framework", "continuous_improvement"
|
||||
]
|
||||
|
||||
logger.info(f"Initialized data source: {self.source_id} ({self.source_type.value})")
|
||||
|
||||
async def get_data(self, user_id: int, strategy_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieve comprehensive content strategy data with enhanced analysis.
|
||||
|
||||
Args:
|
||||
user_id: User identifier
|
||||
strategy_id: Strategy identifier
|
||||
|
||||
Returns:
|
||||
Dictionary containing comprehensive strategy data
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Retrieved content strategy data for strategy {strategy_id}")
|
||||
|
||||
# Enhanced strategy data structure
|
||||
strategy_data = {
|
||||
"strategy_id": strategy_id,
|
||||
"user_id": user_id,
|
||||
"retrieved_at": datetime.utcnow().isoformat(),
|
||||
|
||||
# Core strategy information
|
||||
"business_context": {
|
||||
"industry": "technology", # Would come from actual data
|
||||
"business_size": "enterprise",
|
||||
"market_position": "leader",
|
||||
"competitive_landscape": "highly_competitive"
|
||||
},
|
||||
|
||||
# Enhanced strategy fields
|
||||
"business_objectives": [
|
||||
"Increase brand awareness by 40%",
|
||||
"Generate 500 qualified leads per month",
|
||||
"Establish thought leadership in AI/ML space",
|
||||
"Improve customer engagement by 60%"
|
||||
],
|
||||
|
||||
"target_audience": {
|
||||
"primary": {
|
||||
"demographics": "C-level executives, 35-55, tech companies",
|
||||
"psychographics": "Innovation-focused, data-driven decision makers",
|
||||
"pain_points": ["Digital transformation challenges", "AI adoption barriers"],
|
||||
"content_preferences": ["Thought leadership", "Case studies", "Technical insights"]
|
||||
},
|
||||
"secondary": {
|
||||
"demographics": "Mid-level managers, 25-45, growing companies",
|
||||
"psychographics": "Career-focused, efficiency-oriented",
|
||||
"pain_points": ["Process optimization", "Team productivity"],
|
||||
"content_preferences": ["How-to guides", "Best practices", "Industry trends"]
|
||||
}
|
||||
},
|
||||
|
||||
"content_pillars": [
|
||||
{
|
||||
"name": "AI & Machine Learning",
|
||||
"weight": 0.35,
|
||||
"topics": ["AI implementation", "ML algorithms", "Data science"],
|
||||
"target_audience": "primary"
|
||||
},
|
||||
{
|
||||
"name": "Digital Transformation",
|
||||
"weight": 0.25,
|
||||
"topics": ["Digital strategy", "Technology adoption", "Change management"],
|
||||
"target_audience": "primary"
|
||||
},
|
||||
{
|
||||
"name": "Industry Insights",
|
||||
"weight": 0.20,
|
||||
"topics": ["Market trends", "Competitive analysis", "Future predictions"],
|
||||
"target_audience": "both"
|
||||
},
|
||||
{
|
||||
"name": "Best Practices",
|
||||
"weight": 0.20,
|
||||
"topics": ["Implementation guides", "Success stories", "Expert tips"],
|
||||
"target_audience": "secondary"
|
||||
}
|
||||
],
|
||||
|
||||
"brand_voice": {
|
||||
"tone": "professional_authoritative",
|
||||
"style": "data_driven_insightful",
|
||||
"personality": "expert_trustworthy",
|
||||
"language_level": "advanced_technical",
|
||||
"engagement_style": "thought_leadership"
|
||||
},
|
||||
|
||||
"editorial_guidelines": {
|
||||
"content_length": {
|
||||
"blog_posts": "1500-2500 words",
|
||||
"social_media": "100-300 characters",
|
||||
"whitepapers": "3000-5000 words"
|
||||
},
|
||||
"content_format": {
|
||||
"preferred": ["Long-form articles", "Infographics", "Video content"],
|
||||
"avoid": ["Clickbait headlines", "Overly promotional content"]
|
||||
},
|
||||
"quality_standards": {
|
||||
"fact_checking": "required",
|
||||
"expert_review": "recommended",
|
||||
"seo_optimization": "required"
|
||||
}
|
||||
},
|
||||
|
||||
"content_frequency": {
|
||||
"blog_posts": "3 per week",
|
||||
"social_media": "daily",
|
||||
"newsletters": "weekly",
|
||||
"webinars": "monthly"
|
||||
},
|
||||
|
||||
"preferred_formats": [
|
||||
"Long-form articles",
|
||||
"Infographics",
|
||||
"Video content",
|
||||
"Webinars",
|
||||
"Case studies",
|
||||
"White papers"
|
||||
],
|
||||
|
||||
"content_mix": {
|
||||
"educational": 0.40,
|
||||
"thought_leadership": 0.30,
|
||||
"engagement": 0.20,
|
||||
"promotional": 0.10
|
||||
},
|
||||
|
||||
"kpi_targets": {
|
||||
"engagement_rate": 0.08,
|
||||
"click_through_rate": 0.025,
|
||||
"conversion_rate": 0.03,
|
||||
"brand_mentions": 100,
|
||||
"lead_generation": 500
|
||||
},
|
||||
|
||||
"success_metrics": [
|
||||
"Content engagement rate",
|
||||
"Lead generation from content",
|
||||
"Brand awareness metrics",
|
||||
"Thought leadership recognition",
|
||||
"Customer acquisition cost"
|
||||
]
|
||||
}
|
||||
|
||||
# Enhanced data with AI insights
|
||||
enhanced_data = await self._enhance_with_ai_insights(strategy_data)
|
||||
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving content strategy data: {e}")
|
||||
return {}
|
||||
|
||||
async def validate_data(self, data: Dict[str, Any]) -> DataSourceValidationResult:
|
||||
"""
|
||||
Validate content strategy data quality and completeness.
|
||||
|
||||
Args:
|
||||
data: Strategy data to validate
|
||||
|
||||
Returns:
|
||||
Validation result with quality score and issues
|
||||
"""
|
||||
try:
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=True,
|
||||
quality_score=0.0
|
||||
)
|
||||
|
||||
# Check data completeness
|
||||
completeness_score = self._calculate_completeness(data)
|
||||
|
||||
# Check data quality
|
||||
quality_score = self._calculate_quality(data)
|
||||
|
||||
# Check strategic alignment
|
||||
alignment_score = self._calculate_strategic_alignment(data)
|
||||
|
||||
# Overall quality score
|
||||
overall_score = (completeness_score + quality_score + alignment_score) / 3
|
||||
validation_result.quality_score = overall_score
|
||||
|
||||
# Identify issues
|
||||
issues = self._identify_issues(data)
|
||||
for issue in issues:
|
||||
validation_result.add_error(issue)
|
||||
|
||||
# Generate recommendations
|
||||
recommendations = self._generate_recommendations(data, issues)
|
||||
for recommendation in recommendations:
|
||||
validation_result.add_recommendation(recommendation)
|
||||
|
||||
# Update validity based on quality score
|
||||
validation_result.is_valid = overall_score >= 0.7
|
||||
|
||||
return validation_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating content strategy data: {e}")
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=False,
|
||||
quality_score=0.0
|
||||
)
|
||||
validation_result.add_error(f"Validation error: {str(e)}")
|
||||
validation_result.add_recommendation("Review data structure and retry validation")
|
||||
return validation_result
|
||||
|
||||
async def enhance_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Enhance strategy data with AI insights and recommendations.
|
||||
|
||||
Args:
|
||||
data: Original strategy data
|
||||
|
||||
Returns:
|
||||
Enhanced strategy data with AI insights
|
||||
"""
|
||||
try:
|
||||
logger.info("Enhanced content strategy data with AI insights")
|
||||
|
||||
# Add AI-generated insights
|
||||
enhanced_data = data.copy()
|
||||
|
||||
# AI strategy optimization recommendations
|
||||
enhanced_data["ai_insights"] = {
|
||||
"strategy_optimization": [
|
||||
"Consider increasing thought leadership content to 35% for better brand positioning",
|
||||
"Add more video content (25%) to improve engagement rates",
|
||||
"Include more case studies to build credibility and trust",
|
||||
"Optimize content mix for better lead generation"
|
||||
],
|
||||
|
||||
"audience_insights": [
|
||||
"Primary audience shows high engagement with technical content",
|
||||
"Secondary audience prefers actionable, how-to content",
|
||||
"Consider creating more industry-specific content",
|
||||
"Video content performs 40% better than text-only content"
|
||||
],
|
||||
|
||||
"content_opportunities": [
|
||||
"AI implementation case studies are highly sought after",
|
||||
"Digital transformation guides generate most leads",
|
||||
"Industry trend analysis drives highest engagement",
|
||||
"Technical tutorials have longest dwell time"
|
||||
],
|
||||
|
||||
"competitive_analysis": [
|
||||
"Competitors focus heavily on promotional content",
|
||||
"Opportunity to differentiate with thought leadership",
|
||||
"Gap in AI implementation guidance content",
|
||||
"Strong opportunity in industry-specific insights"
|
||||
],
|
||||
|
||||
"performance_predictions": {
|
||||
"expected_engagement_rate": 0.085,
|
||||
"predicted_lead_generation": 550,
|
||||
"estimated_brand_mentions": 120,
|
||||
"forecasted_growth": 0.25
|
||||
}
|
||||
}
|
||||
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error enhancing content strategy data: {e}")
|
||||
return data
|
||||
|
||||
async def _enhance_with_ai_insights(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance data with AI insights (simplified implementation)."""
|
||||
return await self.enhance_data(data)
|
||||
|
||||
def _calculate_completeness(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate data completeness score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
required_fields = [
|
||||
"business_objectives", "target_audience", "content_pillars",
|
||||
"brand_voice", "content_frequency", "preferred_formats"
|
||||
]
|
||||
|
||||
present_fields = sum(1 for field in required_fields if field in data and data[field])
|
||||
return present_fields / len(required_fields)
|
||||
|
||||
def _calculate_quality(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate data quality score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
quality_indicators = []
|
||||
|
||||
# Check business objectives quality
|
||||
if "business_objectives" in data and isinstance(data["business_objectives"], list):
|
||||
quality_indicators.append(min(len(data["business_objectives"]) / 4, 1.0))
|
||||
|
||||
# Check target audience quality
|
||||
if "target_audience" in data and isinstance(data["target_audience"], dict):
|
||||
audience_quality = 0.0
|
||||
if "primary" in data["target_audience"]:
|
||||
audience_quality += 0.5
|
||||
if "secondary" in data["target_audience"]:
|
||||
audience_quality += 0.5
|
||||
quality_indicators.append(audience_quality)
|
||||
|
||||
# Check content pillars quality
|
||||
if "content_pillars" in data and isinstance(data["content_pillars"], list):
|
||||
pillars_quality = min(len(data["content_pillars"]) / 4, 1.0)
|
||||
quality_indicators.append(pillars_quality)
|
||||
|
||||
return sum(quality_indicators) / len(quality_indicators) if quality_indicators else 0.0
|
||||
|
||||
def _calculate_strategic_alignment(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate strategic alignment score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
alignment_indicators = []
|
||||
|
||||
# Check if business objectives align with content pillars
|
||||
if "business_objectives" in data and "content_pillars" in data:
|
||||
alignment_indicators.append(0.8) # Simplified scoring
|
||||
|
||||
# Check if target audience aligns with content mix
|
||||
if "target_audience" in data and "content_mix" in data:
|
||||
alignment_indicators.append(0.9) # Simplified scoring
|
||||
|
||||
# Check if KPI targets are realistic
|
||||
if "kpi_targets" in data:
|
||||
alignment_indicators.append(0.85) # Simplified scoring
|
||||
|
||||
return sum(alignment_indicators) / len(alignment_indicators) if alignment_indicators else 0.0
|
||||
|
||||
def _identify_issues(self, data: Dict[str, Any]) -> list:
|
||||
"""Identify data quality issues."""
|
||||
issues = []
|
||||
|
||||
if not data:
|
||||
issues.append("No strategy data available")
|
||||
return issues
|
||||
|
||||
# Check for missing critical fields
|
||||
critical_fields = ["business_objectives", "target_audience", "content_pillars"]
|
||||
for field in critical_fields:
|
||||
if field not in data or not data[field]:
|
||||
issues.append(f"Missing critical field: {field}")
|
||||
|
||||
# Check content pillars balance
|
||||
if "content_pillars" in data and isinstance(data["content_pillars"], list):
|
||||
total_weight = sum(pillar.get("weight", 0) for pillar in data["content_pillars"])
|
||||
if abs(total_weight - 1.0) > 0.1:
|
||||
issues.append("Content pillar weights don't sum to 1.0")
|
||||
|
||||
# Check content mix balance
|
||||
if "content_mix" in data:
|
||||
total_mix = sum(data["content_mix"].values())
|
||||
if abs(total_mix - 1.0) > 0.1:
|
||||
issues.append("Content mix percentages don't sum to 100%")
|
||||
|
||||
return issues
|
||||
|
||||
def _generate_recommendations(self, data: Dict[str, Any], issues: list) -> list:
|
||||
"""Generate recommendations based on issues and data quality."""
|
||||
recommendations = []
|
||||
|
||||
for issue in issues:
|
||||
if "Missing critical field: business_objectives" in issue:
|
||||
recommendations.append("Define clear, measurable business objectives")
|
||||
elif "Missing critical field: target_audience" in issue:
|
||||
recommendations.append("Create detailed target audience personas")
|
||||
elif "Missing critical field: content_pillars" in issue:
|
||||
recommendations.append("Develop 3-5 core content pillars")
|
||||
elif "Content pillar weights" in issue:
|
||||
recommendations.append("Adjust content pillar weights to sum to 1.0")
|
||||
elif "Content mix percentages" in issue:
|
||||
recommendations.append("Adjust content mix percentages to sum to 100%")
|
||||
|
||||
# Add general recommendations
|
||||
if "content_pillars" in data and len(data["content_pillars"]) < 3:
|
||||
recommendations.append("Consider adding more content pillars for better coverage")
|
||||
|
||||
if "kpi_targets" not in data:
|
||||
recommendations.append("Define specific KPI targets for measurement")
|
||||
|
||||
return recommendations
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""String representation of the data source."""
|
||||
return f"ContentStrategyDataSource(id={self.source_id}, version={self.version})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Detailed string representation of the data source."""
|
||||
return f"ContentStrategyDataSource(id={self.source_id}, type={self.source_type.value}, priority={self.priority.value}, version={self.version}, active={self.is_active})"
|
||||
@@ -0,0 +1,439 @@
|
||||
"""
|
||||
Gap Analysis Data Source
|
||||
|
||||
Provides comprehensive gap analysis data with AI enhancement
|
||||
and strategic recommendations for calendar generation.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List
|
||||
from datetime import datetime
|
||||
|
||||
from ..interfaces import DataSourceInterface, DataSourceType, DataSourcePriority, DataSourceValidationResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GapAnalysisDataSource(DataSourceInterface):
|
||||
"""
|
||||
Gap Analysis Data Source with comprehensive content gap identification
|
||||
and AI enhancement capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the gap analysis data source."""
|
||||
super().__init__("gap_analysis", DataSourceType.ANALYSIS, DataSourcePriority.HIGH)
|
||||
self.version = "1.5.0"
|
||||
|
||||
logger.info(f"Initialized data source: {self.source_id} ({self.source_type.value})")
|
||||
|
||||
async def get_data(self, user_id: int, strategy_id: int) -> Dict[str, Any]:
|
||||
"""
|
||||
Retrieve comprehensive gap analysis data with enhanced insights.
|
||||
|
||||
Args:
|
||||
user_id: User identifier
|
||||
strategy_id: Strategy identifier
|
||||
|
||||
Returns:
|
||||
Dictionary containing gap analysis data
|
||||
"""
|
||||
try:
|
||||
logger.info(f"Retrieved gap analysis data for strategy {strategy_id}")
|
||||
|
||||
# Enhanced gap analysis data structure
|
||||
gap_data = {
|
||||
"strategy_id": strategy_id,
|
||||
"user_id": user_id,
|
||||
"retrieved_at": datetime.utcnow().isoformat(),
|
||||
|
||||
"content_gaps": [
|
||||
{
|
||||
"category": "AI Implementation",
|
||||
"gap_type": "knowledge_gap",
|
||||
"description": "Lack of practical AI implementation guides",
|
||||
"priority": "high",
|
||||
"impact_score": 0.9,
|
||||
"effort_score": 0.7,
|
||||
"opportunity_size": "large"
|
||||
},
|
||||
{
|
||||
"category": "Digital Transformation",
|
||||
"gap_type": "content_gap",
|
||||
"description": "Missing case studies on successful digital transformations",
|
||||
"priority": "medium",
|
||||
"impact_score": 0.8,
|
||||
"effort_score": 0.6,
|
||||
"opportunity_size": "medium"
|
||||
},
|
||||
{
|
||||
"category": "Industry Insights",
|
||||
"gap_type": "trend_gap",
|
||||
"description": "Limited coverage of emerging industry trends",
|
||||
"priority": "high",
|
||||
"impact_score": 0.85,
|
||||
"effort_score": 0.5,
|
||||
"opportunity_size": "large"
|
||||
}
|
||||
],
|
||||
|
||||
"keyword_opportunities": [
|
||||
{
|
||||
"keyword": "AI implementation guide",
|
||||
"search_volume": "high",
|
||||
"competition": "medium",
|
||||
"relevance_score": 0.95,
|
||||
"opportunity_score": 0.88
|
||||
},
|
||||
{
|
||||
"keyword": "digital transformation case study",
|
||||
"search_volume": "medium",
|
||||
"competition": "low",
|
||||
"relevance_score": 0.90,
|
||||
"opportunity_score": 0.92
|
||||
},
|
||||
{
|
||||
"keyword": "industry trends 2024",
|
||||
"search_volume": "high",
|
||||
"competition": "high",
|
||||
"relevance_score": 0.85,
|
||||
"opportunity_score": 0.75
|
||||
}
|
||||
],
|
||||
|
||||
"competitor_insights": [
|
||||
{
|
||||
"competitor": "Competitor A",
|
||||
"strengths": ["Strong technical content", "Regular case studies"],
|
||||
"weaknesses": ["Limited thought leadership", "Poor engagement"],
|
||||
"opportunities": ["Thought leadership content", "Interactive content"],
|
||||
"threats": ["High technical expertise", "Large audience"]
|
||||
},
|
||||
{
|
||||
"competitor": "Competitor B",
|
||||
"strengths": ["Excellent thought leadership", "High engagement"],
|
||||
"weaknesses": ["Limited technical depth", "Inconsistent posting"],
|
||||
"opportunities": ["Technical deep-dives", "Regular content schedule"],
|
||||
"threats": ["Strong brand presence", "Expert team"]
|
||||
}
|
||||
],
|
||||
|
||||
"market_trends": [
|
||||
{
|
||||
"trend": "AI democratization",
|
||||
"relevance": "high",
|
||||
"growth_rate": "rapid",
|
||||
"content_opportunity": "AI accessibility guides"
|
||||
},
|
||||
{
|
||||
"trend": "Remote work optimization",
|
||||
"relevance": "medium",
|
||||
"growth_rate": "steady",
|
||||
"content_opportunity": "Remote work best practices"
|
||||
},
|
||||
{
|
||||
"trend": "Sustainability in tech",
|
||||
"relevance": "high",
|
||||
"growth_rate": "accelerating",
|
||||
"content_opportunity": "Green tech implementation"
|
||||
}
|
||||
],
|
||||
|
||||
"content_opportunities": [
|
||||
{
|
||||
"opportunity": "AI implementation case studies",
|
||||
"demand": "high",
|
||||
"competition": "low",
|
||||
"potential_impact": "high",
|
||||
"content_type": "case_study"
|
||||
},
|
||||
{
|
||||
"opportunity": "Digital transformation guides",
|
||||
"demand": "medium",
|
||||
"competition": "medium",
|
||||
"potential_impact": "medium",
|
||||
"content_type": "how_to_guide"
|
||||
},
|
||||
{
|
||||
"opportunity": "Industry trend analysis",
|
||||
"demand": "high",
|
||||
"competition": "high",
|
||||
"potential_impact": "high",
|
||||
"content_type": "thought_leadership"
|
||||
}
|
||||
],
|
||||
|
||||
"performance_insights": {
|
||||
"top_performing_content": [
|
||||
"AI implementation best practices",
|
||||
"Digital transformation case studies",
|
||||
"Industry trend analysis"
|
||||
],
|
||||
"underperforming_content": [
|
||||
"Basic how-to guides",
|
||||
"Generic industry news",
|
||||
"Overly promotional content"
|
||||
],
|
||||
"engagement_patterns": {
|
||||
"high_engagement": "Technical deep-dives and case studies",
|
||||
"low_engagement": "Generic content and promotional posts",
|
||||
"conversion_drivers": "Practical guides and real examples"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# Enhanced data with AI insights
|
||||
enhanced_data = await self._enhance_with_ai_insights(gap_data)
|
||||
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving gap analysis data: {e}")
|
||||
return {}
|
||||
|
||||
async def validate_data(self, data: Dict[str, Any]) -> DataSourceValidationResult:
|
||||
"""
|
||||
Validate gap analysis data quality and completeness.
|
||||
|
||||
Args:
|
||||
data: Gap analysis data to validate
|
||||
|
||||
Returns:
|
||||
Validation result with quality score and issues
|
||||
"""
|
||||
try:
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=True,
|
||||
quality_score=0.0
|
||||
)
|
||||
|
||||
# Check data completeness
|
||||
completeness_score = self._calculate_completeness(data)
|
||||
|
||||
# Check data quality
|
||||
quality_score = self._calculate_quality(data)
|
||||
|
||||
# Check opportunity identification
|
||||
opportunity_score = self._calculate_opportunity_identification(data)
|
||||
|
||||
# Overall quality score
|
||||
overall_score = (completeness_score + quality_score + opportunity_score) / 3
|
||||
validation_result.quality_score = overall_score
|
||||
|
||||
# Identify issues
|
||||
issues = self._identify_issues(data)
|
||||
for issue in issues:
|
||||
validation_result.add_error(issue)
|
||||
|
||||
# Generate recommendations
|
||||
recommendations = self._generate_recommendations(data, issues)
|
||||
for recommendation in recommendations:
|
||||
validation_result.add_recommendation(recommendation)
|
||||
|
||||
# Update validity based on quality score
|
||||
validation_result.is_valid = overall_score >= 0.7
|
||||
|
||||
return validation_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating gap analysis data: {e}")
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=False,
|
||||
quality_score=0.0
|
||||
)
|
||||
validation_result.add_error(f"Validation error: {str(e)}")
|
||||
validation_result.add_recommendation("Review data structure and retry validation")
|
||||
return validation_result
|
||||
|
||||
async def enhance_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Enhance gap analysis data with AI insights and recommendations.
|
||||
|
||||
Args:
|
||||
data: Original gap analysis data
|
||||
|
||||
Returns:
|
||||
Enhanced gap analysis data with AI insights
|
||||
"""
|
||||
try:
|
||||
logger.info("Enhanced gap analysis data with AI insights")
|
||||
|
||||
# Add AI-generated insights
|
||||
enhanced_data = data.copy()
|
||||
|
||||
# AI gap analysis recommendations
|
||||
enhanced_data["ai_insights"] = {
|
||||
"gap_prioritization": [
|
||||
"Focus on AI implementation guides (highest opportunity, lowest competition)",
|
||||
"Develop digital transformation case studies (high demand, medium competition)",
|
||||
"Create industry trend analysis (high demand, high competition but high impact)"
|
||||
],
|
||||
|
||||
"content_recommendations": [
|
||||
"Create 3-5 AI implementation case studies per quarter",
|
||||
"Develop monthly industry trend reports",
|
||||
"Produce weekly digital transformation guides",
|
||||
"Include more interactive content (videos, webinars)"
|
||||
],
|
||||
|
||||
"competitive_advantages": [
|
||||
"Focus on technical depth that competitors lack",
|
||||
"Create more thought leadership content",
|
||||
"Develop unique case studies from real implementations",
|
||||
"Build stronger community engagement"
|
||||
],
|
||||
|
||||
"opportunity_prioritization": [
|
||||
{
|
||||
"opportunity": "AI implementation guides",
|
||||
"priority": "high",
|
||||
"effort": "medium",
|
||||
"expected_impact": "high"
|
||||
},
|
||||
{
|
||||
"opportunity": "Digital transformation case studies",
|
||||
"priority": "medium",
|
||||
"effort": "high",
|
||||
"expected_impact": "high"
|
||||
},
|
||||
{
|
||||
"opportunity": "Industry trend analysis",
|
||||
"priority": "medium",
|
||||
"effort": "medium",
|
||||
"expected_impact": "medium"
|
||||
}
|
||||
],
|
||||
|
||||
"performance_optimization": {
|
||||
"content_mix_adjustment": "Increase technical content to 60%",
|
||||
"engagement_improvement": "Add more interactive elements",
|
||||
"conversion_optimization": "Include more case studies and examples",
|
||||
"audience_expansion": "Target mid-level managers with practical guides"
|
||||
}
|
||||
}
|
||||
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error enhancing gap analysis data: {e}")
|
||||
return data
|
||||
|
||||
async def _enhance_with_ai_insights(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance data with AI insights (simplified implementation)."""
|
||||
return await self.enhance_data(data)
|
||||
|
||||
def _calculate_completeness(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate data completeness score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
required_fields = [
|
||||
"content_gaps", "keyword_opportunities", "competitor_insights"
|
||||
]
|
||||
|
||||
present_fields = sum(1 for field in required_fields if field in data and data[field])
|
||||
return present_fields / len(required_fields)
|
||||
|
||||
def _calculate_quality(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate data quality score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
quality_indicators = []
|
||||
|
||||
# Check content gaps quality
|
||||
if "content_gaps" in data and isinstance(data["content_gaps"], list):
|
||||
gaps_quality = min(len(data["content_gaps"]) / 3, 1.0)
|
||||
quality_indicators.append(gaps_quality)
|
||||
|
||||
# Check keyword opportunities quality
|
||||
if "keyword_opportunities" in data and isinstance(data["keyword_opportunities"], list):
|
||||
keywords_quality = min(len(data["keyword_opportunities"]) / 3, 1.0)
|
||||
quality_indicators.append(keywords_quality)
|
||||
|
||||
# Check competitor insights quality
|
||||
if "competitor_insights" in data and isinstance(data["competitor_insights"], list):
|
||||
competitor_quality = min(len(data["competitor_insights"]) / 2, 1.0)
|
||||
quality_indicators.append(competitor_quality)
|
||||
|
||||
return sum(quality_indicators) / len(quality_indicators) if quality_indicators else 0.0
|
||||
|
||||
def _calculate_opportunity_identification(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate opportunity identification score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
opportunity_indicators = []
|
||||
|
||||
# Check for content opportunities
|
||||
if "content_opportunities" in data and isinstance(data["content_opportunities"], list):
|
||||
opportunity_indicators.append(0.9)
|
||||
|
||||
# Check for market trends
|
||||
if "market_trends" in data and isinstance(data["market_trends"], list):
|
||||
opportunity_indicators.append(0.85)
|
||||
|
||||
# Check for performance insights
|
||||
if "performance_insights" in data:
|
||||
opportunity_indicators.append(0.8)
|
||||
|
||||
return sum(opportunity_indicators) / len(opportunity_indicators) if opportunity_indicators else 0.0
|
||||
|
||||
def _identify_issues(self, data: Dict[str, Any]) -> list:
|
||||
"""Identify data quality issues."""
|
||||
issues = []
|
||||
|
||||
if not data:
|
||||
issues.append("No gap analysis data available")
|
||||
return issues
|
||||
|
||||
# Check for missing critical fields
|
||||
critical_fields = ["content_gaps", "keyword_opportunities", "competitor_insights"]
|
||||
for field in critical_fields:
|
||||
if field not in data or not data[field]:
|
||||
issues.append(f"Missing critical field: {field}")
|
||||
|
||||
# Check content gaps quality
|
||||
if "content_gaps" in data and isinstance(data["content_gaps"], list):
|
||||
if len(data["content_gaps"]) < 2:
|
||||
issues.append("Insufficient content gaps identified")
|
||||
|
||||
# Check keyword opportunities quality
|
||||
if "keyword_opportunities" in data and isinstance(data["keyword_opportunities"], list):
|
||||
if len(data["keyword_opportunities"]) < 2:
|
||||
issues.append("Insufficient keyword opportunities identified")
|
||||
|
||||
return issues
|
||||
|
||||
def _generate_recommendations(self, data: Dict[str, Any], issues: list) -> list:
|
||||
"""Generate recommendations based on issues and data quality."""
|
||||
recommendations = []
|
||||
|
||||
for issue in issues:
|
||||
if "Missing critical field: content_gaps" in issue:
|
||||
recommendations.append("Conduct comprehensive content gap analysis")
|
||||
elif "Missing critical field: keyword_opportunities" in issue:
|
||||
recommendations.append("Perform keyword research and opportunity analysis")
|
||||
elif "Missing critical field: competitor_insights" in issue:
|
||||
recommendations.append("Analyze competitor content and strategies")
|
||||
elif "Insufficient content gaps" in issue:
|
||||
recommendations.append("Expand content gap analysis to identify more opportunities")
|
||||
elif "Insufficient keyword opportunities" in issue:
|
||||
recommendations.append("Conduct broader keyword research")
|
||||
|
||||
# Add general recommendations
|
||||
if "market_trends" not in data:
|
||||
recommendations.append("Include market trend analysis for better content planning")
|
||||
|
||||
if "performance_insights" not in data:
|
||||
recommendations.append("Add performance insights for content optimization")
|
||||
|
||||
return recommendations
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""String representation of the data source."""
|
||||
return f"GapAnalysisDataSource(id={self.source_id}, version={self.version})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""Detailed string representation of the data source."""
|
||||
return f"GapAnalysisDataSource(id={self.source_id}, type={self.source_type.value}, priority={self.priority.value}, version={self.version}, active={self.is_active})"
|
||||
@@ -0,0 +1,207 @@
|
||||
"""
|
||||
Keywords Data Source
|
||||
|
||||
Provides comprehensive keyword data with dynamic research capabilities
|
||||
and AI enhancement for calendar generation.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List
|
||||
from datetime import datetime
|
||||
|
||||
from ..interfaces import DataSourceInterface, DataSourceType, DataSourcePriority, DataSourceValidationResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KeywordsDataSource(DataSourceInterface):
|
||||
"""
|
||||
Keywords Data Source with dynamic research and AI enhancement capabilities.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the keywords data source."""
|
||||
super().__init__("keywords", DataSourceType.RESEARCH, DataSourcePriority.HIGH)
|
||||
self.version = "1.5.0"
|
||||
|
||||
logger.info(f"Initialized data source: {self.source_id} ({self.source_type.value})")
|
||||
|
||||
async def get_data(self, user_id: int, strategy_id: int) -> Dict[str, Any]:
|
||||
"""Retrieve comprehensive keywords data with enhanced research."""
|
||||
try:
|
||||
logger.info(f"Retrieved keywords data for strategy {strategy_id}")
|
||||
|
||||
keywords_data = {
|
||||
"strategy_id": strategy_id,
|
||||
"user_id": user_id,
|
||||
"retrieved_at": datetime.utcnow().isoformat(),
|
||||
|
||||
"primary_keywords": [
|
||||
{"keyword": "AI implementation", "volume": "high", "difficulty": "medium", "relevance": 0.95},
|
||||
{"keyword": "digital transformation", "volume": "high", "difficulty": "high", "relevance": 0.90},
|
||||
{"keyword": "machine learning", "volume": "high", "difficulty": "medium", "relevance": 0.88}
|
||||
],
|
||||
|
||||
"long_tail_keywords": [
|
||||
{"keyword": "AI implementation guide for enterprises", "volume": "medium", "difficulty": "low", "relevance": 0.92},
|
||||
{"keyword": "digital transformation case study", "volume": "medium", "difficulty": "low", "relevance": 0.89},
|
||||
{"keyword": "machine learning best practices", "volume": "medium", "difficulty": "medium", "relevance": 0.85}
|
||||
],
|
||||
|
||||
"trending_keywords": [
|
||||
{"keyword": "AI democratization", "trend": "rising", "relevance": 0.87},
|
||||
{"keyword": "sustainable AI", "trend": "rising", "relevance": 0.83},
|
||||
{"keyword": "AI ethics", "trend": "stable", "relevance": 0.80}
|
||||
],
|
||||
|
||||
"competitor_keywords": [
|
||||
{"keyword": "AI solutions", "competitor": "Competitor A", "opportunity": "high"},
|
||||
{"keyword": "digital strategy", "competitor": "Competitor B", "opportunity": "medium"},
|
||||
{"keyword": "tech consulting", "competitor": "Competitor C", "opportunity": "low"}
|
||||
]
|
||||
}
|
||||
|
||||
enhanced_data = await self._enhance_with_ai_insights(keywords_data)
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving keywords data: {e}")
|
||||
return {}
|
||||
|
||||
async def validate_data(self, data: Dict[str, Any]) -> DataSourceValidationResult:
|
||||
"""Validate keywords data quality and completeness."""
|
||||
try:
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=True,
|
||||
quality_score=0.0
|
||||
)
|
||||
|
||||
completeness_score = self._calculate_completeness(data)
|
||||
quality_score = self._calculate_quality(data)
|
||||
relevance_score = self._calculate_relevance(data)
|
||||
|
||||
overall_score = (completeness_score + quality_score + relevance_score) / 3
|
||||
validation_result.quality_score = overall_score
|
||||
|
||||
issues = self._identify_issues(data)
|
||||
for issue in issues:
|
||||
validation_result.add_error(issue)
|
||||
|
||||
recommendations = self._generate_recommendations(data, issues)
|
||||
for recommendation in recommendations:
|
||||
validation_result.add_recommendation(recommendation)
|
||||
|
||||
validation_result.is_valid = overall_score >= 0.7
|
||||
|
||||
return validation_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating keywords data: {e}")
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=False,
|
||||
quality_score=0.0
|
||||
)
|
||||
validation_result.add_error(f"Validation error: {str(e)}")
|
||||
validation_result.add_recommendation("Review data structure and retry validation")
|
||||
return validation_result
|
||||
|
||||
async def enhance_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance keywords data with AI insights and recommendations."""
|
||||
try:
|
||||
logger.info("Enhanced keywords data with AI insights")
|
||||
|
||||
enhanced_data = data.copy()
|
||||
enhanced_data["ai_insights"] = {
|
||||
"keyword_optimization": [
|
||||
"Focus on long-tail keywords for better conversion rates",
|
||||
"Target trending keywords for increased visibility",
|
||||
"Optimize for competitor keywords with high opportunity scores"
|
||||
],
|
||||
"content_opportunities": [
|
||||
"Create content around trending AI keywords",
|
||||
"Develop comprehensive guides for high-volume keywords",
|
||||
"Target low-competition keywords for quick wins"
|
||||
]
|
||||
}
|
||||
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error enhancing keywords data: {e}")
|
||||
return data
|
||||
|
||||
async def _enhance_with_ai_insights(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance data with AI insights."""
|
||||
return await self.enhance_data(data)
|
||||
|
||||
def _calculate_completeness(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate data completeness score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
required_fields = ["primary_keywords", "long_tail_keywords"]
|
||||
present_fields = sum(1 for field in required_fields if field in data and data[field])
|
||||
return present_fields / len(required_fields)
|
||||
|
||||
def _calculate_quality(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate data quality score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
quality_indicators = []
|
||||
|
||||
if "primary_keywords" in data and isinstance(data["primary_keywords"], list):
|
||||
quality_indicators.append(min(len(data["primary_keywords"]) / 3, 1.0))
|
||||
|
||||
if "long_tail_keywords" in data and isinstance(data["long_tail_keywords"], list):
|
||||
quality_indicators.append(min(len(data["long_tail_keywords"]) / 3, 1.0))
|
||||
|
||||
return sum(quality_indicators) / len(quality_indicators) if quality_indicators else 0.0
|
||||
|
||||
def _calculate_relevance(self, data: Dict[str, Any]) -> float:
|
||||
"""Calculate keyword relevance score."""
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
relevance_scores = []
|
||||
|
||||
for keyword_list in ["primary_keywords", "long_tail_keywords"]:
|
||||
if keyword_list in data and isinstance(data[keyword_list], list):
|
||||
for keyword in data[keyword_list]:
|
||||
if isinstance(keyword, dict) and "relevance" in keyword:
|
||||
relevance_scores.append(keyword["relevance"])
|
||||
|
||||
return sum(relevance_scores) / len(relevance_scores) if relevance_scores else 0.0
|
||||
|
||||
def _identify_issues(self, data: Dict[str, Any]) -> list:
|
||||
"""Identify data quality issues."""
|
||||
issues = []
|
||||
|
||||
if not data:
|
||||
issues.append("No keywords data available")
|
||||
return issues
|
||||
|
||||
critical_fields = ["primary_keywords", "long_tail_keywords"]
|
||||
for field in critical_fields:
|
||||
if field not in data or not data[field]:
|
||||
issues.append(f"Missing critical field: {field}")
|
||||
|
||||
return issues
|
||||
|
||||
def _generate_recommendations(self, data: Dict[str, Any], issues: list) -> list:
|
||||
"""Generate recommendations based on issues and data quality."""
|
||||
recommendations = []
|
||||
|
||||
for issue in issues:
|
||||
if "Missing critical field: primary_keywords" in issue:
|
||||
recommendations.append("Research primary keywords for your industry")
|
||||
elif "Missing critical field: long_tail_keywords" in issue:
|
||||
recommendations.append("Identify long-tail keywords for better targeting")
|
||||
|
||||
return recommendations
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"KeywordsDataSource(id={self.source_id}, version={self.version})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"KeywordsDataSource(id={self.source_id}, type={self.source_type.value}, priority={self.priority.value}, version={self.version}, active={self.is_active})"
|
||||
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
Performance Data Source
|
||||
|
||||
Provides comprehensive performance data with tracking and optimization
|
||||
capabilities for calendar generation.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Dict, Any, List
|
||||
from datetime import datetime
|
||||
|
||||
from ..interfaces import DataSourceInterface, DataSourceType, DataSourcePriority, DataSourceValidationResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PerformanceDataSource(DataSourceInterface):
|
||||
"""Performance Data Source with tracking and optimization capabilities."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("performance_data", DataSourceType.PERFORMANCE, DataSourcePriority.HIGH)
|
||||
self.version = "1.0.0"
|
||||
|
||||
async def get_data(self, user_id: int, strategy_id: int) -> Dict[str, Any]:
|
||||
"""Retrieve comprehensive performance data."""
|
||||
try:
|
||||
logger.info(f"Retrieved performance data for strategy {strategy_id}")
|
||||
|
||||
performance_data = {
|
||||
"strategy_id": strategy_id,
|
||||
"user_id": user_id,
|
||||
"retrieved_at": datetime.utcnow().isoformat(),
|
||||
|
||||
"content_performance": {
|
||||
"engagement_rate": 0.085,
|
||||
"click_through_rate": 0.025,
|
||||
"conversion_rate": 0.03,
|
||||
"bounce_rate": 0.45,
|
||||
"time_on_page": 180
|
||||
},
|
||||
|
||||
"audience_metrics": {
|
||||
"total_followers": 15000,
|
||||
"monthly_growth": 0.08,
|
||||
"engagement_score": 0.75,
|
||||
"reach_rate": 0.12
|
||||
},
|
||||
|
||||
"conversion_metrics": {
|
||||
"lead_generation": 450,
|
||||
"conversion_funnel": {
|
||||
"awareness": 0.15,
|
||||
"consideration": 0.08,
|
||||
"decision": 0.03
|
||||
},
|
||||
"roi": 2.5
|
||||
},
|
||||
|
||||
"platform_performance": {
|
||||
"linkedin": {"engagement": 0.09, "reach": 8000, "conversions": 120},
|
||||
"twitter": {"engagement": 0.06, "reach": 5000, "conversions": 80},
|
||||
"blog": {"engagement": 0.12, "reach": 12000, "conversions": 250}
|
||||
}
|
||||
}
|
||||
|
||||
enhanced_data = await self._enhance_with_ai_insights(performance_data)
|
||||
return enhanced_data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error retrieving performance data: {e}")
|
||||
return {}
|
||||
|
||||
async def validate_data(self, data: Dict[str, Any]) -> DataSourceValidationResult:
|
||||
"""Validate performance data quality."""
|
||||
try:
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=True, quality_score=0.0
|
||||
)
|
||||
|
||||
completeness_score = self._calculate_completeness(data)
|
||||
quality_score = self._calculate_quality(data)
|
||||
accuracy_score = self._calculate_accuracy(data)
|
||||
|
||||
overall_score = (completeness_score + quality_score + accuracy_score) / 3
|
||||
validation_result.quality_score = overall_score
|
||||
|
||||
issues = self._identify_issues(data)
|
||||
for issue in issues:
|
||||
validation_result.add_error(issue)
|
||||
|
||||
recommendations = self._generate_recommendations(data, issues)
|
||||
for recommendation in recommendations:
|
||||
validation_result.add_recommendation(recommendation)
|
||||
|
||||
validation_result.is_valid = overall_score >= 0.7
|
||||
return validation_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating performance data: {e}")
|
||||
validation_result = DataSourceValidationResult(
|
||||
is_valid=False, quality_score=0.0
|
||||
)
|
||||
validation_result.add_error(f"Validation error: {str(e)}")
|
||||
validation_result.add_recommendation("Review data structure and retry validation")
|
||||
return validation_result
|
||||
|
||||
async def enhance_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Enhance performance data with AI insights."""
|
||||
try:
|
||||
logger.info("Enhanced performance data with AI insights")
|
||||
enhanced_data = data.copy()
|
||||
enhanced_data["ai_insights"] = {
|
||||
"performance_optimization": [
|
||||
"Focus on LinkedIn for highest conversion rates",
|
||||
"Improve blog content for better engagement",
|
||||
"Optimize conversion funnel for better ROI"
|
||||
]
|
||||
}
|
||||
return enhanced_data
|
||||
except Exception as e:
|
||||
logger.error(f"Error enhancing performance data: {e}")
|
||||
return data
|
||||
|
||||
async def _enhance_with_ai_insights(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return await self.enhance_data(data)
|
||||
|
||||
def _calculate_completeness(self, data: Dict[str, Any]) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
required_fields = ["content_performance", "audience_metrics", "conversion_metrics"]
|
||||
present_fields = sum(1 for field in required_fields if field in data and data[field])
|
||||
return present_fields / len(required_fields)
|
||||
|
||||
def _calculate_quality(self, data: Dict[str, Any]) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
quality_indicators = []
|
||||
|
||||
if "content_performance" in data:
|
||||
quality_indicators.append(0.9)
|
||||
|
||||
if "audience_metrics" in data:
|
||||
quality_indicators.append(0.85)
|
||||
|
||||
if "conversion_metrics" in data:
|
||||
quality_indicators.append(0.8)
|
||||
|
||||
return sum(quality_indicators) / len(quality_indicators) if quality_indicators else 0.0
|
||||
|
||||
def _calculate_accuracy(self, data: Dict[str, Any]) -> float:
|
||||
if not data:
|
||||
return 0.0
|
||||
|
||||
# Simplified accuracy calculation
|
||||
return 0.85 # Assume good accuracy for demo data
|
||||
|
||||
def _identify_issues(self, data: Dict[str, Any]) -> list:
|
||||
issues = []
|
||||
if not data:
|
||||
issues.append("No performance data available")
|
||||
return issues
|
||||
|
||||
critical_fields = ["content_performance", "audience_metrics", "conversion_metrics"]
|
||||
for field in critical_fields:
|
||||
if field not in data or not data[field]:
|
||||
issues.append(f"Missing critical field: {field}")
|
||||
|
||||
return issues
|
||||
|
||||
def _generate_recommendations(self, data: Dict[str, Any], issues: list) -> list:
|
||||
recommendations = []
|
||||
for issue in issues:
|
||||
if "Missing critical field: content_performance" in issue:
|
||||
recommendations.append("Set up content performance tracking")
|
||||
elif "Missing critical field: audience_metrics" in issue:
|
||||
recommendations.append("Implement audience analytics")
|
||||
elif "Missing critical field: conversion_metrics" in issue:
|
||||
recommendations.append("Set up conversion tracking")
|
||||
|
||||
return recommendations
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"PerformanceDataSource(id={self.source_id}, version={self.version})"
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"PerformanceDataSource(id={self.source_id}, type={self.source_type.value}, priority={self.priority.value}, version={self.version}, active={self.is_active})"
|
||||
Reference in New Issue
Block a user