AI FAQ Generator & github blogs

This commit is contained in:
ajaysi
2025-05-04 17:04:44 +05:30
parent c51e355d26
commit 26b02b9719
9 changed files with 1810 additions and 463 deletions

View File

@@ -1,292 +1,422 @@
"""
Enhanced GitHub Content Scraper with Rate Limiting and Caching
This module provides functionality to scrape GitHub repositories, READMEs, and code files
for content marketing purposes. It includes async support, rate limiting, caching,
and comprehensive metadata collection.
"""
import os
import sys
import datetime
import pandas as pd
import json
import requests
import asyncio
import aiohttp
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union
from urllib.parse import urljoin, urlparse
import pandas as pd
from bs4 import BeautifulSoup
from loguru import logger
import requests
from pydantic import BaseModel, Field
import time
import pickle
from pathlib import Path
# Configure logging
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}")
from .take_url_screenshot import take_screenshot
from .gpt_providers.gemini_image_details import gemini_get_img_info
def get_readme_content(url):
    """Fetch the raw README text located at ``url``.

    Args:
        url: Direct URL of the README file to download.

    Returns:
        The response body as text when the server answers with status 200,
        otherwise ``None``.

    Note:
        Any exception raised while fetching is logged and then terminates the
        process through ``sys.exit(1)``.
    """
    # Pre-bind so the error handler can tell "request never completed" apart
    # from "request completed but something else failed".
    response = None
    try:
        # Fetch the README content directly from the URL
        response = requests.get(url)
        print(response.status_code)
        if response.status_code == 200:
            logger.debug("Successfully fetched the README.md")
            return response.text
        return None
    except Exception as err:
        # ``response`` is still None when ``requests.get`` itself raised;
        # reading ``response.status_code`` there would raise a NameError /
        # AttributeError that hides the real failure.
        status = response.status_code if response is not None else "no response"
        logger.error(f"Failed to fetch raw readme from {url}: {err}: {status}")
        sys.exit(1)
def get_gh_repo_metadata(github_url):
    """Scrape basic repository details from a GitHub repository page.

    The page is downloaded with ``requests`` and parsed with BeautifulSoup.

    Args:
        github_url: URL of the repository page to scrape.

    Returns:
        dict with the keys ``topics`` (list of topic names, empty when the
        page has no topic container), ``branch_name`` (always ``None``),
        ``name``, ``about``, ``stars``, ``watchers`` and ``forks``. The three
        counts are the page text with commas removed, not integers.

    Raises:
        AttributeError: when one of the expected page elements is missing.
    """
    logger.info("Scraping github with BS4 and requests.")
    # download the target page and parse the HTML returned by the server
    page = requests.get(github_url)
    soup = BeautifulSoup(page.text, 'html.parser')

    name = soup.select_one('[itemprop="name"]').get_text().strip()

    # scrape the repo details in the right box
    bordergrid_html_element = soup.select_one('.BorderGrid')
    about_html_element = bordergrid_html_element.select_one('h2')
    description = about_html_element.find_next_sibling('p').get_text().strip()

    def _count_next_to(icon_selector):
        # Each counter is rendered as an icon followed by a <strong> sibling.
        icon = bordergrid_html_element.select_one(icon_selector)
        return icon.find_next_sibling('strong').get_text().strip().replace(',', '')

    stars = _count_next_to('.octicon-star')
    watchers = _count_next_to('.octicon-eye')
    forks = _count_next_to('.octicon-repo-forked')

    # Topic links live in a div with class "f6"; default to an empty list so
    # the ``topics`` key is always present in the result.
    topics = []
    topic_div = soup.find('div', class_='f6')
    if topic_div:
        topic_links = topic_div.find_all('a', class_='topic-tag-link')
        topics = [link.text.strip() for link in topic_links]

    # FIXME: Unable to scrape branch name.
    # The former branch-icon and box-header lookups were dropped: their
    # results were never used, yet a missing element made them raise.
    repo = {
        'topics': topics,
        'branch_name': None,
        'name': name,
        'about': description,
        'stars': stars,
        'watchers': watchers,
        'forks': forks,
    }
    logger.info(f"Github Repo Details: {repo}")
    return repo
