303 lines
11 KiB
Python
303 lines
11 KiB
Python
"""
|
|
Campaign Storage Service
|
|
Handles database persistence for campaigns, proposals, and assets.
|
|
"""
|
|
|
|
from typing import Dict, Any, List, Optional
|
|
from loguru import logger
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import desc
|
|
|
|
from models.product_marketing_models import Campaign, CampaignProposal, CampaignAsset, CampaignStatus
|
|
from services.database import get_session_for_user
|
|
|
|
|
|
class CampaignStorageService:
|
|
"""Service for storing and retrieving campaigns from database."""
|
|
|
|
def __init__(self):
|
|
"""Initialize Campaign Storage Service."""
|
|
self.logger = logger
|
|
logger.info("[Campaign Storage] Service initialized")
|
|
|
|
def save_campaign(
|
|
self,
|
|
user_id: str,
|
|
campaign_data: Dict[str, Any]
|
|
) -> Campaign:
|
|
"""
|
|
Save campaign blueprint to database.
|
|
|
|
Args:
|
|
user_id: User ID
|
|
campaign_data: Campaign blueprint data
|
|
|
|
Returns:
|
|
Saved Campaign object
|
|
"""
|
|
db = get_session_for_user(user_id)
|
|
if not db:
|
|
raise ValueError(f"Could not create database session for user {user_id}")
|
|
|
|
try:
|
|
campaign_id = campaign_data.get('campaign_id')
|
|
|
|
# Check if campaign exists
|
|
existing = db.query(Campaign).filter(
|
|
Campaign.campaign_id == campaign_id,
|
|
Campaign.user_id == user_id
|
|
).first()
|
|
|
|
if existing:
|
|
# Update existing campaign
|
|
existing.campaign_name = campaign_data.get('campaign_name', existing.campaign_name)
|
|
existing.goal = campaign_data.get('goal', existing.goal)
|
|
existing.kpi = campaign_data.get('kpi', existing.kpi)
|
|
existing.status = campaign_data.get('status', existing.status)
|
|
existing.phases = campaign_data.get('phases', existing.phases)
|
|
existing.channels = campaign_data.get('channels', existing.channels)
|
|
existing.asset_nodes = campaign_data.get('asset_nodes', existing.asset_nodes)
|
|
existing.product_context = campaign_data.get('product_context', existing.product_context)
|
|
db.commit()
|
|
db.refresh(existing)
|
|
logger.info(f"[Campaign Storage] Updated campaign {campaign_id}")
|
|
return existing
|
|
else:
|
|
# Create new campaign
|
|
campaign = Campaign(
|
|
campaign_id=campaign_id,
|
|
user_id=user_id,
|
|
campaign_name=campaign_data.get('campaign_name'),
|
|
goal=campaign_data.get('goal'),
|
|
kpi=campaign_data.get('kpi'),
|
|
status=campaign_data.get('status', 'draft'),
|
|
phases=campaign_data.get('phases'),
|
|
channels=campaign_data.get('channels', []),
|
|
asset_nodes=campaign_data.get('asset_nodes', []),
|
|
product_context=campaign_data.get('product_context'),
|
|
)
|
|
db.add(campaign)
|
|
db.commit()
|
|
db.refresh(campaign)
|
|
logger.info(f"[Campaign Storage] Saved new campaign {campaign_id}")
|
|
return campaign
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"[Campaign Storage] Error saving campaign: {str(e)}")
|
|
raise
|
|
finally:
|
|
db.close()
|
|
|
|
def get_campaign(
|
|
self,
|
|
user_id: str,
|
|
campaign_id: str
|
|
) -> Optional[Campaign]:
|
|
"""Get campaign by ID."""
|
|
db = get_session_for_user(user_id)
|
|
if not db:
|
|
logger.error(f"Could not create database session for user {user_id}")
|
|
return None
|
|
|
|
try:
|
|
campaign = db.query(Campaign).filter(
|
|
Campaign.campaign_id == campaign_id,
|
|
Campaign.user_id == user_id
|
|
).first()
|
|
return campaign
|
|
except Exception as e:
|
|
logger.error(f"[Campaign Storage] Error getting campaign: {str(e)}")
|
|
return None
|
|
finally:
|
|
db.close()
|
|
|
|
def list_campaigns(
|
|
self,
|
|
user_id: str,
|
|
status: Optional[str] = None,
|
|
limit: int = 50
|
|
) -> List[Campaign]:
|
|
"""List campaigns for user."""
|
|
db = get_session_for_user(user_id)
|
|
try:
|
|
query = db.query(Campaign).filter(Campaign.user_id == user_id)
|
|
|
|
if status:
|
|
query = query.filter(Campaign.status == status)
|
|
|
|
campaigns = query.order_by(desc(Campaign.created_at)).limit(limit).all()
|
|
return campaigns
|
|
except Exception as e:
|
|
logger.error(f"[Campaign Storage] Error listing campaigns: {str(e)}")
|
|
return []
|
|
finally:
|
|
db.close()
|
|
|
|
def save_proposals(
|
|
self,
|
|
user_id: str,
|
|
campaign_id: str,
|
|
proposals: Dict[str, Any]
|
|
) -> List[CampaignProposal]:
|
|
"""Save asset proposals for a campaign."""
|
|
db = get_session_for_user(user_id)
|
|
try:
|
|
# Delete existing proposals for this campaign
|
|
db.query(CampaignProposal).filter(
|
|
CampaignProposal.campaign_id == campaign_id,
|
|
CampaignProposal.user_id == user_id
|
|
).delete()
|
|
|
|
# Create new proposals
|
|
saved_proposals = []
|
|
for asset_id, proposal_data in proposals.get('proposals', {}).items():
|
|
proposal = CampaignProposal(
|
|
campaign_id=campaign_id,
|
|
user_id=user_id,
|
|
asset_node_id=asset_id,
|
|
asset_type=proposal_data.get('asset_type'),
|
|
channel=proposal_data.get('channel'),
|
|
proposed_prompt=proposal_data.get('proposed_prompt'),
|
|
recommended_template=proposal_data.get('recommended_template'),
|
|
recommended_provider=proposal_data.get('recommended_provider'),
|
|
recommended_model=proposal_data.get('recommended_model'),
|
|
cost_estimate=proposal_data.get('cost_estimate', 0.0),
|
|
concept_summary=proposal_data.get('concept_summary'),
|
|
status='proposed',
|
|
)
|
|
db.add(proposal)
|
|
saved_proposals.append(proposal)
|
|
|
|
db.commit()
|
|
for proposal in saved_proposals:
|
|
db.refresh(proposal)
|
|
|
|
logger.info(f"[Campaign Storage] Saved {len(saved_proposals)} proposals for campaign {campaign_id}")
|
|
return saved_proposals
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"[Campaign Storage] Error saving proposals: {str(e)}")
|
|
raise
|
|
finally:
|
|
db.close()
|
|
|
|
def get_proposals(
|
|
self,
|
|
user_id: str,
|
|
campaign_id: str
|
|
) -> List[CampaignProposal]:
|
|
"""Get proposals for a campaign."""
|
|
db = get_session_for_user(user_id)
|
|
try:
|
|
proposals = db.query(CampaignProposal).filter(
|
|
CampaignProposal.campaign_id == campaign_id,
|
|
CampaignProposal.user_id == user_id
|
|
).all()
|
|
return proposals
|
|
except Exception as e:
|
|
logger.error(f"[Campaign Storage] Error getting proposals: {str(e)}")
|
|
return []
|
|
finally:
|
|
db.close()
|
|
|
|
def update_campaign_status(
|
|
self,
|
|
user_id: str,
|
|
campaign_id: str,
|
|
status: str
|
|
) -> bool:
|
|
"""Update campaign status."""
|
|
db = get_session_for_user(user_id)
|
|
try:
|
|
campaign = db.query(Campaign).filter(
|
|
Campaign.campaign_id == campaign_id,
|
|
Campaign.user_id == user_id
|
|
).first()
|
|
|
|
if campaign:
|
|
campaign.status = status
|
|
db.commit()
|
|
logger.info(f"[Campaign Storage] Updated campaign {campaign_id} status to {status}")
|
|
return True
|
|
return False
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"[Campaign Storage] Error updating status: {str(e)}")
|
|
return False
|
|
finally:
|
|
db.close()
|
|
|
|
def update_asset_status(
|
|
self,
|
|
user_id: str,
|
|
campaign_id: str,
|
|
asset_id: str,
|
|
status: str,
|
|
generated_asset_id: Optional[int] = None
|
|
) -> bool:
|
|
"""
|
|
Update status of a campaign asset and its proposal.
|
|
|
|
Args:
|
|
user_id: User ID
|
|
campaign_id: Campaign ID
|
|
asset_id: Asset node ID
|
|
status: New status (generating, ready, approved, rejected)
|
|
generated_asset_id: Optional Asset Library ID
|
|
|
|
Returns:
|
|
True if updated successfully
|
|
"""
|
|
db = get_session_for_user(user_id)
|
|
try:
|
|
# Update proposal status
|
|
proposal = db.query(CampaignProposal).filter(
|
|
CampaignProposal.campaign_id == campaign_id,
|
|
CampaignProposal.user_id == user_id,
|
|
CampaignProposal.asset_node_id == asset_id
|
|
).first()
|
|
|
|
if proposal:
|
|
proposal.status = status
|
|
if generated_asset_id:
|
|
proposal.generated_asset_id = generated_asset_id
|
|
db.commit()
|
|
logger.info(f"[Campaign Storage] Updated proposal {asset_id} status to {status}")
|
|
|
|
# Update or create campaign asset
|
|
campaign_asset = db.query(CampaignAsset).filter(
|
|
CampaignAsset.campaign_id == campaign_id,
|
|
CampaignAsset.user_id == user_id,
|
|
CampaignAsset.asset_node_id == asset_id
|
|
).first()
|
|
|
|
if campaign_asset:
|
|
campaign_asset.status = status
|
|
if generated_asset_id:
|
|
campaign_asset.generated_asset_id = generated_asset_id
|
|
db.commit()
|
|
logger.info(f"[Campaign Storage] Updated campaign asset {asset_id} status to {status}")
|
|
else:
|
|
# Create new campaign asset if it doesn't exist
|
|
if proposal:
|
|
campaign_asset = CampaignAsset(
|
|
campaign_id=campaign_id,
|
|
user_id=user_id,
|
|
asset_node_id=asset_id,
|
|
asset_type=proposal.asset_type,
|
|
channel=proposal.channel,
|
|
status=status,
|
|
generated_asset_id=generated_asset_id,
|
|
)
|
|
db.add(campaign_asset)
|
|
db.commit()
|
|
logger.info(f"[Campaign Storage] Created campaign asset {asset_id}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
db.rollback()
|
|
logger.error(f"[Campaign Storage] Error updating asset status: {str(e)}")
|
|
return False
|
|
finally:
|
|
db.close()
|