diff --git a/backend/models/backlink_outreach_models.py b/backend/models/backlink_outreach_models.py
index 53ca15e4..6bbf7f52 100644
--- a/backend/models/backlink_outreach_models.py
+++ b/backend/models/backlink_outreach_models.py
@@ -1,7 +1,7 @@
 """DB models for production backlink outreach tracking."""
 
 from datetime import datetime
-from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Index, Boolean, Date
+from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Index, Boolean, Date, and_
 from sqlalchemy.ext.declarative import declarative_base
 
 Base = declarative_base()
@@ -128,6 +128,21 @@ class SendCounterDomain(Base):
 
 
 Index("idx_backlink_campaign_user_date", BacklinkCampaign.user_id, BacklinkCampaign.created_at)
+Index(
+    "idx_backlink_lead_campaign_url_unique",
+    BacklinkLead.campaign_id,
+    BacklinkLead.url,
+    unique=True,
+    sqlite_where=and_(BacklinkLead.url.isnot(None), BacklinkLead.url != ""),
+)
+Index(
+    "idx_backlink_lead_campaign_domain_email_unique",
+    BacklinkLead.campaign_id,
+    BacklinkLead.domain,
+    BacklinkLead.email,
+    unique=True,
+    sqlite_where=and_(BacklinkLead.email.isnot(None), BacklinkLead.email != ""),
+)
 Index("idx_backlink_attempt_campaign_date", OutreachAttempt.campaign_id, OutreachAttempt.created_at)
 Index("idx_backlink_suppressed_email", SuppressedRecipient.email, SuppressedRecipient.user_id)
 Index("idx_backlink_counter_user_date", SendCounterUser.user_id, SendCounterUser.date, unique=True)
