Gemini AI common code and utils

This commit is contained in:
ajaysi
2025-04-11 17:47:55 +05:30
parent b556edb989
commit e41be5789a
18 changed files with 590 additions and 262 deletions

2
.gitignore vendored
View File

@@ -52,6 +52,8 @@ web_research_report*
*.venv_*_*_* *.venv_*_*_*
*.venv_*_*_*_* *.venv_*_*_*_*
website_analyzer.log
*venv *venv
venv_new* venv_new*
venv_* venv_*

View File

@@ -2,7 +2,9 @@ import os
import json import json
import streamlit as st import streamlit as st
from tenacity import retry, stop_after_attempt, wait_random_exponential from tenacity import retry, stop_after_attempt, wait_random_exponential
import google.generativeai as genai from loguru import logger
import sys
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
@@ -103,5 +105,6 @@ def generate_blog_metadesc(keywords, tone, search_type, language):
try: try:
return llm_text_gen(prompt) return llm_text_gen(prompt)
except Exception as err: except Exception as err:
logger.error(f"Error generating meta description: {err}")
st.error(f"💥 Error: Failed to generate response from LLM: {err}") st.error(f"💥 Error: Failed to generate response from LLM: {err}")
return None return None

View File

@@ -7,27 +7,29 @@
import os import os
from pathlib import Path from pathlib import Path
from dotenv import load_dotenv from dotenv import load_dotenv
from google.api_core import retry
import google.generativeai as genai
from pprint import pprint from pprint import pprint
from loguru import logger
import sys
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
def generate_with_retry(model, prompt): def generate_with_retry(prompt, system_prompt=None):
""" """
Generates content from the model with retry handling for errors. Generates content using the llm_text_gen function with retry handling for errors.
Parameters: Parameters:
model (GenerativeModel): The generative model to use for content generation.
prompt (str): The prompt to generate content from. prompt (str): The prompt to generate content from.
system_prompt (str, optional): Custom system prompt to use instead of the default one.
Returns: Returns:
str: The generated content. str: The generated content.
""" """
try: try:
# FIXME: Need a progress bar here. # Use llm_text_gen instead of directly calling the model
return model.generate_content(prompt, request_options={'retry':retry.Retry()}) return llm_text_gen(prompt, system_prompt)
except Exception as e: except Exception as e:
print(f"Error generating content: {e}") logger.error(f"Error generating content: {e}")
return "" return ""
@@ -36,11 +38,12 @@ def ai_essay_generator(essay_title, selected_essay_type, selected_education_leve
Write an Essay using prompt chaining and iterative generation. Write an Essay using prompt chaining and iterative generation.
Parameters: Parameters:
persona (str): The persona statement for the author. essay_title (str): The title or topic of the essay.
story_genre (str): The genre of the story. selected_essay_type (str): The type of essay to write.
characters (str): The characters in the story. selected_education_level (str): The education level of the target audience.
selected_num_pages (int): The number of pages or words for the essay.
""" """
print(f"Starting to write Essay on {essay_title}..") logger.info(f"Starting to write Essay on {essay_title}..")
try: try:
# Define persona and writing guidelines # Define persona and writing guidelines
guidelines = f'''\ guidelines = f'''\
@@ -127,59 +130,55 @@ def ai_essay_generator(essay_title, selected_essay_type, selected_education_leve
{guidelines} {guidelines}
''' '''
# Configure generative AI
load_dotenv(Path('../.env'))
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
# Initialize the generative model
model = genai.GenerativeModel('gemini-pro')
# Generate prompts # Generate prompts
try: try:
premise = generate_with_retry(model, premise_prompt).text premise = generate_with_retry(premise_prompt)
print(f"The title of the Essay is: {premise}") logger.info(f"The title of the Essay is: {premise}")
except Exception as err: except Exception as err:
print(f"Essay title Generation Error: {err}") logger.error(f"Essay title Generation Error: {err}")
return return
outline = generate_with_retry(model, outline_prompt.format(premise=premise)).text outline = generate_with_retry(outline_prompt.format(premise=premise))
print(f"The Outline of the essay is: {outline}\n\n") logger.info(f"The Outline of the essay is: {outline}\n\n")
if not outline: if not outline:
print("Failed to generate Essay outline. Exiting...") logger.error("Failed to generate Essay outline. Exiting...")
return return
try: try:
starting_draft = generate_with_retry(model, starting_draft = generate_with_retry(
starting_prompt.format(premise=premise, outline=outline)).text starting_prompt.format(premise=premise, outline=outline))
pprint(starting_draft) pprint(starting_draft)
except Exception as err: except Exception as err:
print(f"Failed to Generate Essay draft: {err}") logger.error(f"Failed to Generate Essay draft: {err}")
return return
try: try:
draft = starting_draft draft = starting_draft
continuation = generate_with_retry(model, continuation = generate_with_retry(
continuation_prompt.format(premise=premise, outline=outline, story_text=draft)).text continuation_prompt.format(premise=premise, outline=outline, story_text=draft))
pprint(continuation) pprint(continuation)
except Exception as err: except Exception as err:
print(f"Failed to write the initial draft: {err}") logger.error(f"Failed to write the initial draft: {err}")
# Add the continuation to the initial draft, keep building the story until we see 'IAMDONE' # Add the continuation to the initial draft, keep building the story until we see 'IAMDONE'
try: try:
draft += '\n\n' + continuation draft += '\n\n' + continuation
except Exception as err: except Exception as err:
print(f"Failed as: {err} and {continuation}") logger.error(f"Failed as: {err} and {continuation}")
while 'IAMDONE' not in continuation: while 'IAMDONE' not in continuation:
try: try:
continuation = generate_with_retry(model, continuation = generate_with_retry(
continuation_prompt.format(premise=premise, outline=outline, story_text=draft)).text continuation_prompt.format(premise=premise, outline=outline, story_text=draft))
draft += '\n\n' + continuation draft += '\n\n' + continuation
except Exception as err: except Exception as err:
print(f"Failed to continually write the Essay: {err}") logger.error(f"Failed to continually write the Essay: {err}")
return return
# Remove 'IAMDONE' and print the final story # Remove 'IAMDONE' and print the final story
final = draft.replace('IAMDONE', '').strip() final = draft.replace('IAMDONE', '').strip()
pprint(final) pprint(final)
return final
except Exception as e: except Exception as e:
print(f"Main Essay writing: An error occurred: {e}") logger.error(f"Main Essay writing: An error occurred: {e}")
return ""

View File

@@ -15,14 +15,14 @@ from pathlib import Path
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen from ...gpt_providers.text_generation.main_text_generation import llm_text_gen
from .modules.post_generator import write_fb_post from .modules.post_generator import write_fb_post
from .modules.story_generator import write_fb_story from .modules.story_generator import write_fb_story
from .modules.reel_generator import write_fb_reel #from .modules.reel_generator import write_fb_reel
from .modules.carousel_generator import write_fb_carousel #from .modules.carousel_generator import write_fb_carousel
from .modules.event_generator import write_fb_event #from .modules.event_generator import write_fb_event
from .modules.group_post_generator import write_fb_group_post #from .modules.group_post_generator import write_fb_group_post
from .modules.page_about_generator import write_fb_page_about #from .modules.page_about_generator import write_fb_page_about
from .modules.ad_copy_generator import write_fb_ad_copy #from .modules.ad_copy_generator import write_fb_ad_copy
from .modules.hashtag_generator import write_fb_hashtags #from .modules.hashtag_generator import write_fb_hashtags
from .modules.engagement_analyzer import analyze_fb_engagement #from .modules.engagement_analyzer import analyze_fb_engagement
#from streamlit_quill import st_quill #from streamlit_quill import st_quill

View File

@@ -6,8 +6,8 @@ and optimization options.
""" """
import streamlit as st import streamlit as st
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen from ....gpt_providers.text_generation.main_text_generation import llm_text_gen
from ...gpt_providers.image_generation.main_image_generation import generate_image from ....gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image
def write_fb_post(): def write_fb_post():

View File

@@ -6,8 +6,8 @@ and customization options.
""" """
import streamlit as st import streamlit as st
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen from ....gpt_providers.text_generation.main_text_generation import llm_text_gen
from ...gpt_providers.image_generation.main_image_generation import generate_image from ....gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image
def write_fb_story(): def write_fb_story():

View File

@@ -6,27 +6,29 @@
import os import os
from pathlib import Path from pathlib import Path
from google.api_core import retry
import google.generativeai as genai
import streamlit as st import streamlit as st
from loguru import logger
import sys
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen
def generate_with_retry(model, prompt): def generate_with_retry(prompt, system_prompt=None):
""" """
Generates content from the model with retry handling for errors. Generates content using the llm_text_gen function with retry handling for errors.
Parameters: Parameters:
model (GenerativeModel): The generative model to use for content generation.
prompt (str): The prompt to generate content from. prompt (str): The prompt to generate content from.
system_prompt (str, optional): Custom system prompt to use instead of the default one.
Returns: Returns:
str: The generated content. str: The generated content.
""" """
try: try:
# FIXME: Need a progress bar here. # Use llm_text_gen instead of directly calling the model
return model.generate_content(prompt, request_options={'retry':retry.Retry()}) return llm_text_gen(prompt, system_prompt)
except Exception as e: except Exception as e:
print(f"Error generating content: {e}") logger.error(f"Error generating content: {e}")
return "" return ""
@@ -38,8 +40,15 @@ def ai_story(persona, story_setting, character_input,
Parameters: Parameters:
persona (str): The persona statement for the author. persona (str): The persona statement for the author.
story_genre (str): The genre of the story. story_setting (str): The setting of the story.
characters (str): The characters in the story. character_input (str): The characters in the story.
plot_elements (str): The plot elements of the story.
writing_style (str): The writing style of the story.
story_tone (str): The tone of the story.
narrative_pov (str): The narrative point of view.
audience_age_group (str): The target audience age group.
content_rating (str): The content rating of the story.
ending_preference (str): The preferred ending of the story.
""" """
st.info(f""" st.info(f"""
You have chosen to create a story set in **{story_setting}**. You have chosen to create a story set in **{story_setting}**.
@@ -170,20 +179,16 @@ def ai_story(persona, story_setting, character_input,
{guidelines} {guidelines}
''' '''
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
# Initialize the generative model
model = genai.GenerativeModel('gemini-1.5-flash')
# Generate prompts # Generate prompts
try: try:
premise = generate_with_retry(model, premise_prompt).text premise = generate_with_retry(premise_prompt)
st.info(f"The premise of the story is: {premise}") st.info(f"The premise of the story is: {premise}")
except Exception as err: except Exception as err:
st.error(f"Premise Generation Error: {err}") st.error(f"Premise Generation Error: {err}")
return return
outline = generate_with_retry(model, outline_prompt.format(premise=premise)).text outline = generate_with_retry(outline_prompt.format(premise=premise))
with st.expander("Click to Checkout the outline, writing still in progress.."): with st.expander("Click to Checkout the outline, writing still in progress.."):
st.markdown(f"The Outline of the story is: {outline}\n\n") st.markdown(f"The Outline of the story is: {outline}\n\n")
@@ -193,16 +198,16 @@ def ai_story(persona, story_setting, character_input,
# Generate starting draft # Generate starting draft
try: try:
starting_draft = generate_with_retry(model, starting_draft = generate_with_retry(
starting_prompt.format(premise=premise, outline=outline)).text starting_prompt.format(premise=premise, outline=outline))
except Exception as err: except Exception as err:
st.error(f"Failed to Generate Story draft: {err}") st.error(f"Failed to Generate Story draft: {err}")
return return
try: try:
draft = starting_draft draft = starting_draft
continuation = generate_with_retry(model, continuation = generate_with_retry(
continuation_prompt.format(premise=premise, outline=outline, story_text=draft)).text continuation_prompt.format(premise=premise, outline=outline, story_text=draft))
except Exception as err: except Exception as err:
st.error(f"Failed to write the initial draft: {err}") st.error(f"Failed to write the initial draft: {err}")
@@ -217,8 +222,8 @@ def ai_story(persona, story_setting, character_input,
while 'IAMDONE' not in continuation: while 'IAMDONE' not in continuation:
try: try:
status.update(label=f"Writing in progress... Current draft length: {len(draft)} characters") status.update(label=f"Writing in progress... Current draft length: {len(draft)} characters")
continuation = generate_with_retry(model, continuation = generate_with_retry(
continuation_prompt.format(premise=premise, outline=outline, story_text=draft)).text continuation_prompt.format(premise=premise, outline=outline, story_text=draft))
draft += '\n\n' + continuation draft += '\n\n' + continuation
except Exception as err: except Exception as err:
st.error(f"Failed to continually write the story: {err}") st.error(f"Failed to continually write the story: {err}")
@@ -230,3 +235,4 @@ def ai_story(persona, story_setting, character_input,
except Exception as e: except Exception as e:
st.error(f"Main Story writing: An error occurred: {e}") st.error(f"Main Story writing: An error occurred: {e}")
return ""

View File

@@ -2,7 +2,6 @@ import sys
import os import os
from textwrap import dedent from textwrap import dedent
from PIL import Image
import json import json
import asyncio import asyncio
from pathlib import Path from pathlib import Path
@@ -23,8 +22,7 @@ from ..blog_metadata.get_blog_metadata import blog_metadata
from ..blog_postprocessing.save_blog_to_file import save_blog_to_file from ..blog_postprocessing.save_blog_to_file import save_blog_to_file
from ..gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image from ..gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
from ..gpt_providers.image_to_text_gen.gemini_image_describe import describe_image, analyze_image_with_prompt
import google.generativeai as genai
def blog_from_image(prompt, uploaded_img): def blog_from_image(prompt, uploaded_img):
@@ -97,17 +95,15 @@ def write_blog_from_image(prompt, uploaded_img):
2). Tone and Brand Alignment: Adjust your tone, voice, personality for {blog_characteristics['Blog Tone']} audience. 2). Tone and Brand Alignment: Adjust your tone, voice, personality for {blog_characteristics['Blog Tone']} audience.
3). Make sure your response content length is of {blog_characteristics['Blog Length']} words. 3). Make sure your response content length is of {blog_characteristics['Blog Length']} words.
""" """
logger.info("Generating blog and FAQs from Google web search results.") logger.info("Generating blog and FAQs from image analysis.")
try: try:
#response = llm_text_gen(prompt) # Use the gemini_image_describe function to analyze the image with the custom prompt
genai.configure(api_key=os.getenv('GEMINI_API_KEY')) response = analyze_image_with_prompt(uploaded_img, prompt)
version = 'models/gemini-1.5-flash' if not response:
model = genai.GenerativeModel(version) logger.error("Failed to get response from image analysis")
model_info = genai.get_model(version) return "Failed to generate content from image."
print(f'{version} - input limit: {model_info.input_token_limit}, output limit: {model_info.output_token_limit}') return response
response = model.generate_content([prompt, Image.open(uploaded_img)])
return response.text
except Exception as err: except Exception as err:
logger.error(f"Exit: Failed to get response from LLM: {err}") logger.error(f"Exit: Failed to get response from image analysis: {err}")
exit(1) exit(1)

View File

@@ -15,8 +15,6 @@ from dotenv import load_dotenv
from configparser import ConfigParser from configparser import ConfigParser
import streamlit as st import streamlit as st
from google.api_core import retry
import google.generativeai as genai
from pprint import pprint from pprint import pprint
from textwrap import dedent from textwrap import dedent
@@ -32,22 +30,23 @@ from ..ai_web_researcher.gpt_online_researcher import do_metaphor_ai_research
from ..ai_web_researcher.gpt_online_researcher import do_google_serp_search, do_tavily_ai_search from ..ai_web_researcher.gpt_online_researcher import do_google_serp_search, do_tavily_ai_search
from ..blog_metadata.get_blog_metadata import get_blog_metadata_longform from ..blog_metadata.get_blog_metadata import get_blog_metadata_longform
from ..blog_postprocessing.save_blog_to_file import save_blog_to_file from ..blog_postprocessing.save_blog_to_file import save_blog_to_file
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
def generate_with_retry(model, prompt): def generate_with_retry(prompt, system_prompt=None):
""" """
Generates content from the model with retry handling for errors. Generates content from the model with retry handling for errors.
Parameters: Parameters:
model (GenerativeModel): The generative model to use for content generation.
prompt (str): The prompt to generate content from. prompt (str): The prompt to generate content from.
system_prompt (str, optional): Custom system prompt to use instead of the default one.
Returns: Returns:
str: The generated content. str: The generated content.
""" """
try: try:
# FIXME: Need a progress bar here. # FIXME: Need a progress bar here.
return model.generate_content(prompt, request_options={'retry':retry.Retry()}) return llm_text_gen(prompt, system_prompt)
except Exception as e: except Exception as e:
logger.error(f"Error generating content: {e}") logger.error(f"Error generating content: {e}")
st.error(f"Error generating content: {e}") st.error(f"Error generating content: {e}")
@@ -57,7 +56,12 @@ def generate_with_retry(model, prompt):
def long_form_generator(content_keywords): def long_form_generator(content_keywords):
""" """
Write long form content using prompt chaining and iterative generation. Write long form content using prompt chaining and iterative generation.
Parameters: Parameters:
content_keywords (str): The main keywords or topic for the long-form content.
Returns:
str: The generated long-form content.
""" """
with st.status("Start Writing Long Form Article, Hold my Beer..", expanded=True) as status: with st.status("Start Writing Long Form Article, Hold my Beer..", expanded=True) as status:
# Read the main_config to define tone, character, personality of the content to be generated. # Read the main_config to define tone, character, personality of the content to be generated.
@@ -122,39 +126,27 @@ def long_form_generator(content_keywords):
writing_guidelines=writing_guidelines writing_guidelines=writing_guidelines
) )
# Configure generative AI
load_dotenv(Path('../.env'))
generation_config = {
"temperature": 0.7,
"top_p": 1,
"max_output_tokens": 8096,
}
genai.configure(api_key=os.getenv('GEMINI_API_KEY'))
# Initialize the generative model
model_flash = genai.GenerativeModel('gemini-1.5-flash', generation_config=generation_config)
model_pro = genai.GenerativeModel('gemini-pro', generation_config=generation_config)
# Do SERP web research for given keywords to generate title and outline. # Do SERP web research for given keywords to generate title and outline.
web_research_result, g_titles = do_google_serp_search(content_keywords) web_research_result, g_titles = do_google_serp_search(content_keywords)
# Generate prompts # Generate prompts
try: try:
content_title = generate_with_retry(model_pro, content_title.format(web_research_result=web_research_result)).text content_title = generate_with_retry(content_title.format(web_research_result=web_research_result))
logger.info(f"The title of the content is: {content_title}") logger.info(f"The title of the content is: {content_title}")
status.update(label=f"The title of the content is: {content_title}") status.update(label=f"The title of the content is: {content_title}")
except Exception as err: except Exception as err:
logger.error(f"Content title Generation Error: {err}") logger.error(f"Content title Generation Error: {err}")
return return False
try: try:
content_outline = generate_with_retry(model_flash, content_outline.format( content_outline = generate_with_retry(content_outline.format(
content_title=content_title, content_title=content_title,
web_research_result=web_research_result)).text web_research_result=web_research_result))
logger.info(f"The content Outline is: {content_outline}\n\n") logger.info(f"The content Outline is: {content_outline}\n\n")
status.update(label=f"Completed with Content Outline.") status.update(label=f"Completed with Content Outline.")
except Exception as err: except Exception as err:
logger.error(f"Failed to generate content outline: {err}") logger.error(f"Failed to generate content outline: {err}")
return False
try: try:
status.update(label=f"Do web research with Tavily to provide context for content creation.") status.update(label=f"Do web research with Tavily to provide context for content creation.")
@@ -170,36 +162,38 @@ def long_form_generator(content_keywords):
except Exception as err: except Exception as err:
logger.error(f"Failed to do Tavily AI search: {err}") logger.error(f"Failed to do Tavily AI search: {err}")
st.error(f"Failed to do Tavily AI search: {err}") st.error(f"Failed to do Tavily AI search: {err}")
return return False
try: try:
starting_draft = generate_with_retry(model_pro, starting_prompt.format( starting_draft = generate_with_retry(starting_prompt.format(
content_title=content_title, content_title=content_title,
content_outline=content_outline, content_outline=content_outline,
web_research_result=web_research_result, web_research_result=web_research_result,
writing_guidelines=writing_guidelines)).text writing_guidelines=writing_guidelines))
except Exception as err: except Exception as err:
st.error(f"Failed to Generate Starting draft: {err}") st.error(f"Failed to Generate Starting draft: {err}")
logger.error(f"Failed to Generate Starting draft: {err}") logger.error(f"Failed to Generate Starting draft: {err}")
return return False
try: try:
logger.info(f"Starting to write on the outline introduction.") logger.info(f"Starting to write on the outline introduction.")
draft = starting_draft draft = starting_draft
continuation = generate_with_retry(model_pro, continuation_prompt.format( continuation = generate_with_retry(continuation_prompt.format(
content_title=content_title, content_title=content_title,
content_outline=content_outline, content_outline=content_outline,
content_text=draft, content_text=draft,
web_research_result=web_research_result, web_research_result=web_research_result,
writing_guidelines=writing_guidelines)).text writing_guidelines=writing_guidelines))
except Exception as err: except Exception as err:
logger.error(f"Failed to write the initial draft: {err}") logger.error(f"Failed to write the initial draft: {err}")
return False
# Add the continuation to the initial draft, keep building the story until we see 'IAMDONE' # Add the continuation to the initial draft, keep building the story until we see 'IAMDONE'
try: try:
draft += '\n\n' + continuation draft += '\n\n' + continuation
except Exception as err: except Exception as err:
logger.error(f"Failed as: {err} and {continuation}") logger.error(f"Failed as: {err} and {continuation}")
return False
logger.info(f"Writing in progress... Current draft length: {len(draft)} characters") logger.info(f"Writing in progress... Current draft length: {len(draft)} characters")
status.update(label=f"Writing in progress... Current draft length: {len(draft)} characters") status.update(label=f"Writing in progress... Current draft length: {len(draft)} characters")
@@ -211,7 +205,7 @@ def long_form_generator(content_keywords):
Content Outline:\n Content Outline:\n
'{content_outline}' '{content_outline}'
""" """
search_words = generate_with_retry(model_flash, search_terms).text search_words = generate_with_retry(search_terms)
status.update(label=f"Search terms from written draft: {search_words}") status.update(label=f"Search terms from written draft: {search_words}")
while 'IAMDONE' not in continuation: while 'IAMDONE' not in continuation:
@@ -230,12 +224,12 @@ def long_form_generator(content_keywords):
# web_research_result = table_data # web_research_result = table_data
try: try:
continuation = generate_with_retry(model_pro, continuation_prompt.format( continuation = generate_with_retry(continuation_prompt.format(
content_title=content_title, content_title=content_title,
content_outline=content_outline, content_outline=content_outline,
content_text=draft, content_text=draft,
web_research_result=web_research_result, web_research_result=web_research_result,
writing_guidelines=writing_guidelines)).text writing_guidelines=writing_guidelines))
draft += '\n\n' + continuation draft += '\n\n' + continuation
logger.info(f"Writing in progress... Current draft length: {len(draft)} characters") logger.info(f"Writing in progress... Current draft length: {len(draft)} characters")
@@ -245,7 +239,7 @@ def long_form_generator(content_keywords):
except Exception as err: except Exception as err:
st.error(f"Failed to continually write long-form content: {err}") st.error(f"Failed to continually write long-form content: {err}")
logger.error(f"Failed to continually write the Essay: {err}") logger.error(f"Failed to continually write the Essay: {err}")
return return False
# Remove 'IAMDONE' and print the final story # Remove 'IAMDONE' and print the final story
final = draft.replace('IAMDONE', '').strip() final = draft.replace('IAMDONE', '').strip()
@@ -267,3 +261,26 @@ def long_form_generator(content_keywords):
logger.info(f"\n\n ################ Finished writing Blog for : {content_keywords} #################### \n") logger.info(f"\n\n ################ Finished writing Blog for : {content_keywords} #################### \n")
with st.expander("**Click to View the final content draft:**"): with st.expander("**Click to View the final content draft:**"):
st.markdown(f"\n{final}\n\n") st.markdown(f"\n{final}\n\n")
return final
def generate_long_form_content(content_keywords):
"""
Main function to generate long-form content based on the provided keywords.
Parameters:
content_keywords (str): The main keywords or topic for the long-form content.
Returns:
str: The generated long-form content.
"""
return long_form_generator(content_keywords)
# Example usage
if __name__ == "__main__":
# Example usage of the function
content_keywords = "artificial intelligence in healthcare"
generated_content = generate_long_form_content(content_keywords)
print(f"Generated content: {generated_content[:100]}...")

View File

@@ -12,8 +12,6 @@ logger.add(sys.stdout,
colorize=True, colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}" format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
) )
import google.generativeai as genai
from google.generativeai import caching
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
@@ -142,37 +140,38 @@ def get_blog_metadata_longform(longform_content):
file.write(longform_content) file.write(longform_content)
print(f"String saved successfully to: {filepath}") print(f"String saved successfully to: {filepath}")
genai.configure(api_key=os.environ['GEMINI_API_KEY']) #genai.configure(api_key=os.environ['GEMINI_API_KEY'])
file_path = genai.upload_file(path=filepath) #file_path = genai.upload_file(path=filepath)
# Wait for the file to finish processing # Wait for the file to finish processing
while file_path.state.name == 'PROCESSING': #while file_path.state.name == 'PROCESSING':
print('Waiting for video to be processed.') # print('Waiting for video to be processed.')
time.sleep(2) # time.sleep(2)
file_path = genai.get_file(video_file.name) # file_path = genai.get_file(video_file.name)
print(f'Video processing complete: {file_path.uri}') #print(f'Video processing complete: {file_path.uri}')
# Create a cache with a 5 minute TTL # Create a cache with a 5 minute TTL
cache = caching.CachedContent.create( #cache = caching.CachedContent.create(
model='models/gemini-1.5-flash-001', # model='models/gemini-1.5-flash-001',
display_name='Alwrity Longform content', # used to identify the cache # display_name='Alwrity Longform content', # used to identify the cache
system_instruction=( # system_instruction=(
'You are an expert file analyzer , and your job is to answer ' # 'You are an expert file analyzer , and your job is to answer '
'the user\'s query based on the file you have access to.' # 'the user\'s query based on the file you have access to.'
), # ),
contents=[file_path], # contents=[file_path],
ttl=datetime.timedelta(minutes=15), # ttl=datetime.timedelta(minutes=15),
) #)
# Construct a GenerativeModel which uses the created cache. # Construct a GenerativeModel which uses the created cache.
model = genai.GenerativeModel.from_cached_content(cached_content=cache) #model = genai.GenerativeModel.from_cached_content(cached_content=cache)
# Query the model # Query the model
response = model.generate_content([( #response = model.generate_content([(
'SUmmarize the given file ' # 'SUmmarize the given file '
'in 10 lines ' # 'in 10 lines '
'list main points')]) # 'list main points')])
#print(response.usage_metadata) #print(response.usage_metadata)
return(response.text) #return(response.text)
return("TBD: Not implemented")

