Added enhanced linguistic analyzer and persona quality improver

This commit is contained in:
ajaysi
2025-09-14 09:53:27 +05:30
parent c63148e1ce
commit 1460ce3cb6
35 changed files with 4446 additions and 118 deletions

View File

@@ -317,6 +317,22 @@ async def generate_section(request: BlogSectionRequest) -> BlogSectionResponse:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/section/{section_id}/continuity")
async def get_section_continuity(section_id: str) -> Dict[str, Any]:
    """Return the continuity metrics most recently recorded for ``section_id``.

    The metrics are read from the ``_last_continuity`` mapping kept on
    ``service.content_generator``. ``continuity_metrics`` is ``None`` when the
    mapping is absent or holds nothing for this section. Any failure is
    reported to the client as HTTP 500.
    """
    try:
        # In-memory snapshot only: an empty dict stands in when the generator
        # has not recorded anything yet.
        snapshot: Dict[str, Any] = getattr(
            service.content_generator, "_last_continuity", {}
        )
        return {
            "section_id": section_id,
            "continuity_metrics": snapshot.get(section_id),
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/section/optimize", response_model=BlogOptimizeResponse)
async def optimize_section(request: BlogOptimizeRequest) -> BlogOptimizeResponse:
try:

View File

@@ -11,6 +11,9 @@ backend_path = Path(__file__).parent.parent.parent.parent
sys.path.append(str(backend_path))
from services.llm_providers.gemini_provider import gemini_text_response, gemini_structured_json_response
from services.persona_analysis_service import PersonaAnalysisService
from typing import Dict, Any, Optional
import time
class FacebookWriterBaseService:
@@ -19,6 +22,12 @@ class FacebookWriterBaseService:
def __init__(self):
    """Wire up the logger and persona service and start with an empty persona cache."""
    self.logger = logger
    self.persona_service = PersonaAnalysisService()

    # Per-instance persona cache: the payloads, the time each one was stored,
    # and how long (in seconds) an entry stays valid.
    self._cache_duration = 300  # 5 minutes cache duration
    self._cache_timestamps: Dict[str, float] = {}
    self._persona_cache: Dict[str, Dict[str, Any]] = {}
def _generate_text(self, prompt: str, temperature: float = 0.7, max_tokens: int = 2048) -> str:
"""
@@ -146,6 +155,107 @@ class FacebookWriterBaseService:
return base_suggestions
def _get_persona_data(self, user_id: int = 1) -> Optional[Dict[str, Any]]:
"""
Get persona data for Facebook platform with caching.
Args:
user_id: User ID to get persona for
Returns:
Persona data or None if not available
"""
cache_key = f"facebook_persona_{user_id}"
current_time = time.time()
# Check cache first
if cache_key in self._persona_cache and cache_key in self._cache_timestamps:
cache_age = current_time - self._cache_timestamps[cache_key]
if cache_age < self._cache_duration:
self.logger.debug(f"Using cached persona data for user {user_id} (age: {cache_age:.1f}s)")
return self._persona_cache[cache_key]
else:
# Cache expired, remove it
self.logger.debug(f"Cache expired for user {user_id}, refreshing...")
del self._persona_cache[cache_key]
del self._cache_timestamps[cache_key]
# Fetch fresh data
try:
persona_data = self.persona_service.get_persona_for_platform(user_id, 'facebook')
# Cache the result
if persona_data:
self._persona_cache[cache_key] = persona_data
self._cache_timestamps[cache_key] = current_time
self.logger.debug(f"Cached persona data for user {user_id}")
return persona_data
except Exception as e:
self.logger.warning(f"Could not load persona data for Facebook content generation: {e}")
return None
def _clear_persona_cache(self, user_id: int = None):
"""
Clear persona cache for a specific user or all users.
Args:
user_id: User ID to clear cache for, or None to clear all
"""
if user_id is None:
self._persona_cache.clear()
self._cache_timestamps.clear()
self.logger.info("Cleared all persona cache")
else:
cache_key = f"facebook_persona_{user_id}"
if cache_key in self._persona_cache:
del self._persona_cache[cache_key]
del self._cache_timestamps[cache_key]
self.logger.info(f"Cleared persona cache for user {user_id}")
def _build_persona_enhanced_prompt(self, base_prompt: str, persona_data: Optional[Dict[str, Any]] = None) -> str:
"""
Enhance prompt with persona data if available.
Args:
base_prompt: Base prompt to enhance
persona_data: Persona data to incorporate
Returns:
Enhanced prompt with persona guidance
"""
if not persona_data:
return base_prompt
try:
core_persona = persona_data.get('core_persona', {})
platform_persona = persona_data.get('platform_adaptation', {})
if not core_persona:
return base_prompt
persona_guidance = f"""
PERSONA-AWARE WRITING GUIDANCE:
- PERSONA: {core_persona.get('persona_name', 'Unknown')} ({core_persona.get('archetype', 'Unknown')})
- CORE BELIEF: {core_persona.get('core_belief', 'Unknown')}
- CONFIDENCE SCORE: {core_persona.get('confidence_score', 0)}%
PLATFORM OPTIMIZATION (Facebook):
- CHARACTER LIMIT: {platform_persona.get('content_format_rules', {}).get('character_limit', '63206')} characters
- OPTIMAL LENGTH: {platform_persona.get('content_format_rules', {}).get('optimal_length', '40-80 characters')}
- ENGAGEMENT PATTERN: {platform_persona.get('engagement_patterns', {}).get('posting_frequency', '1-2 times per day')}
- HASHTAG STRATEGY: {platform_persona.get('lexical_features', {}).get('hashtag_strategy', '1-2 relevant hashtags')}
ALWAYS generate content that matches this persona's linguistic fingerprint and platform optimization rules.
"""
return f"{base_prompt}\n\n{persona_guidance}"
except Exception as e:
self.logger.warning(f"Error enhancing prompt with persona data: {e}")
return base_prompt
def _handle_error(self, error: Exception, operation: str) -> Dict[str, Any]:
"""
Handle errors and return standardized error response.

View File

@@ -23,8 +23,13 @@ class FacebookPostService(FacebookWriterBaseService):
actual_goal = request.custom_goal if request.post_goal.value == "Custom" else request.post_goal.value
actual_tone = request.custom_tone if request.post_tone.value == "Custom" else request.post_tone.value
# Get persona data for enhanced content generation
user_id = getattr(request, 'user_id', 1)
persona_data = self._get_persona_data(user_id)
# Build the prompt
prompt = self._build_post_prompt(request, actual_goal, actual_tone)
base_prompt = self._build_post_prompt(request, actual_goal, actual_tone)
prompt = self._build_persona_enhanced_prompt(base_prompt, persona_data)
# Generate the post content
content = self._generate_text(prompt, temperature=0.7, max_tokens=1024)

View File

@@ -15,7 +15,11 @@ class FacebookReelService(FacebookWriterBaseService):
actual_reel_type = request.custom_reel_type if request.reel_type.value == "Custom" else request.reel_type.value
actual_style = request.custom_style if request.reel_style.value == "Custom" else request.reel_style.value
prompt = f"""
# Get persona data for enhanced content generation
user_id = getattr(request, 'user_id', 1)
persona_data = self._get_persona_data(user_id)
base_prompt = f"""
Create a Facebook Reel script for:
Business: {request.business_type}
Audience: {request.target_audience}
@@ -30,6 +34,7 @@ class FacebookReelService(FacebookWriterBaseService):
Create an engaging reel script with scene breakdown, timing, and music suggestions.
"""
prompt = self._build_persona_enhanced_prompt(base_prompt, persona_data)
content = self._generate_text(prompt, temperature=0.7, max_tokens=1024)
return FacebookReelResponse(

View File

@@ -29,8 +29,13 @@ class FacebookStoryService(FacebookWriterBaseService):
actual_story_type = request.custom_story_type if request.story_type.value == "Custom" else request.story_type.value
actual_tone = request.custom_tone if request.story_tone.value == "Custom" else request.story_tone.value
# Get persona data for enhanced content generation
user_id = getattr(request, 'user_id', 1)
persona_data = self._get_persona_data(user_id)
# Build the prompt
prompt = self._build_story_prompt(request, actual_story_type, actual_tone)
base_prompt = self._build_story_prompt(request, actual_story_type, actual_tone)
prompt = self._build_persona_enhanced_prompt(base_prompt, persona_data)
# Generate the story content
content = self._generate_text(prompt, temperature=0.7, max_tokens=1024)

View File

@@ -73,12 +73,14 @@ class BlogSectionRequest(BaseModel):
keywords: List[str] = []
tone: Optional[str] = None
persona: Optional[PersonaInfo] = None
mode: Optional[str] = "polished" # 'draft' | 'polished'
# Response payload for a generated blog section (the return type of the
# generate_section endpoint). Documented with comments rather than a docstring
# so the generated schema description is unaffected.
class BlogSectionResponse(BaseModel):
# True unless the producer explicitly reports a failure.
success: bool = True
# Section body as Markdown; the only field without a default.
markdown: str
# Sources backing the section; empty when none were attached.
citations: List[ResearchSource] = []
# Optional metric-name -> score mapping; None when no metrics were supplied.
continuity_metrics: Optional[Dict[str, float]] = None
class BlogOptimizeRequest(BaseModel):

View File

@@ -0,0 +1,164 @@
"""
Enhanced Persona Database Models
Improved schema for better writing style mimicry and quality tracking.
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, Float, JSON, ForeignKey, Boolean, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
# Declarative base that the model classes in this module inherit from.
# NOTE(review): this is a fresh base created here; confirm whether these tables
# should share metadata with the application's other models.
Base = declarative_base()
class EnhancedWritingPersona(Base):
"""Enhanced writing persona model with improved linguistic analysis.

One row per persona in ``enhanced_writing_personas``: identity text, JSON
analysis blobs, float quality scores, source-tracking data and timestamps.
"""
__tablename__ = "enhanced_writing_personas"
# Primary fields
id = Column(Integer, primary_key=True)
# Plain indexed integer; no ForeignKey is declared for the user here.
user_id = Column(Integer, nullable=False, index=True)
persona_name = Column(String(255), nullable=False)
# Core Identity
archetype = Column(String(100), nullable=True)
core_belief = Column(Text, nullable=True)
brand_voice_description = Column(Text, nullable=True)
# Enhanced Linguistic Fingerprint
linguistic_fingerprint = Column(JSON, nullable=True) # More detailed analysis
writing_style_signature = Column(JSON, nullable=True) # Unique style markers
vocabulary_profile = Column(JSON, nullable=True) # Detailed vocabulary analysis
sentence_patterns = Column(JSON, nullable=True) # Sentence structure patterns
rhetorical_style = Column(JSON, nullable=True) # Rhetorical device preferences
# Quality Metrics
# The 0-100 ranges below are a convention only; no constraint enforces them.
style_consistency_score = Column(Float, nullable=True) # 0-100
authenticity_score = Column(Float, nullable=True) # 0-100
readability_score = Column(Float, nullable=True) # 0-100
engagement_potential = Column(Float, nullable=True) # 0-100
# Learning & Adaptation
feedback_history = Column(JSON, nullable=True) # User feedback over time
performance_metrics = Column(JSON, nullable=True) # Content performance data
adaptation_history = Column(JSON, nullable=True) # How persona evolved
# Source data tracking
# Plain integer reference; no ForeignKey is declared for the onboarding session.
onboarding_session_id = Column(Integer, nullable=True)
source_website_analysis = Column(JSON, nullable=True)
source_research_preferences = Column(JSON, nullable=True)
# AI Analysis metadata
ai_analysis_version = Column(String(50), nullable=True)
confidence_score = Column(Float, nullable=True)
# datetime.utcnow is passed as a callable, so it runs per insert and yields
# naive (timezone-less) UTC values.
analysis_date = Column(DateTime, default=datetime.utcnow)
# Metadata
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
is_active = Column(Boolean, default=True)
# Indexes for performance
# NOTE(review): these index names are generic; some databases require index
# names to be unique across the schema — confirm there is no clash.
__table_args__ = (
Index('idx_user_active', 'user_id', 'is_active'),
Index('idx_created_at', 'created_at'),
)
class EnhancedPlatformPersona(Base):
"""Enhanced platform-specific persona with detailed optimization.

Each row in ``enhanced_platform_personas`` belongs to exactly one
``EnhancedWritingPersona`` (required foreign key) and is tagged with a
platform type string.
"""
__tablename__ = "enhanced_platform_personas"
# Primary fields
id = Column(Integer, primary_key=True)
writing_persona_id = Column(Integer, ForeignKey("enhanced_writing_personas.id"), nullable=False)
platform_type = Column(String(50), nullable=False, index=True)
# Enhanced Platform-specific Analysis
platform_linguistic_adaptation = Column(JSON, nullable=True) # How language adapts to platform
platform_engagement_patterns = Column(JSON, nullable=True) # Detailed engagement analysis
platform_content_optimization = Column(JSON, nullable=True) # Content optimization rules
platform_algorithm_insights = Column(JSON, nullable=True) # Algorithm-specific insights
# Performance Tracking
content_performance_history = Column(JSON, nullable=True) # Historical performance data
engagement_metrics = Column(JSON, nullable=True) # Engagement statistics
optimization_suggestions = Column(JSON, nullable=True) # AI-generated optimization tips
# Quality Assurance
# The 0-100 ranges below are a convention only; no constraint enforces them.
platform_compliance_score = Column(Float, nullable=True) # 0-100
optimization_effectiveness = Column(Float, nullable=True) # 0-100
# Metadata
# datetime.utcnow is passed as a callable, so values are naive UTC set per row.
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
is_active = Column(Boolean, default=True)
# Relationships
# The matching "platform_personas" side is attached to EnhancedWritingPersona
# at the end of this module, after both classes exist.
writing_persona = relationship("EnhancedWritingPersona", back_populates="platform_personas")
# Indexes
# The composite index below is not unique, so nothing here prevents several
# rows for the same persona/platform pair.
__table_args__ = (
Index('idx_platform_active', 'platform_type', 'is_active'),
Index('idx_persona_platform', 'writing_persona_id', 'platform_type'),
)
class PersonaQualityMetrics(Base):
"""Tracks persona quality and improvement over time.

Each row in ``persona_quality_metrics`` is one assessment of a writing
persona, optionally narrowed to one of its platform personas.
"""
__tablename__ = "persona_quality_metrics"
id = Column(Integer, primary_key=True)
# The writing persona is required; the platform persona is optional.
writing_persona_id = Column(Integer, ForeignKey("enhanced_writing_personas.id"), nullable=False)
platform_persona_id = Column(Integer, ForeignKey("enhanced_platform_personas.id"), nullable=True)
# Quality Scores
# NOTE(review): no scale is stated for these floats — confirm whether they
# follow the 0-100 convention used on the persona tables.
style_accuracy = Column(Float, nullable=True) # How well it mimics user style
content_quality = Column(Float, nullable=True) # Overall content quality
engagement_rate = Column(Float, nullable=True) # Engagement performance
consistency_score = Column(Float, nullable=True) # Consistency across content
# User Feedback
user_satisfaction = Column(Float, nullable=True) # User rating
user_feedback = Column(Text, nullable=True) # Qualitative feedback
improvement_requests = Column(JSON, nullable=True) # Specific improvement requests
# AI Analysis
ai_quality_assessment = Column(JSON, nullable=True) # AI's quality analysis
improvement_suggestions = Column(JSON, nullable=True) # AI suggestions for improvement
# Metadata
# datetime.utcnow is passed as a callable, so the value is naive UTC set per row.
assessment_date = Column(DateTime, default=datetime.utcnow)
assessor_type = Column(String(50), nullable=True) # user, ai, automated
# Relationships
# One-directional: no back_populates/backref is declared, so the persona
# classes get no reverse attribute from these.
writing_persona = relationship("EnhancedWritingPersona")
platform_persona = relationship("EnhancedPlatformPersona")
class PersonaLearningData(Base):
"""Stores learning data for persona improvement.

Each row in ``persona_learning_data`` pairs JSON learning inputs with the JSON
refinements recorded for one writing persona.
"""
__tablename__ = "persona_learning_data"
id = Column(Integer, primary_key=True)
writing_persona_id = Column(Integer, ForeignKey("enhanced_writing_personas.id"), nullable=False)
# Learning Inputs
user_writing_samples = Column(JSON, nullable=True) # Additional user writing samples
successful_content_examples = Column(JSON, nullable=True) # High-performing content
user_preferences = Column(JSON, nullable=True) # User preferences and adjustments
# Learning Outputs
style_refinements = Column(JSON, nullable=True) # Refinements made to persona
vocabulary_updates = Column(JSON, nullable=True) # Vocabulary additions/removals
pattern_adjustments = Column(JSON, nullable=True) # Pattern adjustments
# Metadata
# datetime.utcnow is passed as a callable, so the value is naive UTC set per row.
learning_date = Column(DateTime, default=datetime.utcnow)
learning_type = Column(String(50), nullable=True) # feedback, sample, preference
# Relationships
# One-directional: no back_populates/backref is declared on this relationship.
writing_persona = relationship("EnhancedWritingPersona")
# Add relationships
# Attached here, after both classes are defined; it pairs with
# EnhancedPlatformPersona.writing_persona through back_populates.
# cascade="all, delete-orphan" ties platform personas to their parent: they are
# deleted with it, and also when removed from this collection.
EnhancedWritingPersona.platform_personas = relationship("EnhancedPlatformPersona", back_populates="writing_persona", cascade="all, delete-orphan")

View File

@@ -0,0 +1,152 @@
"""
ContextMemory - maintains intelligent continuity context across sections using LLM-enhanced summarization.
Stores smart per-section summaries and thread keywords for use in prompts with cost optimization.
"""
from __future__ import annotations
from typing import Dict, List, Optional, Tuple
from collections import deque
from loguru import logger
import hashlib
# Import the common gemini provider
from services.llm_providers.gemini_provider import gemini_text_response
class ContextMemory:
    """In-memory continuity store for recent sections with LLM-enhanced summarization.

    Notes:
    - Keeps an ordered deque of recent (section_id, summary) pairs
    - Uses LLM for intelligent summarization when content is substantial
    - Provides utilities to build a compact previous-sections summary
    - Implements caching to minimize LLM calls
    """

    def __init__(self, max_entries: int = 10):
        """Create an empty store that remembers at most ``max_entries`` sections."""
        self.max_entries = max_entries
        # maxlen makes the deque discard its oldest entry once it is full.
        self._recent: deque[Tuple[str, str]] = deque(maxlen=max_entries)
        # Cache of generated summaries, keyed by text hash and target length.
        # NOTE: it is unbounded; call clear_cache() in long-running processes.
        self._summary_cache: Dict[str, str] = {}
        logger.info("✅ ContextMemory initialized with LLM-enhanced summarization")

    def update_with_section(self, section_id: str, full_text: str, use_llm: bool = True) -> None:
        """Create a compact summary and store it for continuity usage."""
        summary = self._summarize_text_intelligently(full_text, use_llm=use_llm)
        self._recent.append((section_id, summary))

    def get_recent_summaries(self, limit: int = 2) -> List[str]:
        """Return the last ``limit`` stored summaries in insertion order (oldest first).

        A non-positive ``limit`` yields an empty list.
        """
        if limit <= 0:
            # A slice of [-0:] would return every entry, not none.
            return []
        return [summary for (_sid, summary) in list(self._recent)[-limit:]]

    def build_previous_sections_summary(self, limit: int = 2) -> str:
        """Join recent summaries for prompt injection ("" when nothing is stored)."""
        recents = self.get_recent_summaries(limit=limit)
        if not recents:
            return ""
        return "\n\n".join(recents)

    def _summarize_text_intelligently(self, text: str, target_words: int = 80, use_llm: bool = True) -> str:
        """Create intelligent summary using LLM when appropriate, fallback to truncation.

        Results are cached per (text, target_words), so repeated calls for the
        same content do not trigger another LLM request.
        """
        # The target length is part of the key so summaries of different
        # lengths for the same text do not overwrite each other.
        cache_key = f"{self._get_cache_key(text)}:{target_words}"

        if cache_key in self._summary_cache:
            logger.debug("Summary cache hit")
            return self._summary_cache[cache_key]

        if use_llm and self._should_use_llm_summarization(text):
            try:
                summary = self._llm_summarize_text(text, target_words)
                self._summary_cache[cache_key] = summary
                logger.info("LLM-based summarization completed")
                return summary
            except Exception as e:
                logger.warning(f"LLM summarization failed, using fallback: {e}")
                # Fall through to local summarization

        summary = self._summarize_text_locally(text, target_words)
        self._summary_cache[cache_key] = summary
        return summary

    def _should_use_llm_summarization(self, text: str) -> bool:
        """Determine if content is substantial enough to warrant LLM summarization."""
        word_count = len(text.split())
        # NOTE(review): '*' and '-' match almost any prose (e.g. a hyphen), so
        # this check is true for most texts — confirm that is intended.
        has_complex_structure = any(marker in text for marker in ['##', '###', '**', '*', '-', '1.', '2.'])
        return word_count > 150 or has_complex_structure

    def _llm_summarize_text(self, text: str, target_words: int = 80) -> str:
        """Use Gemini API for intelligent text summarization.

        Falls back to the local truncation summary when the response is empty
        or the call raises.
        """
        # Only the first 800 characters are sent, to keep the prompt small.
        truncated_text = text[:800]
        prompt = f"""
Summarize the following content in approximately {target_words} words, focusing on key concepts and main points.
Content: {truncated_text}
Requirements:
- Capture the main ideas and key concepts
- Maintain the original tone and style
- Keep it concise but informative
- Focus on what's most important for continuity
Generate only the summary, no explanations or formatting.
"""
        try:
            result = gemini_text_response(
                prompt=prompt,
                temperature=0.3,  # Low temperature for consistent summarization
                max_tokens=500,  # Increased tokens for better summaries
                system_prompt="You are an expert at creating concise, informative summaries."
            )
            if result and result.strip():
                summary = result.strip()
                # Trim responses that overshoot the target by more than 20 words.
                words = summary.split()
                if len(words) > target_words + 20:
                    summary = " ".join(words[:target_words]) + "..."
                return summary
            logger.warning("LLM summary response empty, using fallback")
            return self._summarize_text_locally(text, target_words)
        except Exception as e:
            logger.error(f"LLM summarization error: {e}")
            return self._summarize_text_locally(text, target_words)

    def _summarize_text_locally(self, text: str, target_words: int = 80) -> str:
        """Very lightweight, deterministic truncation-based summary.

        This deliberately avoids extra LLM calls. Text of at most
        ``target_words`` words is returned stripped; longer text is cut to its
        first ``target_words`` words and marked with a trailing "...", matching
        the LLM path.
        """
        words = text.split()
        if len(words) <= target_words:
            return text.strip()
        return " ".join(words[:target_words]).strip() + "..."

    def _get_cache_key(self, text: str) -> str:
        """Generate cache key from a hash of the full text."""
        # Hash the whole text: the digest has a fixed size either way, and
        # hashing only a prefix lets texts with the same opening collide.
        # md5 is used as a fingerprint only, not for security.
        return hashlib.md5(text.encode()).hexdigest()[:12]

    def clear_cache(self):
        """Clear summary cache (useful for testing or memory management)."""
        self._summary_cache.clear()
        logger.info("ContextMemory cache cleared")

View File

@@ -0,0 +1,74 @@
"""
EnhancedContentGenerator - thin orchestrator combining URL selection and Gemini provider.
Provides Draft vs Polished modes and optional URL Context usage.
"""
from typing import Any, Dict
from services.llm_providers.gemini_grounded_provider import GeminiGroundedProvider
from .source_url_manager import SourceURLManager
from .context_memory import ContextMemory
from .transition_generator import TransitionGenerator
from .flow_analyzer import FlowAnalyzer
class EnhancedContentGenerator:
    """Thin orchestrator for generating one blog section at a time.

    Combines source-URL selection, the grounded Gemini provider, a continuity
    memory, a transition generator and a flow analyzer.
    """

    def __init__(self):
        """Instantiate the collaborators and an empty continuity snapshot."""
        self.provider = GeminiGroundedProvider()
        self.url_manager = SourceURLManager()
        self.memory = ContextMemory(max_entries=12)
        self.transitioner = TransitionGenerator()
        self.flow = FlowAnalyzer()
        # Latest continuity metrics per section id. Created here so readers
        # never have to guard against the attribute being missing.
        self._last_continuity: Dict[str, Any] = {}

    async def generate_section(self, section: Any, research: Any, mode: str = "polished") -> Dict[str, Any]:
        """Generate a section and enrich the provider result with continuity data.

        Args:
            section: Object read via getattr for ``id``, ``heading``,
                ``key_points``, ``keywords`` and ``target_words``.
            research: Research object passed to the URL manager.
            mode: "polished" uses temperature 0.6; any other value uses 0.8.

        Returns:
            The provider's result dict with ``transition`` and
            ``continuity_metrics`` added.
        """
        urls = self.url_manager.pick_relevant_urls(section, research)
        prev_summary = self.memory.build_previous_sections_summary(limit=2)
        prompt = self._build_prompt(section, research, prev_summary)

        # NOTE(review): content_type is "linkedin_article" although this builds
        # a blog section — confirm this is the intended provider setting.
        result = await self.provider.generate_grounded_content(
            prompt=prompt,
            content_type="linkedin_article",
            temperature=0.6 if mode == "polished" else 0.8,
            max_tokens=2048,
            urls=urls,
            mode=mode,
        )

        section_id = getattr(section, 'id', 'unknown')
        heading = getattr(section, 'heading', 'This section')

        # Generate transition and compute flow metrics against the summary of
        # the previous sections (not their full text).
        previous_text = prev_summary
        current_text = result.get("content", "") or ""
        transition = self.transitioner.generate_transition(previous_text, heading, use_llm=True)
        metrics = self.flow.assess_flow(previous_text, current_text, use_llm=True)

        # Only sections that produced content feed later continuity.
        if current_text:
            self.memory.update_with_section(section_id, current_text, use_llm=True)

        result["transition"] = transition
        result["continuity_metrics"] = metrics

        # Persist a lightweight continuity snapshot for API access.
        try:
            self._last_continuity[section_id] = metrics
        except TypeError:
            # Unhashable section id: the snapshot is best-effort and must not
            # fail a generation that already succeeded.
            pass
        return result

    def _build_prompt(self, section: Any, research: Any, prev_summary: str) -> str:
        """Assemble the generation prompt from section attributes and prior context.

        Missing or ``None`` key points and keywords are treated as empty, and
        individual items are converted with ``str`` so non-string values cannot
        break the join.
        """
        heading = getattr(section, 'heading', 'Section')
        key_points = getattr(section, 'key_points', None) or []
        keywords = getattr(section, 'keywords', None) or []
        target_words = getattr(section, 'target_words', 300)
        return (
            f"You are writing the blog section '{heading}'.\n\n"
            f"Context summary: {prev_summary}\n"
            f"Key points: {', '.join(str(p) for p in key_points)}\n"
            f"Keywords: {', '.join(str(k) for k in keywords)}\n"
            f"Target word count: {target_words}.\n"
            "Use only factual info from provided sources; add short transition, then body."
        )

View File

@@ -0,0 +1,162 @@
"""
FlowAnalyzer - evaluates narrative flow using LLM-based analysis with cost optimization.
Uses Gemini API for intelligent analysis while minimizing API calls through caching and smart triggers.
"""
from typing import Dict, Optional
from loguru import logger
import hashlib
import json
# Import the common gemini provider
from services.llm_providers.gemini_provider import gemini_structured_json_response
class FlowAnalyzer:
    """Scores narrative flow between two sections, by LLM or by simple rules.

    Results are cached per (previous_text, current_text) pair so repeated
    assessments of the same content do not trigger new LLM calls.
    """

    _METRIC_KEYS = ("flow", "consistency", "progression")

    def __init__(self):
        """Create the analyzer with empty LLM and rule-based caches."""
        # Simple in-memory cache to avoid redundant LLM calls
        self._cache: Dict[str, Dict[str, float]] = {}
        # Cache for rule-based fallback when LLM analysis isn't needed
        self._rule_cache: Dict[str, Dict[str, float]] = {}
        logger.info("✅ FlowAnalyzer initialized with LLM-based analysis")

    def assess_flow(self, previous_text: str, current_text: str, use_llm: bool = True) -> Dict[str, float]:
        """
        Return flow metrics in range 0..1.

        Args:
            previous_text: Previous section content
            current_text: Current section content
            use_llm: Whether to use LLM analysis (default: True for significant content)

        Returns:
            Dict with "flow", "consistency" and "progression"; all 0.0 when
            current_text is empty
        """
        if not current_text:
            return {"flow": 0.0, "consistency": 0.0, "progression": 0.0}

        cache_key = self._get_cache_key(previous_text, current_text)

        if cache_key in self._cache:
            logger.debug("Flow analysis cache hit")
            return self._cache[cache_key]

        if use_llm and self._should_use_llm_analysis(previous_text, current_text):
            try:
                metrics = self._llm_flow_analysis(previous_text, current_text)
                self._cache[cache_key] = metrics
                logger.info("LLM-based flow analysis completed")
                return metrics
            except Exception as e:
                logger.warning(f"LLM flow analysis failed, falling back to rules: {e}")
                # Fall through to rule-based analysis

        # Rule-based fallback (cached separately)
        if cache_key in self._rule_cache:
            return self._rule_cache[cache_key]
        metrics = self._rule_based_analysis(previous_text, current_text)
        self._rule_cache[cache_key] = metrics
        return metrics

    def _should_use_llm_analysis(self, previous_text: str, current_text: str) -> bool:
        """Determine if content is significant enough to warrant LLM analysis."""
        word_count = len(current_text.split())
        has_previous = bool(previous_text and len(previous_text.strip()) > 50)
        # Use LLM if: substantial content (>100 words) OR has meaningful previous context
        return word_count > 100 or has_previous

    def _llm_flow_analysis(self, previous_text: str, current_text: str) -> Dict[str, float]:
        """Use Gemini API for intelligent flow analysis.

        Scores are clamped to 0..1. Falls back to the rule-based analysis when
        the response cannot be used or the call raises.
        """
        # Send only the end of the previous section and the start of the
        # current one, to keep the prompt small.
        prev_truncated = previous_text[-300:] if previous_text else ""
        curr_truncated = current_text[:500]
        prompt = f"""
Analyze the narrative flow between these two content sections. Rate each aspect from 0.0 to 1.0.
PREVIOUS SECTION (end): {prev_truncated}
CURRENT SECTION (start): {curr_truncated}
Evaluate:
1. Flow Quality (0.0-1.0): How smoothly does the content transition? Are there logical connections?
2. Consistency (0.0-1.0): Do key themes, terminology, and tone remain consistent?
3. Progression (0.0-1.0): Does the content logically build upon previous ideas?
Return ONLY a JSON object with these exact keys: flow, consistency, progression
"""
        schema = {
            "type": "object",
            "properties": {
                "flow": {"type": "number", "minimum": 0.0, "maximum": 1.0},
                "consistency": {"type": "number", "minimum": 0.0, "maximum": 1.0},
                "progression": {"type": "number", "minimum": 0.0, "maximum": 1.0}
            },
            "required": ["flow", "consistency", "progression"]
        }
        try:
            result = gemini_structured_json_response(
                prompt=prompt,
                schema=schema,
                temperature=0.2,  # Low temperature for consistent scoring
                max_tokens=1000  # Increased tokens for better analysis
            )
            # NOTE(review): assumes the provider result exposes `.parsed` as a
            # dict — confirm against gemini_structured_json_response.
            parsed = result.parsed
            if parsed:
                # The schema bounds are advisory, so enforce 0..1 here.
                return {
                    key: self._clamp_unit(parsed.get(key, 0.6))
                    for key in self._METRIC_KEYS
                }
            logger.warning("LLM response parsing failed, using fallback")
            return self._rule_based_analysis(previous_text, current_text)
        except Exception as e:
            logger.error(f"LLM flow analysis error: {e}")
            return self._rule_based_analysis(previous_text, current_text)

    @staticmethod
    def _clamp_unit(value) -> float:
        """Convert ``value`` to float and clamp it to the closed range 0..1."""
        return max(0.0, min(1.0, float(value)))

    def _rule_based_analysis(self, previous_text: str, current_text: str) -> Dict[str, float]:
        """Fallback rule-based analysis for cost efficiency.

        Starts every metric at 0.6 and adds small bonuses for simple signals.
        """
        flow = 0.6
        consistency = 0.6
        progression = 0.6

        # Ignore trailing whitespace so "Done.\n" still counts as a finished sentence.
        previous_tail = (previous_text or "").rstrip()
        if previous_tail and previous_tail[-1] in ".!?":
            flow += 0.1

        # NOTE: these are substring checks, so "but" also matches "button".
        lowered = current_text.lower()
        if any(k in lowered for k in ["therefore", "next", "building on", "as a result", "furthermore", "additionally"]):
            progression += 0.2
        if len(current_text.split()) > 120:
            consistency += 0.1
        if any(k in lowered for k in ["however", "but", "although", "despite"]):
            flow += 0.1  # Good use of contrast words

        return {
            "flow": min(flow, 1.0),
            "consistency": min(consistency, 1.0),
            "progression": min(progression, 1.0),
        }

    def _get_cache_key(self, previous_text: str, current_text: str) -> str:
        """Generate cache key from hashes of the full texts."""
        # Hash the whole text: hashing only a prefix makes sections that open
        # identically share one cache entry and return stale metrics.
        # md5 is used as a fingerprint only, not for security.
        prev_hash = hashlib.md5((previous_text or "").encode()).hexdigest()[:16]
        curr_hash = hashlib.md5(current_text.encode()).hexdigest()[:16]
        return f"{prev_hash}_{curr_hash}"

    def clear_cache(self):
        """Clear analysis cache (useful for testing or memory management)."""
        self._cache.clear()
        self._rule_cache.clear()
        logger.info("FlowAnalyzer cache cleared")

View File

@@ -0,0 +1,42 @@
"""
SourceURLManager - selects the most relevant source URLs for a section.
Low-effort heuristic using keywords and titles; safe defaults if no research.
"""
from typing import List, Dict, Any
class SourceURLManager:
    """Ranks research sources for a section with a simple keyword heuristic."""

    def pick_relevant_urls(self, section: Any, research: Any, limit: int = 5) -> List[str]:
        """Return up to ``limit`` distinct source URLs, most relevant first.

        A source may be a dict or an object; its URL is read from ``url`` and
        then ``uri``, its title from ``title``. Each section keyword found in
        the lower-cased title scores 1.0 and an ``https://`` URL adds 0.2.
        Sources without a string URL are skipped.

        Args:
            section: Object whose optional ``keywords`` attribute is used.
            research: Object whose ``sources`` attribute is iterated.
            limit: Maximum number of URLs to return; non-positive yields [].

        Returns:
            De-duplicated URLs in descending score order (ties keep source order).
        """
        if not research or not getattr(research, 'sources', None):
            return []
        if limit <= 0:
            return []

        section_keywords = {
            str(k).lower() for k in (getattr(section, 'keywords', None) or []) if k
        }

        scored: List[tuple[float, str]] = []
        for s in research.sources:
            # Each field is resolved separately. Chaining `a or b if c else d`
            # parses as `(a or b) if c else d`, which dropped every non-dict source.
            url = self._field(s, 'url') or self._field(s, 'uri')
            if not url or not isinstance(url, str):
                continue
            title_l = str(self._field(s, 'title') or '').lower()

            # simple overlap score
            score = float(sum(1 for kw in section_keywords if kw in title_l))
            # prefer https lightly
            if url.startswith('https://'):
                score += 0.2
            scored.append((score, url))

        # list.sort is stable, so equal scores keep their original order.
        scored.sort(key=lambda x: x[0], reverse=True)

        dedup: List[str] = []
        seen = set()
        for _, u in scored:
            if u in seen:
                continue
            seen.add(u)
            dedup.append(u)
            if len(dedup) >= limit:
                break
        return dedup

    @staticmethod
    def _field(source: Any, name: str) -> Any:
        """Read ``name`` from a dict key or an object attribute; None when absent."""
        if isinstance(source, dict):
            return source.get(name)
        return getattr(source, name, None)

View File

@@ -0,0 +1,143 @@
"""
TransitionGenerator - produces intelligent transitions between sections using LLM analysis.
Uses Gemini API for natural transitions while maintaining cost efficiency through smart caching.
"""
from typing import Optional, Dict
from loguru import logger
import hashlib
# Import the common gemini provider
from services.llm_providers.gemini_provider import gemini_text_response
class TransitionGenerator:
def __init__(self):
    """Create the generator with an empty transition cache."""
    # Transitions already produced, kept so repeated requests for the same
    # inputs can skip the LLM call.
    self._cache: Dict[str, str] = {}
    logger.info("✅ TransitionGenerator initialized with LLM-based generation")
def generate_transition(self, previous_text: str, current_heading: str, use_llm: bool = True) -> str:
    """
    Produce a 1-2 sentence bridge from previous_text into current_heading.

    With no previous text a fixed "Let's explore ..." sentence is returned.
    Otherwise the result comes from the cache, the LLM (when enabled and
    warranted) or the heuristic generator, and is cached.

    Args:
        previous_text: Previous section content
        current_heading: Current section heading
        use_llm: Whether to use LLM generation (default: True for substantial content)
    """
    prior = (previous_text or "").strip()
    if not prior:
        return f"Let's explore {current_heading.lower()} next."

    key = self._get_cache_key(prior, current_heading)
    if key in self._cache:
        logger.debug("Transition generation cache hit")
        return self._cache[key]

    llm_wanted = use_llm and self._should_use_llm_generation(prior, current_heading)
    if llm_wanted:
        try:
            bridge = self._llm_generate_transition(prior, current_heading)
            self._cache[key] = bridge
            logger.info("LLM-based transition generated")
            return bridge
        except Exception as e:
            # Any failure here drops through to the heuristic path below.
            logger.warning(f"LLM transition generation failed, using fallback: {e}")

    bridge = self._heuristic_transition(prior, current_heading)
    self._cache[key] = bridge
    return bridge
def _should_use_llm_generation(self, previous_text: str, current_heading: str) -> bool:
"""Determine if content is substantial enough to warrant LLM generation."""
# Use LLM for substantial previous content (>100 words) or complex headings
word_count = len(previous_text.split())
complex_heading = len(current_heading.split()) > 2 or any(char in current_heading for char in [':', '-', '&'])
return word_count > 100 or complex_heading
def _llm_generate_transition(self, previous_text: str, current_heading: str) -> str:
"""Use Gemini API for intelligent transition generation."""
# Truncate previous text to minimize tokens while keeping context
prev_truncated = previous_text[-200:] # Last 200 chars usually contain the conclusion
prompt = f"""
Create a smooth, natural 1-2 sentence transition from the previous content to the new section.
PREVIOUS CONTENT (ending): {prev_truncated}
NEW SECTION HEADING: {current_heading}
Requirements:
- Write exactly 1-2 sentences
- Create a logical bridge between the topics
- Use natural, engaging language
- Avoid repetition of the previous content
- Lead smoothly into the new section topic
Generate only the transition text, no explanations or formatting.
"""
try:
result = gemini_text_response(
prompt=prompt,
temperature=0.6, # Balanced creativity and consistency
max_tokens=300, # Increased tokens for better transitions
system_prompt="You are an expert content writer creating smooth transitions between sections."
)
if result and result.strip():
# Clean up the response
transition = result.strip()
# Ensure it's 1-2 sentences
sentences = transition.split('. ')
if len(sentences) > 2:
transition = '. '.join(sentences[:2]) + '.'
return transition
else:
logger.warning("LLM transition response empty, using fallback")
return self._heuristic_transition(previous_text, current_heading)
except Exception as e:
logger.error(f"LLM transition generation error: {e}")
return self._heuristic_transition(previous_text, current_heading)
def _heuristic_transition(self, previous_text: str, current_heading: str) -> str:
"""Fallback heuristic-based transition generation."""
tail = previous_text[-240:]
# Enhanced heuristics based on content patterns
if any(word in tail.lower() for word in ["problem", "issue", "challenge"]):
return f"Now that we've identified the challenges, let's explore {current_heading.lower()} to find solutions."
elif any(word in tail.lower() for word in ["solution", "approach", "method"]):
return f"Building on this approach, {current_heading.lower()} provides the next step in our analysis."
elif any(word in tail.lower() for word in ["important", "crucial", "essential"]):
return f"Given this importance, {current_heading.lower()} becomes our next focus area."
else:
return (
f"Building on the discussion above, this leads us into {current_heading.lower()}, "
f"where we focus on practical implications and what to do next."
)
def _get_cache_key(self, previous_text: str, current_heading: str) -> str:
"""Generate cache key from content hashes."""
# Use last 100 chars of previous text and heading for cache key
prev_hash = hashlib.md5(previous_text[-100:].encode()).hexdigest()[:8]
heading_hash = hashlib.md5(current_heading.encode()).hexdigest()[:8]
return f"{prev_hash}_{heading_hash}"
def clear_cache(self):
"""Clear transition cache (useful for testing or memory management)."""
self._cache.clear()
logger.info("TransitionGenerator cache cleared")

View File

@@ -28,6 +28,7 @@ from models.blog_models import (
from ..research import ResearchService
from ..outline import OutlineService
from ..content.enhanced_content_generator import EnhancedContentGenerator
class BlogWriterService:
@@ -36,6 +37,7 @@ class BlogWriterService:
def __init__(self):
self.research_service = ResearchService()
self.outline_service = OutlineService()
self.content_generator = EnhancedContentGenerator()
# Research Methods
async def research(self, request: BlogResearchRequest) -> BlogResearchResponse:
@@ -71,12 +73,37 @@ class BlogWriterService:
"""Rebalance word count distribution across sections."""
return self.outline_service.rebalance_word_counts(outline, target_words)
# Content Generation Methods (TODO: Extract to content module)
# Content Generation Methods
async def generate_section(self, request: BlogSectionRequest) -> BlogSectionResponse:
"""Generate section content from outline."""
# TODO: Move to content module
md = f"## {request.section.heading}\n\nThis section content will be generated here.\n"
return BlogSectionResponse(success=True, markdown=md, citations=request.section.references)
# Compose research-lite object with minimal continuity summary if available
research_ctx: Any = getattr(request, 'research', None)
try:
ai_result = await self.content_generator.generate_section(
section=request.section,
research=research_ctx,
mode=(request.mode or "polished"),
)
markdown = ai_result.get('content') or ai_result.get('markdown') or ''
citations = []
# Map basic citations from sources if present
for s in ai_result.get('sources', [])[:5]:
citations.append({
"title": s.get('title') if isinstance(s, dict) else getattr(s, 'title', ''),
"url": s.get('url') if isinstance(s, dict) else getattr(s, 'url', ''),
})
if not markdown:
markdown = f"## {request.section.heading}\n\n(Generated content was empty.)"
return BlogSectionResponse(
success=True,
markdown=markdown,
citations=citations,
continuity_metrics=ai_result.get('continuity_metrics')
)
except Exception as e:
logger.error(f"Section generation failed: {e}")
fallback = f"## {request.section.heading}\n\nThis section will cover: {', '.join(request.section.key_points)}."
return BlogSectionResponse(success=False, markdown=fallback, citations=[])
async def optimize_section(self, request: BlogOptimizeRequest) -> BlogOptimizeResponse:
"""Optimize section content for readability and SEO."""

View File

@@ -59,13 +59,15 @@ class CompetitorAnalyzer:
prompt=competitor_prompt,
schema=competitor_schema,
temperature=0.3,
max_tokens=1000
max_tokens=4000
)
if isinstance(competitor_analysis, dict) and 'error' not in competitor_analysis:
logger.info("✅ AI competitor analysis completed successfully")
return competitor_analysis
else:
# Fail gracefully - no fallback data
logger.error(f"AI competitor analysis failed: {competitor_analysis}")
raise ValueError(f"Competitor analysis failed: {competitor_analysis.get('error', 'Unknown error')}")
error_msg = competitor_analysis.get('error', 'Unknown error') if isinstance(competitor_analysis, dict) else str(competitor_analysis)
logger.error(f"AI competitor analysis failed: {error_msg}")
raise ValueError(f"Competitor analysis failed: {error_msg}")

View File

@@ -67,13 +67,15 @@ class ContentAngleGenerator:
prompt=angles_prompt,
schema=angles_schema,
temperature=0.7,
max_tokens=800
max_tokens=4000
)
if isinstance(angles_result, dict) and 'content_angles' in angles_result:
logger.info("✅ AI content angles generation completed successfully")
return angles_result['content_angles'][:7]
else:
# Fail gracefully - no fallback data
logger.error(f"AI content angles generation failed: {angles_result}")
raise ValueError(f"Content angles generation failed: {angles_result.get('error', 'Unknown error')}")
error_msg = angles_result.get('error', 'Unknown error') if isinstance(angles_result, dict) else str(angles_result)
logger.error(f"AI content angles generation failed: {error_msg}")
raise ValueError(f"Content angles generation failed: {error_msg}")