diff --git a/backend/routers/backlink_outreach.py b/backend/routers/backlink_outreach.py
index 1900f38b..62d7e1bb 100644
--- a/backend/routers/backlink_outreach.py
+++ b/backend/routers/backlink_outreach.py
@@ -91,10 +91,11 @@ async def discover_deep_backlink_opportunities(
     if payload.campaign_id:
         storage = BacklinkOutreachStorageService()
         saved = 0
+        duplicates_skipped = 0
         save_failed = 0
         for opp in result.get("opportunities", []):
             try:
-                storage.add_lead(
+                lead = storage.add_lead(
                     campaign_id=payload.campaign_id,
                     user_id=user_id,
                     url=opp["url"],
@@ -105,10 +106,14 @@ async def discover_deep_backlink_opportunities(
                     confidence_score=opp.get("confidence_score", 0.0),
                     discovery_source=opp.get("discovery_source", "duckduckgo"),
                 )
-                saved += 1
+                if lead.get("duplicate") or lead.get("skipped"):
+                    duplicates_skipped += 1
+                else:
+                    saved += 1
             except Exception:
                 save_failed += 1
         result["saved_to_campaign"] = saved
+        result["duplicates_skipped"] = duplicates_skipped
         result["save_failed"] = save_failed
     return result
 
diff --git a/backend/services/backlink_outreach_storage.py b/backend/services/backlink_outreach_storage.py
index b7498aca..ebf24220 100644
--- a/backend/services/backlink_outreach_storage.py
+++ b/backend/services/backlink_outreach_storage.py
@@ -4,8 +4,10 @@ from __future__ import annotations
 
 from datetime import datetime, date
 from uuid import uuid4
-from typing import List, Optional
-from sqlalchemy import text as sql_text, func as sa_func
+from typing import List, Optional, Tuple
+from urllib.parse import urlsplit, urlunsplit
+from sqlalchemy import text as sql_text, func as sa_func, or_
+from sqlalchemy.exc import IntegrityError
 
 from services.database import get_session_for_user
 from models.backlink_outreach_models import (
@@ -21,6 +23,63 @@ class BacklinkOutreachStorageService:
         "url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
     ]
 
+    @staticmethod
+    def _normalize_email(email: Optional[str]) -> Optional[str]:
+        """Return the trimmed, lower-cased email, or None when it is empty."""
+        normalized = (email or "").strip().lower()
+        return normalized or None
+
+    @staticmethod
+    def _normalize_domain(domain: Optional[str]) -> str:
+        """Return the lower-cased hostname without a leading "www." or trailing dot."""
+        value = (domain or "").strip().lower()
+        if not value:
+            return ""
+        if "://" not in value:
+            value = f"//{value}"
+        parsed = urlsplit(value)
+        hostname = (parsed.hostname or value).strip().lower().rstrip(".")
+        return hostname[4:] if hostname.startswith("www.") else hostname
+
+    @classmethod
+    def _normalize_url(cls, url: Optional[str]) -> str:
+        """Return a canonical URL: lower-cased scheme and host, without "www.", default port, fragment or trailing slash."""
+        value = (url or "").strip()
+        if not value:
+            return ""
+        parse_value = value if "://" in value else f"https://{value}"
+        parsed = urlsplit(parse_value)
+        scheme = (parsed.scheme or "https").lower()
+        hostname = (parsed.hostname or "").lower().rstrip(".")
+        if hostname.startswith("www."):
+            hostname = hostname[4:]
+        if not hostname:
+            return value.rstrip("/")
+        try:
+            port = parsed.port
+        except ValueError:
+            port = None
+        netloc = hostname
+        if port and not ((scheme == "http" and port == 80) or (scheme == "https" and port == 443)):
+            netloc = f"{hostname}:{port}"
+        # Strip trailing slashes everywhere, including the bare root, so that
+        # "https://host" and "https://host/" dedupe to the same lead URL.
+        path = (parsed.path or "").rstrip("/")
+        query = parsed.query
+        return urlunsplit((scheme, netloc, path, query, ""))
+
+    @classmethod
+    def _normalize_lead_identity(
+        cls, url: Optional[str], domain: Optional[str], email: Optional[str]
+    ) -> Tuple[str, str, Optional[str]]:
+        """Normalize (url, domain, email); derive the domain from the URL when it is missing."""
+        normalized_url = cls._normalize_url(url)
+        normalized_domain = cls._normalize_domain(domain)
+        if not normalized_domain and normalized_url:
+            normalized_domain = cls._normalize_domain(normalized_url)
+        normalized_email = cls._normalize_email(email)
+        return normalized_url, normalized_domain, normalized_email
+
     def _ensure_tables(self, user_id: str) -> None:
         db = get_session_for_user(user_id)
         if not db:
@@ -28,6 +87,7 @@ class BacklinkOutreachStorageService:
         try:
             Base.metadata.create_all(bind=db.get_bind(), checkfirst=True)
             self._migrate_lead_columns(db)
+            self._migrate_lead_uniqueness_indexes(db)
         finally:
             db.close()
 
@@ -49,6 +109,29 @@ class BacklinkOutreachStorageService:
             except Exception:
                 db.rollback()
 
+    def _migrate_lead_uniqueness_indexes(self, db) -> None:
+        """Create normalized lead uniqueness indexes when existing data allows it."""
+        index_statements = (
+            """
+            CREATE UNIQUE INDEX IF NOT EXISTS idx_backlink_lead_campaign_url_unique
+            ON backlink_leads (campaign_id, url)
+            WHERE url IS NOT NULL AND url != ''
+            """,
+            """
+            CREATE UNIQUE INDEX IF NOT EXISTS idx_backlink_lead_campaign_domain_email_unique
+            ON backlink_leads (campaign_id, domain, email)
+            WHERE email IS NOT NULL AND email != ''
+            """,
+        )
+        for statement in index_statements:
+            try:
+                db.execute(sql_text(statement))
+                db.commit()
+            except Exception:
+                # Existing duplicate historical data should not block app startup;
+                # service-level duplicate checks still prevent new duplicates.
+                db.rollback()
+
     def create_campaign(self, user_id: str, workspace_id: str, name: str) -> dict:
         self._ensure_tables(user_id)
         db = get_session_for_user(user_id)
@@ -120,6 +203,44 @@ class BacklinkOutreachStorageService:
 
     # -- Lead CRUD --
 
+    def _find_existing_lead(self, db, campaign_id: str, url: str, domain: str, email: Optional[str]):
+        """Return the earliest campaign lead matching the URL or the (domain, email) pair, else None."""
+        duplicate_filters = []
+        if url:
+            duplicate_filters.append(BacklinkLead.url == url)
+        if domain and email:
+            duplicate_filters.append((BacklinkLead.domain == domain) & (BacklinkLead.email == email))
+        if not duplicate_filters:
+            return None
+
+        existing = (
+            db.query(BacklinkLead)
+            .filter(BacklinkLead.campaign_id == campaign_id)
+            .filter(or_(*duplicate_filters))
+            .order_by(BacklinkLead.created_at.asc())
+            .first()
+        )
+        if existing:
+            return existing
+
+        # Historical leads may have been stored before normalization. Normalize
+        # candidates in Python so those records are also treated as duplicates.
+        candidates = (
+            db.query(BacklinkLead)
+            .filter(BacklinkLead.campaign_id == campaign_id)
+            .order_by(BacklinkLead.created_at.asc())
+            .all()
+        )
+        for candidate in candidates:
+            candidate_url, candidate_domain, candidate_email = self._normalize_lead_identity(
+                candidate.url, candidate.domain, candidate.email
+            )
+            if url and candidate_url == url:
+                return candidate
+            if domain and email and candidate_domain == domain and candidate_email == email:
+                return candidate
+        return None
+
     def add_lead(
         self,
         campaign_id: str,
@@ -138,14 +259,22 @@ class BacklinkOutreachStorageService:
         if not db:
             raise RuntimeError("Database session unavailable")
         try:
+            normalized_url, normalized_domain, normalized_email = self._normalize_lead_identity(url, domain, email)
+            existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
+            if existing:
+                result = self._lead_to_dict(existing)
+                result["duplicate"] = True
+                result["skipped"] = True
+                return result
+
             lead = BacklinkLead(
                 id=f"bl_{uuid4().hex[:16]}",
                 campaign_id=campaign_id,
-                url=url,
-                domain=domain,
+                url=normalized_url,
+                domain=normalized_domain,
                 page_title=page_title,
                 snippet=snippet,
-                email=email,
+                email=normalized_email,
                 confidence_score=confidence_score,
                 discovery_source=discovery_source,
                 status="discovered",
@@ -153,8 +282,21 @@ class BacklinkOutreachStorageService:
                 created_at=datetime.utcnow(),
             )
             db.add(lead)
-            db.commit()
-            return self._lead_to_dict(lead)
+            try:
+                db.commit()
+            except IntegrityError:
+                db.rollback()
+                existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
+                if existing:
+                    result = self._lead_to_dict(existing)
+                    result["duplicate"] = True
+                    result["skipped"] = True
+                    return result
+                raise
+            result = self._lead_to_dict(lead)
+            result["duplicate"] = False
+            result["skipped"] = False
+            return result
         finally:
             db.close()
 
@@ -164,16 +306,27 @@ class BacklinkOutreachStorageService:
         if not db:
             raise RuntimeError("Database session unavailable")
         try:
-            added = []
+            results = []
             for data in leads_data:
+                normalized_url, normalized_domain, normalized_email = self._normalize_lead_identity(
+                    data.get("url"), data.get("domain"), data.get("email")
+                )
+                existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
+                if existing:
+                    result = self._lead_to_dict(existing)
+                    result["duplicate"] = True
+                    result["skipped"] = True
+                    results.append(result)
+                    continue
+
                 lead = BacklinkLead(
                     id=f"bl_{uuid4().hex[:16]}",
                     campaign_id=campaign_id,
-                    url=data.get("url", ""),
-                    domain=data.get("domain", ""),
+                    url=normalized_url,
+                    domain=normalized_domain,
                     page_title=data.get("page_title", ""),
                     snippet=data.get("snippet", ""),
-                    email=data.get("email"),
+                    email=normalized_email,
                     confidence_score=data.get("confidence_score", 0.0),
                     discovery_source=data.get("discovery_source", "duckduckgo"),
                     status="discovered",
@@ -181,9 +334,23 @@ class BacklinkOutreachStorageService:
                     created_at=datetime.utcnow(),
                 )
                 db.add(lead)
-                added.append(lead)
-            db.commit()
-            return [self._lead_to_dict(l) for l in added]
+                try:
+                    db.commit()
+                except IntegrityError:
+                    db.rollback()
+                    existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
+                    if existing:
+                        result = self._lead_to_dict(existing)
+                        result["duplicate"] = True
+                        result["skipped"] = True
+                        results.append(result)
+                        continue
+                    raise
+                result = self._lead_to_dict(lead)
+                result["duplicate"] = False
+                result["skipped"] = False
+                results.append(result)
+            return results
         finally:
             db.close()
 