View File

@@ -1,8 +1,75 @@
"""
Gemini Audio Text Generation Module
This module provides a comprehensive interface for working with audio files using Google's Gemini API.
It supports various audio processing capabilities including transcription, summarization, and analysis.
Key Features:
------------
1. Audio Transcription: Convert speech in audio files to text
2. Audio Summarization: Generate concise summaries of audio content
3. Segment Analysis: Analyze specific time segments of audio files
4. Timestamped Transcription: Generate transcriptions with timestamps
5. Token Counting: Count tokens in audio files
6. Format Support: Information about supported audio formats
Supported Audio Formats:
----------------------
- WAV (audio/wav)
- MP3 (audio/mp3)
- AIFF (audio/aiff)
- AAC (audio/aac)
- OGG Vorbis (audio/ogg)
- FLAC (audio/flac)
Technical Details:
----------------
- Each second of audio is represented as 32 tokens
- Maximum supported length of audio data in a single prompt is 9.5 hours
- Audio files are downsampled to 16 Kbps data resolution
- Multi-channel audio is combined into a single channel
Usage:
------
```python
from lib.gpt_providers.audio_to_text_generation.gemini_audio_text import transcribe_audio, summarize_audio
# Basic transcription
transcript = transcribe_audio("path/to/audio.mp3")
print(transcript)
# Summarization
summary = summarize_audio("path/to/audio.mp3")
print(summary)
# Analyze specific segment
segment_analysis = analyze_audio_segment("path/to/audio.mp3", "02:30", "03:29")
print(segment_analysis)
```
Requirements:
------------
- GEMINI_API_KEY environment variable must be set
- google-generativeai Python package
- python-dotenv for environment variable management
- loguru for logging
Dependencies:
------------
- google.genai
- dotenv
- loguru
- os, sys, base64, typing
"""
import os import os
import sys import sys
import base64
import google.generativeai as genai from typing import Optional, Dict, Any, List, Union
from dotenv import load_dotenv from dotenv import load_dotenv
from google import genai
from google.genai import types
from loguru import logger from loguru import logger
logger.remove() logger.remove()
@@ -34,12 +101,13 @@ def configure_google_api():
logger.info("Google Gemini API configured successfully.") logger.info("Google Gemini API configured successfully.")
def transcribe_audio(audio_file_path): def transcribe_audio(audio_file_path: str, prompt: str = "Transcribe the following audio:") -> Optional[str]:
""" """
Transcribes audio using Google's Gemini Pro model. Transcribes audio using Google's Gemini model.
Args: Args:
audio_file_path (str): The path to the audio file to be transcribed. audio_file_path (str): The path to the audio file to be transcribed.
prompt (str, optional): The prompt to guide the transcription. Defaults to "Transcribe the following audio:".
Returns: Returns:
str: The transcribed text from the audio. str: The transcribed text from the audio.
@@ -61,7 +129,7 @@ def transcribe_audio(audio_file_path):
logger.error(error_message) logger.error(error_message)
raise FileNotFoundError(error_message) raise FileNotFoundError(error_message)
# Initialize a Gemini model appropriate for your use case. # Initialize a Gemini model appropriate for audio understanding
model = genai.GenerativeModel(model_name="gemini-1.5-flash") model = genai.GenerativeModel(model_name="gemini-1.5-flash")
# Upload the audio file # Upload the audio file
@@ -79,7 +147,7 @@ def transcribe_audio(audio_file_path):
# Generate the transcription # Generate the transcription
try: try:
response = model.generate_content([ response = model.generate_content([
"Transcribe the following audio:", prompt,
audio_file audio_file
]) ])
@@ -99,3 +167,143 @@ def transcribe_audio(audio_file_path):
except Exception as e: except Exception as e:
logger.error(f"An unexpected error occurred: {e}") logger.error(f"An unexpected error occurred: {e}")
return None return None
def summarize_audio(audio_file_path: str) -> Optional[str]:
"""
Summarizes the content of an audio file using Google's Gemini model.
Args:
audio_file_path (str): The path to the audio file to be summarized.
Returns:
str: A summary of the audio content.
Returns None if summarization fails.
"""
return transcribe_audio(audio_file_path, prompt="Please summarize the audio content:")
def analyze_audio_segment(audio_file_path: str, start_time: str, end_time: str) -> Optional[str]:
"""
Analyzes a specific segment of an audio file using timestamps.
Args:
audio_file_path (str): The path to the audio file.
start_time (str): Start time in MM:SS format.
end_time (str): End time in MM:SS format.
Returns:
str: Analysis of the specified audio segment.
Returns None if analysis fails.
"""
prompt = f"Analyze the audio content from {start_time} to {end_time}."
return transcribe_audio(audio_file_path, prompt=prompt)
def transcribe_with_timestamps(audio_file_path: str) -> Optional[str]:
"""
Transcribes audio with timestamps for each segment.
Args:
audio_file_path (str): The path to the audio file.
Returns:
str: Transcription with timestamps.
Returns None if transcription fails.
"""
return transcribe_audio(audio_file_path, prompt="Transcribe the audio with timestamps for each segment:")
def count_tokens(audio_file_path: str) -> Optional[int]:
"""
Counts the number of tokens in an audio file.
Args:
audio_file_path (str): The path to the audio file.
Returns:
int: Number of tokens in the audio file.
Returns None if counting fails.
"""
try:
# Load environment variables and configure the Google API
load_environment()
configure_google_api()
logger.info(f"Attempting to count tokens in audio file: {audio_file_path}")
# Check if file exists
if not os.path.exists(audio_file_path):
error_message = f"FileNotFoundError: The audio file at {audio_file_path} does not exist."
logger.error(error_message)
raise FileNotFoundError(error_message)
# Initialize a Gemini model
model = genai.GenerativeModel(model_name="gemini-1.5-flash")
# Upload the audio file
try:
audio_file = genai.upload_file(audio_file_path)
logger.info(f"Audio file uploaded successfully: {audio_file=}")
except Exception as e:
logger.error(f"Error uploading audio file: {e}")
return None
# Count tokens
try:
response = model.count_tokens([audio_file])
token_count = response.total_tokens
logger.info(f"Token count: {token_count}")
return token_count
except Exception as e:
logger.error(f"Error counting tokens: {e}")
return None
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return None
def get_supported_formats() -> List[str]:
"""
Returns a list of supported audio formats.
Returns:
List[str]: List of supported MIME types.
"""
return [
"audio/wav",
"audio/mp3",
"audio/aiff",
"audio/aac",
"audio/ogg",
"audio/flac"
]
# Example usage
if __name__ == "__main__":
# Example 1: Basic transcription
audio_path = "path/to/your/audio.mp3"
transcript = transcribe_audio(audio_path)
print(f"Transcript: {transcript}")
# Example 2: Summarization
summary = summarize_audio(audio_path)
print(f"Summary: {summary}")
# Example 3: Analyze specific segment
segment_analysis = analyze_audio_segment(audio_path, "02:30", "03:29")
print(f"Segment Analysis: {segment_analysis}")
# Example 4: Transcription with timestamps
timestamped_transcript = transcribe_with_timestamps(audio_path)
print(f"Timestamped Transcript: {timestamped_transcript}")
# Example 5: Count tokens
token_count = count_tokens(audio_path)
print(f"Token Count: {token_count}")
# Example 6: Get supported formats
formats = get_supported_formats()
print(f"Supported Formats: {formats}")

