# Source file: ALwrity/lib/integrations/platform_adapters/twitter.py
# (repository viewer metadata: 303 lines, 9.8 KiB, Python)
"""
Twitter platform adapter implementation.
"""
import logging
import urllib.request
from datetime import datetime
from io import BytesIO
from typing import Dict, Any, Optional, List
from urllib.parse import urlsplit

import tweepy
from tweepy.models import Status

from .base import PlatformAdapter
logger = logging.getLogger(__name__)


class TwitterAdapter(PlatformAdapter):
    """Twitter platform adapter.

    Wraps a ``tweepy.API`` client. Every public coroutine reports its outcome
    through the ``_format_success_response`` / ``_format_error_response``
    helpers called on ``self`` (presumably inherited from ``PlatformAdapter``
    -- confirm against the base class) instead of raising.

    NOTE: the tweepy calls made inside the coroutines are synchronous; they
    run inline and are not awaited.
    """

    # Limits enforced by validate_content() and reported by
    # get_platform_limits(); kept in one place so the two cannot drift apart.
    MAX_TWEET_LENGTH = 280
    MAX_MEDIA_ATTACHMENTS = 4

    # Seconds to wait when downloading remote media before giving up.
    MEDIA_DOWNLOAD_TIMEOUT = 30

    def __init__(self, config: Dict[str, Any]):
        """Initialize Twitter adapter with configuration.

        Args:
            config: Adapter configuration. ``_initialize_client`` reads the
                keys ``api_key``, ``api_secret``, ``access_token`` and
                ``access_token_secret`` from it.

        Raises:
            Exception: If the Twitter client cannot be initialized.
        """
        super().__init__(config)
        self._validate_config()
        self._initialize_client()

    def _initialize_client(self) -> None:
        """Create ``self.client`` and verify the configured credentials.

        Raises:
            Exception: If authentication setup or credential verification
                fails. The original error is attached as ``__cause__``.
        """
        try:
            auth = tweepy.OAuthHandler(
                self.config['api_key'],
                self.config['api_secret']
            )
            auth.set_access_token(
                self.config['access_token'],
                self.config['access_token_secret']
            )
            self.client = tweepy.API(auth)
            self.client.verify_credentials()
        except Exception as e:
            # Chain the cause so the underlying traceback is not lost.
            raise Exception(
                f"Failed to initialize Twitter client: {str(e)}"
            ) from e

    async def publish_content(
        self,
        content: Dict[str, Any],
        schedule_time: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Publish content to Twitter.

        Args:
            content: Dict with an optional ``text`` string and an optional
                ``media`` list of media descriptors (see ``_upload_media``).
            schedule_time: Accepted for interface compatibility only; the
                tweet is always posted immediately.

        Returns:
            The failed validation response if ``content`` is invalid, a
            success response with ``id``, ``text`` and ``created_at`` of the
            new tweet, or an error response if posting raised.
        """
        try:
            validation = await self.validate_content(content)
            if not validation.get('success'):
                return validation

            if schedule_time is not None:
                # Scheduling is not implemented here; make that visible
                # instead of silently posting at the wrong time.
                logger.warning(
                    "schedule_time=%s is ignored; publishing immediately",
                    schedule_time
                )

            tweet_text = content.get('text') or ''

            # Media that fails to upload is skipped (best effort); the tweet
            # is still posted with whatever uploaded successfully.
            media_ids = []
            for media in content.get('media') or []:
                media_id = self._upload_media(media)
                if media_id:
                    media_ids.append(media_id)

            tweet = self.client.update_status(
                status=tweet_text,
                media_ids=media_ids if media_ids else None
            )
            return self._format_success_response({
                'id': tweet.id_str,
                'text': tweet.text,
                'created_at': tweet.created_at.isoformat()
            })
        except Exception as e:
            return self._format_error_response(
                e,
                {'content': content, 'schedule_time': schedule_time}
            )

    async def get_content_status(
        self,
        content_id: str
    ) -> Dict[str, Any]:
        """Get status of a tweet.

        Args:
            content_id: ID of the tweet to look up.

        Returns:
            A success response with ``id``, ``text``, ``created_at``,
            ``favorite_count`` and ``retweet_count``, or an error response
            if the lookup raised.
        """
        try:
            tweet = self.client.get_status(content_id)
            return self._format_success_response({
                'id': tweet.id_str,
                'text': tweet.text,
                'created_at': tweet.created_at.isoformat(),
                'favorite_count': tweet.favorite_count,
                'retweet_count': tweet.retweet_count
            })
        except Exception as e:
            return self._format_error_response(
                e,
                {'content_id': content_id}
            )

    async def delete_content(
        self,
        content_id: str
    ) -> Dict[str, Any]:
        """Delete a tweet.

        Args:
            content_id: ID of the tweet to delete.

        Returns:
            A success response with ``id`` and ``deleted: True``, or an
            error response if the deletion raised.
        """
        try:
            self.client.destroy_status(content_id)
            return self._format_success_response({
                'id': content_id,
                'deleted': True
            })
        except Exception as e:
            return self._format_error_response(
                e,
                {'content_id': content_id}
            )

    async def update_content(
        self,
        content_id: str,
        updates: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Update a tweet by deleting it and publishing a replacement.

        Tweets are not edited in place here, so the replacement gets a new
        ID.

        Args:
            content_id: ID of the tweet to replace.
            updates: Content for the replacement tweet, in the same shape
                ``publish_content`` accepts.

        Returns:
            The failed validation response, the failed deletion response,
            or the result of publishing the replacement.
        """
        try:
            # Validate before deleting so invalid updates cannot destroy the
            # existing tweet without a replacement being possible.
            validation = await self.validate_content(updates)
            if not validation.get('success'):
                return validation

            # delete_content reports failure via its return value rather
            # than raising; do not publish a duplicate if the old tweet is
            # still there.
            deletion = await self.delete_content(content_id)
            if not deletion.get('success'):
                return deletion

            return await self.publish_content(updates)
        except Exception as e:
            return self._format_error_response(
                e,
                {
                    'content_id': content_id,
                    'updates': updates
                }
            )

    async def get_analytics(
        self,
        content_id: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict[str, Any]:
        """Get analytics for a tweet.

        Args:
            content_id: ID of the tweet.
            start_date: Not used to filter the metrics; only echoed in the
                error context.
            end_date: Not used to filter the metrics; only echoed in the
                error context.

        Returns:
            A success response with ``id``, a ``metrics`` dict
            (``favorites``, ``retweets``, ``replies``, ``impressions``) and
            ``engagement_rate``, or an error response if the lookup raised.
        """
        try:
            tweet = self.client.get_status(content_id)
            return self._format_success_response({
                'id': tweet.id_str,
                'metrics': {
                    'favorites': tweet.favorite_count,
                    'retweets': tweet.retweet_count,
                    # These attributes may be absent on the status object;
                    # report 0 in that case.
                    'replies': getattr(tweet, 'reply_count', 0),
                    'impressions': getattr(tweet, 'impression_count', 0)
                },
                'engagement_rate': self._calculate_engagement_rate(tweet)
            })
        except Exception as e:
            return self._format_error_response(
                e,
                {
                    'content_id': content_id,
                    'start_date': start_date,
                    'end_date': end_date
                }
            )

    async def validate_content(
        self,
        content: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate content before publishing.

        Checks that there is something to post, that the text is at most
        ``MAX_TWEET_LENGTH`` characters (measured with ``len``) and that
        there are at most ``MAX_MEDIA_ATTACHMENTS`` media items. ``text`` or
        ``media`` set to ``None`` is treated the same as missing.

        Args:
            content: Dict with optional ``text`` and ``media`` entries.

        Returns:
            A success response with ``valid: True`` and the content, or an
            error response describing the first failed check.
        """
        try:
            text = content.get('text') or ''
            media = content.get('media') or []

            if not text and not media:
                return self._format_error_response(
                    ValueError("Tweet must contain text or media"),
                    {'content': content}
                )

            if len(text) > self.MAX_TWEET_LENGTH:
                return self._format_error_response(
                    ValueError("Tweet text exceeds 280 characters"),
                    {'content': content}
                )

            if len(media) > self.MAX_MEDIA_ATTACHMENTS:
                return self._format_error_response(
                    ValueError("Maximum 4 media attachments allowed"),
                    {'content': content}
                )

            return self._format_success_response({
                'valid': True,
                'content': content
            })
        except Exception as e:
            return self._format_error_response(
                e,
                {'content': content}
            )

    async def get_optimal_publish_time(
        self,
        content_type: str,
        target_audience: Optional[Dict[str, Any]] = None
    ) -> datetime:
        """Get optimal publish time for content.

        Placeholder: both arguments are currently ignored and the current
        local time (naive ``datetime``) is returned.

        Args:
            content_type: Type of content to be published (unused).
            target_audience: Audience description (unused).

        Returns:
            ``datetime.now()``.
        """
        # TODO: base the result on content type, target audience timezone
        # and historical engagement data.
        return datetime.now()

    async def get_platform_limits(
        self
    ) -> Dict[str, Any]:
        """Get Twitter platform limits.

        Returns:
            A success response carrying the hard-coded limits this adapter
            assumes (tweet length, media count, poll and rate limits).
        """
        return self._format_success_response({
            'tweet_length': self.MAX_TWEET_LENGTH,
            'media_attachments': self.MAX_MEDIA_ATTACHMENTS,
            'poll_options': 4,
            'poll_duration': 10080,  # 7 days in minutes
            'rate_limits': {
                'tweets_per_day': 2000,
                'tweets_per_hour': 100
            }
        })

    async def get_supported_content_types(
        self
    ) -> List[str]:
        """Get list of supported content types."""
        return ['TWEET', 'THREAD', 'POLL']

    async def get_platform_metrics(
        self
    ) -> Dict[str, Any]:
        """Get Twitter platform metrics for the authenticated account.

        Returns:
            A success response with ``followers_count``, ``following_count``,
            ``tweets_count`` and ``account_created_at``, or an error
            response if the account lookup raised.
        """
        try:
            account = self.client.verify_credentials()
            return self._format_success_response({
                'followers_count': account.followers_count,
                'following_count': account.friends_count,
                'tweets_count': account.statuses_count,
                'account_created_at': account.created_at.isoformat()
            })
        except Exception as e:
            return self._format_error_response(e)

    def _calculate_engagement_rate(self, tweet: Status) -> float:
        """Calculate engagement rate for a tweet.

        The rate is (favorites + retweets + replies) divided by the author's
        follower count, as a percentage.

        Args:
            tweet: Status object to score.

        Returns:
            The engagement percentage, or ``0.0`` when the author has no
            followers or any required attribute is unavailable.
        """
        try:
            total_engagement = (
                tweet.favorite_count +
                tweet.retweet_count +
                getattr(tweet, 'reply_count', 0)
            )
            followers = tweet.user.followers_count
            return (total_engagement / followers * 100) if followers > 0 else 0.0
        except Exception:
            # Best effort: a missing metric must not fail the analytics call.
            return 0.0

    def _upload_media(self, media: Dict[str, Any]) -> Optional[str]:
        """Upload media to Twitter.

        Args:
            media: Media descriptor. ``url`` (http/https) is downloaded into
                memory; otherwise ``file`` is opened as a local path. An
                optional ``filename`` is passed to the upload call and
                defaults to ``'media'``.

        Returns:
            The uploaded media ID string, or ``None`` if the descriptor has
            neither ``url`` nor ``file`` or if anything fails (the failure
            is logged).
        """
        try:
            if 'url' in media:
                url = media['url']
                # Only fetch web URLs; urlopen would otherwise also accept
                # schemes such as file://.
                if urlsplit(url).scheme not in ('http', 'https'):
                    raise ValueError(f"Unsupported media URL: {url}")
                with urllib.request.urlopen(
                    url, timeout=self.MEDIA_DOWNLOAD_TIMEOUT
                ) as response:
                    source = BytesIO(response.read())
            elif 'file' in media:
                source = open(media['file'], 'rb')
            else:
                return None

            # Close the stream (in particular the local file handle) once
            # the upload is done, whether it succeeded or not.
            with source as media_file:
                media_upload = self.client.media_upload(
                    filename=media.get('filename', 'media'),
                    file=media_file
                )
            return media_upload.media_id_string
        except Exception as e:
            logger.error("Failed to upload media: %s", e)
            return None

    @classmethod
    def get_required_config_fields(cls) -> List[str]:
        """Get list of required configuration fields."""
        return [
            'api_key',
            'api_secret',
            'access_token',
            'access_token_secret'
        ]

    @classmethod
    def get_platform_description(cls) -> str:
        """Get platform description."""
        return "Twitter platform adapter for posting and managing tweets"

    @classmethod
    def get_platform_version(cls) -> str:
        """Get platform adapter version."""
        return "1.0.0"