479 lines
15 KiB
Python
479 lines
15 KiB
Python
#!/usr/bin/env python3
"""
SEO Multi-Channel Content Generator

Generate marketing content for multiple channels from a single topic.
Supports Thai language with full PyThaiNLP integration.

Channels: Facebook > Facebook Ads > Google Ads > Blog > X (Twitter)
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Any
|
|
import yaml
|
|
|
|
# Load environment variables from a local .env file before anything reads
# os.environ (e.g. CHUTES_API_TOKEN consumed by ImageHandler below).
from dotenv import load_dotenv

load_dotenv()
|
|
|
|
# Thai language processing
|
|
# Thai language processing (optional dependency). When PyThaiNLP is not
# installed, THAI_SUPPORT is False and all ThaiTextProcessor methods fall
# back to naive whitespace/period heuristics suitable for English text.
try:
    from pythainlp import word_tokenize, sent_tokenize
    from pythainlp.util import normalize

    THAI_SUPPORT = True
except ImportError:
    THAI_SUPPORT = False
    print("Warning: PyThaiNLP not installed. Thai language support disabled.")
    print("Install with: pip install pythainlp")


class ThaiTextProcessor:
    """Thai language text processing utilities.

    All methods are static and check the module-level THAI_SUPPORT flag:
    when PyThaiNLP is unavailable they degrade to simple string heuristics.
    """

    @staticmethod
    def count_words(text: str) -> int:
        """Count words in *text*.

        Thai is written without spaces between words, so tokenization via
        the "newmm" engine is required; the fallback splits on whitespace.
        """
        if not THAI_SUPPORT:
            return len(text.split())

        tokens = word_tokenize(text, engine="newmm")
        # Drop whitespace-only tokens; t.strip() is falsy for those.
        return len([t for t in tokens if t.strip()])

    @staticmethod
    def count_sentences(text: str) -> int:
        """Count sentences in *text*.

        Fallback splits on '.'; empty segments are ignored so trailing
        periods are not counted as extra sentences (fixes "a. b." -> 3,
        and "" -> 1, in the original implementation).
        """
        if not THAI_SUPPORT:
            return len([s for s in text.split('.') if s.strip()])

        sentences = sent_tokenize(text, engine="whitespace")
        return len(sentences)

    @staticmethod
    def calculate_keyword_density(text: str, keyword: str) -> float:
        """Return keyword density as a percentage of the word count.

        NOTE(review): both paths count *substring* occurrences of the
        keyword, so e.g. "cat" also matches inside "catalog". This may be
        intentional for Thai compounds — confirm before tightening.
        """
        if not THAI_SUPPORT:
            text_words = text.lower().split()
            keyword_count = text.lower().count(keyword.lower())
            return (keyword_count / len(text_words) * 100) if text_words else 0

        # Normalize both strings (canonical vowel/tone-mark ordering) so
        # visually identical Thai spellings compare equal.
        text_normalized = normalize(text)
        keyword_normalized = normalize(keyword)
        count = text_normalized.count(keyword_normalized)
        # Denominator is the tokenized word count, not character count —
        # presumably intended so density is comparable to the English path.
        word_count = ThaiTextProcessor.count_words(text)
        return (count / word_count * 100) if word_count > 0 else 0

    @staticmethod
    def detect_language(text: str) -> str:
        """Return 'th' if >30% of characters are in the Thai Unicode block,
        otherwise 'en'. Empty text yields 'en'."""
        thai_chars = sum(1 for c in text if '\u0E00' <= c <= '\u0E7F')
        total_chars = len(text)
        thai_ratio = thai_chars / total_chars if total_chars > 0 else 0

        return 'th' if thai_ratio > 0.3 else 'en'
|
|
|
|
|
|
class ChannelTemplate:
    """Wraps one channel's YAML template and exposes its sections."""

    def __init__(self, channel_name: str, templates_dir: str):
        self.channel_name = channel_name
        self.template_path = os.path.join(templates_dir, f"{channel_name}.yaml")
        self.template = self._load_template()

    def _load_template(self) -> Dict:
        """Read and parse the template's YAML file."""
        with open(self.template_path, encoding='utf-8') as handle:
            return yaml.safe_load(handle)

    def get_specs(self) -> Dict:
        """Return the channel specifications ('fields' section), or {}."""
        return self.template.get('fields', {})

    def get_quality_requirements(self) -> Dict:
        """Return the quality requirements ('quality' section), or {}."""
        return self.template.get('quality', {})