View File

@@ -0,0 +1,116 @@
"""
Gemini Image Description Module
This module provides functionality to generate text descriptions of images using Google's Gemini API.
"""
import os
import sys
from typing import Optional, Union, List
from google import genai
from PIL import Image
from dotenv import load_dotenv
from loguru import logger
logger.remove()
logger.add(sys.stdout,
colorize=True,
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
)
def describe_image(image_path: str, prompt: str = "Describe this image in detail:") -> Optional[str]:
"""
Generate a text description of an image using Google's Gemini API.
Parameters:
image_path (str): Path to the image file.
prompt (str, optional): Custom prompt to guide the image description.
Defaults to "Describe this image in detail:".
Returns:
Optional[str]: The generated description of the image, or None if an error occurs.
Raises:
FileNotFoundError: If the image file does not exist.
ValueError: If the API key is not set.
"""
try:
# Load environment variables
load_dotenv()
# Check if API key is set
api_key = os.getenv('GEMINI_API_KEY')
if not api_key:
error_message = "GEMINI_API_KEY environment variable is not set"
logger.error(error_message)
raise ValueError(error_message)
# Check if image file exists
if not os.path.exists(image_path):
error_message = f"Image file not found: {image_path}"
logger.error(error_message)
raise FileNotFoundError(error_message)
# Initialize the Gemini client
client = genai.Client(api_key=api_key)
# Open and process the image
try:
image = Image.open(image_path)
logger.info(f"Successfully opened image: {image_path}")
except Exception as e:
error_message = f"Failed to open image: {e}"
logger.error(error_message)
return None
# Generate content description
try:
response = client.models.generate_content(
model='gemini-2.0-flash',
contents=[
prompt,
image
]
)
# Extract and return the text
description = response.text
logger.info(f"Successfully generated description for image: {image_path}")
return description
except Exception as e:
error_message = f"Failed to generate content: {e}"
logger.error(error_message)
return None
except Exception as e:
error_message = f"An unexpected error occurred: {e}"
logger.error(error_message)
return None
def analyze_image_with_prompt(image_path: str, prompt: str) -> Optional[str]:
"""
Analyze an image with a custom prompt using Google's Gemini API.
Parameters:
image_path (str): Path to the image file.
prompt (str): Custom prompt for analyzing the image.
Returns:
Optional[str]: The generated analysis of the image, or None if an error occurs.
"""
return describe_image(image_path, prompt)
# Example usage
if __name__ == "__main__":
# Example usage of the function
image_path = "path/to/your/image.jpg"
description = describe_image(image_path)
if description:
print(f"Image description: {description}")
else:
print("Failed to generate image description")

