981 lines
34 KiB
Python
981 lines
34 KiB
Python
####################################################
|
|
#
|
|
# FIXME: Gotta use this lib: https://github.com/monk1337/resp/tree/main
|
|
# https://github.com/danielnsilva/semanticscholar
|
|
# https://github.com/shauryr/S2QA
|
|
#
|
|
####################################################
|
|
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import pandas as pd
|
|
import arxiv
|
|
import PyPDF2
|
|
import requests
|
|
import networkx as nx
|
|
from bs4 import BeautifulSoup
|
|
from urllib.parse import urlparse
|
|
from loguru import logger
|
|
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
|
|
import bibtexparser
|
|
from pylatexenc.latex2text import LatexNodes2Text
|
|
from matplotlib import pyplot as plt
|
|
from collections import defaultdict
|
|
from sklearn.feature_extraction.text import TfidfVectorizer
|
|
from sklearn.metrics.pairwise import cosine_similarity
|
|
from sklearn.cluster import KMeans
|
|
import numpy as np
|
|
|
|
# Reconfigure the shared loguru logger for this process: drop the handlers that
# are currently registered, then attach a single colorized stdout sink whose
# records read "LEVEL|file:line:function| message".
logger.remove()
logger.add(sys.stdout, colorize=True, format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}")
|
|
|
|
def create_arxiv_client(page_size=100, delay_seconds=3.0, num_retries=3):
    """
    Build an ``arxiv.Client`` that can be shared across several searches.

    Args:
        page_size (int): Number of results requested per API page (default: 100)
        delay_seconds (float): Pause between consecutive API requests (default: 3.0)
        num_retries (int): How many times a failed request is retried (default: 3)

    Returns:
        arxiv.Client: The configured client

    Raises:
        Exception: Whatever ``arxiv.Client`` raised, after it has been logged.
    """
    client_options = {
        "page_size": page_size,
        "delay_seconds": delay_seconds,
        "num_retries": num_retries,
    }
    try:
        return arxiv.Client(**client_options)
    except Exception as e:
        logger.error(f"Error creating arXiv client: {e}")
        raise e
|
|
|
|
def expand_search_query(query, research_interests=None):
    """
    Ask the LLM to expand an arXiv search query, optionally steered by the
    user's research interests.

    Args:
        query (str): Original search query
        research_interests (list | str | None): Research interests; a single
            string is treated as a one-item list

    Returns:
        str: The expanded query, or the original ``query`` when the LLM call
        fails or returns nothing usable
    """
    try:
        # A bare string would otherwise be joined character by character.
        if isinstance(research_interests, str):
            research_interests = [research_interests]
        interests_context = "\n".join(research_interests) if research_interests else ""

        # Built outside the prompt f-string: a backslash inside an f-string
        # replacement field is a SyntaxError on Python versions before 3.12.
        interests_section = (
            f"And considering these research interests:\n{interests_context}"
            if interests_context
            else ""
        )

        prompt = f"""Given the original arXiv search query: '{query}'
{interests_section}
Generate an expanded arXiv search query that:
1. Includes relevant synonyms and related concepts
2. Uses appropriate arXiv search operators (AND, OR, etc.)
3. Incorporates field-specific tags (ti:, abs:, au:, etc.)
4. Maintains focus on the core topic
Return only the expanded query without any explanation."""

        expanded_query = llm_text_gen(prompt)

        # Never hand an empty or non-string value to the arXiv API.
        if not isinstance(expanded_query, str) or not expanded_query.strip():
            logger.warning("LLM returned no usable query expansion; using the original query.")
            return query

        expanded_query = expanded_query.strip()
        logger.info(f"Expanded query: {expanded_query}")
        return expanded_query
    except Exception as e:
        logger.error(f"Error expanding search query: {e}")
        return query
|
|
|
|
def analyze_citation_network(papers):
    """
    Build a directed graph linking the given papers and derive network metrics.

    An edge A -> B is added when paper A has a DOI and the DOI of paper B
    occurs in A's summary text.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'entry_id', 'title', 'doi' and 'summary'

    Returns:
        dict: 'influential_papers' (PageRank pairs, highest first),
        'citation_clusters' (connected components of the undirected view) and
        'citation_paths' (all-pairs shortest path lengths); an empty dict if
        anything fails
    """
    try:
        graph = nx.DiGraph()

        for citing in papers:
            citing_id = citing['entry_id']
            graph.add_node(citing_id, title=citing['title'])

            # Papers without a DOI never get outgoing edges.
            if not citing['doi']:
                continue

            for cited in papers:
                if cited['doi'] and cited['doi'] in citing['summary']:
                    graph.add_edge(citing_id, cited['entry_id'])

        ranked_by_pagerank = sorted(
            nx.pagerank(graph).items(),
            key=lambda scored: scored[1],
            reverse=True,
        )
        clusters = list(nx.connected_components(graph.to_undirected()))
        path_lengths = dict(nx.all_pairs_shortest_path_length(graph))

        return {
            'influential_papers': ranked_by_pagerank,
            'citation_clusters': clusters,
            'citation_paths': path_lengths,
        }
    except Exception as e:
        logger.error(f"Error analyzing citation network: {e}")
        return {}
