From c604dc87ec36b89b3a983ccf46f7dc068894dcec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D9=8A?= Date: Wed, 4 Mar 2026 20:44:04 +0530 Subject: [PATCH] Add Stripe webhook event persistence and idempotency --- backend/models/subscription_models.py | 16 ++++ .../services/subscription/stripe_service.py | 88 ++++++++++++++++--- 2 files changed, 90 insertions(+), 14 deletions(-) diff --git a/backend/models/subscription_models.py b/backend/models/subscription_models.py index 25f9b788..b5dc2392 100644 --- a/backend/models/subscription_models.py +++ b/backend/models/subscription_models.py @@ -129,6 +129,22 @@ class UserSubscription(Base): # Relationships plan = relationship("SubscriptionPlan") + +class ProcessedStripeEvent(Base): + """Tracks Stripe webhook processing to enforce idempotency and aid observability.""" + + __tablename__ = "processed_stripe_events" + + event_id = Column(String(255), primary_key=True) + event_type = Column(String(255), nullable=False) + status = Column(String(50), nullable=False, default="processing") + attempt_count = Column(Integer, nullable=False, default=1) + received_at = Column(DateTime, default=datetime.utcnow, nullable=False) + processing_started_at = Column(DateTime, default=datetime.utcnow, nullable=False) + processed_at = Column(DateTime, nullable=True) + last_error = Column(Text, nullable=True) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + class APIUsageLog(Base): """Detailed log of every API call for billing and monitoring.""" diff --git a/backend/services/subscription/stripe_service.py b/backend/services/subscription/stripe_service.py index afd0e133..475eff65 100644 --- a/backend/services/subscription/stripe_service.py +++ b/backend/services/subscription/stripe_service.py @@ -4,7 +4,8 @@ from typing import Optional, Dict, Any from loguru import logger from fastapi import HTTPException from sqlalchemy.orm import Session -from models.subscription_models import UserSubscription, SubscriptionPlan, SubscriptionTier, BillingCycle, UsageStatus, FraudWarning +from sqlalchemy.exc import IntegrityError +from models.subscription_models import UserSubscription, SubscriptionPlan, SubscriptionTier, BillingCycle, UsageStatus, FraudWarning, ProcessedStripeEvent from services.subscription.pricing_service import PricingService from datetime import datetime @@ -235,23 +236,82 @@ class StripeService: logger.error(f"Invalid signature: {e}") raise HTTPException(status_code=400, detail="Invalid signature") + event_id = event.get("id") event_type = event["type"] data = event["data"]["object"] + if not event_id: + logger.error("Stripe webhook event missing id") + raise HTTPException(status_code=400, detail="Missing event id") + + now = datetime.utcnow() + processed_event = self.db.query(ProcessedStripeEvent).filter( + ProcessedStripeEvent.event_id == event_id + ).first() + + if processed_event and processed_event.status == "processed": + logger.info(f"Skipping already processed Stripe event {event_id}") + return {"status": "success"} + + if processed_event: + processed_event.status = "processing" + processed_event.processing_started_at = now + processed_event.last_error = None + processed_event.attempt_count = (processed_event.attempt_count or 0) + 1 + else: + processed_event = ProcessedStripeEvent( + event_id=event_id, + event_type=event_type, + status="processing", + received_at=now, + processing_started_at=now, + attempt_count=1, + ) + self.db.add(processed_event) + + try: + self.db.commit() + except IntegrityError: + self.db.rollback() + existing_event = self.db.query(ProcessedStripeEvent).filter( + ProcessedStripeEvent.event_id == event_id + ).first() + if existing_event and existing_event.status == "processed": + logger.info(f"Skipping already processed Stripe event {event_id} after race") + return {"status": "success"} + raise + logger.info(f"Received Stripe webhook: {event_type}") - - if event_type == "checkout.session.completed": - await self._handle_checkout_completed(data) - elif event_type == "invoice.payment_succeeded": - await self._handle_invoice_payment_succeeded(data) - elif event_type == "invoice.payment_failed": - await self._handle_invoice_payment_failed(data) - elif event_type == "customer.subscription.updated": - await self._handle_subscription_updated(data) - elif event_type == "customer.subscription.deleted": - await self._handle_subscription_deleted(data) - elif event_type.startswith("radar.early_fraud_warning."): - await self._handle_early_fraud_warning(data) + + try: + if event_type == "checkout.session.completed": + await self._handle_checkout_completed(data) + elif event_type == "invoice.payment_succeeded": + await self._handle_invoice_payment_succeeded(data) + elif event_type == "invoice.payment_failed": + await self._handle_invoice_payment_failed(data) + elif event_type == "customer.subscription.updated": + await self._handle_subscription_updated(data) + elif event_type == "customer.subscription.deleted": + await self._handle_subscription_deleted(data) + elif event_type.startswith("radar.early_fraud_warning."): + await self._handle_early_fraud_warning(data) + + processed_event.status = "processed" + processed_event.processed_at = datetime.utcnow() + processed_event.last_error = None + self.db.commit() + except Exception as e: + self.db.rollback() + failed_event = self.db.query(ProcessedStripeEvent).filter( + ProcessedStripeEvent.event_id == event_id + ).first() + if failed_event: + failed_event.status = "failed" + failed_event.last_error = str(e)[:2000] + failed_event.processed_at = datetime.utcnow() + self.db.commit() + raise return {"status": "success"}