View File

@@ -1,94 +0,0 @@
import requests
import re
import base64
import os
import sys
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
) # for exponential backoff
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def analyze_and_extract_details_from_image(image_path):
"""
Analyzes an image using OpenAI's Vision API to extract Alt Text, Description, Title, and Caption.
This function encodes an image to a base64 string and sends a request to the OpenAI API.
It interprets the contents of the image, returning a textual description.
Args:
image_path (str): Path to the image file.
Returns:
dict: A dictionary with extracted details including Alt Text, Description, Title, and Caption.
None: If an error occurs during processing.
Raises:
SystemExit: If a critical error occurs that prevents the function from executing successfully.
"""
try:
logger.info("Starting image analysis using OpenAI's Vision API.")
def encode_image(path):
""" Encodes an image to a base64 string. """
with open(path, "rb", encoding="utf-8") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
base64_image = encode_image(image_path)
logger.info("Image encoded to base64 successfully.")
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}"
}
payload = {
"model": "gpt-4-vision-preview",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Analyze the given image and suggest the following: Alternative text(Alt Text), description, title, caption."
},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}
}
]
}
],
"max_tokens": 300
}
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
response.raise_for_status()
assistant_message = response.json()['choices'][0]['message']['content']
logger.info("Received response from OpenAI API.")
# Extracting details using regular expressions
alt_text_match = re.search(r'Alt Text: "(.*?)"', assistant_message)
description_match = re.search(r'Description: (.*?)\n\n', assistant_message)
title_match = re.search(r'Title: "(.*?)"', assistant_message)
caption_match = re.search(r'Caption: "(.*?)"', assistant_message)
image_details = {
'alt_text': alt_text_match.group(1) if alt_text_match else "N/A",
'description': description_match.group(1) if description_match else "N/A",
'title': title_match.group(1) if title_match else "N/A",
'caption': caption_match.group(1) if caption_match else "N/A"
}
logger.info("Image analysis completed successfully.")
return image_details
except requests.RequestException as e:
logger.error(f"GPT-Vision API communication failure. Error: {e}")
sys.exit(f"Exiting due to GPT-Vision API communication failure: {e}")
except Exception as e:
logger.error(f"Unexpected error occurred during image analysis: {e}")
sys.exit(f"Exiting due to an unexpected error: {e}")

