fix: resolve remaining 5 QA audit findings (#3, #8, #10, #11, #12)

#3 — Duplicate prospect handling: add_lead now checks (campaign_id, url)
     before insert; bulk_add_leads skips existing URLs.
#8 — Atomic rate limiting: try_increment_* methods atomically check cap
     and increment in a single session; router uses these before send.
#10 — Reply matching via Message-ID: sender generates Message-ID header,
     stored on OutreachAttempt; reply monitor parses In-Reply-To/References;
     poll_replies matches by message_id first, falls back to from_email.
#11 — Save-to-campaign uses existing store results instead of
      re-running expensive deepDiscover.
#12 — Lead status Literal type: Pydantic models enforce valid status
      values; backend validates via LEAD_VALID_STATUSES frozenset;
      frontend API typed as LeadStatus union.
This commit is contained in:
ajaysi
2026-06-03 20:06:11 +05:30
parent 259194c289
commit 8699ffc27d
9 changed files with 226 additions and 103 deletions

View File

@@ -46,6 +46,7 @@ class OutreachAttempt(Base):
decision_reason = Column(Text, nullable=True)
sent_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, index=True)
message_id = Column(String(255), nullable=True, index=True)
class OutreachReply(Base):

View File

@@ -368,26 +368,40 @@ async def send_outreach(
lead_email = (lead.get("email") or "") if lead else ""
if result.status == "approved" and result.policy_allowed and not result.duplicate and lead_email:
send_result = await backlink_outreach_sender.send_email(
to_email=lead_email,
subject=subject,
body=body,
from_email=payload.sender_email,
)
if send_result.success:
storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id)
result.status = "sent"
result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email
storage.mark_idempotency(payload.idempotency_key, user_id)
storage.increment_user_send_counter(user_id)
domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown"
storage.increment_domain_send_counter(domain, user_id=user_id)
domain = lead_email.split("@")[-1] if "@" in lead_email else "unknown"
user_within_cap, _ = storage.try_increment_user_send_counter(user_id)
domain_within_cap, _ = storage.try_increment_domain_send_counter(domain, user_id=user_id)
if not (user_within_cap and domain_within_cap):
reasons = []
if not user_within_cap:
reasons.append("user_daily_cap_exceeded")
if not domain_within_cap:
reasons.append("domain_daily_cap_exceeded")
reason_str = f"rate_limit_hit; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "blocked", decision_reason=reason_str, user_id=user_id)
result.status = "blocked"
result.policy_reasons = reasons
else:
reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
result.status = "failed"
result.policy_reasons = ["smtp_send_failed"]
result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY
send_result = await backlink_outreach_sender.send_email(
to_email=lead_email,
subject=subject,
body=body,
from_email=payload.sender_email,
)
if send_result.success:
storage.update_attempt_status(result.attempt_id, "sent", user_id=user_id)
result.status = "sent"
result.effective_sender_email = send_result.effective_sender_email or result.effective_sender_email
if send_result.message_id:
storage.update_attempt_message_id(result.attempt_id, send_result.message_id, user_id=user_id)
storage.mark_idempotency(payload.idempotency_key, user_id)
else:
reason = f"smtp_send_failed; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
result.status = "failed"
result.policy_reasons = ["smtp_send_failed"]
result.retry_policy = backlink_outreach_service.SMTP_RETRY_POLICY
elif result.status == "approved" and result.policy_allowed and not result.duplicate and not lead_email:
reason = f"lead_has_no_email; retry_policy={backlink_outreach_service.SMTP_RETRY_POLICY}"
storage.update_attempt_status(result.attempt_id, "failed", decision_reason=reason, user_id=user_id)
@@ -448,7 +462,18 @@ async def poll_replies(
if storage.reply_exists(from_email, subject, user_id=user_id):
skipped += 1
continue
attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or ""
attempt_id = ""
in_reply_to = raw.get("in_reply_to", "")
references = raw.get("references", "")
if in_reply_to:
attempt_id = storage.find_attempt_by_message_id(in_reply_to, user_id=user_id) or ""
if not attempt_id and references:
mid = references.split()[-1]
attempt_id = storage.find_attempt_by_message_id(mid, user_id=user_id) or ""
if not attempt_id:
attempt_id = storage.find_attempt_by_from_email(from_email, user_id=user_id) or ""
reply = storage.add_reply(
attempt_id=attempt_id,
from_email=from_email,

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from pydantic import BaseModel, Field, HttpUrl
from typing import Dict, List, Optional
from typing_extensions import Literal
class BacklinkKeywordInput(BaseModel):
@@ -93,7 +94,7 @@ class LeadListResponse(BaseModel):
class LeadStatusUpdateRequest(BaseModel):
status: str = Field(..., min_length=1)
status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"]
notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)
@@ -329,7 +330,7 @@ class ConversionFunnelResponse(BaseModel):
class BulkStatusUpdateRequest(BaseModel):
lead_ids: List[str] = Field(..., min_length=1)
status: str = Field(..., min_length=1)
status: Literal["discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"]
notes: Optional[str] = None
campaign_id: Optional[str] = Field(default=None, min_length=1)

View File

@@ -104,6 +104,8 @@ class BacklinkOutreachReplyMonitor:
from_email = parsed_msg.get("From", "")
subject = parsed_msg.get("Subject", "")
received_at = parsed_msg.get("Date", "")
in_reply_to = parsed_msg.get("In-Reply-To", "")
references = parsed_msg.get("References", "")
# Extract body
body = ""
@@ -137,6 +139,8 @@ class BacklinkOutreachReplyMonitor:
"body": body[:5000],
"classification": classification,
"received_at": received_at_iso,
"in_reply_to": in_reply_to,
"references": references,
}
except Exception as e:
logger.error(f"Failed to parse reply: {e}")

