Compare commits
1 Commit
v0.5.1
...
codex/add-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a580667876 |
@@ -1,7 +1,7 @@
|
|||||||
"""DB models for production backlink outreach tracking."""
|
"""DB models for production backlink outreach tracking."""
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Index, Boolean, Date
|
from sqlalchemy import Column, String, Integer, Float, DateTime, Text, ForeignKey, Index, Boolean, Date, and_
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
|
|
||||||
Base = declarative_base()
|
Base = declarative_base()
|
||||||
@@ -128,6 +128,21 @@ class SendCounterDomain(Base):
|
|||||||
|
|
||||||
|
|
||||||
Index("idx_backlink_campaign_user_date", BacklinkCampaign.user_id, BacklinkCampaign.created_at)
|
Index("idx_backlink_campaign_user_date", BacklinkCampaign.user_id, BacklinkCampaign.created_at)
|
||||||
|
Index(
|
||||||
|
"idx_backlink_lead_campaign_url_unique",
|
||||||
|
BacklinkLead.campaign_id,
|
||||||
|
BacklinkLead.url,
|
||||||
|
unique=True,
|
||||||
|
sqlite_where=and_(BacklinkLead.url.isnot(None), BacklinkLead.url != ""),
|
||||||
|
)
|
||||||
|
Index(
|
||||||
|
"idx_backlink_lead_campaign_domain_email_unique",
|
||||||
|
BacklinkLead.campaign_id,
|
||||||
|
BacklinkLead.domain,
|
||||||
|
BacklinkLead.email,
|
||||||
|
unique=True,
|
||||||
|
sqlite_where=and_(BacklinkLead.email.isnot(None), BacklinkLead.email != ""),
|
||||||
|
)
|
||||||
Index("idx_backlink_attempt_campaign_date", OutreachAttempt.campaign_id, OutreachAttempt.created_at)
|
Index("idx_backlink_attempt_campaign_date", OutreachAttempt.campaign_id, OutreachAttempt.created_at)
|
||||||
Index("idx_backlink_suppressed_email", SuppressedRecipient.email, SuppressedRecipient.user_id)
|
Index("idx_backlink_suppressed_email", SuppressedRecipient.email, SuppressedRecipient.user_id)
|
||||||
Index("idx_backlink_counter_user_date", SendCounterUser.user_id, SendCounterUser.date, unique=True)
|
Index("idx_backlink_counter_user_date", SendCounterUser.user_id, SendCounterUser.date, unique=True)
|
||||||
|
|||||||
@@ -91,10 +91,11 @@ async def discover_deep_backlink_opportunities(
|
|||||||
if payload.campaign_id:
|
if payload.campaign_id:
|
||||||
storage = BacklinkOutreachStorageService()
|
storage = BacklinkOutreachStorageService()
|
||||||
saved = 0
|
saved = 0
|
||||||
|
duplicates_skipped = 0
|
||||||
save_failed = 0
|
save_failed = 0
|
||||||
for opp in result.get("opportunities", []):
|
for opp in result.get("opportunities", []):
|
||||||
try:
|
try:
|
||||||
storage.add_lead(
|
lead = storage.add_lead(
|
||||||
campaign_id=payload.campaign_id,
|
campaign_id=payload.campaign_id,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
url=opp["url"],
|
url=opp["url"],
|
||||||
@@ -105,10 +106,14 @@ async def discover_deep_backlink_opportunities(
|
|||||||
confidence_score=opp.get("confidence_score", 0.0),
|
confidence_score=opp.get("confidence_score", 0.0),
|
||||||
discovery_source=opp.get("discovery_source", "duckduckgo"),
|
discovery_source=opp.get("discovery_source", "duckduckgo"),
|
||||||
)
|
)
|
||||||
saved += 1
|
if lead.get("duplicate") or lead.get("skipped"):
|
||||||
|
duplicates_skipped += 1
|
||||||
|
else:
|
||||||
|
saved += 1
|
||||||
except Exception:
|
except Exception:
|
||||||
save_failed += 1
|
save_failed += 1
|
||||||
result["saved_to_campaign"] = saved
|
result["saved_to_campaign"] = saved
|
||||||
|
result["duplicates_skipped"] = duplicates_skipped
|
||||||
result["save_failed"] = save_failed
|
result["save_failed"] = save_failed
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|||||||
@@ -4,8 +4,10 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from datetime import datetime, date
|
from datetime import datetime, date
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
from typing import List, Optional
|
from typing import List, Optional, Tuple
|
||||||
from sqlalchemy import text as sql_text, func as sa_func
|
from urllib.parse import urlsplit, urlunsplit
|
||||||
|
from sqlalchemy import text as sql_text, func as sa_func, or_
|
||||||
|
from sqlalchemy.exc import IntegrityError
|
||||||
|
|
||||||
from services.database import get_session_for_user
|
from services.database import get_session_for_user
|
||||||
from models.backlink_outreach_models import (
|
from models.backlink_outreach_models import (
|
||||||
@@ -21,6 +23,59 @@ class BacklinkOutreachStorageService:
|
|||||||
"url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
|
"url", "page_title", "snippet", "confidence_score", "discovery_source", "notes"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@staticmethod
def _normalize_email(email: Optional[str]) -> Optional[str]:
    """Return ``email`` trimmed and lower-cased, or ``None`` if nothing is left.

    ``None``, the empty string and whitespace-only input all collapse to
    ``None`` so callers can treat "no email" uniformly.
    """
    if not email:
        return None
    cleaned = email.strip().lower()
    return cleaned if cleaned else None
|
||||||
|
|
||||||
|
@staticmethod
def _normalize_domain(domain: Optional[str]) -> str:
    """Reduce a domain (or a URL-like string) to a bare, comparable hostname.

    The value is trimmed and lower-cased, parsed as a network location, and
    then stripped of any trailing dot and a leading ``www.``.

    Args:
        domain: A hostname, ``host:port``, or full URL. May be ``None``.

    Returns:
        The normalized hostname, or ``""`` when ``domain`` is empty. If no
        hostname can be extracted (including input that ``urlsplit`` rejects)
        the cleaned input itself is used as the fallback.
    """
    raw = (domain or "").strip().lower()
    if not raw:
        return ""
    # A leading "//" makes urlsplit treat a scheme-less value as a netloc.
    candidate = raw if "://" in raw else f"//{raw}"
    try:
        host = urlsplit(candidate).hostname
    except ValueError:
        # urlsplit rejects malformed bracketed hosts (e.g. "[bad"); a single
        # bad value must not abort the caller's duplicate detection.
        host = None
    # Fall back to the caller's value, not the synthetic "//"-prefixed one.
    host = (host or raw).strip().rstrip(".")
    return host[4:] if host.startswith("www.") else host
|
||||||
|
|
||||||
|
@classmethod
def _normalize_url(cls, url: Optional[str]) -> str:
    """Return a canonical form of ``url`` for duplicate detection.

    Normalization steps:
      * trim whitespace; assume ``https`` when no scheme is present;
      * lower-case scheme and host, drop a trailing dot and leading ``www.``;
      * drop user-info, the fragment, and default ports (80/http, 443/https);
      * strip trailing slashes from the path, so ``example.com`` and
        ``example.com/`` compare equal;
      * keep the query string unchanged.

    Args:
        url: The URL to normalize. May be ``None``.

    Returns:
        The normalized URL, ``""`` for empty input, or the trimmed input with
        trailing slashes removed when it cannot be parsed or has no host.
    """
    raw = (url or "").strip()
    if not raw:
        return ""
    candidate = raw if "://" in raw else f"https://{raw}"
    try:
        parts = urlsplit(candidate)
    except ValueError:
        # Malformed bracketed hosts make urlsplit raise; degrade to the same
        # fallback used when no hostname is present instead of propagating.
        return raw.rstrip("/")

    scheme = (parts.scheme or "https").lower()
    host = (parts.hostname or "").lower().rstrip(".")
    if host.startswith("www."):
        host = host[4:]
    if not host:
        return raw.rstrip("/")

    try:
        port = parts.port
    except ValueError:
        # Non-numeric or out-of-range port: treat as absent.
        port = None

    # ``hostname`` strips the brackets from IPv6 literals; restore them so
    # the rebuilt netloc is still a valid authority.
    if ":" in host:
        host = f"[{host}]"

    default_port = (scheme == "http" and port == 80) or (scheme == "https" and port == 443)
    netloc = f"{host}:{port}" if port and not default_port else host

    # Strip trailing slashes uniformly, including the bare root "/", so the
    # root URL has a single canonical spelling.
    path = (parts.path or "").rstrip("/")
    return urlunsplit((scheme, netloc, path, parts.query, ""))
|
||||||
|
|
||||||
|
@classmethod
def _normalize_lead_identity(
    cls, url: Optional[str], domain: Optional[str], email: Optional[str]
) -> Tuple[str, str, Optional[str]]:
    """Normalize the three fields that identify a lead.

    Returns a ``(url, domain, email)`` tuple produced by the class's
    ``_normalize_url``, ``_normalize_domain`` and ``_normalize_email``
    helpers. When the supplied domain normalizes to an empty string, the
    domain is derived from the normalized URL instead (if there is one).
    """
    clean_url = cls._normalize_url(url)
    clean_domain = cls._normalize_domain(domain)
    if clean_url and not clean_domain:
        # No usable explicit domain: fall back to the URL's host.
        clean_domain = cls._normalize_domain(clean_url)
    clean_email = cls._normalize_email(email)
    return clean_url, clean_domain, clean_email
|
||||||
|
|
||||||
def _ensure_tables(self, user_id: str) -> None:
|
def _ensure_tables(self, user_id: str) -> None:
|
||||||
db = get_session_for_user(user_id)
|
db = get_session_for_user(user_id)
|
||||||
if not db:
|
if not db:
|
||||||
@@ -28,6 +83,7 @@ class BacklinkOutreachStorageService:
|
|||||||
try:
|
try:
|
||||||
Base.metadata.create_all(bind=db.get_bind(), checkfirst=True)
|
Base.metadata.create_all(bind=db.get_bind(), checkfirst=True)
|
||||||
self._migrate_lead_columns(db)
|
self._migrate_lead_columns(db)
|
||||||
|
self._migrate_lead_uniqueness_indexes(db)
|
||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
@@ -49,6 +105,29 @@ class BacklinkOutreachStorageService:
|
|||||||
except Exception:
|
except Exception:
|
||||||
db.rollback()
|
db.rollback()
|
||||||
|
|
||||||
|
def _migrate_lead_uniqueness_indexes(self, db) -> None:
    """Create normalized lead uniqueness indexes when existing data allows it.

    Two partial unique indexes are attempted on ``backlink_leads``:
    ``(campaign_id, url)`` for non-empty URLs and
    ``(campaign_id, domain, email)`` for non-empty emails. Each statement is
    executed and committed on its own so one failure does not prevent the
    other index from being created.

    Args:
        db: An open database session exposing ``execute``, ``commit`` and
            ``rollback``.

    This is best-effort: a failure is rolled back and logged, never raised.
    """
    import logging

    logger = logging.getLogger(__name__)

    index_statements = (
        (
            "idx_backlink_lead_campaign_url_unique",
            """
            CREATE UNIQUE INDEX IF NOT EXISTS idx_backlink_lead_campaign_url_unique
            ON backlink_leads (campaign_id, url)
            WHERE url IS NOT NULL AND url != ''
            """,
        ),
        (
            "idx_backlink_lead_campaign_domain_email_unique",
            """
            CREATE UNIQUE INDEX IF NOT EXISTS idx_backlink_lead_campaign_domain_email_unique
            ON backlink_leads (campaign_id, domain, email)
            WHERE email IS NOT NULL AND email != ''
            """,
        ),
    )
    for index_name, statement in index_statements:
        try:
            db.execute(sql_text(statement))
            db.commit()
        except Exception:
            # Existing duplicate historical data should not block app startup;
            # service-level duplicate checks still prevent new duplicates.
            db.rollback()
            # Surface the failure so operators know the database is not
            # enforcing uniqueness for this index.
            logger.warning(
                "Could not create unique index %s; relying on service-level duplicate checks",
                index_name,
                exc_info=True,
            )
|
||||||
|
|
||||||
def create_campaign(self, user_id: str, workspace_id: str, name: str) -> dict:
|
def create_campaign(self, user_id: str, workspace_id: str, name: str) -> dict:
|
||||||
self._ensure_tables(user_id)
|
self._ensure_tables(user_id)
|
||||||
db = get_session_for_user(user_id)
|
db = get_session_for_user(user_id)
|
||||||
@@ -120,6 +199,43 @@ class BacklinkOutreachStorageService:
|
|||||||
|
|
||||||
# -- Lead CRUD --
|
# -- Lead CRUD --
|
||||||
|
|
||||||
|
def _find_existing_lead(self, db, campaign_id: str, url: str, domain: str, email: Optional[str]):
    """Return the oldest lead in the campaign that duplicates the given identity.

    A lead is a duplicate when it has the same URL, or the same
    ``(domain, email)`` pair. The arguments are expected to be already
    normalized by the caller.

    Lookup happens in two passes:
      1. an exact-match query on the stored columns;
      2. a scan of every lead in the campaign, normalizing each stored value
         in Python, so rows written before normalization are still matched.

    Args:
        db: Open database session.
        campaign_id: Campaign whose leads are searched.
        url: Normalized URL, or ``""`` to skip URL matching.
        domain: Normalized domain; only used together with ``email``.
        email: Normalized email, or ``None`` to skip domain/email matching.

    Returns:
        The matching ``BacklinkLead`` (earliest ``created_at`` first), or
        ``None`` when there is nothing to match on or no duplicate exists.
    """
    duplicate_filters = []
    if url:
        duplicate_filters.append(BacklinkLead.url == url)
    if domain and email:
        duplicate_filters.append((BacklinkLead.domain == domain) & (BacklinkLead.email == email))
    if not duplicate_filters:
        return None

    campaign_leads = db.query(BacklinkLead).filter(BacklinkLead.campaign_id == campaign_id)

    existing = (
        campaign_leads
        .filter(or_(*duplicate_filters))
        .order_by(BacklinkLead.created_at.asc())
        .first()
    )
    if existing:
        return existing

    # Historical leads may have been stored before normalization. Normalize
    # candidates in Python so those records are also treated as duplicates.
    for candidate in campaign_leads.order_by(BacklinkLead.created_at.asc()).all():
        try:
            candidate_url, candidate_domain, candidate_email = self._normalize_lead_identity(
                candidate.url, candidate.domain, candidate.email
            )
        except ValueError:
            # A malformed stored value cannot be a normalized duplicate and
            # must not block inserting new leads into this campaign.
            continue
        if url and candidate_url == url:
            return candidate
        if domain and email and candidate_domain == domain and candidate_email == email:
            return candidate
    return None
|
||||||
|
|
||||||
def add_lead(
|
def add_lead(
|
||||||
self,
|
self,
|
||||||
campaign_id: str,
|
campaign_id: str,
|
||||||
@@ -138,14 +254,22 @@ class BacklinkOutreachStorageService:
|
|||||||
if not db:
|
if not db:
|
||||||
raise RuntimeError("Database session unavailable")
|
raise RuntimeError("Database session unavailable")
|
||||||
try:
|
try:
|
||||||
|
normalized_url, normalized_domain, normalized_email = self._normalize_lead_identity(url, domain, email)
|
||||||
|
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
|
||||||
|
if existing:
|
||||||
|
result = self._lead_to_dict(existing)
|
||||||
|
result["duplicate"] = True
|
||||||
|
result["skipped"] = True
|
||||||
|
return result
|
||||||
|
|
||||||
lead = BacklinkLead(
|
lead = BacklinkLead(
|
||||||
id=f"bl_{uuid4().hex[:16]}",
|
id=f"bl_{uuid4().hex[:16]}",
|
||||||
campaign_id=campaign_id,
|
campaign_id=campaign_id,
|
||||||
url=url,
|
url=normalized_url,
|
||||||
domain=domain,
|
domain=normalized_domain,
|
||||||
page_title=page_title,
|
page_title=page_title,
|
||||||
snippet=snippet,
|
snippet=snippet,
|
||||||
email=email,
|
email=normalized_email,
|
||||||
confidence_score=confidence_score,
|
confidence_score=confidence_score,
|
||||||
discovery_source=discovery_source,
|
discovery_source=discovery_source,
|
||||||
status="discovered",
|
status="discovered",
|
||||||
@@ -153,8 +277,21 @@ class BacklinkOutreachStorageService:
|
|||||||
created_at=datetime.utcnow(),
|
created_at=datetime.utcnow(),
|
||||||
)
|
)
|
||||||
db.add(lead)
|
db.add(lead)
|
||||||
db.commit()
|
try:
|
||||||
return self._lead_to_dict(lead)
|
db.commit()
|
||||||
|
except IntegrityError:
|
||||||
|
db.rollback()
|
||||||
|
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
|
||||||
|
if existing:
|
||||||
|
result = self._lead_to_dict(existing)
|
||||||
|
result["duplicate"] = True
|
||||||
|
result["skipped"] = True
|
||||||
|
return result
|
||||||
|
raise
|
||||||
|
result = self._lead_to_dict(lead)
|
||||||
|
result["duplicate"] = False
|
||||||
|
result["skipped"] = False
|
||||||
|
return result
|
||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
@@ -164,16 +301,27 @@ class BacklinkOutreachStorageService:
|
|||||||
if not db:
|
if not db:
|
||||||
raise RuntimeError("Database session unavailable")
|
raise RuntimeError("Database session unavailable")
|
||||||
try:
|
try:
|
||||||
added = []
|
results = []
|
||||||
for data in leads_data:
|
for data in leads_data:
|
||||||
|
normalized_url, normalized_domain, normalized_email = self._normalize_lead_identity(
|
||||||
|
data.get("url"), data.get("domain"), data.get("email")
|
||||||
|
)
|
||||||
|
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
|
||||||
|
if existing:
|
||||||
|
result = self._lead_to_dict(existing)
|
||||||
|
result["duplicate"] = True
|
||||||
|
result["skipped"] = True
|
||||||
|
results.append(result)
|
||||||
|
continue
|
||||||
|
|
||||||
lead = BacklinkLead(
|
lead = BacklinkLead(
|
||||||
id=f"bl_{uuid4().hex[:16]}",
|
id=f"bl_{uuid4().hex[:16]}",
|
||||||
campaign_id=campaign_id,
|
campaign_id=campaign_id,
|
||||||
url=data.get("url", ""),
|
url=normalized_url,
|
||||||
domain=data.get("domain", ""),
|
domain=normalized_domain,
|
||||||
page_title=data.get("page_title", ""),
|
page_title=data.get("page_title", ""),
|
||||||
snippet=data.get("snippet", ""),
|
snippet=data.get("snippet", ""),
|
||||||
email=data.get("email"),
|
email=normalized_email,
|
||||||
confidence_score=data.get("confidence_score", 0.0),
|
confidence_score=data.get("confidence_score", 0.0),
|
||||||
discovery_source=data.get("discovery_source", "duckduckgo"),
|
discovery_source=data.get("discovery_source", "duckduckgo"),
|
||||||
status="discovered",
|
status="discovered",
|
||||||
@@ -181,9 +329,23 @@ class BacklinkOutreachStorageService:
|
|||||||
created_at=datetime.utcnow(),
|
created_at=datetime.utcnow(),
|
||||||
)
|
)
|
||||||
db.add(lead)
|
db.add(lead)
|
||||||
added.append(lead)
|
try:
|
||||||
db.commit()
|
db.commit()
|
||||||
return [self._lead_to_dict(l) for l in added]
|
except IntegrityError:
|
||||||
|
db.rollback()
|
||||||
|
existing = self._find_existing_lead(db, campaign_id, normalized_url, normalized_domain, normalized_email)
|
||||||
|
if existing:
|
||||||
|
result = self._lead_to_dict(existing)
|
||||||
|
result["duplicate"] = True
|
||||||
|
result["skipped"] = True
|
||||||
|
results.append(result)
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
result = self._lead_to_dict(lead)
|
||||||
|
result["duplicate"] = False
|
||||||
|
result["skipped"] = False
|
||||||
|
results.append(result)
|
||||||
|
return results
|
||||||
finally:
|
finally:
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user