# Files
# ALwrity/backend/services/integrations/wordpress_content.py
# 2025-10-08 10:13:14 +05:30
#
# 321 lines
# 12 KiB
# Python

"""
WordPress Content Management Module
Handles content creation, media upload, and publishing to WordPress sites.
"""
import os
import json
import base64
import mimetypes
import tempfile
from typing import Optional, Dict, List, Any, Union
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from PIL import Image
from loguru import logger
class WordPressContentManager:
    """Manages WordPress content operations including posts, media, and taxonomies.

    Every call targets ``<site_url>/wp-json/wp/v2`` and authenticates with HTTP
    Basic auth built from the username and application password. Public
    methods log failures through ``logger`` and signal them with ``None``,
    ``[]`` or ``False`` rather than raising.
    """

    # Seconds to wait for a response; ``requests`` applies no timeout by default.
    REQUEST_TIMEOUT = 30
    # Uploads get a longer allowance than ordinary JSON calls.
    UPLOAD_TIMEOUT = 120
    # Page size requested when listing taxonomy terms.
    TERMS_PER_PAGE = 100
    # Hard stop for term paging so a server that ignores ``page`` cannot loop forever.
    MAX_TERM_PAGES = 100

    def __init__(self, site_url: str, username: str, app_password: str):
        """Initialize with WordPress site credentials.

        Args:
            site_url: Base URL of the site; trailing slashes are stripped.
            username: WordPress user name.
            app_password: Application password used for Basic auth.
        """
        self.site_url = site_url.rstrip('/')
        self.username = username
        self.app_password = app_password
        self.api_base = f"{self.site_url}/wp-json/wp/v2"
        self.auth = HTTPBasicAuth(username, app_password)

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Make authenticated request to WordPress API.

        Args:
            method: HTTP verb passed to ``requests.request``.
            endpoint: Path relative to ``api_base``; a leading slash is ignored.
            **kwargs: Forwarded to ``requests.request`` (``params``, ``json``, ...).
                ``timeout`` defaults to ``REQUEST_TIMEOUT`` when not supplied.

        Returns:
            The decoded JSON body on HTTP 200/201 (a list for collection
            endpoints), otherwise ``None`` after logging the failure.
        """
        try:
            url = f"{self.api_base}/{endpoint.lstrip('/')}"
            # Without an explicit timeout a stalled server blocks the caller forever.
            kwargs.setdefault('timeout', self.REQUEST_TIMEOUT)
            response = requests.request(method, url, auth=self.auth, **kwargs)
            if response.status_code in (200, 201):
                return response.json()
            logger.error(f"WordPress API error: {response.status_code} - {response.text}")
            return None
        except Exception as e:
            logger.error(f"WordPress API request error: {e}")
            return None

    def _list_terms(self, endpoint: str) -> List[Dict[str, Any]]:
        """Collect every page of a term collection (``categories`` or ``tags``).

        Pages are requested until one comes back short, empty, or failed, or
        until ``MAX_TERM_PAGES`` pages have been read.
        """
        collected: List[Dict[str, Any]] = []
        for page in range(1, self.MAX_TERM_PAGES + 1):
            batch = self._make_request(
                'GET',
                endpoint,
                params={'per_page': self.TERMS_PER_PAGE, 'page': page},
            )
            if not batch or not isinstance(batch, list):
                break
            collected.extend(batch)
            # A short page means there is nothing left to fetch.
            if len(batch) < self.TERMS_PER_PAGE:
                break
        return collected

    @staticmethod
    def _match_term_id(terms: List[Dict[str, Any]], name: str) -> Optional[int]:
        """Return the ``id`` of the first term whose name equals *name*.

        Comparison ignores case and surrounding whitespace, and decodes HTML
        entities on both sides so an entity-encoded ``name`` (for example
        ``Tips &amp; Tricks``) matches the plain-text spelling.
        """
        import html  # local import keeps the file's import block unchanged

        wanted = html.unescape(name).strip().casefold()
        for term in terms:
            if html.unescape(term['name']).strip().casefold() == wanted:
                return term['id']
        return None

    @staticmethod
    def _present_fields(fields: Dict[str, Any]) -> Dict[str, Any]:
        """Return only the entries of *fields* whose value is truthy."""
        return {key: value for key, value in fields.items() if value}

    def get_categories(self) -> List[Dict[str, Any]]:
        """Get all categories from WordPress site.

        Returns:
            Every category across all result pages, or ``[]`` on failure.
        """
        try:
            categories = self._list_terms('categories')
            if categories:
                logger.info(f"Retrieved {len(categories)} categories from {self.site_url}")
            return categories
        except Exception as e:
            logger.error(f"Error getting categories: {e}")
            return []

    def get_tags(self) -> List[Dict[str, Any]]:
        """Get all tags from WordPress site.

        Returns:
            Every tag across all result pages, or ``[]`` on failure.
        """
        try:
            tags = self._list_terms('tags')
            if tags:
                logger.info(f"Retrieved {len(tags)} tags from {self.site_url}")
            return tags
        except Exception as e:
            logger.error(f"Error getting tags: {e}")
            return []

    def create_category(self, name: str, description: str = "") -> Optional[Dict[str, Any]]:
        """Create a new category.

        Returns:
            The created category as returned by the API, or ``None`` on failure.
        """
        try:
            data = {
                'name': name,
                'description': description,
            }
            result = self._make_request('POST', 'categories', json=data)
            if result:
                logger.info(f"Created category: {name}")
            return result
        except Exception as e:
            logger.error(f"Error creating category {name}: {e}")
            return None

    def create_tag(self, name: str, description: str = "") -> Optional[Dict[str, Any]]:
        """Create a new tag.

        Returns:
            The created tag as returned by the API, or ``None`` on failure.
        """
        try:
            data = {
                'name': name,
                'description': description,
            }
            result = self._make_request('POST', 'tags', json=data)
            if result:
                logger.info(f"Created tag: {name}")
            return result
        except Exception as e:
            logger.error(f"Error creating tag {name}: {e}")
            return None

    def get_or_create_category(self, name: str, description: str = "") -> Optional[int]:
        """Get existing category or create new one.

        Args:
            name: Category name; matched case-insensitively against existing ones.
            description: Used only when a new category has to be created.

        Returns:
            The category id, or ``None`` if lookup and creation both fail.
        """
        try:
            # First, try to find existing category
            existing_id = self._match_term_id(self.get_categories(), name)
            if existing_id is not None:
                logger.info(f"Found existing category: {name}")
                return existing_id
            # Create new category if not found
            new_category = self.create_category(name, description)
            if new_category:
                return new_category['id']
            return None
        except Exception as e:
            logger.error(f"Error getting or creating category {name}: {e}")
            return None

    def get_or_create_tag(self, name: str, description: str = "") -> Optional[int]:
        """Get existing tag or create new one.

        Args:
            name: Tag name; matched case-insensitively against existing ones.
            description: Used only when a new tag has to be created.

        Returns:
            The tag id, or ``None`` if lookup and creation both fail.
        """
        try:
            # First, try to find existing tag
            existing_id = self._match_term_id(self.get_tags(), name)
            if existing_id is not None:
                logger.info(f"Found existing tag: {name}")
                return existing_id
            # Create new tag if not found
            new_tag = self.create_tag(name, description)
            if new_tag:
                return new_tag['id']
            return None
        except Exception as e:
            logger.error(f"Error getting or creating tag {name}: {e}")
            return None

    def upload_media(self, file_path: str, alt_text: str = "", title: str = "", caption: str = "", description: str = "") -> Optional[Dict[str, Any]]:
        """Upload media file to WordPress.

        The file is sent as a multipart upload to ``/media``. If any of the
        metadata arguments is non-empty, a second request updates the new
        media item with just those fields.

        Args:
            file_path: Local path of the file to upload.
            alt_text: Alternative text for the media item.
            title: Title for the media item.
            caption: Caption for the media item.
            description: Description for the media item.

        Returns:
            The media item as returned by the API (the updated item when the
            metadata update succeeds, otherwise the upload response), or
            ``None`` if the file is missing, its MIME type cannot be guessed,
            or the upload fails.
        """
        try:
            if not os.path.exists(file_path):
                logger.error(f"Media file not found: {file_path}")
                return None
            # Get file info
            file_name = os.path.basename(file_path)
            mime_type, _ = mimetypes.guess_type(file_path)
            if not mime_type:
                logger.error(f"Unable to determine MIME type for: {file_path}")
                return None
            # Prepare headers
            # NOTE(review): the multipart part already carries the file name;
            # confirm whether this header is still required by the server.
            headers = {
                'Content-Disposition': f'attachment; filename="{file_name}"'
            }
            # Upload file
            with open(file_path, 'rb') as file:
                files = {'file': (file_name, file, mime_type)}
                response = requests.post(
                    f"{self.api_base}/media",
                    auth=self.auth,
                    headers=headers,
                    files=files,
                    timeout=self.UPLOAD_TIMEOUT,
                )
            if response.status_code != 201:
                logger.error(f"Media upload failed: {response.status_code} - {response.text}")
                return None
            media_data = response.json()
            media_id = media_data['id']
            # Send only the fields the caller actually supplied: posting empty
            # strings would overwrite values WordPress filled in during upload.
            update_data = self._present_fields({
                'alt_text': alt_text,
                'title': title,
                'caption': caption,
                'description': description,
            })
            if not update_data:
                logger.info(f"Media uploaded successfully: {file_name}")
                return media_data
            # Update media with metadata
            update_response = requests.post(
                f"{self.api_base}/media/{media_id}",
                auth=self.auth,
                json=update_data,
                timeout=self.REQUEST_TIMEOUT,
            )
            if update_response.status_code == 200:
                logger.info(f"Media uploaded successfully: {file_name}")
                return update_response.json()
            logger.warning(f"Media uploaded but metadata update failed: {update_response.text}")
            return media_data
        except Exception as e:
            logger.error(f"Error uploading media {file_path}: {e}")
            return None

    def compress_image(self, image_path: str, quality: int = 85) -> str:
        """Compress image for better upload performance.

        The image is re-saved in its own format (``JPEG`` when the format is
        unknown) into a new temporary file created with ``delete=False``, so
        the caller is responsible for removing that file afterwards.

        Args:
            image_path: Path of the image to compress.
            quality: ``quality`` value handed to ``Image.save``.

        Returns:
            Path of the compressed temporary file, or ``image_path`` unchanged
            when the file is missing or compression fails.
        """
        temp_path = None
        try:
            if not os.path.exists(image_path):
                raise ValueError(f"Image file not found: {image_path}")
            original_size = os.path.getsize(image_path)
            with Image.open(image_path) as img:
                img_format = img.format or 'JPEG'
                # Create temporary file; the ``with`` closes the handle so the
                # data is flushed and no descriptor is leaked.
                with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{img_format.lower()}') as temp_file:
                    temp_path = temp_file.name
                    # Save with compression
                    img.save(temp_file, format=img_format, quality=quality, optimize=True)
            compressed_size = os.path.getsize(temp_path)
            # Guard the ratio against a zero-byte original.
            reduction = (1 - (compressed_size / original_size)) * 100 if original_size else 0.0
            logger.info(f"Image compressed: {original_size/1024:.2f}KB -> {compressed_size/1024:.2f}KB ({reduction:.1f}% reduction)")
            return temp_path
        except Exception as e:
            logger.error(f"Error compressing image {image_path}: {e}")
            # Do not leave a half-written temporary file behind.
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
            return image_path  # Return original if compression fails

    def _test_connection(self) -> bool:
        """Test WordPress site connection.

        Returns:
            ``True`` when ``GET /users/me`` answers HTTP 200 with the stored
            credentials, ``False`` on any other status or on an exception.
        """
        try:
            # Test with a simple API call
            api_url = f"{self.api_base}/users/me"
            response = requests.get(api_url, auth=self.auth, timeout=10)
            if response.status_code == 200:
                logger.info(f"WordPress connection test successful for {self.site_url}")
                return True
            logger.warning(f"WordPress connection test failed for {self.site_url}: {response.status_code}")
            return False
        except Exception as e:
            logger.error(f"WordPress connection test error for {self.site_url}: {e}")
            return False

    def create_post(self, title: str, content: str, excerpt: str = "",
                    featured_media_id: Optional[int] = None,
                    categories: Optional[List[int]] = None,
                    tags: Optional[List[int]] = None,
                    status: str = 'draft',
                    meta: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
        """Create a new WordPress post.

        Args:
            title: Post title.
            content: Post body.
            excerpt: Post excerpt.
            featured_media_id: Sent as ``featured_media`` when truthy.
            categories: Category ids; omitted from the request when empty.
            tags: Tag ids; omitted from the request when empty.
            status: Post status, ``'draft'`` by default.
            meta: Sent as ``meta`` when non-empty.

        Returns:
            The created post as returned by the API, or ``None`` on failure.
        """
        try:
            post_data = {
                'title': title,
                'content': content,
                'excerpt': excerpt,
                'status': status,
            }
            # Optional fields are added only when the caller supplied a value.
            if featured_media_id:
                post_data['featured_media'] = featured_media_id
            if categories:
                post_data['categories'] = categories
            if tags:
                post_data['tags'] = tags
            if meta:
                post_data['meta'] = meta
            result = self._make_request('POST', 'posts', json=post_data)
            if result:
                logger.info(f"Post created successfully: {title}")
            return result
        except Exception as e:
            logger.error(f"Error creating post {title}: {e}")
            return None

    def update_post(self, post_id: int, **kwargs) -> Optional[Dict[str, Any]]:
        """Update an existing WordPress post.

        Args:
            post_id: Id of the post to update.
            **kwargs: Sent unchanged as the JSON body of the request.

        Returns:
            The updated post as returned by the API, or ``None`` on failure.
        """
        try:
            result = self._make_request('POST', f'posts/{post_id}', json=kwargs)
            if result:
                logger.info(f"Post {post_id} updated successfully")
            return result
        except Exception as e:
            logger.error(f"Error updating post {post_id}: {e}")
            return None

    def get_post(self, post_id: int) -> Optional[Dict[str, Any]]:
        """Get a specific WordPress post.

        Returns:
            The post as returned by the API, or ``None`` on failure.
        """
        try:
            return self._make_request('GET', f'posts/{post_id}')
        except Exception as e:
            logger.error(f"Error getting post {post_id}: {e}")
            return None

    def delete_post(self, post_id: int, force: bool = False) -> bool:
        """Delete a WordPress post.

        Args:
            post_id: Id of the post to delete.
            force: When true, sent as the ``force`` query parameter; otherwise
                no query parameters are sent.

        Returns:
            ``True`` when the API answered with a non-empty success body,
            ``False`` otherwise.
        """
        try:
            params = {'force': force} if force else {}
            result = self._make_request('DELETE', f'posts/{post_id}', params=params)
            if result:
                logger.info(f"Post {post_id} deleted successfully")
                return True
            return False
        except Exception as e:
            logger.error(f"Error deleting post {post_id}: {e}")
            return False