265 lines
12 KiB
Python
265 lines
12 KiB
Python
"""
|
|
Renewal History Retention Service
|
|
Manages retention policies for subscription renewal history records.
|
|
|
|
Retention Policy:
|
|
- 0-12 months: Full records with usage snapshots
|
|
- 12-24 months: Full records (compressed/removed usage snapshots)
|
|
- 24-84 months: Summary records (no usage snapshots, payment data only)
|
|
- 84+ months: Mark for archive (payment data preserved indefinitely)
|
|
"""
|
|
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
from loguru import logger
|
|
import json
|
|
|
|
from models.subscription_models import SubscriptionRenewalHistory
|
|
|
|
|
|
class RenewalHistoryRetentionService:
|
|
"""Service for managing renewal history retention policies."""
|
|
|
|
# Retention periods (in days)
|
|
COMPRESS_SNAPSHOT_DAYS = 365 # 12 months - compress/remove usage snapshots
|
|
SUMMARY_RECORDS_DAYS = 730 # 24 months - create summary records
|
|
ARCHIVE_DAYS = 2555 # 84 months (7 years) - mark for archive
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def check_and_apply_retention(self, user_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Check and apply retention policies for renewal history.
|
|
|
|
Applies retention in stages:
|
|
1. Compress usage snapshots for records 12-24 months old
|
|
2. Create summary records for records 24-84 months old
|
|
3. Mark records older than 84 months for archive
|
|
|
|
Returns:
|
|
Dict with retention status and statistics
|
|
"""
|
|
try:
|
|
now = datetime.utcnow()
|
|
compress_cutoff = now - timedelta(days=self.COMPRESS_SNAPSHOT_DAYS)
|
|
summary_cutoff = now - timedelta(days=self.SUMMARY_RECORDS_DAYS)
|
|
archive_cutoff = now - timedelta(days=self.ARCHIVE_DAYS)
|
|
|
|
# Count records in each retention tier
|
|
total_count = self.db.query(func.count(SubscriptionRenewalHistory.id)).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id
|
|
).scalar() or 0
|
|
|
|
records_to_compress = self.db.query(SubscriptionRenewalHistory).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at < compress_cutoff,
|
|
SubscriptionRenewalHistory.created_at >= summary_cutoff,
|
|
SubscriptionRenewalHistory.usage_before_renewal.isnot(None) # Has snapshot to compress
|
|
).all()
|
|
|
|
records_to_summarize = self.db.query(SubscriptionRenewalHistory).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at < summary_cutoff,
|
|
SubscriptionRenewalHistory.created_at >= archive_cutoff,
|
|
SubscriptionRenewalHistory.usage_before_renewal.isnot(None) # Has snapshot to remove
|
|
).all()
|
|
|
|
records_to_archive = self.db.query(SubscriptionRenewalHistory).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at < archive_cutoff
|
|
).all()
|
|
|
|
# Apply retention policies
|
|
compressed_count = self._compress_usage_snapshots(records_to_compress)
|
|
summarized_count = self._create_summary_records(records_to_summarize)
|
|
archived_count = self._mark_for_archive(records_to_archive)
|
|
|
|
total_processed = compressed_count + summarized_count + archived_count
|
|
|
|
if total_processed == 0:
|
|
return {
|
|
'retention_applied': False,
|
|
'total_records': total_count,
|
|
'records_to_compress': len(records_to_compress),
|
|
'records_to_summarize': len(records_to_summarize),
|
|
'records_to_archive': len(records_to_archive),
|
|
'message': 'No records require retention processing'
|
|
}
|
|
|
|
self.db.commit()
|
|
|
|
logger.info(
|
|
f"[RenewalRetention] Applied retention for user {user_id}: "
|
|
f"{compressed_count} compressed, {summarized_count} summarized, "
|
|
f"{archived_count} archived"
|
|
)
|
|
|
|
return {
|
|
'retention_applied': True,
|
|
'total_records': total_count,
|
|
'compressed_count': compressed_count,
|
|
'summarized_count': summarized_count,
|
|
'archived_count': archived_count,
|
|
'total_processed': total_processed,
|
|
'message': f'Processed {total_processed} records: {compressed_count} compressed, {summarized_count} summarized, {archived_count} archived'
|
|
}
|
|
|
|
except Exception as e:
|
|
self.db.rollback()
|
|
logger.error(f"[RenewalRetention] Error applying retention for user {user_id}: {e}", exc_info=True)
|
|
return {
|
|
'retention_applied': False,
|
|
'error': str(e),
|
|
'message': f'Error applying retention: {str(e)}'
|
|
}
|
|
|
|
def _compress_usage_snapshots(self, records: List[SubscriptionRenewalHistory]) -> int:
|
|
"""
|
|
Compress usage snapshots for records 12-24 months old.
|
|
|
|
Strategy: Replace detailed JSON snapshot with summary statistics only.
|
|
Keeps only essential metrics: total_calls, total_tokens, total_cost.
|
|
"""
|
|
compressed = 0
|
|
for record in records:
|
|
if record.usage_before_renewal:
|
|
try:
|
|
usage_data = record.usage_before_renewal
|
|
|
|
# Handle both dict (SQLAlchemy JSON) and string formats
|
|
if isinstance(usage_data, str):
|
|
try:
|
|
usage_data = json.loads(usage_data)
|
|
except json.JSONDecodeError:
|
|
# If it's not valid JSON, remove it
|
|
record.usage_before_renewal = None
|
|
compressed += 1
|
|
continue
|
|
elif not isinstance(usage_data, dict):
|
|
# If it's not a dict or string, remove it
|
|
record.usage_before_renewal = None
|
|
compressed += 1
|
|
continue
|
|
|
|
# Check if already compressed (has 'compressed_at' key)
|
|
if isinstance(usage_data, dict) and 'compressed_at' in usage_data:
|
|
# Already compressed, skip
|
|
continue
|
|
|
|
# Create compressed summary (keep only key metrics)
|
|
compressed_summary = {
|
|
'total_calls': usage_data.get('total_calls', 0),
|
|
'total_tokens': usage_data.get('total_tokens', 0),
|
|
'total_cost': usage_data.get('total_cost', 0.0),
|
|
'compressed_at': datetime.utcnow().isoformat(),
|
|
'note': 'Usage snapshot compressed after 12 months'
|
|
}
|
|
|
|
record.usage_before_renewal = compressed_summary
|
|
compressed += 1
|
|
|
|
except (TypeError, AttributeError, KeyError) as e:
|
|
logger.warning(f"[RenewalRetention] Failed to compress snapshot for record {record.id}: {e}")
|
|
# If compression fails, remove snapshot entirely
|
|
record.usage_before_renewal = None
|
|
compressed += 1
|
|
|
|
return compressed
|
|
|
|
def _create_summary_records(self, records: List[SubscriptionRenewalHistory]) -> int:
|
|
"""
|
|
Create summary records for records 24-84 months old.
|
|
|
|
Strategy: Remove usage snapshots, keep only payment and subscription data.
|
|
"""
|
|
summarized = 0
|
|
for record in records:
|
|
if record.usage_before_renewal is not None:
|
|
# Remove usage snapshot, keep payment and subscription data
|
|
record.usage_before_renewal = None
|
|
summarized += 1
|
|
|
|
return summarized
|
|
|
|
def _mark_for_archive(self, records: List[SubscriptionRenewalHistory]) -> int:
|
|
"""
|
|
Mark records older than 84 months for archive.
|
|
|
|
Strategy: Ensure usage snapshots are removed, payment data is preserved.
|
|
Note: In future, these could be moved to an archive table.
|
|
"""
|
|
archived = 0
|
|
for record in records:
|
|
# Ensure usage snapshot is removed (should already be done)
|
|
if record.usage_before_renewal is not None:
|
|
record.usage_before_renewal = None
|
|
archived += 1
|
|
else:
|
|
# Already processed, just count
|
|
archived += 1
|
|
|
|
return archived
|
|
|
|
def get_retention_stats(self, user_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Get retention statistics for a user's renewal history.
|
|
|
|
Returns breakdown by retention tier.
|
|
"""
|
|
try:
|
|
now = datetime.utcnow()
|
|
compress_cutoff = now - timedelta(days=self.COMPRESS_SNAPSHOT_DAYS)
|
|
summary_cutoff = now - timedelta(days=self.SUMMARY_RECORDS_DAYS)
|
|
archive_cutoff = now - timedelta(days=self.ARCHIVE_DAYS)
|
|
|
|
total = self.db.query(func.count(SubscriptionRenewalHistory.id)).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id
|
|
).scalar() or 0
|
|
|
|
recent = self.db.query(func.count(SubscriptionRenewalHistory.id)).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at >= compress_cutoff
|
|
).scalar() or 0
|
|
|
|
to_compress = self.db.query(func.count(SubscriptionRenewalHistory.id)).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at < compress_cutoff,
|
|
SubscriptionRenewalHistory.created_at >= summary_cutoff,
|
|
SubscriptionRenewalHistory.usage_before_renewal.isnot(None)
|
|
).scalar() or 0
|
|
|
|
to_summarize = self.db.query(func.count(SubscriptionRenewalHistory.id)).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at < summary_cutoff,
|
|
SubscriptionRenewalHistory.created_at >= archive_cutoff,
|
|
SubscriptionRenewalHistory.usage_before_renewal.isnot(None)
|
|
).scalar() or 0
|
|
|
|
to_archive = self.db.query(func.count(SubscriptionRenewalHistory.id)).filter(
|
|
SubscriptionRenewalHistory.user_id == user_id,
|
|
SubscriptionRenewalHistory.created_at < archive_cutoff
|
|
).scalar() or 0
|
|
|
|
return {
|
|
'total_records': total,
|
|
'recent_records': recent, # 0-12 months
|
|
'records_to_compress': to_compress, # 12-24 months
|
|
'records_to_summarize': to_summarize, # 24-84 months
|
|
'records_to_archive': to_archive, # 84+ months
|
|
'retention_policy': {
|
|
'compress_after_days': self.COMPRESS_SNAPSHOT_DAYS,
|
|
'summarize_after_days': self.SUMMARY_RECORDS_DAYS,
|
|
'archive_after_days': self.ARCHIVE_DAYS
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"[RenewalRetention] Error getting retention stats for user {user_id}: {e}", exc_info=True)
|
|
return {
|
|
'error': str(e),
|
|
'total_records': 0
|
|
}
|