|
|
|
|
def categorize_papers(papers):
    """
    Ask the LLM for a categorization of every paper.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'title', 'summary', 'primary_category', 'categories' and 'entry_id'

    Returns:
        dict: Maps each paper's 'entry_id' to the raw LLM response; an empty
        dict if any paper fails
    """
    try:
        results_by_id = {}
        for entry in papers:
            title = entry['title']
            abstract = entry['summary']
            primary = entry['primary_category']
            category_list = ', '.join(entry['categories'])

            prompt = f"""Analyze this research paper and provide detailed categorization:
Title: {title}
Abstract: {abstract}
Primary Category: {primary}
Categories: {category_list}

Provide a JSON response with these fields:
1. main_theme: Primary research theme
2. sub_themes: List of related sub-themes
3. methodology: Research methodology used
4. application_domains: Potential application areas
5. technical_complexity: Level (Basic/Intermediate/Advanced)"""

            llm_answer = llm_text_gen(prompt)
            results_by_id[entry['entry_id']] = llm_answer

        return results_by_id
    except Exception as e:
        logger.error(f"Error categorizing papers: {e}")
        return {}
|
|
|
|
def get_paper_recommendations(papers, research_interests):
    """
    Ask the LLM to rate each paper against the user's research interests.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'title', 'summary', 'categories' and 'entry_id'
        research_interests (list): The user's research interests, one string each

    Returns:
        dict: Maps each paper's 'entry_id' to the raw LLM evaluation; an empty
        dict if anything fails
    """
    try:
        interests_text = "\n".join(research_interests)
        evaluations = {}

        for entry in papers:
            title = entry['title']
            abstract = entry['summary']
            category_list = ', '.join(entry['categories'])

            prompt = f"""Evaluate this paper's relevance to the user's research interests:
Paper:
- Title: {title}
- Abstract: {abstract}
- Categories: {category_list}

User's Research Interests:
{interests_text}

Provide a JSON response with:
1. relevance_score: 0-100
2. relevance_aspects: List of matching aspects
3. potential_value: How this paper could benefit the user's research"""

            llm_answer = llm_text_gen(prompt)
            evaluations[entry['entry_id']] = llm_answer

        return evaluations
    except Exception as e:
        logger.error(f"Error generating paper recommendations: {e}")
        return {}
|
|
|
|
def _resolve_sort_order(sort_order):
    """Translate None, a string, or an ``arxiv.SortOrder`` into an ``arxiv.SortOrder``."""
    if sort_order is None:
        # arxiv.Search reads ``sort_order.value``; None must become a real enum member.
        return arxiv.SortOrder.Descending
    if isinstance(sort_order, arxiv.SortOrder):
        return sort_order
    wanted = str(sort_order).strip().lower()
    for candidate in arxiv.SortOrder:
        if wanted in (str(candidate.value).lower(), candidate.name.lower()):
            return candidate
    raise ValueError(f"Unsupported sort_order: {sort_order!r} (use 'ascending' or 'descending')")


