Features: AI Rich snippet from url, AI product description writer

This commit is contained in:
ajaysi
2024-07-17 12:00:27 +05:30
parent c923435be2
commit 44d83e2b81
19 changed files with 136 additions and 130 deletions

View File

@@ -0,0 +1,115 @@
import streamlit as st
import json
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
def generate_product_description(title, details, audience, tone, length, keywords):
"""
Generates a product description using OpenAI's API.
Args:
title (str): The title of the product.
details (list): A list of product details (features, benefits, etc.).
audience (list): A list of target audience segments.
tone (str): The desired tone of the description (e.g., "Formal", "Informal").
length (str): The desired length of the description (e.g., "short", "medium", "long").
keywords (str): Keywords related to the product (comma-separated).
Returns:
str: The generated product description.
"""
prompt = f"""
Write a compelling product description for {title}.
Highlight these key features: {', '.join(details)}
Emphasize the benefits of these features for the target audience ({audience}).
Maintain a {tone} tone and aim for a length of approximately {length} words.
Use these keywords naturally throughout the description: {', '.join(keywords)}.
Remember to be persuasive and focus on the value proposition.
"""
try:
response = llm_text_gen(prompt)
return response
except Exception as err:
logger.error(f"Exit: Failed to get response from LLM: {err}")
exit(1)
def display_inputs():
st.title("📝 AI Product Description Writer 🚀")
st.markdown("**Generate compelling and accurate product descriptions with AI.**")
col1, col2 = st.columns(2)
with col1:
product_title = st.text_input("🏷️ **Product Title**", placeholder="Enter the product title (e.g., Wireless Bluetooth Headphones)")
with col2:
product_details = st.text_area("📄 **Product Details**", placeholder="Enter features, benefits, specifications, materials, etc. (e.g., Noise Cancellation, Long Battery Life, Water Resistant, Comfortable Design)")
col3, col4 = st.columns(2)
with col3:
keywords = st.text_input("🔑 **Keywords**", placeholder="Enter keywords, comma-separated (e.g., wireless headphones, noise cancelling, Bluetooth 5.0)")
with col4:
target_audience = st.multiselect(
"🎯 **Target Audience**",
["Teens", "Adults", "Seniors", "Music Lovers", "Fitness Enthusiasts", "Tech Savvy", "Busy Professionals", "Travelers", "Casual Users"],
placeholder="Select target audience (optional)"
)
col5, col6 = st.columns(2)
with col5:
description_length = st.selectbox(
"📏 **Desired Description Length**",
["Short (1-2 sentences)", "Medium (3-5 sentences)", "Long (6+ sentences)"],
help="Select the desired length of the product description"
)
with col6:
brand_tone = st.selectbox(
"🎨 **Brand Tone**",
["Formal", "Informal", "Fun & Energetic"],
help="Select the desired tone for the description"
)
return product_title, product_details, target_audience, brand_tone, description_length, keywords
def display_output(description):
if description:
st.subheader("✨ Generated Product Description:")
st.write(description)
json_ld = {
"@context": "https://schema.org",
"@type": "Product",
"name": product_title,
"description": description,
"audience": target_audience,
"brand": {
"@type": "Brand",
"name": "Your Brand Name"
},
"keywords": keywords.split(", ")
}
def write_ai_prod_desc():
product_title, product_details, target_audience, brand_tone, description_length, keywords = display_inputs()
if st.button("Generate Product Description 🚀"):
with st.spinner("Generating description..."):
description = generate_product_description(
product_title,
product_details.split(", "), # Split details into a list
target_audience,
brand_tone,
description_length.split(" ")[0].lower(), # Extract length from selectbox
keywords
)
display_output(description)

View File

