← Back to Articles
6/6/2026Admin Post

saga demystified part4 implementation

Part 4: SAGA Deep Dive Implementation - Production Patterns

Series Navigation: Index |
Part 1 |
Part 2 |
Part 3 | Part 4 |
Part 5 - Advanced |
Part 6 - Pitfalls |
Part 7 - Interview


Table of Contents

  1. The Dual-Write Problem
  2. Transactional Outbox Pattern
  3. Outbox: Full Spring Boot Implementation
  4. Outbox with Debezium CDC (Production Grade)
  5. Idempotency: Why It Is Non-Negotiable
  6. Idempotency Implementation Patterns
  7. Retry Strategies and Exponential Backoff
  8. Dead Letter Queue Implementation
  9. Circuit Breaker with SAGAs
  10. Saga Timeout and Deadline Management
  11. Distributed Tracing with AWS X-Ray
  12. Structured Logging for SAGA Events
  13. Complete MySQL Schema Reference
  14. Optimistic vs Pessimistic Locking in SAGAs
  15. Semantic Locking
  16. Production Health Checks and Metrics
  17. Configuration Management
  18. Summary

1. The Dual-Write Problem

This is arguably the most dangerous and frequently misunderstood problem in event-driven sagas.

What Is It?

In a SAGA step, you need to do TWO things:

  1. Write to your database (local transaction)
  2. Publish an event to Kafka/SQS

These are two SEPARATE operations. If the first succeeds and the second fails, your
database has committed data but no downstream service knows about it. The saga stalls.

THE NAIVE (WRONG) APPROACH:

@Transactional
public void processPayment(OrderCreatedEvent event) {
    // Step 1: Write to DB
    payment = paymentRepository.save(new Payment(...));  // commits

    // APPLICATION CRASHES HERE

    // Step 2: Publish event - NEVER EXECUTES
    kafkaTemplate.send("payment.processed", paymentEvent);  // skipped!

    // Result: payment row in DB, no Kafka event
    // Inventory service never knows payment was made
    // Saga is STUCK
}
ANOTHER FAILURE CASE:

@Transactional
public void processPayment(OrderCreatedEvent event) {
    payment = paymentRepository.save(new Payment(...));  // Step 1
    kafkaTemplate.send("payment.processed", event).get();  // Step 2 - Kafka timeout!

    // Kafka call throws exception
    // Spring rolls back the DB transaction
    // But Kafka MIGHT have received and processed the message
    // Kafka is NOT transactional with MySQL
    // Result: no payment in DB, but message might be in Kafka
    // Downstream services process an event for a non-existent payment
}

The root cause: There is NO atomic operation that writes to a relational database AND
a message broker simultaneously. They are fundamentally different systems.


2. Transactional Outbox Pattern

The Outbox pattern solves dual-write by making the database the single source of truth.

The Concept

Instead of writing to the database AND publishing to Kafka in the same transaction,
you write to the database AND write to an outbox table in the SAME transaction.
A separate process reads the outbox table and publishes to Kafka.

BEFORE (dual-write):
  App -----> MySQL (commit)
       -----> Kafka (may fail)

AFTER (outbox):
  App -----> MySQL: { payment_record, outbox_event } (single atomic commit)
  Outbox Publisher -----> reads outbox_events table
                   -----> publishes to Kafka
                   -----> updates outbox_event status to PUBLISHED

Why This Works

The first write (DB + outbox) is one atomic transaction. Either BOTH the payment record
and the outbox event are persisted, or NEITHER is. No split state.

The outbox publisher is a separate process that will keep retrying until the message
is published to Kafka. It is resilient to temporary Kafka failures.

Outbox Guarantees

  • At-Least-Once Delivery: An event may be published more than once (if the
    publisher crashes after publishing but before marking as PUBLISHED). Consumers
    MUST be idempotent.
  • Eventually Published: Events will eventually reach Kafka as long as the
    outbox publisher is running.
  • Ordered per Aggregate: By using the order ID as the Kafka partition key,
    events for the same order are always delivered in order.

3. Outbox: Full Spring Boot Implementation

Outbox Entity

// OutboxEvent.java
package com.example.common.outbox;
 
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
 