def fetch_arxiv_data(query, max_results=10, sort_by=arxiv.SortCriterion.SubmittedDate, sort_order=None, client=None, research_interests=None):
    """
    Fetches arXiv papers for a query and enriches them with AI-powered analyses.

    Args:
        query (str): The search query (supports advanced syntax, e.g., 'au:einstein AND cat:physics')
        max_results (int): The maximum number of results to fetch
        sort_by (arxiv.SortCriterion): Sorting criterion (default: SubmittedDate)
        sort_order (str | arxiv.SortOrder | None): 'ascending' or 'descending';
            None means descending
        client (arxiv.Client): Optional custom client (default: None, creates new client)
        research_interests (list): Optional research interests; when given, the
            query is expanded by the LLM and per-paper recommendations are produced

    Returns:
        dict: Always contains 'papers' (list of metadata dicts). When at least
        one paper was found it also contains 'citation_analysis',
        'paper_categories', 'recommendations', 'content_analyses',
        'trend_analysis', 'concept_mapping' and 'bibliography'.

    Raises:
        Exception: Any error from the search or the analyses is logged and re-raised.
    """
    try:
        if client is None:
            client = create_arxiv_client()

        # Expand search query using AI if research interests are provided
        expanded_query = expand_search_query(query, research_interests) if research_interests else query
        logger.info(f"Using expanded query: {expanded_query}")

        search = arxiv.Search(
            query=expanded_query,
            max_results=max_results,
            sort_by=sort_by,
            sort_order=_resolve_sort_order(sort_order),
        )

        all_data = [
            {
                'title': result.title,
                'published': result.published,
                'updated': result.updated,
                'entry_id': result.entry_id,
                'summary': result.summary,
                'authors': [str(author) for author in result.authors],
                'pdf_url': result.pdf_url,
                'journal_ref': getattr(result, 'journal_ref', None),
                'doi': getattr(result, 'doi', None),
                'primary_category': getattr(result, 'primary_category', None),
                'categories': getattr(result, 'categories', []),
                'links': [link.href for link in getattr(result, 'links', [])],
            }
            for result in client.results(search)
        ]

        if not all_data:
            return {'papers': all_data}

        # Enhance results with AI-powered analysis
        citation_analysis = analyze_citation_network(all_data)
        paper_categories = categorize_papers(all_data)
        recommendations = get_paper_recommendations(all_data, research_interests) if research_interests else {}

        # Perform content analysis
        content_analyses = [analyze_paper_content(paper['entry_id']) for paper in all_data]
        trend_analysis = analyze_research_trends(all_data)
        concept_mapping = map_cross_paper_concepts(all_data)

        # Generate each BibTeX entry once and reuse it for every citation style.
        bibtex_entries = [generate_bibtex_entry(paper) for paper in all_data]
        citations = {
            style: [convert_citation_format(entry, style) for entry in bibtex_entries]
            for style in ('apa', 'mla', 'chicago')
        }
        bibliography_data = {
            'bibtex_entries': bibtex_entries,
            'citations': citations,
            'reference_graph': visualize_reference_graph(all_data),
            'citation_impact': analyze_citation_impact(all_data),
        }

        return {
            'papers': all_data,
            'citation_analysis': citation_analysis,
            'paper_categories': paper_categories,
            'recommendations': recommendations,
            'content_analyses': content_analyses,
            'trend_analysis': trend_analysis,
            'concept_mapping': concept_mapping,
            'bibliography': bibliography_data,
        }
    except Exception as e:
        logger.error(f"An error occurred while fetching data from arXiv: {e}")
        raise
|
|
|
|
def create_dataframe(data, column_names):
    """
    Turn row data into a pandas DataFrame with the given column labels.

    Args:
        data (list): The rows to load into the DataFrame.
        column_names (list): The labels to use for the columns.

    Returns:
        DataFrame: The populated DataFrame, or an empty DataFrame if
        construction fails.
    """
    try:
        frame = pd.DataFrame(data, columns=column_names)
    except Exception as build_error:
        logger.error(f"An error occurred while creating DataFrame: {build_error}")
        frame = pd.DataFrame()
    return frame
|
|
|
|
def get_arxiv_main_content(url):
    """
    Returns the main text of an arXiv paper from its HTML page, falling back
    to PDF extraction when the HTML route raises.

    Args:
        url (str): The URL of the arXiv paper.

    Returns:
        str: The paper text with the alert banner, abstract and author blocks
        removed; "Main content not found." when the page has no
        'ltx_page_content' container; otherwise whatever ``get_pdf_content``
        returns for the same URL.
    """
    request_timeout = 30  # seconds; without a timeout a stalled server blocks forever
    try:
        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        main_content = soup.find('div', class_='ltx_page_content')
        if not main_content:
            logger.warning("Main content not found in the page.")
            return "Main content not found."

        # Drop page furniture that is not part of the paper body.
        alert_section = main_content.find('div', class_='package-alerts ltx_document')
        if alert_section:
            alert_section.decompose()
        for element_id in ("abs", "authors"):
            element = main_content.find(id=element_id)
            if element:
                element.decompose()

        return main_content.text.strip()
    except Exception as html_error:
        logger.warning(f"HTML content not accessible, trying PDF: {html_error}")
        return get_pdf_content(url)
|
|
|
|
def download_paper(paper_id, output_dir="downloads", filename=None, get_source=False):
    """
    Downloads a paper's PDF or source archive.

    Args:
        paper_id (str): The arXiv ID of the paper
        output_dir (str): Directory to save the downloaded file (default: 'downloads')
        filename (str): Custom filename, used as given (default: None, which
            builds '<id>_<title prefix>' plus '.pdf' or '.tar.gz')
        get_source (bool): If True, downloads source files instead of PDF (default: False)

    Returns:
        str: Path to the downloaded file or None if download fails
    """
    kind = 'source' if get_source else 'PDF'
    try:
        # Create output directory if it doesn't exist
        os.makedirs(output_dir, exist_ok=True)

        # Get paper metadata; a default avoids a message-less StopIteration.
        client = create_arxiv_client()
        paper = next(client.results(arxiv.Search(id_list=[paper_id])), None)
        if paper is None:
            logger.error(f"Error downloading {kind} for {paper_id}: no paper found with this ID")
            return None

        # Set filename if not provided
        if not filename:
            # Old-style IDs such as 'hep-th/9901001' contain '/', which would
            # otherwise point into a sub-directory that does not exist.
            safe_id = re.sub(r'[^\w\-_.]', '_', paper_id)
            safe_title = re.sub(r'[^\w\-_.]', '_', paper.title[:50])
            filename = f"{safe_id}_{safe_title}"
            filename += ".tar.gz" if get_source else ".pdf"

        # Full path for the downloaded file
        file_path = os.path.join(output_dir, filename)

        # Download the file
        if get_source:
            paper.download_source(dirpath=output_dir, filename=filename)
        else:
            paper.download_pdf(dirpath=output_dir, filename=filename)

        logger.info(f"Successfully downloaded {kind} to {file_path}")
        return file_path

    except Exception as e:
        logger.error(f"Error downloading {kind} for {paper_id}: {e}")
        return None
