404 lines
16 KiB
Python
404 lines
16 KiB
Python
"""Image Format Converter Service for converting between image formats."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import io
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from PIL import Image, ImageCms
|
|
|
|
from utils.logger_utils import get_service_logger
|
|
|
|
|
|
logger = get_service_logger("image_studio.format_converter")
|
|
|
|
|
|
@dataclass
|
|
class FormatConversionRequest:
|
|
"""Request model for format conversion."""
|
|
image_base64: str
|
|
target_format: str # png, jpeg, jpg, webp, gif, bmp, tiff
|
|
preserve_transparency: bool = True
|
|
quality: Optional[int] = None # For lossy formats (1-100)
|
|
color_space: Optional[str] = None # sRGB, Adobe RGB, etc.
|
|
strip_metadata: bool = False # Keep metadata by default for conversion
|
|
optimize: bool = True
|
|
progressive: bool = True # For JPEG
|
|
|
|
|
|
@dataclass
|
|
class FormatConversionResult:
|
|
"""Result of format conversion."""
|
|
success: bool
|
|
image_base64: str
|
|
original_format: str
|
|
target_format: str
|
|
original_size_kb: float
|
|
converted_size_kb: float
|
|
width: int
|
|
height: int
|
|
transparency_preserved: bool
|
|
metadata_preserved: bool
|
|
color_space: Optional[str] = None
|
|
|
|
|
|
class ImageFormatConverterService:
|
|
"""Service for converting images between formats."""
|
|
|
|
SUPPORTED_FORMATS = {
|
|
"png": {
|
|
"name": "PNG",
|
|
"description": "Lossless format with transparency support",
|
|
"supports_transparency": True,
|
|
"supports_lossy": False,
|
|
"mime_type": "image/png",
|
|
},
|
|
"jpeg": {
|
|
"name": "JPEG",
|
|
"description": "Lossy format, best for photos",
|
|
"supports_transparency": False,
|
|
"supports_lossy": True,
|
|
"mime_type": "image/jpeg",
|
|
},
|
|
"jpg": {
|
|
"name": "JPEG",
|
|
"description": "Lossy format, best for photos",
|
|
"supports_transparency": False,
|
|
"supports_lossy": True,
|
|
"mime_type": "image/jpeg",
|
|
},
|
|
"webp": {
|
|
"name": "WebP",
|
|
"description": "Modern format with excellent compression",
|
|
"supports_transparency": True,
|
|
"supports_lossy": True,
|
|
"mime_type": "image/webp",
|
|
},
|
|
"gif": {
|
|
"name": "GIF",
|
|
"description": "Supports animation and transparency",
|
|
"supports_transparency": True,
|
|
"supports_lossy": False,
|
|
"mime_type": "image/gif",
|
|
},
|
|
"bmp": {
|
|
"name": "BMP",
|
|
"description": "Uncompressed bitmap format",
|
|
"supports_transparency": False,
|
|
"supports_lossy": False,
|
|
"mime_type": "image/bmp",
|
|
},
|
|
"tiff": {
|
|
"name": "TIFF",
|
|
"description": "High-quality format for print",
|
|
"supports_transparency": True,
|
|
"supports_lossy": False,
|
|
"mime_type": "image/tiff",
|
|
},
|
|
}
|
|
|
|
def __init__(self):
|
|
logger.info("[Format Converter] ImageFormatConverterService initialized")
|
|
|
|
def _decode_image(self, image_base64: str) -> tuple[Image.Image, int, str]:
|
|
"""Decode base64 image and return PIL Image, size, and format."""
|
|
# Handle data URL format
|
|
if "," in image_base64:
|
|
image_base64 = image_base64.split(",", 1)[1]
|
|
|
|
image_bytes = base64.b64decode(image_base64)
|
|
original_size = len(image_bytes)
|
|
|
|
image = Image.open(io.BytesIO(image_bytes))
|
|
original_format = image.format.lower() if image.format else "unknown"
|
|
|
|
return image, original_size, original_format
|
|
|
|
def _strip_exif(self, image: Image.Image) -> Image.Image:
|
|
"""Remove EXIF metadata from image."""
|
|
data = list(image.getdata())
|
|
image_without_exif = Image.new(image.mode, image.size)
|
|
image_without_exif.putdata(data)
|
|
return image_without_exif
|
|
|
|
def _convert_color_space(
|
|
self,
|
|
image: Image.Image,
|
|
target_color_space: str,
|
|
) -> Image.Image:
|
|
"""Convert image color space."""
|
|
try:
|
|
# Get current color space
|
|
if hasattr(image, 'info') and 'icc_profile' in image.info:
|
|
# Image has ICC profile
|
|
try:
|
|
src_profile = ImageCms.ImageCmsProfile(io.BytesIO(image.info['icc_profile']))
|
|
if target_color_space.lower() == "srgb":
|
|
dst_profile = ImageCms.createProfile("sRGB")
|
|
elif target_color_space.lower() == "adobe rgb":
|
|
dst_profile = ImageCms.createProfile("Adobe RGB")
|
|
else:
|
|
return image # Unknown color space
|
|
|
|
transform = ImageCms.ImageCmsTransform(src_profile, dst_profile, image.mode, image.mode)
|
|
image = ImageCms.applyTransform(image, transform)
|
|
except Exception as e:
|
|
logger.warning(f"[Format Converter] Color space conversion failed: {e}")
|
|
else:
|
|
# No ICC profile, assume sRGB
|
|
logger.info("[Format Converter] No ICC profile found, assuming sRGB")
|
|
except Exception as e:
|
|
logger.warning(f"[Format Converter] Color space conversion error: {e}")
|
|
|
|
return image
|
|
|
|
def _convert_image(
|
|
self,
|
|
image: Image.Image,
|
|
target_format: str,
|
|
quality: Optional[int],
|
|
preserve_transparency: bool,
|
|
optimize: bool,
|
|
progressive: bool,
|
|
) -> bytes:
|
|
"""Convert image to target format."""
|
|
buffer = io.BytesIO()
|
|
format_lower = target_format.lower()
|
|
|
|
# Handle format-specific conversions
|
|
save_kwargs: Dict[str, Any] = {}
|
|
|
|
# Check if source has transparency and target doesn't support it
|
|
has_transparency = image.mode in ("RGBA", "LA", "P") and (
|
|
"transparency" in image.info or image.mode == "RGBA"
|
|
)
|
|
|
|
if format_lower in ["jpeg", "jpg"]:
|
|
# JPEG doesn't support transparency
|
|
if has_transparency and preserve_transparency:
|
|
# Convert to RGB, losing transparency
|
|
if image.mode in ("RGBA", "LA"):
|
|
# Create white background
|
|
rgb_image = Image.new("RGB", image.size, (255, 255, 255))
|
|
if image.mode == "RGBA":
|
|
rgb_image.paste(image, mask=image.split()[3]) # Use alpha channel as mask
|
|
else:
|
|
rgb_image.paste(image)
|
|
image = rgb_image
|
|
elif image.mode == "P":
|
|
image = image.convert("RGB")
|
|
else:
|
|
image = image.convert("RGB")
|
|
|
|
save_kwargs["format"] = "JPEG"
|
|
if quality:
|
|
save_kwargs["quality"] = quality
|
|
else:
|
|
save_kwargs["quality"] = 95 # Default high quality
|
|
save_kwargs["optimize"] = optimize
|
|
if progressive:
|
|
save_kwargs["progressive"] = True
|
|
|
|
elif format_lower == "png":
|
|
save_kwargs["format"] = "PNG"
|
|
save_kwargs["optimize"] = optimize
|
|
# PNG compression level (0-9)
|
|
if quality:
|
|
compress_level = max(0, min(9, (100 - quality) // 11))
|
|
save_kwargs["compress_level"] = compress_level
|
|
else:
|
|
save_kwargs["compress_level"] = 6 # Default
|
|
|
|
elif format_lower == "webp":
|
|
save_kwargs["format"] = "WEBP"
|
|
if quality:
|
|
save_kwargs["quality"] = quality
|
|
else:
|
|
save_kwargs["quality"] = 80 # Default
|
|
save_kwargs["method"] = 6 # Best compression
|
|
if preserve_transparency and has_transparency:
|
|
# WebP supports transparency
|
|
if image.mode not in ("RGBA", "LA"):
|
|
image = image.convert("RGBA")
|
|
|
|
elif format_lower == "gif":
|
|
save_kwargs["format"] = "GIF"
|
|
# GIF conversion
|
|
if image.mode != "P":
|
|
# Convert to palette mode for GIF
|
|
image = image.convert("P", palette=Image.ADAPTIVE)
|
|
save_kwargs["optimize"] = optimize
|
|
if preserve_transparency and has_transparency:
|
|
save_kwargs["transparency"] = 255 # Preserve transparency
|
|
|
|
elif format_lower == "bmp":
|
|
save_kwargs["format"] = "BMP"
|
|
if image.mode in ("RGBA", "LA", "P") and has_transparency:
|
|
# BMP doesn't support transparency, convert to RGB
|
|
if image.mode == "RGBA":
|
|
rgb_image = Image.new("RGB", image.size, (255, 255, 255))
|
|
rgb_image.paste(image, mask=image.split()[3])
|
|
image = rgb_image
|
|
else:
|
|
image = image.convert("RGB")
|
|
|
|
elif format_lower == "tiff":
|
|
save_kwargs["format"] = "TIFF"
|
|
save_kwargs["compression"] = "tiff_lzw" # Lossless compression
|
|
if preserve_transparency and has_transparency:
|
|
# TIFF supports transparency
|
|
if image.mode not in ("RGBA", "LA"):
|
|
image = image.convert("RGBA")
|
|
else:
|
|
raise ValueError(f"Unsupported target format: {target_format}")
|
|
|
|
image.save(buffer, **save_kwargs)
|
|
return buffer.getvalue()
|
|
|
|
async def convert(
|
|
self,
|
|
request: FormatConversionRequest,
|
|
user_id: Optional[str] = None,
|
|
) -> FormatConversionResult:
|
|
"""Convert an image to target format."""
|
|
logger.info(f"[Format Converter] Processing conversion request for user: {user_id}")
|
|
|
|
try:
|
|
# Decode image
|
|
image, original_size, original_format = self._decode_image(request.image_base64)
|
|
original_size_kb = original_size / 1024
|
|
|
|
logger.info(f"[Format Converter] Original: {original_format}, Target: {request.target_format}, Size: {original_size_kb:.2f} KB")
|
|
|
|
# Validate target format
|
|
format_lower = request.target_format.lower()
|
|
if format_lower not in self.SUPPORTED_FORMATS:
|
|
raise ValueError(f"Unsupported format: {request.target_format}. Supported: {list(self.SUPPORTED_FORMATS.keys())}")
|
|
|
|
# Check transparency preservation
|
|
has_transparency = image.mode in ("RGBA", "LA", "P") and (
|
|
"transparency" in image.info or image.mode == "RGBA"
|
|
)
|
|
target_supports_transparency = self.SUPPORTED_FORMATS[format_lower]["supports_transparency"]
|
|
transparency_preserved = (
|
|
has_transparency and
|
|
target_supports_transparency and
|
|
request.preserve_transparency
|
|
)
|
|
|
|
# Color space conversion
|
|
if request.color_space:
|
|
image = self._convert_color_space(image, request.color_space)
|
|
|
|
# Strip metadata if requested
|
|
metadata_preserved = not request.strip_metadata
|
|
if request.strip_metadata:
|
|
image = self._strip_exif(image)
|
|
|
|
# Convert format
|
|
converted_bytes = self._convert_image(
|
|
image,
|
|
format_lower,
|
|
request.quality,
|
|
request.preserve_transparency,
|
|
request.optimize,
|
|
request.progressive,
|
|
)
|
|
|
|
converted_size_kb = len(converted_bytes) / 1024
|
|
|
|
# Encode result
|
|
mime_type = self.SUPPORTED_FORMATS[format_lower]["mime_type"]
|
|
result_base64 = f"data:{mime_type};base64,{base64.b64encode(converted_bytes).decode()}"
|
|
|
|
logger.info(f"[Format Converter] Converted: {original_size_kb:.2f}KB → {converted_size_kb:.2f}KB")
|
|
|
|
return FormatConversionResult(
|
|
success=True,
|
|
image_base64=result_base64,
|
|
original_format=original_format,
|
|
target_format=format_lower,
|
|
original_size_kb=round(original_size_kb, 2),
|
|
converted_size_kb=round(converted_size_kb, 2),
|
|
width=image.width,
|
|
height=image.height,
|
|
transparency_preserved=transparency_preserved,
|
|
metadata_preserved=metadata_preserved,
|
|
color_space=request.color_space,
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"[Format Converter] Failed to convert image: {e}")
|
|
raise
|
|
|
|
async def convert_batch(
|
|
self,
|
|
requests: List[FormatConversionRequest],
|
|
user_id: Optional[str] = None,
|
|
) -> List[FormatConversionResult]:
|
|
"""Convert multiple images."""
|
|
logger.info(f"[Format Converter] Processing batch of {len(requests)} images for user: {user_id}")
|
|
|
|
results = []
|
|
for i, request in enumerate(requests):
|
|
try:
|
|
result = await self.convert(request, user_id)
|
|
results.append(result)
|
|
logger.info(f"[Format Converter] Batch item {i+1}/{len(requests)} complete")
|
|
except Exception as e:
|
|
logger.error(f"[Format Converter] Batch item {i+1} failed: {e}")
|
|
results.append(FormatConversionResult(
|
|
success=False,
|
|
image_base64="",
|
|
original_format="",
|
|
target_format="",
|
|
original_size_kb=0,
|
|
converted_size_kb=0,
|
|
width=0,
|
|
height=0,
|
|
transparency_preserved=False,
|
|
metadata_preserved=False,
|
|
))
|
|
|
|
return results
|
|
|
|
def get_supported_formats(self) -> List[Dict[str, Any]]:
|
|
"""Get list of supported formats with details."""
|
|
return [
|
|
{
|
|
"id": fmt_id,
|
|
"name": fmt_info["name"],
|
|
"description": fmt_info["description"],
|
|
"supports_transparency": fmt_info["supports_transparency"],
|
|
"supports_lossy": fmt_info["supports_lossy"],
|
|
"mime_type": fmt_info["mime_type"],
|
|
}
|
|
for fmt_id, fmt_info in self.SUPPORTED_FORMATS.items()
|
|
]
|
|
|
|
def get_format_recommendations(self, source_format: str) -> List[Dict[str, Any]]:
|
|
"""Get format recommendations based on source format."""
|
|
recommendations = {
|
|
"png": [
|
|
{"format": "webp", "reason": "60% smaller file size, maintains transparency"},
|
|
{"format": "jpeg", "reason": "Best for photos, smaller file size"},
|
|
],
|
|
"jpeg": [
|
|
{"format": "webp", "reason": "25-34% smaller with similar quality"},
|
|
{"format": "png", "reason": "Lossless, supports transparency"},
|
|
],
|
|
"jpg": [
|
|
{"format": "webp", "reason": "25-34% smaller with similar quality"},
|
|
{"format": "png", "reason": "Lossless, supports transparency"},
|
|
],
|
|
"webp": [
|
|
{"format": "png", "reason": "Better compatibility, lossless"},
|
|
{"format": "jpeg", "reason": "Universal compatibility"},
|
|
],
|
|
}
|
|
|
|
source_lower = source_format.lower()
|
|
return recommendations.get(source_lower, [])
|