ALwrity Facebook Writer CopilotKit Implementation Plan

This commit is contained in:
ajaysi
2025-08-31 18:41:07 +05:30
parent 66c14e158c
commit eb0789321d
11 changed files with 2116 additions and 206 deletions

View File

@@ -30,10 +30,27 @@ class StoryTone(str, Enum):
class StoryVisualOptions(BaseModel):
    """Visual options for story.

    Groups the knobs that shape how a story looks: the background layer,
    text overlay styling, embellishments/interactivity and an optional CTA
    overlay. Every field has a default, so the model can be constructed with
    no arguments.
    """

    # Background layer
    # NOTE: `background_type` is declared exactly once; an earlier duplicate
    # declaration was shadowed by this one and has been dropped.
    background_type: str = Field(
        default="Solid color",
        description="Background type (Solid color, Gradient, Image, Video)",
    )
    background_image_prompt: Optional[str] = Field(
        None,
        description="If background_type is Image/Video, describe desired visual",
    )
    gradient_style: Optional[str] = Field(
        None,
        description="Gradient style if gradient background is chosen",
    )

    # Text overlay styling
    text_overlay: bool = Field(default=True, description="Include text overlay")
    text_style: Optional[str] = Field(
        None,
        description="Headline/Subtext style, e.g., Bold, Minimal, Handwritten",
    )
    text_color: Optional[str] = Field(None, description="Preferred text color or palette")
    text_position: Optional[str] = Field(
        None,
        description="Top/Center/Bottom; Left/Center/Right",
    )

    # Embellishments and interactivity
    stickers: bool = Field(default=True, description="Use stickers/emojis")
    interactive_elements: bool = Field(default=True, description="Include polls/questions")
    interactive_types: Optional[List[str]] = Field(
        default=None,
        description="List of interactive types like ['poll','quiz','slider','countdown']",
    )

    # CTA overlay
    call_to_action: Optional[str] = Field(
        None,
        description="Optional CTA copy to place on story",
    )
class FacebookStoryRequest(BaseModel):
@@ -47,12 +64,20 @@ class FacebookStoryRequest(BaseModel):
include: Optional[str] = Field(None, description="Elements to include in the story")
avoid: Optional[str] = Field(None, description="Elements to avoid in the story")
visual_options: StoryVisualOptions = Field(default_factory=StoryVisualOptions, description="Visual customization options")
# Advanced text generation options (parity with original Streamlit module)
use_hook: bool = Field(default=True, description="Start with a hook to grab attention")
use_story: bool = Field(default=True, description="Use a short narrative arc")
use_cta: bool = Field(default=True, description="Include a call to action")
use_question: bool = Field(default=True, description="Ask a question to spur interaction")
use_emoji: bool = Field(default=True, description="Use emojis where appropriate")
use_hashtags: bool = Field(default=True, description="Include relevant hashtags in copy")
class FacebookStoryResponse(BaseModel):
"""Response model for Facebook story generation."""
success: bool = Field(..., description="Whether the generation was successful")
content: Optional[str] = Field(None, description="Generated story content")
images_base64: Optional[List[str]] = Field(None, description="List of base64-encoded story images (PNG)")
visual_suggestions: Optional[List[str]] = Field(None, description="Visual element suggestions")
engagement_tips: Optional[List[str]] = Field(None, description="Engagement optimization tips")
error: Optional[str] = Field(None, description="Error message if generation failed")

View File

@@ -2,6 +2,7 @@
from typing import Dict, Any, List
from ..models import *
from ..models.carousel_models import CarouselSlide
from .base_service import FacebookWriterBaseService

View File

