ALwrity + Wordpress + Wix + GSC integration

This commit is contained in:
ajaysi
2025-10-08 10:13:14 +05:30
parent 14dfb2e5c0
commit 3bab3450dc
147 changed files with 19815 additions and 17053 deletions

View File

@@ -17,7 +17,16 @@ class GSCService:
def __init__(self, db_path: str = "alwrity.db"):
"""Initialize GSC service with database connection."""
self.db_path = db_path
self.credentials_file = "gsc_credentials.json"
# Resolve credentials file robustly: env override or project-relative default
env_credentials_path = os.getenv("GSC_CREDENTIALS_FILE")
if env_credentials_path:
self.credentials_file = env_credentials_path
else:
# Default to <backend>/gsc_credentials.json regardless of CWD
services_dir = os.path.dirname(__file__)
backend_dir = os.path.abspath(os.path.join(services_dir, os.pardir))
self.credentials_file = os.path.join(backend_dir, "gsc_credentials.json")
logger.info(f"GSC credentials file path set to: {self.credentials_file}")
self.scopes = ['https://www.googleapis.com/auth/webmasters.readonly']
self._init_gsc_tables()
logger.info("GSC Service initialized successfully")
@@ -62,12 +71,18 @@ class GSCService:
def save_user_credentials(self, user_id: str, credentials: Credentials) -> bool:
"""Save user's GSC credentials to database."""
try:
# Read client credentials from file to ensure we have all required fields
with open(self.credentials_file, 'r') as f:
client_config = json.load(f)
web_config = client_config.get('web', {})
credentials_json = json.dumps({
'token': credentials.token,
'refresh_token': credentials.refresh_token,
'token_uri': credentials.token_uri,
'client_id': credentials.client_id,
'client_secret': credentials.client_secret,
'token_uri': credentials.token_uri or web_config.get('token_uri'),
'client_id': credentials.client_id or web_config.get('client_id'),
'client_secret': credentials.client_secret or web_config.get('client_secret'),
'scopes': credentials.scopes
})
@@ -99,18 +114,33 @@ class GSCService:
result = cursor.fetchone()
if not result:
logger.warning(f"No GSC credentials found for user: {user_id}")
return None
credentials_data = json.loads(result[0])
# Check for required fields, but allow connection without refresh token
required_fields = ['token_uri', 'client_id', 'client_secret']
missing_fields = [field for field in required_fields if not credentials_data.get(field)]
if missing_fields:
logger.warning(f"GSC credentials for user {user_id} missing required fields: {missing_fields}")
return None
credentials = Credentials.from_authorized_user_info(credentials_data, self.scopes)
# Refresh token if needed
if credentials.expired and credentials.refresh_token:
credentials.refresh(GoogleRequest())
self.save_user_credentials(user_id, credentials)
# Refresh token if needed and possible
if credentials.expired:
if credentials.refresh_token:
try:
credentials.refresh(GoogleRequest())
self.save_user_credentials(user_id, credentials)
except Exception as e:
logger.error(f"Failed to refresh GSC token for user {user_id}: {e}")
return None
else:
logger.warning(f"GSC token expired for user {user_id} but no refresh token available - user needs to re-authorize")
return None
logger.info(f"GSC credentials loaded for user: {user_id}")
return credentials
except Exception as e:
@@ -120,21 +150,28 @@ class GSCService:
def get_oauth_url(self, user_id: str) -> str:
"""Get OAuth authorization URL for GSC."""
try:
logger.info(f"Generating OAuth URL for user: {user_id}")
if not os.path.exists(self.credentials_file):
raise FileNotFoundError(f"GSC credentials file not found: {self.credentials_file}")
redirect_uri = os.getenv('GSC_REDIRECT_URI', 'http://localhost:8000/gsc/callback')
flow = Flow.from_client_secrets_file(
self.credentials_file,
scopes=self.scopes,
redirect_uri=os.getenv('GSC_REDIRECT_URI', 'http://localhost:8000/gsc/callback')
redirect_uri=redirect_uri
)
authorization_url, state = flow.authorization_url(
access_type='offline',
include_granted_scopes='true'
include_granted_scopes='true',
prompt='consent' # Force consent screen to get refresh token
)
logger.info(f"OAuth URL generated for user: {user_id}")
# Store state for verification
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
@@ -144,34 +181,58 @@ class GSCService:
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
cursor.execute('''
INSERT INTO gsc_oauth_states (state, user_id)
INSERT OR REPLACE INTO gsc_oauth_states (state, user_id)
VALUES (?, ?)
''', (state, user_id))
conn.commit()
logger.info(f"OAuth URL generated for user: {user_id}")
logger.info(f"OAuth URL generated successfully for user: {user_id}")
return authorization_url
except Exception as e:
logger.error(f"Error generating OAuth URL for user {user_id}: {e}")
logger.error(f"Error type: {type(e).__name__}")
logger.error(f"Error details: {str(e)}")
raise
def handle_oauth_callback(self, authorization_code: str, state: str) -> bool:
"""Handle OAuth callback and save credentials."""
try:
logger.info(f"Handling OAuth callback with state: {state}")
# Verify state
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT user_id FROM gsc_oauth_states WHERE state = ?
''', (state,))
result = cursor.fetchone()
if not result:
raise ValueError("Invalid OAuth state")
user_id = result[0]
if not result:
# Check if this is a duplicate callback by looking for recent credentials
cursor.execute('SELECT user_id, credentials_json FROM gsc_credentials ORDER BY updated_at DESC LIMIT 1')
recent_credentials = cursor.fetchone()
if recent_credentials:
logger.info("Duplicate callback detected - returning success")
return True
# If no recent credentials, try to find any recent state
cursor.execute('SELECT state, user_id FROM gsc_oauth_states ORDER BY created_at DESC LIMIT 1')
recent_state = cursor.fetchone()
if recent_state:
user_id = recent_state[1]
# Clean up the old state
cursor.execute('DELETE FROM gsc_oauth_states WHERE state = ?', (recent_state[0],))
conn.commit()
else:
raise ValueError("Invalid OAuth state")
else:
user_id = result[0]
# Clean up state
cursor.execute('DELETE FROM gsc_oauth_states WHERE state = ?', (state,))
@@ -330,6 +391,21 @@ class GSCService:
logger.error(f"Error revoking GSC access for user {user_id}: {e}")
return False
def clear_incomplete_credentials(self, user_id: str) -> bool:
"""Clear incomplete GSC credentials that are missing required fields."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM gsc_credentials WHERE user_id = ?', (user_id,))
conn.commit()
logger.info(f"Cleared incomplete GSC credentials for user: {user_id}")
return True
except Exception as e:
logger.error(f"Error clearing incomplete credentials for user {user_id}: {e}")
return False
def _get_cached_data(self, user_id: str, site_url: str, data_type: str, cache_key: str) -> Optional[Dict]:
"""Get cached data if not expired."""
try:

View File

@@ -0,0 +1,170 @@
# WordPress Integration Service
A comprehensive WordPress integration service for ALwrity that enables seamless content publishing to WordPress sites.
## Architecture
### Core Components
1. **WordPressService** (`wordpress_service.py`)
- Manages WordPress site connections
- Handles site credentials and authentication
- Provides site management operations
2. **WordPressContentManager** (`wordpress_content.py`)
- Manages WordPress content operations
- Handles media uploads and compression
- Manages categories, tags, and posts
- Provides WordPress REST API interactions
3. **WordPressPublisher** (`wordpress_publisher.py`)
- High-level publishing service
- Orchestrates content creation and publishing
- Manages post references and tracking
## Features
### Site Management
- ✅ Connect multiple WordPress sites
- ✅ Site credential management
- ✅ Connection testing and validation
- ✅ Site disconnection
### Content Publishing
- ✅ Blog post creation and publishing
- ✅ Media upload with compression
- ✅ Category and tag management
- ✅ Featured image support
- ✅ SEO metadata (meta descriptions)
- ✅ Draft and published status control
### Advanced Features
- ✅ Image compression for better performance
- ✅ Automatic category/tag creation
- ✅ Post status management
- ✅ Post deletion and updates
- ✅ Publishing history tracking
## Database Schema
### WordPress Sites Table
```sql
CREATE TABLE wordpress_sites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
site_url TEXT NOT NULL,
site_name TEXT,
username TEXT NOT NULL,
app_password TEXT NOT NULL,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, site_url)
);
```
### WordPress Posts Table
```sql
CREATE TABLE wordpress_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
site_id INTEGER NOT NULL,
wp_post_id INTEGER NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'draft',
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (site_id) REFERENCES wordpress_sites (id)
);
```
## Usage Examples
### Basic Site Connection
```python
from backend.services.integrations import WordPressService
wp_service = WordPressService()
success = wp_service.add_site(
user_id="user123",
site_url="https://mysite.com",
site_name="My Blog",
username="admin",
app_password="xxxx-xxxx-xxxx-xxxx"
)
```
### Publishing Content
```python
from backend.services.integrations import WordPressPublisher
publisher = WordPressPublisher()
result = publisher.publish_blog_post(
user_id="user123",
site_id=1,
title="My Blog Post",
content="<p>This is my blog post content.</p>",
excerpt="A brief excerpt",
featured_image_path="/path/to/image.jpg",
categories=["Technology", "AI"],
tags=["wordpress", "automation"],
status="publish"
)
```
### Content Management
```python
from backend.services.integrations import WordPressContentManager
content_manager = WordPressContentManager(
site_url="https://mysite.com",
username="admin",
app_password="xxxx-xxxx-xxxx-xxxx"
)
# Upload media
media = content_manager.upload_media(
file_path="/path/to/image.jpg",
alt_text="Description",
title="Image Title"
)
# Create post
post = content_manager.create_post(
title="Post Title",
content="<p>Post content</p>",
featured_media_id=media['id'],
status="draft"
)
```
## Authentication
WordPress integration uses **Application Passwords** for authentication:
1. Go to WordPress Admin → Users → Profile
2. Scroll down to "Application Passwords"
3. Create a new application password
4. Use the generated password for authentication
## Error Handling
All services include comprehensive error handling:
- Connection validation
- API response checking
- Graceful failure handling
- Detailed logging
## Logging
The service uses structured logging with different levels:
- `INFO`: Successful operations
- `WARNING`: Non-critical issues
- `ERROR`: Failed operations
## Security
- Credentials are stored securely in the database
- Application passwords are used instead of main passwords
- Connection testing before credential storage
- Proper authentication for all API calls

View File

@@ -0,0 +1,13 @@
"""
WordPress Integration Package
"""
from .wordpress_service import WordPressService
from .wordpress_content import WordPressContentManager
from .wordpress_publisher import WordPressPublisher
__all__ = [
'WordPressService',
'WordPressContentManager',
'WordPressPublisher'
]

View File

@@ -0,0 +1,320 @@
"""
WordPress Content Management Module
Handles content creation, media upload, and publishing to WordPress sites.
"""
import os
import json
import base64
import mimetypes
import tempfile
from typing import Optional, Dict, List, Any, Union
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from PIL import Image
from loguru import logger
class WordPressContentManager:
"""Manages WordPress content operations including posts, media, and taxonomies."""
def __init__(self, site_url: str, username: str, app_password: str):
"""Initialize with WordPress site credentials."""
self.site_url = site_url.rstrip('/')
self.username = username
self.app_password = app_password
self.api_base = f"{self.site_url}/wp-json/wp/v2"
self.auth = HTTPBasicAuth(username, app_password)
def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
"""Make authenticated request to WordPress API."""
try:
url = f"{self.api_base}/{endpoint.lstrip('/')}"
response = requests.request(method, url, auth=self.auth, **kwargs)
if response.status_code in [200, 201]:
return response.json()
else:
logger.error(f"WordPress API error: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"WordPress API request error: {e}")
return None
def get_categories(self) -> List[Dict[str, Any]]:
"""Get all categories from WordPress site."""
try:
result = self._make_request('GET', 'categories', params={'per_page': 100})
if result:
logger.info(f"Retrieved {len(result)} categories from {self.site_url}")
return result
return []
except Exception as e:
logger.error(f"Error getting categories: {e}")
return []
def get_tags(self) -> List[Dict[str, Any]]:
"""Get all tags from WordPress site."""
try:
result = self._make_request('GET', 'tags', params={'per_page': 100})
if result:
logger.info(f"Retrieved {len(result)} tags from {self.site_url}")
return result
return []
except Exception as e:
logger.error(f"Error getting tags: {e}")
return []
def create_category(self, name: str, description: str = "") -> Optional[Dict[str, Any]]:
"""Create a new category."""
try:
data = {
'name': name,
'description': description
}
result = self._make_request('POST', 'categories', json=data)
if result:
logger.info(f"Created category: {name}")
return result
except Exception as e:
logger.error(f"Error creating category {name}: {e}")
return None
def create_tag(self, name: str, description: str = "") -> Optional[Dict[str, Any]]:
"""Create a new tag."""
try:
data = {
'name': name,
'description': description
}
result = self._make_request('POST', 'tags', json=data)
if result:
logger.info(f"Created tag: {name}")
return result
except Exception as e:
logger.error(f"Error creating tag {name}: {e}")
return None
def get_or_create_category(self, name: str, description: str = "") -> Optional[int]:
"""Get existing category or create new one."""
try:
# First, try to find existing category
categories = self.get_categories()
for category in categories:
if category['name'].lower() == name.lower():
logger.info(f"Found existing category: {name}")
return category['id']
# Create new category if not found
new_category = self.create_category(name, description)
if new_category:
return new_category['id']
return None
except Exception as e:
logger.error(f"Error getting or creating category {name}: {e}")
return None
def get_or_create_tag(self, name: str, description: str = "") -> Optional[int]:
"""Get existing tag or create new one."""
try:
# First, try to find existing tag
tags = self.get_tags()
for tag in tags:
if tag['name'].lower() == name.lower():
logger.info(f"Found existing tag: {name}")
return tag['id']
# Create new tag if not found
new_tag = self.create_tag(name, description)
if new_tag:
return new_tag['id']
return None
except Exception as e:
logger.error(f"Error getting or creating tag {name}: {e}")
return None
def upload_media(self, file_path: str, alt_text: str = "", title: str = "", caption: str = "", description: str = "") -> Optional[Dict[str, Any]]:
"""Upload media file to WordPress."""
try:
if not os.path.exists(file_path):
logger.error(f"Media file not found: {file_path}")
return None
# Get file info
file_name = os.path.basename(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
logger.error(f"Unable to determine MIME type for: {file_path}")
return None
# Prepare headers
headers = {
'Content-Disposition': f'attachment; filename="{file_name}"'
}
# Upload file
with open(file_path, 'rb') as file:
files = {'file': (file_name, file, mime_type)}
response = requests.post(
f"{self.api_base}/media",
auth=self.auth,
headers=headers,
files=files
)
if response.status_code == 201:
media_data = response.json()
media_id = media_data['id']
# Update media with metadata
update_data = {
'alt_text': alt_text,
'title': title,
'caption': caption,
'description': description
}
update_response = requests.post(
f"{self.api_base}/media/{media_id}",
auth=self.auth,
json=update_data
)
if update_response.status_code == 200:
logger.info(f"Media uploaded successfully: {file_name}")
return update_response.json()
else:
logger.warning(f"Media uploaded but metadata update failed: {update_response.text}")
return media_data
else:
logger.error(f"Media upload failed: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error uploading media {file_path}: {e}")
return None
def compress_image(self, image_path: str, quality: int = 85) -> str:
"""Compress image for better upload performance."""
try:
if not os.path.exists(image_path):
raise ValueError(f"Image file not found: {image_path}")
original_size = os.path.getsize(image_path)
with Image.open(image_path) as img:
img_format = img.format or 'JPEG'
# Create temporary file
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f'.{img_format.lower()}')
# Save with compression
img.save(temp_file, format=img_format, quality=quality, optimize=True)
compressed_size = os.path.getsize(temp_file.name)
reduction = (1 - (compressed_size / original_size)) * 100
logger.info(f"Image compressed: {original_size/1024:.2f}KB -> {compressed_size/1024:.2f}KB ({reduction:.1f}% reduction)")
return temp_file.name
except Exception as e:
logger.error(f"Error compressing image {image_path}: {e}")
return image_path # Return original if compression fails
def _test_connection(self) -> bool:
"""Test WordPress site connection."""
try:
# Test with a simple API call
api_url = f"{self.api_base}/users/me"
response = requests.get(api_url, auth=self.auth, timeout=10)
if response.status_code == 200:
logger.info(f"WordPress connection test successful for {self.site_url}")
return True
else:
logger.warning(f"WordPress connection test failed for {self.site_url}: {response.status_code}")
return False
except Exception as e:
logger.error(f"WordPress connection test error for {self.site_url}: {e}")
return False
def create_post(self, title: str, content: str, excerpt: str = "",
featured_media_id: Optional[int] = None,
categories: Optional[List[int]] = None,
tags: Optional[List[int]] = None,
status: str = 'draft',
meta: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
"""Create a new WordPress post."""
try:
post_data = {
'title': title,
'content': content,
'excerpt': excerpt,
'status': status
}
if featured_media_id:
post_data['featured_media'] = featured_media_id
if categories:
post_data['categories'] = categories
if tags:
post_data['tags'] = tags
if meta:
post_data['meta'] = meta
result = self._make_request('POST', 'posts', json=post_data)
if result:
logger.info(f"Post created successfully: {title}")
return result
except Exception as e:
logger.error(f"Error creating post {title}: {e}")
return None
def update_post(self, post_id: int, **kwargs) -> Optional[Dict[str, Any]]:
"""Update an existing WordPress post."""
try:
result = self._make_request('POST', f'posts/{post_id}', json=kwargs)
if result:
logger.info(f"Post {post_id} updated successfully")
return result
except Exception as e:
logger.error(f"Error updating post {post_id}: {e}")
return None
def get_post(self, post_id: int) -> Optional[Dict[str, Any]]:
"""Get a specific WordPress post."""
try:
result = self._make_request('GET', f'posts/{post_id}')
return result
except Exception as e:
logger.error(f"Error getting post {post_id}: {e}")
return None
def delete_post(self, post_id: int, force: bool = False) -> bool:
"""Delete a WordPress post."""
try:
params = {'force': force} if force else {}
result = self._make_request('DELETE', f'posts/{post_id}', params=params)
if result:
logger.info(f"Post {post_id} deleted successfully")
return True
return False
except Exception as e:
logger.error(f"Error deleting post {post_id}: {e}")
return False

View File

@@ -0,0 +1,287 @@
"""
WordPress OAuth2 Service
Handles WordPress.com OAuth2 authentication flow for simplified user connection.
"""
import os
import secrets
import sqlite3
import requests
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta
from loguru import logger
import json
import base64
class WordPressOAuthService:
"""Manages WordPress.com OAuth2 authentication flow."""
def __init__(self, db_path: str = "alwrity.db"):
self.db_path = db_path
# WordPress.com OAuth2 credentials
self.client_id = os.getenv('WORDPRESS_CLIENT_ID', '')
self.client_secret = os.getenv('WORDPRESS_CLIENT_SECRET', '')
self.redirect_uri = os.getenv('WORDPRESS_REDIRECT_URI', 'https://littery-sonny-unscrutinisingly.ngrok-free.dev/wp/callback')
self.base_url = "https://public-api.wordpress.com"
# Validate configuration
if not self.client_id or not self.client_secret or self.client_id == 'your_wordpress_com_client_id_here':
logger.error("WordPress OAuth client credentials not configured. Please set WORDPRESS_CLIENT_ID and WORDPRESS_CLIENT_SECRET environment variables with valid WordPress.com application credentials.")
logger.error("To get credentials: 1. Go to https://developer.wordpress.com/apps/ 2. Create a new application 3. Set redirect URI to: https://your-domain.com/wp/callback")
self._init_db()
def _init_db(self):
"""Initialize database tables for OAuth tokens."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS wordpress_oauth_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
access_token TEXT NOT NULL,
refresh_token TEXT,
token_type TEXT DEFAULT 'bearer',
expires_at TIMESTAMP,
scope TEXT,
blog_id TEXT,
blog_url TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS wordpress_oauth_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
state TEXT NOT NULL UNIQUE,
user_id TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP DEFAULT (datetime('now', '+10 minutes'))
)
''')
conn.commit()
logger.info("WordPress OAuth database initialized.")
def generate_authorization_url(self, user_id: str, scope: str = "global") -> Dict[str, Any]:
"""Generate WordPress OAuth2 authorization URL."""
try:
# Check if credentials are properly configured
if not self.client_id or not self.client_secret or self.client_id == 'your_wordpress_com_client_id_here':
logger.error("WordPress OAuth client credentials not configured")
return None
# Generate secure state parameter
state = secrets.token_urlsafe(32)
# Store state in database for validation
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO wordpress_oauth_states (state, user_id)
VALUES (?, ?)
''', (state, user_id))
conn.commit()
# Build authorization URL
# For WordPress.com, use "global" scope for full access to enable posting
params = [
f"client_id={self.client_id}",
f"redirect_uri={self.redirect_uri}",
"response_type=code",
f"state={state}",
f"scope={scope}" # WordPress.com requires "global" scope for full access
]
auth_url = f"{self.base_url}/oauth2/authorize?{'&'.join(params)}"
logger.info(f"Generated WordPress OAuth URL for user {user_id}")
return {
"auth_url": auth_url,
"state": state
}
except Exception as e:
logger.error(f"Error generating WordPress OAuth URL: {e}")
return None
def handle_oauth_callback(self, code: str, state: str) -> Optional[Dict[str, Any]]:
"""Handle OAuth callback and exchange code for access token."""
try:
# Validate state parameter
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT user_id FROM wordpress_oauth_states
WHERE state = ? AND expires_at > datetime('now')
''', (state,))
result = cursor.fetchone()
if not result:
logger.error(f"Invalid or expired state parameter: {state}")
return None
user_id = result[0]
# Clean up used state
cursor.execute('DELETE FROM wordpress_oauth_states WHERE state = ?', (state,))
conn.commit()
# Exchange authorization code for access token
token_data = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'redirect_uri': self.redirect_uri,
'code': code,
'grant_type': 'authorization_code'
}
response = requests.post(
f"{self.base_url}/oauth2/token",
data=token_data,
timeout=30
)
if response.status_code != 200:
logger.error(f"Token exchange failed: {response.status_code} - {response.text}")
return None
token_info = response.json()
# Store token information
access_token = token_info.get('access_token')
blog_id = token_info.get('blog_id')
blog_url = token_info.get('blog_url')
scope = token_info.get('scope', '')
# Calculate expiration (WordPress tokens typically expire in 2 weeks)
expires_at = datetime.now() + timedelta(days=14)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO wordpress_oauth_tokens
(user_id, access_token, token_type, expires_at, scope, blog_id, blog_url)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (user_id, access_token, 'bearer', expires_at, scope, blog_id, blog_url))
conn.commit()
logger.info(f"WordPress OAuth token stored for user {user_id}")
return {
"success": True,
"access_token": access_token,
"blog_id": blog_id,
"blog_url": blog_url,
"scope": scope,
"expires_at": expires_at.isoformat()
}
except Exception as e:
logger.error(f"Error handling WordPress OAuth callback: {e}")
return None
def get_user_tokens(self, user_id: str) -> List[Dict[str, Any]]:
"""Get all active WordPress tokens for a user."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT id, access_token, token_type, expires_at, scope, blog_id, blog_url, created_at
FROM wordpress_oauth_tokens
WHERE user_id = ? AND is_active = TRUE AND expires_at > datetime('now')
ORDER BY created_at DESC
''', (user_id,))
tokens = []
for row in cursor.fetchall():
tokens.append({
"id": row[0],
"access_token": row[1],
"token_type": row[2],
"expires_at": row[3],
"scope": row[4],
"blog_id": row[5],
"blog_url": row[6],
"created_at": row[7]
})
return tokens
except Exception as e:
logger.error(f"Error getting WordPress tokens for user {user_id}: {e}")
return []
def test_token(self, access_token: str) -> bool:
"""Test if a WordPress access token is valid."""
try:
headers = {'Authorization': f'Bearer {access_token}'}
response = requests.get(
f"{self.base_url}/rest/v1/me/",
headers=headers,
timeout=10
)
return response.status_code == 200
except Exception as e:
logger.error(f"Error testing WordPress token: {e}")
return False
def revoke_token(self, user_id: str, token_id: int) -> bool:
"""Revoke a WordPress OAuth token."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE wordpress_oauth_tokens
SET is_active = FALSE, updated_at = datetime('now')
WHERE user_id = ? AND id = ?
''', (user_id, token_id))
conn.commit()
if cursor.rowcount > 0:
logger.info(f"WordPress token {token_id} revoked for user {user_id}")
return True
return False
except Exception as e:
logger.error(f"Error revoking WordPress token: {e}")
return False
def get_connection_status(self, user_id: str) -> Dict[str, Any]:
"""Get WordPress connection status for a user."""
try:
tokens = self.get_user_tokens(user_id)
if not tokens:
return {
"connected": False,
"sites": [],
"total_sites": 0
}
# Test each token and get site information
active_sites = []
for token in tokens:
if self.test_token(token["access_token"]):
active_sites.append({
"id": token["id"],
"blog_id": token["blog_id"],
"blog_url": token["blog_url"],
"scope": token["scope"],
"created_at": token["created_at"]
})
return {
"connected": len(active_sites) > 0,
"sites": active_sites,
"total_sites": len(active_sites)
}
except Exception as e:
logger.error(f"Error getting WordPress connection status: {e}")
return {
"connected": False,
"sites": [],
"total_sites": 0
}

View File

@@ -0,0 +1,287 @@
"""
WordPress Publishing Service
High-level service for publishing content to WordPress sites.
"""
import os
import json
import tempfile
from typing import Optional, Dict, List, Any, Union
from datetime import datetime
from loguru import logger
from .wordpress_service import WordPressService
from .wordpress_content import WordPressContentManager
import sqlite3
class WordPressPublisher:
"""High-level WordPress publishing service."""
def __init__(self, db_path: str = "alwrity.db"):
"""Initialize WordPress publisher."""
self.wp_service = WordPressService(db_path)
self.db_path = db_path
def publish_blog_post(self, user_id: str, site_id: int,
title: str, content: str,
excerpt: str = "",
featured_image_path: Optional[str] = None,
categories: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
status: str = 'draft',
meta_description: str = "") -> Dict[str, Any]:
"""Publish a blog post to WordPress."""
try:
# Get site credentials
credentials = self.wp_service.get_site_credentials(site_id)
if not credentials:
return {
'success': False,
'error': 'WordPress site not found or inactive',
'post_id': None
}
# Initialize content manager
content_manager = WordPressContentManager(
credentials['site_url'],
credentials['username'],
credentials['app_password']
)
# Test connection
if not content_manager._test_connection():
return {
'success': False,
'error': 'Cannot connect to WordPress site',
'post_id': None
}
# Handle featured image
featured_media_id = None
if featured_image_path and os.path.exists(featured_image_path):
try:
# Compress image if it's an image file
if featured_image_path.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
compressed_path = content_manager.compress_image(featured_image_path)
featured_media = content_manager.upload_media(
compressed_path,
alt_text=title,
title=title,
caption=excerpt
)
# Clean up temporary file if created
if compressed_path != featured_image_path:
os.unlink(compressed_path)
else:
featured_media = content_manager.upload_media(
featured_image_path,
alt_text=title,
title=title,
caption=excerpt
)
if featured_media:
featured_media_id = featured_media['id']
logger.info(f"Featured image uploaded: {featured_media_id}")
except Exception as e:
logger.warning(f"Failed to upload featured image: {e}")
# Handle categories
category_ids = []
if categories:
for category_name in categories:
category_id = content_manager.get_or_create_category(category_name)
if category_id:
category_ids.append(category_id)
# Handle tags
tag_ids = []
if tags:
for tag_name in tags:
tag_id = content_manager.get_or_create_tag(tag_name)
if tag_id:
tag_ids.append(tag_id)
# Prepare meta data
meta_data = {}
if meta_description:
meta_data['description'] = meta_description
# Create the post
post_data = content_manager.create_post(
title=title,
content=content,
excerpt=excerpt,
featured_media_id=featured_media_id,
categories=category_ids if category_ids else None,
tags=tag_ids if tag_ids else None,
status=status,
meta=meta_data if meta_data else None
)
if post_data:
# Store post reference in database
self._store_post_reference(user_id, site_id, post_data['id'], title, status)
logger.info(f"Blog post published successfully: {title}")
return {
'success': True,
'post_id': post_data['id'],
'post_url': post_data.get('link'),
'featured_media_id': featured_media_id,
'categories': category_ids,
'tags': tag_ids
}
else:
return {
'success': False,
'error': 'Failed to create WordPress post',
'post_id': None
}
except Exception as e:
logger.error(f"Error publishing blog post: {e}")
return {
'success': False,
'error': str(e),
'post_id': None
}
def _store_post_reference(self, user_id: str, site_id: int, wp_post_id: int, title: str, status: str) -> None:
"""Store post reference in database."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO wordpress_posts
(user_id, site_id, wp_post_id, title, status, published_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (user_id, site_id, wp_post_id, title, status,
datetime.now().isoformat() if status == 'publish' else None))
conn.commit()
except Exception as e:
logger.error(f"Error storing post reference: {e}")
def get_user_posts(self, user_id: str, site_id: Optional[int] = None) -> List[Dict[str, Any]]:
"""Get all posts published by user."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
if site_id:
cursor.execute('''
SELECT wp.id, wp.wp_post_id, wp.title, wp.status, wp.published_at, wp.created_at,
ws.site_name, ws.site_url
FROM wordpress_posts wp
JOIN wordpress_sites ws ON wp.site_id = ws.id
WHERE wp.user_id = ? AND wp.site_id = ?
ORDER BY wp.created_at DESC
''', (user_id, site_id))
else:
cursor.execute('''
SELECT wp.id, wp.wp_post_id, wp.title, wp.status, wp.published_at, wp.created_at,
ws.site_name, ws.site_url
FROM wordpress_posts wp
JOIN wordpress_sites ws ON wp.site_id = ws.id
WHERE wp.user_id = ?
ORDER BY wp.created_at DESC
''', (user_id,))
posts = []
for row in cursor.fetchall():
posts.append({
'id': row[0],
'wp_post_id': row[1],
'title': row[2],
'status': row[3],
'published_at': row[4],
'created_at': row[5],
'site_name': row[6],
'site_url': row[7]
})
return posts
except Exception as e:
logger.error(f"Error getting user posts: {e}")
return []
def update_post_status(self, user_id: str, post_id: int, status: str) -> bool:
"""Update post status (draft/publish)."""
try:
# Get post info
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT wp.site_id, wp.wp_post_id, ws.site_url, ws.username, ws.app_password
FROM wordpress_posts wp
JOIN wordpress_sites ws ON wp.site_id = ws.id
WHERE wp.id = ? AND wp.user_id = ?
''', (post_id, user_id))
result = cursor.fetchone()
if not result:
return False
site_id, wp_post_id, site_url, username, app_password = result
# Update in WordPress
content_manager = WordPressContentManager(site_url, username, app_password)
wp_result = content_manager.update_post(wp_post_id, status=status)
if wp_result:
# Update in database
cursor.execute('''
UPDATE wordpress_posts
SET status = ?, published_at = ?
WHERE id = ?
''', (status, datetime.now().isoformat() if status == 'publish' else None, post_id))
conn.commit()
logger.info(f"Post {post_id} status updated to {status}")
return True
return False
except Exception as e:
logger.error(f"Error updating post status: {e}")
return False
def delete_post(self, user_id: str, post_id: int, force: bool = False) -> bool:
"""Delete a WordPress post."""
try:
# Get post info
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT wp.site_id, wp.wp_post_id, ws.site_url, ws.username, ws.app_password
FROM wordpress_posts wp
JOIN wordpress_sites ws ON wp.site_id = ws.id
WHERE wp.id = ? AND wp.user_id = ?
''', (post_id, user_id))
result = cursor.fetchone()
if not result:
return False
site_id, wp_post_id, site_url, username, app_password = result
# Delete from WordPress
content_manager = WordPressContentManager(site_url, username, app_password)
wp_result = content_manager.delete_post(wp_post_id, force=force)
if wp_result:
# Remove from database
cursor.execute('DELETE FROM wordpress_posts WHERE id = ?', (post_id,))
conn.commit()
logger.info(f"Post {post_id} deleted successfully")
return True
return False
except Exception as e:
logger.error(f"Error deleting post: {e}")
return False

View File

@@ -0,0 +1,249 @@
"""
WordPress Service for ALwrity
Handles WordPress site connections, content publishing, and media management.
"""
import os
import json
import sqlite3
import base64
import mimetypes
import tempfile
from typing import Optional, Dict, List, Any, Tuple
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from PIL import Image
from loguru import logger
class WordPressService:
"""Main WordPress service class for managing WordPress integrations."""
def __init__(self, db_path: str = "alwrity.db"):
"""Initialize WordPress service with database path."""
self.db_path = db_path
self.api_version = "v2"
self._ensure_tables()
def _ensure_tables(self) -> None:
"""Ensure required database tables exist."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# WordPress sites table
cursor.execute('''
CREATE TABLE IF NOT EXISTS wordpress_sites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
site_url TEXT NOT NULL,
site_name TEXT,
username TEXT NOT NULL,
app_password TEXT NOT NULL,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, site_url)
)
''')
# WordPress posts table for tracking published content
cursor.execute('''
CREATE TABLE IF NOT EXISTS wordpress_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
site_id INTEGER NOT NULL,
wp_post_id INTEGER NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'draft',
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (site_id) REFERENCES wordpress_sites (id)
)
''')
conn.commit()
logger.info("WordPress database tables ensured")
except Exception as e:
logger.error(f"Error ensuring WordPress tables: {e}")
raise
def add_site(self, user_id: str, site_url: str, site_name: str, username: str, app_password: str) -> bool:
"""Add a new WordPress site connection."""
try:
# Validate site URL format
if not site_url.startswith(('http://', 'https://')):
site_url = f"https://{site_url}"
# Test connection before saving
if not self._test_connection(site_url, username, app_password):
logger.error(f"Failed to connect to WordPress site: {site_url}")
return False
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO wordpress_sites
(user_id, site_url, site_name, username, app_password, updated_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (user_id, site_url, site_name, username, app_password))
conn.commit()
logger.info(f"WordPress site added for user {user_id}: {site_name}")
return True
except Exception as e:
logger.error(f"Error adding WordPress site: {e}")
return False
def get_user_sites(self, user_id: str) -> List[Dict[str, Any]]:
"""Get all WordPress sites for a user."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT id, site_url, site_name, username, is_active, created_at, updated_at
FROM wordpress_sites
WHERE user_id = ? AND is_active = 1
ORDER BY updated_at DESC
''', (user_id,))
sites = []
for row in cursor.fetchall():
sites.append({
'id': row[0],
'site_url': row[1],
'site_name': row[2],
'username': row[3],
'is_active': bool(row[4]),
'created_at': row[5],
'updated_at': row[6]
})
logger.info(f"Retrieved {len(sites)} WordPress sites for user {user_id}")
return sites
except Exception as e:
logger.error(f"Error getting WordPress sites for user {user_id}: {e}")
return []
def get_site_credentials(self, site_id: int) -> Optional[Dict[str, str]]:
"""Get credentials for a specific WordPress site."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT site_url, username, app_password
FROM wordpress_sites
WHERE id = ? AND is_active = 1
''', (site_id,))
result = cursor.fetchone()
if result:
return {
'site_url': result[0],
'username': result[1],
'app_password': result[2]
}
return None
except Exception as e:
logger.error(f"Error getting credentials for site {site_id}: {e}")
return None
def _test_connection(self, site_url: str, username: str, app_password: str) -> bool:
"""Test WordPress site connection."""
try:
# Test with a simple API call
api_url = f"{site_url}/wp-json/wp/v2/users/me"
response = requests.get(api_url, auth=HTTPBasicAuth(username, app_password), timeout=10)
if response.status_code == 200:
logger.info(f"WordPress connection test successful for {site_url}")
return True
else:
logger.warning(f"WordPress connection test failed for {site_url}: {response.status_code}")
return False
except Exception as e:
logger.error(f"WordPress connection test error for {site_url}: {e}")
return False
def disconnect_site(self, user_id: str, site_id: int) -> bool:
"""Disconnect a WordPress site."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE wordpress_sites
SET is_active = 0, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND user_id = ?
''', (site_id, user_id))
conn.commit()
logger.info(f"WordPress site {site_id} disconnected for user {user_id}")
return True
except Exception as e:
logger.error(f"Error disconnecting WordPress site {site_id}: {e}")
return False
def get_site_info(self, site_id: int) -> Optional[Dict[str, Any]]:
"""Get detailed information about a WordPress site."""
try:
credentials = self.get_site_credentials(site_id)
if not credentials:
return None
site_url = credentials['site_url']
username = credentials['username']
app_password = credentials['app_password']
# Get site information
info = {
'site_url': site_url,
'username': username,
'api_version': self.api_version
}
# Test connection and get basic info
if self._test_connection(site_url, username, app_password):
info['connected'] = True
info['last_checked'] = datetime.now().isoformat()
else:
info['connected'] = False
info['last_checked'] = datetime.now().isoformat()
return info
except Exception as e:
logger.error(f"Error getting site info for {site_id}: {e}")
return None
def get_posts_for_all_sites(self, user_id: str) -> List[Dict[str, Any]]:
"""Get all tracked WordPress posts for all sites of a user."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT wp.id, wp.wordpress_post_id, wp.title, wp.status, wp.published_at, wp.last_updated_at,
ws.site_name, ws.site_url
FROM wordpress_posts wp
JOIN wordpress_sites ws ON wp.site_id = ws.id
WHERE wp.user_id = ? AND ws.is_active = TRUE
ORDER BY wp.published_at DESC
''', (user_id,))
posts = []
for post_data in cursor.fetchall():
posts.append({
"id": post_data[0],
"wp_post_id": post_data[1],
"title": post_data[2],
"status": post_data[3],
"published_at": post_data[4],
"created_at": post_data[5],
"site_name": post_data[6],
"site_url": post_data[7]
})
return posts

