Major changes: - Replace Payload CMS with Tina CMS (self-hosted) - Add Astro DB for consent logging (PDPA compliant) - Update Tailwind v3 to v4 (@tailwindcss/vite plugin) - Add astro-tina-starter template - Rewrite consent template for Astro (ConsentBanner.astro, Astro DB, Nano Stores) - Add install-tina-backend.sh for self-hosted Tina per customer - Rename convert-astro.sh to migrate-tina.sh - Add AGENTS.md template for generated websites - Delete all Payload/Next.js files Technical updates: - Astro DB using defineDb with eq operators for queries - Tailwind v4 with @theme block - Tina CMS local development mode - Proper Astro API routes for consent Research-verified with official documentation (April 2026)
590 lines
20 KiB
Python
590 lines
20 KiB
Python
#!/usr/bin/env python3
|
||
"""
Auto-Publish to Payload CMS

Publishes blog posts to Payload CMS collections via REST API,
optionally commits to git, and triggers auto-deploy.
"""
|
||
|
||
import os
|
||
import sys
|
||
import subprocess
|
||
import argparse
|
||
import re
|
||
import json
|
||
import urllib.request
|
||
import urllib.error
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from typing import Dict, Optional, List
|
||
|
||
|
||
class PayloadPublisher:
    """Publish markdown blog posts to a Payload CMS instance via its REST API.

    The publisher converts a markdown document (with optional YAML
    frontmatter) into a basic Lexical editor state, creates or updates the
    matching document in the ``posts`` collection, optionally uploads images
    to the ``media`` collection, and optionally commits/pushes a local git
    repository.
    """

    # Characters removed when slugifying. ``\w`` does not match Thai combining
    # vowel/tone marks (U+0E31, U+0E34-U+0E3A, U+0E47-U+0E4E), so they are
    # listed explicitly to keep Thai words intact.
    _SLUG_STRIP_RE = re.compile(r"[^\w\s\u0e31\u0e34-\u0e3a\u0e47-\u0e4e-]")

    def __init__(self, website_url: str, website_repo: Optional[str] = None):
        """
        Initialize Payload publisher

        Args:
            website_url: URL of the website (e.g., https://example.com)
            website_repo: Optional path to website repository for git sync
        """
        self.website_url = website_url.rstrip("/")
        self.api_base = f"{self.website_url}/api"
        self.website_repo = website_repo
        self.collection = "posts"

    @staticmethod
    def _extract_error_message(error_body: str, fallback: str) -> str:
        """Pull a human-readable message out of a JSON error body.

        Looks for a top-level ``message`` first, then for the first entry of
        an ``errors`` list. Returns ``fallback`` when the body is not JSON or
        carries neither.
        """
        try:
            parsed = json.loads(error_body)
        except ValueError:
            return fallback

        if isinstance(parsed, dict):
            if parsed.get("message"):
                return str(parsed["message"])
            errors = parsed.get("errors")
            if isinstance(errors, list) and errors:
                first = errors[0]
                if isinstance(first, dict) and first.get("message"):
                    return str(first["message"])
        return fallback

    def _make_request(
        self,
        endpoint: str,
        method: str = "GET",
        data: Optional[dict] = None,
        token: Optional[str] = None,
    ) -> Dict:
        """
        Make HTTP request to Payload CMS API

        This method never raises: every failure is reported as a dict with an
        ``error`` key (plus ``status`` for HTTP errors).

        Args:
            endpoint: API endpoint path (e.g., '/posts' or '/globals/site')
            method: HTTP method
            data: JSON data to send
            token: Bearer token for authentication

        Returns:
            Response JSON as dict, ``{}`` for an empty response body, or an
            error dict as described above
        """
        url = f"{self.api_base}{endpoint}"
        headers = {"Content-Type": "application/json"}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        # ``is not None`` so that an intentionally empty JSON object is sent.
        request_data = None
        if data is not None:
            request_data = json.dumps(data).encode("utf-8")

        req = urllib.request.Request(
            url, data=request_data, headers=headers, method=method
        )

        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                response_body = response.read().decode("utf-8")
            return json.loads(response_body) if response_body else {}
        except urllib.error.HTTPError as e:
            # Decode leniently: a malformed error body must not raise from
            # inside this handler.
            error_body = e.read().decode("utf-8", errors="replace") if e.fp else "{}"
            return {
                "error": self._extract_error_message(error_body, str(e)),
                "status": e.code,
            }
        except urllib.error.URLError as e:
            return {"error": f"Connection failed: {e.reason}"}
        except Exception as e:
            # Boundary handler: callers rely on an error dict, not exceptions.
            return {"error": str(e)}

    def get_token(self, email: str, password: str) -> Optional[str]:
        """
        Authenticate and get access token

        Args:
            email: User email
            password: User password

        Returns:
            Access token or None
        """
        result = self._make_request(
            "/users/login", method="POST", data={"email": email, "password": password}
        )

        if "token" in result:
            return result["token"]
        if "error" in result:
            print(f"   ✗ Auth failed: {result['error']}")
        return None

    def detect_language(self, content: str) -> str:
        """Return "th" when more than 30% of the characters are Thai, else "en"."""
        if not content:
            return "en"
        thai_chars = sum(1 for c in content if "\u0e00" <= c <= "\u0e7f")
        return "th" if thai_chars / len(content) > 0.3 else "en"

    def generate_slug(self, title: str, lang: str = "en") -> str:
        """Generate URL-friendly slug

        Args:
            title: Post title to slugify
            lang: Accepted for interface compatibility; not used

        Returns:
            Lower-cased, hyphen-separated slug of at most 100 characters
        """
        # Remove special characters (Thai combining marks are kept)
        slug = self._SLUG_STRIP_RE.sub("", title.lower())
        # Replace whitespace with hyphens
        slug = re.sub(r"[-\s]+", "-", slug)
        # Remove leading/trailing hyphens
        slug = slug.strip("-_")
        # Limit length; truncation may expose a trailing hyphen again
        return slug[:100].rstrip("-")

    @staticmethod
    def _split_frontmatter(content: str):
        """Split a document into ``(frontmatter_text, body)``.

        Frontmatter is recognised only when the first line is ``---`` and a
        later line is also exactly ``---``; matching whole lines means a
        ``---`` inside a frontmatter value is not mistaken for a delimiter.
        Returns ``(None, content)`` when there is no frontmatter.
        """
        content = content.lstrip("\ufeff")  # tolerate a UTF-8 BOM
        lines = content.splitlines(keepends=True)
        if not lines or lines[0].strip() != "---":
            return None, content

        for idx in range(1, len(lines)):
            if lines[idx].strip() == "---":
                return "".join(lines[1:idx]), "".join(lines[idx + 1:])
        return None, content

    def parse_frontmatter(self, content: str) -> Dict:
        """Parse YAML frontmatter from markdown content.

        Returns:
            The frontmatter mapping, or ``{}`` when there is no frontmatter,
            it is not valid YAML, or it is not a mapping
        """
        raw, _ = self._split_frontmatter(content)
        if raw is None:
            return {}

        # Imported lazily so documents without frontmatter do not need PyYAML.
        import yaml

        try:
            parsed = yaml.safe_load(raw)
        except yaml.YAMLError as exc:
            print(f"   ⚠️  Could not parse frontmatter: {exc}")
            return {}

        # A scalar or list is not usable as field metadata.
        return parsed if isinstance(parsed, dict) else {}

    def markdown_to_lexical(self, markdown: str) -> Dict:
        """
        Convert markdown content to Lexical JSON format

        This creates a basic Lexical editor state from markdown. Handled
        constructs: h1-h6 headings, bullet and numbered lists (consecutive
        items share one list node), blockquotes, and plain paragraphs. Lines
        inside ``` fences are emitted verbatim as paragraphs. Inline
        formatting (bold, italic, links) is not converted.
        For full fidelity, consider using a proper markdown-to-lexical library.

        Args:
            markdown: Raw markdown content

        Returns:
            Lexical JSON object
        """
        children = []
        in_code_fence = False

        def text_node(text):
            return {"type": "text", "text": text}

        def paragraph(text):
            return {"type": "paragraph", "children": [text_node(text)]}

        def add_list_item(list_type, text):
            # Extend the previous list when it directly precedes this item and
            # has the same type; otherwise start a new list.
            item = {"type": "listitem", "children": [text_node(text)]}
            previous = children[-1] if children else None
            if (
                previous is not None
                and previous.get("type") == "list"
                and previous.get("listType") == list_type
            ):
                previous["children"].append(item)
            else:
                children.append(
                    {"type": "list", "listType": list_type, "children": [item]}
                )

        for raw_line in markdown.split("\n"):
            line = raw_line.strip()

            if line.startswith("```"):
                # Code block (simplified): keep the fence as a paragraph and
                # toggle verbatim mode.
                in_code_fence = not in_code_fence
                children.append(paragraph(line))
                continue

            if in_code_fence:
                # Keep leading indentation; never interpret code as markdown.
                children.append(paragraph(raw_line.rstrip()))
                continue

            if not line:
                children.append(paragraph(""))
                continue

            heading = re.match(r"(#{1,6}) (.*)", line)
            if heading:
                children.append(
                    {
                        "type": "heading",
                        "tag": f"h{len(heading.group(1))}",
                        "children": [text_node(heading.group(2))],
                    }
                )
                continue

            if line.startswith(("- ", "* ")):
                add_list_item("bullet", line[2:])
                continue

            ordered = re.match(r"\d+[.)]\s+(.*)", line)
            if ordered:
                add_list_item("number", ordered.group(1))
                continue

            if line.startswith("> "):
                children.append({"type": "quote", "children": [text_node(line[2:])]})
                continue

            children.append(paragraph(line))

        return {
            "root": {
                "type": "root",
                "format": "",
                "indent": 0,
                "version": 1,
                "children": children,
            }
        }

    def publish(
        self,
        markdown_content: str,
        images: Optional[List[str]] = None,
        use_git: bool = False,
        payload_token: Optional[str] = None,
    ) -> Dict:
        """
        Publish blog post to Payload CMS

        Looks the post up by slug and updates it when found, otherwise
        creates it. Thai posts get a ``th/`` slug prefix.

        Args:
            markdown_content: Full markdown with frontmatter
            images: List of image paths to upload
            use_git: Whether to git commit and push (default: False)
            payload_token: Payload CMS access token

        Returns:
            Publication result. ``success`` is always present; on failure the
            dict carries ``error`` (and ``status_code`` for API errors).
        """
        try:
            from urllib.parse import quote

            # Parse frontmatter
            frontmatter = self.parse_frontmatter(markdown_content)

            # Get required fields (coerced to str: YAML may yield numbers/dates)
            title = str(frontmatter.get("title") or "Untitled")
            slug = str(frontmatter.get("slug") or self.generate_slug(title))
            lang = frontmatter.get("lang") or self.detect_language(markdown_content)
            status = frontmatter.get("status", "draft")
            description = frontmatter.get("description", "")

            # Extract markdown body (after frontmatter)
            raw_frontmatter, body = self._split_frontmatter(markdown_content)
            body_content = body.strip() if raw_frontmatter is not None else markdown_content

            # Convert markdown to Lexical
            lexical_content = self.markdown_to_lexical(body_content)

            # Timezone-aware timestamp so the server does not have to guess
            # which zone a naive local time refers to.
            published_at = None
            if status == "published":
                published_at = (
                    datetime.now().astimezone().isoformat(timespec="milliseconds")
                )

            # Prepare Payload CMS document
            payload_doc = {
                "title": title,
                "slug": slug,
                "content": lexical_content,
                "status": status,
                "publishedAt": published_at,
            }

            if description:
                payload_doc["description"] = description

            # Add language prefix to slug for Thai content
            if lang == "th":
                payload_doc["slug"] = f"th/{slug}"

            print("\n📝 Publishing to Payload CMS")
            print(f"   Title: {title}")
            print(f"   Slug: {payload_doc['slug']}")
            print(f"   Language: {lang}")
            print(f"   Status: {status}")

            # Check if post already exists. The slug is percent-encoded:
            # a raw non-ASCII slug would make the request fail and the
            # duplicate check would be silently skipped.
            check_result = self._make_request(
                f"/{self.collection}?where[slug][equals]={quote(payload_doc['slug'])}",
                method="GET",
                token=payload_token,
            )

            existing_doc = None
            if check_result.get("docs"):
                existing_doc = check_result["docs"][0]
                print(f"   ℹ️  Post already exists with ID: {existing_doc['id']}")

            # Create or update the post
            if existing_doc:
                result = self._make_request(
                    f"/{self.collection}/{existing_doc['id']}",
                    method="PATCH",
                    data=payload_doc,
                    token=payload_token,
                )
                action = "updated"
            else:
                result = self._make_request(
                    f"/{self.collection}",
                    method="POST",
                    data=payload_doc,
                    token=payload_token,
                )
                action = "created"

            if "error" in result and "id" not in result:
                return {
                    "success": False,
                    "error": result.get("error", "Unknown error"),
                    "status_code": result.get("status", 500),
                }

            # The saved document may be wrapped under "doc"; use its id, not
            # the whole document.
            saved = result.get("doc")
            doc_id = saved.get("id") if isinstance(saved, dict) else result.get("id")
            print(f"   ✓ Post {action}: {doc_id}")

            # Upload images if provided (to media collection)
            uploaded_images = []
            for img_path in images or []:
                if not os.path.exists(img_path):
                    print(f"   ✗ Image not found, skipped: {img_path}")
                    continue
                uploaded = self._upload_media(img_path, payload_token)
                if uploaded:
                    uploaded_images.append(uploaded)
                    print(f"   ✓ Uploaded image: {os.path.basename(img_path)}")

            # Git commit and push (OPTIONAL)
            git_result = None
            if use_git and self.website_repo:
                git_result = self._git_commit_and_push(payload_doc["slug"], lang)
            elif use_git:
                print("   ℹ️  Direct write complete (no git repo configured)")

            return {
                "success": True,
                "id": doc_id,
                "slug": payload_doc["slug"],
                "language": lang,
                "action": action,
                "api_url": f"{self.api_base}/{self.collection}",
                "admin_url": f"{self.website_url}/admin/collections/{self.collection}/{doc_id}",
                "git_result": git_result,
                "method": "api" + (" + git" if use_git else ""),
                "images_uploaded": len(uploaded_images),
            }

        except Exception as e:
            # Boundary handler: the CLI expects a result dict, not a traceback.
            return {"success": False, "error": str(e)}

    def _upload_media(self, file_path: str, token: Optional[str] = None) -> Optional[Dict]:
        """
        Upload media file to Payload CMS media collection

        The file is sent as multipart/form-data with the file under ``file``
        and the file name under ``alt``.

        Args:
            file_path: Path to the image file
            token: Bearer token

        Returns:
            Uploaded media document or None
        """
        import mimetypes
        import uuid

        filename = os.path.basename(file_path)
        mime_type, _ = mimetypes.guess_type(file_path)

        try:
            with open(file_path, "rb") as f:
                file_data = f.read()

            # Random boundary so it cannot collide with bytes in the file.
            boundary = f"----FormBoundary{uuid.uuid4().hex}"

            # Quotes and line breaks in the name would corrupt the header.
            header_name = (
                filename.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")
            )

            body = b"".join(
                [
                    # File part
                    f"--{boundary}\r\n".encode(),
                    f'Content-Disposition: form-data; name="file"; filename="{header_name}"\r\n'.encode(),
                    f"Content-Type: {mime_type or 'application/octet-stream'}\r\n\r\n".encode(),
                    file_data,
                    b"\r\n",
                    # Alt text part
                    f"--{boundary}\r\n".encode(),
                    b'Content-Disposition: form-data; name="alt"\r\n\r\n',
                    filename.encode(),
                    b"\r\n",
                    # Close boundary
                    f"--{boundary}--\r\n".encode(),
                ]
            )

            headers = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
            if token:
                headers["Authorization"] = f"Bearer {token}"

            req = urllib.request.Request(
                f"{self.api_base}/media", data=body, headers=headers, method="POST"
            )

            with urllib.request.urlopen(req, timeout=60) as response:
                return json.loads(response.read().decode("utf-8"))

        except Exception as e:
            # Best effort: a failed image upload must not abort the publish.
            print(f"   ✗ Failed to upload {filename}: {e}")
            return None

    def _git_commit_and_push(self, slug: str, lang: str) -> Dict:
        """Stage everything, commit, and push in the configured repository.

        Args:
            slug: Post slug, used in the commit message
            lang: Post language, used in the commit message

        Returns:
            Dict with ``success`` and either commit details or ``error``
        """
        if not self.website_repo:
            return {"success": False, "error": "No git repository configured"}

        try:
            # Check if git repo
            if not os.path.exists(os.path.join(self.website_repo, ".git")):
                return {"success": False, "error": "Not a git repository"}

            # Git add
            subprocess.run(
                ["git", "add", "."],
                cwd=self.website_repo,
                check=True,
                capture_output=True,
            )

            # Git commit
            message = f"Add blog post: {slug} ({lang})"
            commit = subprocess.run(
                ["git", "commit", "-m", message],
                cwd=self.website_repo,
                capture_output=True,
            )

            if commit.returncode != 0:
                # git reports "nothing to commit" on stdout, so inspect both
                # streams.
                output = (commit.stdout + commit.stderr).decode(
                    "utf-8", errors="replace"
                )
                if "nothing to commit" in output:
                    print("   ℹ️  Nothing to commit (no changes)")
                    return {"success": True, "message": "nothing to commit"}
                return {"success": False, "error": output.strip()}

            # Git push
            subprocess.run(
                ["git", "push"], cwd=self.website_repo, check=True, capture_output=True
            )

            print(f"   ✓ Committed: {message}")
            print("   ✓ Pushed to remote")

            return {
                "success": True,
                "commit_message": message,
                "triggered_deploy": True,
            }

        except subprocess.CalledProcessError as e:
            detail = (
                e.stderr.decode("utf-8", errors="replace").strip() if e.stderr else str(e)
            )
            print(f"   ✗ Git error: {detail}")
            return {"success": False, "error": f"Git operation failed: {detail}"}
        except Exception as e:
            print(f"   ✗ Error: {e}")
            return {"success": False, "error": str(e)}
