Part 5: Advanced SAGA Patterns
Table of Contents
- SAGA + CQRS: The Perfect Combination
- SAGA + Event Sourcing
- Parallel SAGA Steps
- Sub-SAGAs and Nested SAGAs
- Long-Running SAGAs
- SAGA with SQS FIFO (AWS)
- SAGA with DynamoDB State Store (AWS)
- High-Throughput SAGA Design
- Multi-Region SAGA Architecture
- SAGA Observability Dashboard
- SAGA with Temporal Workflow Engine
- Real-World Architecture Examples
- Summary
1. SAGA + CQRS: The Perfect Combination
CQRS (Command Query Responsibility Segregation) and SAGA are complementary patterns.
Why They Work Together
SAGA updates WRITE models (commands). CQRS reads from READ models (queries).
Together they solve:
- Isolation problem: Read models only show COMPLETED saga state, hiding intermediate states
- Performance: Reads from optimized read models, not the saga state table
- Eventual consistency: Read models are updated only when sagas complete
SAGA WRITE SIDE (Commands)
|
Customer ---> API ---> Order SAGA ---> Updates:
orders table
payments table
inventory table
saga_instances table
|
(When saga COMPLETES, publish domain event)
|
v
EVENT BUS (Kafka)
|
CQRS READ SIDE (Projections)
|
Updates read models:
order_summary_view
customer_order_history
inventory_report_view
Customers QUERY read models (not saga tables)
Implementation: Read Model Projection
// OrderProjectionService.java
// Updates the CQRS read model when saga-related events are received
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProjectionService {
private final OrderSummaryRepository summaryRepository;
private final CustomerOrderHistoryRepository historyRepository;
/**
* Update read model when order is CONFIRMED (saga completed successfully).
* Only update read model on saga completion - never on intermediate states.
*/
@KafkaListener(topics = "order.events.order-confirmed", groupId = "projection-service-group")
@Transactional
public void onOrderConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
log.info("Updating read model for confirmed order: orderId={}", event.orderId());
// Upsert (create or update) the read model
OrderSummary summary = summaryRepository.findByOrderId(event.orderId())
.orElse(new OrderSummary());
summary.setOrderId(event.orderId());
summary.setCustomerId(event.customerId());
summary.setStatus("CONFIRMED");
summary.setTotalAmount(event.totalAmount());
summary.setShipmentId(event.shipmentId());
summary.setTrackingNumber(event.trackingNumber());
summary.setEstimatedDelivery(event.estimatedDelivery());
summary.setLastUpdated(Instant.now());
summaryRepository.save(summary);
// Update customer order history
updateCustomerHistory(event);
ack.acknowledge();
}
/**
* Update read model when order is CANCELLED (saga compensated).
*/
@KafkaListener(topics = "order.events.order-cancelled", groupId = "projection-service-group")
@Transactional
public void onOrderCancelled(OrderCancelledEvent event, Acknowledgment ack) {
log.info("Updating read model for cancelled order: orderId={}", event.orderId());
summaryRepository.findByOrderId(event.orderId()).ifPresent(summary -> {
summary.setStatus("CANCELLED");
summary.setFailureReason(event.reason());
summary.setLastUpdated(Instant.now());
summaryRepository.save(summary);
});
ack.acknowledge();
}
private void updateCustomerHistory(OrderConfirmedEvent event) {
CustomerOrderHistoryEntry entry = CustomerOrderHistoryEntry.builder()
.customerId(event.customerId())
.orderId(event.orderId())
.amount(event.totalAmount())
.status("CONFIRMED")
.orderDate(event.confirmedAt())
.build();
historyRepository.save(entry);
}
}
Read Model Entity (Optimized for Queries)
// OrderSummary.java - denormalized for fast reads
@Entity
@Table(name = "order_summary_view", indexes = {
@Index(name = "idx_summary_customer", columnList = "customer_id, last_updated"),
@Index(name = "idx_summary_status", columnList = "status"),
@Index(name = "idx_summary_tracking", columnList = "tracking_number")
})
@Getter
@Setter
public class OrderSummary {
@Id
private String orderId;
private String customerId;
private String status;
private BigDecimal totalAmount;
private String shipmentId;
private String trackingNumber;
private String estimatedDelivery;
private String failureReason;
// Denormalized customer info (no joins needed)
private String customerName;
private String customerEmail;
// Denormalized product info
@Column(columnDefinition = "JSON")
private String itemsSummary; // JSON array of items
private Instant lastUpdated;
}
CQRS Query Controller
// OrderQueryController.java
@RestController
@RequestMapping("/api/v1/orders")
@RequiredArgsConstructor
public class OrderQueryController {
private final OrderSummaryRepository summaryRepository;
// Query the read model - fast, no joins across service databases
@GetMapping("/{orderId}")
public ResponseEntity<OrderSummary> getOrder(@PathVariable String orderId) {
return summaryRepository.findByOrderId(orderId)
.map(ResponseEntity::ok)
.orElse(ResponseEntity.notFound().build());
}
@GetMapping("/customers/{customerId}")
public ResponseEntity<Page<OrderSummary>> getCustomerOrders(
@PathVariable String customerId,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
Page<OrderSummary> orders = summaryRepository
.findByCustomerIdOrderByLastUpdatedDesc(customerId, PageRequest.of(page, size));
return ResponseEntity.ok(orders);
}
}
The Key Rule for SAGA + CQRS
Never update read models with intermediate saga state.
Only update read models when the saga reaches a terminal state (COMPLETED or COMPENSATED).
If you update the read model at each saga step, customers will see inconsistent data.
Wait for the saga to complete, then project the final state.
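One way to enforce this rule is a guard in the projection layer. The following is a minimal, framework-free sketch: `TerminalStateProjector`, its `SagaStatus` values, and the in-memory map standing in for the read model are assumptions made for illustration, not code from this series.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard: only terminal saga states reach the read model.
class TerminalStateProjector {

    enum SagaStatus { STARTED, IN_PROGRESS, COMPLETED, COMPENSATED }

    // In-memory stand-in for a read model table such as order_summary_view
    private final Map<String, String> readModel = new ConcurrentHashMap<>();

    /** Returns true if the state was projected, false if it was skipped. */
    boolean project(String orderId, SagaStatus status) {
        boolean terminal = status == SagaStatus.COMPLETED || status == SagaStatus.COMPENSATED;
        if (!terminal) {
            return false; // intermediate state: keep it out of customer-facing views
        }
        readModel.put(orderId, status == SagaStatus.COMPLETED ? "CONFIRMED" : "CANCELLED");
        return true;
    }

    Optional<String> statusOf(String orderId) {
        return Optional.ofNullable(readModel.get(orderId));
    }
}
```

With this guard, a query issued while the saga is still running finds no row (or the previous terminal state) rather than a half-finished order.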
2. SAGA + Event Sourcing
Event Sourcing stores the complete history of state changes as events instead of current state.
Why SAGA Fits Naturally with Event Sourcing
Event Sourcing has:
- Immutable event log
- Replay capability
- Full audit trail
SAGA needs:
- Full audit trail of all steps
- Ability to replay failed sagas
- Understanding of what happened at each step
They complement each other perfectly.
TRADITIONAL: EVENT SOURCING:
orders table order_events table
+--------+----------+ +-----------+-------------------+----------+
| id | status | | order_id | event_type | payload |
+--------+----------+ +-----------+-------------------+----------+
| ORD-01 | CONFIRMED| | ORD-01 | OrderPlaced | {...} |
+--------+----------+ | ORD-01 | PaymentCharged | {...} |
| ORD-01 | InventoryReserved | {...} |
| ORD-01 | OrderConfirmed | {...} |
+-----------+-------------------+----------+
To get current state: replay all events for ORD-01
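Replay is essentially a fold over the event history. The sketch below illustrates the idea with plain strings for event types; the `OrderReplaySketch` class, its `Status` values, and the mapping from event type to status are assumptions chosen to match the table above.

```java
import java.util.List;

// Hypothetical sketch: derive the current order state by folding over its events.
class OrderReplaySketch {

    enum Status { NONE, PENDING, PAYMENT_PROCESSED, INVENTORY_RESERVED, CONFIRMED, CANCELLED }

    // Current state = latest status plus the number of events applied so far
    record State(Status status, long version) {}

    // Applies a single event to the current state and returns the next state
    static State apply(State current, String eventType) {
        Status next = switch (eventType) {
            case "OrderPlaced" -> Status.PENDING;
            case "PaymentCharged" -> Status.PAYMENT_PROCESSED;
            case "InventoryReserved" -> Status.INVENTORY_RESERVED;
            case "OrderConfirmed" -> Status.CONFIRMED;
            case "OrderCancelled" -> Status.CANCELLED;
            default -> throw new IllegalArgumentException("Unknown event: " + eventType);
        };
        return new State(next, current.version() + 1);
    }

    // Replays events in version order, starting from an empty aggregate
    static State replay(List<String> eventTypes) {
        State state = new State(Status.NONE, 0);
        for (String type : eventTypes) {
            state = apply(state, type);
        }
        return state;
    }
}
```

Replaying the four events shown for ORD-01 ends in the CONFIRMED status at version 4, which is the same information the traditional table stores as a single row.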
Event Store Implementation
// EventStore.java
@Service
@RequiredArgsConstructor
@Slf4j
public class EventStore {
private final DomainEventRepository domainEventRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
/**
* Append events to the event store.
* Also writes to outbox for async publishing (same transaction).
*/
@Transactional
public void appendEvents(String aggregateId, String aggregateType,
long expectedVersion, List<DomainEvent> events) {
for (int i = 0; i < events.size(); i++) {
DomainEvent event = events.get(i);
long eventVersion = expectedVersion + i + 1;
StoredEvent storedEvent = StoredEvent.builder()
.eventId(event.getEventId())
.aggregateId(aggregateId)
.aggregateType(aggregateType)
.eventType(event.getClass().getSimpleName())
.version(eventVersion)
.payload(serialize(event))
.occurredAt(Instant.now())
.build();
try {
domainEventRepository.save(storedEvent);
} catch (DataIntegrityViolationException e) {
// Version conflict - optimistic concurrency violation
throw new ConcurrencyConflictException(
"Version conflict for aggregate " + aggregateId +
": expected " + eventVersion
);
}
// Write to outbox (publish asynchronously)
writeToOutbox(aggregateId, event);
}
}
/**
* Load all events for an aggregate (for replay).
*/
public List<StoredEvent> loadEvents(String aggregateId) {
return domainEventRepository.findByAggregateIdOrderByVersionAsc(aggregateId);
}
/**
* Load events after a specific version (for catch-up projections).
*/
public List<StoredEvent> loadEventsSince(String aggregateId, long sinceVersion) {
return domainEventRepository
.findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(aggregateId, sinceVersion);
}
private void writeToOutbox(String aggregateId, DomainEvent event) {
OutboxEvent outbox = OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateId(aggregateId)
.topicName(event.getTopicName())
.eventType(event.getClass().getSimpleName())
.payload(serialize(event))
.status(OutboxEvent.OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(outbox);
}
private String serialize(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
// OrderAggregate.java - Event Sourced aggregate
public class OrderAggregate {
private String orderId;
private String customerId;
private OrderStatus status;
private BigDecimal totalAmount;
private long version = 0;
private List<DomainEvent> pendingEvents = new ArrayList<>();
// Factory method - creates new order
public static OrderAggregate create(String orderId, String customerId,
BigDecimal totalAmount) {
OrderAggregate aggregate = new OrderAggregate();
aggregate.apply(new OrderPlacedEvent(orderId, customerId, totalAmount));
return aggregate;
}
// Load from event history (replay)
public static OrderAggregate reconstitute(List<StoredEvent> events,
ObjectMapper mapper) {
OrderAggregate aggregate = new OrderAggregate();
for (StoredEvent event : events) {
aggregate.applyStored(event, mapper);
}
return aggregate;
}
public void confirm(String shipmentId) {
if (this.status != OrderStatus.PAYMENT_PROCESSED) {
throw new IllegalStateException("Cannot confirm order in status: " + status);
}
apply(new OrderConfirmedEvent(this.orderId, shipmentId));
}
public void cancel(String reason) {
if (this.status == OrderStatus.CONFIRMED || this.status == OrderStatus.CANCELLED) {
throw new IllegalStateException("Cannot cancel order in status: " + status);
}
apply(new OrderCancelledEvent(this.orderId, reason));
}
// Event application - pure state change, no business logic
private void apply(DomainEvent event) {
handle(event);
pendingEvents.add(event);
version++;
}
private void handle(DomainEvent event) {
switch (event) {
case OrderPlacedEvent e -> {
this.orderId = e.orderId();
this.customerId = e.customerId();
this.totalAmount = e.totalAmount();
this.status = OrderStatus.PENDING;
}
case OrderConfirmedEvent e -> this.status = OrderStatus.CONFIRMED;
case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED;
default -> throw new IllegalArgumentException("Unknown event: " + event.getClass());
}
}
private void applyStored(StoredEvent stored, ObjectMapper mapper) {
DomainEvent event = deserializeEvent(stored, mapper);
handle(event);
version = stored.getVersion();
}
public List<DomainEvent> getAndClearPendingEvents() {
List<DomainEvent> events = new ArrayList<>(pendingEvents);
pendingEvents.clear();
return events;
}
// Getters
public String getOrderId() { return orderId; }
public OrderStatus getStatus() { return status; }
public long getVersion() { return version; }
}
3. Parallel SAGA Steps
Some saga steps can execute concurrently to improve throughput and reduce latency.
Scenario: Parallel Notification and Shipping
After inventory is reserved, we need to:
- Create the shipment (must succeed before confirming)
- Send email notification to customer (fire and forget)
- Update analytics (fire and forget)
The shipment creation blocks the saga. The others can run in parallel.
SERIAL (slow):
Reserve Inventory -> Create Shipment -> Send Email -> Update Analytics -> Confirm Order
| | | |
100ms 200ms 50ms 30ms = 380ms total
PARALLEL (fast):
+---> Create Shipment (200ms) -----+
Reserve Inventory --+----| +--> Confirm Order
| +---> Send Email (50ms, async) |
| +---> Update Analytics (30ms, async)+
Total = 100ms + 200ms + Confirm = 300ms (21% faster)
Fire-and-forget steps don't block saga progression
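The arithmetic behind the two diagrams can be checked with a small helper. This is only an illustrative sketch; `SagaLatencySketch` and its method names are hypothetical, and the durations are the example figures from the diagrams.

```java
import java.util.List;

// Hypothetical helper that reproduces the latency arithmetic from the diagrams.
class SagaLatencySketch {

    // Serial execution: every step waits for the previous one, so durations add up
    static long serialMs(List<Long> stepDurationsMs) {
        return stepDurationsMs.stream().mapToLong(Long::longValue).sum();
    }

    // Parallel execution: after the prerequisite step, the saga only waits
    // for the slowest blocking step; fire-and-forget steps are not counted
    static long parallelMs(long prerequisiteMs, List<Long> blockingStepsMs) {
        long slowest = blockingStepsMs.stream().mapToLong(Long::longValue).max().orElse(0L);
        return prerequisiteMs + slowest;
    }

    // Relative reduction in latency, rounded to a whole percent
    static long improvementPercent(long beforeMs, long afterMs) {
        return Math.round(100.0 * (beforeMs - afterMs) / beforeMs);
    }
}
```

For the example figures, `serialMs(List.of(100L, 200L, 50L, 30L))` gives 380, `parallelMs(100L, List.of(200L))` gives 300, and `improvementPercent(380, 300)` gives 21.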
Implementation with AWS Step Functions Parallel State
"PostInventoryActions": {
"Type": "Parallel",
"Branches": [
{
"StartAt": "CreateShipment",
"States": {
"CreateShipment": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${ShippingServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"orderId.$": "$.orderId"
}
},
"Retry": [{"ErrorEquals": ["States.ALL"], "MaxAttempts": 5}],
"End": true
}
}
},
{
"StartAt": "SendConfirmationEmail",
"States": {
"SendConfirmationEmail": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"FunctionName": "${EmailLambdaArn}",
"Payload": {
"orderId.$": "$.orderId",
"customerId.$": "$.customerId"
}
},
"Retry": [{"ErrorEquals": ["States.ALL"], "MaxAttempts": 3}],
"Catch": [{"ErrorEquals": ["States.ALL"], "Next": "EmailFailedIgnore"}],
"End": true
},
"EmailFailedIgnore": {
"Type": "Pass",
"End": true
}
}
}
],
"Next": "ConfirmOrder"
}
Implementation with Custom Orchestrator (Spring)
// Parallel step execution using CompletableFuture
@Service
@RequiredArgsConstructor
@Slf4j
public class ParallelSagaStepExecutor {
private final ShippingCommandSender shippingCommandSender;
private final NotificationCommandSender notificationCommandSender;
private final AnalyticsCommandSender analyticsCommandSender;
// Expected to resolve to the "sagaParallelExecutor" bean from AsyncConfig
// (add @Qualifier if several Executor beans exist in the context)
private final Executor sagaParallelExecutor;
// The executor is passed explicitly: @Async would be bypassed here because
// these methods are invoked from within the same class, not through the Spring proxy
public CompletableFuture<ShipmentResult> createShipment(String sagaId, String orderId) {
return CompletableFuture.supplyAsync(() -> {
log.info("PARALLEL: Starting shipment creation for orderId={}", orderId);
return shippingCommandSender.sendAndWait(new CreateShipmentCommand(sagaId, orderId));
}, sagaParallelExecutor);
}
public CompletableFuture<Void> sendNotification(String orderId, String customerId) {
return CompletableFuture.runAsync(() -> {
try {
notificationCommandSender.send(new SendConfirmationEmailCommand(orderId, customerId));
} catch (Exception e) {
log.warn("Non-critical: notification failed for orderId={}", orderId, e);
// Fire and forget - do not fail saga
}
}, sagaParallelExecutor);
}
/**
* Execute critical and non-critical steps in parallel.
* Critical step (shipment) must succeed. Others are best-effort.
*/
public ShipmentResult executePostInventorySteps(String sagaId, String orderId, String customerId) {
// Start all steps in parallel
CompletableFuture<ShipmentResult> shipmentFuture = createShipment(sagaId, orderId);
CompletableFuture<Void> emailFuture = sendNotification(orderId, customerId);
// Wait for CRITICAL step only (shipment)
ShipmentResult result;
try {
result = shipmentFuture.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
throw new ShipmentTimeoutException("Shipment creation timed out for orderId=" + orderId);
} catch (Exception e) {
throw new ShipmentException("Shipment creation failed for orderId=" + orderId, e);
}
// Best-effort wait for non-critical steps (brief timeout)
try {
emailFuture.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Non-critical email notification may have failed, continuing saga");
}
return result;
}
}
// Thread pool configuration for parallel steps
@Configuration
public class AsyncConfig {
@Bean("sagaParallelExecutor")
public Executor sagaParallelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("saga-parallel-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
4. Sub-SAGAs and Nested SAGAs
Complex business processes can be decomposed into a main SAGA that delegates to sub-SAGAs.
When to Use Sub-SAGAs
- Reusable business processes (payment processing is used in many sagas)
- Complex processes within a single service domain
- Independent failure handling at different levels
MAIN SAGA: Order Fulfilment
|
+--> Sub-SAGA 1: Payment Processing
| - Credit check
| - Charge card
| - Issue receipt
|
+--> Sub-SAGA 2: Inventory Fulfilment
| - Find warehouse
| - Reserve items
| - Print pick list
|
+--> Sub-SAGA 3: Delivery Scheduling
- Select carrier
- Book time slot
- Generate tracking
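The parent saga in this diagram has to track which sub-sagas are still outstanding and react to the first failure. A minimal in-memory sketch of that bookkeeping follows; `ParentSagaTracker` and its states are hypothetical names, and a real orchestrator would persist this state rather than hold it in memory.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical tracker for a parent saga that waits on several sub-sagas.
class ParentSagaTracker {

    enum State { RUNNING, COMPLETED, COMPENSATING }

    private final Set<String> pending;
    private State state = State.RUNNING;

    ParentSagaTracker(Set<String> subSagaTypes) {
        this.pending = new LinkedHashSet<>(subSagaTypes);
    }

    /** Records a sub-saga outcome and returns the resulting parent state. */
    State onSubSagaResult(String subSagaType, boolean success) {
        if (state != State.RUNNING) {
            return state; // parent already finished or compensating: ignore late replies
        }
        if (!pending.remove(subSagaType)) {
            return state; // unknown or duplicate reply
        }
        if (!success) {
            state = State.COMPENSATING; // one failed sub-saga fails the parent
        } else if (pending.isEmpty()) {
            state = State.COMPLETED; // every sub-saga reported success
        }
        return state;
    }
}
```

The parent completes only after all three sub-sagas report success, and a single failure moves it to compensation regardless of replies that arrive later.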
Implementation Pattern
// SubSagaCoordinator.java
@Service
@RequiredArgsConstructor
@Slf4j
public class SubSagaCoordinator {
private final SagaInstanceRepository sagaRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
/**
* Start a sub-saga from within a parent saga.
* Links the sub-saga to the parent for correlation.
*/
@Transactional
public SagaInstance startSubSaga(String parentSagaId, String subSagaType,
String aggregateId, String subSagaPayload) {
SagaInstance subSaga = SagaInstance.builder()
.sagaId(UUID.randomUUID().toString())
.sagaType(subSagaType)
.orderId(aggregateId)
.status(SagaStatus.STARTED)
.currentStep(SagaStep.valueOf(subSagaType + "_INIT"))
.payload(subSagaPayload)
.build();
// Store parent-child relationship
subSaga.setParentSagaId(parentSagaId);
sagaRepository.save(subSaga);
// Publish sub-saga start event
kafkaTemplate.send("saga.sub-saga.started",
aggregateId,
new SubSagaStartedEvent(subSaga.getSagaId(), parentSagaId, subSagaType)
);
return subSaga;
}
/**
* Called when sub-saga completes. Notifies the parent saga.
*/
@Transactional
public void onSubSagaCompleted(String subSagaId, boolean success, String result) {
SagaInstance subSaga = sagaRepository.findById(subSagaId).orElseThrow();
String parentSagaId = subSaga.getParentSagaId();
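if (parentSagaId != null) {
// Notify parent saga of the sub-saga outcome so that a failure can trigger compensation.
// Assumption: SagaReplyEnvelope offers a failure(...) factory mirroring success(...).
SagaReplyEnvelope reply = success
? SagaReplyEnvelope.success(parentSagaId, subSaga.getSagaType() + "_COMPLETED", result)
: SagaReplyEnvelope.failure(parentSagaId, subSaga.getSagaType() + "_FAILED", result);
kafkaTemplate.send("saga.replies", parentSagaId, reply);
}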
}
}
5. Long-Running SAGAs
Some business processes take hours, days, or even weeks. These require special consideration.
Characteristics of Long-Running SAGAs
- Insurance claim processing: days to weeks
- Manufacturing order: hours to days
- International shipping clearance: days
- Background check for hiring: days
Key Design Considerations
// LongRunningSagaManager.java
@Service
@RequiredArgsConstructor
@Slf4j
public class LongRunningSagaManager {
private final SagaInstanceRepository sagaRepository;
private final SagaCheckpointRepository checkpointRepository;
/**
* Save a checkpoint so long-running sagas can resume after restarts.
* Called at each completed step.
*/
@Transactional
public void saveCheckpoint(String sagaId, String completedStep, String stepResult) {
SagaCheckpoint checkpoint = SagaCheckpoint.builder()
.checkpointId(UUID.randomUUID().toString())
.sagaId(sagaId)
.step(completedStep)
.result(stepResult)
.savedAt(Instant.now())
.build();
checkpointRepository.save(checkpoint);
log.info("Checkpoint saved: sagaId={}, step={}", sagaId, completedStep);
}
/**
* Resume a long-running saga from its last checkpoint.
* Called on service restart or manual recovery.
*/
@Transactional
public void resumeSagaFromCheckpoint(String sagaId) {
SagaInstance saga = sagaRepository.findById(sagaId).orElseThrow();
List<SagaCheckpoint> checkpoints = checkpointRepository
.findBySagaIdOrderBySavedAtAsc(sagaId);
if (checkpoints.isEmpty()) {
log.info("No checkpoints found, resuming from current step: {}", saga.getCurrentStep());
} else {
SagaCheckpoint lastCheckpoint = checkpoints.get(checkpoints.size() - 1);
log.info("Resuming saga {} from checkpoint at step: {}",
sagaId, lastCheckpoint.getStep());
}
// Re-send current step's command
// ... implementation
}
/**
* Handle waiting steps (e.g., waiting for human approval).
* These sagas are in WAITING state and do not time out in the usual way.
*/
@Transactional
public void waitForExternalEvent(String sagaId, String waitingFor, Duration maxWait) {
SagaInstance saga = sagaRepository.findById(sagaId).orElseThrow();
saga.setStatus(SagaStatus.WAITING);
saga.setWaitingFor(waitingFor);
saga.setWaitDeadline(Instant.now().plus(maxWait));
sagaRepository.save(saga);
log.info("Saga {} is waiting for: {}, deadline: {}",
sagaId, waitingFor, saga.getWaitDeadline());
}
}
6. SAGA with SQS FIFO (AWS)
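SQS FIFO preserves message order within a message group and drops messages that repeat a deduplication ID within a 5-minute window, which AWS describes as exactly-once processing.
Consumers should still be idempotent, because a message can be redelivered if it is not deleted before its visibility timeout expires. FIFO queues suit sagas where per-order ordering and duplicate suppression are critical.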
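Queue-level deduplication is usually paired with an idempotency check in the consumer. The sketch below shows one simple form of that check; `IdempotentSagaConsumer` is a hypothetical class, and the in-memory set stands in for what would normally be a durable store of processed event IDs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side guard: each eventId is handled at most once.
class IdempotentSagaConsumer {

    // In-memory stand-in for a durable "processed events" table
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();

    /** Runs the handler only the first time an eventId is seen; returns whether it ran. */
    boolean handleOnce(String eventId, Runnable handler) {
        if (!processedEventIds.add(eventId)) {
            return false; // duplicate delivery: skip side effects
        }
        try {
            handler.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a redelivery can retry the failed handler
            processedEventIds.remove(eventId);
            throw e;
        }
    }
}
```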
// SqsFifoSagaPublisher.java
@Service
@RequiredArgsConstructor
@Slf4j
public class SqsFifoSagaPublisher {
private final SqsClient sqsClient;
private final ObjectMapper objectMapper;
@Value("${aws.sqs.saga-events-fifo-url}")
private String sagaEventsFifoUrl;
/**
* Publish saga event to SQS FIFO.
* - MessageGroupId: orderId (ensures ordering per order)
* - MessageDeduplicationId: eventId (prevents duplicates)
*/
public void publishSagaEvent(String orderId, String eventId, Object event) {
try {
String messageBody = objectMapper.writeValueAsString(event);
SendMessageRequest request = SendMessageRequest.builder()
.queueUrl(sagaEventsFifoUrl)
.messageBody(messageBody)
.messageGroupId(orderId) // All events for same order are ordered
.messageDeduplicationId(eventId) // Explicit deduplication ID (content-based deduplication is disabled on the queue)
.messageAttributes(Map.of(
"eventType",
MessageAttributeValue.builder()
.dataType("String")
.stringValue(event.getClass().getSimpleName())
.build()
))
.build();
SendMessageResponse response = sqsClient.sendMessage(request);
log.info("Published to FIFO: messageId={}, orderId={}, eventId={}",
response.messageId(), orderId, eventId);
} catch (Exception e) {
log.error("Failed to publish to SQS FIFO: orderId={}", orderId, e);
throw new EventPublicationException("SQS FIFO publish failed", e);
}
}
}
SQS FIFO Queue Configuration (AWS CDK / CloudFormation)
# CloudFormation template for SQS FIFO queues
Resources:
SagaEventsFifoQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: saga-events.fifo
FifoQueue: true
ContentBasedDeduplication: false # Explicit deduplication IDs
DeduplicationScope: messageGroup # Dedup per message group (orderId)
FifoThroughputLimit: perMessageGroupId
VisibilityTimeout: 60 # 60 seconds to process
MessageRetentionPeriod: 345600 # 4 days
RedrivePolicy:
deadLetterTargetArn: !GetAtt SagaEventsDLQ.Arn
maxReceiveCount: 5
SagaEventsDLQ:
Type: AWS::SQS::Queue
Properties:
QueueName: saga-events-dlq.fifo
FifoQueue: true
MessageRetentionPeriod: 1209600 # 14 days for DLQ messages
7. SAGA with DynamoDB State Store (AWS)
DynamoDB is an excellent choice for saga state when:
- You need single-digit millisecond latency for state reads
- You want a serverless, fully managed state store
- Your saga volume is high (millions per day)
// DynamoDbSagaStateStore.java
@Service
@RequiredArgsConstructor
@Slf4j
public class DynamoDbSagaStateStore {
private final DynamoDbClient dynamoDbClient;
private final ObjectMapper objectMapper;
@Value("${aws.dynamodb.saga-table}")
private String sagaTableName;
/**
* Save saga state to DynamoDB with conditional write (optimistic locking).
*/
public void saveSagaState(SagaStateDto sagaState) {
Map<String, AttributeValue> item = new HashMap<>();
item.put("PK", AttributeValue.builder().s("SAGA#" + sagaState.sagaId()).build());
item.put("SK", AttributeValue.builder().s("STATE").build());
item.put("sagaId", AttributeValue.builder().s(sagaState.sagaId()).build());
item.put("orderId", AttributeValue.builder().s(sagaState.orderId()).build());
item.put("status", AttributeValue.builder().s(sagaState.status().name()).build());
item.put("currentStep", AttributeValue.builder().s(sagaState.currentStep().name()).build());
item.put("version", AttributeValue.builder().n(String.valueOf(sagaState.version())).build());
item.put("updatedAt", AttributeValue.builder().s(Instant.now().toString()).build());
item.put("TTL", AttributeValue.builder()
.n(String.valueOf(Instant.now().plusSeconds(86400 * 30).getEpochSecond())) // 30 day TTL
.build());
try {
PutItemRequest request = PutItemRequest.builder()
.tableName(sagaTableName)
.item(item)
// Optimistic locking: only update if version matches
.conditionExpression("attribute_not_exists(PK) OR version = :expectedVersion")
.expressionAttributeValues(Map.of(
":expectedVersion",
AttributeValue.builder().n(String.valueOf(sagaState.version() - 1)).build()
))
.build();
dynamoDbClient.putItem(request);
log.debug("Saga state saved to DynamoDB: sagaId={}", sagaState.sagaId());
} catch (ConditionalCheckFailedException e) {
throw new ConcurrencyConflictException(
"Concurrent saga state update detected for sagaId=" + sagaState.sagaId()
);
}
}
public Optional<SagaStateDto> loadSagaState(String sagaId) {
GetItemRequest request = GetItemRequest.builder()
.tableName(sagaTableName)
.key(Map.of(
"PK", AttributeValue.builder().s("SAGA#" + sagaId).build(),
"SK", AttributeValue.builder().s("STATE").build()
))
.build();
GetItemResponse response = dynamoDbClient.getItem(request);
if (!response.hasItem()) return Optional.empty();
Map<String, AttributeValue> item = response.item();
return Optional.of(new SagaStateDto(
item.get("sagaId").s(),
item.get("orderId").s(),
SagaStatus.valueOf(item.get("status").s()),
SagaStep.valueOf(item.get("currentStep").s()),
Long.parseLong(item.get("version").n())
));
}
}
DynamoDB Table Design for SAGAs
DynamoDB Table: saga-state
Partition Key (PK): SAGA#{sagaId}
Sort Key (SK): STATE | STEP#{stepName} | EVENT#{eventId}
Access patterns:
- Get saga state: PK=SAGA#123, SK=STATE
- Get all saga steps: PK=SAGA#123, SK begins_with STEP#
- Get step execution: PK=SAGA#123, SK=STEP#PROCESS_PAYMENT
- Get all events: PK=SAGA#123, SK begins_with EVENT#
GSI 1 (OrderIndex):
PK: orderId
SK: createdAt
For: "Get saga for order ID"
GSI 2 (StatusIndex):
PK: status
SK: updatedAt
For: "Find all in-progress sagas" (monitoring)
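The key layout above can be captured in a few helper functions so that every caller builds keys the same way. This is a small sketch under the naming shown in the table design; the `SagaKeys` class and the `stepsOf` helper, which imitates a `begins_with` query over sort keys in memory, are assumptions for illustration.

```java
import java.util.List;

// Hypothetical key builders for the single-table saga design.
class SagaKeys {

    static final String STEP_PREFIX = "STEP#";
    static final String EVENT_PREFIX = "EVENT#";

    // Partition key shared by every item that belongs to one saga
    static String pk(String sagaId) { return "SAGA#" + sagaId; }

    // Sort keys for the three item types stored under a saga
    static String stateSk() { return "STATE"; }
    static String stepSk(String stepName) { return STEP_PREFIX + stepName; }
    static String eventSk(String eventId) { return EVENT_PREFIX + eventId; }

    // In-memory imitation of "SK begins_with STEP#" over one partition's sort keys
    static List<String> stepsOf(List<String> sortKeys) {
        return sortKeys.stream().filter(sk -> sk.startsWith(STEP_PREFIX)).toList();
    }
}
```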
8. High-Throughput SAGA Design
For systems processing tens of thousands of sagas per second:
Key Optimizations
1. Partitioning
- Use orderId as Kafka/SQS partition key
- Ensures ordering per order, parallelism across orders
- Target: N partitions allow up to N consumers (parallel saga executions) per consumer group
2. Sharding the Orchestrator
- Multiple orchestrator instances
- Each handles a partition range
- No cross-instance coordination needed
3. Async Everywhere
- Never block in event handlers
- Use non-blocking Kafka consumers (reactive)
- Fire-and-forget for non-critical steps
4. Read-Model Separation
- Never read business data from saga state table
- All reads from denormalized CQRS read models
- Saga state table is write-heavy, optimize accordingly
5. MySQL Optimizations for Saga Tables
- InnoDB row-level locking
- Partition saga_instances by created_at (monthly partitions)
- Regular archival of completed sagas
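The partitioning rule in point 1 comes down to a stable mapping from orderId to partition. The sketch below illustrates the property that matters (same order, same partition); `OrderPartitioner` is a hypothetical class, and it uses `String.hashCode` for simplicity, which is not the hash function Kafka's default partitioner uses.

```java
// Hypothetical illustration of key-based partition routing.
class OrderPartitioner {

    /** Maps an orderId to a partition in the range [0, partitionCount). */
    static int partitionFor(String orderId, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(orderId.hashCode(), partitionCount);
    }
}
```

Because the mapping is deterministic, all events for one order land on one partition and are consumed in order, while different orders spread across partitions and are processed in parallel.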
Reactive Kafka Consumer for High Throughput
// ReactiveInventoryEventHandler.java
@Component
@RequiredArgsConstructor
@Slf4j
public class ReactiveInventoryEventHandler {
private final InventoryService inventoryService;
@Bean
public Consumer<Flux<Message<PaymentProcessedEvent>>> inventoryEventProcessor() {
return messages -> messages
.flatMap(message ->
Mono.fromCallable(() -> {
inventoryService.reserveInventory(message.getPayload());
return message;
})
.subscribeOn(Schedulers.boundedElastic())
.doOnError(e -> log.error("Failed to reserve inventory", e))
.onErrorResume(e -> Mono.empty()) // Retry topic handles it
)
.subscribe(message ->
message.getHeaders()
.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class)
.acknowledge()
);
}
}
9. Multi-Region SAGA Architecture
For global applications, a saga may span multiple AWS regions.
REGION: us-east-1 (Primary) REGION: eu-west-1 (Secondary)
+-----------------------------+ +-----------------------------+
| Order Service | | Order Service (replica) |
| Payment Service |<----->| Payment Service (replica) |
| Inventory Service | | Inventory Service |
| Saga Orchestrator | | |
+-----------------------------+ +-----------------------------+
| |
MSK us-east-1 MSK eu-west-1
(Primary) (Replica via MirrorMaker)
| |
RDS MySQL us-east-1 RDS MySQL eu-west-1
(Primary writer) (Read replica)
Cross-Region Saga Considerations
// RegionAwareEventPublisher.java
@Service
@RequiredArgsConstructor
public class RegionAwareEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final EventBridgeClient eventBridgeClient;
private final ObjectMapper objectMapper;
@Value("${aws.region}")
private String currentRegion;
@Value("${aws.eventbridge.cross-region-bus-arn}")
private String crossRegionBusArn;
/**
* For cross-region events, use EventBridge for routing.
* For same-region, use Kafka.
*/
public void publish(String targetRegion, String topic, String key, Object event) {
if (targetRegion.equals(currentRegion)) {
// Same region - use Kafka
kafkaTemplate.send(topic, key, event);
} else {
// Cross-region - use EventBridge
PutEventsRequest request = PutEventsRequest.builder()
.entries(PutEventsRequestEntry.builder()
.eventBusName(crossRegionBusArn)
.source("com.example.saga")
.detailType(event.getClass().getSimpleName())
.detail(serialize(event))
.build())
.build();
eventBridgeClient.putEvents(request);
}
}
// EventBridge expects the detail field as a JSON string
private String serialize(Object event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new IllegalStateException("Failed to serialize event for EventBridge", e);
}
}
}
10. SAGA Observability Dashboard
Essential metrics and queries for a production SAGA monitoring dashboard.
CloudWatch Metrics to Track
// SagaMetricsPublisher.java
@Component
@RequiredArgsConstructor
@Slf4j
public class SagaMetricsPublisher {
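private final CloudWatchClient cloudWatchClient;
// Hypothetical collaborator that computes the current saga statistics
private final SagaStatsProvider sagaStatsProvider;
@Value("${aws.region}")
private String region;
// @Scheduled methods must take no arguments, so the stats are fetched inside the method
@Scheduled(fixedDelay = 60000) // Every minute
public void publishSagaMetrics() {
SagaStats stats = sagaStatsProvider.currentStats();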
List<MetricDatum> metrics = List.of(
MetricDatum.builder()
.metricName("ActiveSagas")
.value((double) stats.activeSagas())
.unit(StandardUnit.COUNT)
.build(),
MetricDatum.builder()
.metricName("SagaSuccessRate")
.value(stats.successRate())
.unit(StandardUnit.PERCENT)
.build(),
MetricDatum.builder()
.metricName("SagaP99Duration")
.value((double) stats.p99DurationSeconds())
.unit(StandardUnit.SECONDS)
.build(),
MetricDatum.builder()
.metricName("StuckSagas")
.value((double) stats.stuckSagas())
.unit(StandardUnit.COUNT)
.build(),
MetricDatum.builder()
.metricName("PendingOutboxEvents")
.value((double) stats.pendingOutboxEvents())
.unit(StandardUnit.COUNT)
.build()
);
cloudWatchClient.putMetricData(PutMetricDataRequest.builder()
.namespace("SagaMetrics")
.metricData(metrics)
.build());
}
}
CloudWatch Insights Queries
# Query 1: Find all failed saga steps (set the query time range to the last 24 hours)
fields @timestamp, sagaId, orderId, step, errorMessage
| filter type = "STEP_FAILED"
| sort @timestamp desc
| limit 100
# Query 2: Average saga completion time per hour
filter type = "SAGA_COMPLETED"
| stats avg(totalDurationMs) as avgDuration by bin(1h)
# Query 3: Top failing saga steps
filter type = "STEP_FAILED"
| stats count(*) as failureCount by step
| sort failureCount desc
| limit 10
# Query 4: Sagas that triggered compensation
fields sagaId, orderId, originalFailure, compensatedAt
| filter type = "SAGA_COMPENSATED"
| stats count(*) as compensationCount by originalFailure
| sort compensationCount desc
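These queries assume that each log event is a JSON object with fields such as `type`, `sagaId`, `orderId`, and `step`, which CloudWatch Logs Insights can discover automatically. The sketch below shows one way such a line could be produced; `SagaLogLine` is a hypothetical helper, and production code would normally use a JSON library or a structured logging encoder instead of hand-built strings.

```java
// Hypothetical helper that formats a STEP_FAILED log event as a single JSON line.
class SagaLogLine {

    // Minimal escaping for backslashes and double quotes only;
    // control characters are not handled in this sketch
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    static String stepFailed(String sagaId, String orderId, String step, String errorMessage) {
        return String.format(
            "{\"type\":\"STEP_FAILED\",\"sagaId\":\"%s\",\"orderId\":\"%s\","
                + "\"step\":\"%s\",\"errorMessage\":\"%s\"}",
            escape(sagaId), escape(orderId), escape(step), escape(errorMessage));
    }
}
```

Keeping the field names identical across services is what lets a single query such as Query 3 aggregate failures by `step`.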
11. SAGA with Temporal Workflow Engine
Temporal is an open-source workflow engine that simplifies SAGA implementation significantly.
It grew out of Uber's Cadence project and is used in production by companies such as Netflix.
// OrderSagaWorkflow.java (Temporal)
@WorkflowInterface
public interface OrderSagaWorkflow {
@WorkflowMethod
OrderResult processOrder(OrderRequest request);
}
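// OrderSagaWorkflowImpl.java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Saga;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {
// Activities are the actual service calls
private final OrderActivities orderActivities = Workflow.newActivityStub(
OrderActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3)
.setInitialInterval(Duration.ofSeconds(1))
.build())
.build()
);
private final PaymentActivities paymentActivities = Workflow.newActivityStub(
PaymentActivities.class,
ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
// A declined payment is a business failure: retrying it would never succeed.
// Assumes the activity throws PaymentDeclinedException, which Temporal reports
// with the exception's class name as the failure type.
.setRetryOptions(RetryOptions.newBuilder()
.setDoNotRetry(PaymentDeclinedException.class.getName())
.build())
.build()
);
@Override
public OrderResult processOrder(OrderRequest request) {
// Workflow code must be deterministic, so use Workflow.randomUUID() instead of UUID.randomUUID()
String orderId = Workflow.randomUUID().toString();
// Compensations are registered as steps succeed and run in reverse order on failure
Saga saga = new Saga(new Saga.Options.Builder().setParallelCompensation(false).build());
// Temporal handles retries, persistence, and crash recovery automatically
try {
// Forward steps
orderActivities.createOrder(orderId, request);
String paymentId = paymentActivities.processPayment(orderId, request.amount());
saga.addCompensation(() -> paymentActivities.refundPayment(orderId, paymentId));
String reservationId = orderActivities.reserveInventory(orderId, request.items());
String shipmentId = orderActivities.createShipment(orderId, reservationId);
orderActivities.confirmOrder(orderId, shipmentId);
return OrderResult.success(orderId, shipmentId);
} catch (ActivityFailure e) {
// Activity exceptions reach the workflow wrapped in ActivityFailure,
// so catching PaymentDeclinedException or InventoryException directly would never match
String reason = describeFailure(e);
Workflow.newDetachedCancellationScope(() -> {
saga.compensate(); // refunds the payment only if one was taken
orderActivities.cancelOrder(orderId, reason);
}).run();
return OrderResult.failed(orderId, reason);
}
}
// Maps the wrapped activity failure back to a business reason
private static String describeFailure(ActivityFailure e) {
if (e.getCause() instanceof ApplicationFailure) {
ApplicationFailure failure = (ApplicationFailure) e.getCause();
if (PaymentDeclinedException.class.getName().equals(failure.getType())) {
return "Payment declined: " + failure.getOriginalMessage();
}
if (InventoryException.class.getName().equals(failure.getType())) {
return "Out of stock";
}
}
return "Order failed";
}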
}
Why Temporal Simplifies SAGAs
| Without Temporal | With Temporal |
|---|---|
| Manual state machine | Workflow code reads like sequential code |
| Custom outbox pattern | Built-in durability (workflow state persisted automatically) |
| Manual retry logic | Declarative retry policies |
| Manual timeout handling | Built-in timeouts per activity |
| Custom saga recovery | Automatic replay on worker restart |
| Custom DLQ | Built-in task queue with backoff |
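To make the left-hand column concrete, here is a sketch of the core bookkeeping a hand-rolled orchestrator has to do: remember one compensation per completed step and run them newest-first when a later step fails. This is plain Java, not Temporal's API. CompensationStack is a hypothetical name, and it keeps its state in memory only, so unlike a Temporal workflow it would lose everything on a crash unless the stack were also persisted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory helper; a real hand-rolled saga would also have to persist this state.
final class CompensationStack {
    private final Deque<Runnable> compensations = new ArrayDeque<>();

    // Call after each forward step succeeds.
    void register(Runnable compensation) {
        compensations.push(compensation);
    }

    // Runs compensations newest-first. A failing compensation does not stop the others;
    // its exception is collected and returned so the caller can alert or retry.
    List<RuntimeException> compensate() {
        List<RuntimeException> failures = new ArrayList<>();
        while (!compensations.isEmpty()) {
            try {
                compensations.pop().run();
            } catch (RuntimeException e) {
                failures.add(e);
            }
        }
        return failures;
    }
}
```

Each row of the table corresponds to more code of this kind (durable storage, retries, timeouts, recovery after restart) that the engine otherwise supplies.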
12. Real-World Architecture Examples
Example 1: Uber Eats Order Flow
Customer Orders Food
|
v
Order SAGA (Choreography)
|
+---> Restaurant Service: check if restaurant accepts order
| (with timeout: 30 seconds for restaurant to accept)
|
+---> Driver Matching Service: find nearby driver
| (with retry: retry every 30s for 10 minutes)
|
+---> Payment Service: authorize card
| (compensation: release authorization if driver not found)
|
+---> Order Tracker Service: initialize tracking
Compensation: if restaurant rejects -> cancel order, release any payment authorization
Example 2: Netflix Content Publishing
Content Team Uploads New Show
|
v
Content Publishing SAGA (Orchestration)
|
+---> Video Encoding Service: transcode to 5 quality levels
| (parallel, long-running: hours)
|
+---> DRM Service: apply content protection
|
+---> CDN Distribution Service: push to 190+ edge locations
|
+---> Metadata Service: update catalog
|
+---> Recommendation Service: train model on new content
Compensation: if any step fails, mark content as UNAVAILABLE
Long-running saga checkpoints after each step
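Checkpointing after each step means the orchestrator records how far it got, so a restart resumes at the first unfinished step instead of repeating hours of encoding. The sketch below illustrates the idea under simplifying assumptions: CheckpointedSaga is a hypothetical class, and the checkpoint is an in-memory field where a real implementation would write it to a durable store.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: runs named steps in order and remembers how far it got.
final class CheckpointedSaga {
    private final List<String> steps;
    private int nextStep = 0; // checkpoint: index of the first step not yet completed

    CheckpointedSaga(List<String> steps) {
        this.steps = List.copyOf(steps);
    }

    // Runs the remaining steps. If a step throws, the checkpoint stays at that step,
    // so the next call to run() retries it and continues from there.
    void run(Consumer<String> executor) {
        while (nextStep < steps.size()) {
            executor.accept(steps.get(nextStep)); // may throw
            nextStep++;                            // advance the checkpoint only after success
        }
    }

    boolean isComplete() {
        return nextStep == steps.size();
    }
}
```

Because a step can be retried after a failure partway through, each step still needs to be idempotent.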
Example 3: Banking Money Transfer
Customer Initiates Wire Transfer
|
v
Transfer SAGA (Orchestration - strict ordering required)
|
+---> Fraud Detection: ML-based fraud check
| (synchronous, blocks transfer)
|
+---> Compliance Check: sanctions screening (OFAC, etc.)
| (synchronous, blocks transfer)
|
+---> Debit Source Account: hold funds (semantic lock)
| (compensation: release hold)
|
+---> Credit Destination Account
| (if bank != source bank: SWIFT/ACH integration)
|
+---> Release Source Account Hold: confirm debit permanent
|
+---> Notification Service: alert both parties
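This saga uses semantic locking (funds on hold) to prevent concurrent transfers
from draining the account while the saga is in flight. The PIVOT transaction is
"Credit Destination Account": before it, a failure is compensated by releasing the
hold; after it, the saga can no longer be undone and must run forward to completion.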
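The hold-based semantic lock can be sketched as an account that tracks held funds separately from its settled balance. This is an illustrative model only: HeldFundsAccount and its method names are hypothetical, amounts are assumed to be in minor units (for example cents), and a real ledger would persist holds per transfer rather than as one running total.

```java
// Hypothetical account model; amounts are in minor units (e.g. cents).
final class HeldFundsAccount {
    private long balance; // settled funds
    private long held;    // funds reserved by in-flight transfers

    HeldFundsAccount(long openingBalance) {
        this.balance = openingBalance;
    }

    synchronized long balance() {
        return balance;
    }

    // Funds a new transfer may still reserve.
    synchronized long available() {
        return balance - held;
    }

    // Forward step "Debit Source Account": reserve funds, refusing if too little is available.
    synchronized boolean hold(long amount) {
        if (amount <= 0 || amount > available()) {
            return false;
        }
        held += amount;
        return true;
    }

    // Compensation: the transfer was abandoned before the pivot, so the funds become available again.
    synchronized void releaseHold(long amount) {
        requireHeld(amount);
        held -= amount;
    }

    // After the pivot: the hold becomes a permanent debit.
    synchronized void commitHold(long amount) {
        requireHeld(amount);
        held -= amount;
        balance -= amount;
    }

    private void requireHeld(long amount) {
        if (amount <= 0 || amount > held) {
            throw new IllegalArgumentException("no matching hold for " + amount);
        }
    }
}
```

With an opening balance of 10,000 and a hold of 7,000, a second concurrent transfer of 5,000 is refused because only 3,000 is available, which is the protection the hold is meant to provide.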
13. Summary
| Advanced Pattern | When to Use | Key Benefit |
|---|---|---|
| SAGA + CQRS | Always | Hides intermediate saga state from reads |
| SAGA + Event Sourcing | High-audit requirements | Complete history, replay capability |
| Parallel Steps | Independent steps at same saga level | Reduced end-to-end latency |
| Sub-SAGAs | Reusable complex sub-processes | Modularity, independent failure handling |
| Long-Running SAGAs | Business processes over hours/days | Checkpointing, waiting states |
| SQS FIFO | Strict ordering + no-duplicates needed | AWS-native, ordered delivery with built-in deduplication (5-minute window) |
| DynamoDB State Store | High-throughput, AWS-native | Single-digit-millisecond state access |
| Temporal | Complex saga logic | Simplest code, automatic persistence |
Next: Part 6 - Pitfalls and Best Practices
Learn every anti-pattern, common mistake, isolation anomaly, and production challenge
that engineers face with SAGAs - with solutions for each.