ALwrity Version 0.5.1 (Fastapi + React)

This commit is contained in:
ajaysi
2025-08-06 16:29:49 +05:30
parent dbf761c31f
commit 2579c12ba4
331 changed files with 0 additions and 22 deletions

View File

@@ -0,0 +1,283 @@
# Platform Adapters
A flexible and extensible system for managing content across different social media platforms and content management systems.
## Overview
The platform adapters system provides a unified interface for publishing, managing, and analyzing content across multiple platforms. It follows a modular architecture where each platform has its own adapter implementation while maintaining a consistent interface.
## Architecture
### Core Components
1. **Base Platform Adapter (`base.py`)**
- Abstract base class defining the interface for all platform adapters
- Common functionality and error handling
- Standardized response formatting
2. **Platform Manager (`manager.py`)**
- Central manager for handling multiple platform adapters
- Platform initialization and configuration
- Unified content publishing and management
3. **Unified Platform Adapter (`unified.py`)**
- Content adaptation across different platforms
- Platform-specific content generation
- Performance analytics and recommendations
### Current Implementations
#### Twitter Adapter (`twitter.py`)
- Full implementation of Twitter API integration
- Features:
- Tweet publishing with media support
- Content validation
- Analytics and engagement metrics
- Media upload handling
- Rate limit management
#### WordPress Adapter (TBD)
- Planned implementation of WordPress REST API integration
- Features:
- ⏳ Post creation and management
- ⏳ Page management
- ⏳ Media library integration
- ⏳ Category and tag management
- ⏳ Custom post type support
- ⏳ SEO metadata management
- ⏳ Comment moderation
- ⏳ User management
#### Wix Adapter (TBD)
- Planned implementation of Wix API integration
- Features:
- ⏳ Blog post management
- ⏳ Page content management
- ⏳ Media upload and management
- ⏳ SEO settings
- ⏳ Collection management
- ⏳ Form submissions handling
- ⏳ Site settings management
- ⏳ Analytics integration
## Features
### Core Features
- ✅ Multi-platform content publishing
- ✅ Content validation and optimization
- ✅ Analytics and performance tracking
- ✅ Media handling
- ✅ Error handling and logging
- ✅ Platform-specific content adaptation
### Platform-Specific Features
#### Twitter
- ✅ Tweet publishing
- ✅ Media attachments
- ✅ Analytics tracking
- ✅ Content validation
- ✅ Rate limit handling
#### Instagram (TBD)
- ⏳ Post creation
- ⏳ Story publishing
- ⏳ Hashtag optimization
- ⏳ Media handling
#### LinkedIn (TBD)
- ⏳ Post creation
- ⏳ Article publishing
- ⏳ Professional content optimization
- ⏳ Company page integration
#### Facebook (TBD)
- ⏳ Post creation
- ⏳ Page management
- ⏳ Audience targeting
- ⏳ Analytics integration
#### WordPress (TBD)
- ⏳ REST API integration
- ⏳ Content synchronization
- ⏳ Media management
- ⏳ SEO optimization
- ⏳ Custom post types
- ⏳ Plugin integration
#### Wix (TBD)
- ⏳ API integration
- ⏳ Content management
- ⏳ Media handling
- ⏳ SEO settings
- ⏳ Collection management
- ⏳ Analytics integration
## Configuration
Each platform adapter requires specific configuration parameters:
### Twitter Configuration
```python
{
'api_key': 'your_api_key',
'api_secret': 'your_api_secret',
'access_token': 'your_access_token',
'access_token_secret': 'your_access_token_secret'
}
```
### WordPress Configuration
```python
{
'site_url': 'https://your-wordpress-site.com',
'username': 'your_username',
'application_password': 'your_application_password',
'api_version': 'v2'
}
```
### Wix Configuration
```python
{
'site_id': 'your_site_id',
'api_key': 'your_api_key',
'access_token': 'your_access_token'
}
```
## Usage
### Basic Usage
```python
from lib.integrations.platform_adapters.manager import PlatformManager
# Initialize platform manager
config = {
'platforms': {
'twitter': {
'api_key': 'your_api_key',
'api_secret': 'your_api_secret',
'access_token': 'your_access_token',
'access_token_secret': 'your_access_token_secret'
},
'wordpress': {
'site_url': 'https://your-wordpress-site.com',
'username': 'your_username',
'application_password': 'your_application_password'
},
'wix': {
'site_id': 'your_site_id',
'api_key': 'your_api_key',
'access_token': 'your_access_token'
}
}
}
manager = PlatformManager(config)
# Publish content
content = {
'text': 'Hello, World!',
'media': [
{
'url': 'https://example.com/image.jpg',
'type': 'image'
}
]
}
result = await manager.publish_content(content, platforms=['twitter', 'wordpress', 'wix'])
```
## TBD Features
### Platform Support
- [ ] Instagram adapter implementation
- [ ] LinkedIn adapter implementation
- [ ] Facebook adapter implementation
- [ ] YouTube adapter implementation
- [ ] TikTok adapter implementation
- [ ] WordPress adapter implementation
- [ ] Wix adapter implementation
### Content Management
- [ ] Bulk content publishing
- [ ] Content scheduling
- [ ] Content templates
- [ ] A/B testing support
- [ ] Content versioning
- [ ] Cross-platform content synchronization
- [ ] CMS-specific content optimization
### Analytics
- [ ] Cross-platform analytics
- [ ] Custom metric tracking
- [ ] Automated reporting
- [ ] Performance optimization suggestions
- [ ] ROI tracking
- [ ] CMS-specific analytics integration
### Media Handling
- [ ] Advanced media optimization
- [ ] Media library management
- [ ] Automatic media resizing
- [ ] Media format conversion
- [ ] Media metadata management
- [ ] Cross-platform media synchronization
### Security
- [ ] OAuth2 implementation
- [ ] API key rotation
- [ ] Rate limit handling
- [ ] Error recovery
- [ ] Audit logging
- [ ] CMS-specific security features
## Contributing
1. Fork the repository
2. Create a feature branch
3. Implement your changes
4. Add tests
5. Submit a pull request
## Testing
Each platform adapter should include:
- Unit tests
- Integration tests
- Mock API responses
- Error handling tests
- Rate limit tests
- CMS-specific test cases
## Error Handling
The system implements standardized error handling:
- Platform-specific error mapping
- Retry mechanisms
- Error logging
- User-friendly error messages
- CMS-specific error handling
## Logging
Comprehensive logging system:
- Platform operations
- API calls
- Error tracking
- Performance metrics
- Debug information
- CMS-specific logging
## Dependencies
- Python 3.11+
- tweepy (for Twitter integration)
- requests
- loguru
- typing
- datetime
- wordpress-xmlrpc (for WordPress integration)
- wix-api-client (for Wix integration)

View File

@@ -0,0 +1,15 @@
"""
Platform adapters for content publishing and management.
"""
from .base import PlatformAdapter
from .manager import PlatformManager
from .twitter import TwitterAdapter
from .unified import UnifiedPlatformAdapter
__all__ = [
'PlatformAdapter',
'PlatformManager',
'TwitterAdapter',
'UnifiedPlatformAdapter'
]

View File

