""" Campaign Storage Service Handles database persistence for campaigns, proposals, and assets. """ from typing import Dict, Any, List, Optional from loguru import logger from sqlalchemy.orm import Session from sqlalchemy import desc from models.product_marketing_models import Campaign, CampaignProposal, CampaignAsset, CampaignStatus from services.database import get_session_for_user class CampaignStorageService: """Service for storing and retrieving campaigns from database.""" def __init__(self): """Initialize Campaign Storage Service.""" self.logger = logger logger.info("[Campaign Storage] Service initialized") def save_campaign( self, user_id: str, campaign_data: Dict[str, Any] ) -> Campaign: """ Save campaign blueprint to database. Args: user_id: User ID campaign_data: Campaign blueprint data Returns: Saved Campaign object """ db = get_session_for_user(user_id) if not db: raise ValueError(f"Could not create database session for user {user_id}") try: campaign_id = campaign_data.get('campaign_id') # Check if campaign exists existing = db.query(Campaign).filter( Campaign.campaign_id == campaign_id, Campaign.user_id == user_id ).first() if existing: # Update existing campaign existing.campaign_name = campaign_data.get('campaign_name', existing.campaign_name) existing.goal = campaign_data.get('goal', existing.goal) existing.kpi = campaign_data.get('kpi', existing.kpi) existing.status = campaign_data.get('status', existing.status) existing.phases = campaign_data.get('phases', existing.phases) existing.channels = campaign_data.get('channels', existing.channels) existing.asset_nodes = campaign_data.get('asset_nodes', existing.asset_nodes) existing.product_context = campaign_data.get('product_context', existing.product_context) db.commit() db.refresh(existing) logger.info(f"[Campaign Storage] Updated campaign {campaign_id}") return existing else: # Create new campaign campaign = Campaign( campaign_id=campaign_id, user_id=user_id, campaign_name=campaign_data.get('campaign_name'), goal=campaign_data.get('goal'), kpi=campaign_data.get('kpi'), status=campaign_data.get('status', 'draft'), phases=campaign_data.get('phases'), channels=campaign_data.get('channels', []), asset_nodes=campaign_data.get('asset_nodes', []), product_context=campaign_data.get('product_context'), ) db.add(campaign) db.commit() db.refresh(campaign) logger.info(f"[Campaign Storage] Saved new campaign {campaign_id}") return campaign except Exception as e: db.rollback() logger.error(f"[Campaign Storage] Error saving campaign: {str(e)}") raise finally: db.close() def get_campaign( self, user_id: str, campaign_id: str ) -> Optional[Campaign]: """Get campaign by ID.""" db = get_session_for_user(user_id) if not db: logger.error(f"Could not create database session for user {user_id}") return None try: campaign = db.query(Campaign).filter( Campaign.campaign_id == campaign_id, Campaign.user_id == user_id ).first() return campaign except Exception as e: logger.error(f"[Campaign Storage] Error getting campaign: {str(e)}") return None finally: db.close() def list_campaigns( self, user_id: str, status: Optional[str] = None, limit: int = 50 ) -> List[Campaign]: """List campaigns for user.""" db = get_session_for_user(user_id) try: query = db.query(Campaign).filter(Campaign.user_id == user_id) if status: query = query.filter(Campaign.status == status) campaigns = query.order_by(desc(Campaign.created_at)).limit(limit).all() return campaigns except Exception as e: logger.error(f"[Campaign Storage] Error listing campaigns: {str(e)}") return [] finally: db.close() def save_proposals( self, user_id: str, campaign_id: str, proposals: Dict[str, Any] ) -> List[CampaignProposal]: """Save asset proposals for a campaign.""" db = get_session_for_user(user_id) try: # Delete existing proposals for this campaign db.query(CampaignProposal).filter( CampaignProposal.campaign_id == campaign_id, CampaignProposal.user_id == user_id ).delete() # Create new proposals saved_proposals = [] for asset_id, proposal_data in proposals.get('proposals', {}).items(): proposal = CampaignProposal( campaign_id=campaign_id, user_id=user_id, asset_node_id=asset_id, asset_type=proposal_data.get('asset_type'), channel=proposal_data.get('channel'), proposed_prompt=proposal_data.get('proposed_prompt'), recommended_template=proposal_data.get('recommended_template'), recommended_provider=proposal_data.get('recommended_provider'), recommended_model=proposal_data.get('recommended_model'), cost_estimate=proposal_data.get('cost_estimate', 0.0), concept_summary=proposal_data.get('concept_summary'), status='proposed', ) db.add(proposal) saved_proposals.append(proposal) db.commit() for proposal in saved_proposals: db.refresh(proposal) logger.info(f"[Campaign Storage] Saved {len(saved_proposals)} proposals for campaign {campaign_id}") return saved_proposals except Exception as e: db.rollback() logger.error(f"[Campaign Storage] Error saving proposals: {str(e)}") raise finally: db.close() def get_proposals( self, user_id: str, campaign_id: str ) -> List[CampaignProposal]: """Get proposals for a campaign.""" db = get_session_for_user(user_id) try: proposals = db.query(CampaignProposal).filter( CampaignProposal.campaign_id == campaign_id, CampaignProposal.user_id == user_id ).all() return proposals except Exception as e: logger.error(f"[Campaign Storage] Error getting proposals: {str(e)}") return [] finally: db.close() def update_campaign_status( self, user_id: str, campaign_id: str, status: str ) -> bool: """Update campaign status.""" db = get_session_for_user(user_id) try: campaign = db.query(Campaign).filter( Campaign.campaign_id == campaign_id, Campaign.user_id == user_id ).first() if campaign: campaign.status = status db.commit() logger.info(f"[Campaign Storage] Updated campaign {campaign_id} status to {status}") return True return False except Exception as e: db.rollback() logger.error(f"[Campaign Storage] Error updating status: {str(e)}") return False finally: db.close() def update_asset_status( self, user_id: str, campaign_id: str, asset_id: str, status: str, generated_asset_id: Optional[int] = None ) -> bool: """ Update status of a campaign asset and its proposal. Args: user_id: User ID campaign_id: Campaign ID asset_id: Asset node ID status: New status (generating, ready, approved, rejected) generated_asset_id: Optional Asset Library ID Returns: True if updated successfully """ db = get_session_for_user(user_id) try: # Update proposal status proposal = db.query(CampaignProposal).filter( CampaignProposal.campaign_id == campaign_id, CampaignProposal.user_id == user_id, CampaignProposal.asset_node_id == asset_id ).first() if proposal: proposal.status = status if generated_asset_id: proposal.generated_asset_id = generated_asset_id db.commit() logger.info(f"[Campaign Storage] Updated proposal {asset_id} status to {status}") # Update or create campaign asset campaign_asset = db.query(CampaignAsset).filter( CampaignAsset.campaign_id == campaign_id, CampaignAsset.user_id == user_id, CampaignAsset.asset_node_id == asset_id ).first() if campaign_asset: campaign_asset.status = status if generated_asset_id: campaign_asset.generated_asset_id = generated_asset_id db.commit() logger.info(f"[Campaign Storage] Updated campaign asset {asset_id} status to {status}") else: # Create new campaign asset if it doesn't exist if proposal: campaign_asset = CampaignAsset( campaign_id=campaign_id, user_id=user_id, asset_node_id=asset_id, asset_type=proposal.asset_type, channel=proposal.channel, status=status, generated_asset_id=generated_asset_id, ) db.add(campaign_asset) db.commit() logger.info(f"[Campaign Storage] Created campaign asset {asset_id}") return True except Exception as e: db.rollback() logger.error(f"[Campaign Storage] Error updating asset status: {str(e)}") return False finally: db.close()