@@ -3,6 +3,12 @@
from typing import Dict, Any, List
from ..models.story_models import FacebookStoryRequest, FacebookStoryResponse
from .base_service import FacebookWriterBaseService
try:
from ...services.llm_providers.text_to_image_generation.gen_gemini_images import (
generate_gemini_images_base64,
)
except Exception:
generate_gemini_images_base64 = None # type: ignore
class FacebookStoryService(FacebookWriterBaseService):
@@ -38,10 +44,28 @@ class FacebookStoryService(FacebookWriterBaseService):
# Generate visual suggestions and engagement tips
visual_suggestions = self._generate_visual_suggestions(actual_story_type, request.visual_options)
engagement_tips = self._generate_engagement_tips("story")
# Optional: generate one story image (9:16) using Gemini
images_base64: List[str] = []
try:
if generate_gemini_images_base64 is not None:
img_prompt = request.visual_options.background_image_prompt or (
f"Facebook story background for {request.business_type}. "
f"Style: {actual_tone}. Type: {actual_story_type}. Vertical mobile 9:16, high contrast, legible overlay space."
)
images_base64 = generate_gemini_images_base64(
img_prompt,
enhance_prompt=False,
aspect_ratio="9:16",
max_retries=2,
initial_retry_delay=1.0,
) or []
except Exception:
images_base64 = []
return FacebookStoryResponse(
success=True,
content=content,
images_base64=images_base64[:1],
visual_suggestions=visual_suggestions,
engagement_tips=engagement_tips,
metadata={
@@ -75,6 +99,28 @@ class FacebookStoryService(FacebookWriterBaseService):
f"Create a {story_type} story"
)
# Advanced writing flags
advanced_lines = []
if getattr(request, "use_hook", True):
advanced_lines.append("- Start with a compelling hook in the first line")
if getattr(request, "use_story", True):
advanced_lines.append("- Use a mini narrative with a clear flow")
if getattr(request, "use_cta", True):
cta_text = request.visual_options.call_to_action or "Add a clear call-to-action"
advanced_lines.append(f"- Include a CTA: {cta_text}")
if getattr(request, "use_question", True):
advanced_lines.append("- Ask a question to prompt replies or taps")
if getattr(request, "use_emoji", True):
advanced_lines.append("- Use a few relevant emojis for tone and scannability")
if getattr(request, "use_hashtags", True):
advanced_lines.append("- Include 1-3 relevant hashtags if appropriate")
advanced_str = "\n".join(advanced_lines)
# Visual details
v = request.visual_options
interactive_types_str = ", ".join(v.interactive_types) if v.interactive_types else "None specified"
prompt = f"""
{base_prompt}
@@ -86,12 +132,20 @@ class FacebookStoryService(FacebookWriterBaseService):
Content Requirements:
- Include: {request.include or 'N/A'}
- Avoid: {request.avoid or 'N/A'}
{('\n' + advanced_str) if advanced_str else ''}
Visual Options:
- Background Type: {request.visual_options.background_type}
- Text Overlay: {request.visual_options.text_overlay}
- Stickers/Emojis: {request.visual_options.stickers}
- Interactive Elements: {request.visual_options.interactive_elements}
- Background Type: {v.background_type}
- Background Visual Prompt: {v.background_image_prompt or 'N/A'}
- Gradient Style: {v.gradient_style or 'N/A'}
- Text Overlay: {v.text_overlay}
- Text Style: {v.text_style or 'N/A'}
- Text Color: {v.text_color or 'N/A'}
- Text Position: {v.text_position or 'N/A'}
- Stickers/Emojis: {v.stickers}
- Interactive Elements: {v.interactive_elements}
- Interactive Types: {interactive_types_str}
- Call To Action: {v.call_to_action or 'N/A'}
Please create a Facebook Story that:
1. Is optimized for mobile viewing (vertical format)
@@ -137,14 +191,28 @@ class FacebookStoryService(FacebookWriterBaseService):
])
# Add general suggestions based on visual options
if visual_options.text_overlay:
if getattr(visual_options, "text_overlay", True):
suggestions.append("Use bold, readable fonts for text overlays")
if visual_options.stickers:
if getattr(visual_options, "text_style", None):
suggestions.append(f"Match text style to tone: {visual_options.text_style}")
if getattr(visual_options, "text_color", None):
suggestions.append(f"Ensure sufficient contrast with text color: {visual_options.text_color}")
if getattr(visual_options, "text_position", None):
suggestions.append(f"Place text at {visual_options.text_position} to avoid occluding subject")
if getattr(visual_options, "stickers", True):
suggestions.append("Add relevant emojis and stickers to increase engagement")
if visual_options.interactive_elements:
if getattr(visual_options, "interactive_elements", True):
suggestions.append("Include polls, questions, or swipe-up actions")
if getattr(visual_options, "interactive_types", None):
suggestions.append(f"Try interactive types: {', '.join(visual_options.interactive_types)}")
if getattr(visual_options, "background_type", None) in {"Image", "Video"} and getattr(visual_options, "background_image_prompt", None):
suggestions.append("Source visuals based on background prompt for consistency")
if getattr(visual_options, "call_to_action", None):
suggestions.append(f"Overlay CTA copy near focal point: {visual_options.call_to_action}")
return suggestions

View File

@@ -2,11 +2,11 @@ import os
import sys
import time
import datetime
import streamlit as st
import base64
from typing import List, Optional, Tuple
from PIL import Image
from io import BytesIO
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_random_exponential
import logging
# Import APIKeyManager
from ...api_key_manager import APIKeyManager
@@ -16,7 +16,9 @@ try:
from google.generativeai import types
except ImportError:
genai = None
logger.warning("Google genai library not available. Install with: pip install google-generativeai")
logging.getLogger('gemini_image_generator').warning(
"Google genai library not available. Install with: pip install google-generativeai"
)
from .save_image import save_generated_image
@@ -28,9 +30,8 @@ logging.basicConfig(
)
logger = logging.getLogger('gemini_image_generator')
# With image generation in Gemini, your imagination is the limit.
# If what you see doesn't quite match what you had in mind, try adding more details to the prompt.
# The more specific you are, the better Gemini can create images that reflect your vision.
# With image generation in Gemini, your imagination is the limit.
# Follow Google AI best practices for detailed prompts and iterative refinement.
# Generate images using Gemini
# Gemini 2.0 Flash Experimental supports the ability to output text and inline images.
@@ -167,161 +168,131 @@ class AIPromptGenerator:
return ", ".join(prompt_parts)
def generate_gemini_image(prompt, keywords=None, style=None, focus=None, enhance_prompt=True, max_retries=3, initial_retry_delay=2, aspect_ratio="16:9"):
"""
Generate an image using Gemini's image generation capabilities.
Args:
prompt (str): The text prompt for image generation
keywords (list, optional): Keywords to enhance the prompt
style (str, optional): Style of the image (photorealistic, artistic, etc.)
focus (str, optional): Focus area for photorealistic images
enhance_prompt (bool, optional): Whether to enhance the prompt with AI
max_retries (int, optional): Maximum number of retry attempts
initial_retry_delay (int, optional): Initial delay between retries
aspect_ratio (str, optional): Aspect ratio for the generated image
Returns:
str: The path to the generated image.
"""
logger.info(f"Generating image with prompt: '{prompt[:100]}...'")
# Use APIKeyManager instead of direct environment variable access
def _ensure_client() -> Optional[object]:
    """Create a Gemini client if available and API key is configured.

    Returns:
        A ``genai.Client`` built with the configured Gemini API key, or
        ``None`` when the key is missing, the ``genai`` library could not be
        imported, or constructing the client raised.
    """
    api_key_manager = APIKeyManager()
    api_key = api_key_manager.get_api_key("gemini")
    if not api_key or genai is None:
        return None
    try:
        return genai.Client(api_key=api_key)
    except Exception as err:
        # Best-effort: callers treat None as "image generation unavailable",
        # but leave a trace so a misconfiguration is not invisible.
        logging.getLogger('gemini_image_generator').warning(
            "Failed to create Gemini client: %s", err
        )
        return None
def generate_gemini_images_base64(
    prompt: str,
    *,
    keywords: Optional[list] = None,
    style: Optional[str] = None,
    focus: Optional[str] = None,
    enhance_prompt: bool = True,
    aspect_ratio: str = "9:16",
    max_retries: int = 2,
    initial_retry_delay: float = 1.0,
) -> List[str]:
    """
    Return list of base64 PNG images generated from a prompt.

    Implements best practices per Gemini docs: send text prompt, parse inline image parts,
    and return base64 data suitable for API responses. No Streamlit, no printing.
    Docs: https://ai.google.dev/gemini-api/docs/image-generation

    Args:
        prompt: Text prompt for image generation.
        keywords: Keywords used to build an enhanced prompt; only consulted
            when ``enhance_prompt`` is true.
        style: When ``"photorealistic"`` (and ``focus`` is set) the
            photorealistic prompt builder is used instead of the generic one.
        focus: Focus area passed to the photorealistic prompt builder.
        enhance_prompt: Whether to append an AI-enhanced prompt.
        aspect_ratio: Appended to the prompt as a textual hint when non-empty.
        max_retries: Maximum number of retries after an error whose message
            contains ``"503"``.
        initial_retry_delay: Seconds to sleep before the first retry; doubled
            after each retry.

    Returns:
        Base64-encoded image strings, or an empty list when no client is
        available, the response carries no image parts, or the request fails.
    """
    logger = logging.getLogger('gemini_image_generator')
    logger.info("Generating image (base64) with Gemini")

    if enhance_prompt and keywords:
        pg = AIPromptGenerator()
        enhanced = (
            pg.generate_photorealistic_prompt(keywords, focus)
            if style == "photorealistic" and focus
            else pg.generate_prompt(keywords)
        )
        prompt = f"{prompt}\n\nEnhanced prompt: {enhanced}"

    # Optional hint in-text for aspect ratio; API doesn't take ratio param directly
    if aspect_ratio:
        prompt = f"{prompt}\n\nAspect ratio: {aspect_ratio}"

    client = _ensure_client()
    if client is None:
        logger.warning("Gemini client not available or API key missing")
        return []

    retry = 0
    delay = initial_retry_delay
    while retry <= max_retries:
        try:
            response = client.models.generate_content(
                model="gemini-2.5-flash-image-preview",
                contents=[prompt],
            )

            # A response may come back with no candidates or no content;
            # treat that as "no images" rather than an indexing error.
            candidates = getattr(response, "candidates", None) or []
            content = getattr(candidates[0], "content", None) if candidates else None
            parts = getattr(content, "parts", None) or []

            images_b64: List[str] = []
            for part in parts:
                inline = getattr(part, 'inline_data', None)
                if inline is None:
                    continue
                raw = inline.data
                if not raw:
                    # Skip empty payloads; str(None) would otherwise be
                    # returned to callers as a bogus "None" image.
                    continue
                if isinstance(raw, (bytes, bytearray)):
                    # Standardize raw bytes to a base64 string for API consumers
                    images_b64.append(base64.b64encode(raw).decode('utf-8'))
                else:
                    # Some SDKs may already present base64 str
                    images_b64.append(str(raw))
            return images_b64
        except Exception as e:
            msg = str(e)
            logger.warning("Gemini image gen error: %s", msg)
            # Only transient 503 errors are retried, with exponential backoff
            if "503" in msg and retry < max_retries:
                time.sleep(delay)
                delay *= 2
                retry += 1
                continue
            return []
    return []
def edit_image(image_path, prompt, max_retries=3, initial_retry_delay=2):
def generate_gemini_image(
    prompt,
    keywords=None,
    style=None,
    focus=None,
    enhance_prompt=True,
    max_retries=2,
    initial_retry_delay=1.0,
    aspect_ratio="9:16",
):
    """
    Backward-compatible wrapper that generates a single image file on disk and returns path.
    Prefer generate_gemini_images_base64 in new code paths.

    Args:
        prompt: Text prompt for image generation.
        keywords, style, focus, enhance_prompt, aspect_ratio, max_retries,
        initial_retry_delay: Forwarded unchanged to
            ``generate_gemini_images_base64``.

    Returns:
        The file name of the saved PNG (written relative to the current
        working directory), or ``None`` when no image was generated or the
        image could not be decoded/saved.
    """
    logger = logging.getLogger('gemini_image_generator')
    images = generate_gemini_images_base64(
        prompt,
        keywords=keywords,
        style=style,
        focus=focus,
        enhance_prompt=enhance_prompt,
        aspect_ratio=aspect_ratio,
        max_retries=max_retries,
        initial_retry_delay=initial_retry_delay,
    )
    if not images:
        return None

    # Persist first image to file for legacy callers
    img_b64 = images[0]
    out_name = f'gemini-native-image-{datetime.datetime.now().strftime("%Y%m%d-%H%M%S")}.png'
    try:
        # Decoding and opening live inside the try: a malformed payload must
        # yield None (the documented failure value), not an exception.
        with Image.open(BytesIO(base64.b64decode(img_b64))) as img:
            img.save(out_name)
        # Also call save_generated_image to reuse existing pipeline
        save_generated_image({"artifacts": [{"base64": img_b64}]})
    except Exception:
        logger.exception("Failed to persist Gemini image to %s", out_name)
        return None
    return out_name
def edit_image(image_path, prompt, max_retries=2, initial_retry_delay=1.0):
"""
- Image editing (text and image to image)
Example prompt: "Edit this image to make it look like a cartoon"
@@ -352,7 +323,9 @@ def edit_image(image_path, prompt, max_retries=3, initial_retry_delay=2):
while retry_count <= max_retries:
try:
client = genai.Client()
client = _ensure_client()
if client is None:
return None
text_input = (prompt)
logger.info("Sending request to Gemini API for image editing")
@@ -367,13 +340,9 @@ def edit_image(image_path, prompt, max_retries=3, initial_retry_delay=2):
edited_img_name = None
for part in response.candidates[0].content.parts:
if part.text is not None:
logger.info(f"Received text response: '{part.text[:100]}...'")
st.write(part.text)
elif part.inline_data is not None:
if getattr(part, 'inline_data', None) is not None:
logger.info("Received edited image data from Gemini")
edited_image = Image.open(BytesIO(part.inline_data.data))
edited_image.show()
# Save the edited image
edited_img_name = f'edited-{os.path.basename(image_path)}'
@@ -394,28 +363,22 @@ def edit_image(image_path, prompt, max_retries=3, initial_retry_delay=2):
save_generated_image(img_response)
except Exception as err:
logger.error(f"Failed to save edited image: {err}")
st.error(f"Failed to save edited image: {err}")
logger.info(f"Image editing completed. Edited image name: {edited_img_name}")
return edited_img_name
except Exception as err:
error_message = str(err)
logger.error(f"Error in edit_image: {err}")
# Check if this is a 503 UNAVAILABLE error
if "503 UNAVAILABLE" in error_message and retry_count < max_retries:
# Retry on transient 503
if "503" in error_message and retry_count < max_retries:
retry_count += 1
logger.info(f"Model is overloaded. Retrying in {retry_delay} seconds (attempt {retry_count}/{max_retries})")
st.warning(f"The image editing service is currently busy. Retrying in {retry_delay} seconds...")
logger.info(f"Retrying in {retry_delay} seconds (attempt {retry_count}/{max_retries})")
time.sleep(retry_delay)
# Exponential backoff
retry_delay *= 2
else:
st.error(f"Error editing image: {err}")
return None
# If we've exhausted all retries
st.error("The image editing service is currently unavailable. Please try again later.")
return None