#!/usr/bin/env python3
|
|
"""
|
|
SEO Multi-Channel Content Generator
|
|
|
|
Generate marketing content for multiple channels from a single topic.
|
|
Supports Thai language with full PyThaiNLP integration.
|
|
|
|
Channels: Facebook > Facebook Ads > Google Ads > Blog > X (Twitter)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
import yaml
|
|
|
|
# Load environment variables
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv()
|
|
|
|
# Thai language processing is optional: PyThaiNLP may be absent, in which
# case THAI_SUPPORT stays False and ThaiTextProcessor falls back to simple
# whitespace/period heuristics instead of crashing at import time.
try:
    from pythainlp import word_tokenize, sent_tokenize
    from pythainlp.util import normalize

    THAI_SUPPORT = True
except ImportError:
    # Checked throughout ThaiTextProcessor to select the fallback paths.
    THAI_SUPPORT = False
    print("Warning: PyThaiNLP not installed. Thai language support disabled.")
    print("Install with: pip install pythainlp")
|
|
|
|
|
|
class ThaiTextProcessor:
    """Thai language text processing utilities.

    All methods are static. When PyThaiNLP is unavailable
    (``THAI_SUPPORT`` is False) they degrade to simple whitespace /
    period heuristics that are adequate for English text.
    """

    @staticmethod
    def count_words(text: str) -> int:
        """Count words in *text*.

        Thai is written without spaces between words, so the PyThaiNLP
        "newmm" tokenizer is used when available; otherwise fall back to
        a plain whitespace split.
        """
        if not THAI_SUPPORT:
            return len(text.split())

        tokens = word_tokenize(text, engine="newmm")
        # The tokenizer can emit pure-whitespace tokens; drop them.
        return len([t for t in tokens if t.strip() and not t.isspace()])

    @staticmethod
    def count_sentences(text: str) -> int:
        """Count sentences in *text*.

        Fallback splits on periods and ignores empty/blank segments —
        previously a trailing period inflated the count by one and the
        empty string counted as one sentence.
        """
        if not THAI_SUPPORT:
            return len([s for s in text.split(".") if s.strip()])

        sentences = sent_tokenize(text, engine="whitespace")
        return len(sentences)

    @staticmethod
    def calculate_keyword_density(text: str, keyword: str) -> float:
        """Return keyword density as a percentage of total word count.

        NOTE(review): both paths count substring occurrences, so a
        keyword embedded inside a longer word is also counted.
        """
        if not THAI_SUPPORT:
            text_words = text.lower().split()
            keyword_count = text.lower().count(keyword.lower())
            return (keyword_count / len(text_words) * 100) if text_words else 0

        # Normalize both sides so Unicode variants of the same Thai
        # spelling compare equal before substring counting.
        text_normalized = normalize(text)
        keyword_normalized = normalize(keyword)
        count = text_normalized.count(keyword_normalized)
        word_count = ThaiTextProcessor.count_words(text)
        return (count / word_count * 100) if word_count > 0 else 0

    @staticmethod
    def detect_language(text: str) -> str:
        """Return "th" when more than 30% of characters fall in the Thai
        Unicode block (U+0E00–U+0E7F), otherwise "en"."""
        thai_chars = sum(1 for c in text if "\u0e00" <= c <= "\u0e7f")
        total_chars = len(text)
        thai_ratio = thai_chars / total_chars if total_chars > 0 else 0

        return "th" if thai_ratio > 0.3 else "en"
|
|
|
|
|
|
class ChannelTemplate:
    """Load one channel's YAML template and expose its sections."""

    def __init__(self, channel_name: str, templates_dir: str):
        # The template is expected at <templates_dir>/<channel_name>.yaml.
        self.channel_name = channel_name
        self.template_path = os.path.join(templates_dir, f"{channel_name}.yaml")
        self.template = self._load_template()

    def _load_template(self) -> Dict:
        """Parse the template file into a plain dict."""
        with open(self.template_path, "r", encoding="utf-8") as handle:
            return yaml.safe_load(handle)

    def get_specs(self) -> Dict:
        """Return the "fields" section (channel specifications)."""
        return self.template.get("fields", {})

    def get_quality_requirements(self) -> Dict:
        """Return the "quality" section (quality requirements)."""
        return self.template.get("quality", {})