|
|
|
|
def analyze_paper_content(url_or_id, cleanup=True):
    """
    Analyzes paper content using AI to extract key information and insights.

    Args:
        url_or_id (str): The arXiv URL or ID of the paper
        cleanup (bool): Whether to delete the PDF after extraction (default: True)

    Returns:
        dict: 'summary_analysis', 'concept_analysis' (raw LLM responses) and
        'full_text' on success; {'error': ...} when the text could not be
        obtained or the analysis raised
    """
    # get_pdf_content reports problems as plain strings with these openings.
    # Matching the prefix (not a substring) keeps real papers that merely
    # contain the words "Failed to" from being rejected.
    error_prefixes = ('Failed to', 'No text content could be extracted')
    max_chars = 8000  # limit content length for the LLM API

    try:
        # Get paper content
        content = get_pdf_content(url_or_id, cleanup)
        if not content or content.startswith(error_prefixes):
            return {'error': content}

        excerpt = content[:max_chars]

        # Generate paper summary
        summary_prompt = f"""Analyze this research paper and provide a comprehensive summary:
{excerpt}

Provide a JSON response with:
1. executive_summary: Brief overview (2-3 sentences)
2. key_findings: List of main research findings
3. methodology: Research methods used
4. implications: Practical implications of the research
5. limitations: Study limitations and constraints"""

        summary_analysis = llm_text_gen(summary_prompt)

        # Extract key concepts and relationships
        concepts_prompt = f"""Analyze this research paper and identify key concepts and relationships:
{excerpt}

Provide a JSON response with:
1. main_concepts: List of key technical concepts
2. concept_relationships: How concepts are related
3. novel_contributions: New ideas or approaches introduced
4. technical_requirements: Required technologies or methods
5. future_directions: Suggested future research"""

        concept_analysis = llm_text_gen(concepts_prompt)

        return {
            'summary_analysis': summary_analysis,
            'concept_analysis': concept_analysis,
            'full_text': content,
        }
    except Exception as e:
        logger.error(f"Error analyzing paper content: {e}")
        return {'error': str(e)}
|
|
|
|
def analyze_research_trends(papers):
    """
    Analyzes research trends across multiple papers.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'entry_id', 'title', 'summary' and 'published'

    Returns:
        dict: {'trend_analysis': <raw LLM response>} on success, or
        {'error': ...} when no paper text was usable or the analysis raised
    """
    # get_pdf_content reports problems as plain strings with these openings.
    # Matching the prefix (not a substring) keeps real papers that merely
    # contain the words "Failed to" from being dropped.
    error_prefixes = ('Failed to', 'No text content could be extracted')
    max_chars = 8000  # limit content length per paper

    try:
        # Collect paper information
        papers_info = []
        for paper in papers:
            content = get_pdf_content(paper['entry_id'], cleanup=True)
            if not content or content.startswith(error_prefixes):
                continue
            published = paper['published']
            papers_info.append({
                'title': paper['title'],
                'abstract': paper['summary'],
                'content': content[:max_chars],
                # A missing publication date must not abort the whole analysis.
                'year': published.year if published else None,
            })

        if not papers_info:
            return {'error': 'No valid paper content found for analysis'}

        # Analyze trends
        trends_prompt = f"""Analyze these research papers and identify key trends:
Papers:
{str(papers_info)}

Provide a JSON response with:
1. temporal_trends: How research focus evolved over time
2. emerging_themes: New and growing research areas
3. declining_themes: Decreasing research focus areas
4. methodology_trends: Evolution of research methods
5. technology_trends: Trends in technology usage
6. research_gaps: Identified gaps and opportunities"""

        trend_analysis = llm_text_gen(trends_prompt)
        return {'trend_analysis': trend_analysis}

    except Exception as e:
        logger.error(f"Error analyzing research trends: {e}")
        return {'error': str(e)}
