From 19ff21a8a11b50b8d8245444baba3e4241148126 Mon Sep 17 00:00:00 2001 From: ajaysi Date: Fri, 2 May 2025 23:09:43 +0530 Subject: [PATCH] story illustrator and story video generator, AI web researcher fixes --- .../gpt_online_researcher.py | 27 +- .../metaphor_basic_neural_web_search.py | 172 ++- .../keywords_to_blog_streamlit.py | 8 - lib/ai_writers/ai_story_illustrator/README.md | 75 ++ .../ai_story_illustrator/__init__.py | 7 + .../ai_story_illustrator/story_illustrator.py | 727 ++++++++++++ lib/ai_writers/ai_story_illustrator/utils.py | 450 +++++++ .../ai_story_video_generator/README.md | 31 + .../ai_story_video_generator/__init__.py | 4 + .../story_video_generator.py | 1033 +++++++++++++++++ .../ai_story_video_generator/utils.py | 64 + lib/alwrity_ui/keyword_web_researcher.py | 107 +- lib/alwrity_ui/similar_analysis.py | 106 +- 13 files changed, 2625 insertions(+), 186 deletions(-) create mode 100644 lib/ai_writers/ai_story_illustrator/README.md create mode 100644 lib/ai_writers/ai_story_illustrator/__init__.py create mode 100644 lib/ai_writers/ai_story_illustrator/story_illustrator.py create mode 100644 lib/ai_writers/ai_story_illustrator/utils.py create mode 100644 lib/ai_writers/ai_story_video_generator/README.md create mode 100644 lib/ai_writers/ai_story_video_generator/__init__.py create mode 100644 lib/ai_writers/ai_story_video_generator/story_video_generator.py create mode 100644 lib/ai_writers/ai_story_video_generator/utils.py diff --git a/lib/ai_web_researcher/gpt_online_researcher.py b/lib/ai_web_researcher/gpt_online_researcher.py index f301ca48..9ec31a92 100644 --- a/lib/ai_web_researcher/gpt_online_researcher.py +++ b/lib/ai_web_researcher/gpt_online_researcher.py @@ -150,7 +150,7 @@ def gpt_web_researcher(search_keywords, search_mode, **kwargs): # Pass the parameters to do_tavily_ai_search t_results = do_tavily_ai_search( - keywords=search_keywords, + search_keywords, # Pass as positional argument max_results=kwargs.get('num_results', 10), include_domains=include_domains, search_depth=search_depth, @@ -258,7 +258,7 @@ def do_google_serp_search(search_keywords, status_container, update_progress, ** try: # Validate parameters - update_progress("Validating search parameters") + update_progress("Validating search parameters", progress=0.1) status_container.info("📝 Validating parameters...") if not search_keywords or not isinstance(search_keywords, str): @@ -266,7 +266,7 @@ def do_google_serp_search(search_keywords, status_container, update_progress, ** raise ValueError("Search keywords must be a non-empty string") # Update search initiation - update_progress(f"Initiating search for: '{search_keywords}'") + update_progress(f"Initiating search for: '{search_keywords}'", progress=0.2) status_container.info("🌐 Querying search API...") logger.info(f"Search params: {kwargs}") @@ -275,26 +275,26 @@ def do_google_serp_search(search_keywords, status_container, update_progress, ** if g_results: # Log success - update_progress("Search completed successfully", "success") + update_progress("Search completed successfully", progress=0.8, level="success") # Update statistics stats = f"""Found: - {len(g_results.get('organic', []))} organic results - {len(g_results.get('peopleAlsoAsk', []))} related questions - {len(g_results.get('relatedSearches', []))} related searches""" - update_progress(stats) + update_progress(stats, progress=0.9) # Process results - update_progress("Processing search results") + update_progress("Processing search results", progress=0.95) status_container.info("⚡ Processing results...") processed_results = process_search_results(g_results) # Extract titles - update_progress("Extracting information") + update_progress("Extracting information", progress=0.98) g_titles = extract_info(g_results, 'titles') # Final success - update_progress("Analysis completed successfully", "success") + update_progress("Analysis completed successfully", progress=1.0, level="success") status_container.success("✨ Research completed!") # Clear main status after delay @@ -313,13 +313,13 @@ def do_google_serp_search(search_keywords, status_container, update_progress, ** } else: - update_progress("No results found", "warning") + update_progress("No results found", progress=0.5, level="warning") status_container.warning("⚠️ No results found") return None except Exception as err: error_msg = f"Search failed: {str(err)}" - update_progress(error_msg, "error") + update_progress(error_msg, progress=0.5, level="error") logger.error(error_msg) logger.debug("Stack trace:", exc_info=True) raise @@ -343,8 +343,11 @@ def do_tavily_ai_search(search_keywords, max_results=10, **kwargs): 'include_domains': kwargs.get('include_domains', [""]) if kwargs.get('include_domains') else [""] } - # Pass the parameters to do_tavily_ai_search - t_results = do_tavily_ai_search( + # Import the Tavily search function directly + from .tavily_ai_search import do_tavily_ai_search as tavily_search + + # Call the actual Tavily search function + t_results = tavily_search( keywords=search_keywords, **tavily_params ) diff --git a/lib/ai_web_researcher/metaphor_basic_neural_web_search.py b/lib/ai_web_researcher/metaphor_basic_neural_web_search.py index d4a85fbc..7a3c470f 100644 --- a/lib/ai_web_researcher/metaphor_basic_neural_web_search.py +++ b/lib/ai_web_researcher/metaphor_basic_neural_web_search.py @@ -65,28 +65,22 @@ def metaphor_rag_search(): return processed_results -def metaphor_find_similar(similar_url, usecase, num_results=5, start_published_date=None, end_published_date=None, - include_domains=None, exclude_domains=None, include_text=None, exclude_text=None, - summary_query=None): - """ - Find similar content using the Metaphor API. - Args: - similar_url (str): The URL to find similar content. - usecase (str): The use case for the search (e.g., "similar companies", "listicles"). - num_results (int): Number of results to return (default: 5). - start_published_date (str): Start date for filtering results in ISO format. - end_published_date (str): End date for filtering results in ISO format. - include_domains (list): List of domains to include in the search. - exclude_domains (list): List of domains to exclude from the search. - include_text (str): Text that must be included in the results. - exclude_text (str): Text that must be excluded from the results. - summary_query (dict): Custom query for summarization. - Returns: - tuple: (DataFrame, MetaphorResponse) - The DataFrame contains the results and the MetaphorResponse contains the raw API response. - """ - metaphor = get_metaphor_client() +def metaphor_find_similar(similar_url, usecase, num_results=5, start_published_date=None, end_published_date=None, + include_domains=None, exclude_domains=None, include_text=None, exclude_text=None, + summary_query=None, progress_bar=None): + """Find similar content using Metaphor API.""" + try: - logger.info(f"Doing similar web search for url: {similar_url}") + # Initialize progress if not provided + if progress_bar is None: + progress_bar = st.progress(0.0) + + # Update progress + progress_bar.progress(0.1, text="Initializing search...") + + # Get Metaphor client + metaphor = get_metaphor_client() + logger.info(f"Initialized Metaphor client for URL: {similar_url}") # Prepare search parameters search_params = { @@ -94,113 +88,83 @@ def metaphor_find_similar(similar_url, usecase, num_results=5, start_published_d "num_results": num_results, } - # Add date parameters if provided + # Add optional parameters if provided if start_published_date: search_params["start_published_date"] = start_published_date if end_published_date: search_params["end_published_date"] = end_published_date - - # Add domain filters if provided if include_domains: search_params["include_domains"] = include_domains if exclude_domains: search_params["exclude_domains"] = exclude_domains - - # Add text filters if provided if include_text: search_params["include_text"] = include_text if exclude_text: search_params["exclude_text"] = exclude_text - # Add summary query if provided + # Add summary query if summary_query: search_params["summary"] = summary_query else: - # Default summary query based on usecase search_params["summary"] = {"query": f"Find {usecase} similar to the given URL."} - - # Execute the search + + logger.debug(f"Search parameters: {search_params}") + + # Update progress + progress_bar.progress(0.2, text="Preparing search parameters...") + + # Make API call + logger.info("Calling Metaphor API find_similar_and_contents...") search_response = metaphor.find_similar_and_contents( similar_url, **search_params ) + + if search_response and hasattr(search_response, 'results'): + competitors = search_response.results + total_results = len(competitors) + + # Update progress + progress_bar.progress(0.3, text=f"Found {total_results} results...") + + # Process results + processed_results = [] + for i, result in enumerate(competitors): + # Calculate progress as decimal (0.0-1.0) + progress = 0.3 + (0.6 * (i / total_results)) + progress_text = f"Processing result {i+1}/{total_results}..." + progress_bar.progress(progress, text=progress_text) + + # Process each result + processed_result = { + "Title": result.title, + "URL": result.url, + "Content Summary": result.text if hasattr(result, 'text') else "No content available" + } + processed_results.append(processed_result) + + # Update progress + progress_bar.progress(0.9, text="Finalizing results...") + + # Create DataFrame + df = pd.DataFrame(processed_results) + + # Update progress + progress_bar.progress(1.0, text="Analysis completed!") + + return df, search_response + + else: + logger.warning("No results found in search response") + progress_bar.progress(1.0, text="No results found") + return pd.DataFrame(), search_response + except Exception as e: - logger.error(f"Metaphor: Error in finding similar content: {e}") + logger.error(f"Error in metaphor_find_similar: {str(e)}", exc_info=True) + if progress_bar: + progress_bar.progress(1.0, text="Error occurred during analysis") raise - competitors = search_response.results - # Initialize lists to store titles, URLs, and contents - titles = [] - urls = [] - contents = [] - progress_bar = st.progress(0, text="Starting competitor analysis...") - - # Extract titles, URLs, and contents from the competitors - for i, c in enumerate(competitors): - # Update progress bar for each competitor - if st.session_state.get('show_progress', True): - progress_text = f"Processing competitor {i+1}/{len(competitors)}: {c.title[:30]}..." - progress_bar.progress((i / len(competitors)) * 100, text=progress_text) - titles.append(c.title) - urls.append(c.url) - all_contents = "" - try: - # Update progress - if st.session_state.get('show_progress', True): - progress_bar.progress(25, text=f"Fetching content for {c.title[:30]}...") - - search_response = metaphor.search_and_contents( - c.url, - type="keyword", - num_results=1 - ) - research_response = search_response.results - - # Update progress - if st.session_state.get('show_progress', True): - progress_bar.progress(50, text=f"Extracting text from {c.title[:30]}...") - - for r in research_response: - all_contents += r.text - - # Update progress - if st.session_state.get('show_progress', True): - progress_bar.progress(75, text=f"Summarizing content for {c.title[:30]}...") - - # Get the summary from the competitor content - summary_response = summarize_competitor_content(all_contents) - c.text = summary_response - - # Store the raw summary in session state for display in dialog - if 'competitor_summaries' not in st.session_state: - st.session_state.competitor_summaries = {} - st.session_state.competitor_summaries[c.url] = { - 'title': c.title, - 'summary': summary_response - } - - # Update progress to complete - if st.session_state.get('show_progress', True): - progress_bar.progress(100, text=f"Completed processing {c.title[:30]}") - - except Exception as err: - c.text = f"Failed to summarize content: {err}" - # Update progress to show error - if st.session_state.get('show_progress', True): - progress_bar.progress(100, text=f"Error processing {c.title[:30]}: {str(err)[:50]}...") - - contents.append(c.text) - - # Create a DataFrame from the titles, URLs, and contents - df = pd.DataFrame({ - "Title": titles, - "URL": urls, - "Content Summary": contents - }) - - # Return the DataFrame and the search response - return df, search_response - def calculate_date_range(time_range: str) -> tuple: """ diff --git a/lib/ai_writers/ai_blog_writer/keywords_to_blog_streamlit.py b/lib/ai_writers/ai_blog_writer/keywords_to_blog_streamlit.py index df44084c..5836d205 100644 --- a/lib/ai_writers/ai_blog_writer/keywords_to_blog_streamlit.py +++ b/lib/ai_writers/ai_blog_writer/keywords_to_blog_streamlit.py @@ -39,14 +39,6 @@ from .blog_ai_research_utils import ( do_tavily_ai_search ) -# REMOVED CIRCULAR IMPORTS -# Import content and image generation functions from the generator utils module -# from .ai_blog_generator_utils import ( -# generate_blog_content, -# generate_blog_metadata, -# generate_blog_image, -# regenerate_blog_image -# ) def save_blog_content(blog_markdown_str, blog_title, blog_meta_desc, blog_tags, blog_categories, generated_image_filepath, status, blog_hashtags=None, blog_slug=None): """ diff --git a/lib/ai_writers/ai_story_illustrator/README.md b/lib/ai_writers/ai_story_illustrator/README.md new file mode 100644 index 00000000..ad1afbe8 --- /dev/null +++ b/lib/ai_writers/ai_story_illustrator/README.md @@ -0,0 +1,75 @@ +# AI Story Illustrator + +The AI Story Illustrator is a powerful tool that generates beautiful illustrations for stories using Google's Gemini AI. This module allows users to input stories via text, file upload, or URL, and automatically generates appropriate illustrations for different scenes in the story. + +## Features + +- **Multiple Input Methods**: Input stories via direct text entry, file upload, or URL extraction +- **Intelligent Scene Segmentation**: Automatically divides stories into logical segments for illustration +- **Customizable Illustration Styles**: Choose from various artistic styles or define your own +- **Scene Element Extraction**: Analyzes story segments to identify key visual elements +- **Multiple Export Options**: Export as PDF storybook or ZIP archive of individual images +- **Customizable Aspect Ratios**: Support for different image dimensions (16:9, 4:3, 1:1) +- **Advanced Settings**: Control the number of segments to illustrate and other parameters + +## Usage + +The Story Illustrator is integrated into the Alwrity platform and can be accessed through the main interface. The workflow consists of three main steps: + +1. **Story Input**: Enter your story text, upload a file, or provide a URL +2. **Illustration Settings**: Configure the style, aspect ratio, and other parameters +3. **Generate & Export**: Generate illustrations for all or individual segments and export the results + +## Technical Details + +### Dependencies + +- Streamlit: For the user interface +- Gemini AI: For image generation +- BeautifulSoup: For URL text extraction +- ReportLab: For PDF generation (optional) +- PIL: For image processing + +### Key Functions + +- `segment_story()`: Divides a story into logical segments for illustration +- `extract_scene_elements()`: Analyzes story segments to identify key visual elements +- `generate_illustration_prompt()`: Creates detailed prompts for the AI image generator +- `create_illustration()`: Generates an illustration for a story segment +- `create_storybook_pdf()`: Combines story text and illustrations into a PDF +- `create_zip_archive()`: Creates a ZIP archive of individual illustrations + +## Example + +```python +from lib.ai_writers.ai_story_illustrator.story_illustrator import write_story_illustrator + +# Run the Story Illustrator app +write_story_illustrator() +``` + +## Best Practices + +- **Provide Clear Segments**: The system works best with stories that have clear scene transitions +- **Be Specific with Styles**: More specific style descriptions yield better results +- **Balance Text and Images**: For best results, aim for segments of 100-500 words per illustration +- **Review and Regenerate**: If an illustration doesn't capture the scene well, use the regenerate option + +## Future Enhancements + +- Support for more export formats (EPUB, HTML) +- Enhanced character consistency across illustrations +- Animation options for digital storytelling +- Voice narration integration +- Custom character design options + +## Troubleshooting + +- If illustrations are not generating, check your internet connection and API access +- If PDF export fails, ensure ReportLab is installed (`pip install reportlab`) +- If URL extraction fails, try copying the text manually +- For large stories, consider processing in smaller batches + +## Credits + +This module uses Google's Gemini AI for image generation and leverages various open-source libraries for text processing and document generation. \ No newline at end of file diff --git a/lib/ai_writers/ai_story_illustrator/__init__.py b/lib/ai_writers/ai_story_illustrator/__init__.py new file mode 100644 index 00000000..cd765890 --- /dev/null +++ b/lib/ai_writers/ai_story_illustrator/__init__.py @@ -0,0 +1,7 @@ +""" +AI Story Illustrator module for generating illustrations for stories using AI. +""" + +from .story_illustrator import write_story_illustrator + +__all__ = ['write_story_illustrator'] \ No newline at end of file diff --git a/lib/ai_writers/ai_story_illustrator/story_illustrator.py b/lib/ai_writers/ai_story_illustrator/story_illustrator.py new file mode 100644 index 00000000..189e9430 --- /dev/null +++ b/lib/ai_writers/ai_story_illustrator/story_illustrator.py @@ -0,0 +1,727 @@ +""" +AI Story Illustrator - Generate illustrations for stories using Gemini AI + +This module provides functionality to generate illustrations for stories using Google's Gemini AI. +Users can input stories via text, file upload, or URL, and the system will generate appropriate +illustrations for different scenes in the story. + +Based on: https://github.com/google-gemini/cookbook/blob/main/examples/Book_illustration.ipynb +""" + +import streamlit as st +import os +import re +import time +import tempfile +import requests +from pathlib import Path +import io +import base64 +import json +import uuid +import logging +from urllib.parse import urlparse +from bs4 import BeautifulSoup +import zipfile + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger('story_illustrator') + +# Constants +MAX_STORY_LENGTH = 10000 # Maximum story length in characters +MIN_SEGMENT_LENGTH = 100 # Minimum segment length for illustration +MAX_SEGMENTS = 20 # Maximum number of segments to illustrate +DEFAULT_STYLE = "digital art" # Default illustration style +DEFAULT_ASPECT_RATIO = "16:9" # Default aspect ratio + + +def extract_text_from_url(url): + """Extract text content from a URL.""" + try: + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + } + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + + soup = BeautifulSoup(response.content, 'html.parser') + + # Remove script and style elements + for script in soup(["script", "style"]): + script.extract() + + # Get text + text = soup.get_text(separator='\\n') + + # Break into lines and remove leading and trailing space on each + lines = (line.strip() for line in text.splitlines()) + # Break multi-headlines into a line each + chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) + # Drop blank lines + text = '\\n'.join(chunk for chunk in chunks if chunk) + + return text + except Exception as e: + logger.error(f"Error extracting text from URL: {e}") + return None + + +def segment_story(story_text, min_segment_length=MIN_SEGMENT_LENGTH, max_segments=MAX_SEGMENTS): + """ + Segment a story into logical parts for illustration. + Uses paragraph breaks, scene changes, and other indicators to create segments. + """ + # Clean up the text + story_text = story_text.strip() + + # Split by paragraphs first + paragraphs = re.split(r'\\n\s*\\n', story_text) + + # Initialize segments + segments = [] + current_segment = "" + + for paragraph in paragraphs: + # Skip empty paragraphs + if not paragraph.strip(): + continue + + # If adding this paragraph would make the segment too long, start a new segment + if len(current_segment) + len(paragraph) > 1000: # Limit segment size + if current_segment: + segments.append(current_segment.strip()) + current_segment = paragraph + else: + # Add paragraph to current segment + if current_segment: + current_segment += "\\n\\n" + paragraph + else: + current_segment = paragraph + + # Add the last segment if it exists + if current_segment: + segments.append(current_segment.strip()) + + # Combine very short segments + i = 0 + while i < len(segments) - 1: + if len(segments[i]) < min_segment_length: + segments[i] += "\\n\\n" + segments[i+1] + segments.pop(i+1) + else: + i += 1 + + # Limit the number of segments + if len(segments) > max_segments: + # Combine segments to reduce the total number + new_segments = [] + segment_size = len(segments) / max_segments + + for i in range(max_segments): + start_idx = int(i * segment_size) + end_idx = int((i + 1) * segment_size) + combined_segment = "\\n\\n".join(segments[start_idx:end_idx]) + new_segments.append(combined_segment) + + segments = new_segments + + return segments + + +def extract_scene_elements(segment): + """ + Extract key scene elements from a story segment using LLM. + This helps create more accurate illustration prompts. + """ + from ...gpt_providers.text_generation.main_text_generation import llm_text_gen + + prompt = f""" + Analyze the following story segment and extract key visual elements for an illustration: + + {segment} + + Please provide: + 1. Main characters present (with brief visual descriptions) + 2. Setting/location details + 3. Key action or emotional moment to illustrate + 4. Important objects or props + 5. Time of day and lighting + 6. Weather or atmospheric conditions (if applicable) + + Format your response as JSON with these keys: "characters", "setting", "key_moment", "objects", "lighting", "atmosphere" + """ + + try: + response = llm_text_gen(prompt) + + # Try to extract JSON from the response + try: + # Find JSON content between triple backticks if present + json_match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL) + if json_match: + json_str = json_match.group(1) + else: + # Otherwise try to parse the whole response as JSON + json_str = response + + scene_elements = json.loads(json_str) + return scene_elements + except json.JSONDecodeError: + # If JSON parsing fails, extract information using regex + characters = re.search(r'"characters":\s*"([^"]*)"', response) + setting = re.search(r'"setting":\s*"([^"]*)"', response) + + return { + "characters": characters.group(1) if characters else "", + "setting": setting.group(1) if setting else "", + "key_moment": "", + "objects": "", + "lighting": "", + "atmosphere": "" + } + except Exception as e: + logger.error(f"Error extracting scene elements: {e}") + return { + "characters": "", + "setting": "", + "key_moment": "", + "objects": "", + "lighting": "", + "atmosphere": "" + } + + +def generate_illustration_prompt(segment, style, characters=None, setting=None): + """ + Generate a prompt for the illustration based on the segment content. + + Args: + segment: The story segment to illustrate + style: The artistic style for the illustration + characters: Optional character descriptions + setting: Optional setting description + + Returns: + A prompt string for the image generation model + """ + # Create a base prompt + base_prompt = f""" + Create a detailed illustration for the following story segment in {style} style: + + {segment[:500]} # Limit segment length for prompt + + The illustration should capture the key elements, mood, and action of this scene. + """ + + # Add character information if provided + if characters: + base_prompt += f"\\n\\nThe main characters in this scene are: {characters}" + + # Add setting information if provided + if setting: + base_prompt += f"\\n\\nThe setting is: {setting}" + + # Add style-specific instructions + if "watercolor" in style.lower(): + base_prompt += "\\n\\nUse soft, flowing watercolor techniques with visible brush strokes and color blending." + elif "digital art" in style.lower(): + base_prompt += "\\n\\nCreate a polished digital illustration with clean lines and vibrant colors." + elif "pencil sketch" in style.lower(): + base_prompt += "\\n\\nUse pencil sketch techniques with visible hatching, shading, and line work." + + # Add final quality instructions + base_prompt += """ + + Make the illustration: + - Visually engaging and detailed + - Appropriate for a storybook + - Focused on the main action or emotion of the scene + - With good composition and visual storytelling + """ + + return base_prompt.strip() + + +def create_illustration(segment, style, aspect_ratio="16:9"): + """ + Create an illustration for a story segment. + + Args: + segment: The story segment to illustrate + style: The artistic style for the illustration + aspect_ratio: The aspect ratio for the illustration + + Returns: + Path to the generated image + """ + # Import here to avoid circular imports + from ...gpt_providers.text_to_image_generation.gen_gemini_images import generate_gemini_image + + # Extract scene elements to enhance the prompt + scene_elements = extract_scene_elements(segment) + + # Create a detailed prompt for the illustration + prompt = generate_illustration_prompt( + segment, + style, + characters=scene_elements.get("characters", ""), + setting=scene_elements.get("setting", "") + ) + + # Add key elements to the prompt + key_moment = scene_elements.get("key_moment", "") + objects = scene_elements.get("objects", "") + lighting = scene_elements.get("lighting", "") + atmosphere = scene_elements.get("atmosphere", "") + + if key_moment: + prompt += f"\\n\\nFocus on this key moment: {key_moment}" + + if objects: + prompt += f"\\n\\nInclude these important objects: {objects}" + + if lighting: + prompt += f"\\n\\nThe lighting is: {lighting}" + + if atmosphere: + prompt += f"\\n\\nThe atmosphere/weather is: {atmosphere}" + + # Generate the illustration + try: + # Parse aspect ratio + if aspect_ratio == "16:9": + width, height = 16, 9 + elif aspect_ratio == "4:3": + width, height = 4, 3 + elif aspect_ratio == "1:1": + width, height = 1, 1 + else: + width, height = 16, 9 # Default + + # Generate image using Gemini + image_path = generate_gemini_image( + prompt=prompt, + style=style.lower() if style else None, + aspect_ratio=aspect_ratio + ) + + return image_path + except Exception as e: + logger.error(f"Error creating illustration: {e}") + return None + + +def create_storybook_pdf(segments, illustrations, title, author, output_path): + """ + Create a PDF storybook with text and illustrations. + + Args: + segments: List of story segments + illustrations: List of paths to illustrations + title: Book title + author: Book author + output_path: Path to save the PDF + + Returns: + Path to the created PDF + """ + try: + from reportlab.lib.pagesizes import letter, A4 + from reportlab.lib import colors + from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage, PageBreak + from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle + from reportlab.lib.units import inch + + # Create a PDF document + doc = SimpleDocTemplate(output_path, pagesize=A4) + story = [] + + # Get styles + styles = getSampleStyleSheet() + title_style = styles['Title'] + author_style = styles['Normal'] + author_style.alignment = 1 # Center alignment + normal_style = styles['Normal'] + + # Add title page + story.append(Paragraph(title, title_style)) + story.append(Spacer(1, 0.5*inch)) + story.append(Paragraph(f"by {author}", author_style)) + story.append(PageBreak()) + + # Add content pages + for i, (segment, illustration_path) in enumerate(zip(segments, illustrations)): + if illustration_path and os.path.exists(illustration_path): + # Add illustration + img = ReportLabImage(illustration_path, width=6*inch, height=4*inch) + story.append(img) + story.append(Spacer(1, 0.25*inch)) + + # Add text + for paragraph in segment.split('\\n\\n'): + if paragraph.strip(): + story.append(Paragraph(paragraph, normal_style)) + story.append(Spacer(1, 0.1*inch)) + + # Add page break between segments + if i < len(segments) - 1: + story.append(PageBreak()) + + # Build the PDF + doc.build(story) + return output_path + except Exception as e: + logger.error(f"Error creating PDF: {e}") + return None + + +def create_zip_archive(files, output_path): + """ + Create a ZIP archive containing the provided files. + + Args: + files: Dictionary of {filename: file_path} to include in the archive + output_path: Path to save the ZIP file + + Returns: + Path to the created ZIP file + """ + try: + with zipfile.ZipFile(output_path, 'w') as zipf: + for filename, file_path in files.items(): + if os.path.exists(file_path): + zipf.write(file_path, arcname=filename) + return output_path + except Exception as e: + logger.error(f"Error creating ZIP archive: {e}") + return None + + +def write_story_illustrator(): + """Main function for the Story Illustrator Streamlit app.""" + st.title("AI Story Illustrator") + st.write("Generate beautiful illustrations for your stories using AI") + + # Create tabs for different sections + tab1, tab2, tab3 = st.tabs(["Story Input", "Illustration Settings", "Generate & Export"]) + + # Initialize session state variables if they don't exist + if "story_text" not in st.session_state: + st.session_state.story_text = "" + if "segments" not in st.session_state: + st.session_state.segments = [] + if "illustrations" not in st.session_state: + st.session_state.illustrations = [] + if "book_title" not in st.session_state: + st.session_state.book_title = "" + if "book_author" not in st.session_state: + st.session_state.book_author = "" + if "illustration_style" not in st.session_state: + st.session_state.illustration_style = DEFAULT_STYLE + if "aspect_ratio" not in st.session_state: + st.session_state.aspect_ratio = DEFAULT_ASPECT_RATIO + if "temp_files" not in st.session_state: + st.session_state.temp_files = [] + + # Tab 1: Story Input + with tab1: + st.header("Step 1: Input Your Story") + + # Input method selection + input_method = st.radio( + "Choose input method:", + ["Text Input", "File Upload", "URL"] + ) + + if input_method == "Text Input": + st.session_state.story_text = st.text_area( + "Enter your story text:", + value=st.session_state.story_text, + height=300, + max_chars=MAX_STORY_LENGTH, + help="Enter the story text you want to illustrate (max 10,000 characters)" + ) + + elif input_method == "File Upload": + uploaded_file = st.file_uploader("Upload a text file:", type=["txt", "md"]) + if uploaded_file is not None: + try: + st.session_state.story_text = uploaded_file.getvalue().decode("utf-8") + st.success(f"Successfully loaded file: {uploaded_file.name}") + st.text_area("Preview:", value=st.session_state.story_text[:500] + "...", height=200, disabled=True) + except Exception as e: + st.error(f"Error reading file: {e}") + + elif input_method == "URL": + url = st.text_input("Enter URL containing the story:") + if url: + if st.button("Extract Text from URL"): + with st.spinner("Extracting text from URL..."): + extracted_text = extract_text_from_url(url) + if extracted_text: + st.session_state.story_text = extracted_text + st.success("Successfully extracted text from URL") + st.text_area("Preview:", value=st.session_state.story_text[:500] + "...", height=200, disabled=True) + else: + st.error("Failed to extract text from URL") + + # Book metadata + st.subheader("Book Metadata") + col1, col2 = st.columns(2) + with col1: + st.session_state.book_title = st.text_input("Book Title:", value=st.session_state.book_title) + with col2: + st.session_state.book_author = st.text_input("Author:", value=st.session_state.book_author) + + # Process story into segments + if st.session_state.story_text: + if st.button("Process Story into Segments"): + with st.spinner("Processing story into segments..."): + st.session_state.segments = segment_story(st.session_state.story_text) + st.success(f"Story processed into {len(st.session_state.segments)} segments") + + # Initialize illustrations list with None values + st.session_state.illustrations = [None] * len(st.session_state.segments) + + # Display segments + st.subheader("Story Segments") + for i, segment in enumerate(st.session_state.segments): + with st.expander(f"Segment {i+1}"): + st.write(segment) + + # Tab 2: Illustration Settings + with tab2: + st.header("Step 2: Configure Illustration Settings") + + # Style selection + st.subheader("Illustration Style") + style_options = [ + "Digital Art", + "Watercolor Painting", + "Pencil Sketch", + "Oil Painting", + "Cartoon", + "Anime", + "3D Render", + "Pixel Art", + "Children's Book Illustration", + "Comic Book Style", + "Fantasy Art", + "Realistic" + ] + + st.session_state.illustration_style = st.selectbox( + "Choose an illustration style:", + style_options, + index=style_options.index(st.session_state.illustration_style) if st.session_state.illustration_style in style_options else 0 + ) + + # Custom style input + use_custom_style = st.checkbox("Use custom style") + if use_custom_style: + custom_style = st.text_input("Describe your custom style:", + placeholder="e.g., Impressionist painting with vibrant colors and visible brushstrokes") + if custom_style: + st.session_state.illustration_style = custom_style + + # Display style examples + st.info("💡 The style you choose will significantly impact the look and feel of your illustrations.") + + # Aspect ratio selection + st.subheader("Image Settings") + aspect_ratio_options = { + "16:9 (Widescreen)": "16:9", + "4:3 (Standard)": "4:3", + "1:1 (Square)": "1:1" + } + + selected_ratio = st.selectbox( + "Choose aspect ratio:", + list(aspect_ratio_options.keys()), + index=list(aspect_ratio_options.values()).index(st.session_state.aspect_ratio) if st.session_state.aspect_ratio in aspect_ratio_options.values() else 0 + ) + st.session_state.aspect_ratio = aspect_ratio_options[selected_ratio] + + # Advanced settings + with st.expander("Advanced Settings"): + st.slider("Number of segments to illustrate:", 1, + max(len(st.session_state.segments), 1) if st.session_state.segments else 1, + min(len(st.session_state.segments), MAX_SEGMENTS) if st.session_state.segments else 1, + key="num_segments_to_illustrate") + + st.checkbox("Generate cover image", value=True, key="generate_cover") + + st.checkbox("Add text to illustrations", value=False, key="add_text_to_illustrations") + + # Tab 3: Generate & Export + with tab3: + st.header("Step 3: Generate Illustrations & Export") + + if not st.session_state.segments: + st.warning("Please process your story into segments in Step 1 before generating illustrations.") + else: + # Generate illustrations + st.subheader("Generate Illustrations") + + num_segments = min(len(st.session_state.segments), st.session_state.get("num_segments_to_illustrate", len(st.session_state.segments))) + + if st.button("Generate All Illustrations"): + with st.spinner(f"Generating {num_segments} illustrations... This may take a while."): + progress_bar = st.progress(0) + + for i in range(num_segments): + # Update progress + progress_bar.progress((i) / num_segments) + st.write(f"Generating illustration {i+1} of {num_segments}...") + + # Generate illustration + illustration_path = create_illustration( + st.session_state.segments[i], + st.session_state.illustration_style, + st.session_state.aspect_ratio + ) + + # Store the illustration path + if illustration_path: + st.session_state.illustrations[i] = illustration_path + st.session_state.temp_files.append(illustration_path) + + # Complete progress + progress_bar.progress(1.0) + st.success(f"Generated {num_segments} illustrations!") + + # Generate individual illustrations + st.subheader("Generate Individual Illustrations") + + for i in range(num_segments): + col1, col2 = st.columns([3, 1]) + + with col1: + with st.expander(f"Segment {i+1}"): + st.write(st.session_state.segments[i][:300] + "..." if len(st.session_state.segments[i]) > 300 else st.session_state.segments[i]) + + with col2: + if st.button(f"Generate #{i+1}", key=f"gen_btn_{i}"): + with st.spinner(f"Generating illustration {i+1}..."): + illustration_path = create_illustration( + st.session_state.segments[i], + st.session_state.illustration_style, + st.session_state.aspect_ratio + ) + + if illustration_path: + st.session_state.illustrations[i] = illustration_path + st.session_state.temp_files.append(illustration_path) + st.success(f"Generated illustration {i+1}!") + + # Display generated illustrations + st.subheader("Preview Illustrations") + + if any(st.session_state.illustrations): + for i, illustration_path in enumerate(st.session_state.illustrations[:num_segments]): + if illustration_path and os.path.exists(illustration_path): + with st.expander(f"Illustration {i+1}"): + st.image(illustration_path, caption=f"Illustration for Segment {i+1}", use_column_width=True) + + # Regenerate button + if st.button(f"Regenerate", key=f"regen_btn_{i}"): + with st.spinner(f"Regenerating illustration {i+1}..."): + new_illustration_path = create_illustration( + st.session_state.segments[i], + st.session_state.illustration_style, + st.session_state.aspect_ratio + ) + + if new_illustration_path: + st.session_state.illustrations[i] = new_illustration_path + st.session_state.temp_files.append(new_illustration_path) + st.rerun() + else: + st.info("No illustrations generated yet. Click 'Generate All Illustrations' or generate individual illustrations.") + + # Export options + st.subheader("Export Options") + + if any(st.session_state.illustrations): + export_format = st.radio( + "Export format:", + ["PDF Storybook", "Individual Images (ZIP)", "Both"] + ) + + if st.button("Export"): + with st.spinner("Preparing export..."): + # Create temporary directory for exports + with tempfile.TemporaryDirectory() as temp_dir: + # Filter out None values from illustrations + valid_illustrations = [path for path in st.session_state.illustrations[:num_segments] if path and os.path.exists(path)] + valid_segments = st.session_state.segments[:len(valid_illustrations)] + + # Prepare filenames + safe_title = "".join(c if c.isalnum() else "_" for c in st.session_state.book_title) if st.session_state.book_title else "story" + timestamp = int(time.time()) + + # Export as PDF + if export_format in ["PDF Storybook", "Both"]: + pdf_path = os.path.join(temp_dir, f"{safe_title}_{timestamp}.pdf") + + try: + pdf_result = create_storybook_pdf( + valid_segments, + valid_illustrations, + st.session_state.book_title or "Untitled Story", + st.session_state.book_author or "Anonymous", + pdf_path + ) + + if pdf_result: + with open(pdf_path, "rb") as f: + st.download_button( + label="Download PDF Storybook", + data=f, + file_name=f"{safe_title}.pdf", + mime="application/pdf" + ) + except Exception as e: + st.error(f"Error creating PDF: {e}") + st.info("Please install ReportLab to enable PDF export: pip install reportlab") + + # Export as ZIP of images + if export_format in ["Individual Images (ZIP)", "Both"]: + zip_path = os.path.join(temp_dir, f"{safe_title}_illustrations_{timestamp}.zip") + + # Prepare files for ZIP + files_to_zip = {} + for i, img_path in enumerate(valid_illustrations): + if img_path and os.path.exists(img_path): + files_to_zip[f"illustration_{i+1}.png"] = img_path + + zip_result = create_zip_archive(files_to_zip, zip_path) + + if zip_result: + with open(zip_path, "rb") as f: + st.download_button( + label="Download Illustrations ZIP", + data=f, + file_name=f"{safe_title}_illustrations.zip", + mime="application/zip" + ) + else: + st.info("Generate illustrations before exporting.") + + # Cleanup temporary files when the session ends + def cleanup_temp_files(): + for file_path in st.session_state.temp_files: + try: + if file_path and os.path.exists(file_path): + os.remove(file_path) + except Exception as e: + logger.error(f"Error removing temporary file {file_path}: {e}") + + # Register the cleanup function to run when the session ends + import atexit + atexit.register(cleanup_temp_files) + + +if __name__ == "__main__": + write_story_illustrator() diff --git a/lib/ai_writers/ai_story_illustrator/utils.py b/lib/ai_writers/ai_story_illustrator/utils.py new file mode 100644 index 00000000..f1c05ecb --- /dev/null +++ b/lib/ai_writers/ai_story_illustrator/utils.py @@ -0,0 +1,450 @@ +""" +Utility functions for the AI Story Illustrator module. + +This module provides helper functions for file operations, string manipulation, +and simple text analysis relevant to story processing. +""" + +import os +import re +import tempfile +import uuid +import logging +import shutil +from pathlib import Path +from typing import List, Tuple, Optional, Union + +# Attempt to import Pillow for image dimensions, but don't fail if not installed +# unless the specific function is called. +try: + from PIL import Image + _PIL_AVAILABLE = True +except ImportError: + _PIL_AVAILABLE = False + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger('story_illustrator_utils') + +# --- Constants --- +IMAGE_EXTENSIONS = frozenset(['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp']) +TEXT_EXTENSIONS = frozenset(['.txt', '.md', '.text']) +# Common English words that often start sentences, excluded from simple name detection +COMMON_START_WORDS = frozenset([ + 'The', 'A', 'An', 'And', 'But', 'Or', 'For', 'Nor', 'So', 'Yet', 'He', 'She', + 'It', 'They', 'We', 'You', 'I', 'In', 'On', 'At', 'To', 'From', 'With', + 'About', 'As', 'Is', 'Was', 'Were', 'Be', 'Been', 'Being', 'Have', 'Has', + 'Had', 'Do', 'Does', 'Did', 'Will', 'Would', 'Shall', 'Should', 'May', + 'Might', 'Must', 'Can', 'Could' +]) + + +# --- File/Directory Operations --- + +def create_temp_directory(prefix: str = "story_illustrator_") -> str: + """ + Creates a temporary directory using tempfile.mkdtemp. + + Args: + prefix: A prefix for the temporary directory name. + + Returns: + The absolute path to the created temporary directory. + """ + try: + temp_dir = tempfile.mkdtemp(prefix=prefix) + logger.info(f"Created temporary directory: {temp_dir}") + return temp_dir + except Exception as e: + logger.error(f"Failed to create temporary directory: {e}", exc_info=True) + raise # Re-raise the exception after logging + + +def sanitize_filename(filename: str) -> str: + """ + Sanitizes a filename by removing/replacing invalid characters for common filesystems. + + Args: + filename: The original filename string. + + Returns: + A sanitized filename string suitable for use in file paths. + """ + if not isinstance(filename, str): + logger.warning("sanitize_filename received non-string input, converting.") + filename = str(filename) + + # Remove characters invalid for Windows/Unix filenames + # Replace them with an underscore. + sanitized = re.sub(r'[\\/*?:"<>|\']', "_", filename) + # Replace consecutive underscores/spaces with a single underscore + sanitized = re.sub(r'[_ ]+', '_', sanitized) + # Remove leading/trailing spaces, dots, and underscores + sanitized = sanitized.strip("._ ") + + # Ensure the filename is not empty after sanitization + if not sanitized: + sanitized = "unnamed_file" + logger.warning("Filename was empty after sanitization, using default.") + + # Limit filename length (optional, adjust as needed) + # max_len = 255 # Example limit + # if len(sanitized) > max_len: + # name, ext = os.path.splitext(sanitized) + # sanitized = name[:max_len - len(ext) - 1] + "_" + ext + # logger.warning(f"Filename truncated to maximum length: {sanitized}") + + return sanitized + + +def get_temp_file_path( + directory: str, prefix: str = "file_", suffix: str = ".tmp" +) -> str: + """ + Generates a unique temporary file path within the specified directory. + + Args: + directory: The directory where the temporary file should be located. + prefix: A prefix for the filename. + suffix: A suffix (extension) for the filename. + + Returns: + The full path for the unique temporary file. + """ + # Ensure suffix starts with a dot if it's meant to be an extension + if suffix and not suffix.startswith("."): + suffix = "." + suffix + + unique_id = uuid.uuid4().hex[:12] # Longer hex UUID for better uniqueness + filename = f"{prefix}{unique_id}{suffix}" + return os.path.join(directory, filename) + + +def ensure_directory_exists(directory: Union[str, Path]) -> str: + """ + Ensures that a directory exists, creating it recursively if necessary. + + Args: + directory: The path to the directory (string or Path object). + + Returns: + The absolute path to the directory as a string. + + Raises: + OSError: If the directory cannot be created (e.g., permission issues). + """ + dir_path = Path(directory).resolve() # Use Pathlib for robust handling + try: + dir_path.mkdir(parents=True, exist_ok=True) + # Log only if it needed creation (or if verbose logging is on) + # logger.info(f"Ensured directory exists: {dir_path}") + return str(dir_path) + except OSError as e: + logger.error(f"Failed to create or access directory {dir_path}: {e}", exc_info=True) + raise + + +def cleanup_directory(directory: Union[str, Path]) -> None: + """ + Removes a directory and all its contents recursively. Handles errors gracefully. + + Args: + directory: The path to the directory to remove (string or Path object). + """ + dir_path = Path(directory) + if not dir_path.exists(): + logger.debug(f"Cleanup skipped: Directory '{directory}' does not exist.") + return + + if not dir_path.is_dir(): + logger.warning(f"Cleanup warning: Path '{directory}' is not a directory.") + return + + try: + shutil.rmtree(dir_path) + logger.info(f"Successfully removed directory: {directory}") + except OSError as e: + logger.error(f"Error removing directory {directory}: {e}", exc_info=True) + except Exception as e: + logger.error( + f"Unexpected error removing directory {directory}: {e}", exc_info=True + ) + + +# --- File Type Checks --- + +def get_file_extension(file_path: Union[str, Path]) -> str: + """ + Gets the lowercased file extension (including the dot) from a file path. + + Args: + file_path: The path to the file (string or Path object). + + Returns: + The file extension (e.g., '.txt', '.png') or an empty string if no extension. + """ + return Path(file_path).suffix.lower() + + +def is_image_file(file_path: Union[str, Path]) -> bool: + """ + Checks if a file is likely an image based on its extension. + + Args: + file_path: The path to the file (string or Path object). + + Returns: + True if the file extension is in IMAGE_EXTENSIONS, False otherwise. + """ + return get_file_extension(file_path) in IMAGE_EXTENSIONS + + +def is_text_file(file_path: Union[str, Path]) -> bool: + """ + Checks if a file is likely a text file based on its extension. + + Args: + file_path: The path to the file (string or Path object). + + Returns: + True if the file extension is in TEXT_EXTENSIONS, False otherwise. + """ + return get_file_extension(file_path) in TEXT_EXTENSIONS + + +# --- Text Analysis (Simple Heuristics) --- + +def extract_story_title_from_text(text: str) -> str: + """ + Attempts to extract a title from story text using simple heuristics. + + Looks for patterns (in order): + 1. Markdown headers (#, ##, etc.) at the start of a line. + 2. The first non-empty line if it's short (< 100 chars) and followed by + a blank line or is the only line. + 3. The first non-empty line if it's entirely in uppercase (< 100 chars). + + Args: + text: The story text content. + + Returns: + An extracted title string, or "Untitled Story" if no pattern matches. + """ + if not isinstance(text, str) or not text.strip(): + return "Untitled Story" + + # 1. Check for markdown headers ( # Title, ## Title ) + # Needs to match start of line (^) with optional whitespace before # + header_match = re.search(r'^\s*#+\s+(.+)$', text.strip(), re.MULTILINE) + if header_match: + title = header_match.group(1).strip() + if title: return title + + lines = text.strip().split('\n') + if not lines: + return "Untitled Story" + + first_line = lines[0].strip() + if not first_line: # Skip if first line is blank + if len(lines) > 1: + first_line = lines[1].strip() # Try second line + else: + return "Untitled Story" + + if not first_line: # Still no title found + return "Untitled Story" + + # 2. Check if first line is short and potentially a title + is_short = len(first_line) < 100 + is_followed_by_blank = len(lines) > 1 and not lines[1].strip() + is_only_line = len(lines) == 1 + + if is_short and (is_followed_by_blank or is_only_line): + return first_line + + # 3. Check if first line is all caps (and short) + is_all_caps = first_line == first_line.upper() and first_line.isalpha() # Check if it contains letters + if is_short and is_all_caps: + return first_line + + # Default if no other pattern matched + return "Untitled Story" + + +def estimate_reading_time(text: str, words_per_minute: int = 200) -> float: + """ + Estimates the reading time of a text in minutes. + + Args: + text: The text content. + words_per_minute: The assumed average reading speed. + + Returns: + The estimated reading time in minutes. Returns 0.0 for empty text. + """ + if not isinstance(text, str) or not text.strip(): + return 0.0 + if words_per_minute <= 0: + raise ValueError("words_per_minute must be positive.") + + word_count = len(text.split()) + minutes = word_count / words_per_minute + return minutes + + +def count_sentences(text: str) -> int: + """ + Counts the number of sentences in a text using a very simple heuristic. + + Note: This is a basic implementation counting sentence-ending punctuation + (. ! ?). It will be inaccurate with abbreviations (Mr., Mrs., etc.), + ellipses, and complex sentence structures. + + Args: + text: The text content. + + Returns: + An estimated count of sentences. Returns 0 for empty text. + """ + if not isinstance(text, str) or not text.strip(): + return 0 + + # Find sequences of one or more sentence-ending punctuation marks + sentence_endings = re.findall(r'[.!?]+', text) + count = len(sentence_endings) + + # Handle edge case where text might not end with punctuation but isn't empty + if count == 0 and len(text.strip()) > 0: + return 1 # Assume at least one sentence if text exists but no terminators found + return count + + +def extract_character_names(text: str, min_occurrences: int = 2) -> List[str]: + """ + Attempts to extract potential character names from story text. + + Note: This is a simple heuristic based on finding capitalized words + (excluding common sentence starters) that appear multiple times. It has + limitations and may produce false positives or miss actual names. + + Args: + text: The story text content. + min_occurrences: The minimum number of times a capitalized word must + appear to be considered a potential name. + + Returns: + A list of potential character name strings. + """ + if not isinstance(text, str) or not text.strip(): + return [] + if min_occurrences < 1: + min_occurrences = 1 # Ensure at least one occurrence is required + + # Find words starting with an uppercase letter, potentially followed by lowercase + # Allows for single-letter names like 'X' but focuses on typical Name structure + capitalized_words = re.findall(r'\b[A-Z][a-zA-Z]*\b', text) + + # Count occurrences, excluding common words + word_counts: Dict[str, int] = {} + for word in capitalized_words: + if word not in COMMON_START_WORDS: + word_counts[word] = word_counts.get(word, 0) + 1 + + # Filter for words that meet the minimum occurrence threshold + potential_names = [ + word for word, count in word_counts.items() if count >= min_occurrences + ] + + # Sort for consistency (optional) + potential_names.sort() + + return potential_names + + +def extract_setting_details(text: str) -> List[str]: + """ + Attempts to extract potential setting details using simple regex patterns. + + Note: This is a very basic heuristic looking for common prepositional + phrases (e.g., "in the forest", "at the castle"). It is highly limited + and likely to miss many setting details or extract irrelevant phrases. + + Args: + text: The story text content. + + Returns: + A list of potential setting phrases found. + """ + if not isinstance(text, str) or not text.strip(): + return [] + + # Patterns looking for prepositions followed by nouns/adjectives + # Making patterns slightly more general: + # (\b\w+\b) captures single words + # (\b\w+\s+\w+\b) captures two-word phrases + # (\b[A-Z]\w*\b) captures capitalized words (potential proper nouns) + setting_patterns = [ + r'\b(?:in|on|at|near|beside|inside|outside|under|over|through)\s+(?:the|a|an)\s+((?:[A-Z]\w*|\w+)(?:\s+\w+){0,2})\b', # e.g., in the old house + r'\b(?:in|on|at)\s+((?:[A-Z]\w+)(?:\s+[A-Z]\w+)*)\b', # e.g., in New York City + r'\b(?:during|before|after)\s+(?:the|a|an)\s+(\w+(?:\s+\w+){0,2})\b', # e.g., during the storm + ] + + settings_found = set() # Use a set to avoid duplicates + for pattern in setting_patterns: + try: + matches = re.findall(pattern, text, re.IGNORECASE) # Ignore case + for match in matches: + # If match is tuple due to multiple capture groups, join them? + # For these patterns, it should be single strings. + if isinstance(match, str): + phrase = match.strip() + if phrase and len(phrase.split()) <= 5: # Limit phrase length + settings_found.add(phrase) + except re.error as e: + logger.warning(f"Regex error in extract_setting_details: {e} with pattern: {pattern}") + + + # Convert set back to list and sort for consistency + sorted_settings = sorted(list(settings_found)) + return sorted_settings + + +# --- Image Operations --- + +def get_image_dimensions(image_path: Union[str, Path]) -> Optional[Tuple[int, int]]: + """ + Gets the (width, height) dimensions of an image file using Pillow. + + Args: + image_path: The path to the image file (string or Path object). + + Returns: + A tuple (width, height) if successful, or None if the file is not + a valid image, Pillow is not installed, or an error occurs. + """ + if not _PIL_AVAILABLE: + logger.warning("Pillow (PIL) library not installed. Cannot get image dimensions.") + return None + + img_path = Path(image_path) + if not img_path.is_file(): + logger.error(f"Image file not found or is not a file: {image_path}") + return None + + try: + with Image.open(img_path) as img: + width, height = img.size + logger.debug(f"Dimensions for {image_path}: {width}x{height}") + return width, height + except FileNotFoundError: + logger.error(f"Image file not found at path: {image_path}") + return None + except UnidentifiedImageError: # Specific Pillow error for invalid images + logger.error(f"Could not identify image file (invalid format or corrupted): {image_path}") + return None + except Exception as e: + logger.error(f"Error getting dimensions for image {image_path}: {e}", exc_info=True) + return None \ No newline at end of file diff --git a/lib/ai_writers/ai_story_video_generator/README.md b/lib/ai_writers/ai_story_video_generator/README.md new file mode 100644 index 00000000..edc451cb --- /dev/null +++ b/lib/ai_writers/ai_story_video_generator/README.md @@ -0,0 +1,31 @@ +# AI Story Video Generator + +This module allows users to generate animated story videos using AI. It leverages Google's Gemini model to create stories and generate images for each scene, then combines them into a video. + +## Features + +- Generate complete stories based on user prompts +- Create scene-by-scene storyboards +- Generate images for each scene using Gemini +- Compile images into an animated video +- Add background music and text overlays +- Export videos in MP4 format + +## How It Works + +1. User provides a story prompt and preferences +2. AI generates a complete story with multiple scenes +3. For each scene, an image is generated +4. Images are compiled into a video with transitions +5. Optional background music and text overlays are added +6. The final video is available for download + +## Requirements + +- Google Gemini API key +- FFmpeg for video processing +- Python libraries: moviepy, pillow, requests + +## Usage + +Access this tool through the Streamlit interface by selecting "AI Story Video Generator" from the main menu. \ No newline at end of file diff --git a/lib/ai_writers/ai_story_video_generator/__init__.py b/lib/ai_writers/ai_story_video_generator/__init__.py new file mode 100644 index 00000000..6432e465 --- /dev/null +++ b/lib/ai_writers/ai_story_video_generator/__init__.py @@ -0,0 +1,4 @@ +# AI Story Video Generator module +from .story_video_generator import write_story_video_generator + +__all__ = ["write_story_video_generator"] \ No newline at end of file diff --git a/lib/ai_writers/ai_story_video_generator/story_video_generator.py b/lib/ai_writers/ai_story_video_generator/story_video_generator.py new file mode 100644 index 00000000..decbddad --- /dev/null +++ b/lib/ai_writers/ai_story_video_generator/story_video_generator.py @@ -0,0 +1,1033 @@ +""" +AI Story Video Generator + +This module provides functionality to generate animated story videos using AI. +It adapts the Google Gemini cookbook example for Streamlit. +""" + +import os +import re +import time +import json +import uuid +import tempfile +import logging +from pathlib import Path +from typing import List, Dict, Any, Tuple, Optional, Union + +import streamlit as st +import numpy as np +from PIL import Image, ImageDraw, ImageFont +import requests +from moviepy.editor import ( + ImageSequenceClip, + TextClip, + CompositeVideoClip, + AudioFileClip, +) + +# Import Gemini functionality (Ensure these paths are correct in your project) +try: + from lib.gpt_providers.text_generation.main_text_generation import ( + llm_text_gen, + ) + from lib.gpt_providers.text_to_image_generation.gen_gemini_images import ( + generate_gemini_image, + ) +except ImportError as e: + st.error( + f"Failed to import custom libraries: {e}. " + "Please ensure 'lib/gpt_providers/...' structure is correct and accessible." + ) + # You might want to exit or disable functionality if imports fail + llm_text_gen = None + generate_gemini_image = None + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +# Constants +DEFAULT_FPS = 1 +DEFAULT_DURATION = 3 # seconds per image +DEFAULT_TRANSITION_DURATION = 1 # seconds for transition (Currently unused, potential future feature) +DEFAULT_FONT_SIZE = 24 +DEFAULT_FONT_COLOR = "white" +DEFAULT_MUSIC_URL = "https://freepd.com/music/Magical%20Transition.mp3" # Example free music URL +DEFAULT_IMAGE_WIDTH = 1024 +DEFAULT_IMAGE_HEIGHT = 768 +TEXT_AREA_HEIGHT_RATIO = 1 / 3 +TEXT_PADDING = 20 +TEXT_OVERLAY_ALPHA = 160 # Semi-transparent overlay (0-255) + +class StoryVideoGenerator: + """Class to handle the generation of animated story videos.""" + + def __init__(self): + """Initialize the StoryVideoGenerator.""" + self.temp_dir = tempfile.mkdtemp() + logger.info( + f"Initialized StoryVideoGenerator. Temp directory: {self.temp_dir}" + ) + # Consider adding cleanup logic, e.g., using atexit or context manager + + def generate_story( + self, prompt: str, num_scenes: int = 5, style: str = "children's story" + ) -> Dict[str, Any]: + """ + Generate a story based on the given prompt using an LLM. + + Args: + prompt: The story prompt. + num_scenes: Number of scenes to generate. + style: Style of the story (e.g., "children's story", "sci-fi"). + + Returns: + A dictionary containing the story title and a list of scenes. + + Raises: + Exception: If story generation or parsing fails. + """ + if not llm_text_gen: + raise RuntimeError("LLM text generation function not available.") + + logger.info( + f"Generating story: prompt='{prompt}', num_scenes={num_scenes}, style='{style}'" + ) + + system_prompt = f"""You are a creative story writer specializing in {style} stories. +Create a short story based on the prompt below. +The story should have exactly {num_scenes} scenes. +Format your response STRICTLY as a JSON object with the following structure: +{{ +"title": "Story Title", +"scenes": [ +{{ +"scene_number": 1, +"description": "Brief visual description of the scene suitable for image generation", +"narration": "The narration text for this scene" +}}, +... +] +}} +Ensure each scene has a clear visual description and corresponding narration. +Do not include any text outside the JSON structure itself (e.g., no '```json' markers). +""" + + user_prompt = f"Create a {style} story about: {prompt}" + + try: + response = llm_text_gen(user_prompt, system_prompt=system_prompt) + logger.debug(f"Raw LLM response received:\n{response}") + + # Attempt to directly parse the response as JSON + try: + # Clean potential markdown fences and surrounding whitespace aggressively + cleaned_response = re.sub(r'^```(json)?\s*|\s*```$', '', response, flags=re.DOTALL | re.IGNORECASE).strip() + story_data = json.loads(cleaned_response) + except json.JSONDecodeError as json_err: + logger.error(f"JSONDecodeError: {json_err}. Raw response was: {response}") + # Fallback: Try regex extraction if direct parsing fails + json_match = re.search(r'\{\s*"title":.*\}\s*$', cleaned_response, re.DOTALL) + if json_match: + json_str = json_match.group(0) + try: + story_data = json.loads(json_str) + logger.info("Successfully parsed JSON using regex fallback.") + except json.JSONDecodeError as fallback_err: + logger.error(f"Fallback JSON parsing failed: {fallback_err}") + raise Exception(f"Failed to parse LLM response as JSON. Response:\n{response}") from fallback_err + else: + raise Exception(f"Could not find valid JSON in LLM response. Response:\n{response}") from json_err + + + # Validate structure (basic check) + if "title" not in story_data or "scenes" not in story_data: + raise ValueError("Generated JSON missing 'title' or 'scenes' key.") + if not isinstance(story_data["scenes"], list): + raise ValueError("'scenes' key must contain a list.") + + logger.info(f"Successfully generated story: {story_data.get('title', 'Untitled')}") + return story_data + + except Exception as e: + logger.error(f"Error generating story: {str(e)}", exc_info=True) + raise Exception(f"Failed to generate or parse story: {str(e)}") from e + + def generate_scene_image( + self, scene: Dict[str, Any], style: str = "digital art" + ) -> str: + """ + Generate an image for a single scene using an image generation model. + + Args: + scene: The scene dictionary containing "scene_number" and "description". + style: The visual style for the image (e.g., "digital art", "cartoon"). + + Returns: + Path to the generated image file. Falls back to a placeholder on error. + """ + if not generate_gemini_image: + raise RuntimeError("Image generation function not available.") + + scene_num = scene.get("scene_number", "unknown") + description = scene.get("description", "No description provided.") + + logger.info( + f"Generating image for scene {scene_num}: '{description}', style: '{style}'" + ) + + prompt = f"Create a {style} image representing this scene: {description}. Image should be visually clear and focus on the core elements described." + + try: + # Generate image using the imported function + # This function should save the image and return its path + image_path = generate_gemini_image(prompt, style=style) # Assuming this function saves the image and returns path + + if not image_path or not os.path.exists(image_path): + raise Exception(f"Image generation function did not return a valid path: {image_path}") + + logger.info(f"Successfully generated image for scene {scene_num}: {image_path}") + return image_path + + except Exception as e: + logger.error( + f"Error generating image for scene {scene_num}: {str(e)}", + exc_info=True, + ) + # Fallback to creating a placeholder image + logger.warning(f"Creating placeholder image for scene {scene_num}.") + return self._create_placeholder_image(scene_num, description) + + def _create_placeholder_image( + self, scene_num: Union[int, str], description: str + ) -> str: + """Create a placeholder image with text when image generation fails.""" + width, height = DEFAULT_IMAGE_WIDTH, DEFAULT_IMAGE_HEIGHT + image = Image.new("RGB", (width, height), color=(73, 109, 137)) + draw = ImageDraw.Draw(image) + + try: + # Try loading a common font, fall back to default + font_size = 36 + try: + font = ImageFont.truetype("arial.ttf", font_size) + except IOError: + try: + font = ImageFont.truetype("DejaVuSans.ttf", font_size) # Common on Linux + except IOError: + font = ImageFont.load_default() + logger.warning("Arial/DejaVuSans font not found. Using default PIL font.") + + text = f"Scene {scene_num}\n\nImage Generation Failed\n\nDescription:\n{description}" + + # Simple text wrapping + max_text_width = width - 2 * TEXT_PADDING + lines = [] + words = text.split() + current_line = "" + for word in words: + if not current_line: + test_line = word + else: + test_line = current_line + " " + word + + # Use textbbox for potentially more accurate width estimation + try: + bbox = draw.textbbox((0,0), test_line, font=font) + line_width = bbox[2] - bbox[0] + except AttributeError: # older Pillow versions might not have textbbox + line_width = draw.textlength(test_line, font=font) # Use textlength + + + if line_width <= max_text_width: + current_line = test_line + else: + lines.append(current_line) + current_line = word + lines.append(current_line) # Add the last line + + # Calculate text block height and starting position + total_text_height = 0 + line_heights = [] + for line in lines: + try: + bbox = draw.textbbox((0,0), line, font=font) + h = bbox[3] - bbox[1] + except AttributeError: + # Estimate height if textbbox not available + (_, h) = draw.textsize(line, font=font) + line_heights.append(h) + total_text_height += h + 5 # Add small spacing + + start_y = (height - total_text_height) // 2 + + # Draw text line by line + current_y = start_y + for i, line in enumerate(lines): + try: + bbox = draw.textbbox((0,0), line, font=font) + line_width = bbox[2] - bbox[0] + except AttributeError: + line_width = draw.textlength(line, font=font) + + x_position = (width - line_width) // 2 + draw.text((x_position, current_y), line, fill="white", font=font) + current_y += line_heights[i] + 5 # Move y for next line + + except Exception as font_err: + logger.error(f"Error drawing text on placeholder: {font_err}", exc_info=True) + # Draw a simple error message if font loading/drawing fails + draw.text((10, 10), f"Error creating placeholder text for Scene {scene_num}", fill="red") + + + # Save image + output_path = os.path.join( + self.temp_dir, f"placeholder_scene_{scene_num}_{uuid.uuid4()}.png" + ) + image.save(output_path) + logger.info(f"Saved placeholder image to {output_path}") + return output_path + + def add_text_to_image(self, image_path: str, text: str) -> str: + """ + Add narration text overlayed on an image. + + Args: + image_path: Path to the source image. + text: The narration text to add. + + Returns: + Path to the new image with text overlay. Returns original path on error. + """ + try: + image = Image.open(image_path).convert("RGBA") # Ensure RGBA for overlay + width, height = image.size + + # Create a semi-transparent overlay for the bottom part + overlay_height = int(height * TEXT_AREA_HEIGHT_RATIO) + overlay = Image.new( + "RGBA", (width, overlay_height), (0, 0, 0, TEXT_OVERLAY_ALPHA) + ) + + # Paste overlay onto a copy of the image + image_with_overlay = image.copy() + image_with_overlay.paste( + overlay, (0, height - overlay_height), overlay + ) + + # Prepare to draw text + draw = ImageDraw.Draw(image_with_overlay) + try: + font = ImageFont.truetype("arial.ttf", DEFAULT_FONT_SIZE) + except IOError: + try: + font = ImageFont.truetype("DejaVuSans.ttf", DEFAULT_FONT_SIZE) + except IOError: + font = ImageFont.load_default() + logger.warning("Arial/DejaVuSans font not found. Using default PIL font for overlay.") + + + # Wrap text + max_text_width = width - 2 * TEXT_PADDING + words = text.split() + lines = [] + current_line = "" + + if not words: # Handle empty narration + logger.warning(f"Empty narration text for image {image_path}. No text added.") + return image_path # Return original if no text + + for word in words: + if not current_line: + test_line = word + else: + test_line = current_line + " " + word + + try: + bbox = draw.textbbox((0,0), test_line, font=font) + line_width = bbox[2] - bbox[0] + except AttributeError: + line_width = draw.textlength(test_line, font=font) + + if line_width <= max_text_width: + current_line = test_line + else: + lines.append(current_line) + current_line = word + lines.append(current_line) # Add the last line + + # Calculate starting Y position for text + total_text_height = 0 + line_heights = [] + line_spacing = 10 + for line in lines: + try: + bbox = draw.textbbox((0,0), line, font=font) + h = bbox[3] - bbox[1] + except AttributeError: + (_, h) = draw.textsize(line, font=font) + line_heights.append(h) + total_text_height += h + line_spacing + + total_text_height -= line_spacing # Remove extra spacing after last line + + # Adjust starting position to center text vertically within the overlay area + text_area_top = height - overlay_height + start_y = text_area_top + (overlay_height - total_text_height) // 2 + if start_y < text_area_top + TEXT_PADDING: # Ensure padding from top of overlay + start_y = text_area_top + TEXT_PADDING + + # Draw text lines + current_y = start_y + for i, line in enumerate(lines): + try: + bbox = draw.textbbox((0,0), line, font=font) + line_width = bbox[2] - bbox[0] + except AttributeError: + line_width = draw.textlength(line, font=font) + + x_position = (width - line_width) // 2 # Center horizontally + draw.text( + (x_position, current_y), + line, + fill=DEFAULT_FONT_COLOR, + font=font, + ) + current_y += line_heights[i] + line_spacing + + # Save the new image (use PNG to preserve transparency) + base_name = os.path.basename(image_path) + name, ext = os.path.splitext(base_name) + output_path = os.path.join( + self.temp_dir, f"text_{name}_{uuid.uuid4()}.png" + ) + # Convert back to RGB before saving if target format doesn't need alpha + image_with_overlay.convert("RGB").save(output_path) + logger.info(f"Added text overlay to {image_path}, saved as {output_path}") + return output_path + + except FileNotFoundError: + logger.error(f"Error adding text: Image file not found at {image_path}") + return image_path # Return original path if file is missing + except Exception as e: + logger.error( + f"Error adding text to image {image_path}: {str(e)}", exc_info=True + ) + return image_path # Return original image path if text addition fails + + def download_audio(self, url: str) -> Optional[str]: + """ + Download audio file from a URL. + + Args: + url: URL of the audio file (expects MP3). + + Returns: + Path to the downloaded audio file, or None if download fails. + """ + if not url: + logger.warning("No audio URL provided.") + return None + + logger.info(f"Downloading audio from {url}") + try: + response = requests.get(url, stream=True, timeout=30) # Add timeout + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + + # Check content type (optional but recommended) + content_type = response.headers.get('content-type') + if content_type and 'audio' not in content_type: + logger.warning(f"URL content type is '{content_type}', expected audio. Proceeding anyway.") + + audio_path = os.path.join(self.temp_dir, f"background_music_{uuid.uuid4()}.mp3") + with open(audio_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + logger.info(f"Successfully downloaded audio to {audio_path}") + return audio_path + + except requests.exceptions.RequestException as e: + logger.error(f"Error downloading audio from {url}: {str(e)}") + return None + except Exception as e: + logger.error(f"An unexpected error occurred during audio download: {str(e)}", exc_info=True) + return None + + + def create_video( + self, + image_paths: List[str], + audio_path: Optional[str] = None, + fps: int = DEFAULT_FPS, + duration_per_image: int = DEFAULT_DURATION, + ) -> str: + """ + Create a video from a sequence of images with optional audio. + + Args: + image_paths: List of paths to the image files (should include text overlays if added). + audio_path: Path to the background audio file (optional). + fps: Frames per second for the output video. + duration_per_image: How long each image should be displayed in seconds. + + Returns: + Path to the created video file. + + Raises: + Exception: If video creation fails. + FileNotFoundError: If any image path is invalid. + """ + logger.info( + f"Creating video: {len(image_paths)} images, fps={fps}, duration={duration_per_image}s/image" + ) + if not image_paths: + raise ValueError("Cannot create video with no images.") + + # Verify all image paths exist before processing + for img_path in image_paths: + if not os.path.exists(img_path): + raise FileNotFoundError(f"Image file not found: {img_path}") + + try: + # Create a unique output filename + output_path = os.path.join( + self.temp_dir, f"story_video_{uuid.uuid4()}.mp4" + ) + + # Load images and create frames list + # Need to ensure all images are the same size, resize if necessary + frames = [] + target_size = None + + logger.info("Loading and processing images for video...") + for i, img_path in enumerate(image_paths): + try: + img = Image.open(img_path).convert("RGB") # Ensure RGB format for video + + if target_size is None: + target_size = img.size + logger.info(f"Video frame size set to: {target_size}") + elif img.size != target_size: + logger.warning(f"Image {i} ({img_path}) has size {img.size}, resizing to {target_size}") + img = img.resize(target_size, Image.LANCZOS) # Use high-quality resize filter + + # Duplicate frame for the duration + num_frames_per_image = duration_per_image * fps + img_array = np.array(img) + for _ in range(num_frames_per_image): + frames.append(img_array) + except Exception as img_err: + logger.error(f"Error processing image {img_path}: {img_err}", exc_info=True) + # Option: skip image, use placeholder, or raise error + raise Exception(f"Failed to load or process image: {img_path}") from img_err + + + if not frames: + raise ValueError("No valid frames could be generated from the images.") + + # Create video clip from image sequence + logger.info(f"Creating ImageSequenceClip with {len(frames)} total frames.") + clip = ImageSequenceClip(frames, fps=fps) + + # Add audio if provided and valid + final_audio_clip = None + if audio_path and os.path.exists(audio_path): + logger.info(f"Adding audio from: {audio_path}") + try: + audio_clip = AudioFileClip(audio_path) + video_duration = clip.duration + + # Loop or trim audio to match video duration + if audio_clip.duration < video_duration: + logger.info(f"Looping audio (duration {audio_clip.duration}s) for video (duration {video_duration}s)") + final_audio_clip = audio_clip.loop(duration=video_duration) + elif audio_clip.duration > video_duration: + logger.info(f"Trimming audio (duration {audio_clip.duration}s) to video duration ({video_duration}s)") + final_audio_clip = audio_clip.subclip(0, video_duration) + else: + final_audio_clip = audio_clip # Duration matches exactly + + clip = clip.set_audio(final_audio_clip) + except Exception as audio_err: + logger.error(f"Error processing audio file {audio_path}: {audio_err}. Proceeding without audio.", exc_info=True) + # Ensure audio clip resources are closed if error occurs mid-process + if 'audio_clip' in locals() and hasattr(audio_clip, 'close'): + audio_clip.close() + elif audio_path: + logger.warning(f"Audio path provided ({audio_path}) but file not found. Creating video without audio.") + + + # Write video file + logger.info(f"Writing video file to: {output_path}") + # Use sensible codecs and parameters + clip.write_videofile( + output_path, + codec="libx264", # Common and efficient codec + audio_codec="aac", # Standard audio codec + ffmpeg_params=["-pix_fmt", "yuv420p"], # Ensure compatibility + logger='bar' # Show progress bar + ) + + logger.info(f"Successfully created video: {output_path}") + + # Clean up moviepy resources + clip.close() + if final_audio_clip and hasattr(final_audio_clip, 'close'): + final_audio_clip.close() + + return output_path + + except Exception as e: + logger.error(f"Error creating video: {str(e)}", exc_info=True) + # Clean up partial resources if possible + if 'clip' in locals() and hasattr(clip, 'close'): + clip.close() + if 'final_audio_clip' in locals() and hasattr(final_audio_clip, 'close'): + final_audio_clip.close() + + raise Exception(f"Failed to create video: {str(e)}") from e + +# --- Streamlit UI --- + +def write_story_video_generator(): + """Main function to run the Streamlit application interface.""" + st.set_page_config(layout="wide") # Use wider layout + st.title("🎬 AI Story Video Generator") + st.write( + "Create animated story videos using AI. Provide a prompt, and we'll " + "generate a story, visualize it with images, and compile it into a video." + ) + + # Check if dependencies are loaded + if not llm_text_gen or not generate_gemini_image: + st.error("Core AI functionalities could not be loaded. Please check the logs and library paths.") + st.stop() # Stop execution if core parts are missing + + + # Initialize session state variables + if "story_data" not in st.session_state: + st.session_state.story_data = None + if "generated_images" not in st.session_state: + st.session_state.generated_images = [] # Stores paths to final images (with text if added) + if "original_images" not in st.session_state: + st.session_state.original_images = [] # Stores paths to original generated images + if "video_path" not in st.session_state: + st.session_state.video_path = None + # Use a single generator instance stored in session state? + # Could be useful if it holds state, but here it seems stateless except for temp_dir + # Creating it when needed might be simpler. + + + tab1, tab2, tab3, tab4 = st.tabs( + ["**1. Story Prompt**", "**2. Storyboard**", "**3. Generate Images**", "**4. Create Video**"] + ) + + # --- Step 1: Story Prompt --- + with tab1: + st.header("Step 1: Create Your Story") + + col1, col2 = st.columns([2, 1]) + + with col1: + story_prompt = st.text_area( + "Enter your story idea:", + placeholder="e.g., A brave squirrel who learns to fly with the help of an old owl.", + height=100, + key="story_prompt_input" + ) + + col1_1, col1_2 = st.columns(2) + with col1_1: + num_scenes = st.slider( + "Number of Scenes", min_value=2, max_value=10, value=4, key="num_scenes_slider" + ) + with col1_2: + story_style = st.selectbox( + "Story Style", + [ + "children's story", + "adventure story", + "fairy tale", + "sci-fi story", + "fantasy story", + "mystery story", + "fable", + ], + key="story_style_select" + ) + + with col2: + st.markdown("#### Tips for Good Prompts") + st.markdown( + """ + * **Be specific:** Mention characters, setting, and the main plot point. + * **Include conflict:** What challenge do the characters face? + * **Suggest a mood:** Happy, mysterious, exciting? + * **Target Audience:** Helps the AI tailor the tone (e.g., "for young children"). + * **Example:** "A funny children's story about a clumsy robot trying to bake a cake for its creator's birthday in a futuristic kitchen." + """ + ) + + if st.button("✨ Generate Story", type="primary", key="generate_story_button"): + if not story_prompt: + st.error("Please enter a story prompt.") + else: + with st.spinner("✍️ Generating your story... This may take a moment."): + try: + generator = StoryVideoGenerator() # Create instance + story_data = generator.generate_story( + story_prompt, num_scenes, story_style + ) + st.session_state.story_data = story_data + # Reset downstream states + st.session_state.generated_images = [] + st.session_state.original_images = [] + st.session_state.video_path = None + st.success( + "Story generated successfully! Proceed to the **Storyboard** tab to review and edit." + ) + # Consider automatically switching tabs here if desired (more complex JS interaction) + except Exception as e: + st.error(f"Error generating story: {str(e)}") + logger.error("Story generation failed in UI", exc_info=True) + + + # --- Step 2: Storyboard --- + with tab2: + st.header("Step 2: Review Your Storyboard") + + if st.session_state.story_data: + story_data = st.session_state.story_data + + st.subheader(f"Title: {story_data.get('title', 'Untitled Story')}") + st.markdown("Review and edit the scene descriptions and narrations below.") + + # Use st.form to batch edits? Could be smoother but adds complexity. + # Simple sequential editing for now. + edited = False + for i, scene in enumerate(story_data["scenes"]): + st.markdown("---") + st.markdown(f"**🎬 Scene {scene.get('scene_number', i+1)}**") + + # Store original values for comparison/reset? + original_desc = scene.get("description", "") + original_narr = scene.get("narration", "") + + # Use unique keys for each text area + desc_key = f"desc_{scene.get('scene_number', i)}" + narr_key = f"narr_{scene.get('scene_number', i)}" + + new_description = st.text_area( + "Visual Description (for image generation)", + value=original_desc, + key=desc_key, + height=100 + ) + new_narration = st.text_area( + "Narration Text (for voiceover/overlay)", + value=original_narr, + key=narr_key, + height=100 + ) + + # Update the scene data in session state if changed + if new_description != original_desc: + st.session_state.story_data["scenes"][i]["description"] = new_description + edited = True + if new_narration != original_narr: + st.session_state.story_data["scenes"][i]["narration"] = new_narration + edited = True + + if edited: + # Use st.info for non-blocking notification + st.info("Changes saved in session. Proceed when ready.") + + if st.button("🖼️ Proceed to Image Generation", type="primary", key="proceed_to_images_button"): + # Re-check story_data exists before proceeding + if not st.session_state.story_data or not st.session_state.story_data.get("scenes"): + st.error("No story data available. Please generate a story first.") + else: + # Reset image/video state if proceeding from edits + st.session_state.generated_images = [] + st.session_state.original_images = [] + st.session_state.video_path = None + st.success("Ready! Go to the **Generate Images** tab.") + + else: + st.info("Generate a story in **Step 1** first.") + + # --- Step 3: Generate Images --- + with tab3: + st.header("Step 3: Generate Scene Images") + + if st.session_state.story_data and st.session_state.story_data.get("scenes"): + story_data = st.session_state.story_data + + col1, col2 = st.columns([1, 2]) # Settings | Preview + + with col1: + st.subheader("Image Settings") + image_style = st.selectbox( + "Image Style", + [ + "digital art", + "cartoon", + "watercolor", + "photorealistic", # Changed 'realistic' + "anime", + "pixel art", + "oil painting", + "line art", + "cinematic", + ], + key="image_style_select" + ) + + include_text = st.checkbox( + "Overlay narration text on images", value=True, key="include_text_checkbox" + ) + + st.markdown("---") + + if st.button("🎨 Generate All Images", type="primary", key="generate_images_button"): + # Check if images already exist for current story? Ask to regenerate? + # Simple approach: always regenerate when button is clicked. + with st.spinner("Generating images... This can take some time depending on the number of scenes."): + try: + generator = StoryVideoGenerator() # New instance for this task + generated_images = [] + original_images = [] # Store originals separately + + num_scenes_total = len(story_data["scenes"]) + progress_bar = st.progress(0.0) + status_text = st.empty() + + for i, scene in enumerate(story_data["scenes"]): + status_text.text(f"Generating image for scene {i+1}/{num_scenes_total}...") + # Generate the base image + original_image_path = generator.generate_scene_image( + scene, image_style + ) + original_images.append(original_image_path) + + # Add text if requested + if include_text: + status_text.text(f"Adding text overlay for scene {i+1}...") + final_image_path = generator.add_text_to_image( + original_image_path, scene.get("narration", "") + ) + # Check if text addition failed (returned original path) + if final_image_path == original_image_path and scene.get("narration", ""): + st.warning(f"Could not add text overlay to scene {i+1}. Using original image.") + generated_images.append(final_image_path) + else: + generated_images.append(original_image_path) # Use original if no text needed + + progress_bar.progress((i + 1) / num_scenes_total) + + status_text.text("Image generation complete!") + st.session_state.original_images = original_images + st.session_state.generated_images = generated_images + st.session_state.video_path = None # Reset video path + st.success( + "All images generated! Review them here and proceed to the **Create Video** tab." + ) + except FileNotFoundError as fnf_err: + st.error(f"Image Generation Error: A required file was not found. {fnf_err}") + logger.error("Image generation failed due to FileNotFoundError", exc_info=True) + except Exception as e: + st.error(f"Error generating images: {str(e)}") + logger.error("Image generation failed in UI", exc_info=True) + # Clear potentially partial results + st.session_state.generated_images = [] + st.session_state.original_images = [] + + + with col2: + st.subheader("Image Preview") + if st.session_state.generated_images: + # Display images (final versions with text if applicable) + for i, img_path in enumerate(st.session_state.generated_images): + scene_num = st.session_state.story_data["scenes"][i].get("scene_number", i+1) + st.image( + img_path, + caption=f"Scene {scene_num}", + use_column_width=True, + ) + else: + st.info( + "Click 'Generate All Images' after configuring settings." + ) + else: + st.info( + "Please generate or review a story in **Step 1** or **Step 2** first." + ) + + + # --- Step 4: Create Video --- + with tab4: + st.header("Step 4: Create Your Story Video") + + if st.session_state.generated_images: + col1, col2 = st.columns([1, 1]) # Settings | Video Player + + with col1: + st.subheader("Video Settings") + fps = st.slider( + "Frames Per Second (Video Smoothness)", + min_value=1, + max_value=30, + value=max(DEFAULT_FPS, 10), # Default to slightly smoother + key="fps_slider" + ) + duration_per_image = st.slider( + "Seconds Per Scene", + min_value=1, + max_value=15, + value=DEFAULT_DURATION, + key="duration_slider" + ) + + st.markdown("---") + st.subheader("Audio Settings") + use_music = st.checkbox("Add background music", value=True, key="use_music_checkbox") + + music_url_to_use = None + if use_music: + music_option = st.radio( + "Music Source", + ["Use default soothing music", "Provide music URL (MP3)"], + key="music_option_radio", + horizontal=True + ) + + if music_option == "Provide music URL (MP3)": + custom_music_url = st.text_input( + "Music URL (must be direct MP3 link)", + placeholder="https://example.com/path/to/music.mp3", + key="custom_music_url_input" + ) + if custom_music_url: + music_url_to_use = custom_music_url + else: + # Explicitly set to None if field is empty but option selected + music_url_to_use = None + else: + music_url_to_use = DEFAULT_MUSIC_URL + st.caption(f"Using default: {DEFAULT_MUSIC_URL}") + + st.markdown("---") + + if st.button("🎞️ Create Video", type="primary", key="create_video_button"): + if not st.session_state.generated_images: + st.error("No images found. Please generate images in Step 3.") + else: + with st.spinner("🎬 Creating your story video... This might take some time."): + try: + generator = StoryVideoGenerator() # New instance + + # Download audio if requested and URL is valid + audio_path = None + if use_music and music_url_to_use: + status_text = st.empty() + status_text.text("Downloading background music...") + audio_path = generator.download_audio(music_url_to_use) + if not audio_path: + st.warning("Failed to download music. Proceeding without audio.") + status_text.text("Music download failed. Continuing...") + else: + status_text.text("Music downloaded.") + + # Create video using the final generated images + status_text.text("Compiling video...") + video_path = generator.create_video( + st.session_state.generated_images, # Use images (potentially with text) + audio_path, + fps, + duration_per_image, + ) + + st.session_state.video_path = video_path + status_text.empty() # Clear status message + st.success("Video created successfully!") + + except FileNotFoundError as fnf_err: + st.error(f"Video Creation Error: A required file was not found. {fnf_err}") + logger.error("Video creation failed due to FileNotFoundError", exc_info=True) + except Exception as e: + st.error(f"Error creating video: {str(e)}") + logger.error("Video creation failed in UI", exc_info=True) + st.session_state.video_path = None # Clear video path on error + + with col2: + st.subheader("Video Preview") + if st.session_state.video_path: + try: + video_file = open(st.session_state.video_path, "rb") + video_bytes = video_file.read() + st.video(video_bytes) + video_file.close() # Close the file handle + + # Prepare download button + try: + video_title = st.session_state.story_data.get("title", "story") + safe_title = re.sub(r'[^\w\-]+', '_', video_title) # Sanitize title for filename + download_filename = f"{safe_title}_video.mp4" + + st.download_button( + label="⬇️ Download Video", + data=video_bytes, # Use the bytes already read + file_name=download_filename, + mime="video/mp4", + key="download_video_button" + ) + except Exception as download_err: + st.error(f"Error preparing download button: {download_err}") + + except FileNotFoundError: + st.error("The generated video file could not be found. Please try generating again.") + st.session_state.video_path = None # Reset state + except Exception as display_err: + st.error(f"Error displaying video: {display_err}") + logger.error("Error displaying video in UI", exc_info=True) + + + else: + st.info( + "Click 'Create Video' after generating images and configuring settings." + ) + else: + st.info( + "Please generate images in **Step 3** first." + ) + + # --- Footer --- + st.markdown("---") + st.markdown( + "Powered by AI | Story generation, image creation, and video compilation." + ) + # Add link to your repo or project if desired + # st.markdown("[GitHub Repository](your-link-here)") + +if __name__ == "__main__": + # Ensure essential libraries are installed + try: + import streamlit + import numpy + import PIL + import requests + import moviepy + # Optionally check for google-generativeai if it's the backend + # import google.generativeai + except ImportError as e: + print(f"Error: Missing required library: {e.name}") + print("Please install all required libraries: pip install streamlit numpy Pillow requests moviepy") + # Add other dependencies like google-generativeai if needed + exit(1) + + write_story_video_generator() \ No newline at end of file diff --git a/lib/ai_writers/ai_story_video_generator/utils.py b/lib/ai_writers/ai_story_video_generator/utils.py new file mode 100644 index 00000000..81d5a96b --- /dev/null +++ b/lib/ai_writers/ai_story_video_generator/utils.py @@ -0,0 +1,64 @@ +""" +Utility functions for the AI Story Video Generator. +""" + +import os +import tempfile +import uuid +from pathlib import Path +from typing import Optional + +# Constants +TEMP_DIR = Path(tempfile.gettempdir()) / "alwrity_story_generator" + +def ensure_temp_dir() -> Path: + """Ensure the temporary directory exists and return its path.""" + os.makedirs(TEMP_DIR, exist_ok=True) + return TEMP_DIR + +def get_temp_filepath(prefix: str, extension: str) -> str: + """Generate a temporary file path with the given prefix and extension.""" + temp_dir = ensure_temp_dir() + return str(temp_dir / f"{prefix}_{uuid.uuid4()}.{extension}") + +def clean_temp_files(older_than_hours: int = 24) -> int: + """ + Clean temporary files older than the specified number of hours. + + Args: + older_than_hours: Remove files older than this many hours + + Returns: + Number of files removed + """ + import time + from datetime import datetime, timedelta + + temp_dir = ensure_temp_dir() + cutoff_time = time.time() - (older_than_hours * 3600) + count = 0 + + for file_path in temp_dir.glob("*"): + if file_path.is_file() and file_path.stat().st_mtime < cutoff_time: + try: + file_path.unlink() + count += 1 + except Exception: + pass + + return count + +def format_duration(seconds: float) -> str: + """Format seconds into a MM:SS string.""" + minutes = int(seconds // 60) + remaining_seconds = int(seconds % 60) + return f"{minutes}:{remaining_seconds:02d}" + +def sanitize_filename(filename: str) -> str: + """Sanitize a string to be used as a filename.""" + import re + # Remove invalid characters + sanitized = re.sub(r'[^\w\s-]', '', filename) + # Replace spaces with underscores + sanitized = sanitized.strip().replace(' ', '_') + return sanitized \ No newline at end of file diff --git a/lib/alwrity_ui/keyword_web_researcher.py b/lib/alwrity_ui/keyword_web_researcher.py index 7158d985..b9063381 100644 --- a/lib/alwrity_ui/keyword_web_researcher.py +++ b/lib/alwrity_ui/keyword_web_researcher.py @@ -494,36 +494,89 @@ def do_web_research(): with progress_col: progress_bar = st.progress(0) + def update_progress(message, progress=None, level="info"): + """Update progress bar and status display. + + Args: + message (str): The message to display + progress (float, optional): Progress value between 0 and 100. Will be converted to 0.0-1.0 + level (str, optional): Message level (info, warning, error, success) + """ + if progress is not None: + # Convert percentage to decimal (0.0-1.0) + progress = float(progress) / 100.0 + # Ensure progress stays within bounds + progress = max(0.0, min(1.0, progress)) + progress_bar.progress(progress) + + if level == "error": + status_display.error(f"🚫 {message}") + elif level == "warning": + status_display.warning(f"⚠️ {message}") + elif level == "success": + status_display.success(f"✨ {message}") + else: + status_display.info(f"🔄 {message}") + logger.debug(f"Progress update [{level}]: {message}") + # Execute search with all parameters - web_research_result = gpt_web_researcher( - search_keywords=st.session_state.research_options["primary_keywords"], - search_mode=st.session_state.research_options["search_mode"], - related_keywords=st.session_state.research_options["related_keywords"], - target_audience=st.session_state.research_options["target_audience"], - content_type=st.session_state.research_options["content_type"], - search_depth=st.session_state.research_options["search_depth"], - geo_location=st.session_state.research_options["geo_location"], - search_language=st.session_state.research_options["search_language"], - num_results=st.session_state.research_options["num_results"], - time_range=st.session_state.research_options["time_range"], - include_domains=st.session_state.research_options["include_domains"], - similar_url=st.session_state.research_options["similar_url"] - ) + try: + update_progress("Starting search...", 0.25) + logger.info(f"Executing web research with mode: {st.session_state.research_options['search_mode']}") + + # Create base parameters + research_params = { + "search_keywords": st.session_state.research_options["primary_keywords"], + "search_mode": st.session_state.research_options["search_mode"], + "related_keywords": st.session_state.research_options["related_keywords"], + "target_audience": st.session_state.research_options["target_audience"], + "content_type": st.session_state.research_options["content_type"], + "search_depth": st.session_state.research_options["search_depth"], + "geo_location": st.session_state.research_options["geo_location"], + "search_language": st.session_state.research_options["search_language"], + "num_results": st.session_state.research_options["num_results"], + "time_range": st.session_state.research_options["time_range"], + "include_domains": st.session_state.research_options["include_domains"], + "similar_url": st.session_state.research_options["similar_url"] + } + + # Add UI-specific parameters + research_params.update({ + "status_container": status_display, + "update_progress": update_progress + }) + + # For AI search mode, ensure search_keywords is passed correctly + if st.session_state.research_options["search_mode"] == "ai": + research_params["tavily_params"] = { + "max_results": st.session_state.research_options["num_results"], + "search_depth": "advanced" if st.session_state.research_options["search_depth"] > 2 else "basic", + "time_range": st.session_state.research_options["time_range"], + "include_domains": st.session_state.research_options["include_domains"].split(",") if st.session_state.research_options["include_domains"] else [""] + } + # Pass search_keywords as a positional argument + research_params["tavily_search_keywords"] = st.session_state.research_options["primary_keywords"] + + # Execute the research + web_research_result = gpt_web_researcher(**research_params) + + if web_research_result: + status_display.success("✨ Research completed!") + + # Display results in an organized way + with st.expander("📊 Research Results", expanded=False): + st.write(web_research_result) + else: + st.warning("No results found for your search") + + except Exception as e: + error_msg = f"Research failed: {str(e)}" + logger.error(error_msg, exc_info=True) + st.error(f"🚫 Research failed: {error_msg}") - if web_research_result: - status_display.success("✨ Research completed!") - - # Display results in an organized way - with st.expander("📊 Research Results", expanded=False): - st.write(web_research_result) - else: - st.warning("No results found for your search") - except Exception as e: - error_msg = f"Research failed: {str(e)}" - logger.error(error_msg, exc_info=True) - st.error(f"🚫 Research failed: {error_msg}") - + logger.error(f"Unexpected error in web research: {e}", exc_info=True) + st.error("🚫 An unexpected error occurred. Please try again.") except Exception as e: logger.error(f"Unexpected error in web research: {e}", exc_info=True) st.error("🚫 An unexpected error occurred. Please try again.") \ No newline at end of file diff --git a/lib/alwrity_ui/similar_analysis.py b/lib/alwrity_ui/similar_analysis.py index 87ba7e54..d7e844a5 100644 --- a/lib/alwrity_ui/similar_analysis.py +++ b/lib/alwrity_ui/similar_analysis.py @@ -3,7 +3,22 @@ from lib.ai_web_researcher.metaphor_basic_neural_web_search import metaphor_find from datetime import datetime, timedelta import re import urllib.parse +import logging +# Configure logging +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +# Create console handler if it doesn't exist +if not logger.handlers: + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.DEBUG) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) def is_valid_url(url): """ @@ -17,15 +32,20 @@ def is_valid_url(url): """ try: result = urllib.parse.urlparse(url) - return all([result.scheme, result.netloc]) - except: + is_valid = all([result.scheme, result.netloc]) + logger.debug(f"URL validation for {url}: {is_valid}") + return is_valid + except Exception as e: + logger.error(f"URL validation error for {url}: {str(e)}") return False - def competitor_analysis(): + logger.info("Starting competitor analysis") + # Initialize session state for progress bar visibility if 'show_progress' not in st.session_state: st.session_state.show_progress = True + logger.debug("Initialized show_progress session state") st.title("Competitor Analysis") st.markdown("""**Use Cases:** @@ -44,6 +64,7 @@ def competitor_analysis(): # Validate URL url_valid = is_valid_url(similar_url) if similar_url else False if similar_url and not url_valid: + logger.warning(f"Invalid URL provided: {similar_url}") st.error("⚠️ Please enter a valid URL including http:// or https://") # Usecase selection with improved help @@ -52,6 +73,7 @@ def competitor_analysis(): ["similar companies", "listicles", "Top tools", "alternative-to", "similar products", "similar websites"], help="Choose the type of analysis you want to perform" ) + logger.debug(f"Selected usecase: {usecase}") # Default summary query based on usecase default_summary_queries = { @@ -65,6 +87,7 @@ def competitor_analysis(): # Advanced options using a modal dialog show_advanced = st.checkbox("Show Advanced Options", help="Configure additional search parameters") + logger.debug(f"Advanced options shown: {show_advanced}") # Initialize default values num_results = 5 @@ -96,6 +119,7 @@ def competitor_analysis(): # Advanced options section if show_advanced: + logger.debug("Processing advanced options") st.markdown("### 🔧 Advanced Search Options") # Summary query with improved help in a card @@ -192,6 +216,9 @@ def competitor_analysis(): if st.button("Analyze", disabled=not url_valid if similar_url else False): if similar_url and url_valid: try: + logger.info(f"Starting analysis for URL: {similar_url}") + logger.debug(f"Analysis parameters - Usecase: {usecase}, Num Results: {num_results}, Time Range: {time_range}") + # Create a progress container progress_container = st.empty() status_container = st.empty() @@ -201,11 +228,8 @@ def competitor_analysis(): status_container.info(f"Starting analysis for the URL: {similar_url}") # Create a progress bar - progress_bar = progress_container.progress(0) - - # Update progress and status - progress_bar.progress(10) - status_container.info("Initializing search parameters...") + progress_bar = progress_container.progress(0.1) + logger.debug("Initialized progress bar and status containers") # Calculate date range based on selection start_date = None @@ -219,6 +243,7 @@ def competitor_analysis(): start_date = end_date - timedelta(days=30) elif time_range == "Past Year": start_date = end_date - timedelta(days=365) + logger.debug(f"Date range: {start_date} to {end_date}") # Format dates for API if they exist start_published_date = start_date.strftime("%Y-%m-%dT%H:%M:%S.000Z") if start_date else None @@ -228,41 +253,48 @@ def competitor_analysis(): summary_query_param = None if summary_query: summary_query_param = {"query": summary_query} + logger.debug(f"Summary query: {summary_query}") # Update progress - progress_bar.progress(20) + progress_bar.progress(0.2) status_container.info("Searching for similar content...") + logger.info("Initiating similar content search") # Call the metaphor_find_similar function with all parameters with st.spinner("Performing competitor analysis..."): - # Update progress - progress_bar.progress(30) - status_container.info("Finding similar content...") - - # Call the API - df, search_response = metaphor_find_similar( - similar_url=similar_url, - usecase=usecase, - num_results=num_results, - start_published_date=start_published_date, - end_published_date=end_published_date, - include_domains=include_domains, - exclude_domains=exclude_domains, - include_text=include_text, - exclude_text=exclude_text, - summary_query=summary_query_param - ) - - # Update progress - progress_bar.progress(70) - status_container.info("Processing and analyzing results...") - - # Update progress to complete - progress_bar.progress(100) - status_container.success("Analysis completed successfully!") + logger.debug("Calling metaphor_find_similar API") + try: + df, search_response = metaphor_find_similar( + similar_url=similar_url, + usecase=usecase, + num_results=num_results, + start_published_date=start_published_date, + end_published_date=end_published_date, + include_domains=include_domains, + exclude_domains=exclude_domains, + include_text=include_text, + exclude_text=exclude_text, + summary_query=summary_query_param + ) + logger.info(f"API call successful. Found {len(df) if not df.empty else 0} results") + + # Update progress + progress_bar.progress(0.7) + status_container.info("Processing and analyzing results...") + logger.debug("Processing search results") + + # Update progress to complete + progress_bar.progress(1.0) + status_container.success("Analysis completed successfully!") + logger.info("Analysis completed successfully") + + except Exception as api_error: + logger.error(f"API call failed: {str(api_error)}", exc_info=True) + raise - # Display results using data editor + # Display results if not df.empty: + logger.debug(f"Displaying {len(df)} results") st.subheader("📊 Competitor Analysis Results") # Add a download button for the results @@ -331,8 +363,12 @@ def competitor_analysis(): with st.expander("View Raw Data"): st.json(search_response) else: + logger.warning("No results found for the given URL and parameters") st.warning("No results found for the given URL and parameters.") + except Exception as err: + logger.error(f"Analysis failed: {str(err)}", exc_info=True) st.error(f"✖ 🚫 Failed to do similar search.\nError: {err}") else: + logger.warning("Analysis attempted without valid URL") st.error("Please enter a valid URL.") \ No newline at end of file