Files
ALwrity/backend/services/research/deep_crawl_service.py
ajaysi 928c2f20aa fix: WYSIWYG editor, content generation, and writing assistant bug fixes
- Fix text selection menu not showing: wire contentRef via inputRef on multiline TextField
- Fix blog title not truncating: add min-w-0 for flex item overflow
- Fix outline generation 500: escape curly braces in f-string prompt template
- Fix content generation 'NoneType not callable': replace SessionLocal() with get_session_for_user(), add db param to MediumBlogGenerator, fix signature mismatch in database_task_manager
- Fix writing assistant suggest 500: add auth + user_id to API endpoint and service, replace sync requests with httpx.AsyncClient
- Fix hallucination detector 404: explicitly include router in main.py and app.py
- Fix missing error_data in task failure responses
- Hide CopilotKit web inspector button
- Remove hardcoded fallback suggestions from SmartTypingAssist
- Fix stale closure refs in SmartTypingAssist handleTypingChange
- Add two-column editor layout, stats bar, section hover menu
- Various subscription, billing, and research module improvements
2026-05-14 09:11:51 +05:30

295 lines
13 KiB
Python

"""
Deep Crawl Service for Onboarding Step 3
Handles deep crawling of user's website, combining Sitemap and Tavily data.
"""
import os
import asyncio
import httpx
from typing import Dict, List, Any, Optional
from datetime import datetime
from loguru import logger
from urllib.parse import urlparse
from services.seo_tools.sitemap_service import SitemapService
from services.research.tavily_service import TavilyService
from services.database import get_session_for_user
from models.crawled_content import EndUserWebsiteContent
from models.website_analysis_monitoring_models import DeepWebsiteCrawlTask, DeepWebsiteCrawlExecutionLog
class DeepCrawlService:
def __init__(self):
self.sitemap_service = SitemapService()
self.tavily_service = TavilyService()
async def execute_deep_crawl(self, user_id: str, website_url: str, task_id: Optional[int] = None) -> Dict[str, Any]:
"""
Execute deep crawl for a user's website.
1. Fetch URLs from Sitemap.
2. Crawl using Tavily.
3. Deduplicate URLs.
4. Check liveness (status code).
5. Save content to DB and File.
"""
logger.info(f"Starting deep crawl for {website_url} (User: {user_id})")
execution_start = datetime.utcnow()
db = get_session_for_user(user_id)
if not db:
raise Exception("Database connection failed")
try:
# 1. Sitemap Discovery
sitemap_urls = set()
try:
# Discover sitemap URL
sitemap_url = await self.sitemap_service.discover_sitemap_url(website_url)
if not sitemap_url:
sitemap_url = f"{website_url.rstrip('/')}/sitemap.xml"
# Analyze sitemap to get URLs
# We use analyze_sitemap directly to get raw URLs
sitemap_data = await self.sitemap_service.analyze_sitemap(sitemap_url)
for url_entry in sitemap_data.get("urls", []):
if isinstance(url_entry, dict) and "loc" in url_entry:
sitemap_urls.add(url_entry["loc"])
logger.info(f"Found {len(sitemap_urls)} URLs from sitemap")
except Exception as e:
logger.warning(f"Sitemap analysis failed: {e}")
# 2. Tavily Crawl
tavily_urls = set()
tavily_results = []
try:
# Use intelligent instructions
instructions = "Find all blog posts, articles, and main content pages. Ignore login, signup, and admin pages."
crawl_result = await self.tavily_service.crawl(
url=website_url,
limit=50, # Limit to avoid excessive costs/time
max_depth=2,
extract_depth="basic",
instructions=instructions
)
if crawl_result.get("success"):
for res in crawl_result.get("results", []):
url = res.get("url")
if url:
tavily_urls.add(url)
tavily_results.append(res)
logger.info(f"Found {len(tavily_urls)} URLs from Tavily")
# Track Tavily usage
try:
from services.subscription import PricingService
from sqlalchemy import text
pricing_service = PricingService(db)
current_period = pricing_service.get_current_billing_period(user_id)
cost = 0.005 # Tavily crawl cost estimate
update_query = text("""
UPDATE usage_summaries
SET tavily_calls = COALESCE(tavily_calls, 0) + 1,
tavily_cost = COALESCE(tavily_cost, 0) + :cost,
total_calls = COALESCE(total_calls, 0) + 1,
total_cost = COALESCE(total_cost, 0) + :cost
WHERE user_id = :user_id AND billing_period = :period
""")
db.execute(update_query, {
'cost': cost, 'user_id': user_id, 'period': current_period,
})
db.commit()
logger.info(f"[DeepCrawl] Tracked Tavily crawl usage: user={user_id}, cost=${cost}")
except Exception as track_err:
logger.warning(f"[DeepCrawl] Failed to track Tavily usage: {track_err}")
except Exception as e:
logger.warning(f"Tavily crawl failed: {e}")
# 3. Merge and Deduplicate
all_urls = sitemap_urls.union(tavily_urls)
unique_urls = list(all_urls)
logger.info(f"Total unique URLs to process: {len(unique_urls)}")
# 4. Process URLs (Liveness & Save)
processed_count = 0
success_count = 0
# Create directory for documents if not exists
# We'll save in workspace/{user_id}/crawled_content/
# Note: Path logic should be consistent with project structure
# Assuming workspace path is available via env or config, or constructing it.
# Using relative path for now, adjusted to project root.
# Database files are stored under workspace/workspace_{user_id}/db/.
# So workspace root is workspace/workspace_{user_id}/
workspace_dir = f"workspace/workspace_{user_id}/crawled_content"
os.makedirs(workspace_dir, exist_ok=True)
# Limit concurrent checks
sem = asyncio.Semaphore(10)
async def process_url(url):
async with sem:
return await self._process_single_url(url, user_id, website_url, workspace_dir, tavily_results)
tasks = [process_url(url) for url in unique_urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
processed_data = []
# Save results to DB
for res in results:
if isinstance(res, dict):
processed_data.append(res)
if res.get("status_code") and 200 <= res.get("status_code") < 300:
success_count += 1
# Save to DB
try:
existing = db.query(EndUserWebsiteContent).filter(
EndUserWebsiteContent.user_id == user_id,
EndUserWebsiteContent.url == res["url"]
).first()
if existing:
existing.content = res.get("content")
existing.title = res.get("title")
existing.status_code = res.get("status_code")
existing.crawled_at = datetime.utcnow()
else:
new_content = EndUserWebsiteContent(
user_id=user_id,
website_url=website_url,
url=res["url"],
title=res.get("title"),
content=res.get("content"),
status_code=res.get("status_code"),
crawled_at=datetime.utcnow()
)
db.add(new_content)
except Exception as e:
logger.error(f"Failed to save content to DB for {res['url']}: {e}")
db.commit()
# 5. Update Task Log if task_id provided
if task_id:
log = DeepWebsiteCrawlExecutionLog(
task_id=task_id,
status="success",
result_data={
"total_urls": len(unique_urls),
"sitemap_urls": len(sitemap_urls),
"tavily_urls": len(tavily_urls),
"success_count": success_count,
"processed_urls": processed_data[:100] # Store only a subset to avoid huge JSON
},
execution_time_ms=int((datetime.utcnow() - execution_start).total_seconds() * 1000)
)
db.add(log)
# Update task
task = db.query(DeepWebsiteCrawlTask).filter(DeepWebsiteCrawlTask.id == task_id).first()
if task:
task.last_executed = datetime.utcnow()
task.last_success = datetime.utcnow()
task.status = "active"
task.consecutive_failures = 0
db.commit()
return {
"success": True,
"total_urls": len(unique_urls),
"sitemap_urls": len(sitemap_urls),
"tavily_urls": len(tavily_urls),
"processed_urls": processed_data
}
except Exception as e:
logger.error(f"Deep crawl failed: {e}")
if task_id:
log = DeepWebsiteCrawlExecutionLog(
task_id=task_id,
status="failed",
error_message=str(e),
execution_time_ms=int((datetime.utcnow() - execution_start).total_seconds() * 1000)
)
db.add(log)
task = db.query(DeepWebsiteCrawlTask).filter(DeepWebsiteCrawlTask.id == task_id).first()
if task:
task.last_executed = datetime.utcnow()
task.last_failure = datetime.utcnow()
task.failure_reason = str(e)
task.consecutive_failures += 1
db.commit()
raise e
finally:
db.close()
async def _process_single_url(self, url: str, user_id: str, website_url: str, workspace_dir: str, tavily_results: List[Dict]):
"""Check liveness, extract content, and save."""
status_code = None
error = None
content = None
title = None
# 1. Liveness Check
try:
async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
resp = await client.get(url)
status_code = resp.status_code
except Exception as e:
error = str(e)
status_code = 0 # Failed
# 2. Get content (from Tavily results or generic extraction if needed)
# Check if we have content from Tavily
tavily_match = next((r for r in tavily_results if r.get("url") == url), None)
if tavily_match:
content = tavily_match.get("raw_content") or tavily_match.get("content")
title = tavily_match.get("title")
elif status_code and 200 <= status_code < 300:
# Simple fetch content if valid
try:
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
resp = await client.get(url)
content = resp.text
# Naive title extraction
if "<title>" in content:
start = content.find("<title>") + 7
end = content.find("</title>")
if start > 6 and end > start:
title = content[start:end]
except Exception:
pass
# 3. Save to Document
if content and title:
safe_title = "".join([c for c in title if c.isalnum() or c in (' ', '-', '_')]).strip()[:50]
if not safe_title:
safe_title = "untitled"
filename = f"{safe_title}_{int(datetime.utcnow().timestamp())}.txt"
filepath = os.path.join(workspace_dir, filename)
try:
with open(filepath, "w", encoding="utf-8") as f:
f.write(f"URL: {url}\n")
f.write(f"Title: {title}\n")
f.write(f"Date: {datetime.utcnow()}\n\n")
f.write(content)
except Exception as e:
logger.warning(f"Failed to write file for {url}: {e}")
return {
"url": url,
"status_code": status_code,
"error": error,
"title": title,
"content": content
}