View File

@@ -66,13 +66,15 @@ class KeywordAnalyzer:
prompt=keyword_prompt,
schema=keyword_schema,
temperature=0.3,
max_tokens=1000
max_tokens=4000
)
if isinstance(keyword_analysis, dict) and 'error' not in keyword_analysis:
logger.info("✅ AI keyword analysis completed successfully")
return keyword_analysis
else:
# Fail gracefully - no fallback data
logger.error(f"AI keyword analysis failed: {keyword_analysis}")
raise ValueError(f"Keyword analysis failed: {keyword_analysis.get('error', 'Unknown error')}")
error_msg = keyword_analysis.get('error', 'Unknown error') if isinstance(keyword_analysis, dict) else str(keyword_analysis)
logger.error(f"AI keyword analysis failed: {error_msg}")
raise ValueError(f"Keyword analysis failed: {error_msg}")

View File

@@ -22,6 +22,7 @@ from services.linkedin.content_generator_prompts import (
VideoScriptGenerator
)
from services.persona_analysis_service import PersonaAnalysisService
import time
class ContentGenerator:
@@ -33,10 +34,77 @@ class ContentGenerator:
self.gemini_grounded = gemini_grounded
self.fallback_provider = fallback_provider
# Persona caching
self._persona_cache: Dict[str, Dict[str, Any]] = {}
self._cache_timestamps: Dict[str, float] = {}
self._cache_duration = 300 # 5 minutes cache duration
# Initialize specialized generators
self.carousel_generator = CarouselGenerator(citation_manager, quality_analyzer)
self.video_script_generator = VideoScriptGenerator(citation_manager, quality_analyzer)
def _get_cached_persona_data(self, user_id: int, platform: str) -> Optional[Dict[str, Any]]:
    """
    Return persona data for a user/platform pair, served from a timed cache.

    A cached entry is reused while it is younger than self._cache_duration
    seconds; an older entry is evicted and the data is fetched again through
    PersonaAnalysisService. Only truthy results are cached.

    Args:
        user_id: User ID to get persona for
        platform: Platform type (linkedin)

    Returns:
        Persona data, or None if the lookup raised
    """
    cache_key = f"{platform}_persona_{user_id}"
    now = time.time()

    has_entry = cache_key in self._persona_cache and cache_key in self._cache_timestamps
    if has_entry:
        age = now - self._cache_timestamps[cache_key]
        if age < self._cache_duration:
            logger.debug(f"Using cached persona data for user {user_id} (age: {age:.1f}s)")
            return self._persona_cache[cache_key]
        # Stale entry: drop both halves before refetching.
        logger.debug(f"Cache expired for user {user_id}, refreshing...")
        del self._persona_cache[cache_key]
        del self._cache_timestamps[cache_key]

    try:
        fresh = PersonaAnalysisService().get_persona_for_platform(user_id, platform)
        if fresh:
            self._persona_cache[cache_key] = fresh
            self._cache_timestamps[cache_key] = now
            logger.debug(f"Cached persona data for user {user_id}")
        return fresh
    except Exception as e:
        logger.warning(f"Could not load persona data for {platform} content generation: {e}")
        return None