|
|
|
|
|
|
class ImageHandler:
    """Handle product-image lookup and (placeholder) image generation."""

    def __init__(self, chutes_api_token: str):
        # Token for the Chutes image API; not yet used by the
        # placeholder generate_image_for_channel below.
        self.chutes_token = chutes_api_token
        self.output_base = "output"

    def find_product_images(self, product_name: str, website_repo: str) -> List[str]:
        """Find existing product images in a website repository.

        Searches recursively for files whose name contains
        *product_name*, plus any image under the conventional asset
        directories. Returns at most 10 unique paths (order is
        unspecified due to set de-duplication).

        Fixes vs. original: the wildcard ``ext="*"`` glob whose result
        was computed and discarded has been removed, and the first
        pattern is no longer duplicated once per extension (the
        comprehension never used its loop variable).
        """
        import glob

        extensions = [".jpg", ".jpeg", ".png", ".webp"]
        found_images = []

        # First pattern targets the product name anywhere in the repo;
        # the others sweep the usual asset folders regardless of name.
        # "{ext}" is filled in per extension below.
        search_patterns = [
            f"**/*{product_name}*{{ext}}",
            "public/images/**/*{ext}",
            "src/assets/**/*{ext}",
        ]

        for pattern in search_patterns:
            for ext in extensions:
                found_images.extend(
                    glob.glob(
                        os.path.join(website_repo, pattern.format(ext=ext)),
                        recursive=True,
                    )
                )

        return list(set(found_images))[:10]

    def generate_image_for_channel(
        self, topic: str, channel: str, content_type: str
    ) -> str:
        """Reserve an output path for a channel image and prompt the user.

        Placeholder: no image is generated yet — the user is asked to
        provide one. Returns the path where the image should be placed;
        the file does not exist after this call (only its directory is
        created).
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = os.path.join(
            self.output_base, self._slugify(topic), channel, "images"
        )
        os.makedirs(output_dir, exist_ok=True)

        image_path = os.path.join(output_dir, f"generated_{timestamp}.png")

        print(f" [Image] Please provide image for: {channel}")
        print(f" Topic: {topic}, Type: {content_type}")

        return image_path

    def _slugify(self, text: str) -> str:
        """Convert text to a lowercase, hyphen-separated URL slug."""
        import re

        slug = re.sub(r"[^\w\s-]", "", text.lower())
        slug = re.sub(r"[-\s]+", "-", slug)
        return slug.strip("-_")
|
|
|
|
|
|
class ContentGenerator:
    """Generate multi-channel marketing content from a single topic.

    Loads one YAML template per requested channel and produces
    placeholder variations per channel (a real implementation would call
    an LLM), saving everything under output/<topic-slug>/.
    """

    def __init__(
        self,
        topic: str,
        channels: List[str],
        website_repo: Optional[str] = None,
        auto_publish: bool = False,
        language: Optional[str] = None,
    ):
        self.topic = topic
        self.channels = channels
        self.website_repo = website_repo
        self.auto_publish = auto_publish
        # None means auto-detect per _generate_for_channel.
        self.language = language
        self.templates_dir = os.path.join(os.path.dirname(__file__), "templates")
        self.output_base = "output"

        # Initialize components
        self.text_processor = ThaiTextProcessor()
        # BUGFIX: _create_variation previously called
        # self.generate_image_for_channel, a method that exists only on
        # ImageHandler, guaranteeing an AttributeError for the facebook
        # channel. Delegate through a handler instance instead.
        self.image_handler = ImageHandler(os.getenv("CHUTES_API_TOKEN", ""))

        # Load templates; channels with no known template are skipped.
        self.templates = {}
        for channel in channels:
            template_name = self._get_template_name(channel)
            if template_name:
                self.templates[channel] = ChannelTemplate(
                    template_name, self.templates_dir
                )

    def _get_template_name(self, channel: str) -> Optional[str]:
        """Map a channel name to its template file stem (case-insensitive).

        Returns None for unknown channels.
        """
        mapping = {
            "facebook": "facebook",
            "facebook_ads": "facebook_ads",
            "google_ads": "google_ads",
            "blog": "blog",
            "x": "x_thread",
            "twitter": "x_thread",
        }
        return mapping.get(channel.lower())

    def generate_all(self) -> Dict[str, Any]:
        """Generate content for all channels that have a loaded template.

        Returns the aggregate result dict and writes it to
        output/<topic-slug>/results.json as a side effect.
        """
        results = {
            "topic": self.topic,
            "generated_at": datetime.now().isoformat(),
            "channels": {},
            "summary": {},
        }

        print(f"\n🎯 Generating content for: {self.topic}")
        print(f"📱 Channels: {', '.join(self.channels)}")
        print(f"🌐 Language: {self.language or 'auto-detect'}\n")

        for channel in self.channels:
            if channel in self.templates:
                print(f" Generating {channel}...")
                channel_result = self._generate_for_channel(channel)
                results["channels"][channel] = channel_result

        # Save results
        self._save_results(results)

        return results

    def _generate_for_channel(self, channel: str) -> Dict:
        """Generate all variations for one channel."""
        template = self.templates[channel]
        specs = template.get_specs()

        # Use explicit language if given, else detect from the topic.
        lang = self.language or self.text_processor.detect_language(self.topic)

        # Generate variations (placeholder - real implementation would use LLM)
        variations = []
        num_variations = template.template.get("output", {}).get("variations", 5)

        for i in range(num_variations):
            variation = self._create_variation(channel, i, lang, specs)
            variations.append(variation)

        return {
            "channel": channel,
            "language": lang,
            "variations": variations,
            "api_ready": template.template.get("api_ready", False),
        }

    def _create_variation(
        self, channel: str, variation_num: int, language: str, specs: Dict
    ) -> Dict:
        """Create a single content variation for *channel*.

        Placeholder content only — a real implementation would call an
        LLM with prompts built from the channel template. *specs* is
        currently unused by the placeholder.
        """
        base_variation = {
            "id": f"{channel}_var_{variation_num + 1}",
            "created_at": datetime.now().isoformat(),
        }

        # Channel-specific structure
        if channel == "facebook":
            base_variation.update(
                {
                    "primary_text": f"[Facebook Post {variation_num + 1}] {self.topic}...",
                    "headline": f"[Headline] {self.topic}",
                    "cta": "เรียนรู้เพิ่มเติม" if language == "th" else "Learn More",
                    "hashtags": [f"#{self.topic.replace(' ', '')}"],
                    "image": {
                        # Delegates to ImageHandler (see __init__ BUGFIX note).
                        "path": self.image_handler.generate_image_for_channel(
                            self.topic, channel, "social"
                        )
                    },
                }
            )

        elif channel == "facebook_ads":
            base_variation.update(
                {
                    "primary_text": f"[FB Ad Primary Text] {self.topic}...",
                    "headline": f"[FB Ad Headline - 40 chars]",
                    "description": f"[FB Ad Description - 90 chars]",
                    "cta": "SHOP_NOW",
                    "api_ready": {
                        "platform": "meta",
                        "api_version": "v18.0",
                        "endpoint": "/act_{ad_account_id}/adcreatives",
                    },
                }
            )

        elif channel == "google_ads":
            base_variation.update(
                {
                    # Google responsive search ads: up to 15 headlines,
                    # 4 descriptions.
                    "headlines": [
                        {"text": f"[Headline {i + 1}] {self.topic}"} for i in range(15)
                    ],
                    "descriptions": [
                        {"text": f"[Description {i + 1}] Learn more about {self.topic}"}
                        for i in range(4)
                    ],
                    "keywords": [self.topic, f"บริการ {self.topic}"],
                    "api_ready": {
                        "platform": "google",
                        "api_version": "v15.0",
                        "endpoint": "/google.ads.googleads.v15.services/GoogleAdsService:Mutate",
                    },
                }
            )

        elif channel == "blog":
            base_variation.update(
                {
                    "markdown": self._generate_blog_markdown(language),
                    "frontmatter": {
                        "title": f"{self.topic} - Complete Guide",
                        "description": f"Learn about {self.topic}",
                        "slug": self._slugify(self.topic),
                        "lang": language,
                    },
                    # Target word counts, not actual counts of the
                    # placeholder markdown.
                    "word_count": 2000 if language == "en" else 1500,
                    "publish_status": "draft",
                }
            )

        elif channel in ["x", "twitter"]:
            base_variation.update(
                {
                    "tweets": [
                        f"[Tweet {i + 1}/7] Content about {self.topic}..."
                        for i in range(7)
                    ],
                    "thread_title": f"Everything about {self.topic} 🧵",
                }
            )

        return base_variation

    def _generate_blog_markdown(self, language: str) -> str:
        """Generate a skeleton blog post (frontmatter + headings) in Markdown."""
        slug = self._slugify(self.topic)

        markdown = f"""---
