Files

654 lines
30 KiB
Python

"""
Campaign Creator Orchestrator
Main service that orchestrates campaign workflows and asset generation.
"""
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from loguru import logger
from services.image_studio import ImageStudioManager, CreateStudioRequest
from .prompt_builder import CampaignPromptBuilder
from services.product_marketing.brand_dna_sync import BrandDNASyncService
from .asset_audit import AssetAuditService
from .channel_pack import ChannelPackService
from services.database import SessionLocal
from services.subscription import PricingService
from services.subscription.preflight_validator import validate_image_generation_operations
@dataclass
class CampaignAssetNode:
"""Represents an asset node in the campaign graph."""
asset_id: str
asset_type: str # image, video, text, audio
channel: str
status: str # draft, generating, ready, approved
prompt: Optional[str] = None
template_id: Optional[str] = None
provider: Optional[str] = None
cost_estimate: Optional[float] = None
generated_asset_id: Optional[int] = None # Asset Library ID
@dataclass
class CampaignBlueprint:
"""Campaign blueprint with phases and asset nodes."""
campaign_id: str
campaign_name: str
goal: str
kpi: Optional[str] = None
phases: List[Dict[str, Any]] = None # teaser, launch, nurture
asset_nodes: List[CampaignAssetNode] = None
channels: List[str] = None
status: str = "draft" # draft, generating, ready, published
class CampaignOrchestrator:
"""Main orchestrator for Campaign Creator."""
def __init__(self):
"""Initialize Campaign Orchestrator."""
self.image_studio = ImageStudioManager()
self.prompt_builder = CampaignPromptBuilder()
self.brand_dna_sync = BrandDNASyncService()
self.asset_audit = AssetAuditService()
self.channel_pack = ChannelPackService()
self.logger = logger
logger.info("[Campaign Orchestrator] Initialized")
def create_campaign_blueprint(
self,
user_id: str,
campaign_data: Dict[str, Any]
) -> CampaignBlueprint:
"""
Create campaign blueprint from user input and onboarding data.
Args:
user_id: User ID
campaign_data: Campaign information (name, goal, channels, etc.)
Returns:
Campaign blueprint with asset nodes
"""
try:
import time
campaign_id = campaign_data.get('campaign_id') or f"campaign_{user_id}_{int(time.time())}"
campaign_name = campaign_data.get('campaign_name', 'New Campaign')
goal = campaign_data.get('goal', 'product_launch')
channels = campaign_data.get('channels', [])
# Get brand DNA for personalization
brand_dna = self.brand_dna_sync.get_brand_dna_tokens(user_id)
# Build campaign phases
phases = self._build_campaign_phases(goal, channels)
# Generate asset nodes for each phase and channel
asset_nodes = []
for phase in phases:
phase_name = phase.get('name')
for channel in channels:
# Determine required assets for this phase + channel
required_assets = self._get_required_assets(phase_name, channel)
for asset_type in required_assets:
asset_node = CampaignAssetNode(
asset_id=f"{campaign_id}_{phase_name}_{channel}_{asset_type}",
asset_type=asset_type,
channel=channel,
status="draft",
)
asset_nodes.append(asset_node)
blueprint = CampaignBlueprint(
campaign_id=campaign_id,
campaign_name=campaign_name,
goal=goal,
kpi=campaign_data.get('kpi'),
phases=phases,
asset_nodes=asset_nodes,
channels=channels,
status="draft",
)
logger.info(f"[Orchestrator] Created blueprint for campaign {campaign_id} with {len(asset_nodes)} assets")
return blueprint
except Exception as e:
logger.error(f"[Orchestrator] Error creating blueprint: {str(e)}")
raise
def generate_asset_proposals(
self,
user_id: str,
blueprint: CampaignBlueprint,
product_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Generate AI proposals for each asset node in the blueprint.
Args:
user_id: User ID
blueprint: Campaign blueprint
product_context: Product information
Returns:
Dictionary with proposals for each asset node
"""
try:
proposals = {}
for asset_node in blueprint.asset_nodes:
# Build specialized prompt based on asset type and channel
if asset_node.asset_type == "image":
base_prompt = product_context.get('product_description', 'Product image') if product_context else 'Marketing image'
enhanced_prompt = self.prompt_builder.build_marketing_image_prompt(
base_prompt=base_prompt,
user_id=user_id,
channel=asset_node.channel,
asset_type="hero_image",
product_context=product_context,
)
# Get channel pack for template recommendations
channel_pack = self.channel_pack.get_channel_pack(asset_node.channel)
recommended_template = channel_pack.get('templates', [{}])[0] if channel_pack.get('templates') else None
# Estimate cost
cost_estimate = self._estimate_asset_cost("image", asset_node.channel)
proposals[asset_node.asset_id] = {
"asset_id": asset_node.asset_id,
"asset_type": asset_node.asset_type,
"channel": asset_node.channel,
"campaign_id": blueprint.campaign_id, # Include campaign_id for tracking
"proposed_prompt": enhanced_prompt,
"recommended_template": recommended_template.get('id') if recommended_template else None,
"recommended_provider": recommended_template.get('recommended_provider', 'wavespeed') if recommended_template else 'wavespeed',
"cost_estimate": cost_estimate,
"concept_summary": self._generate_concept_summary(enhanced_prompt),
}
elif asset_node.asset_type == "video":
# Video asset proposals - determine if animation (image-to-video) or demo (text-to-video)
# Default to animation if we have product image, otherwise demo
video_subtype = asset_proposal.get('video_subtype', 'animation') if 'asset_proposal' in locals() else 'demo'
# For demo videos (text-to-video), we need product description
if video_subtype == "demo" or not product_context or not product_context.get('product_image_base64'):
# Text-to-video demo video
video_type = "demo" # Default, can be customized
if asset_node.channel in ["tiktok", "instagram"]:
video_type = "storytelling" # Storytelling for social media
elif asset_node.channel in ["linkedin", "youtube"]:
video_type = "feature_highlight" # Feature highlights for professional
# Estimate cost for text-to-video (WAN 2.5: $0.05-$0.15/second)
duration = 10 # Default 10s for demo videos
resolution = "720p" # Default
cost_per_second = 0.10 if resolution == "720p" else (0.15 if resolution == "1080p" else 0.05)
cost_estimate = duration * cost_per_second
proposals[asset_node.asset_id] = {
"asset_id": asset_node.asset_id,
"asset_type": asset_node.asset_type,
"video_subtype": "demo", # Text-to-video
"channel": asset_node.channel,
"campaign_id": blueprint.campaign_id,
"video_type": video_type,
"duration": duration,
"resolution": resolution,
"cost_estimate": cost_estimate,
"concept_summary": f"Product {video_type} video optimized for {asset_node.channel}",
"note": "Text-to-video demo - requires product description",
}
else:
# Image-to-video animation
animation_type = "reveal" # Default
if asset_node.channel in ["tiktok", "instagram", "youtube"]:
animation_type = "demo" # Demo animations for social media
elif asset_node.channel in ["linkedin", "facebook"]:
animation_type = "reveal" # Professional reveal for B2B
# Estimate cost for image-to-video (WAN 2.5: $0.05-$0.15/second)
duration = 5 # Default 5s for animations
resolution = "720p" # Default
cost_per_second = 0.10 if resolution == "720p" else (0.15 if resolution == "1080p" else 0.05)
cost_estimate = duration * cost_per_second
proposals[asset_node.asset_id] = {
"asset_id": asset_node.asset_id,
"asset_type": asset_node.asset_type,
"video_subtype": "animation", # Image-to-video
"channel": asset_node.channel,
"campaign_id": blueprint.campaign_id,
"animation_type": animation_type,
"duration": duration,
"resolution": resolution,
"cost_estimate": cost_estimate,
"concept_summary": f"Product {animation_type} animation optimized for {asset_node.channel}",
"note": "Requires product image - will be provided during generation",
}
elif asset_node.asset_type == "text":
base_request = f"Write {asset_node.channel} {asset_node.asset_type} for product launch"
enhanced_prompt = self.prompt_builder.build_marketing_copy_prompt(
base_request=base_request,
user_id=user_id,
channel=asset_node.channel,
content_type="caption",
product_context=product_context,
)
proposals[asset_node.asset_id] = {
"asset_id": asset_node.asset_id,
"asset_type": asset_node.asset_type,
"channel": asset_node.channel,
"campaign_id": blueprint.campaign_id, # Include campaign_id for tracking
"proposed_prompt": enhanced_prompt,
"cost_estimate": 0.0, # Text generation cost is minimal
"concept_summary": "Marketing copy optimized for channel and persona",
}
logger.info(f"[Orchestrator] Generated {len(proposals)} asset proposals")
return {"proposals": proposals, "total_assets": len(proposals)}
except Exception as e:
logger.error(f"[Orchestrator] Error generating proposals: {str(e)}")
raise
async def generate_asset(
self,
user_id: str,
asset_proposal: Dict[str, Any],
product_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Generate a single asset using Image Studio APIs.
Args:
user_id: User ID
asset_proposal: Asset proposal from generate_asset_proposals
product_context: Product information
Returns:
Generated asset result
"""
try:
asset_type = asset_proposal.get('asset_type')
if asset_type == "image":
# Build CreateStudioRequest
create_request = CreateStudioRequest(
prompt=asset_proposal.get('proposed_prompt'),
template_id=asset_proposal.get('recommended_template'),
provider=asset_proposal.get('recommended_provider', 'wavespeed'),
quality="premium",
enhance_prompt=True,
use_persona=True,
num_variations=1,
)
# Generate image using Image Studio
result = await self.image_studio.create_image(create_request, user_id=user_id)
# Asset is automatically tracked in Asset Library via Image Studio
return {
"success": True,
"asset_type": "image",
"result": result,
"asset_library_ids": [
r.get('asset_id') for r in result.get('results', [])
if r.get('asset_id')
],
}
elif asset_type == "video":
# Check video subtype: "animation" (image-to-video) or "demo" (text-to-video)
video_subtype = asset_proposal.get('video_subtype', 'animation')
if video_subtype == "demo":
# Text-to-video: Product demo video from description
from services.product_marketing.product_video_service import ProductVideoService, ProductVideoRequest
# Get product info from context
product_name = product_context.get('product_name', 'Product') if product_context else 'Product'
product_description = product_context.get('product_description', '') if product_context else ''
if not product_description:
raise ValueError("Product description required for text-to-video demo generation")
# Get brand context
brand_dna = self.brand_dna_sync.get_brand_dna_tokens(user_id)
brand_context = {
"visual_identity": brand_dna.get("visual_identity", {}),
"persona": brand_dna.get("persona", {}),
}
# Get video type from proposal or default
video_type = asset_proposal.get('video_type', 'demo')
# Create video service
video_service = ProductVideoService()
# Create video request
video_request = ProductVideoRequest(
product_name=product_name,
product_description=product_description,
video_type=video_type,
resolution=asset_proposal.get('resolution', '720p'),
duration=asset_proposal.get('duration', 10),
audio_base64=asset_proposal.get('audio_base64'),
brand_context=brand_context,
additional_context=asset_proposal.get('additional_context'),
)
# Generate video using unified ai_video_generate()
result = await video_service.generate_product_video(video_request, user_id)
# Extract campaign_id for metadata
campaign_id = asset_proposal.get('campaign_id')
asset_id = asset_proposal.get('asset_id', '')
return {
"success": True,
"asset_type": "video",
"video_subtype": "demo",
"video_url": result.get('file_url'),
"video_filename": result.get('filename'),
"cost": result.get('cost', 0.0),
"video_type": video_type,
"campaign_id": campaign_id,
"asset_id": asset_id,
}
else:
# Image-to-video: Product animation
from services.product_marketing.product_animation_service import ProductAnimationService, ProductAnimationRequest
# Get product image from proposal or product context
product_image_base64 = asset_proposal.get('product_image_base64')
if not product_image_base64 and product_context:
product_image_base64 = product_context.get('product_image_base64')
if not product_image_base64:
raise ValueError("Product image required for image-to-video animation generation")
# Get animation type from proposal or default to "reveal"
animation_type = asset_proposal.get('animation_type', 'reveal')
product_name = product_context.get('product_name', 'Product') if product_context else 'Product'
product_description = product_context.get('product_description') if product_context else None
# Get brand context
brand_dna = self.brand_dna_sync.get_brand_dna_tokens(user_id)
brand_context = {
"visual_identity": brand_dna.get("visual_identity", {}),
"persona": brand_dna.get("persona", {}),
}
# Create animation service
animation_service = ProductAnimationService()
# Create animation request
animation_request = ProductAnimationRequest(
product_image_base64=product_image_base64,
animation_type=animation_type,
product_name=product_name,
product_description=product_description,
resolution=asset_proposal.get('resolution', '720p'),
duration=asset_proposal.get('duration', 5),
audio_base64=asset_proposal.get('audio_base64'),
brand_context=brand_context,
additional_context=asset_proposal.get('additional_context'),
)
# Generate video
result = await animation_service.animate_product(animation_request, user_id)
# Extract campaign_id for metadata
campaign_id = asset_proposal.get('campaign_id')
asset_id = asset_proposal.get('asset_id', '')
return {
"success": True,
"asset_type": "video",
"video_subtype": "animation",
"video_url": result.get('video_url'),
"video_filename": result.get('filename'),
"cost": result.get('cost', 0.0),
"animation_type": animation_type,
"campaign_id": campaign_id,
"asset_id": asset_id,
}
elif asset_type == "text":
# Import text generation service and tracker
import asyncio
from services.llm_providers.main_text_generation import llm_text_gen
from utils.text_asset_tracker import save_and_track_text_content
from services.database import SessionLocal
# Get enhanced prompt from proposal
text_prompt = asset_proposal.get('proposed_prompt', '')
channel = asset_proposal.get('channel', 'social')
asset_id = asset_proposal.get('asset_id', '')
# Extract campaign_id - try from asset_proposal first, then from asset_id
# asset_id format: {campaign_id}_{phase}_{channel}_{type}
campaign_id = asset_proposal.get('campaign_id')
if not campaign_id and asset_id and '_' in asset_id:
# Try to extract: asset_id might be "campaign_user123_1234567890_teaser_instagram_text"
# We need to find where phase_name starts (common phases: teaser, launch, nurture)
parts = asset_id.split('_')
# Find phase indicator (usually one of: teaser, launch, nurture)
phase_indicators = ['teaser', 'launch', 'nurture', 'prelaunch', 'postlaunch']
phase_idx = None
for i, part in enumerate(parts):
if part.lower() in phase_indicators:
phase_idx = i
break
if phase_idx and phase_idx > 0:
# Campaign ID is everything before the phase
campaign_id = '_'.join(parts[:phase_idx])
# If still not found, use None (metadata will work without it)
if not campaign_id:
logger.warning(f"[Orchestrator] Could not extract campaign_id from asset_id: {asset_id}")
# Build system prompt for marketing copy
system_prompt = f"""You are an expert marketing copywriter specializing in {channel} content.
Generate compelling, on-brand marketing copy that:
- Is optimized for {channel} platform best practices
- Includes a clear call-to-action
- Uses appropriate tone and style for the platform
- Is concise and engaging
- Aligns with the product marketing context provided
Return only the final copy text without explanations or markdown formatting."""
# Run synchronous llm_text_gen in thread pool
logger.info(f"[Orchestrator] Generating text asset for channel: {channel}")
generated_text = await asyncio.to_thread(
llm_text_gen,
prompt=text_prompt,
system_prompt=system_prompt,
user_id=user_id
)
if not generated_text or not generated_text.strip():
raise ValueError("Text generation returned empty content")
# Save to Asset Library
db = SessionLocal()
asset_library_id = None
try:
asset_library_id = save_and_track_text_content(
db=db,
user_id=user_id,
content=generated_text.strip(),
source_module="campaign_creator",
title=f"{channel.title()} Copy: {asset_id.split('_')[-1] if '_' in asset_id else 'Marketing Copy'}",
description=f"Marketing copy for {channel} platform generated from campaign proposal",
prompt=text_prompt,
tags=["campaign_creator", channel.lower(), "text", "copy"],
asset_metadata={
"campaign_id": campaign_id,
"asset_id": asset_id,
"asset_type": "text",
"channel": channel,
"concept_summary": asset_proposal.get('concept_summary'),
},
subdirectory="campaigns",
file_extension=".txt"
)
if asset_library_id:
logger.info(f"[Orchestrator] ✅ Text asset saved to library: ID={asset_library_id}")
else:
logger.warning(f"[Orchestrator] ⚠️ Text asset tracking returned None")
except Exception as save_error:
logger.error(f"[Orchestrator] ⚠️ Failed to save text asset to library: {str(save_error)}")
# Continue even if save fails - text is still generated
finally:
db.close()
return {
"success": True,
"asset_type": "text",
"content": generated_text.strip(),
"asset_library_id": asset_library_id,
"channel": channel,
}
else:
raise ValueError(f"Unsupported asset type: {asset_type}")
except Exception as e:
logger.error(f"[Orchestrator] Error generating asset: {str(e)}")
raise
def validate_campaign_preflight(
self,
user_id: str,
blueprint: CampaignBlueprint
) -> Dict[str, Any]:
"""
Validate campaign blueprint against subscription limits before generation.
Args:
user_id: User ID
blueprint: Campaign blueprint
Returns:
Pre-flight validation results
"""
try:
db = SessionLocal()
try:
pricing_service = PricingService(db)
# Count operations needed
image_count = sum(1 for node in blueprint.asset_nodes if node.asset_type == "image")
text_count = sum(1 for node in blueprint.asset_nodes if node.asset_type == "text")
# Estimate total cost
total_cost = 0.0
for node in blueprint.asset_nodes:
if node.cost_estimate:
total_cost += node.cost_estimate
# Validate image generation limits
operations = []
if image_count > 0:
operations.append({
'provider': 'stability', # Default provider
'tokens_requested': 0,
'actual_provider_name': 'wavespeed',
'operation_type': 'image_generation',
})
can_proceed, message, error_details = pricing_service.check_comprehensive_limits(
user_id=user_id,
operations=operations * image_count if operations else []
)
return {
"can_proceed": can_proceed,
"message": message,
"error_details": error_details,
"summary": {
"total_assets": len(blueprint.asset_nodes),
"image_count": image_count,
"text_count": text_count,
"estimated_cost": total_cost,
},
}
finally:
db.close()
except Exception as e:
logger.error(f"[Orchestrator] Error in pre-flight validation: {str(e)}")
return {
"can_proceed": False,
"message": f"Validation error: {str(e)}",
"error_details": {},
}
def _build_campaign_phases(
self,
goal: str,
channels: List[str]
) -> List[Dict[str, Any]]:
"""Build campaign phases based on goal."""
if goal == "product_launch":
return [
{"name": "teaser", "duration_days": 7, "purpose": "Build anticipation"},
{"name": "launch", "duration_days": 3, "purpose": "Official launch"},
{"name": "nurture", "duration_days": 14, "purpose": "Sustain engagement"},
]
else:
return [
{"name": "campaign", "duration_days": 30, "purpose": "Campaign execution"},
]
def _get_required_assets(
self,
phase: str,
channel: str
) -> List[str]:
"""Get required asset types for phase and channel."""
# Default: image for all phases and channels
assets = ["image"]
# Add text/copy for social channels
if channel in ["instagram", "linkedin", "facebook", "twitter"]:
assets.append("text")
return assets
def _estimate_asset_cost(
self,
asset_type: str,
channel: str
) -> float:
"""Estimate cost for asset generation."""
if asset_type == "image":
# Premium quality image: ~5-6 credits
return 5.0
elif asset_type == "video":
# WAN 2.5 Image-to-Video: $0.05-$0.15/second
# Default: 5 seconds at 720p = $0.50
return 0.50
elif asset_type == "text":
return 0.0 # Text generation is typically included
else:
return 0.0
def _generate_concept_summary(self, prompt: str) -> str:
"""Generate a brief concept summary from prompt."""
# Simple extraction: take first 100 chars
return prompt[:100] + "..." if len(prompt) > 100 else prompt