def _clear_persona_cache(self, user_id: int = None):
    """
    Drop cached persona data for one user, or for everyone.

    Args:
        user_id: User ID to clear cache for, or None to clear all
    """
    if user_id is not None:
        # Keys look like "<platform>_persona_<user_id>", so matching on the
        # suffix removes the user's entries for every platform.
        suffix = f"_{user_id}"
        stale_keys = [key for key in self._persona_cache if key.endswith(suffix)]
        for key in stale_keys:
            del self._persona_cache[key]
            del self._cache_timestamps[key]
        logger.info(f"Cleared persona cache for user {user_id}")
        return

    self._persona_cache.clear()
    self._cache_timestamps.clear()
    logger.info("Cleared all persona cache")
def _transform_gemini_sources(self, gemini_sources):
"""Transform Gemini sources to ResearchSource format."""
transformed_sources = []
@@ -342,8 +410,8 @@ class ContentGenerator:
raise Exception("Gemini Grounded Provider not available - cannot generate content without AI provider")
# Build the prompt for grounded generation using persona if available (DB vs session override)
persona_service = PersonaAnalysisService()
persona_data = persona_service.get_persona_for_platform(user_id=getattr(request, 'user_id', 1), platform='linkedin') if hasattr(request, 'user_id') else None
user_id = getattr(request, 'user_id', 1)
persona_data = self._get_cached_persona_data(user_id, 'linkedin') if hasattr(request, 'user_id') else None
if getattr(request, 'persona_override', None):
try:
# Merge shallowly: override core and platform adaptation parts
@@ -416,8 +484,8 @@ class ContentGenerator:
raise Exception("Gemini Grounded Provider not available - cannot generate content without AI provider")
# Build the prompt for grounded generation using persona if available (DB vs session override)
persona_service = PersonaAnalysisService()
persona_data = persona_service.get_persona_for_platform(user_id=getattr(request, 'user_id', 1), platform='linkedin') if hasattr(request, 'user_id') else None
user_id = getattr(request, 'user_id', 1)
persona_data = self._get_cached_persona_data(user_id, 'linkedin') if hasattr(request, 'user_id') else None
if getattr(request, 'persona_override', None):
try:
override = request.persona_override