View File

@@ -15,10 +15,34 @@ class PersonaPromptBuilder:
def build_persona_analysis_prompt(self, onboarding_data: Dict[str, Any]) -> str:
"""Build the main persona analysis prompt with comprehensive data."""
# Get enhanced analysis data
enhanced_analysis = onboarding_data.get("enhanced_analysis", {})
website_analysis = onboarding_data.get("website_analysis", {}) or {}
research_prefs = onboarding_data.get("research_preferences", {}) or {}
# Handle both frontend-style data and backend database-style data
# Frontend sends: {websiteAnalysis, competitorResearch, sitemapAnalysis, businessData}
# Backend sends: {enhanced_analysis, website_analysis, research_preferences}
# Normalize data structure
if "websiteAnalysis" in onboarding_data:
# Frontend-style data - adapt to expected structure
website_analysis = onboarding_data.get("websiteAnalysis", {}) or {}
competitor_research = onboarding_data.get("competitorResearch", {}) or {}
sitemap_analysis = onboarding_data.get("sitemapAnalysis", {}) or {}
business_data = onboarding_data.get("businessData", {}) or {}
# Create enhanced_analysis from frontend data
enhanced_analysis = {
"comprehensive_style_analysis": website_analysis.get("writing_style", {}),
"content_insights": website_analysis.get("content_characteristics", {}),
"audience_intelligence": website_analysis.get("target_audience", {}),
"technical_writing_metrics": website_analysis.get("style_patterns", {}),
"competitive_analysis": competitor_research,
"sitemap_data": sitemap_analysis,
"business_context": business_data
}
research_prefs = {}
else:
# Backend database-style data
enhanced_analysis = onboarding_data.get("enhanced_analysis", {})
website_analysis = onboarding_data.get("website_analysis", {}) or {}
research_prefs = onboarding_data.get("research_preferences", {}) or {}
prompt = f"""
COMPREHENSIVE PERSONA GENERATION TASK: Create a highly detailed, data-driven writing persona based on extensive AI analysis of user's website and content strategy.
@@ -115,10 +139,8 @@ Style Patterns: {json.dumps(website_analysis.get('style_patterns', {}), indent=2
- Include competitive analysis for market positioning
- Use content strategy insights for practical application
- Ensure the persona reflects the brand's unique elements and competitive advantages
- Provide a confidence score (0-100) based on data richness and quality
- Include detailed analysis notes explaining your reasoning and data sources
Generate a comprehensive, data-driven persona profile that can be used to replicate this writing style across different platforms while maintaining brand authenticity and competitive positioning.
Generate a comprehensive, data-driven persona profile that accurately captures the writing style and brand voice to replicate consistently across different platforms.
"""
return prompt
@@ -256,11 +278,9 @@ Generate a platform-optimized persona adaptation that maintains brand consistenc
}
}
}
},
"confidence_score": {"type": "number"},
"analysis_notes": {"type": "string"}
}
},
"required": ["identity", "linguistic_fingerprint", "tonal_range", "confidence_score"]
"required": ["identity", "linguistic_fingerprint", "tonal_range"]
}
def get_platform_schema(self) -> Dict[str, Any]:

View File

@@ -13,28 +13,35 @@ from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.tag import pos_tag
from textstat import flesch_reading_ease, flesch_kincaid_grade
import spacy
class EnhancedLinguisticAnalyzer:
"""Advanced linguistic analysis for persona creation and improvement."""
def __init__(self):
"""Initialize the linguistic analyzer."""
"""Initialize the linguistic analyzer with required spaCy dependency."""
self.nlp = None
self.spacy_available = False
# spaCy is REQUIRED for high-quality persona generation
try:
# Try to load spaCy model
import spacy
self.nlp = spacy.load("en_core_web_sm")
except OSError:
logger.warning("spaCy model not found. Install with: python -m spacy download en_core_web_sm")
self.spacy_available = True
logger.info("SUCCESS: spaCy model loaded successfully - Enhanced linguistic analysis available")
except ImportError as e:
logger.error(f"ERROR: spaCy is REQUIRED for persona generation. Install with: pip install spacy && python -m spacy download en_core_web_sm")
raise ImportError("spaCy is required for enhanced persona generation. Install with: pip install spacy && python -m spacy download en_core_web_sm") from e
except OSError as e:
logger.error(f"ERROR: spaCy model 'en_core_web_sm' is REQUIRED. Download with: python -m spacy download en_core_web_sm")
raise OSError("spaCy model 'en_core_web_sm' is required. Download with: python -m spacy download en_core_web_sm") from e
# Download required NLTK data
try:
nltk.data.find('tokenizers/punkt')
nltk.data.find('tokenizers/punkt_tab') # Updated for newer NLTK versions
nltk.data.find('corpora/stopwords')
nltk.data.find('taggers/averaged_perceptron_tagger')
except LookupError:
logger.warning("NLTK data not found. Downloading required data...")
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True) # Updated for newer NLTK versions
nltk.download('stopwords', quiet=True)
nltk.download('averaged_perceptron_tagger', quiet=True)
@@ -625,5 +632,4 @@ class EnhancedLinguisticAnalyzer:
clauses = len(re.findall(r'[,;]', sentence)) + 1
total_clauses += clauses
return total_clauses / len(sentences) if sentences else 0
a
return total_clauses / len(sentences) if sentences else 0

