# File: ALwrity/backend/services/backlink_outreach_service.py
# (file-viewer metadata: Python, 420 lines, 18 KiB)

"""Canonical backlink outreach service entrypoint."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import re
import time
import requests
from bs4 import BeautifulSoup
import csv
import io
from services.backlink_outreach_models import (
OpportunityContactInfo, OpportunityRecord,
PolicyValidationRequest, PolicyValidationResponse,
SendOutreachRequest, SendOutreachResponse,
CampaignVolumeResponse, CampaignVolumePoint,
ConversionFunnelResponse, FunnelStage,
)
from services.backlink_outreach_storage import BacklinkOutreachStorageService
# Send-count thresholds used by validate_send_policy: a send is blocked once
# the stored count for the user / recipient domain reaches the cap (>=).
# The counting window itself is decided by the storage layer.
DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20
@dataclass
class SearchResult:
    """One raw hit parsed from a search-provider results page."""

    # Link target exactly as found in the result anchor's href.
    url: str
    # Visible text of the result anchor.
    title: str
    # Result snippet text; empty string when the page had none.
    snippet: str
# Service facade for backlink outreach: guest-post query generation, search
# based opportunity discovery, send-policy validation, outreach attempt
# recording, reporting snapshots and CSV exports.
class BacklinkOutreachService:
def list_backlink_modules(self) -> List[Dict[str, Any]]:
    """Return the static registry of modules that make up the backlink feature.

    Returns:
        One dict per module with the keys ``identifier``, ``module_path`` and
        ``purpose``, in a fixed order.
    """
    registry = (
        (
            "backlink",
            "backend/services/backlink_outreach_service.py",
            "Canonical backlink service facade",
        ),
        (
            "outreach",
            "backend/routers/backlink_outreach.py",
            "HTTP API entrypoint for backlink outreach",
        ),
        (
            "guest_post",
            "frontend/src/api/backlinkOutreachApi.ts",
            "Frontend API integration for guest-post workflows",
        ),
    )
    return [
        {"identifier": identifier, "module_path": module_path, "purpose": purpose}
        for identifier, module_path, purpose in registry
    ]
def generate_guest_post_queries(self, keyword: str) -> List[str]:
    """Build the guest-post search queries for a keyword.

    Args:
        keyword: Topic to search for; surrounding whitespace is ignored.

    Returns:
        Eight query strings of the form ``"<keyword> + '<phrase>'"``, or an
        empty list when the keyword is empty, whitespace-only or ``None``.
    """
    phrases = (
        "Guest Contributor",
        "Add Guest Post",
        "Guest Bloggers Wanted",
        "Write for Us",
        "Submit Guest Post",
        "Become a Guest Blogger",
        "guest post opportunities",
        "Submit article",
    )
    term = (keyword or "").strip()
    if term == "":
        return []
    return [f"{term} + '{phrase}'" for phrase in phrases]
def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]:
    """Fetch up to ten results for ``query`` from the DuckDuckGo HTML endpoint.

    Args:
        query: Search string; it is percent-encoded before being sent.
        timeout_seconds: Timeout passed to ``requests.get`` for each attempt.
        retries: Extra attempts after the first one fails.

    Returns:
        Parsed results in page order. Returns an empty list when every
        attempt fails; this method never raises for network or parse errors.
    """
    # Function-scope imports so the file's import block stays untouched.
    import logging
    from urllib.parse import parse_qs, urlparse

    def unwrap_redirect(href: str) -> str:
        # DuckDuckGo's HTML page links results through
        # "//duckduckgo.com/l/?uddg=<encoded target>&rut=...". Return the real
        # target so stored URLs and deduplication refer to the destination
        # page rather than the redirect wrapper.
        parsed = urlparse(href)
        is_ddg_redirect = (
            parsed.netloc.lower().endswith("duckduckgo.com")
            and parsed.path.rstrip("/") == "/l"
        )
        if is_ddg_redirect:
            targets = parse_qs(parsed.query).get("uddg")
            if targets:
                return targets[0]
        return href

    encoded_query = requests.utils.quote(query)
    url = f"https://duckduckgo.com/html/?q={encoded_query}"
    headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
    for attempt in range(retries + 1):
        try:
            response = requests.get(url, headers=headers, timeout=timeout_seconds)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            rows: List[SearchResult] = []
            for result in soup.select("div.result")[:10]:
                anchor = result.select_one("a.result__a")
                snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
                if not anchor or not anchor.get("href"):
                    continue
                rows.append(
                    SearchResult(
                        url=unwrap_redirect(anchor.get("href")),
                        title=anchor.get_text(strip=True),
                        snippet=snippet.get_text(" ", strip=True) if snippet else "",
                    )
                )
            return rows
        except Exception as exc:
            # Deliberately best-effort: discovery must not crash on a flaky
            # provider, but the failure is logged instead of vanishing.
            logging.getLogger(__name__).warning(
                "Backlink search failed (attempt %d of %d): %s",
                attempt + 1,
                retries + 1,
                exc,
            )
            if attempt == retries:
                return []
            # Linear back-off before the next attempt.
            time.sleep(0.6 * (attempt + 1))
    # Reached only when retries is negative and the loop never runs.
    return []
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
    """Search for guest-post opportunities for ``keyword``.

    Runs the first four generated queries through ``search_for_urls``,
    deduplicates hits by normalized URL, and converts each unique hit into
    an ``OpportunityRecord`` with contact info and a confidence score
    derived from the title and snippet.

    Args:
        keyword: Topic to discover opportunities for.
        max_results: Upper bound on unique opportunities. Values of zero or
            less produce no opportunities and trigger no searches.

    Returns:
        Dict with ``keyword``, the ``queries`` that were selected, and the
        list of ``opportunities``.
    """
    queries = self.generate_guest_post_queries(keyword)[:4]
    unique_hits: Dict[str, SearchResult] = {}

    # Guard the cap up front: the in-loop check only runs after an insert,
    # so a non-positive cap used to let one result through.
    if max_results > 0:
        for index, query in enumerate(queries):
            if index:
                # Pause between provider calls only; no delay after the
                # final query.
                time.sleep(0.4)
            for hit in self.search_for_urls(query):
                normalized_url = self._normalize_url(hit.url)
                if not normalized_url or normalized_url in unique_hits:
                    continue
                unique_hits[normalized_url] = hit
                if len(unique_hits) >= max_results:
                    break
            if len(unique_hits) >= max_results:
                break

    opportunities: List[OpportunityRecord] = [
        OpportunityRecord(
            url=normalized_url,
            title=hit.title or "Untitled",
            snippet=hit.snippet,
            metadata={"source": "duckduckgo_html", "query_keyword": keyword},
            contact_info=self._extract_contact_info(hit.snippet),
            confidence_score=self._score_confidence(hit.title, hit.snippet),
        )
        for normalized_url, hit in unique_hits.items()
    ]
    return {"keyword": keyword, "queries": queries, "opportunities": opportunities}
def _normalize_url(self, url: str) -> str:
    """Normalize a result URL for storage and deduplication.

    Protocol-relative URLs (``//host/...``) are promoted to ``https``, the
    fragment is dropped and trailing slashes are stripped.

    Args:
        url: Raw URL; ``None`` and blank strings are accepted.

    Returns:
        The normalized URL, or ``""`` when the input is empty, is not
        http(s), or has no host part.
    """
    candidate = (url or "").strip()
    if not candidate:
        return ""
    if candidate.startswith("//"):
        candidate = f"https:{candidate}"
    # Schemes are case-insensitive. At least one host character is required
    # so that a bare "https://" is rejected instead of becoming "https:".
    if not re.match(r"^https?://[^/?#]", candidate, flags=re.IGNORECASE):
        return ""
    return candidate.split("#", 1)[0].rstrip("/")
def _extract_contact_info(self, text: str) -> OpportunityContactInfo:
    """Pull the first email-looking token out of ``text``.

    Args:
        text: Free text to scan (the caller passes a search snippet).

    Returns:
        A default ``OpportunityContactInfo()`` when ``text`` is empty;
        otherwise one whose ``email`` is the first match, or ``None`` when
        nothing matched.
    """
    if text:
        found = re.search(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", text)
        address = None if found is None else found.group(0)
        return OpportunityContactInfo(email=address)
    return OpportunityContactInfo()
def _score_confidence(self, title: str, snippet: str) -> float:
    """Score how strongly a result looks like a guest-post opportunity.

    The score starts at 0.35 and gains 0.13 for each cue phrase found
    (case-insensitively) in the title and snippet combined; it is capped
    at 1.0.
    """
    haystack = f"{title} {snippet}".lower()
    matched = 0
    for cue in ("write for us", "guest post", "submit", "contributor", "guest blogger"):
        if cue in haystack:
            matched += 1
    return min(1.0, 0.35 + (0.13 * matched))
def _get_storage(self) -> BacklinkOutreachStorageService:
    """Create a new ``BacklinkOutreachStorageService`` for the caller.

    A fresh instance is constructed on every call; nothing is cached on
    the service object.
    """
    storage = BacklinkOutreachStorageService()
    return storage
def validate_send_policy(self, payload: PolicyValidationRequest) -> PolicyValidationResponse:
    """Evaluate every send-policy rule for one outreach request.

    All rules are evaluated (no early exit), so the response lists every
    violated rule. The outcome is also written to the audit log with the
    event name ``policy_check``.

    Args:
        payload: The request to validate.

    Returns:
        ``PolicyValidationResponse`` with ``allowed`` true only when no rule
        was violated, the violation codes in ``reasons``, and
        ``final_status`` set to ``"approved"`` or ``"blocked"``.
    """
    storage = self._get_storage()
    violations: List[str] = []

    # Rules that only look at the request itself.
    if payload.workspace_id.startswith("new-") and not payload.approved_by_human:
        violations.append("human_review_required_for_new_workspace")

    legal_basis = payload.legal_basis.lower()
    if legal_basis not in {"legitimate_interest", "consent", "contract"}:
        violations.append("invalid_legal_basis")
    if payload.recipient_region.lower() in {"eu", "eea"} and legal_basis != "consent":
        violations.append("region_requires_explicit_consent")

    if len(payload.sender_identity.strip()) < 3:
        violations.append("sender_identity_required")

    # Rules backed by stored state.
    recipient = str(payload.recipient_email)
    if storage.is_suppressed(recipient, payload.recipient_domain, user_id=payload.user_id):
        violations.append("recipient_suppressed")
    if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id):
        violations.append("duplicate_idempotency_key")

    sent_by_user = storage.get_user_send_count(payload.user_id)
    sent_to_domain = storage.get_domain_send_count(payload.recipient_domain, user_id=payload.user_id)
    if sent_by_user >= DEFAULT_USER_DAILY_CAP:
        violations.append("user_daily_cap_exceeded")
    if sent_to_domain >= DEFAULT_DOMAIN_DAILY_CAP:
        violations.append("domain_daily_cap_exceeded")

    allowed = not violations
    storage.add_audit_log(
        event="policy_check",
        user_id=payload.user_id,
        campaign_id=payload.campaign_id,
        recipient=recipient,
        allowed=allowed,
        reasons=violations,
        override=payload.approved_by_human,
    )
    return PolicyValidationResponse(
        allowed=allowed,
        reasons=violations,
        final_status="approved" if allowed else "blocked",
    )
# Country-code suffixes treated as "eu" by _infer_region.
# NOTE(review): the generic ".eu" TLD is not in this list - confirm whether
# that omission is intentional.
EU_DOMAIN_SUFFIXES = (".de", ".fr", ".it", ".es", ".nl", ".be", ".at", ".se", ".dk", ".fi", ".pt", ".ie", ".gr", ".pl", ".cz", ".ro", ".hu", ".bg", ".hr", ".sk", ".si", ".ee", ".lv", ".lt", ".lu", ".mt", ".cy")

def _infer_region(self, domain: str) -> str:
    """Guess a recipient region from the domain's country-code suffix.

    Args:
        domain: Recipient domain. Case, surrounding whitespace and trailing
            ``/`` or ``.`` characters are ignored; ``None`` is accepted.

    Returns:
        ``"eu"``, ``"uk"``, ``"ca"``, ``"au"``, or ``"unknown"`` when no
        known suffix matches or the domain is empty.
    """
    # Normalize once so every suffix check tolerates the same input noise
    # (previously only the EU check tolerated a trailing slash, and a None
    # domain raised AttributeError).
    host = (domain or "").strip().lower().rstrip("/.")
    if not host:
        return "unknown"
    if host.endswith(tuple(self.EU_DOMAIN_SUFFIXES)):
        return "eu"
    for suffix, region in ((".uk", "uk"), (".ca", "ca"), (".au", "au")):
        if host.endswith(suffix):
            return region
    return "unknown"
# Retry-policy label that response_from_attempt reports when an attempt's
# reasons include "smtp_send_failed" or "lead_has_no_email" and the stored
# decision reason carries no explicit "retry_policy=" entry.
SMTP_RETRY_POLICY = "manual_retry_with_new_idempotency_key"
@staticmethod
def _decision_parts(attempt: Optional[dict]) -> List[str]:
if not attempt:
return []
reason = attempt.get("decision_reason") or ""
return [part.strip() for part in reason.split(";") if part.strip()]
def response_from_attempt(self, attempt: Optional[dict], duplicate: bool = False) -> SendOutreachResponse:
    """Rebuild a ``SendOutreachResponse`` from a stored attempt record.

    Args:
        attempt: Stored attempt dict, or ``None``/empty when no record is
            available.
        duplicate: Value copied into the response's ``duplicate`` field
            when an attempt record is present.

    Returns:
        When ``attempt`` is missing, a ``"duplicate"`` response carrying
        the ``duplicate_idempotency_key`` reason. Otherwise a response whose
        reasons and retry policy are parsed from ``decision_reason``.
    """
    if not attempt:
        return SendOutreachResponse(
            attempt_id="",
            status="duplicate",
            policy_allowed=False,
            policy_reasons=["duplicate_idempotency_key"],
            duplicate=True,
        )

    status = attempt.get("status", "failed")

    # Separate the "retry_policy=<value>" entry (first one wins) from the
    # plain reason codes.
    retry_policy = None
    reasons: List[str] = []
    for part in self._decision_parts(attempt):
        if part.startswith("retry_policy="):
            if retry_policy is None:
                retry_policy = part.split("=", 1)[1]
        else:
            reasons.append(part)

    if not retry_policy and ("smtp_send_failed" in reasons or "lead_has_no_email" in reasons):
        retry_policy = self.SMTP_RETRY_POLICY

    policy_block_codes = {
        "invalid_legal_basis",
        "region_requires_explicit_consent",
        "sender_identity_required",
        "recipient_suppressed",
        "user_daily_cap_exceeded",
        "domain_daily_cap_exceeded",
    }
    blocked_by_policy = any(
        reason.startswith("human_review_required") or reason in policy_block_codes
        for reason in reasons
    )
    # "blocked" is not among the passing statuses, so a blocked attempt is
    # never reported as policy-allowed.
    policy_allowed = status in {"queued", "approved", "sent", "failed"} and not blocked_by_policy

    return SendOutreachResponse(
        attempt_id=attempt.get("attempt_id", ""),
        status=status,
        policy_allowed=policy_allowed,
        policy_reasons=reasons,
        duplicate=duplicate,
        retry_policy=retry_policy,
    )
def send_outreach(self, request: SendOutreachRequest) -> SendOutreachResponse:
    """Reserve an outreach attempt and record the policy decision for it.

    Steps: load the lead, reserve the idempotency key (returning the
    earlier attempt when the key was already used), run
    ``validate_send_policy`` and store the attempt as ``"approved"`` or
    ``"blocked"``. This method performs no email transmission itself.

    Args:
        request: The outreach request.

    Returns:
        A ``"failed"`` response with reason ``lead_not_found`` when the
        lead does not exist; the earlier attempt's response (flagged as
        duplicate) when the idempotency key was not reserved; otherwise
        the policy outcome for the newly recorded attempt.
    """
    storage = self._get_storage()
    lead = storage.get_lead(request.lead_id, user_id=request.user_id)
    if not lead:
        return SendOutreachResponse(attempt_id="", status="failed", policy_allowed=False, policy_reasons=["lead_not_found"])

    reservation = storage.reserve_attempt_idempotency(
        lead_id=request.lead_id,
        campaign_id=request.campaign_id,
        idempotency_key=request.idempotency_key,
        sender_email=request.sender_email,
        subject=request.subject,
        body=request.body,
        user_id=request.user_id,
    )
    if not reservation.get("reserved"):
        return self.response_from_attempt(reservation.get("attempt"), duplicate=True)

    attempt = reservation.get("attempt") or {}
    attempt_id = attempt.get("attempt_id", "")

    # The recipient domain must describe the lead, not the sender. It used
    # to fall back to the sender's email domain, so region inference,
    # suppression and the per-domain cap ran against the wrong domain. It
    # also passed a stored None straight into _infer_region.
    lead_email = lead.get("email") or ""
    email_domain = lead_email.rsplit("@", 1)[-1] if "@" in lead_email else ""
    domain = lead.get("domain") or email_domain or "unknown"

    recipient_region = self._infer_region(domain)
    legal_basis = "consent" if recipient_region == "eu" else "legitimate_interest"
    policy_req = PolicyValidationRequest(
        user_id=request.user_id,
        workspace_id=request.workspace_id,
        campaign_id=request.campaign_id,
        recipient_email=lead.get("email", ""),
        recipient_domain=domain,
        recipient_region=recipient_region,
        legal_basis=legal_basis,
        approved_by_human=False,
        unsubscribe_url=None,
        sender_identity=request.sender_email,
        idempotency_key=request.idempotency_key,
    )
    # NOTE(review): the key was reserved just above, and
    # validate_send_policy then calls storage.check_idempotency with the
    # same key. If that check sees the fresh reservation, every new send
    # would be blocked as "duplicate_idempotency_key" - confirm against
    # the storage implementation.
    policy = self.validate_send_policy(policy_req)

    updated_attempt = storage.update_attempt_status(
        attempt_id,
        "approved" if policy.allowed else "blocked",
        decision_reason="; ".join(policy.reasons) if policy.reasons else None,
        user_id=request.user_id,
    ) or attempt
    return SendOutreachResponse(
        attempt_id=updated_attempt.get("attempt_id", attempt_id),
        status=updated_attempt.get("status", "failed"),
        policy_allowed=policy.allowed,
        policy_reasons=policy.reasons,
    )
def get_reporting_snapshot(self, user_id: str = "default") -> Dict[str, Any]:
    """Aggregate headline outreach metrics across a user's campaigns.

    Args:
        user_id: Owner whose campaigns (up to 100) are aggregated.

    Returns:
        Dict with ``send_volume`` (attempts with status ``"sent"``),
        ``decision_events`` (audit-log rows fetched, at most 1000),
        ``response_rate`` (replies / sent) and ``placement_conversion``
        (leads with status ``"placed"`` / all leads). Both ratios are
        rounded to four decimals and are ``0.0`` when the denominator is
        zero.
    """
    storage = self._get_storage()
    # NOTE(review): user_id is passed for both leading positional
    # parameters of list_campaigns - confirm that is what the storage
    # signature expects.
    campaigns = storage.list_campaigns(user_id, user_id, limit=100)

    sent = replied = placed = lead_total = 0
    for campaign in campaigns:
        campaign_id = campaign["campaign_id"]
        attempts = storage.list_attempts(campaign_id, limit=10000, user_id=user_id)
        leads = storage.list_leads_all(campaign_id, user_id=user_id)
        sent += len([a for a in attempts if a.get("status") == "sent"])
        replied += storage.count_replies(campaign_id, user_id=user_id)
        placed += len([lead for lead in leads if lead.get("status") == "placed"])
        lead_total += len(leads)

    audit_logs = storage.list_audit_logs("", limit=1000, user_id=user_id)

    response_rate = round(replied / sent, 4) if sent > 0 else 0.0
    placement_conversion = round(placed / lead_total, 4) if lead_total > 0 else 0.0
    return {
        "send_volume": sent,
        "decision_events": len(audit_logs),
        "response_rate": response_rate,
        "placement_conversion": placement_conversion,
    }
def get_campaign_volume(self, campaign_id: str, days: int = 30, user_id: str = "default") -> CampaignVolumeResponse:
    """Return the per-day send volume of a campaign.

    Args:
        campaign_id: Campaign to report on.
        days: Number of days requested from storage.
        user_id: Owner used to scope the storage query.

    Returns:
        ``CampaignVolumeResponse`` whose ``volume`` holds one
        ``CampaignVolumePoint`` per row returned by storage.
    """
    rows = self._get_storage().get_send_volume_by_day(campaign_id, days, user_id=user_id)
    volume = [CampaignVolumePoint(**row) for row in rows]
    return CampaignVolumeResponse(campaign_id=campaign_id, days=days, volume=volume)
def get_campaign_funnel(self, campaign_id: str, user_id: str = "default") -> ConversionFunnelResponse:
    """Return the lead-status funnel of a campaign.

    Args:
        campaign_id: Campaign to report on.
        user_id: Owner used to scope the storage query.

    Returns:
        ``ConversionFunnelResponse`` whose ``stages`` holds one
        ``FunnelStage`` per row returned by storage.
    """
    rows = self._get_storage().get_lead_status_counts(campaign_id, user_id=user_id)
    stages = [FunnelStage(**row) for row in rows]
    return ConversionFunnelResponse(campaign_id=campaign_id, stages=stages)
# Column order for the CSV exports. Each list is passed as `fieldnames` to
# csv.DictWriter with extrasaction="ignore", so record keys that are not
# listed here are left out of the export.
CSV_LEAD_FIELDS = ["lead_id", "campaign_id", "domain", "page_title", "email", "status", "discovery_source", "created_at"]
CSV_ATTEMPT_FIELDS = ["attempt_id", "lead_id", "campaign_id", "sender_email", "subject", "status", "sent_at", "created_at"]
CSV_REPLY_FIELDS = ["reply_id", "attempt_id", "from_email", "subject", "classification", "received_at"]
@staticmethod
def _sanitize_csv_value(value: Any) -> str:
s = str(value) if value is not None else ""
if s and s[0] in ("=", "+", "-", "@", "\t", "\r"):
s = "'" + s
return s
def _render_csv(self, fieldnames: List[str], rows: List[Dict[str, Any]]) -> str:
    """Render records as CSV text with a header row.

    Shared by the three export methods, which previously each repeated
    this body and called ``writerows`` with a one-element list per record.

    Args:
        fieldnames: Columns to write, in order.
        rows: Mapping records; keys outside ``fieldnames`` are skipped and
            missing keys are written as empty cells.

    Returns:
        The CSV document as a string (header only when ``rows`` is empty).
    """
    exported = set(fieldnames)
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(
        # Sanitize only the cells that are actually written.
        {key: self._sanitize_csv_value(value) for key, value in row.items() if key in exported}
        for row in rows or ()
    )
    return buffer.getvalue()

def export_leads_csv(self, campaign_id: str, user_id: str = "default") -> str:
    """Export all leads of a campaign as CSV using ``CSV_LEAD_FIELDS``."""
    leads = self._get_storage().list_leads_all(campaign_id, user_id=user_id)
    return self._render_csv(self.CSV_LEAD_FIELDS, leads)

def export_attempts_csv(self, campaign_id: str, user_id: str = "default") -> str:
    """Export all outreach attempts of a campaign as CSV using ``CSV_ATTEMPT_FIELDS``."""
    attempts = self._get_storage().list_attempts_all(campaign_id, user_id=user_id)
    return self._render_csv(self.CSV_ATTEMPT_FIELDS, attempts)

def export_replies_csv(self, campaign_id: str, user_id: str = "default") -> str:
    """Export all replies of a campaign as CSV using ``CSV_REPLY_FIELDS``."""
    replies = self._get_storage().list_replies_all(campaign_id, user_id=user_id)
    return self._render_csv(self.CSV_REPLY_FIELDS, replies)
async def deep_discover(self, keyword: str, max_results: int = 15) -> Dict[str, Any]:
    """Run enhanced discovery by delegating to ``BacklinkOutreachScraper``.

    Per the original author's description, the scraper combines Exa neural
    search and DuckDuckGo with full-page scraping; that logic lives in
    ``services.backlink_outreach_scraper``, not here.

    Args:
        keyword: Topic to discover opportunities for.
        max_results: Forwarded unchanged to the scraper.

    Returns:
        Whatever the scraper's ``deep_discover`` coroutine returns.
    """
    # Imported at call time, as in the original.
    from services.backlink_outreach_scraper import BacklinkOutreachScraper

    # The service may or may not carry a _user_id attribute; fall back to None.
    owner_id = getattr(self, "_user_id", None)
    scraper = BacklinkOutreachScraper(user_id=owner_id)
    result = await scraper.deep_discover(keyword, max_results)
    return result
def get_migration_coverage(self) -> Dict[str, Any]:
    """Describe how much of the legacy backlinker has been migrated.

    Returns:
        Dict with the legacy module path (``legacy_reference``), the
        ``implemented`` and ``planned`` capability lists, and their
        lengths (``implemented_count``, ``planned_count``).
    """
    done: List[str] = list(
        (
            "discoverable backend router + service",
            "frontend API/store/UI integration point",
            "legacy guest-post search query generation templates",
            "provider-backed URL discovery + normalization + deduplication",
            "typed opportunity records and confidence score",
            "deep webpage scraping + contact-page extraction via Exa",
            "quality scoring and guest-post signal detection",
            "DB-backed policy validation with suppression & idempotency",
            "outreach attempt recording + status lifecycle",
            "SMTP email sending via backlink_outreach_sender",
            "IMAP reply polling with auto-classification",
            "follow-up scheduling with sent tracking",
            "email template CRUD + AI generation (llm_text_gen)",
            "personalized send via template variables",
        )
    )
    pending: List[str] = ["follow-up orchestration and campaign analytics"]

    coverage: Dict[str, Any] = {}
    coverage["legacy_reference"] = "ToBeMigrated/ai_marketing_tools/ai_backlinker/ai_backlinking.py"
    coverage["implemented_count"] = len(done)
    coverage["planned_count"] = len(pending)
    coverage["implemented"] = done
    coverage["planned"] = pending
    return coverage
# Module-level service instance, created once when this module is imported.
backlink_outreach_service = BacklinkOutreachService()