def get_gh_details_vision(github_url, generated_image_filepath):
""" Take a screenshot of the url and feed to vision models for scraping details. """
logger.info(f"Take screenshot and pass it to gemini for repo details of {github_url}")
generated_image_filepath = take_screenshot(github_url, generated_image_filepath)
prompt = """From the given image of a github page, find out the number of stars, about, forks, last commit days, link url, topics and branch name. Return the result as json."""
class RateLimiter:
"""Rate limiter for GitHub API requests."""
try:
gh_details = gemini_get_img_info(prompt, generated_image_filepath)
logger.info(f"Github Repo details, from vision model: {gh_details}")
#gh_details = get_gh_repo_metadata(github_url)
except Exception as err:
logger.error(f"Failed to get gh images details: {err}")
gh_details = get_gh_repo_metadata(github_url)
return gh_details
# Convert string to dictionary Split the string into lines
lines = gh_details.split('\n')
# Remove the first and last line
modified_lines = lines[1:-1]
# Join the modified lines back into a string
gh_details = '\n'.join(modified_lines)
gh_details = json.loads(gh_details)
return(gh_details)
def research_github_topics(topics):
""" Scrape github topics of interest for top repos to write on """
# https://www.kaggle.com/code/subhaskumarray/scraping-github-topics-with-their-repositories
# We are going to scrape https://github.com/topics
# We will get a list of topics. For each topic, we will extract topic name, topic description and topic url.
# For each topic, we will get top 30 repositories with repo name, repo username, stars and repo url.
# Finally we are going to create csv file for each topic with respective repo details.
#github_topics = "https://github.com/topics/"
#response = requests.get(github_topics)
#if response.status_code != 200:
# logger.error(f'There is something wrong with {url}')
#response_contents = response.text
# Now we will parse the contents using BeautifulSoup:
#parsed_contents = BeautifulSoup(response_contents,'html.parser')
#logger.info("Get all topics, Titles and their urls from github.")
#topic_titles = get_topic_titles(parsed_contents)
#topic_desc = get_topic_desc(parsed_contents)
#topic_urls = get_topic_url(parsed_contents)
#topic_df = pd.DataFrame(list(zip(topic_titles, topic_desc,topic_urls)),\
# columns =['title', 'description', 'url'])
#logger.info(f"Scraped data from github: {topic_df}")
gh_topics = ['ai', 'ai-tools', 'ai-assistant', 'ai-agents-framework', 'llm', 'multi-agent', 'fine-tuning', 'rag', 'generative', 'prompt-engineering', 'generative-ai', 'text-to-image-generation', 'llm-ops', 'retrieval-augmented-generation', 'langchain', 'gemini-api', 'vertex-ai', 'huggingface', 'auto-gpt', 'llmops', 'ai-toolkit', 'chatbot', 'chatgpt', 'code-assistant', 'text-to-video', 'llms', 'gpt-4']
repo_info_dict = {
'username':[],
'repo_name': [],
'stars': [],
'repo_url': []
}
for agh_topic in gh_topics:
topic_url = f"https://github.com/topics/{agh_topic}"
first_topic_repo_page = download_repo_page(topic_url)
logger.info(f"Get details on github topic: {topic_url}")
repo_tags = first_topic_repo_page.find_all('h3', {'class': 'f3 color-fg-muted text-normal lh-condensed'})
star_tags = first_topic_repo_page.find_all('span', {'class': 'Counter js-social-count'})
def __init__(self, calls_per_minute: int = 30):
    """Configure the limiter.

    Args:
        calls_per_minute: Maximum number of calls allowed per minute; the
            minimum spacing between calls is ``60 / calls_per_minute``
            seconds.

    Raises:
        ValueError: if ``calls_per_minute`` is not positive. Zero would
            otherwise fail with ``ZeroDivisionError`` and a negative value
            would yield a negative interval that never delays anything.
    """
    if calls_per_minute <= 0:
        raise ValueError("calls_per_minute must be a positive number")
    self.calls_per_minute = calls_per_minute
    self.interval = 60 / calls_per_minute  # seconds between calls
    self.last_call_time = 0
    # Serialises callers of ``acquire`` so the spacing is enforced globally.
    self.lock = asyncio.Lock()
