Compare commits

...

1 Commits

Author SHA1 Message Date
ي
a580667876 Prevent duplicate backlink outreach leads 2026-06-03 18:24:46 +05:30
3 changed files with 199 additions and 17 deletions

View File

@@ -1,7 +1,7 @@
"""DB models for production backlink outreach tracking.""" """DB models for production backlink outreach tracking."""
from datetime import datetime from datetime import datetime
from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Index, Boolean, Date from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Index, Boolean, Date, and_
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base() Base = declarative_base()
@@ -128,6 +128,21 @@ class SendCounterDomain(Base):
Index("idx_backlink_campaign_user_date", BacklinkCampaign.user_id, BacklinkCampaign.created_at) Index("idx_backlink_campaign_user_date", BacklinkCampaign.user_id, BacklinkCampaign.created_at)
Index(
"idx_backlink_lead_campaign_url_unique",
BacklinkLead.campaign_id,
BacklinkLead.url,
unique=True,
sqlite_where=and_(BacklinkLead.url.isnot(None), BacklinkLead.url != ""),
)
Index(
"idx_backlink_lead_campaign_domain_email_unique",
BacklinkLead.campaign_id,
BacklinkLead.domain,
BacklinkLead.email,
unique=True,
sqlite_where=and_(BacklinkLead.email.isnot(None), BacklinkLead.email != ""),
)
Index("idx_backlink_attempt_campaign_date", OutreachAttempt.campaign_id, OutreachAttempt.created_at) Index("idx_backlink_attempt_campaign_date", OutreachAttempt.campaign_id, OutreachAttempt.created_at)
Index("idx_backlink_suppressed_email", SuppressedRecipient.email, SuppressedRecipient.user_id) Index("idx_backlink_suppressed_email", SuppressedRecipient.email, SuppressedRecipient.user_id)
Index("idx_backlink_counter_user_date", SendCounterUser.user_id, SendCounterUser.date, unique=True) Index("idx_backlink_counter_user_date", SendCounterUser.user_id, SendCounterUser.date, unique=True)

View File

