Files
Kunthawat Greethong 628298183a feat: migrate website-creator from Next.js+Payload to Astro+Tina CMS
Major changes:
- Replace Payload CMS with Tina CMS (self-hosted)
- Add Astro DB for consent logging (PDPA compliant)
- Update Tailwind v3 to v4 (@tailwindcss/vite plugin)
- Add astro-tina-starter template
- Rewrite consent template for Astro (ConsentBanner.astro, Astro DB, Nano Stores)
- Add install-tina-backend.sh for self-hosted Tina per customer
- Rename convert-astro.sh to migrate-tina.sh
- Add AGENTS.md template for generated websites
- Delete all Payload/Next.js files

Technical updates:
- Astro DB using defineDb with eq operators for queries
- Tailwind v4 with @theme block
- Tina CMS local development mode
- Proper Astro API routes for consent

Research-verified with official documentation (April 2026)
2026-04-17 14:52:59 +07:00

590 lines
20 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Auto-Publish to Payload CMS
Publishes blog posts to Payload CMS collections via REST API,
commits to git, and triggers auto-deploy.
"""
import os
import sys
import subprocess
import argparse
import re
import json
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, List
class PayloadPublisher:
    """Publish blog posts to Payload CMS via REST API.

    Posts are created in (or updated in, when the slug already exists) the
    collection named by ``self.collection``; images are uploaded to the
    ``/media`` endpoint. Optionally the local website repository is
    committed and pushed after the API write.
    """

    def __init__(self, website_url: str, website_repo: Optional[str] = None):
        """
        Initialize Payload publisher

        Args:
            website_url: URL of the website (e.g., https://example.com)
            website_repo: Optional path to website repository for git sync
        """
        self.website_url = website_url.rstrip("/")
        self.api_base = f"{self.website_url}/api"
        self.website_repo = website_repo
        self.collection = "posts"

    @staticmethod
    def _error_message(raw_body: bytes, fallback: str) -> str:
        """Extract a human-readable message from an HTTP error body.

        Looks for a top-level ``message`` key first, then for the first
        entry of an ``errors`` list. Anything that is not a JSON object
        yields ``fallback``.
        """
        try:
            parsed = json.loads(raw_body.decode("utf-8", errors="replace"))
        except ValueError:
            return fallback
        if not isinstance(parsed, dict):
            return fallback
        if "message" in parsed:
            return parsed["message"]
        errors = parsed.get("errors")
        if isinstance(errors, list) and errors and isinstance(errors[0], dict):
            return errors[0].get("message", fallback)
        return fallback

    @staticmethod
    def _extract_doc_id(result: Dict):
        """Return the document id from a create/update response.

        Handles both a response that wraps the document in a ``doc`` object
        and one that carries ``id`` at the top level. Returns None when
        neither is present.
        """
        doc = result.get("doc")
        if isinstance(doc, dict):
            return doc.get("id")
        if doc is not None:
            # ``doc`` is already a scalar id.
            return doc
        return result.get("id")

    def _make_request(
        self,
        endpoint: str,
        method: str = "GET",
        data: Optional[dict] = None,
        token: Optional[str] = None,
    ) -> Dict:
        """
        Make HTTP request to Payload CMS API

        Never raises: every failure is reported as a dict with an
        ``error`` key (plus ``status`` for HTTP errors).

        Args:
            endpoint: API endpoint path (e.g., '/posts' or '/globals/site')
            method: HTTP method
            data: JSON data to send
            token: Bearer token for authentication

        Returns:
            Response JSON as dict
        """
        url = f"{self.api_base}{endpoint}"
        headers = {"Content-Type": "application/json"}
        if token:
            headers["Authorization"] = f"Bearer {token}"

        request_data = None
        if data is not None:
            request_data = json.dumps(data).encode("utf-8")

        req = urllib.request.Request(
            url, data=request_data, headers=headers, method=method
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as response:
                response_body = response.read().decode("utf-8")
            return json.loads(response_body) if response_body else {}
        except urllib.error.HTTPError as e:
            # HTTPError must be handled before URLError (it is a subclass).
            try:
                raw_body = e.read() if e.fp else b""
            except OSError:
                raw_body = b""
            return {
                "error": self._error_message(raw_body, str(e)),
                "status": e.code,
            }
        except urllib.error.URLError as e:
            return {"error": f"Connection failed: {e.reason}"}
        except Exception as e:
            # Boundary handler: callers rely on a dict, never an exception.
            return {"error": str(e)}

    def get_token(self, email: str, password: str) -> Optional[str]:
        """
        Authenticate and get access token

        Args:
            email: User email
            password: User password

        Returns:
            Access token or None
        """
        result = self._make_request(
            "/users/login", method="POST", data={"email": email, "password": password}
        )
        if "token" in result:
            return result["token"]
        if "error" in result:
            print(f" ✗ Auth failed: {result['error']}")
        return None

    def detect_language(self, content: str) -> str:
        """Detect if content is Thai or English.

        Returns "th" when more than 30% of the characters fall in the Thai
        Unicode block (U+0E00-U+0E7F), otherwise "en" (including for empty
        content).
        """
        if not content:
            return "en"
        thai_chars = sum(1 for c in content if "\u0e00" <= c <= "\u0e7f")
        return "th" if thai_chars / len(content) > 0.3 else "en"

    def generate_slug(self, title: str, lang: str = "en") -> str:
        """Generate URL-friendly slug (at most 100 characters).

        ``lang`` is accepted for interface compatibility and is not used.
        """
        # Remove special characters
        slug = re.sub(r"[^\w\s-]", "", title.lower())
        # Replace whitespace with hyphens
        slug = re.sub(r"[-\s]+", "-", slug)
        # Remove leading/trailing separators, limit the length, then strip
        # again so truncation cannot leave a dangling hyphen.
        return slug.strip("-_")[:100].rstrip("-_")

    def parse_frontmatter(self, content: str) -> Dict:
        """Parse frontmatter from markdown content.

        Returns an empty dict when there is no leading ``---`` marker, when
        the YAML is invalid, or when it does not describe a mapping.
        """
        # PyYAML is third-party; imported lazily so the rest of the class
        # stays usable without it.
        import yaml

        if not content.startswith("---"):
            return {}
        # parts[0] is the empty text before the opening marker.
        parts = content.split("---", 2)
        try:
            frontmatter = yaml.safe_load(parts[1])
        except yaml.YAMLError:
            return {}
        # Scalars/lists would break the ``.get`` lookups done by callers.
        return frontmatter if isinstance(frontmatter, dict) else {}

    def markdown_to_lexical(self, markdown: str) -> Dict:
        """
        Convert markdown content to Lexical JSON format

        This creates a basic Lexical editor state from markdown, one node
        per input line. Inline formatting is not interpreted.
        For full fidelity, consider using a proper markdown-to-lexical library.

        Args:
            markdown: Raw markdown content

        Returns:
            Lexical JSON object
        """

        def text_node(text: str) -> Dict:
            return {"type": "text", "text": text}

        def paragraph(text: str) -> Dict:
            return {"type": "paragraph", "children": [text_node(text)]}

        def heading(tag: str, text: str) -> Dict:
            return {"type": "heading", "tag": tag, "children": [text_node(text)]}

        def list_item(list_type: str, text: str) -> Dict:
            return {
                "type": "list",
                "listType": list_type,
                "children": [{"type": "listitem", "children": [text_node(text)]}],
            }

        # Any item number counts ("2.", "10)"), not just "1".
        ordered_prefix = re.compile(r"^\d+[.)]\s+")

        children = []
        in_code_block = False
        for raw_line in markdown.split("\n"):
            line = raw_line.strip()
            if line.startswith("```"):
                # Code block (simplified): the fence itself stays a paragraph.
                in_code_block = not in_code_block
                children.append(paragraph(line))
            elif in_code_block:
                # Keep code verbatim so "# comment" is not read as a heading.
                children.append(paragraph(raw_line.rstrip()))
            elif not line:
                children.append(paragraph(""))
            elif line.startswith("# "):
                children.append(heading("h1", line[2:]))
            elif line.startswith("## "):
                children.append(heading("h2", line[3:]))
            elif line.startswith("### "):
                children.append(heading("h3", line[4:]))
            elif line.startswith(("- ", "* ")):
                children.append(list_item("bullet", line[2:]))
            elif ordered_prefix.match(line):
                children.append(list_item("number", ordered_prefix.sub("", line)))
            elif line.startswith("> "):
                children.append({"type": "quote", "children": [text_node(line[2:])]})
            else:
                children.append(paragraph(line))

        return {
            "root": {
                "type": "root",
                "format": "",
                "indent": 0,
                "version": 1,
                "children": children,
            }
        }

    def publish(
        self,
        markdown_content: str,
        images: Optional[List[str]] = None,
        use_git: bool = False,
        payload_token: Optional[str] = None,
    ) -> Dict:
        """
        Publish blog post to Payload CMS

        Never raises: any exception is reported as
        ``{"success": False, "error": ...}``.

        Args:
            markdown_content: Full markdown with frontmatter
            images: List of image paths to upload
            use_git: Whether to git commit and push (default: False)
            payload_token: Payload CMS access token

        Returns:
            Publication result
        """
        try:
            # Parse frontmatter
            frontmatter = self.parse_frontmatter(markdown_content)

            # Get required fields
            title = frontmatter.get("title", "Untitled")
            slug = frontmatter.get("slug") or self.generate_slug(title)
            lang = frontmatter.get("lang") or self.detect_language(markdown_content)
            status = frontmatter.get("status", "draft")
            description = frontmatter.get("description", "")

            # Extract markdown body (after frontmatter)
            body_content = markdown_content
            if markdown_content.startswith("---"):
                parts = markdown_content.split("---", 2)
                if len(parts) >= 3:
                    body_content = parts[2].strip()

            # Convert markdown to Lexical
            lexical_content = self.markdown_to_lexical(body_content)

            # Prepare Payload CMS document
            payload_doc = {
                "title": title,
                "slug": slug,
                "content": lexical_content,
                "status": status,
                # Timezone-aware so the server cannot misread the offset.
                "publishedAt": datetime.now().astimezone().isoformat()
                if status == "published"
                else None,
            }
            if description:
                payload_doc["description"] = description

            # Add language prefix to slug for Thai content
            if lang == "th":
                payload_doc["slug"] = f"th/{slug}"

            print("\n📝 Publishing to Payload CMS")
            print(f" Title: {title}")
            print(f" Slug: {payload_doc['slug']}")
            print(f" Language: {lang}")
            print(f" Status: {status}")

            # Check if post already exists. The slug is percent-encoded:
            # a raw "/" or non-ASCII (Thai) slug would otherwise make the
            # lookup fail and a duplicate post would be created.
            encoded_slug = urllib.parse.quote(str(payload_doc["slug"]), safe="")
            check_result = self._make_request(
                f"/{self.collection}?where[slug][equals]={encoded_slug}",
                method="GET",
                token=payload_token,
            )
            existing_doc = None
            if check_result.get("docs"):
                existing_doc = check_result["docs"][0]
                print(f" Post already exists with ID: {existing_doc['id']}")

            # Create or update the post
            if existing_doc:
                result = self._make_request(
                    f"/{self.collection}/{existing_doc['id']}",
                    method="PATCH",
                    data=payload_doc,
                    token=payload_token,
                )
                action = "updated"
            else:
                result = self._make_request(
                    f"/{self.collection}",
                    method="POST",
                    data=payload_doc,
                    token=payload_token,
                )
                action = "created"

            if "error" in result and "id" not in result:
                return {
                    "success": False,
                    "error": result.get("error", "Unknown error"),
                    "status_code": result.get("status", 500),
                }

            doc_id = self._extract_doc_id(result)
            print(f" ✓ Post {action}: {doc_id}")

            # Upload images if provided (to media collection)
            uploaded_images = []
            for img_path in images or []:
                if not os.path.exists(img_path):
                    continue
                uploaded = self._upload_media(img_path, payload_token)
                if uploaded:
                    uploaded_images.append(uploaded)
                    print(f" ✓ Uploaded image: {os.path.basename(img_path)}")

            # Git commit and push (OPTIONAL)
            git_result = None
            if use_git and self.website_repo:
                git_result = self._git_commit_and_push(payload_doc["slug"], lang)
            elif use_git:
                print(" Direct write complete (no git repo configured)")

            return {
                "success": True,
                "id": doc_id,
                "slug": payload_doc["slug"],
                "language": lang,
                "action": action,
                "api_url": f"{self.api_base}/{self.collection}",
                "admin_url": f"{self.website_url}/admin/collections/{self.collection}/{doc_id}",
                "git_result": git_result,
                "method": "api" + (" + git" if use_git else ""),
                "images_uploaded": len(uploaded_images),
            }
        except Exception as e:
            # Boundary handler: the CLI expects a result dict, not a traceback.
            return {"success": False, "error": str(e)}

    def _upload_media(
        self, file_path: str, token: Optional[str] = None
    ) -> Optional[Dict]:
        """
        Upload media file to Payload CMS media collection

        Sends a multipart/form-data POST with the file and an ``alt`` field
        set to the file name.

        Args:
            file_path: Path to the image file
            token: Bearer token

        Returns:
            Uploaded media document or None
        """
        import mimetypes

        filename = os.path.basename(file_path)
        mime_type, _ = mimetypes.guess_type(file_path)
        try:
            with open(file_path, "rb") as f:
                file_data = f.read()

            # Random boundary so it cannot collide with the file contents.
            boundary = f"----FormBoundary{os.urandom(16).hex()}"
            # Quotes and line breaks would corrupt the quoted header value.
            header_filename = (
                filename.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")
            )

            body = b"".join(
                [
                    # File part
                    f"--{boundary}\r\n".encode(),
                    (
                        'Content-Disposition: form-data; name="file"; '
                        f'filename="{header_filename}"\r\n'
                    ).encode(),
                    f"Content-Type: {mime_type or 'application/octet-stream'}\r\n\r\n".encode(),
                    file_data,
                    b"\r\n",
                    # Alt text part
                    f"--{boundary}\r\n".encode(),
                    b'Content-Disposition: form-data; name="alt"\r\n\r\n',
                    filename.encode(),
                    b"\r\n",
                    # Closing boundary
                    f"--{boundary}--\r\n".encode(),
                ]
            )

            headers = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
            if token:
                headers["Authorization"] = f"Bearer {token}"

            req = urllib.request.Request(
                f"{self.api_base}/media", data=body, headers=headers, method="POST"
            )
            with urllib.request.urlopen(req, timeout=60) as response:
                return json.loads(response.read().decode("utf-8"))
        except Exception as e:
            # Best effort: a failed image must not abort the publication.
            print(f" ✗ Failed to upload {filename}: {e}")
            return None

    def _git_commit_and_push(self, slug: str, lang: str) -> Dict:
        """Commit and push changes to git.

        Stages everything in ``self.website_repo``, commits with a message
        built from ``slug`` and ``lang``, then pushes. A clean working tree
        is reported as success with ``message == "nothing to commit"``.

        Returns:
            Dict with ``success`` and either commit details or ``error``.
        """
        if not self.website_repo:
            return {"success": False, "error": "No git repository configured"}
        try:
            # Check if git repo
            if not os.path.exists(os.path.join(self.website_repo, ".git")):
                return {"success": False, "error": "Not a git repository"}

            # Git add
            subprocess.run(
                ["git", "add", "."],
                cwd=self.website_repo,
                check=True,
                capture_output=True,
            )

            # Git commit
            message = f"Add blog post: {slug} ({lang})"
            commit = subprocess.run(
                ["git", "commit", "-m", message],
                cwd=self.website_repo,
                capture_output=True,
            )
            if commit.returncode != 0:
                # git reports "nothing to commit" on stdout, so both
                # streams have to be inspected.
                output = (commit.stdout + commit.stderr).decode(
                    "utf-8", errors="replace"
                )
                if "nothing to commit" in output:
                    print(" Nothing to commit (no changes)")
                    return {"success": True, "message": "nothing to commit"}
                return {"success": False, "error": output.strip()}

            # Git push
            subprocess.run(
                ["git", "push"], cwd=self.website_repo, check=True, capture_output=True
            )
            print(f" ✓ Committed: {message}")
            print(" ✓ Pushed to remote")
            return {
                "success": True,
                "commit_message": message,
                "triggered_deploy": True,
            }
        except subprocess.CalledProcessError as e:
            details = e.stderr.decode("utf-8", errors="replace") if e.stderr else str(e)
            print(f" ✗ Git error: {details}")
            return {"success": False, "error": "Git operation failed"}
        except Exception as e:
            print(f" ✗ Error: {e}")
            return {"success": False, "error": str(e)}
def main():
    """Main entry point.

    Parses the command line, resolves the access token (``--token`` first,
    then the ``PAYLOAD_TOKEN`` environment variable), publishes the given
    markdown file and prints a summary. Exits with status 1 when no token
    is available, the file cannot be read, or publication fails.
    """
    parser = argparse.ArgumentParser(description="Publish to Payload CMS")
    parser.add_argument("--file", required=True, help="Markdown file to publish")
    parser.add_argument(
        "--website-url", required=True, help="Website URL (e.g., https://example.com)"
    )
    parser.add_argument("--website-repo", help="Path to website repo (for git sync)")
    parser.add_argument("--email", help="Payload CMS email for authentication")
    parser.add_argument("--password", help="Payload CMS password for authentication")
    parser.add_argument(
        "--token", help="Payload CMS access token (alternative to email/password)"
    )
    parser.add_argument("--image", action="append", help="Image files to upload")
    parser.add_argument(
        "--use-git", action="store_true", help="Use git commit/push (default: False)"
    )
    args = parser.parse_args()

    print("\n📝 Publishing to Payload CMS\n")

    # Get authentication token. The environment variable is always
    # consulted as a fallback, matching what the error message promises.
    payload_token = args.token or os.environ.get("PAYLOAD_TOKEN")
    if not payload_token:
        if args.email and args.password:
            # Email/password login is not wired up yet; only tokens work.
            print("⚠️ Token-based auth required. Use --token or set PAYLOAD_TOKEN env var.")
        print("❌ Error: --token required or set PAYLOAD_TOKEN environment variable")
        sys.exit(1)

    # Read markdown file
    try:
        with open(args.file, "r", encoding="utf-8") as f:
            content = f.read()
    except OSError as e:
        print(f"❌ Error: cannot read {args.file}: {e}")
        sys.exit(1)

    # Publish
    publisher = PayloadPublisher(args.website_url, args.website_repo)
    result = publisher.publish(
        content, images=args.image, use_git=args.use_git, payload_token=payload_token
    )

    if not result["success"]:
        print(f"\n❌ Publication failed: {result.get('error')}")
        if result.get("status_code"):
            print(f" Status code: {result['status_code']}")
        # Non-zero exit so scripts and CI can detect the failure.
        sys.exit(1)

    print("\n✅ Published successfully!")
    print(f" ID: {result['id']}")
    print(f" Slug: {result['slug']}")
    print(f" Language: {result['language']}")
    print(f" Method: {result['method']}")
    print(f" Admin: {result['admin_url']}")
    if result.get("images_uploaded", 0) > 0:
        print(f" Images: {result['images_uploaded']} uploaded")
    if result.get("git_result") and result["git_result"].get("success"):
        print(" ✓ Committed and pushed to Gitea")
        print(" ✓ Deployment triggered")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()