@@ -0,0 +1,157 @@
"""
Base platform adapter class.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from datetime import datetime
class PlatformAdapter(ABC):
"""Base class for platform-specific adapters."""
def __init__(self, config: Dict[str, Any]):
"""Initialize platform adapter with configuration."""
self.config = config
self.platform_name = self.__class__.__name__.replace('Adapter', '').upper()
@abstractmethod
async def publish_content(
self,
content: Dict[str, Any],
schedule_time: Optional[datetime] = None
) -> Dict[str, Any]:
"""Publish content to the platform."""
pass
@abstractmethod
async def get_content_status(
self,
content_id: str
) -> Dict[str, Any]:
"""Get the status of published content."""
pass
@abstractmethod
async def delete_content(
self,
content_id: str
) -> Dict[str, Any]:
"""Delete published content."""
pass
@abstractmethod
async def update_content(
self,
content_id: str,
updates: Dict[str, Any]
) -> Dict[str, Any]:
"""Update published content."""
pass
@abstractmethod
async def get_analytics(
self,
content_id: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get analytics for published content."""
pass
@abstractmethod
async def validate_content(
self,
content: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate content before publishing."""
pass
@abstractmethod
async def get_optimal_publish_time(
self,
content_type: str,
target_audience: Optional[Dict[str, Any]] = None
) -> datetime:
"""Get optimal publish time for content."""
pass
@abstractmethod
async def get_platform_limits(
self
) -> Dict[str, Any]:
"""Get platform-specific limits and constraints."""
pass
@abstractmethod
async def get_supported_content_types(
self
) -> List[str]:
"""Get list of supported content types."""
pass
@abstractmethod
async def get_platform_metrics(
self
) -> Dict[str, Any]:
"""Get platform-specific metrics and statistics."""
pass
def _format_error_response(
self,
error: Exception,
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Format error response."""
return {
'success': False,
'platform': self.platform_name,
'error': str(error),
'error_type': error.__class__.__name__,
'context': context or {}
}
def _format_success_response(
self,
data: Dict[str, Any],
context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Format success response."""
return {
'success': True,
'platform': self.platform_name,
'data': data,
'context': context or {}
}
def _validate_config(self) -> None:
"""Validate platform configuration."""
required_fields = self.get_required_config_fields()
missing_fields = [
field for field in required_fields
if field not in self.config
]
if missing_fields:
raise ValueError(
f"Missing required configuration fields: {', '.join(missing_fields)}"
)
@classmethod
def get_required_config_fields(cls) -> List[str]:
"""Get list of required configuration fields."""
return []
@classmethod
def get_platform_name(cls) -> str:
"""Get platform name."""
return cls.__name__.replace('Adapter', '').upper()
@classmethod
def get_platform_description(cls) -> str:
"""Get platform description."""
return "Base platform adapter"
@classmethod
def get_platform_version(cls) -> str:
"""Get platform adapter version."""
return "1.0.0"

View File

@@ -0,0 +1,284 @@
"""
Platform manager for handling multiple platform adapters.
"""
import logging
from typing import Dict, Any, List, Optional, Type
from datetime import datetime
from .base import PlatformAdapter
from .twitter import TwitterAdapter
from .wix import WixAdapter
logger = logging.getLogger(__name__)
class PlatformManager:
"""Manages multiple platform adapters."""
def __init__(self, config: Dict[str, Any]):
"""Initialize platform manager with configuration."""
self.config = config
self.adapters: Dict[str, PlatformAdapter] = {}
self._initialize_adapters()
def _initialize_adapters(self) -> None:
"""Initialize platform adapters based on configuration."""
platform_configs = self.config.get('platforms', {})
for platform, config in platform_configs.items():
try:
adapter = self._create_adapter(platform, config)
if adapter:
self.adapters[platform] = adapter
logger.info(f"Initialized {platform} adapter")
except Exception as e:
logger.error(f"Failed to initialize {platform} adapter: {str(e)}")
def _create_adapter(
self,
platform: str,
config: Dict[str, Any]
) -> Optional[PlatformAdapter]:
"""Create platform adapter instance."""
adapter_map: Dict[str, Type[PlatformAdapter]] = {
'TWITTER': TwitterAdapter,
'WIX': WixAdapter,
# Add other platform adapters here
}
adapter_class = adapter_map.get(platform.upper())
if not adapter_class:
logger.warning(f"Unsupported platform: {platform}")
return None
try:
return adapter_class(config)
except Exception as e:
raise Exception(
f"Failed to create {platform} adapter: {str(e)}"
)
async def publish_content(
self,
content: Dict[str, Any],
platforms: List[str],
schedule_time: Optional[datetime] = None
) -> Dict[str, Dict[str, Any]]:
"""Publish content to multiple platforms."""
results = {}
for platform in platforms:
if platform not in self.adapters:
results[platform] = {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
continue
try:
result = await self.adapters[platform].publish_content(
content,
schedule_time
)
results[platform] = result
except Exception as e:
results[platform] = {
'success': False,
'error': str(e)
}
return results
async def get_content_status(
self,
content_id: str,
platform: str
) -> Dict[str, Any]:
"""Get content status from a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].get_content_status(content_id)
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def delete_content(
self,
content_id: str,
platform: str
) -> Dict[str, Any]:
"""Delete content from a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].delete_content(content_id)
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def update_content(
self,
content_id: str,
updates: Dict[str, Any],
platform: str
) -> Dict[str, Any]:
"""Update content on a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].update_content(
content_id,
updates
)
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def get_analytics(
self,
content_id: str,
platform: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get analytics from a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].get_analytics(
content_id,
start_date,
end_date
)
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def validate_content(
self,
content: Dict[str, Any],
platform: str
) -> Dict[str, Any]:
"""Validate content for a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].validate_content(content)
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def get_optimal_publish_time(
self,
content_type: str,
platform: str,
target_audience: Optional[Dict[str, Any]] = None
) -> datetime:
"""Get optimal publish time for a specific platform."""
if platform not in self.adapters:
raise Exception(f"Platform adapter not found: {platform}")
return await self.adapters[platform].get_optimal_publish_time(
content_type,
target_audience
)
async def get_platform_limits(
self,
platform: str
) -> Dict[str, Any]:
"""Get platform limits for a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].get_platform_limits()
except Exception as e:
return {
'success': False,
'error': str(e)
}
async def get_supported_content_types(
self,
platform: str
) -> List[str]:
"""Get supported content types for a specific platform."""
if platform not in self.adapters:
raise Exception(f"Platform adapter not found: {platform}")
return await self.adapters[platform].get_supported_content_types()
async def get_platform_metrics(
self,
platform: str
) -> Dict[str, Any]:
"""Get platform metrics for a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
try:
return await self.adapters[platform].get_platform_metrics()
except Exception as e:
return {
'success': False,
'error': str(e)
}
def get_available_platforms(self) -> List[str]:
"""Get list of available platform adapters."""
return list(self.adapters.keys())
def get_platform_info(self, platform: str) -> Dict[str, Any]:
"""Get information about a specific platform."""
if platform not in self.adapters:
return {
'success': False,
'error': f"Platform adapter not found: {platform}"
}
adapter = self.adapters[platform]
return {
'success': True,
'name': adapter.get_platform_name(),
'description': adapter.get_platform_description(),
'version': adapter.get_platform_version(),
'required_config': adapter.get_required_config_fields()
}

View File

@@ -0,0 +1,568 @@
"""
Twitter platform adapter implementation with enhanced error handling and real metrics.
"""
from typing import Dict, Any, Optional, List
from datetime import datetime
import tweepy
from tweepy.models import Status
import logging
import time
from .base import PlatformAdapter
logger = logging.getLogger(__name__)
class TwitterAdapter(PlatformAdapter):
"""Enhanced Twitter platform adapter with real metrics and error handling."""
def __init__(self, config: Dict[str, Any]):
"""Initialize Twitter adapter with configuration."""
super().__init__(config)
self._validate_config()
self._initialize_client()
self.rate_limit_tracker = {}
def _initialize_client(self) -> None:
"""Initialize Twitter API client with enhanced error handling."""
try:
# Initialize OAuth handler
auth = tweepy.OAuthHandler(
self.config['api_key'],
self.config['api_secret']
)
auth.set_access_token(
self.config['access_token'],
self.config['access_token_secret']
)
# Create API client with wait_on_rate_limit
self.client = tweepy.API(
auth,
wait_on_rate_limit=True,
retry_count=3,
retry_delay=5
)
# Verify credentials
user = self.client.verify_credentials()
if not user:
raise Exception("Failed to verify Twitter credentials")
logger.info(f"Twitter client initialized for @{user.screen_name}")
except tweepy.Unauthorized:
raise Exception("Invalid Twitter API credentials")
except tweepy.Forbidden:
raise Exception("Access forbidden - check API permissions")
except Exception as e:
raise Exception(f"Failed to initialize Twitter client: {str(e)}")
async def publish_content(
self,
content: Dict[str, Any],
schedule_time: Optional[datetime] = None
) -> Dict[str, Any]:
"""Publish content to Twitter with enhanced error handling."""
try:
# Validate content first
validation = await self.validate_content(content)
if not validation.get('success'):
return validation
# Check rate limits
if not self._check_rate_limit('tweets'):
return self._format_error_response(
Exception("Rate limit exceeded for tweets"),
{'content': content}
)
# Prepare tweet content
tweet_text = content.get('text', '')
media_ids = []
# Handle media attachments if present
if 'media' in content and content['media']:
for media in content['media']:
media_id = self._upload_media(media)
if media_id:
media_ids.append(media_id)
# Create tweet
tweet = self.client.update_status(
status=tweet_text,
media_ids=media_ids if media_ids else None
)
# Update rate limit tracker
self._update_rate_limit_tracker('tweets')
# Format response with comprehensive data
tweet_data = {
'id': tweet.id_str,
'text': tweet.text,
'created_at': tweet.created_at.isoformat(),
'user': {
'screen_name': tweet.user.screen_name,
'name': tweet.user.name,
'followers_count': tweet.user.followers_count
},
'metrics': {
'retweet_count': tweet.retweet_count,
'favorite_count': tweet.favorite_count,
'reply_count': getattr(tweet, 'reply_count', 0)
},
'urls': {
'tweet_url': f"https://twitter.com/{tweet.user.screen_name}/status/{tweet.id_str}"
}
}
return self._format_success_response(tweet_data)
except tweepy.Unauthorized:
return self._format_error_response(
Exception("Authentication failed - please reconnect your account"),
{'content': content}
)
except tweepy.Forbidden as e:
error_msg = "Access forbidden"
if "duplicate" in str(e).lower():
error_msg = "Duplicate tweet detected - please modify your content"
elif "automated" in str(e).lower():
error_msg = "Tweet appears automated - please make it more personal"
return self._format_error_response(
Exception(error_msg),
{'content': content}
)
except tweepy.TooManyRequests:
return self._format_error_response(
Exception("Rate limit exceeded - please wait before posting again"),
{'content': content}
)
except Exception as e:
return self._format_error_response(e, {'content': content})
async def get_content_status(self, content_id: str) -> Dict[str, Any]:
"""Get status of a tweet with real metrics."""
try:
tweet = self.client.get_status(
content_id,
include_entities=True,
tweet_mode='extended'
)
tweet_data = {
'id': tweet.id_str,
'text': tweet.full_text,
'created_at': tweet.created_at.isoformat(),
'metrics': {
'retweet_count': tweet.retweet_count,
'favorite_count': tweet.favorite_count,
'reply_count': getattr(tweet, 'reply_count', 0),
'quote_count': getattr(tweet, 'quote_count', 0)
},
'engagement': {
'engagement_rate': self._calculate_engagement_rate(tweet),
'total_engagement': tweet.retweet_count + tweet.favorite_count + getattr(tweet, 'reply_count', 0)
},
'user': {
'screen_name': tweet.user.screen_name,
'followers_count': tweet.user.followers_count
}
}
return self._format_success_response(tweet_data)
except tweepy.NotFound:
return self._format_error_response(
Exception("Tweet not found - it may have been deleted"),
{'content_id': content_id}
)
except Exception as e:
return self._format_error_response(e, {'content_id': content_id})
async def get_analytics(
self,
content_id: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get comprehensive analytics for a tweet."""
try:
# Get tweet details
tweet = self.client.get_status(
content_id,
include_entities=True,
tweet_mode='extended'
)
# Calculate engagement metrics
total_engagement = (
tweet.retweet_count +
tweet.favorite_count +
getattr(tweet, 'reply_count', 0) +
getattr(tweet, 'quote_count', 0)
)
engagement_rate = self._calculate_engagement_rate(tweet)
# Get time-based metrics (if tweet is recent)
time_metrics = self._calculate_time_metrics(tweet)
analytics_data = {
'tweet_id': tweet.id_str,
'metrics': {
'likes': tweet.favorite_count,
'retweets': tweet.retweet_count,
'replies': getattr(tweet, 'reply_count', 0),
'quotes': getattr(tweet, 'quote_count', 0),
'total_engagement': total_engagement,
'impressions': getattr(tweet, 'impression_count', 0) # May not be available
},
'engagement': {
'engagement_rate': engagement_rate,
'likes_rate': (tweet.favorite_count / tweet.user.followers_count * 100) if tweet.user.followers_count > 0 else 0,
'retweets_rate': (tweet.retweet_count / tweet.user.followers_count * 100) if tweet.user.followers_count > 0 else 0
},
'timing': time_metrics,
'audience': {
'followers_at_post': tweet.user.followers_count,
'reach_percentage': (total_engagement / tweet.user.followers_count * 100) if tweet.user.followers_count > 0 else 0
},
'content_analysis': {
'character_count': len(tweet.full_text),
'hashtag_count': len([entity for entity in tweet.entities.get('hashtags', [])]),
'mention_count': len([entity for entity in tweet.entities.get('user_mentions', [])]),
'url_count': len([entity for entity in tweet.entities.get('urls', [])])
}
}
return self._format_success_response(analytics_data)
except Exception as e:
return self._format_error_response(e, {
'content_id': content_id,
'start_date': start_date,
'end_date': end_date
})
async def validate_content(self, content: Dict[str, Any]) -> Dict[str, Any]:
"""Enhanced content validation."""
try:
errors = []
warnings = []
# Check text
text = content.get('text', '')
if not text.strip():
errors.append("Tweet text cannot be empty")
# Check length
if len(text) > 280:
errors.append(f"Tweet text exceeds 280 characters ({len(text)}/280)")
elif len(text) > 270:
warnings.append("Tweet is close to character limit")
# Check for very short tweets
if len(text) < 10:
warnings.append("Very short tweets may get less engagement")
# Check media
media = content.get('media', [])
if len(media) > 4:
errors.append("Maximum 4 media attachments allowed")
# Check for spam indicators
if text.count('#') > 3:
warnings.append("Too many hashtags may reduce engagement")
if text.count('@') > 5:
warnings.append("Too many mentions may appear spammy")
# Check for duplicate content (basic check)
if self._is_potential_duplicate(text):
warnings.append("Content may be similar to recent tweets")
if errors:
return self._format_error_response(
ValueError(f"Validation failed: {'; '.join(errors)}"),
{'content': content, 'warnings': warnings}
)
validation_data = {
'valid': True,
'content': content,
'warnings': warnings,
'suggestions': self._get_content_suggestions(text)
}
return self._format_success_response(validation_data)
except Exception as e:
return self._format_error_response(e, {'content': content})
def _calculate_engagement_rate(self, tweet: Status) -> float:
"""Calculate engagement rate for a tweet."""
try:
total_engagement = (
tweet.favorite_count +
tweet.retweet_count +
getattr(tweet, 'reply_count', 0) +
getattr(tweet, 'quote_count', 0)
)
followers = tweet.user.followers_count
return (total_engagement / followers * 100) if followers > 0 else 0.0
except Exception:
return 0.0
def _calculate_time_metrics(self, tweet: Status) -> Dict[str, Any]:
"""Calculate time-based metrics for a tweet."""
try:
now = datetime.now()
tweet_time = tweet.created_at.replace(tzinfo=None)
age_hours = (now - tweet_time).total_seconds() / 3600
# Calculate engagement velocity (engagement per hour)
total_engagement = (
tweet.favorite_count +
tweet.retweet_count +
getattr(tweet, 'reply_count', 0)
)
engagement_velocity = total_engagement / max(age_hours, 1)
return {
'age_hours': round(age_hours, 2),
'engagement_velocity': round(engagement_velocity, 2),
'peak_engagement_period': self._estimate_peak_period(tweet_time),
'posted_at': tweet_time.isoformat()
}
except Exception:
return {}
def _estimate_peak_period(self, tweet_time: datetime) -> str:
"""Estimate if tweet was posted during peak engagement period."""
hour = tweet_time.hour
if 9 <= hour <= 10:
return "Morning Peak (9-10 AM)"
elif 12 <= hour <= 13:
return "Lunch Peak (12-1 PM)"
elif 19 <= hour <= 21:
return "Evening Peak (7-9 PM)"
else:
return "Off-Peak Hours"
def _check_rate_limit(self, endpoint: str) -> bool:
"""Check if we're within rate limits for an endpoint."""
try:
rate_limits = self.client.get_rate_limit_status()
endpoint_map = {
'tweets': '/statuses/update',
'user_timeline': '/statuses/user_timeline',
'verify_credentials': '/account/verify_credentials'
}
if endpoint in endpoint_map:
limit_info = rate_limits['resources']['statuses'].get(endpoint_map[endpoint])
if limit_info:
return limit_info['remaining'] > 0
return True # Default to allowing if we can't check
except Exception:
return True # Default to allowing if check fails
def _update_rate_limit_tracker(self, endpoint: str) -> None:
"""Update internal rate limit tracker."""
now = time.time()
if endpoint not in self.rate_limit_tracker:
self.rate_limit_tracker[endpoint] = []
# Add current request
self.rate_limit_tracker[endpoint].append(now)
# Clean old requests (older than 15 minutes)
self.rate_limit_tracker[endpoint] = [
timestamp for timestamp in self.rate_limit_tracker[endpoint]
if now - timestamp < 900 # 15 minutes
]
def _is_potential_duplicate(self, text: str) -> bool:
"""Basic check for potential duplicate content."""
# This is a simplified check - in production, you'd want more sophisticated detection
try:
# Get recent tweets from user
recent_tweets = self.client.user_timeline(count=20, tweet_mode='extended')
for tweet in recent_tweets:
# Simple similarity check
if self._calculate_text_similarity(text, tweet.full_text) > 0.8:
return True
return False
except Exception:
return False # If we can't check, assume it's not a duplicate
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""Calculate simple text similarity."""
# Simple word-based similarity
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = words1.intersection(words2)
union = words1.union(words2)
return len(intersection) / len(union) if union else 0.0
def _get_content_suggestions(self, text: str) -> List[str]:
"""Get suggestions for improving tweet content."""
suggestions = []
if len(text) < 50:
suggestions.append("Consider adding more context to increase engagement")
if not any(char in text for char in '!?'):
suggestions.append("Adding punctuation can make tweets more engaging")
if '#' not in text:
suggestions.append("Consider adding 1-2 relevant hashtags")
if not any(emoji_char in text for emoji_char in '😀😃😄😁😆😅😂🤣'):
suggestions.append("Emojis can increase engagement and visual appeal")
return suggestions
async def delete_content(
self,
content_id: str
) -> Dict[str, Any]:
"""Delete a tweet."""
try:
self.client.destroy_status(content_id)
return self._format_success_response({
'id': content_id,
'deleted': True
})
except Exception as e:
return self._format_error_response(
e,
{'content_id': content_id}
)
async def update_content(
self,
content_id: str,
updates: Dict[str, Any]
) -> Dict[str, Any]:
"""Update a tweet."""
try:
# Twitter doesn't support updating tweets
# We'll delete the old one and create a new one
await self.delete_content(content_id)
return await self.publish_content(updates)
except Exception as e:
return self._format_error_response(
e,
{
'content_id': content_id,
'updates': updates
}
)
async def get_optimal_publish_time(
self,
content_type: str,
target_audience: Optional[Dict[str, Any]] = None
) -> datetime:
"""Get optimal publish time for content."""
# Implement optimal time calculation based on:
# - Content type
# - Target audience timezone
# - Historical engagement data
# For now, return current time
return datetime.now()
async def get_platform_limits(
self
) -> Dict[str, Any]:
"""Get Twitter platform limits."""
return self._format_success_response({
'tweet_length': 280,
'media_attachments': 4,
'poll_options': 4,
'poll_duration': 10080, # 7 days in minutes
'rate_limits': {
'tweets_per_day': 2000,
'tweets_per_hour': 100
}
})
async def get_supported_content_types(
self
) -> List[str]:
"""Get list of supported content types."""
return ['TWEET', 'THREAD', 'POLL']
async def get_platform_metrics(
self
) -> Dict[str, Any]:
"""Get Twitter platform metrics."""
try:
account = self.client.verify_credentials()
return self._format_success_response({
'followers_count': account.followers_count,
'following_count': account.friends_count,
'tweets_count': account.statuses_count,
'account_created_at': account.created_at.isoformat()
})
except Exception as e:
return self._format_error_response(e)
def _upload_media(self, media: Dict[str, Any]) -> Optional[str]:
"""Upload media to Twitter."""
try:
if 'url' in media:
# Download media from URL
response = requests.get(media['url'])
media_file = BytesIO(response.content)
elif 'file' in media:
# Use local file
media_file = open(media['file'], 'rb')
else:
return None
# Upload media
media_upload = self.client.media_upload(
filename=media.get('filename', 'media'),
file=media_file
)
return media_upload.media_id_string
except Exception as e:
logger.error(f"Failed to upload media: {str(e)}")
return None
@classmethod
def get_required_config_fields(cls) -> List[str]:
"""Get list of required configuration fields."""
return [
'api_key',
'api_secret',
'access_token',
'access_token_secret'
]
@classmethod
def get_platform_description(cls) -> str:
"""Get platform description."""
return "Twitter platform adapter for posting and managing tweets"
@classmethod
def get_platform_version(cls) -> str:
"""Get platform adapter version."""
return "1.0.0"

View File

@@ -0,0 +1,290 @@
"""
Unified platform adapter for content adaptation across different platforms.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from loguru import logger
from lib.utils.website_analyzer.analyzer import WebsiteAnalyzer
from lib.ai_seo_tools.content_gap_analysis.main import ContentGapAnalysis
from lib.ai_seo_tools.content_title_generator import ai_title_generator
from lib.ai_seo_tools.meta_desc_generator import metadesc_generator_main
from lib.ai_seo_tools.seo_structured_data import ai_structured_data
class UnifiedPlatformAdapter:
"""Unified adapter for different social media platforms."""
def __init__(self):
"""Initialize the platform adapter."""
self.platform_handlers = {
'instagram': self._handle_instagram,
'linkedin': self._handle_linkedin,
'twitter': self._handle_twitter,
'facebook': self._handle_facebook
}
logger.info("UnifiedPlatformAdapter initialized")
def generate_content(self, platform: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate content for a specific platform.
Args:
platform: Target platform
data: Content data
Returns:
Dictionary containing generated content
"""
try:
handler = self.platform_handlers.get(platform.lower())
if not handler:
raise ValueError(f"Unsupported platform: {platform}")
return handler(data)
except Exception as e:
error_msg = f"Error generating content for {platform}: {str(e)}"
logger.error(error_msg, exc_info=True)
return {
'error': error_msg,
'content': None
}
def get_content_performance(self, content_item: Dict[str, Any]) -> Dict[str, Any]:
"""Get performance metrics for content across platforms."""
try:
logger.info(f"Getting performance metrics for content: {content_item.get('title', 'Untitled')}")
# Get platform from content item
platform = content_item.get('platforms', ['Unknown'])[0]
# Initialize performance metrics
performance = {
'engagement_metrics': {
'likes': 0,
'comments': 0,
'shares': 0,
'reach': 0
},
'seo_metrics': {
'impressions': 0,
'clicks': 0,
'ctr': 0,
'position': 0
},
'conversion_metrics': {
'conversions': 0,
'conversion_rate': 0,
'revenue': 0
},
'platform_specific': {},
'performance_trends': [],
'recommendations': []
}
# Add platform-specific metrics
if platform == 'WEBSITE':
performance['platform_specific'] = {
'bounce_rate': 0,
'time_on_page': 0,
'page_views': 0
}
return performance
except Exception as e:
error_msg = f"Error getting content performance: {str(e)}"
logger.error(error_msg, exc_info=True)
return {
'error': error_msg,
'metrics': {},
'trends': {},
'recommendations': []
}
def _handle_instagram(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle Instagram content generation."""
try:
# Generate Instagram-specific content
caption = metadesc_generator_main(data)
hashtags = self._generate_hashtags(data)
return {
'platform': 'instagram',
'content': {
'caption': caption,
'hashtags': hashtags,
'media_suggestions': self._get_media_suggestions(data)
}
}
except Exception as e:
logger.error(f"Error generating Instagram content: {str(e)}")
return {
'platform': 'instagram',
'error': str(e)
}
def _handle_linkedin(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle LinkedIn content generation."""
try:
# Generate LinkedIn-specific content
post = metadesc_generator_main(data)
return {
'platform': 'linkedin',
'content': {
'post': post,
'engagement_optimization': self._get_engagement_suggestions(data),
'media_suggestions': self._get_media_suggestions(data)
}
}
except Exception as e:
logger.error(f"Error generating LinkedIn content: {str(e)}")
return {
'platform': 'linkedin',
'error': str(e)
}
def _handle_twitter(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle Twitter content generation."""
try:
# Generate Twitter-specific content
tweet = metadesc_generator_main(data)
hashtags = self._generate_hashtags(data)
return {
'platform': 'twitter',
'content': {
'tweet': tweet,
'hashtags': hashtags,
'thread_structure': self._get_thread_structure(data),
'media_suggestions': self._get_media_suggestions(data)
}
}
except Exception as e:
logger.error(f"Error generating Twitter content: {str(e)}")
return {
'platform': 'twitter',
'error': str(e)
}
def _handle_facebook(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Handle Facebook content generation."""
try:
# Generate Facebook-specific content
post = metadesc_generator_main(data)
return {
'platform': 'facebook',
'content': {
'post': post,
'engagement_optimization': self._get_engagement_suggestions(data),
'media_suggestions': self._get_media_suggestions(data)
}
}
except Exception as e:
logger.error(f"Error generating Facebook content: {str(e)}")
return {
'platform': 'facebook',
'error': str(e)
}
def _generate_hashtags(self, data: Dict[str, Any]) -> List[str]:
"""Generate relevant hashtags for content."""
try:
# Extract keywords from content
keywords = data.get('keywords', [])
# Add platform-specific hashtags
platform = data.get('platform', '').lower()
platform_hashtags = {
'instagram': ['#instagood', '#photooftheday'],
'twitter': ['#trending', '#followme'],
'linkedin': ['#business', '#professional'],
'facebook': ['#social', '#community']
}.get(platform, [])
return keywords + platform_hashtags
except Exception as e:
logger.error(f"Error generating hashtags: {str(e)}")
return []
def _get_media_suggestions(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Get media suggestions for content."""
try:
# Generate media suggestions based on content type
content_type = data.get('type', 'post')
suggestions = []
if content_type == 'blog':
suggestions.append({
'type': 'featured_image',
'description': 'Main blog post image',
'dimensions': '1200x630'
})
elif content_type == 'social':
suggestions.append({
'type': 'post_image',
'description': 'Social media post image',
'dimensions': '1080x1080'
})
return suggestions
except Exception as e:
logger.error(f"Error getting media suggestions: {str(e)}")
return []
def _get_engagement_suggestions(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Get engagement optimization suggestions."""
try:
return {
'best_posting_times': ['9:00 AM', '5:00 PM'],
'engagement_tips': [
'Ask questions to encourage comments',
'Use relevant hashtags',
'Include a clear call-to-action'
],
'content_length': {
'optimal': '150-200 characters',
'maximum': '300 characters'
}
}
except Exception as e:
logger.error(f"Error getting engagement suggestions: {str(e)}")
return {}
def _get_thread_structure(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Get thread structure for Twitter threads."""
try:
content = data.get('content', '')
sentences = content.split('.')
thread = []
current_tweet = ''
for sentence in sentences:
if len(current_tweet + sentence) <= 280:
current_tweet += sentence + '.'
else:
if current_tweet:
thread.append({
'content': current_tweet.strip(),
'type': 'tweet'
})
current_tweet = sentence + '.'
if current_tweet:
thread.append({
'content': current_tweet.strip(),
'type': 'tweet'
})
return thread
except Exception as e:
logger.error(f"Error generating thread structure: {str(e)}")
return []

View File

@@ -0,0 +1,327 @@
"""
Wix platform adapter implementation.
"""
from io import BytesIO
from typing import Dict, Any, Optional, List
from datetime import datetime
import logging
from pathlib import Path
import requests
from .base import PlatformAdapter
from lib.integrations.wix.wix_api_client import WixAPIClient
logger = logging.getLogger(__name__)
class WixAdapter(PlatformAdapter):
"""Wix platform adapter."""
def __init__(self, config: Dict[str, Any]):
"""Initialize Wix adapter with configuration."""
super().__init__(config)
self._validate_config()
self._initialize_client()
def _initialize_client(self) -> None:
"""Initialize Wix API client."""
try:
self.client = WixAPIClient(
api_key=self.config.get('api_key'),
refresh_token=self.config.get('refresh_token'),
site_id=self.config.get('site_id')
)
logger.info("Successfully initialized Wix API client")
except Exception as e:
raise Exception(f"Failed to initialize Wix client: {str(e)}")
async def publish_content(
self,
content: Dict[str, Any],
schedule_time: Optional[datetime] = None
) -> Dict[str, Any]:
"""Publish content to Wix blog."""
try:
# Validate content
validation = await self.validate_content(content)
if not validation.get('success'):
return validation
# Prepare blog post data
post_data = {
'title': content.get('title', ''),
'content': content.get('content', ''),
'excerpt': content.get('excerpt', ''),
'slug': content.get('slug', ''),
'tags': content.get('tags', []),
'categories': content.get('categories', []),
'seo': content.get('seo', {}),
'publish_date': schedule_time.isoformat() if schedule_time else None
}
# Handle media attachments
media_ids = []
if 'media' in content:
for media in content['media']:
media_id = await self._upload_media(media)
if media_id:
media_ids.append(media_id)
# Create blog post
post = self.client.create_post(post_data)
# Add media to post if any
if media_ids:
self.client.add_media_to_post(post['id'], media_ids)
return self._format_success_response({
'id': post['id'],
'title': post['title'],
'url': post['url'],
'created_at': post['created_at']
})
except Exception as e:
return self._format_error_response(
e,
{'content': content, 'schedule_time': schedule_time}
)
async def get_content_status(
self,
content_id: str
) -> Dict[str, Any]:
"""Get status of a blog post."""
try:
post = self.client.get_post(content_id)
return self._format_success_response({
'id': post['id'],
'title': post['title'],
'status': post['status'],
'url': post['url'],
'created_at': post['created_at'],
'updated_at': post['updated_at'],
'published_at': post.get('published_at')
})
except Exception as e:
return self._format_error_response(
e,
{'content_id': content_id}
)
async def delete_content(
self,
content_id: str
) -> Dict[str, Any]:
"""Delete a blog post."""
try:
self.client.delete_post(content_id)
return self._format_success_response({
'id': content_id,
'deleted': True
})
except Exception as e:
return self._format_error_response(
e,
{'content_id': content_id}
)
async def update_content(
self,
content_id: str,
updates: Dict[str, Any]
) -> Dict[str, Any]:
"""Update a blog post."""
try:
post = self.client.update_post(content_id, updates)
return self._format_success_response({
'id': post['id'],
'title': post['title'],
'url': post['url'],
'updated_at': post['updated_at']
})
except Exception as e:
return self._format_error_response(
e,
{
'content_id': content_id,
'updates': updates
}
)
async def get_analytics(
self,
content_id: str,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> Dict[str, Any]:
"""Get analytics for a blog post."""
try:
analytics = self.client.get_post_analytics(
content_id,
start_date,
end_date
)
return self._format_success_response({
'id': content_id,
'metrics': {
'views': analytics.get('views', 0),
'unique_visitors': analytics.get('unique_visitors', 0),
'average_time_on_page': analytics.get('average_time_on_page', 0),
'bounce_rate': analytics.get('bounce_rate', 0)
}
})
except Exception as e:
return self._format_error_response(
e,
{
'content_id': content_id,
'start_date': start_date,
'end_date': end_date
}
)
async def validate_content(
self,
content: Dict[str, Any]
) -> Dict[str, Any]:
"""Validate content before publishing."""
try:
# Check required fields
required_fields = ['title', 'content']
missing_fields = [
field for field in required_fields
if field not in content
]
if missing_fields:
return self._format_error_response(
ValueError(f"Missing required fields: {', '.join(missing_fields)}"),
{'content': content}
)
# Check content length
if len(content['content']) > 100000: # Wix limit
return self._format_error_response(
ValueError("Content exceeds maximum length of 100,000 characters"),
{'content': content}
)
# Check media attachments
media = content.get('media', [])
if len(media) > 20: # Wix limit
return self._format_error_response(
ValueError("Maximum 20 media attachments allowed"),
{'content': content}
)
return self._format_success_response({
'valid': True,
'content': content
})
except Exception as e:
return self._format_error_response(
e,
{'content': content}
)
async def get_optimal_publish_time(
self,
content_type: str,
target_audience: Optional[Dict[str, Any]] = None
) -> datetime:
"""Get optimal publish time for content."""
# Implement optimal time calculation based on:
# - Content type
# - Target audience timezone
# - Historical engagement data
# For now, return current time
return datetime.now()
async def get_platform_limits(
self
) -> Dict[str, Any]:
"""Get Wix platform limits."""
return self._format_success_response({
'content_length': 100000,
'media_attachments': 20,
'tags_per_post': 50,
'categories_per_post': 10,
'rate_limits': {
'posts_per_day': 100,
'media_uploads_per_day': 1000
}
})
async def get_supported_content_types(
self
) -> List[str]:
"""Get list of supported content types."""
return ['BLOG_POST', 'PAGE', 'COLLECTION_ITEM']
async def get_platform_metrics(
self
) -> Dict[str, Any]:
"""Get Wix platform metrics."""
try:
site_stats = self.client.get_site_statistics()
return self._format_success_response({
'total_posts': site_stats.get('total_posts', 0),
'total_views': site_stats.get('total_views', 0),
'total_comments': site_stats.get('total_comments', 0),
'average_engagement': site_stats.get('average_engagement', 0)
})
except Exception as e:
return self._format_error_response(e)
async def _upload_media(
self,
media: Dict[str, Any]
) -> Optional[str]:
"""Upload media to Wix."""
try:
if 'url' in media:
# Download media from URL
response = requests.get(media['url'])
media_file = BytesIO(response.content)
filename = media.get('filename', 'media')
elif 'file' in media:
# Use local file
file_path = Path(media['file'])
media_file = open(file_path, 'rb')
filename = file_path.name
else:
return None
# Upload media
media_id = self.client.upload_media(
file=media_file,
filename=filename,
mime_type=media.get('mime_type')
)
return media_id
except Exception as e:
logger.error(f"Failed to upload media: {str(e)}")
return None
@classmethod
def get_required_config_fields(cls) -> List[str]:
"""Get list of required configuration fields."""
return [
'api_key',
'refresh_token',
'site_id'
]
@classmethod
def get_platform_description(cls) -> str:
"""Get platform description."""
return "Wix platform adapter for managing blog posts and content"
@classmethod
def get_platform_version(cls) -> str:
"""Get platform adapter version."""
return "1.0.0"

View File

@@ -0,0 +1,337 @@
"""
Twitter Authentication Bridge
Connects the platform adapter with the UI authentication system for secure Twitter integration.
"""
import streamlit as st
import tweepy
import json
import os
from typing import Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from pathlib import Path
import hashlib
import base64
from cryptography.fernet import Fernet
import logging
from .platform_adapters.twitter import TwitterAdapter
logger = logging.getLogger(__name__)
class TwitterAuthBridge:
"""Bridge between Twitter authentication and platform adapter."""
def __init__(self):
self.config_dir = Path("config/twitter")
self.config_dir.mkdir(parents=True, exist_ok=True)
self.encryption_key = self._get_or_create_encryption_key()
def _get_or_create_encryption_key(self) -> bytes:
"""Get or create encryption key for secure credential storage."""
key_file = self.config_dir / "encryption.key"
if key_file.exists():
with open(key_file, 'rb') as f:
return f.read()
else:
key = Fernet.generate_key()
with open(key_file, 'wb') as f:
f.write(key)
return key
def encrypt_credentials(self, credentials: Dict[str, str]) -> str:
"""Encrypt Twitter credentials for secure storage."""
try:
fernet = Fernet(self.encryption_key)
credentials_json = json.dumps(credentials)
encrypted_data = fernet.encrypt(credentials_json.encode())
return base64.b64encode(encrypted_data).decode()
except Exception as e:
logger.error(f"Failed to encrypt credentials: {str(e)}")
raise
def decrypt_credentials(self, encrypted_data: str) -> Dict[str, str]:
"""Decrypt Twitter credentials from secure storage."""
try:
fernet = Fernet(self.encryption_key)
encrypted_bytes = base64.b64decode(encrypted_data.encode())
decrypted_data = fernet.decrypt(encrypted_bytes)
return json.loads(decrypted_data.decode())
except Exception as e:
logger.error(f"Failed to decrypt credentials: {str(e)}")
raise
def save_credentials(self, user_id: str, credentials: Dict[str, str]) -> bool:
"""Save encrypted Twitter credentials to file."""
try:
# Create user-specific credentials file
user_hash = hashlib.sha256(user_id.encode()).hexdigest()[:16]
creds_file = self.config_dir / f"user_{user_hash}.enc"
# Add timestamp and validation
credentials_with_meta = {
**credentials,
'created_at': datetime.now().isoformat(),
'user_id_hash': user_hash
}
# Encrypt and save
encrypted_data = self.encrypt_credentials(credentials_with_meta)
with open(creds_file, 'w') as f:
f.write(encrypted_data)
logger.info(f"Credentials saved for user {user_hash}")
return True
except Exception as e:
logger.error(f"Failed to save credentials: {str(e)}")
return False
def load_credentials(self, user_id: str) -> Optional[Dict[str, str]]:
"""Load and decrypt Twitter credentials from file."""
try:
user_hash = hashlib.sha256(user_id.encode()).hexdigest()[:16]
creds_file = self.config_dir / f"user_{user_hash}.enc"
if not creds_file.exists():
logger.warning(f"No credentials found for user {user_hash}")
return None
# Load and decrypt
with open(creds_file, 'r') as f:
encrypted_data = f.read()
credentials = self.decrypt_credentials(encrypted_data)
# Validate credentials are not expired (optional)
created_at = datetime.fromisoformat(credentials.get('created_at', ''))
if datetime.now() - created_at > timedelta(days=365): # 1 year expiry
logger.warning(f"Credentials expired for user {user_hash}")
return None
# Remove metadata before returning
clean_credentials = {k: v for k, v in credentials.items()
if k not in ['created_at', 'user_id_hash']}
return clean_credentials
except Exception as e:
logger.error(f"Failed to load credentials: {str(e)}")
return None
def delete_credentials(self, user_id: str) -> bool:
"""Delete stored Twitter credentials."""
try:
user_hash = hashlib.sha256(user_id.encode()).hexdigest()[:16]
creds_file = self.config_dir / f"user_{user_hash}.enc"
if creds_file.exists():
creds_file.unlink()
logger.info(f"Credentials deleted for user {user_hash}")
return True
except Exception as e:
logger.error(f"Failed to delete credentials: {str(e)}")
return False
def validate_credentials(self, credentials: Dict[str, str]) -> Tuple[bool, str]:
"""Validate Twitter API credentials."""
try:
# Check required fields
required_fields = ['api_key', 'api_secret', 'access_token', 'access_token_secret']
missing_fields = [field for field in required_fields if not credentials.get(field)]
if missing_fields:
return False, f"Missing required fields: {', '.join(missing_fields)}"
# Test connection
auth = tweepy.OAuthHandler(
credentials['api_key'],
credentials['api_secret']
)
auth.set_access_token(
credentials['access_token'],
credentials['access_token_secret']
)
api = tweepy.API(auth)
user = api.verify_credentials()
if user:
return True, f"Valid credentials for @{user.screen_name}"
else:
return False, "Failed to verify credentials"
except tweepy.Unauthorized:
return False, "Invalid API credentials"
except tweepy.Forbidden:
return False, "Access forbidden - check API permissions"
except tweepy.TooManyRequests:
return False, "Rate limit exceeded - try again later"
except Exception as e:
return False, f"Connection error: {str(e)}"
def get_twitter_adapter(self, user_id: str) -> Optional[TwitterAdapter]:
"""Get configured Twitter adapter for user."""
try:
# First check session state
if 'twitter_adapter' in st.session_state:
return st.session_state.twitter_adapter
# Load credentials
credentials = self.load_credentials(user_id)
if not credentials:
return None
# Validate credentials
is_valid, message = self.validate_credentials(credentials)
if not is_valid:
logger.error(f"Invalid credentials: {message}")
return None
# Create adapter
adapter = TwitterAdapter(credentials)
# Cache in session state
st.session_state.twitter_adapter = adapter
return adapter
except Exception as e:
logger.error(f"Failed to get Twitter adapter: {str(e)}")
return None
def get_user_info(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Get Twitter user information."""
try:
adapter = self.get_twitter_adapter(user_id)
if not adapter:
return None
# Get user info from Twitter
user = adapter.client.verify_credentials()
user_info = {
'id': user.id_str,
'screen_name': user.screen_name,
'name': user.name,
'description': user.description,
'followers_count': user.followers_count,
'friends_count': user.friends_count,
'statuses_count': user.statuses_count,
'profile_image_url': user.profile_image_url_https,
'profile_banner_url': getattr(user, 'profile_banner_url', ''),
'verified': user.verified,
'created_at': user.created_at.isoformat(),
'location': user.location or '',
'url': user.url or ''
}
return user_info
except Exception as e:
logger.error(f"Failed to get user info: {str(e)}")
return None
def setup_session_state(self, user_id: str) -> bool:
"""Setup session state with Twitter authentication."""
try:
# Load credentials
credentials = self.load_credentials(user_id)
if not credentials:
return False
# Get user info
user_info = self.get_user_info(user_id)
if not user_info:
return False
# Setup session state
st.session_state.twitter_authenticated = True
st.session_state.twitter_user_id = user_id
st.session_state.twitter_user_info = user_info
st.session_state.twitter_config = credentials
return True
except Exception as e:
logger.error(f"Failed to setup session state: {str(e)}")
return False
def clear_session_state(self) -> None:
"""Clear Twitter authentication from session state."""
keys_to_clear = [
'twitter_authenticated',
'twitter_user_id',
'twitter_user_info',
'twitter_config',
'twitter_adapter'
]
for key in keys_to_clear:
if key in st.session_state:
del st.session_state[key]
def is_authenticated(self) -> bool:
"""Check if user is authenticated with Twitter."""
return (
st.session_state.get('twitter_authenticated', False) and
st.session_state.get('twitter_user_info') is not None and
st.session_state.get('twitter_config') is not None
)
def get_rate_limit_status(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Get current rate limit status."""
try:
adapter = self.get_twitter_adapter(user_id)
if not adapter:
return None
rate_limits = adapter.client.get_rate_limit_status()
# Extract relevant rate limits
relevant_limits = {
'tweets': rate_limits['resources']['statuses']['/statuses/update'],
'user_timeline': rate_limits['resources']['statuses']['/statuses/user_timeline'],
'verify_credentials': rate_limits['resources']['account']['/account/verify_credentials']
}
return relevant_limits
except Exception as e:
logger.error(f"Failed to get rate limit status: {str(e)}")
return None
# Global instance
twitter_auth = TwitterAuthBridge()
# Convenience functions for UI
def save_twitter_credentials(user_id: str, credentials: Dict[str, str]) -> bool:
"""Save Twitter credentials (convenience function)."""
return twitter_auth.save_credentials(user_id, credentials)
def load_twitter_credentials(user_id: str) -> Optional[Dict[str, str]]:
"""Load Twitter credentials (convenience function)."""
return twitter_auth.load_credentials(user_id)
def get_twitter_adapter(user_id: str) -> Optional[TwitterAdapter]:
"""Get Twitter adapter (convenience function)."""
return twitter_auth.get_twitter_adapter(user_id)
def is_twitter_authenticated() -> bool:
"""Check if Twitter is authenticated (convenience function)."""
return twitter_auth.is_authenticated()
def setup_twitter_session(user_id: str) -> bool:
"""Setup Twitter session (convenience function)."""
return twitter_auth.setup_session_state(user_id)
def clear_twitter_session() -> None:
"""Clear Twitter session (convenience function)."""
twitter_auth.clear_session_state()
def validate_twitter_credentials(credentials: Dict[str, str]) -> Tuple[bool, str]:
"""Validate Twitter credentials (convenience function)."""
return twitter_auth.validate_credentials(credentials)

View File

@@ -0,0 +1,208 @@
# Wix Blog Integration for Alwrity
This integration allows you to publish blog content from Alwrity directly to your Wix site using the Wix REST API.
## Features
- **Blog Post Management**: Create, update, and delete blog posts
- **Media Management**: Upload images and other media files
- **SEO Optimization**: Comprehensive SEO settings and analysis
- **Category Management**: Create and manage blog categories
- **Markdown Support**: Write in markdown and publish as HTML
- **Streamlit UI**: User-friendly interface for publishing
## Prerequisites
Before using this integration, you'll need:
1. A Wix site with the Blog feature enabled
2. Wix API credentials (refresh token and site ID)
3. Python 3.7+ with required dependencies
## Getting Wix API Credentials
To use this integration, you need to obtain a refresh token and site ID from Wix:
1. **Create a Wix Developer Account**:
- Go to [Wix Developers](https://dev.wix.com/) and sign up or log in
- Create a new OAuth app
2. **Configure OAuth App**:
- Set a name and description for your app
- Add redirect URLs (e.g., `https://localhost:3000/oauth/callback`)
- Save the app and note the App ID and App Secret
3. **Get a Refresh Token**:
- Follow the OAuth flow to get an authorization code
- Exchange the code for an access token and refresh token
- Detailed instructions: [Wix OAuth Documentation](https://dev.wix.com/api/rest/getting-started/authentication)
4. **Get Your Site ID**:
- Log in to your Wix account
- Go to your site's dashboard
- The site ID is in the URL: `https://manage.wix.com/dashboard/{SITE_ID}/home`
## Installation
The Wix integration is included with Alwrity. No additional installation is required.
## Usage
### Using the Streamlit UI
1. Navigate to the Wix integration in the Alwrity UI
2. Enter your Wix refresh token and site ID
3. Fill in the blog details and content
4. Click "Publish to Wix"
### Using the Python API
```python
from lib.integrations.wix_integration import WixIntegration
# Initialize the integration
wix = WixIntegration(
refresh_token="YOUR_REFRESH_TOKEN",
site_id="YOUR_SITE_ID"
)
# Publish a blog post
result = wix.publish_blog_post(
title="My Blog Post",
content="# Hello World\n\nThis is my blog post.",
is_markdown=True,
tags=["example", "blog"],
categories=["Technology"],
publish=True
)
# Get the published post URL
post_url = result.get("post", {}).get("url")
print(f"Published at: {post_url}")
```
### Using the Command-Line Interface
```bash
# Set environment variables
export WIX_REFRESH_TOKEN="YOUR_REFRESH_TOKEN"
export WIX_SITE_ID="YOUR_SITE_ID"
# List blog posts
python -m lib.integrations.wix_cli list-posts
# Publish a blog post
python -m lib.integrations.wix_cli publish-post \
--title "My Blog Post" \
--content-file blog.md \
--is-markdown \
--tags "example,blog" \
--categories "Technology"
# Generate an SEO report
python -m lib.integrations.wix_cli seo-report \
--title "My Blog Post" \
--keywords "example,blog,technology"
```
## API Reference
### WixIntegration
The main integration class that provides high-level methods for working with Wix blogs.
#### Methods
- `publish_blog_post(title, content, ...)`: Publish a blog post
- `upload_media(file_path, ...)`: Upload a media file
- `get_seo_report(post_id, target_keywords)`: Generate an SEO report
- `list_blog_posts(limit, offset, ...)`: List blog posts
- `list_categories()`: List blog categories
- `create_category(name, description)`: Create a blog category
- `get_post_by_id(post_id)`: Get a blog post by ID
- `get_post_by_title(title)`: Get a blog post by title
- `delete_post(post_id)`: Delete a blog post
### WixAPIClient
Low-level client for interacting with the Wix API.
### WixBlogManager
Handles blog content management, including markdown processing and image handling.
### WixSEOOptimizer
Provides SEO analysis and optimization for blog posts.
## Error Handling
The integration includes comprehensive error handling:
- API errors are logged with detailed information
- Authentication errors provide clear guidance
- File handling errors include path information
- Network errors include retry logic
## Best Practices
1. **Store credentials securely**:
- Use environment variables or a secure credential store
- Don't hardcode credentials in your code
2. **Optimize images before upload**:
- Compress images to reduce file size
- Use appropriate image formats (JPEG for photos, PNG for graphics)
3. **SEO optimization**:
- Use the SEO report to improve your content
- Include relevant keywords in titles and headings
- Add alt text to all images
4. **Content management**:
- Use categories and tags consistently
- Include featured images for better visual appeal
- Write clear, concise meta descriptions
## Troubleshooting
### Common Issues
1. **Authentication Errors**:
- Ensure your refresh token is valid
- Check that your site ID is correct
- Verify that your app has the necessary permissions
2. **API Rate Limits**:
- The Wix API has rate limits that may affect bulk operations
- Add delays between requests if you're publishing many posts
3. **Image Upload Issues**:
- Check that the image file exists and is readable
- Verify that the image format is supported (JPEG, PNG, GIF)
- Ensure the image file size is within Wix limits
4. **Content Formatting Issues**:
- If using markdown, ensure it's valid
- Check for special characters that might cause issues
- Verify that HTML content is properly formatted
### Getting Help
If you encounter issues not covered here:
1. Check the logs for detailed error messages
2. Consult the [Wix API Documentation](https://dev.wix.com/api/rest/getting-started)
3. Contact Alwrity support for assistance
## License
This integration is part of the Alwrity platform and is subject to the same license terms.
## Acknowledgements
- [Wix REST API](https://dev.wix.com/api/rest) for providing the API endpoints
- [Requests](https://docs.python-requests.org/) for HTTP functionality
- [Markdown](https://python-markdown.github.io/) for markdown processing
- [BeautifulSoup](https://www.crummy.com/software/BeautifulSoup/) for HTML parsing
- [Streamlit](https://streamlit.io/) for the user interface

View File

@@ -0,0 +1,841 @@
"""
Wix API Client for Blog Management
This module provides a comprehensive client for interacting with the Wix API
to manage blog posts, SEO settings, and media uploads.
Documentation: https://dev.wix.com/api/rest/getting-started
"""
import os
import json
import time
import logging
import requests
from typing import Dict, List, Optional, Union, Any, Tuple
from datetime import datetime
import mimetypes
from pathlib import Path
from io import BytesIO
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('wix_api_client')
class WixAPIClient:
"""
Client for interacting with the Wix API for blog management.
This client handles authentication, blog post creation/updating,
media uploads, and SEO settings.
"""
# Base URLs for different Wix API endpoints
BASE_URL = "https://www.wixapis.com"
OAUTH_URL = "https://www.wix.com/oauth"
# API Endpoints
BLOG_API = "/blog/v3"
MEDIA_API = "/site-media/v1"
SEO_API = "/site-properties/v4/seo"
def __init__(
self,
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
site_id: Optional[str] = None
):
"""
Initialize the Wix API Client.
Args:
api_key: Wix API key (optional if using refresh token)
refresh_token: Wix refresh token for OAuth authentication
site_id: Wix site ID
"""
self.api_key = api_key or os.environ.get('WIX_API_KEY')
self.refresh_token = refresh_token or os.environ.get('WIX_REFRESH_TOKEN')
self.site_id = site_id or os.environ.get('WIX_SITE_ID')
self.access_token = None
self.token_expiry = 0
if not self.refresh_token:
logger.warning("No refresh token provided. Authentication will fail.")
if not self.site_id:
logger.warning("No site ID provided. API calls will fail.")
def _get_headers(self) -> Dict[str, str]:
"""
Get the headers required for API requests.
Returns:
Dict containing the necessary headers for Wix API requests
"""
# Ensure we have a valid access token
self._ensure_valid_token()
headers = {
"Authorization": f"Bearer {self.access_token}",
"wix-site-id": self.site_id,
"Content-Type": "application/json"
}
return headers
def _ensure_valid_token(self) -> None:
"""
Ensure we have a valid access token, refreshing if necessary.
"""
current_time = time.time()
# If token is expired or doesn't exist, refresh it
if not self.access_token or current_time >= self.token_expiry:
self._refresh_access_token()
def _refresh_access_token(self) -> None:
"""
Refresh the access token using the refresh token.
"""
if not self.refresh_token:
raise ValueError("Refresh token is required for authentication")
url = f"{self.OAUTH_URL}/access"
payload = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.api_key if self.api_key else ""
}
try:
response = requests.post(url, json=payload)
response.raise_for_status()
data = response.json()
self.access_token = data.get("access_token")
# Set token expiry (subtract 5 minutes for safety margin)
expires_in = data.get("expires_in", 3600) # Default to 1 hour if not specified
self.token_expiry = time.time() + expires_in - 300
logger.info("Successfully refreshed access token")
except requests.exceptions.RequestException as e:
logger.error(f"Failed to refresh access token: {str(e)}")
if response.text:
logger.error(f"Response: {response.text}")
raise
def _make_request(
self,
method: str,
endpoint: str,
data: Optional[Dict] = None,
params: Optional[Dict] = None,
files: Optional[Dict] = None
) -> Dict:
"""
Make a request to the Wix API.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint
data: Request payload
params: Query parameters
files: Files to upload
Returns:
Response data as dictionary
"""
url = f"{self.BASE_URL}{endpoint}"
headers = self._get_headers()
# If we're uploading files, remove the Content-Type header
if files:
headers.pop("Content-Type", None)
try:
response = requests.request(
method=method,
url=url,
headers=headers,
json=data,
params=params,
files=files
)
# Log request details for debugging
logger.debug(f"Request: {method} {url}")
logger.debug(f"Headers: {headers}")
if data:
logger.debug(f"Data: {json.dumps(data)}")
if params:
logger.debug(f"Params: {params}")
# Handle response
response.raise_for_status()
if response.content:
return response.json()
return {}
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error: {str(e)}")
if response.text:
logger.error(f"Response: {response.text}")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Request error: {str(e)}")
raise
def list_posts(
self,
limit: int = 50,
offset: int = 0,
sort_field: str = "lastPublishedDate",
sort_order: str = "desc",
filter_by: Optional[Dict] = None
) -> Dict:
"""
List blog posts with pagination and sorting.
Args:
limit: Maximum number of posts to return (default: 50)
offset: Pagination offset (default: 0)
sort_field: Field to sort by (default: lastPublishedDate)
sort_order: Sort order, 'asc' or 'desc' (default: desc)
filter_by: Optional filter criteria
Returns:
Dictionary containing blog posts and pagination info
"""
endpoint = f"{self.BLOG_API}/posts/query"
payload = {
"limit": limit,
"offset": offset,
"sort": [
{
"fieldName": sort_field,
"order": sort_order
}
]
}
if filter_by:
payload["filter"] = filter_by
return self._make_request("POST", endpoint, data=payload)
def get_post(self, post_id: str) -> Dict:
"""
Get a specific blog post by ID.
Args:
post_id: ID of the blog post
Returns:
Blog post data
"""
endpoint = f"{self.BLOG_API}/posts/{post_id}"
return self._make_request("GET", endpoint)
def create_post(
self,
title: str,
content: str,
excerpt: Optional[str] = None,
featured_image_id: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_data: Optional[Dict] = None,
publish: bool = False
) -> Dict:
"""
Create a new blog post.
Args:
title: Post title
content: Post content (HTML)
excerpt: Post excerpt/summary
featured_image_id: ID of the featured image (from media manager)
tags: List of tags
categories: List of category IDs
seo_data: SEO settings for the post
publish: Whether to publish the post immediately
Returns:
Created blog post data
"""
endpoint = f"{self.BLOG_API}/posts"
# Prepare the post data
post_data = {
"post": {
"title": title,
"content": content,
"excerpt": excerpt or "",
"featured_image_id": featured_image_id,
"tags": tags or [],
"categoryIds": categories or []
}
}
# Add SEO data if provided
if seo_data:
post_data["post"]["seoData"] = seo_data
# Create the post
response = self._make_request("POST", endpoint, data=post_data)
# Publish the post if requested
if publish and response.get("post", {}).get("id"):
post_id = response["post"]["id"]
self.publish_post(post_id)
# Refresh the post data to get the published version
response = self.get_post(post_id)
return response
def update_post(
self,
post_id: str,
title: Optional[str] = None,
content: Optional[str] = None,
excerpt: Optional[str] = None,
featured_image_id: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_data: Optional[Dict] = None,
publish: bool = False
) -> Dict:
"""
Update an existing blog post.
Args:
post_id: ID of the post to update
title: New post title (optional)
content: New post content (HTML) (optional)
excerpt: New post excerpt/summary (optional)
featured_image_id: New featured image ID (optional)
tags: New list of tags (optional)
categories: New list of category IDs (optional)
seo_data: New SEO settings (optional)
publish: Whether to publish the post after updating
Returns:
Updated blog post data
"""
# First, get the current post data
current_post = self.get_post(post_id)
if "post" not in current_post:
raise ValueError(f"Post with ID {post_id} not found")
current_post_data = current_post["post"]
# Update only the fields that were provided
update_data = {
"post": {
"id": post_id,
"title": title if title is not None else current_post_data.get("title", ""),
"content": content if content is not None else current_post_data.get("content", ""),
"excerpt": excerpt if excerpt is not None else current_post_data.get("excerpt", ""),
"featured_image_id": featured_image_id if featured_image_id is not None else current_post_data.get("featuredImageId"),
"tags": tags if tags is not None else current_post_data.get("tags", []),
"categoryIds": categories if categories is not None else current_post_data.get("categoryIds", [])
}
}
# Add SEO data if provided
if seo_data:
update_data["post"]["seoData"] = seo_data
elif "seoData" in current_post_data:
update_data["post"]["seoData"] = current_post_data["seoData"]
# Update the post
endpoint = f"{self.BLOG_API}/posts/{post_id}"
response = self._make_request("PATCH", endpoint, data=update_data)
# Publish the post if requested
if publish:
self.publish_post(post_id)
# Refresh the post data to get the published version
response = self.get_post(post_id)
return response
def delete_post(self, post_id: str) -> Dict:
"""
Delete a blog post.
Args:
post_id: ID of the post to delete
Returns:
Response data
"""
endpoint = f"{self.BLOG_API}/posts/{post_id}"
return self._make_request("DELETE", endpoint)
def publish_post(self, post_id: str) -> Dict:
"""
Publish a draft blog post.
Args:
post_id: ID of the post to publish
Returns:
Published post data
"""
endpoint = f"{self.BLOG_API}/posts/{post_id}/publish"
return self._make_request("POST", endpoint)
def unpublish_post(self, post_id: str) -> Dict:
"""
Unpublish a published blog post (revert to draft).
Args:
post_id: ID of the post to unpublish
Returns:
Unpublished post data
"""
endpoint = f"{self.BLOG_API}/posts/{post_id}/unpublish"
return self._make_request("POST", endpoint)
def list_categories(self) -> Dict:
"""
List all blog categories.
Returns:
Dictionary containing blog categories
"""
endpoint = f"{self.BLOG_API}/categories"
return self._make_request("GET", endpoint)
def create_category(self, label: str, description: Optional[str] = None) -> Dict:
"""
Create a new blog category.
Args:
label: Category name
description: Category description (optional)
Returns:
Created category data
"""
endpoint = f"{self.BLOG_API}/categories"
payload = {
"category": {
"label": label,
"description": description or ""
}
}
return self._make_request("POST", endpoint, data=payload)
def update_category(
self,
category_id: str,
label: Optional[str] = None,
description: Optional[str] = None
) -> Dict:
"""
Update an existing blog category.
Args:
category_id: ID of the category to update
label: New category name (optional)
description: New category description (optional)
Returns:
Updated category data
"""
# First, get the current category data
current_categories = self.list_categories()
current_category = None
for category in current_categories.get("categories", []):
if category.get("id") == category_id:
current_category = category
break
if not current_category:
raise ValueError(f"Category with ID {category_id} not found")
# Update only the fields that were provided
update_data = {
"category": {
"id": category_id,
"label": label if label is not None else current_category.get("label", ""),
"description": description if description is not None else current_category.get("description", "")
}
}
endpoint = f"{self.BLOG_API}/categories/{category_id}"
return self._make_request("PATCH", endpoint, data=update_data)
def delete_category(self, category_id: str) -> Dict:
"""
Delete a blog category.
Args:
category_id: ID of the category to delete
Returns:
Response data
"""
endpoint = f"{self.BLOG_API}/categories/{category_id}"
return self._make_request("DELETE", endpoint)
def upload_image(
self,
file_path: str,
title: Optional[str] = None,
alt_text: Optional[str] = None,
description: Optional[str] = None
) -> Dict:
"""
Upload an image to the Wix media manager.
Args:
file_path: Path to the image file
title: Image title (optional)
alt_text: Image alt text for accessibility (optional)
description: Image description (optional)
Returns:
Uploaded image data
"""
# Check if file exists
if not os.path.isfile(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Get file name and mime type
file_name = os.path.basename(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type or not mime_type.startswith('image/'):
raise ValueError(f"File does not appear to be an image: {file_path}")
# Prepare metadata
metadata = {
"title": title or file_name,
"altText": alt_text or "",
"description": description or ""
}
# First, get an upload URL
endpoint = f"{self.MEDIA_API}/files/upload/url"
upload_url_response = self._make_request("POST", endpoint, data={
"mimeType": mime_type,
"fileName": file_name
})
if "uploadUrl" not in upload_url_response:
raise ValueError("Failed to get upload URL")
upload_url = upload_url_response["uploadUrl"]
# Upload the file to the provided URL
with open(file_path, 'rb') as file:
upload_response = requests.post(
upload_url,
files={'file': (file_name, file, mime_type)},
headers={"Content-Type": mime_type}
)
upload_response.raise_for_status()
# Complete the upload with metadata
endpoint = f"{self.MEDIA_API}/files"
complete_data = {
"uploadToken": upload_url_response.get("uploadToken"),
"mediaOptions": {
"mimeType": mime_type,
"fileName": file_name,
"mediaType": "IMAGE",
"title": metadata["title"],
"description": metadata["description"],
"alt": metadata["altText"]
}
}
return self._make_request("POST", endpoint, data=complete_data)
def get_media_item(self, media_id: str) -> Dict:
"""
Get details of a specific media item.
Args:
media_id: ID of the media item
Returns:
Media item data
"""
endpoint = f"{self.MEDIA_API}/files/{media_id}"
return self._make_request("GET", endpoint)
def list_media_items(
self,
media_type: str = "IMAGE",
limit: int = 50,
offset: int = 0
) -> Dict:
"""
List media items with pagination.
Args:
media_type: Type of media to list (IMAGE, VIDEO, AUDIO, DOCUMENT)
limit: Maximum number of items to return
offset: Pagination offset
Returns:
Dictionary containing media items and pagination info
"""
endpoint = f"{self.MEDIA_API}/files/query"
payload = {
"query": {
"paging": {
"limit": limit,
"offset": offset
},
"filter": {
"mediaType": media_type
}
}
}
return self._make_request("POST", endpoint, data=payload)
def delete_media_item(self, media_id: str) -> Dict:
"""
Delete a media item.
Args:
media_id: ID of the media item to delete
Returns:
Response data
"""
endpoint = f"{self.MEDIA_API}/files/{media_id}"
return self._make_request("DELETE", endpoint)
def get_seo_settings(self, page_url: str) -> Dict:
"""
Get SEO settings for a specific page.
Args:
page_url: URL path of the page (e.g., "/blog/my-post")
Returns:
SEO settings data
"""
endpoint = f"{self.SEO_API}/sites/{self.site_id}/url/{page_url}"
return self._make_request("GET", endpoint)
def update_seo_settings(
self,
page_url: str,
title: Optional[str] = None,
description: Optional[str] = None,
keywords: Optional[List[str]] = None,
og_image_url: Optional[str] = None,
structured_data: Optional[Dict] = None,
no_index: Optional[bool] = None
) -> Dict:
"""
Update SEO settings for a specific page.
Args:
page_url: URL path of the page (e.g., "/blog/my-post")
title: SEO title
description: SEO description
keywords: SEO keywords
og_image_url: Open Graph image URL
structured_data: Structured data (JSON-LD)
no_index: Whether to prevent indexing by search engines
Returns:
Updated SEO settings data
"""
# First, get current SEO settings
try:
current_settings = self.get_seo_settings(page_url)
except:
# If the page doesn't exist yet, start with empty settings
current_settings = {"tags": {}}
# Prepare the update data
seo_data = {
"tags": {}
}
# Update only the fields that were provided
if title is not None:
seo_data["tags"]["title"] = title
elif "title" in current_settings.get("tags", {}):
seo_data["tags"]["title"] = current_settings["tags"]["title"]
if description is not None:
seo_data["tags"]["description"] = description
elif "description" in current_settings.get("tags", {}):
seo_data["tags"]["description"] = current_settings["tags"]["description"]
if keywords is not None:
seo_data["tags"]["keywords"] = ", ".join(keywords)
elif "keywords" in current_settings.get("tags", {}):
seo_data["tags"]["keywords"] = current_settings["tags"]["keywords"]
if og_image_url is not None:
seo_data["tags"]["og:image"] = og_image_url
elif "og:image" in current_settings.get("tags", {}):
seo_data["tags"]["og:image"] = current_settings["tags"]["og:image"]
if structured_data is not None:
seo_data["tags"]["jsonld"] = json.dumps(structured_data)
elif "jsonld" in current_settings.get("tags", {}):
seo_data["tags"]["jsonld"] = current_settings["tags"]["jsonld"]
if no_index is not None:
seo_data["tags"]["robots"] = "noindex" if no_index else "index"
elif "robots" in current_settings.get("tags", {}):
seo_data["tags"]["robots"] = current_settings["tags"]["robots"]
endpoint = f"{self.SEO_API}/sites/{self.site_id}/url/{page_url}"
return self._make_request("PUT", endpoint, data=seo_data)
def create_blog_post_with_image(
self,
title: str,
content: str,
image_path: Optional[str] = None,
excerpt: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
publish: bool = False
) -> Dict:
"""
Create a blog post with an optional featured image in one operation.
Args:
title: Post title
content: Post content (HTML)
image_path: Path to featured image (optional)
excerpt: Post excerpt/summary (optional)
tags: List of tags (optional)
categories: List of category IDs (optional)
seo_title: SEO title (optional)
seo_description: SEO description (optional)
seo_keywords: SEO keywords (optional)
publish: Whether to publish the post immediately (optional)
Returns:
Created blog post data
"""
# Upload image if provided
featured_image_id = None
if image_path and os.path.isfile(image_path):
try:
image_response = self.upload_image(
file_path=image_path,
title=title,
alt_text=title
)
featured_image_id = image_response.get("file", {}).get("id")
logger.info(f"Uploaded image with ID: {featured_image_id}")
except Exception as e:
logger.error(f"Failed to upload image: {str(e)}")
# Prepare SEO data
seo_data = None
if seo_title or seo_description or seo_keywords:
seo_data = {
"title": seo_title or title,
"description": seo_description or excerpt or "",
"keywords": seo_keywords or tags or []
}
# Create the blog post
return self.create_post(
title=title,
content=content,
excerpt=excerpt,
featured_image_id=featured_image_id,
tags=tags,
categories=categories,
seo_data=seo_data,
publish=publish
)
def get_or_create_category(self, category_name: str) -> str:
"""
Get a category ID by name, creating it if it doesn't exist.
Args:
category_name: Name of the category
Returns:
Category ID
"""
# List all categories
categories_response = self.list_categories()
categories = categories_response.get("categories", [])
# Check if category exists
for category in categories:
if category.get("label", "").lower() == category_name.lower():
return category.get("id")
# Create category if it doesn't exist
create_response = self.create_category(label=category_name)
return create_response.get("category", {}).get("id")
def get_post_by_slug(self, slug: str) -> Optional[Dict]:
"""
Find a post by its slug.
Args:
slug: Post slug
Returns:
Post data or None if not found
"""
# List posts with a filter for the slug
filter_by = {
"slug": {
"$eq": slug
}
}
response = self.list_posts(limit=1, filter_by=filter_by)
posts = response.get("posts", [])
if posts:
return posts[0]
return None
def get_post_url(self, post_id: str) -> str:
"""
Get the full URL for a blog post.
Args:
post_id: ID of the blog post
Returns:
Full URL to the blog post
"""
post_data = self.get_post(post_id)
slug = post_data.get("post", {}).get("slug", "")
# Get the blog URL prefix
# This is a simplification - in reality, you might need to get this from site settings
return f"/blog/{slug}"

View File

@@ -0,0 +1,720 @@
"""
Wix Blog Manager
This module provides high-level functions for managing blog content on Wix,
including content creation, SEO optimization, and media management.
"""
import os
import re
import logging
import tempfile
import requests
from typing import Dict, List, Optional, Union, Any, Tuple
from datetime import datetime
from pathlib import Path
import markdown
import html2text
from bs4 import BeautifulSoup
from .wix_api_client import WixAPIClient
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('wix_blog_manager')
class WixBlogManager:
"""
High-level manager for Wix blog content.
This class provides convenient methods for common blog management tasks,
building on the lower-level WixAPIClient.
"""
def __init__(
self,
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
site_id: Optional[str] = None
):
"""
Initialize the Wix Blog Manager.
Args:
api_key: Wix API key (optional if using refresh token)
refresh_token: Wix refresh token for OAuth authentication
site_id: Wix site ID
"""
self.client = WixAPIClient(api_key, refresh_token, site_id)
def publish_markdown_post(
self,
title: str,
markdown_content: str,
featured_image_path: Optional[str] = None,
featured_image_url: Optional[str] = None,
excerpt: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
publish: bool = False
) -> Dict:
"""
Publish a blog post from markdown content.
Args:
title: Post title
markdown_content: Post content in markdown format
featured_image_path: Local path to featured image (optional)
featured_image_url: URL of featured image to download (optional)
excerpt: Post excerpt/summary (optional)
tags: List of tags (optional)
categories: List of category names (optional)
seo_title: SEO title (optional)
seo_description: SEO description (optional)
seo_keywords: SEO keywords (optional)
publish: Whether to publish the post immediately (optional)
Returns:
Published blog post data
"""
# Convert markdown to HTML
html_content = self._markdown_to_html(markdown_content)
# Process images in the content
html_content, embedded_images = self._process_content_images(html_content)
# Handle featured image
featured_image_id = None
temp_image_path = None
if featured_image_url and not featured_image_path:
# Download the image from URL
try:
temp_image_path = self._download_image(featured_image_url)
featured_image_path = temp_image_path
except Exception as e:
logger.error(f"Failed to download featured image: {str(e)}")
if featured_image_path:
try:
image_response = self.client.upload_image(
file_path=featured_image_path,
title=title,
alt_text=title
)
featured_image_id = image_response.get("file", {}).get("id")
logger.info(f"Uploaded featured image with ID: {featured_image_id}")
except Exception as e:
logger.error(f"Failed to upload featured image: {str(e)}")
# Clean up temporary file if created
if temp_image_path and os.path.exists(temp_image_path):
try:
os.remove(temp_image_path)
except:
pass
# Process categories - convert names to IDs
category_ids = []
if categories:
for category_name in categories:
try:
category_id = self.client.get_or_create_category(category_name)
if category_id:
category_ids.append(category_id)
except Exception as e:
logger.error(f"Failed to process category '{category_name}': {str(e)}")
# Generate excerpt if not provided
if not excerpt:
excerpt = self._generate_excerpt(markdown_content)
# Prepare SEO data
seo_data = None
if seo_title or seo_description or seo_keywords:
seo_data = {
"title": seo_title or title,
"description": seo_description or excerpt or "",
"keywords": seo_keywords or tags or []
}
# Create the blog post
response = self.client.create_post(
title=title,
content=html_content,
excerpt=excerpt,
featured_image_id=featured_image_id,
tags=tags,
categories=category_ids,
seo_data=seo_data,
publish=publish
)
# Update SEO settings if the post was published
if publish and response.get("post", {}).get("id"):
post_id = response["post"]["id"]
post_url = self.client.get_post_url(post_id)
try:
self.client.update_seo_settings(
page_url=post_url,
title=seo_title or title,
description=seo_description or excerpt or "",
keywords=seo_keywords or tags,
og_image_url=featured_image_url
)
except Exception as e:
logger.error(f"Failed to update SEO settings: {str(e)}")
return response
def update_markdown_post(
self,
post_id: str,
title: Optional[str] = None,
markdown_content: Optional[str] = None,
featured_image_path: Optional[str] = None,
featured_image_url: Optional[str] = None,
excerpt: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
publish: bool = False
) -> Dict:
"""
Update an existing blog post with markdown content.
Args:
post_id: ID of the post to update
title: New post title (optional)
markdown_content: New post content in markdown format (optional)
featured_image_path: Local path to new featured image (optional)
featured_image_url: URL of new featured image to download (optional)
excerpt: New post excerpt/summary (optional)
tags: New list of tags (optional)
categories: New list of category names (optional)
seo_title: New SEO title (optional)
seo_description: New SEO description (optional)
seo_keywords: New SEO keywords (optional)
publish: Whether to publish the post after updating (optional)
Returns:
Updated blog post data
"""
# Get current post data
current_post = self.client.get_post(post_id)
if "post" not in current_post:
raise ValueError(f"Post with ID {post_id} not found")
# Convert markdown to HTML if provided
html_content = None
if markdown_content:
html_content = self._markdown_to_html(markdown_content)
# Process images in the content
html_content, embedded_images = self._process_content_images(html_content)
# Handle featured image
featured_image_id = None
temp_image_path = None
if featured_image_url and not featured_image_path:
# Download the image from URL
try:
temp_image_path = self._download_image(featured_image_url)
featured_image_path = temp_image_path
except Exception as e:
logger.error(f"Failed to download featured image: {str(e)}")
if featured_image_path:
try:
image_response = self.client.upload_image(
file_path=featured_image_path,
title=title or current_post["post"].get("title", ""),
alt_text=title or current_post["post"].get("title", "")
)
featured_image_id = image_response.get("file", {}).get("id")
logger.info(f"Uploaded featured image with ID: {featured_image_id}")
except Exception as e:
logger.error(f"Failed to upload featured image: {str(e)}")
# Clean up temporary file if created
if temp_image_path and os.path.exists(temp_image_path):
try:
os.remove(temp_image_path)
except:
pass
# Process categories - convert names to IDs
category_ids = None
if categories:
category_ids = []
for category_name in categories:
try:
category_id = self.client.get_or_create_category(category_name)
if category_id:
category_ids.append(category_id)
except Exception as e:
logger.error(f"Failed to process category '{category_name}': {str(e)}")
# Generate excerpt if not provided but markdown is
if not excerpt and markdown_content:
excerpt = self._generate_excerpt(markdown_content)
# Prepare SEO data
seo_data = None
if seo_title or seo_description or seo_keywords:
seo_data = {
"title": seo_title or title or current_post["post"].get("title", ""),
"description": seo_description or excerpt or current_post["post"].get("excerpt", ""),
"keywords": seo_keywords or tags or current_post["post"].get("tags", [])
}
# Update the blog post
response = self.client.update_post(
post_id=post_id,
title=title,
content=html_content,
excerpt=excerpt,
featured_image_id=featured_image_id,
tags=tags,
categories=category_ids,
seo_data=seo_data,
publish=publish
)
# Update SEO settings if needed
if (seo_title or seo_description or seo_keywords or featured_image_url):
post_url = self.client.get_post_url(post_id)
try:
self.client.update_seo_settings(
page_url=post_url,
title=seo_title or title,
description=seo_description or excerpt,
keywords=seo_keywords or tags,
og_image_url=featured_image_url
)
except Exception as e:
logger.error(f"Failed to update SEO settings: {str(e)}")
return response
def find_post_by_title(self, title: str) -> Optional[Dict]:
"""
Find a post by its title (exact match).
Args:
title: Post title to search for
Returns:
Post data or None if not found
"""
# List all posts (this is inefficient but Wix API doesn't support filtering by title)
# In a production environment, you might want to implement pagination
response = self.client.list_posts(limit=100)
posts = response.get("posts", [])
for post in posts:
if post.get("title") == title:
return post
return None
def publish_or_update_markdown_post(
self,
title: str,
markdown_content: str,
featured_image_path: Optional[str] = None,
featured_image_url: Optional[str] = None,
excerpt: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
publish: bool = False,
update_if_exists: bool = True
) -> Dict:
"""
Publish a new post or update an existing one with the same title.
Args:
title: Post title
markdown_content: Post content in markdown format
featured_image_path: Local path to featured image (optional)
featured_image_url: URL of featured image to download (optional)
excerpt: Post excerpt/summary (optional)
tags: List of tags (optional)
categories: List of category names (optional)
seo_title: SEO title (optional)
seo_description: SEO description (optional)
seo_keywords: SEO keywords (optional)
publish: Whether to publish the post immediately (optional)
update_if_exists: Whether to update an existing post with the same title (optional)
Returns:
Published or updated blog post data
"""
# Check if a post with this title already exists
existing_post = self.find_post_by_title(title)
if existing_post and update_if_exists:
# Update existing post
logger.info(f"Updating existing post with title: {title}")
return self.update_markdown_post(
post_id=existing_post["id"],
title=title,
markdown_content=markdown_content,
featured_image_path=featured_image_path,
featured_image_url=featured_image_url,
excerpt=excerpt,
tags=tags,
categories=categories,
seo_title=seo_title,
seo_description=seo_description,
seo_keywords=seo_keywords,
publish=publish
)
else:
# Create new post
logger.info(f"Creating new post with title: {title}")
return self.publish_markdown_post(
title=title,
markdown_content=markdown_content,
featured_image_path=featured_image_path,
featured_image_url=featured_image_url,
excerpt=excerpt,
tags=tags,
categories=categories,
seo_title=seo_title,
seo_description=seo_description,
seo_keywords=seo_keywords,
publish=publish
)
def optimize_seo_for_post(
self,
post_id: str,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
og_image_url: Optional[str] = None,
structured_data: Optional[Dict] = None
) -> Dict:
"""
Optimize SEO settings for an existing blog post.
Args:
post_id: ID of the blog post
seo_title: SEO title (optional)
seo_description: SEO description (optional)
seo_keywords: SEO keywords (optional)
og_image_url: Open Graph image URL (optional)
structured_data: Structured data (JSON-LD) (optional)
Returns:
Updated SEO settings data
"""
# Get the post URL
post_url = self.client.get_post_url(post_id)
# Update SEO settings
return self.client.update_seo_settings(
page_url=post_url,
title=seo_title,
description=seo_description,
keywords=seo_keywords,
og_image_url=og_image_url,
structured_data=structured_data
)
def generate_structured_data(
self,
post_id: str,
author_name: str,
publisher_name: str,
publisher_logo_url: str
) -> Dict:
"""
Generate structured data (JSON-LD) for a blog post.
Args:
post_id: ID of the blog post
author_name: Name of the author
publisher_name: Name of the publisher
publisher_logo_url: URL of the publisher's logo
Returns:
Structured data as a dictionary
"""
# Get post data
post_data = self.client.get_post(post_id)
post = post_data.get("post", {})
# Get post URL
post_url = self.client.get_post_url(post_id)
# Create structured data
structured_data = {
"@context": "https://schema.org",
"@type": "BlogPosting",
"headline": post.get("title", ""),
"description": post.get("excerpt", ""),
"author": {
"@type": "Person",
"name": author_name
},
"publisher": {
"@type": "Organization",
"name": publisher_name,
"logo": {
"@type": "ImageObject",
"url": publisher_logo_url
}
},
"datePublished": post.get("publishedDate", ""),
"dateModified": post.get("lastPublishedDate", "")
}
# Add featured image if available
if post.get("featuredImageId"):
try:
media_item = self.client.get_media_item(post["featuredImageId"])
image_url = media_item.get("file", {}).get("url", "")
if image_url:
structured_data["image"] = image_url
except:
pass
return structured_data
def apply_structured_data_to_post(
self,
post_id: str,
author_name: str,
publisher_name: str,
publisher_logo_url: str
) -> Dict:
"""
Generate and apply structured data to a blog post.
Args:
post_id: ID of the blog post
author_name: Name of the author
publisher_name: Name of the publisher
publisher_logo_url: URL of the publisher's logo
Returns:
Updated SEO settings data
"""
# Generate structured data
structured_data = self.generate_structured_data(
post_id=post_id,
author_name=author_name,
publisher_name=publisher_name,
publisher_logo_url=publisher_logo_url
)
# Get the post URL
post_url = self.client.get_post_url(post_id)
# Update SEO settings with structured data
return self.client.update_seo_settings(
page_url=post_url,
structured_data=structured_data
)
# Helper methods
def _markdown_to_html(self, markdown_content: str) -> str:
"""
Convert markdown content to HTML.
Args:
markdown_content: Content in markdown format
Returns:
HTML content
"""
# Use the markdown library to convert to HTML
html = markdown.markdown(
markdown_content,
extensions=['extra', 'codehilite', 'tables', 'toc']
)
return html
def _html_to_markdown(self, html_content: str) -> str:
"""
Convert HTML content to markdown.
Args:
html_content: Content in HTML format
Returns:
Markdown content
"""
# Use html2text to convert HTML to markdown
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = False
h.ignore_tables = False
h.ignore_emphasis = False
return h.handle(html_content)
def _process_content_images(self, html_content: str) -> Tuple[str, List[Dict]]:
"""
Process images in HTML content, uploading them to Wix and replacing URLs.
Args:
html_content: HTML content with image tags
Returns:
Tuple of (updated HTML content, list of uploaded image data)
"""
soup = BeautifulSoup(html_content, 'html.parser')
img_tags = soup.find_all('img')
uploaded_images = []
for img in img_tags:
src = img.get('src', '')
alt = img.get('alt', '')
# Skip images that are already hosted on Wix
if 'wixstatic.com' in src:
continue
# Handle images with data URLs
if src.startswith('data:image'):
logger.info("Skipping data URL image - not supported in this implementation")
continue
# Handle remote images
if src.startswith('http://') or src.startswith('https://'):
try:
# Download the image
temp_path = self._download_image(src)
# Upload to Wix
image_response = self.client.upload_image(
file_path=temp_path,
title=alt or "Blog image",
alt_text=alt or "Blog image"
)
# Get the new URL
new_url = image_response.get("file", {}).get("url", "")
if new_url:
# Replace the src attribute
img['src'] = new_url
uploaded_images.append({
'original_url': src,
'wix_url': new_url,
'wix_id': image_response.get("file", {}).get("id", "")
})
# Clean up temp file
if os.path.exists(temp_path):
os.remove(temp_path)
except Exception as e:
logger.error(f"Failed to process image {src}: {str(e)}")
# Handle local images (not implemented in this version)
else:
logger.info(f"Skipping local image {src} - not supported in this implementation")
# Return the updated HTML
return str(soup), uploaded_images
def _download_image(self, url: str) -> str:
"""
Download an image from a URL to a temporary file.
Args:
url: URL of the image
Returns:
Path to the downloaded temporary file
"""
response = requests.get(url, stream=True)
response.raise_for_status()
# Determine file extension
content_type = response.headers.get('content-type', '')
extension = '.jpg' # Default
if 'image/jpeg' in content_type:
extension = '.jpg'
elif 'image/png' in content_type:
extension = '.png'
elif 'image/gif' in content_type:
extension = '.gif'
elif 'image/webp' in content_type:
extension = '.webp'
# Create a temporary file
fd, temp_path = tempfile.mkstemp(suffix=extension)
os.close(fd)
# Write the image data to the file
with open(temp_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return temp_path
def _generate_excerpt(self, markdown_content: str, max_length: int = 160) -> str:
"""
Generate an excerpt from markdown content.
Args:
markdown_content: Content in markdown format
max_length: Maximum length of the excerpt
Returns:
Generated excerpt
"""
# Convert markdown to plain text
h = html2text.HTML2Text()
h.ignore_links = True
h.ignore_images = True
h.ignore_tables = True
h.ignore_emphasis = True
# First convert markdown to HTML, then HTML to plain text
html = markdown.markdown(markdown_content)
plain_text = h.handle(html)
# Clean up the text
plain_text = re.sub(r'\s+', ' ', plain_text).strip()
# Truncate to max_length
if len(plain_text) <= max_length:
return plain_text
# Try to truncate at a sentence boundary
sentences = re.split(r'(?<=[.!?])\s+', plain_text)
excerpt = ""
for sentence in sentences:
if len(excerpt + sentence) <= max_length:
excerpt += sentence + " "
else:
break
# If we couldn't get a full sentence, just truncate
if not excerpt:
excerpt = plain_text[:max_length-3] + "..."
return excerpt.strip()

View File

@@ -0,0 +1,350 @@
"""
Wix Blog Publisher for Alwrity
This module integrates the Wix API with the Alwrity AI Writer platform,
allowing users to publish generated blog content directly to their Wix site.
"""
import os
import logging
import tempfile
import streamlit as st
from typing import Dict, List, Optional, Union, Any, Tuple
from pathlib import Path
from .wix_integration import WixIntegration
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('wix_blog_publisher')
def publish_to_wix(
title: str,
content: str,
is_markdown: bool = True,
featured_image_path: Optional[str] = None,
featured_image_url: Optional[str] = None,
excerpt: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
author_name: Optional[str] = None,
publisher_name: Optional[str] = None,
publisher_logo_url: Optional[str] = None,
publish: bool = True,
update_if_exists: bool = True,
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
site_id: Optional[str] = None
) -> Dict:
"""
Publish a blog post to Wix.
Args:
title: Post title
content: Post content (markdown or HTML)
is_markdown: Whether the content is in markdown format
featured_image_path: Local path to featured image (optional)
featured_image_url: URL of featured image to download (optional)
excerpt: Post excerpt/summary (optional)
tags: List of tags (optional)
categories: List of category names (optional)
seo_title: SEO title (optional)
seo_description: SEO description (optional)
seo_keywords: SEO keywords (optional)
author_name: Name of the author (optional)
publisher_name: Name of the publisher (optional)
publisher_logo_url: URL of the publisher's logo (optional)
publish: Whether to publish the post immediately (optional)
update_if_exists: Whether to update an existing post with the same title (optional)
api_key: Wix API key (optional if using refresh token)
refresh_token: Wix refresh token for OAuth authentication
site_id: Wix site ID
Returns:
Published blog post data
"""
# Initialize Wix integration
wix = WixIntegration(api_key, refresh_token, site_id)
# Publish the blog post
return wix.publish_blog_post(
title=title,
content=content,
is_markdown=is_markdown,
featured_image_path=featured_image_path,
featured_image_url=featured_image_url,
excerpt=excerpt,
tags=tags,
categories=categories,
seo_title=seo_title,
seo_description=seo_description,
seo_keywords=seo_keywords,
author_name=author_name,
publisher_name=publisher_name,
publisher_logo_url=publisher_logo_url,
publish=publish,
update_if_exists=update_if_exists
)
def wix_blog_publisher_ui():
"""
Streamlit UI for publishing blog posts to Wix.
"""
st.title("Publish to Wix")
st.write("Publish your blog content directly to your Wix site.")
# Authentication settings
st.header("Wix Authentication")
# Check for saved credentials
if "wix_refresh_token" in st.session_state and "wix_site_id" in st.session_state:
st.success("✅ Wix credentials are saved in this session.")
show_saved = st.checkbox("Show saved credentials")
if show_saved:
st.text_input("Refresh Token", value=st.session_state.wix_refresh_token, type="password", disabled=True)
st.text_input("Site ID", value=st.session_state.wix_site_id, disabled=True)
clear_creds = st.button("Clear saved credentials")
if clear_creds:
if "wix_refresh_token" in st.session_state:
del st.session_state.wix_refresh_token
if "wix_site_id" in st.session_state:
del st.session_state.wix_site_id
st.rerun()
else:
col1, col2 = st.columns(2)
with col1:
refresh_token = st.text_input("Wix Refresh Token", type="password", help="Your Wix refresh token for API authentication")
with col2:
site_id = st.text_input("Wix Site ID", help="Your Wix site ID")
save_creds = st.checkbox("Save credentials for this session", value=True)
if st.button("Validate Credentials"):
if not refresh_token:
st.error("Refresh token is required.")
return
if not site_id:
st.error("Site ID is required.")
return
# Try to initialize Wix integration to validate credentials
try:
wix = WixIntegration(refresh_token=refresh_token, site_id=site_id)
# Test API call
site_info = wix.get_site_info()
if site_info.get("status") == "connected":
st.success(f"✅ Credentials validated successfully! Found {site_info.get('post_count', 0)} posts and {site_info.get('category_count', 0)} categories.")
# Save credentials if requested
if save_creds:
st.session_state.wix_refresh_token = refresh_token
st.session_state.wix_site_id = site_id
st.rerun()
else:
st.error(f"❌ Failed to validate credentials: {site_info.get('error', 'Unknown error')}")
except Exception as e:
st.error(f"❌ Failed to validate credentials: {str(e)}")
return
# Blog content section
st.header("Blog Content")
# Check if we have content in session state (from other parts of the app)
blog_title = st.text_input(
"Blog Title",
value=st.session_state.get("blog_title", ""),
help="The title of your blog post"
)
content_type = st.radio(
"Content Format",
["Markdown", "HTML"],
horizontal=True,
help="The format of your blog content"
)
is_markdown = content_type == "Markdown"
blog_content = st.text_area(
"Blog Content",
value=st.session_state.get("blog_content", ""),
height=300,
help="The content of your blog post"
)
# Featured image
st.subheader("Featured Image")
image_source = st.radio(
"Image Source",
["None", "Upload", "URL"],
horizontal=True,
help="How to provide the featured image"
)
featured_image_path = None
featured_image_url = None
if image_source == "Upload":
uploaded_file = st.file_uploader("Upload Featured Image", type=["jpg", "jpeg", "png", "gif"])
if uploaded_file:
# Save the uploaded file to a temporary location
with tempfile.NamedTemporaryFile(delete=False, suffix=f".{uploaded_file.name.split('.')[-1]}") as tmp:
tmp.write(uploaded_file.getvalue())
featured_image_path = tmp.name
elif image_source == "URL":
featured_image_url = st.text_input("Featured Image URL", help="URL of the featured image")
# Blog metadata
st.header("Blog Metadata")
col1, col2 = st.columns(2)
with col1:
excerpt = st.text_area(
"Excerpt",
value=st.session_state.get("blog_excerpt", ""),
help="A short summary of your blog post"
)
tags_input = st.text_input(
"Tags (comma-separated)",
value=", ".join(st.session_state.get("blog_tags", [])) if isinstance(st.session_state.get("blog_tags", []), list) else st.session_state.get("blog_tags", ""),
help="Tags for your blog post, separated by commas"
)
tags = [tag.strip() for tag in tags_input.split(",")] if tags_input else None
categories_input = st.text_input(
"Categories (comma-separated)",
value=", ".join(st.session_state.get("blog_categories", [])) if isinstance(st.session_state.get("blog_categories", []), list) else st.session_state.get("blog_categories", ""),
help="Categories for your blog post, separated by commas"
)
categories = [cat.strip() for cat in categories_input.split(",")] if categories_input else None
with col2:
author_name = st.text_input("Author Name", help="Name of the blog post author")
publisher_name = st.text_input("Publisher Name", help="Name of the blog publisher (usually your site name)")
publisher_logo_url = st.text_input("Publisher Logo URL", help="URL of the publisher's logo")
# SEO settings
with st.expander("SEO Settings"):
seo_title = st.text_input("SEO Title", value=blog_title, help="Title for search engines (defaults to blog title)")
seo_description = st.text_area("SEO Description", value=excerpt, help="Description for search engines (defaults to excerpt)")
seo_keywords_input = st.text_input("SEO Keywords (comma-separated)", value=tags_input, help="Keywords for search engines (defaults to tags)")
seo_keywords = [kw.strip() for kw in seo_keywords_input.split(",")] if seo_keywords_input else None
# Publishing options
st.header("Publishing Options")
col1, col2 = st.columns(2)
with col1:
publish = not st.checkbox("Save as draft", help="If checked, the post will be saved as a draft instead of being published")
with col2:
update_if_exists = st.checkbox("Update if exists", value=True, help="If checked, an existing post with the same title will be updated")
# Publish button
if st.button("Publish to Wix", type="primary"):
if not blog_title:
st.error("Blog title is required.")
return
if not blog_content:
st.error("Blog content is required.")
return
# Get credentials
refresh_token = st.session_state.get("wix_refresh_token")
site_id = st.session_state.get("wix_site_id")
if not refresh_token or not site_id:
st.error("Wix credentials are required. Please enter them in the authentication section.")
return
# Show progress
with st.spinner("Publishing to Wix..."):
try:
# Publish to Wix
result = publish_to_wix(
title=blog_title,
content=blog_content,
is_markdown=is_markdown,
featured_image_path=featured_image_path,
featured_image_url=featured_image_url,
excerpt=excerpt,
tags=tags,
categories=categories,
seo_title=seo_title,
seo_description=seo_description,
seo_keywords=seo_keywords,
author_name=author_name,
publisher_name=publisher_name,
publisher_logo_url=publisher_logo_url,
publish=publish,
update_if_exists=update_if_exists,
refresh_token=refresh_token,
site_id=site_id
)
# Clean up temporary file if created
if featured_image_path and os.path.exists(featured_image_path) and featured_image_path.startswith(tempfile.gettempdir()):
try:
os.remove(featured_image_path)
except:
pass
# Show success message
st.success("✅ Blog post published successfully!")
# Show post details
post = result.get("post", {})
st.subheader("Published Post Details")
col1, col2 = st.columns(2)
with col1:
st.write(f"**Title:** {post.get('title', 'N/A')}")
st.write(f"**Status:** {post.get('status', 'N/A')}")
st.write(f"**ID:** {post.get('id', 'N/A')}")
with col2:
st.write(f"**Published Date:** {post.get('publishedDate', 'N/A')}")
st.write(f"**URL:** {post.get('url', 'N/A')}")
st.write(f"**Tags:** {', '.join(post.get('tags', []))}")
# Add a view button if URL is available
if post.get("url"):
st.markdown(f"[View Post]({post.get('url')})")
# Add SEO report button
if st.button("Generate SEO Report"):
with st.spinner("Generating SEO report..."):
try:
wix = WixIntegration(refresh_token=refresh_token, site_id=site_id)
seo_report = wix.get_seo_report(post.get("id"), seo_keywords or tags or [])
st.subheader("SEO Report")
st.write(f"**SEO Score:** {seo_report.get('seo_score', 0):.1f}/100")
st.write("**Recommendations:**")
for i, rec in enumerate(seo_report.get("recommendations", [])):
st.write(f"{i+1}. {rec}")
except Exception as e:
st.error(f"Failed to generate SEO report: {str(e)}")
except Exception as e:
st.error(f"❌ Failed to publish blog post: {str(e)}")
logger.error(f"Failed to publish blog post: {str(e)}")
# For testing the UI directly
if __name__ == "__main__":
wix_blog_publisher_ui()

View File

@@ -0,0 +1,388 @@
"""
Wix Integration for Alwrity
This module provides a high-level interface for integrating Wix blog functionality
with the Alwrity AI Writer platform.
"""
import os
import logging
import json
from typing import Dict, List, Optional, Union, Any, Tuple
from pathlib import Path
from .wix_api_client import WixAPIClient
from .wix_blog_manager import WixBlogManager
from .wix_seo_optimizer import WixSEOOptimizer
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('wix_integration')
class WixIntegration:
"""
Main integration class for Wix blog functionality.
This class provides a simplified interface for common operations,
combining the functionality of the API client, blog manager, and SEO optimizer.
"""
def __init__(
self,
api_key: Optional[str] = None,
refresh_token: Optional[str] = None,
site_id: Optional[str] = None
):
"""
Initialize the Wix Integration.
Args:
api_key: Wix API key (optional if using refresh token)
refresh_token: Wix refresh token for OAuth authentication
site_id: Wix site ID
"""
self.api_client = WixAPIClient(api_key, refresh_token, site_id)
self.blog_manager = WixBlogManager(api_key, refresh_token, site_id)
self.seo_optimizer = WixSEOOptimizer(api_key, refresh_token, site_id)
def publish_blog_post(
self,
title: str,
content: str,
is_markdown: bool = True,
featured_image_path: Optional[str] = None,
featured_image_url: Optional[str] = None,
excerpt: Optional[str] = None,
tags: Optional[List[str]] = None,
categories: Optional[List[str]] = None,
seo_title: Optional[str] = None,
seo_description: Optional[str] = None,
seo_keywords: Optional[List[str]] = None,
author_name: Optional[str] = None,
publisher_name: Optional[str] = None,
publisher_logo_url: Optional[str] = None,
publish: bool = True,
update_if_exists: bool = True
) -> Dict:
"""
Publish a blog post with comprehensive SEO optimization.
Args:
title: Post title
content: Post content (markdown or HTML)
is_markdown: Whether the content is in markdown format
featured_image_path: Local path to featured image (optional)
featured_image_url: URL of featured image to download (optional)
excerpt: Post excerpt/summary (optional)
tags: List of tags (optional)
categories: List of category names (optional)
seo_title: SEO title (optional)
seo_description: SEO description (optional)
seo_keywords: SEO keywords (optional)
author_name: Name of the author (optional)
publisher_name: Name of the publisher (optional)
publisher_logo_url: URL of the publisher's logo (optional)
publish: Whether to publish the post immediately (optional)
update_if_exists: Whether to update an existing post with the same title (optional)
Returns:
Published blog post data
"""
# Generate SEO data if not provided
if not seo_keywords and tags:
seo_keywords = tags
if not seo_title:
seo_title = title
if not seo_description and not excerpt:
if is_markdown:
# Generate description from markdown content
seo_description = self.blog_manager._generate_excerpt(content)
else:
# Generate description from HTML content
seo_description = self.seo_optimizer.generate_meta_description(content)
elif not seo_description:
seo_description = excerpt
# Publish or update the post
if is_markdown:
response = self.blog_manager.publish_or_update_markdown_post(
title=title,
markdown_content=content,
featured_image_path=featured_image_path,
featured_image_url=featured_image_url,
excerpt=excerpt,
tags=tags,
categories=categories,
seo_title=seo_title,
seo_description=seo_description,
seo_keywords=seo_keywords,
publish=publish,
update_if_exists=update_if_exists
)
else:
# Find existing post or create new one
existing_post = self.blog_manager.find_post_by_title(title)
if existing_post and update_if_exists:
# Update existing post
response = self.api_client.update_post(
post_id=existing_post["id"],
title=title,
content=content,
excerpt=excerpt,
tags=tags,
categories=[self.api_client.get_or_create_category(cat) for cat in categories] if categories else None,
seo_data={
"title": seo_title,
"description": seo_description,
"keywords": seo_keywords or []
},
publish=publish
)
else:
# Create new post
response = self.api_client.create_post(
title=title,
content=content,
excerpt=excerpt,
tags=tags,
categories=[self.api_client.get_or_create_category(cat) for cat in categories] if categories else None,
seo_data={
"title": seo_title,
"description": seo_description,
"keywords": seo_keywords or []
},
publish=publish
)
# Apply additional SEO optimization if the post was published
if publish and response.get("post", {}).get("id"):
post_id = response["post"]["id"]
# Apply structured data if author and publisher info is provided
if author_name and publisher_name and publisher_logo_url:
try:
self.seo_optimizer.apply_structured_data_to_post(
post_id=post_id,
author_name=author_name,
publisher_name=publisher_name,
publisher_logo_url=publisher_logo_url
)
except Exception as e:
logger.error(f"Failed to apply structured data: {str(e)}")
# Apply comprehensive SEO optimization
try:
self.seo_optimizer.apply_seo_optimization(
post_id=post_id,
title=seo_title,
description=seo_description,
keywords=seo_keywords,
author_name=author_name,
publisher_name=publisher_name,
publisher_logo_url=publisher_logo_url,
og_image_url=featured_image_url
)
except Exception as e:
logger.error(f"Failed to apply SEO optimization: {str(e)}")
return response
def upload_media(
self,
file_path: str,
title: Optional[str] = None,
alt_text: Optional[str] = None,
description: Optional[str] = None
) -> Dict:
"""
Upload a media file to Wix.
Args:
file_path: Path to the media file
title: Media title (optional)
alt_text: Media alt text (optional)
description: Media description (optional)
Returns:
Uploaded media data
"""
return self.api_client.upload_image(
file_path=file_path,
title=title,
alt_text=alt_text,
description=description
)
def get_seo_report(self, post_id: str, target_keywords: List[str]) -> Dict:
"""
Generate a comprehensive SEO report for a blog post.
Args:
post_id: ID of the blog post
target_keywords: List of target keywords
Returns:
Dictionary with SEO report data
"""
return self.seo_optimizer.generate_seo_report(post_id, target_keywords)
def list_blog_posts(
self,
limit: int = 50,
offset: int = 0,
sort_field: str = "lastPublishedDate",
sort_order: str = "desc"
) -> Dict:
"""
List blog posts with pagination and sorting.
Args:
limit: Maximum number of posts to return (default: 50)
offset: Pagination offset (default: 0)
sort_field: Field to sort by (default: lastPublishedDate)
sort_order: Sort order, 'asc' or 'desc' (default: desc)
Returns:
Dictionary containing blog posts and pagination info
"""
return self.api_client.list_posts(
limit=limit,
offset=offset,
sort_field=sort_field,
sort_order=sort_order
)
def list_categories(self) -> Dict:
"""
List all blog categories.
Returns:
Dictionary containing blog categories
"""
return self.api_client.list_categories()
def create_category(self, name: str, description: Optional[str] = None) -> str:
"""
Create a new blog category.
Args:
name: Category name
description: Category description (optional)
Returns:
ID of the created category
"""
response = self.api_client.create_category(
label=name,
description=description
)
return response.get("category", {}).get("id", "")
def get_post_by_id(self, post_id: str) -> Dict:
"""
Get a blog post by ID.
Args:
post_id: ID of the blog post
Returns:
Blog post data
"""
return self.api_client.get_post(post_id)
def get_post_by_title(self, title: str) -> Optional[Dict]:
"""
Get a blog post by title.
Args:
title: Title of the blog post
Returns:
Blog post data or None if not found
"""
return self.blog_manager.find_post_by_title(title)
def delete_post(self, post_id: str) -> Dict:
"""
Delete a blog post.
Args:
post_id: ID of the blog post
Returns:
Response data
"""
return self.api_client.delete_post(post_id)
def update_post_status(self, post_id: str, publish: bool = True) -> Dict:
"""
Update the publication status of a blog post.
Args:
post_id: ID of the blog post
publish: Whether to publish (True) or unpublish (False) the post
Returns:
Updated blog post data
"""
if publish:
return self.api_client.publish_post(post_id)
else:
return self.api_client.unpublish_post(post_id)
def search_posts(self, query: str, limit: int = 10) -> List[Dict]:
"""
Search for blog posts by content or title.
Args:
query: Search query
limit: Maximum number of results to return
Returns:
List of matching blog posts
"""
# First try to find by title
title_matches = []
try:
all_posts = self.list_blog_posts(limit=100)["posts"]
for post in all_posts:
if query.lower() in post.get("title", "").lower():
title_matches.append(post)
if len(title_matches) >= limit:
break
except Exception as e:
logger.error(f"Error searching posts by title: {str(e)}")
return title_matches[:limit]
def get_site_info(self) -> Dict:
"""
Get information about the Wix site.
Returns:
Dictionary with site information
"""
try:
# Make a simple API call to verify credentials and get site info
posts = self.list_blog_posts(limit=1)
categories = self.list_categories()
return {
"site_id": self.api_client.site_id,
"post_count": posts.get("totalCount", 0),
"category_count": len(categories.get("categories", [])),
"status": "connected"
}
except Exception as e:
logger.error(f"Error getting site info: {str(e)}")
return {
"site_id": self.api_client.site_id,
"status": "error",
"error": str(e)
}

View File

@@ -0,0 +1,335 @@
import os
import sys
import mimetypes
import requests
from requests.auth import HTTPBasicAuth
import base64
import json
from clint.textui import progress
from PIL import Image
import tempfile
import os
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
## Check if blog needs to be posted on wordpress.
#if wordpress:
## Fixme: Fetch all tags and categories to check, if present ones are present and
## use them else create new ones. Its better to use chatgpt than string comparison.
## Similar tags and categories will be missed.
## blog_categories =
## blog_tags =
#logger.info("Uploading the blog to wordpress.\n")
#main_img_path = compress_image(main_img_path, quality=85)
#try:
# img_details = analyze_and_extract_details_from_image(main_img_path)
# alt_text = img_details.get('alt_text')
# img_description = img_details.get('description')
# img_title = img_details.get('title')
# caption = img_details.get('caption')
# try:
# media = upload_media(wordpress_url, wordpress_username, wordpress_password,
# main_img_path, alt_text, img_description, img_title, caption)
# except Exception as err:
# sys.exit(f"Error occurred in upload_media: {err}")
#except Exception as e:
# sys.exit(f"Error occurred in analyze_and_extract_details_from_image: {e}")
#
## Then create the post with the uploaded media as the featured image
#media_id = media['id']
#blog_markdown_str = convert_markdown_to_html(blog_markdown_str)
#try:
# upload_blog_post(wordpress_url, wordpress_username, wordpress_password, a_blog_topic,
# blog_markdown_str, media_id, blog_meta_desc, blog_categories, blog_tags, status='publish')
#except Exception as err:
# sys.exit(f"Failed to upload blog to wordpress.Error: {err}")
def compress_image(image_path, quality=85):
"""
Compress the image by reducing its quality and logger.info size information.
:param image_path: Path to the original image
:param quality: Quality of the output image (1-100), lower means more compression
:return: Path to the compressed image
"""
if not os.path.exists(image_path):
raise ValueError(f"Provided image path does not exist: {image_path}")
# Get the size of the original image
original_size = os.path.getsize(image_path)
# Open the image
with Image.open(image_path) as img:
# Define the format based on the original image format
img_format = img.format
# Create a temporary file to save the compressed image
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.' + img_format.lower())
# Save the image with reduced quality
img.save(temp_file, format=img_format, quality=quality, optimize=True)
# Get the size of the compressed image
compressed_size = os.path.getsize(temp_file.name)
# Calculate the percentage reduction
reduction = (1 - (compressed_size / original_size)) * 100
logger.info("########### Image Compression ###############")
logger.info(f"Compressing the image, Original size: {original_size / 1024:.2f} KB")
logger.info(f"Compressed size: {compressed_size / 1024:.2f} KB")
logger.info(f"Reduction in image size: {reduction:.2f}%")
# TBD: https://tinypng.com/developers/reference/python
logger.info(f"Note: Consider converting images to JPEG/WebP format.\n\n")
return temp_file.name
def create_wordpress_tag(url, username, app_password, tag_name):
"""
Create a new tag in WordPress using the REST API and return its ID.
:param url: URL of the WordPress site (e.g., 'https://example.com')
:param username: WordPress username
:param app_password: WordPress application password
:param tag_name: Name of the tag to be created
:return: ID of the created tag or error message
"""
api_endpoint = f"{url}/wp-json/wp/v2/tags"
headers = {
'Content-Type': 'application/json',
}
data = {
'name': tag_name,
}
response = requests.post(api_endpoint, json=data, auth=HTTPBasicAuth(username, app_password), headers=headers)
if response.status_code == 201:
return response.json().get('id') # Return the ID of the created tag
else:
return response.text
def create_wordpress_category(url, username, app_password, category_name):
"""
Create a new category in WordPress using the REST API and return its ID.
:param url: URL of the WordPress site (e.g., 'https://example.com')
:param username: WordPress username
:param app_password: WordPress application password
:param category_name: Name of the category to be created
:return: ID of the created category or error message
"""
api_endpoint = f"{url}/wp-json/wp/v2/categories"
headers = {
'Content-Type': 'application/json',
}
data = {
'name': category_name,
}
response = requests.post(api_endpoint, json=data, auth=HTTPBasicAuth(username, app_password), headers=headers)
if response.status_code == 201:
return response.json().get('id') # Return the ID of the created category
else:
return response.text
def get_all_wordpress_categories(url, username, password):
"""
Get all categories from WordPress.
:param url: URL of the WordPress site
:param username: WordPress username
:param password: WordPress application password
:return: Dictionary of category names and their IDs
"""
logger.info("Fetching all wordpress categories to create Or use exsiting.")
categories = {}
api_endpoint = f"{url}/wp-json/wp/v2/categories"
response = requests.get(api_endpoint, auth=HTTPBasicAuth(username, password))
if response.status_code == 200:
for category in response.json():
categories[category['name']] = category['id']
return categories
else:
return "Error: " + response.text
def get_all_wordpress_tags(url, username, password):
"""
Get all tags from WordPress.
:param url: URL of the WordPress site
:param username: WordPress username
:param password: WordPress application password
:return: Dictionary of tag names and their IDs
"""
logger.info("Fetching all tags from wordpress to create or use existing tag.")
tags = {}
api_endpoint = f"{url}/wp-json/wp/v2/tags"
response = requests.get(api_endpoint, auth=HTTPBasicAuth(username, password))
if response.status_code == 200:
for tag in response.json():
tags[tag['name']] = tag['id']
return tags
else:
return "Error: " + response.text
def create_or_get_wordpress_category(url, username, password, category_name):
"""
Create a new category or get existing one from WordPress.
:param url: URL of the WordPress site
:param username: WordPress username
:param password: WordPress application password
:param category_name: Name of the category
:return: ID of the category
"""
existing_categories = get_all_wordpress_categories(url, username, password)
if category_name in existing_categories:
return existing_categories[category_name]
else:
return create_wordpress_category(url, username, password, category_name)
def create_or_get_wordpress_tag(url, username, password, tag_name):
"""
Create a new tag or get existing one from WordPress.
:param url: URL of the WordPress site
:param username: WordPress username
:param password: WordPress application password
:param tag_name: Name of the tag
:return: ID of the tag
"""
existing_tags = get_all_wordpress_tags(url, username, password)
if tag_name in existing_tags:
return existing_tags[tag_name]
else:
return create_wordpress_tag(url, username, password, tag_name)
def upload_media(url, username, password, media_path, alt_text, description, title, caption):
"""
Upload media to WordPress site with alt text, description, title, and caption.
:param url: URL of your WordPress site
:param username: Your WordPress username
:param password: Your WordPress password
:param media_path: Path to the media file
:param alt_text: Alternative text for the image
:param description: Description of the media
:param title: Title of the media
:param caption: Caption for the media
"""
if not os.path.exists(media_path):
logger.info(f"File not found: {media_path}")
return None
mime_type, _ = mimetypes.guess_type(media_path)
if mime_type is None:
logger.info(f"Unable to determine MIME type for the file: {media_path}")
return None
credentials = username + ':' + password
token = base64.b64encode(credentials.encode())
header = {
'Authorization': 'Basic ' + token.decode('utf-8'),
'Content-Disposition': 'attachment; filename={}'.format(os.path.basename(media_path))
}
with open(media_path, 'rb', encoding="utf-8") as media:
media_name = os.path.basename(media_path)
files = {'file': (media_name, media, mime_type)}
# Upload the media file
response = requests.post(url + '/wp-json/wp/v2/media', headers=header, files=files)
if response.status_code == 201:
logger.info("Media uploaded successfully.")
media_id = response.json()['id']
# Update media with alt text, description, title, and caption
media_data = {
'alt_text': alt_text,
'description': description,
'title': title,
'caption': caption
}
media_update_response = requests.post(f"{url}/wp-json/wp/v2/media/{media_id}", headers=header, json=media_data)
if media_update_response.status_code == 200:
logger.info("Media updated with alt text, description, title, and caption successfully.")
return media_update_response.json()
else:
logger.error("Failed to update media.")
logger.error(f"Response:{media_update_response.content}")
return None
else:
logger.error("Failed to upload media.")
logger.error("Response:{response.content}")
return None
def upload_blog_post(url, username, password, title, content, media_id, meta_desc, categories=None, tags=None, status='draft'):
"""
Upload a blog post to a WordPress site.
https://developer.wordpress.org/rest-api/reference/posts/#create-a-post
:param url: URL of your WordPress site
:param username: Your WordPress username
:param password: Your WordPress password
:param title: Title of the blog post
:param content: Content of the blog post
:param media_id: ID of the uploaded media to be set as the featured image
:param categories: List of category IDs
:param tags: List of tag IDs
:param status: Status of the post ('draft', 'publish', etc.)
"""
credentials = username + ':' + password
token = base64.b64encode(credentials.encode())
header = {'Authorization': 'Basic ' + token.decode('utf-8')}
# Prepare the data for the post
# https://developer.wordpress.org/rest-api/reference/posts/#schema-meta
post = {
'title': title,
'content': content,
# One of: publish, future, draft, pending, private
'status': status,
'excerpt': meta_desc,
'featured_media': media_id,
#'categories': categories,
#'tags': tags,
'meta': {
'description': meta_desc # This depends on your WordPress setup
}
}
#if categories:
# post['categories'] = categories
# Make the request
response = requests.post(url + '/wp-json/wp/v2/posts', headers=header, json=post)
# Check response
if response.status_code == 201:
logger.info("Blog to wordpress, uploaded successfully.")
return json.loads(response.content)
else:
logger.error("Blog upload to wordpress Failed.")
logger.error(f"Response: {response.content}") # Print response content for debugging
return None