for i in range(len(repo_tags)):
repo_details = get_repo_info(repo_tags[i], star_tags[i])
async def acquire(self):
    """Wait until the next call is allowed, then record when it happened.

    Callers are serialised through ``self.lock``. If less than
    ``self.interval`` seconds have passed since the previously recorded call,
    the coroutine sleeps for the remainder while still holding the lock, so
    queued callers are released one interval apart.
    """
    async with self.lock:
        # Use the monotonic clock: wall-clock adjustments must not shorten or
        # stretch the measured gap between calls.
        elapsed = time.monotonic() - self.last_call_time
        if elapsed < self.interval:
            await asyncio.sleep(self.interval - elapsed)
        self.last_call_time = time.monotonic()
class Cache:
"""Cache for GitHub content."""
def __init__(self, cache_dir: str = ".github_cache", ttl_hours: int = 24):
    """Prepare the cache directory and the entry lifetime.

    Args:
        cache_dir: Directory that holds the cache files; created if missing.
        ttl_hours: Lifetime of an entry in hours, stored as ``self.ttl``.
    """
    self.cache_dir = Path(cache_dir)
    self.ttl = timedelta(hours=ttl_hours)
    # parents=True so a nested cache_dir works on first use instead of
    # failing with FileNotFoundError.
    self.cache_dir.mkdir(parents=True, exist_ok=True)
def _get_cache_path(self, key: str) -> Path:
    """Return the cache file path for ``key``.

    The file name is the SHA-256 hex digest of the UTF-8 encoded key. The
    built-in ``hash()`` is unsuitable here: string hashes are salted per
    interpreter process, so files written in one run would never be found in
    the next.
    """
    import hashlib  # local import keeps this block self-contained

    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    return self.cache_dir / f"{digest}.cache"
def get(self, key: str) -> Optional[Dict]:
"""Get cached value for key."""
cache_path = self._get_cache_path(key)
# Check if the repo URL is not already present in the dictionary
if repo_details[3] not in repo_info_dict['repo_url']:
# Store repos with more than 5000 stars.
if repo_details[2] > 5000:
repo_info_dict['username'].append(repo_details[0])
repo_info_dict['repo_name'].append(repo_details[1])
repo_info_dict['stars'].append(repo_details[2])
repo_info_dict['repo_url'].append(repo_details[3])
# Create a DataFrame from repo_info_dict
df_repo_info = pd.DataFrame(repo_info_dict['repo_url'])
# Check if the file already exists
csv_filename = 'github_url_to_write.csv'
if os.path.isfile(csv_filename):
# Append to the existing file
df_repo_info.to_csv(csv_filename, mode='a', header=False, index=False)
logger.info(f"Data appended to existing file: {csv_filename}")
else:
# Create a new file
df_repo_info.to_csv(csv_filename, index=False)
def get_topic_titles(parsed_content):
    """Collect the text of every topic-title paragraph in a parsed page.

    Args:
        parsed_content: Parsed page exposing ``find_all`` (a BeautifulSoup
            document in this module).

    Returns:
        List with the ``.text`` of each matching ``<p>`` tag, or ``None`` when
        an error occurred (the error is logged).
    """
    try:
        title_tags = parsed_content.find_all(
            'p', {'class': 'f3 lh-condensed mb-0 mt-1 Link--primary'}
        )
        return [title_tag.text for title_tag in title_tags]
    except Exception as err:
        logger.error(f"Failed to get github topic titles: {err}")
