diff --git a/backend/routers/backlink_outreach.py b/backend/routers/backlink_outreach.py index a95e1f02..17d2b09a 100644 --- a/backend/routers/backlink_outreach.py +++ b/backend/routers/backlink_outreach.py @@ -329,56 +329,71 @@ async def send_outreach( effective_sender_email=sender_validation.effective_sender_email or None, ) - result = backlink_outreach_service.send_outreach( - SendOutreachRequest( - lead_id=payload.lead_id, - campaign_id=payload.campaign_id, - user_id=user_id, - workspace_id=payload.workspace_id, - sender_email=sender_validation.effective_sender_email, - subject=subject, - body=body, - idempotency_key=payload.idempotency_key, - sender_identity=payload.sender_identity, - legal_basis=payload.legal_basis, - contact_discovery_source=payload.contact_discovery_source, - recipient_region=payload.recipient_region, - recipient_region_source=payload.recipient_region_source, - consent_status=payload.consent_status, - approved_by_human=payload.approved_by_human, - unsubscribe_url=payload.unsubscribe_url, - one_click_unsubscribe=payload.one_click_unsubscribe, + try: + result = backlink_outreach_service.send_outreach( + SendOutreachRequest( + lead_id=payload.lead_id, + campaign_id=payload.campaign_id, + user_id=user_id, + workspace_id=payload.workspace_id, + sender_email=sender_validation.effective_sender_email, + subject=subject, + body=body, + idempotency_key=payload.idempotency_key, + sender_identity=payload.sender_identity, + legal_basis=payload.legal_basis, + contact_discovery_source=payload.contact_discovery_source, + recipient_region=payload.recipient_region, + recipient_region_source=payload.recipient_region_source, + consent_status=payload.consent_status, + approved_by_human=payload.approved_by_human, + unsubscribe_url=payload.unsubscribe_url, + one_click_unsubscribe=payload.one_click_unsubscribe, + ) ) - ) + except Exception: + existing = storage.get_attempt_by_idempotency_key(payload.idempotency_key, user_id=user_id) + if existing: + result = backlink_outreach_service.response_from_attempt(existing, duplicate=True) + if sender_validation.effective_sender_email: + result.effective_sender_email = sender_validation.effective_sender_email + return result + raise HTTPException(status_code=409, detail="Unable to reserve idempotency key") + result.effective_sender_email = sender_validation.effective_sender_email lead_email = "" - if result.attempt_id: + if result.attempt_id and result.status == "approved" and not result.duplicate: lead = storage.get_lead(payload.lead_id, user_id=user_id) lead_email = (lead.get("email") or "") if lead else "" - if result.policy_allowed and lead_email: + if result.status == "approved" and result.policy_allowed and not result.duplicate and lead_email: send_result = await backlink_outreach_sender.send_email( to_email=lead_email, subject=subject, body=body, from_email=payload.sender_email, ) - status = "sent" if send_result.success else "failed" - storage.update_attempt_status(result.attempt_id, status, user_id=user_id) - result.status = status - result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email - if send_result.failure_reasons: - result.policy_reasons = (result.policy_reasons or []) + send_result.failure_reasons if send_result.success: + storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id) + result.status = "sent" + result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email storage.mark_idempotency(payload.idempotency_key, user_id) storage.increment_user_send_counter(user_id) domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown" storage.increment_domain_send_counter(domain, user_id=user_id) - elif result.policy_allowed and not lead_email: - storage.update_attempt_status(result.attempt_id, "failed", user_id=user_id) + else: + reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}" + storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id) + result.status = "failed" + result.policy_reasons = ["smtp_send_failed"] + result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY + elif result.status == "approved" and result.policy_allowed and not result.duplicate and not lead_email: + reason = f"lead_has_no_email; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}" + storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id) result.status = "failed" result.policy_reasons = (result.policy_reasons or []) + ["lead_has_no_email"] + result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY return result diff --git a/backend/services/backlink_outreach_models.py b/backend/services/backlink_outreach_models.py index 72c6fb1b..0b5af65e 100644 --- a/backend/services/backlink_outreach_models.py +++ b/backend/services/backlink_outreach_models.py @@ -192,6 +192,8 @@ class SendOutreachResponse(BaseModel): policy_allowed: bool policy_reasons: List[str] = Field(default_factory=list) effective_sender_email: Optional[str] = None + duplicate: bool = False + retry_policy: Optional[str] = None class OutreachAttemptRecord(BaseModel): diff --git a/backend/services/backlink_outreach_service.py b/backend/services/backlink_outreach_service.py index e3f8b90f..357cbbf0 100644 --- a/backend/services/backlink_outreach_service.py +++ b/backend/services/backlink_outreach_service.py @@ -271,12 +271,75 @@ class BacklinkOutreachService: return "au" return "unknown" + + SMTP_RETRY_POLICY = "manual_retry_with_new_idempotency_key" + + @staticmethod + def _decision_parts(attempt: Optional[dict]) -> List[str]: + if not attempt: + return [] + reason = attempt.get("decision_reason") or "" + return [part.strip() for part in reason.split(";") if part.strip()] + + def response_from_attempt(self, attempt: Optional[dict], duplicate: bool = False) -> SendOutreachResponse: + if not attempt: + return SendOutreachResponse( + attempt_id="", + status="duplicate", + policy_allowed=False, + policy_reasons=["duplicate_idempotency_key"], + duplicate=True, + ) + + status = attempt.get("status", "failed") + parts = self._decision_parts(attempt) + retry_policy = next((part.split("=", 1)[1] for part in parts if part.startswith("retry_policy=")), None) + reasons = [part for part in parts if not part.startswith("retry_policy=")] + if not retry_policy and ("smtp_send_failed" in reasons or "lead_has_no_email" in reasons): + retry_policy = self.SMTP_RETRY_POLICY + policy_allowed = status in {"queued", "approved", "sent", "failed"} and not any( + reason.startswith("human_review_required") + or reason in { + "invalid_legal_basis", + "region_requires_explicit_consent", + "sender_identity_required", + "recipient_suppressed", + "user_daily_cap_exceeded", + "domain_daily_cap_exceeded", + } + for reason in reasons + ) + if status == "blocked": + policy_allowed = False + return SendOutreachResponse( + attempt_id=attempt.get("attempt_id", ""), + status=status, + policy_allowed=policy_allowed, + policy_reasons=reasons, + duplicate=duplicate, + retry_policy=retry_policy, + ) + def send_outreach(self, request: SendOutreachRequest) -> SendOutreachResponse: storage = self._get_storage() lead = storage.get_lead(request.lead_id, user_id=request.user_id) if not lead: return SendOutreachResponse(attempt_id="", status="failed", policy_allowed=False, policy_reasons=["lead_not_found"]) + reservation = storage.reserve_attempt_idempotency( + lead_id=request.lead_id, + campaign_id=request.campaign_id, + idempotency_key=request.idempotency_key, + sender_email=request.sender_email, + subject=request.subject, + body=request.body, + user_id=request.user_id, + ) + if not reservation.get("reserved"): + return self.response_from_attempt(reservation.get("attempt"), duplicate=True) + + attempt = reservation.get("attempt") or {} + attempt_id = attempt.get("attempt_id", "") domain = lead.get("domain", request.sender_email.split("@")[-1] if "@" in request.sender_email else "unknown") recipient_region = (request.recipient_region or "unknown").strip().lower() if recipient_region == "unknown": @@ -305,21 +368,16 @@ class BacklinkOutreachService: ) policy = self.validate_send_policy(policy_req) - attempt = storage.add_attempt( - lead_id=request.lead_id, - campaign_id=request.campaign_id, - idempotency_key=request.idempotency_key, - sender_email=request.sender_email, - subject=request.subject, - body=request.body, - status="approved" if policy.allowed else "blocked", + updated_attempt = storage.update_attempt_status( + attempt_id, + "approved" if policy.allowed else "blocked", decision_reason="; ".join(policy.reasons) if policy.reasons else None, user_id=request.user_id, - ) + ) or attempt return SendOutreachResponse( - attempt_id=attempt.get("attempt_id", ""), - status=attempt.get("status", "failed"), + attempt_id=updated_attempt.get("attempt_id", attempt_id), + status=updated_attempt.get("status", "failed"), policy_allowed=policy.allowed, policy_reasons=policy.reasons, effective_sender_email=request.sender_email, diff --git a/backend/services/backlink_outreach_storage.py b/backend/services/backlink_outreach_storage.py index a19311e1..9caade23 100644 --- a/backend/services/backlink_outreach_storage.py +++ b/backend/services/backlink_outreach_storage.py @@ -6,6 +6,7 @@ from datetime import datetime, date from uuid import uuid4 from typing import List, Optional from sqlalchemy import text as sql_text, func as sa_func +from sqlalchemy.exc import IntegrityError from services.database import get_session_for_user from models.backlink_outreach_models import ( @@ -319,6 +320,79 @@ class BacklinkOutreachStorageService: # -- Outreach Attempt CRUD -- + + def get_attempt_by_idempotency_key(self, idempotency_key: str, user_id: str = "default") -> Optional[dict]: + """Return the existing attempt for an idempotency key visible to the user.""" + self._ensure_tables(user_id) + db = get_session_for_user(user_id) + if not db: + return None + try: + attempt = ( + db.query(OutreachAttempt) + .join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id) + .filter( + OutreachAttempt.idempotency_key == idempotency_key, + BacklinkCampaign.user_id == user_id, + ) + .first() + ) + return self._attempt_to_dict(attempt) if attempt else None + finally: + db.close() + + def reserve_attempt_idempotency( + self, + lead_id: str, + campaign_id: str, + idempotency_key: str, + sender_email: str = "", + subject: str = "", + body: str = "", + user_id: str = "default", + ) -> dict: + """Atomically reserve an outreach idempotency key by creating the attempt row. + + Returns {"reserved": True, "attempt": attempt_dict} for the caller that won + the reservation, or {"reserved": False, "attempt": existing_attempt_or_none} + when the unique key already exists. Duplicate rows are detected by the + database unique constraint so concurrent requests do not both proceed to + policy approval or SMTP delivery. + """ + self._ensure_tables(user_id) + db = get_session_for_user(user_id) + if not db: + raise RuntimeError("Database session unavailable") + try: + attempt = OutreachAttempt( + id=f"att_{uuid4().hex[:16]}", + lead_id=lead_id, + campaign_id=campaign_id, + idempotency_key=idempotency_key, + sender_email=sender_email, + subject=subject, + body=body, + status="queued", + created_at=datetime.utcnow(), + ) + db.add(attempt) + db.commit() + return {"reserved": True, "attempt": self._attempt_to_dict(attempt)} + except IntegrityError: + db.rollback() + existing = ( + db.query(OutreachAttempt) + .join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id) + .filter( + OutreachAttempt.idempotency_key == idempotency_key, + BacklinkCampaign.user_id == user_id, + ) + .first() + ) + return {"reserved": False, "attempt": self._attempt_to_dict(existing) if existing else None} + finally: + db.close() + def add_attempt( self, lead_id: str, @@ -351,6 +425,20 @@ class BacklinkOutreachStorageService: db.add(attempt) db.commit() return self._attempt_to_dict(attempt) + except IntegrityError: + db.rollback() + existing = ( + db.query(OutreachAttempt) + .join(BacklinkCampaign, OutreachAttempt.campaign_id == BacklinkCampaign.id) + .filter( + OutreachAttempt.idempotency_key == idempotency_key, + BacklinkCampaign.user_id == user_id, + ) + .first() + ) + if existing: + return self._attempt_to_dict(existing) + raise finally: db.close() @@ -756,6 +844,9 @@ class BacklinkOutreachStorageService: db.add(entry) db.commit() return {"idempotency_key": idempotency_key} + except IntegrityError: + db.rollback() + return {"idempotency_key": idempotency_key} finally: db.close()