"""Canonical backlink outreach service entrypoint."""

from __future__ import annotations

import csv
import io
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, urlsplit

import requests
from bs4 import BeautifulSoup

from services.backlink_outreach_models import (
    OpportunityContactInfo,
    OpportunityRecord,
    PolicyValidationRequest,
    PolicyValidationResponse,
    SendOutreachRequest,
    SendOutreachResponse,
    CampaignVolumeResponse,
    CampaignVolumePoint,
    ConversionFunnelResponse,
    FunnelStage,
)
from services.backlink_outreach_storage import BacklinkOutreachStorageService

logger = logging.getLogger(__name__)

DEFAULT_USER_DAILY_CAP = 100
DEFAULT_DOMAIN_DAILY_CAP = 20

# Only absolute http(s) URLs are accepted as opportunities (case-sensitive scheme).
_HTTP_URL_RE = re.compile(r"^https?://")
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# Phrases whose presence in a title/snippet raises the confidence score.
_CONFIDENCE_CUES = ("write for us", "guest post", "submit", "contributor", "guest blogger")
_ALLOWED_LEGAL_BASES = frozenset({"legitimate_interest", "consent", "contract"})
_CONSENT_ONLY_REGIONS = frozenset({"eu", "eea"})
# Leading characters that make spreadsheet applications evaluate a cell as a formula.
_CSV_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")
# Pause between consecutive search queries, in seconds.
_QUERY_PAUSE_SECONDS = 0.4


def _unwrap_duckduckgo_redirect(url: str) -> str:
    """Return the target of a DuckDuckGo ``/l/?uddg=...`` redirect link.

    DuckDuckGo's HTML results typically link through a redirect whose ``uddg``
    query parameter holds the percent-encoded destination. Any URL that is not
    such a redirect (or cannot be parsed) is returned unchanged.
    """
    try:
        parts = urlsplit(url)
        host = (parts.hostname or "").lower()
    except ValueError:
        return url
    is_ddg_host = host == "duckduckgo.com" or host.endswith(".duckduckgo.com")
    if not is_ddg_host or parts.path.rstrip("/") != "/l":
        return url
    targets = parse_qs(parts.query).get("uddg")
    return targets[0].strip() if targets else url


def _email_domain(address: Optional[str]) -> str:
    """Return the part of *address* after the last ``@``, or ``""`` if there is none."""
    if not address or "@" not in address:
        return ""
    return address.split("@")[-1]


@dataclass
class SearchResult:
    """One raw search hit: link target, link text and result snippet."""

    url: str
    title: str
    snippet: str