@Entity
@Table(name = "outbox_events", indexes = {
    @Index(name = "idx_outbox_status_created", columnList = "status, created_at"),
    @Index(name = "idx_outbox_aggregate", columnList = "aggregate_id")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OutboxEvent {
 
    public enum OutboxStatus {
        PENDING,
        PUBLISHED,
        FAILED
    }
 
    @Id
    @Column(name = "event_id", length = 36)
    private String eventId;
 
    @Column(name = "aggregate_id", nullable = false, length = 36)
    private String aggregateId;  // e.g., orderId - used as Kafka partition key
 
    @Column(name = "aggregate_type", length = 50)
    private String aggregateType;  // e.g., "ORDER"
 
    @Column(name = "topic_name", nullable = false, length = 255)
    private String topicName;
 
    @Column(name = "event_type", nullable = false, length = 100)
    private String eventType;
 
    @Column(name = "payload", nullable = false, columnDefinition = "LONGTEXT")
    private String payload;
 
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false, length = 20)
    @Builder.Default
    private OutboxStatus status = OutboxStatus.PENDING;
 
    @Column(name = "retry_count")
    @Builder.Default
    private int retryCount = 0;
 
    @Column(name = "error_message", length = 1000)
    private String errorMessage;
 
    @Column(name = "created_at", updatable = false)
    private Instant createdAt;
 
    @Column(name = "published_at")
    private Instant publishedAt;
 
    @PrePersist
    public void prePersist() {
        this.createdAt = Instant.now();
    }
}

Outbox Repository

// OutboxEventRepository.java
package com.example.common.outbox;
 
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
 
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
 
    @Lock(jakarta.persistence.LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT o FROM OutboxEvent o WHERE o.status = 'PENDING' ORDER BY o.createdAt ASC")
    @QueryHints(@QueryHint(name = "jakarta.persistence.lock.timeout", value = "0"))
    List<OutboxEvent> findPendingEventsWithLock(org.springframework.data.domain.Pageable pageable);
 
    @Query("SELECT o FROM OutboxEvent o WHERE o.status = 'FAILED' AND o.retryCount < :maxRetries")
    List<OutboxEvent> findRetryableFailedEvents(@Param("maxRetries") int maxRetries,
                                                org.springframework.data.domain.Pageable pageable);
 
    @Modifying
    @Transactional
    @Query("UPDATE OutboxEvent o SET o.status = 'PUBLISHED', o.publishedAt = :now WHERE o.eventId = :eventId")
    void markAsPublished(@Param("eventId") String eventId, @Param("now") Instant now);
 
    @Query("SELECT COUNT(o) FROM OutboxEvent o WHERE o.status = 'PENDING' AND o.createdAt < :threshold")
    long countStuckPendingEvents(@Param("threshold") Instant threshold);
}

Outbox Publisher Service

// OutboxPublisherService.java
package com.example.common.outbox;
 
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class OutboxPublisherService {
 
    private final OutboxEventRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final MeterRegistry meterRegistry;
 
    private static final int BATCH_SIZE = 50;
    private static final int MAX_RETRIES = 5;
 
    /**
     * Polls outbox table every 500ms and publishes pending events.
     * In production, use Debezium CDC instead of polling for lower latency.
     */
    @Scheduled(fixedDelayString = "${outbox.publisher.delay-ms:500}")
    public void publishPendingEvents() {
        List<OutboxEvent> pendingEvents = outboxRepository
            .findPendingEventsWithLock(PageRequest.of(0, BATCH_SIZE));
 
        if (pendingEvents.isEmpty()) {
            return;
        }
 
        log.debug("Processing {} pending outbox events", pendingEvents.size());
 
        for (OutboxEvent event : pendingEvents) {
            publishEvent(event);
        }
    }
 
    /**
     * Separate scheduled job for retrying failed events.
     * Less frequent to avoid hammering Kafka during outages.
     */
    @Scheduled(fixedDelayString = "${outbox.publisher.retry-delay-ms:30000}")
    public void retryFailedEvents() {
        List<OutboxEvent> failedEvents = outboxRepository
            .findRetryableFailedEvents(MAX_RETRIES, PageRequest.of(0, 20));
 
        for (OutboxEvent event : failedEvents) {
            publishEvent(event);
        }
    }
 
    /**
     * Each event is published in its own transaction.
     * If one fails, others can still succeed.
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    protected void publishEvent(OutboxEvent event) {
        try {
            // Synchronous send with timeout for reliability
            kafkaTemplate.send(
                event.getTopicName(),
                event.getAggregateId(),  // Key = orderId for partition ordering
                event.getPayload()
            ).get(10, TimeUnit.SECONDS);
 
            // Mark as published
            event.setStatus(OutboxEvent.OutboxStatus.PUBLISHED);
            event.setPublishedAt(Instant.now());
            outboxRepository.save(event);
 
            meterRegistry.counter("outbox.published",
                "topic", event.getTopicName(),
                "eventType", event.getEventType()).increment();
 
            log.debug("Published outbox event: eventId={}, topic={}, type={}",
                event.getEventId(), event.getTopicName(), event.getEventType());
 
        } catch (Exception e) {
            log.error("Failed to publish outbox event: eventId={}, topic={}, attempt={}",
                event.getEventId(), event.getTopicName(), event.getRetryCount() + 1, e);
 
            event.setRetryCount(event.getRetryCount() + 1);
            event.setErrorMessage(truncate(e.getMessage(), 990));
 
            if (event.getRetryCount() >= MAX_RETRIES) {
                event.setStatus(OutboxEvent.OutboxStatus.FAILED);
                meterRegistry.counter("outbox.failed",
                    "topic", event.getTopicName()).increment();
                log.error("Outbox event marked FAILED after {} attempts: eventId={}",
                    MAX_RETRIES, event.getEventId());
            }
            outboxRepository.save(event);
        }
    }
 
    private String truncate(String str, int maxLength) {
        if (str == null) return null;
        return str.length() <= maxLength ? str : str.substring(0, maxLength);
    }
}

Health Check for Outbox

// OutboxHealthIndicator.java
package com.example.common.outbox;
 
import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.health.*;
import org.springframework.stereotype.Component;
 
import java.time.Instant;
import java.time.temporal.ChronoUnit;
 
@Component
@RequiredArgsConstructor
public class OutboxHealthIndicator implements HealthIndicator {
 
    private final OutboxEventRepository outboxRepository;
    private static final long STUCK_THRESHOLD_MINUTES = 10L;
    private static final long MAX_ACCEPTABLE_STUCK = 5L;
 
    @Override
    public Health health() {
        Instant threshold = Instant.now().minus(STUCK_THRESHOLD_MINUTES, ChronoUnit.MINUTES);
        long stuckCount = outboxRepository.countStuckPendingEvents(threshold);
 
        if (stuckCount == 0) {
            return Health.up()
                .withDetail("stuckEvents", 0)
                .build();
        } else if (stuckCount <= MAX_ACCEPTABLE_STUCK) {
            return Health.up()
                .withDetail("stuckEvents", stuckCount)
                .withDetail("warning", "Some events delayed")
                .build();
        } else {
            return Health.down()
                .withDetail("stuckEvents", stuckCount)
                .withDetail("error", "Too many stuck outbox events - Kafka may be unreachable")
                .build();
        }
    }
}

4. Outbox with Debezium CDC (Production Grade)

Debezium is a Change Data Capture (CDC) tool that reads the MySQL binary log (binlog)
and publishes changes to Kafka. This is more reliable than polling and has sub-second latency.

How It Works

MySQL Binary Log -> Debezium Connector -> Kafka Connect -> Kafka Topic

1. App writes to outbox_events table (same transaction as business data)
2. MySQL commits the transaction and writes to binlog
3. Debezium reads binlog changes (INSERT into outbox_events)
4. Debezium publishes the event to the target Kafka topic
5. No polling, no duplicate checking, guaranteed delivery

Debezium Connector Configuration (JSON)

{
  "name": "order-service-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "rds-mysql-instance.cluster-xyz.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "${DB_CDC_USER}",
    "database.password": "${DB_CDC_PASSWORD}",
    "database.server.id": "184054",
    "database.server.name": "order-service",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.outbox_events",
 
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "event_id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.route.by.field": "topic_name",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
 
    "tombstones.on.delete": "false",
    "snapshot.mode": "initial",
 
    "database.history.kafka.bootstrap.servers": "msk-broker:9092",
    "database.history.kafka.topic": "schema-changes.order-service"
  }
}

MySQL User for CDC (Least Privilege)

-- Create CDC-specific user with minimal permissions
CREATE USER 'debezium_cdc'@'%' IDENTIFIED BY '${SECURE_PASSWORD}';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'debezium_cdc'@'%';
GRANT SELECT ON orders_db.outbox_events TO 'debezium_cdc'@'%';
FLUSH PRIVILEGES;
 
-- Enable binary logging in MySQL (in AWS RDS: set binlog_format=ROW)
-- This is typically done via RDS Parameter Group

5. Idempotency: Why It Is Non-Negotiable

In a SAGA with at-least-once event delivery, the same event WILL arrive multiple times.
This can happen because:

  • Network retry by the consumer after processing but before acknowledgment
  • Kafka rebalance during consumer group restart
  • Outbox publisher published the same event twice (crash between send and mark-published)
  • Manual replay of events during incident recovery

Every single event handler MUST be idempotent.

Without idempotency, you will:

  • Double-charge customers
  • Double-reserve inventory
  • Double-ship orders
  • Create duplicate compensation actions (double-refund)

6. Idempotency Implementation Patterns

Pattern 1: Database Unique Constraint

The simplest and most reliable approach. Use a database unique constraint on the
natural key of the operation.

// Payment table has: UNIQUE INDEX on order_id
// This prevents two payments for the same order
 
@Transactional
public void processPayment(OrderCreatedEvent event) {
    // This WILL throw a DataIntegrityViolationException if called twice
    // due to UNIQUE INDEX on order_id
    Payment payment = Payment.builder()
        .orderId(event.orderId())
        .amount(event.totalAmount())
        // ...
        .build();
 
    try {
        paymentRepository.save(payment);  // Unique constraint enforced here
    } catch (DataIntegrityViolationException e) {
        // Already processed - this is OK, just log and return
        log.info("Payment already exists for orderId={} (idempotent), skipping",
            event.orderId());
        return;
    }
 
    // Continue processing...
}
-- Enforce uniqueness at database level
ALTER TABLE payments ADD UNIQUE INDEX idx_payments_order_id_unique (order_id);

Pattern 2: Explicit Idempotency Check

Check before writing. Works for compensations where the natural key check is less obvious.

@Transactional
public void refundPayment(String orderId, String reason) {
    Payment payment = paymentRepository.findByOrderId(orderId)
        .orElseThrow(() -> new PaymentNotFoundException(orderId));
 
    // EXPLICIT CHECK: already in terminal compensation state?
    if (payment.getStatus() == PaymentStatus.REFUNDED) {
        log.info("Payment for orderId={} already refunded (idempotent check)", orderId);
        return;  // Safe to return - compensation was already done
    }
 
    if (payment.getStatus() == PaymentStatus.DECLINED) {
        log.info("Payment was declined (never charged), no refund needed for orderId={}", orderId);
        // Still need to publish the refunded event to continue compensation chain
        publishRefundedEvent(payment);
        return;
    }
 
    // Process refund only if status is CHARGED
    if (payment.getStatus() != PaymentStatus.CHARGED) {
        log.warn("Cannot refund payment in unexpected status {} for orderId={}",
            payment.getStatus(), orderId);
        return;
    }
 
    // ... actual refund logic
}

Pattern 3: Idempotency Key Table

For operations that cannot use the domain entity's natural key.
Store a record of which event IDs have been processed.

// ProcessedEvent.java
@Entity
@Table(name = "processed_events")
public class ProcessedEvent {
    @Id
    private String eventId;    // From the event's eventId field
 
    private String serviceId;  // which service processed it
    private Instant processedAt;
}
// IdempotencyService.java
@Service
@RequiredArgsConstructor
public class IdempotencyService {
 
    private final ProcessedEventRepository processedEventRepository;
 
    /**
     * Returns true if this event was already processed.
     * Throws IdempotencyViolationException if this is a duplicate.
     */
    @Transactional
    public boolean isAlreadyProcessed(String eventId, String serviceId) {
        return processedEventRepository.existsByEventIdAndServiceId(eventId, serviceId);
    }
 
    @Transactional
    public void markAsProcessed(String eventId, String serviceId) {
        if (!processedEventRepository.existsByEventIdAndServiceId(eventId, serviceId)) {
            ProcessedEvent record = new ProcessedEvent(eventId, serviceId, Instant.now());
            processedEventRepository.save(record);
        }
    }
}
// Using IdempotencyService in event handler
@KafkaListener(topics = KafkaTopics.ORDER_CREATED, groupId = "payment-service-group")
@Transactional
public void handleOrderCreated(OrderCreatedEvent event, Acknowledgment ack) {
    if (idempotencyService.isAlreadyProcessed(event.eventId(), "payment-service")) {
        log.info("Event {} already processed (idempotent)", event.eventId());
        ack.acknowledge();
        return;
    }
 
    // Process the event
    paymentService.processPayment(event);
 
    // Mark as processed in same transaction as the business operation
    idempotencyService.markAsProcessed(event.eventId(), "payment-service");
 
    ack.acknowledge();
}

Pattern 4: Optimistic Locking for Concurrent Idempotency

When multiple instances of a service might process the same event concurrently:

@Transactional
public void reserveInventory(PaymentProcessedEvent event) {
    // Optimistic lock: if another thread already processed this, the version check fails
    InventoryItem item = inventoryRepository.findByProductIdWithVersion(event.productId())
        .orElseThrow(() -> new ProductNotFoundException(event.productId()));
 
    if (item.getAvailableQuantity() < event.quantity()) {
        // publish failure event
        return;
    }
 
    item.setAvailableQuantity(item.getAvailableQuantity() - event.quantity());
    // If two threads get here concurrently, one will get OptimisticLockException
    // The failed one will retry - and on retry, it will see the reservation already exists
    try {
        inventoryRepository.save(item);  // Version check enforced by @Version
    } catch (OptimisticLockingFailureException e) {
        log.info("Concurrent modification detected, retrying: productId={}", event.productId());
        throw e;  // Let retry mechanism handle this
    }
}

7. Retry Strategies and Exponential Backoff

Spring Retry Configuration

// RetryConfig.java
@Configuration
@EnableRetry
@Slf4j
public class RetryConfig {
 
    /**
     * Retry template for transient errors in event handlers.
     * Uses exponential backoff to avoid overwhelming a recovering service.
     */
    @Bean
    public RetryTemplate sagaRetryTemplate() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
            Map.of(
                TransientDataAccessException.class, true,    // DB transient errors
                KafkaException.class, true,                   // Kafka connectivity
                ResourceAccessException.class, true,          // Network errors
                PaymentDeclinedException.class, false,        // Business error - no retry
                IllegalArgumentException.class, false,        // Programming error - no retry
                IllegalStateException.class, false
            ), true, true);
 
        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(1000L);     // 1 second initial delay
        backOff.setMultiplier(2.0);            // Double each time
        backOff.setMaxInterval(30000L);        // Max 30 seconds
 
        return RetryTemplate.builder()
            .customPolicy(retryPolicy)
            .customBackoff(backOff)
            .withListener(new LoggingRetryListener())
            .build();
    }
 
    /**
     * Retry template with jitter to prevent thundering herd.
     * Used for external API calls (payment gateway, shipping carrier).
     */
    @Bean
    public RetryTemplate externalApiRetryTemplate() {
        UniformRandomBackOffPolicy jitterBackOff = new UniformRandomBackOffPolicy();
        jitterBackOff.setMinBackOffPeriod(500L);
        jitterBackOff.setMaxBackOffPeriod(5000L);
 
        return RetryTemplate.builder()
            .maxAttempts(5)
            .customBackoff(jitterBackOff)
            .retryOn(ExternalServiceException.class)
            .notRetryOn(PaymentDeclinedException.class)
            .build();
    }
 
    /**
     * Custom listener to log retry attempts.
     */
    static class LoggingRetryListener extends RetryListenerSupport {
        private static final org.slf4j.Logger logger =
            org.slf4j.LoggerFactory.getLogger(LoggingRetryListener.class);
 
        @Override
        public <T, E extends Throwable> void onError(RetryContext context,
                RetryCallback<T, E> callback, Throwable throwable) {
            logger.warn("Retry attempt {} failed: {}",
                context.getRetryCount(), throwable.getMessage());
        }
    }
}