View File

@@ -1,7 +1,12 @@
import re import re
import os
import PyPDF2
import tiktoken
import openai
import streamlit as st import streamlit as st
import tempfile import tempfile
from loguru import logger from loguru import logger
from lib.ai_web_researcher.gpt_online_researcher import gpt_web_researcher from lib.ai_web_researcher.gpt_online_researcher import gpt_web_researcher
from lib.ai_writers.keywords_to_blog_streamlit import write_blog_from_keywords from lib.ai_writers.keywords_to_blog_streamlit import write_blog_from_keywords
from lib.ai_writers.speech_to_blog.main_audio_to_blog import generate_audio_blog from lib.ai_writers.speech_to_blog.main_audio_to_blog import generate_audio_blog
@@ -9,7 +14,7 @@ from lib.ai_writers.long_form_ai_writer import long_form_generator
from lib.ai_writers.ai_news_article_writer import ai_news_generation from lib.ai_writers.ai_news_article_writer import ai_news_generation
#from lib.ai_writers.ai_agents_crew_writer import ai_agents_writers #from lib.ai_writers.ai_agents_crew_writer import ai_agents_writers
from lib.ai_writers.ai_financial_writer import write_basic_ta_report from lib.ai_writers.ai_financial_writer import write_basic_ta_report
from lib.ai_writers.ai_facebook_writer.facebook_ai_writer import facebook_post_writer from lib.ai_writers.ai_facebook_writer.facebook_ai_writer import facebook_main_menu
from lib.ai_writers.linkedin_ai_writer import linked_post_writer from lib.ai_writers.linkedin_ai_writer import linked_post_writer
from lib.ai_writers.twitter_ai_writer import tweet_writer from lib.ai_writers.twitter_ai_writer import tweet_writer
from lib.ai_writers.insta_ai_writer import insta_writer from lib.ai_writers.insta_ai_writer import insta_writer
@@ -17,10 +22,6 @@ from lib.ai_writers.youtube_writers.youtube_ai_writer import youtube_main_menu
from lib.ai_writers.web_url_ai_writer import blog_from_url from lib.ai_writers.web_url_ai_writer import blog_from_url
from lib.ai_writers.image_ai_writer import blog_from_image from lib.ai_writers.image_ai_writer import blog_from_image
from lib.ai_writers.ai_essay_writer import ai_essay_generator from lib.ai_writers.ai_essay_writer import ai_essay_generator
import os
import PyPDF2
import tiktoken
import openai
from lib.gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image from lib.gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image
from lib.utils.voice_processing import record_voice from lib.utils.voice_processing import record_voice
#from lib.content_planning_calender.content_planning_agents_alwrity_crew import ai_agents_content_planner #from lib.content_planning_calender.content_planning_agents_alwrity_crew import ai_agents_content_planner
@@ -465,7 +466,7 @@ def ai_social_writer():
# Selectbox for choosing a platform # Selectbox for choosing a platform
selected_platform = st.radio("Choose a Social Media Platform:", social_media_options, format_func=lambda x: x[1]) selected_platform = st.radio("Choose a Social Media Platform:", social_media_options, format_func=lambda x: x[1])
if "facebook" in selected_platform: if "facebook" in selected_platform:
facebook_post_writer() facebook_main_menu()
elif "linkedin" in selected_platform: elif "linkedin" in selected_platform:
linked_post_writer() linked_post_writer()
elif "twitter" in selected_platform: elif "twitter" in selected_platform:

View File

@@ -19,6 +19,17 @@ def render_ai_providers(api_key_manager: APIKeyManager) -> Dict[str, Any]:
"""Render the AI providers setup step.""" """Render the AI providers setup step."""
logger.info("[render_ai_providers] Starting AI providers setup") logger.info("[render_ai_providers] Starting AI providers setup")
try: try:
# Initialize wizard state if not already initialized
if 'wizard_state' not in st.session_state:
st.session_state.wizard_state = {
'current_step': 1,
'total_steps': 6,
'progress': 0,
'completed_steps': set(),
'last_updated': datetime.now()
}
logger.info("[render_ai_providers] Initialized wizard state")
# Store API key manager in session state for update_progress # Store API key manager in session state for update_progress
st.session_state['api_key_manager'] = api_key_manager st.session_state['api_key_manager'] = api_key_manager
@@ -209,6 +220,15 @@ def render_ai_providers(api_key_manager: APIKeyManager) -> Dict[str, Any]:
'google': google_key if validate_api_key(google_key) else None 'google': google_key if validate_api_key(google_key) else None
} }
# Save API keys to .env file
if validate_api_key(openai_key):
api_key_manager.save_api_key("openai", openai_key)
logger.info("[render_ai_providers] OpenAI API key saved to .env file")
if validate_api_key(google_key):
api_key_manager.save_api_key("gemini", google_key)
logger.info("[render_ai_providers] Google Gemini API key saved to .env file")
# Update progress and move to next step # Update progress and move to next step
st.session_state['current_step'] = 2 # Set the next step explicitly st.session_state['current_step'] = 2 # Set the next step explicitly
update_progress() update_progress()

