Base code

This commit is contained in:
Kunthawat Greethong
2026-01-08 22:39:53 +07:00
parent 697115c61a
commit c35fa52117
2169 changed files with 626670 additions and 0 deletions

View File

@@ -0,0 +1,246 @@
"""
File Storage Utility
Robust file storage helper for saving generated content assets.
"""
import os
import uuid
from pathlib import Path
from typing import Optional, Tuple
import logging
logger = logging.getLogger(__name__)
# Maximum filename length
MAX_FILENAME_LENGTH = 255
# Allowed characters in filenames (alphanumeric, dash, underscore, dot)
ALLOWED_FILENAME_CHARS = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.')
def sanitize_filename(filename: str, max_length: int = 100) -> str:
"""
Sanitize filename to be filesystem-safe.
Args:
filename: Original filename
max_length: Maximum length for filename
Returns:
Sanitized filename
"""
if not filename:
return f"file_{uuid.uuid4().hex[:8]}"
# Remove path separators and other dangerous characters
sanitized = "".join(c if c in ALLOWED_FILENAME_CHARS else '_' for c in filename)
# Remove leading/trailing dots and spaces
sanitized = sanitized.strip('. ')
# Ensure it's not empty
if not sanitized:
sanitized = f"file_{uuid.uuid4().hex[:8]}"
# Truncate if too long
if len(sanitized) > max_length:
name, ext = os.path.splitext(sanitized)
max_name_length = max_length - len(ext) - 1
sanitized = name[:max_name_length] + ext
return sanitized
def ensure_directory_exists(directory: Path) -> bool:
"""
Ensure directory exists, creating it if necessary.
Args:
directory: Path to directory
Returns:
True if directory exists or was created, False otherwise
"""
try:
directory.mkdir(parents=True, exist_ok=True)
return True
except Exception as e:
logger.error(f"Failed to create directory {directory}: {e}")
return False
def save_file_safely(
content: bytes,
directory: Path,
filename: str,
max_file_size: int = 100 * 1024 * 1024 # 100MB default
) -> Tuple[Optional[Path], Optional[str]]:
"""
Safely save file content to disk.
Args:
content: File content as bytes
directory: Directory to save file in
filename: Filename (will be sanitized)
max_file_size: Maximum allowed file size in bytes
Returns:
Tuple of (file_path, error_message). file_path is None on error.
"""
try:
# Validate file size
if len(content) > max_file_size:
return None, f"File size {len(content)} exceeds maximum {max_file_size}"
if len(content) == 0:
return None, "File content is empty"
# Ensure directory exists
if not ensure_directory_exists(directory):
return None, f"Failed to create directory: {directory}"
# Sanitize filename
safe_filename = sanitize_filename(filename)
# Construct full path
file_path = directory / safe_filename
# Check if file already exists (unlikely with UUID, but check anyway)
if file_path.exists():
# Add UUID to make it unique
name, ext = os.path.splitext(safe_filename)
safe_filename = f"{name}_{uuid.uuid4().hex[:8]}{ext}"
file_path = directory / safe_filename
# Write file atomically (write to temp file first, then rename)
temp_path = file_path.with_suffix(file_path.suffix + '.tmp')
try:
with open(temp_path, 'wb') as f:
f.write(content)
# Atomic rename
temp_path.replace(file_path)
logger.info(f"Successfully saved file: {file_path} ({len(content)} bytes)")
return file_path, None
except Exception as write_error:
# Clean up temp file if it exists
if temp_path.exists():
try:
temp_path.unlink()
except:
pass
raise write_error
except Exception as e:
logger.error(f"Error saving file: {e}", exc_info=True)
return None, str(e)
def generate_unique_filename(
prefix: str,
extension: str = ".png",
include_uuid: bool = True
) -> str:
"""
Generate a unique filename.
Args:
prefix: Filename prefix
extension: File extension (with or without dot)
include_uuid: Whether to include UUID in filename
Returns:
Unique filename
"""
if not extension.startswith('.'):
extension = '.' + extension
prefix = sanitize_filename(prefix, max_length=50)
if include_uuid:
unique_id = uuid.uuid4().hex[:8]
return f"{prefix}_{unique_id}{extension}"
else:
return f"{prefix}{extension}"
def save_text_file_safely(
content: str,
directory: Path,
filename: str,
encoding: str = 'utf-8',
max_file_size: int = 10 * 1024 * 1024 # 10MB default for text
) -> Tuple[Optional[Path], Optional[str]]:
"""
Safely save text content to disk.
Args:
content: Text content as string
directory: Directory to save file in
filename: Filename (will be sanitized)
encoding: Text encoding (default: utf-8)
max_file_size: Maximum allowed file size in bytes
Returns:
Tuple of (file_path, error_message). file_path is None on error.
"""
try:
# Validate content
if not content or not isinstance(content, str):
return None, "Content must be a non-empty string"
# Convert to bytes for size check
content_bytes = content.encode(encoding)
# Validate file size
if len(content_bytes) > max_file_size:
return None, f"File size {len(content_bytes)} exceeds maximum {max_file_size}"
# Ensure directory exists
if not ensure_directory_exists(directory):
return None, f"Failed to create directory: {directory}"
# Sanitize filename
safe_filename = sanitize_filename(filename)
# Ensure .txt extension if not present
if not safe_filename.endswith(('.txt', '.md', '.json')):
safe_filename = os.path.splitext(safe_filename)[0] + '.txt'
# Construct full path
file_path = directory / safe_filename
# Check if file already exists
if file_path.exists():
# Add UUID to make it unique
name, ext = os.path.splitext(safe_filename)
safe_filename = f"{name}_{uuid.uuid4().hex[:8]}{ext}"
file_path = directory / safe_filename
# Write file atomically (write to temp file first, then rename)
temp_path = file_path.with_suffix(file_path.suffix + '.tmp')
try:
with open(temp_path, 'w', encoding=encoding) as f:
f.write(content)
# Atomic rename
temp_path.replace(file_path)
logger.info(f"Successfully saved text file: {file_path} ({len(content_bytes)} bytes, {len(content)} chars)")
return file_path, None
except Exception as write_error:
# Clean up temp file if it exists
if temp_path.exists():
try:
temp_path.unlink()
except:
pass
raise write_error
except Exception as e:
logger.error(f"Error saving text file: {e}", exc_info=True)
return None, str(e)