Files
ALwrity/backend/services/image_studio/format_converter_service.py

404 lines
16 KiB
Python

"""Image Format Converter Service for converting between image formats."""
from __future__ import annotations
import base64
import io
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from PIL import Image, ImageCms
from utils.logger_utils import get_service_logger
logger = get_service_logger("image_studio.format_converter")
@dataclass
class FormatConversionRequest:
"""Request model for format conversion."""
image_base64: str
target_format: str # png, jpeg, jpg, webp, gif, bmp, tiff
preserve_transparency: bool = True
quality: Optional[int] = None # For lossy formats (1-100)
color_space: Optional[str] = None # sRGB, Adobe RGB, etc.
strip_metadata: bool = False # Keep metadata by default for conversion
optimize: bool = True
progressive: bool = True # For JPEG
@dataclass
class FormatConversionResult:
"""Result of format conversion."""
success: bool
image_base64: str
original_format: str
target_format: str
original_size_kb: float
converted_size_kb: float
width: int
height: int
transparency_preserved: bool
metadata_preserved: bool
color_space: Optional[str] = None
class ImageFormatConverterService:
"""Service for converting images between formats."""
SUPPORTED_FORMATS = {
"png": {
"name": "PNG",
"description": "Lossless format with transparency support",
"supports_transparency": True,
"supports_lossy": False,
"mime_type": "image/png",
},
"jpeg": {
"name": "JPEG",
"description": "Lossy format, best for photos",
"supports_transparency": False,
"supports_lossy": True,
"mime_type": "image/jpeg",
},
"jpg": {
"name": "JPEG",
"description": "Lossy format, best for photos",
"supports_transparency": False,
"supports_lossy": True,
"mime_type": "image/jpeg",
},
"webp": {
"name": "WebP",
"description": "Modern format with excellent compression",
"supports_transparency": True,
"supports_lossy": True,
"mime_type": "image/webp",
},
"gif": {
"name": "GIF",
"description": "Supports animation and transparency",
"supports_transparency": True,
"supports_lossy": False,
"mime_type": "image/gif",
},
"bmp": {
"name": "BMP",
"description": "Uncompressed bitmap format",
"supports_transparency": False,
"supports_lossy": False,
"mime_type": "image/bmp",
},
"tiff": {
"name": "TIFF",
"description": "High-quality format for print",
"supports_transparency": True,
"supports_lossy": False,
"mime_type": "image/tiff",
},
}
def __init__(self):
logger.info("[Format Converter] ImageFormatConverterService initialized")
def _decode_image(self, image_base64: str) -> tuple[Image.Image, int, str]:
"""Decode base64 image and return PIL Image, size, and format."""
# Handle data URL format
if "," in image_base64:
image_base64 = image_base64.split(",", 1)[1]
image_bytes = base64.b64decode(image_base64)
original_size = len(image_bytes)
image = Image.open(io.BytesIO(image_bytes))
original_format = image.format.lower() if image.format else "unknown"
return image, original_size, original_format
def _strip_exif(self, image: Image.Image) -> Image.Image:
"""Remove EXIF metadata from image."""
data = list(image.getdata())
image_without_exif = Image.new(image.mode, image.size)
image_without_exif.putdata(data)
return image_without_exif
def _convert_color_space(
self,
image: Image.Image,
target_color_space: str,
) -> Image.Image:
"""Convert image color space."""
try:
# Get current color space
if hasattr(image, 'info') and 'icc_profile' in image.info:
# Image has ICC profile
try:
src_profile = ImageCms.ImageCmsProfile(io.BytesIO(image.info['icc_profile']))
if target_color_space.lower() == "srgb":
dst_profile = ImageCms.createProfile("sRGB")
elif target_color_space.lower() == "adobe rgb":
dst_profile = ImageCms.createProfile("Adobe RGB")
else:
return image # Unknown color space
transform = ImageCms.ImageCmsTransform(src_profile, dst_profile, image.mode, image.mode)
image = ImageCms.applyTransform(image, transform)
except Exception as e:
logger.warning(f"[Format Converter] Color space conversion failed: {e}")
else:
# No ICC profile, assume sRGB
logger.info("[Format Converter] No ICC profile found, assuming sRGB")
except Exception as e:
logger.warning(f"[Format Converter] Color space conversion error: {e}")
return image
def _convert_image(
self,
image: Image.Image,
target_format: str,
quality: Optional[int],
preserve_transparency: bool,
optimize: bool,
progressive: bool,
) -> bytes:
"""Convert image to target format."""
buffer = io.BytesIO()
format_lower = target_format.lower()
# Handle format-specific conversions
save_kwargs: Dict[str, Any] = {}
# Check if source has transparency and target doesn't support it
has_transparency = image.mode in ("RGBA", "LA", "P") and (
"transparency" in image.info or image.mode == "RGBA"
)
if format_lower in ["jpeg", "jpg"]:
# JPEG doesn't support transparency
if has_transparency and preserve_transparency:
# Convert to RGB, losing transparency
if image.mode in ("RGBA", "LA"):
# Create white background
rgb_image = Image.new("RGB", image.size, (255, 255, 255))
if image.mode == "RGBA":
rgb_image.paste(image, mask=image.split()[3]) # Use alpha channel as mask
else:
rgb_image.paste(image)
image = rgb_image
elif image.mode == "P":
image = image.convert("RGB")
else:
image = image.convert("RGB")
save_kwargs["format"] = "JPEG"
if quality:
save_kwargs["quality"] = quality
else:
save_kwargs["quality"] = 95 # Default high quality
save_kwargs["optimize"] = optimize
if progressive:
save_kwargs["progressive"] = True
elif format_lower == "png":
save_kwargs["format"] = "PNG"
save_kwargs["optimize"] = optimize
# PNG compression level (0-9)
if quality:
compress_level = max(0, min(9, (100 - quality) // 11))
save_kwargs["compress_level"] = compress_level
else:
save_kwargs["compress_level"] = 6 # Default
elif format_lower == "webp":
save_kwargs["format"] = "WEBP"
if quality:
save_kwargs["quality"] = quality
else:
save_kwargs["quality"] = 80 # Default
save_kwargs["method"] = 6 # Best compression
if preserve_transparency and has_transparency:
# WebP supports transparency
if image.mode not in ("RGBA", "LA"):
image = image.convert("RGBA")
elif format_lower == "gif":
save_kwargs["format"] = "GIF"
# GIF conversion
if image.mode != "P":
# Convert to palette mode for GIF
image = image.convert("P", palette=Image.ADAPTIVE)
save_kwargs["optimize"] = optimize
if preserve_transparency and has_transparency:
save_kwargs["transparency"] = 255 # Preserve transparency
elif format_lower == "bmp":
save_kwargs["format"] = "BMP"
if image.mode in ("RGBA", "LA", "P") and has_transparency:
# BMP doesn't support transparency, convert to RGB
if image.mode == "RGBA":
rgb_image = Image.new("RGB", image.size, (255, 255, 255))
rgb_image.paste(image, mask=image.split()[3])
image = rgb_image
else:
image = image.convert("RGB")
elif format_lower == "tiff":
save_kwargs["format"] = "TIFF"
save_kwargs["compression"] = "tiff_lzw" # Lossless compression
if preserve_transparency and has_transparency:
# TIFF supports transparency
if image.mode not in ("RGBA", "LA"):
image = image.convert("RGBA")
else:
raise ValueError(f"Unsupported target format: {target_format}")
image.save(buffer, **save_kwargs)
return buffer.getvalue()
async def convert(
self,
request: FormatConversionRequest,
user_id: Optional[str] = None,
) -> FormatConversionResult:
"""Convert an image to target format."""
logger.info(f"[Format Converter] Processing conversion request for user: {user_id}")
try:
# Decode image
image, original_size, original_format = self._decode_image(request.image_base64)
original_size_kb = original_size / 1024
logger.info(f"[Format Converter] Original: {original_format}, Target: {request.target_format}, Size: {original_size_kb:.2f} KB")
# Validate target format
format_lower = request.target_format.lower()
if format_lower not in self.SUPPORTED_FORMATS:
raise ValueError(f"Unsupported format: {request.target_format}. Supported: {list(self.SUPPORTED_FORMATS.keys())}")
# Check transparency preservation
has_transparency = image.mode in ("RGBA", "LA", "P") and (
"transparency" in image.info or image.mode == "RGBA"
)
target_supports_transparency = self.SUPPORTED_FORMATS[format_lower]["supports_transparency"]
transparency_preserved = (
has_transparency and
target_supports_transparency and
request.preserve_transparency
)
# Color space conversion
if request.color_space:
image = self._convert_color_space(image, request.color_space)
# Strip metadata if requested
metadata_preserved = not request.strip_metadata
if request.strip_metadata:
image = self._strip_exif(image)
# Convert format
converted_bytes = self._convert_image(
image,
format_lower,
request.quality,
request.preserve_transparency,
request.optimize,
request.progressive,
)
converted_size_kb = len(converted_bytes) / 1024
# Encode result
mime_type = self.SUPPORTED_FORMATS[format_lower]["mime_type"]
result_base64 = f"data:{mime_type};base64,{base64.b64encode(converted_bytes).decode()}"
logger.info(f"[Format Converter] Converted: {original_size_kb:.2f}KB → {converted_size_kb:.2f}KB")
return FormatConversionResult(
success=True,
image_base64=result_base64,
original_format=original_format,
target_format=format_lower,
original_size_kb=round(original_size_kb, 2),
converted_size_kb=round(converted_size_kb, 2),
width=image.width,
height=image.height,
transparency_preserved=transparency_preserved,
metadata_preserved=metadata_preserved,
color_space=request.color_space,
)
except Exception as e:
logger.error(f"[Format Converter] Failed to convert image: {e}")
raise
async def convert_batch(
self,
requests: List[FormatConversionRequest],
user_id: Optional[str] = None,
) -> List[FormatConversionResult]:
"""Convert multiple images."""
logger.info(f"[Format Converter] Processing batch of {len(requests)} images for user: {user_id}")
results = []
for i, request in enumerate(requests):
try:
result = await self.convert(request, user_id)
results.append(result)
logger.info(f"[Format Converter] Batch item {i+1}/{len(requests)} complete")
except Exception as e:
logger.error(f"[Format Converter] Batch item {i+1} failed: {e}")
results.append(FormatConversionResult(
success=False,
image_base64="",
original_format="",
target_format="",
original_size_kb=0,
converted_size_kb=0,
width=0,
height=0,
transparency_preserved=False,
metadata_preserved=False,
))
return results
def get_supported_formats(self) -> List[Dict[str, Any]]:
"""Get list of supported formats with details."""
return [
{
"id": fmt_id,
"name": fmt_info["name"],
"description": fmt_info["description"],
"supports_transparency": fmt_info["supports_transparency"],
"supports_lossy": fmt_info["supports_lossy"],
"mime_type": fmt_info["mime_type"],
}
for fmt_id, fmt_info in self.SUPPORTED_FORMATS.items()
]
def get_format_recommendations(self, source_format: str) -> List[Dict[str, Any]]:
"""Get format recommendations based on source format."""
recommendations = {
"png": [
{"format": "webp", "reason": "60% smaller file size, maintains transparency"},
{"format": "jpeg", "reason": "Best for photos, smaller file size"},
],
"jpeg": [
{"format": "webp", "reason": "25-34% smaller with similar quality"},
{"format": "png", "reason": "Lossless, supports transparency"},
],
"jpg": [
{"format": "webp", "reason": "25-34% smaller with similar quality"},
{"format": "png", "reason": "Lossless, supports transparency"},
],
"webp": [
{"format": "png", "reason": "Better compatibility, lossless"},
{"format": "jpeg", "reason": "Universal compatibility"},
],
}
source_lower = source_format.lower()
return recommendations.get(source_lower, [])