View File

@@ -10,6 +10,7 @@ from dataclasses import dataclass, field
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import List, Optional, Set
from uuid import uuid4
from loguru import logger
@@ -35,6 +36,7 @@ class SenderAuthorizationResult:
class SendEmailResult:
success: bool
effective_sender_email: str = ""
message_id: str = ""
failure_reasons: List[str] = field(default_factory=list)
@@ -116,10 +118,12 @@ class BacklinkOutreachSender:
sender = sender_validation.effective_sender_email
msg_id = f"<{uuid4().hex}@{sender.split('@')[-1] if '@' in sender else 'outreach.local'}>"
msg = MIMEMultipart("alternative")
msg["From"] = sender
msg["To"] = to_email
msg["Subject"] = subject
msg["Message-ID"] = msg_id
msg.attach(MIMEText(body, "plain"))
loop = asyncio.get_running_loop()
@@ -149,6 +153,7 @@ class BacklinkOutreachSender:
return SendEmailResult(
success=success,
effective_sender_email=sender,
message_id=msg_id if success else "",
failure_reasons=[] if success else ["smtp_send_failed"],
)

View File

@@ -23,9 +23,6 @@ from services.backlink_outreach_models import (
)
from services.backlink_outreach_storage import BacklinkOutreachStorageService
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
@dataclass
class SearchResult:
url: str
@@ -235,13 +232,6 @@ class BacklinkOutreachService:
if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id):
reasons.append("duplicate_idempotency_key")
user_count = storage.get_user_send_count(payload.user_id)
domain_count = storage.get_domain_send_count(payload.recipient_domain, user_id=payload.user_id)
if user_count >= DEFAULT_USER_DAILY_CAP:
reasons.append("user_daily_cap_exceeded")
if domain_count >= DEFAULT_DOMAIN_DAILY_CAP:
reasons.append("domain_daily_cap_exceeded")
allowed = len(reasons) == 0
final_status = "approved" if allowed else "blocked"

View File

