From 55b7209554bbe85d1a566f639442e5cfbbbc450c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Wed, 3 Jun 2026 18:28:40 +0530 Subject: [PATCH] Refactor backlink discovery HTTP calls --- backend/routers/backlink_outreach.py | 12 +- backend/services/backlink_outreach_scraper.py | 182 ++++++++++++------ backend/services/backlink_outreach_service.py | 113 +++++++---- 3 files changed, 202 insertions(+), 105 deletions(-) diff --git a/backend/routers/backlink_outreach.py b/backend/routers/backlink_outreach.py index 1900f38b..4bcc99dc 100644 --- a/backend/routers/backlink_outreach.py +++ b/backend/routers/backlink_outreach.py @@ -68,7 +68,7 @@ async def discover_backlink_opportunities( payload: BacklinkKeywordInput, current_user: Dict[str, Any] = Depends(get_current_user), ): - return backlink_outreach_service.discover_opportunities(payload.keyword, payload.max_results) + return await backlink_outreach_service.discover_opportunities_async(payload.keyword, payload.max_results) @router.get("/migration-coverage") @@ -84,10 +84,18 @@ async def get_backlink_migration_coverage( async def discover_deep_backlink_opportunities( payload: DeepKeywordInput, current_user: Dict[str, Any] = Depends(get_current_user), + scrape_timeout_seconds: float = Query(15.0, ge=1.0, le=60.0), + scrape_max_concurrency: int = Query(5, ge=1, le=20), ): """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping.""" user_id = _resolve_user_id(current_user) - result = await backlink_outreach_service.deep_discover(payload.keyword, payload.max_results) + result = await backlink_outreach_service.deep_discover( + payload.keyword, + payload.max_results, + user_id=user_id, + scrape_timeout_seconds=scrape_timeout_seconds, + scrape_max_concurrency=scrape_max_concurrency, + ) if payload.campaign_id: storage = BacklinkOutreachStorageService() saved = 0 diff --git a/backend/services/backlink_outreach_scraper.py b/backend/services/backlink_outreach_scraper.py index d9b2c7f8..a3d77ae2 100644 --- a/backend/services/backlink_outreach_scraper.py +++ b/backend/services/backlink_outreach_scraper.py @@ -8,11 +8,10 @@ from __future__ import annotations import asyncio import re -import time from typing import Any, Dict, List, Optional -from urllib.parse import urlparse +from urllib.parse import quote, urlparse -import requests +import httpx from bs4 import BeautifulSoup from loguru import logger @@ -34,26 +33,47 @@ class BacklinkOutreachScraper: # -- Public API -- async def deep_discover( - self, keyword: str, max_results: int = 15 + self, + keyword: str, + max_results: int = 15, + scrape_timeout_seconds: float = 15.0, + scrape_max_concurrency: int = 5, ) -> Dict[str, Any]: """Discover guest-post opportunities using Exa, falling back to DuckDuckGo.""" if self._is_exa_available(): logger.info(f"[BacklinkScraper] Using Exa for keyword: {keyword}") return await self._discover_with_exa(keyword, max_results) logger.info(f"[BacklinkScraper] Exa unavailable, falling back to DuckDuckGo for: {keyword}") - return await self._discover_with_duckduckgo(keyword, max_results) + return await self._discover_with_duckduckgo( + keyword, + max_results, + scrape_timeout_seconds=scrape_timeout_seconds, + scrape_max_concurrency=scrape_max_concurrency, + ) - def scrape_urls(self, urls: List[str]) -> List[Dict[str, Any]]: - """Fetch full page content for a list of URLs using Exa get_contents.""" + async def scrape_urls( + self, + urls: List[str], + timeout_seconds: float = 15.0, + max_concurrency: int = 5, + ) -> List[Dict[str, Any]]: + """Fetch full page content with non-blocking fallbacks and bounded concurrency.""" exa = self._get_exa_sdk() if not exa: - return self._scrape_urls_fallback(urls) + return await self._scrape_urls_fallback( + urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency + ) + loop = asyncio.get_running_loop() try: - result = exa.get_contents(urls, text={"max_characters": 5000}) + result = await loop.run_in_executor( + None, lambda: exa.get_contents(urls, text={"max_characters": 5000}) + ) return self._parse_get_contents_result(result) except Exception as e: logger.warning(f"[BacklinkScraper] Exa get_contents failed: {e}") - return self._scrape_urls_fallback(urls) + return await self._scrape_urls_fallback( + urls, timeout_seconds=timeout_seconds, max_concurrency=max_concurrency + ) # -- Availability -- @@ -207,24 +227,35 @@ class BacklinkOutreachScraper: # -- DuckDuckGo Fallback Discovery -- - async def _discover_with_duckduckgo(self, keyword: str, max_results: int) -> Dict[str, Any]: + async def _discover_with_duckduckgo( + self, + keyword: str, + max_results: int, + scrape_timeout_seconds: float = 15.0, + scrape_max_concurrency: int = 5, + ) -> Dict[str, Any]: queries = self._generate_search_queries(keyword) dedup: Dict[str, Dict[str, Any]] = {} - for query in queries[:4]: - rows = self._duckduckgo_search(query) - for row in rows: - norm_url = self._normalize_url(row.get("url", "")) - if not norm_url or norm_url in dedup: - continue - dedup[norm_url] = row - if len(dedup) >= max_results: - break - time.sleep(0.4) + async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client: + for query in queries[:4]: + rows = await self._duckduckgo_search(query, client=client) + for row in rows: + norm_url = self._normalize_url(row.get("url", "")) + if not norm_url or norm_url in dedup: + continue + dedup[norm_url] = row + if len(dedup) >= max_results: + break + await asyncio.sleep(0.4) # Scrape discovered URLs with Exa get_contents (or fallback) urls_to_scrape = list(dedup.keys())[:max_results] - scraped = self.scrape_urls(urls_to_scrape) + scraped = await self.scrape_urls( + urls_to_scrape, + timeout_seconds=scrape_timeout_seconds, + max_concurrency=scrape_max_concurrency, + ) scraped_map = {self._normalize_url(s.get("url", "")): s for s in scraped} # Merge DDG results with scraped content @@ -250,51 +281,76 @@ class BacklinkOutreachScraper: "opportunities": opportunities, } - def _duckduckgo_search(self, query: str, retries: int = 2) -> List[Dict[str, Any]]: - encoded = requests.utils.quote(query) + async def _duckduckgo_search( + self, + query: str, + retries: int = 2, + client: Optional[httpx.AsyncClient] = None, + ) -> List[Dict[str, Any]]: + encoded = quote(query) url = f"https://duckduckgo.com/html/?q={encoded}" headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} - for attempt in range(retries + 1): - try: - resp = requests.get(url, headers=headers, timeout=12) - resp.raise_for_status() - soup = BeautifulSoup(resp.text, "html.parser") - results = [] - for result in soup.select("div.result")[:10]: - anchor = result.select_one("a.result__a") - snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet") - if not anchor or not anchor.get("href"): - continue - results.append({ - "url": anchor.get("href"), - "title": anchor.get_text(strip=True), - "snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "", - "highlights": [], - }) - return results - except Exception: - if attempt == retries: - return [] - time.sleep(0.6 * (attempt + 1)) - return [] - def _scrape_urls_fallback(self, urls: List[str]) -> List[Dict[str, Any]]: - """Basic HTTP scrape when Exa is unavailable.""" - results = [] + async def _request(active_client: httpx.AsyncClient) -> List[Dict[str, Any]]: + for attempt in range(retries + 1): + try: + resp = await active_client.get(url, headers=headers) + resp.raise_for_status() + soup = BeautifulSoup(resp.text, "html.parser") + results = [] + for result in soup.select("div.result")[:10]: + anchor = result.select_one("a.result__a") + snippet_el = result.select_one("a.result__snippet") or result.select_one("div.result__snippet") + if not anchor or not anchor.get("href"): + continue + results.append({ + "url": anchor.get("href"), + "title": anchor.get_text(strip=True), + "snippet": snippet_el.get_text(" ", strip=True) if snippet_el else "", + "highlights": [], + }) + return results + except (httpx.HTTPError, httpx.TimeoutException): + if attempt == retries: + return [] + await asyncio.sleep(0.6 * (attempt + 1)) + return [] + + if client is not None: + return await _request(client) + + async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as owned_client: + return await _request(owned_client) + + async def _scrape_urls_fallback( + self, + urls: List[str], + timeout_seconds: float = 15.0, + max_concurrency: int = 5, + ) -> List[Dict[str, Any]]: + """Basic async HTTP scrape when Exa is unavailable.""" headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} - for url in urls[:5]: - try: - resp = requests.get(url, headers=headers, timeout=15) - resp.raise_for_status() - soup = BeautifulSoup(resp.text, "html.parser") - for tag in soup(["script", "style", "nav", "footer", "header"]): - tag.decompose() - text = soup.get_text(separator=" ", strip=True) - title = soup.title.get_text(strip=True) if soup.title else "" - results.append({"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""}) - except Exception: - continue - return results + semaphore = asyncio.Semaphore(max(1, max_concurrency)) + timeout = httpx.Timeout(timeout_seconds) + + async def scrape_one(client: httpx.AsyncClient, url: str) -> Optional[Dict[str, Any]]: + async with semaphore: + try: + resp = await client.get(url, headers=headers) + resp.raise_for_status() + soup = BeautifulSoup(resp.text, "html.parser") + for tag in soup(["script", "style", "nav", "footer", "header"]): + tag.decompose() + text = soup.get_text(separator=" ", strip=True) + title = soup.title.get_text(strip=True) if soup.title else "" + return {"url": url, "title": title, "text": text[:5000], "highlights": [], "summary": ""} + except (httpx.HTTPError, httpx.TimeoutException): + return None + + async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client: + tasks = [scrape_one(client, url) for url in urls] + scraped = await asyncio.gather(*tasks) + return [row for row in scraped if row] # -- Enrichment Pipeline -- diff --git a/backend/services/backlink_outreach_service.py b/backend/services/backlink_outreach_service.py index da8846cd..f268ee58 100644 --- a/backend/services/backlink_outreach_service.py +++ b/backend/services/backlink_outreach_service.py @@ -4,10 +4,11 @@ from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, List, Optional +from urllib.parse import quote +import asyncio import re -import time -import requests +import httpx from bs4 import BeautifulSoup import csv @@ -55,51 +56,67 @@ class BacklinkOutreachService: f"{normalized} + 'Submit article'", ] - def search_for_urls(self, query: str, timeout_seconds: int = 12, retries: int = 2) -> List[SearchResult]: - encoded_query = requests.utils.quote(query) + async def search_for_urls( + self, + query: str, + timeout_seconds: int = 12, + retries: int = 2, + client: Optional[httpx.AsyncClient] = None, + ) -> List[SearchResult]: + """Search DuckDuckGo HTML using a non-blocking HTTP client.""" + encoded_query = quote(query) url = f"https://duckduckgo.com/html/?q={encoded_query}" headers = {"User-Agent": "Mozilla/5.0 ALwrityBacklinkBot/1.0"} - for attempt in range(retries + 1): - try: - response = requests.get(url, headers=headers, timeout=timeout_seconds) - response.raise_for_status() - soup = BeautifulSoup(response.text, "html.parser") - rows: List[SearchResult] = [] - for result in soup.select("div.result")[:10]: - anchor = result.select_one("a.result__a") - snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet") - if not anchor or not anchor.get("href"): - continue - rows.append( - SearchResult( - url=anchor.get("href"), - title=anchor.get_text(strip=True), - snippet=snippet.get_text(" ", strip=True) if snippet else "", + async def _request(active_client: httpx.AsyncClient) -> List[SearchResult]: + for attempt in range(retries + 1): + try: + response = await active_client.get(url, headers=headers) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + rows: List[SearchResult] = [] + for result in soup.select("div.result")[:10]: + anchor = result.select_one("a.result__a") + snippet = result.select_one("a.result__snippet") or result.select_one("div.result__snippet") + if not anchor or not anchor.get("href"): + continue + rows.append( + SearchResult( + url=anchor.get("href"), + title=anchor.get_text(strip=True), + snippet=snippet.get_text(" ", strip=True) if snippet else "", + ) ) - ) - return rows - except Exception: - if attempt == retries: - return [] - time.sleep(0.6 * (attempt + 1)) - return [] + return rows + except (httpx.HTTPError, httpx.TimeoutException): + if attempt == retries: + return [] + await asyncio.sleep(0.6 * (attempt + 1)) + return [] - def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]: + if client is not None: + return await _request(client) + + timeout = httpx.Timeout(timeout_seconds) + async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as owned_client: + return await _request(owned_client) + + async def discover_opportunities_async(self, keyword: str, max_results: int = 10) -> Dict[str, Any]: queries = self.generate_guest_post_queries(keyword)[:4] dedup: Dict[str, SearchResult] = {} - for query in queries: - for result in self.search_for_urls(query): - normalized_url = self._normalize_url(result.url) - if not normalized_url or normalized_url in dedup: - continue - dedup[normalized_url] = result + async with httpx.AsyncClient(timeout=httpx.Timeout(12.0), follow_redirects=True) as client: + for query in queries: + for result in await self.search_for_urls(query, client=client): + normalized_url = self._normalize_url(result.url) + if not normalized_url or normalized_url in dedup: + continue + dedup[normalized_url] = result + if len(dedup) >= max_results: + break if len(dedup) >= max_results: break - if len(dedup) >= max_results: - break - time.sleep(0.4) + await asyncio.sleep(0.4) opportunities: List[OpportunityRecord] = [] for normalized_url, row in dedup.items(): @@ -118,6 +135,10 @@ class BacklinkOutreachService: return {"keyword": keyword, "queries": queries, "opportunities": opportunities} + def discover_opportunities(self, keyword: str, max_results: int = 10) -> Dict[str, Any]: + """Synchronous compatibility wrapper for non-async callers.""" + return asyncio.run(self.discover_opportunities_async(keyword, max_results)) + def _normalize_url(self, url: str) -> str: u = (url or "").strip() if not u: @@ -323,11 +344,23 @@ class BacklinkOutreachService: writer.writerows([{k: self._sanitize_csv_value(v) for k, v in row.items()}]) return output.getvalue() - async def deep_discover(self, keyword: str, max_results: int = 15) -> Dict[str, Any]: + async def deep_discover( + self, + keyword: str, + max_results: int = 15, + user_id: Optional[str] = None, + scrape_timeout_seconds: float = 15.0, + scrape_max_concurrency: int = 5, + ) -> Dict[str, Any]: """Enhanced discovery using Exa neural search + DuckDuckGo with full-page scraping.""" from services.backlink_outreach_scraper import BacklinkOutreachScraper - scraper = BacklinkOutreachScraper(user_id=self._user_id if hasattr(self, '_user_id') else None) - return await scraper.deep_discover(keyword, max_results) + scraper = BacklinkOutreachScraper(user_id=user_id) + return await scraper.deep_discover( + keyword, + max_results, + scrape_timeout_seconds=scrape_timeout_seconds, + scrape_max_concurrency=scrape_max_concurrency, + ) def get_migration_coverage(self) -> Dict[str, Any]: implemented = [