|
|
|
|
def map_cross_paper_concepts(papers):
    """
    Relate the concepts found in several papers to one another via the LLM.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'entry_id' and 'title'

    Returns:
        dict: {'concept_mapping': <raw LLM response>} on success, or
        {'error': ...} when no paper could be analyzed or the mapping raised
    """
    try:
        # Keep only the papers whose individual analysis succeeded.
        usable = []
        for entry in papers:
            per_paper = analyze_paper_content(entry['entry_id'])
            if 'error' in per_paper:
                continue
            usable.append({
                'paper_id': entry['entry_id'],
                'title': entry['title'],
                'analysis': per_paper,
            })

        if len(usable) == 0:
            return {'error': 'No valid paper analyses for concept mapping'}

        analyses_text = str(usable)
        mapping_prompt = f"""Analyze relationships between concepts across these papers:
{analyses_text}

Provide a JSON response with:
1. shared_concepts: Concepts appearing in multiple papers
2. concept_evolution: How concepts developed across papers
3. conflicting_views: Different interpretations of same concepts
4. complementary_findings: How papers complement each other
5. knowledge_gaps: Areas needing more research"""

        return {'concept_mapping': llm_text_gen(mapping_prompt)}

    except Exception as e:
        logger.error(f"Error mapping cross-paper concepts: {e}")
        return {'error': str(e)}
|
|
|
|
def generate_bibtex_entry(paper):
    """
    Render a paper's metadata dictionary as an ``@article`` BibTeX entry.

    The citation key is the last word of the first author's name, followed by
    the publication year and the final path segment of 'entry_id'.

    Args:
        paper (dict): Paper metadata; the keys read here are 'authors',
            'published', 'entry_id', 'title', 'doi' and 'summary'

    Returns:
        str: The BibTeX entry, or an empty string if rendering fails
    """
    try:
        author_names = paper['authors']
        surname = author_names[0].split()[-1] if author_names else 'Unknown'
        published = paper['published']
        year = published.year if published else '0000'
        arxiv_ref = paper['entry_id'].split('/')[-1]
        citation_key = f"{surname}{year}{arxiv_ref}"

        # Field order is the order in which the fields appear in the entry.
        fields = [
            ('title', paper['title']),
            ('author', ' and '.join(author_names)),
            ('year', year),
            ('journal', 'arXiv preprint'),
            ('archivePrefix', 'arXiv'),
            ('eprint', arxiv_ref),
        ]
        if paper['doi']:
            fields.append(('doi', paper['doi']))
        fields.append(('url', paper['entry_id']))
        fields.append(('abstract', paper['summary']))

        rendered = ",\n".join(f" {name} = {{{value}}}" for name, value in fields)
        return f"@article{{{citation_key},\n{rendered}\n}}"
    except Exception as e:
        logger.error(f"Error generating BibTeX entry: {e}")
        return ""
|
|
|
|
def convert_citation_format(bibtex_str, target_format):
    """
    Re-express the first entry of a BibTeX string in another citation style,
    using the LLM to do the formatting.

    Args:
        bibtex_str (str): BibTeX text; only its first entry is used
        target_format (str): Target citation format ('apa', 'mla', 'chicago', etc.)

    Returns:
        str: The formatted citation with surrounding whitespace removed, or an
        empty string if parsing or formatting fails
    """
    try:
        record = bibtexparser.loads(bibtex_str).entries[0]

        title = record.get('title', '')
        authors = record.get('author', '')
        year = record.get('year', '')
        journal = record.get('journal', '')
        doi = record.get('doi', '')
        url = record.get('url', '')

        prompt = f"""Convert this bibliographic information to {target_format} format:
Title: {title}
Authors: {authors}
Year: {year}
Journal: {journal}
DOI: {doi}
URL: {url}

Return only the formatted citation without any explanation."""

        return llm_text_gen(prompt).strip()
    except Exception as e:
        logger.error(f"Error converting citation format: {e}")
        return ""
|
|
|
|
def visualize_reference_graph(papers):
    """
    Draws the citation graph of the given papers and saves it as a PNG.

    An edge A -> B is added when paper A has a DOI and the DOI of paper B
    occurs in A's summary text.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'entry_id', 'title', 'doi' and 'summary'

    Returns:
        str: Path to the saved image ('reference_graph.png', relative to the
        current working directory), or an empty string on failure
    """
    output_path = 'reference_graph.png'
    try:
        # Create directed graph
        G = nx.DiGraph()
        for paper in papers:
            paper_id = paper['entry_id']
            G.add_node(paper_id, title=paper['title'])

            # Add citation edges
            if paper['doi']:
                for other_paper in papers:
                    if other_paper['doi'] and other_paper['doi'] in paper['summary']:
                        G.add_edge(paper_id, other_paper['entry_id'])

        fig = plt.figure(figsize=(12, 8))
        try:
            pos = nx.spring_layout(G)
            nx.draw(G, pos, with_labels=False, node_color='lightblue',
                    node_size=1000, arrowsize=20)

            # Label nodes with paper titles instead of their IDs.
            labels = nx.get_node_attributes(G, 'title')
            nx.draw_networkx_labels(G, pos, labels, font_size=8)

            plt.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            # Release the figure even when layout, drawing or saving fails.
            plt.close(fig)

        return output_path
    except Exception as e:
        logger.error(f"Error visualizing reference graph: {e}")
        return ""
