diff --git a/backend/models/backlink_outreach_models.py b/backend/models/backlink_outreach_models.py index 53ca15e4..075cade7 100644 --- a/backend/models/backlink_outreach_models.py +++ b/backend/models/backlink_outreach_models.py @@ -46,6 +46,7 @@ class OutreachAttempt(Base): decision_reason = Column(Text, nullable=True) sent_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow, index=True) + message_id = Column(String(255), nullable=True, index=True) class OutreachReply(Base): diff --git a/backend/routers/backlink_outreach.py b/backend/routers/backlink_outreach.py index 17d2b09a..2382f76d 100644 --- a/backend/routers/backlink_outreach.py +++ b/backend/routers/backlink_outreach.py @@ -368,26 +368,40 @@ async def send_outreach( lead_email = (lead.get("email") or "") if lead else "" if result.status == "approved" and result.policy_allowed and not result.duplicate and lead_email: - send_result = await backlink_outreach_sender.send_email( - to_email=lead_email, - subject=subject, - body=body, - from_email=payload.sender_email, - ) - if send_result.success: - storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id) - result.status = "sent" - result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email - storage.mark_idempotency(payload.idempotency_key, user_id) - storage.increment_user_send_counter(user_id) - domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown" - storage.increment_domain_send_counter(domain, user_id=user_id) + domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown" + + user_within_cap, _ = storage.try_increment_user_send_counter(user_id) + domain_within_cap, _ = storage.try_increment_domain_send_counter(domain, user_id=user_id) + if not (user_within_cap and domain_within_cap): + reasons = [] + if not user_within_cap: + reasons.append("user_daily_cap_exceeded") + if not domain_within_cap: + reasons.append("domain_daily_cap_exceeded") + reason_str = f"rate_limit_hit; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}" + storage.update_attempt_status(result.attempt_id, "blocked", decision_reason=reason_str, user_id=user_id) + result.status = "blocked" + result.policy_reasons = reasons else: - reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}" - storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id) - result.status = "failed" - result.policy_reasons = ["smtp_send_failed"] - result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY + send_result = await backlink_outreach_sender.send_email( + to_email=lead_email, + subject=subject, + body=body, + from_email=payload.sender_email, + ) + if send_result.success: + storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id) + result.status = "sent" + result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email + if send_result.message_id: + storage.update_attempt_message_id(result.attempt_id, send_result.message_id, user_id=user_id) + storage.mark_idempotency(payload.idempotency_key, user_id) + else: + reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}" + storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id) + result.status = "failed" + result.policy_reasons = ["smtp_send_failed"] + result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY elif result.status == "approved" and result.policy_allowed and not result.duplicate and not lead_email: reason = f"lead_has_no_email; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}" storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id) @@ -448,7 +462,18 @@ async def poll_replies( if storage.reply_exists(from_email, subject, user_id=user_id): skipped += 1 continue - attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or "" + + attempt_id = "" + in_reply_to = raw.get("in_reply_to", "") + references = raw.get("references", "") + if in_reply_to: + attempt_id = storage.find_attempt_by_message_id(in_reply_to, user_id=user_id) or "" + if not attempt_id and references: + mid = references.split()[-1] + attempt_id = storage.find_attempt_by_message_id(mid, user_id=user_id) or "" + if not attempt_id: + attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or "" + reply = storage.add_reply( attempt_id=attempt_id, from_email=from_email, diff --git a/backend/services/backlink_outreach_models.py b/backend/services/backlink_outreach_models.py index 0b5af65e..b6b1a857 100644 --- a/backend/services/backlink_outreach_models.py +++ b/backend/services/backlink_outreach_models.py @@ -2,6 +2,7 @@ from __future__ import annotations from pydantic import BaseModel, Field, HttpUrl from typing import Dict, List, Optional +from typing_extensions import Literal class BacklinkKeywordInput(BaseModel): @@ -93,7 +94,7 @@ class LeadListResponse(BaseModel): class LeadStatusUpdateRequest(BaseModel): - status: str = Field(..., min_length=1) + status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"] notes: Optional[str] = None campaign_id: Optional[str] = Field(default=None, min_length=1) @@ -329,7 +330,7 @@ class ConversionFunnelResponse(BaseModel): class BulkStatusUpdateRequest(BaseModel): lead_ids: List[str] = Field(..., min_length=1) - status: str = Field(..., min_length=1) + status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"] notes: Optional[str] = None campaign_id: Optional[str] = Field(default=None, min_length=1) diff --git a/backend/services/backlink_outreach_reply_monitor.py b/backend/services/backlink_outreach_reply_monitor.py index 3f465fa9..30298402 100644 --- a/backend/services/backlink_outreach_reply_monitor.py +++ b/backend/services/backlink_outreach_reply_monitor.py @@ -104,6 +104,8 @@ class BacklinkOutreachReplyMonitor: from_email = parsed_msg.get("From", "") subject = parsed_msg.get("Subject", "") received_at = parsed_msg.get("Date", "") + in_reply_to = parsed_msg.get("In-Reply-To", "") + references = parsed_msg.get("References", "") # Extract body body = "" @@ -137,6 +139,8 @@ class BacklinkOutreachReplyMonitor: "body": body[:5000], "classification": classification, "received_at": received_at_iso, + "in_reply_to": in_reply_to, + "references": references, } except Exception as e: logger.error(f"Failed to parse reply: {e}") diff --git a/backend/services/backlink_outreach_sender.py b/backend/services/backlink_outreach_sender.py index 530c4cf8..c9ba63b4 100644 --- a/backend/services/backlink_outreach_sender.py +++ b/backend/services/backlink_outreach_sender.py @@ -10,6 +10,7 @@ from dataclasses import dataclass, field from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from typing import List, Optional, Set +from uuid import uuid4 from loguru import logger @@ -35,6 +36,7 @@ class SenderAuthorizationResult: class SendEmailResult: success: bool effective_sender_email: str = "" + message_id: str = "" failure_reasons: List[str] = field(default_factory=list) @@ -116,10 +118,12 @@ class BacklinkOutreachSender: sender = sender_validation.effective_sender_email + msg_id = f"<{uuid4().hex}@{sender.split('@')[-1] if '@' in sender else 'outreach.local'}>" msg = MIMEMultipart("alternative") msg["From"] = sender msg["To"] = to_email msg["Subject"] = subject + msg["Message-ID"] = msg_id msg.attach(MIMEText(body, "plain")) loop = asyncio.get_running_loop() @@ -149,6 +153,7 @@ class BacklinkOutreachSender: return SendEmailResult( success=success, effective_sender_email=sender, + message_id=msg_id if success else "", failure_reasons=[] if success else ["smtp_send_failed"], ) diff --git a/backend/services/backlink_outreach_service.py b/backend/services/backlink_outreach_service.py index 357cbbf0..abe9e3ac 100644 --- a/backend/services/backlink_outreach_service.py +++ b/backend/services/backlink_outreach_service.py @@ -23,9 +23,6 @@ from services.backlink_outreach_models import ( ) from services.backlink_outreach_storage import BacklinkOutreachStorageService -DEFAULT_USER_DAILY_CAP = 100 -DEFAULT_DOMAIN_DAILY_CAP = 20 - @dataclass class SearchResult: url: str @@ -235,13 +232,6 @@ class BacklinkOutreachService: if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id): reasons.append("duplicate_idempotency_key") - user_count = storage.get_user_send_count(payload.user_id) - domain_count = storage.get_domain_send_count(payload.recipient_domain, user_id=payload.user_id) - if user_count >= DEFAULT_USER_DAILY_CAP: - reasons.append("user_daily_cap_exceeded") - if domain_count >= DEFAULT_DOMAIN_DAILY_CAP: - reasons.append("domain_daily_cap_exceeded") - allowed = len(reasons) == 0 final_status = "approved" if allowed else "blocked" diff --git a/backend/services/backlink_outreach_storage.py b/backend/services/backlink_outreach_storage.py index 9caade23..3a863a4f 100644 --- a/backend/services/backlink_outreach_storage.py +++ b/backend/services/backlink_outreach_storage.py @@ -8,6 +8,8 @@ from typing import List, Optional from sqlalchemy import text as sql_text, func as sa_func from sqlalchemy.exc import IntegrityError +LEAD_VALID_STATUSES = frozenset({"discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"}) + from services.database import get_session_for_user from models.backlink_outreach_models import ( Base, BacklinkCampaign, BacklinkLead, @@ -21,6 +23,10 @@ class BacklinkCampaignNotFoundError(RuntimeError): """Raised when a backlink campaign is missing or not owned by the user.""" +DEFAULT_USER_DAILY_CAP = 100 +DEFAULT_DOMAIN_DAILY_CAP = 20 + + class BacklinkOutreachStorageService: _NEW_LEAD_COLUMNS = [ "url", "page_title", "snippet", "confidence_score", "discovery_source", "notes" @@ -154,6 +160,14 @@ class BacklinkOutreachStorageService: if not self._campaign_belongs_to_user(db, campaign_id, user_id): raise BacklinkCampaignNotFoundError("Campaign not found") + existing = ( + db.query(BacklinkLead) + .filter(BacklinkLead.campaign_id == campaign_id, BacklinkLead.url == url) + .first() + ) + if existing: + return self._lead_to_dict(existing) + lead = BacklinkLead( id=f"bl_{uuid4().hex[:16]}", campaign_id=campaign_id, @@ -183,12 +197,22 @@ class BacklinkOutreachStorageService: if not self._campaign_belongs_to_user(db, campaign_id, user_id): raise BacklinkCampaignNotFoundError("Campaign not found") + existing_urls = { + row[0] + for row in db.query(BacklinkLead.url) + .filter(BacklinkLead.campaign_id == campaign_id) + .all() + } + added = [] for data in leads_data: + url = data.get("url", "") + if url in existing_urls: + continue lead = BacklinkLead( id=f"bl_{uuid4().hex[:16]}", campaign_id=campaign_id, - url=data.get("url", ""), + url=url, domain=data.get("domain", ""), page_title=data.get("page_title", ""), snippet=data.get("snippet", ""), @@ -201,6 +225,7 @@ class BacklinkOutreachStorageService: ) db.add(lead) added.append(lead) + existing_urls.add(url) db.commit() return [self._lead_to_dict(l) for l in added] finally: @@ -230,29 +255,27 @@ class BacklinkOutreachStorageService: notes: Optional[str] = None, campaign_id: Optional[str] = None, ) -> Optional[dict]: + if status not in LEAD_VALID_STATUSES: + raise ValueError(f"Invalid status '{status}'. Valid values: {sorted(LEAD_VALID_STATUSES)}") + self._ensure_tables(user_id) db = get_session_for_user(user_id) if not db: return None try: - query = ( - db.query(BacklinkLead) - .join(BacklinkCampaign, BacklinkLead.campaign_id == BacklinkCampaign.id) - .filter( - BacklinkLead.id == lead_id, - BacklinkCampaign.user_id == user_id, - ) - ) - if campaign_id: - query = query.filter(BacklinkCampaign.id == campaign_id) - - lead = query.first() + lead = db.query(BacklinkLead).filter(BacklinkLead.id == lead_id).first() if not lead: - access = self._get_lead_access_rows(db, [lead_id]).get(lead_id) - if not access: - return None - if access["user_id"] != user_id: - raise PermissionError("Lead does not belong to the current user") + return None + + campaign = ( + db.query(BacklinkCampaign) + .filter(BacklinkCampaign.id == lead.campaign_id, BacklinkCampaign.user_id == user_id) + .first() + ) + if not campaign: + raise PermissionError("Lead does not belong to the current user") + + if campaign_id and lead.campaign_id != campaign_id: return None lead.status = status @@ -491,6 +514,7 @@ class BacklinkOutreachStorageService: "decision_reason": attempt.decision_reason, "sent_at": attempt.sent_at.isoformat() if attempt.sent_at else None, "created_at": attempt.created_at.isoformat() if attempt.created_at else None, + "message_id": attempt.message_id or "", } def find_attempt_by_from_email(self, from_email: str, user_id: str = "default") -> Optional[str]: @@ -512,6 +536,37 @@ class BacklinkOutreachStorageService: finally: db.close() + def update_attempt_message_id(self, attempt_id: str, message_id: str, user_id: str = "default") -> Optional[dict]: + self._ensure_tables(user_id) + db = get_session_for_user(user_id) + if not db: + return None + try: + attempt = db.query(OutreachAttempt).filter(OutreachAttempt.id == attempt_id).first() + if not attempt: + return None + attempt.message_id = message_id + db.commit() + return self._attempt_to_dict(attempt) + finally: + db.close() + + def find_attempt_by_message_id(self, message_id: str, user_id: str = "default") -> Optional[str]: + self._ensure_tables(user_id) + db = get_session_for_user(user_id) + if not db: + return None + try: + clean = message_id.strip() + attempt = ( + db.query(OutreachAttempt) + .filter(OutreachAttempt.message_id == clean) + .first() + ) + return attempt.id if attempt else None + finally: + db.close() + # -- Outreach Reply CRUD -- def reply_exists(self, from_email: str, subject: str, user_id: str = "default") -> bool: @@ -855,27 +910,6 @@ class BacklinkOutreachStorageService: def _today(self) -> date: return date.today() - def increment_user_send_counter(self, user_id: str) -> int: - self._ensure_tables(user_id) - db = get_session_for_user(user_id) - if not db: - return 0 - try: - today = self._today() - row_id = f"scu_{uuid4().hex[:16]}" - db.execute(sql_text( - "INSERT INTO backlink_send_counters_user (id, user_id, date, count) " - "VALUES (:id, :uid, :dt, 1) " - "ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1" - ), {"id": row_id, "uid": user_id, "dt": today}) - db.commit() - result = db.query(SendCounterUser.count).filter( - SendCounterUser.user_id == user_id, SendCounterUser.date == today - ).first() - return result[0] if result else 0 - finally: - db.close() - def get_user_send_count(self, user_id: str) -> int: db = get_session_for_user(user_id) if not db: @@ -891,28 +925,6 @@ class BacklinkOutreachStorageService: finally: db.close() - def increment_domain_send_counter(self, domain: str, user_id: str = "default") -> int: - self._ensure_tables(user_id) - db = get_session_for_user(user_id) - if not db: - return 0 - try: - today = self._today() - domain_lower = domain.lower() - row_id = f"scd_{uuid4().hex[:16]}" - db.execute(sql_text( - "INSERT INTO backlink_send_counters_domain (id, domain, date, count) " - "VALUES (:id, :dom, :dt, 1) " - "ON CONFLICT (domain, date) DO UPDATE SET count = count + 1" - ), {"id": row_id, "dom": domain_lower, "dt": today}) - db.commit() - result = db.query(SendCounterDomain.count).filter( - SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today - ).first() - return result[0] if result else 0 - finally: - db.close() - def get_domain_send_count(self, domain: str, user_id: str = "default") -> int: db = get_session_for_user(user_id) if not db: @@ -928,6 +940,73 @@ class BacklinkOutreachStorageService: finally: db.close() + def try_increment_user_send_counter(self, user_id: str) -> tuple: + """Atomically check cap and increment. Returns (within_cap, new_count).""" + self._ensure_tables(user_id) + db = get_session_for_user(user_id) + if not db: + return True, 0 + try: + today = self._today() + current = ( + db.query(SendCounterUser.count) + .filter(SendCounterUser.user_id == user_id, SendCounterUser.date == today) + .scalar() + ) or 0 + if current >= DEFAULT_USER_DAILY_CAP: + db.close() + return False, current + row_id = f"scu_{uuid4().hex[:16]}" + db.execute(sql_text( + "INSERT INTO backlink_send_counters_user (id, user_id, date, count) " + "VALUES (:id, :uid, :dt, 1) " + "ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1" + ), {"id": row_id, "uid": user_id, "dt": today}) + db.commit() + result = db.query(SendCounterUser.count).filter( + SendCounterUser.user_id == user_id, SendCounterUser.date == today + ).first() + return True, result[0] if result else 0 + except Exception: + db.rollback() + return True, 0 + finally: + db.close() + + def try_increment_domain_send_counter(self, domain: str, user_id: str = "default") -> tuple: + """Atomically check cap and increment. Returns (within_cap, new_count).""" + self._ensure_tables(user_id) + db = get_session_for_user(user_id) + if not db: + return True, 0 + try: + today = self._today() + domain_lower = domain.lower() + current = ( + db.query(SendCounterDomain.count) + .filter(SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today) + .scalar() + ) or 0 + if current >= DEFAULT_DOMAIN_DAILY_CAP: + db.close() + return False, current + row_id = f"scd_{uuid4().hex[:16]}" + db.execute(sql_text( + "INSERT INTO backlink_send_counters_domain (id, domain, date, count) " + "VALUES (:id, :dom, :dt, 1) " + "ON CONFLICT (domain, date) DO UPDATE SET count = count + 1" + ), {"id": row_id, "dom": domain_lower, "dt": today}) + db.commit() + result = db.query(SendCounterDomain.count).filter( + SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today + ).first() + return True, result[0] if result else 0 + except Exception: + db.rollback() + return True, 0 + finally: + db.close() + # -- Audit Log -- def add_audit_log( diff --git a/frontend/src/api/backlinkOutreachApi.ts b/frontend/src/api/backlinkOutreachApi.ts index 49578b16..47c1c437 100644 --- a/frontend/src/api/backlinkOutreachApi.ts +++ b/frontend/src/api/backlinkOutreachApi.ts @@ -158,7 +158,7 @@ export interface LeadRecord { email: string | null; confidence_score: number; discovery_source: string; - status: string; + status: LeadStatus; notes: string | null; created_at: string | null; } @@ -179,8 +179,10 @@ export interface LeadCreateRequest { notes?: string; } +export type LeadStatus = 'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed'; + export interface LeadStatusUpdateRequest { - status: string; + status: LeadStatus; notes?: string; campaign_id?: string; } @@ -335,7 +337,7 @@ export interface FollowUpRequest { export interface BulkStatusUpdateRequest { lead_ids: string[]; - status: string; + status: LeadStatus; notes?: string; campaign_id?: string; } diff --git a/frontend/src/components/BacklinkOutreach/BacklinkOutreachDashboard.tsx b/frontend/src/components/BacklinkOutreach/BacklinkOutreachDashboard.tsx index 6e348c38..3acc443a 100644 --- a/frontend/src/components/BacklinkOutreach/BacklinkOutreachDashboard.tsx +++ b/frontend/src/components/BacklinkOutreach/BacklinkOutreachDashboard.tsx @@ -12,6 +12,7 @@ import { GenerateEmailRequest, bulkUpdateLeadStatus, updateLeadStatus, + addLeadToCampaign, fetchCampaignAnalyticsVolume, fetchCampaignAnalyticsFunnel, CampaignVolumePoint, @@ -25,7 +26,7 @@ import { LineChart, Line, BarChart, Bar, XAxis, YAxis, CartesianGrid, Tooltip as type Tab = 'campaigns' | 'discover' | 'leads' | 'composer' | 'analytics'; -const STATUS_OPTIONS = ['discovered', 'contacted', 'replied', 'placed', 'bounced', 'unsubscribed']; +const STATUS_OPTIONS = ['discovered', 'contacted', 'replied', 'placed', 'bounced', 'unsubscribed'] as const; const STATUS_EXPLANATIONS: Record = { discovered: 'Lead found but not yet contacted', @@ -139,7 +140,7 @@ const BacklinkOutreachDashboard: React.FC = () => { const [templateName, setTemplateName] = useState(''); const [selectedLeadIds, setSelectedLeadIds] = useState>(new Set()); - const [bulkStatus, setBulkStatus] = useState('contacted'); + const [bulkStatus, setBulkStatus] = useState<'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed'>('contacted'); const [volumeData, setVolumeData] = useState([]); const [funnelData, setFunnelData] = useState([]); @@ -203,9 +204,24 @@ const BacklinkOutreachDashboard: React.FC = () => { }, [keyword, deepDiscover]); const handleDiscoverAndSave = useCallback(async () => { - if (!keyword.trim() || !discoverCampaignId) return; - await deepDiscover(keyword.trim(), 15, discoverCampaignId); - }, [keyword, discoverCampaignId, deepDiscover]); + if (!keyword.trim() || !discoverCampaignId || discoveredOpportunities.length === 0) return; + for (const opp of discoveredOpportunities) { + try { + await addLeadToCampaign(discoverCampaignId, { + campaign_id: discoverCampaignId, + url: opp.url, + domain: opp.domain, + page_title: opp.page_title, + snippet: opp.snippet, + email: opp.email ?? undefined, + confidence_score: opp.confidence_score, + }); + } catch (e) { + // skip duplicates + } + } + showToastNotification(`Saved ${discoveredOpportunities.length} leads to campaign`, 'success'); + }, [keyword, discoverCampaignId, discoveredOpportunities]); const handleSelectCampaign = useCallback(async (campaignId: string) => { await selectCampaign(campaignId); @@ -324,7 +340,7 @@ const BacklinkOutreachDashboard: React.FC = () => { ); }; - const handleSingleStatusUpdate = async (leadId: string, status: string) => { + const handleSingleStatusUpdate = async (leadId: string, status: 'discovered' | 'contacted' | 'replied' | 'placed' | 'bounced' | 'unsubscribed') => { setIsStatusUpdating(true); try { await updateLeadStatus(leadId, { @@ -681,7 +697,7 @@ const BacklinkOutreachDashboard: React.FC = () => { {selectedLeadIds.size > 0 && ( <> - setBulkStatus(e.target.value as typeof bulkStatus)} style={{ ...selectSx, padding: '6px 10px', fontSize: '12px', minWidth: '130px' }}> {STATUS_OPTIONS.map((s) => )}