View File

@@ -46,14 +46,17 @@ class GeminiGroundedProvider:
# Initialize the Gemini client with timeout configuration
self.client = genai.Client(api_key=self.api_key)
self.timeout = 60 # 60 second timeout for API calls (increased for research)
self._cache: Dict[str, Any] = {}
logger.info("✅ Gemini Grounded Provider initialized with native Google Search grounding")
async def generate_grounded_content(
self,
prompt: str,
self,
prompt: str,
content_type: str = "linkedin_post",
temperature: float = 0.7,
max_tokens: int = 2048
max_tokens: int = 2048,
urls: Optional[List[str]] = None,
mode: str = "polished"
) -> Dict[str, Any]:
"""
Generate grounded content using native Google Search grounding.
@@ -73,14 +76,29 @@ class GeminiGroundedProvider:
# Build the grounded prompt
grounded_prompt = self._build_grounded_prompt(prompt, content_type)
# Configure the grounding tool
grounding_tool = types.Tool(
google_search=types.GoogleSearch()
)
# Configure tools: Google Search and optional URL Context
tools: List[Any] = [
types.Tool(google_search=types.GoogleSearch())
]
if urls:
try:
# URL Context tool (ai.google.dev URL Context)
tools.append(types.Tool(url_context=types.UrlContext()))
logger.info(f"Enabled URL Context tool for {len(urls)} URLs")
except Exception as tool_err:
logger.warning(f"URL Context tool not available in SDK version: {tool_err}")
# Apply mode presets (Draft vs Polished)
model_id = "gemini-2.5-flash"
if mode == "draft":
model_id = "gemini-2.5-flash-lite"
temperature = min(1.0, max(0.0, temperature))
else:
model_id = "gemini-2.5-flash"
# Configure generation settings
config = types.GenerateContentConfig(
tools=[grounding_tool],
tools=tools,
max_output_tokens=max_tokens,
temperature=temperature
)
@@ -90,20 +108,27 @@ class GeminiGroundedProvider:
import concurrent.futures
try:
# Run the synchronous generate_content in a thread pool to make it awaitable
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
response = await asyncio.wait_for(
loop.run_in_executor(
executor,
lambda: self.client.models.generate_content(
model="gemini-2.5-flash",
contents=grounded_prompt,
config=config,
)
),
timeout=self.timeout
)
# Cache first
cache_key = self._make_cache_key(model_id, grounded_prompt, urls)
if cache_key in self._cache:
logger.info("Cache hit for grounded content request")
response = self._cache[cache_key]
else:
# Run the synchronous generate_content in a thread pool to make it awaitable
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
response = await asyncio.wait_for(
loop.run_in_executor(
executor,
lambda: self.client.models.generate_content(
model=model_id,
contents=self._inject_urls_into_prompt(grounded_prompt, urls) if urls else grounded_prompt,
config=config,
)
),
timeout=self.timeout
)
self._cache[cache_key] = response
except asyncio.TimeoutError:
raise Exception(f"Gemini API request timed out after {self.timeout} seconds")
except Exception as api_error:
@@ -112,14 +137,14 @@ class GeminiGroundedProvider:
if "503" in error_str and "overloaded" in error_str:
# Conservative retry for overloaded service (expensive API calls)
response = await self._retry_with_backoff(
lambda: self._make_api_request(grounded_prompt, config),
lambda: self._make_api_request_with_model(grounded_prompt, config, model_id, urls),
max_retries=1, # Only 1 retry to avoid excessive costs
base_delay=5 # Longer delay
)
elif "429" in error_str:
# Conservative retry for rate limits
response = await self._retry_with_backoff(
lambda: self._make_api_request(grounded_prompt, config),
lambda: self._make_api_request_with_model(grounded_prompt, config, model_id, urls),
max_retries=1, # Only 1 retry
base_delay=10 # Much longer delay for rate limits
)
@@ -132,6 +157,15 @@ class GeminiGroundedProvider:
# Process the grounded response
result = self._process_grounded_response(response, content_type)
# Attach URL Context metadata if present
try:
if hasattr(response, 'candidates') and response.candidates:
candidate0 = response.candidates[0]
if hasattr(candidate0, 'url_context_metadata') and candidate0.url_context_metadata:
result['url_context_metadata'] = candidate0.url_context_metadata
logger.info("Attached url_context_metadata to result")
except Exception as meta_err:
logger.warning(f"Unable to attach url_context_metadata: {meta_err}")
logger.info(f"✅ Grounded content generated successfully with {len(result.get('sources', []))} sources")
return result
@@ -162,6 +196,41 @@ class GeminiGroundedProvider:
),
timeout=self.timeout
)
async def _make_api_request_with_model(self, grounded_prompt: str, config: Any, model_id: str, urls: Optional[List[str]] = None):
"""Make the API request with explicit model id and optional URL injection."""
import concurrent.futures
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
resp = await asyncio.wait_for(
loop.run_in_executor(
executor,
lambda: self.client.models.generate_content(
model=model_id,
contents=self._inject_urls_into_prompt(grounded_prompt, urls) if urls else grounded_prompt,
config=config,
)
),
timeout=self.timeout
)
self._cache[self._make_cache_key(model_id, grounded_prompt, urls)] = resp
return resp
def _inject_urls_into_prompt(self, prompt: str, urls: Optional[List[str]]) -> str:
"""Append URLs to the prompt for URL Context tool to pick up (as per docs)."""
if not urls:
return prompt
safe_urls = [u for u in urls if isinstance(u, str) and u.startswith("http")]
if not safe_urls:
return prompt
urls_block = "\n".join(safe_urls[:20])
return f"{prompt}\n\nSOURCE URLS (use url_context to retrieve content):\n{urls_block}"
def _make_cache_key(self, model_id: str, prompt: str, urls: Optional[List[str]]) -> str:
import hashlib
u = "|".join((urls or [])[:20])
base = f"{model_id}|{prompt}|{u}"
return hashlib.sha256(base.encode("utf-8")).hexdigest()
async def _retry_with_backoff(self, func, max_retries: int = 3, base_delay: float = 1.0):
"""Retry a function with exponential backoff."""

View File

@@ -390,11 +390,19 @@ def gemini_structured_json_response(prompt, schema, temperature=0.7, top_p=0.9,
)
# Check for parsed content first (primary method for structured output)
if hasattr(response, 'parsed') and response.parsed is not None:
logger.info("Using response.parsed for structured output")
return response.parsed
if hasattr(response, 'parsed'):
logger.info(f"Response has parsed attribute: {response.parsed is not None}")
if response.parsed is not None:
logger.info("Using response.parsed for structured output")
return response.parsed
else:
logger.warning("Response.parsed is None, falling back to text parsing")
# Debug: Check if there's any text content
if hasattr(response, 'text') and response.text:
logger.info(f"Text response length: {len(response.text)}")
logger.debug(f"Text response preview: {response.text[:200]}...")
# Check for text content as fallback
# Check for text content as fallback (only if no parsed content)
if hasattr(response, 'text') and response.text:
logger.info("No parsed content, trying to parse text response")
try:

File diff suppressed because it is too large Load Diff

View File

@@ -12,6 +12,7 @@ from services.llm_providers.gemini_provider import gemini_structured_json_respon
from .data_collector import OnboardingDataCollector
from .prompt_builder import PersonaPromptBuilder
from services.persona.linkedin.linkedin_persona_service import LinkedInPersonaService
from services.persona.facebook.facebook_persona_service import FacebookPersonaService
class CorePersonaService:
@@ -22,6 +23,7 @@ class CorePersonaService:
self.data_collector = OnboardingDataCollector()
self.prompt_builder = PersonaPromptBuilder()
self.linkedin_service = LinkedInPersonaService()
self.facebook_service = FacebookPersonaService()
logger.info("CorePersonaService initialized")
def generate_core_persona(self, onboarding_data: Dict[str, Any]) -> Dict[str, Any]:
@@ -79,6 +81,10 @@ class CorePersonaService:
if platform.lower() == "linkedin":
return self.linkedin_service.generate_linkedin_persona(core_persona, onboarding_data)
# Use Facebook service for Facebook platform
if platform.lower() == "facebook":
return self.facebook_service.generate_facebook_persona(core_persona, onboarding_data)
# Use generic platform adaptation for other platforms
platform_constraints = self._get_platform_constraints(platform)
prompt = self.prompt_builder.build_platform_adaptation_prompt(core_persona, platform, onboarding_data, platform_constraints)

View File

@@ -0,0 +1,629 @@
"""
Enhanced Linguistic Analysis Service
Advanced analysis for better writing style mimicry and persona quality.
"""
import re
import json
from typing import Dict, Any, List, Tuple
from collections import Counter, defaultdict
from loguru import logger
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.tag import pos_tag
from textstat import flesch_reading_ease, flesch_kincaid_grade
import spacy
class EnhancedLinguisticAnalyzer:
"""Advanced linguistic analysis for persona creation and improvement."""
def __init__(self):
    """Load the optional spaCy model and make sure the NLTK data is present."""
    # spaCy is optional here: when the model is missing, self.nlp stays None.
    self.nlp = None
    try:
        self.nlp = spacy.load("en_core_web_sm")
    except OSError:
        logger.warning("spaCy model not found. Install with: python -m spacy download en_core_web_sm")

    # (lookup path, download package) for every NLTK resource this class uses.
    required_nltk_data = (
        ('tokenizers/punkt', 'punkt'),
        ('corpora/stopwords', 'stopwords'),
        ('taggers/averaged_perceptron_tagger', 'averaged_perceptron_tagger'),
    )
    try:
        for resource_path, _ in required_nltk_data:
            nltk.data.find(resource_path)
    except LookupError:
        # Any single missing resource triggers a download of all of them.
        logger.warning("NLTK data not found. Downloading required data...")
        for _, package in required_nltk_data:
            nltk.download(package, quiet=True)
def analyze_writing_style(self, text_samples: List[str]) -> Dict[str, Any]:
    """
    Run every style analysis over the given samples and collect the results.

    All analyses except the consistency check operate on the samples joined
    with single spaces; the consistency check compares the samples with each
    other.

    Args:
        text_samples: List of text samples to analyze

    Returns:
        A dict with one entry per analysis plus "analysis_metadata", or
        {"error": ...} if any analysis raised.
    """
    try:
        logger.info(f"Analyzing writing style from {len(text_samples)} text samples")
        merged = " ".join(text_samples)

        report: Dict[str, Any] = {}
        report["basic_metrics"] = self._analyze_basic_metrics(merged)
        report["sentence_analysis"] = self._analyze_sentence_patterns(merged)
        report["vocabulary_analysis"] = self._analyze_vocabulary(merged)
        report["rhetorical_analysis"] = self._analyze_rhetorical_devices(merged)
        report["style_patterns"] = self._analyze_style_patterns(merged)
        report["readability_analysis"] = self._analyze_readability(merged)
        report["emotional_analysis"] = self._analyze_emotional_tone(merged)
        # Consistency needs the individual samples, not the merged text.
        report["consistency_analysis"] = self._analyze_consistency(text_samples)

        totals = report["basic_metrics"]
        report["analysis_metadata"] = {
            "sample_count": len(text_samples),
            "total_words": totals["total_words"],
            "total_sentences": totals["total_sentences"],
            "analysis_confidence": self._calculate_analysis_confidence(text_samples)
        }
        return report
    except Exception as e:
        logger.error(f"Error analyzing writing style: {str(e)}")
        return {"error": f"Failed to analyze writing style: {str(e)}"}
def _analyze_basic_metrics(self, text: str) -> Dict[str, Any]:
    """Count words, sentences, paragraphs and characters in text.

    Words are the purely alphabetic tokens of the lower-cased text;
    paragraphs are the pieces obtained by splitting on blank lines.
    """
    sentence_count = len(sent_tokenize(text))
    alpha_words = [token for token in word_tokenize(text.lower()) if token.isalpha()]
    word_count = len(alpha_words)

    # Averages fall back to 0 when there is nothing to divide by.
    if sentence_count:
        avg_sentence_length = word_count / sentence_count
    else:
        avg_sentence_length = 0
    if alpha_words:
        avg_word_length = sum(len(token) for token in alpha_words) / word_count
    else:
        avg_word_length = 0

    return {
        "total_words": word_count,
        "total_sentences": sentence_count,
        "average_sentence_length": avg_sentence_length,
        "average_word_length": avg_word_length,
        "paragraph_count": len(text.split('\n\n')),
        "character_count": len(text),
        "character_count_no_spaces": len(text.replace(' ', ''))
    }
def _analyze_sentence_patterns(self, text: str) -> Dict[str, Any]:
    """Analyze sentence structure patterns.

    Returns:
        A dict with the sentence length distribution (in tokens), the counts
        of question / exclamation / declarative sentences, the ten most
        common first tokens, and the sentence complexity summary.
    """
    import statistics  # local import: not imported at file level

    sentences = sent_tokenize(text)
    # Tokenize each sentence once and reuse the tokens below.
    tokenized = [word_tokenize(sentence) for sentence in sentences]
    sentence_lengths = [len(tokens) for tokens in tokenized]

    sentence_types = []
    for sentence in sentences:
        if sentence.endswith('?'):
            sentence_types.append('question')
        elif sentence.endswith('!'):
            sentence_types.append('exclamation')
        else:
            sentence_types.append('declarative')

    # First token of every sentence ("" for a sentence without tokens)
    sentence_beginnings = [tokens[0].lower() if tokens else "" for tokens in tokenized]

    return {
        "sentence_length_distribution": {
            "min": min(sentence_lengths) if sentence_lengths else 0,
            "max": max(sentence_lengths) if sentence_lengths else 0,
            "average": sum(sentence_lengths) / len(sentence_lengths) if sentence_lengths else 0,
            # True median: averages the two middle values for an even count.
            "median": statistics.median(sentence_lengths) if sentence_lengths else 0
        },
        "sentence_type_distribution": dict(Counter(sentence_types)),
        "common_sentence_starters": dict(Counter(sentence_beginnings).most_common(10)),
        "sentence_complexity": self._analyze_sentence_complexity(sentences)
    }
def _analyze_vocabulary(self, text: str) -> Dict[str, Any]:
    """Summarise vocabulary size, diversity, frequency and part-of-speech mix.

    Works on the alphabetic tokens of the lower-cased text; "content" words
    are those tokens that are not English stopwords.
    """
    tokens = [token for token in word_tokenize(text.lower()) if token.isalpha()]

    english_stopwords = set(stopwords.words('english'))
    content_tokens = [token for token in tokens if token not in english_stopwords]

    tag_counts = dict(Counter(tag for _, tag in pos_tag(tokens)))

    distinct_tokens = set(tokens)
    distinct_content = set(content_tokens)

    # Bucket tokens by length: <=4 short, 5-8 medium, >8 long.
    length_buckets = {"short_words": 0, "medium_words": 0, "long_words": 0}
    for token in tokens:
        size = len(token)
        if size <= 4:
            length_buckets["short_words"] += 1
        elif size <= 8:
            length_buckets["medium_words"] += 1
        else:
            length_buckets["long_words"] += 1

    return {
        "vocabulary_size": len(distinct_tokens),
        "content_vocabulary_size": len(distinct_content),
        "lexical_diversity": len(distinct_tokens) / len(tokens) if tokens else 0,
        "most_frequent_words": dict(Counter(tokens).most_common(20)),
        "most_frequent_content_words": dict(Counter(content_tokens).most_common(20)),
        "pos_distribution": tag_counts,
        "word_length_distribution": length_buckets,
        "vocabulary_sophistication": self._analyze_vocabulary_sophistication(tokens)
    }
def _analyze_rhetorical_devices(self, text: str) -> Dict[str, Any]:
    """Collect counts and matches for the rhetorical devices found in text."""
    sentences = sent_tokenize(text)
    # Questions and exclamations are counted by the sentence's last character.
    question_total = sum(1 for sentence in sentences if sentence.strip().endswith('?'))
    exclamation_total = sum(1 for sentence in sentences if sentence.strip().endswith('!'))

    return {
        "questions": question_total,
        "exclamations": exclamation_total,
        "repetition": self._find_repetition_patterns(text),
        "alliteration": self._find_alliteration(text),
        "metaphors": self._find_metaphors(text),
        "analogies": self._find_analogies(text),
        "lists": self._find_lists(text),
        "contrasts": self._find_contrasts(text)
    }
def _analyze_style_patterns(self, text: str) -> Dict[str, Any]:
    """Gather the individual style indicators for text into one dict."""
    patterns: Dict[str, Any] = {}
    patterns["formality_level"] = self._assess_formality(text)
    patterns["personal_pronouns"] = self._count_personal_pronouns(text)
    patterns["passive_voice"] = self._count_passive_voice(text)
    patterns["contractions"] = self._count_contractions(text)
    patterns["transition_words"] = self._find_transition_words(text)
    patterns["hedging_language"] = self._find_hedging_language(text)
    patterns["emphasis_patterns"] = self._find_emphasis_patterns(text)
    return patterns
def _analyze_readability(self, text: str) -> Dict[str, Any]:
    """Analyze readability metrics.

    Returns:
        The Flesch reading-ease score, the Flesch-Kincaid grade, the reading
        level derived from the reading-ease score and a complexity score, or
        {"error": ...} if any of the calculations raised.
    """
    try:
        # Compute the reading-ease score once and reuse it for the level.
        reading_ease = flesch_reading_ease(text)
        return {
            "flesch_reading_ease": reading_ease,
            "flesch_kincaid_grade": flesch_kincaid_grade(text),
            "reading_level": self._determine_reading_level(reading_ease),
            "complexity_score": self._calculate_complexity_score(text)
        }
    except Exception as e:
        logger.warning(f"Error calculating readability: {e}")
        return {"error": "Could not calculate readability metrics"}
def _analyze_emotional_tone(self, text: str) -> Dict[str, Any]:
    """Estimate sentiment from small fixed lists of positive and negative words."""
    positive_lexicon = frozenset(
        ['good', 'great', 'excellent', 'amazing', 'wonderful', 'fantastic', 'love', 'like', 'enjoy']
    )
    negative_lexicon = frozenset(
        ['bad', 'terrible', 'awful', 'hate', 'dislike', 'horrible', 'worst', 'problem', 'issue']
    )

    positive_hits = 0
    negative_hits = 0
    for token in word_tokenize(text.lower()):
        if token in positive_lexicon:
            positive_hits += 1
        if token in negative_lexicon:
            negative_hits += 1

    # Whichever list has more hits wins; a tie is neutral.
    if positive_hits > negative_hits:
        bias = "positive"
    elif negative_hits > positive_hits:
        bias = "negative"
    else:
        bias = "neutral"

    return {
        "sentiment_bias": bias,
        "positive_word_count": positive_hits,
        "negative_word_count": negative_hits,
        "emotional_intensity": self._calculate_emotional_intensity(text),
        "tone_consistency": self._assess_tone_consistency(text)
    }
def _analyze_consistency(self, text_samples: List[str]) -> Dict[str, Any]:
"""Analyze consistency across multiple text samples."""
if len(text_samples) < 2:
return {"consistency_score": 100, "note": "Only one sample provided"}
# Analyze consistency in various metrics
sentence_lengths = []
vocabulary_sets = []
for sample in text_samples:
sentences = sent_tokenize(sample)
words = word_tokenize(sample.lower())
words = [word for word in words if word.isalpha()]
sentence_lengths.append([len(word_tokenize(sent)) for sent in sentences])
vocabulary_sets.append(set(words))
# Calculate consistency scores
avg_sentence_length_consistency = self._calculate_metric_consistency(
[sum(lengths)/len(lengths) for lengths in sentence_lengths]
)
vocabulary_overlap = self._calculate_vocabulary_overlap(vocabulary_sets)
return {
"consistency_score": (avg_sentence_length_consistency + vocabulary_overlap) / 2,
"sentence_length_consistency": avg_sentence_length_consistency,
"vocabulary_consistency": vocabulary_overlap,
"style_stability": self._assess_style_stability(text_samples)
}
def _calculate_analysis_confidence(self, text_samples: List[str]) -> float:
"""Calculate confidence in the analysis based on data quality."""
if not text_samples:
return 0.0
total_words = sum(len(word_tokenize(sample)) for sample in text_samples)
sample_count = len(text_samples)
# Confidence based on amount of data
word_confidence = min(100, (total_words / 1000) * 100) # 1000 words = 100% confidence
sample_confidence = min(100, (sample_count / 5) * 100) # 5 samples = 100% confidence
return (word_confidence + sample_confidence) / 2
# Helper methods for specific analyses
def _analyze_sentence_complexity(self, sentences: List[str]) -> Dict[str, Any]:
    """Analyze sentence complexity patterns.

    A sentence counts as compound when it contains a comma and one of the
    coordinating conjunctions "and", "but" or "or" as a whole word (in any
    letter case), and as complex when it has more than 20 tokens.

    Returns:
        The ratio of complex and of compound sentences (0 for no sentences)
        and the average number of clauses per sentence.
    """
    conjunctions = {"and", "but", "or"}
    complex_sentences = 0
    compound_sentences = 0
    for sentence in sentences:
        tokens = word_tokenize(sentence)
        # Compare whole tokens so that "for" or "band" do not count as
        # conjunctions the way a substring test would.
        if ',' in sentence and conjunctions.intersection(token.lower() for token in tokens):
            compound_sentences += 1
        if len(tokens) > 20:
            complex_sentences += 1

    return {
        "complex_sentence_ratio": complex_sentences / len(sentences) if sentences else 0,
        "compound_sentence_ratio": compound_sentences / len(sentences) if sentences else 0,
        "average_clauses_per_sentence": self._count_clauses(sentences)
    }
def _analyze_vocabulary_sophistication(self, words: List[str]) -> Dict[str, Any]:
"""Analyze vocabulary sophistication level."""
# Simple heuristic based on word length and frequency
long_words = [w for w in words if len(w) > 7]
rare_words = [w for w in words if len(w) > 5] # Simplified rare word detection
return {
"sophistication_score": (len(long_words) + len(rare_words)) / len(words) * 100 if words else 0,
"long_word_ratio": len(long_words) / len(words) if words else 0,
"rare_word_ratio": len(rare_words) / len(words) if words else 0
}
def _find_repetition_patterns(self, text: str) -> Dict[str, Any]:
    """Report tokens that occur more than twice in the lowercased text.

    Args:
        text: Text to scan.

    Returns:
        Dict with ``repeated_words`` (token -> occurrence count, only for
        tokens seen more than twice) and ``repetition_score`` (share of
        distinct tokens that are repeated, times 100; 0 for no tokens).
    """
    tokens = word_tokenize(text.lower())

    frequent = {}
    for token, occurrences in Counter(tokens).items():
        if occurrences > 2:
            frequent[token] = occurrences

    if tokens:
        score = len(frequent) / len(set(tokens)) * 100
    else:
        score = 0

    return {
        "repeated_words": frequent,
        "repetition_score": score
    }
def _find_alliteration(self, text: str) -> List[str]:
    """Collect adjacent word pairs that start with the same letter.

    Each sentence is lowercased and reduced to purely alphabetic tokens
    before neighbouring tokens are compared.

    Args:
        text: Text to scan.

    Returns:
        ``"word1 word2"`` strings, one per alliterative neighbouring pair,
        in order of appearance.
    """
    pairs = []
    for sentence in sent_tokenize(text):
        alpha_tokens = [
            token for token in word_tokenize(sentence.lower()) if token.isalpha()
        ]
        # Walk neighbouring tokens; fewer than two tokens yields no pairs.
        for first, second in zip(alpha_tokens, alpha_tokens[1:]):
            if first[0] == second[0]:
                pairs.append(f"{first} {second}")
    return pairs
def _find_metaphors(self, text: str) -> List[str]:
"""Find potential metaphors in text."""
# Simple metaphor detection based on common patterns
metaphor_patterns = [
r'\b(is|are|was|were)\s+(like|as)\s+',
r'\b(like|as)\s+\w+\s+(is|are|was|were)',
r'\b(metaphorically|figuratively)'
]
metaphors = []
for pattern in metaphor_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
metaphors.extend(matches)
return metaphors
def _find_analogies(self, text: str) -> List[str]:
"""Find analogies in text."""
analogy_patterns = [
r'\b(just as|similar to|comparable to|akin to)',
r'\b(in the same way|likewise|similarly)'
]
analogies = []
for pattern in analogy_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
analogies.extend(matches)
return analogies
def _find_lists(self, text: str) -> List[str]:
"""Find list patterns in text."""
list_patterns = [
r'\b(first|second|third|lastly|finally)',
r'\b(one|two|three|four|five)',
r'\b(•|\*|\-|\d+\.)'
]
lists = []
for pattern in list_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
lists.extend(matches)
return lists
def _find_contrasts(self, text: str) -> List[str]:
"""Find contrast patterns in text."""
contrast_words = ['but', 'however', 'although', 'whereas', 'while', 'on the other hand', 'in contrast']
contrasts = []
for word in contrast_words:
if word in text.lower():
contrasts.append(word)
return contrasts
def _assess_formality(self, text: str) -> str:
"""Assess formality level of text."""
formal_indicators = ['therefore', 'furthermore', 'moreover', 'consequently', 'nevertheless']
informal_indicators = ['gonna', 'wanna', 'gotta', 'yeah', 'ok', 'cool']
formal_count = sum(1 for indicator in formal_indicators if indicator in text.lower())
informal_count = sum(1 for indicator in informal_indicators if indicator in text.lower())
if formal_count > informal_count:
return "formal"
elif informal_count > formal_count:
return "informal"
else:
return "neutral"
def _count_personal_pronouns(self, text: str) -> Dict[str, int]:
    """Count how often each personal pronoun occurs in the text.

    Args:
        text: Text to scan; it is lowercased before tokenizing.

    Returns:
        Dict with one entry per pronoun in the fixed list below (in list
        order), including pronouns that never occur (count 0).
    """
    pronouns = ['i', 'me', 'my', 'mine', 'myself', 'we', 'us', 'our', 'ours', 'ourselves',
                'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him', 'his', 'himself',
                'she', 'her', 'hers', 'herself', 'they', 'them', 'their', 'theirs', 'themselves']

    # Tally every token once, then read off the pronouns of interest.
    tally = Counter(word_tokenize(text.lower()))
    return {pronoun: tally[pronoun] for pronoun in pronouns}
def _count_passive_voice(self, text: str) -> int:
"""Count passive voice constructions."""
passive_patterns = [
r'\b(was|were|is|are|been|being)\s+\w+ed\b',
r'\b(was|were|is|are|been|being)\s+\w+en\b'
]
passive_count = 0
for pattern in passive_patterns:
passive_count += len(re.findall(pattern, text, re.IGNORECASE))
return passive_count
def _count_contractions(self, text: str) -> int:
"""Count contractions in text."""
contraction_pattern = r"\b\w+'\w+\b"
return len(re.findall(contraction_pattern, text))
def _find_transition_words(self, text: str) -> List[str]:
"""Find transition words in text."""
transition_words = ['however', 'therefore', 'furthermore', 'moreover', 'nevertheless',
'consequently', 'meanwhile', 'additionally', 'similarly', 'likewise',
'on the other hand', 'in contrast', 'for example', 'for instance']
found_transitions = []
for word in transition_words:
if word in text.lower():
found_transitions.append(word)
return found_transitions
def _find_hedging_language(self, text: str) -> List[str]:
"""Find hedging language in text."""
hedging_words = ['might', 'could', 'possibly', 'perhaps', 'maybe', 'likely', 'probably',
'seems', 'appears', 'suggests', 'indicates', 'tends to']
found_hedging = []
for word in hedging_words:
if word in text.lower():
found_hedging.append(word)
return found_hedging
def _find_emphasis_patterns(self, text: str) -> Dict[str, Any]:
"""Find emphasis patterns in text."""
emphasis_patterns = {
'bold_asterisks': len(re.findall(r'\*\w+\*', text)),
'bold_underscores': len(re.findall(r'_\w+_', text)),
'caps_words': len(re.findall(r'\b[A-Z]{2,}\b', text)),
'exclamation_points': text.count('!'),
'emphasis_words': len(re.findall(r'\b(very|really|extremely|absolutely|completely)\b', text, re.IGNORECASE))
}
return emphasis_patterns
def _determine_reading_level(self, flesch_score: float) -> str:
"""Determine reading level from Flesch score."""
if flesch_score >= 90:
return "very_easy"
elif flesch_score >= 80:
return "easy"
elif flesch_score >= 70:
return "fairly_easy"
elif flesch_score >= 60:
return "standard"
elif flesch_score >= 50:
return "fairly_difficult"
elif flesch_score >= 30:
return "difficult"
else:
return "very_difficult"
def _calculate_complexity_score(self, text: str) -> float:
    """Blend sentence length, word length and lexical variety into one score.

    Only alphabetic tokens of the lowercased text are considered.  Average
    words per sentence (scaled by 20) carries weight 0.4; average word
    length (scaled by 10) and the unique-to-total word ratio carry 0.3
    each.

    Args:
        text: Text to score.

    Returns:
        Score capped at 100; 0.0 when there are no sentences or no
        alphabetic words.
    """
    sentence_total = len(sent_tokenize(text))
    alpha_tokens = [
        token for token in word_tokenize(text.lower()) if token.isalpha()
    ]
    if sentence_total == 0 or not alpha_tokens:
        return 0.0

    token_total = len(alpha_tokens)
    sentence_length_part = (token_total / sentence_total) / 20
    word_length_part = (sum(len(token) for token in alpha_tokens) / token_total) / 10
    diversity_part = len(set(alpha_tokens)) / token_total

    combined = sentence_length_part * 0.4 + word_length_part * 0.3 + diversity_part * 0.3
    return min(100, combined * 100)
def _calculate_emotional_intensity(self, text: str) -> float:
    """Measure the share of tokens that are strongly emotional words.

    Args:
        text: Text to scan; it is lowercased before tokenizing.

    Returns:
        Percentage of tokens found in the fixed emotional word list;
        0 when the text yields no tokens.
    """
    emotional_words = ['amazing', 'incredible', 'fantastic', 'terrible', 'awful', 'horrible',
                       'love', 'hate', 'passion', 'fury', 'joy', 'sorrow', 'excitement', 'fear']

    tokens = word_tokenize(text.lower())
    if not tokens:
        return 0

    emotional_hits = len([token for token in tokens if token in emotional_words])
    return (emotional_hits / len(tokens)) * 100
def _assess_tone_consistency(self, text: str) -> float:
    """Check whether the two halves of the text share a sentiment bias.

    The sentences are split at the midpoint, each half is passed to
    ``_analyze_emotional_tone`` and the ``"sentiment_bias"`` values of the
    two results are compared.

    Args:
        text: Text to assess.

    Returns:
        100.0 when the text has fewer than two sentences or both halves
        have the same sentiment bias, otherwise 50.0.
    """
    sentences = sent_tokenize(text)
    if len(sentences) < 2:
        return 100.0

    split_at = len(sentences) // 2
    opening_tone = self._analyze_emotional_tone(" ".join(sentences[:split_at]))
    closing_tone = self._analyze_emotional_tone(" ".join(sentences[split_at:]))

    same_bias = opening_tone["sentiment_bias"] == closing_tone["sentiment_bias"]
    return 100.0 if same_bias else 50.0
def _calculate_metric_consistency(self, values: List[float]) -> float:
"""Calculate consistency of a metric across samples."""
if len(values) < 2:
return 100.0
mean_value = sum(values) / len(values)
variance = sum((x - mean_value) ** 2 for x in values) / len(values)
std_dev = variance ** 0.5
# Convert to consistency score (lower std dev = higher consistency)
consistency = max(0, 100 - (std_dev / mean_value * 100)) if mean_value > 0 else 100
return consistency
def _calculate_vocabulary_overlap(self, vocabulary_sets: List[set]) -> float:
"""Calculate vocabulary overlap across samples."""
if len(vocabulary_sets) < 2:
return 100.0
# Calculate pairwise overlaps
overlaps = []
for i in range(len(vocabulary_sets)):
for j in range(i + 1, len(vocabulary_sets)):
intersection = len(vocabulary_sets[i] & vocabulary_sets[j])
union = len(vocabulary_sets[i] | vocabulary_sets[j])
overlap = (intersection / union * 100) if union > 0 else 0
overlaps.append(overlap)
return sum(overlaps) / len(overlaps) if overlaps else 0
def _assess_style_stability(self, text_samples: List[str]) -> Dict[str, Any]:
"""Assess style stability across samples."""
if len(text_samples) < 2:
return {"stability_score": 100, "note": "Only one sample provided"}
# Analyze consistency in key style metrics
metrics = []
for sample in text_samples:
sample_metrics = {
"avg_sentence_length": len(word_tokenize(sample)) / len(sent_tokenize(sample)),
"formality": self._assess_formality(sample),
"emotional_intensity": self._calculate_emotional_intensity(sample)
}
metrics.append(sample_metrics)
# Calculate stability scores
sentence_length_stability = self._calculate_metric_consistency(
[m["avg_sentence_length"] for m in metrics]
)
emotional_stability = self._calculate_metric_consistency(
[m["emotional_intensity"] for m in metrics]
)
# Formality consistency
formality_values = [m["formality"] for m in metrics]
formality_consistency = 100 if len(set(formality_values)) == 1 else 50
overall_stability = (sentence_length_stability + emotional_stability + formality_consistency) / 3
return {
"stability_score": overall_stability,
"sentence_length_stability": sentence_length_stability,
"emotional_stability": emotional_stability,
"formality_consistency": formality_consistency
}
def _count_clauses(self, sentences: List[str]) -> float:
"""Count average clauses per sentence."""
total_clauses = 0
for sentence in sentences:
# Simple clause counting based on conjunctions and punctuation
clauses = len(re.findall(r'[,;]', sentence)) + 1
total_clauses += clauses
return total_clauses / len(sentences) if sentences else 0
a

View File

@@ -0,0 +1,781 @@
"""
Persona Quality Improvement Service
Continuously improves persona quality through feedback and learning.
"""
import json
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timedelta
from loguru import logger
from sqlalchemy.orm import Session
from models.enhanced_persona_models import (
EnhancedWritingPersona,
EnhancedPlatformPersona,
PersonaQualityMetrics,
PersonaLearningData
)
from services.database import get_db_session
from services.persona.enhanced_linguistic_analyzer import EnhancedLinguisticAnalyzer
class PersonaQualityImprover:
"""Service for continuously improving persona quality and accuracy."""
def __init__(self):
    """Set up the linguistic analyzer used by this service and log start-up."""
    analyzer = EnhancedLinguisticAnalyzer()
    self.linguistic_analyzer = analyzer
    logger.info("PersonaQualityImprover initialized")
def assess_persona_quality(self, persona_id: int, user_feedback: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Assess the quality of a persona and provide improvement suggestions.

    The persona is loaded by id, scored with ``_perform_quality_assessment``,
    the metrics are handed to ``_save_quality_metrics`` and suggestions are
    derived from them.  Errors are logged and reported in the return value
    instead of being raised.

    Args:
        persona_id: ID of the persona to assess
        user_feedback: Optional user feedback data

    Returns:
        Dict with ``persona_id``, ``quality_metrics``,
        ``improvement_suggestions`` and ``assessment_date`` on success, or
        a dict with a single ``error`` message when the persona does not
        exist or the assessment fails.
    """
    session = None
    try:
        session = get_db_session()

        # Get persona data
        persona = session.query(EnhancedWritingPersona).filter(
            EnhancedWritingPersona.id == persona_id
        ).first()

        if not persona:
            return {"error": "Persona not found"}

        # Perform quality assessment
        quality_metrics = self._perform_quality_assessment(persona, user_feedback)

        # Save quality metrics
        # NOTE(review): no commit happens here; presumably
        # _save_quality_metrics commits itself - confirm.
        self._save_quality_metrics(session, persona_id, quality_metrics, user_feedback)

        # Generate improvement suggestions
        improvement_suggestions = self._generate_improvement_suggestions(quality_metrics)

        return {
            "persona_id": persona_id,
            "quality_metrics": quality_metrics,
            "improvement_suggestions": improvement_suggestions,
            "assessment_date": datetime.utcnow().isoformat()
        }

    except Exception as e:
        logger.error(f"Error assessing persona quality: {str(e)}")
        return {"error": f"Failed to assess persona quality: {str(e)}"}

    finally:
        # Release the session on every path, including the "not found"
        # return and failures, so it is never left open.
        if session is not None:
            session.close()
