530 lines
21 KiB
Python
530 lines
21 KiB
Python
"""
|
|
Wix Integration Service
|
|
|
|
Handles authentication, permission checking, and blog publishing to Wix websites.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import requests
|
|
import uuid
|
|
from typing import Dict, Any, Optional, List
|
|
from loguru import logger
|
|
from datetime import datetime, timedelta
|
|
import base64
|
|
from urllib.parse import urlencode, parse_qs
|
|
import jwt
|
|
import base64 as b64
|
|
from services.integrations.wix.blog import WixBlogService
|
|
from services.integrations.wix.media import WixMediaService
|
|
from services.integrations.wix.utils import extract_meta_from_token, normalize_token_string, extract_member_id_from_access_token as utils_extract_member
|
|
from services.integrations.wix.content import convert_content_to_ricos as ricos_builder
|
|
from services.integrations.wix.auth import WixAuthService
|
|
from services.integrations.wix.seo import build_seo_data
|
|
from services.integrations.wix.ricos_converter import markdown_to_html, convert_via_wix_api
|
|
from services.integrations.wix.blog_publisher import create_blog_post as publish_blog_post
|
|
|
|
class WixService:
|
|
"""Service for interacting with Wix APIs"""
|
|
|
|
def __init__(self):
|
|
self.client_id = os.getenv('WIX_CLIENT_ID')
|
|
self.redirect_uri = os.getenv('WIX_REDIRECT_URI', 'https://alwrity-ai.vercel.app/wix/callback')
|
|
self.base_url = 'https://www.wixapis.com'
|
|
self.oauth_url = 'https://www.wix.com/oauth/authorize'
|
|
# Modular services
|
|
self.blog_service = WixBlogService(self.base_url, self.client_id)
|
|
self.media_service = WixMediaService(self.base_url)
|
|
self.auth_service = WixAuthService(self.client_id, self.redirect_uri, self.base_url)
|
|
|
|
if not self.client_id:
|
|
logger.warning("Wix client ID not configured. Set WIX_CLIENT_ID environment variable.")
|
|
|
|
def get_authorization_url(self, state: str = None) -> str:
|
|
"""
|
|
Generate Wix OAuth authorization URL for "on behalf of user" authentication
|
|
|
|
This implements the "Authenticate on behalf of a Wix User" flow as described in:
|
|
https://dev.wix.com/docs/build-apps/develop-your-app/access/authentication/authenticate-on-behalf-of-a-wix-user
|
|
|
|
Args:
|
|
state: Optional state parameter for security
|
|
|
|
Returns:
|
|
Authorization URL for user to visit
|
|
"""
|
|
url, code_verifier = self.auth_service.generate_authorization_url(state)
|
|
self._code_verifier = code_verifier
|
|
return url
|
|
|
|
def _create_redirect_session_for_auth(self, redirect_uri: str, client_id: str, code_challenge: str, state: str) -> str:
|
|
"""
|
|
Create a redirect session for Wix Headless OAuth authentication using Redirects API
|
|
|
|
Args:
|
|
redirect_uri: The redirect URI for OAuth callback
|
|
client_id: The OAuth client ID
|
|
code_challenge: The PKCE code challenge
|
|
state: The OAuth state parameter
|
|
|
|
Returns:
|
|
The redirect URL for OAuth authentication
|
|
"""
|
|
try:
|
|
# According to Wix documentation, we need to use the Redirects API
|
|
# to create a redirect session for OAuth authentication
|
|
# This is the correct approach for Wix Headless OAuth
|
|
|
|
# For now, return the direct OAuth URL as a fallback
|
|
# In production, this should call the Wix Redirects API
|
|
scope = (
|
|
"BLOG.CREATE-DRAFT,BLOG.PUBLISH-POST,BLOG.READ-CATEGORY," \
|
|
"BLOG.CREATE-CATEGORY,BLOG.READ-TAG,BLOG.CREATE-TAG," \
|
|
"MEDIA.SITE_MEDIA_FILES_IMPORT"
|
|
)
|
|
redirect_url = (
|
|
"https://www.wix.com/oauth/authorize?client_id="
|
|
f"{client_id}&redirect_uri={redirect_uri}&response_type=code"
|
|
f"&scope={scope}&code_challenge={code_challenge}"
|
|
f"&code_challenge_method=S256&state={state}"
|
|
)
|
|
|
|
logger.info(f"Generated Wix Headless OAuth redirect URL: {redirect_url}")
|
|
logger.warning("Using direct OAuth URL - should implement Redirects API for production")
|
|
return redirect_url
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to create redirect session for auth: {e}")
|
|
raise
|
|
|
|
def exchange_code_for_tokens(self, code: str, code_verifier: str = None) -> Dict[str, Any]:
|
|
"""
|
|
Exchange authorization code for access and refresh tokens using PKCE
|
|
|
|
Args:
|
|
code: Authorization code from Wix
|
|
code_verifier: PKCE code verifier (uses stored one if not provided)
|
|
|
|
Returns:
|
|
Token response with access_token, refresh_token, etc.
|
|
"""
|
|
if not self.client_id:
|
|
raise ValueError("Wix client ID not configured")
|
|
if not code_verifier:
|
|
code_verifier = getattr(self, '_code_verifier', None)
|
|
if not code_verifier:
|
|
raise ValueError("Code verifier not found. Please provide code_verifier parameter.")
|
|
try:
|
|
return self.auth_service.exchange_code_for_tokens(code, code_verifier)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to exchange code for tokens: {e}")
|
|
raise
|
|
|
|
def refresh_access_token(self, refresh_token: str) -> Dict[str, Any]:
|
|
"""
|
|
Refresh access token using refresh token (Wix Headless OAuth)
|
|
|
|
Args:
|
|
refresh_token: Valid refresh token
|
|
|
|
Returns:
|
|
New token response
|
|
"""
|
|
if not self.client_id:
|
|
raise ValueError("Wix client ID not configured")
|
|
try:
|
|
return self.auth_service.refresh_access_token(refresh_token)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to refresh access token: {e}")
|
|
raise
|
|
|
|
def get_site_info(self, access_token: str) -> Dict[str, Any]:
|
|
"""
|
|
Get information about the connected Wix site
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
|
|
Returns:
|
|
Site information
|
|
"""
|
|
token_str = normalize_token_string(access_token)
|
|
if not token_str:
|
|
raise ValueError("Invalid access token format for create_blog_post")
|
|
try:
|
|
return self.auth_service.get_site_info(token_str)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to get site info: {e}")
|
|
raise
|
|
|
|
def get_current_member(self, access_token: str) -> Dict[str, Any]:
|
|
"""
|
|
Get current member information (for third-party apps)
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
|
|
Returns:
|
|
Current member information
|
|
"""
|
|
token_str = normalize_token_string(access_token)
|
|
if not token_str:
|
|
raise ValueError("Invalid access token format for get_current_member")
|
|
try:
|
|
return self.auth_service.get_current_member(token_str, self.client_id)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to get current member: {e}")
|
|
raise
|
|
|
|
def extract_member_id_from_access_token(self, access_token: Any) -> Optional[str]:
|
|
return utils_extract_member(access_token)
|
|
|
|
def _normalize_token_string(self, access_token: Any) -> Optional[str]:
|
|
return normalize_token_string(access_token)
|
|
|
|
def check_blog_permissions(self, access_token: str) -> Dict[str, Any]:
|
|
"""
|
|
Check if the app has required blog permissions
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
|
|
Returns:
|
|
Permission status
|
|
"""
|
|
headers = {
|
|
'Authorization': f'Bearer {access_token}',
|
|
'Content-Type': 'application/json',
|
|
'wix-client-id': self.client_id or ''
|
|
}
|
|
|
|
try:
|
|
# Try to list blog categories to check permissions
|
|
response = requests.get(
|
|
f"{self.base_url}/blog/v1/categories",
|
|
headers=headers
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
return {
|
|
'has_permissions': True,
|
|
'can_create_posts': True,
|
|
'can_publish': True
|
|
}
|
|
elif response.status_code == 403:
|
|
return {
|
|
'has_permissions': False,
|
|
'can_create_posts': False,
|
|
'can_publish': False,
|
|
'error': 'Insufficient permissions'
|
|
}
|
|
else:
|
|
response.raise_for_status()
|
|
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to check blog permissions: {e}")
|
|
return {
|
|
'has_permissions': False,
|
|
'error': str(e)
|
|
}
|
|
|
|
def import_image_to_wix(self, access_token: str, image_url: str, display_name: str = None) -> str:
|
|
"""
|
|
Import external image to Wix Media Manager
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
image_url: URL of the image to import
|
|
display_name: Optional display name for the image
|
|
|
|
Returns:
|
|
Wix media ID
|
|
"""
|
|
try:
|
|
result = self.media_service.import_image(
|
|
access_token,
|
|
image_url,
|
|
display_name or f'Imported Image {datetime.now().strftime("%Y%m%d_%H%M%S")}'
|
|
)
|
|
return result['file']['id']
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to import image to Wix: {e}")
|
|
raise
|
|
|
|
def convert_content_to_ricos(self, content: str, images: List[str] = None,
|
|
use_wix_api: bool = False, access_token: str = None) -> Dict[str, Any]:
|
|
"""
|
|
Convert markdown content to Ricos JSON format.
|
|
|
|
Args:
|
|
content: Markdown content to convert
|
|
images: Optional list of image URLs
|
|
use_wix_api: If True, use Wix's official Ricos Documents API (requires access_token)
|
|
access_token: Wix access token (required if use_wix_api=True)
|
|
|
|
Returns:
|
|
Ricos JSON document
|
|
"""
|
|
if use_wix_api and access_token:
|
|
try:
|
|
return convert_via_wix_api(content, access_token, self.base_url)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to convert via Wix API, falling back to custom parser: {e}")
|
|
# Fall back to custom parser
|
|
|
|
# Use custom parser (current implementation)
|
|
return ricos_builder(content, images)
|
|
|
|
|
|
def create_blog_post(self, access_token: str, title: str, content: str,
|
|
cover_image_url: str = None, category_ids: List[str] = None,
|
|
tag_ids: List[str] = None, publish: bool = True,
|
|
member_id: str = None, seo_metadata: Dict[str, Any] = None) -> Dict[str, Any]:
|
|
"""
|
|
Create and optionally publish a blog post on Wix
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
title: Blog post title
|
|
content: Blog post content
|
|
cover_image_url: Optional cover image URL
|
|
category_ids: Optional list of category IDs
|
|
tag_ids: Optional list of tag IDs
|
|
publish: Whether to publish immediately or save as draft
|
|
member_id: Required for third-party apps - the member ID of the post author
|
|
seo_metadata: Optional SEO metadata dict with fields like:
|
|
- seo_title: SEO optimized title
|
|
- meta_description: Meta description
|
|
- focus_keyword: Main keyword
|
|
- blog_tags: List of tag strings (for keywords)
|
|
- open_graph: Open Graph data
|
|
- canonical_url: Canonical URL
|
|
|
|
Returns:
|
|
Created blog post information
|
|
"""
|
|
# Normalize access token to string to avoid type issues (can be dict/int from storage)
|
|
from services.integrations.wix.utils import normalize_token_string
|
|
normalized_token = normalize_token_string(access_token)
|
|
if normalized_token:
|
|
token_to_use = normalized_token.strip()
|
|
else:
|
|
token_to_use = str(access_token).strip() if access_token is not None else ""
|
|
|
|
if not token_to_use:
|
|
raise ValueError("access_token is required to create a blog post")
|
|
|
|
return publish_blog_post(
|
|
blog_service=self.blog_service,
|
|
access_token=token_to_use,
|
|
title=title,
|
|
content=content,
|
|
member_id=member_id,
|
|
cover_image_url=cover_image_url,
|
|
category_ids=category_ids,
|
|
tag_ids=tag_ids,
|
|
publish=publish,
|
|
seo_metadata=seo_metadata,
|
|
import_image_func=self.import_image_to_wix,
|
|
lookup_categories_func=self.lookup_or_create_categories,
|
|
lookup_tags_func=self.lookup_or_create_tags,
|
|
base_url=self.base_url
|
|
)
|
|
|
|
def get_blog_categories(self, access_token: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get available blog categories
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
|
|
Returns:
|
|
List of blog categories
|
|
"""
|
|
try:
|
|
return self.blog_service.list_categories(access_token)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to get blog categories: {e}")
|
|
raise
|
|
|
|
def get_blog_tags(self, access_token: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get available blog tags
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
|
|
Returns:
|
|
List of blog tags
|
|
"""
|
|
try:
|
|
return self.blog_service.list_tags(access_token)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to get blog tags: {e}")
|
|
raise
|
|
|
|
def lookup_or_create_categories(self, access_token: str, category_names: List[str],
|
|
extra_headers: Optional[Dict[str, str]] = None) -> List[str]:
|
|
"""
|
|
Lookup existing categories by name or create new ones, return their IDs.
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
category_names: List of category name strings
|
|
extra_headers: Optional extra headers (e.g., wix-site-id)
|
|
|
|
Returns:
|
|
List of category UUIDs
|
|
"""
|
|
if not category_names:
|
|
return []
|
|
|
|
try:
|
|
# Get existing categories
|
|
existing_categories = self.blog_service.list_categories(access_token, extra_headers)
|
|
|
|
# Create name -> ID mapping (case-insensitive)
|
|
category_map = {}
|
|
for cat in existing_categories:
|
|
cat_label = cat.get('label', '').strip()
|
|
cat_id = cat.get('id')
|
|
if cat_label and cat_id:
|
|
category_map[cat_label.lower()] = cat_id
|
|
|
|
category_ids = []
|
|
for category_name in category_names:
|
|
category_name_clean = str(category_name).strip()
|
|
if not category_name_clean:
|
|
continue
|
|
|
|
# Lookup existing category (case-insensitive)
|
|
category_id = category_map.get(category_name_clean.lower())
|
|
|
|
if not category_id:
|
|
# Create new category
|
|
try:
|
|
logger.info(f"Creating new category: {category_name_clean}")
|
|
result = self.blog_service.create_category(
|
|
access_token,
|
|
label=category_name_clean,
|
|
extra_headers=extra_headers
|
|
)
|
|
new_category = result.get('category', {})
|
|
category_id = new_category.get('id')
|
|
if category_id:
|
|
category_ids.append(category_id)
|
|
# Update map to avoid duplicate creates
|
|
category_map[category_name_clean.lower()] = category_id
|
|
logger.info(f"Created category '{category_name_clean}' with ID: {category_id}")
|
|
except Exception as create_error:
|
|
logger.warning(f"Failed to create category '{category_name_clean}': {create_error}")
|
|
# Continue with other categories
|
|
else:
|
|
category_ids.append(category_id)
|
|
logger.info(f"Found existing category '{category_name_clean}' with ID: {category_id}")
|
|
|
|
return category_ids
|
|
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to lookup/create categories: {e}")
|
|
return []
|
|
|
|
def lookup_or_create_tags(self, access_token: str, tag_names: List[str],
|
|
extra_headers: Optional[Dict[str, str]] = None) -> List[str]:
|
|
"""
|
|
Lookup existing tags by name or create new ones, return their IDs.
|
|
|
|
Args:
|
|
access_token: Valid access token
|
|
tag_names: List of tag name strings
|
|
extra_headers: Optional extra headers (e.g., wix-site-id)
|
|
|
|
Returns:
|
|
List of tag UUIDs
|
|
"""
|
|
if not tag_names:
|
|
return []
|
|
|
|
try:
|
|
# Get existing tags
|
|
existing_tags = self.blog_service.list_tags(access_token, extra_headers)
|
|
|
|
# Create name -> ID mapping (case-insensitive)
|
|
tag_map = {}
|
|
for tag in existing_tags:
|
|
tag_label = tag.get('label', '').strip()
|
|
tag_id = tag.get('id')
|
|
if tag_label and tag_id:
|
|
tag_map[tag_label.lower()] = tag_id
|
|
|
|
tag_ids = []
|
|
for tag_name in tag_names:
|
|
tag_name_clean = str(tag_name).strip()
|
|
if not tag_name_clean:
|
|
continue
|
|
|
|
# Lookup existing tag (case-insensitive)
|
|
tag_id = tag_map.get(tag_name_clean.lower())
|
|
|
|
if not tag_id:
|
|
# Create new tag
|
|
try:
|
|
logger.info(f"Creating new tag: {tag_name_clean}")
|
|
result = self.blog_service.create_tag(
|
|
access_token,
|
|
label=tag_name_clean,
|
|
extra_headers=extra_headers
|
|
)
|
|
new_tag = result.get('tag', {})
|
|
tag_id = new_tag.get('id')
|
|
if tag_id:
|
|
tag_ids.append(tag_id)
|
|
# Update map to avoid duplicate creates
|
|
tag_map[tag_name_clean.lower()] = tag_id
|
|
logger.info(f"Created tag '{tag_name_clean}' with ID: {tag_id}")
|
|
except Exception as create_error:
|
|
logger.warning(f"Failed to create tag '{tag_name_clean}': {create_error}")
|
|
# Continue with other tags
|
|
else:
|
|
tag_ids.append(tag_id)
|
|
logger.info(f"Found existing tag '{tag_name_clean}' with ID: {tag_id}")
|
|
|
|
return tag_ids
|
|
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to lookup/create tags: {e}")
|
|
return []
|
|
|
|
def publish_draft_post(self, access_token: str, draft_post_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Publish a draft post by ID.
|
|
"""
|
|
try:
|
|
result = self.blog_service.publish_draft(access_token, draft_post_id)
|
|
logger.info(f"DEBUG: Publish result: {result}")
|
|
return result
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to publish draft post: {e}")
|
|
raise
|
|
|
|
def create_category(self, access_token: str, label: str, description: Optional[str] = None,
|
|
language: Optional[str] = None) -> Dict[str, Any]:
|
|
"""
|
|
Create a blog category.
|
|
"""
|
|
try:
|
|
return self.blog_service.create_category(access_token, label, description, language)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to create category: {e}")
|
|
raise
|
|
|
|
def create_tag(self, access_token: str, label: str, language: Optional[str] = None) -> Dict[str, Any]:
|
|
"""
|
|
Create a blog tag.
|
|
"""
|
|
try:
|
|
return self.blog_service.create_tag(access_token, label, language)
|
|
except requests.RequestException as e:
|
|
logger.error(f"Failed to create tag: {e}")
|
|
raise
|