@@ -87,12 +87,6 @@ def facebook_post_writer():
)
with col2:
business_type = st.text_input(
"🏢 **What is your business type?**",
placeholder="e.g., Fitness coach",
help="Provide the type of your business. This will help tailor the post content."
)
post_tone_options = ["Informative", "Humorous", "Inspirational", "Upbeat", "Casual", "Customize"]
post_tone = st.selectbox(
"🎨 **What tone do you want to use?**",
@@ -108,6 +102,12 @@ def facebook_post_writer():
help="Provide a specific tone if you selected 'Customize'."
)
business_type = st.text_input(
"🏢 **What is your business type?**",
placeholder="e.g., Fitness coach",
help="Provide the type of your business. This will help tailor the post content."
)
avoid = st.text_input(
"❌ **What elements do you want to avoid?**",
placeholder="e.g., Long paragraphs",

View File

@@ -0,0 +1,39 @@
import sys
from .gpt_providers.openai_chat_completion import openai_chatgpt
from .gpt_providers.gemini_pro_text import gemini_text_response
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
def github_readme_blog(readme_content):
""" """
prompt = f"""As an expert programmer and teacher, Write an original, detailed and step-by-step guide, from the provided Text below.
Your guide should be original, engaging and help beginners get started easily.
Write new example codes and detailed comments on how to run them. Include appropriate emoji where applicable.
Include a referances section that links to more code examples.
Your response MUST be a how-to blog in markdown format.
Respond ONLY with your blog content.
Text: '{readme_content}'
"""
if 'gemini' in gpt_providers:
try:
response = gemini_text_response(prompt)
return response
except Exception as err:
logger.error(f"Failed to get response from gemini: {err}")
sys.exit(1)
elif 'openai' in gpt_providers:
try:
logger.info("Calling OpenAI LLM.")
response = openai_chatgpt(prompt)
return response
except Exception as err:
SystemError(f"Failed to get response from Openai: {err}")

View File

@@ -0,0 +1,140 @@
""" Package for writing getting-started and how to guides. """
import os
import sys
import datetime
import json
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
from .scrape_github_readme import get_gh_details_vision, get_readme_content
from .scrape_github_readme import research_github_topics, check_if_already_written
from .github_getting_started import github_readme_blog
from .gpt_online_researcher import do_online_research
from .faqs_generator_blog import generate_blog_faq
from .get_blog_metadata import blog_metadata
from .save_blog_to_file import save_blog_to_file
from .arxiv_schlorly_research import read_written_ids, extract_arxiv_ids_from_line, append_id_to_file
def blog_from_github(github_opts, flag):
""" Module for writing getting started code examples from github. """
if 'url' in flag:
try:
write_from_url(github_opts)
except Exception as err:
logger.error(f"Failed to write from github url: {github_opts}")
sys.exit(1)
elif 'csv' in flag:
try:
gh_urls = []
with open(github_opts, 'r', encoding="utf-8") as file:
# Read each line in the file
for gh_url in file:
gh_urls.append(gh_url.strip())
except FileNotFoundError:
logger.error(f"CSV File not found: {file_path}")
except Exception as e:
logger.error(f"CSV: An error occurred: {str(e)}")
for gh_url in gh_urls:
try:
write_from_url(gh_url.strip())
except Exception as err:
logger.error(f"Failed to write blog from github: {err}")
def write_from_url(gh_url):
# String to store the blog content.
howto_blog = ''
# The url was not found in already_written data.
if not check_if_already_written(gh_url):
logger.info(f"Writing getting started from url: {gh_url}")
else:
logger.error(f"Skipping, already written on url: {gh_url}")
return
# Direct link to the raw content of README file
# fixme: Remove the hardcoding, need add another option OR in config ?
image_dir = os.path.join(os.getcwd(), "blog_images")
generated_image_name = f"screenshot_image_{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}.png"
generated_image_filepath = os.path.join(image_dir, generated_image_name)
try:
logger.info(f"Getting github repo details from vision model: {generated_image_filepath}")
gh_json = get_gh_details_vision(gh_url, generated_image_filepath)
except Exception as err:
logger.error(f"Failed to get gemini vision details from GH repo image: {err}")
sys.exit(1)
howto_blog = "```" + f"\nGithub URL:{gh_url}\nStars:{gh_json.get('stars')}\n"
howto_blog += f"Forks:{gh_json.get('forks')}\n"
howto_blog += f"Description:{gh_json.get('about')}\nBranch:{gh_json.get('branch_name')}\n" + "```\n\n"
raw_readme_url_base = "https://raw.githubusercontent.com/" + "/".join(gh_url.split("/")[-2:])
if gh_json.get('branch_name'):
raw_readme_url = raw_readme_url_base + f"/{gh_json.get('branch_name')}/" + "README.md"
else:
raw_readme_url = raw_readme_url_base + f"/main/" + "README.md"
logger.info(f"Using this url to fetch the README file: {raw_readme_url}")
try:
# Get and print the main content
readme_content = get_readme_content(raw_readme_url)
except Exception as err:
logger.error(f"Failed to get README from URL: {raw_readme_url}: {err}")
# If the readme is still None, try with master branch.
if not readme_content:
raw_readme_url = raw_readme_url_base + f"/master/" + "README.md"
logger.warning(f"Trying with master branch: {raw_readme_url}")
readme_content = get_readme_content(raw_readme_url)
if not readme_content:
logger.error(f"Still failed to get the README: {readme_content}")
sys.exit(1)
# Create a getting-started blog, adapted from the GH url README.
howto_blog += github_readme_blog(readme_content, "gemini")
# Do online research for faqs on the github url.
try:
# Repo names are misnomers for others search, include its decription too.
# Which, skews the result favourably towards its home/paid pages.
#online_query = f"{''.join(gh_url.split('/')[-1:])} " + gh_json.get('about')
online_query = f"{''.join(gh_url.split('/')[-1:])} "
logger.info("Do web research with Tavily & Metaphor AI.")
research_report = do_online_research(online_query, "gemini", gh_url)
except Exception as err:
logger.error(f"failed to do online research: {err}")
# Generate FAQs from the online research report.
try:
blog_faqs = generate_blog_faq(research_report, "gemini")
howto_blog += f"\n\n## {''.join(gh_url.split('/')[-1:])} FAQs\n\n" + blog_faqs
except Exception as err:
logger.error(f"Failed to generate FAQs from web research_report: {err}")
logger.info(f"\n\nFinal Blog Content: {howto_blog}\n\n")
try:
blog_title, blog_meta_desc, blog_tags, blog_categories = blog_metadata(howto_blog, "gemini")
except Exception as err:
logger.error(f"Failed to get blog metadata: {err}")
raise err
try:
save_blog_to_file(howto_blog, blog_title, blog_meta_desc, blog_tags,\
blog_categories, generated_image_filepath)
except Exception as err:
logger.error(f"Failed to save blog to a file: {err}")
sys.exit(1)
try:
append_id_to_file(gh_url, "papers_already_written_on.txt")
except Exception as err:
logger.error(f"Failed to write/append ID to papers_already_written_on.txt: {err}")
raise err

View File

@@ -0,0 +1,297 @@
import os
import sys
import datetime
import pandas as pd
import json
import requests
from bs4 import BeautifulSoup
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
from .take_url_screenshot import take_screenshot
from .gpt_providers.gemini_image_details import gemini_get_img_info
def get_readme_content(url):
try:
# Fetch the README content directly from the URL
response = requests.get(url)
print(response.status_code)
if response.status_code == 200:
logger.debug("Successfully fetched the README.md")
readme_content = response.text
else:
readme_content = None
return readme_content
except Exception as err:
logger.error(f"Failed to fetch raw readme from {url}: {err}: {response.status_code}")
sys.exit(1)
def get_gh_repo_metadata(github_url):
""" Function to get the repo details like stars, commits, forks etc """
logger.info("Scraping github with BS4 and requests.")
# download the target page
page = requests.get(github_url)
# parse the HTML document returned by the server
soup = BeautifulSoup(page.text, 'html.parser')
# initialize the object that will contain the scraped data
repo = {}
# repo scraping logic
name_html_element = soup.select_one('[itemprop="name"]')
name = name_html_element.get_text().strip()
git_branch_icon_html_element = soup.select_one('.octicon-git-branch')
main_branch_html_element = git_branch_icon_html_element.find_next_sibling('span')
main_branch = main_branch_html_element.get_text().strip()
# scrape the repo history data
boxheader_html_element = soup.select_one('.Box .Box-header')
# scrape the repo details in the right box
bordergrid_html_element = soup.select_one('.BorderGrid')
about_html_element = bordergrid_html_element.select_one('h2')
description_html_element = about_html_element.find_next_sibling('p')
description = description_html_element.get_text().strip()
star_icon_html_element = bordergrid_html_element.select_one('.octicon-star')
stars_html_element = star_icon_html_element.find_next_sibling('strong')
stars = stars_html_element.get_text().strip().replace(',', '')
eye_icon_html_element = bordergrid_html_element.select_one('.octicon-eye')
watchers_html_element = eye_icon_html_element.find_next_sibling('strong')
watchers = watchers_html_element.get_text().strip().replace(',', '')
fork_icon_html_element = bordergrid_html_element.select_one('.octicon-repo-forked')
forks_html_element = fork_icon_html_element.find_next_sibling('strong')
forks = forks_html_element.get_text().strip().replace(',', '')
# Find the div with class "f6" containing topic links
topic_div = soup.find('div', class_='f6')
if topic_div:
# Find all the topic links within the div
topic_links = topic_div.find_all('a', class_='topic-tag-link')
# Extract and print the topics
repo['topics'] = [link.text.strip() for link in topic_links]
# FIXME: Unable to scrape branch name.
repo['branch_name'] = None
# store the scraped data
repo['name'] = name
repo['about'] = description
repo['stars'] = stars
repo['watchers'] = watchers
repo['forks'] = forks
#repo['readme'] = readme
logger.info(f"Github Repo Details: {repo}")
return(repo)
def get_gh_details_vision(github_url, generated_image_filepath):
""" Take a screenshot of the url and feed to vision models for scraping details. """
logger.info(f"Take screenshot and pass it to gemini for repo details of {github_url}")
generated_image_filepath = take_screenshot(github_url, generated_image_filepath)
prompt = """From the given image of a github page, find out the number of stars, about, forks, last commit days, link url, topics and branch name. Return the result as json."""
try:
gh_details = gemini_get_img_info(prompt, generated_image_filepath)
logger.info(f"Github Repo details, from vision model: {gh_details}")
#gh_details = get_gh_repo_metadata(github_url)
except Exception as err:
logger.error(f"Failed to get gh images details: {err}")
gh_details = get_gh_repo_metadata(github_url)
return gh_details
# Convert string to dictionary Split the string into lines
lines = gh_details.split('\n')
# Remove the first and last line
modified_lines = lines[1:-1]
# Join the modified lines back into a string
gh_details = '\n'.join(modified_lines)
gh_details = json.loads(gh_details)
return(gh_details)
def research_github_topics(topics):
""" Scrape github topics of interest for top repos to write on """
# https://www.kaggle.com/code/subhaskumarray/scraping-github-topics-with-their-repositories
# We are going to scrape https://github.com/topics
# We will get a list of topics. For each topic, we will extract topic name, topic description and topic url.
# For each topic, we will get top 30 repositories with repo name, repo username, stars and repo url.
# Finally we are going to create csv file for each topic with respective repo details.
#github_topics = "https://github.com/topics/"
#response = requests.get(github_topics)
#if response.status_code != 200:
# logger.error(f'There is something wrong with {url}')
#response_contents = response.text
# Now we will parse the contents using BeautifulSoup:
#parsed_contents = BeautifulSoup(response_contents,'html.parser')
#logger.info("Get all topics, Titles and their urls from github.")
#topic_titles = get_topic_titles(parsed_contents)
#topic_desc = get_topic_desc(parsed_contents)
#topic_urls = get_topic_url(parsed_contents)
#topic_df = pd.DataFrame(list(zip(topic_titles, topic_desc,topic_urls)),\
# columns =['title', 'description', 'url'])
#logger.info(f"Scraped data from github: {topic_df}")
gh_topics = ['ai', 'ai-tools', 'ai-assistant', 'ai-agents-framework', 'llm', 'multi-agent', 'fine-tuning', 'rag', 'generative', 'prompt-engineering', 'generative-ai', 'text-to-image-generation', 'llm-ops', 'retrieval-augmented-generation', 'langchain', 'gemini-api', 'vertex-ai', 'huggingface', 'auto-gpt', 'llmops', 'ai-toolkit', 'chatbot', 'chatgpt', 'code-assistant', 'text-to-video', 'llms', 'gpt-4']
repo_info_dict = {
'username':[],
'repo_name': [],
'stars': [],
'repo_url': []
}
for agh_topic in gh_topics:
topic_url = f"https://github.com/topics/{agh_topic}"
first_topic_repo_page = download_repo_page(topic_url)
logger.info(f"Get details on github topic: {topic_url}")
repo_tags = first_topic_repo_page.find_all('h3', {'class': 'f3 color-fg-muted text-normal lh-condensed'})
star_tags = first_topic_repo_page.find_all('span', {'class': 'Counter js-social-count'})
for i in range(len(repo_tags)):
repo_details = get_repo_info(repo_tags[i], star_tags[i])
# Check if the repo URL is not already present in the dictionary
if repo_details[3] not in repo_info_dict['repo_url']:
# Store repos with more than 5000 stars.
if repo_details[2] > 5000:
repo_info_dict['username'].append(repo_details[0])
repo_info_dict['repo_name'].append(repo_details[1])
repo_info_dict['stars'].append(repo_details[2])
repo_info_dict['repo_url'].append(repo_details[3])
# Create a DataFrame from repo_info_dict
df_repo_info = pd.DataFrame(repo_info_dict['repo_url'])
# Check if the file already exists
csv_filename = 'github_url_to_write.csv'
if os.path.isfile(csv_filename):
# Append to the existing file
df_repo_info.to_csv(csv_filename, mode='a', header=False, index=False)
logger.info(f"Data appended to existing file: {csv_filename}")
else:
# Create a new file
df_repo_info.to_csv(csv_filename, index=False)
def get_topic_titles(parsed_content):
try:
selected_class = 'f3 lh-condensed mb-0 mt-1 Link--primary'
topic_title_tags = parsed_content.find_all('p',{'class':selected_class})
# We can make a list of topics
topic_titles = []
for tags in topic_title_tags:
topic_titles.append(tags.text)
return topic_titles
except Exception as err:
logger.error(f"Failed to get github topic titles: {err}")
def get_topic_desc(parsed_contents):
try:
desc_selector = 'f5 color-fg-muted mb-0 mt-1'
topic_desc_tags = parsed_contents.find_all('p',{'class': desc_selector})
print(f"{topic_desc_tags}")
topic_desc = []
for desc in topic_desc_tags:
print("dsfsfs")
topic_desc.append(desc.text.strip()) # strip() is used for trimming all extra spaces in description.
return topic_desc
except Exception as err:
logger.error(f"Failed to get github topic desc: {err}")
def get_topic_url(parsed_contents):
try:
topic_link_tag = parsed_contents.find_all('a',{'class':'no-underline flex-1 d-flex flex-column'})
topic_urls = []
base_url = 'http://github.com'
for urls in topic_link_tag:
topic_urls.append(base_url + urls['href'])
return topic_urls
except Exception as err:
logger.error(f"Failed to get github topic urls: {err}")
def download_repo_page(topic_url):
response = requests.get(topic_url)
if response.status_code != 200:
print('There is some error in {}'.format(topic_url))
response_contents = response.text
parsed_contents = BeautifulSoup(response_contents,'html.parser')
return parsed_contents
def get_repo_info(repo_tags,star_tags):
# returns all info for a repo
a_tags = repo_tags.find_all('a')
username = a_tags[0].text.strip()
repo_name = a_tags[1].text.strip()
base_url = 'http://github.com/'
repo_url = base_url + a_tags[1]['href'].strip()
# Defining a function so that it will convert our star count to integer
def star_counts_converter(stars):
stars = stars.strip()
if stars[-1] == 'k':
return int(float(stars[:-1]) * 1000)
return int(stars)
star_counts = star_counts_converter(star_tags.text.strip())
return username,repo_name,star_counts,repo_url
def save_to_csv(topic_url,topic_name):
file_name = topic_name + '.csv'
if os.path.exists(file_name):
logger.debug(f"The file {file_name} already exists. Skipping.")
topics_df = topic_repo_details(topic_url)
topics_df.to_csv(file_name,index=None)
logger.info(f"Successfully scraped topic {topic_name}")
def check_if_already_written(github_url, file_path='papers_already_written_on.txt'):
"""
Check if a GitHub URL is an exact match in each line of a file.
Args:
github_url (str): GitHub URL string to check.
file_path (str): Path to the file containing lines to check against. Default is 'papers_already_written_on.txt'.
Returns:
bool: True if an exact match is found, False otherwise.
"""
try:
with open(file_path, 'r', encoding="utf-8") as file:
# Read each line in the file
for line in file:
# Check for an exact match
if github_url.strip() == line.strip():
return True
except FileNotFoundError:
print(f"File not found: {file_path}")
except Exception as e:
print(f"An error occurred: {str(e)}")
return False

View File

@@ -0,0 +1,202 @@
import sys
import os
import datetime
import tiktoken
from .arxiv_schlorly_research import fetch_arxiv_data, create_dataframe, get_arxiv_main_content
from .arxiv_schlorly_research import arxiv_bibtex, scrape_images_from_arxiv, download_image
from .arxiv_schlorly_research import read_written_ids, extract_arxiv_ids_from_line, append_id_to_file
from .write_research_review_blog import review_research_paper
from .combine_research_and_blog import blog_with_research
from .write_blog_scholar_paper import write_blog_from_paper
from .gpt_providers.gemini_pro_text import gemini_text_response
from .generate_image_from_prompt import generate_image
from .convert_content_to_markdown import convert_tomarkdown_format
from .get_blog_metadata import blog_metadata
from .get_code_examples import gemini_get_code_samples
from .save_blog_to_file import save_blog_to_file
from .take_url_screenshot import screenshot_api
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
def blog_arxiv_keyword(query):
""" Write blog on given arxiv paper."""
arxiv_id = None
arxiv_url = None
bibtex = None
research_review = None
column_names = ['Title', 'Date', 'Id', 'Summary', 'PDF URL']
papers = fetch_arxiv_data(query)
df = create_dataframe(papers, column_names)
for paper in papers:
# Extracting the arxiv_id
arxiv_id = paper[2].split('/')[-1]
arxiv_url = "https://browse.arxiv.org/html/" + arxiv_id
bibtex = arxiv_bibtex(arxiv_id)
logger.info(f"Get research paper text from the url: {arxiv_url}")
research_content = get_arxiv_main_content(arxiv_url)
num_tokens = num_tokens_from_string(research_content, "cl100k_base")
logger.info(f"Number of tokens sent: {num_tokens}")
# If the number of tokens is below the threshold, process and print the review
if 1000 < num_tokens < 30000:
logger.info(f"Writing research review on {paper[0]}")
research_review = review_research_paper(research_content)
research_review = f"\n{research_review}\n\n" + f"```{bibtex}```"
#research_review = research_review + "\n\n\n" + f"{df.to_markdown()}"
research_review = convert_tomarkdown_format(research_review, "gemini")
break
else:
# Skip to the next iteration if the condition is not met
continue
logger.info(f"Final scholar article: \n\n{research_review}\n")
# TBD: Scrape images from research reports and pass to vision to get conclusions out of it.
#image_urls = scrape_images_from_arxiv(arxiv_url)
#print("Downloading images found on the page:")
#for img_url in image_urls:
# download_image(img_url, arxiv_url)
try:
blog_postprocessing(arxiv_id, research_review)
except Exception as err:
logger.error(f"Failed in blog post processing: {err}")
sys.exit(1)
logger.info(f"\n\n ################ Finished writing Blog for : #################### \n")
def blog_arxiv_url_list(file_path):
""" Write blogs on all the arxiv links given in a file. """
extracted_ids = []
try:
with open(file_path, 'r', encoding="utf-8") as file:
for line in file:
arxiv_id = extract_arxiv_ids_from_line(line)
if arxiv_id:
extracted_ids.append(arxiv_id)
except FileNotFoundError:
logger.error(f"File not found: {file_path}")
raise FileNotFoundError
except Exception as e:
logger.error(f"Error while reading the file: {e}")
raise e
# Read already written IDs
written_ids = read_written_ids('papers_already_written_on.txt')
# Loop through extracted IDs
for arxiv_id in extracted_ids:
if arxiv_id not in written_ids:
# This ID has not been written on yet
arxiv_url = "https://browse.arxiv.org/html/" + arxiv_id
logger.info(f"Get research paper text from the url: {arxiv_url}")
research_content = get_arxiv_main_content(arxiv_url)
try:
num_tokens = num_tokens_from_string(research_content, "cl100k_base")
except Exception as err:
logger.error(f"Failed in counting tokens: {err}")
sys.exit(1)
logger.info(f"Number of tokens sent: {num_tokens}")
# If the number of tokens is below the threshold, process and print the review
# FIXME: Docs over 30k tokens, need to be chunked and summarized.
if 1000 < num_tokens < 30000:
try:
logger.info(f"Getting bibtex for arxiv ID: {arxiv_id}")
bibtex = arxiv_bibtex(arxiv_id)
except Exception as err:
logger.error(f"Failed to get Bibtex: {err}")
try:
logger.info(f"Writing a research review..")
research_review = review_research_paper(research_content, "gemini")
logger.info(f"Research Review: \n{research_review}\n\n")
except Exception as err:
logger.error(f"Failed to write review on research paper: {arxiv_id}{err}")
research_blog = write_blog_from_paper(research_content, "gemini")
logger.info(f"\n\nResearch Blog: {research_blog}\n\n")
research_blog = f"\n{research_review}\n\n" + f"```\n{bibtex}\n```"
#research_review = blog_with_research(research_review, research_blog, "gemini")
#logger.info(f"\n\n\nBLOG_WITH_RESEARCh: {research_review}\n\n\n")
research_review = convert_tomarkdown_format(research_review, "gemini")
research_review = f"\n{research_review}\n\n" + f"```{bibtex}```"
logger.info(f"Final blog from research paper: \n\n{research_review}\n\n\n")
try:
blog_postprocessing(arxiv_id, research_review)
except Exception as err:
logger.error(f"Failed in blog post processing: {err}")
sys.exit(1)
logger.info(f"\n\n ################ Finished writing Blog for : #################### \n")
else:
# Skip to the next iteration if the condition is not met
logger.error("FIXME: Docs over 30k tokens, need to be chunked and summarized.")
continue
else:
logger.warning(f"Already written, skip writing on Arxiv paper ID: {arxiv_id}")
def blog_postprocessing(arxiv_id, research_review):
""" Common function to do blog postprocessing. """
try:
append_id_to_file(arxiv_id, "papers_already_written_on.txt")
except Exception as err:
logger.error(f"Failed to write/append ID to papers_already_written_on.txt: {err}")
raise err
try:
blog_title, blog_meta_desc, blog_tags, blog_categories = blog_metadata(research_review)
except Exception as err:
logger.error(f"Failed to get blog metadata: {err}")
raise err
try:
arxiv_url_scrnsht = f"https://arxiv.org/abs/{arxiv_id}"
generated_image_filepath = take_paper_screenshot(arxiv_url_scrnsht)
except Exception as err:
logger.error(f"Failed to tsk paper screenshot: {err}")
raise err
try:
save_blog_to_file(research_review, blog_title, blog_meta_desc, blog_tags,\
blog_categories, generated_image_filepath)
except Exception as err:
logger.error(f"Failed to save blog to a file: {err}")
sys.exit(1)
def take_paper_screenshot(arxiv_url):
""" Common function to take paper screenshot. """
# fixme: Remove the hardcoding, need add another option OR in config ?
image_dir = os.path.join(os.getcwd(), "blog_images")
generated_image_name = f"generated_image_{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}.png"
generated_image_filepath = os.path.join(image_dir, generated_image_name)
if arxiv_url:
try:
generated_image_filepath = screenshot_api(arxiv_url, generated_image_filepath)
except Exception as err:
logger.error(f"Failed in taking url screenshot: {err}")
return generated_image_filepath
def num_tokens_from_string(string, encoding_name):
"""Returns the number of tokens in a text string."""
try:
encoding = tiktoken.get_encoding(encoding_name)
num_tokens = len(encoding.encode(string))
return num_tokens
except Exception as err:
logger.error(f"Failed to count tokens: {err}")
sys.exit(1)

View File

@@ -0,0 +1,49 @@
import sys
from .gpt_providers.openai_chat_completion import openai_chatgpt
from .gpt_providers.gemini_pro_text import gemini_text_response
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
def write_blog_from_paper(paper_content):
""" Write blog from given paper url. """
prompt = f"""As an expert in NLP and AI, I will provide you with a content of a research paper.
Your task is to write a highly detailed blog(at least 2000 words), breaking down complex concepts for beginners.
Take your time and do not rush to respond.
Do not provide explanations, suggestions in your response.
Include the below section in your blog:
Highlights: Include a list of 5 most important and unique claims of the given research paper.
Abstract: Start by reading the abstract, which provides a concise summary of the research, including its purpose, methodology, and key findings.
Introduction: This section will give you background information and set the context for the research. It often ends with a statement of the research question or hypothesis.
Methodology: Include description of how authors conducted the research. This can include data sources, experimental setup, analytical techniques, etc.
Results: This section presents the data or findings of the research. Pay attention to figures, tables, and any statistical analysis provided.
Discussion/Analysis: In this section, Explain how research paper answers the research questions or how they fit with existing knowledge.
Conclusion: This part summarizes the main findings and their implications. It might also suggest areas for further research.
References: The cited works can provide additional context or background reading.
Remember, Please use MLA format and markdown syntax.
Do not provide description, explanations for your response.
Take your time in crafting your blog content, do not rush to give the response.
Using the blog structure above, please write a detailed and original blog on given research paper: \n'{paper_content}'\n\n"""
if 'gemini' in gpt_providers:
try:
response = gemini_text_response(prompt)
return response
except Exception as err:
logger.error(f"Failed to get response from gemini: {err}")
raise err
elif 'openai' in gpt_providers:
try:
logger.info("Calling OpenAI LLM.")
response = openai_chatgpt(prompt)
return response
except Exception as err:
logger.error(f"failed to get response from Openai: {err}")
raise err

View File

@@ -0,0 +1,89 @@
import sys
from .gpt_providers.openai_chat_completion import openai_chatgpt
from .gpt_providers.gemini_pro_text import gemini_text_response
from .gpt_providers.mistral_chat_completion import mistral_text_response
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
def review_research_paper(research_blog):
""" """
prompt = f"""As world's top researcher and academician, I will provide you with research paper.
Your task is to write a highly detailed review report.
Important, your report should be factual, original and demostrate your expertise.
Review guidelines:
1). Read the Abstract and Introduction Carefully:
Begin by thoroughly reading the abstract and introduction of the paper.
Try to understand the research question, the objectives, and the background information.
Identify the central argument or hypothesis that the study is examining.
2). Examine the Methodology and Methods:
Read closely at the research design, whether it is experimental, observational, qualitative, or a combination of methods.
Check the sampling strategy and the size of the sample.
Review the methods of data collection and the instruments used for this purpose.
Think about any ethical issues and possible biases in the study.
3). Analyze the Results and Discussion:
Review how the results are presented, including any tables, graphs, and statistical analysis.
Evaluate the findings' validity and reliability.
Analyze whether the results support or contradict the research question and hypothesis.
Read the discussion section where the authors interpret their findings and their significance.
4). Consider the Limitations and Strengths:
Spot any limitations or potential weaknesses in the study.
Evaluate the strengths and contributions that the research makes.
Think about how generalizable the findings are to other populations or situations.
5). Assess the Writing and Organization:
Judge the clarity and structure of the report.
Consider the use of language, grammar, and the overall formatting.
Assess how well the arguments are logically organized and how coherent the report is.
6). Evaluate the Literature Review:
Examine how comprehensive and relevant the literature review is.
Consider how the study adds to or builds upon existing research.
Evaluate the timeliness and quality of the sources cited in the research.
7). Review the Conclusion and Implications:
Look at the conclusions drawn from the study and how well they align with the findings.
Think about the practical implications and potential applications of the research.
Evaluate the suggestions for further research or policy actions.
8). Overall Assessment:
Formulate an overall opinion about the research report's quality and thoroughness.
Consider the significance and impact of the findings.
Evaluate how the study contributes to its field of research.
9). Provide Constructive Feedback:
Offer constructive criticism and suggestions for improvement, where necessary.
Think about possible biases or alternative ways to interpret the findings.
Suggest ideas for future research or for replicating the study.
Do not provide description, explanations for your response.
Using the above review guidelines, write a detailed review report on the below research paper.
Research Paper: '{research_blog}'
"""
if 'gemini' in gpt_providers:
try:
response = gemini_text_response(prompt)
return response
except Exception as err:
logger.error(f"Failed to get response from gemini: {err}")
response = mistral_text_response(prompt)
return response
elif 'openai' in gpt_providers:
try:
logger.info("Calling OpenAI LLM.")
response = openai_chatgpt(prompt)
return response
except Exception as err:
SystemError(f"Failed to get response from Openai: {err}")