Part 4: SAGA Deep Dive Implementation - Production Patterns
Table of Contents
- The Dual-Write Problem
- Transactional Outbox Pattern
- Outbox: Full Spring Boot Implementation
- Outbox with Debezium CDC (Production Grade)
- Idempotency: Why It Is Non-Negotiable
- Idempotency Implementation Patterns
- Retry Strategies and Exponential Backoff
- Dead Letter Queue Implementation
- Circuit Breaker with SAGAs
- Saga Timeout and Deadline Management
- Distributed Tracing with AWS X-Ray
- Structured Logging for SAGA Events
- Complete MySQL Schema Reference
- Optimistic vs Pessimistic Locking in SAGAs
- Semantic Locking
- Production Health Checks and Metrics
- Configuration Management
- Summary
1. The Dual-Write Problem
This is arguably the most dangerous and frequently misunderstood problem in event-driven sagas.
What Is It?
In a SAGA step, you need to do TWO things:
- Write to your database (local transaction)
- Publish an event to Kafka/SQS
These are two SEPARATE operations. If the first succeeds and the second fails, your
database has committed data but no downstream service knows about it. The saga stalls.
THE NAIVE (WRONG) APPROACH:
// No surrounding transaction: save() commits in the repository's own transaction.
// (Inside @Transactional a crash at this point would roll the write back instead.)
public void processPayment(OrderCreatedEvent event) {
// Step 1: Write to DB
Payment payment = paymentRepository.save(new Payment(...)); // commits immediately
// APPLICATION CRASHES HERE
// Step 2: Publish event - NEVER EXECUTES
kafkaTemplate.send("payment.processed", paymentEvent); // skipped!
// Result: payment row in DB, no Kafka event
// Inventory service never knows payment was made
// Saga is STUCK
}
ANOTHER FAILURE CASE:
@Transactional
public void processPayment(OrderCreatedEvent event) {
payment = paymentRepository.save(new Payment(...)); // Step 1
kafkaTemplate.send("payment.processed", event).get(); // Step 2 - Kafka timeout!
// Kafka call throws exception
// Spring rolls back the DB transaction
// But Kafka MIGHT have received and processed the message
// Kafka is NOT transactional with MySQL
// Result: no payment in DB, but message might be in Kafka
// Downstream services process an event for a non-existent payment
}
The root cause: Without a distributed transaction protocol such as XA (which Kafka does
not support), there is NO atomic operation that writes to a relational database AND
a message broker together. They are fundamentally different systems.
2. Transactional Outbox Pattern
The Outbox pattern solves dual-write by making the database the single source of truth.
The Concept
Instead of writing to the database AND publishing to Kafka in the same transaction,
you write to the database AND write to an outbox table in the SAME transaction.
A separate process reads the outbox table and publishes to Kafka.
BEFORE (dual-write):
App -----> MySQL (commit)
-----> Kafka (may fail)
AFTER (outbox):
App -----> MySQL: { payment_record, outbox_event } (single atomic commit)
Outbox Publisher -----> reads outbox_events table
-----> publishes to Kafka
-----> updates outbox_event status to PUBLISHED
Why This Works
The first write (DB + outbox) is one atomic transaction. Either BOTH the payment record
and the outbox event are persisted, or NEITHER is. No split state.
The outbox publisher is a separate process that will keep retrying until the message
is published to Kafka. It is resilient to temporary Kafka failures.
Outbox Guarantees
- At-Least-Once Delivery: An event may be published more than once (if the
publisher crashes after publishing but before marking as PUBLISHED). Consumers
MUST be idempotent.
- Eventually Published: Events will eventually reach Kafka as long as the
outbox publisher is running.
- Ordered per Aggregate: By using the order ID as the Kafka partition key,
events for the same order are always delivered in order.
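The at-least-once guarantee above can be illustrated with a small in-memory sketch. It is not the Spring implementation that follows: the lists stand in for the business table, the outbox table and the Kafka topic, and the names `OutboxSketch`, `recordPayment` and `relayOnce` are hypothetical. It only shows why a crash between "send" and "mark as PUBLISHED" leads to a duplicate rather than a lost event.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory sketch of the outbox flow; no real database or broker is involved. */
public class OutboxSketch {

    /** Hypothetical stand-in for one row of the outbox table. */
    static final class OutboxRow {
        final String eventId;
        final String aggregateId;
        boolean published;

        OutboxRow(String eventId, String aggregateId) {
            this.eventId = eventId;
            this.aggregateId = aggregateId;
        }
    }

    final List<String> payments = new ArrayList<>();   // stands in for the business table
    final List<OutboxRow> outbox = new ArrayList<>();  // stands in for outbox_events
    final List<String> broker = new ArrayList<>();     // stands in for the Kafka topic

    /** Models the single local transaction: both rows are written together. */
    synchronized void recordPayment(String orderId) {
        payments.add(orderId);
        outbox.add(new OutboxRow("evt-" + orderId, orderId));
    }

    /**
     * One relay pass over unpublished rows. When crashAfterSend is true the pass
     * stops right after the send, before the row is marked, to mimic a crash.
     */
    synchronized void relayOnce(boolean crashAfterSend) {
        for (OutboxRow row : outbox) {
            if (row.published) {
                continue;
            }
            broker.add(row.eventId);   // "publish"
            if (crashAfterSend) {
                return;                // the row is still unpublished
            }
            row.published = true;      // "mark as PUBLISHED"
        }
    }

    public static void main(String[] args) {
        OutboxSketch sketch = new OutboxSketch();
        sketch.recordPayment("order-1");
        sketch.relayOnce(true);    // crash between send and mark
        sketch.relayOnce(false);   // relay restarts and retries the same row
        System.out.println(sketch.broker); // the same event id appears twice
    }
}
```

The event is never lost, but it can reach the broker twice, which is why the consumer side has to be idempotent.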
3. Outbox: Full Spring Boot Implementation
Outbox Entity
// OutboxEvent.java
package com.example.common.outbox;
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
@Entity
@Table(name = "outbox_events", indexes = {
@Index(name = "idx_outbox_status_created", columnList = "status, created_at"),
@Index(name = "idx_outbox_aggregate", columnList = "aggregate_id")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OutboxEvent {
public enum OutboxStatus {
PENDING,
PUBLISHED,
FAILED
}
@Id
@Column(name = "event_id", length = 36)
private String eventId;
@Column(name = "aggregate_id", nullable = false, length = 36)
private String aggregateId; // e.g., orderId - used as Kafka partition key
@Column(name = "aggregate_type", length = 50)
private String aggregateType; // e.g., "ORDER"
@Column(name = "topic_name", nullable = false, length = 255)
private String topicName;
@Column(name = "event_type", nullable = false, length = 100)
private String eventType;
@Column(name = "payload", nullable = false, columnDefinition = "LONGTEXT")
private String payload;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 20)
@Builder.Default
private OutboxStatus status = OutboxStatus.PENDING;
@Column(name = "retry_count")
@Builder.Default
private int retryCount = 0;
@Column(name = "error_message", length = 1000)
private String errorMessage;
@Column(name = "created_at", updatable = false)
private Instant createdAt;
@Column(name = "published_at")
private Instant publishedAt;
@PrePersist
public void prePersist() {
this.createdAt = Instant.now();
}
}
Outbox Repository
// OutboxEventRepository.java
package com.example.common.outbox;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
@Lock(jakarta.persistence.LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT o FROM OutboxEvent o WHERE o.status = 'PENDING' ORDER BY o.createdAt ASC")
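// Hibernate-specific: a lock timeout of -2 requests SKIP LOCKED (0 would mean NOWAIT),
// so concurrent publisher instances skip rows that another instance has already locked.
// The lock is only held while the calling method runs inside a transaction.
@QueryHints(@jakarta.persistence.QueryHint(name = "jakarta.persistence.lock.timeout", value = "-2"))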
List<OutboxEvent> findPendingEventsWithLock(org.springframework.data.domain.Pageable pageable);
@Query("SELECT o FROM OutboxEvent o WHERE o.status = 'FAILED' AND o.retryCount < :maxRetries")
List<OutboxEvent> findRetryableFailedEvents(@Param("maxRetries") int maxRetries,
org.springframework.data.domain.Pageable pageable);
@Modifying
@Transactional
@Query("UPDATE OutboxEvent o SET o.status = 'PUBLISHED', o.publishedAt = :now WHERE o.eventId = :eventId")
void markAsPublished(@Param("eventId") String eventId, @Param("now") Instant now);
@Query("SELECT COUNT(o) FROM OutboxEvent o WHERE o.status = 'PENDING' AND o.createdAt < :threshold")
long countStuckPendingEvents(@Param("threshold") Instant threshold);
}
Outbox Publisher Service
// OutboxPublisherService.java
package com.example.common.outbox;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.PageRequest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
@RequiredArgsConstructor
@Slf4j
public class OutboxPublisherService {
private final OutboxEventRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final MeterRegistry meterRegistry;
private static final int BATCH_SIZE = 50;
private static final int MAX_RETRIES = 5;
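/**
* Polls the outbox table (every 500 ms by default) and publishes pending events.
* The method is transactional so that the row locks taken by
* findPendingEventsWithLock are held until the batch has been handled.
* In production, consider Debezium CDC instead of polling for lower latency.
*/
@Scheduled(fixedDelayString = "${outbox.publisher.delay-ms:500}")
@Transactional
public void publishPendingEvents() {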
List<OutboxEvent> pendingEvents = outboxRepository
.findPendingEventsWithLock(PageRequest.of(0, BATCH_SIZE));
if (pendingEvents.isEmpty()) {
return;
}
log.debug("Processing {} pending outbox events", pendingEvents.size());
for (OutboxEvent event : pendingEvents) {
publishEvent(event);
}
}
/**
* Separate scheduled job for retrying failed events.
* Less frequent to avoid hammering Kafka during outages.
*/
@Scheduled(fixedDelayString = "${outbox.publisher.retry-delay-ms:30000}")
public void retryFailedEvents() {
List<OutboxEvent> failedEvents = outboxRepository
.findRetryableFailedEvents(MAX_RETRIES, PageRequest.of(0, 20));
for (OutboxEvent event : failedEvents) {
publishEvent(event);
}
}
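/**
* Publishes one event and records the outcome. Failures are caught and stored
* on the row, so one bad event does not abort the rest of the batch.
* Note: @Transactional(REQUIRES_NEW) would have no effect here, because the
* method is called through "this" and Spring's proxy is bypassed; it runs in
* the caller's transaction, if there is one.
*/
private void publishEvent(OutboxEvent event) {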
try {
// Synchronous send with timeout for reliability
kafkaTemplate.send(
event.getTopicName(),
event.getAggregateId(), // Key = orderId for partition ordering
event.getPayload()
).get(10, TimeUnit.SECONDS);
// Mark as published
event.setStatus(OutboxEvent.OutboxStatus.PUBLISHED);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
meterRegistry.counter("outbox.published",
"topic", event.getTopicName(),
"eventType", event.getEventType()).increment();
log.debug("Published outbox event: eventId={}, topic={}, type={}",
event.getEventId(), event.getTopicName(), event.getEventType());
} catch (Exception e) {
log.error("Failed to publish outbox event: eventId={}, topic={}, attempt={}",
event.getEventId(), event.getTopicName(), event.getRetryCount() + 1, e);
event.setRetryCount(event.getRetryCount() + 1);
event.setErrorMessage(truncate(e.getMessage(), 990));
if (event.getRetryCount() >= MAX_RETRIES) {
event.setStatus(OutboxEvent.OutboxStatus.FAILED);
meterRegistry.counter("outbox.failed",
"topic", event.getTopicName()).increment();
log.error("Outbox event marked FAILED after {} attempts: eventId={}",
MAX_RETRIES, event.getEventId());
}
outboxRepository.save(event);
}
}
private String truncate(String str, int maxLength) {
if (str == null) return null;
return str.length() <= maxLength ? str : str.substring(0, maxLength);
}
}
Health Check for Outbox
// OutboxHealthIndicator.java
package com.example.common.outbox;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.health.*;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
@Component
@RequiredArgsConstructor
public class OutboxHealthIndicator implements HealthIndicator {
private final OutboxEventRepository outboxRepository;
private static final long STUCK_THRESHOLD_MINUTES = 10L;
private static final long MAX_ACCEPTABLE_STUCK = 5L;
@Override
public Health health() {
Instant threshold = Instant.now().minus(STUCK_THRESHOLD_MINUTES, ChronoUnit.MINUTES);
long stuckCount = outboxRepository.countStuckPendingEvents(threshold);
if (stuckCount == 0) {
return Health.up()
.withDetail("stuckEvents", 0)
.build();
} else if (stuckCount <= MAX_ACCEPTABLE_STUCK) {
return Health.up()
.withDetail("stuckEvents", stuckCount)
.withDetail("warning", "Some events delayed")
.build();
} else {
return Health.down()
.withDetail("stuckEvents", stuckCount)
.withDetail("error", "Too many stuck outbox events - Kafka may be unreachable")
.build();
}
}
}
4. Outbox with Debezium CDC (Production Grade)
Debezium is a Change Data Capture (CDC) tool that reads the MySQL binary log (binlog)
and publishes changes to Kafka. This is more reliable than polling and has sub-second latency.
How It Works
MySQL Binary Log -> Debezium Connector -> Kafka Connect -> Kafka Topic
1. App writes to outbox_events table (same transaction as business data)
2. MySQL commits the transaction and writes to binlog
3. Debezium reads binlog changes (INSERT into outbox_events)
4. Debezium publishes the event to the target Kafka topic
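5. No polling loop or PUBLISHED bookkeeping in the application. Delivery is still
at-least-once (Debezium can re-emit changes after a connector restart), so consumers
must remain idempotent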
Debezium Connector Configuration (JSON)
{
"name": "order-service-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "rds-mysql-instance.cluster-xyz.us-east-1.rds.amazonaws.com",
"database.port": "3306",
"database.user": "${DB_CDC_USER}",
"database.password": "${DB_CDC_PASSWORD}",
"database.server.id": "184054",
"database.server.name": "order-service",
"database.include.list": "orders_db",
"table.include.list": "orders_db.outbox_events",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "event_id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.timestamp": "created_at",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.route.by.field": "topic_name",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"tombstones.on.delete": "false",
"snapshot.mode": "initial",
"database.history.kafka.bootstrap.servers": "msk-broker:9092",
"database.history.kafka.topic": "schema-changes.order-service"
}
}
MySQL User for CDC (Least Privilege)
-- Create CDC-specific user with minimal permissions
CREATE USER 'debezium_cdc'@'%' IDENTIFIED BY '${SECURE_PASSWORD}';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium_cdc'@'%';
GRANT SELECT ON orders_db.outbox_events TO 'debezium_cdc'@'%';
FLUSH PRIVILEGES;
-- Enable binary logging in MySQL (in AWS RDS: set binlog_format=ROW)
-- This is typically done via RDS Parameter Group
5. Idempotency: Why It Is Non-Negotiable
In a SAGA with at-least-once event delivery, the same event WILL arrive multiple times.
This can happen because:
- Network retry by the consumer after processing but before acknowledgment
- Kafka rebalance during consumer group restart
- Outbox publisher published the same event twice (crash between send and mark-published)
- Manual replay of events during incident recovery
Every single event handler MUST be idempotent.
Without idempotency, you will:
- Double-charge customers
- Double-reserve inventory
- Double-ship orders
- Create duplicate compensation actions (double-refund)
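To make the double-charge risk concrete, here is a minimal in-memory sketch of an idempotent handler. The class `IdempotentChargeSketch` and its method names are hypothetical and not part of the services in this series; a real service would keep the processed IDs in the database, in the same transaction as the charge.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** In-memory sketch: a duplicate event must not change state a second time. */
public class IdempotentChargeSketch {

    private final Set<String> processedEventIds = new HashSet<>();
    private final Map<String, Long> chargedCents = new HashMap<>();

    /**
     * Applies the charge once per eventId.
     * Returns true if the charge was applied, false if the event was a duplicate.
     */
    synchronized boolean handleCharge(String eventId, String customerId, long amountCents) {
        if (!processedEventIds.add(eventId)) {
            return false; // already seen: skip the side effect
        }
        chargedCents.merge(customerId, amountCents, Long::sum);
        return true;
    }

    synchronized long totalCharged(String customerId) {
        return chargedCents.getOrDefault(customerId, 0L);
    }

    public static void main(String[] args) {
        IdempotentChargeSketch handler = new IdempotentChargeSketch();
        handler.handleCharge("evt-1", "cust-1", 2500L);
        handler.handleCharge("evt-1", "cust-1", 2500L); // redelivery of the same event
        System.out.println(handler.totalCharged("cust-1"));
    }
}
```

The customer is charged once even though the event was delivered twice. The patterns in the next section achieve the same effect with durable storage.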
6. Idempotency Implementation Patterns
Pattern 1: Database Unique Constraint
The simplest and most reliable approach. Use a database unique constraint on the
natural key of the operation.
// Payment table has: UNIQUE INDEX on order_id
// This prevents two payments for the same order
//
// Deliberately not @Transactional: a constraint violation marks the surrounding
// transaction rollback-only, so the duplicate has to be caught outside the
// transaction that performs the INSERT. If the insert must commit together with
// other writes (e.g. an outbox row), put them in a separate @Transactional
// method and wrap the call to that method in this try/catch.
public void processPayment(OrderCreatedEvent event) {
    Payment payment = Payment.builder()
        .orderId(event.orderId())
        .amount(event.totalAmount())
        // ...
        .build();
    try {
        // saveAndFlush issues the INSERT immediately, so a duplicate order_id
        // fails here rather than later at commit time
        paymentRepository.saveAndFlush(payment);
    } catch (DataIntegrityViolationException e) {
        // Already processed - this is OK, just log and return
        log.info("Payment already exists for orderId={} (idempotent), skipping",
            event.orderId());
        return;
    }
    // Continue processing...
}
-- Enforce uniqueness at database level
ALTER TABLE payments ADD UNIQUE INDEX idx_payments_order_id_unique (order_id);
Pattern 2: Explicit Idempotency Check
Check before writing. Works for compensations where the natural key check is less obvious.
@Transactional
public void refundPayment(String orderId, String reason) {
Payment payment = paymentRepository.findByOrderId(orderId)
.orElseThrow(() -> new PaymentNotFoundException(orderId));
// EXPLICIT CHECK: already in terminal compensation state?
if (payment.getStatus() == PaymentStatus.REFUNDED) {
log.info("Payment for orderId={} already refunded (idempotent check)", orderId);
return; // Safe to return - compensation was already done
}
if (payment.getStatus() == PaymentStatus.DECLINED) {
log.info("Payment was declined (never charged), no refund needed for orderId={}", orderId);
// Still need to publish the refunded event to continue compensation chain
publishRefundedEvent(payment);
return;
}
// Process refund only if status is CHARGED
if (payment.getStatus() != PaymentStatus.CHARGED) {
log.warn("Cannot refund payment in unexpected status {} for orderId={}",
payment.getStatus(), orderId);
return;
}
// ... actual refund logic
}
Pattern 3: Idempotency Key Table
For operations that cannot use the domain entity's natural key.
Store a record of which event IDs have been processed.
// ProcessedEvent.java
@Entity
@Table(name = "processed_events")
@Getter
@NoArgsConstructor // required by JPA
@AllArgsConstructor // used by IdempotencyService below
public class ProcessedEvent {
    @Id
    private String eventId; // From the event's eventId field
    private String serviceId; // which service processed it
    private Instant processedAt;
}
// IdempotencyService.java
@Service
@RequiredArgsConstructor
public class IdempotencyService {
private final ProcessedEventRepository processedEventRepository;
/**
* Returns true if this event was already processed by the given service,
* false otherwise.
*/
@Transactional
public boolean isAlreadyProcessed(String eventId, String serviceId) {
return processedEventRepository.existsByEventIdAndServiceId(eventId, serviceId);
}
@Transactional
public void markAsProcessed(String eventId, String serviceId) {
if (!processedEventRepository.existsByEventIdAndServiceId(eventId, serviceId)) {
ProcessedEvent record = new ProcessedEvent(eventId, serviceId, Instant.now());
processedEventRepository.save(record);
}
}
}
// Using IdempotencyService in event handler
@KafkaListener(topics = KafkaTopics.ORDER_CREATED, groupId = "payment-service-group")
@Transactional
public void handleOrderCreated(OrderCreatedEvent event, Acknowledgment ack) {
if (idempotencyService.isAlreadyProcessed(event.eventId(), "payment-service")) {
log.info("Event {} already processed (idempotent)", event.eventId());
ack.acknowledge();
return;
}
// Process the event
paymentService.processPayment(event);
// Mark as processed in same transaction as the business operation
idempotencyService.markAsProcessed(event.eventId(), "payment-service");
ack.acknowledge();
}
Pattern 4: Optimistic Locking for Concurrent Idempotency
When multiple instances of a service might process the same event concurrently:
@Transactional
public void reserveInventory(PaymentProcessedEvent event) {
// Optimistic lock: if another thread already processed this, the version check fails
InventoryItem item = inventoryRepository.findByProductIdWithVersion(event.productId())
.orElseThrow(() -> new ProductNotFoundException(event.productId()));
if (item.getAvailableQuantity() < event.quantity()) {
// publish failure event
return;
}
item.setAvailableQuantity(item.getAvailableQuantity() - event.quantity());
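// If two threads get here concurrently, one will get an optimistic locking failure
// (possibly only at flush/commit time) and will be retried.
// The version check prevents lost updates; it does not detect duplicates by itself,
// so pair it with Pattern 1 or 3 to keep a retried event from reserving twice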
try {
inventoryRepository.save(item); // Version check enforced by @Version
} catch (OptimisticLockingFailureException e) {
log.info("Concurrent modification detected, retrying: productId={}", event.productId());
throw e; // Let retry mechanism handle this
}
}
7. Retry Strategies and Exponential Backoff
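Before the Spring configuration, it may help to see the delay schedule that exponential backoff produces. The following is a plain-Java sketch, independent of Spring Retry; `BackoffSketch` and its method names are hypothetical. It assumes attempts are numbered from 1 and that the delay is capped at a maximum, and it adds a "full jitter" variant that picks a random delay up to that cap.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Plain-Java sketch of capped exponential backoff, with an optional jitter variant. */
public class BackoffSketch {

    /** Delay before retry number `attempt` (1-based): initial * multiplier^(attempt-1), capped. */
    static long exponentialDelayMs(int attempt, long initialMs, double multiplier, long maxMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double raw = initialMs * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(raw, (double) maxMs);
    }

    /** Full jitter: a uniformly random delay between 0 and the capped exponential delay. */
    static long fullJitterDelayMs(int attempt, long initialMs, double multiplier, long maxMs) {
        long cap = exponentialDelayMs(attempt, initialMs, multiplier, maxMs);
        return ThreadLocalRandom.current().nextLong(cap + 1);
    }

    public static void main(String[] args) {
        // Same numbers as the configuration in this section: 1s initial, x2, 30s max
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + ": "
                + exponentialDelayMs(attempt, 1000L, 2.0, 30000L) + " ms");
        }
    }
}
```

With a 1 second initial delay and a multiplier of 2, the delays grow 1s, 2s, 4s, 8s, 16s and then stay at the 30s cap. Jitter spreads retries from many clients so they do not all hit a recovering service at the same instant.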
Spring Retry Configuration
// RetryConfig.java
@Configuration
@EnableRetry
@Slf4j
public class RetryConfig {
/**
* Retry template for transient errors in event handlers.
* Uses exponential backoff to avoid overwhelming a recovering service.
*/
@Bean
public RetryTemplate sagaRetryTemplate() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
Map.of(
TransientDataAccessException.class, true, // DB transient errors
KafkaException.class, true, // Kafka connectivity
ResourceAccessException.class, true, // Network errors
PaymentDeclinedException.class, false, // Business error - no retry
IllegalArgumentException.class, false, // Programming error - no retry
IllegalStateException.class, false
), true, true);
ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
backOff.setInitialInterval(1000L); // 1 second initial delay
backOff.setMultiplier(2.0); // Double each time
backOff.setMaxInterval(30000L); // Max 30 seconds
return RetryTemplate.builder()
.customPolicy(retryPolicy)
.customBackoff(backOff)
.withListener(new LoggingRetryListener())
.build();
}
/**
* Retry template with jitter to prevent thundering herd.
* Used for external API calls (payment gateway, shipping carrier).
*/
@Bean
public RetryTemplate externalApiRetryTemplate() {
UniformRandomBackOffPolicy jitterBackOff = new UniformRandomBackOffPolicy();
jitterBackOff.setMinBackOffPeriod(500L);
jitterBackOff.setMaxBackOffPeriod(5000L);
return RetryTemplate.builder()
.maxAttempts(5)
.customBackoff(jitterBackOff)
.retryOn(ExternalServiceException.class)
.notRetryOn(PaymentDeclinedException.class)
.build();
}
/**
* Custom listener to log retry attempts.
*/
static class LoggingRetryListener extends RetryListenerSupport {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(LoggingRetryListener.class);
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
logger.warn("Retry attempt {} failed: {}",
context.getRetryCount(), throwable.getMessage());
}
}
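}
Kafka Retry Topics (Non-Blocking Retries)
Spring Kafka supports non-blocking retry via retry topics, which lets the consumer
process other messages while a failed one waits for its retry. Note that the container
factory shown first uses DefaultErrorHandler, which retries on the consumer thread
(blocking) before handing the record to the dead letter topic; the @RetryableTopic
listener after it is the non-blocking variant.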
// KafkaRetryConfig.java
@Configuration
@EnableKafka
public class KafkaRetryConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> retryKafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
KafkaTemplate<String, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// Blocking retry: the failed record is retried on the consumer thread, so later
// records on the same partition wait until it succeeds or is sent to the DLT
// payment.events.payment-processed -> retry after 1s
// -> retry after 5s
// -> retry after 25s
// -> payment.events.payment-processed.DLT
factory.setCommonErrorHandler(
new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate),
new ExponentialBackOffWithMaxRetries(3) {{
setInitialInterval(1000L);
setMultiplier(5.0);
setMaxInterval(30000L);
}}
)
);
return factory;
}
}
Using @RetryableTopic for Non-Blocking Retry
@Component
@RequiredArgsConstructor
@Slf4j
public class InventorySagaEventHandler {
private final InventoryService inventoryService;
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 3.0, maxDelay = 30000),
autoCreateTopics = "true",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
retryTopicSuffix = "-retry",
dltTopicSuffix = ".DLT",
// Use either include or exclude, not both. With include set, any other
// exception (e.g. PaymentDeclinedException) skips the retry topics and goes to the DLT
include = {TransientDataAccessException.class, KafkaException.class}
)
@KafkaListener(
topics = KafkaTopics.PAYMENT_PROCESSED,
groupId = "inventory-service-group"
)
public void handlePaymentProcessed(PaymentProcessedEvent event, Acknowledgment ack) {
log.info("Processing PaymentProcessedEvent: orderId={}", event.orderId());
inventoryService.reserveInventory(event);
ack.acknowledge();
}
@DltHandler
public void handleDlt(PaymentProcessedEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.error("DLT received for PaymentProcessedEvent: orderId={}, topic={}",
event.orderId(), topic);
// Alert operations team, save for manual review
}
}
8. Dead Letter Queue Implementation
// DltConsumerService.java - consumes and processes dead letter messages
@Service
@RequiredArgsConstructor
@Slf4j
public class DltConsumerService {
private final DltMessageRepository dltRepository;
private final AlertService alertService;
private final SagaOrchestrator sagaOrchestrator;
/**
* Process messages that failed all retries.
* These require human review or programmatic replay.
*/
@KafkaListener(
topicPattern = ".*\\.DLT",
groupId = "dlt-consumer-group"
)
public void handleDeadLetter(
ConsumerRecord<String, String> record,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.EXCEPTION_STACKTRACE) String stackTrace,
@Header(KafkaHeaders.ORIGINAL_TOPIC) String originalTopic,
@Header(KafkaHeaders.ORIGINAL_OFFSET) long originalOffset,
Acknowledgment ack) {
log.error("DLT message received: topic={}, originalTopic={}, key={}, error={}",
record.topic(), originalTopic, record.key(), errorMessage);
DltMessage dltMessage = DltMessage.builder()
.messageId(UUID.randomUUID().toString())
.originalTopic(originalTopic)
.dltTopic(record.topic())
.messageKey(record.key())
.payload(record.value())
.errorMessage(truncate(errorMessage, 2000))
.stackTrace(truncate(stackTrace, 5000))
.originalOffset(originalOffset)
.receivedAt(Instant.now())
.status("PENDING_REVIEW")
.build();
dltRepository.save(dltMessage);
// Alert the team
alertService.sendSlackAlert(
"#saga-alerts",
String.format(
"ALERT: Dead letter message\nOriginal Topic: %s\nKey: %s\nError: %s",
originalTopic, record.key(),
errorMessage != null ? errorMessage.substring(0, Math.min(200, errorMessage.length())) : "unknown"
)
);
ack.acknowledge();
}
/**
* Admin endpoint to replay a dead letter message.
* Used by operations team after fixing the underlying issue.
*/
@Transactional
public void replayDltMessage(String dltMessageId) {
DltMessage dltMessage = dltRepository.findById(dltMessageId)
.orElseThrow(() -> new IllegalArgumentException("DLT message not found: " + dltMessageId));
log.info("Replaying DLT message: id={}, topic={}", dltMessageId, dltMessage.getOriginalTopic());
// Re-publish to original topic
sagaOrchestrator.replayEvent(
dltMessage.getOriginalTopic(),
dltMessage.getMessageKey(),
dltMessage.getPayload()
);
dltMessage.setStatus("REPLAYED");
dltMessage.setReplayedAt(Instant.now());
dltRepository.save(dltMessage);
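}
// Helper used above when storing the error message and stack trace
private String truncate(String str, int maxLength) {
if (str == null) return null;
return str.length() <= maxLength ? str : str.substring(0, maxLength);
}
}
9. Circuit Breaker with SAGAs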
Circuit breakers prevent cascading failures when a downstream service is unavailable.
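As a rough illustration of what a circuit breaker does internally, here is a simplified count-based state machine in plain Java. It is not how Resilience4j is implemented, and `CircuitBreakerSketch` with its methods is hypothetical; in particular it closes again after a single successful probe in the half-open state, whereas real libraries usually evaluate several calls.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified count-based circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED. */
public class CircuitBreakerSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;
    private final double failureRateThreshold; // fraction between 0.0 and 1.0
    private final Duration openWait;
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private State state = State.CLOSED;
    private Instant openedAt;

    CircuitBreakerSketch(int windowSize, double failureRateThreshold, Duration openWait) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.openWait = openWait;
    }

    /** Returns whether a call may proceed; moves OPEN to HALF_OPEN once the wait has elapsed. */
    synchronized boolean allowRequest(Instant now) {
        if (state == State.OPEN && !now.isBefore(openedAt.plus(openWait))) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    /** Records the outcome of a call that was allowed to proceed. */
    synchronized void record(boolean failed, Instant now) {
        if (state == State.OPEN) {
            return; // no calls are expected while open
        }
        if (state == State.HALF_OPEN) {
            if (failed) {
                trip(now);            // probe failed: open again
            } else {
                state = State.CLOSED; // probe succeeded: resume normal operation
                window.clear();
            }
            return;
        }
        window.addLast(failed);
        if (window.size() > windowSize) {
            window.removeFirst();
        }
        if (window.size() == windowSize) {
            long failures = window.stream().filter(f -> f).count();
            if ((double) failures / windowSize >= failureRateThreshold) {
                trip(now);
            }
        }
    }

    private void trip(Instant now) {
        state = State.OPEN;
        openedAt = now;
        window.clear();
    }

    synchronized State state() {
        return state;
    }
}
```

In a saga step, a rejected call (`allowRequest` returning false) would be treated as a failed step, which is what the fallback in the Resilience4j example below does by throwing an exception that triggers compensation.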
// ResilientPaymentService.java
@Component
@RequiredArgsConstructor
@Slf4j
public class ResilientPaymentService {
private final PaymentGatewayClient rawClient;
// Using Resilience4j (recommended with Spring Boot 3)
@CircuitBreaker(
name = "paymentGateway",
fallbackMethod = "paymentGatewayFallback"
)
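@Retry(name = "paymentGateway")
// @TimeLimiter is omitted here: the Resilience4j annotation only supports methods
// that return a CompletionStage/CompletableFuture (or a reactive type). To apply the
// timelimiter settings, return a CompletableFuture from this method instead.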
public PaymentGatewayResponse chargeCard(String customerId, BigDecimal amount, String paymentId) {
return rawClient.charge(customerId, amount, paymentId);
}
/**
* Fallback when circuit breaker is open.
* In SAGA context, we fail the saga step - which triggers compensation.
*/
private PaymentGatewayResponse paymentGatewayFallback(
String customerId, BigDecimal amount, String paymentId,
CallNotPermittedException e) {
log.error("Circuit breaker OPEN for payment gateway: customerId={}", customerId);
throw new PaymentDeclinedException(
"Payment gateway temporarily unavailable",
"GATEWAY_CIRCUIT_OPEN"
);
}
}
# application.yml - Resilience4j configuration
resilience4j:
  circuitbreaker:
    instances:
      paymentGateway:
        sliding-window-type: COUNT_BASED
        sliding-window-size: 10
        failure-rate-threshold: 50 # Open circuit if 50% of calls fail
        wait-duration-in-open-state: 30s # Wait 30s before trying again
        permitted-number-of-calls-in-half-open-state: 3
        register-health-indicator: true
  retry:
    instances:
      paymentGateway:
        max-attempts: 3
        wait-duration: 1s
        enable-exponential-backoff: true # needed for the multiplier to take effect
        exponential-backoff-multiplier: 2
        retry-exceptions:
          - java.net.ConnectException
          - java.net.SocketTimeoutException
  timelimiter:
    instances:
      paymentGateway:
        timeout-duration: 5s
        cancel-running-future: true
10. Saga Timeout and Deadline Management
Every saga should have a maximum execution time to prevent eternally-running sagas.
// SagaTimeoutManager.java
@Service
@RequiredArgsConstructor
@Slf4j
public class SagaTimeoutManager {
private final SagaInstanceRepository sagaRepository;
private final OrderSagaOrchestrator orchestrator;
private final AlertService alertService;
// Configuration: different timeouts for different saga types
private static final Map<String, Long> SAGA_TIMEOUT_MINUTES = Map.of(
"ORDER_SAGA", 30L, // 30 minutes to complete order saga
"REFUND_SAGA", 60L, // 1 hour for refund
"RETURN_SAGA", 10080L // 7 days for return processing
);
@Scheduled(fixedDelay = 60000) // Check every minute
@Transactional
public void checkForTimedOutSagas() {
List<SagaInstance> activeSagas = sagaRepository.findByStatus(SagaStatus.IN_PROGRESS);
for (SagaInstance saga : activeSagas) {
Long timeoutMinutes = SAGA_TIMEOUT_MINUTES.getOrDefault(saga.getSagaType(), 60L);
Instant deadline = saga.getCreatedAt()
.plus(timeoutMinutes, java.time.temporal.ChronoUnit.MINUTES);
if (Instant.now().isAfter(deadline)) {
handleTimedOutSaga(saga);
}
}
}
private void handleTimedOutSaga(SagaInstance saga) {
log.error("SAGA TIMED OUT: sagaId={}, type={}, started={}, currentStep={}",
saga.getSagaId(), saga.getSagaType(), saga.getCreatedAt(), saga.getCurrentStep());
// Trigger compensation if in forward progress
if (saga.getStatus() == SagaStatus.IN_PROGRESS) {
saga.setStatus(SagaStatus.COMPENSATING);
saga.setFailureReason("Saga timed out after " +
SAGA_TIMEOUT_MINUTES.getOrDefault(saga.getSagaType(), 60L) + " minutes");
sagaRepository.save(saga);
// Start compensation chain
orchestrator.startCompensation(saga);
}
// Alert the team
alertService.sendCriticalAlert(
"SAGA TIMEOUT",
String.format("SagaId: %s, OrderId: %s, Step: %s",
saga.getSagaId(), saga.getOrderId(), saga.getCurrentStep())
);
}
}
AWS Step Functions Timeout Configuration
In the Step Functions state machine definition, you can configure timeouts per state.
In the fragment below the task must complete within 30 seconds (TimeoutSeconds) and the
service must send a heartbeat at least every 10 seconds (HeartbeatSeconds). JSON does not
allow comments, so these notes are kept out of the definition itself:
"ProcessPayment": {
  "Type": "Task",
  "Resource": "...",
  "TimeoutSeconds": 30,
  "HeartbeatSeconds": 10,
  "Catch": [{
    "ErrorEquals": ["States.Timeout"],
    "Next": "CompensateOrder",
    "ResultPath": "$.timeoutError"
  }]
}
11. Distributed Tracing with AWS X-Ray
// TracingConfig.java
@Configuration
public class TracingConfig {
/**
* AWS X-Ray segment naming strategy.
* Each service creates a sub-segment under the root saga trace.
*/
@Bean
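public AWSXRayRecorder awsXRayRecorder() {
// Sampling rules are loaded from a classpath file (assumed here to be
// /sampling-rules.json) whose default rule uses fixed_target 1 and rate 0.05
java.net.URL ruleFile = TracingConfig.class.getResource("/sampling-rules.json");
return AWSXRayRecorderBuilder.standard()
.withPlugin(new ECSPlugin()) // On AWS ECS
.withPlugin(new EC2Plugin()) // On AWS EC2
.withSamplingStrategy(new LocalizedSamplingStrategy(ruleFile))
.build();
}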
}
// SagaTracingInterceptor.java
@Component
@RequiredArgsConstructor
@Slf4j
public class SagaTracingInterceptor {
private final Tracer tracer;
/**
* Starts a new trace span for each saga step.
* Correlation: the saga ID is attached as a span tag (saga.id) and logged next to
* the trace ID, so a saga can be followed end to end.
*/
public <T> T traceStep(String sagaId, String stepName, Callable<T> operation) throws Exception {
Span span = tracer.nextSpan()
.name("saga." + stepName)
.tag("saga.id", sagaId)
.tag("saga.step", stepName)
.start();
try (Tracer.SpanInScope ws = tracer.withSpan(span)) {
log.info("SAGA_TRACE: sagaId={}, step={}, traceId={}",
sagaId, stepName, span.context().traceId());
return operation.call();
} catch (Exception e) {
span.error(e); // records the exception; e.getMessage() may be null, which is unsafe as a tag value
throw e;
} finally {
span.end();
}
}
}
MDC Context for Correlated Logging
// SagaMDCFilter.java
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class SagaMDCFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
String sagaId = httpRequest.getHeader("X-Saga-Id");
String traceId = httpRequest.getHeader("X-Trace-Id");
String orderId = httpRequest.getHeader("X-Order-Id");
if (sagaId != null) MDC.put("sagaId", sagaId);
if (traceId != null) MDC.put("traceId", traceId);
if (orderId != null) MDC.put("orderId", orderId);
try {
chain.doFilter(request, response);
} finally {
MDC.remove("sagaId");
MDC.remove("traceId");
MDC.remove("orderId");
}
}
}
12. Structured Logging for SAGA Events
// SagaLogger.java
@Component
@Slf4j
public class SagaLogger {
/**
* Structured log event for saga lifecycle tracking.
* Use a log aggregator (CloudWatch Insights, Datadog) to query these.
*/
public void logSagaEvent(String eventType, String sagaId, String orderId,
String step, String status, String details) {
// Structured key=value log line that can be parsed and queried in CloudWatch Insights
log.info(
"SAGA_EVENT type={} sagaId={} orderId={} step={} status={} details=\"{}\"",
eventType, sagaId, orderId, step, status, details
);
}
public void logSagaStarted(String sagaId, String orderId, String customerId) {
logSagaEvent("SAGA_STARTED", sagaId, orderId, "N/A", "STARTED",
"customerId=" + customerId);
}
public void logStepStarted(String sagaId, String orderId, String step) {
logSagaEvent("STEP_STARTED", sagaId, orderId, step, "EXECUTING", "");
}
public void logStepCompleted(String sagaId, String orderId, String step, long durationMs) {
logSagaEvent("STEP_COMPLETED", sagaId, orderId, step, "SUCCESS",
"durationMs=" + durationMs);
}
public void logStepFailed(String sagaId, String orderId, String step, String error) {
logSagaEvent("STEP_FAILED", sagaId, orderId, step, "FAILED", "error=" + error);
}
public void logCompensationStarted(String sagaId, String orderId, String reason) {
logSagaEvent("COMPENSATION_STARTED", sagaId, orderId, "N/A", "COMPENSATING",
"reason=" + reason);
}
public void logSagaCompleted(String sagaId, String orderId, long totalDurationMs) {
logSagaEvent("SAGA_COMPLETED", sagaId, orderId, "N/A", "COMPLETED",
"totalDurationMs=" + totalDurationMs);
}
public void logSagaCompensated(String sagaId, String orderId, String originalFailure) {
logSagaEvent("SAGA_COMPENSATED", sagaId, orderId, "N/A", "COMPENSATED",
"originalFailure=" + originalFailure);
}
}
13. Complete MySQL Schema Reference
-- ==========================================================
-- COMPLETE SAGA DATABASE SCHEMA
-- ==========================================================
-- 1. OUTBOX TABLE (per service, same DB as service tables)
CREATE TABLE outbox_events (
event_id VARCHAR(36) NOT NULL,
aggregate_id VARCHAR(36) NOT NULL,
aggregate_type VARCHAR(50) NOT NULL DEFAULT 'ORDER',
topic_name VARCHAR(255) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload LONGTEXT NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
retry_count INT NOT NULL DEFAULT 0,
error_message VARCHAR(1000),
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
published_at DATETIME(6),
PRIMARY KEY (event_id),
INDEX idx_outbox_status (status, created_at),
INDEX idx_outbox_aggregate (aggregate_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- 2. IDEMPOTENCY TABLE (per service)
CREATE TABLE processed_events (
event_id VARCHAR(36) NOT NULL,
service_id VARCHAR(50) NOT NULL,
processed_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
PRIMARY KEY (event_id, service_id),
INDEX idx_proc_events_service (service_id, processed_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- Auto-cleanup after 30 days (older events are treated as irrelevant).
-- Requires the MySQL event scheduler to be enabled (event_scheduler = ON).
CREATE EVENT cleanup_processed_events
ON SCHEDULE EVERY 1 DAY
DO DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL 30 DAY;
-- 3. SAGA INSTANCES TABLE (orchestrator service only)
CREATE TABLE saga_instances (
saga_id VARCHAR(36) NOT NULL,
saga_type VARCHAR(100) NOT NULL,
order_id VARCHAR(36),
status VARCHAR(30) NOT NULL DEFAULT 'STARTED',
current_step VARCHAR(50) NOT NULL,
payload JSON,
failure_reason VARCHAR(1000),
retry_count INT NOT NULL DEFAULT 0,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
completed_at DATETIME(6),
version BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (saga_id),
UNIQUE INDEX idx_saga_order_id (order_id),
INDEX idx_saga_status_updated (status, updated_at),
INDEX idx_saga_type_status (saga_type, status),
INDEX idx_saga_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- 4. SAGA STEP EXECUTIONS TABLE
CREATE TABLE saga_step_executions (
execution_id VARCHAR(36) NOT NULL,
saga_id VARCHAR(36) NOT NULL,
step VARCHAR(50) NOT NULL,
result VARCHAR(20), -- SUCCESS, FAILURE, PENDING
command_payload JSON,
response_payload JSON,
error_message VARCHAR(1000),
executed_at DATETIME(6),
completed_at DATETIME(6),
PRIMARY KEY (execution_id),
INDEX idx_step_exec_saga_id (saga_id),
FOREIGN KEY fk_step_saga (saga_id) REFERENCES saga_instances(saga_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- 5. DLT MESSAGES TABLE
CREATE TABLE dlt_messages (
message_id VARCHAR(36) NOT NULL,
original_topic VARCHAR(255) NOT NULL,
dlt_topic VARCHAR(255) NOT NULL,
message_key VARCHAR(255),
payload LONGTEXT NOT NULL,
error_message VARCHAR(2000),
stack_trace TEXT,
original_offset BIGINT,
received_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
status VARCHAR(30) NOT NULL DEFAULT 'PENDING_REVIEW',
replayed_at DATETIME(6),
resolved_by VARCHAR(100),
notes TEXT,
PRIMARY KEY (message_id),
INDEX idx_dlt_status (status),
INDEX idx_dlt_topic (original_topic),
INDEX idx_dlt_received_at (received_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- 6. USEFUL MONITORING QUERIES
-- Find active sagas
SELECT saga_id, order_id, status, current_step, created_at, updated_at
FROM saga_instances
WHERE status IN ('STARTED', 'IN_PROGRESS', 'COMPENSATING')
ORDER BY created_at ASC;
-- Find stuck sagas (not updated in 30 minutes)
SELECT saga_id, order_id, status, current_step, updated_at,
TIMESTAMPDIFF(MINUTE, updated_at, NOW()) AS minutes_stuck
FROM saga_instances
WHERE status IN ('IN_PROGRESS', 'COMPENSATING')
AND updated_at < NOW() - INTERVAL 30 MINUTE
ORDER BY updated_at ASC;
-- Saga success rate over last 24 hours
SELECT
saga_type,
COUNT(*) AS total,
SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) AS completed,
SUM(CASE WHEN status = 'COMPENSATED' THEN 1 ELSE 0 END) AS compensated,
SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed,
ROUND(100.0 * SUM(CASE WHEN status = 'COMPLETED' THEN 1 ELSE 0 END) / COUNT(*), 2) AS success_rate
FROM saga_instances
WHERE created_at >= NOW() - INTERVAL 24 HOUR
GROUP BY saga_type;
-- Average saga duration
SELECT
saga_type,
AVG(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS avg_duration_seconds,
MAX(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS max_duration_seconds,
MIN(TIMESTAMPDIFF(SECOND, created_at, completed_at)) AS min_duration_seconds
FROM saga_instances
WHERE status = 'COMPLETED'
AND created_at >= NOW() - INTERVAL 24 HOUR
GROUP BY saga_type;
-- Most frequent failure steps
SELECT current_step, COUNT(*) AS failure_count, failure_reason
FROM saga_instances
WHERE status IN ('COMPENSATED', 'FAILED')
AND created_at >= NOW() - INTERVAL 7 DAY
GROUP BY current_step, failure_reason
ORDER BY failure_count DESC
LIMIT 20;
14. Optimistic vs Pessimistic Locking in SAGAs
Pessimistic Locking
Use when multiple services might concurrently update the same record (e.g., inventory).
// SELECT ... FOR UPDATE row lock: blocks other writers and other locking reads
// on the same row until the transaction commits (plain non-locking reads are not blocked)
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT i FROM InventoryItem i WHERE i.productId = :productId")
Optional<InventoryItem> findByProductIdWithLock(@Param("productId") String productId);
Use pessimistic locking for: Inventory reservations, seat bookings, ticket allocation.
Cost: Reduces concurrency, increases latency. Only hold locks for milliseconds.
Optimistic Locking
Use when concurrent updates are rare but possible (e.g., order status updates).
@Entity
public class Order {
@Version
private Long version; // JPA manages this automatically
// If two threads update the same order concurrently,
// one will get OptimisticLockingFailureException
// This prevents lost updates without holding locks
}
Use optimistic locking for: Order status updates, saga state transitions.
Benefit: No database locks held, high concurrency.
Cost: Must handle retry logic on OptimisticLockingFailureException.
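The retry cost can be sketched without Spring or JPA. In this minimal sketch, `VersionConflictException` is a hypothetical stand-in for Spring's `OptimisticLockingFailureException`, and `OptimisticRetry.run` with its parameters is an illustrative name, not part of any library. The key point is that the whole read-modify-write operation is re-run, so each attempt sees the latest version.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

final class OptimisticRetry {

    /** Hypothetical stand-in for Spring's OptimisticLockingFailureException. */
    static class VersionConflictException extends RuntimeException {
        VersionConflictException(String message) { super(message); }
    }

    /**
     * Runs the operation and retries only when a version conflict is reported.
     * The operation is expected to re-read the entity on every attempt.
     */
    static <T> T run(Callable<T> operation, int maxAttempts, long baseDelayMs) throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        if (baseDelayMs < 0) throw new IllegalArgumentException("baseDelayMs must be >= 0");
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.call();
            } catch (VersionConflictException e) {
                if (attempt >= maxAttempts) throw e; // give up: let the caller compensate or alert
                // Linearly growing pause plus random jitter, so competing writers spread out
                long jitter = ThreadLocalRandom.current().nextLong(baseDelayMs + 1);
                Thread.sleep(baseDelayMs * attempt + jitter);
            }
        }
    }
}
```

In a Spring service the same shape would likely wrap a `@Transactional` method call from outside the transaction, so that each retry starts a fresh transaction and re-reads the row.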
15. Semantic Locking
Semantic locking is an application-level technique to prevent dirty reads of saga state.
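One way to picture the lifecycle of the flag: the saga sets it when the first step begins and clears it only when the saga completes or finishes compensating. The sketch below is plain Java with no persistence; `OrderView`, `beginSaga`, `advance`, `endSaga`, and `visibleStatus` are hypothetical names used only for illustration.

```java
final class OrderView {
    private String status = "PENDING";
    private boolean sagaInProgress;   // semantic lock flag
    private String currentSagaStep;   // which step is running, null when idle

    /** Acquires the semantic lock; a second saga on the same order is rejected. */
    synchronized void beginSaga(String firstStep) {
        if (sagaInProgress) {
            throw new IllegalStateException("saga already in progress at step " + currentSagaStep);
        }
        sagaInProgress = true;
        currentSagaStep = firstStep;
    }

    /** Moves to the next step while the lock stays held. */
    synchronized void advance(String nextStep) {
        if (!sagaInProgress) throw new IllegalStateException("no saga in progress");
        currentSagaStep = nextStep;
    }

    /** Releases the lock and publishes the final status together. */
    synchronized void endSaga(String finalStatus) {
        if (!sagaInProgress) throw new IllegalStateException("no saga in progress");
        status = finalStatus;
        sagaInProgress = false;
        currentSagaStep = null;
    }

    /** What a reader sees: a processing marker while locked, never the partial state. */
    synchronized String visibleStatus() {
        return sagaInProgress ? "PROCESSING(" + currentSagaStep + ")" : status;
    }
}
```

In a real service the flag would presumably be persisted in the same local transaction as each step's business write, so the lock and the data can never disagree.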
// Add a status field that indicates "saga in progress"
// Other services check this before reading data
@Entity
public class Order {
private OrderStatus status; // PENDING, CONFIRMED, etc.
private boolean sagaInProgress; // semantic lock flag
private String currentSagaStep; // which step is running
}
// Customer-facing API checks semantic lock
@GetMapping("/orders/{orderId}")
public OrderResponse getOrder(@PathVariable String orderId) {
Order order = orderRepository.findById(orderId).orElseThrow();
if (order.isSagaInProgress()) {
// Return a "processing" response, not the partial state
return OrderResponse.processing(orderId, order.getCurrentSagaStep());
}
return OrderResponse.from(order);
}
16. Production Health Checks and Metrics
// SagaAdminController.java - Admin endpoint for ops team
@RestController
@RequestMapping("/admin/saga")
@RequiredArgsConstructor
public class SagaAdminController {
private final SagaInstanceRepository sagaRepository;
private final OutboxEventRepository outboxRepository;
@GetMapping("/health")
public ResponseEntity<SagaHealthReport> getSagaHealth() {
SagaHealthReport report = SagaHealthReport.builder()
.activeSagas(sagaRepository.countByStatusSince(SagaStatus.IN_PROGRESS,
Instant.now().minus(1, ChronoUnit.HOURS)))
.stuckSagas(sagaRepository.findStuckSagas(
Instant.now().minus(30, ChronoUnit.MINUTES)).size())
.failedSagas(sagaRepository.countByStatusSince(SagaStatus.FAILED,
Instant.now().minus(24, ChronoUnit.HOURS)))
.pendingOutboxEvents(outboxRepository.countStuckPendingEvents(
Instant.now().minus(5, ChronoUnit.MINUTES)))
.timestamp(Instant.now())
.build();
boolean healthy = report.stuckSagas() == 0 &&
report.failedSagas() < 10 &&
report.pendingOutboxEvents() < 100;
return ResponseEntity
.status(healthy ? HttpStatus.OK : HttpStatus.SERVICE_UNAVAILABLE)
.body(report);
}
@PostMapping("/{sagaId}/replay")
public ResponseEntity<Void> replaySaga(@PathVariable String sagaId) {
// Admin endpoint to manually replay a failed/stuck saga
// ... implementation
return ResponseEntity.accepted().build();
}
}
17. Configuration Management
# Complete production application.yml
spring:
  application:
    name: order-service
  # Database Configuration
  datasource:
    url: jdbc:mysql://${DB_HOST}:3306/${DB_NAME}?useSSL=true&requireSSL=true&serverTimezone=UTC
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: ${spring.application.name}-pool
      minimum-idle: 5
      maximum-pool-size: 20
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
      leak-detection-threshold: 60000 # Warn if connection held > 60s
  jpa:
    hibernate:
      ddl-auto: validate
    open-in-view: false # MUST be false in production
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        format_sql: false
        generate_statistics: false
        jdbc:
          batch_size: 20
        order_inserts: true
        order_updates: true
        cache:
          use_second_level_cache: false # Disable L2 cache for saga entities
  # Kafka Configuration
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      group-id: ${spring.application.name}-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        # Passed straight to the Kafka client (not first-class Spring Boot keys)
        max.poll.interval.ms: 300000
        session.timeout.ms: 30000
        heartbeat.interval.ms: 10000
        isolation.level: read_committed # Only read committed messages (transactional producers)
    producer:
      acks: all
      retries: 5
      batch-size: 16384
      compression-type: lz4
      properties:
        linger.ms: 5
        enable.idempotence: true
        max.in.flight.requests.per.connection: 1
        transaction.timeout.ms: 60000
# Outbox Configuration
outbox:
  publisher:
    enabled: true
    delay-ms: 500
    retry-delay-ms: 30000
    batch-size: 50
    max-retries: 5
# Saga Configuration
saga:
  timeout:
    order-saga-minutes: 30
    refund-saga-minutes: 60
  recovery:
    enabled: true
    check-interval-minutes: 5
    stuck-threshold-minutes: 30
# Resilience4j
resilience4j:
  circuitbreaker:
    instances:
      paymentGateway:
        sliding-window-size: 20
        failure-rate-threshold: 50
        wait-duration-in-open-state: 30s
      shippingCarrier:
        sliding-window-size: 10
        failure-rate-threshold: 60
        wait-duration-in-open-state: 60s
# Actuator
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus,circuitbreakers,retries
  endpoint:
    health:
      show-details: always
      probes:
        enabled: true # /actuator/health/liveness and /actuator/health/readiness for k8s
  metrics:
    export:
      cloudwatch:
        namespace: SagaMetrics
        step: 1m
    tags:
      service: ${spring.application.name}
      environment: ${ENVIRONMENT:dev}
      region: ${AWS_REGION:us-east-1}
18. Summary
| Pattern | Purpose | Key Rule |
|---|---|---|
| Transactional Outbox | Solve dual-write problem | Write to outbox in same DB transaction as business data |
| Debezium CDC | Low-latency outbox publishing | Reads MySQL binlog, no polling overhead |
| Idempotency | Handle duplicate events safely | Every handler must check if already processed |
| Unique Constraint | Database-level idempotency | Add UNIQUE INDEX on natural key per aggregate |
| Exponential Backoff | Smart retry for transient failures | Double delay each retry, add jitter |
| Dead Letter Queue | Handle permanently failed events | Alert team, save for manual review/replay |
| Circuit Breaker | Stop cascading failures | Open circuit when failure rate too high |
| Saga Timeout | Prevent eternally-running sagas | Check and compensate sagas exceeding deadline |
| Semantic Locking | Prevent dirty reads during saga | Status flag: sagaInProgress=true while active |
| Structured Logging | Operational visibility | sagaId, orderId in every log line as MDC keys |
| Distributed Tracing | End-to-end saga visibility | Tag every span with sagaId so traces are searchable across all services |
Next: Part 5 - Advanced Patterns
Explore SAGA combined with CQRS, Event Sourcing, parallel saga steps, sub-sagas,
and large-scale AWS architecture patterns.