class BacklinkOutreachService:
    """Facade for guest-post discovery, send-policy checks, reporting and CSV export."""

    def list_backlink_modules(self) -> List[Dict[str, Any]]:
        """Return the static list of modules that make up the backlink feature."""
        return [
            {"identifier": "backlink", "module_path": "backend/services/backlink_outreach_service.py", "purpose": "Canonical backlink service facade"},
            {"identifier": "outreach", "module_path": "backend/routers/backlink_outreach.py", "purpose": "HTTP API entrypoint for backlink outreach"},
            {"identifier": "guest_post", "module_path": "frontend/src/api/backlinkOutreachApi.ts", "purpose": "Frontend API integration for guest-post workflows"},
        ]

    def generate_guest_post_queries(self, keyword: str) -> List[str]:
        """Build the guest-post search queries for *keyword*.

        Returns an empty list when the keyword is empty or whitespace only.
        """
        normalized = (keyword or "").strip()
        if not normalized:
            return []
        return [
            f"{normalized} + 'Guest Contributor'",
            f"{normalized} + 'Add Guest Post'",
            f"{normalized} + 'Guest Bloggers Wanted'",
            f"{normalized} + 'Write for Us'",
            f"{normalized} + 'Submit Guest Post'",
            f"{normalized} + 'Become a Guest Blogger'",
            f"{normalized} + 'guest post opportunities'",
            f"{normalized} + 'Submit article'",
        ]

    def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]:
        """Fetch up to ten DuckDuckGo HTML results for *query*.

        The request is attempted ``retries + 1`` times with a linearly growing
        pause between attempts. The call is best-effort: any failure on the
        final attempt is logged and an empty list is returned instead of
        raising.
        """
        encoded_query = requests.utils.quote(query)
        url = f"https://duckduckgo.com/html/?q={encoded_query}"
        headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
        for attempt in range(retries + 1):
            try:
                response = requests.get(url, headers=headers, timeout=timeout_seconds)
                response.raise_for_status()
                return self._parse_search_results(response.text)
            except Exception:  # deliberate best-effort boundary: never raise to callers
                if attempt == retries:
                    logger.warning("Search failed for query %r after %d attempt(s)", query, attempt + 1, exc_info=True)
                    return []
                time.sleep(0.6 * (attempt + 1))
        return []

    def _parse_search_results(self, html: str) -> List[SearchResult]:
        """Extract at most ten ``SearchResult`` rows from a DuckDuckGo HTML page."""
        soup = BeautifulSoup(html, "html.parser")
        rows: List[SearchResult] = []
        for result in soup.select("div.result")[:10]:
            anchor = result.select_one("a.result__a")
            href = anchor.get("href") if anchor else None
            if not href:
                continue
            snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
            rows.append(
                SearchResult(
                    url=href,
                    title=anchor.get_text(strip=True),
                    snippet=snippet.get_text(" ", strip=True) if snippet else "",
                )
            )
        return rows

    def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
        """Search for guest-post opportunities for *keyword*.

        Runs the first four generated queries, de-duplicates hits by normalized
        URL and stops once *max_results* unique URLs were collected.

        Returns a dict with the ``keyword``, the ``queries`` that were used and
        the resulting ``opportunities`` (a list of ``OpportunityRecord``).
        """
        queries = self.generate_guest_post_queries(keyword)[:4]
        unique_hits = self._collect_unique_results(queries, max_results)
        opportunities: List[OpportunityRecord] = [
            OpportunityRecord(
                url=normalized_url,
                title=row.title or "Untitled",
                snippet=row.snippet,
                metadata={"source": "duckduckgo_html", "query_keyword": keyword},
                contact_info=self._extract_contact_info(row.snippet),
                confidence_score=self._score_confidence(row.title, row.snippet),
            )
            for normalized_url, row in unique_hits.items()
        ]
        return {"keyword": keyword, "queries": queries, "opportunities": opportunities}

    def _collect_unique_results(self, queries: List[str], max_results: int) -> Dict[str, SearchResult]:
        """Run *queries* in order and return up to *max_results* hits keyed by normalized URL."""
        unique: Dict[str, SearchResult] = {}
        if max_results <= 0:
            return unique
        for index, query in enumerate(queries):
            if index:
                # Pause only between queries, not after the last one.
                time.sleep(_QUERY_PAUSE_SECONDS)
            for result in self.search_for_urls(query):
                normalized_url = self._normalize_url(result.url)
                if not normalized_url or normalized_url in unique:
                    continue
                unique[normalized_url] = result
                if len(unique) >= max_results:
                    return unique
        return unique

    def _normalize_url(self, url: str) -> str:
        """Return a canonical http(s) URL, or ``""`` when *url* is unusable.

        Protocol-relative URLs get an ``https:`` scheme, DuckDuckGo redirect
        links are replaced by their real target, and the fragment and trailing
        slashes are dropped.
        """
        u = (url or "").strip()
        if not u:
            return ""
        if u.startswith("//"):
            u = f"https:{u}"
        if not _HTTP_URL_RE.match(u):
            return ""
        u = _unwrap_duckduckgo_redirect(u)
        # The unwrapped target comes from a query parameter, so re-validate it.
        if not _HTTP_URL_RE.match(u):
            return ""
        return u.split("#")[0].rstrip("/")

    def _extract_contact_info(self, text: str) -> OpportunityContactInfo:
        """Return contact info holding the first email address found in *text*, if any."""
        if not text:
            return OpportunityContactInfo()
        email_match = _EMAIL_RE.search(text)
        return OpportunityContactInfo(email=email_match.group(0) if email_match else None)

    def _score_confidence(self, title: str, snippet: str) -> float:
        """Score a hit from 0.35 up to 1.0, adding 0.13 per guest-post cue found."""
        hay = f"{title} {snippet}".lower()
        hits = sum(1 for cue in _CONFIDENCE_CUES if cue in hay)
        return min(1.0, 0.35 + (0.13 * hits))

    def _get_storage(self) -> BacklinkOutreachStorageService:
        """Return a new storage service instance."""
        return BacklinkOutreachStorageService()

    def validate_send_policy(self, payload: PolicyValidationRequest) -> PolicyValidationResponse:
        """Evaluate every send-policy rule for *payload* and audit-log the outcome.

        All rules are checked (no short-circuit), so ``reasons`` lists every
        violated rule. The send is allowed only when no rule is violated.
        """
        reasons: List[str] = []
        storage = self._get_storage()
        legal_basis = payload.legal_basis.lower()

        if payload.workspace_id.startswith("new-") and not payload.approved_by_human:
            reasons.append("human_review_required_for_new_workspace")
        if legal_basis not in _ALLOWED_LEGAL_BASES:
            reasons.append("invalid_legal_basis")
        if payload.recipient_region.lower() in _CONSENT_ONLY_REGIONS and legal_basis != "consent":
            reasons.append("region_requires_explicit_consent")
        if len(payload.sender_identity.strip()) < 3:
            reasons.append("sender_identity_required")
        if storage.is_suppressed(str(payload.recipient_email), payload.recipient_domain, user_id=payload.user_id):
            reasons.append("recipient_suppressed")
        if storage.check_idempotency(payload.idempotency_key, user_id=payload.user_id):
            reasons.append("duplicate_idempotency_key")

        user_count = storage.get_user_send_count(payload.user_id)
        domain_count = storage.get_domain_send_count(payload.recipient_domain, user_id=payload.user_id)
        if user_count >= DEFAULT_USER_DAILY_CAP:
            reasons.append("user_daily_cap_exceeded")
        if domain_count >= DEFAULT_DOMAIN_DAILY_CAP:
            reasons.append("domain_daily_cap_exceeded")

        allowed = not reasons
        final_status = "approved" if allowed else "blocked"
        storage.add_audit_log(
            event="policy_check",
            user_id=payload.user_id,
            campaign_id=payload.campaign_id,
            recipient=str(payload.recipient_email),
            allowed=allowed,
            reasons=reasons,
            override=payload.approved_by_human,
        )
        return PolicyValidationResponse(allowed=allowed, reasons=reasons, final_status=final_status)

    # Country-code TLDs treated as EU; ".eu" itself is included as the EU's own ccTLD.
    EU_DOMAIN_SUFFIXES = (".de", ".fr", ".it", ".es", ".nl", ".be", ".at", ".se", ".dk", ".fi", ".pt", ".ie", ".gr", ".pl", ".cz", ".ro", ".hu", ".bg", ".hr", ".sk", ".si", ".ee", ".lv", ".lt", ".lu", ".mt", ".cy", ".eu")

    def _infer_region(self, domain: str) -> str:
        """Guess a region code (``eu``/``uk``/``ca``/``au``/``unknown``) from a domain's TLD.

        A missing domain yields ``"unknown"``; trailing slashes and dots are ignored.
        """
        d = (domain or "").strip().lower().rstrip("/.")
        if d.endswith(tuple(self.EU_DOMAIN_SUFFIXES)):
            return "eu"
        for suffix, region in ((".uk", "uk"), (".ca", "ca"), (".au", "au")):
            if d.endswith(suffix):
                return region
        return "unknown"

    def send_outreach(self, request: SendOutreachRequest) -> SendOutreachResponse:
        """Run the policy check for a lead and record the resulting attempt.

        Returns a ``failed`` response with reason ``lead_not_found`` when the
        lead does not exist. Otherwise the attempt is stored with status
        ``approved`` or ``blocked`` according to the policy decision.
        """
        storage = self._get_storage()
        lead = storage.get_lead(request.lead_id, user_id=request.user_id)
        if not lead:
            return SendOutreachResponse(attempt_id="", status="failed", policy_allowed=False, policy_reasons=["lead_not_found"])

        # The recipient domain drives suppression and per-domain caps, so prefer
        # data about the recipient; the sender's domain is only a last resort.
        domain = (
            lead.get("domain")
            or _email_domain(lead.get("email"))
            or _email_domain(request.sender_email)
            or "unknown"
        )
        recipient_region = self._infer_region(domain)
        legal_basis = "consent" if recipient_region == "eu" else "legitimate_interest"
        policy_req = PolicyValidationRequest(
            user_id=request.user_id,
            workspace_id=request.workspace_id,
            campaign_id=request.campaign_id,
            recipient_email=lead.get("email", ""),
            recipient_domain=domain,
            recipient_region=recipient_region,
            legal_basis=legal_basis,
            approved_by_human=False,
            unsubscribe_url=None,
            sender_identity=request.sender_email,
            idempotency_key=request.idempotency_key,
        )
        policy = self.validate_send_policy(policy_req)
        attempt = storage.add_attempt(
            lead_id=request.lead_id,
            campaign_id=request.campaign_id,
            idempotency_key=request.idempotency_key,
            sender_email=request.sender_email,
            subject=request.subject,
            body=request.body,
            status="approved" if policy.allowed else "blocked",
            decision_reason="; ".join(policy.reasons) if policy.reasons else None,
            user_id=request.user_id,
        )
        return SendOutreachResponse(
            attempt_id=attempt.get("attempt_id", ""),
            status=attempt.get("status", "failed"),
            policy_allowed=policy.allowed,
            policy_reasons=policy.reasons,
        )

    def get_reporting_snapshot(self, user_id: str = "default") -> Dict[str, Any]:
        """Aggregate send volume, audit-event count and conversion rates for a user.

        Rates are rounded to four decimals and are ``0.0`` when their
        denominator is zero.
        """
        storage = self._get_storage()
        # NOTE(review): user_id is passed twice here; confirm against the
        # signature of BacklinkOutreachStorageService.list_campaigns.
        campaigns = storage.list_campaigns(user_id, user_id, limit=100)
        total_sent = 0
        total_replied = 0
        total_placed = 0
        total_leads = 0
        for campaign in campaigns:
            cid = campaign["campaign_id"]
            attempts = storage.list_attempts(cid, limit=10000, user_id=user_id)
            leads = storage.list_leads_all(cid, user_id=user_id)
            total_sent += sum(1 for attempt in attempts if attempt.get("status") == "sent")
            total_replied += storage.count_replies(cid, user_id=user_id)
            total_placed += sum(1 for lead in leads if lead.get("status") == "placed")
            total_leads += len(leads)
        logs = storage.list_audit_logs("", limit=1000, user_id=user_id)
        return {
            "send_volume": total_sent,
            "decision_events": len(logs),
            "response_rate": round(total_replied / total_sent, 4) if total_sent > 0 else 0.0,
            "placement_conversion": round(total_placed / total_leads, 4) if total_leads > 0 else 0.0,
        }

    def get_campaign_volume(self, campaign_id: str, days: int = 30, user_id: str = "default") -> CampaignVolumeResponse:
        """Return the per-day send volume of a campaign as reported by storage."""
        storage = self._get_storage()
        points = storage.get_send_volume_by_day(campaign_id, days, user_id=user_id)
        return CampaignVolumeResponse(
            campaign_id=campaign_id,
            days=days,
            volume=[CampaignVolumePoint(**p) for p in points],
        )

    def get_campaign_funnel(self, campaign_id: str, user_id: str = "default") -> ConversionFunnelResponse:
        """Return the lead-status funnel of a campaign as reported by storage."""
        storage = self._get_storage()
        stages = storage.get_lead_status_counts(campaign_id, user_id=user_id)
        return ConversionFunnelResponse(
            campaign_id=campaign_id,
            stages=[FunnelStage(**s) for s in stages],
        )

    CSV_LEAD_FIELDS = ["lead_id", "campaign_id", "domain", "page_title", "email", "status", "discovery_source", "created_at"]
    CSV_ATTEMPT_FIELDS = ["attempt_id", "lead_id", "campaign_id", "sender_email", "subject", "status", "sent_at", "created_at"]
    CSV_REPLY_FIELDS = ["reply_id", "attempt_id", "from_email", "subject", "classification", "received_at"]

    @staticmethod
    def _sanitize_csv_value(value: Any) -> str:
        """Stringify *value* and neutralise spreadsheet formula injection.

        ``None`` becomes ``""``. A value starting with ``=``, ``+``, ``-``,
        ``@``, tab or carriage return is prefixed with a single quote.
        """
        s = str(value) if value is not None else ""
        if s.startswith(_CSV_FORMULA_PREFIXES):
            s = "'" + s
        return s

    def _render_csv(self, rows: List[Dict[str, Any]], fieldnames: List[str]) -> str:
        """Serialise *rows* to CSV text with a header, keeping only *fieldnames*.

        Missing or ``None`` values are written as empty cells; every value is
        passed through ``_sanitize_csv_value``.
        """
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(
            {name: self._sanitize_csv_value(row.get(name)) for name in fieldnames}
            for row in rows
        )
        return output.getvalue()

    def export_leads_csv(self, campaign_id: str, user_id: str = "default") -> str:
        """Export all leads of a campaign as CSV text."""
        leads = self._get_storage().list_leads_all(campaign_id, user_id=user_id)
        return self._render_csv(leads, self.CSV_LEAD_FIELDS)

    def export_attempts_csv(self, campaign_id: str, user_id: str = "default") -> str:
        """Export all outreach attempts of a campaign as CSV text."""
        attempts = self._get_storage().list_attempts_all(campaign_id, user_id=user_id)
        return self._render_csv(attempts, self.CSV_ATTEMPT_FIELDS)

    def export_replies_csv(self, campaign_id: str, user_id: str = "default") -> str:
        """Export all replies of a campaign as CSV text."""
        replies = self._get_storage().list_replies_all(campaign_id, user_id=user_id)
        return self._render_csv(replies, self.CSV_REPLY_FIELDS)

    async def deep_discover(self, keyword: str, max_results: int = 15) -> Dict[str, Any]:
        """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
        # Imported lazily, as in the original implementation.
        from services.backlink_outreach_scraper import BacklinkOutreachScraper

        scraper = BacklinkOutreachScraper(user_id=getattr(self, "_user_id", None))
        return await scraper.deep_discover(keyword, max_results)

    def get_migration_coverage(self) -> Dict[str, Any]:
        """Return a static summary of implemented and planned migration features."""
        implemented = [
            "discoverable backend router + service",
            "frontend API/store/UI integration point",
            "legacy guest-post search query generation templates",
            "provider-backed URL discovery + normalization + deduplication",
            "typed opportunity records and confidence score",
            "deep webpage scraping + contact-page extraction via Exa",
            "quality scoring and guest-post signal detection",
            "DB-backed policy validation with suppression & idempotency",
            "outreach attempt recording + status lifecycle",
            "SMTP email sending via backlink_outreach_sender",
            "IMAP reply polling with auto-classification",
            "follow-up scheduling with sent tracking",
            "email template CRUD + AI generation (llm_text_gen)",
            "personalized send via template variables",
        ]
        planned = [
            "follow-up orchestration and campaign analytics",
        ]
        return {
            "legacy_reference": "ToBeMigrated/ai_marketing_tools/ai_backlinker/ai_backlinking.py",
            "implemented_count": len(implemented),
            "planned_count": len(planned),
            "implemented": implemented,
            "planned": planned,
        }


backlink_outreach_service = BacklinkOutreachService()