AI Analysis and Content Strategy fixes. Enhanced Strategy Routes refactoring.
This commit is contained in:
653
backend/services/campaign_creator/orchestrator.py
Normal file
653
backend/services/campaign_creator/orchestrator.py
Normal file
@@ -0,0 +1,653 @@
|
||||
"""
|
||||
Campaign Creator Orchestrator
|
||||
Main service that orchestrates campaign workflows and asset generation.
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, List, Optional
|
||||
from dataclasses import dataclass
|
||||
from loguru import logger
|
||||
|
||||
from services.image_studio import ImageStudioManager, CreateStudioRequest
|
||||
from .prompt_builder import CampaignPromptBuilder
|
||||
from services.product_marketing.brand_dna_sync import BrandDNASyncService
|
||||
from .asset_audit import AssetAuditService
|
||||
from .channel_pack import ChannelPackService
|
||||
from services.database import SessionLocal
|
||||
from services.subscription import PricingService
|
||||
from services.subscription.preflight_validator import validate_image_generation_operations
|
||||
|
||||
|
||||
@dataclass
|
||||
class CampaignAssetNode:
|
||||
"""Represents an asset node in the campaign graph."""
|
||||
asset_id: str
|
||||
asset_type: str # image, video, text, audio
|
||||
channel: str
|
||||
status: str # draft, generating, ready, approved
|
||||
prompt: Optional[str] = None
|
||||
template_id: Optional[str] = None
|
||||
provider: Optional[str] = None
|
||||
cost_estimate: Optional[float] = None
|
||||
generated_asset_id: Optional[int] = None # Asset Library ID
|
||||
|
||||
|
||||
@dataclass
|
||||
class CampaignBlueprint:
|
||||
"""Campaign blueprint with phases and asset nodes."""
|
||||
campaign_id: str
|
||||
campaign_name: str
|
||||
goal: str
|
||||
kpi: Optional[str] = None
|
||||
phases: List[Dict[str, Any]] = None # teaser, launch, nurture
|
||||
asset_nodes: List[CampaignAssetNode] = None
|
||||
channels: List[str] = None
|
||||
status: str = "draft" # draft, generating, ready, published
|
||||
|
||||
|
||||
class CampaignOrchestrator:
|
||||
"""Main orchestrator for Campaign Creator."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize Campaign Orchestrator."""
|
||||
self.image_studio = ImageStudioManager()
|
||||
self.prompt_builder = CampaignPromptBuilder()
|
||||
self.brand_dna_sync = BrandDNASyncService()
|
||||
self.asset_audit = AssetAuditService()
|
||||
self.channel_pack = ChannelPackService()
|
||||
self.logger = logger
|
||||
logger.info("[Campaign Orchestrator] Initialized")
|
||||
|
||||
def create_campaign_blueprint(
|
||||
self,
|
||||
user_id: str,
|
||||
campaign_data: Dict[str, Any]
|
||||
) -> CampaignBlueprint:
|
||||
"""
|
||||
Create campaign blueprint from user input and onboarding data.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
campaign_data: Campaign information (name, goal, channels, etc.)
|
||||
|
||||
Returns:
|
||||
Campaign blueprint with asset nodes
|
||||
"""
|
||||
try:
|
||||
import time
|
||||
campaign_id = campaign_data.get('campaign_id') or f"campaign_{user_id}_{int(time.time())}"
|
||||
campaign_name = campaign_data.get('campaign_name', 'New Campaign')
|
||||
goal = campaign_data.get('goal', 'product_launch')
|
||||
channels = campaign_data.get('channels', [])
|
||||
|
||||
# Get brand DNA for personalization
|
||||
brand_dna = self.brand_dna_sync.get_brand_dna_tokens(user_id)
|
||||
|
||||
# Build campaign phases
|
||||
phases = self._build_campaign_phases(goal, channels)
|
||||
|
||||
# Generate asset nodes for each phase and channel
|
||||
asset_nodes = []
|
||||
for phase in phases:
|
||||
phase_name = phase.get('name')
|
||||
for channel in channels:
|
||||
# Determine required assets for this phase + channel
|
||||
required_assets = self._get_required_assets(phase_name, channel)
|
||||
|
||||
for asset_type in required_assets:
|
||||
asset_node = CampaignAssetNode(
|
||||
asset_id=f"{campaign_id}_{phase_name}_{channel}_{asset_type}",
|
||||
asset_type=asset_type,
|
||||
channel=channel,
|
||||
status="draft",
|
||||
)
|
||||
asset_nodes.append(asset_node)
|
||||
|
||||
blueprint = CampaignBlueprint(
|
||||
campaign_id=campaign_id,
|
||||
campaign_name=campaign_name,
|
||||
goal=goal,
|
||||
kpi=campaign_data.get('kpi'),
|
||||
phases=phases,
|
||||
asset_nodes=asset_nodes,
|
||||
channels=channels,
|
||||
status="draft",
|
||||
)
|
||||
|
||||
logger.info(f"[Orchestrator] Created blueprint for campaign {campaign_id} with {len(asset_nodes)} assets")
|
||||
return blueprint
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Orchestrator] Error creating blueprint: {str(e)}")
|
||||
raise
|
||||
|
||||
def generate_asset_proposals(
|
||||
self,
|
||||
user_id: str,
|
||||
blueprint: CampaignBlueprint,
|
||||
product_context: Optional[Dict[str, Any]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate AI proposals for each asset node in the blueprint.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
blueprint: Campaign blueprint
|
||||
product_context: Product information
|
||||
|
||||
Returns:
|
||||
Dictionary with proposals for each asset node
|
||||
"""
|
||||
try:
|
||||
proposals = {}
|
||||
|
||||
for asset_node in blueprint.asset_nodes:
|
||||
# Build specialized prompt based on asset type and channel
|
||||
if asset_node.asset_type == "image":
|
||||
base_prompt = product_context.get('product_description', 'Product image') if product_context else 'Marketing image'
|
||||
enhanced_prompt = self.prompt_builder.build_marketing_image_prompt(
|
||||
base_prompt=base_prompt,
|
||||
user_id=user_id,
|
||||
channel=asset_node.channel,
|
||||
asset_type="hero_image",
|
||||
product_context=product_context,
|
||||
)
|
||||
|
||||
# Get channel pack for template recommendations
|
||||
channel_pack = self.channel_pack.get_channel_pack(asset_node.channel)
|
||||
recommended_template = channel_pack.get('templates', [{}])[0] if channel_pack.get('templates') else None
|
||||
|
||||
# Estimate cost
|
||||
cost_estimate = self._estimate_asset_cost("image", asset_node.channel)
|
||||
|
||||
proposals[asset_node.asset_id] = {
|
||||
"asset_id": asset_node.asset_id,
|
||||
"asset_type": asset_node.asset_type,
|
||||
"channel": asset_node.channel,
|
||||
"campaign_id": blueprint.campaign_id, # Include campaign_id for tracking
|
||||
"proposed_prompt": enhanced_prompt,
|
||||
"recommended_template": recommended_template.get('id') if recommended_template else None,
|
||||
"recommended_provider": recommended_template.get('recommended_provider', 'wavespeed') if recommended_template else 'wavespeed',
|
||||
"cost_estimate": cost_estimate,
|
||||
"concept_summary": self._generate_concept_summary(enhanced_prompt),
|
||||
}
|
||||
|
||||
elif asset_node.asset_type == "video":
|
||||
# Video asset proposals - determine if animation (image-to-video) or demo (text-to-video)
|
||||
# Default to animation if we have product image, otherwise demo
|
||||
video_subtype = asset_proposal.get('video_subtype', 'animation') if 'asset_proposal' in locals() else 'demo'
|
||||
|
||||
# For demo videos (text-to-video), we need product description
|
||||
if video_subtype == "demo" or not product_context or not product_context.get('product_image_base64'):
|
||||
# Text-to-video demo video
|
||||
video_type = "demo" # Default, can be customized
|
||||
if asset_node.channel in ["tiktok", "instagram"]:
|
||||
video_type = "storytelling" # Storytelling for social media
|
||||
elif asset_node.channel in ["linkedin", "youtube"]:
|
||||
video_type = "feature_highlight" # Feature highlights for professional
|
||||
|
||||
# Estimate cost for text-to-video (WAN 2.5: $0.05-$0.15/second)
|
||||
duration = 10 # Default 10s for demo videos
|
||||
resolution = "720p" # Default
|
||||
cost_per_second = 0.10 if resolution == "720p" else (0.15 if resolution == "1080p" else 0.05)
|
||||
cost_estimate = duration * cost_per_second
|
||||
|
||||
proposals[asset_node.asset_id] = {
|
||||
"asset_id": asset_node.asset_id,
|
||||
"asset_type": asset_node.asset_type,
|
||||
"video_subtype": "demo", # Text-to-video
|
||||
"channel": asset_node.channel,
|
||||
"campaign_id": blueprint.campaign_id,
|
||||
"video_type": video_type,
|
||||
"duration": duration,
|
||||
"resolution": resolution,
|
||||
"cost_estimate": cost_estimate,
|
||||
"concept_summary": f"Product {video_type} video optimized for {asset_node.channel}",
|
||||
"note": "Text-to-video demo - requires product description",
|
||||
}
|
||||
else:
|
||||
# Image-to-video animation
|
||||
animation_type = "reveal" # Default
|
||||
if asset_node.channel in ["tiktok", "instagram", "youtube"]:
|
||||
animation_type = "demo" # Demo animations for social media
|
||||
elif asset_node.channel in ["linkedin", "facebook"]:
|
||||
animation_type = "reveal" # Professional reveal for B2B
|
||||
|
||||
# Estimate cost for image-to-video (WAN 2.5: $0.05-$0.15/second)
|
||||
duration = 5 # Default 5s for animations
|
||||
resolution = "720p" # Default
|
||||
cost_per_second = 0.10 if resolution == "720p" else (0.15 if resolution == "1080p" else 0.05)
|
||||
cost_estimate = duration * cost_per_second
|
||||
|
||||
proposals[asset_node.asset_id] = {
|
||||
"asset_id": asset_node.asset_id,
|
||||
"asset_type": asset_node.asset_type,
|
||||
"video_subtype": "animation", # Image-to-video
|
||||
"channel": asset_node.channel,
|
||||
"campaign_id": blueprint.campaign_id,
|
||||
"animation_type": animation_type,
|
||||
"duration": duration,
|
||||
"resolution": resolution,
|
||||
"cost_estimate": cost_estimate,
|
||||
"concept_summary": f"Product {animation_type} animation optimized for {asset_node.channel}",
|
||||
"note": "Requires product image - will be provided during generation",
|
||||
}
|
||||
|
||||
elif asset_node.asset_type == "text":
|
||||
base_request = f"Write {asset_node.channel} {asset_node.asset_type} for product launch"
|
||||
enhanced_prompt = self.prompt_builder.build_marketing_copy_prompt(
|
||||
base_request=base_request,
|
||||
user_id=user_id,
|
||||
channel=asset_node.channel,
|
||||
content_type="caption",
|
||||
product_context=product_context,
|
||||
)
|
||||
|
||||
proposals[asset_node.asset_id] = {
|
||||
"asset_id": asset_node.asset_id,
|
||||
"asset_type": asset_node.asset_type,
|
||||
"channel": asset_node.channel,
|
||||
"campaign_id": blueprint.campaign_id, # Include campaign_id for tracking
|
||||
"proposed_prompt": enhanced_prompt,
|
||||
"cost_estimate": 0.0, # Text generation cost is minimal
|
||||
"concept_summary": "Marketing copy optimized for channel and persona",
|
||||
}
|
||||
|
||||
logger.info(f"[Orchestrator] Generated {len(proposals)} asset proposals")
|
||||
return {"proposals": proposals, "total_assets": len(proposals)}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Orchestrator] Error generating proposals: {str(e)}")
|
||||
raise
|
||||
|
||||
async def generate_asset(
|
||||
self,
|
||||
user_id: str,
|
||||
asset_proposal: Dict[str, Any],
|
||||
product_context: Optional[Dict[str, Any]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate a single asset using Image Studio APIs.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
asset_proposal: Asset proposal from generate_asset_proposals
|
||||
product_context: Product information
|
||||
|
||||
Returns:
|
||||
Generated asset result
|
||||
"""
|
||||
try:
|
||||
asset_type = asset_proposal.get('asset_type')
|
||||
|
||||
if asset_type == "image":
|
||||
# Build CreateStudioRequest
|
||||
create_request = CreateStudioRequest(
|
||||
prompt=asset_proposal.get('proposed_prompt'),
|
||||
template_id=asset_proposal.get('recommended_template'),
|
||||
provider=asset_proposal.get('recommended_provider', 'wavespeed'),
|
||||
quality="premium",
|
||||
enhance_prompt=True,
|
||||
use_persona=True,
|
||||
num_variations=1,
|
||||
)
|
||||
|
||||
# Generate image using Image Studio
|
||||
result = await self.image_studio.create_image(create_request, user_id=user_id)
|
||||
|
||||
# Asset is automatically tracked in Asset Library via Image Studio
|
||||
return {
|
||||
"success": True,
|
||||
"asset_type": "image",
|
||||
"result": result,
|
||||
"asset_library_ids": [
|
||||
r.get('asset_id') for r in result.get('results', [])
|
||||
if r.get('asset_id')
|
||||
],
|
||||
}
|
||||
|
||||
elif asset_type == "video":
|
||||
# Check video subtype: "animation" (image-to-video) or "demo" (text-to-video)
|
||||
video_subtype = asset_proposal.get('video_subtype', 'animation')
|
||||
|
||||
if video_subtype == "demo":
|
||||
# Text-to-video: Product demo video from description
|
||||
from services.product_marketing.product_video_service import ProductVideoService, ProductVideoRequest
|
||||
|
||||
# Get product info from context
|
||||
product_name = product_context.get('product_name', 'Product') if product_context else 'Product'
|
||||
product_description = product_context.get('product_description', '') if product_context else ''
|
||||
|
||||
if not product_description:
|
||||
raise ValueError("Product description required for text-to-video demo generation")
|
||||
|
||||
# Get brand context
|
||||
brand_dna = self.brand_dna_sync.get_brand_dna_tokens(user_id)
|
||||
brand_context = {
|
||||
"visual_identity": brand_dna.get("visual_identity", {}),
|
||||
"persona": brand_dna.get("persona", {}),
|
||||
}
|
||||
|
||||
# Get video type from proposal or default
|
||||
video_type = asset_proposal.get('video_type', 'demo')
|
||||
|
||||
# Create video service
|
||||
video_service = ProductVideoService()
|
||||
|
||||
# Create video request
|
||||
video_request = ProductVideoRequest(
|
||||
product_name=product_name,
|
||||
product_description=product_description,
|
||||
video_type=video_type,
|
||||
resolution=asset_proposal.get('resolution', '720p'),
|
||||
duration=asset_proposal.get('duration', 10),
|
||||
audio_base64=asset_proposal.get('audio_base64'),
|
||||
brand_context=brand_context,
|
||||
additional_context=asset_proposal.get('additional_context'),
|
||||
)
|
||||
|
||||
# Generate video using unified ai_video_generate()
|
||||
result = await video_service.generate_product_video(video_request, user_id)
|
||||
|
||||
# Extract campaign_id for metadata
|
||||
campaign_id = asset_proposal.get('campaign_id')
|
||||
asset_id = asset_proposal.get('asset_id', '')
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"asset_type": "video",
|
||||
"video_subtype": "demo",
|
||||
"video_url": result.get('file_url'),
|
||||
"video_filename": result.get('filename'),
|
||||
"cost": result.get('cost', 0.0),
|
||||
"video_type": video_type,
|
||||
"campaign_id": campaign_id,
|
||||
"asset_id": asset_id,
|
||||
}
|
||||
|
||||
else:
|
||||
# Image-to-video: Product animation
|
||||
from services.product_marketing.product_animation_service import ProductAnimationService, ProductAnimationRequest
|
||||
|
||||
# Get product image from proposal or product context
|
||||
product_image_base64 = asset_proposal.get('product_image_base64')
|
||||
if not product_image_base64 and product_context:
|
||||
product_image_base64 = product_context.get('product_image_base64')
|
||||
|
||||
if not product_image_base64:
|
||||
raise ValueError("Product image required for image-to-video animation generation")
|
||||
|
||||
# Get animation type from proposal or default to "reveal"
|
||||
animation_type = asset_proposal.get('animation_type', 'reveal')
|
||||
product_name = product_context.get('product_name', 'Product') if product_context else 'Product'
|
||||
product_description = product_context.get('product_description') if product_context else None
|
||||
|
||||
# Get brand context
|
||||
brand_dna = self.brand_dna_sync.get_brand_dna_tokens(user_id)
|
||||
brand_context = {
|
||||
"visual_identity": brand_dna.get("visual_identity", {}),
|
||||
"persona": brand_dna.get("persona", {}),
|
||||
}
|
||||
|
||||
# Create animation service
|
||||
animation_service = ProductAnimationService()
|
||||
|
||||
# Create animation request
|
||||
animation_request = ProductAnimationRequest(
|
||||
product_image_base64=product_image_base64,
|
||||
animation_type=animation_type,
|
||||
product_name=product_name,
|
||||
product_description=product_description,
|
||||
resolution=asset_proposal.get('resolution', '720p'),
|
||||
duration=asset_proposal.get('duration', 5),
|
||||
audio_base64=asset_proposal.get('audio_base64'),
|
||||
brand_context=brand_context,
|
||||
additional_context=asset_proposal.get('additional_context'),
|
||||
)
|
||||
|
||||
# Generate video
|
||||
result = await animation_service.animate_product(animation_request, user_id)
|
||||
|
||||
# Extract campaign_id for metadata
|
||||
campaign_id = asset_proposal.get('campaign_id')
|
||||
asset_id = asset_proposal.get('asset_id', '')
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"asset_type": "video",
|
||||
"video_subtype": "animation",
|
||||
"video_url": result.get('video_url'),
|
||||
"video_filename": result.get('filename'),
|
||||
"cost": result.get('cost', 0.0),
|
||||
"animation_type": animation_type,
|
||||
"campaign_id": campaign_id,
|
||||
"asset_id": asset_id,
|
||||
}
|
||||
|
||||
elif asset_type == "text":
|
||||
# Import text generation service and tracker
|
||||
import asyncio
|
||||
from services.llm_providers.main_text_generation import llm_text_gen
|
||||
from utils.text_asset_tracker import save_and_track_text_content
|
||||
from services.database import SessionLocal
|
||||
|
||||
# Get enhanced prompt from proposal
|
||||
text_prompt = asset_proposal.get('proposed_prompt', '')
|
||||
channel = asset_proposal.get('channel', 'social')
|
||||
asset_id = asset_proposal.get('asset_id', '')
|
||||
|
||||
# Extract campaign_id - try from asset_proposal first, then from asset_id
|
||||
# asset_id format: {campaign_id}_{phase}_{channel}_{type}
|
||||
campaign_id = asset_proposal.get('campaign_id')
|
||||
if not campaign_id and asset_id and '_' in asset_id:
|
||||
# Try to extract: asset_id might be "campaign_user123_1234567890_teaser_instagram_text"
|
||||
# We need to find where phase_name starts (common phases: teaser, launch, nurture)
|
||||
parts = asset_id.split('_')
|
||||
# Find phase indicator (usually one of: teaser, launch, nurture)
|
||||
phase_indicators = ['teaser', 'launch', 'nurture', 'prelaunch', 'postlaunch']
|
||||
phase_idx = None
|
||||
for i, part in enumerate(parts):
|
||||
if part.lower() in phase_indicators:
|
||||
phase_idx = i
|
||||
break
|
||||
if phase_idx and phase_idx > 0:
|
||||
# Campaign ID is everything before the phase
|
||||
campaign_id = '_'.join(parts[:phase_idx])
|
||||
|
||||
# If still not found, use None (metadata will work without it)
|
||||
if not campaign_id:
|
||||
logger.warning(f"[Orchestrator] Could not extract campaign_id from asset_id: {asset_id}")
|
||||
|
||||
# Build system prompt for marketing copy
|
||||
system_prompt = f"""You are an expert marketing copywriter specializing in {channel} content.
|
||||
Generate compelling, on-brand marketing copy that:
|
||||
- Is optimized for {channel} platform best practices
|
||||
- Includes a clear call-to-action
|
||||
- Uses appropriate tone and style for the platform
|
||||
- Is concise and engaging
|
||||
- Aligns with the product marketing context provided
|
||||
|
||||
Return only the final copy text without explanations or markdown formatting."""
|
||||
|
||||
# Run synchronous llm_text_gen in thread pool
|
||||
logger.info(f"[Orchestrator] Generating text asset for channel: {channel}")
|
||||
generated_text = await asyncio.to_thread(
|
||||
llm_text_gen,
|
||||
prompt=text_prompt,
|
||||
system_prompt=system_prompt,
|
||||
user_id=user_id
|
||||
)
|
||||
|
||||
if not generated_text or not generated_text.strip():
|
||||
raise ValueError("Text generation returned empty content")
|
||||
|
||||
# Save to Asset Library
|
||||
db = SessionLocal()
|
||||
asset_library_id = None
|
||||
try:
|
||||
asset_library_id = save_and_track_text_content(
|
||||
db=db,
|
||||
user_id=user_id,
|
||||
content=generated_text.strip(),
|
||||
source_module="campaign_creator",
|
||||
title=f"{channel.title()} Copy: {asset_id.split('_')[-1] if '_' in asset_id else 'Marketing Copy'}",
|
||||
description=f"Marketing copy for {channel} platform generated from campaign proposal",
|
||||
prompt=text_prompt,
|
||||
tags=["campaign_creator", channel.lower(), "text", "copy"],
|
||||
asset_metadata={
|
||||
"campaign_id": campaign_id,
|
||||
"asset_id": asset_id,
|
||||
"asset_type": "text",
|
||||
"channel": channel,
|
||||
"concept_summary": asset_proposal.get('concept_summary'),
|
||||
},
|
||||
subdirectory="campaigns",
|
||||
file_extension=".txt"
|
||||
)
|
||||
|
||||
if asset_library_id:
|
||||
logger.info(f"[Orchestrator] ✅ Text asset saved to library: ID={asset_library_id}")
|
||||
else:
|
||||
logger.warning(f"[Orchestrator] ⚠️ Text asset tracking returned None")
|
||||
|
||||
except Exception as save_error:
|
||||
logger.error(f"[Orchestrator] ⚠️ Failed to save text asset to library: {str(save_error)}")
|
||||
# Continue even if save fails - text is still generated
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"asset_type": "text",
|
||||
"content": generated_text.strip(),
|
||||
"asset_library_id": asset_library_id,
|
||||
"channel": channel,
|
||||
}
|
||||
|
||||
else:
|
||||
raise ValueError(f"Unsupported asset type: {asset_type}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Orchestrator] Error generating asset: {str(e)}")
|
||||
raise
|
||||
|
||||
def validate_campaign_preflight(
|
||||
self,
|
||||
user_id: str,
|
||||
blueprint: CampaignBlueprint
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Validate campaign blueprint against subscription limits before generation.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
blueprint: Campaign blueprint
|
||||
|
||||
Returns:
|
||||
Pre-flight validation results
|
||||
"""
|
||||
try:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
pricing_service = PricingService(db)
|
||||
|
||||
# Count operations needed
|
||||
image_count = sum(1 for node in blueprint.asset_nodes if node.asset_type == "image")
|
||||
text_count = sum(1 for node in blueprint.asset_nodes if node.asset_type == "text")
|
||||
|
||||
# Estimate total cost
|
||||
total_cost = 0.0
|
||||
for node in blueprint.asset_nodes:
|
||||
if node.cost_estimate:
|
||||
total_cost += node.cost_estimate
|
||||
|
||||
# Validate image generation limits
|
||||
operations = []
|
||||
if image_count > 0:
|
||||
operations.append({
|
||||
'provider': 'stability', # Default provider
|
||||
'tokens_requested': 0,
|
||||
'actual_provider_name': 'wavespeed',
|
||||
'operation_type': 'image_generation',
|
||||
})
|
||||
|
||||
can_proceed, message, error_details = pricing_service.check_comprehensive_limits(
|
||||
user_id=user_id,
|
||||
operations=operations * image_count if operations else []
|
||||
)
|
||||
|
||||
return {
|
||||
"can_proceed": can_proceed,
|
||||
"message": message,
|
||||
"error_details": error_details,
|
||||
"summary": {
|
||||
"total_assets": len(blueprint.asset_nodes),
|
||||
"image_count": image_count,
|
||||
"text_count": text_count,
|
||||
"estimated_cost": total_cost,
|
||||
},
|
||||
}
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Orchestrator] Error in pre-flight validation: {str(e)}")
|
||||
return {
|
||||
"can_proceed": False,
|
||||
"message": f"Validation error: {str(e)}",
|
||||
"error_details": {},
|
||||
}
|
||||
|
||||
def _build_campaign_phases(
|
||||
self,
|
||||
goal: str,
|
||||
channels: List[str]
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Build campaign phases based on goal."""
|
||||
if goal == "product_launch":
|
||||
return [
|
||||
{"name": "teaser", "duration_days": 7, "purpose": "Build anticipation"},
|
||||
{"name": "launch", "duration_days": 3, "purpose": "Official launch"},
|
||||
{"name": "nurture", "duration_days": 14, "purpose": "Sustain engagement"},
|
||||
]
|
||||
else:
|
||||
return [
|
||||
{"name": "campaign", "duration_days": 30, "purpose": "Campaign execution"},
|
||||
]
|
||||
|
||||
def _get_required_assets(
|
||||
self,
|
||||
phase: str,
|
||||
channel: str
|
||||
) -> List[str]:
|
||||
"""Get required asset types for phase and channel."""
|
||||
# Default: image for all phases and channels
|
||||
assets = ["image"]
|
||||
|
||||
# Add text/copy for social channels
|
||||
if channel in ["instagram", "linkedin", "facebook", "twitter"]:
|
||||
assets.append("text")
|
||||
|
||||
return assets
|
||||
|
||||
def _estimate_asset_cost(
|
||||
self,
|
||||
asset_type: str,
|
||||
channel: str
|
||||
) -> float:
|
||||
"""Estimate cost for asset generation."""
|
||||
if asset_type == "image":
|
||||
# Premium quality image: ~5-6 credits
|
||||
return 5.0
|
||||
elif asset_type == "video":
|
||||
# WAN 2.5 Image-to-Video: $0.05-$0.15/second
|
||||
# Default: 5 seconds at 720p = $0.50
|
||||
return 0.50
|
||||
elif asset_type == "text":
|
||||
return 0.0 # Text generation is typically included
|
||||
else:
|
||||
return 0.0
|
||||
|
||||
def _generate_concept_summary(self, prompt: str) -> str:
|
||||
"""Generate a brief concept summary from prompt."""
|
||||
# Simple extraction: take first 100 chars
|
||||
return prompt[:100] + "..." if len(prompt) > 100 else prompt
|
||||
Reference in New Issue
Block a user