title: "{self.topic} - Complete Guide"
description: "Learn everything about {self.topic} in this comprehensive guide"
keywords: ["{self.topic}", "บริการ {self.topic}", "guide"]
slug: {slug}
lang: {language}
category: guides
tags: ["{self.topic}", "guide"]
created: {datetime.now().strftime("%Y-%m-%d")}
---

# {self.topic}: Complete Guide

## Introduction

[Opening hook about {self.topic}...]

## What is {self.topic}?

[Definition and explanation...]

## Why {self.topic} Matters

[Importance and benefits...]

## How to Get Started with {self.topic}

[Step-by-step guide...]

## Best Practices for {self.topic}

[Tips and recommendations...]

## Conclusion

[Summary and call-to-action...]
"""
        return markdown

    def _save_results(self, results: Dict):
        """Write *results* as UTF-8 JSON to output/<topic-slug>/results.json."""
        output_dir = os.path.join(self.output_base, self._slugify(self.topic))
        os.makedirs(output_dir, exist_ok=True)

        output_file = os.path.join(output_dir, "results.json")
        with open(output_file, "w", encoding="utf-8") as f:
            # ensure_ascii=False keeps Thai text readable in the file.
            json.dump(results, f, indent=2, ensure_ascii=False)

        print(f"\n✅ Results saved to: {output_file}")

    def _slugify(self, text: str) -> str:
        """Convert text to a lowercase, hyphen-separated URL slug."""
        import re

        slug = re.sub(r"[^\w\s-]", "", text.lower())
        slug = re.sub(r"[-\s]+", "-", slug)
        return slug.strip("-_")
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the generator, print a summary."""
    cli = argparse.ArgumentParser(
        description="Generate multi-channel marketing content from a single topic"
    )

    cli.add_argument(
        "--topic", "-t", required=True, help="Topic to generate content about"
    )
    cli.add_argument(
        "--channels",
        "-c",
        nargs="+",
        default=["facebook", "facebook_ads", "google_ads", "blog", "x"],
        choices=["facebook", "facebook_ads", "google_ads", "blog", "x", "twitter"],
        help="Channels to generate content for",
    )
    cli.add_argument(
        "--website-repo",
        "-w",
        help="Path to website repository (for blog auto-publish)",
    )
    cli.add_argument(
        "--auto-publish", action="store_true", help="Auto-publish blog posts to website"
    )
    cli.add_argument(
        "--language",
        "-l",
        choices=["th", "en"],
        help="Content language (default: auto-detect)",
    )
    cli.add_argument(
        "--product-name", "-p", help="Product name (for product image handling)"
    )

    opts = cli.parse_args()

    # Build the generator and produce all channel content.
    # NOTE(review): --product-name is accepted but not forwarded anywhere yet.
    report = ContentGenerator(
        topic=opts.topic,
        channels=opts.channels,
        website_repo=opts.website_repo,
        auto_publish=opts.auto_publish,
        language=opts.language,
    ).generate_all()

    # Human-readable summary of what was produced.
    print("\n📊 Summary:")
    print(f" Topic: {report['topic']}")
    print(f" Channels generated: {len(report['channels'])}")

    for name, payload in report["channels"].items():
        print(f" - {name}: {len(payload['variations'])} variations")

    print("\n✨ Done!")


if __name__ == "__main__":
    main()
|