def get_topic_desc(parsed_contents):
    """Collect the description text of every topic in a parsed page.

    Args:
        parsed_contents: Parsed page exposing ``find_all`` (a BeautifulSoup
            document in this module).

    Returns:
        List with the whitespace-trimmed ``.text`` of each matching ``<p>``
        tag, or ``None`` when an error occurred (the error is logged).
    """
    try:
        desc_selector = 'f5 color-fg-muted mb-0 mt-1'
        topic_desc_tags = parsed_contents.find_all('p', {'class': desc_selector})
        # Leftover debug prints (tag dump and a junk marker per item) removed.
        # strip() trims the surrounding whitespace of each description.
        return [desc.text.strip() for desc in topic_desc_tags]
    except Exception as err:
        logger.error(f"Failed to get github topic desc: {err}")
def get_topic_url(parsed_contents):
    """Build absolute URLs for every topic link in a parsed page.

    Args:
        parsed_contents: Parsed page exposing ``find_all`` (a BeautifulSoup
            document in this module).

    Returns:
        List of URLs formed by prefixing each link's ``href`` with
        ``http://github.com``, or ``None`` when an error occurred (the error
        is logged).
    """
    try:
        link_tags = parsed_contents.find_all(
            'a', {'class': 'no-underline flex-1 d-flex flex-column'}
        )
        prefix = 'http://github.com'
        return [prefix + link_tag['href'] for link_tag in link_tags]
    except Exception as err:
        logger.error(f"Failed to get github topic urls: {err}")
def download_repo_page(topic_url):
response = requests.get(topic_url)
if response.status_code != 200:
print('There is some error in {}'.format(topic_url))
response_contents = response.text
if not cache_path.exists():
return None
try:
with open(cache_path, 'rb') as f:
data = pickle.load(f)
if datetime.now() - data['timestamp'] > self.ttl:
cache_path.unlink()
return None
return data['value']
except Exception as e:
logger.warning(f"Cache read error for {key}: {e}")
return None
parsed_contents = BeautifulSoup(response_contents,'html.parser')
return parsed_contents
def set(self, key: str, value: Dict):
    """Pickle ``value`` together with the current time into the file for ``key``.

    Write failures are logged as warnings and otherwise ignored, so caching
    stays best-effort.
    """
    target = self._get_cache_path(key)
    try:
        with open(target, 'wb') as handle:
            # The timestamp is what a reader compares against the TTL.
            entry = {'timestamp': datetime.now(), 'value': value}
            pickle.dump(entry, handle)
    except Exception as exc:
        logger.warning(f"Cache write error for {key}: {exc}")
class GitHubContent(BaseModel):
    """Structured result of scraping one GitHub repository or file."""

    # --- what was scraped -------------------------------------------------
    title: str = Field(default="", description="Title of the content")
    description: str = Field(default="", description="Description of the content")
    content: str = Field(default="", description="Main content")
    language: str = Field(default="", description="Programming language")

    # --- repository counters ----------------------------------------------
    stars: int = Field(default=0, description="Number of stars")
    forks: int = Field(default=0, description="Number of forks")
    watchers: int = Field(default=0, description="Number of watchers")

    # --- descriptive metadata ---------------------------------------------
    last_updated: str = Field(default="", description="Last update date")
    topics: List[str] = Field(default=[], description="Repository topics")
    contributors: List[str] = Field(default=[], description="Contributor usernames")
    readme_url: str = Field(default="", description="URL of the README")
    raw_content_url: str = Field(default="", description="URL for raw content")
    license: str = Field(default="", description="Repository license")
    dependencies: List[str] = Field(default=[], description="Project dependencies")
    metadata: Dict = Field(default={}, description="Additional metadata")
