Files
ALwrity/backend/services/integrations/wordpress_service.py

286 lines
11 KiB
Python

"""
WordPress Service for ALwrity
Handles WordPress site connections, content publishing, and media management.
"""
import os
import json
import sqlite3
import base64
import mimetypes
import tempfile
from typing import Optional, Dict, List, Any, Tuple
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
from PIL import Image
from loguru import logger
from services.database import get_user_db_path
class WordPressService:
"""Service for WordPress integration."""
def __init__(self, db_path: str = None):
# db_path is deprecated in favor of dynamic user_id based paths
self.db_path = db_path
self.api_version = "v2"
# self._ensure_tables() # Deferred to per-user calls
def _get_db_path(self, user_id: str) -> str:
return get_user_db_path(user_id)
def _ensure_tables(self, user_id: str) -> None:
"""Ensure required database tables exist."""
try:
db_path = self._get_db_path(user_id)
os.makedirs(os.path.dirname(db_path), exist_ok=True)
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# WordPress sites table
cursor.execute('''
CREATE TABLE IF NOT EXISTS wordpress_sites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
site_url TEXT NOT NULL,
site_name TEXT,
username TEXT NOT NULL,
app_password TEXT NOT NULL,
is_active BOOLEAN DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, site_url)
)
''')
# WordPress posts table for tracking published content
cursor.execute('''
CREATE TABLE IF NOT EXISTS wordpress_posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
site_id INTEGER NOT NULL,
wp_post_id INTEGER NOT NULL,
title TEXT NOT NULL,
status TEXT DEFAULT 'draft',
published_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (site_id) REFERENCES wordpress_sites (id)
)
''')
conn.commit()
# logger.info("WordPress database tables ensured")
except Exception as e:
logger.error(f"Error ensuring WordPress tables for user {user_id}: {e}")
raise
def add_site(self, user_id: str, site_url: str, site_name: str, username: str, app_password: str) -> bool:
"""Add a new WordPress site connection."""
try:
# Validate site URL format
if not site_url.startswith(('http://', 'https://')):
site_url = f"https://{site_url}"
# Test connection before saving
if not self._test_connection(site_url, username, app_password):
logger.error(f"Failed to connect to WordPress site: {site_url}")
return False
self._ensure_tables(user_id)
db_path = self._get_db_path(user_id)
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO wordpress_sites
(user_id, site_url, site_name, username, app_password, updated_at)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
''', (user_id, site_url, site_name, username, app_password))
conn.commit()
logger.info(f"WordPress site added for user {user_id}: {site_name}")
return True
except Exception as e:
logger.error(f"Error adding WordPress site: {e}")
return False
def get_user_sites(self, user_id: str) -> List[Dict[str, Any]]:
"""Get all WordPress sites for a user."""
try:
db_path = self._get_db_path(user_id)
if not os.path.exists(db_path):
return []
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Check if table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='wordpress_sites'")
if not cursor.fetchone():
return []
cursor.execute('''
SELECT id, site_url, site_name, username, is_active, created_at, updated_at
FROM wordpress_sites
WHERE user_id = ? AND is_active = 1
ORDER BY updated_at DESC
''', (user_id,))
sites = []
for row in cursor.fetchall():
sites.append({
'id': row[0],
'site_url': row[1],
'site_name': row[2],
'username': row[3],
'is_active': bool(row[4]),
'created_at': row[5],
'updated_at': row[6]
})
logger.info(f"Retrieved {len(sites)} WordPress sites for user {user_id}")
return sites
except Exception as e:
logger.error(f"Error getting WordPress sites for user {user_id}: {e}")
return []
def get_site_credentials(self, user_id: str, site_id: int) -> Optional[Dict[str, str]]:
"""Get credentials for a specific WordPress site."""
try:
db_path = self._get_db_path(user_id)
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT site_url, username, app_password
FROM wordpress_sites
WHERE id = ? AND user_id = ? AND is_active = 1
''', (site_id, user_id))
result = cursor.fetchone()
if result:
return {
'site_url': result[0],
'username': result[1],
'app_password': result[2]
}
return None
except Exception as e:
logger.error(f"Error getting credentials for site {site_id}: {e}")
return None
def _test_connection(self, site_url: str, username: str, app_password: str) -> bool:
"""Test WordPress site connection."""
try:
# Test with a simple API call
api_url = f"{site_url}/wp-json/wp/v2/users/me"
response = requests.get(api_url, auth=HTTPBasicAuth(username, app_password), timeout=10)
if response.status_code == 200:
logger.info(f"WordPress connection test successful for {site_url}")
return True
else:
logger.warning(f"WordPress connection test failed for {site_url}: {response.status_code}")
return False
except Exception as e:
logger.error(f"WordPress connection test error for {site_url}: {e}")
return False
def disconnect_site(self, user_id: str, site_id: int) -> bool:
"""Disconnect a WordPress site."""
try:
db_path = self._get_db_path(user_id)
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
UPDATE wordpress_sites
SET is_active = 0, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND user_id = ?
''', (site_id, user_id))
conn.commit()
logger.info(f"WordPress site {site_id} disconnected for user {user_id}")
return True
except Exception as e:
logger.error(f"Error disconnecting WordPress site {site_id}: {e}")
return False
def get_site_info(self, user_id: str, site_id: int) -> Optional[Dict[str, Any]]:
"""Get detailed information about a WordPress site."""
try:
credentials = self.get_site_credentials(user_id, site_id)
if not credentials:
return None
site_url = credentials['site_url']
username = credentials['username']
app_password = credentials['app_password']
# Get site information
info = {
'site_url': site_url,
'username': username,
'api_version': self.api_version
}
# Test connection and get basic info
if self._test_connection(site_url, username, app_password):
info['connected'] = True
info['last_checked'] = datetime.now().isoformat()
else:
info['connected'] = False
info['last_checked'] = datetime.now().isoformat()
return info
except Exception as e:
logger.error(f"Error getting site info for {site_id}: {e}")
return None
def get_posts_for_all_sites(self, user_id: str) -> List[Dict[str, Any]]:
"""Get all tracked WordPress posts for all sites of a user."""
db_path = self._get_db_path(user_id)
if not os.path.exists(db_path):
return []
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Check if table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='wordpress_posts'")
if not cursor.fetchone():
return []
cursor.execute('''
SELECT wp.id, wp.wp_post_id, wp.title, wp.status, wp.published_at, wp.created_at,
ws.site_name, ws.site_url
FROM wordpress_posts wp
JOIN wordpress_sites ws ON wp.site_id = ws.id
WHERE wp.user_id = ? AND ws.is_active = 1
ORDER BY wp.published_at DESC
''', (user_id,))
posts = []
for post_data in cursor.fetchall():
posts.append({
"id": post_data[0],
"wp_post_id": post_data[1],
"title": post_data[2],
"status": post_data[3],
"published_at": post_data[4],
"created_at": post_data[5],
"site_name": post_data[6],
"site_url": post_data[7]
})
return posts
except Exception as e:
logger.error(f"Error getting posts for user {user_id}: {e}")
return []