Refactor backlink discovery HTTP calls

This commit is contained in:
ي
2026-06-03 18:28:40 +05:30
parent 923fa671fe
commit 55b7209554
3 changed files with 202 additions and 105 deletions

View File

@@ -68,7 +68,7 @@ async def discover_backlink_opportunities(
payload: BacklinkKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user),
):
return backlink_outreach_service.discover_opportunities(payload.keyword, payload.max_results)
return await backlink_outreach_service.discover_opportunities_async(payload.keyword, payload.max_results)
@router.get("/migration-coverage")
@@ -84,10 +84,18 @@ async def get_backlink_migration_coverage(
async def discover_deep_backlink_opportunities(
payload: DeepKeywordInput,
current_user: Dict[str, Any] = Depends(get_current_user),
scrape_timeout_seconds: float = Query(15.0, ge=1.0, le=60.0),
scrape_max_concurrency: int = Query(5, ge=1, le=20),
):
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
user_id = _resolve_user_id(current_user)
result = await backlink_outreach_service.deep_discover(payload.keyword, payload.max_results)
result = await backlink_outreach_service.deep_discover(
payload.keyword,
payload.max_results,
user_id=user_id,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
if payload.campaign_id:
storage = BacklinkOutreachStorageService()
saved = 0

View File

@@ -8,11 +8,10 @@ from __future__ import annotations
import asyncio
import re
import time
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
from urllib.parse import quote, urlparse
import requests
import httpx
from bs4 import BeautifulSoup
from loguru import logger
@@ -34,26 +33,47 @@ class BacklinkOutreachScraper:
# -- Public API --
async def deep_discover(
self, keyword: str, max_results: int = 15
self,
keyword: str,
max_results: int = 15,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
"""Discover guest-post opportunities using Exa, falling back to DuckDuckGo."""
if self._is_exa_available():
logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}")
return await self._discover_with_exa(keyword, max_results)
logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}")
return await self._discover_with_duckduckgo(keyword, max_results)
return await self._discover_with_duckduckgo(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Fetch full page content for a list of URLs using Exa get_contents."""
async def scrape_urls(
self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Fetch full page content with non-blocking fallbacks and bounded concurrency."""
exa = self._get_exa_sdk()
if not exa:
return self._scrape_urls_fallback(urls)
return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
loop = asyncio.get_running_loop()
try:
result = exa.get_contents(urls, text={"max_characters": 5000})
result = await loop.run_in_executor(
None, lambda: exa.get_contents(urls, text={"max_characters": 5000})
)
return self._parse_get_contents_result(result)
except Exception as e:
logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}")
return self._scrape_urls_fallback(urls)
return await self._scrape_urls_fallback(
urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency
)
# -- Availability --
@@ -207,12 +227,19 @@ class BacklinkOutreachScraper:
# -- DuckDuckGo Fallback Discovery --
async def _discover_with_duckduckgo(self, keyword: str, max_results: int) -> Dict[str, Any]:
async def _discover_with_duckduckgo(
self,
keyword: str,
max_results: int,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
queries = self._generate_search_queries(keyword)
dedup: Dict[str, Dict[str, Any]] = {}
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
for query in queries[:4]:
rows = self._duckduckgo_search(query)
rows = await self._duckduckgo_search(query, client=client)
for row in rows:
norm_url = self._normalize_url(row.get("url", ""))
if not norm_url or norm_url in dedup:
@@ -220,11 +247,15 @@ class BacklinkOutreachScraper:
dedup[norm_url] = row
if len(dedup) >= max_results:
break
time.sleep(0.4)
await asyncio.sleep(0.4)
# Scrape discovered URLs with Exa get_contents (or fallback)
urls_to_scrape = list(dedup.keys())[:max_results]
scraped = self.scrape_urls(urls_to_scrape)
scraped = await self.scrape_urls(
urls_to_scrape,
timeout_seconds=scrape_timeout_seconds,
max_concurrency=scrape_max_concurrency,
)
scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped}
# Merge DDG results with scraped content
@@ -250,13 +281,20 @@ class BacklinkOutreachScraper:
"opportunities": opportunities,
}
def _duckduckgo_search(self, query: str, retries: int = 2) -> List[Dict[str, Any]]:
encoded = requests.utils.quote(query)
async def _duckduckgo_search(
self,
query: str,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[Dict[str, Any]]:
encoded = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
async def _request(active_client: httpx.AsyncClient) -> List[Dict[str, Any]]:
for attempt in range(retries + 1):
try:
resp = requests.get(url, headers=headers, timeout=12)
resp = await active_client.get(url, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
results = []
@@ -272,29 +310,47 @@ class BacklinkOutreachScraper:
"highlights": [],
})
return results
except Exception:
except (httpx.HTTPError, httpx.TimeoutException):
if attempt == retries:
return []
time.sleep(0.6 * (attempt + 1))
await asyncio.sleep(0.6 * (attempt + 1))
return []
def _scrape_urls_fallback(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Basic HTTP scrape when Exa is unavailable."""
results = []
if client is not None:
return await _request(client)
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as owned_client:
return await _request(owned_client)
async def _scrape_urls_fallback(
self,
urls: List[str],
timeout_seconds: float = 15.0,
max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
"""Basic async HTTP scrape when Exa is unavailable."""
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
for url in urls[:5]:
semaphore = asyncio.Semaphore(max(1, max_concurrency))
timeout = httpx.Timeout(timeout_seconds)
async def scrape_one(client: httpx.AsyncClient, url: str) -> Optional[Dict[str, Any]]:
async with semaphore:
try:
resp = requests.get(url, headers=headers, timeout=15)
resp = await client.get(url, headers=headers)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator=" ", strip=True)
title = soup.title.get_text(strip=True) if soup.title else ""
results.append({"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""})
except Exception:
continue
return results
return {"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}
except (httpx.HTTPError, httpx.TimeoutException):
return None
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
tasks = [scrape_one(client, url) for url in urls]
scraped = await asyncio.gather(*tasks)
return [row for row in scraped if row]
# -- Enrichment Pipeline --

View File

@@ -4,10 +4,11 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from urllib.parse import quote
import asyncio
import re
import time
import requests
import httpx
from bs4 import BeautifulSoup
import csv
@@ -55,14 +56,22 @@ class BacklinkOutreachService:
f"{normalized} + 'Submit article'",
]
def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]:
encoded_query = requests.utils.quote(query)
async def search_for_urls(
self,
query: str,
timeout_seconds: int = 12,
retries: int = 2,
client: Optional[httpx.AsyncClient] = None,
) -> List[SearchResult]:
"""Search DuckDuckGo HTML using a non-blocking HTTP client."""
encoded_query = quote(query)
url = f"https://duckduckgo.com/html/?q={encoded_query}"
headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"}
async def _request(active_client: httpx.AsyncClient) -> List[SearchResult]:
for attempt in range(retries + 1):
try:
response = requests.get(url, headers=headers, timeout=timeout_seconds)
response = await active_client.get(url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
rows: List[SearchResult] = []
@@ -79,18 +88,26 @@ class BacklinkOutreachService:
)
)
return rows
except Exception:
except (httpx.HTTPError, httpx.TimeoutException):
if attempt == retries:
return []
time.sleep(0.6 * (attempt + 1))
await asyncio.sleep(0.6 * (attempt + 1))
return []
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
if client is not None:
return await _request(client)
timeout = httpx.Timeout(timeout_seconds)
async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as owned_client:
return await _request(owned_client)
async def discover_opportunities_async(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
queries = self.generate_guest_post_queries(keyword)[:4]
dedup: Dict[str, SearchResult] = {}
async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client:
for query in queries:
for result in self.search_for_urls(query):
for result in await self.search_for_urls(query, client=client):
normalized_url = self._normalize_url(result.url)
if not normalized_url or normalized_url in dedup:
continue
@@ -99,7 +116,7 @@ class BacklinkOutreachService:
break
if len(dedup) >= max_results:
break
time.sleep(0.4)
await asyncio.sleep(0.4)
opportunities: List[OpportunityRecord] = []
for normalized_url, row in dedup.items():
@@ -118,6 +135,10 @@ class BacklinkOutreachService:
return {"keyword": keyword, "queries": queries, "opportunities": opportunities}
def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]:
    """Synchronous compatibility wrapper for non-async callers.

    Runs ``discover_opportunities_async`` to completion and returns its result.

    Args:
        keyword: Seed keyword forwarded to the async discovery routine.
        max_results: Upper bound on opportunities, forwarded unchanged.

    Returns:
        Whatever ``discover_opportunities_async`` returns for these arguments.

    Raises:
        Any exception raised by ``discover_opportunities_async`` is propagated.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: asyncio.run is safe.
        return asyncio.run(self.discover_opportunities_async(keyword, max_results))

    # An event loop is already running in this thread (e.g. a legacy sync
    # helper invoked from async code). asyncio.run() would raise RuntimeError
    # here, so drive the coroutine on a private loop in a worker thread.
    # NOTE: this blocks the calling thread until discovery finishes; async
    # callers should await discover_opportunities_async directly instead.
    from concurrent.futures import ThreadPoolExecutor

    def _run_in_fresh_loop() -> Dict[str, Any]:
        return asyncio.run(self.discover_opportunities_async(keyword, max_results))

    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(_run_in_fresh_loop).result()
def _normalize_url(self, url: str) -> str:
u = (url or "").strip()
if not u:
@@ -323,11 +344,23 @@ class BacklinkOutreachService:
writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}])
return output.getvalue()
async def deep_discover(self, keyword: str, max_results: int = 15) -> Dict[str, Any]:
async def deep_discover(
self,
keyword: str,
max_results: int = 15,
user_id: Optional[str] = None,
scrape_timeout_seconds: float = 15.0,
scrape_max_concurrency: int = 5,
) -> Dict[str, Any]:
"""Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping."""
from services.backlink_outreach_scraper import BacklinkOutreachScraper
scraper = BacklinkOutreachScraper(user_id=self._user_id if hasattr(self, '_user_id') else None)
return await scraper.deep_discover(keyword, max_results)
scraper = BacklinkOutreachScraper(user_id=user_id)
return await scraper.deep_discover(
keyword,
max_results,
scrape_timeout_seconds=scrape_timeout_seconds,
scrape_max_concurrency=scrape_max_concurrency,
)
def get_migration_coverage(self) -> Dict[str, Any]:
implemented = [