@@ -8,6 +8,8 @@ from typing import List, Optional
from sqlalchemy import text as sql_text, func as sa_func
from sqlalchemy.exc import IntegrityError
LEAD_VALID_STATUSES = frozenset({"discovered", "contacted", "replied", "placed", "bounced", "unsubscribed"})
from services.database import get_session_for_user
from models.backlink_outreach_models import (
Base, BacklinkCampaign, BacklinkLead,
@@ -21,6 +23,10 @@ class BacklinkCampaignNotFoundError(RuntimeError):
"""Raised when a backlink campaign is missing or not owned by the user."""
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
class BacklinkOutreachStorageService:
_NEW_LEAD_COLUMNS = [
"url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
@@ -154,6 +160,14 @@ class BacklinkOutreachStorageService:
if not self._campaign_belongs_to_user(db, campaign_id, user_id):
raise BacklinkCampaignNotFoundError("Campaign not found")
existing = (
db.query(BacklinkLead)
.filter(BacklinkLead.campaign_id == campaign_id, BacklinkLead.url == url)
.first()
)
if existing:
return self._lead_to_dict(existing)
lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id,
@@ -183,12 +197,22 @@ class BacklinkOutreachStorageService:
if not self._campaign_belongs_to_user(db, campaign_id, user_id):
raise BacklinkCampaignNotFoundError("Campaign not found")
existing_urls = {
row[0]
for row in db.query(BacklinkLead.url)
.filter(BacklinkLead.campaign_id == campaign_id)
.all()
}
added = []
for data in leads_data:
url = data.get("url", "")
if url in existing_urls:
continue
lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id,
url=data.get("url", ""),
url=url,
domain=data.get("domain", ""),
page_title=data.get("page_title", ""),
snippet=data.get("snippet", ""),
@@ -201,6 +225,7 @@ class BacklinkOutreachStorageService:
)
db.add(lead)
added.append(lead)
existing_urls.add(url)
db.commit()
return [self._lead_to_dict(l) for l in added]
finally:
@@ -230,29 +255,27 @@ class BacklinkOutreachStorageService:
notes: Optional[str] = None,
campaign_id: Optional[str] = None,
) -> Optional[dict]:
if status not in LEAD_VALID_STATUSES:
raise ValueError(f"Invalid status '{status}'. Valid values: {sorted(LEAD_VALID_STATUSES)}")
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
query = (
db.query(BacklinkLead)
.join(BacklinkCampaign, BacklinkLead.campaign_id == BacklinkCampaign.id)
.filter(
BacklinkLead.id == lead_id,
BacklinkCampaign.user_id == user_id,
)
)
if campaign_id:
query = query.filter(BacklinkCampaign.id == campaign_id)
lead = query.first()
lead = db.query(BacklinkLead).filter(BacklinkLead.id == lead_id).first()
if not lead:
access = self._get_lead_access_rows(db, [lead_id]).get(lead_id)
if not access:
return None
if access["user_id"] != user_id:
raise PermissionError("Lead does not belong to the current user")
return None
campaign = (
db.query(BacklinkCampaign)
.filter(BacklinkCampaign.id == lead.campaign_id, BacklinkCampaign.user_id == user_id)
.first()
)
if not campaign:
raise PermissionError("Lead does not belong to the current user")
if campaign_id and lead.campaign_id != campaign_id:
return None
lead.status = status
@@ -491,6 +514,7 @@ class BacklinkOutreachStorageService:
"decision_reason": attempt.decision_reason,
"sent_at": attempt.sent_at.isoformat() if attempt.sent_at else None,
"created_at": attempt.created_at.isoformat() if attempt.created_at else None,
"message_id": attempt.message_id or "",
}
def find_attempt_by_from_email(self, from_email: str, user_id: str = "default") -> Optional[str]:
@@ -512,6 +536,37 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def update_attempt_message_id(self, attempt_id: str, message_id: str, user_id: str = "default") -> Optional[dict]:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
attempt = db.query(OutreachAttempt).filter(OutreachAttempt.id == attempt_id).first()
if not attempt:
return None
attempt.message_id = message_id
db.commit()
return self._attempt_to_dict(attempt)
finally:
db.close()
def find_attempt_by_message_id(self, message_id: str, user_id: str = "default") -> Optional[str]:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return None
try:
clean = message_id.strip()
attempt = (
db.query(OutreachAttempt)
.filter(OutreachAttempt.message_id == clean)
.first()
)
return attempt.id if attempt else None
finally:
db.close()
# -- Outreach Reply CRUD --
def reply_exists(self, from_email: str, subject: str, user_id: str = "default") -> bool:
@@ -855,27 +910,6 @@ class BacklinkOutreachStorageService:
def _today(self) -> date:
return date.today()
def increment_user_send_counter(self, user_id: str) -> int:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return 0
try:
today = self._today()
row_id = f"scu_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_user (id, user_id, date, count) "
"VALUES (:id, :uid, :dt, 1) "
"ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "uid": user_id, "dt": today})
db.commit()
result = db.query(SendCounterUser.count).filter(
SendCounterUser.user_id == user_id, SendCounterUser.date == today
).first()
return result[0] if result else 0
finally:
db.close()
def get_user_send_count(self, user_id: str) -> int:
db = get_session_for_user(user_id)
if not db:
@@ -891,28 +925,6 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def increment_domain_send_counter(self, domain: str, user_id: str = "default") -> int:
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return 0
try:
today = self._today()
domain_lower = domain.lower()
row_id = f"scd_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_domain (id, domain, date, count) "
"VALUES (:id, :dom, :dt, 1) "
"ON CONFLICT (domain, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "dom": domain_lower, "dt": today})
db.commit()
result = db.query(SendCounterDomain.count).filter(
SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today
).first()
return result[0] if result else 0
finally:
db.close()
def get_domain_send_count(self, domain: str, user_id: str = "default") -> int:
db = get_session_for_user(user_id)
if not db:
@@ -928,6 +940,73 @@ class BacklinkOutreachStorageService:
finally:
db.close()
def try_increment_user_send_counter(self, user_id: str) -> tuple:
"""Atomically check cap and increment. Returns (within_cap, new_count)."""
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return True, 0
try:
today = self._today()
current = (
db.query(SendCounterUser.count)
.filter(SendCounterUser.user_id == user_id, SendCounterUser.date == today)
.scalar()
) or 0
if current >= DEFAULT_USER_DAILY_CAP:
db.close()
return False, current
row_id = f"scu_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_user (id, user_id, date, count) "
"VALUES (:id, :uid, :dt, 1) "
"ON CONFLICT (user_id, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "uid": user_id, "dt": today})
db.commit()
result = db.query(SendCounterUser.count).filter(
SendCounterUser.user_id == user_id, SendCounterUser.date == today
).first()
return True, result[0] if result else 0
except Exception:
db.rollback()
return True, 0
finally:
db.close()
def try_increment_domain_send_counter(self, domain: str, user_id: str = "default") -> tuple:
"""Atomically check cap and increment. Returns (within_cap, new_count)."""
self._ensure_tables(user_id)
db = get_session_for_user(user_id)
if not db:
return True, 0
try:
today = self._today()
domain_lower = domain.lower()
current = (
db.query(SendCounterDomain.count)
.filter(SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today)
.scalar()
) or 0
if current >= DEFAULT_DOMAIN_DAILY_CAP:
db.close()
return False, current
row_id = f"scd_{uuid4().hex[:16]}"
db.execute(sql_text(
"INSERT INTO backlink_send_counters_domain (id, domain, date, count) "
"VALUES (:id, :dom, :dt, 1) "
"ON CONFLICT (domain, date) DO UPDATE SET count = count + 1"
), {"id": row_id, "dom": domain_lower, "dt": today})
db.commit()
result = db.query(SendCounterDomain.count).filter(
SendCounterDomain.domain == domain_lower, SendCounterDomain.date == today
).first()
return True, result[0] if result else 0
except Exception:
db.rollback()
return True, 0
finally:
db.close()
# -- Audit Log --
def add_audit_log(