def improve_persona_from_feedback(self, persona_id: int, feedback_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Improve persona based on user feedback and performance data.

    The feedback is analyzed, improvements are derived and applied to the
    persona, the learning data is saved and the session is committed.
    Errors are logged and reported in the return value instead of being
    raised.

    Args:
        persona_id: ID of the persona to improve
        feedback_data: User feedback and performance data

    Returns:
        Dict with ``persona_id``, ``improvements_applied``,
        ``updated_persona`` (the result of the persona's ``to_dict()``)
        and ``improvement_date`` on success, or a dict with a single
        ``error`` message when the persona does not exist or the update
        fails.
    """
    session = None
    try:
        session = get_db_session()

        # Get current persona
        persona = session.query(EnhancedWritingPersona).filter(
            EnhancedWritingPersona.id == persona_id
        ).first()

        if not persona:
            return {"error": "Persona not found"}

        # Analyze feedback
        feedback_analysis = self._analyze_feedback(feedback_data)

        # Generate improvements
        improvements = self._generate_persona_improvements(persona, feedback_analysis)

        # Apply improvements
        updated_persona = self._apply_improvements(session, persona, improvements)

        # Save learning data
        self._save_learning_data(session, persona_id, feedback_data, improvements)

        session.commit()

        # Serialize while the persona is still attached to the open
        # session.  NOTE(review): with SQLAlchemy's default
        # expire_on_commit, reading attributes after close() fails on the
        # detached object - confirm how get_db_session is configured.
        persona_snapshot = updated_persona.to_dict()

        return {
            "persona_id": persona_id,
            "improvements_applied": improvements,
            "updated_persona": persona_snapshot,
            "improvement_date": datetime.utcnow().isoformat()
        }

    except Exception as e:
        logger.error(f"Error improving persona: {str(e)}")
        return {"error": f"Failed to improve persona: {str(e)}"}

    finally:
        # Release the session on every path, including the "not found"
        # return and failures, so it is never left open.
        if session is not None:
            session.close()
def learn_from_content_performance(self, persona_id: int, content_performance: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Learn from content performance data to improve persona.

    Performance patterns and successful patterns are analyzed, combined
    into learning insights, applied through ``_apply_performance_learning``
    and stored through ``_save_performance_learning`` before the session
    is committed.  Errors are logged and reported in the return value
    instead of being raised.

    Args:
        persona_id: ID of the persona to improve
        content_performance: List of content performance data

    Returns:
        Dict with ``persona_id``, ``learning_insights``,
        ``persona_updates`` and ``learning_date`` on success, or a dict
        with a single ``error`` message on failure.
    """
    session = None
    try:
        session = get_db_session()

        # Analyze performance patterns
        performance_analysis = self._analyze_performance_patterns(content_performance)

        # Identify successful patterns
        successful_patterns = self._identify_successful_patterns(content_performance)

        # Generate learning insights
        learning_insights = self._generate_learning_insights(performance_analysis, successful_patterns)

        # Apply learning to persona
        persona_updates = self._apply_performance_learning(persona_id, learning_insights)

        # Save learning data
        self._save_performance_learning(session, persona_id, content_performance, learning_insights)

        session.commit()

        return {
            "persona_id": persona_id,
            "learning_insights": learning_insights,
            "persona_updates": persona_updates,
            "learning_date": datetime.utcnow().isoformat()
        }

    except Exception as e:
        logger.error(f"Error learning from performance: {str(e)}")
        return {"error": f"Failed to learn from performance: {str(e)}"}

    finally:
        # Release the session on every path, including failures, so it is
        # never left open.
        if session is not None:
            session.close()
def _perform_quality_assessment(self, persona: EnhancedWritingPersona, user_feedback: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Score a persona on each quality dimension and average the results.

    Linguistic quality, consistency, authenticity and platform
    optimization always count towards the overall score.  User
    satisfaction is added only when feedback was supplied and produced a
    score.

    Args:
        persona: Persona to assess.
        user_feedback: Optional feedback dict; skipped when empty or None.

    Returns:
        Dict with the overall score, each component score and a
        ``quality_breakdown`` sub-dict.
    """
    linguistic = self._assess_linguistic_quality(persona)
    consistency = self._assess_consistency(persona)
    authenticity = self._assess_authenticity(persona)

    # User satisfaction (if feedback provided)
    satisfaction = None
    if user_feedback:
        satisfaction = self._assess_user_satisfaction(user_feedback)

    platform = self._assess_platform_optimization(persona)

    components = [linguistic, consistency, authenticity, platform]
    if satisfaction is not None:
        components.append(satisfaction)
    overall = sum(components) / len(components)

    breakdown = {
        "linguistic_analysis_completeness": self._assess_analysis_completeness(persona),
        "style_consistency": consistency,
        "brand_alignment": authenticity,
        "platform_adaptation_quality": platform
    }
    return {
        "overall_quality_score": overall,
        "linguistic_quality": linguistic,
        "consistency_score": consistency,
        "authenticity_score": authenticity,
        "user_satisfaction": satisfaction,
        "platform_optimization_quality": platform,
        "quality_breakdown": breakdown
    }
def _assess_linguistic_quality(self, persona: EnhancedWritingPersona) -> float:
"""Assess the quality of linguistic analysis."""
linguistic_fingerprint = persona.linguistic_fingerprint or {}
# Check completeness of linguistic analysis
required_fields = [
'sentence_analysis', 'vocabulary_analysis', 'rhetorical_analysis',
'style_patterns', 'readability_analysis'
]
completeness_score = 0
for field in required_fields:
if field in linguistic_fingerprint and linguistic_fingerprint[field]:
completeness_score += 20
# Check quality of analysis
quality_indicators = 0
if linguistic_fingerprint.get('sentence_analysis', {}).get('sentence_length_distribution'):
quality_indicators += 1
if linguistic_fingerprint.get('vocabulary_analysis', {}).get('lexical_diversity'):
quality_indicators += 1
if linguistic_fingerprint.get('rhetorical_analysis', {}).get('questions'):
quality_indicators += 1
if linguistic_fingerprint.get('style_patterns', {}).get('formality_level'):
quality_indicators += 1
quality_score = (quality_indicators / 4) * 100
return (completeness_score + quality_score) / 2
def _assess_consistency(self, persona: EnhancedWritingPersona) -> float:
"""Assess consistency of the persona."""
consistency_analysis = persona.linguistic_fingerprint.get('consistency_analysis', {})
if not consistency_analysis:
return 50.0 # Default score if no consistency data
return consistency_analysis.get('consistency_score', 50.0)
def _assess_authenticity(self, persona: EnhancedWritingPersona) -> float:
"""Assess authenticity of the persona."""
# Check if persona reflects real user characteristics
source_data = persona.source_website_analysis or {}
# Authenticity indicators
authenticity_indicators = 0
total_indicators = 5
# Check for brand voice alignment
if persona.brand_voice_description:
authenticity_indicators += 1
# Check for core belief definition
if persona.core_belief:
authenticity_indicators += 1
# Check for archetype definition
if persona.archetype:
authenticity_indicators += 1
# Check for source data quality
if source_data.get('writing_style'):
authenticity_indicators += 1
# Check for confidence score
if persona.confidence_score and persona.confidence_score > 70:
authenticity_indicators += 1
return (authenticity_indicators / total_indicators) * 100
def _assess_user_satisfaction(self, user_feedback: Dict[str, Any]) -> float:
"""Assess user satisfaction from feedback."""
if not user_feedback:
return None
# Extract satisfaction metrics
satisfaction_score = user_feedback.get('satisfaction_score', 0)
content_quality_rating = user_feedback.get('content_quality_rating', 0)
style_match_rating = user_feedback.get('style_match_rating', 0)
# Calculate weighted average
if satisfaction_score and content_quality_rating and style_match_rating:
return (satisfaction_score + content_quality_rating + style_match_rating) / 3
elif satisfaction_score:
return satisfaction_score
else:
return 50.0 # Default if no clear satisfaction data
def _assess_platform_optimization(self, persona: EnhancedWritingPersona) -> float:
"""Assess platform optimization quality."""
platform_personas = persona.platform_personas
if not platform_personas:
return 0.0
total_score = 0
platform_count = 0
for platform_persona in platform_personas:
if platform_persona.is_active:
# Check platform-specific optimization completeness
platform_score = 0
if platform_persona.platform_linguistic_adaptation:
platform_score += 25
if platform_persona.platform_engagement_patterns:
platform_score += 25
if platform_persona.platform_content_optimization:
platform_score += 25
if platform_persona.platform_algorithm_insights:
platform_score += 25
total_score += platform_score
platform_count += 1
return total_score / platform_count if platform_count > 0 else 0.0
def _assess_analysis_completeness(self, persona: EnhancedWritingPersona) -> float:
"""Assess completeness of the persona analysis."""
completeness_indicators = 0
total_indicators = 8
# Core persona fields
if persona.persona_name:
completeness_indicators += 1
if persona.archetype:
completeness_indicators += 1
if persona.core_belief:
completeness_indicators += 1
if persona.brand_voice_description:
completeness_indicators += 1
# Linguistic analysis
if persona.linguistic_fingerprint:
completeness_indicators += 1
if persona.writing_style_signature:
completeness_indicators += 1
if persona.vocabulary_profile:
completeness_indicators += 1
if persona.sentence_patterns:
completeness_indicators += 1
return (completeness_indicators / total_indicators) * 100
def _generate_improvement_suggestions(self, quality_metrics: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Generate improvement suggestions based on quality metrics."""
suggestions = []
overall_score = quality_metrics.get('overall_quality_score', 0)
# Linguistic quality improvements
if quality_metrics.get('linguistic_quality', 0) < 70:
suggestions.append({
"category": "linguistic_analysis",
"priority": "high",
"suggestion": "Enhance linguistic analysis with more detailed sentence patterns and vocabulary analysis",
"action": "reanalyze_source_content"
})
# Consistency improvements
if quality_metrics.get('consistency_score', 0) < 70:
suggestions.append({
"category": "consistency",
"priority": "high",
"suggestion": "Improve consistency by analyzing more writing samples",
"action": "collect_additional_samples"
})
# Authenticity improvements
if quality_metrics.get('authenticity_score', 0) < 70:
suggestions.append({
"category": "authenticity",
"priority": "medium",
"suggestion": "Strengthen brand voice alignment and core belief definition",
"action": "refine_brand_analysis"
})
# Platform optimization improvements
if quality_metrics.get('platform_optimization_quality', 0) < 70:
suggestions.append({
"category": "platform_optimization",
"priority": "medium",
"suggestion": "Enhance platform-specific adaptations and algorithm insights",
"action": "update_platform_adaptations"
})
# User satisfaction improvements
user_satisfaction = quality_metrics.get('user_satisfaction')
if user_satisfaction is not None and user_satisfaction < 70:
suggestions.append({
"category": "user_satisfaction",
"priority": "high",
"suggestion": "Address user feedback and adjust persona based on preferences",
"action": "incorporate_user_feedback"
})
return suggestions
def _analyze_feedback(self, feedback_data: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze user feedback to extract improvement insights."""
return {
"satisfaction_level": feedback_data.get('satisfaction_score', 0),
"content_quality_rating": feedback_data.get('content_quality_rating', 0),
"style_match_rating": feedback_data.get('style_match_rating', 0),
"specific_complaints": feedback_data.get('complaints', []),
"specific_praises": feedback_data.get('praises', []),
"improvement_requests": feedback_data.get('improvement_requests', []),
"preferred_adjustments": feedback_data.get('preferred_adjustments', {})
}
def _generate_persona_improvements(self, persona: EnhancedWritingPersona, feedback_analysis: Dict[str, Any]) -> Dict[str, Any]:
"""Generate specific improvements based on feedback analysis."""
improvements = {}
# Style adjustments based on feedback
if feedback_analysis.get('style_match_rating', 0) < 70:
improvements['style_adjustments'] = {
"tone_adjustment": feedback_analysis.get('preferred_adjustments', {}).get('tone'),
"formality_adjustment": feedback_analysis.get('preferred_adjustments', {}).get('formality'),
"vocabulary_adjustment": feedback_analysis.get('preferred_adjustments', {}).get('vocabulary')
}
# Content quality improvements
if feedback_analysis.get('content_quality_rating', 0) < 70:
improvements['content_quality'] = {
"clarity_improvement": True,
"engagement_enhancement": True,
"structure_optimization": True
}
# Specific complaint addressing
complaints = feedback_analysis.get('specific_complaints', [])
if complaints:
improvements['complaint_resolutions'] = {
"addressed_complaints": complaints,
"resolution_strategies": self._generate_complaint_resolutions(complaints)
}
return improvements
def _generate_complaint_resolutions(self, complaints: List[str]) -> List[Dict[str, Any]]:
"""Generate resolution strategies for specific complaints."""
resolutions = []
for complaint in complaints:
complaint_lower = complaint.lower()
if 'too formal' in complaint_lower:
resolutions.append({
"complaint": complaint,
"resolution": "Reduce formality level and increase conversational tone",
"action": "adjust_formality_metrics"
})
elif 'too casual' in complaint_lower:
resolutions.append({
"complaint": complaint,
"resolution": "Increase formality level and professional tone",
"action": "adjust_formality_metrics"
})
elif 'too long' in complaint_lower:
resolutions.append({
"complaint": complaint,
"resolution": "Reduce average sentence length and improve conciseness",
"action": "adjust_sentence_length"
})
elif 'too short' in complaint_lower:
resolutions.append({
"complaint": complaint,
"resolution": "Increase sentence complexity and add more detail",
"action": "adjust_sentence_length"
})
elif 'boring' in complaint_lower or 'dull' in complaint_lower:
resolutions.append({
"complaint": complaint,
"resolution": "Add more engaging language and rhetorical devices",
"action": "enhance_engagement_patterns"
})
else:
resolutions.append({
"complaint": complaint,
"resolution": "General style adjustment based on feedback",
"action": "general_style_refinement"
})
return resolutions
def _apply_improvements(self, session: Session, persona: EnhancedWritingPersona, improvements: Dict[str, Any]) -> EnhancedWritingPersona:
    """Apply every improvement group present in ``improvements``.

    Style adjustments, content quality improvements and complaint
    resolutions are applied in that order when their key is present.  The
    persona's ``updated_at`` is then refreshed and the persona is added to
    the session; nothing is committed here.

    Args:
        session: Session the persona is added to.
        persona: Persona to modify.
        improvements: Improvement groups keyed by kind.

    Returns:
        The same persona object that was passed in.
    """
    # (improvements key, name of the method that applies that group)
    appliers = (
        ('style_adjustments', '_apply_style_adjustments'),
        ('content_quality', '_apply_content_quality_improvements'),
        ('complaint_resolutions', '_apply_complaint_resolutions'),
    )
    for group_key, method_name in appliers:
        if group_key in improvements:
            getattr(self, method_name)(persona, improvements[group_key])

    # Update persona metadata
    persona.updated_at = datetime.utcnow()
    session.add(persona)

    return persona
def _apply_style_adjustments(self, persona: EnhancedWritingPersona, style_adjustments: Dict[str, Any]):
"""Apply style adjustments to persona."""
# Update linguistic fingerprint based on adjustments
if not persona.linguistic_fingerprint:
persona.linguistic_fingerprint = {}
# Tone adjustment
if style_adjustments.get('tone_adjustment'):
persona.linguistic_fingerprint['adjusted_tone'] = style_adjustments['tone_adjustment']
# Formality adjustment
if style_adjustments.get('formality_adjustment'):
persona.linguistic_fingerprint['adjusted_formality'] = style_adjustments['formality_adjustment']
# Vocabulary adjustment
if style_adjustments.get('vocabulary_adjustment'):
persona.linguistic_fingerprint['adjusted_vocabulary'] = style_adjustments['vocabulary_adjustment']
def _apply_content_quality_improvements(self, persona: EnhancedWritingPersona, quality_improvements: Dict[str, Any]):
"""Apply content quality improvements to persona."""
if not persona.linguistic_fingerprint:
persona.linguistic_fingerprint = {}
# Add quality improvement markers
persona.linguistic_fingerprint['quality_improvements'] = {
"clarity_enhanced": quality_improvements.get('clarity_improvement', False),
"engagement_enhanced": quality_improvements.get('engagement_enhancement', False),
"structure_optimized": quality_improvements.get('structure_optimization', False),
"improvement_date": datetime.utcnow().isoformat()
}
def _apply_complaint_resolutions(self, persona: EnhancedWritingPersona, complaint_resolutions: Dict[str, Any]):
"""Apply complaint resolutions to persona."""
if not persona.linguistic_fingerprint:
persona.linguistic_fingerprint = {}
# Add complaint resolution tracking
persona.linguistic_fingerprint['complaint_resolutions'] = {
"addressed_complaints": complaint_resolutions.get('addressed_complaints', []),
"resolution_strategies": complaint_resolutions.get('resolution_strategies', []),
"resolution_date": datetime.utcnow().isoformat()
}
def _analyze_performance_patterns(self, content_performance: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze content performance patterns."""
if not content_performance:
return {}
# Calculate average performance metrics
total_content = len(content_performance)
avg_engagement = sum(item.get('engagement_rate', 0) for item in content_performance) / total_content
avg_reach = sum(item.get('reach', 0) for item in content_performance) / total_content
avg_clicks = sum(item.get('clicks', 0) for item in content_performance) / total_content
# Identify top performing content
top_performers = sorted(content_performance,
key=lambda x: x.get('engagement_rate', 0),
reverse=True)[:3]
# Analyze content characteristics of top performers
top_performer_analysis = self._analyze_top_performers(top_performers)
return {
"average_engagement_rate": avg_engagement,
"average_reach": avg_reach,
"average_clicks": avg_clicks,
"total_content_analyzed": total_content,
"top_performers": top_performer_analysis,
"performance_trends": self._identify_performance_trends(content_performance)
}
def _analyze_top_performers(self, top_performers: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Analyze characteristics of top performing content."""
if not top_performers:
return {}
# Analyze common characteristics
content_types = [item.get('content_type') for item in top_performers]
topics = [item.get('topic') for item in top_performers]
lengths = [item.get('content_length') for item in top_performers]
return {
"common_content_types": list(set(content_types)),
"common_topics": list(set(topics)),
"average_length": sum(lengths) / len(lengths) if lengths else 0,
"performance_characteristics": {
"high_engagement_keywords": self._extract_high_engagement_keywords(top_performers),
"optimal_posting_times": self._extract_optimal_posting_times(top_performers),
"successful_formats": self._extract_successful_formats(top_performers)
}
}
def _extract_high_engagement_keywords(self, top_performers: List[Dict[str, Any]]) -> List[str]:
"""Extract keywords that appear in high-performing content."""
# This would analyze the content text for common keywords
# For now, return a placeholder
return ["innovation", "strategy", "growth", "success"]
def _extract_optimal_posting_times(self, top_performers: List[Dict[str, Any]]) -> List[str]:
"""Extract optimal posting times from top performers."""
posting_times = [item.get('posting_time') for item in top_performers if item.get('posting_time')]
return list(set(posting_times))
def _extract_successful_formats(self, top_performers: List[Dict[str, Any]]) -> List[str]:
"""Extract successful content formats from top performers."""
formats = [item.get('format') for item in top_performers if item.get('format')]
return list(set(formats))
def _identify_performance_trends(self, content_performance: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Identify performance trends over time."""
# Sort by date if available
sorted_performance = sorted(content_performance,
key=lambda x: x.get('date', ''),
reverse=True)
if len(sorted_performance) < 2:
return {"trend": "insufficient_data"}
# Calculate trend
recent_performance = sorted_performance[:len(sorted_performance)//2]
older_performance = sorted_performance[len(sorted_performance)//2:]
recent_avg = sum(item.get('engagement_rate', 0) for item in recent_performance) / len(recent_performance)
older_avg = sum(item.get('engagement_rate', 0) for item in older_performance) / len(older_performance)
if recent_avg > older_avg * 1.1:
trend = "improving"
elif recent_avg < older_avg * 0.9:
trend = "declining"
else:
trend = "stable"
return {
"trend": trend,
"recent_average": recent_avg,
"older_average": older_avg,
"change_percentage": ((recent_avg - older_avg) / older_avg * 100) if older_avg > 0 else 0
}
def _identify_successful_patterns(self, content_performance: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Describe what the best performing quarter of the content shares.

    Items are ranked by ``engagement_rate`` (missing counts as 0) and the
    top quarter is analyzed; the cut-off is at least one item.

    Args:
        content_performance: One dict per content item.

    Returns:
        Dict with the number of high performers, their common
        characteristics and a ``success_patterns`` sub-dict holding the
        length range, content types and topic categories.
    """
    ranked = sorted(
        content_performance,
        key=lambda entry: entry.get('engagement_rate', 0),
        reverse=True,
    )
    # Top 25%, but never fewer than one slot.
    cutoff = max(1, len(ranked) // 4)
    top_quarter = ranked[:cutoff]

    characteristics = self._analyze_top_performers(top_quarter)
    success_patterns = {
        "optimal_length_range": self._calculate_optimal_length_range(top_quarter),
        "preferred_content_types": self._get_preferred_content_types(top_quarter),
        "successful_topic_categories": self._get_successful_topic_categories(top_quarter)
    }

    return {
        "high_performing_content_count": len(top_quarter),
        "common_characteristics": characteristics,
        "success_patterns": success_patterns
    }
def _calculate_optimal_length_range(self, top_performers: List[Dict[str, Any]]) -> Dict[str, int]:
"""Calculate optimal content length range from top performers."""
lengths = [item.get('content_length', 0) for item in top_performers if item.get('content_length')]
if not lengths:
return {"min": 0, "max": 0, "average": 0}
return {
"min": min(lengths),
"max": max(lengths),
"average": sum(lengths) / len(lengths)
}
def _get_preferred_content_types(self, top_performers: List[Dict[str, Any]]) -> List[str]:
"""Get preferred content types from top performers."""
content_types = [item.get('content_type') for item in top_performers if item.get('content_type')]
return list(set(content_types))
def _get_successful_topic_categories(self, top_performers: List[Dict[str, Any]]) -> List[str]:
"""Get successful topic categories from top performers."""
topics = [item.get('topic_category') for item in top_performers if item.get('topic_category')]
return list(set(topics))
def _generate_learning_insights(self, performance_analysis: Dict[str, Any], successful_patterns: Dict[str, Any]) -> Dict[str, Any]:
    """Combine performance analysis and success patterns into one insight report.

    The report has four parts: headline performance figures read from
    ``performance_analysis``, the ``successful_patterns`` dict passed
    through unchanged, recommendations lifted from its
    ``'success_patterns'`` entry, and a confidence value obtained from
    ``_calculate_learning_confidence``. Missing keys fall back to neutral
    defaults (``0``, ``'unknown'``, empty dict/list).
    """
    trends = performance_analysis.get('performance_trends', {})
    performance_insights = {
        "average_engagement": performance_analysis.get('average_engagement_rate', 0),
        "performance_trend": trends.get('trend', 'unknown'),
        "top_performing_characteristics": performance_analysis.get('top_performers', {})
    }

    patterns = successful_patterns.get('success_patterns', {})
    recommendations = {
        "content_length_optimization": patterns.get('optimal_length_range', {}),
        "content_type_preferences": patterns.get('preferred_content_types', []),
        "topic_focus_areas": patterns.get('successful_topic_categories', [])
    }

    confidence = self._calculate_learning_confidence(performance_analysis, successful_patterns)

    return {
        "performance_insights": performance_insights,
        "success_patterns": successful_patterns,
        "recommendations": recommendations,
        "learning_confidence": confidence
    }
def _calculate_learning_confidence(self, performance_analysis: Dict[str, Any], successful_patterns: Dict[str, Any]) -> float:
"""Calculate confidence in learning insights."""
# Base confidence on amount of data
total_content = performance_analysis.get('total_content_analyzed', 0)
high_performers = successful_patterns.get('high_performing_content_count', 0)
# Confidence increases with more data
data_confidence = min(100, (total_content / 20) * 100) # 20 pieces of content = 100% confidence
# Confidence increases with more high performers
pattern_confidence = min(100, (high_performers / 5) * 100) # 5 high performers = 100% confidence
return (data_confidence + pattern_confidence) / 2
def _apply_performance_learning(self, persona_id: int, learning_insights: Dict[str, Any]) -> Dict[str, Any]:
"""Apply performance learning to persona."""
# This would update the persona based on learning insights
# For now, return the insights that would be applied
return {
"applied_insights": learning_insights,
"persona_updates": {
"content_length_preferences": learning_insights.get('recommendations', {}).get('content_length_optimization', {}),
"preferred_content_types": learning_insights.get('recommendations', {}).get('content_type_preferences', []),
"successful_topic_areas": learning_insights.get('recommendations', {}).get('topic_focus_areas', []),
"learning_confidence": learning_insights.get('learning_confidence', 0)
}
}
def _save_quality_metrics(self, session: Session, persona_id: int, quality_metrics: Dict[str, Any], user_feedback: Optional[Dict[str, Any]]):
    """Stage a ``PersonaQualityMetrics`` row for the given persona.

    Numeric columns are read from ``quality_metrics`` (defaulting to ``0``,
    except ``user_satisfaction`` which defaults to ``None``); the full
    metrics dict, its ``'improvement_suggestions'`` and any truthy
    ``user_feedback`` are stored as JSON text. The record is only added to
    ``session`` -- this method does not commit.
    """
    metric = quality_metrics.get

    record_fields = {
        "writing_persona_id": persona_id,
        "style_accuracy": metric('linguistic_quality', 0),
        "content_quality": metric('overall_quality_score', 0),
        "engagement_rate": metric('platform_optimization_quality', 0),
        "consistency_score": metric('consistency_score', 0),
        "user_satisfaction": metric('user_satisfaction'),
        # Empty or missing feedback is stored as NULL rather than "{}" / "null".
        "user_feedback": json.dumps(user_feedback) if user_feedback else None,
        "ai_quality_assessment": json.dumps(quality_metrics),
        "improvement_suggestions": json.dumps(metric('improvement_suggestions', [])),
        "assessor_type": "ai_automated",
    }

    session.add(PersonaQualityMetrics(**record_fields))
def _save_learning_data(self, session: Session, persona_id: int, feedback_data: Dict[str, Any], improvements: Dict[str, Any]):
    """Stage a feedback-type ``PersonaLearningData`` row for the given persona.

    Samples, successful content and preferences come from
    ``feedback_data``; style, vocabulary and pattern adjustments come from
    ``improvements``. Every value is serialised to JSON, with an empty
    list/dict standing in for absent keys. The record is only added to
    ``session`` -- this method does not commit.
    """
    from_feedback = feedback_data.get
    from_improvements = improvements.get

    record_fields = {
        "writing_persona_id": persona_id,
        "user_writing_samples": json.dumps(from_feedback('writing_samples', [])),
        "successful_content_examples": json.dumps(from_feedback('successful_content', [])),
        "user_preferences": json.dumps(from_feedback('preferences', {})),
        "style_refinements": json.dumps(from_improvements('style_adjustments', {})),
        "vocabulary_updates": json.dumps(from_improvements('vocabulary_adjustments', {})),
        "pattern_adjustments": json.dumps(from_improvements('pattern_adjustments', {})),
        "learning_type": "feedback",
    }

    session.add(PersonaLearningData(**record_fields))
def _save_performance_learning(self, session: Session, persona_id: int, content_performance: List[Dict[str, Any]], learning_insights: Dict[str, Any]):
    """Stage a performance-type ``PersonaLearningData`` row for the given persona.

    The raw ``content_performance`` list is stored in the writing-samples
    column, and three sections of ``learning_insights`` are stored as JSON
    in the examples, preferences and style-refinement columns. The record
    is only added to ``session`` -- this method does not commit.
    """
    insight = learning_insights.get

    record_fields = {
        "writing_persona_id": persona_id,
        "user_writing_samples": json.dumps(content_performance),
        "successful_content_examples": json.dumps(insight('success_patterns', {})),
        "user_preferences": json.dumps(insight('recommendations', {})),
        # NOTE(review): 'persona_updates' is produced by
        # _apply_performance_learning, not _generate_learning_insights --
        # confirm which dict callers pass, otherwise this is always "{}".
        "style_refinements": json.dumps(insight('persona_updates', {})),
        "learning_type": "performance",
    }

    session.add(PersonaLearningData(**record_fields))

View File

@@ -247,8 +247,52 @@ def setup_environment():
# Set up billing and subscription system
setup_billing_tables()
# Set up persona tables
if setup_persona_tables():
# Verify persona tables were created successfully
verify_persona_tables()
else:
print("⚠️ Warning: Persona tables setup failed, but continuing...")
print("✅ Environment setup complete")
def setup_persona_tables():
    """Create the persona tables and confirm the expected ones exist.

    Runs ``create_all`` on the persona models' metadata against the project
    engine, then inspects the database for four expected table names.

    Returns:
        ``True`` when all four tables are present; ``False`` when any is
        missing or when any step raises (the error is printed, not
        re-raised, so the caller can continue).
    """
    print("🔧 Setting up persona tables...")

    expected_tables = (
        'writing_personas',
        'platform_personas',
        'persona_analysis_results',
        'persona_validation_results',
    )

    try:
        from services.database import engine
        from models.persona_models import Base as PersonaBase

        # Create persona tables
        PersonaBase.metadata.create_all(bind=engine)
        print("✅ Persona tables created successfully")

        # Verify tables were created
        from sqlalchemy import inspect
        existing = set(inspect(engine).get_table_names())

        created_tables = [name for name in expected_tables if name in existing]
        print(f"✅ Verified persona tables created: {created_tables}")

        missing = [name for name in expected_tables if name not in created_tables]
        if missing:
            print(f"⚠️ Warning: Missing persona tables: {missing}")
            return False

        return True
    except Exception as e:
        print(f"❌ Error setting up persona tables: {e}")
        return False
def verify_persona_tables():
"""Verify that persona tables exist and are accessible."""
print("🔍 Verifying persona tables...")