|
||
|
||
|
||
def main():
    """Command-line entry point: publish one markdown file to Payload CMS.

    Authentication is resolved in this order: ``--token``, the
    ``PAYLOAD_TOKEN`` environment variable, then a login with
    ``--email``/``--password``. Exits with status 1 when no credentials are
    available, the file cannot be read, login fails, or publishing fails.
    """
    parser = argparse.ArgumentParser(description="Publish to Payload CMS")
    parser.add_argument("--file", required=True, help="Markdown file to publish")
    parser.add_argument(
        "--website-url", required=True, help="Website URL (e.g., https://example.com)"
    )
    parser.add_argument("--website-repo", help="Path to website repo (for git sync)")
    parser.add_argument("--email", help="Payload CMS email for authentication")
    parser.add_argument("--password", help="Payload CMS password for authentication")
    parser.add_argument(
        "--token", help="Payload CMS access token (alternative to email/password)"
    )
    parser.add_argument("--image", action="append", help="Image files to upload")
    parser.add_argument(
        "--use-git", action="store_true", help="Use git commit/push (default: False)"
    )

    args = parser.parse_args()

    print("\n📝 Publishing to Payload CMS\n")

    # Get authentication token. The environment variable is honoured on its
    # own, as the error message below promises.
    payload_token = args.token or os.environ.get("PAYLOAD_TOKEN")
    can_login = bool(args.email and args.password)

    if not payload_token and not can_login:
        print("❌ Error: --token required or set PAYLOAD_TOKEN environment variable")
        sys.exit(1)

    # Read markdown file before any network call so local mistakes fail fast
    try:
        with open(args.file, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as exc:
        print(f"❌ Error: cannot read {args.file}: {exc}")
        sys.exit(1)

    publisher = PayloadPublisher(args.website_url, args.website_repo)

    if not payload_token:
        # No token supplied: log in with the given credentials.
        payload_token = publisher.get_token(args.email, args.password)
        if not payload_token:
            print("❌ Error: login failed; check --email/--password or use --token")
            sys.exit(1)

    # Publish
    result = publisher.publish(
        content, images=args.image, use_git=args.use_git, payload_token=payload_token
    )

    if not result["success"]:
        print(f"\n❌ Publication failed: {result.get('error')}")
        if result.get("status_code"):
            print(f"   Status code: {result['status_code']}")
        # Non-zero status so scripts and CI can detect the failure.
        sys.exit(1)

    print("\n✅ Published successfully!")
    print(f"   ID: {result['id']}")
    print(f"   Slug: {result['slug']}")
    print(f"   Language: {result['language']}")
    print(f"   Method: {result['method']}")
    print(f"   Admin: {result['admin_url']}")

    if result.get("images_uploaded", 0) > 0:
        print(f"   Images: {result['images_uploaded']} uploaded")

    git_result = result.get("git_result")
    if git_result and git_result.get("success"):
        if git_result.get("triggered_deploy"):
            print("   ✓ Committed and pushed to Gitea")
            print("   ✓ Deployment triggered")
        else:
            # Git step succeeded without a push (nothing to commit).
            print("   ℹ️  No changes to commit; nothing pushed")
|
||
|
||
|
||
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|