|
|
|
|
def analyze_citation_impact(papers):
    """
    Analyzes citation impact and influence patterns.

    An edge A -> B is added when paper A has a DOI and the DOI of paper B
    occurs in A's summary text.

    Args:
        papers (list): Paper metadata dictionaries; the keys read here are
            'entry_id', 'doi', 'summary' and 'published'

    Returns:
        dict: 'citation_counts' (in-degree per node), 'influence_scores'
        (PageRank), 'authority_scores' and 'hub_scores' (float arrays in graph
        node order), 'citation_paths' (all-pairs shortest path lengths) and
        'temporal_trends' (in-degree summed per publication year); an empty
        dict if anything fails
    """
    try:
        # Create citation network
        G = nx.DiGraph()
        for paper in papers:
            # Bind the ID locally; the edge insertion below needs it.
            paper_id = paper['entry_id']
            G.add_node(paper_id, **paper)
            if paper['doi']:
                for other_paper in papers:
                    if other_paper['doi'] and other_paper['doi'] in paper['summary']:
                        G.add_edge(paper_id, other_paper['entry_id'])

        # For an unweighted digraph with adjacency matrix M, the diagonal of
        # M.T @ M is each node's in-degree and the diagonal of M @ M.T is its
        # out-degree. Computing them directly yields the same numbers as the
        # diagonals of nx.authority_matrix / nx.hub_matrix without relying on
        # those helpers (NOTE(review): believed removed in NetworkX 3.x — confirm).
        node_order = list(G)
        authority_scores = np.array([float(G.in_degree(node)) for node in node_order])
        hub_scores = np.array([float(G.out_degree(node)) for node in node_order])

        # Calculate impact metrics
        impact_analysis = {
            'citation_counts': dict(G.in_degree()),
            'influence_scores': nx.pagerank(G),
            'authority_scores': authority_scores,
            'hub_scores': hub_scores,
            'citation_paths': dict(nx.all_pairs_shortest_path_length(G)),
        }

        # Add temporal analysis
        year_citations = defaultdict(int)
        for paper in papers:
            if paper['published']:
                year_citations[paper['published'].year] += G.in_degree(paper['entry_id'])
        impact_analysis['temporal_trends'] = dict(year_citations)

        return impact_analysis
    except Exception as e:
        logger.error(f"Error analyzing citation impact: {e}")
        return {}
|
|
|
|
def _arxiv_id_from_reference(url_or_id):
    """Pull an arXiv identifier (new or old style, optional version) out of a URL or ID."""
    found = re.search(
        r'(?:[a-z-]+(?:\.[a-z-]+)?/\d{7}|\d{4}\.\d{4,5})(?:v\d+)?',
        url_or_id,
        flags=re.IGNORECASE,
    )
    if found:
        return found.group(0)
    # Unrecognized shape: fall back to the last path segment.
    return url_or_id.split('/')[-1] if '/' in url_or_id else url_or_id


