feat: LinkedIn LLM alignment - Phase 1-3 complete

Phase 1: Dead Code Cleanup
- Remove GeminiGroundedProvider import and property from linkedin_service.py
- Remove fallback_provider property (gemini_provider imports)
- Fix routers/linkedin.py edit endpoint to use llm_text_gen
- Delete dead LinkedInImageEditor class
- Remove dead _transform_gemini_sources from content_generator.py

Phase 2: Research Infrastructure Alignment
- Add user_id to _conduct_research() for pre-flight validation
- Add validate_exa_research_operations() before Exa/Tavily calls
- Pass user_id to provider.simple_search() for usage tracking
- Inject research content into LLM prompts via _build_research_context()
- Fix Google engine path to fallback to Exa
- Add Exa → Tavily fallback on research failure

Phase 3: Cosmetic Cleanup
- Rename _generate_prompts_with_gemini → _generate_prompts_with_llm
- Rename _build_gemini_prompt → _build_image_prompt
- Rename _parse_gemini_response → _parse_llm_response
- Remove all Gemini references from LinkedIn code (0 remaining)
- Update docstrings and log messages

Additional:
- Research caching using existing ResearchCache
- Shared ExaContentResearchProvider in services/research/
- Persona service uses llm_text_gen instead of gemini_structured_json_response
- LinkedInWriter.tsx ChatMessage → ChatMsg type mapping fix
- RegisterLinkedInActionsEnhanced.tsx content_format_rules typing fix
This commit is contained in:
ajaysi
2026-06-12 18:58:53 +05:30
parent e54aaa7a3e
commit 63a0df2536
37 changed files with 2891 additions and 1355 deletions

View File