Kafka Retry Topics (Non-Blocking Retries)

Spring Kafka supports non-blocking retry via retry topics. This allows the consumer
to process other messages while waiting for a retry.

// KafkaRetryConfig.java
@Configuration
@EnableKafka
public class KafkaRetryConfig {
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> retryKafkaListenerContainerFactory(
            ConsumerFactory<String, Object> consumerFactory,
            KafkaTemplate<String, Object> kafkaTemplate) {
 
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
 
        factory.setConsumerFactory(consumerFactory);
 
        // Non-blocking retry: failed messages go to retry topics
        // payment.events.payment-processed -> payment.events.payment-processed-retry-0 (1s)
        //                                  -> payment.events.payment-processed-retry-1 (5s)
        //                                  -> payment.events.payment-processed-retry-2 (30s)
        //                                  -> payment.events.payment-processed.DLT
        factory.setCommonErrorHandler(
            new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),
                new ExponentialBackOffWithMaxRetries(3) {{
                    setInitialInterval(1000L);
                    setMultiplier(5.0);
                    setMaxInterval(30000L);
                }}
            )
        );
 
        return factory;
    }
}

Using @RetryableTopic for Non-Blocking Retry

@Component
@RequiredArgsConstructor
@Slf4j
public class InventorySagaEventHandler {
 