def get_repo_info(repo_tags,star_tags):
# returns all info for a repo
a_tags = repo_tags.find_all('a')
username = a_tags[0].text.strip()
repo_name = a_tags[1].text.strip()
base_url = 'http://github.com/'
repo_url = base_url + a_tags[1]['href'].strip()
class GitHubScraper:
"""Service for scraping GitHub content with rate limiting and caching."""
# Defining a function so that it will convert our star count to integer
def star_counts_converter(stars):
stars = stars.strip()
if stars[-1] == 'k':
return int(float(stars[:-1]) * 1000)
return int(stars)
star_counts = star_counts_converter(star_tags.text.strip())
return username,repo_name,star_counts,repo_url
def __init__(self, cache_dir: str = ".github_cache", ttl_hours: int = 24, calls_per_minute: int = 30):
    """Set up request headers, the rate limiter and the cache.

    Args:
        cache_dir: Directory handed to ``Cache``.
        ttl_hours: Cache entry lifetime in hours, handed to ``Cache``.
        calls_per_minute: Request budget handed to ``RateLimiter``.

    No HTTP session is opened here; ``self.session`` starts as ``None``.
    """
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    accept = 'application/vnd.github.v3+json'

    self.session = None
    self.headers = {'User-Agent': user_agent, 'Accept': accept}
    self.rate_limiter = RateLimiter(calls_per_minute)
    self.cache = Cache(cache_dir, ttl_hours)