View File

@@ -26,6 +26,299 @@ class PersonaQualityImprover:
self.linguistic_analyzer = EnhancedLinguisticAnalyzer()
logger.info("PersonaQualityImprover initialized")
def assess_persona_quality_comprehensive(
self,
core_persona: Dict[str, Any],
platform_personas: Dict[str, Any],
linguistic_analysis: Dict[str, Any],
user_preferences: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Comprehensive quality assessment for quality-first approach.
"""
try:
# Calculate comprehensive quality metrics
quality_metrics = self._calculate_comprehensive_quality_metrics(
core_persona, platform_personas, linguistic_analysis, user_preferences
)
# Generate detailed recommendations
recommendations = self._generate_comprehensive_recommendations(quality_metrics, linguistic_analysis)
return {
"overall_score": quality_metrics.get('overall_score', 0),
"core_completeness": quality_metrics.get('core_completeness', 0),
"platform_consistency": quality_metrics.get('platform_consistency', 0),
"platform_optimization": quality_metrics.get('platform_optimization', 0),
"linguistic_quality": quality_metrics.get('linguistic_quality', 0),
"recommendations": recommendations,
"assessment_method": "comprehensive_ai_based",
"linguistic_insights": linguistic_analysis,
"detailed_metrics": quality_metrics
}
except Exception as e:
logger.error(f"Comprehensive quality assessment error: {str(e)}")
return {
"overall_score": 75,
"core_completeness": 75,
"platform_consistency": 75,
"platform_optimization": 75,
"linguistic_quality": 75,
"recommendations": ["Quality assessment completed with default metrics"],
"error": str(e)
}
def improve_persona_quality(
self,
core_persona: Dict[str, Any],
platform_personas: Dict[str, Any],
quality_metrics: Dict[str, Any]
) -> Dict[str, Any]:
"""
Improve persona quality based on assessment results.
"""
try:
logger.info("Improving persona quality based on assessment results...")
improved_core_persona = self._improve_core_persona(core_persona, quality_metrics)
improved_platform_personas = self._improve_platform_personas(platform_personas, quality_metrics)
return {
"core_persona": improved_core_persona,
"platform_personas": improved_platform_personas,
"improvement_applied": True,
"improvement_details": "Quality improvements applied based on assessment results"
}
except Exception as e:
logger.error(f"Persona quality improvement error: {str(e)}")
return {"error": f"Failed to improve persona quality: {str(e)}"}
def _calculate_comprehensive_quality_metrics(
self,
core_persona: Dict[str, Any],
platform_personas: Dict[str, Any],
linguistic_analysis: Dict[str, Any],
user_preferences: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Calculate comprehensive quality metrics."""
try:
# Core completeness (30% weight)
core_completeness = self._assess_core_completeness(core_persona, linguistic_analysis)
# Platform consistency (25% weight)
platform_consistency = self._assess_platform_consistency(core_persona, platform_personas)
# Platform optimization (25% weight)
platform_optimization = self._assess_platform_optimization(platform_personas)
# Linguistic quality (20% weight)
linguistic_quality = self._assess_linguistic_quality(linguistic_analysis)
# Calculate weighted overall score
overall_score = int((
core_completeness * 0.30 +
platform_consistency * 0.25 +
platform_optimization * 0.25 +
linguistic_quality * 0.20
))
return {
"overall_score": overall_score,
"core_completeness": core_completeness,
"platform_consistency": platform_consistency,
"platform_optimization": platform_optimization,
"linguistic_quality": linguistic_quality,
"weights": {
"core_completeness": 0.30,
"platform_consistency": 0.25,
"platform_optimization": 0.25,
"linguistic_quality": 0.20
}
}
except Exception as e:
logger.error(f"Error calculating comprehensive quality metrics: {str(e)}")
return {
"overall_score": 75,
"core_completeness": 75,
"platform_consistency": 75,
"platform_optimization": 75,
"linguistic_quality": 75
}
def _assess_core_completeness(self, core_persona: Dict[str, Any], linguistic_analysis: Dict[str, Any]) -> int:
"""Assess core persona completeness."""
required_sections = ['writing_style', 'content_characteristics', 'brand_voice', 'target_audience']
present_sections = sum(1 for section in required_sections if section in core_persona and core_persona[section])
base_score = int((present_sections / len(required_sections)) * 100)
# Boost if linguistic analysis provides additional insights
if linguistic_analysis and linguistic_analysis.get('analysis_completeness', 0) > 0.8:
base_score = min(base_score + 10, 100)
return base_score
def _assess_platform_consistency(self, core_persona: Dict[str, Any], platform_personas: Dict[str, Any]) -> int:
"""Assess consistency across platform personas."""
if not platform_personas:
return 50
core_voice = core_persona.get('brand_voice', {}).get('keywords', [])
consistency_scores = []
for platform, persona in platform_personas.items():
if 'error' not in persona:
platform_voice = persona.get('brand_voice', {}).get('keywords', [])
overlap = len(set(core_voice) & set(platform_voice))
consistency_scores.append(min(overlap * 10, 100))
return int(sum(consistency_scores) / len(consistency_scores)) if consistency_scores else 75
def _assess_platform_optimization(self, platform_personas: Dict[str, Any]) -> int:
"""Assess platform-specific optimization quality."""
if not platform_personas:
return 50
optimization_scores = []
for platform, persona in platform_personas.items():
if 'error' not in persona:
has_optimizations = any(key in persona for key in [
'platform_optimizations', 'content_guidelines', 'engagement_strategies'
])
optimization_scores.append(90 if has_optimizations else 60)
return int(sum(optimization_scores) / len(optimization_scores)) if optimization_scores else 75
def _assess_linguistic_quality(self, linguistic_analysis: Dict[str, Any]) -> int:
"""Assess linguistic analysis quality."""
if not linguistic_analysis:
return 50
quality_indicators = [
'analysis_completeness',
'style_consistency',
'vocabulary_sophistication',
'content_coherence'
]
scores = [linguistic_analysis.get(indicator, 0.5) for indicator in quality_indicators]
return int(sum(scores) / len(scores) * 100)
def _generate_comprehensive_recommendations(self, quality_metrics: Dict[str, Any], linguistic_analysis: Dict[str, Any]) -> List[str]:
"""Generate comprehensive quality recommendations."""
recommendations = []
if quality_metrics.get('core_completeness', 0) < 85:
recommendations.append("Enhance core persona with more detailed writing style characteristics and brand voice elements")
if quality_metrics.get('platform_consistency', 0) < 80:
recommendations.append("Improve brand voice consistency across all platform adaptations")
if quality_metrics.get('platform_optimization', 0) < 85:
recommendations.append("Strengthen platform-specific optimizations and engagement strategies")
if quality_metrics.get('linguistic_quality', 0) < 80:
recommendations.append("Improve linguistic quality and writing sophistication")
# Add linguistic-specific recommendations
if linguistic_analysis:
if linguistic_analysis.get('style_consistency', 0) < 0.7:
recommendations.append("Enhance writing style consistency across content samples")
if linguistic_analysis.get('vocabulary_sophistication', 0) < 0.7:
recommendations.append("Increase vocabulary sophistication for better audience engagement")
if not recommendations:
recommendations.append("Your personas demonstrate excellent quality across all assessment criteria!")
return recommendations
def _improve_core_persona(self, core_persona: Dict[str, Any], quality_metrics: Dict[str, Any]) -> Dict[str, Any]:
"""Improve core persona based on quality metrics."""
improved_persona = core_persona.copy()
# Enhance based on quality gaps
if quality_metrics.get('core_completeness', 0) < 85:
# Add more detailed characteristics
if 'writing_style' not in improved_persona:
improved_persona['writing_style'] = {}
if 'sentence_structure' not in improved_persona['writing_style']:
improved_persona['writing_style']['sentence_structure'] = 'Varied and engaging'
if 'vocabulary_level' not in improved_persona['writing_style']:
improved_persona['writing_style']['vocabulary_level'] = 'Professional with accessible language'
return improved_persona
def _improve_platform_personas(self, platform_personas: Dict[str, Any], quality_metrics: Dict[str, Any]) -> Dict[str, Any]:
"""Improve platform personas based on quality metrics."""
improved_personas = platform_personas.copy()
# Enhance each platform persona
for platform, persona in improved_personas.items():
if 'error' not in persona:
# Add platform-specific optimizations if missing
if 'platform_optimizations' not in persona:
persona['platform_optimizations'] = self._get_default_platform_optimizations(platform)
# Enhance engagement strategies
if 'engagement_strategies' not in persona:
persona['engagement_strategies'] = self._get_default_engagement_strategies(platform)
return improved_personas
def _get_default_platform_optimizations(self, platform: str) -> Dict[str, Any]:
"""Get default platform optimizations."""
optimizations = {
'linkedin': {
'professional_networking': True,
'thought_leadership': True,
'industry_insights': True
},
'facebook': {
'community_building': True,
'social_engagement': True,
'visual_storytelling': True
},
'twitter': {
'real_time_updates': True,
'hashtag_optimization': True,
'concise_messaging': True
},
'blog': {
'seo_optimization': True,
'long_form_content': True,
'storytelling': True
}
}
return optimizations.get(platform, {})
def _get_default_engagement_strategies(self, platform: str) -> Dict[str, Any]:
"""Get default engagement strategies."""
strategies = {
'linkedin': {
'call_to_action': 'Connect with me to discuss',
'engagement_style': 'Professional networking'
},
'facebook': {
'call_to_action': 'Join our community',
'engagement_style': 'Social interaction'
},
'twitter': {
'call_to_action': 'Follow for updates',
'engagement_style': 'Real-time conversation'
},
'blog': {
'call_to_action': 'Subscribe for more insights',
'engagement_style': 'Educational content'
}
}
return strategies.get(platform, {})
def assess_persona_quality(self, persona_id: int, user_feedback: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Assess the quality of a persona and provide improvement suggestions.

View File

@@ -7,6 +7,7 @@ content distribution, and publishing patterns for SEO optimization.
import aiohttp
import asyncio
import re
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from loguru import logger
@@ -25,6 +26,27 @@ class SitemapService:
"""Initialize the sitemap service"""
self.service_name = "sitemap_analyzer"
logger.info(f"Initialized {self.service_name}")
# Common sitemap paths to check
self.common_sitemap_paths = [
"sitemap.xml",
"sitemap_index.xml",
"sitemap/index.xml",
"sitemap.php",
"sitemap.txt",
"sitemap.xml.gz",
"sitemap1.xml",
# Common CMS/plugin paths
"wp-sitemap.xml", # WordPress 5.5+ default
"post-sitemap.xml",
"page-sitemap.xml",
"product-sitemap.xml", # WooCommerce
"category-sitemap.xml",
# Common feed paths that can act as sitemaps
"rss/",
"rss.xml",
"atom.xml",
]
async def analyze_sitemap(
self,
@@ -305,6 +327,96 @@ class SitemapService:
)
}
async def analyze_sitemap_for_onboarding(
self,
sitemap_url: str,
user_url: str,
competitors: List[str] = None,
industry_context: str = None,
analyze_content_trends: bool = True,
analyze_publishing_patterns: bool = True
) -> Dict[str, Any]:
"""Enhanced sitemap analysis specifically for onboarding Step 3 competitive analysis"""
try:
# Run standard sitemap analysis
analysis_result = await self.analyze_sitemap(
sitemap_url=sitemap_url,
analyze_content_trends=analyze_content_trends,
analyze_publishing_patterns=analyze_publishing_patterns
)
# Enhance with onboarding-specific insights
onboarding_insights = await self._generate_onboarding_insights(
analysis_result,
user_url,
competitors,
industry_context
)
# Combine results
analysis_result["onboarding_insights"] = onboarding_insights
analysis_result["user_url"] = user_url
analysis_result["industry_context"] = industry_context
analysis_result["competitors_analyzed"] = competitors or []
return analysis_result
except Exception as e:
logger.error(f"Error in onboarding sitemap analysis: {e}")
return {
"error": str(e),
"success": False
}
async def _generate_onboarding_insights(
self,
analysis_result: Dict[str, Any],
user_url: str,
competitors: List[str] = None,
industry_context: str = None
) -> Dict[str, Any]:
"""Generate onboarding-specific insights for competitive analysis"""
try:
structure_analysis = analysis_result.get("structure_analysis", {})
content_trends = analysis_result.get("content_trends", {})
publishing_patterns = analysis_result.get("publishing_patterns", {})
# Build onboarding-specific prompt
prompt = self._build_onboarding_analysis_prompt(
structure_analysis, content_trends, publishing_patterns,
user_url, competitors, industry_context
)
# Generate AI insights
ai_response = llm_text_gen(
prompt=prompt,
system_prompt=self._get_onboarding_system_prompt()
)
# Parse and structure insights
insights = self._parse_onboarding_insights(ai_response)
# Log AI analysis
await seo_logger.log_ai_analysis(
tool_name=f"{self.service_name}_onboarding",
prompt=prompt,
response=ai_response,
model_used="gemini-2.0-flash-001"
)
return insights
except Exception as e:
logger.error(f"Error generating onboarding insights: {e}")
return {
"competitive_positioning": "Analysis unavailable",
"content_gaps": [],
"growth_opportunities": [],
"industry_benchmarks": []
}
async def _generate_ai_insights(
self,
structure_analysis: Dict[str, Any],
@@ -599,4 +711,320 @@ Focus on actionable insights for content creators and digital marketing professi
"service": self.service_name,
"error": str(e),
"last_check": datetime.utcnow().isoformat()
}
}
def _build_onboarding_analysis_prompt(
self,
structure_analysis: Dict[str, Any],
content_trends: Dict[str, Any],
publishing_patterns: Dict[str, Any],
user_url: str,
competitors: List[str] = None,
industry_context: str = None
) -> str:
"""Build AI prompt for onboarding-specific sitemap analysis"""
total_urls = structure_analysis.get("total_urls", 0)
url_patterns = structure_analysis.get("url_patterns", {})
avg_depth = structure_analysis.get("average_path_depth", 0)
publishing_velocity = content_trends.get("publishing_velocity", 0)
competitor_info = ""
if competitors:
competitor_info = f"\nCompetitors to consider: {', '.join(competitors[:5])}"
industry_info = ""
if industry_context:
industry_info = f"\nIndustry Context: {industry_context}"
prompt = f"""
Analyze this website's sitemap for competitive positioning and content strategy insights:
USER WEBSITE: {user_url}
Total URLs: {total_urls}
Average Path Depth: {avg_depth}
Publishing Velocity: {publishing_velocity:.2f} posts/day
{industry_info}{competitor_info}
URL Structure Analysis:
{chr(10).join([f"- {category}: {count} URLs" for category, count in list(url_patterns.items())[:8]])}
Content Publishing Patterns:
- Publishing Rate: {publishing_velocity:.2f} pages per day
- Content Categories: {len(url_patterns)} main categories identified
Please provide competitive analysis insights focusing on:
1. **COMPETITIVE POSITIONING**: How does this site's content structure compare to industry standards?
2. **CONTENT GAPS**: What content categories or topics are missing based on the URL structure?
3. **GROWTH OPPORTUNITIES**: Specific content expansion opportunities to compete better
4. **INDUSTRY BENCHMARKS**: How does publishing frequency and content depth compare to competitors?
5. **STRATEGIC RECOMMENDATIONS**: 3-5 actionable steps for content strategy improvement
Focus on actionable insights that help content creators understand their competitive position and identify growth opportunities.
"""
return prompt
def _get_onboarding_system_prompt(self) -> str:
"""Get system prompt for onboarding sitemap analysis"""
return """You are a competitive intelligence and content strategy expert specializing in website structure analysis for content creators and digital marketers.
Your role is to analyze website sitemaps and provide strategic insights that help users understand their competitive position and identify content opportunities.
Key focus areas:
- Competitive positioning analysis
- Content gap identification
- Growth opportunity recommendations
- Industry benchmarking insights
- Actionable strategic recommendations
Provide practical, data-driven insights that help content creators make informed decisions about their content strategy and competitive positioning.
Format your response as structured insights that can be easily parsed and displayed in a user interface."""
def _parse_onboarding_insights(self, ai_response: str) -> Dict[str, Any]:
"""Parse AI response for onboarding-specific insights"""
try:
# Initialize structured response
insights = {
"competitive_positioning": "Analysis in progress...",
"content_gaps": [],
"growth_opportunities": [],
"industry_benchmarks": [],
"strategic_recommendations": []
}
# Simple parsing logic - look for structured sections
lines = ai_response.split('\n')
current_section = None
for line in lines:
line = line.strip()
if not line:
continue
# Detect sections
if any(keyword in line.lower() for keyword in ['competitive positioning', 'market position']):
current_section = 'competitive_positioning'
insights[current_section] = line
elif any(keyword in line.lower() for keyword in ['content gaps', 'missing content']):
current_section = 'content_gaps'
elif any(keyword in line.lower() for keyword in ['growth opportunities', 'expansion']):
current_section = 'growth_opportunities'
elif any(keyword in line.lower() for keyword in ['industry benchmarks', 'benchmarks']):
current_section = 'industry_benchmarks'
elif any(keyword in line.lower() for keyword in ['strategic recommendations', 'recommendations']):
current_section = 'strategic_recommendations'
elif line.startswith('-') or line.startswith(''):
# This is a list item
if current_section and current_section in insights:
if isinstance(insights[current_section], str):
insights[current_section] = [insights[current_section]]
insights[current_section].append(line[1:].strip())
elif current_section == 'competitive_positioning':
# Append to competitive positioning text
if insights[current_section] == "Analysis in progress...":
insights[current_section] = line
else:
insights[current_section] += " " + line
# Fallback: if no structured parsing worked, use the full response
if insights["competitive_positioning"] == "Analysis in progress...":
insights["competitive_positioning"] = ai_response[:500] + "..." if len(ai_response) > 500 else ai_response
# Ensure lists are properly formatted
for key in ['content_gaps', 'growth_opportunities', 'industry_benchmarks', 'strategic_recommendations']:
if isinstance(insights[key], str):
insights[key] = [insights[key]] if insights[key] else []
return insights
except Exception as e:
logger.error(f"Error parsing onboarding insights: {e}")
return {
"competitive_positioning": ai_response[:300] + "..." if len(ai_response) > 300 else ai_response,
"content_gaps": ["Analysis parsing error - see full response above"],
"growth_opportunities": [],
"industry_benchmarks": [],
"strategic_recommendations": []
}
async def discover_sitemap_url(self, website_url: str) -> Optional[str]:
"""
Intelligently discover the sitemap URL for a given website.
Args:
website_url: The website URL to find sitemap for
Returns:
The discovered sitemap URL or None if not found
"""
try:
# Ensure the URL has a proper scheme
if not urlparse(website_url).scheme:
base_url = f"https://{website_url}"
else:
base_url = website_url.rstrip('/')
logger.info(f"Discovering sitemap for: {base_url}")
# Method 1: Check robots.txt first (most reliable)
sitemap_url = await self._find_sitemap_in_robots_txt(base_url)
if sitemap_url:
logger.info(f"Found sitemap via robots.txt: {sitemap_url}")
return sitemap_url
# Method 2: Check common paths
sitemap_url = await self._find_sitemap_by_common_paths(base_url)
if sitemap_url:
logger.info(f"Found sitemap via common paths: {sitemap_url}")
return sitemap_url
logger.warning(f"No sitemap found for {base_url}")
return None
except Exception as e:
logger.error(f"Error discovering sitemap for {website_url}: {e}")
return None
async def _find_sitemap_in_robots_txt(self, base_url: str) -> Optional[str]:
"""
Check robots.txt for sitemap directives.
Args:
base_url: Base URL of the website
Returns:
Sitemap URL if found in robots.txt, None otherwise
"""
try:
robots_url = urljoin(base_url, "/robots.txt")
logger.debug(f"Checking robots.txt at: {robots_url}")
async with aiohttp.ClientSession() as session:
async with session.get(robots_url, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
content = await response.text()
# Look for sitemap directives (case-insensitive)
sitemap_matches = re.findall(r'^Sitemap:\s*(.+)', content, re.IGNORECASE | re.MULTILINE)
if sitemap_matches:
sitemap_url = sitemap_matches[0].strip()
logger.debug(f"Found sitemap directive in robots.txt: {sitemap_url}")
# Verify the sitemap URL is accessible
if await self._verify_sitemap_url(sitemap_url):
return sitemap_url
else:
logger.warning(f"robots.txt points to inaccessible sitemap: {sitemap_url}")
logger.debug("No sitemap directive found in robots.txt")
else:
logger.debug(f"robots.txt returned HTTP {response.status}")
except Exception as e:
logger.debug(f"Error checking robots.txt: {e}")
return None
async def _find_sitemap_by_common_paths(self, base_url: str) -> Optional[str]:
"""
Check common sitemap paths.
Args:
base_url: Base URL of the website
Returns:
Sitemap URL if found at common paths, None otherwise
"""
try:
logger.debug(f"Checking common sitemap paths for: {base_url}")
# Check paths in parallel for better performance
tasks = []
for path in self.common_sitemap_paths:
full_url = urljoin(base_url, path)
tasks.append(self._check_sitemap_url(full_url, f"common path: /{path}"))
# Wait for all checks to complete
results = await asyncio.gather(*tasks, return_exceptions=True)
# Return the first successful result
for result in results:
if isinstance(result, str) and result:
return result
logger.debug("No sitemap found at common paths")
except Exception as e:
logger.debug(f"Error checking common paths: {e}")
return None
async def _check_sitemap_url(self, url: str, method: str) -> Optional[str]:
"""
Check if a URL is a valid sitemap.
Args:
url: URL to check
method: Method description for logging
Returns:
URL if valid sitemap, None otherwise
"""
try:
headers = {
'User-Agent': 'ALwritySitemapBot/1.0 (https://alwrity.com)',
'Accept': 'application/xml, text/xml, */*'
}
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
if response.status == 200:
content_type = response.headers.get('Content-Type', '').lower()
# Check if it's a valid sitemap content type
if any(xml_type in content_type for xml_type in ['xml', 'text', 'application/x-gzip']):
logger.debug(f"Found valid sitemap via {method}: {url} (Content-Type: {content_type})")
return url
else:
# Still consider it if it's 200 but not typical content type
logger.debug(f"Found potential sitemap via {method}: {url} (Content-Type: {content_type})")
return url
elif response.status == 404:
# Skip 404s silently
pass
else:
logger.debug(f"HTTP {response.status} for {url} via {method}")
except Exception as e:
# Skip connection errors silently
logger.debug(f"Connection error for {url}: {e}")
return None
async def _verify_sitemap_url(self, url: str) -> bool:
"""
Verify that a sitemap URL is accessible and returns valid content.
Args:
url: Sitemap URL to verify
Returns:
True if accessible, False otherwise
"""
try:
headers = {
'User-Agent': 'ALwritySitemapBot/1.0 (https://alwrity.com)',
'Accept': 'application/xml, text/xml, */*'
}
async with aiohttp.ClientSession() as session:
async with session.head(url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as response:
return response.status == 200
except Exception:
return False

View File

@@ -336,14 +336,49 @@ def validate_step_data(step_number: int, data: Dict[str, Any]) -> List[str]:
errors.append("Invalid website URL format")
elif step_number == 3: # AI Research
if not data or 'research_providers' not in data:
errors.append("At least one research provider must be configured")
elif not data['research_providers']:
errors.append("At least one research provider must be configured")
# Validate that research data is present (competitors, research summary, or sitemap analysis)
if not data:
errors.append("Research data is required for step 3 completion")
else:
# Check for required research fields
has_competitors = 'competitors' in data and data['competitors']
has_research_summary = 'researchSummary' in data and data['researchSummary']
has_sitemap_analysis = 'sitemapAnalysis' in data and data['sitemapAnalysis']
if not (has_competitors or has_research_summary or has_sitemap_analysis):
errors.append("At least one research data field (competitors, researchSummary, or sitemapAnalysis) must be present")
elif step_number == 4: # Personalization
# Optional step, no validation required
pass
# Validate that persona data is present
if not data:
errors.append("Persona data is required for step 4 completion")
else:
# Check for required persona fields
required_persona_fields = ['corePersona', 'platformPersonas']
missing_fields = []
for field in required_persona_fields:
if field not in data or not data[field]:
missing_fields.append(field)
if missing_fields:
errors.append(f"Missing required persona data: {', '.join(missing_fields)}")
# Validate core persona structure if present
if 'corePersona' in data and data['corePersona']:
core_persona = data['corePersona']
if not isinstance(core_persona, dict):
errors.append("corePersona must be a valid object")
elif 'identity' not in core_persona:
errors.append("corePersona must contain identity information")
# Validate platform personas structure if present
if 'platformPersonas' in data and data['platformPersonas']:
platform_personas = data['platformPersonas']
if not isinstance(platform_personas, dict):
errors.append("platformPersonas must be a valid object")
elif len(platform_personas) == 0:
errors.append("At least one platform persona must be configured")
elif step_number == 5: # Integrations
# Optional step, no validation required