    private final InventoryService inventoryService;
 
    @RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000, multiplier = 3.0, maxDelay = 30000),
        autoCreateTopics = "true",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
        retryTopicSuffix = "-retry",
        dltTopicSuffix = ".DLT",
        include = {TransientDataAccessException.class, KafkaException.class},
        exclude = {PaymentDeclinedException.class, IllegalArgumentException.class}
    )
    @KafkaListener(
        topics = KafkaTopics.PAYMENT_PROCESSED,
        groupId = "inventory-service-group"
    )
    public void handlePaymentProcessed(PaymentProcessedEvent event, Acknowledgment ack) {
        log.info("Processing PaymentProcessedEvent: orderId={}", event.orderId());
        inventoryService.reserveInventory(event);
        ack.acknowledge();
    }
 
    @DltHandler
    public void handleDlt(PaymentProcessedEvent event,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.error("DLT received for PaymentProcessedEvent: orderId={}, topic={}",
            event.orderId(), topic);
        // Alert operations team, save for manual review
    }
}

8. Dead Letter Queue Implementation

// DltConsumerService.java - consumes and processes dead letter messages
@Service
@RequiredArgsConstructor
@Slf4j
public class DltConsumerService {
 
    private final DltMessageRepository dltRepository;
    private final AlertService alertService;
    private final SagaOrchestrator sagaOrchestrator;
 
    /**
     * Process messages that failed all retries.
     * These require human review or programmatic replay.
     */
    @KafkaListener(
        topicPattern = ".*\\.DLT",
        groupId = "dlt-consumer-group"
    )
    public void handleDeadLetter(
            ConsumerRecord<String, String> record,
            @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
            @Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stackTrace,
            @Header(KafkaHeaders.ORIGINAL_TOPIC) String originalTopic,
            @Header(KafkaHeaders.ORIGINAL_OFFSET) long originalOffset,
            Acknowledgment ack) {
 
        log.error("DLT message received: topic={}, originalTopic={}, key={}, error={}",
            record.topic(), originalTopic, record.key(), errorMessage);
 
        DltMessage dltMessage = DltMessage.builder()
            .messageId(UUID.randomUUID().toString())
            .originalTopic(originalTopic)
            .dltTopic(record.topic())
            .messageKey(record.key())
            .payload(record.value())
            .errorMessage(truncate(errorMessage, 2000))
            .stackTrace(truncate(stackTrace, 5000))
            .originalOffset(originalOffset)
            .receivedAt(Instant.now())
            .status("PENDING_REVIEW")
            .build();
 
        dltRepository.save(dltMessage);
 
        // Alert the team
        alertService.sendSlackAlert(
            "#saga-alerts",
            String.format(
                "ALERT: Dead letter message\nOriginal Topic: %s\nKey: %s\nError: %s",
                originalTopic, record.key(),
                errorMessage != null ? errorMessage.substring(0, Math.min(200, errorMessage.length())) : "unknown"
            )
        );
 
        ack.acknowledge();
    }
 
