Part 3: SAGA Orchestration Pattern - Complete Implementation
Table of Contents
- What is Orchestration?
- Mental Model: The Orchestra Conductor
- Orchestration Flow Diagrams
- Orchestration Architecture Design
- Custom Orchestrator: Domain Model
- Saga State Machine Design
- Custom Orchestrator: Full Implementation
- Participant Services (Command Side)
- Saga State Persistence (MySQL)
- AWS Step Functions Integration
- Step Functions State Machine Definition
- Axon Framework Orchestration
- Production Configuration
- Error Handling in Orchestration
- Monitoring and Observability
- Testing Orchestration SAGAs
- Choreography vs Orchestration Decision Matrix
- Advantages and Disadvantages
- Summary
1. What is Orchestration?
In orchestration, a central ORCHESTRATOR service drives the entire saga. The orchestrator:
- Knows every step in the saga
- Tells each service what to do (sends commands)
- Receives results from each service
- Decides what to do next based on results
- Tracks the complete saga state
- Initiates compensation when a step fails
Services do NOT know about each other. They only know about the commands they receive and
the orchestrator they respond to.
WITHOUT ORCHESTRATION (services know each other):
  OrderSvc     --event--> PaymentSvc
  PaymentSvc   --event--> InventorySvc
  InventorySvc --event--> ShippingSvc
  Services coupled to each other
WITH ORCHESTRATION (services only know the orchestrator):
  Orchestrator --command--> PaymentSvc
       |       <--reply---  PaymentSvc
       +-------command----> InventorySvc
       |       <--reply---  InventorySvc
       +-------command----> ShippingSvc
               <--reply---  ShippingSvc
  Services only coupled to orchestrator
2. Mental Model: The Orchestra Conductor
An orchestra conductor:
- Has the full musical score (knows all parts)
- Cues each section when it is their turn
- Adjusts tempo based on how sections are playing
- Coordinates the entire performance
A SAGA orchestrator:
- Has the full saga definition (knows all steps)
- Sends commands to each service when it is their turn
- Decides next action based on replies received
- Coordinates compensations when things go wrong
Musicians (services) do not talk to each other. They watch the conductor (orchestrator).
3. Orchestration Flow Diagrams
Happy Path
CLIENT        -> ORCHESTRATOR  : placeOrder
ORCHESTRATOR  -> ORDER SVC     : CreateOrderCmd
ORDER SVC     -> ORCHESTRATOR  : OrderCreatedResp
ORCHESTRATOR  -> PAYMENT SVC   : ProcessPaymentCmd
PAYMENT SVC   -> ORCHESTRATOR  : PaymentProcessedResp
ORCHESTRATOR  -> INVENTORY SVC : ReserveInventoryCmd
INVENTORY SVC -> ORCHESTRATOR  : InventoryReservedResp
ORCHESTRATOR  -> SHIPPING SVC  : CreateShipmentCmd
SHIPPING SVC  -> ORCHESTRATOR  : ShipmentCreatedResp
ORCHESTRATOR  -> ORDER SVC     : ConfirmOrderCmd
ORCHESTRATOR  -> CLIENT        : orderConfirmed
Failure and Compensation Path
ORCHESTRATOR  -> ORDER SVC     : CreateOrderCmd
ORDER SVC     -> ORCHESTRATOR  : OrderCreatedResp
ORCHESTRATOR  -> PAYMENT SVC   : ProcessPaymentCmd
PAYMENT SVC   -> ORCHESTRATOR  : PaymentProcessedResp
ORCHESTRATOR  -> INVENTORY SVC : ReserveInventoryCmd
INVENTORY SVC -> ORCHESTRATOR  : InventoryFailedResp
  -- COMPENSATION BEGINS --
ORCHESTRATOR  -> PAYMENT SVC   : RefundPaymentCmd
PAYMENT SVC   -> ORCHESTRATOR  : PaymentRefundedResp
ORCHESTRATOR  -> ORDER SVC     : CancelOrderCmd
ORDER SVC     -> ORCHESTRATOR  : OrderCancelledResp
  -- SAGA COMPENSATED - END --
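The failure path comes down to one rule. When a step fails, the orchestrator sends compensating commands for the steps that already succeeded, most recent first. It does not compensate the step that failed, because that step never committed. The following is a minimal plain-Java sketch of that rule. It assumes in-memory predicates stand in for participant services, and `StepSpec` and `MiniOrchestrator` are hypothetical names that are not part of the Kafka implementation later in this article.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// One saga step: the forward command, its compensating command, and a stand-in
// participant that returns true for a success reply and false for a failure reply.
record StepSpec(String command, String compensatingCommand, Predicate<String> participant) {}

final class MiniOrchestrator {
    // Every command the orchestrator "sends", in order.
    final List<String> sent = new ArrayList<>();

    String run(String orderId, List<StepSpec> steps) {
        Deque<StepSpec> completed = new ArrayDeque<>(); // used as a stack
        for (StepSpec step : steps) {
            sent.add(step.command());
            if (step.participant().test(orderId)) {
                completed.push(step);              // success reply: continue forward
                continue;
            }
            if (completed.isEmpty()) {
                return "FAILED";                   // first step failed: nothing to undo
            }
            while (!completed.isEmpty()) {         // undo finished steps, newest first
                sent.add(completed.pop().compensatingCommand());
            }
            return "COMPENSATED";
        }
        return "COMPLETED";
    }
}
```

Suppose inventory returns a failure reply, as in the diagram above. The sketch then sends CreateOrderCmd, ProcessPaymentCmd and ReserveInventoryCmd, followed by RefundPaymentCmd and CancelOrderCmd, and returns COMPENSATED. Pushing completed steps onto a stack is what produces the reverse order.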
4. Orchestration Architecture Design
The orchestrator communicates with services via:
Option A: Kafka Commands (recommended for async)
- Orchestrator publishes commands to service-specific topics
- Services reply to a reply topic
- Orchestrator listens on the reply topic
Option B: Synchronous REST with async retry
- Orchestrator calls services via REST
- Better for simpler, low-volume sagas
Option C: AWS Step Functions
- Fully managed orchestration
- Best for AWS-native architectures
This article implements Option A (Kafka, sections 5-9) and Option C (Step Functions, sections 10-11).
KAFKA-BASED ORCHESTRATION:
Orchestrator publishes:
command.order-service -> Order Service listens
command.payment-service -> Payment Service listens
command.inventory-service -> Inventory Service listens
command.shipping-service -> Shipping Service listens
Services reply to:
saga.replies -> Orchestrator listens to ALL replies here
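An in-memory simulation of this topic layout shows why a single reply topic works. Each participant consumes only its own command topic, and every reply lands on the shared reply topic. Because replies from concurrent sagas interleave there, the orchestrator tells them apart by sagaId. In the sketch, queues stand in for Kafka topics, and `TopicLayout`, `Command` and `Reply` are illustrative names, not Kafka APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

record Command(String sagaId, String type) {}
record Reply(String sagaId, String answeredCommand) {}

final class TopicLayout {
    // One queue per command topic, plus the single shared reply queue.
    final Map<String, Queue<Command>> commandTopics = new HashMap<>();
    final Queue<Reply> replyTopic = new ArrayDeque<>();

    // Orchestrator side: publish a command to one service's topic.
    void send(String topic, Command command) {
        commandTopics.computeIfAbsent(topic, t -> new ArrayDeque<>()).add(command);
    }

    // Participant side: consume only its own topic, answer on the shared reply topic.
    void runParticipant(String topic) {
        Queue<Command> queue = commandTopics.getOrDefault(topic, new ArrayDeque<>());
        for (Command c = queue.poll(); c != null; c = queue.poll()) {
            replyTopic.add(new Reply(c.sagaId(), c.type()));
        }
    }

    // Orchestrator side: one consumer reads every reply and correlates by sagaId.
    Map<String, List<String>> answeredCommandsBySaga() {
        Map<String, List<String>> bySaga = new HashMap<>();
        for (Reply reply : replyTopic) {
            bySaga.computeIfAbsent(reply.sagaId(), id -> new ArrayList<>()).add(reply.answeredCommand());
        }
        return bySaga;
    }
}
```

Real Kafka correlates by record key in the same way. Keying replies by sagaId, as the participant handler in section 8 does, keeps each saga's replies in order on a single partition.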
5. Custom Orchestrator: Domain Model
// SagaInstance.java - the central state entity
package com.example.sagaorchestrator.domain;
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@Entity
@Table(name = "saga_instances", indexes = {
@Index(name = "idx_saga_status", columnList = "status"),
@Index(name = "idx_saga_order_id", columnList = "order_id"),
@Index(name = "idx_saga_created_at", columnList = "created_at")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SagaInstance {
@Id
@Column(name = "saga_id", length = 36)
private String sagaId;
@Column(name = "saga_type", nullable = false, length = 100)
private String sagaType; // e.g., "ORDER_SAGA"
@Column(name = "order_id", length = 36)
private String orderId;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 30)
private SagaStatus status;
@Enumerated(EnumType.STRING)
@Column(name = "current_step", nullable = false, length = 50)
private SagaStep currentStep;
@Column(name = "payload", columnDefinition = "JSON")
private String payload; // Full saga context as JSON
@Column(name = "failure_reason", length = 1000)
private String failureReason;
@Column(name = "retry_count")
private int retryCount;
@Column(name = "created_at", updatable = false)
private Instant createdAt;
@Column(name = "updated_at")
private Instant updatedAt;
@Column(name = "completed_at")
private Instant completedAt;
@Version
private Long version;
@OneToMany(mappedBy = "sagaInstance", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
@OrderBy("executedAt ASC")
@Builder.Default
private List<SagaStepExecution> stepExecutions = new ArrayList<>();
@PrePersist
public void prePersist() {
this.createdAt = Instant.now();
this.updatedAt = Instant.now();
}
@PreUpdate
public void preUpdate() {
this.updatedAt = Instant.now();
}
}
// SagaStatus.java
package com.example.sagaorchestrator.domain;
public enum SagaStatus {
STARTED, // Saga just created, first step not yet executed
IN_PROGRESS, // Executing forward steps
COMPENSATING, // Executing compensation steps (failure detected)
COMPLETED, // All forward steps completed successfully
COMPENSATED, // All compensation steps completed
FAILED // Unrecoverable: nothing to compensate, compensation failed, or retries exhausted
}
// SagaStep.java
package com.example.sagaorchestrator.domain;
public enum SagaStep {
// Forward steps
CREATE_ORDER,
PROCESS_PAYMENT,
RESERVE_INVENTORY,
CREATE_SHIPMENT,
CONFIRM_ORDER,
// Compensation steps
COMPENSATE_SHIPMENT,
COMPENSATE_INVENTORY,
COMPENSATE_PAYMENT,
COMPENSATE_ORDER,
// Terminal states
COMPLETED,
FAILED
}
// SagaStepExecution.java - execution log for each step
package com.example.sagaorchestrator.domain;
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
@Entity
@Table(name = "saga_step_executions")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SagaStepExecution {
@Id
@Column(name = "execution_id", length = 36)
private String executionId;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "saga_id")
private SagaInstance sagaInstance;
@Enumerated(EnumType.STRING)
@Column(name = "step", nullable = false, length = 50)
private SagaStep step;
@Enumerated(EnumType.STRING)
@Column(name = "result", length = 20)
private StepResult result; // SUCCESS, FAILURE, PENDING
@Column(name = "command_payload", columnDefinition = "JSON")
private String commandPayload;
@Column(name = "response_payload", columnDefinition = "JSON")
private String responsePayload;
@Column(name = "error_message", length = 1000)
private String errorMessage;
@Column(name = "executed_at")
private Instant executedAt;
@Column(name = "completed_at")
private Instant completedAt;
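}
// StepResult.java - outcome of one step execution
package com.example.sagaorchestrator.domain;
public enum StepResult {
SUCCESS,
FAILURE,
PENDING
}
6. Saga State Machine Design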
SAGA STATE TRANSITIONS:
STARTED
  |
  v
CREATE_ORDER --failure--> FAILED (nothing to compensate)
  | success
  v
PROCESS_PAYMENT --failure--> COMPENSATE_ORDER -> COMPENSATED
  | success
  v
RESERVE_INVENTORY --failure--> COMPENSATE_PAYMENT -> COMPENSATE_ORDER -> COMPENSATED
  | success
  v
CREATE_SHIPMENT --failure--> RETRY CREATE_SHIPMENT (retriable, after the pivot: no compensation;
  | success                  up to N times, then FAILED for manual intervention)
  v
CONFIRM_ORDER
  |
  v
COMPLETED
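Every arrow in the diagram depends only on two things: the current step and whether the reply succeeded. The machine can therefore be written as a lookup table keyed by that pair, which is the same information the SagaTransition record that follows captures. The minimal sketch below assumes a trimmed copy of SagaStep and a hypothetical `TransitionTable` helper. The orchestrator in section 7 encodes the same transitions in switch statements instead.

```java
import java.util.Map;

// Trimmed copy of SagaStep (values the orchestrator never uses are omitted).
enum OrderSagaStep {
    CREATE_ORDER, PROCESS_PAYMENT, RESERVE_INVENTORY, CREATE_SHIPMENT, CONFIRM_ORDER,
    COMPENSATE_PAYMENT, COMPENSATE_ORDER, COMPLETED, FAILED
}

// Lookup key: the step whose reply arrived and whether that reply was a success.
record TransitionKey(OrderSagaStep from, boolean success) {}

final class TransitionTable {
    private TransitionTable() {}

    private static final Map<TransitionKey, OrderSagaStep> NEXT = Map.ofEntries(
            Map.entry(new TransitionKey(OrderSagaStep.CREATE_ORDER, true), OrderSagaStep.PROCESS_PAYMENT),
            Map.entry(new TransitionKey(OrderSagaStep.CREATE_ORDER, false), OrderSagaStep.FAILED),
            Map.entry(new TransitionKey(OrderSagaStep.PROCESS_PAYMENT, true), OrderSagaStep.RESERVE_INVENTORY),
            Map.entry(new TransitionKey(OrderSagaStep.PROCESS_PAYMENT, false), OrderSagaStep.COMPENSATE_ORDER),
            Map.entry(new TransitionKey(OrderSagaStep.RESERVE_INVENTORY, true), OrderSagaStep.CREATE_SHIPMENT),
            Map.entry(new TransitionKey(OrderSagaStep.RESERVE_INVENTORY, false), OrderSagaStep.COMPENSATE_PAYMENT),
            Map.entry(new TransitionKey(OrderSagaStep.CREATE_SHIPMENT, true), OrderSagaStep.CONFIRM_ORDER),
            // Retry; the retry limit has to be enforced outside the table.
            Map.entry(new TransitionKey(OrderSagaStep.CREATE_SHIPMENT, false), OrderSagaStep.CREATE_SHIPMENT),
            Map.entry(new TransitionKey(OrderSagaStep.CONFIRM_ORDER, true), OrderSagaStep.COMPLETED),
            Map.entry(new TransitionKey(OrderSagaStep.COMPENSATE_PAYMENT, true), OrderSagaStep.COMPENSATE_ORDER),
            // The orchestrator records step COMPLETED with status COMPENSATED here.
            Map.entry(new TransitionKey(OrderSagaStep.COMPENSATE_ORDER, true), OrderSagaStep.COMPLETED));

    static OrderSagaStep next(OrderSagaStep from, boolean success) {
        OrderSagaStep to = NEXT.get(new TransitionKey(from, success));
        if (to == null) {
            throw new IllegalStateException("No transition from " + from + " on success=" + success);
        }
        return to;
    }
}
```

A table keeps the whole flow in one place and makes missing transitions fail loudly; for example, `next(CONFIRM_ORDER, false)` throws. Switch statements spread the flow across handlers, but they leave room for step-specific work such as building each command.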
// SagaTransition.java
package com.example.sagaorchestrator.statemachine;
import com.example.sagaorchestrator.domain.SagaStep;
public record SagaTransition(
SagaStep currentStep,
boolean success,
SagaStep nextStep
) {
public static SagaTransition onSuccess(SagaStep from, SagaStep to) {
return new SagaTransition(from, true, to);
}
public static SagaTransition onFailure(SagaStep from, SagaStep to) {
return new SagaTransition(from, false, to);
}
}
7. Custom Orchestrator: Full Implementation
// OrderSagaOrchestrator.java - the heart of orchestration
package com.example.sagaorchestrator.orchestrator;
import com.example.sagaorchestrator.command.*;
import com.example.sagaorchestrator.domain.*;
import com.example.sagaorchestrator.repository.SagaInstanceRepository;
import com.example.sagaorchestrator.reply.*;
import com.example.shared.kafka.CommandTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderSagaOrchestrator {
private final SagaInstanceRepository sagaRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
// ================== SAGA INITIATION ==================
/**
* Start a new order saga. Called by OrderController.
*/
@Transactional
public SagaInstance startOrderSaga(StartOrderSagaRequest request) {
String sagaId = UUID.randomUUID().toString();
String orderId = UUID.randomUUID().toString();
log.info("Starting OrderSaga: sagaId={}, customerId={}", sagaId, request.customerId());
// Create saga instance
SagaInstance saga = SagaInstance.builder()
.sagaId(sagaId)
.sagaType("ORDER_SAGA")
.orderId(orderId)
.status(SagaStatus.STARTED)
.currentStep(SagaStep.CREATE_ORDER)
.payload(serialize(request))
.createdAt(Instant.now())
.build();
sagaRepository.save(saga);
// Send first command: create the order
CreateOrderCommand command = new CreateOrderCommand(
sagaId,
orderId,
request.customerId(),
request.items(),
request.totalAmount()
);
sendCommand(CommandTopics.ORDER_SERVICE, orderId, command);
return saga;
}
// ================== REPLY HANDLERS ==================
/**
* Listen to all saga replies on a single reply topic.
* Commands from all services return here.
*/
@KafkaListener(
topics = "saga.replies",
groupId = "saga-orchestrator-group"
)
@Transactional
public void handleSagaReply(@Payload SagaReplyEnvelope reply, Acknowledgment ack) {
log.info("Received saga reply: sagaId={}, step={}, success={}",
reply.sagaId(), reply.stepName(), reply.success());
try {
SagaInstance saga = sagaRepository.findBySagaIdWithLock(reply.sagaId())
.orElseThrow(() -> new SagaNotFoundException("Saga not found: " + reply.sagaId()));
processReply(saga, reply);
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process saga reply: sagaId={}", reply.sagaId(), e);
throw e;
}
}
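private void processReply(SagaInstance saga, SagaReplyEnvelope reply) {
// Ignore late or duplicate replies for sagas that have already finished
if (saga.getStatus() == SagaStatus.COMPLETED
|| saga.getStatus() == SagaStatus.COMPENSATED
|| saga.getStatus() == SagaStatus.FAILED) {
log.warn("Ignoring reply for finished saga: sagaId={}, status={}", saga.getSagaId(), saga.getStatus());
return;
}
// Log the step execution
logStepExecution(saga, reply);
if (saga.getStatus() == SagaStatus.COMPENSATING) {
// Replies to compensating commands continue (or abort) the compensation chain
handleCompensationReply(saga, reply);
} else if (reply.success()) {
handleSuccessReply(saga, reply);
} else {
handleFailureReply(saga, reply);
}
}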
// ================== SUCCESS TRANSITIONS ==================
private void handleSuccessReply(SagaInstance saga, SagaReplyEnvelope reply) {
switch (saga.getCurrentStep()) {
case CREATE_ORDER -> {
// Order created, now process payment
saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
saga.setStatus(SagaStatus.IN_PROGRESS);
sagaRepository.save(saga);
OrderCreatedReply orderReply = deserialize(reply.payload(), OrderCreatedReply.class);
ProcessPaymentCommand command = new ProcessPaymentCommand(
saga.getSagaId(),
saga.getOrderId(),
getCustomerIdFromSaga(saga),
getTotalAmountFromSaga(saga)
);
sendCommand(CommandTopics.PAYMENT_SERVICE, saga.getOrderId(), command);
log.info("Saga {}: Order created, sent ProcessPaymentCommand", saga.getSagaId());
}
case PROCESS_PAYMENT -> {
// Payment processed, now reserve inventory
saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
sagaRepository.save(saga);
PaymentProcessedReply paymentReply = deserialize(reply.payload(), PaymentProcessedReply.class);
ReserveInventoryCommand command = new ReserveInventoryCommand(
saga.getSagaId(),
saga.getOrderId(),
paymentReply.paymentId(),
getItemsFromSaga(saga)
);
sendCommand(CommandTopics.INVENTORY_SERVICE, saga.getOrderId(), command);
log.info("Saga {}: Payment processed, sent ReserveInventoryCommand", saga.getSagaId());
}
case RESERVE_INVENTORY -> {
// Inventory reserved, now create shipment
saga.setCurrentStep(SagaStep.CREATE_SHIPMENT);
sagaRepository.save(saga);
InventoryReservedReply inventoryReply = deserialize(reply.payload(), InventoryReservedReply.class);
CreateShipmentCommand command = new CreateShipmentCommand(
saga.getSagaId(),
saga.getOrderId(),
inventoryReply.reservationId(),
getDeliveryAddressFromSaga(saga)
);
sendCommand(CommandTopics.SHIPPING_SERVICE, saga.getOrderId(), command);
log.info("Saga {}: Inventory reserved, sent CreateShipmentCommand", saga.getSagaId());
}
case CREATE_SHIPMENT -> {
// Shipment created, now confirm order
saga.setCurrentStep(SagaStep.CONFIRM_ORDER);
sagaRepository.save(saga);
ShipmentCreatedReply shipmentReply = deserialize(reply.payload(), ShipmentCreatedReply.class);
ConfirmOrderCommand command = new ConfirmOrderCommand(
saga.getSagaId(),
saga.getOrderId(),
shipmentReply.shipmentId(),
shipmentReply.trackingNumber()
);
sendCommand(CommandTopics.ORDER_SERVICE, saga.getOrderId(), command);
log.info("Saga {}: Shipment created, sent ConfirmOrderCommand", saga.getSagaId());
}
case CONFIRM_ORDER -> {
// Saga completed successfully!
saga.setCurrentStep(SagaStep.COMPLETED);
saga.setStatus(SagaStatus.COMPLETED);
saga.setCompletedAt(Instant.now());
sagaRepository.save(saga);
log.info("SAGA COMPLETED SUCCESSFULLY: sagaId={}, orderId={}",
saga.getSagaId(), saga.getOrderId());
}
default -> log.error("Unexpected step during success handling: {}", saga.getCurrentStep());
}
}
// ================== FAILURE TRANSITIONS (COMPENSATION) ==================
private void handleFailureReply(SagaInstance saga, SagaReplyEnvelope reply) {
log.warn("Saga step failed: sagaId={}, step={}, reason={}",
saga.getSagaId(), saga.getCurrentStep(), reply.errorMessage());
saga.setFailureReason(reply.errorMessage());
switch (saga.getCurrentStep()) {
case CREATE_ORDER -> {
// Order creation itself failed - nothing to compensate
saga.setStatus(SagaStatus.FAILED);
saga.setCurrentStep(SagaStep.FAILED);
saga.setCompletedAt(Instant.now());
sagaRepository.save(saga);
log.error("Saga FAILED at CREATE_ORDER: sagaId={}", saga.getSagaId());
}
case PROCESS_PAYMENT -> {
// Payment failed - compensate by cancelling the order
saga.setStatus(SagaStatus.COMPENSATING);
saga.setCurrentStep(SagaStep.COMPENSATE_ORDER);
sagaRepository.save(saga);
CancelOrderCommand command = new CancelOrderCommand(
saga.getSagaId(),
saga.getOrderId(),
"Payment failed: " + reply.errorMessage()
);
sendCommand(CommandTopics.ORDER_SERVICE, saga.getOrderId(), command);
}
case RESERVE_INVENTORY -> {
// Inventory failed - refund the payment first; the order is cancelled afterwards
saga.setStatus(SagaStatus.COMPENSATING);
saga.setCurrentStep(SagaStep.COMPENSATE_PAYMENT);
sagaRepository.save(saga);
RefundPaymentCommand command = new RefundPaymentCommand(
saga.getSagaId(),
saga.getOrderId(),
"Inventory reservation failed: " + reply.errorMessage()
);
sendCommand(CommandTopics.PAYMENT_SERVICE, saga.getOrderId(), command);
}
case CREATE_SHIPMENT -> {
// Shipment failed AFTER the pivot: retry instead of compensating.
// The status stays IN_PROGRESS so the retry's reply is handled as a forward step.
saga.setRetryCount(saga.getRetryCount() + 1);
if (saga.getRetryCount() >= 5) {
log.error("CRITICAL: Shipment creation exhausted retries for sagaId={}. Manual intervention needed.",
saga.getSagaId());
saga.setStatus(SagaStatus.FAILED);
saga.setCurrentStep(SagaStep.FAILED);
sagaRepository.save(saga);
} else {
log.warn("Retrying CreateShipmentCommand for sagaId={}, attempt={}",
saga.getSagaId(), saga.getRetryCount());
sagaRepository.save(saga);
// Rebuild the command from the last successful RESERVE_INVENTORY reply in the step log.
// This re-sends immediately; add a delay/backoff if the shipping service needs time to recover.
String reservationId = saga.getStepExecutions().stream()
.filter(e -> e.getStep() == SagaStep.RESERVE_INVENTORY && e.getResult() == StepResult.SUCCESS)
.reduce((first, second) -> second)
.map(e -> deserialize(e.getResponsePayload(), InventoryReservedReply.class).reservationId())
.orElseThrow(() -> new IllegalStateException(
"No successful RESERVE_INVENTORY step recorded for sagaId=" + saga.getSagaId()));
CreateShipmentCommand command = new CreateShipmentCommand(
saga.getSagaId(),
saga.getOrderId(),
reservationId,
getDeliveryAddressFromSaga(saga)
);
sendCommand(CommandTopics.SHIPPING_SERVICE, saga.getOrderId(), command);
}
}
default -> log.error("Unexpected step during failure handling: {}", saga.getCurrentStep());
}
}
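// ================== COMPENSATION REPLIES ==================
// Handles replies while the saga is COMPENSATING: each successful compensation
// triggers the next one until the order is cancelled. No @Transactional here:
// Spring's proxy does not intercept calls made from inside this class, so the
// caller's transaction applies.
private void handleCompensationReply(SagaInstance saga, SagaReplyEnvelope reply) {
if (!reply.success()) {
// A compensating action failed: the saga cannot undo itself automatically
log.error("CRITICAL: Compensation step {} failed for sagaId={}: {}. Manual intervention needed.",
saga.getCurrentStep(), saga.getSagaId(), reply.errorMessage());
saga.setStatus(SagaStatus.FAILED);
saga.setCurrentStep(SagaStep.FAILED);
sagaRepository.save(saga);
return;
}
switch (saga.getCurrentStep()) {
case COMPENSATE_PAYMENT -> {
// Payment refunded, now cancel the order
saga.setCurrentStep(SagaStep.COMPENSATE_ORDER);
sagaRepository.save(saga);
CancelOrderCommand command = new CancelOrderCommand(
saga.getSagaId(),
saga.getOrderId(),
saga.getFailureReason()
);
sendCommand(CommandTopics.ORDER_SERVICE, saga.getOrderId(), command);
}
case COMPENSATE_ORDER -> {
// Order cancelled - compensation complete
saga.setStatus(SagaStatus.COMPENSATED);
saga.setCurrentStep(SagaStep.COMPLETED);
saga.setCompletedAt(Instant.now());
sagaRepository.save(saga);
log.info("SAGA COMPENSATED: sagaId={}, orderId={}", saga.getSagaId(), saga.getOrderId());
}
default -> log.error("Unexpected step during compensation handling: {}", saga.getCurrentStep());
}
}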
// ================== HELPERS ==================
private void sendCommand(String topic, String key, Object command) {
kafkaTemplate.send(topic, key, command);
log.debug("Sent command to topic={}: type={}", topic, command.getClass().getSimpleName());
}
private void logStepExecution(SagaInstance saga, SagaReplyEnvelope reply) {
SagaStepExecution stepExec = SagaStepExecution.builder()
.executionId(UUID.randomUUID().toString())
.sagaInstance(saga)
.step(saga.getCurrentStep())
.result(reply.success() ? StepResult.SUCCESS : StepResult.FAILURE)
.responsePayload(reply.payload())
.errorMessage(reply.errorMessage())
.executedAt(Instant.now())
.completedAt(Instant.now())
.build();
saga.getStepExecutions().add(stepExec);
}
private String serialize(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
} catch (Exception e) {
throw new RuntimeException("Serialization failed", e);
}
}
private <T> T deserialize(String json, Class<T> type) {
try {
return objectMapper.readValue(json, type);
} catch (Exception e) {
throw new RuntimeException("Deserialization failed", e);
}
}
// Helpers to extract data from saga payload (stored as JSON in saga_instances)
private String getCustomerIdFromSaga(SagaInstance saga) {
return deserialize(saga.getPayload(), StartOrderSagaRequest.class).customerId();
}
private java.math.BigDecimal getTotalAmountFromSaga(SagaInstance saga) {
return deserialize(saga.getPayload(), StartOrderSagaRequest.class).totalAmount();
}
private java.util.List<OrderItemCommand> getItemsFromSaga(SagaInstance saga) {
return deserialize(saga.getPayload(), StartOrderSagaRequest.class).items();
}
private String getDeliveryAddressFromSaga(SagaInstance saga) {
return deserialize(saga.getPayload(), StartOrderSagaRequest.class).deliveryAddress();
}
}
8. Participant Services (Command Side)
Services in orchestration receive COMMANDS and reply to a central reply topic.
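The handler below receives a SagaCommandEnvelope, but this article never defines that type. The calls the handler makes on it are sagaId(), commandType() and payload(), so a minimal version could look like the following sketch. The `wrap` factory and its `Function<Object, String>` serializer parameter are assumptions; in the orchestrator, the existing `serialize` helper could be passed as `this::serialize`. Note also that OrderSagaOrchestrator.sendCommand publishes bare command objects. Either the orchestrator wraps each command like this before sending it, or the handlers must consume the command types directly.

```java
import java.util.function.Function;

// Simplified stand-in for the real ProcessPaymentCommand (which also carries customer ID and total amount).
record ProcessPaymentCommand(String sagaId, String orderId) {}

// Hypothetical envelope, inferred from the handler's calls to sagaId(), commandType() and payload().
record SagaCommandEnvelope(String sagaId, String commandType, String payload) {

    // Wraps a command; commandType is the simple class name the handler's switch matches on.
    static SagaCommandEnvelope wrap(String sagaId, Object command, Function<Object, String> toJson) {
        return new SagaCommandEnvelope(sagaId, command.getClass().getSimpleName(), toJson.apply(command));
    }
}
```

Using the simple class name as commandType keeps the envelope in sync with the `case "ProcessPaymentCommand"` labels in the handler. The trade-off is that renaming a command class becomes a wire-format change.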
// PaymentCommandHandler.java
package com.example.paymentservice.command;
import com.example.paymentservice.service.PaymentService;
import com.example.sagaorchestrator.command.*;
import com.example.sagaorchestrator.reply.*;
import com.example.shared.kafka.CommandTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentCommandHandler {
private final PaymentService paymentService;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@KafkaListener(
topics = CommandTopics.PAYMENT_SERVICE,
groupId = "payment-command-handler-group"
)
public void handleCommand(@Payload SagaCommandEnvelope command, Acknowledgment ack) {
log.info("PaymentService received command: type={}, sagaId={}",
command.commandType(), command.sagaId());
SagaReplyEnvelope reply;
try {
reply = switch (command.commandType()) {
case "ProcessPaymentCommand" -> {
ProcessPaymentCommand cmd = deserialize(command.payload(), ProcessPaymentCommand.class);
PaymentResult result = paymentService.processPayment(cmd);
yield SagaReplyEnvelope.success(
command.sagaId(),
"PROCESS_PAYMENT",
serialize(new PaymentProcessedReply(result.paymentId(), result.amount()))
);
}
case "RefundPaymentCommand" -> {
RefundPaymentCommand cmd = deserialize(command.payload(), RefundPaymentCommand.class);
paymentService.refundPayment(cmd.orderId(), cmd.reason());
yield SagaReplyEnvelope.success(command.sagaId(), "REFUND_PAYMENT", "{}");
}
default -> throw new IllegalArgumentException("Unknown command: " + command.commandType());
};
} catch (PaymentDeclinedException e) {
// Business failure - a valid saga outcome, so reply instead of retrying.
// Use the same step names as the success replies above.
String stepName = "RefundPaymentCommand".equals(command.commandType())
? "REFUND_PAYMENT" : "PROCESS_PAYMENT";
reply = SagaReplyEnvelope.failure(command.sagaId(), stepName, e.getMessage(), e.getDeclineCode());
} catch (Exception e) {
log.error("Technical failure processing command: sagaId={}", command.sagaId(), e);
// Do NOT acknowledge - the command will be redelivered and retried
throw e;
}
// Publish the reply and wait for the broker before acknowledging the command.
// A crash in between causes a redelivery (a duplicate reply the orchestrator must
// tolerate) instead of a lost reply. join() assumes Spring Kafka 3.x (CompletableFuture).
kafkaTemplate.send(CommandTopics.SAGA_REPLIES, command.sagaId(), reply).join();
ack.acknowledge();
}
private <T> T deserialize(String json, Class<T> type) {
try {
return objectMapper.readValue(json, type);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private String serialize(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
// SagaReplyEnvelope.java
package com.example.sagaorchestrator.reply;
import java.time.Instant;
public record SagaReplyEnvelope(
String replyId,
String sagaId,
String stepName,
boolean success,
String payload,
String errorMessage,
String errorCode,
Instant occurredAt
) {
public static SagaReplyEnvelope success(String sagaId, String stepName, String payload) {
return new SagaReplyEnvelope(
java.util.UUID.randomUUID().toString(),
sagaId, stepName, true, payload, null, null, Instant.now()
);
}
public static SagaReplyEnvelope failure(String sagaId, String stepName,
String errorMessage, String errorCode) {
return new SagaReplyEnvelope(
java.util.UUID.randomUUID().toString(),
sagaId, stepName, false, null, errorMessage, errorCode, Instant.now()
);
}
}
// CommandTopics.java
package com.example.shared.kafka;
public final class CommandTopics {
private CommandTopics() {}
public static final String ORDER_SERVICE = "command.order-service";
public static final String PAYMENT_SERVICE = "command.payment-service";
public static final String INVENTORY_SERVICE = "command.inventory-service";
public static final String SHIPPING_SERVICE = "command.shipping-service";
public static final String SAGA_REPLIES = "saga.replies";
}
9. Saga State Persistence (MySQL)
-- ==========================================================
-- SAGA ORCHESTRATOR DATABASE
-- ==========================================================
CREATE TABLE saga_instances (
saga_id VARCHAR(36) NOT NULL,
saga_type VARCHAR(100) NOT NULL,
order_id VARCHAR(36),
status VARCHAR(30) NOT NULL DEFAULT 'STARTED',
current_step VARCHAR(50) NOT NULL,
payload JSON,
failure_reason VARCHAR(1000),
retry_count INT NOT NULL DEFAULT 0,
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
completed_at DATETIME(6),
version BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (saga_id),
INDEX idx_saga_status (status, created_at),
INDEX idx_saga_order_id (order_id),
INDEX idx_saga_type_status (saga_type, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE saga_step_executions (
execution_id VARCHAR(36) NOT NULL,
saga_id VARCHAR(36) NOT NULL,
step VARCHAR(50) NOT NULL,
result VARCHAR(20),
command_payload JSON,
response_payload JSON,
error_message VARCHAR(1000),
executed_at DATETIME(6),
completed_at DATETIME(6),
PRIMARY KEY (execution_id),
INDEX idx_step_exec_saga_id (saga_id),
FOREIGN KEY (saga_id) REFERENCES saga_instances(saga_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
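The status and updated_at columns exist mainly so that stuck sagas can be found. An active saga (IN_PROGRESS or COMPENSATING) whose row has not changed for longer than some threshold is probably waiting on a lost command or reply. The repository's findStuckSagas query expresses this in JPQL. The same predicate in plain Java looks like the sketch below; the `SagaSnapshot` record and the threshold used are illustrative assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;

// Hypothetical projection of a saga_instances row: just the columns the check needs.
record SagaSnapshot(String sagaId, String status, Instant updatedAt) {}

final class StuckSagaFilter {
    private StuckSagaFilter() {}

    private static final Set<String> ACTIVE = Set.of("IN_PROGRESS", "COMPENSATING");

    // Same predicate as findStuckSagas: active status AND updatedAt < (now - maxIdle).
    static List<String> stuckIds(List<SagaSnapshot> sagas, Instant now, Duration maxIdle) {
        Instant threshold = now.minus(maxIdle);
        return sagas.stream()
                .filter(s -> ACTIVE.contains(s.status()))
                .filter(s -> s.updatedAt().isBefore(threshold))
                .map(SagaSnapshot::sagaId)
                .toList();
    }
}
```

Finished sagas are excluded no matter how old they are. This is why the index that follows leads with status.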
-- For finding stuck sagas (monitoring query). MySQL does not support partial
-- (filtered) indexes, so index (status, updated_at) and filter by status in the query.
CREATE INDEX idx_saga_stuck ON saga_instances (status, updated_at);
// SagaInstanceRepository.java
package com.example.sagaorchestrator.repository;
import com.example.sagaorchestrator.domain.*;
import jakarta.persistence.LockModeType;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
public interface SagaInstanceRepository extends JpaRepository<SagaInstance, String> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT s FROM SagaInstance s WHERE s.sagaId = :sagaId")
Optional<SagaInstance> findBySagaIdWithLock(@Param("sagaId") String sagaId);
Optional<SagaInstance> findByOrderId(String orderId);
List<SagaInstance> findByStatus(SagaStatus status);
@Query("SELECT s FROM SagaInstance s WHERE s.status IN (com.example.sagaorchestrator.domain.SagaStatus.IN_PROGRESS, com.example.sagaorchestrator.domain.SagaStatus.COMPENSATING) AND s.updatedAt < :threshold")
List<SagaInstance> findStuckSagas(@Param("threshold") Instant threshold);
@Query("SELECT COUNT(s) FROM SagaInstance s WHERE s.status = :status AND s.createdAt >= :since")
long countByStatusSince(@Param("status") SagaStatus status, @Param("since") Instant since);
}
10. AWS Step Functions Integration
AWS Step Functions is a fully managed workflow service that is well suited to SAGA orchestration: the state machine takes the place of the custom orchestrator service.
Key Benefits
- No orchestrator service to maintain
- Built-in retry logic with exponential backoff
- Visual workflow editor in AWS Console
- Execution history for every saga instance
- Native AWS service integration (Lambda, SQS, SNS, DynamoDB)
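- Express Workflows for high-throughput, short-lived sagas (they do not support the .waitForTaskToken callback pattern, so the definition in section 11 needs a Standard Workflow)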
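The Retry and Catch rules in the state machine definition (section 11) match on error names: TransientError, PaymentDeclinedError, InsufficientStockError and CarrierUnavailableError. With the .waitForTaskToken pattern, those names come from the `error` argument that a participant passes to SendTaskFailure (sendTaskFailure in the service below). A small mapping keeps the strings in one place. This sketch assumes hypothetical exception classes and a fallback name, `UnexpectedError`, that only States.ALL rules will match.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical participant exceptions; substitute the services' real types.
class PaymentDeclinedException extends RuntimeException {
    PaymentDeclinedException(String message) { super(message); }
}
class InsufficientStockException extends RuntimeException {
    InsufficientStockException(String message) { super(message); }
}
class CarrierUnavailableException extends RuntimeException {
    CarrierUnavailableException(String message) { super(message); }
}

final class StepFunctionsErrors {
    private StepFunctionsErrors() {}

    // Returned names must match the ErrorEquals entries in the state machine exactly.
    static String errorNameFor(Throwable failure) {
        if (failure instanceof PaymentDeclinedException) return "PaymentDeclinedError";
        if (failure instanceof InsufficientStockException) return "InsufficientStockError";
        if (failure instanceof CarrierUnavailableException) return "CarrierUnavailableError";
        if (failure instanceof IOException || failure instanceof TimeoutException) return "TransientError";
        return "UnexpectedError"; // matched only by States.ALL rules
    }
}
```

A participant would then report a failure with `sendTaskFailure(taskToken, StepFunctionsErrors.errorNameFor(e), e.getMessage())`. Step Functions retries names listed under Retry and routes the others to the matching Catch.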
// StepFunctionsOrderSagaService.java
package com.example.orderservice.saga;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.sfn.SfnClient;
import software.amazon.awssdk.services.sfn.model.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class StepFunctionsOrderSagaService {
private final SfnClient sfnClient;
private final ObjectMapper objectMapper;
@Value("${aws.stepfunctions.order-saga-arn}")
private String orderSagaStateMachineArn;
/**
* Start a new order saga execution in AWS Step Functions.
*/
public String startOrderSaga(PlaceOrderRequest request) {
String executionName = "order-saga-" + UUID.randomUUID();
Map<String, Object> input = Map.of(
"orderId", UUID.randomUUID().toString(),
"customerId", request.customerId(),
"items", request.items(),
"totalAmount", request.totalAmount(),
"deliveryAddress", request.deliveryAddress()
);
try {
String inputJson = objectMapper.writeValueAsString(input);
StartExecutionRequest sfnRequest = StartExecutionRequest.builder()
.stateMachineArn(orderSagaStateMachineArn)
.name(executionName)
.input(inputJson)
.build();
StartExecutionResponse response = sfnClient.startExecution(sfnRequest);
log.info("Started Step Functions execution: arn={}", response.executionArn());
return response.executionArn();
} catch (Exception e) {
log.error("Failed to start Step Functions execution", e);
throw new SagaStartException("Could not start order saga", e);
}
}
/**
* Get current status of a saga execution.
*/
public SagaExecutionStatus getSagaStatus(String executionArn) {
DescribeExecutionRequest request = DescribeExecutionRequest.builder()
.executionArn(executionArn)
.build();
DescribeExecutionResponse response = sfnClient.describeExecution(request);
return new SagaExecutionStatus(
response.statusAsString(),
response.startDate().toString(),
response.stopDate() != null ? response.stopDate().toString() : null,
response.input(),
response.output()
);
}
/**
* Report a task success result back to Step Functions.
* Called by participant services after processing a command that carried a task token
* (the SQS .waitForTaskToken integration places the token in the message body).
*/
public void sendTaskSuccess(String taskToken, Object result) {
try {
SendTaskSuccessRequest request = SendTaskSuccessRequest.builder()
.taskToken(taskToken)
.output(objectMapper.writeValueAsString(result))
.build();
sfnClient.sendTaskSuccess(request);
} catch (Exception e) {
log.error("Failed to send task success", e);
throw new RuntimeException(e);
}
}
/**
* Send a task failure result.
*/
public void sendTaskFailure(String taskToken, String error, String cause) {
SendTaskFailureRequest request = SendTaskFailureRequest.builder()
.taskToken(taskToken)
.error(error)
.cause(cause)
.build();
sfnClient.sendTaskFailure(request);
}
}
11. Step Functions State Machine Definition
This is the AWS Step Functions state machine definition in Amazon States Language (JSON):
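Each Retry rule waits IntervalSeconds before the first retry and multiplies the wait by BackoffRate after every attempt, for up to MaxAttempts retries. MaxDelaySeconds caps any single wait. The small calculator below makes the schedules in the definition easy to check. It is a hypothetical helper and ignores the optional jitter setting.

```java
import java.util.ArrayList;
import java.util.List;

final class RetrySchedule {
    private RetrySchedule() {}

    /**
     * Wait before each retry for a Step Functions Retry rule (jitter not modelled).
     * Pass Double.POSITIVE_INFINITY as maxDelaySeconds when the rule sets no cap.
     */
    static List<Double> delaysSeconds(double intervalSeconds, int maxAttempts,
                                      double backoffRate, double maxDelaySeconds) {
        List<Double> delays = new ArrayList<>();
        double next = intervalSeconds;
        for (int retry = 1; retry <= maxAttempts; retry++) {
            delays.add(Math.min(next, maxDelaySeconds)); // cap a single wait
            next *= backoffRate;                         // grow for the following retry
        }
        return delays;
    }
}
```

For CreateShipment (5 s interval, 5 attempts, rate 2.0, 60 s cap), this gives waits of 5, 10, 20, 40 and 60 seconds. That is 2 minutes 15 seconds of waiting alone before the Catch routes to ShipmentFailedManualReview. ReserveInventory (1 s interval, 3 attempts, rate 1.5) waits 1, 1.5 and 2.25 seconds.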
{
"Comment": "Order SAGA Orchestration with Compensation",
"StartAt": "CreateOrder",
"States": {
"CreateOrder": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${OrderServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "CreateOrderCommand",
"orderId.$": "$.orderId",
"customerId.$": "$.customerId",
"items.$": "$.items",
"totalAmount.$": "$.totalAmount"
}
},
"Retry": [
{
"ErrorEquals": ["TransientError"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "SagaFailed",
"ResultPath": "$.error"
}
],
"ResultPath": "$.orderResult",
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${PaymentServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "ProcessPaymentCommand",
"orderId.$": "$.orderId",
"customerId.$": "$.customerId",
"amount.$": "$.totalAmount"
}
},
"Retry": [
{
"ErrorEquals": ["TransientError"],
"IntervalSeconds": 2,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
],
"Catch": [
{
"ErrorEquals": ["PaymentDeclinedError"],
"Next": "CompensateOrder",
"ResultPath": "$.paymentError"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "CompensateOrder",
"ResultPath": "$.error"
}
],
"Next": "ReserveInventory",
"ResultPath": "$.paymentResult"
},
"ReserveInventory": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${InventoryServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "ReserveInventoryCommand",
"orderId.$": "$.orderId",
"items.$": "$.items"
}
},
"Retry": [
{
"ErrorEquals": ["TransientError"],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 1.5
}
],
"Catch": [
{
"ErrorEquals": ["InsufficientStockError"],
"Next": "CompensatePayment",
"ResultPath": "$.inventoryError"
},
{
"ErrorEquals": ["States.ALL"],
"Next": "CompensatePayment",
"ResultPath": "$.error"
}
],
"Next": "CreateShipment",
"ResultPath": "$.inventoryResult"
},
"CreateShipment": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${ShippingServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "CreateShipmentCommand",
"orderId.$": "$.orderId",
"reservationId.$": "$.inventoryResult.reservationId"
}
},
"Retry": [
{
"ErrorEquals": ["CarrierUnavailableError", "TransientError"],
"IntervalSeconds": 5,
"MaxAttempts": 5,
"BackoffRate": 2.0,
"MaxDelaySeconds": 60
}
],
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "ShipmentFailedManualReview",
"ResultPath": "$.shippingError"
}
],
"Next": "ConfirmOrder",
"ResultPath": "$.shipmentResult"
},
"ConfirmOrder": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${OrderServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "ConfirmOrderCommand",
"orderId.$": "$.orderId",
"shipmentId.$": "$.shipmentResult.shipmentId"
}
},
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 2,
"MaxAttempts": 5,
"BackoffRate": 2.0
}
],
"Next": "SagaCompleted"
},
"SagaCompleted": {
"Type": "Succeed"
},
"SagaFailed": {
"Type": "Fail",
"Error": "SagaFailed",
"Cause": "Saga failed at an unrecoverable step"
},
"ShipmentFailedManualReview": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${ManualReviewTopicArn}",
"Message.$": "States.Format('Order saga needs manual review for orderId: {}', $.orderId)"
},
"Next": "SagaFailed"
},
"CompensatePayment": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${PaymentServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "RefundPaymentCommand",
"orderId.$": "$.orderId"
}
},
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 5,
"MaxAttempts": 10,
"BackoffRate": 2.0,
"MaxDelaySeconds": 300
}
],
"Next": "CompensateOrder",
"ResultPath": "$.compensationResult"
},
"CompensateOrder": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${OrderServiceQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"commandType": "CancelOrderCommand",
"orderId.$": "$.orderId"
}
},
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 5,
"MaxAttempts": 10,
"BackoffRate": 2.0
}
],
"Next": "SagaCompensated",
"ResultPath": "$.orderCancellationResult"
},
"SagaCompensated": {
"Type": "Fail",
"Error": "SagaCompensated",
"Cause": "Saga was compensated due to business failure"
}
}
}
12. Axon Framework Orchestration
Axon Framework provides first-class SAGA support with a clean annotation-based model.
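The routing that associationProperty, @StartSaga, and @EndSaga express in the saga below works roughly like this. @StartSaga creates an instance associated with the event's orderId. Later @SagaEventHandler methods fire only for an instance holding a matching association. @EndSaga ends the instance, dropping its associations, once the handler returns. The plain-Java model below is a hypothetical illustration of that behavior for a single orderId key. It does not reproduce Axon's API or internals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of association-based saga routing; NOT Axon's API or internals.
final class AssociationRouter {
    // Association value (orderId) -> event types handled by that saga instance
    private final Map<String, List<String>> sagas = new HashMap<>();

    // Like @StartSaga: create an instance for this orderId if none exists yet
    void start(String orderId, String eventType) {
        sagas.computeIfAbsent(orderId, id -> new ArrayList<>()).add(eventType);
    }

    // Like @SagaEventHandler: deliver only when a live saga holds this association
    boolean handle(String orderId, String eventType) {
        List<String> history = sagas.get(orderId);
        if (history == null) {
            return false; // no matching saga, so the event is ignored
        }
        history.add(eventType);
        return true;
    }

    // Like @EndSaga: run the handler, then drop the instance and its association
    boolean end(String orderId, String eventType) {
        boolean delivered = handle(orderId, eventType);
        sagas.remove(orderId);
        return delivered;
    }

    boolean isActive(String orderId) {
        return sagas.containsKey(orderId);
    }

    List<String> history(String orderId) {
        return sagas.getOrDefault(orderId, List.of());
    }
}
```

In the real saga, Axon persists the instance and its association values between events. That is why the CommandGateway field is marked transient and re-injected.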
// OrderManagementSaga.java (Axon Framework)
package com.example.saga;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.modelling.saga.*;
import org.axonframework.spring.stereotype.Saga;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
@Saga
@Slf4j
public class OrderManagementSaga {
@Autowired
private transient CommandGateway commandGateway;
private String orderId;
private String paymentId;
private String reservationId;
private boolean orderConfirmed = false;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderCreatedEvent event) {
this.orderId = event.orderId();
log.info("Saga started for orderId={}", orderId);
// Redundant with associationProperty on @StartSaga, shown for clarity; call associateWith to correlate on extra keys such as paymentId
SagaLifecycle.associateWith("orderId", event.orderId());
// Send command to payment service
commandGateway.send(new ProcessPaymentCommand(
event.orderId(),
event.customerId(),
event.totalAmount()
));
}
@SagaEventHandler(associationProperty = "orderId")
public void on(PaymentProcessedEvent event) {
this.paymentId = event.paymentId();
log.info("Payment processed, reserving inventory: orderId={}", orderId);
commandGateway.send(new ReserveInventoryCommand(event.orderId(), event.items()));
}
@SagaEventHandler(associationProperty = "orderId")
public void on(PaymentFailedEvent event) {
log.warn("Payment failed, compensating: orderId={}, reason={}", orderId, event.reason());
commandGateway.send(new CancelOrderCommand(orderId, "Payment failed: " + event.reason()));
}
@SagaEventHandler(associationProperty = "orderId")
public void on(InventoryReservedEvent event) {
this.reservationId = event.reservationId();
log.info("Inventory reserved, creating shipment: orderId={}", orderId);
commandGateway.send(new CreateShipmentCommand(event.orderId(), event.reservationId()));
}
@SagaEventHandler(associationProperty = "orderId")
public void on(InventoryReservationFailedEvent event) {
log.warn("Inventory failed, compensating payment: orderId={}", orderId);
commandGateway.send(new RefundPaymentCommand(orderId, event.reason()));
}
@SagaEventHandler(associationProperty = "orderId")
public void on(PaymentRefundedEvent event) {
log.info("Payment refunded, cancelling order: orderId={}", orderId);
commandGateway.send(new CancelOrderCommand(orderId, "Compensated"));
}
@SagaEventHandler(associationProperty = "orderId")
public void on(ShipmentCreatedEvent event) {
log.info("Shipment created, confirming order: orderId={}", orderId);
commandGateway.send(new ConfirmOrderCommand(orderId, event.shipmentId()));
}
@EndSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderConfirmedEvent event) {
this.orderConfirmed = true;
log.info("SAGA COMPLETED: orderId={}", orderId);
}
@EndSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderCancelledEvent event) {
log.info("SAGA COMPENSATED: orderId={}", orderId);
}
}
13. Production Configuration
# application.yml for Saga Orchestrator Service
spring:
  application:
    name: saga-orchestrator
  datasource:
    url: jdbc:mysql://${DB_HOST}:3306/saga_db?useSSL=true&serverTimezone=UTC
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: SagaOrchestratorPool
      minimum-idle: 5
      maximum-pool-size: 30 # Higher than order service - orchestrator is busy
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
  jpa:
    hibernate:
      ddl-auto: validate
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      group-id: saga-orchestrator-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 5 # Lower - saga processing is complex
    producer:
      acks: all
      retries: 5
      properties:
        enable.idempotence: true
# AWS Configuration
aws:
  region: ${AWS_REGION:us-east-1}
  stepfunctions:
    order-saga-arn: ${ORDER_SAGA_ARN}
  sns:
    manual-review-topic-arn: ${MANUAL_REVIEW_SNS_ARN}
# Stuck saga recovery
saga:
  recovery:
    enabled: true
    check-interval-minutes: 5
    stuck-threshold-minutes: 30 # Sagas idle > 30 min are treated as stuck and replayed
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,saga-status
14. Error Handling in Orchestration
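Without Spring, JPA, and alerting, the recovery job's per-saga decision reduces to two checks. Has the saga been idle longer than the stuck threshold? Has it used up its replay budget? The hypothetical RecoveryPolicy below is a minimal sketch of that decision. It assumes the threshold comes from saga.recovery.stuck-threshold-minutes and the budget matches the retry limit of 10 in SagaRecoveryService. Because replay re-sends commands, participants must treat duplicate commands idempotently.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, framework-free version of the recovery job's per-saga decision.
final class RecoveryPolicy {
    enum Action { LEAVE_ALONE, REPLAY_CURRENT_STEP, ESCALATE }

    private final Duration stuckThreshold; // e.g. saga.recovery.stuck-threshold-minutes
    private final int maxReplays;          // replay budget before manual intervention

    RecoveryPolicy(Duration stuckThreshold, int maxReplays) {
        this.stuckThreshold = stuckThreshold;
        this.maxReplays = maxReplays;
    }

    Action decide(Instant lastUpdate, int retryCount, Instant now) {
        // Sagas updated more recently than the threshold are still considered healthy
        if (lastUpdate.isAfter(now.minus(stuckThreshold))) {
            return Action.LEAVE_ALONE;
        }
        // Replay budget exhausted: mark FAILED and alert a human instead of retrying forever
        if (retryCount >= maxReplays) {
            return Action.ESCALATE;
        }
        // Otherwise re-send the current step's command
        return Action.REPLAY_CURRENT_STEP;
    }
}
```

Passing now in explicitly keeps the decision deterministic, so it can be unit-tested with fixed timestamps instead of waiting on the scheduler.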
// SagaRecoveryService.java - finds and recovers stuck sagas
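@Service
@ConditionalOnProperty(prefix = "saga.recovery", name = "enabled", havingValue = "true", matchIfMissing = true)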
@RequiredArgsConstructor
@Slf4j
public class SagaRecoveryService {
private final SagaInstanceRepository sagaRepository;
private final OrderSagaOrchestrator orchestrator;
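private final AlertService alertService;
// Non-final, so @RequiredArgsConstructor leaves it out and Spring injects it into the field
@Value("${saga.recovery.stuck-threshold-minutes:30}")
private long stuckThresholdMinutes;
// timeUnit (Spring Framework 5.3.10+) interprets the property value as minutes, not milliseconds
@Scheduled(fixedDelayString = "${saga.recovery.check-interval-minutes:5}",
timeUnit = java.util.concurrent.TimeUnit.MINUTES)
@Transactional
public void recoverStuckSagas() {
Instant threshold = Instant.now()
.minus(stuckThresholdMinutes, java.time.temporal.ChronoUnit.MINUTES);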
List<SagaInstance> stuckSagas = sagaRepository.findStuckSagas(threshold);
if (!stuckSagas.isEmpty()) {
log.warn("Found {} stuck sagas, attempting recovery", stuckSagas.size());
}
for (SagaInstance saga : stuckSagas) {
try {
log.info("Attempting to recover stuck saga: sagaId={}, step={}, lastUpdate={}",
saga.getSagaId(), saga.getCurrentStep(), saga.getUpdatedAt());
if (saga.getRetryCount() >= 10) {
// Escalate to manual intervention
alertService.sendCriticalAlert(
"SAGA STUCK - MANUAL INTERVENTION REQUIRED",
String.format("SagaId: %s, OrderId: %s, Step: %s, LastUpdate: %s",
saga.getSagaId(), saga.getOrderId(),
saga.getCurrentStep(), saga.getUpdatedAt())
);
saga.setStatus(SagaStatus.FAILED);
sagaRepository.save(saga);
continue;
}
// Re-send the current step's command
orchestrator.replayCurrentStep(saga);
saga.setRetryCount(saga.getRetryCount() + 1);
sagaRepository.save(saga);
} catch (Exception e) {
log.error("Failed to recover saga: sagaId={}", saga.getSagaId(), e);
}
}
}
}
15. Monitoring and Observability
// SagaMetrics.java
@Component
@RequiredArgsConstructor
public class SagaMetrics {
private final MeterRegistry meterRegistry;
public void recordSagaStarted(String sagaType) {
meterRegistry.counter("saga.started", "type", sagaType).increment();
}
public void recordSagaCompleted(String sagaType, Duration duration) {
meterRegistry.counter("saga.completed", "type", sagaType).increment();
meterRegistry.timer("saga.duration", "type", sagaType, "outcome", "success")
.record(duration);
}
public void recordSagaCompensated(String sagaType, String failedStep) {
meterRegistry.counter("saga.compensated",
"type", sagaType, "failed_step", failedStep).increment();
}
public void recordSagaFailed(String sagaType, String failedStep) {
meterRegistry.counter("saga.failed",
"type", sagaType, "failed_step", failedStep).increment();
}
public void recordStepLatency(String sagaType, String step, Duration duration) {
meterRegistry.timer("saga.step.duration",
"type", sagaType, "step", step).record(duration);
}
}
16. Testing Orchestration SAGAs
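The orchestrator consumes replies asynchronously from Kafka, so the tests below cannot assert immediately after simulateReply. Instead they poll the saga table with Awaitility's await().atMost(...).until(...). The hypothetical Poll helper below sketches in plain Java what that polling amounts to: re-check a condition at a fixed interval until it holds or a deadline passes. It is an illustration only; the tests themselves use Awaitility.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for Awaitility's await().atMost(timeout).until(condition).
final class Poll {
    private Poll() {}

    // Re-checks the condition every pollInterval; true once it holds, false at the deadline
    static boolean until(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // Awaitility throws ConditionTimeoutException instead
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

Polling with a generous upper bound keeps the tests fast when the saga finishes quickly and tolerant when Kafka or the database is slow.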
// OrderSagaOrchestratorTest.java
@SpringBootTest
@Testcontainers
class OrderSagaOrchestratorTest {
@Container
static MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
.withDatabaseName("saga_test")
.withUsername("test")
.withPassword("test");
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
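);
// Point Spring's datasource and Kafka client at the containers started above
@DynamicPropertySource
static void containerProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", mysql::getJdbcUrl);
registry.add("spring.datasource.username", mysql::getUsername);
registry.add("spring.datasource.password", mysql::getPassword);
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}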
@Autowired
private OrderSagaOrchestrator orchestrator;
@Autowired
private SagaInstanceRepository sagaRepository;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Test
@DisplayName("Complete saga: all steps succeed")
void testCompleteSaga() throws Exception {
// Start saga
StartOrderSagaRequest request = buildTestRequest();
SagaInstance saga = orchestrator.startOrderSaga(request);
assertThat(saga.getStatus()).isEqualTo(SagaStatus.STARTED);
assertThat(saga.getCurrentStep()).isEqualTo(SagaStep.CREATE_ORDER);
// Simulate each reply
simulateReply(saga.getSagaId(), "CREATE_ORDER", true,
"{\"orderId\":\"" + saga.getOrderId() + "\"}");
await().atMost(5, TimeUnit.SECONDS)
.until(() -> sagaRepository.findById(saga.getSagaId())
.map(s -> s.getCurrentStep() == SagaStep.PROCESS_PAYMENT)
.orElse(false));
simulateReply(saga.getSagaId(), "PROCESS_PAYMENT", true,
"{\"paymentId\":\"PAY-001\",\"amount\":99.99}");
// ... continue through all steps
await().atMost(30, TimeUnit.SECONDS)
.until(() -> sagaRepository.findById(saga.getSagaId())
.map(s -> s.getStatus() == SagaStatus.COMPLETED)
.orElse(false));
SagaInstance completed = sagaRepository.findById(saga.getSagaId()).orElseThrow();
assertThat(completed.getStatus()).isEqualTo(SagaStatus.COMPLETED);
assertThat(completed.getCompletedAt()).isNotNull();
assertThat(completed.getStepExecutions()).hasSize(5);
}
@Test
@DisplayName("Compensation saga: inventory fails, payment refunded, order cancelled")
void testInventoryFailureCompensation() throws Exception {
SagaInstance saga = orchestrator.startOrderSaga(buildTestRequest());
// Order created OK
simulateReply(saga.getSagaId(), "CREATE_ORDER", true, "{}");
// Payment OK
simulateReply(saga.getSagaId(), "PROCESS_PAYMENT", true,
"{\"paymentId\":\"PAY-001\"}");
// Inventory FAILS
simulateReply(saga.getSagaId(), "RESERVE_INVENTORY", false,
null, "InsufficientStock", "Out of stock");
await().atMost(30, TimeUnit.SECONDS)
.until(() -> sagaRepository.findById(saga.getSagaId())
.map(s -> s.getStatus() == SagaStatus.COMPENSATED)
.orElse(false));
SagaInstance compensated = sagaRepository.findById(saga.getSagaId()).orElseThrow();
assertThat(compensated.getStatus()).isEqualTo(SagaStatus.COMPENSATED);
}
private void simulateReply(String sagaId, String step, boolean success,
String payload, String... errorInfo) {
SagaReplyEnvelope reply = success
? SagaReplyEnvelope.success(sagaId, step, payload)
: SagaReplyEnvelope.failure(sagaId, step,
errorInfo.length > 0 ? errorInfo[0] : "Error",
errorInfo.length > 1 ? errorInfo[1] : "ERR");
kafkaTemplate.send("saga.replies", sagaId, reply);
}
}
17. Choreography vs Orchestration Decision Matrix
| Factor | Choreography | Orchestration |
|---|---|---|
| Number of services | Prefer for < 5 | Prefer for 5+ |
| Flow complexity | Simple, linear | Complex, branching |
| Team independence | High (teams own their events) | Medium (orchestrator needs coordination) |
| Debuggability needed | Low priority | High priority |
| Visibility requirements | Not critical | Critical |
| Performance (throughput) | Higher (no orchestrator) | Slightly lower |
| Testing ease | Harder | Easier |
| Failure tracing | Difficult | Easy |
| Adding a step | Services around the new step must change which events they emit or consume | Orchestrator changes; existing participants are untouched |
| Operational complexity | Lower | Higher (orchestrator service) |
| AWS Step Functions available | No advantage | Big advantage |
When to Definitely Use Orchestration:
- Compliance and Audit: You need to prove exactly what happened at each step for regulatory compliance
- Complex branching: "If payment method is BNPL, route to credit check service first"
- Long-running processes: Sagas that span hours or days need visible state management
- Cross-team sagas: When different teams own different services and coordination is complex
- Compensation order matters: Some compensations must happen in a specific order
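The branching and ordered-compensation cases above can be sketched as one small planner. This is a hypothetical illustration, not the orchestrator from this part: the Step enum, the CREDIT_CHECK step, and the "BNPL" payment-method string are all assumptions. The planner builds the forward plan, routing BNPL payments through a credit check first. When a step fails, it returns the steps already completed, newest first, which matches the refund-then-cancel order used in the Step Functions definition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical orchestrator planner: conditional routing plus reverse-order compensation.
final class SagaPlanner {
    // CREDIT_CHECK is an assumed extra step for BNPL payments
    enum Step { CREDIT_CHECK, CREATE_ORDER, PROCESS_PAYMENT, RESERVE_INVENTORY, CREATE_SHIPMENT, CONFIRM_ORDER }

    private SagaPlanner() {}

    // Forward plan: BNPL payments are routed through a credit check first
    static List<Step> plan(String paymentMethod) {
        List<Step> steps = new ArrayList<>();
        if ("BNPL".equals(paymentMethod)) {
            steps.add(Step.CREDIT_CHECK);
        }
        steps.addAll(List.of(Step.CREATE_ORDER, Step.PROCESS_PAYMENT,
                Step.RESERVE_INVENTORY, Step.CREATE_SHIPMENT, Step.CONFIRM_ORDER));
        return steps;
    }

    // Steps to compensate when 'failed' fails: those completed before it, newest first
    static List<Step> compensationsFor(List<Step> plan, Step failed) {
        int failedIndex = plan.indexOf(failed);
        if (failedIndex < 0) {
            throw new IllegalArgumentException(failed + " is not in the plan");
        }
        List<Step> undo = new ArrayList<>(plan.subList(0, failedIndex));
        Collections.reverse(undo);
        return undo;
    }
}
```

Because the plan is data, the compensation order follows from it instead of being hand-wired for each failure case. Steps with nothing to undo, such as a read-only credit check, could simply be filtered out.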
18. Advantages and Disadvantages
Orchestration Advantages
| Advantage | Details |
|---|---|
| Full visibility | Always know exactly which step the saga is on |
| Easy debugging | One place to look: the orchestrator logs and saga state table |
| Explicit flow | Business logic is readable as a state machine |
| Easier testing | Test orchestrator with mock services |
| Flexible compensation | Orchestrator decides compensation order |
| Monitoring | Single source of truth for saga metrics |
Orchestration Disadvantages
| Disadvantage | Details |
|---|---|
| Central orchestrator | If the orchestrator is down, no new sagas can start and in-flight sagas pause until it recovers |
| Service coupling | Services must implement command interfaces |
| Additional service | More infrastructure to build and maintain |
| Orchestrator complexity | As sagas grow, orchestrator becomes complex |
| Throughput ceiling | Orchestrator can become a bottleneck at very high scale |
19. Summary
| Topic | Key Takeaway |
|---|---|
| Orchestration concept | Central coordinator sends commands, receives replies, drives saga state |
| Communication | Commands (orchestrator -> service), Replies (service -> orchestrator) |
| State machine | Explicit transitions between saga steps, stored in MySQL |
| Failure handling | Orchestrator decides which compensation chain to execute |
| AWS Step Functions | Fully managed orchestration, ideal for AWS-native systems |
| Axon Framework | Annotation-based saga with persistent saga state |
| Command channels | One Kafka command topic per service, plus a single shared reply topic |
| Recovery | Scheduled job finds stuck sagas and retries current step |
| Testing | Mock services, simulate replies, verify state transitions |
Next: Part 4 - Deep Dive Implementation
Explore the critical production patterns: Transactional Outbox, Idempotency, Retry with
Dead Letter Queues, Distributed Tracing, and complete MySQL schema design.