@@ -91,10 +91,11 @@ async def discover_deep_backlink_opportunities(
if payload.campaign_id: if payload.campaign_id:
storage = BacklinkOutreachStorageService() storage = BacklinkOutreachStorageService()
saved = 0 saved = 0
duplicates_skipped = 0
save_failed = 0 save_failed = 0
for opp in result.get("opportunities", []): for opp in result.get("opportunities", []):
try: try:
storage.add_lead( lead = storage.add_lead(
campaign_id=payload.campaign_id, campaign_id=payload.campaign_id,
user_id=user_id, user_id=user_id,
url=opp["url"], url=opp["url"],
@@ -105,10 +106,14 @@ async def discover_deep_backlink_opportunities(
confidence_score=opp.get("confidence_score", 0.0), confidence_score=opp.get("confidence_score", 0.0),
discovery_source=opp.get("discovery_source", "duckduckgo"), discovery_source=opp.get("discovery_source", "duckduckgo"),
) )
if lead.get("duplicate") or lead.get("skipped"):
duplicates_skipped += 1
else:
saved += 1 saved += 1
except Exception: except Exception:
save_failed += 1 save_failed += 1
result["saved_to_campaign"] = saved result["saved_to_campaign"] = saved
result["duplicates_skipped"] = duplicates_skipped
result["save_failed"] = save_failed result["save_failed"] = save_failed
return result return result

View File

@@ -4,8 +4,10 @@ from __future__ import annotations
from datetime import datetime, date from datetime import datetime, date
from uuid import uuid4 from uuid import uuid4
from typing import List, Optional from typing import List, Optional, Tuple
from sqlalchemy import text as sql_text, func as sa_func from urllib.parse import urlsplit, urlunsplit
from sqlalchemy import text as sql_text, func as sa_func, or_
from sqlalchemy.exc import IntegrityError
from services.database import get_session_for_user from services.database import get_session_for_user
from models.backlink_outreach_models import ( from models.backlink_outreach_models import (
@@ -21,6 +23,59 @@ class BacklinkOutreachStorageService:
"url", "page_title", "snippet", "confidence_score", "discovery_source", "notes" "url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
] ]
@staticmethod
def _normalize_email(email: Optional[str]) -> Optional[str]:
normalized = (email or "").strip().lower()
return normalized or None
@staticmethod
def _normalize_domain(domain: Optional[str]) -> str:
value = (domain or "").strip().lower()
if not value:
return ""
if "://" not in value:
value = f"//{value}"
parsed = urlsplit(value)
hostname = (parsed.hostname or value).strip().lower().rstrip(".")
return hostname[4:] if hostname.startswith("www.") else hostname
@classmethod
def _normalize_url(cls, url: Optional[str]) -> str:
value = (url or "").strip()
if not value:
return ""
parse_value = value if "://" in value else f"https://{value}"
parsed = urlsplit(parse_value)
scheme = (parsed.scheme or "https").lower()
hostname = (parsed.hostname or "").lower().rstrip(".")
if hostname.startswith("www."):
hostname = hostname[4:]
if not hostname:
return value.rstrip("/")
try:
port = parsed.port
except ValueError:
port = None
netloc = hostname
if port and not ((scheme == "http" and port == 80) or (scheme == "https" and port == 443)):
netloc = f"{hostname}:{port}"
path = parsed.path or ""
if path != "/":
path = path.rstrip("/")
query = parsed.query
return urlunsplit((scheme, netloc, path, query, ""))
@classmethod
def _normalize_lead_identity(
cls, url: Optional[str], domain: Optional[str], email: Optional[str]
) -> Tuple[str, str, Optional[str]]:
normalized_url = cls._normalize_url(url)
normalized_domain = cls._normalize_domain(domain)
if not normalized_domain and normalized_url:
normalized_domain = cls._normalize_domain(normalized_url)
normalized_email = cls._normalize_email(email)
return normalized_url, normalized_domain, normalized_email
def _ensure_tables(self, user_id: str) -> None: def _ensure_tables(self, user_id: str) -> None:
db = get_session_for_user(user_id) db = get_session_for_user(user_id)
if not db: if not db:
@@ -28,6 +83,7 @@ class BacklinkOutreachStorageService:
try: try:
Base.metadata.create_all(bind=db.get_bind(), checkfirst=True) Base.metadata.create_all(bind=db.get_bind(), checkfirst=True)
self._migrate_lead_columns(db) self._migrate_lead_columns(db)
self._migrate_lead_uniqueness_indexes(db)
finally: finally:
db.close() db.close()
@@ -49,6 +105,29 @@ class BacklinkOutreachStorageService:
except Exception: except Exception:
db.rollback() db.rollback()
def _migrate_lead_uniqueness_indexes(self, db) -> None:
"""Create normalized lead uniqueness indexes when existing data allows it."""
index_statements = (
"""
CREATE UNIQUE INDEX IF NOT EXISTS idx_backlink_lead_campaign_url_unique
ON backlink_leads (campaign_id, url)
WHERE url IS NOT NULL AND url != ''
""",
"""
CREATE UNIQUE INDEX IF NOT EXISTS idx_backlink_lead_campaign_domain_email_unique
ON backlink_leads (campaign_id, domain, email)
WHERE email IS NOT NULL AND email != ''
""",
)
for statement in index_statements:
try:
db.execute(sql_text(statement))
db.commit()
except Exception:
# Existing duplicate historical data should not block app startup;
# service-level duplicate checks still prevent new duplicates.
db.rollback()
def create_campaign(self, user_id: str, workspace_id: str, name: str) -> dict: def create_campaign(self, user_id: str, workspace_id: str, name: str) -> dict:
self._ensure_tables(user_id) self._ensure_tables(user_id)
db = get_session_for_user(user_id) db = get_session_for_user(user_id)
@@ -120,6 +199,43 @@ class BacklinkOutreachStorageService:
# -- Lead CRUD -- # -- Lead CRUD --
def _find_existing_lead(self, db, campaign_id: str, url: str, domain: str, email: Optional[str]):
duplicate_filters = []
if url:
duplicate_filters.append(BacklinkLead.url == url)
if domain and email:
duplicate_filters.append((BacklinkLead.domain == domain) & (BacklinkLead.email == email))
if not duplicate_filters:
return None
existing = (
db.query(BacklinkLead)
.filter(BacklinkLead.campaign_id == campaign_id)
.filter(or_(*duplicate_filters))
.order_by(BacklinkLead.created_at.asc())
.first()
)
if existing:
return existing
# Historical leads may have been stored before normalization. Normalize
# candidates in Python so those records are also treated as duplicates.
candidates = (
db.query(BacklinkLead)
.filter(BacklinkLead.campaign_id == campaign_id)
.order_by(BacklinkLead.created_at.asc())
.all()
)
for candidate in candidates:
candidate_url, candidate_domain, candidate_email = self._normalize_lead_identity(
candidate.url, candidate.domain, candidate.email
)
if url and candidate_url == url:
return candidate
if domain and email and candidate_domain == domain and candidate_email == email:
return candidate
return None
def add_lead( def add_lead(
self, self,
campaign_id: str, campaign_id: str,
@@ -138,14 +254,22 @@ class BacklinkOutreachStorageService:
if not db: if not db:
raise RuntimeError("Database session unavailable") raise RuntimeError("Database session unavailable")
try: try:
normalized_url, normalized_domain, normalized_email = self._normalize_lead_identity(url, domain, email)
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
if existing:
result = self._lead_to_dict(existing)
result["duplicate"] = True
result["skipped"] = True
return result
lead = BacklinkLead( lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}", id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id, campaign_id=campaign_id,
url=url, url=normalized_url,
domain=domain, domain=normalized_domain,
page_title=page_title, page_title=page_title,
snippet=snippet, snippet=snippet,
email=email, email=normalized_email,
confidence_score=confidence_score, confidence_score=confidence_score,
discovery_source=discovery_source, discovery_source=discovery_source,
status="discovered", status="discovered",
@@ -153,8 +277,21 @@ class BacklinkOutreachStorageService:
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
) )
db.add(lead) db.add(lead)
try:
db.commit() db.commit()
return self._lead_to_dict(lead) except IntegrityError:
db.rollback()
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
if existing:
result = self._lead_to_dict(existing)
result["duplicate"] = True
result["skipped"] = True
return result
raise
result = self._lead_to_dict(lead)
result["duplicate"] = False
result["skipped"] = False
return result
finally: finally:
db.close() db.close()
@@ -164,16 +301,27 @@ class BacklinkOutreachStorageService:
if not db: if not db:
raise RuntimeError("Database session unavailable") raise RuntimeError("Database session unavailable")
try: try:
added = [] results = []
for data in leads_data: for data in leads_data:
normalized_url, normalized_domain, normalized_email = self._normalize_lead_identity(
data.get("url"), data.get("domain"), data.get("email")
)
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
if existing:
result = self._lead_to_dict(existing)
result["duplicate"] = True
result["skipped"] = True
results.append(result)
continue
lead = BacklinkLead( lead = BacklinkLead(
id=f"bl_{uuid4().hex[:16]}", id=f"bl_{uuid4().hex[:16]}",
campaign_id=campaign_id, campaign_id=campaign_id,
url=data.get("url", ""), url=normalized_url,
domain=data.get("domain", ""), domain=normalized_domain,
page_title=data.get("page_title", ""), page_title=data.get("page_title", ""),
snippet=data.get("snippet", ""), snippet=data.get("snippet", ""),
email=data.get("email"), email=normalized_email,
confidence_score=data.get("confidence_score", 0.0), confidence_score=data.get("confidence_score", 0.0),
discovery_source=data.get("discovery_source", "duckduckgo"), discovery_source=data.get("discovery_source", "duckduckgo"),
status="discovered", status="discovered",
@@ -181,9 +329,23 @@ class BacklinkOutreachStorageService:
created_at=datetime.utcnow(), created_at=datetime.utcnow(),
) )
db.add(lead) db.add(lead)
added.append(lead) try:
db.commit() db.commit()
return [self._lead_to_dict(l) for l in added] except IntegrityError:
db.rollback()
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
if existing:
result = self._lead_to_dict(existing)
result["duplicate"] = True
result["skipped"] = True
results.append(result)
continue
raise
result = self._lead_to_dict(lead)
result["duplicate"] = False
result["skipped"] = False
results.append(result)
return results
finally: finally:
db.close() db.close()