    /**
     * Admin endpoint to replay a dead letter message.
     * Used by operations team after fixing the underlying issue.
     */
    @Transactional
    public void replayDltMessage(String dltMessageId) {
        DltMessage dltMessage = dltRepository.findById(dltMessageId)
            .orElseThrow(() -> new IllegalArgumentException("DLT message not found: " + dltMessageId));
 
        log.info("Replaying DLT message: id={}, topic={}", dltMessageId, dltMessage.getOriginalTopic());
 
        // Re-publish to original topic
        sagaOrchestrator.replayEvent(
            dltMessage.getOriginalTopic(),
            dltMessage.getMessageKey(),
            dltMessage.getPayload()
        );
 
        dltMessage.setStatus("REPLAYED");
        dltMessage.setReplayedAt(Instant.now());
        dltRepository.save(dltMessage);
    }
}

9. Circuit Breaker with SAGAs

Circuit breakers prevent cascading failures when a downstream service is unavailable.

// PaymentGatewayCircuitBreaker.java
@Component
@RequiredArgsConstructor
@Slf4j
public class ResilientPaymentService {
 
    private final PaymentGatewayClient rawClient;
 
    // Using Resilience4j (recommended with Spring Boot 3)
    @CircuitBreaker(
        name = "paymentGateway",
        fallbackMethod = "paymentGatewayFallback"
    )
    @Retry(name = "paymentGateway")
    @TimeLimiter(name = "paymentGateway")
    public PaymentGatewayResponse chargeCard(String customerId, BigDecimal amount, String paymentId) {
        return rawClient.charge(customerId, amount, paymentId);
    }
 
    /**
     * Fallback when circuit breaker is open.
     * In SAGA context, we fail the saga step - which triggers compensation.
     */
    private PaymentGatewayResponse paymentGatewayFallback(
            String customerId, BigDecimal amount, String paymentId,
            CallNotPermittedException e) {
        log.error("Circuit breaker OPEN for payment gateway: customerId={}", customerId);
        throw new PaymentDeclinedException(
            "Payment gateway temporarily unavailable",
            "GATEWAY_CIRCUIT_OPEN"
        );
    }
}
# application.yml - Resilience4j configuration
resilience4j:
  circuitbreaker:
    instances:
      paymentGateway:
        sliding-window-type: COUNT_BASED
        sliding-window-size: 10
        failure-rate-threshold: 50 # Open circuit if 50% requests fail
        wait-duration-in-open-state: 30s # Wait 30s before trying again
        permitted-number-of-calls-in-half-open-state: 3
        register-health-indicator: true
 
  retry:
    instances:
      paymentGateway:
        max-attempts: 3
        wait-duration: 1s
        exponential-backoff-multiplier: 2
        retry-exceptions:
          - java.net.ConnectException
          - java.net.SocketTimeoutException
 
  timelimiter:
    instances:
      paymentGateway:
        timeout-duration: 5s
        cancel-running-future: true

10. Saga Timeout and Deadline Management

Every saga should have a maximum execution time to prevent eternally-running sagas.

// SagaTimeoutManager.java
@Service
@RequiredArgsConstructor
@Slf4j
public class SagaTimeoutManager {
 
    private final SagaInstanceRepository sagaRepository;
    private final OrderSagaOrchestrator orchestrator;
    private final AlertService alertService;
 
    // Configuration: different timeouts for different saga types
    private static final Map<String, Long> SAGA_TIMEOUT_MINUTES = Map.of(
        "ORDER_SAGA", 30L,           // 30 minutes to complete order saga
        "REFUND_SAGA", 60L,          // 1 hour for refund
        "RETURN_SAGA", 10080L        // 7 days for return processing
    );
 
    @Scheduled(fixedDelay = 60000)  // Check every minute
    @Transactional
    public void checkForTimedOutSagas() {
        List<SagaInstance> activeSagas = sagaRepository.findByStatus(SagaStatus.IN_PROGRESS);
 
        for (SagaInstance saga : activeSagas) {
            Long timeoutMinutes = SAGA_TIMEOUT_MINUTES.getOrDefault(saga.getSagaType(), 60L);
            Instant deadline = saga.getCreatedAt()
                .plus(timeoutMinutes, java.time.temporal.ChronoUnit.MINUTES);
 
            if (Instant.now().isAfter(deadline)) {
                handleTimedOutSaga(saga);
            }
        }
    }
 
    private void handleTimedOutSaga(SagaInstance saga) {
        log.error("SAGA TIMED OUT: sagaId={}, type={}, started={}, currentStep={}",
            saga.getSagaId(), saga.getSagaType(), saga.getCreatedAt(), saga.getCurrentStep());
 
        // Trigger compensation if in forward progress
        if (saga.getStatus() == SagaStatus.IN_PROGRESS) {
            saga.setStatus(SagaStatus.COMPENSATING);
            saga.setFailureReason("Saga timed out after " +
                SAGA_TIMEOUT_MINUTES.getOrDefault(saga.getSagaType(), 60L) + " minutes");
            sagaRepository.save(saga);
 
            // Start compensation chain
            orchestrator.startCompensation(saga);
        }
 
        // Alert the team
        alertService.sendCriticalAlert(
            "SAGA TIMEOUT",
            String.format("SagaId: %s, OrderId: %s, Step: %s",
                saga.getSagaId(), saga.getOrderId(), saga.getCurrentStep())
        );
    }
}