async def __aenter__(self):
    """Open the aiohttp session (with ``self.headers``) and return the scraper."""
    session = aiohttp.ClientSession(headers=self.headers)
    self.session = session
    return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Close the aiohttp session, if one was opened, when leaving the context."""
    session = self.session
    if not session:
        # Nothing to release: the context was never entered successfully.
        return
    await session.close()
async def fetch_url(self, url: str, use_cache: bool = True) -> str:
    """Fetch ``url`` as text, honouring the rate limiter and the cache.

    Args:
        url: Address to download.
        use_cache: When true, return a cached body if present and store a
            freshly downloaded body under ``url``.

    Returns:
        The response body.

    Raises:
        Exception: when the server answers with a status other than 200, or
            whatever the HTTP session raises while requesting/reading.
    """
    if use_cache:
        cached_content = self.cache.get(url)
        # Compare with None so that a cached empty body still counts as a hit.
        if cached_content is not None:
            logger.debug(f"Cache hit for {url}")
            return cached_content

    await self.rate_limiter.acquire()
    try:
        async with self.session.get(url) as response:
            status = response.status
            content = await response.text() if status == 200 else None
    except Exception as e:
        logger.error(f"Error fetching URL {url}: {e}")
        raise

    # The status check sits outside the try block so a bad status is logged
    # once instead of being caught and logged again by the handler above.
    if status != 200:
        error_msg = f"Failed to fetch URL: Status code {status}"
        logger.error(error_msg)
        raise Exception(error_msg)

    if use_cache:
        self.cache.set(url, content)
    return content
def parse_github_url(self, url: str) -> Dict[str, str]:
    """Split a GitHub URL path into owner, repo, branch and file path.

    The path is read positionally: ``owner/repo/<kind>/<branch>/<path...>``.
    The third segment (e.g. ``blob``) is ignored.

    Returns:
        dict with ``owner``, ``repo``, ``branch`` (``'main'`` when the URL has
        no fourth segment) and ``path`` (``''`` when nothing follows the
        branch).

    Raises:
        ValueError: if the path has fewer than two segments.
    """
    segments = urlparse(url).path.strip('/').split('/')
    if len(segments) < 2:
        raise ValueError("Invalid GitHub URL format")

    owner, repo, *rest = segments
    # rest[0] is the view kind, rest[1] the branch, everything after it the path.
    branch = rest[1] if len(rest) > 1 else 'main'
    return {
        'owner': owner,
        'repo': repo,
        'branch': branch,
        'path': '/'.join(rest[2:]),
    }
async def get_repo_metadata(self, owner: str, repo: str) -> Dict:
    """Return the GitHub API description of ``owner/repo``.

    A truthy cached value is returned without a request. Otherwise the
    repository endpoint is queried; a 200 response is parsed as JSON, cached
    and returned. Any other status or any exception is logged and yields an
    empty dict.
    """
    cache_key = f"metadata_{owner}_{repo}"
    hit = self.cache.get(cache_key)
    if hit:
        return hit

    await self.rate_limiter.acquire()
    endpoint = f"https://api.github.com/repos/{owner}/{repo}"
    try:
        async with self.session.get(endpoint) as response:
            if response.status != 200:
                logger.error(f"Failed to fetch repo metadata: {response.status}")
                return {}
            metadata = await response.json()
            self.cache.set(cache_key, metadata)
            return metadata
    except Exception as e:
        logger.error(f"Error fetching repo metadata: {e}")
        return {}
async def get_readme_content(self, owner: str, repo: str, branch: str = 'main') -> Dict:
    """Return the README of ``owner/repo`` as a dict.

    Order of attempts: cache, the API ``/readme`` endpoint, then scraping the
    ``README.md`` blob page of ``branch`` for its ``markdown-body`` div.

    Returns:
        dict with ``content``, ``encoding`` and ``url``. For the API path the
        content is passed through as delivered (the encoding label defaults
        to ``'base64'``); for the scraped path it is plain text. An empty
        dict is returned when nothing was found or an error was logged.
    """
    cache_key = f"readme_{owner}_{repo}_{branch}"
    hit = self.cache.get(cache_key)
    if hit:
        return hit

    try:
        # Preferred source: the API
        await self.rate_limiter.acquire()
        api_url = f"https://api.github.com/repos/{owner}/{repo}/readme"
        async with self.session.get(api_url) as response:
            if response.status == 200:
                payload = await response.json()
                result = {
                    'content': payload.get('content', ''),
                    'encoding': payload.get('encoding', 'base64'),
                    'url': payload.get('html_url', ''),
                }
                self.cache.set(cache_key, result)
                return result

        # The API did not deliver: scrape the rendered page instead
        readme_url = f"https://github.com/{owner}/{repo}/blob/{branch}/README.md"
        page_html = await self.fetch_url(readme_url, use_cache=True)
        rendered = BeautifulSoup(page_html, 'html.parser').find('div', {'class': 'markdown-body'})
        if not rendered:
            return {}
        result = {
            'content': rendered.get_text(),
            'encoding': 'text',
            'url': readme_url,
        }
        self.cache.set(cache_key, result)
        return result
    except Exception as e:
        logger.error(f"Error fetching README: {e}")
        return {}
async def get_file_content(self, owner: str, repo: str, path: str, branch: str = 'main') -> Dict:
    """Return one repository file as a dict.

    Order of attempts: cache, the API ``/contents/{path}`` endpoint for
    ``branch``, then scraping the blob page for its ``file-content`` div.

    Returns:
        dict with ``content``, ``encoding`` and ``url``. For the API path the
        content is passed through as delivered (the encoding label defaults
        to ``'base64'``); for the scraped path it is plain text. An empty
        dict is returned when nothing was found or an error was logged.
    """
    cache_key = f"file_{owner}_{repo}_{path}_{branch}"
    hit = self.cache.get(cache_key)
    if hit:
        return hit

    try:
        # Preferred source: the API
        await self.rate_limiter.acquire()
        api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}?ref={branch}"
        async with self.session.get(api_url) as response:
            if response.status == 200:
                payload = await response.json()
                result = {
                    'content': payload.get('content', ''),
                    'encoding': payload.get('encoding', 'base64'),
                    'url': payload.get('html_url', ''),
                }
                self.cache.set(cache_key, result)
                return result

        # The API did not deliver: scrape the rendered page instead
        file_url = f"https://github.com/{owner}/{repo}/blob/{branch}/{path}"
        page_html = await self.fetch_url(file_url, use_cache=True)
        rendered = BeautifulSoup(page_html, 'html.parser').find('div', {'class': 'file-content'})
        if not rendered:
            return {}
        result = {
            'content': rendered.get_text(),
            'encoding': 'text',
            'url': file_url,
        }
        self.cache.set(cache_key, result)
        return result
    except Exception as e:
        logger.error(f"Error fetching file content: {e}")
        return {}
async def get_repo_topics(self, owner: str, repo: str) -> List[str]:
    """Return the topic names of ``owner/repo``.

    A truthy cached list is returned without a request. Otherwise the API
    ``/topics`` endpoint is queried with the ``mercy-preview`` Accept header;
    the ``names`` list of a 200 response is cached and returned. Any other
    status or any exception (logged) yields an empty list.
    """
    cache_key = f"topics_{owner}_{repo}"
    hit = self.cache.get(cache_key)
    if hit:
        return hit

    try:
        await self.rate_limiter.acquire()
        api_url = f"https://api.github.com/repos/{owner}/{repo}/topics"
        preview_headers = {'Accept': 'application/vnd.github.mercy-preview+json'}
        async with self.session.get(api_url, headers=preview_headers) as response:
            if response.status != 200:
                return []
            payload = await response.json()
            topics = payload.get('names', [])
            self.cache.set(cache_key, topics)
            return topics
    except Exception as e:
        logger.error(f"Error fetching topics: {e}")
        return []
async def get_contributors(self, owner: str, repo: str) -> List[str]:
    """Return the login names of the contributors of ``owner/repo``.

    A truthy cached list is returned without a request. Otherwise the API
    ``/contributors`` endpoint is queried; the ``login`` of every entry of a
    200 response is collected, cached and returned. Any other status or any
    exception (logged) yields an empty list.
    """
    cache_key = f"contributors_{owner}_{repo}"
    hit = self.cache.get(cache_key)
    if hit:
        return hit

    try:
        await self.rate_limiter.acquire()
        api_url = f"https://api.github.com/repos/{owner}/{repo}/contributors"
        async with self.session.get(api_url) as response:
            if response.status != 200:
                return []
            logins = []
            for entry in await response.json():
                logins.append(entry['login'])
            self.cache.set(cache_key, logins)
            return logins
    except Exception as e:
        logger.error(f"Error fetching contributors: {e}")
        return []
async def scrape_github_content(self, url: str) -> GitHubContent:
    """Scrape one GitHub URL into a ``GitHubContent`` record.

    Steps: return a cached record if present; parse the URL; fetch the
    repository metadata; fetch either the README (no path, or a path equal to
    ``readme.md`` ignoring case) or the addressed file; fetch topics and
    contributors; assemble, cache and return the record.

    Args:
        url: GitHub repository or file URL.

    Returns:
        The populated ``GitHubContent``.

    Raises:
        Exception: any error from URL parsing or model construction is
            logged and re-raised.
    """
    import base64  # local import: only needed to decode API payloads

    cache_key = f"content_{url}"
    cached_content = self.cache.get(cache_key)
    if cached_content:
        return GitHubContent(**cached_content)

    try:
        # Parse the GitHub URL
        repo_info = self.parse_github_url(url)
        owner, repo = repo_info['owner'], repo_info['repo']

        # Get repository metadata
        metadata = await self.get_repo_metadata(owner, repo)

        # Get content based on URL type
        if not repo_info['path'] or repo_info['path'].lower() == 'readme.md':
            content_data = await self.get_readme_content(owner, repo, repo_info['branch'])
        else:
            content_data = await self.get_file_content(
                owner, repo, repo_info['path'], repo_info['branch']
            )

        # The API helpers hand back the payload still base64 encoded; decode
        # it so ``content`` holds readable text instead of the encoded blob.
        body = content_data.get('content', '') or ''
        if body and content_data.get('encoding') == 'base64':
            try:
                body = base64.b64decode(body).decode('utf-8', errors='replace')
            except ValueError:
                # binascii.Error is a ValueError; keep the raw payload.
                logger.warning(f"Could not base64-decode content for {url}")

        # Get additional metadata
        topics = await self.get_repo_topics(owner, repo)
        contributors = await self.get_contributors(owner, repo)

        # The API reports missing description/language/license as null, so
        # ``dict.get`` defaults do not apply; coerce None explicitly.
        license_info = metadata.get('license') or {}

        content = GitHubContent(
            title=metadata.get('name') or '',
            description=metadata.get('description') or '',
            content=body,
            language=metadata.get('language') or '',
            stars=metadata.get('stargazers_count', 0),
            forks=metadata.get('forks_count', 0),
            watchers=metadata.get('watchers_count', 0),
            last_updated=metadata.get('updated_at') or '',
            topics=topics,
            contributors=contributors,
            readme_url=content_data.get('url', ''),
            raw_content_url=metadata.get('html_url') or '',
            license=license_info.get('name') or '',
            metadata={
                'size': metadata.get('size', 0),
                'open_issues': metadata.get('open_issues_count', 0),
                'default_branch': metadata.get('default_branch', 'main'),
                'created_at': metadata.get('created_at', ''),
                'pushed_at': metadata.get('pushed_at', '')
            }
        )

        # Cache the complete content
        self.cache.set(cache_key, content.dict())
        return content
    except Exception as e:
        logger.error(f"Error scraping GitHub content: {e}")
        raise
async def main():
    """Demonstrate the scraper on a few placeholder URLs.

    Each URL is scraped in turn; the result is printed as indented JSON and a
    failure for one URL is printed without stopping the loop.
    """
    # Example URLs
    example_urls = (
        "https://github.com/owner/repo",
        "https://github.com/owner/repo/blob/main/README.md",
        "https://github.com/owner/repo/blob/main/src/main.py",
    )

    scraper = GitHubScraper(cache_dir=".github_cache", ttl_hours=24, calls_per_minute=30)
    async with scraper:
        for url in example_urls:
            try:
                content = await scraper.scrape_github_content(url)
            except Exception as e:
                print(f"Error scraping {url}: {e}")
                continue
            try:
                print(f"Scraped content from {url}:")
                print(json.dumps(content.dict(), indent=2))
            except Exception as e:
                print(f"Error scraping {url}: {e}")
def save_to_csv(topic_url, topic_name):
    """Write the repository details of one topic to ``<topic_name>.csv``.

    Args:
        topic_url: URL of the topic page, passed to ``topic_repo_details``.
        topic_name: Base name of the CSV file to create.

    The topic is skipped when the CSV file already exists.
    """
    file_name = topic_name + '.csv'
    if os.path.exists(file_name):
        logger.debug(f"The file {file_name} already exists. Skipping.")
        # Actually skip: without this return the topic was scraped again and
        # the existing file overwritten despite the message above.
        return
    # NOTE(review): topic_repo_details is not defined in the visible part of
    # this file — confirm it exists elsewhere.
    topics_df = topic_repo_details(topic_url)
    topics_df.to_csv(file_name, index=None)
    logger.info(f"Successfully scraped topic {topic_name}")
def check_if_already_written(github_url, file_path='papers_already_written_on.txt'):
    """
    Tell whether a GitHub URL already appears as a full line in a file.

    Both the URL and each line are compared after stripping surrounding
    whitespace.

    Args:
        github_url (str): GitHub URL string to look for.
        file_path (str): File holding one entry per line. Default is 'papers_already_written_on.txt'.

    Returns:
        bool: True on an exact match; False when there is none, when the file
        does not exist, or when any other error occurred (both failures are
        printed).
    """
    try:
        with open(file_path, 'r', encoding="utf-8") as handle:
            # any() stops at the first matching line, like an early return.
            if any(github_url.strip() == entry.strip() for entry in handle):
                return True
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
    return False
# Run the example scraper only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())