story illustrator and story video generator, AI web researcher fixes

This commit is contained in:
ajaysi
2025-05-02 23:09:43 +05:30
parent cda275f1cc
commit 19ff21a8a1
13 changed files with 2625 additions and 186 deletions

View File

@@ -150,7 +150,7 @@ def gpt_web_researcher(search_keywords, search_mode, **kwargs):
# Pass the parameters to do_tavily_ai_search
t_results = do_tavily_ai_search(
keywords=search_keywords,
search_keywords, # Pass as positional argument
max_results=kwargs.get('num_results', 10),
include_domains=include_domains,
search_depth=search_depth,
@@ -258,7 +258,7 @@ def do_google_serp_search(search_keywords, status_container, update_progress, **
try:
# Validate parameters
update_progress("Validating search parameters")
update_progress("Validating search parameters", progress=0.1)
status_container.info("📝 Validating parameters...")
if not search_keywords or not isinstance(search_keywords, str):
@@ -266,7 +266,7 @@ def do_google_serp_search(search_keywords, status_container, update_progress, **
raise ValueError("Search keywords must be a non-empty string")
# Update search initiation
update_progress(f"Initiating search for: '{search_keywords}'")
update_progress(f"Initiating search for: '{search_keywords}'", progress=0.2)
status_container.info("🌐 Querying search API...")
logger.info(f"Search params: {kwargs}")
@@ -275,26 +275,26 @@ def do_google_serp_search(search_keywords, status_container, update_progress, **
if g_results:
# Log success
update_progress("Search completed successfully", "success")
update_progress("Search completed successfully", progress=0.8, level="success")
# Update statistics
stats = f"""Found:
- {len(g_results.get('organic', []))} organic results
- {len(g_results.get('peopleAlsoAsk', []))} related questions
- {len(g_results.get('relatedSearches', []))} related searches"""
update_progress(stats)
update_progress(stats, progress=0.9)
# Process results
update_progress("Processing search results")
update_progress("Processing search results", progress=0.95)
status_container.info("⚡ Processing results...")
processed_results = process_search_results(g_results)
# Extract titles
update_progress("Extracting information")
update_progress("Extracting information", progress=0.98)
g_titles = extract_info(g_results, 'titles')
# Final success
update_progress("Analysis completed successfully", "success")
update_progress("Analysis completed successfully", progress=1.0, level="success")
status_container.success("✨ Research completed!")
# Clear main status after delay
@@ -313,13 +313,13 @@ def do_google_serp_search(search_keywords, status_container, update_progress, **
}
else:
update_progress("No results found", "warning")
update_progress("No results found", progress=0.5, level="warning")
status_container.warning("⚠️ No results found")
return None
except Exception as err:
error_msg = f"Search failed: {str(err)}"
update_progress(error_msg, "error")
update_progress(error_msg, progress=0.5, level="error")
logger.error(error_msg)
logger.debug("Stack trace:", exc_info=True)
raise
@@ -343,8 +343,11 @@ def do_tavily_ai_search(search_keywords, max_results=10, **kwargs):
'include_domains': kwargs.get('include_domains', [""]) if kwargs.get('include_domains') else [""]
}
# Pass the parameters to do_tavily_ai_search
t_results = do_tavily_ai_search(
# Import the Tavily search function directly
from .tavily_ai_search import do_tavily_ai_search as tavily_search
# Call the actual Tavily search function
t_results = tavily_search(
keywords=search_keywords,
**tavily_params
)

View File

@@ -65,28 +65,22 @@ def metaphor_rag_search():
return processed_results
def metaphor_find_similar(similar_url, usecase, num_results=5, start_published_date=None, end_published_date=None,
include_domains=None, exclude_domains=None, include_text=None, exclude_text=None,
summary_query=None):
"""
Find similar content using the Metaphor API.
Args:
similar_url (str): The URL to find similar content.
usecase (str): The use case for the search (e.g., "similar companies", "listicles").
num_results (int): Number of results to return (default: 5).
start_published_date (str): Start date for filtering results in ISO format.
end_published_date (str): End date for filtering results in ISO format.
include_domains (list): List of domains to include in the search.
exclude_domains (list): List of domains to exclude from the search.
include_text (str): Text that must be included in the results.
exclude_text (str): Text that must be excluded from the results.
summary_query (dict): Custom query for summarization.
Returns:
tuple: (DataFrame, MetaphorResponse) - The DataFrame contains the results and the MetaphorResponse contains the raw API response.
"""
metaphor = get_metaphor_client()
def metaphor_find_similar(similar_url, usecase, num_results=5, start_published_date=None, end_published_date=None,
include_domains=None, exclude_domains=None, include_text=None, exclude_text=None,
summary_query=None, progress_bar=None):
"""Find similar content using Metaphor API."""
try:
logger.info(f"Doing similar web search for url: {similar_url}")
# Initialize progress if not provided
if progress_bar is None:
progress_bar = st.progress(0.0)
# Update progress
progress_bar.progress(0.1, text="Initializing search...")
# Get Metaphor client
metaphor = get_metaphor_client()
logger.info(f"Initialized Metaphor client for URL: {similar_url}")
# Prepare search parameters
search_params = {
@@ -94,113 +88,83 @@ def metaphor_find_similar(similar_url, usecase, num_results=5, start_published_d
"num_results": num_results,
}
# Add date parameters if provided
# Add optional parameters if provided
if start_published_date:
search_params["start_published_date"] = start_published_date
if end_published_date:
search_params["end_published_date"] = end_published_date
# Add domain filters if provided
if include_domains:
search_params["include_domains"] = include_domains
if exclude_domains:
search_params["exclude_domains"] = exclude_domains
# Add text filters if provided
if include_text:
search_params["include_text"] = include_text
if exclude_text:
search_params["exclude_text"] = exclude_text
# Add summary query if provided
# Add summary query
if summary_query:
search_params["summary"] = summary_query
else:
# Default summary query based on usecase
search_params["summary"] = {"query": f"Find {usecase} similar to the given URL."}
# Execute the search
logger.debug(f"Search parameters: {search_params}")
# Update progress
progress_bar.progress(0.2, text="Preparing search parameters...")
# Make API call
logger.info("Calling Metaphor API find_similar_and_contents...")
search_response = metaphor.find_similar_and_contents(
similar_url,
**search_params
)
if search_response and hasattr(search_response, 'results'):
competitors = search_response.results
total_results = len(competitors)
# Update progress
progress_bar.progress(0.3, text=f"Found {total_results} results...")
# Process results
processed_results = []
for i, result in enumerate(competitors):
# Calculate progress as decimal (0.0-1.0)
progress = 0.3 + (0.6 * (i / total_results))
progress_text = f"Processing result {i+1}/{total_results}..."
progress_bar.progress(progress, text=progress_text)
# Process each result
processed_result = {
"Title": result.title,
"URL": result.url,
"Content Summary": result.text if hasattr(result, 'text') else "No content available"
}
processed_results.append(processed_result)
# Update progress
progress_bar.progress(0.9, text="Finalizing results...")
# Create DataFrame
df = pd.DataFrame(processed_results)
# Update progress
progress_bar.progress(1.0, text="Analysis completed!")
return df, search_response
else:
logger.warning("No results found in search response")
progress_bar.progress(1.0, text="No results found")
return pd.DataFrame(), search_response
except Exception as e:
logger.error(f"Metaphor: Error in finding similar content: {e}")
logger.error(f"Error in metaphor_find_similar: {str(e)}", exc_info=True)
if progress_bar:
progress_bar.progress(1.0, text="Error occurred during analysis")
raise
competitors = search_response.results
# Initialize lists to store titles, URLs, and contents
titles = []
urls = []
contents = []
progress_bar = st.progress(0, text="Starting competitor analysis...")
# Extract titles, URLs, and contents from the competitors
for i, c in enumerate(competitors):
# Update progress bar for each competitor
if st.session_state.get('show_progress', True):
progress_text = f"Processing competitor {i+1}/{len(competitors)}: {c.title[:30]}..."
progress_bar.progress((i / len(competitors)) * 100, text=progress_text)
titles.append(c.title)
urls.append(c.url)
all_contents = ""
try:
# Update progress
if st.session_state.get('show_progress', True):
progress_bar.progress(25, text=f"Fetching content for {c.title[:30]}...")
search_response = metaphor.search_and_contents(
c.url,
type="keyword",
num_results=1
)
research_response = search_response.results
# Update progress
if st.session_state.get('show_progress', True):
progress_bar.progress(50, text=f"Extracting text from {c.title[:30]}...")
for r in research_response:
all_contents += r.text
# Update progress
if st.session_state.get('show_progress', True):
progress_bar.progress(75, text=f"Summarizing content for {c.title[:30]}...")
# Get the summary from the competitor content
summary_response = summarize_competitor_content(all_contents)
c.text = summary_response
# Store the raw summary in session state for display in dialog
if 'competitor_summaries' not in st.session_state:
st.session_state.competitor_summaries = {}
st.session_state.competitor_summaries[c.url] = {
'title': c.title,
'summary': summary_response
}
# Update progress to complete
if st.session_state.get('show_progress', True):
progress_bar.progress(100, text=f"Completed processing {c.title[:30]}")
except Exception as err:
c.text = f"Failed to summarize content: {err}"
# Update progress to show error
if st.session_state.get('show_progress', True):
progress_bar.progress(100, text=f"Error processing {c.title[:30]}: {str(err)[:50]}...")
contents.append(c.text)
# Create a DataFrame from the titles, URLs, and contents
df = pd.DataFrame({
"Title": titles,
"URL": urls,
"Content Summary": contents
})
# Return the DataFrame and the search response
return df, search_response
def calculate_date_range(time_range: str) -> tuple:
"""

View File

@@ -39,14 +39,6 @@ from .blog_ai_research_utils import (
do_tavily_ai_search
)
# REMOVED CIRCULAR IMPORTS
# Import content and image generation functions from the generator utils module
# from .ai_blog_generator_utils import (
# generate_blog_content,
# generate_blog_metadata,
# generate_blog_image,
# regenerate_blog_image
# )
def save_blog_content(blog_markdown_str, blog_title, blog_meta_desc, blog_tags, blog_categories, generated_image_filepath, status, blog_hashtags=None, blog_slug=None):
"""

View File

@@ -0,0 +1,75 @@
# AI Story Illustrator
The AI Story Illustrator is a powerful tool that generates beautiful illustrations for stories using Google's Gemini AI. This module allows users to input stories via text, file upload, or URL, and automatically generates appropriate illustrations for different scenes in the story.
## Features
- **Multiple Input Methods**: Input stories via direct text entry, file upload, or URL extraction
- **Intelligent Scene Segmentation**: Automatically divides stories into logical segments for illustration
- **Customizable Illustration Styles**: Choose from various artistic styles or define your own
- **Scene Element Extraction**: Analyzes story segments to identify key visual elements
- **Multiple Export Options**: Export as PDF storybook or ZIP archive of individual images
- **Customizable Aspect Ratios**: Support for different image dimensions (16:9, 4:3, 1:1)
- **Advanced Settings**: Control the number of segments to illustrate and other parameters
## Usage
The Story Illustrator is integrated into the Alwrity platform and can be accessed through the main interface. The workflow consists of three main steps:
1. **Story Input**: Enter your story text, upload a file, or provide a URL
2. **Illustration Settings**: Configure the style, aspect ratio, and other parameters
3. **Generate & Export**: Generate illustrations for all or individual segments and export the results
## Technical Details
### Dependencies
- Streamlit: For the user interface
- Gemini AI: For image generation
- BeautifulSoup: For URL text extraction
- ReportLab: For PDF generation (optional)
- PIL: For image processing
### Key Functions
- `segment_story()`: Divides a story into logical segments for illustration
- `extract_scene_elements()`: Analyzes story segments to identify key visual elements
- `generate_illustration_prompt()`: Creates detailed prompts for the AI image generator
- `create_illustration()`: Generates an illustration for a story segment
- `create_storybook_pdf()`: Combines story text and illustrations into a PDF
- `create_zip_archive()`: Creates a ZIP archive of individual illustrations
## Example
```python
from lib.ai_writers.ai_story_illustrator.story_illustrator import write_story_illustrator
# Run the Story Illustrator app
write_story_illustrator()
```
## Best Practices
- **Provide Clear Segments**: The system works best with stories that have clear scene transitions
- **Be Specific with Styles**: More specific style descriptions yield better results
- **Balance Text and Images**: For best results, aim for segments of 100-500 words per illustration
- **Review and Regenerate**: If an illustration doesn't capture the scene well, use the regenerate option
## Future Enhancements
- Support for more export formats (EPUB, HTML)
- Enhanced character consistency across illustrations
- Animation options for digital storytelling
- Voice narration integration
- Custom character design options
## Troubleshooting
- If illustrations are not generating, check your internet connection and API access
- If PDF export fails, ensure ReportLab is installed (`pip install reportlab`)
- If URL extraction fails, try copying the text manually
- For large stories, consider processing in smaller batches
## Credits
This module uses Google's Gemini AI for image generation and leverages various open-source libraries for text processing and document generation.

View File

@@ -0,0 +1,7 @@
"""
AI Story Illustrator module for generating illustrations for stories using AI.
"""
from .story_illustrator import write_story_illustrator
__all__ = ['write_story_illustrator']

View File

@@ -0,0 +1,727 @@
"""
AI Story Illustrator - Generate illustrations for stories using Gemini AI
This module provides functionality to generate illustrations for stories using Google's Gemini AI.
Users can input stories via text, file upload, or URL, and the system will generate appropriate
illustrations for different scenes in the story.
Based on: https://github.com/google-gemini/cookbook/blob/main/examples/Book_illustration.ipynb
"""
import streamlit as st
import os
import re
import time
import tempfile
import requests
from pathlib import Path
import io
import base64
import json
import uuid
import logging
from urllib.parse import urlparse
from bs4 import BeautifulSoup
import zipfile
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('story_illustrator')
# Constants
MAX_STORY_LENGTH = 10000 # Maximum story length in characters
MIN_SEGMENT_LENGTH = 100 # Minimum segment length for illustration
MAX_SEGMENTS = 20 # Maximum number of segments to illustrate
DEFAULT_STYLE = "digital art" # Default illustration style
DEFAULT_ASPECT_RATIO = "16:9" # Default aspect ratio
def extract_text_from_url(url):
"""Extract text content from a URL."""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.extract()
# Get text
text = soup.get_text(separator='\\n')
# Break into lines and remove leading and trailing space on each
lines = (line.strip() for line in text.splitlines())
# Break multi-headlines into a line each
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
# Drop blank lines
text = '\\n'.join(chunk for chunk in chunks if chunk)
return text
except Exception as e:
logger.error(f"Error extracting text from URL: {e}")
return None
def segment_story(story_text, min_segment_length=MIN_SEGMENT_LENGTH, max_segments=MAX_SEGMENTS):
"""
Segment a story into logical parts for illustration.
Uses paragraph breaks, scene changes, and other indicators to create segments.
"""
# Clean up the text
story_text = story_text.strip()
# Split by paragraphs first
paragraphs = re.split(r'\\n\s*\\n', story_text)
# Initialize segments
segments = []
current_segment = ""
for paragraph in paragraphs:
# Skip empty paragraphs
if not paragraph.strip():
continue
# If adding this paragraph would make the segment too long, start a new segment
if len(current_segment) + len(paragraph) > 1000: # Limit segment size
if current_segment:
segments.append(current_segment.strip())
current_segment = paragraph
else:
# Add paragraph to current segment
if current_segment:
current_segment += "\\n\\n" + paragraph
else:
current_segment = paragraph
# Add the last segment if it exists
if current_segment:
segments.append(current_segment.strip())
# Combine very short segments
i = 0
while i < len(segments) - 1:
if len(segments[i]) < min_segment_length:
segments[i] += "\\n\\n" + segments[i+1]
segments.pop(i+1)
else:
i += 1
# Limit the number of segments
if len(segments) > max_segments:
# Combine segments to reduce the total number
new_segments = []
segment_size = len(segments) / max_segments
for i in range(max_segments):
start_idx = int(i * segment_size)
end_idx = int((i + 1) * segment_size)
combined_segment = "\\n\\n".join(segments[start_idx:end_idx])
new_segments.append(combined_segment)
segments = new_segments
return segments
def extract_scene_elements(segment):
"""
Extract key scene elements from a story segment using LLM.
This helps create more accurate illustration prompts.
"""
from ...gpt_providers.text_generation.main_text_generation import llm_text_gen
prompt = f"""
Analyze the following story segment and extract key visual elements for an illustration:
{segment}
Please provide:
1. Main characters present (with brief visual descriptions)
2. Setting/location details
3. Key action or emotional moment to illustrate
4. Important objects or props
5. Time of day and lighting
6. Weather or atmospheric conditions (if applicable)
Format your response as JSON with these keys: "characters", "setting", "key_moment", "objects", "lighting", "atmosphere"
"""
try:
response = llm_text_gen(prompt)
# Try to extract JSON from the response
try:
# Find JSON content between triple backticks if present
json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
if json_match:
json_str = json_match.group(1)
else:
# Otherwise try to parse the whole response as JSON
json_str = response
scene_elements = json.loads(json_str)
return scene_elements
except json.JSONDecodeError:
# If JSON parsing fails, extract information using regex
characters = re.search(r'"characters":\s*"([^"]*)"', response)
setting = re.search(r'"setting":\s*"([^"]*)"', response)
return {
"characters": characters.group(1) if characters else "",
"setting": setting.group(1) if setting else "",
"key_moment": "",
"objects": "",
"lighting": "",
"atmosphere": ""
}
except Exception as e:
logger.error(f"Error extracting scene elements: {e}")
return {
"characters": "",
"setting": "",
"key_moment": "",
"objects": "",
"lighting": "",
"atmosphere": ""
}
def generate_illustration_prompt(segment, style, characters=None, setting=None):
"""
Generate a prompt for the illustration based on the segment content.
Args:
segment: The story segment to illustrate
style: The artistic style for the illustration
characters: Optional character descriptions
setting: Optional setting description
Returns:
A prompt string for the image generation model
"""
# Create a base prompt
base_prompt = f"""
Create a detailed illustration for the following story segment in {style} style:
{segment[:500]} # Limit segment length for prompt
The illustration should capture the key elements, mood, and action of this scene.
"""
# Add character information if provided
if characters:
base_prompt += f"\\n\\nThe main characters in this scene are: {characters}"
# Add setting information if provided
if setting:
base_prompt += f"\\n\\nThe setting is: {setting}"
# Add style-specific instructions
if "watercolor" in style.lower():
base_prompt += "\\n\\nUse soft, flowing watercolor techniques with visible brush strokes and color blending."
elif "digital art" in style.lower():
base_prompt += "\\n\\nCreate a polished digital illustration with clean lines and vibrant colors."
elif "pencil sketch" in style.lower():
base_prompt += "\\n\\nUse pencil sketch techniques with visible hatching, shading, and line work."
# Add final quality instructions
base_prompt += """
Make the illustration:
- Visually engaging and detailed
- Appropriate for a storybook
- Focused on the main action or emotion of the scene
- With good composition and visual storytelling
"""
return base_prompt.strip()
def create_illustration(segment, style, aspect_ratio="16:9"):
"""
Create an illustration for a story segment.
Args:
segment: The story segment to illustrate
style: The artistic style for the illustration
aspect_ratio: The aspect ratio for the illustration
Returns:
Path to the generated image
"""
# Import here to avoid circular imports
from ...gpt_providers.text_to_image_generation.gen_gemini_images import generate_gemini_image
# Extract scene elements to enhance the prompt
scene_elements = extract_scene_elements(segment)
# Create a detailed prompt for the illustration
prompt = generate_illustration_prompt(
segment,
style,
characters=scene_elements.get("characters", ""),
setting=scene_elements.get("setting", "")
)
# Add key elements to the prompt
key_moment = scene_elements.get("key_moment", "")
objects = scene_elements.get("objects", "")
lighting = scene_elements.get("lighting", "")
atmosphere = scene_elements.get("atmosphere", "")
if key_moment:
prompt += f"\\n\\nFocus on this key moment: {key_moment}"
if objects:
prompt += f"\\n\\nInclude these important objects: {objects}"
if lighting:
prompt += f"\\n\\nThe lighting is: {lighting}"
if atmosphere:
prompt += f"\\n\\nThe atmosphere/weather is: {atmosphere}"
# Generate the illustration
try:
# Parse aspect ratio
if aspect_ratio == "16:9":
width, height = 16, 9
elif aspect_ratio == "4:3":
width, height = 4, 3
elif aspect_ratio == "1:1":
width, height = 1, 1
else:
width, height = 16, 9 # Default
# Generate image using Gemini
image_path = generate_gemini_image(
prompt=prompt,
style=style.lower() if style else None,
aspect_ratio=aspect_ratio
)
return image_path
except Exception as e:
logger.error(f"Error creating illustration: {e}")
return None
def create_storybook_pdf(segments, illustrations, title, author, output_path):
"""
Create a PDF storybook with text and illustrations.
Args:
segments: List of story segments
illustrations: List of paths to illustrations
title: Book title
author: Book author
output_path: Path to save the PDF
Returns:
Path to the created PDF
"""
try:
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.units import inch
# Create a PDF document
doc = SimpleDocTemplate(output_path, pagesize=A4)
story = []
# Get styles
styles = getSampleStyleSheet()
title_style = styles['Title']
author_style = styles['Normal']
author_style.alignment = 1 # Center alignment
normal_style = styles['Normal']
# Add title page
story.append(Paragraph(title, title_style))
story.append(Spacer(1, 0.5*inch))
story.append(Paragraph(f"by {author}", author_style))
story.append(PageBreak())
# Add content pages
for i, (segment, illustration_path) in enumerate(zip(segments, illustrations)):
if illustration_path and os.path.exists(illustration_path):
# Add illustration
img = ReportLabImage(illustration_path, width=6*inch, height=4*inch)
story.append(img)
story.append(Spacer(1, 0.25*inch))
# Add text
for paragraph in segment.split('\\n\\n'):
if paragraph.strip():
story.append(Paragraph(paragraph, normal_style))
story.append(Spacer(1, 0.1*inch))
# Add page break between segments
if i < len(segments) - 1:
story.append(PageBreak())
# Build the PDF
doc.build(story)
return output_path
except Exception as e:
logger.error(f"Error creating PDF: {e}")
return None
def create_zip_archive(files, output_path):
"""
Create a ZIP archive containing the provided files.
Args:
files: Dictionary of {filename: file_path} to include in the archive
output_path: Path to save the ZIP file
Returns:
Path to the created ZIP file
"""
try:
with zipfile.ZipFile(output_path, 'w') as zipf:
for filename, file_path in files.items():
if os.path.exists(file_path):
zipf.write(file_path, arcname=filename)
return output_path
except Exception as e:
logger.error(f"Error creating ZIP archive: {e}")
return None
def write_story_illustrator():
"""Main function for the Story Illustrator Streamlit app."""
st.title("AI Story Illustrator")
st.write("Generate beautiful illustrations for your stories using AI")
# Create tabs for different sections
tab1, tab2, tab3 = st.tabs(["Story Input", "Illustration Settings", "Generate & Export"])
# Initialize session state variables if they don't exist
if "story_text" not in st.session_state:
st.session_state.story_text = ""
if "segments" not in st.session_state:
st.session_state.segments = []
if "illustrations" not in st.session_state:
st.session_state.illustrations = []
if "book_title" not in st.session_state:
st.session_state.book_title = ""
if "book_author" not in st.session_state:
st.session_state.book_author = ""
if "illustration_style" not in st.session_state:
st.session_state.illustration_style = DEFAULT_STYLE
if "aspect_ratio" not in st.session_state:
st.session_state.aspect_ratio = DEFAULT_ASPECT_RATIO
if "temp_files" not in st.session_state:
st.session_state.temp_files = []
# Tab 1: Story Input
with tab1:
st.header("Step 1: Input Your Story")
# Input method selection
input_method = st.radio(
"Choose input method:",
["Text Input", "File Upload", "URL"]
)
if input_method == "Text Input":
st.session_state.story_text = st.text_area(
"Enter your story text:",
value=st.session_state.story_text,
height=300,
max_chars=MAX_STORY_LENGTH,
help="Enter the story text you want to illustrate (max 10,000 characters)"
)
elif input_method == "File Upload":
uploaded_file = st.file_uploader("Upload a text file:", type=["txt", "md"])
if uploaded_file is not None:
try:
st.session_state.story_text = uploaded_file.getvalue().decode("utf-8")
st.success(f"Successfully loaded file: {uploaded_file.name}")
st.text_area("Preview:", value=st.session_state.story_text[:500] + "...", height=200, disabled=True)
except Exception as e:
st.error(f"Error reading file: {e}")
elif input_method == "URL":
url = st.text_input("Enter URL containing the story:")
if url:
if st.button("Extract Text from URL"):
with st.spinner("Extracting text from URL..."):
extracted_text = extract_text_from_url(url)
if extracted_text:
st.session_state.story_text = extracted_text
st.success("Successfully extracted text from URL")
st.text_area("Preview:", value=st.session_state.story_text[:500] + "...", height=200, disabled=True)
else:
st.error("Failed to extract text from URL")
# Book metadata
st.subheader("Book Metadata")
col1, col2 = st.columns(2)
with col1:
st.session_state.book_title = st.text_input("Book Title:", value=st.session_state.book_title)
with col2:
st.session_state.book_author = st.text_input("Author:", value=st.session_state.book_author)
# Process story into segments
if st.session_state.story_text:
if st.button("Process Story into Segments"):
with st.spinner("Processing story into segments..."):
st.session_state.segments = segment_story(st.session_state.story_text)
st.success(f"Story processed into {len(st.session_state.segments)} segments")
# Initialize illustrations list with None values
st.session_state.illustrations = [None] * len(st.session_state.segments)
# Display segments
st.subheader("Story Segments")
for i, segment in enumerate(st.session_state.segments):
with st.expander(f"Segment {i+1}"):
st.write(segment)
# Tab 2: Illustration Settings
with tab2:
st.header("Step 2: Configure Illustration Settings")
# Style selection
st.subheader("Illustration Style")
style_options = [
"Digital Art",
"Watercolor Painting",
"Pencil Sketch",
"Oil Painting",
"Cartoon",
"Anime",
"3D Render",
"Pixel Art",
"Children's Book Illustration",
"Comic Book Style",
"Fantasy Art",
"Realistic"
]
st.session_state.illustration_style = st.selectbox(
"Choose an illustration style:",
style_options,
index=style_options.index(st.session_state.illustration_style) if st.session_state.illustration_style in style_options else 0
)
# Custom style input
use_custom_style = st.checkbox("Use custom style")
if use_custom_style:
custom_style = st.text_input("Describe your custom style:",
placeholder="e.g., Impressionist painting with vibrant colors and visible brushstrokes")
if custom_style:
st.session_state.illustration_style = custom_style
# Display style examples
st.info("💡 The style you choose will significantly impact the look and feel of your illustrations.")
# Aspect ratio selection
st.subheader("Image Settings")
aspect_ratio_options = {
"16:9 (Widescreen)": "16:9",
"4:3 (Standard)": "4:3",
"1:1 (Square)": "1:1"
}
selected_ratio = st.selectbox(
"Choose aspect ratio:",
list(aspect_ratio_options.keys()),
index=list(aspect_ratio_options.values()).index(st.session_state.aspect_ratio) if st.session_state.aspect_ratio in aspect_ratio_options.values() else 0
)
st.session_state.aspect_ratio = aspect_ratio_options[selected_ratio]
# Advanced settings
with st.expander("Advanced Settings"):
st.slider("Number of segments to illustrate:", 1,
max(len(st.session_state.segments), 1) if st.session_state.segments else 1,
min(len(st.session_state.segments), MAX_SEGMENTS) if st.session_state.segments else 1,
key="num_segments_to_illustrate")
st.checkbox("Generate cover image", value=True, key="generate_cover")
st.checkbox("Add text to illustrations", value=False, key="add_text_to_illustrations")
# Tab 3: Generate & Export
with tab3:
st.header("Step 3: Generate Illustrations & Export")
if not st.session_state.segments:
st.warning("Please process your story into segments in Step 1 before generating illustrations.")
else:
# Generate illustrations
st.subheader("Generate Illustrations")
num_segments = min(len(st.session_state.segments), st.session_state.get("num_segments_to_illustrate", len(st.session_state.segments)))
if st.button("Generate All Illustrations"):
with st.spinner(f"Generating {num_segments} illustrations... This may take a while."):
progress_bar = st.progress(0)
for i in range(num_segments):
# Update progress
progress_bar.progress((i) / num_segments)
st.write(f"Generating illustration {i+1} of {num_segments}...")
# Generate illustration
illustration_path = create_illustration(
st.session_state.segments[i],
st.session_state.illustration_style,
st.session_state.aspect_ratio
)
# Store the illustration path
if illustration_path:
st.session_state.illustrations[i] = illustration_path
st.session_state.temp_files.append(illustration_path)
# Complete progress
progress_bar.progress(1.0)
st.success(f"Generated {num_segments} illustrations!")
# Generate individual illustrations
st.subheader("Generate Individual Illustrations")
for i in range(num_segments):
col1, col2 = st.columns([3, 1])
with col1:
with st.expander(f"Segment {i+1}"):
st.write(st.session_state.segments[i][:300] + "..." if len(st.session_state.segments[i]) > 300 else st.session_state.segments[i])
with col2:
if st.button(f"Generate #{i+1}", key=f"gen_btn_{i}"):
with st.spinner(f"Generating illustration {i+1}..."):
illustration_path = create_illustration(
st.session_state.segments[i],
st.session_state.illustration_style,
st.session_state.aspect_ratio
)
if illustration_path:
st.session_state.illustrations[i] = illustration_path
st.session_state.temp_files.append(illustration_path)
st.success(f"Generated illustration {i+1}!")
# Display generated illustrations
st.subheader("Preview Illustrations")
if any(st.session_state.illustrations):
for i, illustration_path in enumerate(st.session_state.illustrations[:num_segments]):
if illustration_path and os.path.exists(illustration_path):
with st.expander(f"Illustration {i+1}"):
st.image(illustration_path, caption=f"Illustration for Segment {i+1}", use_column_width=True)
# Regenerate button
if st.button(f"Regenerate", key=f"regen_btn_{i}"):
with st.spinner(f"Regenerating illustration {i+1}..."):
new_illustration_path = create_illustration(
st.session_state.segments[i],
st.session_state.illustration_style,
st.session_state.aspect_ratio
)
if new_illustration_path:
st.session_state.illustrations[i] = new_illustration_path
st.session_state.temp_files.append(new_illustration_path)
st.rerun()
else:
st.info("No illustrations generated yet. Click 'Generate All Illustrations' or generate individual illustrations.")
# Export options
st.subheader("Export Options")
if any(st.session_state.illustrations):
export_format = st.radio(
"Export format:",
["PDF Storybook", "Individual Images (ZIP)", "Both"]
)
if st.button("Export"):
with st.spinner("Preparing export..."):
# Create temporary directory for exports
with tempfile.TemporaryDirectory() as temp_dir:
# Filter out None values from illustrations
valid_illustrations = [path for path in st.session_state.illustrations[:num_segments] if path and os.path.exists(path)]
valid_segments = st.session_state.segments[:len(valid_illustrations)]
# Prepare filenames
safe_title = "".join(c if c.isalnum() else "_" for c in st.session_state.book_title) if st.session_state.book_title else "story"
timestamp = int(time.time())
# Export as PDF
if export_format in ["PDF Storybook", "Both"]:
pdf_path = os.path.join(temp_dir, f"{safe_title}_{timestamp}.pdf")
try:
pdf_result = create_storybook_pdf(
valid_segments,
valid_illustrations,
st.session_state.book_title or "Untitled Story",
st.session_state.book_author or "Anonymous",
pdf_path
)
if pdf_result:
with open(pdf_path, "rb") as f:
st.download_button(
label="Download PDF Storybook",
data=f,
file_name=f"{safe_title}.pdf",
mime="application/pdf"
)
except Exception as e:
st.error(f"Error creating PDF: {e}")
st.info("Please install ReportLab to enable PDF export: pip install reportlab")
# Export as ZIP of images
if export_format in ["Individual Images (ZIP)", "Both"]:
zip_path = os.path.join(temp_dir, f"{safe_title}_illustrations_{timestamp}.zip")
# Prepare files for ZIP
files_to_zip = {}
for i, img_path in enumerate(valid_illustrations):
if img_path and os.path.exists(img_path):
files_to_zip[f"illustration_{i+1}.png"] = img_path
zip_result = create_zip_archive(files_to_zip, zip_path)
if zip_result:
with open(zip_path, "rb") as f:
st.download_button(
label="Download Illustrations ZIP",
data=f,
file_name=f"{safe_title}_illustrations.zip",
mime="application/zip"
)
else:
st.info("Generate illustrations before exporting.")
# Cleanup temporary files when the session ends
def cleanup_temp_files():
for file_path in st.session_state.temp_files:
try:
if file_path and os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
logger.error(f"Error removing temporary file {file_path}: {e}")
# Register the cleanup function to run when the session ends
import atexit
atexit.register(cleanup_temp_files)
if __name__ == "__main__":
write_story_illustrator()

View File

@@ -0,0 +1,450 @@
"""
Utility functions for the AI Story Illustrator module.
This module provides helper functions for file operations, string manipulation,
and simple text analysis relevant to story processing.
"""
import os
import re
import tempfile
import uuid
import logging
import shutil
from pathlib import Path
from typing import List, Tuple, Optional, Union
# Attempt to import Pillow for image dimensions, but don't fail if not installed
# unless the specific function is called.
try:
from PIL import Image
_PIL_AVAILABLE = True
except ImportError:
_PIL_AVAILABLE = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger('story_illustrator_utils')
# --- Constants ---
IMAGE_EXTENSIONS = frozenset(['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'])
TEXT_EXTENSIONS = frozenset(['.txt', '.md', '.text'])
# Common English words that often start sentences, excluded from simple name detection
COMMON_START_WORDS = frozenset([
'The', 'A', 'An', 'And', 'But', 'Or', 'For', 'Nor', 'So', 'Yet', 'He', 'She',
'It', 'They', 'We', 'You', 'I', 'In', 'On', 'At', 'To', 'From', 'With',
'About', 'As', 'Is', 'Was', 'Were', 'Be', 'Been', 'Being', 'Have', 'Has',
'Had', 'Do', 'Does', 'Did', 'Will', 'Would', 'Shall', 'Should', 'May',
'Might', 'Must', 'Can', 'Could'
])
# --- File/Directory Operations ---
def create_temp_directory(prefix: str = "story_illustrator_") -> str:
"""
Creates a temporary directory using tempfile.mkdtemp.
Args:
prefix: A prefix for the temporary directory name.
Returns:
The absolute path to the created temporary directory.
"""
try:
temp_dir = tempfile.mkdtemp(prefix=prefix)
logger.info(f"Created temporary directory: {temp_dir}")
return temp_dir
except Exception as e:
logger.error(f"Failed to create temporary directory: {e}", exc_info=True)
raise # Re-raise the exception after logging
def sanitize_filename(filename: str) -> str:
"""
Sanitizes a filename by removing/replacing invalid characters for common filesystems.
Args:
filename: The original filename string.
Returns:
A sanitized filename string suitable for use in file paths.
"""
if not isinstance(filename, str):
logger.warning("sanitize_filename received non-string input, converting.")
filename = str(filename)
# Remove characters invalid for Windows/Unix filenames
# Replace them with an underscore.
sanitized = re.sub(r'[\\/*?:"<>|\']', "_", filename)
# Replace consecutive underscores/spaces with a single underscore
sanitized = re.sub(r'[_ ]+', '_', sanitized)
# Remove leading/trailing spaces, dots, and underscores
sanitized = sanitized.strip("._ ")
# Ensure the filename is not empty after sanitization
if not sanitized:
sanitized = "unnamed_file"
logger.warning("Filename was empty after sanitization, using default.")
# Limit filename length (optional, adjust as needed)
# max_len = 255 # Example limit
# if len(sanitized) > max_len:
# name, ext = os.path.splitext(sanitized)
# sanitized = name[:max_len - len(ext) - 1] + "_" + ext
# logger.warning(f"Filename truncated to maximum length: {sanitized}")
return sanitized
def get_temp_file_path(
directory: str, prefix: str = "file_", suffix: str = ".tmp"
) -> str:
"""
Generates a unique temporary file path within the specified directory.
Args:
directory: The directory where the temporary file should be located.
prefix: A prefix for the filename.
suffix: A suffix (extension) for the filename.
Returns:
The full path for the unique temporary file.
"""
# Ensure suffix starts with a dot if it's meant to be an extension
if suffix and not suffix.startswith("."):
suffix = "." + suffix
unique_id = uuid.uuid4().hex[:12] # Longer hex UUID for better uniqueness
filename = f"{prefix}{unique_id}{suffix}"
return os.path.join(directory, filename)
def ensure_directory_exists(directory: Union[str, Path]) -> str:
"""
Ensures that a directory exists, creating it recursively if necessary.
Args:
directory: The path to the directory (string or Path object).
Returns:
The absolute path to the directory as a string.
Raises:
OSError: If the directory cannot be created (e.g., permission issues).
"""
dir_path = Path(directory).resolve() # Use Pathlib for robust handling
try:
dir_path.mkdir(parents=True, exist_ok=True)
# Log only if it needed creation (or if verbose logging is on)
# logger.info(f"Ensured directory exists: {dir_path}")
return str(dir_path)
except OSError as e:
logger.error(f"Failed to create or access directory {dir_path}: {e}", exc_info=True)
raise
def cleanup_directory(directory: Union[str, Path]) -> None:
"""
Removes a directory and all its contents recursively. Handles errors gracefully.
Args:
directory: The path to the directory to remove (string or Path object).
"""
dir_path = Path(directory)
if not dir_path.exists():
logger.debug(f"Cleanup skipped: Directory '{directory}' does not exist.")
return
if not dir_path.is_dir():
logger.warning(f"Cleanup warning: Path '{directory}' is not a directory.")
return
try:
shutil.rmtree(dir_path)
logger.info(f"Successfully removed directory: {directory}")
except OSError as e:
logger.error(f"Error removing directory {directory}: {e}", exc_info=True)
except Exception as e:
logger.error(
f"Unexpected error removing directory {directory}: {e}", exc_info=True
)
# --- File Type Checks ---
def get_file_extension(file_path: Union[str, Path]) -> str:
"""
Gets the lowercased file extension (including the dot) from a file path.
Args:
file_path: The path to the file (string or Path object).
Returns:
The file extension (e.g., '.txt', '.png') or an empty string if no extension.
"""
return Path(file_path).suffix.lower()
def is_image_file(file_path: Union[str, Path]) -> bool:
"""
Checks if a file is likely an image based on its extension.
Args:
file_path: The path to the file (string or Path object).
Returns:
True if the file extension is in IMAGE_EXTENSIONS, False otherwise.
"""
return get_file_extension(file_path) in IMAGE_EXTENSIONS
def is_text_file(file_path: Union[str, Path]) -> bool:
"""
Checks if a file is likely a text file based on its extension.
Args:
file_path: The path to the file (string or Path object).
Returns:
True if the file extension is in TEXT_EXTENSIONS, False otherwise.
"""
return get_file_extension(file_path) in TEXT_EXTENSIONS
# --- Text Analysis (Simple Heuristics) ---
def extract_story_title_from_text(text: str) -> str:
"""
Attempts to extract a title from story text using simple heuristics.
Looks for patterns (in order):
1. Markdown headers (#, ##, etc.) at the start of a line.
2. The first non-empty line if it's short (< 100 chars) and followed by
a blank line or is the only line.
3. The first non-empty line if it's entirely in uppercase (< 100 chars).
Args:
text: The story text content.
Returns:
An extracted title string, or "Untitled Story" if no pattern matches.
"""
if not isinstance(text, str) or not text.strip():
return "Untitled Story"
# 1. Check for markdown headers ( # Title, ## Title )
# Needs to match start of line (^) with optional whitespace before #
header_match = re.search(r'^\s*#+\s+(.+)$', text.strip(), re.MULTILINE)
if header_match:
title = header_match.group(1).strip()
if title: return title
lines = text.strip().split('\n')
if not lines:
return "Untitled Story"
first_line = lines[0].strip()
if not first_line: # Skip if first line is blank
if len(lines) > 1:
first_line = lines[1].strip() # Try second line
else:
return "Untitled Story"
if not first_line: # Still no title found
return "Untitled Story"
# 2. Check if first line is short and potentially a title
is_short = len(first_line) < 100
is_followed_by_blank = len(lines) > 1 and not lines[1].strip()
is_only_line = len(lines) == 1
if is_short and (is_followed_by_blank or is_only_line):
return first_line
# 3. Check if first line is all caps (and short)
is_all_caps = first_line == first_line.upper() and first_line.isalpha() # Check if it contains letters
if is_short and is_all_caps:
return first_line
# Default if no other pattern matched
return "Untitled Story"
def estimate_reading_time(text: str, words_per_minute: int = 200) -> float:
"""
Estimates the reading time of a text in minutes.
Args:
text: The text content.
words_per_minute: The assumed average reading speed.
Returns:
The estimated reading time in minutes. Returns 0.0 for empty text.
"""
if not isinstance(text, str) or not text.strip():
return 0.0
if words_per_minute <= 0:
raise ValueError("words_per_minute must be positive.")
word_count = len(text.split())
minutes = word_count / words_per_minute
return minutes
def count_sentences(text: str) -> int:
"""
Counts the number of sentences in a text using a very simple heuristic.
Note: This is a basic implementation counting sentence-ending punctuation
(. ! ?). It will be inaccurate with abbreviations (Mr., Mrs., etc.),
ellipses, and complex sentence structures.
Args:
text: The text content.
Returns:
An estimated count of sentences. Returns 0 for empty text.
"""
if not isinstance(text, str) or not text.strip():
return 0
# Find sequences of one or more sentence-ending punctuation marks
sentence_endings = re.findall(r'[.!?]+', text)
count = len(sentence_endings)
# Handle edge case where text might not end with punctuation but isn't empty
if count == 0 and len(text.strip()) > 0:
return 1 # Assume at least one sentence if text exists but no terminators found
return count
def extract_character_names(text: str, min_occurrences: int = 2) -> List[str]:
"""
Attempts to extract potential character names from story text.
Note: This is a simple heuristic based on finding capitalized words
(excluding common sentence starters) that appear multiple times. It has
limitations and may produce false positives or miss actual names.
Args:
text: The story text content.
min_occurrences: The minimum number of times a capitalized word must
appear to be considered a potential name.
Returns:
A list of potential character name strings.
"""
if not isinstance(text, str) or not text.strip():
return []
if min_occurrences < 1:
min_occurrences = 1 # Ensure at least one occurrence is required
# Find words starting with an uppercase letter, potentially followed by lowercase
# Allows for single-letter names like 'X' but focuses on typical Name structure
capitalized_words = re.findall(r'\b[A-Z][a-zA-Z]*\b', text)
# Count occurrences, excluding common words
word_counts: Dict[str, int] = {}
for word in capitalized_words:
if word not in COMMON_START_WORDS:
word_counts[word] = word_counts.get(word, 0) + 1
# Filter for words that meet the minimum occurrence threshold
potential_names = [
word for word, count in word_counts.items() if count >= min_occurrences
]
# Sort for consistency (optional)
potential_names.sort()
return potential_names
def extract_setting_details(text: str) -> List[str]:
"""
Attempts to extract potential setting details using simple regex patterns.
Note: This is a very basic heuristic looking for common prepositional
phrases (e.g., "in the forest", "at the castle"). It is highly limited
and likely to miss many setting details or extract irrelevant phrases.
Args:
text: The story text content.
Returns:
A list of potential setting phrases found.
"""
if not isinstance(text, str) or not text.strip():
return []
# Patterns looking for prepositions followed by nouns/adjectives
# Making patterns slightly more general:
# (\b\w+\b) captures single words
# (\b\w+\s+\w+\b) captures two-word phrases
# (\b[A-Z]\w*\b) captures capitalized words (potential proper nouns)
setting_patterns = [
r'\b(?:in|on|at|near|beside|inside|outside|under|over|through)\s+(?:the|a|an)\s+((?:[A-Z]\w*|\w+)(?:\s+\w+){0,2})\b', # e.g., in the old house
r'\b(?:in|on|at)\s+((?:[A-Z]\w+)(?:\s+[A-Z]\w+)*)\b', # e.g., in New York City
r'\b(?:during|before|after)\s+(?:the|a|an)\s+(\w+(?:\s+\w+){0,2})\b', # e.g., during the storm
]
settings_found = set() # Use a set to avoid duplicates
for pattern in setting_patterns:
try:
matches = re.findall(pattern, text, re.IGNORECASE) # Ignore case
for match in matches:
# If match is tuple due to multiple capture groups, join them?
# For these patterns, it should be single strings.
if isinstance(match, str):
phrase = match.strip()
if phrase and len(phrase.split()) <= 5: # Limit phrase length
settings_found.add(phrase)
except re.error as e:
logger.warning(f"Regex error in extract_setting_details: {e} with pattern: {pattern}")
# Convert set back to list and sort for consistency
sorted_settings = sorted(list(settings_found))
return sorted_settings
# --- Image Operations ---
def get_image_dimensions(image_path: Union[str, Path]) -> Optional[Tuple[int, int]]:
"""
Gets the (width, height) dimensions of an image file using Pillow.
Args:
image_path: The path to the image file (string or Path object).
Returns:
A tuple (width, height) if successful, or None if the file is not
a valid image, Pillow is not installed, or an error occurs.
"""
if not _PIL_AVAILABLE:
logger.warning("Pillow (PIL) library not installed. Cannot get image dimensions.")
return None
img_path = Path(image_path)
if not img_path.is_file():
logger.error(f"Image file not found or is not a file: {image_path}")
return None
try:
with Image.open(img_path) as img:
width, height = img.size
logger.debug(f"Dimensions for {image_path}: {width}x{height}")
return width, height
except FileNotFoundError:
logger.error(f"Image file not found at path: {image_path}")
return None
except UnidentifiedImageError: # Specific Pillow error for invalid images
logger.error(f"Could not identify image file (invalid format or corrupted): {image_path}")
return None
except Exception as e:
logger.error(f"Error getting dimensions for image {image_path}: {e}", exc_info=True)
return None

View File

@@ -0,0 +1,31 @@
# AI Story Video Generator
This module allows users to generate animated story videos using AI. It leverages Google's Gemini model to create stories and generate images for each scene, then combines them into a video.
## Features
- Generate complete stories based on user prompts
- Create scene-by-scene storyboards
- Generate images for each scene using Gemini
- Compile images into an animated video
- Add background music and text overlays
- Export videos in MP4 format
## How It Works
1. User provides a story prompt and preferences
2. AI generates a complete story with multiple scenes
3. For each scene, an image is generated
4. Images are compiled into a video with transitions
5. Optional background music and text overlays are added
6. The final video is available for download
## Requirements
- Google Gemini API key
- FFmpeg for video processing
- Python libraries: moviepy, pillow, requests
## Usage
Access this tool through the Streamlit interface by selecting "AI Story Video Generator" from the main menu.

View File

@@ -0,0 +1,4 @@
# AI Story Video Generator module
from .story_video_generator import write_story_video_generator
__all__ = ["write_story_video_generator"]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,64 @@
"""
Utility functions for the AI Story Video Generator.
"""
import os
import tempfile
import uuid
from pathlib import Path
from typing import Optional
# Constants
TEMP_DIR = Path(tempfile.gettempdir()) / "alwrity_story_generator"
def ensure_temp_dir() -> Path:
"""Ensure the temporary directory exists and return its path."""
os.makedirs(TEMP_DIR, exist_ok=True)
return TEMP_DIR
def get_temp_filepath(prefix: str, extension: str) -> str:
"""Generate a temporary file path with the given prefix and extension."""
temp_dir = ensure_temp_dir()
return str(temp_dir / f"{prefix}_{uuid.uuid4()}.{extension}")
def clean_temp_files(older_than_hours: int = 24) -> int:
"""
Clean temporary files older than the specified number of hours.
Args:
older_than_hours: Remove files older than this many hours
Returns:
Number of files removed
"""
import time
from datetime import datetime, timedelta
temp_dir = ensure_temp_dir()
cutoff_time = time.time() - (older_than_hours * 3600)
count = 0
for file_path in temp_dir.glob("*"):
if file_path.is_file() and file_path.stat().st_mtime < cutoff_time:
try:
file_path.unlink()
count += 1
except Exception:
pass
return count
def format_duration(seconds: float) -> str:
"""Format seconds into a MM:SS string."""
minutes = int(seconds // 60)
remaining_seconds = int(seconds % 60)
return f"{minutes}:{remaining_seconds:02d}"
def sanitize_filename(filename: str) -> str:
"""Sanitize a string to be used as a filename."""
import re
# Remove invalid characters
sanitized = re.sub(r'[^\w\s-]', '', filename)
# Replace spaces with underscores
sanitized = sanitized.strip().replace(' ', '_')
return sanitized

View File

@@ -494,36 +494,89 @@ def do_web_research():
with progress_col:
progress_bar = st.progress(0)
def update_progress(message, progress=None, level="info"):
"""Update progress bar and status display.
Args:
message (str): The message to display
progress (float, optional): Progress value between 0 and 100. Will be converted to 0.0-1.0
level (str, optional): Message level (info, warning, error, success)
"""
if progress is not None:
# Convert percentage to decimal (0.0-1.0)
progress = float(progress) / 100.0
# Ensure progress stays within bounds
progress = max(0.0, min(1.0, progress))
progress_bar.progress(progress)
if level == "error":
status_display.error(f"🚫 {message}")
elif level == "warning":
status_display.warning(f"⚠️ {message}")
elif level == "success":
status_display.success(f"{message}")
else:
status_display.info(f"🔄 {message}")
logger.debug(f"Progress update [{level}]: {message}")
# Execute search with all parameters
web_research_result = gpt_web_researcher(
search_keywords=st.session_state.research_options["primary_keywords"],
search_mode=st.session_state.research_options["search_mode"],
related_keywords=st.session_state.research_options["related_keywords"],
target_audience=st.session_state.research_options["target_audience"],
content_type=st.session_state.research_options["content_type"],
search_depth=st.session_state.research_options["search_depth"],
geo_location=st.session_state.research_options["geo_location"],
search_language=st.session_state.research_options["search_language"],
num_results=st.session_state.research_options["num_results"],
time_range=st.session_state.research_options["time_range"],
include_domains=st.session_state.research_options["include_domains"],
similar_url=st.session_state.research_options["similar_url"]
)
try:
update_progress("Starting search...", 0.25)
logger.info(f"Executing web research with mode: {st.session_state.research_options['search_mode']}")
# Create base parameters
research_params = {
"search_keywords": st.session_state.research_options["primary_keywords"],
"search_mode": st.session_state.research_options["search_mode"],
"related_keywords": st.session_state.research_options["related_keywords"],
"target_audience": st.session_state.research_options["target_audience"],
"content_type": st.session_state.research_options["content_type"],
"search_depth": st.session_state.research_options["search_depth"],
"geo_location": st.session_state.research_options["geo_location"],
"search_language": st.session_state.research_options["search_language"],
"num_results": st.session_state.research_options["num_results"],
"time_range": st.session_state.research_options["time_range"],
"include_domains": st.session_state.research_options["include_domains"],
"similar_url": st.session_state.research_options["similar_url"]
}
# Add UI-specific parameters
research_params.update({
"status_container": status_display,
"update_progress": update_progress
})
# For AI search mode, ensure search_keywords is passed correctly
if st.session_state.research_options["search_mode"] == "ai":
research_params["tavily_params"] = {
"max_results": st.session_state.research_options["num_results"],
"search_depth": "advanced" if st.session_state.research_options["search_depth"] > 2 else "basic",
"time_range": st.session_state.research_options["time_range"],
"include_domains": st.session_state.research_options["include_domains"].split(",") if st.session_state.research_options["include_domains"] else [""]
}
# Pass search_keywords as a positional argument
research_params["tavily_search_keywords"] = st.session_state.research_options["primary_keywords"]
# Execute the research
web_research_result = gpt_web_researcher(**research_params)
if web_research_result:
status_display.success("✨ Research completed!")
# Display results in an organized way
with st.expander("📊 Research Results", expanded=False):
st.write(web_research_result)
else:
st.warning("No results found for your search")
except Exception as e:
error_msg = f"Research failed: {str(e)}"
logger.error(error_msg, exc_info=True)
st.error(f"🚫 Research failed: {error_msg}")
if web_research_result:
status_display.success("✨ Research completed!")
# Display results in an organized way
with st.expander("📊 Research Results", expanded=False):
st.write(web_research_result)
else:
st.warning("No results found for your search")
except Exception as e:
error_msg = f"Research failed: {str(e)}"
logger.error(error_msg, exc_info=True)
st.error(f"🚫 Research failed: {error_msg}")
logger.error(f"Unexpected error in web research: {e}", exc_info=True)
st.error("🚫 An unexpected error occurred. Please try again.")
except Exception as e:
logger.error(f"Unexpected error in web research: {e}", exc_info=True)
st.error("🚫 An unexpected error occurred. Please try again.")

View File

@@ -3,7 +3,22 @@ from lib.ai_web_researcher.metaphor_basic_neural_web_search import metaphor_find
from datetime import datetime, timedelta
import re
import urllib.parse
import logging
# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create console handler if it doesn't exist
if not logger.handlers:
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
def is_valid_url(url):
"""
@@ -17,15 +32,20 @@ def is_valid_url(url):
"""
try:
result = urllib.parse.urlparse(url)
return all([result.scheme, result.netloc])
except:
is_valid = all([result.scheme, result.netloc])
logger.debug(f"URL validation for {url}: {is_valid}")
return is_valid
except Exception as e:
logger.error(f"URL validation error for {url}: {str(e)}")
return False
def competitor_analysis():
logger.info("Starting competitor analysis")
# Initialize session state for progress bar visibility
if 'show_progress' not in st.session_state:
st.session_state.show_progress = True
logger.debug("Initialized show_progress session state")
st.title("Competitor Analysis")
st.markdown("""**Use Cases:**
@@ -44,6 +64,7 @@ def competitor_analysis():
# Validate URL
url_valid = is_valid_url(similar_url) if similar_url else False
if similar_url and not url_valid:
logger.warning(f"Invalid URL provided: {similar_url}")
st.error("⚠️ Please enter a valid URL including http:// or https://")
# Usecase selection with improved help
@@ -52,6 +73,7 @@ def competitor_analysis():
["similar companies", "listicles", "Top tools", "alternative-to", "similar products", "similar websites"],
help="Choose the type of analysis you want to perform"
)
logger.debug(f"Selected usecase: {usecase}")
# Default summary query based on usecase
default_summary_queries = {
@@ -65,6 +87,7 @@ def competitor_analysis():
# Advanced options using a modal dialog
show_advanced = st.checkbox("Show Advanced Options", help="Configure additional search parameters")
logger.debug(f"Advanced options shown: {show_advanced}")
# Initialize default values
num_results = 5
@@ -96,6 +119,7 @@ def competitor_analysis():
# Advanced options section
if show_advanced:
logger.debug("Processing advanced options")
st.markdown("### 🔧 Advanced Search Options")
# Summary query with improved help in a card
@@ -192,6 +216,9 @@ def competitor_analysis():
if st.button("Analyze", disabled=not url_valid if similar_url else False):
if similar_url and url_valid:
try:
logger.info(f"Starting analysis for URL: {similar_url}")
logger.debug(f"Analysis parameters - Usecase: {usecase}, Num Results: {num_results}, Time Range: {time_range}")
# Create a progress container
progress_container = st.empty()
status_container = st.empty()
@@ -201,11 +228,8 @@ def competitor_analysis():
status_container.info(f"Starting analysis for the URL: {similar_url}")
# Create a progress bar
progress_bar = progress_container.progress(0)
# Update progress and status
progress_bar.progress(10)
status_container.info("Initializing search parameters...")
progress_bar = progress_container.progress(0.1)
logger.debug("Initialized progress bar and status containers")
# Calculate date range based on selection
start_date = None
@@ -219,6 +243,7 @@ def competitor_analysis():
start_date = end_date - timedelta(days=30)
elif time_range == "Past Year":
start_date = end_date - timedelta(days=365)
logger.debug(f"Date range: {start_date} to {end_date}")
# Format dates for API if they exist
start_published_date = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z") if start_date else None
@@ -228,41 +253,48 @@ def competitor_analysis():
summary_query_param = None
if summary_query:
summary_query_param = {"query": summary_query}
logger.debug(f"Summary query: {summary_query}")
# Update progress
progress_bar.progress(20)
progress_bar.progress(0.2)
status_container.info("Searching for similar content...")
logger.info("Initiating similar content search")
# Call the metaphor_find_similar function with all parameters
with st.spinner("Performing competitor analysis..."):
# Update progress
progress_bar.progress(30)
status_container.info("Finding similar content...")
# Call the API
df, search_response = metaphor_find_similar(
similar_url=similar_url,
usecase=usecase,
num_results=num_results,
start_published_date=start_published_date,
end_published_date=end_published_date,
include_domains=include_domains,
exclude_domains=exclude_domains,
include_text=include_text,
exclude_text=exclude_text,
summary_query=summary_query_param
)
# Update progress
progress_bar.progress(70)
status_container.info("Processing and analyzing results...")
# Update progress to complete
progress_bar.progress(100)
status_container.success("Analysis completed successfully!")
logger.debug("Calling metaphor_find_similar API")
try:
df, search_response = metaphor_find_similar(
similar_url=similar_url,
usecase=usecase,
num_results=num_results,
start_published_date=start_published_date,
end_published_date=end_published_date,
include_domains=include_domains,
exclude_domains=exclude_domains,
include_text=include_text,
exclude_text=exclude_text,
summary_query=summary_query_param
)
logger.info(f"API call successful. Found {len(df) if not df.empty else 0} results")
# Update progress
progress_bar.progress(0.7)
status_container.info("Processing and analyzing results...")
logger.debug("Processing search results")
# Update progress to complete
progress_bar.progress(1.0)
status_container.success("Analysis completed successfully!")
logger.info("Analysis completed successfully")
except Exception as api_error:
logger.error(f"API call failed: {str(api_error)}", exc_info=True)
raise
# Display results using data editor
# Display results
if not df.empty:
logger.debug(f"Displaying {len(df)} results")
st.subheader("📊 Competitor Analysis Results")
# Add a download button for the results
@@ -331,8 +363,12 @@ def competitor_analysis():
with st.expander("View Raw Data"):
st.json(search_response)
else:
logger.warning("No results found for the given URL and parameters")
st.warning("No results found for the given URL and parameters.")
except Exception as err:
logger.error(f"Analysis failed: {str(err)}", exc_info=True)
st.error(f"✖ 🚫 Failed to do similar search.\nError: {err}")
else:
logger.warning("Analysis attempted without valid URL")
st.error("Please enter a valid URL.")