|
|
|
|
|
|
class ImageHandler:
    """Locate existing product images and generate new ones for content."""

    def __init__(self, chutes_api_token: str):
        """
        Args:
            chutes_api_token: API token for the Chutes image service
                (stored but unused by the current placeholder implementation).
        """
        self.chutes_token = chutes_api_token
        self.output_base = "output"

    def find_product_images(self, product_name: str, website_repo: str) -> List[str]:
        """Find existing product images in a website repository.

        Recursively matches files whose name contains *product_name*, plus
        everything under conventional asset directories, restricted to
        common web image extensions.

        Args:
            product_name: Substring to match in image file names.
            website_repo: Root directory of the website repository.

        Returns:
            Up to 10 unique paths, sorted for determinism.
        """
        import glob

        extensions = ['.jpg', '.jpeg', '.png', '.webp']

        # '{ext}' stays a literal placeholder here and is substituted per
        # extension below. Bug fix: the original built one identical
        # name-pattern per extension (the loop variable never appeared in
        # the f-string) and ran an extra glob whose results were discarded.
        search_patterns = [
            f"**/*{product_name}*{{ext}}",
            "public/images/**/*{ext}",
            "src/assets/**/*{ext}",
        ]

        found_images = []
        for pattern in search_patterns:
            for ext in extensions:
                found_images.extend(glob.glob(
                    os.path.join(website_repo, pattern.format(ext=ext)),
                    recursive=True,
                ))

        # Deduplicate, then sort so the 10-item truncation is deterministic
        # (bug fix: set iteration order is arbitrary, so list(set(...))[:10]
        # could return a different subset on each run).
        return sorted(set(found_images))[:10]

    def generate_image_for_channel(self, topic: str, channel: str, content_type: str) -> str:
        """Return the output path for a generated image (placeholder).

        For product content: browse the repo first, then ask the user or use
        image-edit. For non-product content: generate fresh with
        image-generation. The actual skill call is not implemented yet; this
        only creates the output directory and reports what would happen.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = os.path.join(
            self.output_base,
            self._slugify(topic),
            channel,
            "images"
        )
        os.makedirs(output_dir, exist_ok=True)

        image_path = os.path.join(output_dir, f"generated_{timestamp}.png")

        # Placeholder - in real implementation, would call image-generation skill
        print(f"  [Image Generation] Would generate image for {channel}")
        print(f"  Topic: {topic}, Type: {content_type}")

        return image_path

    def _slugify(self, text: str) -> str:
        """Convert text to a URL-friendly slug (lowercase, hyphenated)."""
        import re

        slug = re.sub(r'[^\w\s-]', '', text.lower())
        slug = re.sub(r'[-\s]+', '-', slug)
        return slug.strip('-_')
|
|
|
|
|
|
class ContentGenerator:
    """Generate marketing content for one topic across multiple channels.

    The content itself is placeholder text (no LLM call yet); this class
    builds the per-channel output structure from YAML templates, delegates
    image handling to ImageHandler, and writes everything to
    output/<topic-slug>/results.json.
    """

    def __init__(
        self,
        topic: str,
        channels: List[str],
        website_repo: Optional[str] = None,
        auto_publish: bool = False,
        language: Optional[str] = None
    ):
        """
        Args:
            topic: Subject to generate content about.
            channels: Channel names; see _get_template_name for valid values.
            website_repo: Path to a website repo (for blog auto-publish).
            auto_publish: Whether blog posts should be auto-published.
            language: 'th' or 'en'; None means auto-detect from the topic.
        """
        self.topic = topic
        self.channels = channels
        self.website_repo = website_repo
        self.auto_publish = auto_publish
        self.language = language
        self.templates_dir = os.path.join(os.path.dirname(__file__), "templates")
        self.output_base = "output"

        # Helper components.
        self.text_processor = ThaiTextProcessor()
        self.image_handler = ImageHandler(os.getenv("CHUTES_API_TOKEN", ""))

        # Load one YAML template per recognized channel; channels without a
        # template mapping are absent from self.templates.
        self.templates = {}
        for channel in channels:
            template_name = self._get_template_name(channel)
            if template_name:
                self.templates[channel] = ChannelTemplate(template_name, self.templates_dir)

    def _get_template_name(self, channel: str) -> Optional[str]:
        """Map a channel name (case-insensitive) to its template file stem.

        Returns None for unknown channels.
        """
        mapping = {
            'facebook': 'facebook',
            'facebook_ads': 'facebook_ads',
            'google_ads': 'google_ads',
            'blog': 'blog',
            'x': 'x_thread',
            'twitter': 'x_thread'
        }
        return mapping.get(channel.lower())

    def generate_all(self) -> Dict[str, Any]:
        """Generate content for every configured channel and save results.

        Returns:
            Dict with 'topic', 'generated_at', per-channel 'channels'
            entries, and an aggregate 'summary'.
        """
        results = {
            'topic': self.topic,
            'generated_at': datetime.now().isoformat(),
            'channels': {},
            'summary': {}
        }

        print(f"\n🎯 Generating content for: {self.topic}")
        print(f"📱 Channels: {', '.join(self.channels)}")
        print(f"🌐 Language: {self.language or 'auto-detect'}\n")

        for channel in self.channels:
            if channel in self.templates:
                print(f"  Generating {channel}...")
                results['channels'][channel] = self._generate_for_channel(channel)
            else:
                # Bug fix: channels with no template were dropped silently.
                print(f"  ⚠️ Skipping '{channel}': no template found")

        # Bug fix: 'summary' was initialized but never populated.
        results['summary'] = {
            'channels_generated': len(results['channels']),
            'total_variations': sum(
                len(data['variations']) for data in results['channels'].values()
            )
        }

        self._save_results(results)

        return results

    def _generate_for_channel(self, channel: str) -> Dict:
        """Generate all variations for one channel.

        The variation count comes from the template's output.variations
        field (default 5).
        """
        template = self.templates[channel]
        specs = template.get_specs()

        # Detect language from topic unless explicitly set.
        lang = self.language or self.text_processor.detect_language(self.topic)

        # Generate variations (placeholder - real implementation would use LLM).
        num_variations = template.template.get('output', {}).get('variations', 5)
        variations = [
            self._create_variation(channel, i, lang, specs)
            for i in range(num_variations)
        ]

        return {
            'channel': channel,
            'language': lang,
            'variations': variations,
            'api_ready': template.template.get('api_ready', False)
        }

    def _create_variation(
        self,
        channel: str,
        variation_num: int,
        language: str,
        specs: Dict
    ) -> Dict:
        """Create a single content variation for *channel*.

        This is a placeholder - a real implementation would call an LLM
        with prompts built from the channel template.

        Args:
            channel: Channel key ('facebook', 'blog', 'x', ...).
            variation_num: Zero-based variation index.
            language: 'th' or 'en'.
            specs: Channel field specs from the template. NOTE(review):
                currently unused by the placeholder bodies; kept for the
                future LLM-backed implementation.
        """
        base_variation = {
            'id': f"{channel}_var_{variation_num + 1}",
            'created_at': datetime.now().isoformat()
        }

        # Channel-specific structure.
        if channel == 'facebook':
            base_variation.update({
                'primary_text': f"[Facebook Post {variation_num + 1}] {self.topic}...",
                'headline': f"[Headline] {self.topic}",
                'cta': "เรียนรู้เพิ่มเติม" if language == 'th' else "Learn More",
                'hashtags': [f"#{self.topic.replace(' ', '')}"],
                'image': {
                    'path': self.image_handler.generate_image_for_channel(
                        self.topic, channel, 'social'
                    )
                }
            })

        elif channel == 'facebook_ads':
            base_variation.update({
                'primary_text': f"[FB Ad Primary Text] {self.topic}...",
                'headline': "[FB Ad Headline - 40 chars]",
                'description': "[FB Ad Description - 90 chars]",
                'cta': "SHOP_NOW",
                'api_ready': {
                    'platform': 'meta',
                    'api_version': 'v18.0',
                    # Literal {ad_account_id} placeholder, filled in by the caller.
                    'endpoint': '/act_{ad_account_id}/adcreatives'
                }
            })

        elif channel == 'google_ads':
            base_variation.update({
                # Google responsive search ads allow up to 15 headlines
                # and 4 descriptions.
                'headlines': [
                    {'text': f"[Headline {i+1}] {self.topic}"}
                    for i in range(15)
                ],
                'descriptions': [
                    {'text': f"[Description {i+1}] Learn more about {self.topic}"}
                    for i in range(4)
                ],
                'keywords': [self.topic, f"บริการ {self.topic}"],
                'api_ready': {
                    'platform': 'google',
                    'api_version': 'v15.0',
                    'endpoint': '/google.ads.googleads.v15.services/GoogleAdsService:Mutate'
                }
            })

        elif channel == 'blog':
            base_variation.update({
                'markdown': self._generate_blog_markdown(language),
                'frontmatter': {
                    'title': f"{self.topic} - Complete Guide",
                    'description': f"Learn about {self.topic}",
                    'slug': self._slugify(self.topic),
                    'lang': language
                },
                # Target length, not actual length of the placeholder text.
                'word_count': 2000 if language == 'en' else 1500,
                'publish_status': 'draft'
            })

        elif channel in ['x', 'twitter']:
            base_variation.update({
                'tweets': [
                    f"[Tweet {i+1}/7] Content about {self.topic}..."
                    for i in range(7)
                ],
                'thread_title': f"Everything about {self.topic} 🧵"
            })

        return base_variation

    def _generate_blog_markdown(self, language: str) -> str:
        """Generate a skeleton blog post (YAML front matter + Markdown)."""
        slug = self._slugify(self.topic)

        markdown = f"""---
