WIP000.1- AI content writer
This commit is contained in:
5
ai_stratups.csv
Normal file
5
ai_stratups.csv
Normal file
@@ -0,0 +1,5 @@
|
||||
Company,URL,Focus Areas,keyword
|
||||
Codiga,https://www.codiga.io/,Coding,Code Snippets and Code Analysis
|
||||
Mutable AI,https://mutable.ai/,Coding,Build fast with production quality using AI
|
||||
Replit Ghostwriter,https://replit.com/,Coding,Accelerate your coding with AI assistance and mobile app
|
||||
Stenography,https://stenography.dev/,Coding,Finally. Automatic Documentation.
|
||||
|
@@ -1 +0,0 @@
|
||||
# This file makes the `lib` directory a Python package
|
||||
@@ -1,8 +1,12 @@
|
||||
def blog_proof_editor(blog_content, blog_keywords):
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
|
||||
|
||||
def blog_proof_editor(blog_content, blog_keywords, gpt_provider="openai"):
|
||||
"""
|
||||
Helper for blog proof reading.
|
||||
"""
|
||||
prompt = """I am looking for detailed editing and enhancement of the given blog post,
|
||||
prompt = f"""I am looking for detailed editing and enhancement of the given blog post,
|
||||
with a particular focus on maintaining originality.
|
||||
The topic of the content is [{blog_keywords}]. Please go through the blog and make direct edits to improve it,
|
||||
ensuring the final output is both high-quality and original.
|
||||
@@ -22,12 +26,18 @@ def blog_proof_editor(blog_content, blog_keywords):
|
||||
8). Refine Overall Structure: Make structural changes to improve the overall impact of the content.
|
||||
9). Remember, rewrite all content that repeated, while maintaining the formatting of the given blog text.
|
||||
|
||||
Please apply these changes directly to the following blog text and provide the edited version:
|
||||
[blog_content]. """
|
||||
Please apply these changes directly to the following blog post and provide the edited version:\n
|
||||
'{blog_content}'. """
|
||||
|
||||
try:
|
||||
# TBD: Add logic for which_provider and which_model
|
||||
response = openai_chatgpt(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Error Blog Proof Reading: {err}")
|
||||
if 'openai' in gpt_provider:
|
||||
try:
|
||||
response = openai_chatgpt(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Openai Error Blog Proof Reading: {err}")
|
||||
elif 'gemini' in gpt_provider:
|
||||
try:
|
||||
response = gemini_text_response(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Gemini Error Blog Proof Reading: {err}")
|
||||
|
||||
@@ -1,31 +1,72 @@
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
|
||||
|
||||
def convert_tomarkdown_format(blog_content):
|
||||
def convert_tomarkdown_format(blog_content, gpt_provider="openai"):
|
||||
""" Helper for converting content to markdown format for static sites. """
|
||||
prompt = f"""
|
||||
As an expert in markdown language format and font matter, used for static webpages.
|
||||
Your task is to convert and improve formatting of given blog content.
|
||||
Do Not modify the content, only modify to convert it into highly readable blog content.
|
||||
As an expert in markdown language format and font matter,
|
||||
I will provide you with a blog post.
|
||||
Your task is to improve formatting of given blog post.
|
||||
|
||||
Use below guidelines and include other best practises:
|
||||
1). Headers for Structure: Use # for main headings and increase the number of # for
|
||||
subheadings (##, ###, etc.). Organize given content into clear, hierarchical sections.
|
||||
2). Emphasizing Text: Use single asterisks or underscores for italic (*italic* or _italic_),
|
||||
double for bold (**bold** or __bold__), and triple for bold italic (***bold italic***).
|
||||
3). Lists: For unordered lists, use dashes, asterisks, or plus signs (-, *, +).
|
||||
For ordered lists, use numbers followed by periods (1., 2., etc.).
|
||||
4). Blockquotes: Use > for blockquotes, and add additional > for nested blockquotes.
|
||||
5). Code Blocks: Use backticks for inline code (code) and triple backticks for code blocks.
|
||||
Specify a language for syntax highlighting.
|
||||
6). Horizontal Lines: Create a horizontal line using three or more asterisks, dashes, or underscores (---, ***).
|
||||
7). Table Formatting: Use pipes | and dashes - to create tables. Align text with colons.
|
||||
8). Remember to use suitable emojis for the given blog content.
|
||||
Use below guidelines to do formatting, structuring to make it highly readable:
|
||||
1. **Headings for Structure:**
|
||||
- Use # for the main title of the blog post.
|
||||
- Use ## for subheadings that divide the post into clear sections.
|
||||
- Use ###, ####, etc. for additional subheadings as needed.
|
||||
- Keep the headings concise and descriptive.
|
||||
|
||||
Convert the given blog content in well organised markdown content: {blog_content}"""
|
||||
try:
|
||||
# TBD: Add logic for which_provider and which_model
|
||||
response = openai_chatgpt(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Error in converting to Markdown format.")
|
||||
2. **Emphasizing Text:**
|
||||
- Use * or _ for italicizing important words or phrases.
|
||||
- Use ** or __ for bolding key points.
|
||||
- Use *** or ___ for bold italicizing very important text.
|
||||
- Use sparingly to avoid overwhelming the reader.
|
||||
|
||||
3. **Lists:**
|
||||
- Use - or * for unordered lists.
|
||||
- Use 1., 2., etc. for ordered lists.
|
||||
- Keep list items concise and to the point.
|
||||
- Use consistent formatting for all lists.
|
||||
|
||||
4. **Blockquotes:**
|
||||
- Use > to indent and highlight quotes or important information.
|
||||
- Use additional > for nested blockquotes.
|
||||
- Attribute quotes to their original source if applicable.
|
||||
|
||||
5. **Code Blocks:**
|
||||
- Use backticks ` for inline code.
|
||||
- Use triple backticks ``` for code blocks.
|
||||
- Specify the language of the code block for syntax highlighting, e.g., ```python```.
|
||||
- Use code blocks to display code snippets or technical information.
|
||||
|
||||
6. **Horizontal Lines:**
|
||||
- Use three or more asterisks, dashes, or underscores to create a horizontal line, e.g., ***, ---, or ___
|
||||
- Use horizontal lines to separate different sections of the blog post.
|
||||
|
||||
7. **Table Formatting:**
|
||||
- Use pipes | and dashes - to create tables.
|
||||
- Align text within columns using colons :.
|
||||
- Use tables to present data or information in a structured format.
|
||||
|
||||
8. **Other Best Practices:**
|
||||
- Use emojis sparingly and appropriately to add visual interest and enhance the reader's experience.
|
||||
- Proofread carefully for any errors in grammar, spelling, or formatting.
|
||||
- Keep the blog post organized and easy to navigate.
|
||||
- Use a consistent formatting style throughout the post.
|
||||
|
||||
Dont provide explanations, just your final response.
|
||||
Convert the given blog post in well organised markdown content:\n
|
||||
Blog Post: '{blog_content}'"""
|
||||
|
||||
if 'openai' in gpt_provider:
|
||||
try:
|
||||
response = openai_chatgpt(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Openai Error in converting to Markdown format.")
|
||||
elif 'gemini' in gpt_provider:
|
||||
try:
|
||||
response = gemini_text_response(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Gemini Error in converting to Markdown format.")
|
||||
|
||||
@@ -1,16 +1,36 @@
|
||||
def get_blog_categories(blog_article):
|
||||
import sys
|
||||
|
||||
from .gpt_providers.openai_gpt_provider import openai_chatgpt
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
colorize=True,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
|
||||
def get_blog_categories(blog_article, gpt_providers):
|
||||
"""
|
||||
Function to generate blog categories for given blog content.
|
||||
"""
|
||||
prompt = f"""As an expert SEO and content writer, I will provide you with blog content.
|
||||
Suggest only 2 blog categories which are most relevant to provided blog content,
|
||||
by identifying the main topic. Also consider the target audience and the
|
||||
blog's category taxonomy. Only reply with comma separated values. The blog content is: {blog_article}"
|
||||
"""
|
||||
try:
|
||||
# TBD: Add logic for which_provider and which_model
|
||||
response = openai_chatgpt(prompt)
|
||||
except Exception as err:
|
||||
SystemError(f"Error in generating blog categories: {err}")
|
||||
else:
|
||||
return response
|
||||
blog's category taxonomy. Only reply with comma separated values.
|
||||
The blog content is: '{blog_article}'"
|
||||
"""
|
||||
logger.info("Generating blog categories for the given blog.")
|
||||
if 'gemini' in gpt_providers:
|
||||
try:
|
||||
response = gemini_text_response(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
logger.error(f"Failed to get response from gemini: {err}")
|
||||
elif 'openai' in gpt_providers:
|
||||
try:
|
||||
response = openai_chatgpt(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
SystemError(f"Error in generating blog get_blog_categories: {err}")
|
||||
|
||||
@@ -1,26 +1,37 @@
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
import google.generativeai as genai
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(Path('../.env'))
|
||||
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
colorize=True,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
|
||||
def generate_blog_description(blog_content, gpt_providers):
|
||||
"""
|
||||
Prompt designed to give SEO optimized blog descripton
|
||||
"""
|
||||
logger.info("Generating Blog Meta Description for the given blog.")
|
||||
prompt = f"""As an expert SEO and blog writer, Compose a compelling meta description for the given blog content,
|
||||
adhering to SEO best practices. Keep it between 150-160 characters.
|
||||
Provide a glimpse of the content's value to entice readers.
|
||||
Respond with only one of your best effort and do not include your explanations.
|
||||
Blog Content: {blog_content}"""
|
||||
Blog Content: '{blog_content}'"""
|
||||
|
||||
if 'gemini' in gpt_providers:
|
||||
try:
|
||||
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
|
||||
except Exception as err:
|
||||
logger.error("Failed in getting GEMINI_API_KEY")
|
||||
# Use gemini-pro model for text and image.
|
||||
model = genai.GenerativeModel('gemini-pro')
|
||||
try:
|
||||
response = model.generate_content(prompt)
|
||||
return response.text
|
||||
response = gemini_text_response(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
logger.error("Failed to get response from gemini.")
|
||||
elif 'openai' in gpt_providers:
|
||||
|
||||
@@ -1,32 +1,38 @@
|
||||
import sys
|
||||
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
import google.generativeai as genai
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
colorize=True,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
|
||||
def generate_blog_title(blog_meta_desc, gpt_providers):
|
||||
def generate_blog_title(blog_article, gpt_providers="openai"):
|
||||
"""
|
||||
Given a blog title generate an outline for it
|
||||
"""
|
||||
prompt = f"""As a SEO expert and content writer, I will provide you with meta description of blog.
|
||||
logger.info("Generating blog title.")
|
||||
prompt = f"""As a SEO expert, I will provide you with a blog content.
|
||||
Your task is write a SEO optimized, call to action and engaging blog title for it.
|
||||
Follows SEO best practises to suggest the blog title.
|
||||
Please keep the titles concise, not exceeding 60 words, and ensure to maintain their meaning.
|
||||
Respond with only one title and no description or keyword like Title:
|
||||
Generate blog title for this given meta description: {blog_meta_desc}
|
||||
"""
|
||||
if 'gemini' in gpt_providers:
|
||||
Please keep the titles concise, not exceeding 60 words.
|
||||
Respond with only one title and no explanations.
|
||||
Important: Your response should be in plaintext.
|
||||
Generate blog title for this given blog content:\n '{blog_article}' """
|
||||
|
||||
if 'gemini' in gpt_providers:
|
||||
try:
|
||||
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
|
||||
response = gemini_text_response(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
logger.error("Failed in getting GEMINI_API_KEY")
|
||||
# Use gemini-pro model for text and image.
|
||||
model = genai.GenerativeModel('gemini-pro')
|
||||
try:
|
||||
response = model.generate_content(prompt)
|
||||
return response.text
|
||||
except Exception as err:
|
||||
logger.error("Failed to get response from gemini.")
|
||||
logger.error(f"Failed to get response from gemini: {err}")
|
||||
elif 'openai' in gpt_providers:
|
||||
try:
|
||||
logger.info("Calling OpenAI LLM.")
|
||||
response = openai_chatgpt(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
|
||||
@@ -1,47 +1,37 @@
|
||||
"""
|
||||
At the command line, only need to run once to install the package via pip:
|
||||
|
||||
$ pip install google-generativeai
|
||||
"""
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
|
||||
import google.generativeai as genai
|
||||
|
||||
genai.configure(api_key="YOUR_API_KEY")
|
||||
def gemini_get_code_samples(blog_article):
    """Ask Gemini for one code example and merge it into the blog article.

    A prompt embedding ``blog_article`` is sent through
    ``gemini_text_response``; the returned code sample is then handed to
    ``combine_blog_code_sample`` together with the original article.

    Args:
        blog_article: Text of the blog article to write a code example for.

    Returns:
        Whatever ``combine_blog_code_sample`` returns for the article and
        the generated code sample.

    Raises:
        ValueError: If either the generation or the combine step raises.
    """
    code_request = f"""As an expert programmer and copywriter, I will provide you with blog article.
Your task is to research and write one code example for the given blog article.
Do not include your explanations in response.
Blog Article: '{blog_article}' """
    try:
        generated_snippet = gemini_text_response(code_request)
        # Both steps stay inside the try so a failure in either one is
        # reported with the same ValueError.
        return combine_blog_code_sample(blog_article, generated_snippet)
    except Exception as err:
        raise ValueError(f"Failed to get response from Gemini pro: {err}")
|
||||
|
||||
# Set up the model
|
||||
generation_config = {
|
||||
"temperature": 1,
|
||||
"top_p": 1,
|
||||
"top_k": 1,
|
||||
"max_output_tokens": 2048,
|
||||
}
|
||||
|
||||
safety_settings = [
|
||||
{
|
||||
"category": "HARM_CATEGORY_HARASSMENT",
|
||||
"threshold": "BLOCK_MEDIUM_AND_ABOVE"
|
||||
},
|
||||
{
|
||||
"category": "HARM_CATEGORY_HATE_SPEECH",
|
||||
"threshold": "BLOCK_MEDIUM_AND_ABOVE"
|
||||
},
|
||||
{
|
||||
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
|
||||
"threshold": "BLOCK_MEDIUM_AND_ABOVE"
|
||||
},
|
||||
{
|
||||
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
|
||||
"threshold": "BLOCK_MEDIUM_AND_ABOVE"
|
||||
}
|
||||
]
|
||||
def combine_blog_code_sample(blog_article, code_sample):
    """Ask Gemini to insert a code sample into the given blog article.

    Args:
        blog_article: Text of the blog article to edit.
        code_sample: Code sample to place after the article's introduction.

    Returns:
        The value returned by ``gemini_text_response`` for the edit prompt.

    Raises:
        ValueError: If the Gemini call raises; the original error is chained
            as the cause.
    """
    # The prompt must be an f-string: as a plain string the literal text
    # "{blog_article}" / "{code_sample}" was sent to the model and the
    # actual article and code never reached it.
    prompt = f"""You are expert document editor, I will provide you blog article and a code sample.
Your task is to edit the given blog article to include the code sample after the introduction section.
Do not modify the content of the given blog article. Your response should include the whole blog_article with
the code sample added to it.
Adopt the formatting of the given blog article. Do not include explanations of your response.
Edit the given blog to include the code sample in it.
Blog Article: {blog_article}\n
Code sample: {code_sample}\n"""

    try:
        response = gemini_text_response(prompt)
        return response
    except Exception as err:
        raise ValueError(f"Failed to combine blog and code: {err}") from err
|
||||
|
||||
@@ -1,5 +1,14 @@
|
||||
import sys
|
||||
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
import google.generativeai as genai
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
colorize=True,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
|
||||
def get_blog_tags(blog_article, gpt_providers):
|
||||
@@ -10,17 +19,11 @@ def get_blog_tags(blog_article, gpt_providers):
|
||||
prompt = f"""As an expert SEO and blog writer, suggest only 2 relevant and specific blog tags
|
||||
for the given blog content. Only reply with comma separated values.
|
||||
Blog content: {blog_article}."""
|
||||
|
||||
if 'gemini' in gpt_providers:
|
||||
logger.info("Generating Blog tags for the given blog post.")
|
||||
if 'gemini' in gpt_providers:
|
||||
try:
|
||||
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
|
||||
except Exception as err:
|
||||
logger.error("Failed in getting GEMINI_API_KEY")
|
||||
# Use gemini-pro model for text and image.
|
||||
model = genai.GenerativeModel('gemini-pro')
|
||||
try:
|
||||
response = model.generate_content(prompt)
|
||||
return response.text
|
||||
response = gemini_text_response(prompt)
|
||||
return response
|
||||
except Exception as err:
|
||||
logger.error("Failed to get response from gemini.")
|
||||
elif 'openai' in gpt_providers:
|
||||
|
||||
@@ -34,7 +34,7 @@ from .get_tags import get_blog_tags
|
||||
from .get_blog_category import get_blog_categories
|
||||
from .convert_content_to_markdown import convert_tomarkdown_format
|
||||
from .convert_markdown_to_html import convert_markdown_to_html
|
||||
|
||||
from .utils.youtube_keyword_research import research_yt
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
@@ -57,12 +57,13 @@ wordpress_url = ''
|
||||
wordpress_username = ''
|
||||
wordpress_password = ''
|
||||
|
||||
|
||||
def generate_youtube_blog(yt_url_list, output_format="markdown"):
|
||||
"""Takes a list of youtube videos and generates blog for each one of them.
|
||||
"""
|
||||
# Use to store the blog in a string, to save in a *.md file.
|
||||
blog_markdown_str = ""
|
||||
if isinstance(yt_url_list, str):
|
||||
yt_url_list = [yt_url_list]
|
||||
for a_yt_url in yt_url_list:
|
||||
try:
|
||||
logger.info(f"Starting to write blog on URL: {a_yt_url}")
|
||||
@@ -89,8 +90,8 @@ def generate_youtube_blog(yt_url_list, output_format="markdown"):
|
||||
except Exception as e:
|
||||
logger.error(f"Error in do_online_research: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
# Note: Check if the order of input matters for your function
|
||||
logger.info("Preparing a blog content from audio script and online research content...")
|
||||
blog_markdown_str = blog_with_research(research_report, blog_markdown_str)
|
||||
logger.warning("\n\n--------------- Second Blog Draft after online research: --------\n\n")
|
||||
@@ -102,7 +103,9 @@ def generate_youtube_blog(yt_url_list, output_format="markdown"):
|
||||
|
||||
try:
|
||||
# Get the title and meta description of the blog.
|
||||
logger.info("Generating Blog Description.")
|
||||
blog_meta_desc = generate_blog_description(blog_markdown_str, "gemini")
|
||||
logger.info("Generating Blog Title.")
|
||||
title = generate_blog_title(blog_meta_desc, "gemini")
|
||||
logger.info(f"Title is {title} and description is {blog_meta_desc}")
|
||||
# Regex pattern to match 'Title:', 'title:', 'TITLE:', etc., followed by optional whitespace
|
||||
@@ -110,9 +113,9 @@ def generate_youtube_blog(yt_url_list, output_format="markdown"):
|
||||
#blog_markdown_str = "# " + title.replace('"', '') + "\n\n"
|
||||
|
||||
# Get blog tags and categories.
|
||||
blog_tags = get_blog_tags(blog_meta_desc)
|
||||
blog_tags = get_blog_tags(blog_meta_desc, "gemini")
|
||||
logger.info(f"Blog tags are: {blog_tags}")
|
||||
blog_categories = get_blog_categories(blog_meta_desc)
|
||||
blog_categories = get_blog_categories(blog_meta_desc, "gemini")
|
||||
logger.info(f"Blog categories are: {blog_categories}")
|
||||
|
||||
# Generate an introduction for the blog
|
||||
@@ -171,7 +174,7 @@ def generate_youtube_blog(yt_url_list, output_format="markdown"):
|
||||
|
||||
except Exception as e:
|
||||
# raise assertionerror
|
||||
logger.info(f"Error: Failed to generate_youtube_blog: {e}")
|
||||
logger.error(f"Error: Failed to generate_youtube_blog: {e}")
|
||||
exit(1)
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
# Not using it, as they wanted phone verification done.
|
||||
|
||||
import os
|
||||
import serpapi
|
||||
import csv
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
@@ -17,22 +14,4 @@ result = client.search(
|
||||
gl="us",
|
||||
)
|
||||
|
||||
print(result)
|
||||
print(result['organic_results'])
|
||||
print(result["search_information"]["total_results"]) # Get number of results available
|
||||
print(result["related_questions"]) # Get all the related questions
|
||||
|
||||
|
||||
organic_results = result["organic_results"]
|
||||
with open('output.csv', 'w', newline='') as csvfile:
|
||||
csv_writer = csv.writer(csvfile)
|
||||
|
||||
# Write the headers
|
||||
csv_writer.writerow(["Title", "Link", "Snippet"])
|
||||
|
||||
# Write the data
|
||||
for result in organic_results:
|
||||
csv_writer.writerow([result["title"], result["link"], result["snippet"]])
|
||||
|
||||
|
||||
print('Done writing to CSV file.')
|
||||
|
||||
@@ -17,10 +17,18 @@
|
||||
##############################################################
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
from tavily import TavilyClient
|
||||
import serpapi
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(Path('../.env'))
|
||||
|
||||
from langchain.adapters.openai import convert_openai_messages
|
||||
from langchain.chat_models import ChatOpenAI
|
||||
import google.generativeai as genai
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s-%(module)s-%(lineno)d-%(message)s')
|
||||
from tenacity import (
|
||||
@@ -29,9 +37,26 @@ from tenacity import (
|
||||
wait_random_exponential,
|
||||
) # for exponential backoff
|
||||
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
from .blog_proof_reader import blog_proof_editor
|
||||
from .convert_content_to_markdown import convert_tomarkdown_format
|
||||
|
||||
|
||||
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
|
||||
def do_online_research(query):
|
||||
def do_online_research(query, gpt_provider="openai"):
|
||||
# Do a google search for the given keyword. The search results will give urls, questions for faq
|
||||
faq_questions = []
|
||||
organic_results = []
|
||||
report = ''
|
||||
try:
|
||||
faq_questions = google_search(query, "faq")
|
||||
logging.info(f"Google search FAQ questions: {faq_questions}")
|
||||
# Now, get top 10 google organic results and polish the content to compete for these keywords.
|
||||
organic_results = google_search(query, "organic_result")
|
||||
except Exception as err:
|
||||
logging.error(f"Failed to do Serpapi research: {err}")
|
||||
# Not failing, as tavily would do same and then GPT-V to search.
|
||||
#exit(1)
|
||||
try:
|
||||
# Retrieve API keys
|
||||
api_key = os.getenv('TAVILY_API_KEY')
|
||||
@@ -45,7 +70,6 @@ def do_online_research(query):
|
||||
except Exception as err:
|
||||
logging.error("Failed to create Tavily client. Check TAVILY_API_KEY")
|
||||
exit(1)
|
||||
|
||||
# Run tavily search
|
||||
logging.info(f"Running Tavily search on: {query}")
|
||||
try:
|
||||
@@ -54,30 +78,104 @@ def do_online_research(query):
|
||||
logging.error(f"Failed to do Tavily Research: {err}")
|
||||
exit(1)
|
||||
|
||||
# Setup prompt for GPT-4
|
||||
prompt = [{
|
||||
"role": "system",
|
||||
"content": ('You are an AI critical thinker research assistant. '
|
||||
if "gemini" in gpt_provider:
|
||||
prompt = ["You are an AI critical thinker research assistant."
|
||||
"I will provide you with json content and a list of faq questions."
|
||||
"Use given json as context for writing your research report."
|
||||
"Your sole purpose is to write well written, critically acclaimed, objective and structured research report"
|
||||
"Important: Include and write code examples in your final report."
|
||||
"Include your own insights on the topic to make it comprehensive and detailed."
|
||||
"Use the urls from json content to provide cititations and include it in referances section of your report."
|
||||
"Include appropriate emojis in your research report."
|
||||
"Include FAQs relevant to your research report. Use the given faq questions. Write answers for each faq."
|
||||
"Format your report in MLA format and markdown style, with special focus on readibility."
|
||||
f"Do not provide explanations for your response.\njson content: \"\"\" {content} \"\"\"\n "
|
||||
f"\nList of FAQ questions: \"\"\" {faq_questions} \"\"\"\n"]
|
||||
report = gemini_text_response(prompt)
|
||||
|
||||
elif "openai" in gpt_provider:
|
||||
# Setup prompt for GPT-4
|
||||
prompt = [{
|
||||
"role": "system",
|
||||
"content": ('You are an AI critical thinker research assistant. '
|
||||
'Your sole purpose is to write well written, critically acclaimed, '
|
||||
'objective and structured reports on given text.')
|
||||
}, {
|
||||
"role": "user",
|
||||
"content": (f'Information: """{content}"""\n\n'
|
||||
}, {
|
||||
"role": "user",
|
||||
"content": (f'Information: """{content}"""\n\n'
|
||||
f'Using the above information, answer the following '
|
||||
f'query: "{query}" in a detailed report --'
|
||||
f'Please use MLA format and markdown syntax.')
|
||||
}]
|
||||
# Run GPT-4
|
||||
logging.info("Generating Research report with GPT-4...")
|
||||
lc_messages = convert_openai_messages(prompt)
|
||||
try:
|
||||
report = ChatOpenAI(model='gpt-4', openai_api_key=openai_api_key).invoke(lc_messages).content
|
||||
#logging.info(f"\n Below is the online research report for given keywords/title: \n\n{report}")
|
||||
return report
|
||||
except Exception as err:
|
||||
logging.error("Failed to generate do_online_research with ChatOpenAI")
|
||||
exit(1)
|
||||
|
||||
}]
|
||||
report = openai_research_report(prompt)
|
||||
report = compete_organic_results(query, report, organic_results)
|
||||
return report
|
||||
except Exception as e:
|
||||
logging.error(f"Failed in online research: {e}")
|
||||
exit(1)
|
||||
|
||||
|
||||
def openai_research_report(query):
    """Generate a research report with OpenAI GPT-4 through LangChain.

    Args:
        query: Chat messages in OpenAI format (a list of role/content
            dicts). The caller in this module passes its prepared prompt
            list here; the parameter name is kept for compatibility.

    Returns:
        The ``content`` of the model's reply.

    Note:
        On any failure of the model call the error is logged and the
        process exits with status 1.
    """
    # Run GPT-4
    logging.info("Generating Research report with GPT-4...")
    # Use the function's own argument; the previous body read an undefined
    # name `prompt` and raised NameError on every call.
    lc_messages = convert_openai_messages(query)
    # `openai_api_key` was never defined in this scope.
    # NOTE(review): assumes the key lives in OPENAI_API_KEY, the variable
    # ChatOpenAI reads by default - confirm against the project's .env.
    openai_api_key = os.getenv('OPENAI_API_KEY')
    try:
        report = ChatOpenAI(model='gpt-4', openai_api_key=openai_api_key).invoke(lc_messages).content
        return report
    except Exception as err:
        logging.error("Failed to generate do_online_research with ChatOpenAI: %s", err)
        exit(1)
|
||||
|
||||
|
||||
def compete_organic_results(query, report, organic_results):
    """Rewrite a blog so it can compete with the top Google organic results.

    Builds an SEO rewrite prompt from the topic, the current blog content
    and the organic search results, and sends it to Gemini.

    Args:
        query: Topic of the blog, embedded in the prompt.
        report: Current blog content to be rewritten.
        organic_results: Google organic search results to compete against.

    Returns:
        The value returned by ``gemini_text_response`` for the prompt.
    """
    seo_rewrite_prompt = f""" As an SEO expert and copywriter, I will provide you with my blog content on topic '{query}', and
Top google search results.
Your task is to rewrite the given blog to make it compete against top position results.
Make sure, the new blog has high probability of ranking highest against given organic search result competitors.
Modify the given blog content following best SEO practises.
Make sure the blog is original, unique and highly readable.
Remember, Maintain and adopt the formatting, structure, style and tone of the provided blog content.
Include relevant emojis in your final blog for visual appeal. Use it sparingly.
Your response should be well-structured, objective, and critically acclaimed blog article based on provided texts.

Remember, your goal is to create a detailed blog article that will compete against given organic result competitors.
Do not provide explanations, suggestions for your response, reply only with your final response.
Take your time in crafting your content, do not rush to give the response.
Blog Content: '{report}'\n
Organic Search result: '{organic_results}'
"""
    return gemini_text_response(seo_rewrite_prompt)
|
||||
|
||||
|
||||
def google_search(query, flag="faq"):
    """Run a Google search through SerpApi and return part of the result.

    Args:
        query: Search phrase sent to Google.
        flag: Selects what is returned. A value containing ``"faq"`` yields
            FAQ-style questions; a value containing ``"organic_result"``
            yields the organic results.

    Returns:
        For ``"faq"``: a list of strings taken from the
        ``inline_people_also_search_for`` titles plus the
        ``related_questions`` questions, or the ``related_searches``
        queries when both of those are empty.
        For ``"organic_result"``: the ``organic_results`` entry of the
        search result (empty list when absent).
        For any other flag: an empty list.

    Raises:
        Exception: Whatever the SerpApi client raised, after logging it.
    """
    try:
        api_key = os.getenv('SERPAPI_KEY')
        client = serpapi.Client(api_key=api_key)
        result = client.search(
            q=query,
            engine="google",
            hl="en",
        )
    except Exception as err:
        logging.error("Failed in Google Search: %s", err)
        # Re-raise instead of exit(1): the caller wraps this call in
        # `except Exception` so it can carry on without SerpApi data, and
        # SystemExit would bypass that handler.
        raise

    if 'faq' in flag:
        # Either key may be missing from the result, hence the defaults.
        related_search = [item['title'] for item in result.get('inline_people_also_search_for', [])]
        related_questions = [item['question'] for item in result.get('related_questions', [])]

        # Fall back to related searches only when both primary sources are empty.
        if not related_search and not related_questions:
            return [item['query'] for item in result.get('related_searches', [])]
        return related_search + related_questions

    if 'organic_result' in flag:
        return result.get('organic_results', [])

    # Unknown flag: return an empty list rather than an implicit None so
    # callers can always treat the result as a list.
    return []
|
||||
|
||||
38
lib/gpt_providers/gemini_pro_text.py
Normal file
38
lib/gpt_providers/gemini_pro_text.py
Normal file
@@ -0,0 +1,38 @@
|
||||
# Using Gemini Pro LLM model
|
||||
import os
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import google.generativeai as genai
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s-%(module)s-%(lineno)d-%(message)s')
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(Path('../../.env'))
|
||||
|
||||
from tenacity import (
|
||||
retry,
|
||||
stop_after_attempt,
|
||||
wait_random_exponential,
|
||||
) # for exponential backoff
|
||||
|
||||
|
||||
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def gemini_text_response(prompt):
    """Send a prompt to the Gemini Pro model and return the generated text.

    The function is wrapped in a tenacity ``retry`` that stops after 6
    attempts and waits a randomized exponential interval (bounded by the
    ``min=1`` / ``max=60`` arguments) between them.

    Args:
        prompt: Prompt content passed unchanged to ``generate_content``.

    Returns:
        The ``text`` attribute of the Gemini response.

    Raises:
        Exception: Any error from the Gemini call is logged and re-raised
            so that the retry decorator can schedule another attempt.
    """
    genai.configure(api_key=os.getenv('GEMINI_API_KEY'))

    # Set up the model
    generation_config = {
        "temperature": 1,
        "top_p": 1,
        "top_k": 1,
        "max_output_tokens": 4096,
    }

    model = genai.GenerativeModel(model_name="gemini-pro", generation_config=generation_config)
    try:
        response = model.generate_content(prompt)
    except Exception as err:
        # This module imports `logging`, not a `logger` object; the old
        # handler also called an undefined `gemini_research_report(query)`
        # and then fell through to an unbound `response`.
        logging.error(f"Failed to get response from Gemini: {err}. Retrying..")
        # Re-raise so the @retry decorator performs the actual retry.
        raise

    return response.text
|
||||
@@ -51,11 +51,3 @@ def generate_dalle3_images(img_prompt, image_dir, size="1024x1024", quality="hd"
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate images with Dalle3: {e}")
|
||||
sys.exit("Exiting due to a general error in image generation.")
|
||||
|
||||
# Example usage
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
image_path = generate_dalle3_images("A futuristic cityscape", "/path/to/image/dir")
|
||||
print(f"Image generated and saved at: {image_path}")
|
||||
except SystemExit as e:
|
||||
print(f"Terminated: {e}")
|
||||
|
||||
@@ -24,16 +24,6 @@ logger.add(sys.stdout,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
# fixme: Remove the hardcoding, need add another option OR in config ?
|
||||
image_dir = "blog_images"
|
||||
image_dir = os.path.join(os.getcwd(), image_dir)
|
||||
# TBD: This can come from config file.
|
||||
output_path = "blogs"
|
||||
output_path = os.path.join(os.getcwd(), output_path)
|
||||
wordpress_url = 'https://latestaitools.in/'
|
||||
wordpress_username = 'upaudel750'
|
||||
wordpress_password = 'YvCS VbzQ QSp8 4XZe 0DUw Myys'
|
||||
|
||||
|
||||
def generate_youtube_blog(yt_url_list, output_format="markdown"):
|
||||
"""Takes a list of youtube videos and generates blog for each one of them.
|
||||
|
||||
@@ -1,113 +1,70 @@
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
import datetime
|
||||
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
import google.generativeai as genai
|
||||
from .gpt_providers.gemini_pro_text import gemini_text_response
|
||||
from .gpt_online_researcher import do_online_research
|
||||
from .get_blog_meta_desc import generate_blog_description
|
||||
from .get_tags import get_blog_tags
|
||||
from .get_blog_category import get_blog_categories
|
||||
from .get_blog_title import generate_blog_title
|
||||
from .get_code_examples import gemini_get_code_samples
|
||||
from .save_blog_to_file import save_blog_to_file
|
||||
from .take_url_screenshot import screenshot_api
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(Path('../.env'))
|
||||
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
colorize=True,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
|
||||
def generate_detailed_blog(num_blogs, blog_keywords, niche, num_subtopics,
|
||||
wordpress=False, research_online=False, output_format="HTML"):
|
||||
def generate_keyword_blog(blog_keywords, url=None, output_format="markdown"):
|
||||
"""
|
||||
This function will take a blog Topic to first generate sections for it
|
||||
and then generate content for each section.
|
||||
"""
|
||||
# Use to store the blog in a string, to save in a *.md file.
|
||||
blog_markdown_str = ""
|
||||
|
||||
# TBD: Check if the generated topics are equal to what user asked.
|
||||
blog_topic_arr = generate_blog_topics(blog_keywords, num_blogs, niche)
|
||||
logger.info(f"Generated Blog Topics:---- \n{blog_topic_arr}\n")
|
||||
# Split the string at newlines
|
||||
blog_topic_arr = blog_topic_arr.split('\n')
|
||||
|
||||
# For each of blog topic, generate content.
|
||||
for a_blog_topic in blog_topic_arr:
|
||||
# if md/html
|
||||
a_blog_topic = a_blog_topic.replace('"', '')
|
||||
a_blog_topic = re.sub(r'^[\d.\s]+', '', a_blog_topic)
|
||||
blog_markdown_str = "# " + a_blog_topic + "\n\n"
|
||||
|
||||
# Get the introduction specific to blog title and sub topics.
|
||||
tpc_outlines = generate_topic_outline(a_blog_topic, num_subtopics)
|
||||
tpc_outlines = tpc_outlines.split("\n")
|
||||
|
||||
blog_intro = get_blog_intro(a_blog_topic, tpc_outlines)
|
||||
logger.info(f"The intro is:\n{blog_intro}")
|
||||
blog_markdown_str = blog_markdown_str + "### Introduction" + "\n\n" + f"{blog_intro}" + "\n\n"
|
||||
|
||||
# Now, for each blog we have sub topic. Generate content for each of the sub topic.
|
||||
for a_outline in tpc_outlines:
|
||||
a_outline = a_outline.replace('"', '')
|
||||
logger.info(f"Generating content for sub-topic: {a_outline}")
|
||||
sub_topic_content = generate_topic_content(blog_keywords, a_outline)
|
||||
# a_outline is sub topic heading, hence part ToC also.
|
||||
#blog_markdown_str = blog_markdown_str + "\n\n" + f"### {a_outline}" + "\n\n"
|
||||
blog_markdown_str = blog_markdown_str + "\n" + f"\n {sub_topic_content}" + "\n\n"
|
||||
|
||||
# Get the Conclusion of the blog, by passing the generated blog.
|
||||
blog_conclusion = get_blog_conclusion(blog_markdown_str)
|
||||
blog_markdown_str = blog_markdown_str + "### Conclusion" + "\n" + f"{blog_conclusion}" + "\n"
|
||||
for akeyword in blog_keywords:
|
||||
logger.info(f"Researching and Writing Blog on keywords: {akeyword}")
|
||||
# Use to store the blog in a string, to save in a *.md file.
|
||||
blog_markdown_str = ""
|
||||
|
||||
# Call on the got-researcher, tavily apis for this. Do google search for organic competition.
|
||||
blog_markdown_str = do_online_research(akeyword, "gemini")
|
||||
# logger.info/check the final blog content.
|
||||
logger.info(f"Final blog content: {blog_markdown_str}")
|
||||
|
||||
#if research_online:
|
||||
# # Call on the got-researcher, tavily apis for this. So many apis floating around.
|
||||
# report = do_online_research_on(blog_keywords)
|
||||
# blog_markdown_str = blog_with_research(report, blog_markdown_str)
|
||||
|
||||
blog_meta_desc = generate_blog_description(blog_markdown_str)
|
||||
logger.info(f"\nThe blog meta description is:{blog_meta_desc}\n")
|
||||
|
||||
# Generate an image based on meta description
|
||||
logger.info(f"Calling Image generation with prompt: {blog_meta_desc}")
|
||||
main_img_path = generate_image(blog_meta_desc, image_dir, "dalle3")
|
||||
|
||||
blog_tags = get_blog_tags(blog_markdown_str)
|
||||
logger.info(f"\nBlog tags for generated content: {blog_tags}\n")
|
||||
|
||||
blog_categories = get_blog_categories(blog_markdown_str)
|
||||
blog_title = generate_blog_title(blog_markdown_str, "gemini")
|
||||
blog_meta_desc = generate_blog_description(blog_markdown_str, "gemini")
|
||||
logger.info(f"The blog meta description is: {blog_meta_desc}\n")
|
||||
blog_tags = get_blog_tags(blog_markdown_str, "gemini")
|
||||
logger.info(f"Blog tags for generated content: {blog_tags}")
|
||||
blog_categories = get_blog_categories(blog_markdown_str, "gemini")
|
||||
logger.info(f"Generated blog categories: {blog_categories}\n")
|
||||
|
||||
# Use chatgpt to convert the text into HTML or markdown.
|
||||
if 'html' in output_format:
|
||||
blog_markdown_str = convert_markdown_to_html(blog_markdown_str)
|
||||
#blog_markdown_str = gemini_get_code_samples(blog_markdown_str)
|
||||
#logger.info(f"Blog with code sample: \n {blog_markdown_str}")
|
||||
|
||||
# Check if blog needs to be posted on wordpress.
|
||||
if wordpress:
|
||||
# Fixme: Fetch all tags and categories to check, if present ones are present and
|
||||
# use them else create new ones. Its better to use chatgpt than string comparison.
|
||||
# Similar tags and categories will be missed.
|
||||
# blog_categories =
|
||||
# blog_tags =
|
||||
logger.info("Uploading the blog to wordpress.\n")
|
||||
main_img_path = compress_image(main_img_path, quality=85)
|
||||
# fixme: Remove the hardcoding, need add another option OR in config ?
|
||||
image_dir = os.path.join(os.getcwd(), "blog_images")
|
||||
generated_image_name = f"generated_image_{datetime.datetime.now():%Y-%m-%d-%H-%M-%S}.png"
|
||||
generated_image_filepath = os.path.join(image_dir, generated_image_name)
|
||||
# Generate an image based on meta description
|
||||
#logger.info(f"Calling Image generation with prompt: {blog_meta_desc}")
|
||||
#main_img_path = generate_image(blog_meta_desc, image_dir, "dalle3")
|
||||
if url:
|
||||
try:
|
||||
img_details = analyze_and_extract_details_from_image(main_img_path)
|
||||
alt_text = img_details.get('alt_text')
|
||||
img_description = img_details.get('description')
|
||||
img_title = img_details.get('title')
|
||||
caption = img_details.get('caption')
|
||||
try:
|
||||
media = upload_media(wordpress_url, wordpress_username, wordpress_password,
|
||||
main_img_path, alt_text, img_description, img_title, caption)
|
||||
except Exception as err:
|
||||
sys.exit(f"Error occurred in upload_media: {err}")
|
||||
except Exception as e:
|
||||
sys.exit(f"Error occurred in analyze_and_extract_details_from_image: {e}")
|
||||
|
||||
# Then create the post with the uploaded media as the featured image
|
||||
media_id = media['id']
|
||||
blog_markdown_str = convert_markdown_to_html(blog_markdown_str)
|
||||
try:
|
||||
upload_blog_post(wordpress_url, wordpress_username, wordpress_password, a_blog_topic,
|
||||
blog_markdown_str, media_id, blog_meta_desc, blog_categories, blog_tags, status='publish')
|
||||
generated_image_filepath = screenshot_api(url, generated_image_filepath)
|
||||
except Exception as err:
|
||||
sys.exit(f"Failed to upload blog to wordpress.Error: {err}")
|
||||
|
||||
logger.error(f"Failed in taking compnay page screenshot: {err}")
|
||||
# TBD: Save the blog content as a .md file. Markdown or HTML ?
|
||||
save_blog_to_file(blog_markdown_str,
|
||||
a_blog_topic,
|
||||
blog_meta_desc, blog_tags,
|
||||
blog_categories, main_img_path)
|
||||
save_blog_to_file(blog_markdown_str, blog_title, blog_meta_desc, blog_tags, blog_categories, generated_image_filepath)
|
||||
|
||||
# Now, we need perform some *basic checks on the blog content, such as:
|
||||
# is_content_ai_generated.py, plagiarism_checker_from_known_sources.py
|
||||
# seo_analyzer.py . These are present in the lib folder.
|
||||
# prompt: Rewrite, improve and paraphrase [text] and use headings and subheadings
|
||||
# to break up the content and make it easier to read using the keyword [keyword].
|
||||
logger.info(f"\n\n ################ Finished writing Blog for : {akeyword} #################### \n")
|
||||
|
||||
150
lib/main_youtube_research_blog.py
Normal file
150
lib/main_youtube_research_blog.py
Normal file
@@ -0,0 +1,150 @@
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from loguru import logger
|
||||
|
||||
# Import from local packages
|
||||
from .gpt_providers.openai_chat_completion import openai_chatgpt
|
||||
from .gpt_providers.gpt_vision_img_details import analyze_and_extract_details_from_image
|
||||
from .generate_image_from_prompt import generate_image
|
||||
from .write_blogs_from_youtube_videos import youtube_to_blog
|
||||
from .wordpress_blog_uploader import compress_image, upload_blog_post, upload_media
|
||||
from .gpt_online_researcher import do_online_research
|
||||
from .save_blog_to_file import save_blog_to_file
|
||||
from .optimize_images_for_upload import optimize_image
|
||||
from .combine_research_and_blog import blog_with_research
|
||||
from .get_blog_meta_desc import generate_blog_description
|
||||
from .get_blog_title import generate_blog_title
|
||||
from .get_tags import get_blog_tags
|
||||
from .get_blog_category import get_blog_categories
|
||||
from .convert_content_to_markdown import convert_tomarkdown_format
|
||||
from .convert_markdown_to_html import convert_markdown_to_html
|
||||
from .utils.youtube_keyword_research import research_yt
|
||||
|
||||
# Configuring the logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout, colorize=True, format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}")
|
||||
|
||||
# Constants for directory paths
|
||||
IMAGE_DIR = os.path.join(os.getcwd(), "blog_images")
|
||||
OUTPUT_PATH = os.path.join(os.getcwd(), "blogs")
|
||||
|
||||
|
||||
def generate_youtube_research_blog(yt_keywords):
    """
    Research YouTube for each keyword and draft a blog from every video found.

    For each keyword the research response is fetched with ``research_yt``,
    parsed with ``load_response_json``, and every entry with a usable URL is
    passed to ``youtube_to_blog``. The details of each entry are then printed.

    Args:
        yt_keywords (list): Keywords to research, one research call per item.

    Raises:
        SystemExit: With exit code 1 when the research response cannot be
            parsed, or when a video cannot be converted into a blog draft.
    """
    for ayt_keyword in yt_keywords:
        yt_research_response = ''
        logger.info(f"Researching YouTube top videos for: {ayt_keyword}")
        try:
            yt_research_response = research_yt(ayt_keyword)
            if not yt_research_response:
                # One more attempt when the first answer came back empty.
                yt_research_response = research_yt(ayt_keyword)
        except Exception as err:
            logger.error(f"Failed to do YouTube Research: {err}")

        if not yt_research_response or not yt_research_response.strip():
            # Nothing to parse for this keyword; move on instead of iterating
            # over data that was never loaded (or belongs to a previous keyword).
            logger.warning("Error: JSON data is empty.")
            continue

        try:
            aggregated_data = load_response_json(yt_research_response, ayt_keyword)
        except Exception as err:
            logger.error(f"Failed to load json response: {err}")
            sys.exit(1)

        for title, a_yt_url, views, references, quickstart_code in zip(
                aggregated_data["titles"], aggregated_data["urls"], aggregated_data["views"],
                aggregated_data["references"], aggregated_data["quickstart_codes"]):
            if a_yt_url != "No URL Provided":
                try:
                    logger.info(f"Starting to write blog on URL: {a_yt_url}")
                    blog_markdown_str, yt_title = youtube_to_blog(a_yt_url)
                    logger.warning("\n\n--------------- First Draft of the Blog: --------\n\n")
                    logger.info(f"{blog_markdown_str}\n")
                    logger.warning("--------------------END of First draft----------\n\n")
                    if not yt_title or not blog_markdown_str:
                        logger.error("No content or title for audio to proceed.")
                        # SystemExit is not an ``Exception`` subclass, so the
                        # handler below does not intercept it.
                        sys.exit(1)
                except Exception as e:
                    logger.error(f"Error in youtube_to_blog: {e}")
                    sys.exit(1)

            # Print only the fields that carry real data; the compared
            # literals are the placeholder values used for missing fields.
            if title != "Unknown Title":
                print(f"Title: {title}")
            if a_yt_url != "No URL Provided":
                print(f"URL: {a_yt_url}")
            if views != "No View Count":
                print(f"Views: {views}")
            if references:  # Checks if references list is not empty
                print(f"References: {', '.join(references)}")
            if quickstart_code != "Code coming soon":
                print(f"Quickstart Code: {quickstart_code}")
            print()  # Adds a newline for separation between entries
|
||||
|
||||
|
||||
|
||||
def load_response_json(yt_research_response, yt_keyword):
    """
    Load and parse the YouTube research response JSON.

    Backticks are removed from the response before parsing. The parsed
    document may be a list of entries, or a dict holding the entries under a
    key whose name starts with "result" (case-insensitive).

    Args:
        yt_research_response (str): Raw text returned by the research call.
        yt_keyword (str): Keyword the response belongs to; used in error
            messages only.

    Returns:
        dict: The aggregated data produced by ``process_results``.

    Raises:
        json.JSONDecodeError: If the response is not valid JSON.
        ValueError: If the parsed JSON contains no recognisable result list.
    """
    logger.info(f"Loading the JSON data for parsing: {yt_research_response}")
    payload = yt_research_response.replace('`', '').strip()
    # A fenced reply ("```json ... ```") leaves a bare "json" tag once the
    # backticks are gone. No valid JSON document starts with that word, so
    # dropping it cannot damage a well-formed response.
    if payload[:4].lower() == "json":
        payload = payload[4:].lstrip()

    try:
        data = json.loads(payload)
    except json.JSONDecodeError as e:
        # Let the caller decide how to recover. Re-running the whole research
        # from here would recurse through the caller and still leave this
        # function without a value to return.
        logger.error(f"load_response_json: Failed to parse JSON data: {e}")
        raise

    research_yt_dict = None
    if isinstance(data, dict):
        results_key = next((key for key in data if key.lower().startswith("result")), None)
        if results_key:
            research_yt_dict = process_results(data[results_key])
    elif isinstance(data, list):
        research_yt_dict = process_results(data)

    if research_yt_dict is None:
        raise ValueError(
            f"load_response_json: No result list found in response for keyword: {yt_keyword}")

    return research_yt_dict
|
||||
|
||||
|
||||
def process_results(results):
    """
    Aggregate YouTube research entries into parallel lists.

    Each entry is read with ``get``; a missing field is replaced by a
    placeholder ("Unknown Title", "No URL Provided", "No View Count", an
    empty list, "Code coming soon").

    Args:
        results (list): Entries describing YouTube videos, each supporting
            ``get`` with the keys Title, URL, Views, References and
            Quickstart_Code.

    Returns:
        dict: Lists stored under the keys ``titles``, ``urls``, ``views``,
        ``references`` and ``quickstart_codes``, in that order.
    """
    aggregated = {
        "titles": [],
        "urls": [],
        "views": [],
        "references": [],
        "quickstart_codes": [],
    }

    for video in results:
        try:
            aggregated["titles"].append(video.get("Title", "Unknown Title"))
            aggregated["urls"].append(video.get("URL", "No URL Provided"))
            aggregated["views"].append(video.get("Views", "No View Count"))
            aggregated["references"].append(video.get("References", []))
            aggregated["quickstart_codes"].append(video.get("Quickstart_Code", "Code coming soon"))
        except Exception as e:
            # A failing entry is logged and the loop moves on to the next one.
            logger.error(f"Error processing yt resulr entry: {e}")

    return aggregated
|
||||
@@ -1,201 +0,0 @@
|
||||
import os
|
||||
import requests
|
||||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||||
from langchain import PromptTemplate
|
||||
from langchain.chains.summarize import load_summarize_chain
|
||||
from bs4 import BeautifulSoup
|
||||
from langchain.chat_models import ChatOpenAI
|
||||
from dotenv import load_dotenv
|
||||
import json
|
||||
from autogen import config_list_from_json
|
||||
from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent
|
||||
from autogen import UserProxyAgent
|
||||
import autogen
|
||||
|
||||
|
||||
load_dotenv()
|
||||
brwoserless_api_key = os.getenv("BROWSERLESS_API_KEY")
|
||||
serper_api_key = os.getenv("SERP_API_KEY")
|
||||
airtable_api_key = os.getenv("AIRTABLE_API_KEY")
|
||||
config_list = config_list_from_json("OAI_CONFIG_LIST")
|
||||
|
||||
|
||||
# ------------------ Create functions ------------------ #
|
||||
|
||||
# Function for google search
|
||||
def google_search(search_keyword):
    """
    POST ``search_keyword`` to the Serper search endpoint.

    Args:
        search_keyword: Query sent as the ``q`` field of the JSON payload.

    Returns:
        str: The response body text, which is also printed.
    """
    request_headers = {
        'X-API-KEY': serper_api_key,
        'Content-Type': 'application/json'
    }
    body = json.dumps({"q": search_keyword})

    response = requests.request(
        "POST",
        "https://google.serper.dev/search",
        headers=request_headers,
        data=body,
    )
    result_text = response.text
    print("RESPONSE:", result_text)
    return result_text
|
||||
|
||||
# Function for scraping
|
||||
def summary(objective, content):
    """
    Summarise ``content`` for ``objective`` with a map-reduce chain.

    The content is split into chunks of 10000 characters with an overlap of
    500, and the same prompt template is used for the map and combine steps.

    Args:
        objective: Purpose of the summary, inserted into the prompt.
        content: Text to summarise.

    Returns:
        The output of the summarize chain run.
    """
    llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-16k-0613")

    splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n"],
        chunk_size=10000,
        chunk_overlap=500,
    )
    documents = splitter.create_documents([content])

    map_prompt = """
    Write a summary of the following text for {objective}:
    "{text}"
    SUMMARY:
    """
    prompt_template = PromptTemplate(template=map_prompt, input_variables=["text", "objective"])

    chain = load_summarize_chain(
        llm=llm,
        chain_type='map_reduce',
        map_prompt=prompt_template,
        combine_prompt=prompt_template,
        verbose=False,
    )

    return chain.run(input_documents=documents, objective=objective)
|
||||
|
||||
def web_scraping(objective: str, url: str):
    """
    Fetch a page through the browserless content endpoint and return its text.

    Args:
        objective: The original task given to the agent; passed to
            ``summary`` when the page text is long.
        url: Address of the website to scrape.

    Returns:
        The page text, or its summary when the text is longer than 10000
        characters. ``None`` when the HTTP status is not 200.
    """
    print("Scraping website...")

    request_headers = {
        'Cache-Control': 'no-cache',
        'Content-Type': 'application/json',
    }
    # The endpoint receives the target address as a JSON body.
    body = json.dumps({"url": url})

    response = requests.post(
        f"https://chrome.browserless.io/content?token={brwoserless_api_key}",
        headers=request_headers,
        data=body,
    )

    if response.status_code != 200:
        print(f"HTTP request failed with status code {response.status_code}")
        return None

    text = BeautifulSoup(response.content, "html.parser").get_text()
    print("CONTENTTTTTT:", text)
    if len(text) > 10000:
        # Long pages are condensed before being returned.
        return summary(objective, text)
    return text
|
||||
|
||||
|
||||
# Function for get airtable records
|
||||
def get_airtable_records(base_id, table_id):
    """
    GET the records of an Airtable table.

    Args:
        base_id: Airtable base identifier used in the request URL.
        table_id: Airtable table identifier used in the request URL.

    Returns:
        The decoded JSON body of the response, which is also printed.
    """
    auth_headers = {
        'Authorization': f'Bearer {airtable_api_key}',
    }
    endpoint = f"https://api.airtable.com/v0/{base_id}/{table_id}"

    records = requests.request("GET", endpoint, headers=auth_headers).json()
    print(records)
    return records
|
||||
|
||||
|
||||
# Function for update airtable records
|
||||
|
||||
def update_single_airtable_record(base_id, table_id, id, fields):
    """
    PATCH one record of an Airtable table.

    Args:
        base_id: Airtable base identifier used in the request URL.
        table_id: Airtable table identifier used in the request URL.
        id: Identifier of the record to update.
        fields: Field values sent for that record.

    Returns:
        The decoded JSON body of the response.
    """
    endpoint = f"https://api.airtable.com/v0/{base_id}/{table_id}"
    request_headers = {
        'Authorization': f'Bearer {airtable_api_key}',
        "Content-Type": "application/json"
    }
    # The request body carries a single-item "records" list.
    payload = json.dumps({
        "records": [{
            "id": id,
            "fields": fields
        }]
    })

    response = requests.patch(endpoint, headers=request_headers, data=payload)
    return response.json()
|
||||
|
||||
|
||||
# ------------------ Create agent ------------------ #
|
||||
|
||||
# Create user proxy agent
|
||||
user_proxy = UserProxyAgent(name="user_proxy",
|
||||
is_termination_msg=lambda msg: "TERMINATE" in msg["content"],
|
||||
human_input_mode="ALWAYS",
|
||||
max_consecutive_auto_reply=1
|
||||
)
|
||||
|
||||
# Create researcher agent
|
||||
researcher = GPTAssistantAgent(
|
||||
name = "researcher",
|
||||
llm_config = {
|
||||
"config_list": config_list,
|
||||
"assistant_id": "asst_qyvioid5My8K3SdFClaEnwmB"
|
||||
}
|
||||
)
|
||||
|
||||
researcher.register_function(
|
||||
function_map={
|
||||
"web_scraping": web_scraping,
|
||||
"google_search": google_search
|
||||
}
|
||||
)
|
||||
|
||||
# Create research manager agent
|
||||
research_manager = GPTAssistantAgent(
|
||||
name="research_manager",
|
||||
llm_config = {
|
||||
"config_list": config_list,
|
||||
"assistant_id": "asst_C1Ta5XmmEcYD6vnOSVflnwG9"
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# Create director agent
|
||||
director = GPTAssistantAgent(
|
||||
name = "director",
|
||||
llm_config = {
|
||||
"config_list": config_list,
|
||||
"assistant_id": "asst_zVBJGch5mOyCYl9H1J3L9Ime",
|
||||
}
|
||||
)
|
||||
|
||||
director.register_function(
|
||||
function_map={
|
||||
"get_airtable_records": get_airtable_records,
|
||||
"update_single_airtable_record": update_single_airtable_record
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# Create group chat
|
||||
groupchat = autogen.GroupChat(agents=[user_proxy, researcher, research_manager, director], messages=[], max_round=15)
|
||||
group_chat_manager = autogen.GroupChatManager(groupchat=groupchat, llm_config={"config_list": config_list})
|
||||
|
||||
|
||||
# ------------------ start conversation ------------------ #
|
||||
message = """
|
||||
Research the funding stage/amount & pricing for each company in the list: https://airtable.com/appj0J4gFpvLrQWjI/tblF4OmG6oLjYtgZl/viwmFx2ttAVrJm0E3?blocks=hide
|
||||
"""
|
||||
user_proxy.initiate_chat(group_chat_manager, message=message)
|
||||
@@ -14,8 +14,15 @@ logger.add(sys.stdout,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
# fixme: Remove the hardcoding, need add another option OR in config ?
|
||||
image_dir = "blog_images"
|
||||
image_dir = os.path.join(os.getcwd(), image_dir)
|
||||
# TBD: This can come from config file.
|
||||
output_path = "blogs"
|
||||
output_path = os.path.join(os.getcwd(), output_path)
|
||||
|
||||
def save_blog_to_file(blog_content, blog_title, blog_meta_desc, blog_tags, blog_categories, main_img_path, output_path, file_type="md"):
|
||||
|
||||
def save_blog_to_file(blog_content, blog_title, blog_meta_desc, blog_tags, blog_categories, main_img_path=None, file_type="md"):
|
||||
"""
|
||||
Saves the provided blog content to a file in the specified format.
|
||||
|
||||
@@ -33,6 +40,7 @@ def save_blog_to_file(blog_content, blog_title, blog_meta_desc, blog_tags, blog_
|
||||
FileNotFoundError: If the output_path does not exist.
|
||||
Exception: If the blog content cannot be written to the file.
|
||||
"""
|
||||
blog_frontmatter = ''
|
||||
# Sanitize and prepare the blog title
|
||||
# Remove colon and ampersand
|
||||
blog_title_md = blog_title.replace(":", "").replace("&", "")
|
||||
@@ -55,18 +63,28 @@ def save_blog_to_file(blog_content, blog_title, blog_meta_desc, blog_tags, blog_
|
||||
dtobj = datetime.datetime.now(ZoneInfo('Asia/Kolkata'))
|
||||
formatted_date = dtobj.strftime('%Y-%m-%d %H:%M:%S %z')
|
||||
blog_title = blog_title.replace(":", "-").replace('"', '')
|
||||
blog_frontmatter = dedent(f"""\
|
||||
---
|
||||
title: {blog_title}
|
||||
date: {formatted_date}
|
||||
categories: [{blog_categories}]
|
||||
tags: [{blog_tags}]
|
||||
description: {blog_meta_desc.replace(":", "-")}
|
||||
img_path: '/assets/'
|
||||
image:
|
||||
path: {os.path.basename(main_img_path)}
|
||||
alt: {blog_title}
|
||||
---\n\n""")
|
||||
if main_img_path:
|
||||
blog_frontmatter = dedent(f"""\
|
||||
---
|
||||
title: {blog_title}
|
||||
date: {formatted_date}
|
||||
categories: [{blog_categories}]
|
||||
tags: [{blog_tags}]
|
||||
description: {blog_meta_desc.replace(":", "-")}
|
||||
img_path: '/assets/'
|
||||
image:
|
||||
path: {os.path.basename(main_img_path)}
|
||||
alt: {blog_title}
|
||||
---\n\n""")
|
||||
else:
|
||||
blog_frontmatter = dedent(f"""\
|
||||
---
|
||||
title: {blog_title}
|
||||
date: {formatted_date}
|
||||
categories: [{blog_categories}]
|
||||
tags: [{blog_tags}]
|
||||
description: {blog_meta_desc.replace(":", "-")}
|
||||
---\n\n""")
|
||||
|
||||
blog_output_path = os.path.join(
|
||||
output_path,
|
||||
|
||||
83
lib/take_url_screenshot.py
Normal file
83
lib/take_url_screenshot.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import os
|
||||
import datetime
|
||||
|
||||
from selenium import webdriver
|
||||
from PIL import Image
|
||||
import shutil
|
||||
from screenshotone import Client, TakeOptions
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv(Path('../.env'))
|
||||
|
||||
|
||||
def screenshot_api(url, generated_image_filepath):
    """
    Take a screenshot of ``url`` with the screenshotone API and save it.

    When the API path fails for any reason, the screenshot is taken with
    ``take_screenshot`` instead.

    Args:
        url: Address of the page to capture.
        generated_image_filepath: Path the PNG image is written to.

    Returns:
        Path of the saved image.
    """
    try:
        # create API client
        client = Client(os.getenv('SCREENSHOTONE_ACCESS_KEY'), os.getenv('SCREENSHOTONE_SECRET_KEY'))

        # set up options
        options = (TakeOptions.url(url)
                   .format("png")
                   .viewport_width(1024)
                   .viewport_height(768)
                   .block_cookie_banners(True)
                   .block_chats(True))

        # render a screenshot and download the image as stream
        image = client.take(options)

        # store the screenshot in the target file
        with open(generated_image_filepath, 'wb') as result_file:
            shutil.copyfileobj(image, result_file)

        # Display the screenshot using Image.show
        image = Image.open(generated_image_filepath)
        image.show()

    except Exception as err:
        print(f"Failed in screenshotone api: {err}")
        # take_screenshot takes the full-page switch as its third argument;
        # pass it explicitly so the fallback does not fail on a missing
        # argument. False keeps a viewport-sized capture, like the API path.
        generated_image_filepath = take_screenshot(url, generated_image_filepath, False)

    return generated_image_filepath
|
||||
|
||||
|
||||
def take_screenshot(url, generated_image_filepath, full_screenshot=False):
    """
    Take a screenshot of ``url`` with a Chrome webdriver and save it.

    Args:
        url: Address of the page to capture.
        generated_image_filepath: Path the PNG image is written to.
        full_screenshot: When true, the browser window is resized to the
            page height before capturing. Defaults to False so the function
            can be called with just the URL and the target path.

    Returns:
        Path of the saved image.
    """
    # Create a webdriver instance
    driver = webdriver.Chrome()
    try:
        # Navigate to the given url
        driver.get(url)

        # Get the height of the webpage
        page_height = driver.execute_script("return document.body.scrollHeight")

        # Scroll down to the bottom of the webpage in 100px steps
        # (presumably so lazily loaded content gets rendered - TODO confirm).
        for i in range(0, page_height, 100):
            driver.execute_script(f"window.scrollTo(0, {i})")

        # Read the height again after scrolling
        total_height = driver.execute_script("return document.body.scrollHeight")

        # Resize the webdriver window to the height of the webpage
        if full_screenshot:
            driver.set_window_size(800, total_height)

        # Take a screenshot of the webpage
        screenshot = driver.get_screenshot_as_png()
    finally:
        # Always close the browser, even when a step above raised.
        driver.quit()

    # Save the screenshot to a file
    with open(generated_image_filepath, "wb") as f:
        f.write(screenshot)

    # Display the screenshot using Image.show
    image = Image.open(generated_image_filepath)
    image.show()

    return generated_image_filepath
|
||||
57
lib/utils/youtube_keyword_research.py
Normal file
57
lib/utils/youtube_keyword_research.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""
|
||||
At the command line, only need to run once to install the package via pip:
|
||||
|
||||
$ pip install google-generativeai
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
|
||||
import google.generativeai as genai
|
||||
|
||||
def research_yt(keywords):
    """
    Ask the ``gemini-pro`` model to research recent YouTube videos.

    Args:
        keywords: Topic inserted into the research prompt.

    Returns:
        str: The ``text`` of the model response. The prompt asks for JSON
        using the keys Title, URL, Views, References and Quickstart_Code.

    Raises:
        SystemExit: With exit code 1 when the model call fails.
    """
    try:
        genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
    except Exception as err:
        print(f"Google Gemini Error: {err}")

    # Set up the model
    generation_config = {
        "temperature": 0.9,
        "top_p": 1,
        "top_k": 1,
        "max_output_tokens": 2048,
    }

    safety_settings = [
        {
            "category": "HARM_CATEGORY_HARASSMENT",
            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
        },
        {
            "category": "HARM_CATEGORY_HATE_SPEECH",
            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
        },
        {
            "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
        },
        {
            "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
        },
    ]

    model = genai.GenerativeModel(model_name="gemini-pro",
                                  generation_config=generation_config,
                                  safety_settings=safety_settings)

    prompt_parts = [f"Research 5 latest youtube urls on {keywords}, released this week. Check the number of views and also get the references from youtube video description. REMEMBER to make sure, your response urls are available and valid. For each result, visit their webpages to write detailed quickstart code samples, preferably in python. Your response urls should consist of trending topics on latest {keywords}. Your response should be in json format, so that i can easily parse all the fields. For consistency, always use json key names as Title, URL, Views, References and Quickstart_Code."]

    try:
        response = model.generate_content(prompt_parts)
    except Exception as err:
        # ``response`` is not bound when the call raises, so report the error.
        print(f"Failed to get response from Gemini Pro: {err}")
        sys.exit(1)

    return response.text
|
||||
@@ -19,6 +19,38 @@ logger.add(sys.stdout,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
## Check if blog needs to be posted on wordpress.
|
||||
#if wordpress:
|
||||
## Fixme: Fetch all tags and categories to check, if present ones are present and
|
||||
## use them else create new ones. Its better to use chatgpt than string comparison.
|
||||
## Similar tags and categories will be missed.
|
||||
## blog_categories =
|
||||
## blog_tags =
|
||||
#logger.info("Uploading the blog to wordpress.\n")
|
||||
#main_img_path = compress_image(main_img_path, quality=85)
|
||||
#try:
|
||||
# img_details = analyze_and_extract_details_from_image(main_img_path)
|
||||
# alt_text = img_details.get('alt_text')
|
||||
# img_description = img_details.get('description')
|
||||
# img_title = img_details.get('title')
|
||||
# caption = img_details.get('caption')
|
||||
# try:
|
||||
# media = upload_media(wordpress_url, wordpress_username, wordpress_password,
|
||||
# main_img_path, alt_text, img_description, img_title, caption)
|
||||
# except Exception as err:
|
||||
# sys.exit(f"Error occurred in upload_media: {err}")
|
||||
#except Exception as e:
|
||||
# sys.exit(f"Error occurred in analyze_and_extract_details_from_image: {e}")
|
||||
#
|
||||
## Then create the post with the uploaded media as the featured image
|
||||
#media_id = media['id']
|
||||
#blog_markdown_str = convert_markdown_to_html(blog_markdown_str)
|
||||
#try:
|
||||
# upload_blog_post(wordpress_url, wordpress_username, wordpress_password, a_blog_topic,
|
||||
# blog_markdown_str, media_id, blog_meta_desc, blog_categories, blog_tags, status='publish')
|
||||
#except Exception as err:
|
||||
# sys.exit(f"Failed to upload blog to wordpress.Error: {err}")
|
||||
|
||||
|
||||
def compress_image(image_path, quality=85):
|
||||
"""
|
||||
|
||||
@@ -38,7 +38,7 @@ def youtube_to_blog(video_url):
|
||||
|
||||
try:
|
||||
# Summarizing the content of the YouTube video
|
||||
audio_blog_content = summarize_youtube_video_openai(audio_text, "gemini")
|
||||
audio_blog_content = summarize_youtube_video(audio_text, "gemini")
|
||||
logger.info("Successfully converted given URL to blog article.")
|
||||
return audio_blog_content, audio_title
|
||||
except Exception as e:
|
||||
@@ -47,7 +47,7 @@ def youtube_to_blog(video_url):
|
||||
return audio_blog_content
|
||||
|
||||
|
||||
def summarize_youtube_video(user_contenti, gpt_providers):
|
||||
def summarize_youtube_video(user_content, gpt_providers):
|
||||
"""Generates a summary of a YouTube video using OpenAI GPT-3 and displays a progress bar.
|
||||
Args:
|
||||
video_link: The URL of the YouTube video to summarize.
|
||||
|
||||
81
pseo_main.py
81
pseo_main.py
@@ -7,9 +7,12 @@ Allows the user to specify various parameters for blog generation without needin
|
||||
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import argparse
|
||||
import requests
|
||||
from loguru import logger
|
||||
import csv
|
||||
import json
|
||||
|
||||
# Logger configuration
|
||||
logger.remove()
|
||||
@@ -17,6 +20,8 @@ logger.add(sys.stdout, colorize=True, format="<level>{level}</level>|<green>{fil
|
||||
|
||||
# Importing custom functions
|
||||
from lib.get_text_response import generate_detailed_blog, generate_youtube_blog
|
||||
from lib.main_youtube_research_blog import generate_youtube_research_blog
|
||||
from lib.main_keywords_to_blog import generate_keyword_blog
|
||||
|
||||
|
||||
def parse_arguments():
|
||||
@@ -25,9 +30,8 @@ def parse_arguments():
|
||||
Returns:
|
||||
argparse.Namespace: Parsed arguments.
|
||||
"""
|
||||
|
||||
parser = argparse.ArgumentParser(description="Generate blogs based on user input.")
|
||||
parser.add_argument("--num_blogs", type=int, default=5, help="Number of blogs to generate (default: 5).")
|
||||
parser.add_argument("--csv", type=str, help="Provide path csv file. Check the template csv for example.")
|
||||
parser.add_argument("--keywords", type=str, help="Keywords for blog generation.")
|
||||
parser.add_argument("--niche", action='store_true', default=False, help="Flag to generate niche blogs (default: False).")
|
||||
parser.add_argument("--num_subtopics", type=int, default=6, help="Number of subtopics per blog (default: 6).")
|
||||
@@ -59,7 +63,7 @@ def main():
|
||||
args = parse_arguments()
|
||||
logger.info("Fetch and Validate Openai key.")
|
||||
# Validate user input
|
||||
if not args.keywords and not args.youtube_urls:
|
||||
if not args.keywords and not args.youtube_urls and not args.csv:
|
||||
raise ValueError("Either --keywords or --youtube_urls must be provided.")
|
||||
|
||||
# Validate OpenAI API key
|
||||
@@ -72,17 +76,80 @@ def main():
|
||||
# Handle blog generation based on input
|
||||
if args.youtube_urls:
|
||||
yt_urls = args.youtube_urls.split(",")
|
||||
logger.info(f"Generating blogs from YouTube URLs: {yt_urls}")
|
||||
generate_youtube_blog(yt_urls)
|
||||
valid_urls = [url for url in yt_urls if is_valid_url(url)]
|
||||
quoted_strings = [url for url in yt_urls if not is_valid_url(url)]
|
||||
|
||||
if valid_urls:
|
||||
logger.info(f"Generating blogs from YouTube URLs: {valid_urls}")
|
||||
generate_youtube_blog(valid_urls)
|
||||
if quoted_strings:
|
||||
logger.info(f"Do youtube research and write blogs for: {quoted_strings}")
|
||||
generate_youtube_research_blog(quoted_strings)
|
||||
|
||||
elif args.keywords:
|
||||
logger.info(f"Generating {args.num_blogs} blogs on '{args.keywords}' with {args.num_subtopics} subtopics.")
|
||||
generate_detailed_blog(args.num_blogs, args.keywords, args.niche,
|
||||
args.num_subtopics, args.wordpress, args.output_format)
|
||||
#generate_detailed_blog(args.num_blogs, args.keywords, args.niche,
|
||||
# args.num_subtopics, args.wordpress, args.output_format)
|
||||
keyword_list = args.keywords.split(",")
|
||||
generate_keyword_blog(keyword_list)
|
||||
|
||||
elif args.csv:
|
||||
try:
|
||||
data = read_csv_to_json(args.csv)
|
||||
logger.info(f"Generating blogs from csv file: {json.dumps(data, indent=4)}")
|
||||
for item in data:
|
||||
keyword_list = [item['keyword']]
|
||||
generate_keyword_blog(keyword_list, item['URL'])
|
||||
except Exception as err:
|
||||
logger.error(f"Failed to generate blogs the CSV file:{err}")
|
||||
sys.exit(1)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def read_csv_to_json(file_path):
    """Read a CSV file into a list of row dictionaries.

    The first line of the file is used as the header; every following line
    becomes one dictionary mapping column name to cell value (all strings).

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        list: One dictionary per data row, in file order. An empty file or a
        header-only file yields an empty list.

    On any failure to open or parse the file, the error is logged and the
    process exits with status 1 (``sys.exit`` raises ``SystemExit``).
    """
    try:
        # 'utf-8-sig' transparently drops a leading UTF-8 BOM (commonly added by
        # spreadsheet exports) so the first column name is not read as
        # '\ufeffCompany'; files without a BOM decode exactly as with 'utf-8'.
        # newline='' lets the csv module handle embedded line breaks itself.
        with open(file_path, newline='', encoding='utf-8-sig') as csvfile:
            return list(csv.DictReader(csvfile))
    except Exception as err:
        logger.error(f"Failed to read the CSV file:{err}")
        sys.exit(1)
|
||||
|
||||
|
||||
# Compiled once at import time: the pattern never changes between calls.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'  # scheme: http://, https://, ftp:// or ftps://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
    r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
    r'(?::\d+)?'  # optional port
    # \Z anchors at the true end of the string; '$' would also match just
    # before a trailing newline and so accept "http://example.com\n".
    r'(?:/?|[/?]\S+)\Z',
    re.IGNORECASE,
)


def is_valid_url(url):
    """
    Check if the given string is a valid URL.

    A value is accepted when it consists of an http/https/ftp/ftps scheme,
    followed by a domain name, ``localhost``, an IPv4 or an IPv6 address,
    an optional port and an optional path/query without whitespace.
    Matching is case-insensitive and must cover the whole string.

    Args:
        url (str): String to check.

    Returns:
        bool: True if the string is a valid URL, False otherwise
        (including when ``url`` is not a string).
    """
    # Non-string input (e.g. None) cannot be a URL; report it as invalid
    # instead of letting the regex engine raise TypeError.
    if not isinstance(url, str):
        return False

    return _URL_PATTERN.match(url) is not None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user