Files
ALwrity/backend/services/blog_writer/retry_utils.py

224 lines
7.2 KiB
Python

"""
Enhanced Retry Utilities for Blog Writer
Provides advanced retry logic with exponential backoff, jitter, retry budgets,
and specific error code handling for different types of API failures.
"""
import asyncio
import random
import time
from typing import Callable, Any, Optional, Dict, List
from dataclasses import dataclass
from loguru import logger
from .exceptions import APIRateLimitException, APITimeoutException
@dataclass
class RetryConfig:
"""Configuration for retry behavior."""
max_attempts: int = 3
base_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
max_total_time: float = 300.0 # 5 minutes max total time
retryable_errors: List[str] = None
def __post_init__(self):
if self.retryable_errors is None:
self.retryable_errors = [
"503", "502", "504", # Server errors
"429", # Rate limit
"timeout", "timed out",
"connection", "network",
"overloaded", "busy"
]
class RetryBudget:
"""Tracks retry budget to prevent excessive retries."""
def __init__(self, max_total_time: float):
self.max_total_time = max_total_time
self.start_time = time.time()
self.used_time = 0.0
def can_retry(self) -> bool:
"""Check if we can still retry within budget."""
self.used_time = time.time() - self.start_time
return self.used_time < self.max_total_time
def remaining_time(self) -> float:
"""Get remaining time in budget."""
return max(0, self.max_total_time - self.used_time)
def is_retryable_error(error: Exception, retryable_errors: List[str]) -> bool:
"""Check if an error is retryable based on error message patterns."""
error_str = str(error).lower()
return any(pattern.lower() in error_str for pattern in retryable_errors)
def calculate_delay(attempt: int, config: RetryConfig) -> float:
"""Calculate delay for retry attempt with exponential backoff and jitter."""
# Exponential backoff
delay = config.base_delay * (config.exponential_base ** attempt)
# Cap at max delay
delay = min(delay, config.max_delay)
# Add jitter to prevent thundering herd
if config.jitter:
jitter_range = delay * 0.1 # 10% jitter
delay += random.uniform(-jitter_range, jitter_range)
return max(0, delay)
async def retry_with_backoff(
func: Callable,
config: Optional[RetryConfig] = None,
operation_name: str = "operation",
context: Optional[Dict[str, Any]] = None
) -> Any:
"""
Retry a function with enhanced backoff and budget management.
Args:
func: Async function to retry
config: Retry configuration
operation_name: Name of operation for logging
context: Additional context for logging
Returns:
Function result
Raises:
Last exception if all retries fail
"""
config = config or RetryConfig()
budget = RetryBudget(config.max_total_time)
last_exception = None
for attempt in range(config.max_attempts):
try:
# Check if we're still within budget
if not budget.can_retry():
logger.warning(f"Retry budget exceeded for {operation_name} after {budget.used_time:.2f}s")
break
# Execute the function
result = await func()
logger.info(f"{operation_name} succeeded on attempt {attempt + 1}")
return result
except Exception as e:
last_exception = e
# Check if this is the last attempt
if attempt == config.max_attempts - 1:
logger.error(f"{operation_name} failed after {config.max_attempts} attempts: {str(e)}")
break
# Check if error is retryable
if not is_retryable_error(e, config.retryable_errors):
logger.warning(f"{operation_name} failed with non-retryable error: {str(e)}")
break
# Calculate delay and wait
delay = calculate_delay(attempt, config)
remaining_time = budget.remaining_time()
# Don't wait longer than remaining budget
if delay > remaining_time:
logger.warning(f"Delay {delay:.2f}s exceeds remaining budget {remaining_time:.2f}s for {operation_name}")
break
logger.warning(
f"{operation_name} attempt {attempt + 1} failed: {str(e)}. "
f"Retrying in {delay:.2f}s (attempt {attempt + 2}/{config.max_attempts})"
)
await asyncio.sleep(delay)
# If we get here, all retries failed
if last_exception:
# Enhance exception with retry context
if isinstance(last_exception, Exception):
error_str = str(last_exception)
if "429" in error_str or "rate limit" in error_str.lower():
raise APIRateLimitException(
f"Rate limit exceeded after {config.max_attempts} attempts",
retry_after=int(delay * 2), # Suggest waiting longer
context=context
)
elif "timeout" in error_str.lower():
raise APITimeoutException(
f"Request timed out after {config.max_attempts} attempts",
timeout_seconds=int(config.max_total_time),
context=context
)
raise last_exception
raise Exception(f"{operation_name} failed after {config.max_attempts} attempts")
def retry_decorator(
config: Optional[RetryConfig] = None,
operation_name: Optional[str] = None
):
"""
Decorator to add retry logic to async functions.
Args:
config: Retry configuration
operation_name: Name of operation for logging
"""
def decorator(func: Callable) -> Callable:
async def wrapper(*args, **kwargs):
op_name = operation_name or func.__name__
return await retry_with_backoff(
lambda: func(*args, **kwargs),
config=config,
operation_name=op_name
)
return wrapper
return decorator
# Predefined retry configurations for different operation types
RESEARCH_RETRY_CONFIG = RetryConfig(
max_attempts=3,
base_delay=2.0,
max_delay=30.0,
max_total_time=180.0, # 3 minutes for research
retryable_errors=["503", "429", "timeout", "overloaded", "connection"]
)
OUTLINE_RETRY_CONFIG = RetryConfig(
max_attempts=2,
base_delay=1.5,
max_delay=20.0,
max_total_time=120.0, # 2 minutes for outline
retryable_errors=["503", "429", "timeout", "overloaded"]
)
CONTENT_RETRY_CONFIG = RetryConfig(
max_attempts=3,
base_delay=1.0,
max_delay=15.0,
max_total_time=90.0, # 1.5 minutes for content
retryable_errors=["503", "429", "timeout", "overloaded"]
)
SEO_RETRY_CONFIG = RetryConfig(
max_attempts=2,
base_delay=1.0,
max_delay=10.0,
max_total_time=60.0, # 1 minute for SEO
retryable_errors=["503", "429", "timeout"]
)