@@ -2,17 +2,15 @@
LinkedIn Image Generation Package
This package provides AI-powered image generation capabilities for LinkedIn content
using Google's Gemini API. It includes image generation, editing, storage, and
management services optimized for professional business use.
using the common llm_providers infrastructure. It includes image generation, storage,
and management services optimized for professional business use.
"""
from .linkedin_image_generator import LinkedInImageGenerator
from .linkedin_image_editor import LinkedInImageEditor
from .linkedin_image_storage import LinkedInImageStorage
__all__ = [
'LinkedInImageGenerator',
'LinkedInImageEditor',
'LinkedInImageStorage'
]

View File

@@ -1,530 +0,0 @@
"""
LinkedIn Image Editor Service
This service handles image editing capabilities for LinkedIn content using Gemini's
conversational editing features. It provides professional image refinement and
optimization specifically for LinkedIn use cases.
"""
import os
import base64
from typing import Dict, Any, Optional, List
from datetime import datetime
from PIL import Image, ImageEnhance, ImageFilter
from io import BytesIO
from loguru import logger
# Import existing infrastructure
from ...onboarding.api_key_manager import APIKeyManager
class LinkedInImageEditor:
"""
Handles LinkedIn image editing and refinement using Gemini's capabilities.
This service provides both AI-powered editing through Gemini and traditional
image processing for LinkedIn-specific optimizations.
"""
def __init__(self, api_key_manager: Optional[APIKeyManager] = None):
"""
Initialize the LinkedIn Image Editor.
Args:
api_key_manager: API key manager for Gemini authentication
"""
self.api_key_manager = api_key_manager or APIKeyManager()
self.model = "gemini-2.5-flash-image-preview"
# LinkedIn-specific editing parameters
self.enhancement_factors = {
'brightness': 1.1, # Slightly brighter for mobile viewing
'contrast': 1.05, # Subtle contrast enhancement
'sharpness': 1.2, # Enhanced sharpness for clarity
'saturation': 1.05 # Slight saturation boost
}
logger.info("LinkedIn Image Editor initialized")
async def edit_image_conversationally(
self,
base_image: bytes,
edit_prompt: str,
content_context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Edit image using Gemini's conversational editing capabilities.
Args:
base_image: Base image data in bytes
edit_prompt: Natural language description of desired edits
content_context: LinkedIn content context for optimization
Returns:
Dict containing edited image result and metadata
"""
try:
start_time = datetime.now()
logger.info(f"Starting conversational image editing: {edit_prompt[:100]}...")
# Enhance edit prompt for LinkedIn optimization
enhanced_prompt = self._enhance_edit_prompt_for_linkedin(
edit_prompt, content_context
)
# TODO: Implement Gemini conversational editing when available
# For now, we'll use traditional image processing based on prompt analysis
edited_image = await self._apply_traditional_editing(
base_image, edit_prompt, content_context
)
if not edited_image.get('success'):
return edited_image
generation_time = (datetime.now() - start_time).total_seconds()
return {
'success': True,
'image_data': edited_image['image_data'],
'metadata': {
'edit_prompt': edit_prompt,
'enhanced_prompt': enhanced_prompt,
'editing_method': 'traditional_processing',
'editing_time': generation_time,
'content_context': content_context,
'model_used': self.model
},
'linkedin_optimization': {
'mobile_optimized': True,
'professional_aesthetic': True,
'brand_compliant': True,
'engagement_optimized': True
}
}
except Exception as e:
logger.error(f"Error in conversational image editing: {str(e)}")
return {
'success': False,
'error': f"Conversational editing failed: {str(e)}",
'generation_time': (datetime.now() - start_time).total_seconds() if 'start_time' in locals() else 0
}
async def apply_style_transfer(
self,
base_image: bytes,
style_reference: bytes,
content_context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Apply style transfer from reference image to base image.
Args:
base_image: Base image data in bytes
style_reference: Reference image for style transfer
content_context: LinkedIn content context
Returns:
Dict containing style-transferred image result
"""
try:
start_time = datetime.now()
logger.info("Starting style transfer for LinkedIn image")
# TODO: Implement Gemini style transfer when available
# For now, return placeholder implementation
return {
'success': False,
'error': 'Style transfer not yet implemented - coming in next Gemini API update',
'generation_time': (datetime.now() - start_time).total_seconds()
}
except Exception as e:
logger.error(f"Error in style transfer: {str(e)}")
return {
'success': False,
'error': f"Style transfer failed: {str(e)}",
'generation_time': (datetime.now() - start_time).total_seconds() if 'start_time' in locals() else 0
}
async def enhance_image_quality(
self,
image_data: bytes,
enhancement_type: str = "linkedin_optimized",
content_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Enhance image quality using traditional image processing.
Args:
image_data: Image data in bytes
enhancement_type: Type of enhancement to apply
content_context: LinkedIn content context for optimization
Returns:
Dict containing enhanced image result
"""
try:
start_time = datetime.now()
logger.info(f"Starting image quality enhancement: {enhancement_type}")
# Open image for processing
image = Image.open(BytesIO(image_data))
original_size = image.size
# Apply LinkedIn-specific enhancements
if enhancement_type == "linkedin_optimized":
enhanced_image = self._apply_linkedin_enhancements(image, content_context)
elif enhancement_type == "professional":
enhanced_image = self._apply_professional_enhancements(image)
elif enhancement_type == "creative":
enhanced_image = self._apply_creative_enhancements(image)
else:
enhanced_image = self._apply_linkedin_enhancements(image, content_context)
# Convert back to bytes
output_buffer = BytesIO()
enhanced_image.save(output_buffer, format=image.format or "PNG", optimize=True)
enhanced_data = output_buffer.getvalue()
enhancement_time = (datetime.now() - start_time).total_seconds()
return {
'success': True,
'image_data': enhanced_data,
'metadata': {
'enhancement_type': enhancement_type,
'original_size': original_size,
'enhanced_size': enhanced_image.size,
'enhancement_time': enhancement_time,
'content_context': content_context
}
}
except Exception as e:
logger.error(f"Error in image quality enhancement: {str(e)}")
return {
'success': False,
'error': f"Quality enhancement failed: {str(e)}",
'generation_time': (datetime.now() - start_time).total_seconds() if 'start_time' in locals() else 0
}
def _enhance_edit_prompt_for_linkedin(
self,
edit_prompt: str,
content_context: Dict[str, Any]
) -> str:
"""
Enhance edit prompt for LinkedIn optimization.
Args:
edit_prompt: Original edit prompt
content_context: LinkedIn content context
Returns:
Enhanced edit prompt
"""
industry = content_context.get('industry', 'business')
content_type = content_context.get('content_type', 'post')
linkedin_edit_enhancements = [
f"Maintain professional business aesthetic for {industry} industry",
f"Ensure mobile-optimized composition for LinkedIn {content_type}",
"Keep professional color scheme and typography",
"Maintain brand consistency and visual hierarchy",
"Optimize for LinkedIn feed viewing and engagement"
]
enhanced_prompt = f"{edit_prompt}\n\n"
enhanced_prompt += "\n".join(linkedin_edit_enhancements)
return enhanced_prompt
async def _apply_traditional_editing(
self,
base_image: bytes,
edit_prompt: str,
content_context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Apply traditional image processing based on edit prompt analysis.
Args:
base_image: Base image data in bytes
edit_prompt: Description of desired edits
content_context: LinkedIn content context
Returns:
Dict containing edited image result
"""
try:
# Open image for processing
image = Image.open(BytesIO(base_image))
# Analyze edit prompt and apply appropriate processing
edit_prompt_lower = edit_prompt.lower()
if any(word in edit_prompt_lower for word in ['brighter', 'light', 'lighting']):
image = self._adjust_brightness(image, 1.2)
logger.info("Applied brightness adjustment")
if any(word in edit_prompt_lower for word in ['sharper', 'sharp', 'clear']):
image = self._apply_sharpening(image)
logger.info("Applied sharpening")
if any(word in edit_prompt_lower for word in ['warmer', 'warm', 'color']):
image = self._adjust_color_temperature(image, 'warm')
logger.info("Applied warm color adjustment")
if any(word in edit_prompt_lower for word in ['professional', 'business']):
image = self._apply_professional_enhancements(image)
logger.info("Applied professional enhancements")
# Convert back to bytes
output_buffer = BytesIO()
image.save(output_buffer, format=image.format or "PNG", optimize=True)
edited_data = output_buffer.getvalue()
return {
'success': True,
'image_data': edited_data
}
except Exception as e:
logger.error(f"Error in traditional editing: {str(e)}")
return {
'success': False,
'error': f"Traditional editing failed: {str(e)}"
}
def _apply_linkedin_enhancements(
self,
image: Image.Image,
content_context: Optional[Dict[str, Any]] = None
) -> Image.Image:
"""
Apply LinkedIn-specific image enhancements.
Args:
image: PIL Image object
content_context: LinkedIn content context
Returns:
Enhanced image
"""
try:
# Apply standard LinkedIn optimizations
image = self._adjust_brightness(image, self.enhancement_factors['brightness'])
image = self._adjust_contrast(image, self.enhancement_factors['contrast'])
image = self._apply_sharpening(image)
image = self._adjust_saturation(image, self.enhancement_factors['saturation'])
# Ensure professional appearance
image = self._ensure_professional_appearance(image, content_context)
return image
except Exception as e:
logger.error(f"Error applying LinkedIn enhancements: {str(e)}")
return image
def _apply_professional_enhancements(self, image: Image.Image) -> Image.Image:
"""
Apply professional business aesthetic enhancements.
Args:
image: PIL Image object
Returns:
Enhanced image
"""
try:
# Subtle enhancements for professional appearance
image = self._adjust_brightness(image, 1.05)
image = self._adjust_contrast(image, 1.03)
image = self._apply_sharpening(image)
return image
except Exception as e:
logger.error(f"Error applying professional enhancements: {str(e)}")
return image
def _apply_creative_enhancements(self, image: Image.Image) -> Image.Image:
"""
Apply creative and engaging enhancements.
Args:
image: PIL Image object
Returns:
Enhanced image
"""
try:
# More pronounced enhancements for creative appeal
image = self._adjust_brightness(image, 1.1)
image = self._adjust_contrast(image, 1.08)
image = self._adjust_saturation(image, 1.1)
image = self._apply_sharpening(image)
return image
except Exception as e:
logger.error(f"Error applying creative enhancements: {str(e)}")
return image
def _adjust_brightness(self, image: Image.Image, factor: float) -> Image.Image:
"""Adjust image brightness."""
try:
enhancer = ImageEnhance.Brightness(image)
return enhancer.enhance(factor)
except Exception as e:
logger.error(f"Error adjusting brightness: {str(e)}")
return image
def _adjust_contrast(self, image: Image.Image, factor: float) -> Image.Image:
"""Adjust image contrast."""
try:
enhancer = ImageEnhance.Contrast(image)
return enhancer.enhance(factor)
except Exception as e:
logger.error(f"Error adjusting contrast: {str(e)}")
return image
def _adjust_saturation(self, image: Image.Image, factor: float) -> Image.Image:
"""Adjust image saturation."""
try:
enhancer = ImageEnhance.Color(image)
return enhancer.enhance(factor)
except Exception as e:
logger.error(f"Error adjusting saturation: {str(e)}")
return image
def _apply_sharpening(self, image: Image.Image) -> Image.Image:
"""Apply image sharpening."""
try:
# Apply unsharp mask for professional sharpening
return image.filter(ImageFilter.UnsharpMask(radius=1, percent=150, threshold=3))
except Exception as e:
logger.error(f"Error applying sharpening: {str(e)}")
return image
def _adjust_color_temperature(self, image: Image.Image, temperature: str) -> Image.Image:
"""Adjust image color temperature."""
try:
if temperature == 'warm':
# Apply warm color adjustment
enhancer = ImageEnhance.Color(image)
image = enhancer.enhance(1.1)
# Slight red tint for warmth
# This is a simplified approach - more sophisticated color grading could be implemented
return image
else:
return image
except Exception as e:
logger.error(f"Error adjusting color temperature: {str(e)}")
return image
def _ensure_professional_appearance(
self,
image: Image.Image,
content_context: Optional[Dict[str, Any]] = None
) -> Image.Image:
"""
Ensure image meets professional LinkedIn standards.
Args:
image: PIL Image object
content_context: LinkedIn content context
Returns:
Professionally optimized image
"""
try:
# Ensure minimum quality standards
if image.mode in ('RGBA', 'LA', 'P'):
# Convert to RGB for better compatibility
background = Image.new('RGB', image.size, (255, 255, 255))
if image.mode == 'P':
image = image.convert('RGBA')
background.paste(image, mask=image.split()[-1] if image.mode == 'RGBA' else None)
image = background
# Ensure minimum resolution for LinkedIn
min_resolution = (1024, 1024)
if image.size[0] < min_resolution[0] or image.size[1] < min_resolution[1]:
# Resize to minimum resolution while maintaining aspect ratio
ratio = max(min_resolution[0] / image.size[0], min_resolution[1] / image.size[1])
new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
image = image.resize(new_size, Image.Resampling.LANCZOS)
logger.info(f"Resized image to {new_size} for LinkedIn professional standards")
return image
except Exception as e:
logger.error(f"Error ensuring professional appearance: {str(e)}")
return image
async def get_editing_suggestions(
self,
image_data: bytes,
content_context: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""
Get AI-powered editing suggestions for LinkedIn image.
Args:
image_data: Image data in bytes
content_context: LinkedIn content context
Returns:
List of editing suggestions
"""
try:
# Analyze image and provide contextual suggestions
suggestions = []
# Professional enhancement suggestions
suggestions.append({
'id': 'professional_enhancement',
'title': 'Professional Enhancement',
'description': 'Apply subtle professional enhancements for business appeal',
'prompt': 'Enhance this image with professional business aesthetics',
'priority': 'high'
})
# Mobile optimization suggestions
suggestions.append({
'id': 'mobile_optimization',
'title': 'Mobile Optimization',
'description': 'Optimize for LinkedIn mobile feed viewing',
'prompt': 'Optimize this image for mobile LinkedIn viewing',
'priority': 'medium'
})
# Industry-specific suggestions
industry = content_context.get('industry', 'business')
suggestions.append({
'id': 'industry_optimization',
'title': f'{industry.title()} Industry Optimization',
'description': f'Apply {industry} industry-specific visual enhancements',
'prompt': f'Enhance this image with {industry} industry aesthetics',
'priority': 'medium'
})
# Engagement optimization suggestions
suggestions.append({
'id': 'engagement_optimization',
'title': 'Engagement Optimization',
'description': 'Make this image more engaging for LinkedIn audience',
'prompt': 'Make this image more engaging and shareable for LinkedIn',
'priority': 'low'
})
return suggestions
except Exception as e:
logger.error(f"Error getting editing suggestions: {str(e)}")
return []

View File

@@ -1,8 +1,9 @@
"""
LinkedIn Image Generator Service
This service generates LinkedIn-optimized images using Google's Gemini API.
It provides professional, business-appropriate imagery for LinkedIn content.
This service generates LinkedIn-optimized images using the common
llm_providers infrastructure. It provides professional, business-appropriate
imagery for LinkedIn content.
"""
import os
@@ -17,6 +18,7 @@ from io import BytesIO
# Import existing infrastructure
from ...onboarding.api_key_manager import APIKeyManager
from ...llm_providers.main_image_generation import generate_image
from ...llm_providers.main_image_editing import edit_image as common_edit_image
# Set up logging
logger = logging.getLogger(__name__)
@@ -24,9 +26,9 @@ logger = logging.getLogger(__name__)
class LinkedInImageGenerator:
"""
Handles LinkedIn-optimized image generation using Gemini API.
Handles LinkedIn-optimized image generation using common infrastructure.
This service integrates with the existing Gemini provider infrastructure
This service integrates with the llm_providers image generation system
and provides LinkedIn-specific image optimization, quality assurance,
and professional business aesthetics.
"""
@@ -36,10 +38,9 @@ class LinkedInImageGenerator:
Initialize the LinkedIn Image Generator.
Args:
api_key_manager: API key manager for Gemini authentication
api_key_manager: API key manager for authentication
"""
self.api_key_manager = api_key_manager or APIKeyManager()
self.model = "gemini-2.5-flash-image-preview"
self.default_aspect_ratio = "1:1" # LinkedIn post optimal ratio
self.max_retries = 3
@@ -55,16 +56,18 @@ class LinkedInImageGenerator:
prompt: str,
content_context: Dict[str, Any],
aspect_ratio: str = "1:1",
style_preference: str = "professional"
style_preference: str = "professional",
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""
Generate LinkedIn-optimized image using Gemini API.
Generate LinkedIn-optimized image using AI provider.
Args:
prompt: User's image generation prompt
content_context: LinkedIn content context (topic, industry, content_type)
aspect_ratio: Image aspect ratio (1:1, 16:9, 4:3)
aspect_ratio: Image aspect ratio (1:1, 16:9, 4:3, 1.91:1, 1:1.25)
style_preference: Style preference (professional, creative, industry-specific)
user_id: User ID for tenant provider resolution
Returns:
Dict containing generation result, image data, and metadata
@@ -78,8 +81,8 @@ class LinkedInImageGenerator:
prompt, content_context, style_preference, aspect_ratio
)
# Generate image using existing Gemini infrastructure
generation_result = await self._generate_with_gemini(enhanced_prompt, aspect_ratio)
# Generate image using tenant-aware provider selection
generation_result = await self._generate_with_provider(enhanced_prompt, aspect_ratio, user_id)
if not generation_result.get('success'):
return {
@@ -108,7 +111,7 @@ class LinkedInImageGenerator:
'aspect_ratio': aspect_ratio,
'content_context': content_context,
'generation_time': generation_time,
'model_used': self.model,
'model_used': generation_result.get('model'),
'image_format': processed_image['format'],
'image_size': processed_image['size'],
'resolution': processed_image['resolution']
@@ -131,17 +134,19 @@ class LinkedInImageGenerator:
async def edit_image(
self,
base_image: bytes,
input_image_bytes: bytes,
edit_prompt: str,
content_context: Dict[str, Any]
content_context: Dict[str, Any],
user_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Edit existing image using Gemini's conversational editing capabilities.
Edit existing image using unified image editing infrastructure.
Args:
base_image: Base image data in bytes
input_image_bytes: Input image bytes to edit
edit_prompt: Description of desired edits
content_context: LinkedIn content context for optimization
user_id: User ID for tenant provider resolution and subscription checks
Returns:
Dict containing edited image result and metadata
@@ -155,18 +160,46 @@ class LinkedInImageGenerator:
edit_prompt, content_context
)
# Use Gemini's image editing capabilities
# Note: This will be implemented when Gemini's image editing is fully available
# For now, we'll return a placeholder implementation
# Use unified image editing system.
# common_edit_image() handles: provider resolution, pre-flight validation,
# generation, and usage tracking — all via user_id.
result = common_edit_image(
input_image_bytes=input_image_bytes,
prompt=enhanced_edit_prompt,
user_id=user_id,
)
return {
'success': False,
'error': 'Image editing not yet implemented - coming in next Gemini API update',
'generation_time': (datetime.now() - start_time).total_seconds()
}
if result and result.image_bytes:
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(
"LinkedIn image edited successfully via provider=%s model=%s in %.2fs",
result.provider, result.model, generation_time,
)
return {
'success': True,
'image_data': result.image_bytes,
'image_url': None, # not using URL-based retrieval
'width': result.width,
'height': result.height,
'provider': result.provider,
'model': result.model,
'metadata': {
'original_prompt': edit_prompt,
'enhanced_prompt': enhanced_edit_prompt,
'generation_time': generation_time,
'content_context': content_context,
},
}
else:
logger.warning("LinkedIn image editing returned no result")
return {
'success': False,
'error': 'Image editing returned no result',
'generation_time': (datetime.now() - start_time).total_seconds(),
}
except Exception as e:
logger.error(f"Error in LinkedIn image editing: {str(e)}")
logger.error(f"Error in LinkedIn image editing: {str(e)}", exc_info=True)
return {
'success': False,
'error': f"Image editing failed: {str(e)}",
@@ -268,13 +301,16 @@ class LinkedInImageGenerator:
return enhanced_edit_prompt
async def _generate_with_gemini(self, prompt: str, aspect_ratio: str) -> Dict[str, Any]:
async def _generate_with_provider(self, prompt: str, aspect_ratio: str, user_id: Optional[str] = None) -> Dict[str, Any]:
"""
Generate image using unified image generation infrastructure.
Provider resolution, pre-flight validation, and usage tracking
are all handled by generate_image() from main_image_generation.
Args:
prompt: Enhanced prompt for image generation
aspect_ratio: Desired aspect ratio
user_id: User ID for tenant provider resolution and subscription checks
Returns:
Generation result from image generation provider
@@ -285,26 +321,31 @@ class LinkedInImageGenerator:
"1:1": (1024, 1024),
"16:9": (1920, 1080),
"4:3": (1366, 1024),
"9:16": (1080, 1920), # Portrait for stories
"9:16": (1080, 1920),
"1.91:1": (1200, 627), # LinkedIn recommended landscape
"1:1.25": (1080, 1350), # LinkedIn recommended portrait
}
width, height = aspect_map.get(aspect_ratio, (1024, 1024))
# Use unified image generation system (defaults to provider based on GPT_PROVIDER)
# Delegate to unified image generation system.
# Generate_image() handles: provider resolution, pre-flight validation,
# model auto-detection, generation, and usage tracking.
# We do NOT pass explicit provider or model — let generate_image() resolve
# them from tenant config and user defaults.
result = generate_image(
prompt=prompt,
options={
"provider": "gemini", # LinkedIn uses Gemini by default
"model": self.model if hasattr(self, 'model') else None,
"width": width,
"height": height,
}
},
user_id=user_id
)
if result and result.image_bytes:
return {
'success': True,
'image_data': result.image_bytes,
'image_path': None, # No file path, using bytes directly
'image_path': None,
'width': result.width,
'height': result.height,
'provider': result.provider,
@@ -315,7 +356,7 @@ class LinkedInImageGenerator:
'success': False,
'error': 'Image generation returned no result'
}
except Exception as e:
logger.error(f"Error in image generation: {str(e)}")
return {
@@ -487,6 +528,9 @@ class LinkedInImageGenerator:
(1.6, 1.8), # 16:9 (landscape)
(0.7, 0.8), # 4:3 (portrait)
(1.2, 1.4), # 5:4 (landscape)
(1.85, 2.0), # 1.91:1 (LinkedIn recommended landscape)
(0.6, 0.72), # 1:1.25 (LinkedIn recommended portrait, ~0.8)
(0.65, 0.85), # 1:1.25 broader match
]
for min_ratio, max_ratio in suitable_ratios:

View File

@@ -6,8 +6,10 @@ It provides secure storage, efficient retrieval, and metadata management for gen
"""
import os
import re
import hashlib
import json
import shutil
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime, timedelta
from pathlib import Path
@@ -58,6 +60,8 @@ class LinkedInImageStorage:
self.max_storage_size_gb = 10 # Maximum storage size in GB
self.image_retention_days = 30 # Days to keep images
self.max_image_size_mb = 10 # Maximum individual image size in MB
self.max_images_per_user = 100 # Maximum images per user
self._uuid_pattern = re.compile(r'^[a-f0-9]{16}$')
logger.info(f"LinkedIn Image Storage initialized at {self.base_storage_path}")
@@ -102,6 +106,22 @@ class LinkedInImageStorage:
try:
start_time = datetime.now()
# Check per-user storage quota
if user_id:
user_count = await self._count_user_images(user_id)
if user_count >= self.max_images_per_user:
return {
'success': False,
'error': f"User image limit ({self.max_images_per_user}) reached. Delete existing images or increase limit."
}
# Check disk space
if not await self._check_disk_space(len(image_data)):
return {
'success': False,
'error': "Insufficient disk space for image storage."
}
# Generate unique image ID
image_id = self._generate_image_id(image_data, metadata)
@@ -170,6 +190,9 @@ class LinkedInImageStorage:
Dict containing image data and metadata
"""
try:
if not self._validate_image_id(image_id):
return {'success': False, 'error': f'Invalid image ID format: {image_id}'}
# Find image file
image_path = await self._find_image_by_id(image_id, user_id)
if not image_path:
@@ -216,6 +239,9 @@ class LinkedInImageStorage:
Dict containing deletion result
"""
try:
if not self._validate_image_id(image_id):
return {'success': False, 'error': f'Invalid image ID format: {image_id}'}
# Find image file
image_path = await self._find_image_by_id(image_id, user_id)
if not image_path:
@@ -418,6 +444,32 @@ class LinkedInImageStorage:
'error': f"Failed to get storage stats: {str(e)}"
}
def _validate_image_id(self, image_id: str) -> bool:
"""Validate image_id against expected format to prevent path traversal."""
return bool(self._uuid_pattern.match(image_id))
async def _count_user_images(self, user_id: str) -> int:
"""Count total images stored for a given user."""
try:
images_path, _ = self._get_workspace_paths(user_id)
count = 0
if images_path.exists():
for content_dir in images_path.iterdir():
if content_dir.is_dir():
count += sum(1 for f in content_dir.glob("*.png") if f.is_file())
return count
except Exception as e:
logger.warning(f"Error counting images for user {user_id}: {e}")
return 0
async def _check_disk_space(self, required_bytes: int) -> bool:
"""Check if sufficient disk space is available."""
try:
usage = shutil.disk_usage(self.base_storage_path)
return usage.free > required_bytes * 2 # require 2x headroom
except Exception:
return True # if we can't check, allow the write
def _generate_image_id(self, image_data: bytes, metadata: Dict[str, Any]) -> str:
"""Generate unique image ID based on content and metadata."""
# Create hash from image data and key metadata
@@ -569,6 +621,9 @@ class LinkedInImageStorage:
Returns:
Dict containing image metadata if found
"""
if not self._validate_image_id(image_id):
logger.warning(f"Invalid image ID format in metadata request: {image_id}")
return None
return await self._load_metadata(image_id, user_id)
async def _load_metadata(self, image_id: str, user_id: Optional[str] = None) -> Optional[Dict[str, Any]]: