feat: migrate website-creator from Next.js+Payload to Astro+Tina CMS

Major changes:
- Replace Payload CMS with Tina CMS (self-hosted)
- Add Astro DB for consent logging (PDPA compliant)
- Update Tailwind v3 to v4 (@tailwindcss/vite plugin)
- Add astro-tina-starter template
- Rewrite consent template for Astro (ConsentBanner.astro, Astro DB, Nano Stores)
- Add install-tina-backend.sh for self-hosted Tina per customer
- Rename convert-astro.sh to migrate-tina.sh
- Add AGENTS.md template for generated websites
- Delete all Payload/Next.js files

Technical updates:
- Astro DB using defineDb with eq operators for queries
- Tailwind v4 with @theme block
- Tina CMS local development mode
- Proper Astro API routes for consent

Research-verified with official documentation (April 2026)
This commit is contained in:
2026-04-17 14:52:59 +07:00
parent ce8483e546
commit 628298183a
74 changed files with 3536 additions and 11431 deletions

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env python3
"""
Auto-Publish to Astro Content Collections
Auto-Publish to Payload CMS
Publishes blog posts to Astro content collections,
Publishes blog posts to Payload CMS collections via REST API,
commits to git, and triggers auto-deploy.
"""
@@ -11,195 +11,579 @@ import sys
import subprocess
import argparse
import re
import json
import urllib.request
import urllib.error
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional
from typing import Dict, Optional, List
class AstroPublisher:
"""Publish blog posts to Astro content collections"""
def __init__(self, website_repo: str):
class PayloadPublisher:
"""Publish blog posts to Payload CMS via REST API"""
def __init__(self, website_url: str, website_repo: str = None):
"""
Initialize Astro publisher
Initialize Payload publisher
Args:
website_repo: Path to Astro website repository
website_url: URL of the website (e.g., https://example.com)
website_repo: Optional path to website repository for git sync
"""
self.website_url = website_url.rstrip("/")
self.api_base = f"{self.website_url}/api"
self.website_repo = website_repo
self.content_dir = os.path.join(website_repo, 'src/content/blog')
self.images_dir = os.path.join(website_repo, 'public/images/blog')
self.collection = "posts"
def _make_request(
self, endpoint: str, method: str = "GET", data: dict = None, token: str = None
) -> Dict:
"""
Make HTTP request to Payload CMS API
Args:
endpoint: API endpoint path (e.g., '/posts' or '/globals/site')
method: HTTP method
data: JSON data to send
token: Bearer token for authentication
Returns:
Response JSON as dict
"""
url = f"{self.api_base}{endpoint}"
headers = {
"Content-Type": "application/json",
}
if token:
headers["Authorization"] = f"Bearer {token}"
request_data = None
if data:
request_data = json.dumps(data).encode("utf-8")
req = urllib.request.Request(
url, data=request_data, headers=headers, method=method
)
try:
with urllib.request.urlopen(req, timeout=30) as response:
response_body = response.read().decode("utf-8")
if response_body:
return json.loads(response_body)
return {}
except urllib.error.HTTPError as e:
error_body = e.read().decode("utf-8") if e.fp else "{}"
try:
error_data = json.loads(error_body)
return {"error": error_data.get("message", str(e)), "status": e.code}
except:
return {"error": str(e), "status": e.code}
except urllib.error.URLError as e:
return {"error": f"Connection failed: {e.reason}"}
except Exception as e:
return {"error": str(e)}
def get_token(self, email: str, password: str) -> Optional[str]:
"""
Authenticate and get access token
Args:
email: User email
password: User password
Returns:
Access token or None
"""
result = self._make_request(
"/users/login", method="POST", data={"email": email, "password": password}
)
if "token" in result:
return result["token"]
elif "error" in result:
print(f" ✗ Auth failed: {result['error']}")
return None
return None
def detect_language(self, content: str) -> str:
"""Detect if content is Thai or English"""
thai_chars = sum(1 for c in content if '\u0E00' <= c <= '\u0E7F')
thai_chars = sum(1 for c in content if "\u0e00" <= c <= "\u0e7f")
total_chars = len(content)
thai_ratio = thai_chars / total_chars if total_chars > 0 else 0
return 'th' if thai_ratio > 0.3 else 'en'
def generate_slug(self, title: str, lang: str = 'en') -> str:
return "th" if thai_ratio > 0.3 else "en"
def generate_slug(self, title: str, lang: str = "en") -> str:
"""Generate URL-friendly slug"""
# Remove special characters
slug = re.sub(r'[^\w\s-]', '', title.lower())
slug = re.sub(r"[^\w\s-]", "", title.lower())
# Replace whitespace with hyphens
slug = re.sub(r'[-\s]+', '-', slug)
slug = re.sub(r"[-\s]+", "-", slug)
# Remove leading/trailing hyphens
slug = slug.strip('-_')
slug = slug.strip("-_")
# Limit length
return slug[:100]
def parse_frontmatter(self, content: str) -> Dict:
"""Parse frontmatter from markdown content"""
import yaml
if not content.startswith('---'):
if not content.startswith("---"):
return {}
try:
# Extract frontmatter
parts = content.split('---', 2)
parts = content.split("---", 2)
if len(parts) >= 2:
frontmatter = yaml.safe_load(parts[1])
return frontmatter or {}
except:
pass
return {}
def publish(self, markdown_content: str, images: list = None, use_git: bool = False) -> Dict:
def markdown_to_lexical(self, markdown: str) -> Dict:
"""
Publish blog post to Astro content collections
Convert markdown content to Lexical JSON format
This creates a basic Lexical editor state from markdown.
For full fidelity, consider using a proper markdown-to-lexical library.
Args:
markdown: Raw markdown content
Returns:
Lexical JSON object
"""
# Basic markdown to Lexical conversion
# This creates a simple paragraph-based structure
lines = markdown.split("\n")
children = []
for line in lines:
line = line.strip()
if not line:
children.append(
{"type": "paragraph", "children": [{"type": "text", "text": ""}]}
)
elif line.startswith("# "):
# H1
children.append(
{
"type": "heading",
"tag": "h1",
"children": [{"type": "text", "text": line[2:]}],
}
)
elif line.startswith("## "):
# H2
children.append(
{
"type": "heading",
"tag": "h2",
"children": [{"type": "text", "text": line[3:]}],
}
)
elif line.startswith("### "):
# H3
children.append(
{
"type": "heading",
"tag": "h3",
"children": [{"type": "text", "text": line[4:]}],
}
)
elif line.startswith("- ") or line.startswith("* "):
# Unordered list item
children.append(
{
"type": "list",
"listType": "bullet",
"children": [
{
"type": "listitem",
"children": [{"type": "text", "text": line[2:]}],
}
],
}
)
elif line.startswith("1. ") or line.startswith("1) "):
# Ordered list item
children.append(
{
"type": "list",
"listType": "number",
"children": [
{
"type": "listitem",
"children": [
{
"type": "text",
"text": re.sub(r"^\d+[.\)]\s*", "", line),
}
],
}
],
}
)
elif line.startswith("> "):
# Blockquote
children.append(
{"type": "quote", "children": [{"type": "text", "text": line[2:]}]}
)
elif line.startswith("```"):
# Code block (simplified)
children.append(
{"type": "paragraph", "children": [{"type": "text", "text": line}]}
)
else:
# Regular paragraph
# Handle bold and italic inline
text = line
children.append(
{"type": "paragraph", "children": [{"type": "text", "text": text}]}
)
return {
"root": {
"type": "root",
"format": "",
"indent": 0,
"version": 1,
"children": children,
}
}
def publish(
self,
markdown_content: str,
images: List[str] = None,
use_git: bool = False,
payload_token: str = None,
) -> Dict:
"""
Publish blog post to Payload CMS
Args:
markdown_content: Full markdown with frontmatter
images: List of image paths to copy
use_git: Whether to git commit and push (default: False - direct write only)
images: List of image paths to upload
use_git: Whether to git commit and push (default: False)
payload_token: Payload CMS access token
Returns:
Publication result
"""
try:
# Parse frontmatter
frontmatter = self.parse_frontmatter(markdown_content)
# Get required fields
title = frontmatter.get('title', 'Untitled')
slug = frontmatter.get('slug') or self.generate_slug(title)
lang = frontmatter.get('lang') or self.detect_language(markdown_content)
# Determine output path
lang_folder = f'({lang})'
output_dir = os.path.join(self.content_dir, lang_folder)
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f'{slug}.md')
# Write markdown file (ALWAYS do this)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
print(f"\n✓ Saved: {output_path}")
# Copy images if provided
title = frontmatter.get("title", "Untitled")
slug = frontmatter.get("slug") or self.generate_slug(title)
lang = frontmatter.get("lang") or self.detect_language(markdown_content)
status = frontmatter.get("status", "draft")
description = frontmatter.get("description", "")
# Extract markdown body (after frontmatter)
body_content = markdown_content
if markdown_content.startswith("---"):
parts = markdown_content.split("---", 2)
if len(parts) >= 3:
body_content = parts[2].strip()
# Convert markdown to Lexical
lexical_content = self.markdown_to_lexical(body_content)
# Prepare Payload CMS document
payload_doc = {
"title": title,
"slug": slug,
"content": lexical_content,
"status": status,
"publishedAt": datetime.now().isoformat()
if status == "published"
else None,
}
if description:
payload_doc["description"] = description
# Add language prefix to slug for Thai content
if lang == "th":
payload_doc["slug"] = f"th/{slug}"
print(f"\n📝 Publishing to Payload CMS")
print(f" Title: {title}")
print(f" Slug: {payload_doc['slug']}")
print(f" Language: {lang}")
print(f" Status: {status}")
# Send to Payload CMS API
if payload_token:
headers = {"Authorization": f"Bearer {payload_token}"}
else:
headers = {}
# Check if post already exists
check_result = self._make_request(
f"/{self.collection}?where[slug][equals]={payload_doc['slug']}",
method="GET",
token=payload_token,
)
existing_doc = None
if check_result.get("docs") and len(check_result["docs"]) > 0:
existing_doc = check_result["docs"][0]
print(f" Post already exists with ID: {existing_doc['id']}")
# Create or update the post
if existing_doc:
# Update existing
result = self._make_request(
f"/{self.collection}/{existing_doc['id']}",
method="PATCH",
data=payload_doc,
token=payload_token,
)
action = "updated"
else:
# Create new
result = self._make_request(
f"/{self.collection}",
method="POST",
data=payload_doc,
token=payload_token,
)
action = "created"
if "error" in result and "id" not in result:
return {
"success": False,
"error": result.get("error", "Unknown error"),
"status_code": result.get("status", 500),
}
doc_id = result.get("doc", result.get("id"))
print(f" ✓ Post {action}: {doc_id}")
# Upload images if provided (to media collection)
uploaded_images = []
if images:
images_output = os.path.join(self.images_dir, slug)
os.makedirs(images_output, exist_ok=True)
for img_path in images:
if os.path.exists(img_path):
import shutil
shutil.copy(img_path, images_output)
print(f" ✓ Copied image: {os.path.basename(img_path)}")
# Git commit and push (OPTIONAL - only if requested and Gitea configured)
uploaded = self._upload_media(img_path, payload_token)
if uploaded:
uploaded_images.append(uploaded)
print(f" ✓ Uploaded image: {os.path.basename(img_path)}")
# Git commit and push (OPTIONAL)
git_result = None
if use_git:
git_result = self.git_commit_and_push(slug, lang)
else:
print(f" Direct write complete (no git)")
if use_git and self.website_repo:
git_result = self._git_commit_and_push(payload_doc["slug"], lang)
elif use_git:
print(f" Direct write complete (no git repo configured)")
return {
'success': True,
'slug': slug,
'language': lang,
'path': output_path,
'git_result': git_result,
'method': 'direct_write' if not use_git else 'git_push'
"success": True,
"id": doc_id,
"slug": payload_doc["slug"],
"language": lang,
"action": action,
"api_url": f"{self.api_base}/{self.collection}",
"admin_url": f"{self.website_url}/admin/collections/{self.collection}/{doc_id}",
"git_result": git_result,
"method": "api" + (" + git" if use_git else ""),
"images_uploaded": len(uploaded_images),
}
except Exception as e:
return {
'success': False,
'error': str(e)
return {"success": False, "error": str(e)}
def _upload_media(self, file_path: str, token: str = None) -> Optional[Dict]:
"""
Upload media file to Payload CMS media collection
Args:
file_path: Path to the image file
token: Bearer token
Returns:
Uploaded media document or None
"""
import mimetypes
filename = os.path.basename(file_path)
mime_type, _ = mimetypes.guess_type(file_path)
try:
with open(file_path, "rb") as f:
file_data = f.read()
# Build multipart form data
boundary = "----FormBoundary7MA4YWxkTrZu0gW"
body_parts = []
# Add filename field
body_parts.append(f"--{boundary}\r\n".encode())
body_parts.append(
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'.encode()
)
body_parts.append(
f"Content-Type: {mime_type or 'application/octet-stream'}\r\n\r\n".encode()
)
body_parts.append(file_data)
body_parts.append(b"\r\n")
# Add alt text
body_parts.append(f"--{boundary}\r\n".encode())
body_parts.append(
f'Content-Disposition: form-data; name="alt"\r\n\r\n'.encode()
)
body_parts.append(filename.encode())
body_parts.append(b"\r\n")
# Close boundary
body_parts.append(f"--{boundary}--\r\n".encode())
body = b"".join(body_parts)
url = f"{self.api_base}/media"
headers = {
"Content-Type": f"multipart/form-data; boundary={boundary}",
}
def git_commit_and_push(self, slug: str, lang: str) -> Dict:
if token:
headers["Authorization"] = f"Bearer {token}"
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=60) as response:
result = json.loads(response.read().decode("utf-8"))
return result
except Exception as e:
print(f" ✗ Failed to upload {filename}: {e}")
return None
def _git_commit_and_push(self, slug: str, lang: str) -> Dict:
"""Commit and push changes to git"""
if not self.website_repo:
return {"success": False, "error": "No git repository configured"}
try:
# Check if git repo
if not os.path.exists(os.path.join(self.website_repo, '.git')):
return {'success': False, 'error': 'Not a git repository'}
if not os.path.exists(os.path.join(self.website_repo, ".git")):
return {"success": False, "error": "Not a git repository"}
# Git add
subprocess.run(['git', 'add', '.'], cwd=self.website_repo, check=True, capture_output=True)
subprocess.run(
["git", "add", "."],
cwd=self.website_repo,
check=True,
capture_output=True,
)
# Git commit
message = f"Add blog post: {slug} ({lang})"
subprocess.run(['git', 'commit', '-m', message], cwd=self.website_repo, check=True, capture_output=True)
result = subprocess.run(
["git", "commit", "-m", message],
cwd=self.website_repo,
capture_output=True,
)
if result.returncode != 0:
# Check if there's nothing to commit
if "nothing to commit" in result.stderr.decode():
print(f" Nothing to commit (no changes)")
return {"success": True, "message": "nothing to commit"}
return {"success": False, "error": result.stderr.decode()}
# Git push
subprocess.run(['git', 'push'], cwd=self.website_repo, check=True, capture_output=True)
print(f"✓ Committed: {message}")
print(f"✓ Pushed to remote")
subprocess.run(
["git", "push"], cwd=self.website_repo, check=True, capture_output=True
)
print(f" ✓ Committed: {message}")
print(f" ✓ Pushed to remote")
return {
'success': True,
'commit_message': message,
'triggered_deploy': True
"success": True,
"commit_message": message,
"triggered_deploy": True,
}
except subprocess.CalledProcessError as e:
print(f"✗ Git error: {e.stderr.decode() if e.stderr else str(e)}")
return {'success': False, 'error': 'Git operation failed'}
print(f" ✗ Git error: {e.stderr.decode() if e.stderr else str(e)}")
return {"success": False, "error": "Git operation failed"}
except Exception as e:
print(f"✗ Error: {e}")
return {'success': False, 'error': str(e)}
print(f" ✗ Error: {e}")
return {"success": False, "error": str(e)}
def main():
"""Test Astro publisher"""
parser = argparse.ArgumentParser(description='Publish to Astro')
parser.add_argument('--file', required=True, help='Markdown file to publish')
parser.add_argument('--website-repo', required=True, help='Path to website repo')
parser.add_argument('--image', action='append', help='Image files to copy')
parser.add_argument('--use-git', action='store_true', help='Use git commit/push (default: direct write only)')
"""Main entry point"""
parser = argparse.ArgumentParser(description="Publish to Payload CMS")
parser.add_argument("--file", required=True, help="Markdown file to publish")
parser.add_argument(
"--website-url", required=True, help="Website URL (e.g., https://example.com)"
)
parser.add_argument("--website-repo", help="Path to website repo (for git sync)")
parser.add_argument("--email", help="Payload CMS email for authentication")
parser.add_argument("--password", help="Payload CMS password for authentication")
parser.add_argument(
"--token", help="Payload CMS access token (alternative to email/password)"
)
parser.add_argument("--image", action="append", help="Image files to upload")
parser.add_argument(
"--use-git", action="store_true", help="Use git commit/push (default: False)"
)
args = parser.parse_args()
print(f"\n📝 Publishing to Astro\n")
print(f"\n📝 Publishing to Payload CMS\n")
# Get authentication token
payload_token = args.token
if not payload_token and args.email and args.password:
# First, try to get token via the website's login endpoint
# For now, require token directly - in future could add auto-login
print("⚠️ Token-based auth required. Use --token or set PAYLOAD_TOKEN env var.")
payload_token = os.environ.get("PAYLOAD_TOKEN")
if not payload_token:
print("❌ Error: --token required or set PAYLOAD_TOKEN environment variable")
sys.exit(1)
# Read markdown file
with open(args.file, 'r', encoding='utf-8') as f:
with open(args.file, "r", encoding="utf-8") as f:
content = f.read()
# Publish (default: direct write, no git)
publisher = AstroPublisher(args.website_repo)
result = publisher.publish(content, args.image, use_git=args.use_git)
if result['success']:
# Publish
publisher = PayloadPublisher(args.website_url, args.website_repo)
result = publisher.publish(
content, images=args.image, use_git=args.use_git, payload_token=payload_token
)
if result["success"]:
print(f"\n✅ Published successfully!")
print(f" ID: {result['id']}")
print(f" Slug: {result['slug']}")
print(f" Language: {result['language']}")
print(f" Path: {result['path']}")
print(f" Method: {result['method']}")
if result.get('git_result') and result['git_result'].get('success'):
print(f" Admin: {result['admin_url']}")
if result.get("images_uploaded", 0) > 0:
print(f" Images: {result['images_uploaded']} uploaded")
if result.get("git_result") and result["git_result"].get("success"):
print(f" ✓ Committed and pushed to Gitea")
print(f" ✓ Deployment triggered")
else:
print(f"\n❌ Publication failed: {result.get('error')}")
if result.get("status_code"):
print(f" Status code: {result['status_code']}")
if __name__ == '__main__':
if __name__ == "__main__":
main()