View File

@@ -91,18 +91,15 @@ def render_final_setup(api_key_manager: APIKeyManager) -> Dict[str, Any]:
logger.info("[render_final_setup] User clicked complete setup") logger.info("[render_final_setup] User clicked complete setup")
try: try:
# Verify all required API keys are present and valid # Verify all required API keys are present and valid
is_valid, missing_keys, impact_messages = check_all_api_keys(api_key_manager) is_valid = check_all_api_keys(api_key_manager)
if not is_valid: if not is_valid:
st.error("⚠️ Some required API keys are missing") st.error("⚠️ Some required API keys are missing")
st.markdown("### Missing API Keys and Impact") st.markdown("### Missing API Keys and Impact")
# Display impact messages in a structured way # Display impact messages
for message in impact_messages: st.warning("⚠️ Missing AI Provider: At least one AI provider (OpenAI, Google Gemini, Anthropic Claude, or Mistral) is required.")
if message.startswith("⚠️"): st.warning("⚠️ Missing Research Provider: At least one research provider (SerpAPI, Tavily, Metaphor, or Firecrawl) is required.")
st.error(message)
else:
st.warning(message)
st.markdown(""" st.markdown("""
<div style='background-color: #fff3cd; color: #856404; padding: 1rem; border-radius: 0.25rem; margin-top: 1rem;'> <div style='background-color: #fff3cd; color: #856404; padding: 1rem; border-radius: 0.25rem; margin-top: 1rem;'>

View File

@@ -133,16 +133,74 @@ class APIKeyManager:
except Exception as e: except Exception as e:
logger.error(f"[APIKeyManager.load_api_keys] Error loading API keys: {str(e)}") logger.error(f"[APIKeyManager.load_api_keys] Error loading API keys: {str(e)}")
def save_api_key(self, provider: str, key: str): def save_api_key(self, provider: str, api_key: str) -> bool:
"""Save an API key.""" """
logger.info(f"[APIKeyManager.save_api_key] Saving API key for provider: {provider}") Save an API key for a provider.
Args:
provider: The provider name (e.g., 'openai', 'gemini')
api_key: The API key value
Returns:
bool: True if successful, False otherwise
"""
try: try:
self.api_keys[provider] = key logger.info(f"[APIKeyManager] Saving API key for {provider}")
# Save to environment variable
os.environ[f"{provider.upper()}_API_KEY"] = key # Map provider to environment variable name
logger.info(f"[APIKeyManager.save_api_key] Successfully saved API key for {provider}") env_var_map = {
'openai': 'OPENAI_API_KEY',
'gemini': 'GEMINI_API_KEY',
'mistral': 'MISTRAL_API_KEY',
'anthropic': 'ANTHROPIC_API_KEY',
'serpapi': 'SERPAPI_API_KEY',
'tavily': 'TAVILY_API_KEY',
'metaphor': 'METAPHOR_API_KEY',
'firecrawl': 'FIRECRAWL_API_KEY'
}
env_var = env_var_map.get(provider)
if not env_var:
logger.error(f"[APIKeyManager] Unknown provider: {provider}")
return False
# Update the in-memory dictionary
self.api_keys[provider] = api_key
# Update environment variable
os.environ[env_var] = api_key
# Read existing .env file content
env_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), '.env')
try:
with open(env_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
except FileNotFoundError:
lines = []
# Update or add the API key
key_found = False
updated_lines = []
for line in lines:
if line.startswith(f"{env_var}="):
updated_lines.append(f"{env_var}={api_key}\n")
key_found = True
else:
updated_lines.append(line)
if not key_found:
updated_lines.append(f"{env_var}={api_key}\n")
# Write back to .env file
with open(env_path, 'w', encoding='utf-8') as f:
f.writelines(updated_lines)
logger.info(f"[APIKeyManager] Successfully saved API key for {provider}")
return True
except Exception as e: except Exception as e:
logger.error(f"[APIKeyManager.save_api_key] Error saving API key: {str(e)}") logger.error(f"[APIKeyManager] Error saving API key for {provider}: {str(e)}")
return False
def get_api_key(self, provider: str) -> Optional[str]: def get_api_key(self, provider: str) -> Optional[str]:
"""Get an API key.""" """Get an API key."""

View File

@@ -6,7 +6,7 @@ from lib.utils.alwrity_utils import ai_social_writer
from lib.utils.seo_tools import ai_seo_tools from lib.utils.seo_tools import ai_seo_tools
from lib.utils.settings_page import render_settings_page from lib.utils.settings_page import render_settings_page
# Import social media writer functions # Import social media writer functions
from lib.ai_writers.ai_facebook_writer.facebook_ai_writer import facebook_post_writer from lib.ai_writers.ai_facebook_writer.facebook_ai_writer import facebook_main_menu
from lib.ai_writers.linkedin_ai_writer import linked_post_writer from lib.ai_writers.linkedin_ai_writer import linked_post_writer
from lib.ai_writers.twitter_ai_writer import tweet_writer from lib.ai_writers.twitter_ai_writer import tweet_writer
from lib.ai_writers.insta_ai_writer import insta_writer from lib.ai_writers.insta_ai_writer import insta_writer