title: "{self.topic} - Complete Guide"
description: "Learn everything about {self.topic} in this comprehensive guide"
keywords: ["{self.topic}", "บริการ {self.topic}", "guide"]
slug: {slug}
lang: {language}
category: guides
tags: ["{self.topic}", "guide"]
created: {datetime.now().strftime('%Y-%m-%d')}
---

# {self.topic}: Complete Guide

## Introduction

[Opening hook about {self.topic}...]

## What is {self.topic}?

[Definition and explanation...]

## Why {self.topic} Matters

[Importance and benefits...]

## How to Get Started with {self.topic}

[Step-by-step guide...]

## Best Practices for {self.topic}

[Tips and recommendations...]

## Conclusion

[Summary and call-to-action...]
"""
        return markdown

    def _save_results(self, results: Dict):
        """Write *results* as JSON to output/<topic-slug>/results.json."""
        output_dir = os.path.join(
            self.output_base,
            self._slugify(self.topic)
        )
        os.makedirs(output_dir, exist_ok=True)

        output_file = os.path.join(output_dir, "results.json")
        # ensure_ascii=False keeps Thai text human-readable in the JSON file.
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        print(f"\n✅ Results saved to: {output_file}")

    def _slugify(self, text: str) -> str:
        """Convert text to a URL-friendly slug (lowercase, hyphenated)."""
        import re

        slug = re.sub(r'[^\w\s-]', '', text.lower())
        slug = re.sub(r'[-\s]+', '-', slug)
        return slug.strip('-_')
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the generator, print a summary."""
    args = _build_parser().parse_args()

    # Create generator
    generator = ContentGenerator(
        topic=args.topic,
        channels=args.channels,
        website_repo=args.website_repo,
        auto_publish=args.auto_publish,
        language=args.language
    )

    # Generate content
    results = generator.generate_all()

    # Print summary
    print("\n📊 Summary:")
    print(f"  Topic: {results['topic']}")
    print(f"  Channels generated: {len(results['channels'])}")
    for channel, data in results['channels'].items():
        print(f"  - {channel}: {len(data['variations'])} variations")

    print("\n✨ Done!")


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser for the generator CLI."""
    parser = argparse.ArgumentParser(
        description='Generate multi-channel marketing content from a single topic'
    )
    parser.add_argument(
        '--topic', '-t',
        required=True,
        help='Topic to generate content about'
    )
    parser.add_argument(
        '--channels', '-c',
        nargs='+',
        default=['facebook', 'facebook_ads', 'google_ads', 'blog', 'x'],
        choices=['facebook', 'facebook_ads', 'google_ads', 'blog', 'x', 'twitter'],
        help='Channels to generate content for'
    )
    parser.add_argument(
        '--website-repo', '-w',
        help='Path to website repository (for blog auto-publish)'
    )
    parser.add_argument(
        '--auto-publish',
        action='store_true',
        help='Auto-publish blog posts to website'
    )
    parser.add_argument(
        '--language', '-l',
        choices=['th', 'en'],
        help='Content language (default: auto-detect)'
    )
    # NOTE(review): --product-name is parsed but never passed to
    # ContentGenerator — confirm whether it should be wired through.
    parser.add_argument(
        '--product-name', '-p',
        help='Product name (for product image handling)'
    )
    return parser
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
|