AWS Step Functions Timeout Configuration

In the Step Functions state machine definition, you can configure timeouts per state:

"ProcessPayment": {
  "Type": "Task",
  "Resource": "...",
  "TimeoutSeconds": 30,          // Task must complete within 30 seconds
  "HeartbeatSeconds": 10,        // Service must send heartbeat every 10 seconds
  "Catch": [{
    "ErrorEquals": ["States.Timeout"],
    "Next": "CompensateOrder",
    "ResultPath": "$.timeoutError"
  }]
}

11. Distributed Tracing with AWS X-Ray

// TracingConfig.java
@Configuration
public class TracingConfig {
 
    /**
     * AWS X-Ray segment naming strategy.
     * Each service creates a sub-segment under the root saga trace.
     */
    @Bean
    public AWSXRayRecorder awsXRayRecorder() {
        return AWSXRayRecorderBuilder.standard()
            .withPlugin(new ECSPlugin())     // On AWS ECS
            .withPlugin(new EC2Plugin())     // On AWS EC2
            .withSamplingRules(
                SamplingRulesDocument.fromJson("{\"default\":{\"fixed_target\":1,\"rate\":0.05}}")
            )
            .build();
    }
}
// SagaTracingInterceptor.java
@Component
@RequiredArgsConstructor
@Slf4j
public class SagaTracingInterceptor {
 
    private final Tracer tracer;
 
    /**
     * Starts a new trace span for each saga step.
     * Correlation: saga ID = trace ID for end-to-end visibility.
     */
    public <T> T traceStep(String sagaId, String stepName, Callable<T> operation) throws Exception {
        Span span = tracer.nextSpan()
            .name("saga." + stepName)
            .tag("saga.id", sagaId)
            .tag("saga.step", stepName)
            .start();
 
        try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
            log.info("SAGA_TRACE: sagaId={}, step={}, traceId={}",
                sagaId, stepName, span.context().traceId());
            return operation.call();
        } catch (Exception e) {
            span.tag("error", e.getMessage());
            throw e;
        } finally {
            span.end();
        }
    }
}

MDC Context for Correlated Logging

// SagaMDCFilter.java
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class SagaMDCFilter implements Filter {
 
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
 
        String sagaId = httpRequest.getHeader("X-Saga-Id");
        String traceId = httpRequest.getHeader("X-Trace-Id");
        String orderId = httpRequest.getHeader("X-Order-Id");
 
        if (sagaId != null) MDC.put("sagaId", sagaId);
        if (traceId != null) MDC.put("traceId", traceId);
        if (orderId != null) MDC.put("orderId", orderId);
 
        try {
            chain.doFilter(request, response);
        } finally {
            MDC.remove("sagaId");
            MDC.remove("traceId");
            MDC.remove("orderId");
        }
    }
}

12. Structured Logging for SAGA Events

// SagaLogger.java
@Component
@Slf4j
public class SagaLogger {
 
    /**
     * Structured log event for saga lifecycle tracking.
     * Use a log aggregator (CloudWatch Insights, Datadog) to query these.
     */
    public void logSagaEvent(String eventType, String sagaId, String orderId,
                              String step, String status, String details) {
        // Structured JSON log that can be queried in CloudWatch Insights
        log.info(
            "SAGA_EVENT type={} sagaId={} orderId={} step={} status={} details=\"{}\"",
            eventType, sagaId, orderId, step, status, details
        );
    }
 
    public void logSagaStarted(String sagaId, String orderId, String customerId) {
        logSagaEvent("SAGA_STARTED", sagaId, orderId, "N/A", "STARTED",
            "customerId=" + customerId);
    }
 
    public void logStepStarted(String sagaId, String orderId, String step) {
        logSagaEvent("STEP_STARTED", sagaId, orderId, step, "EXECUTING", "");
    }
 
    public void logStepCompleted(String sagaId, String orderId, String step, long durationMs) {
        logSagaEvent("STEP_COMPLETED", sagaId, orderId, step, "SUCCESS",
            "durationMs=" + durationMs);
    }
 
    public void logStepFailed(String sagaId, String orderId, String step, String error) {
        logSagaEvent("STEP_FAILED", sagaId, orderId, step, "FAILED", "error=" + error);
    }
 
    public void logCompensationStarted(String sagaId, String orderId, String reason) {
        logSagaEvent("COMPENSATION_STARTED", sagaId, orderId, "N/A", "COMPENSATING",
            "reason=" + reason);
    }
 
    public void logSagaCompleted(String sagaId, String orderId, long totalDurationMs) {
        logSagaEvent("SAGA_COMPLETED", sagaId, orderId, "N/A", "COMPLETED",
            "totalDurationMs=" + totalDurationMs);
    }
 
    public void logSagaCompensated(String sagaId, String orderId, String originalFailure) {
        logSagaEvent("SAGA_COMPENSATED", sagaId, orderId, "N/A", "COMPENSATED",
            "originalFailure=" + originalFailure);
    }
}

13. Complete MySQL Schema Reference

-- ==========================================================<mark class="obsidian-highlight">
-- COMPLETE SAGA DATABASE SCHEMA
-- </mark>==========================================================
 
