Merge remote-tracking branch 'origin/codex/refactor-backlink-outreach-services-for-async-support'

# Conflicts:
#	backend/routers/backlink_outreach.py
This commit is contained in:
ajaysi
2026-06-03 18:49:01 +05:30
3 changed files with 202 additions and 105 deletions

View File

@@ -71,7 +71,7 @@ async def discover_backlink_opportunities(
payload: BacklinkKeywordInput, payload: BacklinkKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user), current_user: Dict[str, Any] = Depends(get_current_user),
): ):
return backlink_outreach_service.discover_opportunities(payload.keyword, payload.max_results) return await backlink_outreach_service.discover_opportunities_async(payload.keyword, payload.max_results)
@router.get("/migration-coverage") @router.get("/migration-coverage")
@@ -87,6 +87,8 @@ async def get_backlink_migration_coverage(
async def discover_deep_backlink_opportunities( async def discover_deep_backlink_opportunities(
payload: DeepKeywordInput, payload: DeepKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user), current_user: Dict[str, Any] = Depends(get_current_user),
scrape_timeout_seconds: float = Query(15.0, ge=1.0, le=60.0),
scrape_max_concurrency: int = Query(5, ge=1, le=20),
): ):
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping.""" """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
user_id = _resolve_user_id(current_user) user_id = _resolve_user_id(current_user)
@@ -96,7 +98,13 @@ async def discover_deep_backlink_opportunities(
if not storage.get_campaign(payload.campaign_id, user_id): if not storage.get_campaign(payload.campaign_id, user_id):
raise HTTPException(status_code=404, detail="Campaign not found") raise HTTPException(status_code=404, detail="Campaign not found")
result = await backlink_outreach_service.deep_discover(payload.keyword, payload.max_results) result = await backlink_outreach_service.deep_discover(
payload.keyword,
payload.max_results,
user_id=user_id,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
if payload.campaign_id: if payload.campaign_id:
saved = 0 saved = 0
save_failed = 0 save_failed = 0

View File

@@ -8,11 +8,10 @@ from __future__ import annotations
import asyncio import asyncio
import re import re
import time
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from urllib.parse import urlparse from urllib.parse import quote, urlparse
import requests import httpx
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from loguru import logger from loguru import logger
@@ -34,26 +33,47 @@ class BacklinkOutreachScraper:
# -- Public API -- # -- Public API --
async def deep_discover( async def deep_discover(
self, keyword: str, max_results: int = 15 self,
keyword: str,
max_results: int = 15,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Discover guest-post opportunities using Exa, falling back to DuckDuckGo.""" """Discover guest-post opportunities using Exa, falling back to DuckDuckGo."""
if self._is_exa_available(): if self._is_exa_available():
logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}") logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}")
return await self._discover_with_exa(keyword, max_results) return await self._discover_with_exa(keyword, max_results)
logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}") logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}")
return await self._discover_with_duckduckgo(keyword, max_results) return await self._discover_with_duckduckgo(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]: async def scrape_urls(
"""Fetch full page content for a list of URLs using Exa get_contents.""" self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Fetch full page content with non-blocking fallbacks and bounded concurrency."""
exa = self._get_exa_sdk() exa = self._get_exa_sdk()
if not exa: if not exa:
return self._scrape_urls_fallback(urls) return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
loop = asyncio.get_running_loop()
try: try:
result = exa.get_contents(urls, text={"max_characters": 5000}) result = await loop.run_in_executor(
None, lambda: exa.get_contents(urls, text={"max_characters": 5000})
)
return self._parse_get_contents_result(result) return self._parse_get_contents_result(result)
except Exception as e: except Exception as e:
logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}") logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}")
return self._scrape_urls_fallback(urls) return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
# -- Availability -- # -- Availability --
@@ -207,24 +227,35 @@ class BacklinkOutreachScraper:
# -- DuckDuckGo Fallback Discovery -- # -- DuckDuckGo Fallback Discovery --
async def _discover_with_duckduckgo(self, keyword: str, max_results: int) -> Dict[str, Any]: async def _discover_with_duckduckgo(
self,
keyword: str,
max_results: int,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
queries = self._generate_search_queries(keyword) queries = self._generate_search_queries(keyword)
dedup: Dict[str, Dict[str, Any]] = {} dedup: Dict[str, Dict[str, Any]] = {}
for query in queries[:4]: async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
rows = self._duckduckgo_search(query) for query in queries[:4]:
for row in rows: rows = await self._duckduckgo_search(query, client=client)
norm_url = self._normalize_url(row.get("url", "")) for row in rows:
if not norm_url or norm_url in dedup: norm_url = self._normalize_url(row.get("url", ""))
continue if not norm_url or norm_url in dedup:
dedup[norm_url] = row continue
if len(dedup) >= max_results: dedup[norm_url] = row
break if len(dedup) >= max_results:
time.sleep(0.4) break
await asyncio.sleep(0.4)
# Scrape discovered URLs with Exa get_contents (or fallback) # Scrape discovered URLs with Exa get_contents (or fallback)
urls_to_scrape = list(dedup.keys())[:max_results] urls_to_scrape = list(dedup.keys())[:max_results]
scraped = self.scrape_urls(urls_to_scrape) scraped = await self.scrape_urls(
urls_to_scrape,
timeout_seconds=scrape_timeout_seconds,
max_concurrency=scrape_max_concurrency,
)
scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped} scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped}
# Merge DDG results with scraped content # Merge DDG results with scraped content
@@ -250,51 +281,76 @@ class BacklinkOutreachScraper:
"opportunities": opportunities, "opportunities": opportunities,
} }
def _duckduckgo_search(self, query: str, retries: int = 2) -> List[Dict[str, Any]]: async def _duckduckgo_search(
encoded = requests.utils.quote(query) self,
query: str,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[Dict[str, Any]]:
encoded = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded}" url = f"https://duckduckgo.com/html/?q={encoded}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for attempt in range(retries + 1):
try:
resp = requests.get(url, headers=headers, timeout=12)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
results.append({
"url": anchor.get("href"),
"title": anchor.get_text(strip=True),
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
"highlights": [],
})
return results
except Exception:
if attempt == retries:
return []
time.sleep(0.6 * (attempt + 1))
return []
def _scrape_urls_fallback(self, urls: List[str]) -> List[Dict[str, Any]]: async def _request(active_client: httpx.AsyncClient) -> List[Dict[str, Any]]:
"""Basic HTTP scrape when Exa is unavailable.""" for attempt in range(retries + 1):
results = [] try:
resp = await active_client.get(url, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
for result in soup.select("div.result")[:10]:
anchor = result.select_one("a.result__a")
snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
if not anchor or not anchor.get("href"):
continue
results.append({
"url": anchor.get("href"),
"title": anchor.get_text(strip=True),
"snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "",
"highlights": [],
})
return results
except (httpx.HTTPError, httpx.TimeoutException):
if attempt == retries:
return []
await asyncio.sleep(0.6 * (attempt + 1))
return []
if client is not None:
return await _request(client)
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as owned_client:
return await _request(owned_client)
async def _scrape_urls_fallback(
self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Basic async HTTP scrape when Exa is unavailable."""
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for url in urls[:5]: semaphore = asyncio.Semaphore(max(1, max_concurrency))
try: timeout = httpx.Timeout(timeout_seconds)
resp = requests.get(url, headers=headers, timeout=15)
resp.raise_for_status() async def scrape_one(client: httpx.AsyncClient, url: str) -> Optional[Dict[str, Any]]:
soup = BeautifulSoup(resp.text, "html.parser") async with semaphore:
for tag in soup(["script", "style", "nav", "footer", "header"]): try:
tag.decompose() resp = await client.get(url, headers=headers)
text = soup.get_text(separator=" ", strip=True) resp.raise_for_status()
title = soup.title.get_text(strip=True) if soup.title else "" soup = BeautifulSoup(resp.text, "html.parser")
results.append({"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}) for tag in soup(["script", "style", "nav", "footer", "header"]):
except Exception: tag.decompose()
continue text = soup.get_text(separator=" ", strip=True)
return results title = soup.title.get_text(strip=True) if soup.title else ""
return {"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}
except (httpx.HTTPError, httpx.TimeoutException):
return None
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
tasks = [scrape_one(client, url) for url in urls]
scraped = await asyncio.gather(*tasks)
return [row for row in scraped if row]
# -- Enrichment Pipeline -- # -- Enrichment Pipeline --

View File

@@ -4,10 +4,11 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from urllib.parse import quote
import asyncio
import re import re
import time
import requests import httpx
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
import csv import csv
@@ -55,51 +56,67 @@ class BacklinkOutreachService:
f"{normalized} + 'Submit article'", f"{normalized} + 'Submit article'",
] ]
def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]: async def search_for_urls(
encoded_query = requests.utils.quote(query) self,
query: str,
timeout_seconds: int = 12,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[SearchResult]:
"""Search DuckDuckGo HTML using a non-blocking HTTP client."""
encoded_query = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded_query}" url = f"https://duckduckgo.com/html/?q={encoded_query}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for attempt in range(retries + 1): async def _request(active_client: httpx.AsyncClient) -> List[SearchResult]:
try: for attempt in range(retries + 1):
response = requests.get(url, headers=headers, timeout=timeout_seconds) try:
response.raise_for_status() response = await active_client.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser") response.raise_for_status()
rows: List[SearchResult] = [] soup = BeautifulSoup(response.text, "html.parser")
for result in soup.select("div.result")[:10]: rows: List[SearchResult] = []
anchor = result.select_one("a.result__a") for result in soup.select("div.result")[:10]:
snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet") anchor = result.select_one("a.result__a")
if not anchor or not anchor.get("href"): snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
continue if not anchor or not anchor.get("href"):
rows.append( continue
SearchResult( rows.append(
url=anchor.get("href"), SearchResult(
title=anchor.get_text(strip=True), url=anchor.get("href"),
snippet=snippet.get_text(" ", strip=True) if snippet else "", title=anchor.get_text(strip=True),
snippet=snippet.get_text(" ", strip=True) if snippet else "",
)
) )
) return rows
return rows except (httpx.HTTPError, httpx.TimeoutException):
except Exception: if attempt == retries:
if attempt == retries: return []
return [] await asyncio.sleep(0.6 * (attempt + 1))
time.sleep(0.6 * (attempt + 1)) return []
return []
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]: if client is not None:
return await _request(client)
timeout = httpx.Timeout(timeout_seconds)
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as owned_client:
return await _request(owned_client)
async def discover_opportunities_async(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
queries = self.generate_guest_post_queries(keyword)[:4] queries = self.generate_guest_post_queries(keyword)[:4]
dedup: Dict[str, SearchResult] = {} dedup: Dict[str, SearchResult] = {}
for query in queries: async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
for result in self.search_for_urls(query): for query in queries:
normalized_url = self._normalize_url(result.url) for result in await self.search_for_urls(query, client=client):
if not normalized_url or normalized_url in dedup: normalized_url = self._normalize_url(result.url)
continue if not normalized_url or normalized_url in dedup:
dedup[normalized_url] = result continue
dedup[normalized_url] = result
if len(dedup) >= max_results:
break
if len(dedup) >= max_results: if len(dedup) >= max_results:
break break
if len(dedup) >= max_results: await asyncio.sleep(0.4)
break
time.sleep(0.4)
opportunities: List[OpportunityRecord] = [] opportunities: List[OpportunityRecord] = []
for normalized_url, row in dedup.items(): for normalized_url, row in dedup.items():
@@ -118,6 +135,10 @@ class BacklinkOutreachService:
return {"keyword": keyword, "queries": queries, "opportunities": opportunities} return {"keyword": keyword, "queries": queries, "opportunities": opportunities}
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
"""Synchronous compatibility wrapper for non-async callers."""
return asyncio.run(self.discover_opportunities_async(keyword, max_results))
def _normalize_url(self, url: str) -> str: def _normalize_url(self, url: str) -> str:
u = (url or "").strip() u = (url or "").strip()
if not u: if not u:
@@ -323,11 +344,23 @@ class BacklinkOutreachService:
writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}]) writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}])
return output.getvalue() return output.getvalue()
async def deep_discover(self, keyword: str, max_results: int = 15) -> Dict[str, Any]: async def deep_discover(
self,
keyword: str,
max_results: int = 15,
user_id: Optional[str] = None,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping.""" """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
from services.backlink_outreach_scraper import BacklinkOutreachScraper from services.backlink_outreach_scraper import BacklinkOutreachScraper
scraper = BacklinkOutreachScraper(user_id=self._user_id if hasattr(self, '_user_id') else None) scraper = BacklinkOutreachScraper(user_id=user_id)
return await scraper.deep_discover(keyword, max_results) return await scraper.deep_discover(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def get_migration_coverage(self) -> Dict[str, Any]: def get_migration_coverage(self) -> Dict[str, Any]:
implemented = [ implemented = [