AI Researcher and Video Studio implementation complete
This commit is contained in:
403
backend/services/image_studio/format_converter_service.py
Normal file
403
backend/services/image_studio/format_converter_service.py
Normal file
@@ -0,0 +1,403 @@
|
||||
"""Image Format Converter Service for converting between image formats."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import io
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from PIL import Image, ImageCms
|
||||
|
||||
from utils.logger_utils import get_service_logger
|
||||
|
||||
|
||||
logger = get_service_logger("image_studio.format_converter")
|
||||
|
||||
|
||||
@dataclass
|
||||
class FormatConversionRequest:
|
||||
"""Request model for format conversion."""
|
||||
image_base64: str
|
||||
target_format: str # png, jpeg, jpg, webp, gif, bmp, tiff
|
||||
preserve_transparency: bool = True
|
||||
quality: Optional[int] = None # For lossy formats (1-100)
|
||||
color_space: Optional[str] = None # sRGB, Adobe RGB, etc.
|
||||
strip_metadata: bool = False # Keep metadata by default for conversion
|
||||
optimize: bool = True
|
||||
progressive: bool = True # For JPEG
|
||||
|
||||
|
||||
@dataclass
|
||||
class FormatConversionResult:
|
||||
"""Result of format conversion."""
|
||||
success: bool
|
||||
image_base64: str
|
||||
original_format: str
|
||||
target_format: str
|
||||
original_size_kb: float
|
||||
converted_size_kb: float
|
||||
width: int
|
||||
height: int
|
||||
transparency_preserved: bool
|
||||
metadata_preserved: bool
|
||||
color_space: Optional[str] = None
|
||||
|
||||
|
||||
class ImageFormatConverterService:
|
||||
"""Service for converting images between formats."""
|
||||
|
||||
SUPPORTED_FORMATS = {
|
||||
"png": {
|
||||
"name": "PNG",
|
||||
"description": "Lossless format with transparency support",
|
||||
"supports_transparency": True,
|
||||
"supports_lossy": False,
|
||||
"mime_type": "image/png",
|
||||
},
|
||||
"jpeg": {
|
||||
"name": "JPEG",
|
||||
"description": "Lossy format, best for photos",
|
||||
"supports_transparency": False,
|
||||
"supports_lossy": True,
|
||||
"mime_type": "image/jpeg",
|
||||
},
|
||||
"jpg": {
|
||||
"name": "JPEG",
|
||||
"description": "Lossy format, best for photos",
|
||||
"supports_transparency": False,
|
||||
"supports_lossy": True,
|
||||
"mime_type": "image/jpeg",
|
||||
},
|
||||
"webp": {
|
||||
"name": "WebP",
|
||||
"description": "Modern format with excellent compression",
|
||||
"supports_transparency": True,
|
||||
"supports_lossy": True,
|
||||
"mime_type": "image/webp",
|
||||
},
|
||||
"gif": {
|
||||
"name": "GIF",
|
||||
"description": "Supports animation and transparency",
|
||||
"supports_transparency": True,
|
||||
"supports_lossy": False,
|
||||
"mime_type": "image/gif",
|
||||
},
|
||||
"bmp": {
|
||||
"name": "BMP",
|
||||
"description": "Uncompressed bitmap format",
|
||||
"supports_transparency": False,
|
||||
"supports_lossy": False,
|
||||
"mime_type": "image/bmp",
|
||||
},
|
||||
"tiff": {
|
||||
"name": "TIFF",
|
||||
"description": "High-quality format for print",
|
||||
"supports_transparency": True,
|
||||
"supports_lossy": False,
|
||||
"mime_type": "image/tiff",
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
logger.info("[Format Converter] ImageFormatConverterService initialized")
|
||||
|
||||
def _decode_image(self, image_base64: str) -> tuple[Image.Image, int, str]:
|
||||
"""Decode base64 image and return PIL Image, size, and format."""
|
||||
# Handle data URL format
|
||||
if "," in image_base64:
|
||||
image_base64 = image_base64.split(",", 1)[1]
|
||||
|
||||
image_bytes = base64.b64decode(image_base64)
|
||||
original_size = len(image_bytes)
|
||||
|
||||
image = Image.open(io.BytesIO(image_bytes))
|
||||
original_format = image.format.lower() if image.format else "unknown"
|
||||
|
||||
return image, original_size, original_format
|
||||
|
||||
def _strip_exif(self, image: Image.Image) -> Image.Image:
|
||||
"""Remove EXIF metadata from image."""
|
||||
data = list(image.getdata())
|
||||
image_without_exif = Image.new(image.mode, image.size)
|
||||
image_without_exif.putdata(data)
|
||||
return image_without_exif
|
||||
|
||||
def _convert_color_space(
|
||||
self,
|
||||
image: Image.Image,
|
||||
target_color_space: str,
|
||||
) -> Image.Image:
|
||||
"""Convert image color space."""
|
||||
try:
|
||||
# Get current color space
|
||||
if hasattr(image, 'info') and 'icc_profile' in image.info:
|
||||
# Image has ICC profile
|
||||
try:
|
||||
src_profile = ImageCms.ImageCmsProfile(io.BytesIO(image.info['icc_profile']))
|
||||
if target_color_space.lower() == "srgb":
|
||||
dst_profile = ImageCms.createProfile("sRGB")
|
||||
elif target_color_space.lower() == "adobe rgb":
|
||||
dst_profile = ImageCms.createProfile("Adobe RGB")
|
||||
else:
|
||||
return image # Unknown color space
|
||||
|
||||
transform = ImageCms.ImageCmsTransform(src_profile, dst_profile, image.mode, image.mode)
|
||||
image = ImageCms.applyTransform(image, transform)
|
||||
except Exception as e:
|
||||
logger.warning(f"[Format Converter] Color space conversion failed: {e}")
|
||||
else:
|
||||
# No ICC profile, assume sRGB
|
||||
logger.info("[Format Converter] No ICC profile found, assuming sRGB")
|
||||
except Exception as e:
|
||||
logger.warning(f"[Format Converter] Color space conversion error: {e}")
|
||||
|
||||
return image
|
||||
|
||||
def _convert_image(
|
||||
self,
|
||||
image: Image.Image,
|
||||
target_format: str,
|
||||
quality: Optional[int],
|
||||
preserve_transparency: bool,
|
||||
optimize: bool,
|
||||
progressive: bool,
|
||||
) -> bytes:
|
||||
"""Convert image to target format."""
|
||||
buffer = io.BytesIO()
|
||||
format_lower = target_format.lower()
|
||||
|
||||
# Handle format-specific conversions
|
||||
save_kwargs: Dict[str, Any] = {}
|
||||
|
||||
# Check if source has transparency and target doesn't support it
|
||||
has_transparency = image.mode in ("RGBA", "LA", "P") and (
|
||||
"transparency" in image.info or image.mode == "RGBA"
|
||||
)
|
||||
|
||||
if format_lower in ["jpeg", "jpg"]:
|
||||
# JPEG doesn't support transparency
|
||||
if has_transparency and preserve_transparency:
|
||||
# Convert to RGB, losing transparency
|
||||
if image.mode in ("RGBA", "LA"):
|
||||
# Create white background
|
||||
rgb_image = Image.new("RGB", image.size, (255, 255, 255))
|
||||
if image.mode == "RGBA":
|
||||
rgb_image.paste(image, mask=image.split()[3]) # Use alpha channel as mask
|
||||
else:
|
||||
rgb_image.paste(image)
|
||||
image = rgb_image
|
||||
elif image.mode == "P":
|
||||
image = image.convert("RGB")
|
||||
else:
|
||||
image = image.convert("RGB")
|
||||
|
||||
save_kwargs["format"] = "JPEG"
|
||||
if quality:
|
||||
save_kwargs["quality"] = quality
|
||||
else:
|
||||
save_kwargs["quality"] = 95 # Default high quality
|
||||
save_kwargs["optimize"] = optimize
|
||||
if progressive:
|
||||
save_kwargs["progressive"] = True
|
||||
|
||||
elif format_lower == "png":
|
||||
save_kwargs["format"] = "PNG"
|
||||
save_kwargs["optimize"] = optimize
|
||||
# PNG compression level (0-9)
|
||||
if quality:
|
||||
compress_level = max(0, min(9, (100 - quality) // 11))
|
||||
save_kwargs["compress_level"] = compress_level
|
||||
else:
|
||||
save_kwargs["compress_level"] = 6 # Default
|
||||
|
||||
elif format_lower == "webp":
|
||||
save_kwargs["format"] = "WEBP"
|
||||
if quality:
|
||||
save_kwargs["quality"] = quality
|
||||
else:
|
||||
save_kwargs["quality"] = 80 # Default
|
||||
save_kwargs["method"] = 6 # Best compression
|
||||
if preserve_transparency and has_transparency:
|
||||
# WebP supports transparency
|
||||
if image.mode not in ("RGBA", "LA"):
|
||||
image = image.convert("RGBA")
|
||||
|
||||
elif format_lower == "gif":
|
||||
save_kwargs["format"] = "GIF"
|
||||
# GIF conversion
|
||||
if image.mode != "P":
|
||||
# Convert to palette mode for GIF
|
||||
image = image.convert("P", palette=Image.ADAPTIVE)
|
||||
save_kwargs["optimize"] = optimize
|
||||
if preserve_transparency and has_transparency:
|
||||
save_kwargs["transparency"] = 255 # Preserve transparency
|
||||
|
||||
elif format_lower == "bmp":
|
||||
save_kwargs["format"] = "BMP"
|
||||
if image.mode in ("RGBA", "LA", "P") and has_transparency:
|
||||
# BMP doesn't support transparency, convert to RGB
|
||||
if image.mode == "RGBA":
|
||||
rgb_image = Image.new("RGB", image.size, (255, 255, 255))
|
||||
rgb_image.paste(image, mask=image.split()[3])
|
||||
image = rgb_image
|
||||
else:
|
||||
image = image.convert("RGB")
|
||||
|
||||
elif format_lower == "tiff":
|
||||
save_kwargs["format"] = "TIFF"
|
||||
save_kwargs["compression"] = "tiff_lzw" # Lossless compression
|
||||
if preserve_transparency and has_transparency:
|
||||
# TIFF supports transparency
|
||||
if image.mode not in ("RGBA", "LA"):
|
||||
image = image.convert("RGBA")
|
||||
else:
|
||||
raise ValueError(f"Unsupported target format: {target_format}")
|
||||
|
||||
image.save(buffer, **save_kwargs)
|
||||
return buffer.getvalue()
|
||||
|
||||
async def convert(
|
||||
self,
|
||||
request: FormatConversionRequest,
|
||||
user_id: Optional[str] = None,
|
||||
) -> FormatConversionResult:
|
||||
"""Convert an image to target format."""
|
||||
logger.info(f"[Format Converter] Processing conversion request for user: {user_id}")
|
||||
|
||||
try:
|
||||
# Decode image
|
||||
image, original_size, original_format = self._decode_image(request.image_base64)
|
||||
original_size_kb = original_size / 1024
|
||||
|
||||
logger.info(f"[Format Converter] Original: {original_format}, Target: {request.target_format}, Size: {original_size_kb:.2f} KB")
|
||||
|
||||
# Validate target format
|
||||
format_lower = request.target_format.lower()
|
||||
if format_lower not in self.SUPPORTED_FORMATS:
|
||||
raise ValueError(f"Unsupported format: {request.target_format}. Supported: {list(self.SUPPORTED_FORMATS.keys())}")
|
||||
|
||||
# Check transparency preservation
|
||||
has_transparency = image.mode in ("RGBA", "LA", "P") and (
|
||||
"transparency" in image.info or image.mode == "RGBA"
|
||||
)
|
||||
target_supports_transparency = self.SUPPORTED_FORMATS[format_lower]["supports_transparency"]
|
||||
transparency_preserved = (
|
||||
has_transparency and
|
||||
target_supports_transparency and
|
||||
request.preserve_transparency
|
||||
)
|
||||
|
||||
# Color space conversion
|
||||
if request.color_space:
|
||||
image = self._convert_color_space(image, request.color_space)
|
||||
|
||||
# Strip metadata if requested
|
||||
metadata_preserved = not request.strip_metadata
|
||||
if request.strip_metadata:
|
||||
image = self._strip_exif(image)
|
||||
|
||||
# Convert format
|
||||
converted_bytes = self._convert_image(
|
||||
image,
|
||||
format_lower,
|
||||
request.quality,
|
||||
request.preserve_transparency,
|
||||
request.optimize,
|
||||
request.progressive,
|
||||
)
|
||||
|
||||
converted_size_kb = len(converted_bytes) / 1024
|
||||
|
||||
# Encode result
|
||||
mime_type = self.SUPPORTED_FORMATS[format_lower]["mime_type"]
|
||||
result_base64 = f"data:{mime_type};base64,{base64.b64encode(converted_bytes).decode()}"
|
||||
|
||||
logger.info(f"[Format Converter] Converted: {original_size_kb:.2f}KB → {converted_size_kb:.2f}KB")
|
||||
|
||||
return FormatConversionResult(
|
||||
success=True,
|
||||
image_base64=result_base64,
|
||||
original_format=original_format,
|
||||
target_format=format_lower,
|
||||
original_size_kb=round(original_size_kb, 2),
|
||||
converted_size_kb=round(converted_size_kb, 2),
|
||||
width=image.width,
|
||||
height=image.height,
|
||||
transparency_preserved=transparency_preserved,
|
||||
metadata_preserved=metadata_preserved,
|
||||
color_space=request.color_space,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[Format Converter] Failed to convert image: {e}")
|
||||
raise
|
||||
|
||||
async def convert_batch(
|
||||
self,
|
||||
requests: List[FormatConversionRequest],
|
||||
user_id: Optional[str] = None,
|
||||
) -> List[FormatConversionResult]:
|
||||
"""Convert multiple images."""
|
||||
logger.info(f"[Format Converter] Processing batch of {len(requests)} images for user: {user_id}")
|
||||
|
||||
results = []
|
||||
for i, request in enumerate(requests):
|
||||
try:
|
||||
result = await self.convert(request, user_id)
|
||||
results.append(result)
|
||||
logger.info(f"[Format Converter] Batch item {i+1}/{len(requests)} complete")
|
||||
except Exception as e:
|
||||
logger.error(f"[Format Converter] Batch item {i+1} failed: {e}")
|
||||
results.append(FormatConversionResult(
|
||||
success=False,
|
||||
image_base64="",
|
||||
original_format="",
|
||||
target_format="",
|
||||
original_size_kb=0,
|
||||
converted_size_kb=0,
|
||||
width=0,
|
||||
height=0,
|
||||
transparency_preserved=False,
|
||||
metadata_preserved=False,
|
||||
))
|
||||
|
||||
return results
|
||||
|
||||
def get_supported_formats(self) -> List[Dict[str, Any]]:
|
||||
"""Get list of supported formats with details."""
|
||||
return [
|
||||
{
|
||||
"id": fmt_id,
|
||||
"name": fmt_info["name"],
|
||||
"description": fmt_info["description"],
|
||||
"supports_transparency": fmt_info["supports_transparency"],
|
||||
"supports_lossy": fmt_info["supports_lossy"],
|
||||
"mime_type": fmt_info["mime_type"],
|
||||
}
|
||||
for fmt_id, fmt_info in self.SUPPORTED_FORMATS.items()
|
||||
]
|
||||
|
||||
def get_format_recommendations(self, source_format: str) -> List[Dict[str, Any]]:
|
||||
"""Get format recommendations based on source format."""
|
||||
recommendations = {
|
||||
"png": [
|
||||
{"format": "webp", "reason": "60% smaller file size, maintains transparency"},
|
||||
{"format": "jpeg", "reason": "Best for photos, smaller file size"},
|
||||
],
|
||||
"jpeg": [
|
||||
{"format": "webp", "reason": "25-34% smaller with similar quality"},
|
||||
{"format": "png", "reason": "Lossless, supports transparency"},
|
||||
],
|
||||
"jpg": [
|
||||
{"format": "webp", "reason": "25-34% smaller with similar quality"},
|
||||
{"format": "png", "reason": "Lossless, supports transparency"},
|
||||
],
|
||||
"webp": [
|
||||
{"format": "png", "reason": "Better compatibility, lossless"},
|
||||
{"format": "jpeg", "reason": "Universal compatibility"},
|
||||
],
|
||||
}
|
||||
|
||||
source_lower = source_format.lower()
|
||||
return recommendations.get(source_lower, [])
|
||||
Reference in New Issue
Block a user