ALwrity version 0.5.4

This commit is contained in:
ajaysi
2025-08-09 23:14:16 +05:30
parent 01fe1e0a9c
commit 5c08b6e007
42 changed files with 3514 additions and 2148 deletions

View File

@@ -0,0 +1,4 @@
# Dedicated auto-fill package for Content Strategy Builder inputs
# Exposes AutoFillService for orchestrating onboarding data → normalized → transformed → frontend fields
from .autofill_service import AutoFillService

View File

@@ -0,0 +1,141 @@
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
import logging
import traceback
from .autofill_service import AutoFillService
from ...ai_analytics_service import ContentPlanningAIAnalyticsService
from .ai_structured_autofill import AIStructuredAutofillService
logger = logging.getLogger(__name__)
class AutoFillRefreshService:
    """Build a fresh auto-fill payload for the Strategy Builder.

    Nothing produced here is persisted; the service is intended for refresh
    flows that recompute values on demand.
    """

    def __init__(self, db: "Session"):
        """Keep the DB session and instantiate the collaborating services."""
        self.db = db
        self.autofill = AutoFillService(db)
        self.ai_analytics = ContentPlanningAIAnalyticsService()
        self.structured_ai = AIStructuredAutofillService()

    @staticmethod
    def _log_context_presence(base_context: Any) -> None:
        """Emit debug logs describing which onboarding sections are present."""
        is_mapping = isinstance(base_context, dict)
        # Anything that is not a dict (None, list, ...) is logged as "empty"
        # instead of crashing the refresh on a purely diagnostic statement.
        ctx = base_context if is_mapping else {}
        logger.debug(
            "AutoFillRefreshService: context keys=%s | website=%s research=%s api=%s session=%s",
            list(ctx.keys()) if is_mapping else 'n/a',
            bool(ctx.get('website_analysis')),
            bool(ctx.get('research_preferences')),
            bool(ctx.get('api_keys_data')),
            bool(ctx.get('onboarding_session')),
        )
        website = ctx.get('website_analysis') or {}
        research = ctx.get('research_preferences') or {}
        logger.debug(
            "AutoFillRefreshService: website keys=%s | research keys=%s",
            len(website.keys()) if hasattr(website, 'keys') else 0,
            len(research.keys()) if hasattr(research, 'keys') else 0,
        )

    async def build_fresh_payload(self, user_id: int, use_ai: bool = True, ai_only: bool = False) -> Dict[str, Any]:
        """Build a fresh auto-fill payload.

        Args:
            user_id: User whose onboarding data is read.
            use_ai: When true, AI-generated values may override base fields.
            ai_only: When true (and ``use_ai`` is true), every field is
                generated by the structured AI service and its payload is
                returned as-is.

        Returns:
            A payload shaped like ``AutoFillService.get_autofill`` plus a
            ``meta`` entry describing AI usage.

        Raises:
            Exception: Whatever the structured AI call raises on the AI-only
                path; it is logged and re-raised unchanged.
        """
        logger.debug("AutoFillRefreshService: processing onboarding context | user=%s", user_id)
        base_context = await self.autofill.integration.process_onboarding_data(user_id, self.db)
        self._log_context_presence(base_context)

        if ai_only and use_ai:
            logger.info("AutoFillRefreshService: AI-only refresh enabled; generating full 30+ fields via AI")
            try:
                # A missing onboarding context is passed as an empty dict so the
                # AI service never receives None.
                ai_payload = await self.structured_ai.generate_autofill_fields(user_id, base_context or {})
                meta = ai_payload.get('meta') or {}
                logger.info(
                    "AI-only payload meta: ai_used=%s overrides=%s",
                    meta.get('ai_used'),
                    meta.get('ai_overrides_count'),
                )
                return ai_payload
            except Exception as e:
                # logger.exception appends the active traceback to the record.
                logger.exception("AI-only structured generation failed | user=%s | err=%s", user_id, repr(e))
                raise

        # Fallback to previous behavior (DB + sparse overrides)
        payload = await self.autofill.get_autofill(user_id)
        logger.info("AutoFillRefreshService: Base payload fields: %d", len(payload.get('fields', {})))

        ai_overrides: Dict[str, Any] = {}
        if use_ai:
            # Hook to integrate AI-generated overrides for certain fields, if available
            ai_overrides = await self._generate_ai_overrides(user_id, payload)

        if ai_overrides:
            logger.debug("AutoFillRefreshService: merging %d AI overrides", len(ai_overrides))
            fields = payload.get('fields', {})
            for key, override_value in ai_overrides.items():
                existing = fields.get(key)
                if isinstance(existing, dict):
                    existing['value'] = override_value
                    # Keep the per-field source in step with payload['sources']:
                    # the value now comes from the AI refresh, not onboarding.
                    existing['source'] = 'ai_refresh'
                else:
                    fields[key] = {'value': override_value, 'source': 'ai_refresh', 'confidence': 0.8}
            payload['fields'] = fields

            # Label sources for overridden fields as coming from AI refresh (non-persistent)
            sources = payload.get('sources', {})
            for key in ai_overrides:
                sources[key] = 'ai_refresh'
            payload['sources'] = sources

        # Onboarding values are kept wherever the AI is silent; meta records AI usage.
        overridden_keys = list(ai_overrides.keys())
        payload['meta'] = {
            'ai_used': len(overridden_keys) > 0,
            'ai_overrides_count': len(overridden_keys),
            'ai_override_fields': overridden_keys,
            'ai_only': ai_only,
        }
        logger.info(
            "AutoFillRefreshService: Applied AI overrides for %d fields: %s",
            len(ai_overrides),
            overridden_keys,
        )
        return payload

    async def _generate_ai_overrides(self, user_id: int, base_payload: Dict[str, Any]) -> Dict[str, Any]:
        """Produce AI overrides for selected fields.

        Calls the AI analytics service with ``force_refresh=True`` and maps
        ``recommendations.preferred_formats`` and
        ``insights.target_metrics`` / ``insights.kpi_targets`` onto override
        keys. ``base_payload`` is accepted for interface compatibility and is
        not read here.

        Returns:
            A ``{field_id: value}`` dict; empty when the response is not a
            dict or when anything fails (best-effort, errors are logged).
        """
        try:
            logger.info("AutoFillRefreshService: Invoking AI analytics for user %s with force refresh", user_id)
            ai_resp = await self.ai_analytics.get_ai_analytics(user_id=user_id, strategy_id=None, force_refresh=True)  # type: ignore

            if not isinstance(ai_resp, dict):
                logger.warning("AI analytics response is not a dict; skipping mapping")
                return {}

            # Log high-level response structure
            logger.info("AI analytics response keys: %s", list(ai_resp.keys()))
            insights = ai_resp.get('insights')
            recs = ai_resp.get('recommendations')
            if insights is not None:
                logger.info("AI insights count: %s", len(insights) if hasattr(insights, '__len__') else 'n/a')
            if recs is not None:
                logger.info("AI recommendations count: %s", len(recs) if hasattr(recs, '__len__') else 'n/a')

            # Minimal, conservative mapping attempt (only if safely found)
            overrides: Dict[str, Any] = {}
            try:
                recs = recs or {}
                if isinstance(recs, dict):
                    preferred = recs.get('preferred_formats')
                    if preferred:
                        overrides['preferred_formats'] = preferred
                insights = insights or {}
                if isinstance(insights, dict):
                    targets = insights.get('target_metrics') or insights.get('kpi_targets')
                    if targets:
                        overrides['target_metrics'] = targets
            except Exception as map_err:
                logger.warning("AI override mapping encountered an issue: %s", map_err)

            logger.info("AI override mapping produced %d fields: %s", len(overrides), list(overrides.keys()))
            return overrides
        except Exception as e:
            logger.error("AI override generation failed: %s", e)
            return {}

View File

@@ -0,0 +1,187 @@
import json
import logging
import traceback
from typing import Any, Dict
from services.ai_service_manager import AIServiceManager, AIServiceType
logger = logging.getLogger(__name__)
# Every Strategy Builder field the AI is asked to produce, in the order used
# for the schema's propertyOrdering and for the prompt's field listing.
CORE_FIELDS = [
    # Business context
    'business_objectives',
    'target_metrics',
    'content_budget',
    'team_size',
    'implementation_timeline',
    'market_share',
    'competitive_position',
    'performance_metrics',
    # Audience intelligence
    'content_preferences',
    'consumption_patterns',
    'audience_pain_points',
    'buying_journey',
    'seasonal_trends',
    'engagement_metrics',
    # Competitive intelligence
    'top_competitors',
    'competitor_content_strategies',
    'market_gaps',
    'industry_trends',
    'emerging_trends',
    # Content strategy
    'preferred_formats',
    'content_mix',
    'content_frequency',
    'optimal_timing',
    'quality_metrics',
    'editorial_guidelines',
    'brand_voice',
    # Performance & analytics
    'traffic_sources',
    'conversion_rates',
    'content_roi_targets',
    'ab_testing_capabilities',
]

# Fields whose AI value may arrive as a JSON-encoded string and is parsed back.
JSON_FIELDS = {
    'business_objectives',
    'target_metrics',
    'content_preferences',
}

# Fields that are coerced to a list of strings.
ARRAY_FIELDS = {
    'preferred_formats',
}
class AIStructuredAutofillService:
    """Generate the complete 30+ Strategy Builder fields strictly from AI using onboarding context only."""

    def __init__(self) -> None:
        """Create the AI service manager used for the structured call."""
        self.ai = AIServiceManager()

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _build_context_summary(self, context: Dict[str, Any]) -> Dict[str, Any]:
        """Reduce the onboarding context to the subset sent to the AI.

        A missing (``None``) or non-dict context, and missing or non-dict
        sections inside it, are treated as empty so the summary is always
        produced; absent values appear as ``None``.
        """
        context = self._as_dict(context)
        website = self._as_dict(context.get('website_analysis'))
        research = self._as_dict(context.get('research_preferences'))
        api_keys = self._as_dict(context.get('api_keys_data'))
        session = self._as_dict(context.get('onboarding_session'))

        summary = {
            'website_summary': {
                'website_url': website.get('website_url'),
                'industry': website.get('industry'),
                'content_types': website.get('content_types'),
                'target_audience': website.get('target_audience'),
                'performance_metrics': website.get('performance_metrics'),
                'seo_summary': website.get('seo_analysis'),
            },
            'research_summary': {
                'audience_segments': research.get('audience_segments'),
                'content_preferences': research.get('content_preferences'),
                'consumption_patterns': research.get('consumption_patterns'),
                'seasonality': research.get('seasonal_trends'),
            },
            'api_summary': {
                'providers': api_keys.get('providers'),
                'total_keys': api_keys.get('total_keys'),
            },
            'session_summary': {
                'business_size': session.get('business_size'),
                'region': session.get('region'),
            },
        }

        logger.debug(
            "AI Structured Autofill: context presence | website=%s research=%s api=%s session=%s",
            bool(website), bool(research), bool(api_keys), bool(session)
        )
        logger.debug(
            "AI Structured Autofill: website keys=%s research keys=%s",
            len(website),
            len(research),
        )
        return summary

    def _build_schema(self) -> Dict[str, Any]:
        """Build the response schema passed to the structured AI call.

        Every field in ``CORE_FIELDS`` is a STRING except ``preferred_formats``,
        which is an ARRAY of STRING.
        """
        # Build a Gemini SDK-compatible Schema (dict equivalent), not JSON Schema.
        # Avoid unsupported keys like oneOf/additionalProperties.
        typed_overrides: Dict[str, Any] = {
            # Use STRING for complex JSON-bearing fields to avoid OBJECT property constraints
            'business_objectives': {"type": "STRING"},
            'target_metrics': {"type": "STRING"},
            'content_preferences': {"type": "STRING"},
            # Known arrays
            'preferred_formats': {"type": "ARRAY", "items": {"type": "STRING"}},
            # Known selects
            'content_frequency': {"type": "STRING"},
        }
        properties: Dict[str, Any] = {
            key: typed_overrides.get(key, {"type": "STRING"}) for key in CORE_FIELDS
        }
        schema = {
            "type": "OBJECT",
            "properties": properties,
            # Property ordering can help response consistency per Gemini docs
            "propertyOrdering": CORE_FIELDS,
        }
        logger.debug("AI Structured Autofill: schema built (SDK) with %d properties", len(CORE_FIELDS))
        return schema

    def _build_prompt(self, context_summary: Dict[str, Any]) -> str:
        """Render the instruction prompt with the context summary embedded as JSON."""
        # default=str keeps prompt building from failing on values json cannot
        # encode natively (e.g. datetimes); JSON-native input renders unchanged.
        context_json = json.dumps(context_summary, indent=2, default=str)
        prompt = (
            "You are a senior content strategy system. Using ONLY the provided context (do not copy raw\n"
            "values), infer professional, actionable values for ALL of the following 30+ strategy fields.\n"
            "Output strictly valid JSON matching the given schema. Provide concise, business-ready values.\n"
            "If you are uncertain, infer the most reasonable assumption for a small business. Do not leave\n"
            "fields empty.\n\n"
            f"CONTEXT:\n{context_json}\n\n"
            "FIELDS TO PRODUCE (keys only; values inferred):\n"
            f"{CORE_FIELDS}\n"
        )
        logger.debug("AI Structured Autofill: prompt preview=%d chars", len(prompt))
        return prompt

    def _normalize_value(self, key: str, value: Any) -> Any:
        """Coerce one raw AI value into the shape expected for ``key``.

        JSON fields given as strings are parsed when they hold valid JSON
        (otherwise the string is kept). Array fields accept a comma-separated
        string or a list and yield a list of strings; any other type, or an
        empty result, yields ``None``. All other fields pass through.
        """
        if value is None:
            return None

        if key in JSON_FIELDS:
            if isinstance(value, str):
                try:
                    return json.loads(value)
                except Exception:
                    # Keep as string if not valid JSON
                    return value
            return value

        if key in ARRAY_FIELDS:
            if isinstance(value, str):
                parts = [s.strip() for s in value.split(',') if s.strip()]
                return parts if parts else None
            if isinstance(value, list):
                return [str(v) for v in value]
            return None

        return value

    async def generate_autofill_fields(self, user_id: int, context: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the AI for every core field and wrap the result as a payload.

        Returns:
            ``{'fields', 'sources', 'meta'}`` where only non-empty normalized
            values become fields and ``meta['missing_fields']`` lists the rest.

        Raises:
            ValueError: If the AI result is not a dict.
            Exception: Whatever the structured AI call raises (logged, re-raised).
        """
        context_summary = self._build_context_summary(context)
        schema = self._build_schema()
        prompt = self._build_prompt(context_summary)

        logger.info("AIStructuredAutofillService: generating 30+ fields | user=%s", user_id)
        logger.debug("AIStructuredAutofillService: properties=%d", len(schema.get('properties', {})))

        try:
            result = await self.ai.execute_structured_json_call(
                service_type=AIServiceType.STRATEGIC_INTELLIGENCE,
                prompt=prompt,
                schema=schema
            )
        except Exception as e:
            # logger.exception appends the active traceback to the record.
            logger.exception("AI structured call failed | user=%s | err=%s", user_id, repr(e))
            raise

        if not isinstance(result, dict):
            raise ValueError("AI did not return a structured JSON object")

        result_keys = list(result.keys())
        logger.debug("AI structured result keys=%d | sample keys=%s", len(result_keys), result_keys[:8])

        # Build UI fields map using only non-null normalized values
        fields: Dict[str, Any] = {}
        sources: Dict[str, str] = {}
        non_null_keys = []
        for key in CORE_FIELDS:
            norm_value = self._normalize_value(key, result.get(key))
            if norm_value is not None and norm_value != "" and norm_value != []:
                fields[key] = {'value': norm_value, 'source': 'ai_refresh', 'confidence': 0.8}
                sources[key] = 'ai_refresh'
                non_null_keys.append(key)

        missing_fields = [k for k in CORE_FIELDS if k not in fields]

        payload = {
            'fields': fields,
            'sources': sources,
            'meta': {
                'ai_used': len(non_null_keys) > 0,
                'ai_overrides_count': len(non_null_keys),
                'ai_override_fields': non_null_keys,
                'ai_only': True,
                'missing_fields': missing_fields
            }
        }
        logger.info(
            "AI structured autofill completed | non_null_fields=%d missing=%d",
            len(non_null_keys),
            len(missing_fields),
        )
        return payload

View File

@@ -0,0 +1,79 @@
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
from ..onboarding.data_integration import OnboardingDataIntegrationService
# Local module imports (to be created in this batch)
from .normalizers.website_normalizer import normalize_website_analysis
from .normalizers.research_normalizer import normalize_research_preferences
from .normalizers.api_keys_normalizer import normalize_api_keys
from .transformer import transform_to_fields
from .quality import calculate_quality_scores_from_raw, calculate_confidence_from_raw, calculate_data_freshness
from .transparency import build_data_sources_map, build_input_data_points
from .schema import validate_output
class AutoFillService:
    """Facade that assembles the Content Strategy auto-fill payload."""

    def __init__(self, db: Session):
        """Remember the DB session and create the onboarding integration service."""
        self.db = db
        self.integration = OnboardingDataIntegrationService()

    async def get_autofill(self, user_id: int) -> Dict[str, Any]:
        """Collect, normalize and transform onboarding data into the payload.

        Returns:
            A dict with ``fields``, ``sources``, ``quality_scores``,
            ``confidence_levels``, ``data_freshness`` and ``input_data_points``.

        Raises:
            RuntimeError: If the integration service returns no data.
            ValueError: If the assembled payload fails ``validate_output``.
        """
        # Step 1: raw integration data
        integrated = await self.integration.process_onboarding_data(user_id, self.db)
        if not integrated:
            raise RuntimeError("No onboarding data available for user")

        raw_website = integrated.get('website_analysis', {})
        raw_research = integrated.get('research_preferences', {})
        raw_api = integrated.get('api_keys_data', {})
        raw_session = integrated.get('onboarding_session', {})

        # Step 2: normalized views of each raw source
        site = await normalize_website_analysis(raw_website)
        prefs = await normalize_research_preferences(raw_research)
        keys = await normalize_api_keys(raw_api)

        # Step 3: meta computed from the raw (un-normalized) sources
        raw_by_source = {
            'website_analysis': raw_website,
            'research_preferences': raw_research,
            'api_keys_data': raw_api,
        }
        quality = calculate_quality_scores_from_raw(raw_by_source)
        confidence = calculate_confidence_from_raw(raw_by_source)
        freshness = calculate_data_freshness(raw_session)

        # Step 4: frontend field map
        field_map = transform_to_fields(website=site, research=prefs, api_keys=keys, session=raw_session)

        # Step 5: transparency maps
        source_map = build_data_sources_map(site, prefs, keys)
        data_points = build_input_data_points(
            website_raw=raw_website,
            research_raw=raw_research,
            api_raw=raw_api,
        )

        result = {
            'fields': field_map,
            'sources': source_map,
            'quality_scores': quality,
            'confidence_levels': confidence,
            'data_freshness': freshness,
            'input_data_points': data_points,
        }

        # Strict structural validation before handing the payload back
        validate_output(result)
        return result

View File

@@ -0,0 +1,25 @@
from typing import Any, Dict
async def normalize_api_keys(api_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize raw API-key onboarding data into the analytics structure.

    Returns an empty dict for empty input. A ``providers`` value of ``None``
    and provider sections that are ``None`` or not dicts are treated as
    absent instead of raising.
    """
    if not api_data:
        return {}

    # An explicit null is treated the same as a missing key.
    providers = api_data.get('providers') or []

    def provider_block(name: str) -> Dict[str, Any]:
        """Build the ``connected``/``metrics`` entry for one provider."""
        section = api_data.get(name)
        metrics = section.get('metrics', {}) if isinstance(section, dict) else {}
        return {'connected': name in providers, 'metrics': metrics}

    return {
        'analytics_data': {
            'google_analytics': provider_block('google_analytics'),
            'google_search_console': provider_block('google_search_console'),
        },
        'social_media_data': api_data.get('social_media_data', {}),
        'competitor_data': api_data.get('competitor_data', {}),
        'data_quality': api_data.get('data_quality'),
        'confidence_level': api_data.get('confidence_level', 0.8),
        'data_freshness': api_data.get('data_freshness', 0.8)
    }

View File

@@ -0,0 +1,29 @@
from typing import Any, Dict
async def normalize_research_preferences(research_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize raw research preferences into content/audience/goal groups.

    Returns an empty dict for empty input. The nested ``writing_style`` and
    ``target_audience`` sections are treated as empty when they are ``None``
    or not dicts, instead of raising. ``content_length``,
    ``visual_preferences`` and ``success_metrics`` are fixed values.
    """
    if not research_data:
        return {}

    def section(key: str) -> Dict[str, Any]:
        """Return the nested dict stored under ``key`` or an empty dict."""
        value = research_data.get(key)
        return value if isinstance(value, dict) else {}

    writing_style = section('writing_style')
    audience = section('target_audience')

    return {
        'content_preferences': {
            'preferred_formats': research_data.get('content_types', []),
            'content_topics': research_data.get('research_topics', []),
            'content_style': writing_style.get('tone', []),
            'content_length': 'Medium (1000-2000 words)',
            'visual_preferences': ['Infographics', 'Charts', 'Diagrams'],
        },
        'audience_intelligence': {
            'target_audience': audience.get('demographics', []),
            'pain_points': audience.get('pain_points', []),
            'buying_journey': audience.get('buying_journey', {}),
            'consumption_patterns': audience.get('consumption_patterns', {}),
        },
        'research_goals': {
            'primary_goals': research_data.get('research_topics', []),
            'secondary_goals': research_data.get('content_types', []),
            'success_metrics': ['Website traffic', 'Lead quality', 'Engagement rates'],
        },
        'data_quality': research_data.get('data_quality'),
        'confidence_level': research_data.get('confidence_level', 0.8),
        'data_freshness': research_data.get('data_freshness', 0.8),
    }

View File

@@ -0,0 +1,44 @@
from typing import Any, Dict
async def normalize_website_analysis(website_data: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize raw website-analysis data into the flat structure used downstream.

    Returns an empty dict for empty input. The nested ``target_audience``,
    ``content_type``, ``performance_metrics`` and ``style_guidelines``
    sections are treated as empty when they are ``None`` or not dicts,
    instead of raising. ``market_position`` and ``business_size`` are fixed
    values, and several metrics fall back to built-in defaults when absent.
    """
    if not website_data:
        return {}

    def section(key: str) -> Dict[str, Any]:
        """Return the nested dict stored under ``key`` or an empty dict."""
        value = website_data.get(key)
        return value if isinstance(value, dict) else {}

    audience = section('target_audience')
    content_type = section('content_type')
    performance = section('performance_metrics')
    guidelines = section('style_guidelines')

    processed_data = {
        'website_url': website_data.get('website_url'),
        'industry': audience.get('industry_focus'),
        'market_position': 'Emerging',
        'business_size': 'Medium',
        'target_audience': audience.get('demographics'),
        'content_goals': content_type.get('purpose', []),
        'performance_metrics': {
            'traffic': performance.get('traffic', 10000),
            'conversion_rate': performance.get('conversion_rate', 2.5),
            'bounce_rate': performance.get('bounce_rate', 50.0),
            'avg_session_duration': performance.get('avg_session_duration', 150),
            'estimated_market_share': performance.get('estimated_market_share')
        },
        'traffic_sources': website_data.get('traffic_sources', {
            'organic': 70,
            'social': 20,
            'direct': 7,
            'referral': 3
        }),
        'content_gaps': guidelines.get('content_gaps', []),
        'topics': content_type.get('primary_type', []),
        'content_quality_score': website_data.get('content_quality_score', 7.5),
        'seo_opportunities': guidelines.get('seo_opportunities', []),
        'competitors': website_data.get('competitors', []),
        'competitive_advantages': guidelines.get('advantages', []),
        'market_gaps': guidelines.get('market_gaps', []),
        'data_quality': website_data.get('data_quality'),
        'confidence_level': website_data.get('confidence_level', 0.8),
        'data_freshness': website_data.get('data_freshness', 0.8),
        'content_budget': website_data.get('content_budget'),
        'team_size': website_data.get('team_size'),
        'implementation_timeline': website_data.get('implementation_timeline'),
        'market_share': website_data.get('market_share'),
        'target_metrics': website_data.get('target_metrics'),
    }
    return processed_data

View File

@@ -0,0 +1,61 @@
from typing import Any, Dict
from datetime import datetime
def calculate_quality_scores_from_raw(data_sources: Dict[str, Any]) -> Dict[str, float]:
    """Score each source by the percentage of its top-level values that are not None.

    Sources that are empty or not dicts score 0.0.
    """
    def completeness(entry: Any) -> float:
        # Only a non-empty dict can be scored; everything else counts as no data.
        if not isinstance(entry, dict) or not entry:
            return 0.0
        populated = sum(1 for item in entry.values() if item is not None)
        return (populated / len(entry)) * 100

    return {name: completeness(entry) for name, entry in data_sources.items()}
def calculate_confidence_from_raw(data_sources: Dict[str, Any]) -> Dict[str, float]:
    """Report each present source's ``confidence_level`` with a per-source fallback.

    Sources that are missing or empty are left out of the result.
    """
    # (source name, confidence used when the source does not carry its own)
    fallback_by_source = (
        ('website_analysis', 0.8),
        ('research_preferences', 0.7),
        ('api_keys_data', 0.6),
    )
    levels: Dict[str, float] = {}
    for source, fallback in fallback_by_source:
        raw = data_sources.get(source)
        if raw:
            levels[source] = raw.get('confidence_level', fallback)
    return levels
def calculate_data_freshness(onboarding_session: Any) -> Dict[str, Any]:
    """Classify how old the onboarding data is.

    The timestamp is read from the ``updated_at`` attribute, or for dicts
    from ``last_updated`` / ``updated_at``. ISO-8601 strings are parsed; a
    trailing ``Z`` is accepted. Timezone-aware timestamps are compared with
    the current aware UTC time and naive ones with the current naive UTC
    time, so both kinds yield an age.

    Returns:
        ``{'status', 'age_days', 'last_updated'}`` where status is ``fresh``
        (<= 7 days), ``recent`` (<= 30), ``aging`` (<= 90) or ``stale``; or
        ``{'status': 'unknown', 'age_days': 'unknown'}`` when no usable
        timestamp is available.
    """
    # Local import: the module only imports ``datetime`` at file level.
    from datetime import timezone

    try:
        updated_at = None
        if hasattr(onboarding_session, 'updated_at'):
            updated_at = onboarding_session.updated_at
        elif isinstance(onboarding_session, dict):
            updated_at = onboarding_session.get('last_updated') or onboarding_session.get('updated_at')

        if not updated_at:
            return {'status': 'unknown', 'age_days': 'unknown'}

        if isinstance(updated_at, str):
            try:
                updated_at = datetime.fromisoformat(updated_at.replace('Z', '+00:00'))
            except ValueError:
                return {'status': 'unknown', 'age_days': 'unknown'}

        # Naive and aware datetimes cannot be subtracted from each other, so
        # pick a "now" of the same kind as the stored timestamp.
        now = datetime.now(timezone.utc)
        if updated_at.tzinfo is None or updated_at.utcoffset() is None:
            now = now.replace(tzinfo=None)
        age_days = (now - updated_at).days

        if age_days <= 7:
            status = 'fresh'
        elif age_days <= 30:
            status = 'recent'
        elif age_days <= 90:
            status = 'aging'
        else:
            status = 'stale'

        return {
            'status': status,
            'age_days': age_days,
            'last_updated': updated_at.isoformat() if hasattr(updated_at, 'isoformat') else str(updated_at)
        }
    except Exception:
        # Best-effort: any unexpected timestamp shape is reported as unknown.
        return {'status': 'unknown', 'age_days': 'unknown'}

View File

@@ -0,0 +1,39 @@
from typing import Any, Dict
# Top-level payload keys checked by validate_output; each must hold a dict.
REQUIRED_TOP_LEVEL_KEYS = dict.fromkeys(
    (
        'fields',
        'sources',
        'quality_scores',
        'confidence_levels',
        'data_freshness',
        'input_data_points',
    ),
    dict,
)
def validate_output(payload: Dict[str, Any]) -> None:
    """Validate the structure of an auto-fill payload.

    Checks that every key in ``REQUIRED_TOP_LEVEL_KEYS`` is present with the
    expected type, and that each entry in ``fields`` is a dict carrying
    ``value``, ``source`` (one of the four onboarding sources) and a numeric
    ``confidence`` within [0, 1]. NaN is rejected as out of range.

    Raises:
        ValueError: On the first violation found.
    """
    # Top-level keys and types
    for key, typ in REQUIRED_TOP_LEVEL_KEYS.items():
        if key not in payload:
            raise ValueError(f"Autofill payload missing key: {key}")
        if not isinstance(payload[key], typ):
            raise ValueError(f"Autofill payload key '{key}' must be {typ.__name__}")

    fields = payload['fields']
    if not isinstance(fields, dict):
        raise ValueError("fields must be an object")

    allowed_sources = ('website_analysis', 'research_preferences', 'api_keys_data', 'onboarding_session')

    # Allow empty fields, but validate structure when present
    for field_id, spec in fields.items():
        if not isinstance(spec, dict):
            raise ValueError(f"Field '{field_id}' must be an object")
        for required in ('value', 'source', 'confidence'):
            if required not in spec:
                raise ValueError(f"Field '{field_id}' missing '{required}'")
        if spec['source'] not in allowed_sources:
            raise ValueError(f"Field '{field_id}' has invalid source: {spec['source']}")
        try:
            confidence = float(spec['confidence'])
        except (TypeError, ValueError, OverflowError) as err:
            raise ValueError(f"Field '{field_id}' confidence must be numeric") from err
        # Written as a positive range test so NaN (which fails every
        # comparison) is rejected rather than slipping through.
        if not (0.0 <= confidence <= 1.0):
            raise ValueError(f"Field '{field_id}' confidence must be in [0,1]")

View File

@@ -0,0 +1,268 @@
from typing import Any, Dict
def transform_to_fields(*, website: Dict[str, Any], research: Dict[str, Any], api_keys: Dict[str, Any], session: Dict[str, Any]) -> Dict[str, Any]:
    """Map normalized onboarding data onto the Strategy Builder field map.

    Each entry is ``{'value', 'source', 'confidence'}``. The first six
    business-context fields are only emitted when a value is available
    (budget, team size and timeline fall back to the session dict); the
    remaining fields are always emitted, many with fixed placeholder values.
    """
    WEBSITE = 'website_analysis'
    RESEARCH = 'research_preferences'
    SESSION = 'onboarding_session'

    def entry(value: Any, source: str, confidence: Any) -> Dict[str, Any]:
        """Build one field entry."""
        return {'value': value, 'source': source, 'confidence': confidence}

    # Confidence lookups, with the different fallbacks the fields use.
    site_conf_raw = website.get('confidence_level')
    site_conf_hi = website.get('confidence_level', 0.8)
    site_conf_lo = website.get('confidence_level', 0.7)
    research_conf_hi = research.get('confidence_level', 0.8)
    research_conf_lo = research.get('confidence_level', 0.7)

    perf = website.get('performance_metrics', {})
    has_session = isinstance(session, dict)

    result: Dict[str, Any] = {}

    # --- Business Context -------------------------------------------------
    goals = website.get('content_goals')
    if goals:
        result['business_objectives'] = entry(goals, WEBSITE, site_conf_raw)

    # Explicit target metrics win; otherwise performance metrics stand in.
    metrics = website.get('target_metrics') or website.get('performance_metrics')
    if metrics:
        result['target_metrics'] = entry(metrics, WEBSITE, site_conf_raw)

    # content_budget / team_size: website value first, then session fallback.
    for field_id, session_key in (('content_budget', 'budget'), ('team_size', 'team_size')):
        if website.get(field_id) is not None:
            result[field_id] = entry(website.get(field_id), WEBSITE, site_conf_raw)
        elif has_session and session.get(session_key) is not None:
            result[field_id] = entry(session.get(session_key), SESSION, 0.7)

    # implementation_timeline: truthiness (not None-ness) decides here.
    if website.get('implementation_timeline'):
        result['implementation_timeline'] = entry(website.get('implementation_timeline'), WEBSITE, site_conf_raw)
    elif has_session and session.get('timeline'):
        result['implementation_timeline'] = entry(session.get('timeline'), SESSION, 0.7)

    # market_share: explicit value, else the estimate inside performance metrics.
    if website.get('market_share'):
        result['market_share'] = entry(website.get('market_share'), WEBSITE, site_conf_raw)
    elif website.get('performance_metrics'):
        result['market_share'] = entry(perf.get('estimated_market_share', None), WEBSITE, site_conf_raw)

    result['performance_metrics'] = entry(perf, WEBSITE, site_conf_hi)

    # --- Audience Intelligence --------------------------------------------
    audience_research = research.get('audience_intelligence', {})
    content_prefs = research.get('content_preferences', {})

    result['content_preferences'] = entry(content_prefs, RESEARCH, research_conf_hi)
    result['consumption_patterns'] = entry(audience_research.get('consumption_patterns', {}), RESEARCH, research_conf_hi)
    result['audience_pain_points'] = entry(audience_research.get('pain_points', []), RESEARCH, research_conf_hi)
    result['buying_journey'] = entry(audience_research.get('buying_journey', {}), RESEARCH, research_conf_hi)
    result['seasonal_trends'] = entry(
        ['Q1: Planning', 'Q2: Execution', 'Q3: Optimization', 'Q4: Review'],
        RESEARCH,
        research_conf_lo,
    )
    result['engagement_metrics'] = entry(
        {
            'avg_session_duration': perf.get('avg_session_duration', 180),
            'bounce_rate': perf.get('bounce_rate', 45.5),
            'pages_per_session': 2.5,
        },
        WEBSITE,
        site_conf_hi,
    )

    # --- Competitive Intelligence -----------------------------------------
    result['top_competitors'] = entry(
        website.get('competitors', [
            'Competitor A - Industry Leader',
            'Competitor B - Emerging Player',
            'Competitor C - Niche Specialist'
        ]),
        WEBSITE,
        site_conf_hi,
    )
    result['competitor_content_strategies'] = entry(
        ['Educational content', 'Case studies', 'Thought leadership'],
        WEBSITE,
        site_conf_lo,
    )
    result['market_gaps'] = entry(website.get('market_gaps', []), WEBSITE, site_conf_hi)
    result['industry_trends'] = entry(
        ['Digital transformation', 'AI/ML adoption', 'Remote work'],
        WEBSITE,
        site_conf_hi,
    )
    result['emerging_trends'] = entry(
        ['Voice search optimization', 'Video content', 'Interactive content'],
        WEBSITE,
        site_conf_lo,
    )

    # --- Content Strategy -------------------------------------------------
    result['preferred_formats'] = entry(
        content_prefs.get('preferred_formats', ['Blog posts', 'Whitepapers', 'Webinars', 'Case studies', 'Videos']),
        RESEARCH,
        research_conf_hi,
    )
    result['content_mix'] = entry(
        {
            'blog_posts': 40,
            'whitepapers': 20,
            'webinars': 15,
            'case_studies': 15,
            'videos': 10,
        },
        RESEARCH,
        research_conf_hi,
    )
    result['content_frequency'] = entry('Weekly', RESEARCH, research_conf_hi)
    result['optimal_timing'] = entry(
        {
            'best_days': ['Tuesday', 'Wednesday', 'Thursday'],
            'best_times': ['9:00 AM', '1:00 PM', '3:00 PM']
        },
        RESEARCH,
        research_conf_lo,
    )
    result['quality_metrics'] = entry(
        {
            'readability_score': 8.5,
            'engagement_target': 5.0,
            'conversion_target': 2.0
        },
        RESEARCH,
        research_conf_hi,
    )
    result['editorial_guidelines'] = entry(
        {
            'tone': content_prefs.get('content_style', ['Professional', 'Educational']),
            'length': content_prefs.get('content_length', 'Medium (1000-2000 words)'),
            'formatting': ['Use headers', 'Include visuals', 'Add CTAs']
        },
        RESEARCH,
        research_conf_hi,
    )
    result['brand_voice'] = entry(
        {
            'tone': 'Professional yet approachable',
            'style': 'Educational and authoritative',
            'personality': 'Expert, helpful, trustworthy'
        },
        RESEARCH,
        research_conf_hi,
    )

    # --- Performance & Analytics ------------------------------------------
    result['traffic_sources'] = entry(website.get('traffic_sources', {}), WEBSITE, site_conf_hi)
    result['conversion_rates'] = entry(
        {
            'overall': perf.get('conversion_rate', 3.2),
            'blog': 2.5,
            'landing_pages': 4.0,
            'email': 5.5,
        },
        WEBSITE,
        site_conf_hi,
    )
    result['content_roi_targets'] = entry(
        {
            'target_roi': 300,
            'cost_per_lead': 50,
            'lifetime_value': 500,
        },
        WEBSITE,
        site_conf_lo,
    )
    result['ab_testing_capabilities'] = entry(True, 'api_keys_data', api_keys.get('confidence_level', 0.8))

    return result

View File

@@ -0,0 +1,98 @@
from typing import Any, Dict
def build_data_sources_map(website: Dict[str, Any], research: Dict[str, Any], api_keys: Dict[str, Any]) -> Dict[str, str]:
    """Return the fixed field-id -> data-source mapping.

    The mapping is static: the three arguments are accepted but not read.
    """
    fields_by_source = (
        ('website_analysis', (
            'business_objectives', 'target_metrics', 'content_budget', 'team_size',
            'implementation_timeline', 'market_share', 'competitive_position',
            'performance_metrics', 'engagement_metrics', 'top_competitors',
            'competitor_content_strategies', 'market_gaps', 'industry_trends',
            'emerging_trends', 'traffic_sources', 'conversion_rates', 'content_roi_targets',
        )),
        ('research_preferences', (
            'content_preferences', 'consumption_patterns', 'audience_pain_points',
            'buying_journey', 'seasonal_trends', 'preferred_formats', 'content_mix',
            'content_frequency', 'optimal_timing', 'quality_metrics', 'editorial_guidelines',
            'brand_voice',
        )),
        ('api_keys_data', (
            'ab_testing_capabilities',
        )),
    )
    return {
        field_id: source
        for source, field_ids in fields_by_source
        for field_id in field_ids
    }
def build_input_data_points(*, website_raw: Dict[str, Any], research_raw: Dict[str, Any], api_raw: Dict[str, Any]) -> Dict[str, Any]:
    """Describe which raw inputs back selected fields.

    For each covered field, a dict of labelled raw values is returned, with
    ``'Not available'`` standing in for anything the raw source lacks. A
    field is only included when the raw source it draws on is non-empty.
    ``api_raw`` is accepted but not read.
    """
    MISSING = 'Not available'

    def pick(raw: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Any]:
        """Look up each ``label -> raw key`` pair, defaulting to MISSING."""
        return {label: raw.get(raw_key, MISSING) for label, raw_key in mapping.items()}

    points: Dict[str, Any] = {}

    if website_raw:
        business = pick(website_raw, {
            'website_content': 'content_goals',
            'meta_description': 'meta_description',
            'about_page': 'about_page_content',
            'page_title': 'page_title',
        })
        # content_analysis is the one entry that defaults to an empty dict.
        business['content_analysis'] = website_raw.get('content_analysis', {})
        points['business_objectives'] = business

    if research_raw:
        points['target_metrics'] = pick(research_raw, {
            'research_preferences': 'target_audience',
            'industry_benchmarks': 'industry_benchmarks',
            'competitor_analysis': 'competitor_analysis',
            'market_research': 'market_research',
        })
        points['content_preferences'] = pick(research_raw, {
            'user_preferences': 'content_types',
            'industry_trends': 'industry_trends',
            'consumption_patterns': 'consumption_patterns',
            'audience_research': 'audience_research',
        })

    if website_raw or research_raw:
        # Either source may be empty here, so each lookup is guarded individually.
        site = website_raw if website_raw else {}
        prefs = research_raw if research_raw else {}
        points['preferred_formats'] = {
            'existing_content': site.get('existing_content_types', MISSING),
            'engagement_metrics': site.get('engagement_metrics', MISSING),
            'platform_analysis': prefs.get('platform_preferences', MISSING),
            'content_performance': site.get('content_performance', MISSING),
        }

    if research_raw:
        points['content_frequency'] = pick(research_raw, {
            'audience_research': 'content_frequency_preferences',
            'industry_standards': 'industry_frequency',
            'competitor_frequency': 'competitor_frequency',
            'optimal_timing': 'optimal_timing',
        })

    if website_raw:
        points['content_budget'] = pick(website_raw, {
            'website_analysis': 'budget_indicators',
            'industry_standards': 'industry_budget',
            'company_size': 'company_size',
            'market_position': 'market_position',
        })
        points['team_size'] = pick(website_raw, {
            'company_profile': 'company_profile',
            'content_volume': 'content_volume',
            'industry_standards': 'industry_team_size',
            'budget_constraints': 'budget_constraints',
        })

    if research_raw:
        points['implementation_timeline'] = pick(research_raw, {
            'project_scope': 'project_scope',
            'resource_availability': 'resource_availability',
            'industry_timeline': 'industry_timeline',
            'complexity_assessment': 'complexity_assessment',
        })

    return points