AI Blog Writer enhancements & Streamlit UI updates
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -25,7 +25,6 @@ def display_input_section():
|
||||
# First column: Keywords input
|
||||
with col1:
|
||||
st.markdown("### 📌 Content Source")
|
||||
st.markdown("#### Enter Keywords, Title or URL")
|
||||
user_input = st.text_area(
|
||||
'Power your content with keywords or a website URL',
|
||||
help='Provide keywords, a blog title, YouTube link, or web URL to generate targeted content.',
|
||||
@@ -36,7 +35,6 @@ def display_input_section():
|
||||
# Second column: File uploader
|
||||
with col2:
|
||||
st.markdown("### 📁 File Upload")
|
||||
st.markdown("#### Upload Reference Content")
|
||||
uploaded_file = st.file_uploader(
|
||||
"Add files to enhance your content",
|
||||
type=["txt", "pdf", "docx", "jpg", "jpeg", "png", "mp3", "wav", "mp4", "mkv", "avi"],
|
||||
@@ -46,7 +44,6 @@ def display_input_section():
|
||||
# Third column: Voice input
|
||||
with col3:
|
||||
st.markdown("### 🎤 Voice")
|
||||
st.markdown("#### Record Ideas")
|
||||
audio_input = record_voice()
|
||||
if audio_input:
|
||||
st.success("Voice recorded!")
|
||||
@@ -54,13 +51,20 @@ def display_input_section():
|
||||
return user_input, uploaded_file, audio_input
|
||||
|
||||
|
||||
def display_content_type_selection():
|
||||
"""Display the content type selection section and return the selected type."""
|
||||
def display_content_type_selection(inside_expander=False):
|
||||
"""Display the content type selection section and return the selected type.
|
||||
|
||||
Args:
|
||||
inside_expander (bool): If True, adjust heading levels for display inside an expander.
|
||||
"""
|
||||
# Content options in a cleaner layout
|
||||
st.markdown("### 🔧 Content Configuration")
|
||||
if not inside_expander:
|
||||
st.markdown("### 🔧 Content Configuration")
|
||||
st.markdown("#### Select Content Type")
|
||||
else:
|
||||
st.markdown("#### Content Type")
|
||||
|
||||
# Content type selection with better UI
|
||||
st.markdown("#### Select Content Type")
|
||||
content_type = st.radio(
|
||||
"Choose the format and length of your blog content",
|
||||
["Standard Blog Post", "Comprehensive Long-form", "AI Agent Team (Beta)"],
|
||||
@@ -556,8 +560,11 @@ def display_search_settings_tab():
|
||||
|
||||
def display_advanced_options():
|
||||
"""Display all advanced options tabs and return the selected configurations."""
|
||||
with st.expander("⚙️ Advanced Options", expanded=False):
|
||||
tabs = st.tabs(["Content Characteristics", "Content & Analysis Options", "Blog Images Details", "LLM Options", "Search Settings"])
|
||||
|
||||
with st.expander("⚙️ Advanced Options for Personalization, Analysis, Images, LLM, and Search", expanded=False):
|
||||
content_type, selected_content_type = display_content_type_selection(inside_expander=True)
|
||||
|
||||
tabs = st.tabs(["Personalization", "Analysis Options", "Blog Images Details", "LLM Options", "Search Settings"])
|
||||
|
||||
with tabs[0]: # Content Characteristics
|
||||
blog_params = display_content_characteristics_tab()
|
||||
@@ -574,7 +581,7 @@ def display_advanced_options():
|
||||
with tabs[4]: # Search Settings
|
||||
search_params = display_search_settings_tab()
|
||||
|
||||
return blog_params, content_analysis_params, image_params, llm_params, search_params
|
||||
return content_type, selected_content_type, blog_params, content_analysis_params, image_params, llm_params, search_params
|
||||
|
||||
|
||||
def blog_from_keyword():
|
||||
@@ -583,15 +590,12 @@ def blog_from_keyword():
|
||||
# Get user inputs
|
||||
user_input, uploaded_file, audio_input = display_input_section()
|
||||
|
||||
# Get content type selection
|
||||
content_type, selected_content_type = display_content_type_selection()
|
||||
|
||||
# Display advanced options and get configurations
|
||||
blog_params, content_analysis_params, image_params, llm_params, search_params = display_advanced_options()
|
||||
content_type, selected_content_type, blog_params, content_analysis_params, image_params, llm_params, search_params = display_advanced_options()
|
||||
|
||||
# Generate button with icon and clearer purpose
|
||||
st.markdown("") # Add spacing
|
||||
generate_pressed = st.button("✨ Generate Professional Blog Content", use_container_width=True)
|
||||
generate_pressed = st.button("✨ Generate Blog Content", use_container_width=True)
|
||||
|
||||
# Processing logic
|
||||
if generate_pressed:
|
||||
|
||||
@@ -1,18 +1,24 @@
|
||||
import re
|
||||
import os
|
||||
import json
|
||||
import asyncio
|
||||
from loguru import logger
|
||||
import PyPDF2
|
||||
import streamlit as st
|
||||
import tiktoken
|
||||
import openai
|
||||
from datetime import datetime
|
||||
|
||||
from lib.gpt_providers.text_generation.main_text_generation import llm_text_gen
|
||||
from lib.ai_writers.keywords_to_blog_streamlit import write_blog_from_keywords
|
||||
# Remove the circular import
|
||||
# from lib.ai_writers.ai_blog_writer.keywords_to_blog_streamlit import write_blog_from_keywords
|
||||
from lib.ai_writers.speech_to_blog.main_audio_to_blog import generate_audio_blog
|
||||
from lib.ai_writers.long_form_ai_writer import long_form_generator
|
||||
from lib.ai_writers.web_url_ai_writer import blog_from_url
|
||||
from lib.ai_writers.image_ai_writer import blog_from_image
|
||||
from .blog_from_google_serp import write_blog_google_serp
|
||||
from lib.blog_metadata.get_blog_metadata import blog_metadata
|
||||
from lib.gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image
|
||||
|
||||
# Constants
|
||||
CONFIG_PATH = os.path.join("lib", "workspace", "alwrity_config", "main_config.json")
|
||||
@@ -259,21 +265,247 @@ def process_keywords_input(user_input, search_params, blog_params, selected_cont
|
||||
st.error('Please provide at least two keywords for best results')
|
||||
return False
|
||||
|
||||
# Check for dialog states and handle them directly
|
||||
if st.session_state.get("show_title_dialog", False):
|
||||
st.warning("Please use the main function to handle title refinement dialog")
|
||||
# Clear the dialog state to avoid getting stuck
|
||||
st.session_state.show_title_dialog = False
|
||||
return False
|
||||
|
||||
if st.session_state.get("show_meta_dialog", False):
|
||||
st.warning("Please use the main function to handle meta description refinement dialog")
|
||||
# Clear the dialog state to avoid getting stuck
|
||||
st.session_state.show_meta_dialog = False
|
||||
return False
|
||||
|
||||
if st.session_state.get("show_snippet_dialog", False):
|
||||
st.warning("Please use the main function to handle structured data dialog")
|
||||
# Clear the dialog state to avoid getting stuck
|
||||
st.session_state.show_snippet_dialog = False
|
||||
return False
|
||||
|
||||
try:
|
||||
if selected_content_type == "Normal-length content":
|
||||
st.subheader("Your Generated Blog Post")
|
||||
logger.info(f"Generating standard blog post with parameters: {blog_params}")
|
||||
|
||||
# Ensure all blog parameters are properly passed
|
||||
# This is important as the UI may have settings that aren't in the default blog_params
|
||||
short_blog = write_blog_from_keywords(
|
||||
user_input,
|
||||
search_params=search_params,
|
||||
blog_params=blog_params
|
||||
)
|
||||
st.markdown(short_blog)
|
||||
return True
|
||||
|
||||
# Use a direct approach to generate blog content to avoid nested expanders
|
||||
# Instead of importing write_blog_from_keywords which contains many expanders
|
||||
try:
|
||||
# Show simplified progress UI
|
||||
progress_container = st.container()
|
||||
with progress_container:
|
||||
progress_bar = st.progress(0)
|
||||
status_text = st.empty()
|
||||
|
||||
# Step 1: Initialize and show progress
|
||||
status_text.info("Initializing blog generation...")
|
||||
progress_bar.progress(0.1)
|
||||
|
||||
# Initialize parameters
|
||||
from .blog_ai_research_utils import initialize_parameters
|
||||
search_params, blog_params = initialize_parameters(search_params, blog_params)
|
||||
|
||||
# Step 2: Research phase
|
||||
status_text.info("Researching your topic...")
|
||||
progress_bar.progress(0.2)
|
||||
|
||||
# Perform research using direct function calls
|
||||
from .blog_ai_research_utils import do_google_serp_search, do_tavily_ai_search
|
||||
|
||||
# Do Google search
|
||||
status_text.info("Searching Google for relevant information...")
|
||||
google_result = do_google_serp_search(user_input, max_results=search_params.get("max_results", 10))
|
||||
google_success = google_result and 'results' in google_result and google_result['results']
|
||||
progress_bar.progress(0.4)
|
||||
|
||||
# Do Tavily search if needed
|
||||
tavily_result = None
|
||||
tavily_success = False
|
||||
if not google_success:
|
||||
status_text.info("Performing additional research with Tavily...")
|
||||
tavily_result, _, _ = do_tavily_ai_search(
|
||||
user_input,
|
||||
max_results=search_params.get("max_results", 10),
|
||||
search_depth=search_params.get("search_depth", "basic")
|
||||
)
|
||||
tavily_success = tavily_result is not None
|
||||
progress_bar.progress(0.5)
|
||||
|
||||
# Step 3: Generate content
|
||||
status_text.info("Generating blog content...")
|
||||
progress_bar.progress(0.6)
|
||||
|
||||
# Generate content based on search results
|
||||
from .blog_from_google_serp import write_blog_google_serp
|
||||
|
||||
if google_success:
|
||||
blog_content = write_blog_google_serp(user_input, google_result['results'], blog_params=blog_params)
|
||||
elif tavily_success:
|
||||
blog_content = write_blog_google_serp(user_input, tavily_result, blog_params=blog_params)
|
||||
else:
|
||||
status_text.error("Failed to gather research data. Please try again.")
|
||||
return False
|
||||
|
||||
# Step 4: Generate metadata and image
|
||||
status_text.info("Adding metadata and final touches...")
|
||||
progress_bar.progress(0.8)
|
||||
|
||||
# Import functions from keywords_to_blog_streamlit
|
||||
from .keywords_to_blog_streamlit import generate_audio_version
|
||||
|
||||
# Define a simple update_progress function for compatibility
|
||||
def simple_update_progress(step, total, message):
|
||||
status_text.info(message)
|
||||
progress_bar.progress(step / total)
|
||||
|
||||
# Generate metadata and image
|
||||
# Import only essential functions needed for core processing
|
||||
from .ai_blog_generator_utils import generate_blog_metadata, generate_blog_image
|
||||
try:
|
||||
# Create a proper status object
|
||||
with st.status("Generating metadata and image...", expanded=True) as status:
|
||||
# Generate metadata
|
||||
blog_title, blog_meta_desc, blog_tags, blog_categories, blog_hashtags, blog_slug = generate_blog_metadata(
|
||||
blog_content, user_input, status)
|
||||
|
||||
# Generate featured image if metadata is available
|
||||
generated_image_filepath = None
|
||||
if blog_title and blog_meta_desc:
|
||||
generated_image_filepath = generate_blog_image(
|
||||
blog_title, blog_meta_desc, blog_content, status, blog_tags)
|
||||
|
||||
# Save blog content to file
|
||||
saved_blog_to_file = None
|
||||
from ...blog_postprocessing.save_blog_to_file import save_blog_to_file
|
||||
if blog_title and blog_meta_desc:
|
||||
saved_blog_to_file = save_blog_to_file(
|
||||
blog_content, blog_title, blog_meta_desc, blog_tags,
|
||||
blog_categories, generated_image_filepath)
|
||||
|
||||
# Create metadata dictionary with string conversions for table display
|
||||
metadata = {
|
||||
"blog_title": blog_title or "",
|
||||
"blog_meta_desc": blog_meta_desc or "",
|
||||
"blog_tags": ", ".join(blog_tags) if isinstance(blog_tags, list) else str(blog_tags or ""),
|
||||
"blog_categories": ", ".join(blog_categories) if isinstance(blog_categories, list) else str(blog_categories or ""),
|
||||
"blog_hashtags": blog_hashtags or "",
|
||||
"blog_slug": blog_slug or ""
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Error generating metadata or image: {e}")
|
||||
metadata = {
|
||||
"blog_title": "Generated Blog",
|
||||
"blog_meta_desc": "",
|
||||
"blog_tags": "",
|
||||
"blog_categories": "",
|
||||
"blog_hashtags": "",
|
||||
"blog_slug": ""
|
||||
}
|
||||
generated_image_filepath = None
|
||||
saved_blog_to_file = None
|
||||
|
||||
# Clear progress indicators
|
||||
progress_bar.empty()
|
||||
status_text.empty()
|
||||
|
||||
# Final message
|
||||
final_message = st.empty()
|
||||
final_message.success("Blog generation complete!")
|
||||
|
||||
# Display blog content first (without using expanders)
|
||||
st.markdown("## Content")
|
||||
st.markdown(blog_content)
|
||||
|
||||
# Show file save information if available
|
||||
if saved_blog_to_file:
|
||||
st.success(f"✅ Blog saved to: {saved_blog_to_file}")
|
||||
|
||||
# Add the audio generation button
|
||||
st.markdown("---")
|
||||
audio_col1, audio_col2 = st.columns([1, 3])
|
||||
with audio_col1:
|
||||
generate_audio_button = st.button("🔊 Generate Audio Version", use_container_width=True)
|
||||
|
||||
with audio_col2:
|
||||
if generate_audio_button:
|
||||
generate_audio_version(blog_content)
|
||||
|
||||
# Display metadata success message
|
||||
if metadata["blog_title"]:
|
||||
st.success(f"✅ Generated metadata for: {metadata['blog_title']}")
|
||||
|
||||
# Display metadata table (without nesting expanders)
|
||||
st.markdown("---")
|
||||
st.subheader("🏷️ Blog SEO Metadata")
|
||||
st.table({
|
||||
"Metadata": ["Blog Title", "Meta Description", "Tags", "Categories", "Hashtags", "Slug"],
|
||||
"Value": [
|
||||
metadata["blog_title"],
|
||||
metadata["blog_meta_desc"],
|
||||
metadata["blog_tags"],
|
||||
metadata["blog_categories"],
|
||||
metadata["blog_hashtags"],
|
||||
metadata["blog_slug"]
|
||||
]
|
||||
})
|
||||
|
||||
# Display image if available
|
||||
if generated_image_filepath:
|
||||
st.subheader("🖼️ Featured Image")
|
||||
st.image(generated_image_filepath, caption=metadata["blog_title"] or "Featured Image", use_column_width=True)
|
||||
|
||||
# Add regenerate button
|
||||
if st.button("🔄 Regenerate Image", key="regenerate_image_simplified"):
|
||||
# Use the function directly to avoid any nested expanders
|
||||
new_image_path = regenerate_blog_image(
|
||||
metadata["blog_title"],
|
||||
metadata["blog_meta_desc"],
|
||||
blog_content,
|
||||
metadata["blog_tags"]
|
||||
)
|
||||
if new_image_path:
|
||||
st.success("✅ Image regenerated successfully!")
|
||||
st.image(new_image_path, caption=metadata["blog_title"], use_column_width=True)
|
||||
else:
|
||||
st.subheader("🖼️ Featured Image")
|
||||
st.info("No image was generated. Try regenerating the blog.")
|
||||
|
||||
# Add refinement buttons directly, without using helper functions
|
||||
col1, col2 = st.columns(2)
|
||||
with col1:
|
||||
if st.button("🔄 Refine Blog Title", key="refine_title_simplified", use_container_width=True):
|
||||
st.session_state.show_title_dialog = True
|
||||
st.rerun()
|
||||
with col2:
|
||||
if st.button("🔄 Refine Meta Description", key="refine_meta_simplified", use_container_width=True):
|
||||
st.session_state.show_meta_dialog = True
|
||||
st.rerun()
|
||||
|
||||
# Add structured data section directly, without using helper functions
|
||||
st.markdown("---")
|
||||
st.markdown("### Get Structured Data")
|
||||
|
||||
structured_data_col1, structured_data_col2 = st.columns([3, 1])
|
||||
with structured_data_col1:
|
||||
st.info("Rich snippets boost visibility and click-through rates in search results.")
|
||||
with structured_data_col2:
|
||||
if st.button("📊 Generate Rich Snippet", key="snippet_simplified", use_container_width=True):
|
||||
st.session_state.show_snippet_dialog = True
|
||||
st.rerun()
|
||||
|
||||
# Clear the success message after a delay
|
||||
import time
|
||||
time.sleep(3)
|
||||
final_message.empty()
|
||||
|
||||
return True
|
||||
|
||||
except Exception as inner_err:
|
||||
logger.error(f"Error in simplified blog generation: {inner_err}")
|
||||
st.error(f"Failed to generate blog content: {inner_err}")
|
||||
return False
|
||||
|
||||
elif selected_content_type == "Long-form content":
|
||||
logger.info(f"Generating long-form content with parameters: {blog_params}")
|
||||
|
||||
@@ -283,11 +515,20 @@ def process_keywords_input(user_input, search_params, blog_params, selected_cont
|
||||
search_params=search_params,
|
||||
blog_params=blog_params
|
||||
)
|
||||
st.success(f"Successfully generated long-form content for: {user_input}")
|
||||
|
||||
# Show success message briefly then clear it
|
||||
success_msg = st.empty()
|
||||
success_msg.success(f"Successfully generated long-form content for: {user_input}")
|
||||
# Clear the message after 3 seconds
|
||||
import time
|
||||
time.sleep(3)
|
||||
success_msg.empty()
|
||||
|
||||
return True
|
||||
|
||||
else:
|
||||
st.info("AI Agent Team feature is coming soon! This will provide multi-perspective content with different AI experts collaborating on your blog.")
|
||||
info_msg = st.empty()
|
||||
info_msg.info("AI Agent Team feature is coming soon! This will provide multi-perspective content with different AI experts collaborating on your blog.")
|
||||
return False
|
||||
|
||||
except Exception as err:
|
||||
@@ -298,7 +539,10 @@ def process_keywords_input(user_input, search_params, blog_params, selected_cont
|
||||
|
||||
def process_pdf_input(uploaded_file):
|
||||
"""Process a PDF file and generate content."""
|
||||
with st.expander("Processing PDF Document", expanded=True):
|
||||
# Replace expander with a container to avoid nested expanders
|
||||
pdf_container = st.container()
|
||||
with pdf_container:
|
||||
st.subheader("Processing PDF Document")
|
||||
pdf_reader = PyPDF2.PdfReader(uploaded_file)
|
||||
text = ""
|
||||
combined_result = ""
|
||||
@@ -361,22 +605,263 @@ def handle_content_generation(input_type, user_input, uploaded_file, search_para
|
||||
Returns:
|
||||
bool: True if content generation was successful, False otherwise
|
||||
"""
|
||||
with st.spinner("Crafting your blog content... Please wait."):
|
||||
# Create a status placeholder instead of a permanent message
|
||||
status_message = st.empty()
|
||||
status_message.info("Crafting your blog content... Please wait.")
|
||||
|
||||
try:
|
||||
if input_type == "keywords":
|
||||
return process_keywords_input(user_input, search_params, blog_params, selected_content_type)
|
||||
result = process_keywords_input(user_input, search_params, blog_params, selected_content_type)
|
||||
# Clear the status message when done
|
||||
status_message.empty()
|
||||
return result
|
||||
|
||||
elif input_type == "youtube_url" or input_type == "audio_file":
|
||||
return process_youtube_or_audio(user_input)
|
||||
result = process_youtube_or_audio(user_input)
|
||||
status_message.empty()
|
||||
return result
|
||||
|
||||
elif input_type == "web_url":
|
||||
return process_web_url(user_input)
|
||||
result = process_web_url(user_input)
|
||||
status_message.empty()
|
||||
return result
|
||||
|
||||
elif input_type == "image_file":
|
||||
return process_image_input(user_input, uploaded_file)
|
||||
result = process_image_input(user_input, uploaded_file)
|
||||
status_message.empty()
|
||||
return result
|
||||
|
||||
elif input_type == "PDF_file":
|
||||
return process_pdf_input(uploaded_file)
|
||||
result = process_pdf_input(uploaded_file)
|
||||
status_message.empty()
|
||||
return result
|
||||
|
||||
else:
|
||||
status_message.empty()
|
||||
st.error(f"Unsupported input type: {input_type}")
|
||||
return False
|
||||
return False
|
||||
except Exception as e:
|
||||
status_message.empty()
|
||||
st.error(f"An error occurred during content generation: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
def generate_blog_content(search_keywords, google_search_result, tavily_search_result,
|
||||
google_search_success, tavily_search_success, blog_params, status):
|
||||
"""
|
||||
Generate blog content using either Google or Tavily search results.
|
||||
|
||||
Args:
|
||||
search_keywords (str): Search keywords
|
||||
google_search_result: Results from Google search
|
||||
tavily_search_result: Results from Tavily search
|
||||
google_search_success (bool): Whether Google search was successful
|
||||
tavily_search_success (bool): Whether Tavily search was successful
|
||||
blog_params (dict): Blog parameters
|
||||
status: Streamlit status object
|
||||
|
||||
Returns:
|
||||
str: Generated blog content or None if generation failed
|
||||
"""
|
||||
# Check if both searches failed - if so, stop the process
|
||||
if not google_search_success and not tavily_search_success:
|
||||
st.error("⛔ Both Google SERP and Tavily AI searches failed. Unable to generate blog content.")
|
||||
st.warning("Please check your API keys in the environment settings and try again.")
|
||||
return None
|
||||
|
||||
# Try Google results first if available
|
||||
if google_search_success and 'results' in google_search_result:
|
||||
try:
|
||||
status.update(label=f"✏️ Writing blog from Google Search results...")
|
||||
# Pass blog parameters to the blog writing function
|
||||
blog_style_info = f"""
|
||||
Length: {blog_params.get('blog_length')} words
|
||||
Tone: {blog_params.get('blog_tone')}
|
||||
Target Audience: {blog_params.get('blog_demographic')}
|
||||
Blog Type: {blog_params.get('blog_type')}
|
||||
Language: {blog_params.get('blog_language')}
|
||||
"""
|
||||
status.update(label=f"✏️ Writing {blog_params.get('blog_tone')} {blog_params.get('blog_type')} blog for {blog_params.get('blog_demographic')} audience...")
|
||||
blog_markdown_str = write_blog_google_serp(search_keywords, google_search_result['results'], blog_params=blog_params)
|
||||
status.update(label="✅ Generated content from Google search results", state="complete")
|
||||
return blog_markdown_str
|
||||
except Exception as err:
|
||||
status.update(label=f"❌ Failed to generate content from Google results: {str(err)}", state="error")
|
||||
st.error(f"Failed to generate content from Google results: {err}")
|
||||
logger.error(f"Failed to process Google search results: {err}")
|
||||
|
||||
# If Google failed or had no results, try Tavily
|
||||
if tavily_search_success and tavily_search_result:
|
||||
try:
|
||||
status.update(label=f"✏️ Writing blog from Tavily search results...")
|
||||
status.update(label=f"✏️ Writing {blog_params.get('blog_tone')} {blog_params.get('blog_type')} blog for {blog_params.get('blog_demographic')} audience...")
|
||||
blog_markdown_str = write_blog_google_serp(search_keywords, tavily_search_result, blog_params=blog_params)
|
||||
status.update(label="✅ Generated content from Tavily search results", state="complete")
|
||||
return blog_markdown_str
|
||||
except Exception as err:
|
||||
status.update(label=f"❌ Failed to generate content from Tavily results: {str(err)}", state="error")
|
||||
st.error(f"Failed to generate content from Tavily results: {err}")
|
||||
logger.error(f"Failed to process Tavily search results: {err}")
|
||||
|
||||
# If we still don't have content, show error
|
||||
st.error("⛔ Failed to generate any blog content from the research results.")
|
||||
return None
|
||||
|
||||
|
||||
def generate_blog_metadata(blog_markdown_str, search_keywords, status):
|
||||
"""
|
||||
Generate metadata for the blog content.
|
||||
|
||||
Args:
|
||||
blog_markdown_str (str): Blog content
|
||||
search_keywords (str): Original search keywords
|
||||
status: Streamlit status object
|
||||
|
||||
Returns:
|
||||
tuple: (blog_title, blog_meta_desc, blog_tags, blog_categories, blog_hashtags, blog_slug)
|
||||
"""
|
||||
status.update(label="🔍 Generating title, meta description, tags, categories, hashtags, and slug...")
|
||||
try:
|
||||
# Get all 6 metadata values from blog_metadata
|
||||
blog_title, blog_meta_desc, blog_tags, blog_categories, blog_hashtags, blog_slug = asyncio.run(blog_metadata(blog_markdown_str))
|
||||
status.update(label="✅ Generated blog metadata successfully")
|
||||
return blog_title, blog_meta_desc, blog_tags, blog_categories, blog_hashtags, blog_slug
|
||||
except Exception as err:
|
||||
st.error(f"Failed to get blog metadata: {err}")
|
||||
logger.error(f"Failed to get blog metadata: {err}")
|
||||
status.update(label="❌ Failed to get blog metadata", state="error")
|
||||
return None, None, None, None, None, None
|
||||
|
||||
|
||||
def generate_blog_image(blog_title, blog_meta_desc, blog_markdown_str, status, blog_tags=None):
|
||||
"""
|
||||
Generate a featured image for the blog.
|
||||
|
||||
Args:
|
||||
blog_title (str): Blog title
|
||||
blog_meta_desc (str): Blog meta description
|
||||
blog_markdown_str (str): Blog content
|
||||
status: Streamlit status object
|
||||
blog_tags (list, optional): Blog tags to use for image prompt enhancement
|
||||
|
||||
Returns:
|
||||
str: Path to the generated image or None if generation failed
|
||||
"""
|
||||
try:
|
||||
status.update(label="🖼️ Generating featured image for blog...")
|
||||
|
||||
# Create a better prompt for image generation
|
||||
if blog_title and blog_meta_desc:
|
||||
# If we have both title and description, use them
|
||||
text_to_image = f"{blog_title}: {blog_meta_desc}"
|
||||
elif blog_title:
|
||||
# If we only have title, use it
|
||||
text_to_image = blog_title
|
||||
elif blog_meta_desc:
|
||||
# If we only have description, use it
|
||||
text_to_image = blog_meta_desc
|
||||
else:
|
||||
# Fallback to first 200 chars of content
|
||||
text_to_image = blog_markdown_str[:200]
|
||||
|
||||
# Ensure the prompt is of reasonable length
|
||||
if len(text_to_image) > 300:
|
||||
text_to_image = text_to_image[:300]
|
||||
|
||||
# Log the prompt being used
|
||||
logger.info(f"Generating image with prompt: {text_to_image}")
|
||||
status.update(label=f"🖼️ Creating image with prompt: \"{text_to_image[:50]}...\"")
|
||||
|
||||
# Extract blog tags if available
|
||||
blog_tags_list = blog_tags if isinstance(blog_tags, list) else []
|
||||
|
||||
# Attempt image generation with all available parameters
|
||||
generated_image_filepath = generate_image(
|
||||
user_prompt=text_to_image,
|
||||
title=blog_title,
|
||||
description=blog_meta_desc,
|
||||
tags=blog_tags_list,
|
||||
content=blog_markdown_str[:2000] # Limit content length to avoid too large payloads
|
||||
)
|
||||
|
||||
# If first attempt failed, try with a simplified prompt
|
||||
if not generated_image_filepath:
|
||||
logger.warning("First image generation attempt failed, trying with simplified prompt")
|
||||
status.update(label="⚠️ First image attempt failed, trying again with simplified prompt...")
|
||||
|
||||
# Create a simpler prompt
|
||||
simplified_prompt = " ".join(text_to_image.split()[:10])
|
||||
generated_image_filepath = generate_image(
|
||||
user_prompt=simplified_prompt,
|
||||
title=blog_title,
|
||||
description=blog_meta_desc,
|
||||
tags=blog_tags_list,
|
||||
content=blog_markdown_str[:1000] # Use even shorter content for the retry
|
||||
)
|
||||
|
||||
if generated_image_filepath:
|
||||
status.update(label="✅ Successfully generated featured image")
|
||||
return generated_image_filepath
|
||||
else:
|
||||
status.update(label="❌ Image generation failed - no image created", state="error")
|
||||
return None
|
||||
|
||||
except Exception as err:
|
||||
st.warning(f"Failed in Image generation: {err}")
|
||||
logger.error(f"Failed in Image generation: {err}")
|
||||
status.update(label="❌ Image generation failed - no image created", state="error")
|
||||
return None
|
||||
|
||||
|
||||
def regenerate_blog_image(blog_title, blog_meta_desc, blog_markdown_str, blog_tags=None):
|
||||
"""
|
||||
Regenerate a blog image on demand.
|
||||
|
||||
Args:
|
||||
blog_title (str): Blog title
|
||||
blog_meta_desc (str): Blog meta description
|
||||
blog_markdown_str (str): Blog content
|
||||
blog_tags (list, optional): Blog tags to use for image prompt enhancement
|
||||
|
||||
Returns:
|
||||
str: Path to the generated image or None if generation failed
|
||||
"""
|
||||
with st.status("Regenerating image...", expanded=True) as status:
|
||||
try:
|
||||
# Use keywords from title or description
|
||||
if blog_title:
|
||||
keywords = " ".join(blog_title.split()[:6])
|
||||
prompt = f"Blog illustration for: {keywords}"
|
||||
elif blog_meta_desc:
|
||||
keywords = " ".join(blog_meta_desc.split()[:6])
|
||||
prompt = f"Blog illustration for: {keywords}"
|
||||
else:
|
||||
keywords = blog_markdown_str.split()[:50]
|
||||
prompt = f"Blog illustration based on: {' '.join(keywords[:6])}"
|
||||
|
||||
status.update(label=f"🖼️ Generating new image with prompt: \"{prompt}\"")
|
||||
|
||||
# Extract any tags if available - will be passed as empty list otherwise
|
||||
blog_tags_list = blog_tags if isinstance(blog_tags, list) else []
|
||||
|
||||
# Generate the image with all parameters
|
||||
generated_image_filepath = generate_image(
|
||||
user_prompt=prompt,
|
||||
title=blog_title,
|
||||
description=blog_meta_desc,
|
||||
tags=blog_tags_list,
|
||||
content=blog_markdown_str[:2000] # Limit content length to avoid too large payloads
|
||||
)
|
||||
|
||||
if generated_image_filepath:
|
||||
status.update(label="✅ Successfully generated new image", state="complete")
|
||||
return generated_image_filepath
|
||||
else:
|
||||
status.update(label="❌ Image regeneration failed", state="error")
|
||||
return None
|
||||
|
||||
except Exception as err:
|
||||
st.error(f"Failed to regenerate image: {err}")
|
||||
logger.error(f"Image regeneration error: {err}")
|
||||
status.update(label="❌ Image regeneration failed", state="error")
|
||||
return None
|
||||
420
lib/ai_writers/ai_blog_writer/blog_ai_research_utils.py
Normal file
420
lib/ai_writers/ai_blog_writer/blog_ai_research_utils.py
Normal file
@@ -0,0 +1,420 @@
|
||||
import sys
|
||||
import os
|
||||
import streamlit as st
|
||||
from loguru import logger
|
||||
from dotenv import load_dotenv
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(Path('../../../.env'))
|
||||
|
||||
# Import necessary modules
|
||||
from ...ai_web_researcher.gpt_online_researcher import (
|
||||
do_google_serp_search as gpt_do_google_serp_search,
|
||||
do_tavily_ai_search as gpt_do_tavily_ai_search
|
||||
)
|
||||
from ...ai_web_researcher.tavily_ai_search import do_tavily_ai_search as tavily_direct_search
|
||||
|
||||
|
||||
def initialize_parameters(search_params=None, blog_params=None):
|
||||
"""
|
||||
Initialize and validate search and blog parameters with defaults.
|
||||
|
||||
Args:
|
||||
search_params (dict, optional): Search parameters
|
||||
blog_params (dict, optional): Blog parameters
|
||||
|
||||
Returns:
|
||||
tuple: (search_params, blog_params) with defaults applied
|
||||
"""
|
||||
# Initialize search params if not provided
|
||||
if search_params is None:
|
||||
search_params = {}
|
||||
|
||||
# Initialize blog params if not provided
|
||||
if blog_params is None:
|
||||
blog_params = {}
|
||||
|
||||
# Provide default values only for missing keys
|
||||
# This ensures we don't override values that were intentionally set to 0 or other falsy values
|
||||
if "max_results" not in search_params:
|
||||
search_params["max_results"] = 10
|
||||
if "search_depth" not in search_params:
|
||||
search_params["search_depth"] = "basic"
|
||||
if "time_range" not in search_params:
|
||||
search_params["time_range"] = "year"
|
||||
if "include_domains" not in search_params:
|
||||
search_params["include_domains"] = []
|
||||
|
||||
# Provide default values only for missing blog parameter keys
|
||||
if "blog_length" not in blog_params:
|
||||
blog_params["blog_length"] = 2000
|
||||
if "blog_tone" not in blog_params:
|
||||
blog_params["blog_tone"] = "Professional"
|
||||
if "blog_demographic" not in blog_params:
|
||||
blog_params["blog_demographic"] = "Professional"
|
||||
if "blog_type" not in blog_params:
|
||||
blog_params["blog_type"] = "Informational"
|
||||
if "blog_language" not in blog_params:
|
||||
blog_params["blog_language"] = "English"
|
||||
if "blog_output_format" not in blog_params:
|
||||
blog_params["blog_output_format"] = "markdown"
|
||||
|
||||
# Log the parameters for debugging
|
||||
logger.info(f"Using search parameters: {search_params}")
|
||||
logger.info(f"Using blog parameters: {blog_params}")
|
||||
|
||||
return search_params, blog_params
|
||||
|
||||
|
||||
def perform_google_search(search_keywords, search_params, status, status_container, progress_bar):
|
||||
"""
|
||||
Perform Google SERP search for the given keywords.
|
||||
|
||||
Args:
|
||||
search_keywords (str): Keywords to search for
|
||||
search_params (dict): Search parameters
|
||||
status: Streamlit status object
|
||||
status_container: Streamlit container for status messages
|
||||
progress_bar: Streamlit progress bar
|
||||
|
||||
Returns:
|
||||
tuple: (google_search_result, g_titles, success_flag)
|
||||
"""
|
||||
def update_progress(message, progress=None, level="info"):
|
||||
"""Helper function to update progress in Streamlit UI"""
|
||||
if progress is not None:
|
||||
progress_bar.progress(progress)
|
||||
|
||||
if level == "error":
|
||||
status_container.error(f"🚫 {message}")
|
||||
elif level == "warning":
|
||||
status_container.warning(f"⚠️ {message}")
|
||||
elif level == "success":
|
||||
status_container.success(f"✅ {message}")
|
||||
else:
|
||||
status_container.info(f"🔄 {message}")
|
||||
logger.debug(f"Progress update [{level}]: {message}")
|
||||
|
||||
try:
|
||||
# Update the function call to include the required parameters and search_params
|
||||
status.update(label=f"Starting Google SERP search for: {search_keywords}")
|
||||
|
||||
# Add search params to the Google SERP search
|
||||
google_search_params = {
|
||||
"max_results": search_params.get("max_results", 10)
|
||||
}
|
||||
|
||||
# Include domains if provided
|
||||
if search_params.get("include_domains"):
|
||||
google_search_params["include_domains"] = search_params.get("include_domains")
|
||||
|
||||
google_search_result = do_google_serp_search(
|
||||
search_keywords,
|
||||
status_container=status_container,
|
||||
update_progress=update_progress,
|
||||
**google_search_params
|
||||
)
|
||||
|
||||
if google_search_result and google_search_result.get('titles') and len(google_search_result.get('titles', [])) > 0:
|
||||
status.update(label=f"✅ Finished with Google web for Search: {search_keywords}")
|
||||
g_titles = google_search_result.get('titles', [])
|
||||
return google_search_result, g_titles, True
|
||||
else:
|
||||
# Check if there's an error message in the result
|
||||
if google_search_result and 'summary' in google_search_result and 'Error' in google_search_result['summary']:
|
||||
error_msg = google_search_result['summary']
|
||||
status.update(label=f"❌ Google search failed: {error_msg}", state="error")
|
||||
st.error(f"Google SERP search failed: {error_msg}")
|
||||
else:
|
||||
status.update(label="❌ Failed to get Google SERP results. No valid data returned.", state="error")
|
||||
st.error("Google SERP search failed to return valid results.")
|
||||
return google_search_result, [], False
|
||||
except Exception as err:
|
||||
status.update(label=f"❌ Google search error: {str(err)}", state="error")
|
||||
st.error(f"Google web research failed: {err}")
|
||||
logger.error(f"Failed in Google web research: {err}")
|
||||
return None, [], False
|
||||
|
||||
|
||||
def perform_tavily_search(search_keywords, search_params, status):
|
||||
"""
|
||||
Perform Tavily AI search for the given keywords.
|
||||
|
||||
Args:
|
||||
search_keywords (str): Keywords to search for
|
||||
search_params (dict): Search parameters
|
||||
status: Streamlit status object
|
||||
|
||||
Returns:
|
||||
tuple: (tavily_search_result, success_flag)
|
||||
"""
|
||||
try:
|
||||
status.update(label=f"🔍 Starting Tavily AI research: {search_keywords}")
|
||||
|
||||
# Pass the search parameters to Tavily
|
||||
tavily_result_tuple = do_tavily_ai_search(
|
||||
search_keywords,
|
||||
max_results=search_params.get("max_results", 10),
|
||||
search_depth=search_params.get("search_depth", "basic"),
|
||||
include_domains=search_params.get("include_domains", []),
|
||||
time_range=search_params.get("time_range", "year")
|
||||
)
|
||||
|
||||
if tavily_result_tuple and len(tavily_result_tuple) == 3:
|
||||
tavily_search_result, t_titles, t_answer = tavily_result_tuple
|
||||
# If we have either titles or an answer, consider it a success
|
||||
if (t_titles and len(t_titles) > 0) or (t_answer and len(t_answer) > 10):
|
||||
status.update(label=f"✅ Finished Tavily AI Search on: {search_keywords}", state="complete")
|
||||
return tavily_search_result, True
|
||||
else:
|
||||
status.update(label="❌ Tavily search returned empty results", state="error")
|
||||
st.warning("Tavily search didn't find relevant information.")
|
||||
return tavily_search_result, False
|
||||
else:
|
||||
status.update(label="❌ Tavily search returned incomplete results", state="error")
|
||||
st.error("Tavily search failed to return valid results.")
|
||||
return None, False
|
||||
|
||||
except Exception as err:
|
||||
status.update(label=f"❌ Tavily search error: {str(err)}", state="error")
|
||||
st.error(f"Failed in Tavily web research: {err}")
|
||||
logger.error(f"Failed in Tavily web research: {err}")
|
||||
return None, False
|
||||
|
||||
|
||||
def do_google_serp_search(search_keywords, status_container=None, update_progress=None, **kwargs):
|
||||
"""
|
||||
Wrapper function to handle the parameter mismatch with the original function.
|
||||
"""
|
||||
try:
|
||||
if status_container is None:
|
||||
status_container = st.empty()
|
||||
|
||||
if update_progress is None:
|
||||
def update_progress(message, progress=None, level="info"):
|
||||
if level == "error":
|
||||
status_container.error(message)
|
||||
elif level == "warning":
|
||||
status_container.warning(message)
|
||||
else:
|
||||
status_container.info(message)
|
||||
|
||||
# Create a fixed update_progress function that handles any progress type
|
||||
def safe_update_progress(message, progress=None, level="info"):
|
||||
try:
|
||||
# Handle progress value of different types
|
||||
if progress is not None:
|
||||
if isinstance(progress, str):
|
||||
# Try to convert string to float if it represents a number
|
||||
try:
|
||||
progress = float(progress)
|
||||
except ValueError:
|
||||
# If conversion fails, just log the message without updating progress
|
||||
progress = None
|
||||
|
||||
# Call the original update_progress with sanitized values
|
||||
update_progress(message, progress, level)
|
||||
except Exception as err:
|
||||
# If there's an error in the progress function, just log to console
|
||||
logger.error(f"Error in progress update: {err}")
|
||||
# Try one more time with just the message
|
||||
try:
|
||||
update_progress(message, None, level)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Set default search parameters - fix the parameter to use 'max_results' not 'num_results'
|
||||
search_params = {
|
||||
"max_results": kwargs.get("max_results", 10),
|
||||
"include_domains": kwargs.get("include_domains", []),
|
||||
"search_depth": kwargs.get("search_depth", "basic")
|
||||
}
|
||||
|
||||
# Update status to indicate we're checking API keys
|
||||
status_container.info("🔑 Checking required API keys...")
|
||||
|
||||
# Call the original function with the required parameters
|
||||
result = gpt_do_google_serp_search(search_keywords, status_container, safe_update_progress, **search_params)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
logger.error(f"Error in do_google_serp_search wrapper: {error_msg}")
|
||||
|
||||
# Check for common error patterns and display user-friendly messages
|
||||
if "SERPER_API_KEY is missing" in error_msg:
|
||||
status_container.error("🔑 Google search API key (SERPER_API_KEY) is missing. Please check your environment settings.")
|
||||
st.error("Google SERP search failed: API key is missing. Using alternative methods.")
|
||||
elif "Progress Value has invalid type" in error_msg:
|
||||
# This is an internal error, log it but show a more user-friendly message
|
||||
status_container.warning("⚠️ Internal progress tracking error. Continuing with search.")
|
||||
else:
|
||||
# For unknown errors, show the full error message
|
||||
status_container.error(f"🚫 Google search error: {error_msg}")
|
||||
st.error(f"Google SERP search failed: {error_msg}")
|
||||
|
||||
# Return a minimal result structure to prevent downstream errors
|
||||
return {
|
||||
'results': {},
|
||||
'titles': [],
|
||||
'summary': f"Error occurred during search: {error_msg}",
|
||||
'stats': {
|
||||
'organic_count': 0,
|
||||
'questions_count': 0,
|
||||
'related_count': 0
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def do_tavily_ai_search(keywords, max_results=10, search_depth="basic", include_domains=None, time_range="year"):
|
||||
"""
|
||||
Wrapper function for Tavily search to handle parameter differences.
|
||||
|
||||
Args:
|
||||
keywords (str): Keywords to search for
|
||||
max_results (int): Maximum number of search results to return
|
||||
search_depth (str): "basic" or "advanced" search depth
|
||||
include_domains (list): List of domains to prioritize in search
|
||||
time_range (str): Time range for results ("day", "week", "month", "year", "all")
|
||||
"""
|
||||
status_container = st.empty()
|
||||
|
||||
if include_domains is None:
|
||||
include_domains = []
|
||||
|
||||
try:
|
||||
# Show status message
|
||||
status_container.info(f"🔍 Preparing Tavily AI search with {search_depth} depth...")
|
||||
|
||||
# FIXED: Ensure all parameters have correct types to prevent comparison errors
|
||||
tavily_params = {
|
||||
'max_results': int(max_results), # Explicitly convert to int
|
||||
'search_depth': str(search_depth), # Ensure this is a string
|
||||
'include_domains': include_domains,
|
||||
'time_range': str(time_range)
|
||||
}
|
||||
|
||||
# Log the parameters for debugging
|
||||
logger.info(f"Tavily search parameters: {tavily_params}")
|
||||
|
||||
# Check for API key before making the request
|
||||
tavily_api_key = os.environ.get("TAVILY_API_KEY")
|
||||
if not tavily_api_key:
|
||||
status_container.error("🔑 Tavily API key (TAVILY_API_KEY) is missing. Please check your environment settings.")
|
||||
st.error("Tavily search failed: API key is missing. Using alternative methods.")
|
||||
return None, [], "API key missing"
|
||||
|
||||
status_container.info(f"🔍 Searching with Tavily AI using {search_depth} depth for: {keywords}")
|
||||
|
||||
# Direct implementation without calling gpt_do_tavily_ai_search to avoid type issues
|
||||
try:
|
||||
# Call the function directly with correct parameter types
|
||||
tavily_raw_results = tavily_direct_search(
|
||||
keywords,
|
||||
max_results=tavily_params['max_results'],
|
||||
search_depth=tavily_params['search_depth'],
|
||||
include_domains=tavily_params['include_domains'],
|
||||
time_range=tavily_params['time_range']
|
||||
)
|
||||
|
||||
# Extract the needed information
|
||||
if isinstance(tavily_raw_results, tuple) and len(tavily_raw_results) == 3:
|
||||
# If already in the right format, use it directly
|
||||
return tavily_raw_results
|
||||
|
||||
# Process the results to extract titles and answer
|
||||
t_results = tavily_raw_results
|
||||
t_titles = []
|
||||
t_answer = ""
|
||||
|
||||
# Extract titles from results if available
|
||||
if isinstance(t_results, dict):
|
||||
if 'results' in t_results and isinstance(t_results['results'], list):
|
||||
t_titles = [r.get('title', '') for r in t_results['results']]
|
||||
status_container.success(f"✅ Found {len(t_titles)} relevant articles")
|
||||
if 'answer' in t_results:
|
||||
t_answer = t_results['answer']
|
||||
status_container.success("✅ Generated a summary answer")
|
||||
|
||||
return t_results, t_titles, t_answer
|
||||
|
||||
except ImportError:
|
||||
# Fall back to the original function if direct import fails
|
||||
status_container.warning("⚠️ Using fallback Tavily search method...")
|
||||
logger.warning("Using fallback Tavily search method")
|
||||
|
||||
# FIXED: Alternative approach - wrap the call in try/except to handle type errors
|
||||
try:
|
||||
tavily_result = gpt_do_tavily_ai_search(keywords, **tavily_params)
|
||||
|
||||
# Format the result to match what the blog writer expects
|
||||
if isinstance(tavily_result, tuple) and len(tavily_result) == 3:
|
||||
status_container.success("✅ Tavily search completed successfully")
|
||||
return tavily_result
|
||||
|
||||
# If not a tuple with expected values, try to extract what we need
|
||||
t_results = tavily_result
|
||||
|
||||
# Extract titles and answer if available
|
||||
t_titles = []
|
||||
t_answer = ""
|
||||
|
||||
if isinstance(t_results, dict):
|
||||
if 'results' in t_results and isinstance(t_results['results'], list):
|
||||
t_titles = [r.get('title', '') for r in t_results['results']]
|
||||
status_container.success(f"✅ Found {len(t_titles)} relevant articles")
|
||||
if 'answer' in t_results:
|
||||
t_answer = t_results['answer']
|
||||
status_container.success("✅ Generated a summary answer")
|
||||
|
||||
return t_results, t_titles, t_answer
|
||||
|
||||
except TypeError as type_err:
|
||||
# Handle the specific type error more gracefully
|
||||
error_msg = str(type_err)
|
||||
logger.error(f"Type error in Tavily search: {error_msg}")
|
||||
|
||||
if "'>' not supported" in error_msg:
|
||||
status_container.error("🚫 Tavily search parameter type error. Trying alternative approach...")
|
||||
|
||||
# Try a simpler approach with minimal parameters
|
||||
try:
|
||||
# Call with only the keyword and fixed max_results
|
||||
tavily_result = gpt_do_tavily_ai_search(keywords, max_results=10)
|
||||
|
||||
# Minimal processing to extract titles and answer
|
||||
t_results = tavily_result
|
||||
t_titles = []
|
||||
t_answer = ""
|
||||
|
||||
if isinstance(t_results, dict):
|
||||
if 'results' in t_results and isinstance(t_results['results'], list):
|
||||
t_titles = [r.get('title', '') for r in t_results['results']]
|
||||
if 'answer' in t_results:
|
||||
t_answer = t_results['answer']
|
||||
|
||||
return t_results, t_titles, t_answer
|
||||
except Exception as inner_err:
|
||||
logger.error(f"Alternative Tavily approach also failed: {inner_err}")
|
||||
raise
|
||||
else:
|
||||
# Re-raise other type errors
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
logger.error(f"Error in do_tavily_ai_search wrapper: {error_msg}")
|
||||
|
||||
# Display user-friendly error message
|
||||
status_container.error(f"🚫 Tavily search error: {error_msg}")
|
||||
st.error(f"Tavily AI search failed: {error_msg}")
|
||||
|
||||
# Return empty results to prevent downstream errors
|
||||
return None, [], f"Error: {error_msg}"
|
||||
|
||||
finally:
|
||||
# Clear the status container after a delay
|
||||
time.sleep(2)
|
||||
status_container.empty()
|
||||
@@ -10,7 +10,7 @@ logger.add(sys.stdout,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}"
|
||||
)
|
||||
|
||||
from ..gpt_providers.text_generation.main_text_generation import llm_text_gen
|
||||
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen
|
||||
|
||||
|
||||
def write_blog_google_serp(keywords, search_results, blog_params=None):
|
||||
872
lib/ai_writers/ai_blog_writer/keywords_to_blog_streamlit.py
Normal file
872
lib/ai_writers/ai_blog_writer/keywords_to_blog_streamlit.py
Normal file
@@ -0,0 +1,872 @@
|
||||
import sys
|
||||
import os
|
||||
import asyncio
|
||||
from textwrap import dedent
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import streamlit as st
|
||||
from gtts import gTTS
|
||||
import base64
|
||||
from dotenv import load_dotenv
|
||||
import time
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv(Path('../../.env'))
|
||||
# Logger setup
|
||||
from loguru import logger
|
||||
logger.remove()
|
||||
logger.add(sys.stdout,
|
||||
colorize=True,
|
||||
format="<level>{level}</level>|<green>{file}:{line}:{function}</green>| {message}")
|
||||
|
||||
# Import other necessary modules
|
||||
from ...ai_web_researcher.gpt_online_researcher import (
|
||||
do_metaphor_ai_research, do_google_pytrends_analysis)
|
||||
from .blog_from_google_serp import write_blog_google_serp, blog_with_research
|
||||
from ...blog_metadata.get_blog_metadata import blog_metadata
|
||||
from ...blog_postprocessing.save_blog_to_file import save_blog_to_file
|
||||
from ...gpt_providers.text_to_image_generation.main_generate_image_from_prompt import generate_image
|
||||
from ...ai_seo_tools.content_title_generator import generate_blog_titles
|
||||
from ...ai_seo_tools.meta_desc_generator import generate_blog_metadesc
|
||||
from ...ai_seo_tools.seo_structured_data import ai_structured_data
|
||||
|
||||
# Import search functions from the research utils module
|
||||
from .blog_ai_research_utils import (
|
||||
initialize_parameters,
|
||||
perform_google_search,
|
||||
perform_tavily_search,
|
||||
do_google_serp_search,
|
||||
do_tavily_ai_search
|
||||
)
|
||||
|
||||
# REMOVED CIRCULAR IMPORTS
|
||||
# Import content and image generation functions from the generator utils module
|
||||
# from .ai_blog_generator_utils import (
|
||||
# generate_blog_content,
|
||||
# generate_blog_metadata,
|
||||
# generate_blog_image,
|
||||
# regenerate_blog_image
|
||||
# )
|
||||
|
||||
def save_blog_content(blog_markdown_str, blog_title, blog_meta_desc, blog_tags, blog_categories, generated_image_filepath, status, blog_hashtags=None, blog_slug=None):
|
||||
"""
|
||||
Save the blog content to a file.
|
||||
|
||||
Args:
|
||||
blog_markdown_str (str): Blog content
|
||||
blog_title (str): Blog title
|
||||
blog_meta_desc (str): Blog meta description
|
||||
blog_tags (list): Blog tags
|
||||
blog_categories (list): Blog categories
|
||||
generated_image_filepath (str): Path to the generated image
|
||||
status: Streamlit status object
|
||||
blog_hashtags (str, optional): Social media hashtags
|
||||
blog_slug (str, optional): SEO-friendly URL slug
|
||||
|
||||
Returns:
|
||||
str: Path to the saved file or None if saving failed
|
||||
"""
|
||||
try:
|
||||
status.update(label="💾 Saving blog content to file...")
|
||||
saved_blog_to_file = save_blog_to_file(blog_markdown_str, blog_title, blog_meta_desc,
|
||||
blog_tags, blog_categories, generated_image_filepath)
|
||||
status.update(label=f"✅ Saved the content to: {saved_blog_to_file}")
|
||||
return saved_blog_to_file
|
||||
except Exception as err:
|
||||
st.error(f"Failed to save blog to file: {err}")
|
||||
logger.error(f"Failed to save blog to file: {err}")
|
||||
status.update(label="❌ Failed to save blog to file", state="error")
|
||||
return None
|
||||
|
||||
|
||||
def generate_audio_version(blog_markdown_str, status=None):
|
||||
"""
|
||||
Generate an audio version of the blog content.
|
||||
|
||||
Args:
|
||||
blog_markdown_str (str): Blog content
|
||||
status: Streamlit status object (optional)
|
||||
|
||||
Returns:
|
||||
bool: True if audio generation was successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
if status:
|
||||
status.update(label="🔊 Generating audio version of the blog...")
|
||||
else:
|
||||
st.info("🔊 Generating audio version...")
|
||||
|
||||
# Only generate audio for reasonable-sized blogs (to avoid errors with very large text)
|
||||
if blog_markdown_str and len(blog_markdown_str) < 50000: # Max ~50KB of text
|
||||
tts = gTTS(text=blog_markdown_str[:40000], lang='en', slow=False) # Use first 40K chars to be safe
|
||||
tts.save("delete_me.mp3")
|
||||
st.audio("delete_me.mp3")
|
||||
st.download_button(
|
||||
label="📥 Download Audio File",
|
||||
data=open("delete_me.mp3", "rb").read(),
|
||||
file_name="blog_audio.mp3",
|
||||
mime="audio/mp3"
|
||||
)
|
||||
if status:
|
||||
status.update(label="✅ Audio version generated successfully", state="complete")
|
||||
else:
|
||||
st.success("✅ Audio version generated successfully")
|
||||
return True
|
||||
else:
|
||||
st.warning("Blog content too large for audio generation")
|
||||
if status:
|
||||
status.update(label="⚠️ Blog content too large for audio generation", state="complete")
|
||||
return False
|
||||
except Exception as err:
|
||||
st.warning(f"Failed to generate audio version: {err}")
|
||||
logger.error(f"Failed to generate audio version: {err}")
|
||||
if status:
|
||||
status.update(label="❌ Failed to generate audio version", state="error")
|
||||
return False
|
||||
|
||||
|
||||
# Helper functions for write_blog_from_keywords
|
||||
def setup_progress_tracking():
|
||||
"""Set up progress tracking elements for blog generation."""
|
||||
# Create a placeholder for the final blog content
|
||||
final_content_placeholder = st.empty()
|
||||
|
||||
# Create progress tracking
|
||||
progress_placeholder = st.empty()
|
||||
with progress_placeholder.container():
|
||||
progress_bar = st.progress(0)
|
||||
status_text = st.empty()
|
||||
|
||||
def update_progress(step, total_steps, message):
|
||||
"""Update the progress bar and status message"""
|
||||
progress_value = min(step / total_steps, 1.0)
|
||||
progress_bar.progress(progress_value)
|
||||
status_text.info(f"Step {step}/{total_steps}: {message}")
|
||||
|
||||
# When process is complete, clear the progress info
|
||||
if step == total_steps:
|
||||
import time
|
||||
time.sleep(3) # Show the complete message for 3 seconds
|
||||
progress_bar.empty()
|
||||
status_text.empty()
|
||||
|
||||
return final_content_placeholder, progress_placeholder, progress_bar, status_text, update_progress
|
||||
|
||||
|
||||
def perform_research_phase(search_keywords, search_params, update_progress):
|
||||
"""
|
||||
Perform the research phase of blog generation.
|
||||
|
||||
Args:
|
||||
search_keywords (str): Keywords to research
|
||||
search_params (dict): Search parameters
|
||||
update_progress (function): Function to update progress
|
||||
|
||||
Returns:
|
||||
tuple: Google search results, Tavily search results, success flags, and blog titles
|
||||
"""
|
||||
update_progress(1, 5, f"Starting web research on '{search_keywords}'")
|
||||
logger.info(f"Researching and Writing Blog on keywords: {search_keywords}")
|
||||
|
||||
# Create a section header for the research phase
|
||||
st.subheader("🔍 Web Research Progress")
|
||||
|
||||
# Use a collapsible expander for research details
|
||||
with st.expander("Research Details", expanded=True):
|
||||
example_blog_titles = []
|
||||
|
||||
# Create a status element for research updates
|
||||
with st.status("Web research in progress...", expanded=True) as status:
|
||||
status.update(label=f"📊 Performing web research on: {search_keywords}")
|
||||
|
||||
# Create status container and progress tracking for Google SERP
|
||||
status_container = st.empty()
|
||||
research_progress = st.progress(0)
|
||||
|
||||
# Google Search
|
||||
status.update(label="🔍 Performing Google search...")
|
||||
google_search_result, g_titles, google_search_success = perform_google_search(
|
||||
search_keywords, search_params, status, status_container, research_progress
|
||||
)
|
||||
if g_titles:
|
||||
example_blog_titles.append(g_titles)
|
||||
status.update(label=f"✅ Google search complete - found {len(g_titles)} relevant resources")
|
||||
else:
|
||||
status.update(label="⚠️ Google search yielded limited results")
|
||||
|
||||
# Tavily Search
|
||||
status.update(label="🔍 Performing Tavily AI search...")
|
||||
tavily_search_result, tavily_search_success = perform_tavily_search(
|
||||
search_keywords, search_params, status
|
||||
)
|
||||
|
||||
if tavily_search_success:
|
||||
status.update(label="✅ Tavily AI search complete", state="complete")
|
||||
elif google_search_success:
|
||||
status.update(label="⚠️ Tavily search had issues, but Google search was successful")
|
||||
else:
|
||||
status.update(label="❌ Both search methods encountered issues", state="error")
|
||||
|
||||
# Clear the progress indicators
|
||||
status_container.empty()
|
||||
research_progress.empty()
|
||||
|
||||
return google_search_result, tavily_search_result, google_search_success, tavily_search_success, example_blog_titles
|
||||
|
||||
|
||||
def generate_content_phase(search_keywords, google_search_result, tavily_search_result,
|
||||
google_search_success, tavily_search_success, blog_params, update_progress):
|
||||
"""
|
||||
Generate blog content from research results.
|
||||
|
||||
Args:
|
||||
search_keywords (str): Keywords to research
|
||||
google_search_result: Results from Google search
|
||||
tavily_search_result: Results from Tavily search
|
||||
google_search_success (bool): Whether Google search was successful
|
||||
tavily_search_success (bool): Whether Tavily search was successful
|
||||
blog_params (dict): Blog parameters
|
||||
update_progress (function): Function to update progress
|
||||
|
||||
Returns:
|
||||
str: Generated blog content or None if generation failed
|
||||
"""
|
||||
# Import content generation function here to avoid circular import
|
||||
from .ai_blog_generator_utils import generate_blog_content
|
||||
|
||||
update_progress(2, 5, "Generating blog content from research")
|
||||
|
||||
# Create a section header for the content generation phase
|
||||
st.subheader("✍️ Content Generation Progress")
|
||||
|
||||
# Use a collapsible expander for content generation details
|
||||
with st.expander("Content Generation Details", expanded=True):
|
||||
# Create a status element for content generation updates
|
||||
with st.status("Content generation in progress...", expanded=True) as status:
|
||||
if google_search_success:
|
||||
source = "Google search results"
|
||||
else:
|
||||
source = "Tavily AI research"
|
||||
|
||||
status.update(label=f"📝 Creating {blog_params.get('blog_tone')} {blog_params.get('blog_type')} content for {blog_params.get('blog_demographic')} audience...")
|
||||
|
||||
blog_markdown_str = generate_blog_content(
|
||||
search_keywords, google_search_result, tavily_search_result,
|
||||
google_search_success, tavily_search_success, blog_params, status
|
||||
)
|
||||
|
||||
if blog_markdown_str:
|
||||
status.update(label=f"✅ Successfully generated ~{len(blog_markdown_str.split())} words of content using {source}", state="complete")
|
||||
else:
|
||||
status.update(label="❌ Content generation failed", state="error")
|
||||
|
||||
return blog_markdown_str
|
||||
|
||||
|
||||
def generate_metadata_and_image(blog_markdown_str, search_keywords, blog_tags, update_progress):
|
||||
"""
|
||||
Generate metadata and featured image for the blog.
|
||||
|
||||
Args:
|
||||
blog_markdown_str (str): Blog content
|
||||
search_keywords (str): Keywords used for research
|
||||
blog_tags (list): Blog tags
|
||||
update_progress (function): Function to update progress
|
||||
|
||||
Returns:
|
||||
tuple: Blog metadata and image filepath
|
||||
"""
|
||||
# Import metadata and image generation functions here to avoid circular import
|
||||
from .ai_blog_generator_utils import generate_blog_metadata, generate_blog_image
|
||||
|
||||
update_progress(3, 5, "Generating SEO metadata and enhancements")
|
||||
|
||||
# Create a section header for the enhancement phase
|
||||
st.subheader("🔍 SEO & Enhancement Progress")
|
||||
|
||||
# Use a collapsible expander for enhancement details
|
||||
with st.expander("Enhancement Details", expanded=True):
|
||||
blog_title = None
|
||||
blog_meta_desc = None
|
||||
blog_categories = None
|
||||
blog_hashtags = None
|
||||
blog_slug = None
|
||||
generated_image_filepath = None
|
||||
saved_blog_to_file = None
|
||||
|
||||
# Create a status element for enhancement updates
|
||||
with st.status("Enhancing content...", expanded=True) as status:
|
||||
# Generate metadata
|
||||
status.update(label="🏷️ Generating SEO metadata (title, description, tags)...")
|
||||
blog_title, blog_meta_desc, blog_tags, blog_categories, blog_hashtags, blog_slug = generate_blog_metadata(
|
||||
blog_markdown_str, search_keywords, status
|
||||
)
|
||||
|
||||
# Check if there are updated values in session state
|
||||
if 'blog_title' in st.session_state:
|
||||
blog_title = st.session_state.blog_title
|
||||
status.update(label=f"✅ Using refined title: \"{blog_title}\"")
|
||||
|
||||
if 'blog_meta_desc' in st.session_state:
|
||||
blog_meta_desc = st.session_state.blog_meta_desc
|
||||
status.update(label=f"✅ Using refined meta description")
|
||||
|
||||
if blog_title and blog_meta_desc:
|
||||
status.update(label=f"✅ Generated metadata: \"{blog_title}\"")
|
||||
|
||||
# Generate featured image
|
||||
status.update(label="🖼️ Creating featured image...")
|
||||
generated_image_filepath = generate_blog_image(
|
||||
blog_title, blog_meta_desc, blog_markdown_str, status, blog_tags
|
||||
)
|
||||
|
||||
# Save blog content to file
|
||||
status.update(label="💾 Saving blog content...")
|
||||
saved_blog_to_file = save_blog_content(
|
||||
blog_markdown_str, blog_title, blog_meta_desc, blog_tags,
|
||||
blog_categories, generated_image_filepath, status, blog_hashtags, blog_slug
|
||||
)
|
||||
|
||||
status.update(label="✅ Content enhancement complete", state="complete")
|
||||
else:
|
||||
status.update(label="⚠️ Metadata generation had issues, using simplified format", state="warning")
|
||||
|
||||
# Add buttons for metadata refinement
|
||||
create_metadata_refinement_ui()
|
||||
|
||||
# Add rich snippet section
|
||||
create_structured_data_ui()
|
||||
|
||||
metadata = {
|
||||
"blog_title": blog_title,
|
||||
"blog_meta_desc": blog_meta_desc,
|
||||
"blog_tags": blog_tags,
|
||||
"blog_categories": blog_categories,
|
||||
"blog_hashtags": blog_hashtags,
|
||||
"blog_slug": blog_slug
|
||||
}
|
||||
|
||||
return metadata, generated_image_filepath, saved_blog_to_file
|
||||
|
||||
|
||||
def create_metadata_refinement_ui():
|
||||
"""Create UI elements for refining blog metadata (title and meta description)."""
|
||||
col1, col2 = st.columns(2)
|
||||
with col1:
|
||||
if st.button("🔄 Refine Blog Title", key="refine_title_main", use_container_width=True):
|
||||
st.session_state.show_title_dialog = True
|
||||
st.rerun()
|
||||
with col2:
|
||||
if st.button("🔄 Refine Meta Description", key="refine_meta_main", use_container_width=True):
|
||||
st.session_state.show_meta_dialog = True
|
||||
st.rerun()
|
||||
|
||||
|
||||
def create_structured_data_ui():
|
||||
"""Create UI elements for generating structured data."""
|
||||
st.markdown("---")
|
||||
structured_data_col1, structured_data_col2 = st.columns([3, 1])
|
||||
|
||||
with structured_data_col1:
|
||||
# Educational popover explaining why rich snippets are important
|
||||
with st.expander("ℹ️ Why Rich Snippets Are Important for SEO"):
|
||||
st.markdown("""
|
||||
### Rich Snippets: Boosting Your SEO and Click-Through Rates
|
||||
|
||||
**What are Rich Snippets?**
|
||||
|
||||
Rich snippets are enhanced search results that display additional information directly in search engine results pages (SERPs). They're created using structured data markup (JSON-LD) that helps search engines understand your content better.
|
||||
|
||||
**Why are they important?**
|
||||
|
||||
1. **Increased Visibility**: Rich snippets stand out in search results with stars, images, and additional information
|
||||
|
||||
2. **Higher Click-Through Rates (CTR)**: Studies show rich snippets can increase CTR by 30-150%
|
||||
|
||||
3. **Improved SEO**: They help search engines understand your content better, potentially improving rankings
|
||||
|
||||
4. **Enhanced User Experience**: Users can see key information before clicking, leading to more qualified traffic
|
||||
|
||||
5. **Mobile-Friendly**: Rich snippets are especially effective on mobile searches
|
||||
|
||||
**Common types of rich snippets include:**
|
||||
- Articles/Blogs (with author, date, image)
|
||||
- Products (with ratings, price, availability)
|
||||
- Recipes (with cooking time, ratings, calories)
|
||||
- Events (with date, location, ticket info)
|
||||
- Local Business (with address, hours, ratings)
|
||||
|
||||
Adding structured data to your content is a powerful SEO technique that requires minimal effort but provides significant benefits.
|
||||
""")
|
||||
|
||||
with structured_data_col2:
|
||||
# Button to generate rich snippet
|
||||
if st.button("📊 Generate Rich Snippet", key="snippet_main", use_container_width=True):
|
||||
st.session_state.show_snippet_dialog = True
|
||||
st.rerun()
|
||||
|
||||
|
||||
def display_featured_image(blog_title, blog_meta_desc, blog_markdown_str, blog_tags, generated_image_filepath):
|
||||
"""
|
||||
Display the featured image with regeneration options.
|
||||
|
||||
Args:
|
||||
blog_title (str): Blog title
|
||||
blog_meta_desc (str): Blog meta description
|
||||
blog_markdown_str (str): Blog content
|
||||
blog_tags (list): Blog tags
|
||||
generated_image_filepath (str): Path to the generated image
|
||||
|
||||
Returns:
|
||||
str: Updated image filepath if regenerated, otherwise original filepath
|
||||
"""
|
||||
# Import image regeneration function here to avoid circular import
|
||||
from .ai_blog_generator_utils import regenerate_blog_image
|
||||
|
||||
st.subheader("🖼️ Featured Image")
|
||||
image_container = st.container()
|
||||
|
||||
# Display featured image
|
||||
with image_container:
|
||||
if generated_image_filepath:
|
||||
st.image(generated_image_filepath, caption=blog_title or "Featured Image", use_column_width=True)
|
||||
|
||||
# Add regenerate button
|
||||
if st.button("🔄 Regenerate Image", key="regenerate_image"):
|
||||
new_image_path = regenerate_blog_image(blog_title, blog_meta_desc, blog_markdown_str, blog_tags)
|
||||
if new_image_path:
|
||||
return new_image_path
|
||||
else:
|
||||
st.info("No featured image was generated. Click below to generate one.")
|
||||
if st.button("🖼️ Generate Image", key="generate_image"):
|
||||
new_image_path = regenerate_blog_image(blog_title, blog_meta_desc, blog_markdown_str, blog_tags)
|
||||
if new_image_path:
|
||||
return new_image_path
|
||||
|
||||
return generated_image_filepath
|
||||
|
||||
|
||||
def display_blog_content_and_audio(blog_markdown_str, saved_blog_to_file):
|
||||
"""
|
||||
Display the blog content and audio generation option.
|
||||
|
||||
Args:
|
||||
blog_markdown_str (str): Blog content
|
||||
saved_blog_to_file (str): Path to the saved blog file
|
||||
"""
|
||||
# Display blog content
|
||||
st.markdown("## Content")
|
||||
st.markdown(blog_markdown_str)
|
||||
|
||||
# Show file save information if available
|
||||
if saved_blog_to_file:
|
||||
st.success(f"✅ Blog saved to: {saved_blog_to_file}")
|
||||
|
||||
# Add the audio generation button
|
||||
st.markdown("---")
|
||||
audio_col1, audio_col2 = st.columns([1, 3])
|
||||
with audio_col1:
|
||||
generate_audio_button = st.button("🔊 Generate Audio Version", use_container_width=True)
|
||||
|
||||
with audio_col2:
|
||||
if generate_audio_button:
|
||||
generate_audio_version(blog_markdown_str)
|
||||
|
||||
|
||||
def display_final_metadata_table(metadata, update_progress):
|
||||
"""
|
||||
Display the final metadata table and options.
|
||||
|
||||
Args:
|
||||
metadata (dict): Blog metadata
|
||||
update_progress (function): Function to update progress
|
||||
"""
|
||||
update_progress(4, 5, "Preparing final blog presentation")
|
||||
|
||||
st.markdown("---")
|
||||
# Display metadata in a collapsible expander to save space
|
||||
with st.expander("🏷️ Metadata", expanded=True):
|
||||
st.table({
|
||||
"Metadata": ["Blog Title", "Meta Description", "Tags", "Categories", "Hashtags", "Slug"],
|
||||
"Value": [
|
||||
metadata["blog_title"],
|
||||
metadata["blog_meta_desc"],
|
||||
metadata["blog_tags"],
|
||||
metadata["blog_categories"],
|
||||
metadata["blog_hashtags"],
|
||||
metadata["blog_slug"]
|
||||
]
|
||||
})
|
||||
|
||||
# Add buttons in columns for refining metadata
|
||||
create_metadata_refinement_ui()
|
||||
|
||||
# Add a row for structured data with a "Generate Rich Snippet" button
|
||||
st.markdown("---")
|
||||
st.markdown("### Get Structured Data")
|
||||
|
||||
# Add structured data UI
|
||||
create_structured_data_ui()
|
||||
|
||||
# Create snippet generation dialog if button is clicked
|
||||
if st.session_state.get("show_snippet_dialog", False):
|
||||
display_structured_data_dialog(metadata["blog_title"], metadata["blog_tags"])
|
||||
|
||||
|
||||
def display_structured_data_dialog(blog_title, blog_tags):
|
||||
"""
|
||||
Display the structured data generation dialog.
|
||||
|
||||
Args:
|
||||
blog_title (str): Blog title
|
||||
blog_tags (list): Blog tags
|
||||
"""
|
||||
with st.expander("Structured Data Generation Tool", expanded=True):
|
||||
st.subheader("Generate Structured Data (Rich Snippets)")
|
||||
|
||||
# Close button at the top
|
||||
if st.button("Close", key="close_structured_data"):
|
||||
st.session_state.show_snippet_dialog = False
|
||||
st.rerun()
|
||||
|
||||
# Simplified blog URL input
|
||||
blog_url = st.text_input(
|
||||
"Blog URL:",
|
||||
placeholder="https://yourblog.com/your-article",
|
||||
help="Enter the URL where this blog will be published"
|
||||
)
|
||||
|
||||
# Auto-fill content type to "Article" since we're working with a blog
|
||||
content_type = "Article"
|
||||
st.info(f"Content Type: {content_type} (Auto-selected for blog content)")
|
||||
|
||||
# Form for additional article details
|
||||
with st.form(key="structured_data_form"):
|
||||
st.markdown("#### Article Details")
|
||||
|
||||
# Pre-fill with blog title and other metadata
|
||||
article_title = st.text_input("Headline:", value=blog_title if blog_title else "")
|
||||
article_author = st.text_input("Author:", value="")
|
||||
article_date = st.date_input("Date Published:", value=datetime.now())
|
||||
article_keywords = st.text_input("Keywords:", value=blog_tags if blog_tags else "")
|
||||
|
||||
submit_structured_data = st.form_submit_button("Generate JSON-LD")
|
||||
|
||||
if submit_structured_data:
|
||||
if not blog_url:
|
||||
st.error("Please enter a blog URL to generate structured data.")
|
||||
else:
|
||||
# Create details dictionary
|
||||
details = {
|
||||
"Headline": article_title,
|
||||
"Author": article_author,
|
||||
"Date Published": article_date,
|
||||
"Keywords": article_keywords
|
||||
}
|
||||
|
||||
# Call the imported ai_structured_data function or recreate its functionality
|
||||
with st.spinner("Generating structured data..."):
|
||||
# Import and use the function from the module directly
|
||||
from ...ai_seo_tools.seo_structured_data import generate_json_data
|
||||
|
||||
# Generate the structured data
|
||||
structured_data = generate_json_data(content_type, details, blog_url)
|
||||
|
||||
if structured_data:
|
||||
st.success("✅ Structured data generated successfully!")
|
||||
st.markdown("### Generated JSON-LD Code")
|
||||
st.code(structured_data, language="json")
|
||||
|
||||
# Download button
|
||||
st.download_button(
|
||||
label="📥 Download JSON-LD",
|
||||
data=structured_data,
|
||||
file_name=f"{content_type}_structured_data.json",
|
||||
mime="application/json",
|
||||
)
|
||||
|
||||
# Implementation instructions
|
||||
with st.expander("How to Implement This Code"):
|
||||
st.markdown("""
|
||||
### Adding this JSON-LD to your website:
|
||||
|
||||
1. **Copy the generated JSON-LD code** above
|
||||
|
||||
2. **Add it to the `<head>` section of your HTML** like this:
|
||||
```html
|
||||
<script type="application/ld+json">
|
||||
[PASTE YOUR JSON-LD CODE HERE]
|
||||
</script>
|
||||
```
|
||||
|
||||
3. **Verify the implementation** using Google's Rich Results Test tool:
|
||||
[https://search.google.com/test/rich-results](https://search.google.com/test/rich-results)
|
||||
|
||||
4. **Monitor your search appearance** in Google Search Console
|
||||
""")
|
||||
else:
|
||||
st.error("Failed to generate structured data. Please check your inputs and try again.")
|
||||
|
||||
|
||||
def display_title_refinement_dialog(blog_title, blog_tags):
|
||||
"""
|
||||
Display a dialog for refining the blog title.
|
||||
|
||||
Args:
|
||||
blog_title (str): Current blog title
|
||||
blog_tags (list): Blog tags for context
|
||||
"""
|
||||
with st.expander("Blog Title Refinement Tool", expanded=True):
|
||||
st.subheader("Generate Better Blog Titles")
|
||||
|
||||
# Form for title generation
|
||||
with st.form(key="title_generation_form"):
|
||||
st.markdown("#### Title Generation Parameters")
|
||||
|
||||
# Pre-fill with blog tags if available
|
||||
keywords = st.text_input("Target Keywords:",
|
||||
value=blog_tags if blog_tags else "",
|
||||
help="Enter primary keywords to target in the title")
|
||||
|
||||
blog_type = st.selectbox(
|
||||
"Blog Type:",
|
||||
["How-to Guide", "Tutorial", "List Post", "Informational", "Case Study", "Opinion Piece", "Review"],
|
||||
index=0,
|
||||
help="Select the type of blog you're creating"
|
||||
)
|
||||
|
||||
search_intent = st.selectbox(
|
||||
"Search Intent:",
|
||||
["Informational", "Commercial", "Navigational", "Transactional"],
|
||||
index=0,
|
||||
help="Select the primary search intent your title should address"
|
||||
)
|
||||
|
||||
language = st.selectbox(
|
||||
"Language:",
|
||||
["English", "Spanish", "French", "German", "Italian"],
|
||||
index=0
|
||||
)
|
||||
|
||||
submit_title = st.form_submit_button("Generate Title Suggestions")
|
||||
|
||||
if submit_title:
|
||||
with st.spinner("Generating title suggestions..."):
|
||||
# Import and use the function from the module
|
||||
from ...ai_seo_tools.content_title_generator import generate_blog_titles
|
||||
|
||||
# Generate the titles
|
||||
title_suggestions = generate_blog_titles(
|
||||
target_keywords=keywords,
|
||||
blog_type=blog_type,
|
||||
search_intent=search_intent,
|
||||
language=language
|
||||
)
|
||||
|
||||
if title_suggestions:
|
||||
st.success("✅ Generated title suggestions!")
|
||||
|
||||
# Display each title with an option to select it
|
||||
st.markdown("### Select a Title or Modify")
|
||||
|
||||
selected_title = st.text_input(
|
||||
"Selected or Modified Title:",
|
||||
value=blog_title if blog_title else (title_suggestions[0] if title_suggestions else ""),
|
||||
help="Select one of the suggested titles or modify it to your preference"
|
||||
)
|
||||
|
||||
if st.button("Confirm Title"):
|
||||
st.session_state.blog_title = selected_title
|
||||
st.session_state.show_title_dialog = False
|
||||
st.success(f"Title updated to: {selected_title}")
|
||||
st.rerun()
|
||||
|
||||
# Display all suggestions
|
||||
for i, title in enumerate(title_suggestions):
|
||||
st.markdown(f"**Option {i+1}:** {title}")
|
||||
else:
|
||||
st.error("Failed to generate title suggestions. Please try different parameters.")
|
||||
|
||||
|
||||
def display_meta_description_dialog(blog_meta_desc, blog_tags):
|
||||
"""
|
||||
Display a dialog for refining the meta description.
|
||||
|
||||
Args:
|
||||
blog_meta_desc (str): Current meta description
|
||||
blog_tags (list): Blog tags for context
|
||||
"""
|
||||
with st.expander("Meta Description Refinement Tool", expanded=True):
|
||||
st.subheader("Generate Optimized Meta Descriptions")
|
||||
|
||||
# Form for meta description generation
|
||||
with st.form(key="meta_desc_generation_form"):
|
||||
st.markdown("#### Meta Description Parameters")
|
||||
|
||||
# Pre-fill with blog tags if available
|
||||
keywords = st.text_input("Target Keywords:",
|
||||
value=blog_tags if blog_tags else "",
|
||||
help="Enter primary keywords to target in the meta description")
|
||||
|
||||
tone = st.selectbox(
|
||||
"Tone:",
|
||||
["Informative", "Engaging", "Professional", "Conversational", "Humorous", "Urgent"],
|
||||
index=0,
|
||||
help="Select the tone for your meta description"
|
||||
)
|
||||
|
||||
search_intent = st.selectbox(
|
||||
"Search Intent:",
|
||||
["Informational", "Commercial", "Navigational", "Transactional"],
|
||||
index=0,
|
||||
help="Select the primary search intent your meta description should address"
|
||||
)
|
||||
|
||||
language = st.selectbox(
|
||||
"Language:",
|
||||
["English", "Spanish", "French", "German", "Italian"],
|
||||
index=0
|
||||
)
|
||||
|
||||
submit_meta = st.form_submit_button("Generate Meta Description Suggestions")
|
||||
|
||||
if submit_meta:
|
||||
with st.spinner("Generating meta description suggestions..."):
|
||||
# Import and use the function from the module
|
||||
from ...ai_seo_tools.meta_desc_generator import generate_blog_metadesc
|
||||
|
||||
# Generate the meta descriptions
|
||||
meta_suggestions = generate_blog_metadesc(
|
||||
target_keywords=keywords,
|
||||
tone=tone,
|
||||
search_intent=search_intent,
|
||||
language=language
|
||||
)
|
||||
|
||||
if meta_suggestions:
|
||||
st.success("✅ Generated meta description suggestions!")
|
||||
|
||||
# Display each meta description with an option to select it
|
||||
st.markdown("### Select a Meta Description or Modify")
|
||||
|
||||
selected_meta = st.text_area(
|
||||
"Selected or Modified Meta Description:",
|
||||
value=blog_meta_desc if blog_meta_desc else (meta_suggestions[0] if meta_suggestions else ""),
|
||||
height=100,
|
||||
help="Select one of the suggested meta descriptions or modify it to your preference"
|
||||
)
|
||||
|
||||
if st.button("Confirm Meta Description"):
|
||||
st.session_state.blog_meta_desc = selected_meta
|
||||
st.session_state.show_meta_dialog = False
|
||||
st.success(f"Meta description updated!")
|
||||
st.rerun()
|
||||
|
||||
# Display all suggestions
|
||||
for i, meta in enumerate(meta_suggestions):
|
||||
st.markdown(f"**Option {i+1}:** {meta}")
|
||||
else:
|
||||
st.error("Failed to generate meta description suggestions. Please try different parameters.")
|
||||
|
||||
|
||||
def write_blog_from_keywords(search_keywords, url=None, search_params=None, blog_params=None):
|
||||
"""
|
||||
This function will take a blog Topic to first generate sections for it
|
||||
and then generate content for each section.
|
||||
|
||||
Args:
|
||||
search_keywords (str): Keywords to research and write about
|
||||
url (str, optional): Optional URL to use as a source
|
||||
search_params (dict, optional): Dictionary of search parameters including:
|
||||
- max_results: Maximum number of search results (default: 10)
|
||||
- search_depth: "basic" or "advanced" search depth (default: "basic")
|
||||
- include_domains: List of domains to prioritize in search
|
||||
- time_range: Time range for results (default: "year")
|
||||
blog_params (dict, optional): Dictionary of blog content characteristics including:
|
||||
- blog_length: Target word count (default: 2000)
|
||||
- blog_tone: Tone of the content (default: "Professional")
|
||||
- blog_demographic: Target audience (default: "Professional")
|
||||
- blog_type: Type of blog post (default: "Informational")
|
||||
- blog_language: Language for the blog (default: "English")
|
||||
- blog_output_format: Format for the blog (default: "markdown")
|
||||
"""
|
||||
# Check if we need to display any dialog boxes first
|
||||
if st.session_state.get("show_title_dialog") and "blog_title" in st.session_state:
|
||||
display_title_refinement_dialog(st.session_state.blog_title, None)
|
||||
return None
|
||||
|
||||
if st.session_state.get("show_meta_dialog") and "blog_meta_desc" in st.session_state:
|
||||
display_meta_description_dialog(st.session_state.blog_meta_desc, None)
|
||||
return None
|
||||
|
||||
if st.session_state.get("show_snippet_dialog"):
|
||||
# Get blog title and tags to pass to the dialog
|
||||
blog_title = st.session_state.get("blog_title", "")
|
||||
blog_tags = st.session_state.get("blog_tags", "")
|
||||
display_structured_data_dialog(blog_title, blog_tags)
|
||||
return None
|
||||
|
||||
# Initialize parameters with defaults
|
||||
search_params, blog_params = initialize_parameters(search_params, blog_params)
|
||||
|
||||
# Set up progress tracking
|
||||
final_content_placeholder, progress_placeholder, progress_bar, status_text, update_progress = setup_progress_tracking()
|
||||
|
||||
# STEP 1: Research phase
|
||||
google_search_result, tavily_search_result, google_search_success, tavily_search_success, example_blog_titles = perform_research_phase(
|
||||
search_keywords, search_params, update_progress
|
||||
)
|
||||
|
||||
# Check if both searches failed - if so, stop the process
|
||||
if not google_search_success and not tavily_search_success:
|
||||
update_progress(5, 5, "Research failed")
|
||||
progress_placeholder.error("⛔ Both Google SERP and Tavily AI searches failed. Unable to generate blog content.")
|
||||
st.warning("Please check your API keys in the environment settings and try again.")
|
||||
st.stop()
|
||||
return None
|
||||
|
||||
# STEP 2: Content generation phase
|
||||
blog_markdown_str = generate_content_phase(
|
||||
search_keywords, google_search_result, tavily_search_result,
|
||||
google_search_success, tavily_search_success, blog_params, update_progress
|
||||
)
|
||||
|
||||
if not blog_markdown_str:
|
||||
update_progress(5, 5, "Content generation failed")
|
||||
progress_placeholder.error("⛔ Failed to generate blog content from research data.")
|
||||
st.stop()
|
||||
return None
|
||||
|
||||
# STEP 3: Metadata & enhancement phase
|
||||
metadata, generated_image_filepath, saved_blog_to_file = generate_metadata_and_image(
|
||||
blog_markdown_str, search_keywords, None, update_progress
|
||||
)
|
||||
|
||||
# Display image with regeneration option
|
||||
updated_image_filepath = display_featured_image(
|
||||
metadata["blog_title"], metadata["blog_meta_desc"],
|
||||
blog_markdown_str, metadata["blog_tags"], generated_image_filepath
|
||||
)
|
||||
|
||||
if updated_image_filepath != generated_image_filepath:
|
||||
generated_image_filepath = updated_image_filepath
|
||||
st.rerun() # Refresh the page to show the new image
|
||||
|
||||
# Display blog content and audio option
|
||||
display_blog_content_and_audio(blog_markdown_str, saved_blog_to_file)
|
||||
|
||||
# STEP 4: Final presentation
|
||||
with final_content_placeholder.container():
|
||||
display_final_metadata_table(metadata, update_progress)
|
||||
|
||||
# If there's a button click to generate a structured data snippet, handle it
|
||||
if st.session_state.get("show_snippet_dialog", False):
|
||||
display_structured_data_dialog(metadata["blog_title"], metadata["blog_tags"])
|
||||
|
||||
# Final progress update
|
||||
update_progress(5, 5, "Blog generation complete!")
|
||||
|
||||
# Replace progress bar with success message
|
||||
progress_placeholder.success("✅ Blog generation process completed successfully!")
|
||||
|
||||
return blog_markdown_str
|
||||
File diff suppressed because it is too large
Load Diff
@@ -175,7 +175,7 @@ def research_topic(keywords, search_params=None):
|
||||
placeholder.info("Researching topic... Please wait.")
|
||||
|
||||
try:
|
||||
from .keywords_to_blog_streamlit import do_tavily_ai_search
|
||||
from .ai_blog_writer.keywords_to_blog_streamlit import do_tavily_ai_search
|
||||
|
||||
# Use provided search params or defaults
|
||||
if search_params is None:
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -17,7 +17,7 @@ logger.add(sys.stdout,
|
||||
)
|
||||
|
||||
from ...ai_web_researcher.gpt_online_researcher import do_google_serp_search
|
||||
from ..blog_from_google_serp import blog_with_research
|
||||
from ..ai_blog_writer.blog_from_google_serp import blog_with_research
|
||||
from ...blog_metadata.get_blog_metadata import blog_metadata
|
||||
from ...blog_postprocessing.save_blog_to_file import save_blog_to_file
|
||||
from ...gpt_providers.audio_to_text_generation.stt_audio_blog import speech_to_text
|
||||
@@ -110,13 +110,24 @@ def generate_audio_blog(audio_input):
|
||||
logger.error(f"Error in blog_with_research: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
blog_title, blog_meta_desc, blog_tags, blog_categories = blog_metadata(blog_markdown_str)
|
||||
try:
|
||||
import asyncio
|
||||
# blog_metadata now returns 6 values: title, desc, tags, categories, hashtags, slug
|
||||
blog_title, blog_meta_desc, blog_tags, blog_categories, blog_hashtags, blog_slug = asyncio.run(blog_metadata(blog_markdown_str))
|
||||
except Exception as err:
|
||||
logger.error(f"Failed to generate blog metadata: {err}")
|
||||
# Set defaults in case of failure
|
||||
blog_title = "Blog Article"
|
||||
blog_meta_desc = "An informative blog post"
|
||||
blog_tags = "content, blog"
|
||||
blog_categories = "General, Information"
|
||||
blog_hashtags = "#content #blog"
|
||||
blog_slug = "blog-article"
|
||||
|
||||
try:
|
||||
# TBD: Save the blog content as a .md file. Markdown or HTML ?
|
||||
# Initialize generated_image_filepath to None since it's not generated in this function
|
||||
generated_image_filepath = None
|
||||
save_blog_to_file(blog_markdown_str, blog_title, blog_meta_desc, blog_tags, blog_categories, generated_image_filepath)
|
||||
except Exception as err:
|
||||
logger.error(f"Failed to save final blog in a file: {err}")
|
||||
|
||||
Reference in New Issue
Block a user