"""
Enhanced GitHub Content Scraper with Rate Limiting and Caching
This module provides functionality to scrape GitHub repositories, READMEs, and code files
for content marketing purposes. It includes async support, rate limiting, caching,
and comprehensive metadata collection.
"""
import os
import sys
import json
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
from urllib.parse import urljoin, urlparse
import pandas as pd
from bs4 import BeautifulSoup
from loguru import logger
import requests
from pydantic import BaseModel, Field
import time
import pickle
from pathlib import Path
# Configure logging
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="{level}|{file}:{line}:{function}| {message}")
class RateLimiter:
"""Rate limiter for GitHub API requests."""
def __init__(self, calls_per_minute: int = 30):
self.calls_per_minute = calls_per_minute
self.interval = 60 / calls_per_minute # seconds between calls
self.last_call_time = 0
self.lock = asyncio.Lock()
async def acquire(self):
"""Acquire rate limit token."""
async with self.lock:
current_time = time.time()
time_since_last_call = current_time - self.last_call_time
if time_since_last_call < self.interval:
await asyncio.sleep(self.interval - time_since_last_call)
self.last_call_time = time.time()
class Cache:
"""Cache for GitHub content."""
def __init__(self, cache_dir: str = ".github_cache", ttl_hours: int = 24):
self.cache_dir = Path(cache_dir)
self.ttl = timedelta(hours=ttl_hours)
self.cache_dir.mkdir(exist_ok=True)
def _get_cache_path(self, key: str) -> Path:
"""Get cache file path for a key."""
return self.cache_dir / f"{hash(key)}.cache"
def get(self, key: str) -> Optional[Dict]:
"""Get cached value for key."""
cache_path = self._get_cache_path(key)
if not cache_path.exists():
return None
try:
with open(cache_path, 'rb') as f:
data = pickle.load(f)
if datetime.now() - data['timestamp'] > self.ttl:
cache_path.unlink()
return None
return data['value']
except Exception as e:
logger.warning(f"Cache read error for {key}: {e}")
return None
def set(self, key: str, value: Dict):
"""Set cache value for key."""
cache_path = self._get_cache_path(key)
try:
with open(cache_path, 'wb') as f:
pickle.dump({
'timestamp': datetime.now(),
'value': value
}, f)
except Exception as e:
logger.warning(f"Cache write error for {key}: {e}")
class GitHubContent(BaseModel):
"""Model for GitHub content analysis."""
title: str = Field("", description="Title of the content")
description: str = Field("", description="Description of the content")
content: str = Field("", description="Main content")
language: str = Field("", description="Programming language")
stars: int = Field(0, description="Number of stars")
forks: int = Field(0, description="Number of forks")
watchers: int = Field(0, description="Number of watchers")
last_updated: str = Field("", description="Last update date")
topics: List[str] = Field([], description="Repository topics")
contributors: List[str] = Field([], description="Contributor usernames")
readme_url: str = Field("", description="URL of the README")
raw_content_url: str = Field("", description="URL for raw content")
license: str = Field("", description="Repository license")
dependencies: List[str] = Field([], description="Project dependencies")
metadata: Dict = Field({}, description="Additional metadata")
class GitHubScraper:
"""Service for scraping GitHub content with rate limiting and caching."""
def __init__(self, cache_dir: str = ".github_cache", ttl_hours: int = 24, calls_per_minute: int = 30):
"""Initialize the scraper service."""
self.session = None
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'application/vnd.github.v3+json'
}
self.rate_limiter = RateLimiter(calls_per_minute)
self.cache = Cache(cache_dir, ttl_hours)
async def __aenter__(self):
"""Create aiohttp session when entering context."""
self.session = aiohttp.ClientSession(headers=self.headers)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Close aiohttp session when exiting context."""
if self.session:
await self.session.close()
async def fetch_url(self, url: str, use_cache: bool = True) -> str:
"""Fetch URL content asynchronously with rate limiting and caching."""
if use_cache:
cached_content = self.cache.get(url)
if cached_content:
logger.debug(f"Cache hit for {url}")
return cached_content
await self.rate_limiter.acquire()
try:
async with self.session.get(url) as response:
if response.status == 200:
content = await response.text()
if use_cache:
self.cache.set(url, content)
return content
else:
error_msg = f"Failed to fetch URL: Status code {response.status}"
logger.error(error_msg)
raise Exception(error_msg)
except Exception as e:
logger.error(f"Error fetching URL {url}: {e}")
raise
def parse_github_url(self, url: str) -> Dict[str, str]:
"""Parse GitHub URL to extract repository information."""
parsed = urlparse(url)
path_parts = parsed.path.strip('/').split('/')
if len(path_parts) < 2:
raise ValueError("Invalid GitHub URL format")
return {
'owner': path_parts[0],
'repo': path_parts[1],
'branch': path_parts[3] if len(path_parts) > 3 else 'main',
'path': '/'.join(path_parts[4:]) if len(path_parts) > 4 else ''
}
async def get_repo_metadata(self, owner: str, repo: str) -> Dict:
"""Get repository metadata from GitHub API with caching."""
cache_key = f"metadata_{owner}_{repo}"
cached_metadata = self.cache.get(cache_key)
if cached_metadata:
return cached_metadata
await self.rate_limiter.acquire()
api_url = f"https://api.github.com/repos/{owner}/{repo}"
try:
async with self.session.get(api_url) as response:
if response.status == 200:
metadata = await response.json()
self.cache.set(cache_key, metadata)
return metadata
else:
logger.error(f"Failed to fetch repo metadata: {response.status}")
return {}
except Exception as e:
logger.error(f"Error fetching repo metadata: {e}")
return {}
async def get_readme_content(self, owner: str, repo: str, branch: str = 'main') -> Dict:
"""Get README content from GitHub with caching."""
cache_key = f"readme_{owner}_{repo}_{branch}"
cached_content = self.cache.get(cache_key)
if cached_content:
return cached_content
try:
# Try to get README from API first
await self.rate_limiter.acquire()
api_url = f"https://api.github.com/repos/{owner}/{repo}/readme"
async with self.session.get(api_url) as response:
if response.status == 200:
readme_data = await response.json()
content = {
'content': readme_data.get('content', ''),
'encoding': readme_data.get('encoding', 'base64'),
'url': readme_data.get('html_url', '')
}
self.cache.set(cache_key, content)
return content
# Fallback to scraping if API fails
readme_url = f"https://github.com/{owner}/{repo}/blob/{branch}/README.md"
html_content = await self.fetch_url(readme_url, use_cache=True)
soup = BeautifulSoup(html_content, 'html.parser')
# Find the README content
readme_content = soup.find('div', {'class': 'markdown-body'})
if readme_content:
content = {
'content': readme_content.get_text(),
'encoding': 'text',
'url': readme_url
}
self.cache.set(cache_key, content)
return content
return {}
except Exception as e:
logger.error(f"Error fetching README: {e}")
return {}
async def get_file_content(self, owner: str, repo: str, path: str, branch: str = 'main') -> Dict:
"""Get content of a specific file from GitHub with caching."""
cache_key = f"file_{owner}_{repo}_{path}_{branch}"
cached_content = self.cache.get(cache_key)
if cached_content:
return cached_content
try:
# Try to get file content from API first
await self.rate_limiter.acquire()
api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
async with self.session.get(api_url) as response:
if response.status == 200:
file_data = await response.json()
content = {
'content': file_data.get('content', ''),
'encoding': file_data.get('encoding', 'base64'),
'url': file_data.get('html_url', '')
}
self.cache.set(cache_key, content)
return content
# Fallback to scraping if API fails
file_url = f"https://github.com/{owner}/{repo}/blob/{branch}/{path}"
html_content = await self.fetch_url(file_url, use_cache=True)
soup = BeautifulSoup(html_content, 'html.parser')
# Find the file content
file_content = soup.find('div', {'class': 'file-content'})
if file_content:
content = {
'content': file_content.get_text(),
'encoding': 'text',
'url': file_url
}
self.cache.set(cache_key, content)
return content
return {}
except Exception as e:
logger.error(f"Error fetching file content: {e}")
return {}
async def get_repo_topics(self, owner: str, repo: str) -> List[str]:
"""Get repository topics with caching."""
cache_key = f"topics_{owner}_{repo}"
cached_topics = self.cache.get(cache_key)
if cached_topics:
return cached_topics
try:
await self.rate_limiter.acquire()
api_url = f"https://api.github.com/repos/{owner}/{repo}/topics"
async with self.session.get(api_url, headers={'Accept': 'application/vnd.github.mercy-preview+json'}) as response:
if response.status == 200:
data = await response.json()
topics = data.get('names', [])
self.cache.set(cache_key, topics)
return topics
return []
except Exception as e:
logger.error(f"Error fetching topics: {e}")
return []
async def get_contributors(self, owner: str, repo: str) -> List[str]:
"""Get repository contributors with caching."""
cache_key = f"contributors_{owner}_{repo}"
cached_contributors = self.cache.get(cache_key)
if cached_contributors:
return cached_contributors
try:
await self.rate_limiter.acquire()
api_url = f"https://api.github.com/repos/{owner}/{repo}/contributors"
async with self.session.get(api_url) as response:
if response.status == 200:
contributors = await response.json()
contributor_list = [contributor['login'] for contributor in contributors]
self.cache.set(cache_key, contributor_list)
return contributor_list
return []
except Exception as e:
logger.error(f"Error fetching contributors: {e}")
return []
async def scrape_github_content(self, url: str) -> GitHubContent:
"""Main function to scrape GitHub content with caching."""
cache_key = f"content_{url}"
cached_content = self.cache.get(cache_key)
if cached_content:
return GitHubContent(**cached_content)
try:
# Parse the GitHub URL
repo_info = self.parse_github_url(url)
# Get repository metadata
metadata = await self.get_repo_metadata(repo_info['owner'], repo_info['repo'])
# Get content based on URL type
if not repo_info['path'] or repo_info['path'].lower() == 'readme.md':
content_data = await self.get_readme_content(
repo_info['owner'],
repo_info['repo'],
repo_info['branch']
)
else:
content_data = await self.get_file_content(
repo_info['owner'],
repo_info['repo'],
repo_info['path'],
repo_info['branch']
)
# Get additional metadata
topics = await self.get_repo_topics(repo_info['owner'], repo_info['repo'])
contributors = await self.get_contributors(repo_info['owner'], repo_info['repo'])
# Create GitHubContent object
content = GitHubContent(
title=metadata.get('name', ''),
description=metadata.get('description', ''),
content=content_data.get('content', ''),
language=metadata.get('language', ''),
stars=metadata.get('stargazers_count', 0),
forks=metadata.get('forks_count', 0),
watchers=metadata.get('watchers_count', 0),
last_updated=metadata.get('updated_at', ''),
topics=topics,
contributors=contributors,
readme_url=content_data.get('url', ''),
raw_content_url=metadata.get('html_url', ''),
license=metadata.get('license', {}).get('name', ''),
metadata={
'size': metadata.get('size', 0),
'open_issues': metadata.get('open_issues_count', 0),
'default_branch': metadata.get('default_branch', 'main'),
'created_at': metadata.get('created_at', ''),
'pushed_at': metadata.get('pushed_at', '')
}
)
# Cache the complete content
self.cache.set(cache_key, content.dict())
return content
except Exception as e:
logger.error(f"Error scraping GitHub content: {e}")
raise
async def main():
"""Example usage of the GitHub scraper with rate limiting and caching."""
scraper = GitHubScraper(
cache_dir=".github_cache",
ttl_hours=24,
calls_per_minute=30
)
async with scraper:
# Example URLs
urls = [
"https://github.com/owner/repo",
"https://github.com/owner/repo/blob/main/README.md",
"https://github.com/owner/repo/blob/main/src/main.py"
]
for url in urls:
try:
content = await scraper.scrape_github_content(url)
print(f"Scraped content from {url}:")
print(json.dumps(content.dict(), indent=2))
except Exception as e:
print(f"Error scraping {url}: {e}")
if __name__ == "__main__":
asyncio.run(main())