#!/usr/bin/env python3 """ Auto-Publish to Payload CMS Publishes blog posts to Payload CMS collections via REST API, commits to git, and triggers auto-deploy. """ import os import sys import subprocess import argparse import re import json import urllib.request import urllib.error from pathlib import Path from datetime import datetime from typing import Dict, Optional, List class PayloadPublisher: """Publish blog posts to Payload CMS via REST API""" def __init__(self, website_url: str, website_repo: str = None): """ Initialize Payload publisher Args: website_url: URL of the website (e.g., https://example.com) website_repo: Optional path to website repository for git sync """ self.website_url = website_url.rstrip("/") self.api_base = f"{self.website_url}/api" self.website_repo = website_repo self.collection = "posts" def _make_request( self, endpoint: str, method: str = "GET", data: dict = None, token: str = None ) -> Dict: """ Make HTTP request to Payload CMS API Args: endpoint: API endpoint path (e.g., '/posts' or '/globals/site') method: HTTP method data: JSON data to send token: Bearer token for authentication Returns: Response JSON as dict """ url = f"{self.api_base}{endpoint}" headers = { "Content-Type": "application/json", } if token: headers["Authorization"] = f"Bearer {token}" request_data = None if data: request_data = json.dumps(data).encode("utf-8") req = urllib.request.Request( url, data=request_data, headers=headers, method=method ) try: with urllib.request.urlopen(req, timeout=30) as response: response_body = response.read().decode("utf-8") if response_body: return json.loads(response_body) return {} except urllib.error.HTTPError as e: error_body = e.read().decode("utf-8") if e.fp else "{}" try: error_data = json.loads(error_body) return {"error": error_data.get("message", str(e)), "status": e.code} except: return {"error": str(e), "status": e.code} except urllib.error.URLError as e: return {"error": f"Connection failed: {e.reason}"} except Exception as e: return {"error": str(e)} def get_token(self, email: str, password: str) -> Optional[str]: """ Authenticate and get access token Args: email: User email password: User password Returns: Access token or None """ result = self._make_request( "/users/login", method="POST", data={"email": email, "password": password} ) if "token" in result: return result["token"] elif "error" in result: print(f" āœ— Auth failed: {result['error']}") return None return None def detect_language(self, content: str) -> str: """Detect if content is Thai or English""" thai_chars = sum(1 for c in content if "\u0e00" <= c <= "\u0e7f") total_chars = len(content) thai_ratio = thai_chars / total_chars if total_chars > 0 else 0 return "th" if thai_ratio > 0.3 else "en" def generate_slug(self, title: str, lang: str = "en") -> str: """Generate URL-friendly slug""" # Remove special characters slug = re.sub(r"[^\w\s-]", "", title.lower()) # Replace whitespace with hyphens slug = re.sub(r"[-\s]+", "-", slug) # Remove leading/trailing hyphens slug = slug.strip("-_") # Limit length return slug[:100] def parse_frontmatter(self, content: str) -> Dict: """Parse frontmatter from markdown content""" import yaml if not content.startswith("---"): return {} try: # Extract frontmatter parts = content.split("---", 2) if len(parts) >= 2: frontmatter = yaml.safe_load(parts[1]) return frontmatter or {} except: pass return {} def markdown_to_lexical(self, markdown: str) -> Dict: """ Convert markdown content to Lexical JSON format This creates a basic Lexical editor state from markdown. For full fidelity, consider using a proper markdown-to-lexical library. Args: markdown: Raw markdown content Returns: Lexical JSON object """ # Basic markdown to Lexical conversion # This creates a simple paragraph-based structure lines = markdown.split("\n") children = [] for line in lines: line = line.strip() if not line: children.append( {"type": "paragraph", "children": [{"type": "text", "text": ""}]} ) elif line.startswith("# "): # H1 children.append( { "type": "heading", "tag": "h1", "children": [{"type": "text", "text": line[2:]}], } ) elif line.startswith("## "): # H2 children.append( { "type": "heading", "tag": "h2", "children": [{"type": "text", "text": line[3:]}], } ) elif line.startswith("### "): # H3 children.append( { "type": "heading", "tag": "h3", "children": [{"type": "text", "text": line[4:]}], } ) elif line.startswith("- ") or line.startswith("* "): # Unordered list item children.append( { "type": "list", "listType": "bullet", "children": [ { "type": "listitem", "children": [{"type": "text", "text": line[2:]}], } ], } ) elif line.startswith("1. ") or line.startswith("1) "): # Ordered list item children.append( { "type": "list", "listType": "number", "children": [ { "type": "listitem", "children": [ { "type": "text", "text": re.sub(r"^\d+[.\)]\s*", "", line), } ], } ], } ) elif line.startswith("> "): # Blockquote children.append( {"type": "quote", "children": [{"type": "text", "text": line[2:]}]} ) elif line.startswith("```"): # Code block (simplified) children.append( {"type": "paragraph", "children": [{"type": "text", "text": line}]} ) else: # Regular paragraph # Handle bold and italic inline text = line children.append( {"type": "paragraph", "children": [{"type": "text", "text": text}]} ) return { "root": { "type": "root", "format": "", "indent": 0, "version": 1, "children": children, } } def publish( self, markdown_content: str, images: List[str] = None, use_git: bool = False, payload_token: str = None, ) -> Dict: """ Publish blog post to Payload CMS Args: markdown_content: Full markdown with frontmatter images: List of image paths to upload use_git: Whether to git commit and push (default: False) payload_token: Payload CMS access token Returns: Publication result """ try: # Parse frontmatter frontmatter = self.parse_frontmatter(markdown_content) # Get required fields title = frontmatter.get("title", "Untitled") slug = frontmatter.get("slug") or self.generate_slug(title) lang = frontmatter.get("lang") or self.detect_language(markdown_content) status = frontmatter.get("status", "draft") description = frontmatter.get("description", "") # Extract markdown body (after frontmatter) body_content = markdown_content if markdown_content.startswith("---"): parts = markdown_content.split("---", 2) if len(parts) >= 3: body_content = parts[2].strip() # Convert markdown to Lexical lexical_content = self.markdown_to_lexical(body_content) # Prepare Payload CMS document payload_doc = { "title": title, "slug": slug, "content": lexical_content, "status": status, "publishedAt": datetime.now().isoformat() if status == "published" else None, } if description: payload_doc["description"] = description # Add language prefix to slug for Thai content if lang == "th": payload_doc["slug"] = f"th/{slug}" print(f"\nšŸ“ Publishing to Payload CMS") print(f" Title: {title}") print(f" Slug: {payload_doc['slug']}") print(f" Language: {lang}") print(f" Status: {status}") # Send to Payload CMS API if payload_token: headers = {"Authorization": f"Bearer {payload_token}"} else: headers = {} # Check if post already exists check_result = self._make_request( f"/{self.collection}?where[slug][equals]={payload_doc['slug']}", method="GET", token=payload_token, ) existing_doc = None if check_result.get("docs") and len(check_result["docs"]) > 0: existing_doc = check_result["docs"][0] print(f" ā„¹ļø Post already exists with ID: {existing_doc['id']}") # Create or update the post if existing_doc: # Update existing result = self._make_request( f"/{self.collection}/{existing_doc['id']}", method="PATCH", data=payload_doc, token=payload_token, ) action = "updated" else: # Create new result = self._make_request( f"/{self.collection}", method="POST", data=payload_doc, token=payload_token, ) action = "created" if "error" in result and "id" not in result: return { "success": False, "error": result.get("error", "Unknown error"), "status_code": result.get("status", 500), } doc_id = result.get("doc", result.get("id")) print(f" āœ“ Post {action}: {doc_id}") # Upload images if provided (to media collection) uploaded_images = [] if images: for img_path in images: if os.path.exists(img_path): uploaded = self._upload_media(img_path, payload_token) if uploaded: uploaded_images.append(uploaded) print(f" āœ“ Uploaded image: {os.path.basename(img_path)}") # Git commit and push (OPTIONAL) git_result = None if use_git and self.website_repo: git_result = self._git_commit_and_push(payload_doc["slug"], lang) elif use_git: print(f" ā„¹ļø Direct write complete (no git repo configured)") return { "success": True, "id": doc_id, "slug": payload_doc["slug"], "language": lang, "action": action, "api_url": f"{self.api_base}/{self.collection}", "admin_url": f"{self.website_url}/admin/collections/{self.collection}/{doc_id}", "git_result": git_result, "method": "api" + (" + git" if use_git else ""), "images_uploaded": len(uploaded_images), } except Exception as e: return {"success": False, "error": str(e)} def _upload_media(self, file_path: str, token: str = None) -> Optional[Dict]: """ Upload media file to Payload CMS media collection Args: file_path: Path to the image file token: Bearer token Returns: Uploaded media document or None """ import mimetypes filename = os.path.basename(file_path) mime_type, _ = mimetypes.guess_type(file_path) try: with open(file_path, "rb") as f: file_data = f.read() # Build multipart form data boundary = "----FormBoundary7MA4YWxkTrZu0gW" body_parts = [] # Add filename field body_parts.append(f"--{boundary}\r\n".encode()) body_parts.append( f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode() ) body_parts.append( f"Content-Type: {mime_type or 'application/octet-stream'}\r\n\r\n".encode() ) body_parts.append(file_data) body_parts.append(b"\r\n") # Add alt text body_parts.append(f"--{boundary}\r\n".encode()) body_parts.append( f'Content-Disposition: form-data; name="alt"\r\n\r\n'.encode() ) body_parts.append(filename.encode()) body_parts.append(b"\r\n") # Close boundary body_parts.append(f"--{boundary}--\r\n".encode()) body = b"".join(body_parts) url = f"{self.api_base}/media" headers = { "Content-Type": f"multipart/form-data; boundary={boundary}", } if token: headers["Authorization"] = f"Bearer {token}" req = urllib.request.Request(url, data=body, headers=headers, method="POST") with urllib.request.urlopen(req, timeout=60) as response: result = json.loads(response.read().decode("utf-8")) return result except Exception as e: print(f" āœ— Failed to upload {filename}: {e}") return None def _git_commit_and_push(self, slug: str, lang: str) -> Dict: """Commit and push changes to git""" if not self.website_repo: return {"success": False, "error": "No git repository configured"} try: # Check if git repo if not os.path.exists(os.path.join(self.website_repo, ".git")): return {"success": False, "error": "Not a git repository"} # Git add subprocess.run( ["git", "add", "."], cwd=self.website_repo, check=True, capture_output=True, ) # Git commit message = f"Add blog post: {slug} ({lang})" result = subprocess.run( ["git", "commit", "-m", message], cwd=self.website_repo, capture_output=True, ) if result.returncode != 0: # Check if there's nothing to commit if "nothing to commit" in result.stderr.decode(): print(f" ā„¹ļø Nothing to commit (no changes)") return {"success": True, "message": "nothing to commit"} return {"success": False, "error": result.stderr.decode()} # Git push subprocess.run( ["git", "push"], cwd=self.website_repo, check=True, capture_output=True ) print(f" āœ“ Committed: {message}") print(f" āœ“ Pushed to remote") return { "success": True, "commit_message": message, "triggered_deploy": True, } except subprocess.CalledProcessError as e: print(f" āœ— Git error: {e.stderr.decode() if e.stderr else str(e)}") return {"success": False, "error": "Git operation failed"} except Exception as e: print(f" āœ— Error: {e}") return {"success": False, "error": str(e)} def main(): """Main entry point""" parser = argparse.ArgumentParser(description="Publish to Payload CMS") parser.add_argument("--file", required=True, help="Markdown file to publish") parser.add_argument( "--website-url", required=True, help="Website URL (e.g., https://example.com)" ) parser.add_argument("--website-repo", help="Path to website repo (for git sync)") parser.add_argument("--email", help="Payload CMS email for authentication") parser.add_argument("--password", help="Payload CMS password for authentication") parser.add_argument( "--token", help="Payload CMS access token (alternative to email/password)" ) parser.add_argument("--image", action="append", help="Image files to upload") parser.add_argument( "--use-git", action="store_true", help="Use git commit/push (default: False)" ) args = parser.parse_args() print(f"\nšŸ“ Publishing to Payload CMS\n") # Get authentication token payload_token = args.token if not payload_token and args.email and args.password: # First, try to get token via the website's login endpoint # For now, require token directly - in future could add auto-login print("āš ļø Token-based auth required. Use --token or set PAYLOAD_TOKEN env var.") payload_token = os.environ.get("PAYLOAD_TOKEN") if not payload_token: print("āŒ Error: --token required or set PAYLOAD_TOKEN environment variable") sys.exit(1) # Read markdown file with open(args.file, "r", encoding="utf-8") as f: content = f.read() # Publish publisher = PayloadPublisher(args.website_url, args.website_repo) result = publisher.publish( content, images=args.image, use_git=args.use_git, payload_token=payload_token ) if result["success"]: print(f"\nāœ… Published successfully!") print(f" ID: {result['id']}") print(f" Slug: {result['slug']}") print(f" Language: {result['language']}") print(f" Method: {result['method']}") print(f" Admin: {result['admin_url']}") if result.get("images_uploaded", 0) > 0: print(f" Images: {result['images_uploaded']} uploaded") if result.get("git_result") and result["git_result"].get("success"): print(f" āœ“ Committed and pushed to Gitea") print(f" āœ“ Deployment triggered") else: print(f"\nāŒ Publication failed: {result.get('error')}") if result.get("status_code"): print(f" Status code: {result['status_code']}") if __name__ == "__main__": main()