-- 1. OUTBOX TABLE (per service, same DB as service tables)
CREATE TABLE outbox_events (
    event_id       VARCHAR(36)   NOT NULL,
    aggregate_id   VARCHAR(36)   NOT NULL,
    aggregate_type VARCHAR(50)   NOT NULL DEFAULT 'ORDER',
    topic_name     VARCHAR(255)  NOT NULL,
    event_type     VARCHAR(100)  NOT NULL,
    payload        LONGTEXT      NOT NULL,
    status         VARCHAR(20)   NOT NULL DEFAULT 'PENDING',
    retry_count    INT           NOT NULL DEFAULT 0,
    error_message  VARCHAR(1000),
    created_at     DATETIME(6)   NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    published_at   DATETIME(6),
    PRIMARY KEY (event_id),
    INDEX idx_outbox_status      (status, created_at),
    INDEX idx_outbox_aggregate   (aggregate_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- 2. IDEMPOTENCY TABLE (per service)
CREATE TABLE processed_events (
    event_id      VARCHAR(36)  NOT NULL,
    service_id    VARCHAR(50)  NOT NULL,
    processed_at  DATETIME(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    PRIMARY KEY (event_id, service_id),
    INDEX idx_proc_events_service (service_id, processed_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- Auto-cleanup after 30 days (events older than this are irrelevant)
CREATE EVENT cleanup_processed_events
ON SCHEDULE EVERY 1 DAY
DO DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL 30 DAY;
 
-- 3. SAGA INSTANCES TABLE (orchestrator service only)
CREATE TABLE saga_instances (
    saga_id        VARCHAR(36)    NOT NULL,
    saga_type      VARCHAR(100)   NOT NULL,
    order_id       VARCHAR(36),
    status         VARCHAR(30)    NOT NULL DEFAULT 'STARTED',
    current_step   VARCHAR(50)    NOT NULL,
    payload        JSON,
    failure_reason VARCHAR(1000),
    retry_count    INT            NOT NULL DEFAULT 0,
    created_at     DATETIME(6)    NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at     DATETIME(6)    NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
    completed_at   DATETIME(6),
    version        BIGINT         NOT NULL DEFAULT 0,
    PRIMARY KEY (saga_id),
    UNIQUE INDEX idx_saga_order_id (order_id),
    INDEX idx_saga_status_updated  (status, updated_at),
    INDEX idx_saga_type_status     (saga_type, status),
    INDEX idx_saga_created_at      (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- 4. SAGA STEP EXECUTIONS TABLE
CREATE TABLE saga_step_executions (
    execution_id     VARCHAR(36)  NOT NULL,
    saga_id          VARCHAR(36)  NOT NULL,
    step             VARCHAR(50)  NOT NULL,
    result           VARCHAR(20),  -- SUCCESS, FAILURE, PENDING
    command_payload  JSON,
    response_payload JSON,
    error_message    VARCHAR(1000),
    executed_at      DATETIME(6),
    completed_at     DATETIME(6),
    PRIMARY KEY (execution_id),
    INDEX idx_step_exec_saga_id    (saga_id),
    FOREIGN KEY fk_step_saga (saga_id) REFERENCES saga_instances(saga_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- 5. DLT MESSAGES TABLE
CREATE TABLE dlt_messages (
    message_id     VARCHAR(36)   NOT NULL,
    original_topic VARCHAR(255)  NOT NULL,
    dlt_topic      VARCHAR(255)  NOT NULL,
    message_key    VARCHAR(255),
    payload        LONGTEXT      NOT NULL,
    error_message  VARCHAR(2000),
    stack_trace    TEXT,
    original_offset BIGINT,
    received_at    DATETIME(6)   NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    status         VARCHAR(30)   NOT NULL DEFAULT 'PENDING_REVIEW',
    replayed_at    DATETIME(6),
    resolved_by    VARCHAR(100),
    notes          TEXT,
    PRIMARY KEY (message_id),
    INDEX idx_dlt_status       (status),
    INDEX idx_dlt_topic        (original_topic),
    INDEX idx_dlt_received_at  (received_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- 6. USEFUL MONITORING QUERIES
 
-- Find active sagas
SELECT saga_id, order_id, status, current_step, created_at, updated_at
FROM saga_instances
WHERE status IN ('STARTED', 'IN_PROGRESS', 'COMPENSATING')
ORDER BY created_at ASC;
 
-- Find stuck sagas (not updated in 30 minutes)
SELECT saga_id, order_id, status, current_step, updated_at,
       TIMESTAMPDIFF(MINUTE, updated_at, NOW()) AS minutes_stuck
FROM saga_instances
WHERE status IN ('IN_PROGRESS', 'COMPENSATING')
  AND updated_at < NOW() - INTERVAL 30 MINUTE
ORDER BY updated_at ASC;
 
-- Saga success rate over last 24 hours
SELECT
    saga_type,
    COUNT(*) AS total,
    SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS completed,
    SUM(CASE WHEN status = 'COMPENSATED' THEN 1 ELSE 0 END) AS compensated,
    SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed,
    ROUND(100.0 * SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) / COUNT(*), 2) AS success_rate
FROM saga_instances
WHERE created_at >= NOW() - INTERVAL 24 HOUR
GROUP BY saga_type;
 
-- Average saga duration
SELECT
    saga_type,
    AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS avg_duration_seconds,
    MAX(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS max_duration_seconds,
    MIN(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS min_duration_seconds
FROM saga_instances
WHERE status = 'COMPLETED'
  AND created_at >= NOW() - INTERVAL 24 HOUR
GROUP BY saga_type;
 
-- Most frequent failure steps
SELECT current_step, COUNT(*) AS failure_count, failure_reason
FROM saga_instances
WHERE status IN ('COMPENSATED', 'FAILED')
  AND created_at >= NOW() - INTERVAL 7 DAY
GROUP BY current_step, failure_reason
ORDER BY failure_count DESC
LIMIT 20;

14. Optimistic vs Pessimistic Locking in SAGAs

Pessimistic Locking

Use when multiple services might concurrently update the same record (e.g., inventory).

// Use FOR UPDATE lock - prevents concurrent reads until transaction commits
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT i FROM InventoryItem i WHERE i.productId = :productId")
Optional<InventoryItem> findByProductIdWithLock(@Param("productId") String productId);

Use pessimistic locking for: Inventory reservations, seat bookings, ticket allocation.
Cost: Reduces concurrency, increases latency. Only hold locks for milliseconds.

Optimistic Locking

Use when concurrent updates are rare but possible (e.g., order status updates).

@Entity
public class Order {
    @Version
    private Long version;  // JPA manages this automatically
 
    // If two threads update the same order concurrently,
    // one will get OptimisticLockingFailureException
    // This prevents lost updates without holding locks
}

Use optimistic locking for: Order status updates, saga state transitions.
Benefit: No database locks held, high concurrency.
Cost: Must handle retry logic on OptimisticLockingFailureException.


15. Semantic Locking

Semantic locking is an application-level technique to prevent dirty reads of saga state.

// Add a status field that indicates "saga in progress"
// Other services check this before reading data
 
@Entity
public class Order {
    private OrderStatus status;  // PENDING, CONFIRMED, etc.
    private boolean sagaInProgress;   // semantic lock flag
    private String currentSagaStep;   // which step is running
}
// Customer-facing API checks semantic lock
@GetMapping("/orders/{orderId}")
public OrderResponse getOrder(@PathVariable String orderId) {
    Order order = orderRepository.findById(orderId).orElseThrow();
 
    if (order.isSagaInProgress()) {
        // Return a "processing" response, not the partial state
        return OrderResponse.processing(orderId, order.getCurrentSagaStep());
    }
 
    return OrderResponse.from(order);
}

16. Production Health Checks and Metrics

// SagaHealthController.java - Admin endpoint for ops team
@RestController
@RequestMapping("/admin/saga")
@RequiredArgsConstructor
public class SagaAdminController {
 
    private final SagaInstanceRepository sagaRepository;
    private final OutboxEventRepository outboxRepository;
 
    @GetMapping("/health")
    public ResponseEntity<SagaHealthReport> getSagaHealth() {
        SagaHealthReport report = SagaHealthReport.builder()
            .activeSagas(sagaRepository.countByStatusSince(SagaStatus.IN_PROGRESS,
                Instant.now().minus(1, ChronoUnit.HOURS)))
            .stuckSagas(sagaRepository.findStuckSagas(
                Instant.now().minus(30, ChronoUnit.MINUTES)).size())
            .failedSagas(sagaRepository.countByStatusSince(SagaStatus.FAILED,
                Instant.now().minus(24, ChronoUnit.HOURS)))
            .pendingOutboxEvents(outboxRepository.countStuckPendingEvents(
                Instant.now().minus(5, ChronoUnit.MINUTES)))
            .timestamp(Instant.now())
            .build();
 
        boolean healthy = report.stuckSagas() == 0 &&
                          report.failedSagas() < 10 &&
                          report.pendingOutboxEvents() < 100;
 
        return ResponseEntity
            .status(healthy ? HttpStatus.OK : HttpStatus.SERVICE_UNAVAILABLE)
            .body(report);
    }
 
    @PostMapping("/{sagaId}/replay")
    public ResponseEntity<Void> replaySaga(@PathVariable String sagaId) {
        // Admin endpoint to manually replay a failed/stuck saga
        // ... implementation
        return ResponseEntity.accepted().build();
    }
}

17. Configuration Management

# Complete production application.yml
 
spring:
  application:
    name: order-service
 
  # Database Configuration
  datasource:
    url: jdbc:mysql://${DB_HOST}:3306/${DB_NAME}?useSSL=true&requireSSL=true&serverTimezone=UTC
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: ${spring.application.name}-pool
      minimum-idle: 5
      maximum-pool-size: 20
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
      leak-detection-threshold: 60000 # Warn if connection held > 60s
 
  jpa:
    hibernate:
      ddl-auto: validate
    open-in-view: false # MUST be false in production
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        format_sql: false
        generate_statistics: false
        jdbc:
          batch_size: 20
          order_inserts: true
          order_updates: true
        cache:
          use_second_level_cache: false # Disable L2 cache for saga entities
 
  # Kafka Configuration
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      group-id: ${spring.application.name}-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 10
      max-poll-interval-ms: 300000
      session-timeout-ms: 30000
      heartbeat-interval-ms: 10000
      properties:
        isolation.level: read_committed # Only read committed messages (transactional producers)
    producer:
      acks: all
      retries: 5
      batch-size: 16384
      linger-ms: 5
      compression-type: lz4
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 1
        transaction.timeout.ms: 60000
 
# Outbox Configuration
outbox:
  publisher:
    enabled: true
    delay-ms: 500
    retry-delay-ms: 30000
    batch-size: 50
    max-retries: 5
 
# Saga Configuration
saga:
  timeout:
    order-saga-minutes: 30
    refund-saga-minutes: 60
  recovery:
    enabled: true
    check-interval-minutes: 5
    stuck-threshold-minutes: 30
 
# Resilience4j
resilience4j:
  circuitbreaker:
    instances:
      paymentGateway:
        sliding-window-size: 20
        failure-rate-threshold: 50
        wait-duration-in-open-state: 30s
      shippingCarrier:
        sliding-window-size: 10
        failure-rate-threshold: 60
        wait-duration-in-open-state: 60s
 
# Actuator
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus,circuitbreakers,retries
  endpoint:
    health:
      show-details: always
      probes:
        enabled: true # /health/liveness and /health/readiness for k8s
  metrics:
    export:
      cloudwatch:
        namespace: SagaMetrics
        step: 1m
    tags:
      service: ${spring.application.name}
      environment: ${ENVIRONMENT:dev}
      region: ${AWS_REGION:us-east-1}

18. Summary

PatternPurposeKey Rule
Transactional OutboxSolve dual-write problemWrite to outbox in same DB transaction as business data
Debezium CDCLow-latency outbox publishingReads MySQL binlog, no polling overhead
IdempotencyHandle duplicate events safelyEvery handler must check if already processed
Unique ConstraintDatabase-level idempotencyAdd UNIQUE INDEX on natural key per aggregate
Exponential BackoffSmart retry for transient failuresDouble delay each retry, add jitter
Dead Letter QueueHandle permanently failed eventsAlert team, save for manual review/replay
Circuit BreakerStop cascading failuresOpen circuit when failure rate too high
Saga TimeoutPrevent eternally-running sagasCheck and compensate sagas exceeding deadline
Semantic LockingPrevent dirty reads during sagaStatus flag: sagaInProgress=true while active
Structured LoggingOperational visibilitysagaId, orderId in every log line as MDC keys
Distributed TracingEnd-to-end saga visibilityUse sagaId as trace ID across all services

Next: Part 5 - Advanced Patterns

Explore SAGA combined with CQRS, Event Sourcing, parallel saga steps, sub-sagas,
and large-scale AWS architecture patterns.


Series Navigation: Index |
Part 1 |
Part 2 |
Part 3 | Part 4 |
Part 5 |
Part 6 |
Part 7