def get_pdf_content(url_or_id, cleanup=True):
    """
    Extracts text content from a paper's PDF.

    Args:
        url_or_id (str): The arXiv URL or ID of the paper
        cleanup (bool): Whether to delete the PDF after extraction (default: True)

    Returns:
        str: The cleaned text, or one of the messages "Failed to download PDF.",
        "No text content could be extracted from the PDF." or
        "Failed to retrieve content: <reason>"
    """
    pdf_path = None
    try:
        # Taking the last '/' segment would turn 'hep-th/9901001' into
        # '9901001' and keep the '.pdf' suffix of PDF links.
        arxiv_id = _arxiv_id_from_reference(url_or_id)

        # Download PDF
        pdf_path = download_paper(arxiv_id)
        if not pdf_path:
            return "Failed to download PDF."

        # Extract text from PDF, skipping pages that cannot be read.
        page_chunks = []
        with open(pdf_path, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            for page_num, page in enumerate(pdf_reader.pages, 1):
                try:
                    page_text = page.extract_text()
                except Exception as err:
                    logger.error(f"Error extracting text from page {page_num}: {err}")
                    continue
                if page_text:
                    page_chunks.append(f"\n--- Page {page_num} ---\n{page_text}")
        pdf_text = ''.join(page_chunks)

        # Process and return text
        if not pdf_text.strip():
            return "No text content could be extracted from the PDF."

        return clean_pdf_text(pdf_text)

    except Exception as e:
        logger.error(f"Failed to process PDF: {e}")
        return f"Failed to retrieve content: {str(e)}"

    finally:
        # Runs on every exit path so a failed extraction leaves no stray file.
        if cleanup and pdf_path:
            try:
                os.remove(pdf_path)
                logger.debug(f"Cleaned up temporary PDF file: {pdf_path}")
            except Exception as e:
                logger.warning(f"Failed to cleanup PDF file {pdf_path}: {e}")
|
|
|
|
# A "References"/"Bibliography" heading: optional section number, the word at
# the start of a line, then end of line or the first "[1]"-style entry.
_REFERENCE_HEADING = re.compile(
    r'^[ \t]*(?:[\dIVXivx]+\.?[ \t]+)?(?i:references|bibliography)[ \t]*:?[ \t]*(?=$|\[\d)',
    re.MULTILINE,
)

# An "Acknowledgements" heading line plus everything up to the next line that
# starts with two or more capitals, or the end of the text. Only the heading
# word is case-insensitive so the capitals lookahead keeps its meaning.
_ACKNOWLEDGEMENT_SECTION = re.compile(
    r'^[ \t]*(?:[\dIVXivx]+\.?[ \t]+)?(?i:acknowledge?ments?)[ \t]*:?[ \t]*$'
    r'.*?(?=\n[ \t]*[A-Z]{2,}|\Z)',
    re.MULTILINE | re.DOTALL,
)


def clean_pdf_text(text):
    """
    Helper function to clean the text extracted from a PDF.

    Everything from the first "References" or "Bibliography" heading onward is
    dropped, and so is any "Acknowledgements" section. Only heading lines
    count: a sentence that merely contains the word "references" no longer
    truncates the paper.

    Args:
        text (str): The text to clean.

    Returns:
        str: The cleaned text (an empty string for empty or None input).
    """
    if not text:
        return ""

    heading = _REFERENCE_HEADING.search(text)
    if heading:
        text = text[:heading.start()]

    return _ACKNOWLEDGEMENT_SECTION.sub('', text)
|
|
|
|
def download_image(image_url, base_url, folder="images"):
    """
    Downloads an image from a URL into a local folder.

    Args:
        image_url (str): The URL of the image; may be relative to ``base_url``.
        base_url (str): The base URL of the website.
        folder (str): The folder to save the image.

    Returns:
        bool: True if the image was downloaded successfully, False otherwise
        (including for inline ``data:image`` URIs, which are skipped).
    """
    from urllib.parse import urljoin  # local import: only urlparse is imported at file level

    if image_url.startswith('data:image'):
        logger.info(f"Skipping download of data URI image: {image_url}")
        return False

    os.makedirs(folder, exist_ok=True)

    if not urlparse(image_url).scheme:
        # Treat base_url as a directory, then let urljoin resolve the
        # reference so '/abs/path' and '//host/path' forms come out right.
        if not base_url.endswith('/'):
            base_url += '/'
        image_url = urljoin(base_url, image_url)

    request_timeout = 30  # seconds; without a timeout a stalled server blocks forever
    try:
        response = requests.get(image_url, timeout=request_timeout)
        response.raise_for_status()

        # Name the file after the URL path only, so a query string or
        # fragment never ends up in the filename.
        image_name = os.path.basename(urlparse(image_url).path) or "image"
        with open(os.path.join(folder, image_name), 'wb') as file:
            file.write(response.content)
        return True
    except (requests.RequestException, OSError) as e:
        logger.error(f"Error downloading {image_url}: {e}")
        return False
|
|
|
|
def scrape_images_from_arxiv(url):
    """
    Collects the ``src`` of every ``<img>`` tag on an arXiv page.

    Args:
        url (str): The URL of the arXiv page.

    Returns:
        list: The ``src`` values as found in the page (they may be relative
        URLs or data URIs); an empty list if the request fails.
    """
    request_timeout = 30  # seconds; without a timeout a stalled server blocks forever
    try:
        response = requests.get(url, timeout=request_timeout)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Error fetching page {url}: {e}")
        return []

    soup = BeautifulSoup(response.text, 'html.parser')
    return [img['src'] for img in soup.find_all('img') if 'src' in img.attrs]
|
|
|
|
def _citation_surname(authors):
    """Return a BibTeX-key-safe surname for the first author, or 'Unknown'."""
    if not authors:
        return 'Unknown'
    name = authors[0].strip()
    if ',' in name:
        # "Last, First" form: the surname comes before the comma.
        surname = name.split(',')[0]
    else:
        # "First Last" form: the surname is the final word.
        words = name.split()
        surname = words[-1] if words else ''
    # Citation keys must not contain spaces or punctuation.
    return re.sub(r'[\W_]+', '', surname) or 'Unknown'


def generate_bibtex(paper_id, client=None):
    """
    Generate a BibTeX entry for an arXiv paper looked up by its ID.

    Args:
        paper_id (str): The arXiv ID of the paper
        client (arxiv.Client): Optional custom client (default: None)

    Returns:
        str: BibTeX entry as a string, or an empty string if the paper cannot
        be found or the lookup fails
    """
    try:
        if client is None:
            client = create_arxiv_client()

        # Fetch paper metadata; a default avoids a message-less StopIteration.
        paper = next(client.results(arxiv.Search(id_list=[paper_id])), None)
        if paper is None:
            logger.error(f"Error generating BibTeX for {paper_id}: no paper found with this ID")
            return ""

        authors = [str(author) for author in paper.authors]
        year = paper.published.year if paper.published else 'Unknown'

        # Key = surname + two-digit year; the year part is omitted when unknown.
        year_suffix = str(year)[-2:] if paper.published else ''
        citation_key = f"{_citation_surname(authors)}{year_suffix}"

        # Build BibTeX entry
        bibtex = [
            f"@article{{{citation_key},",
            f" author = {{{' and '.join(authors)}}},",
            f" title = {{{paper.title}}},",
            f" year = {{{year}}},",
            f" eprint = {{{paper_id}}},",
            f" archivePrefix = {{arXiv}},",
        ]

        # Add optional fields if available
        if paper.doi:
            bibtex.append(f" doi = {{{paper.doi}}},")
        if getattr(paper, 'journal_ref', None):
            bibtex.append(f" journal = {{{paper.journal_ref}}},")
        if getattr(paper, 'primary_category', None):
            bibtex.append(f" primaryClass = {{{paper.primary_category}}},")

        # Add URL and close entry
        bibtex.extend([
            f" url = {{https://arxiv.org/abs/{paper_id}}}",
            "}",
        ])

        return '\n'.join(bibtex)

    except Exception as e:
        logger.error(f"Error generating BibTeX for {paper_id}: {e}")
        return ""
|
|
|
|
def batch_download_papers(paper_ids, output_dir="downloads", get_source=False):
    """
    Download multiple papers, recording the outcome for each one.

    Args:
        paper_ids (list): List of arXiv IDs to download
        output_dir (str): Directory to save downloaded files (default: 'downloads')
        get_source (bool): If True, downloads source files instead of PDFs (default: False)

    Returns:
        dict: Maps each paper ID to {'success': bool, 'path': str or None,
        'error': str or None}
    """
    # No client is created here: download_paper builds its own, and creating
    # one up front could fail the whole batch before any download started.
    results = {}

    for paper_id in paper_ids:
        try:
            file_path = download_paper(paper_id, output_dir, get_source=get_source)
            results[paper_id] = {
                'success': bool(file_path),
                'path': file_path,
                'error': None,
            }
        except Exception as e:
            results[paper_id] = {
                'success': False,
                'path': None,
                'error': str(e),
            }
            logger.error(f"Failed to download {paper_id}: {e}")

    return results
|
|
|
|
def batch_generate_bibtex(paper_ids):
    """
    Produce a BibTeX entry for each arXiv ID, sharing one API client.

    Args:
        paper_ids (list): List of arXiv IDs

    Returns:
        dict: Maps each paper ID to {'success': bool, 'bibtex': str,
        'error': str or None}
    """
    shared_client = create_arxiv_client()
    outcome = {}

    for paper_id in paper_ids:
        try:
            entry_text = generate_bibtex(paper_id, shared_client)
        except Exception as e:
            outcome[paper_id] = {'success': False, 'bibtex': '', 'error': str(e)}
            logger.error(f"Failed to generate BibTeX for {paper_id}: {e}")
        else:
            outcome[paper_id] = {
                'success': bool(entry_text),
                'bibtex': entry_text,
                'error': None,
            }

    return outcome
|
|
|
|
def extract_arxiv_ids_from_line(line):
    """
    Find the first arXiv abstract URL in a line and return its identifier.

    Args:
        line (str): A line of text potentially containing an arXiv URL.

    Returns:
        str: The identifier, including the version suffix (e.g. 'v2') when the
        URL carries one, or None if the line has no matching URL.
    """
    found = re.search(r'arxiv\.org\/abs\/(\d+\.\d+)(v\d+)?', line)
    if found is None:
        return None
    # The version group is optional; substitute '' when it did not match.
    base_id, version = found.groups(default='')
    return base_id + version
|
|
|
|
def read_written_ids(file_path):
    """
    Load the arXiv IDs already recorded in a file, one ID per line.

    Args:
        file_path (str): Path to the file containing written IDs.

    Returns:
        set: The whitespace-stripped lines of the file; empty if the file is
        missing (the problem is logged, not raised).
    """
    seen_ids = set()
    try:
        with open(file_path, 'r', encoding="utf-8") as handle:
            seen_ids.update(raw_line.strip() for raw_line in handle)
    except FileNotFoundError:
        logger.error(f"File not found: {file_path}")
    except Exception as e:
        logger.error(f"Error while reading the file: {e}")
    return seen_ids
|
|
|
|
def append_id_to_file(arxiv_id, output_file_path):
    """
    Add one arXiv ID as a new line at the end of a file, creating the file
    when it is not there yet. Failures are logged rather than raised.

    Args:
        arxiv_id (str): The arXiv ID to append.
        output_file_path (str): Path to the output file.
    """
    try:
        # Only the log message depends on whether the file already exists;
        # append mode creates a missing file by itself.
        if os.path.exists(output_file_path):
            logger.info(f"Appending to existing file: {output_file_path}")
        else:
            logger.info(f"File does not exist. Creating new file: {output_file_path}")

        with open(output_file_path, 'a', encoding="utf-8") as outfile:
            outfile.write(arxiv_id + '\n')
    except Exception as e:
        logger.error(f"Error while appending to file: {e}")
|