6/6/2026 | Admin Post


Part 3: SAGA Orchestration Pattern - Complete Implementation

Series Navigation: Index | Part 1 | Part 2 - Choreography | Part 3 - Orchestration | Part 4 - Implementation | Part 5 - Advanced | Part 6 - Pitfalls | Part 7 - Interview


Table of Contents

  1. What is Orchestration?
  2. Mental Model: The Orchestra Conductor
  3. Orchestration Flow Diagrams
  4. Orchestration Architecture Design
  5. Custom Orchestrator: Domain Model
  6. Saga State Machine Design
  7. Custom Orchestrator: Full Implementation
  8. Participant Services (Command Side)
  9. Saga State Persistence (MySQL)
  10. AWS Step Functions Integration
  11. Step Functions State Machine Definition
  12. Axon Framework Orchestration
  13. Production Configuration
  14. Error Handling in Orchestration
  15. Monitoring and Observability
  16. Testing Orchestration SAGAs
  17. Choreography vs Orchestration Decision Matrix
  18. Advantages and Disadvantages
  19. Summary

1. What is Orchestration?

In orchestration, a central ORCHESTRATOR service drives the entire saga. The orchestrator:

  • Knows every step in the saga
  • Tells each service what to do (sends commands)
  • Receives results from each service
  • Decides what to do next based on results
  • Tracks the complete saga state
  • Initiates compensation when a step fails

Services do NOT know about each other. They only know about the commands they receive and
the orchestrator they respond to.
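To make this division of responsibilities concrete, here is a deliberately tiny, framework-free sketch (no Kafka, no database). MiniOrchestrator and its methods are hypothetical names for illustration, not part of the implementation later in this article. Only the orchestrator holds the ordered list of steps; each participant is reduced to an action that reports success or failure, plus a compensating action. On failure, the orchestrator compensates only the steps that already completed, newest first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, in-memory illustration of orchestration (no Kafka, no database).
// Only the orchestrator knows the step order; participants are plain callbacks.
class MiniOrchestrator {

    // A forward action (true = success, false = business failure) and its compensation
    record Step(String name, BooleanSupplier action, Runnable compensation) {}

    private final List<Step> steps = new ArrayList<>();
    private final List<String> history = new ArrayList<>();

    MiniOrchestrator step(String name, BooleanSupplier action, Runnable compensation) {
        steps.add(new Step(name, action, compensation));
        return this;
    }

    /** Runs the steps in order; returns "COMPLETED" or "COMPENSATED". */
    String run() {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            history.add("command: " + step.name());
            if (step.action().getAsBoolean()) {
                completed.push(step);             // candidate for compensation later
                continue;
            }
            history.add("failed: " + step.name());
            // The failed step committed nothing, so undo only the completed ones, newest first
            while (!completed.isEmpty()) {
                Step done = completed.pop();
                history.add("compensate: " + done.name());
                done.compensation().run();
            }
            return "COMPENSATED";
        }
        return "COMPLETED";
    }

    List<String> history() {
        return history;
    }
}
```

With CREATE_ORDER and PROCESS_PAYMENT succeeding and RESERVE_INVENTORY failing, the history ends with compensations for PROCESS_PAYMENT and then CREATE_ORDER, the same refund-then-cancel order as the failure path in Section 3.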

WITHOUT ORCHESTRATION:                  WITH ORCHESTRATION:
(services know each other)              (services only know orchestrator)

OrderSvc --event--> PaymentSvc          Orchestrator --command--> PaymentSvc
PaymentSvc --event--> InventorySvc           |      <--reply---  PaymentSvc
InventorySvc --event--> ShippingSvc          |
                                             +----command--> InventorySvc
Services coupled to each other               |      <--reply---  InventorySvc
                                             |
                                             +----command--> ShippingSvc
                                                    <--reply---  ShippingSvc
                                        Services only coupled to orchestrator

2. Mental Model: The Orchestra Conductor

An orchestra conductor:

  • Has the full musical score (knows all parts)
  • Cues each section when it is their turn
  • Adjusts tempo based on how sections are playing
  • Coordinates the entire performance

A SAGA orchestrator:

  • Has the full saga definition (knows all steps)
  • Sends commands to each service when it is their turn
  • Decides next action based on replies received
  • Coordinates compensations when things go wrong

Musicians (services) do not talk to each other. They watch the conductor (orchestrator).


3. Orchestration Flow Diagrams

Happy Path

  CLIENT          ORCHESTRATOR          ORDER SVC      PAYMENT SVC   INVENTORY SVC  SHIPPING SVC
     |                 |                    |               |              |               |
     |---placeOrder--->|                    |               |              |               |
     |                 |---CreateOrderCmd-->|               |              |               |
     |                 |<--OrderCreatedResp-|               |              |               |
     |                 |                    |               |              |               |
     |                 |-----------ProcessPaymentCmd------->|              |               |
     |                 |<----------PaymentProcessedResp-----|              |               |
     |                 |                    |               |              |               |
     |                 |------------------------ReserveInventoryCmd------->|               |
     |                 |<-----------------------InventoryReservedResp------|               |
     |                 |                    |               |              |               |
     |                 |--------------------------------------CreateShipmentCmd----------->|
     |                 |<-------------------------------------ShipmentCreatedResp---------|
     |                 |                    |               |              |               |
     |                 |---ConfirmOrderCmd->|               |              |               |
     |                 |<-OrderConfirmedResp|               |              |               |
     |<--orderConfirmed|                    |               |              |               |

Failure and Compensation Path

  ORCHESTRATOR          ORDER SVC      PAYMENT SVC   INVENTORY SVC
       |                    |               |              |
       |---CreateOrderCmd-->|               |              |
       |<--OrderCreatedResp-|               |              |
       |                    |               |              |
       |-----------ProcessPaymentCmd------->|              |
       |<----------PaymentProcessedResp-----|              |
       |                    |               |              |
       |------------------------ReserveInventoryCmd------->|
       |<-----------------------InventoryFailedResp--------|
       |                    |               |              |
       | COMPENSATION BEGINS                |              |
       |                    |               |              |
       |-----------RefundPaymentCmd-------->|              |
       |<----------PaymentRefundedResp------|              |
       |                    |               |              |
       |---CancelOrderCmd-->|               |              |
       |<-OrderCancelledResp|               |              |
       |                    |               |              |
       | SAGA COMPENSATED - END             |              |

4. Orchestration Architecture Design

The orchestrator communicates with services via:

Option A: Kafka Commands (recommended for async)

  • Orchestrator publishes commands to service-specific topics
  • Services reply to a reply topic
  • Orchestrator listens on the reply topic

Option B: Synchronous REST with async retry

  • Orchestrator calls services via REST
  • Better for simpler, low-volume sagas

Option C: AWS Step Functions

  • Fully managed orchestration
  • Best for AWS-native architectures

This article covers Option A (Kafka, Sections 7-9) and Option C (AWS Step Functions, Sections 10-11).

KAFKA-BASED ORCHESTRATION:

Orchestrator publishes:
  command.order-service     -> Order Service listens
  command.payment-service   -> Payment Service listens
  command.inventory-service -> Inventory Service listens
  command.shipping-service  -> Shipping Service listens

Services reply to:
  saga.replies              -> Orchestrator listens to ALL replies here
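The message key decides the partition. Because participants key every reply with the sagaId, all replies for one saga land on the same partition of the reply topic (as long as its partition count does not change), so a single orchestrator consumer processes them in order. The sketch below shows the principle with String.hashCode as a stand-in; Kafka's default partitioner actually applies murmur2 to the serialized key bytes, so real partition numbers will differ. KeyPartitioning is a hypothetical helper, not a Kafka API.

```java
// Illustration only: deterministic key-to-partition mapping.
// Kafka's default partitioner hashes the serialized key with murmur2;
// String.hashCode is used here purely as a stand-in to show the principle.
final class KeyPartitioning {

    private KeyPartitioning() {}

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be > 0");
        }
        // Clear the sign bit (Kafka does the same to its hash) so the index is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The same sagaId always yields the same partition, which is what keeps a saga's replies ordered; different sagas spread across partitions and can be processed in parallel.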

5. Custom Orchestrator: Domain Model

// SagaInstance.java - the central state entity
package com.example.sagaorchestrator.domain;
 
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
 
@Entity
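@Table(name = "saga_instances", indexes = {
    // Mirrors the indexes in the MySQL DDL (Section 9)
    @Index(name = "idx_saga_status", columnList = "status, created_at"),
    @Index(name = "idx_saga_order_id", columnList = "order_id"),
    @Index(name = "idx_saga_type_status", columnList = "saga_type, status"),
    @Index(name = "idx_saga_stuck", columnList = "status, updated_at")
})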
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SagaInstance {
 
    @Id
    @Column(name = "saga_id", length = 36)
    private String sagaId;
 
    @Column(name = "saga_type", nullable = false, length = 100)
    private String sagaType;    // e.g., "ORDER_SAGA"
 
    @Column(name = "order_id", length = 36)
    private String orderId;
 
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false, length = 30)
    private SagaStatus status;
 
    @Enumerated(EnumType.STRING)
    @Column(name = "current_step", nullable = false, length = 50)
    private SagaStep currentStep;
 
    @Column(name = "payload", columnDefinition = "JSON")
    private String payload;     // Full saga context as JSON
 
    @Column(name = "failure_reason", length = 1000)
    private String failureReason;
 
    @Column(name = "retry_count")
    private int retryCount;
 
    @Column(name = "created_at", updatable = false)
    private Instant createdAt;
 
    @Column(name = "updated_at")
    private Instant updatedAt;
 
    @Column(name = "completed_at")
    private Instant completedAt;
 
    @Version
    private Long version;
 
    @OneToMany(mappedBy = "sagaInstance", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
    @OrderBy("executedAt ASC")
    @Builder.Default
    private List<SagaStepExecution> stepExecutions = new ArrayList<>();
 
    @PrePersist
    public void prePersist() {
        this.createdAt = Instant.now();
        this.updatedAt = Instant.now();
    }
 
    @PreUpdate
    public void preUpdate() {
        this.updatedAt = Instant.now();
    }
}
// SagaStatus.java
package com.example.sagaorchestrator.domain;
 
public enum SagaStatus {
    STARTED,          // Saga just created, first step not yet executed
    IN_PROGRESS,      // Executing forward steps
    COMPENSATING,     // Executing compensation steps (failure detected)
    COMPLETED,        // All forward steps completed successfully
    COMPENSATED,      // All compensation steps completed
    FAILED            // Critical failure even in compensation
}
// SagaStep.java
package com.example.sagaorchestrator.domain;
 
public enum SagaStep {
    // Forward steps
    CREATE_ORDER,
    PROCESS_PAYMENT,
    RESERVE_INVENTORY,
    CREATE_SHIPMENT,
    CONFIRM_ORDER,
 
    // Compensation steps
    COMPENSATE_SHIPMENT,
    COMPENSATE_INVENTORY,
    COMPENSATE_PAYMENT,
    COMPENSATE_ORDER,
 
    // Terminal states
    COMPLETED,
    FAILED
}
// SagaStepExecution.java - execution log for each step
package com.example.sagaorchestrator.domain;
 
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
 
@Entity
@Table(name = "saga_step_executions")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class SagaStepExecution {
 
    @Id
    @Column(name = "execution_id", length = 36)
    private String executionId;
 
    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "saga_id")
    private SagaInstance sagaInstance;
 
    @Enumerated(EnumType.STRING)
    @Column(name = "step", nullable = false, length = 50)
    private SagaStep step;
 
    @Enumerated(EnumType.STRING)
    @Column(name = "result", length = 20)
    private StepResult result;  // SUCCESS, FAILURE, PENDING
 
    @Column(name = "command_payload", columnDefinition = "JSON")
    private String commandPayload;
 
    @Column(name = "response_payload", columnDefinition = "JSON")
    private String responsePayload;
 
    @Column(name = "error_message", length = 1000)
    private String errorMessage;
 
    @Column(name = "executed_at")
    private Instant executedAt;
 
    @Column(name = "completed_at")
    private Instant completedAt;
}
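SagaStatus only stays trustworthy if the orchestrator moves between values in a disciplined way. The hypothetical SagaStatusGuard below encodes the transitions that the orchestrator in Section 7 performs: STARTED moves to IN_PROGRESS once the order is created, or straight to FAILED if order creation fails; IN_PROGRESS can end in COMPLETED, start COMPENSATING, or end in FAILED when shipment retries run out; COMPENSATING ends in COMPENSATED or FAILED. It uses a local copy of the enum so the sketch compiles on its own.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical guard: which SagaStatus transitions are legal.
final class SagaStatusGuard {

    // Local copy of the article's enum so the sketch is self-contained
    enum SagaStatus { STARTED, IN_PROGRESS, COMPENSATING, COMPLETED, COMPENSATED, FAILED }

    private static final Map<SagaStatus, Set<SagaStatus>> ALLOWED = new EnumMap<>(SagaStatus.class);

    static {
        ALLOWED.put(SagaStatus.STARTED, EnumSet.of(SagaStatus.IN_PROGRESS, SagaStatus.FAILED));
        ALLOWED.put(SagaStatus.IN_PROGRESS,
            EnumSet.of(SagaStatus.COMPENSATING, SagaStatus.COMPLETED, SagaStatus.FAILED));
        ALLOWED.put(SagaStatus.COMPENSATING, EnumSet.of(SagaStatus.COMPENSATED, SagaStatus.FAILED));
        // Terminal statuses allow no further transitions
        ALLOWED.put(SagaStatus.COMPLETED, EnumSet.noneOf(SagaStatus.class));
        ALLOWED.put(SagaStatus.COMPENSATED, EnumSet.noneOf(SagaStatus.class));
        ALLOWED.put(SagaStatus.FAILED, EnumSet.noneOf(SagaStatus.class));
    }

    private SagaStatusGuard() {}

    static boolean isTerminal(SagaStatus status) {
        return ALLOWED.get(status).isEmpty();
    }

    static boolean canTransition(SagaStatus from, SagaStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Returns the new status, or throws if the jump is not allowed. */
    static SagaStatus transition(SagaStatus from, SagaStatus to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal saga transition " + from + " -> " + to);
        }
        return to;
    }
}
```

Routing every status change through a check like this turns an illegal jump (for example, a late reply reopening a COMPLETED saga) into an exception instead of silently corrupted state.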

6. Saga State Machine Design

SAGA STATE TRANSITIONS (a failure compensates only the steps that already succeeded; a failed compensation ends in FAILED):

STARTED
  |
  v
CREATE_ORDER (forward)
  |           \
  | success    failure
  v             v
PROCESS_PAYMENT  FAILED (nothing to compensate)
  |           \
  | success    failure
  v             v
RESERVE_INVENTORY  COMPENSATE_ORDER -> COMPENSATED
  |           \
  | success    failure
  v             v
CREATE_SHIPMENT  COMPENSATE_PAYMENT -> COMPENSATE_ORDER -> COMPENSATED
  |           \
  | success    failure (retriable - no compensation, just retry)
  v             v
CONFIRM_ORDER  RETRY CREATE_SHIPMENT (up to N times, then FAILED for manual intervention)
  |
  v
COMPLETED
// SagaTransition.java
package com.example.sagaorchestrator.statemachine;
 
import com.example.sagaorchestrator.domain.SagaStep;
 
public record SagaTransition(
    SagaStep currentStep,
    boolean success,
    SagaStep nextStep
) {
    public static SagaTransition onSuccess(SagaStep from, SagaStep to) {
        return new SagaTransition(from, true, to);
    }
 
    public static SagaTransition onFailure(SagaStep from, SagaStep to) {
        return new SagaTransition(from, false, to);
    }
}
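SagaTransition is declared here, but the orchestrator in Section 7 still hard-codes its transitions in switch statements. One way to put the record to work is a lookup table, so that "what comes next" becomes data. The sketch below is an assumption, not the author's design: OrderSagaTransitions is hypothetical, it uses local copies of SagaStep and SagaTransition, and its rules mirror the transitions implemented by the Section 7 orchestrator.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical transition table for the order saga, built from SagaTransition records.
final class OrderSagaTransitions {

    // Local copies of the article's SagaStep enum and SagaTransition record
    enum SagaStep {
        CREATE_ORDER, PROCESS_PAYMENT, RESERVE_INVENTORY, CREATE_SHIPMENT, CONFIRM_ORDER,
        COMPENSATE_SHIPMENT, COMPENSATE_INVENTORY, COMPENSATE_PAYMENT, COMPENSATE_ORDER,
        COMPLETED, FAILED
    }

    record SagaTransition(SagaStep currentStep, boolean success, SagaStep nextStep) {
        static SagaTransition onSuccess(SagaStep from, SagaStep to) { return new SagaTransition(from, true, to); }
        static SagaTransition onFailure(SagaStep from, SagaStep to) { return new SagaTransition(from, false, to); }
    }

    private record Key(SagaStep step, boolean success) {}

    // CREATE_SHIPMENT failures are retried rather than transitioned, so they have no rule
    private static final List<SagaTransition> RULES = List.of(
        SagaTransition.onSuccess(SagaStep.CREATE_ORDER, SagaStep.PROCESS_PAYMENT),
        SagaTransition.onFailure(SagaStep.CREATE_ORDER, SagaStep.FAILED),
        SagaTransition.onSuccess(SagaStep.PROCESS_PAYMENT, SagaStep.RESERVE_INVENTORY),
        SagaTransition.onFailure(SagaStep.PROCESS_PAYMENT, SagaStep.COMPENSATE_ORDER),
        SagaTransition.onSuccess(SagaStep.RESERVE_INVENTORY, SagaStep.CREATE_SHIPMENT),
        SagaTransition.onFailure(SagaStep.RESERVE_INVENTORY, SagaStep.COMPENSATE_PAYMENT),
        SagaTransition.onSuccess(SagaStep.CREATE_SHIPMENT, SagaStep.CONFIRM_ORDER),
        SagaTransition.onSuccess(SagaStep.CONFIRM_ORDER, SagaStep.COMPLETED),
        SagaTransition.onSuccess(SagaStep.COMPENSATE_PAYMENT, SagaStep.COMPENSATE_ORDER),
        SagaTransition.onSuccess(SagaStep.COMPENSATE_ORDER, SagaStep.COMPLETED)  // status COMPENSATED
    );

    private static final Map<Key, SagaStep> TABLE = new HashMap<>();

    static {
        for (SagaTransition t : RULES) {
            // Fail fast on ambiguous rules
            if (TABLE.put(new Key(t.currentStep(), t.success()), t.nextStep()) != null) {
                throw new IllegalStateException("Duplicate rule for " + t.currentStep() + "/" + t.success());
            }
        }
    }

    private OrderSagaTransitions() {}

    /** Next step for a reply, or empty when no rule applies (e.g. a retried CREATE_SHIPMENT failure). */
    static Optional<SagaStep> next(SagaStep current, boolean success) {
        return Optional.ofNullable(TABLE.get(new Key(current, success)));
    }
}
```

A table like this still needs code to build each step's command, but the routing itself becomes easy to review and test in isolation.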

7. Custom Orchestrator: Full Implementation

// OrderSagaOrchestrator.java - the heart of orchestration
package com.example.sagaorchestrator.orchestrator;
 
import com.example.sagaorchestrator.command.*;
import com.example.sagaorchestrator.domain.*;
import com.example.sagaorchestrator.repository.SagaInstanceRepository;
import com.example.sagaorchestrator.reply.*;
import com.example.shared.kafka.CommandTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.UUID;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderSagaOrchestrator {
 
    private final SagaInstanceRepository sagaRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;
 
    // Maximum CREATE_SHIPMENT failures before the saga is flagged for manual intervention
    private static final int MAX_SHIPMENT_RETRIES = 5;
 
    // ==================== SAGA INITIATION ====================
 
    /**
     * Start a new order saga. Called by OrderController.
     */
    @Transactional
    public SagaInstance startOrderSaga(StartOrderSagaRequest request) {
        String sagaId = UUID.randomUUID().toString();
        String orderId = UUID.randomUUID().toString();
 
        log.info("Starting OrderSaga: sagaId={}, customerId={}", sagaId, request.customerId());
 
        // Create saga instance
        SagaInstance saga = SagaInstance.builder()
            .sagaId(sagaId)
            .sagaType("ORDER_SAGA")
            .orderId(orderId)
            .status(SagaStatus.STARTED)
            .currentStep(SagaStep.CREATE_ORDER)
            .payload(serialize(request))
            .createdAt(Instant.now())
            .build();
        sagaRepository.save(saga);
 
        // Send first command: create the order
        CreateOrderCommand command = new CreateOrderCommand(
            sagaId,
            orderId,
            request.customerId(),
            request.items(),
            request.totalAmount()
        );
        sendCommand(CommandTopics.ORDER_SERVICE, saga, command);
 
        return saga;
    }
 
    // ==================== REPLY HANDLERS ====================
 
    /**
     * Listen to all saga replies on a single reply topic.
     * Replies from every participant service arrive here.
     * Expects the listener container to use AckMode.MANUAL.
     */
    @KafkaListener(
        topics = CommandTopics.SAGA_REPLIES,
        groupId = "saga-orchestrator-group"
    )
    @Transactional
    public void handleSagaReply(@Payload SagaReplyEnvelope reply, Acknowledgment ack) {
        log.info("Received saga reply: sagaId={}, step={}, success={}",
            reply.sagaId(), reply.stepName(), reply.success());
 
        try {
            // Pessimistic row lock: replies for the same saga are applied one at a time
            SagaInstance saga = sagaRepository.findBySagaIdWithLock(reply.sagaId())
                .orElseThrow(() -> new SagaNotFoundException("Saga not found: " + reply.sagaId()));
 
            processReply(saga, reply);
            // With AckMode.MANUAL the offset is committed only after this method
            // (and therefore its DB transaction) has returned
            ack.acknowledge();
 
        } catch (Exception e) {
            log.error("Failed to process saga reply: sagaId={}", reply.sagaId(), e);
            throw e;  // not acknowledged: the container's error handler retries
        }
    }
 
    private void processReply(SagaInstance saga, SagaReplyEnvelope reply) {
        // A redelivered reply for a finished saga must not restart any step
        if (isTerminal(saga.getStatus())) {
            log.warn("Ignoring reply for finished saga: sagaId={}, status={}",
                saga.getSagaId(), saga.getStatus());
            return;
        }
 
        // Log the step execution
        logStepExecution(saga, reply);
 
        if (saga.getStatus() == SagaStatus.COMPENSATING) {
            // Replies to compensation commands continue the compensation chain
            handleCompensationReply(saga, reply);
        } else if (reply.success()) {
            handleSuccessReply(saga, reply);
        } else {
            handleFailureReply(saga, reply);
        }
    }
 
    // ==================== SUCCESS TRANSITIONS ====================
 
    private void handleSuccessReply(SagaInstance saga, SagaReplyEnvelope reply) {
        switch (saga.getCurrentStep()) {
 
            case CREATE_ORDER -> {
                // Order created, now process payment
                saga.setCurrentStep(SagaStep.PROCESS_PAYMENT);
                saga.setStatus(SagaStatus.IN_PROGRESS);
                sagaRepository.save(saga);
 
                ProcessPaymentCommand command = new ProcessPaymentCommand(
                    saga.getSagaId(),
                    saga.getOrderId(),
                    getCustomerIdFromSaga(saga),
                    getTotalAmountFromSaga(saga)
                );
                sendCommand(CommandTopics.PAYMENT_SERVICE, saga, command);
                log.info("Saga {}: Order created, sent ProcessPaymentCommand", saga.getSagaId());
            }
 
            case PROCESS_PAYMENT -> {
                // Payment processed, now reserve inventory
                saga.setCurrentStep(SagaStep.RESERVE_INVENTORY);
                sagaRepository.save(saga);
 
                PaymentProcessedReply paymentReply = deserialize(reply.payload(), PaymentProcessedReply.class);
                ReserveInventoryCommand command = new ReserveInventoryCommand(
                    saga.getSagaId(),
                    saga.getOrderId(),
                    paymentReply.paymentId(),
                    getItemsFromSaga(saga)
                );
                sendCommand(CommandTopics.INVENTORY_SERVICE, saga, command);
                log.info("Saga {}: Payment processed, sent ReserveInventoryCommand", saga.getSagaId());
            }
 
            case RESERVE_INVENTORY -> {
                // Inventory reserved, now create shipment
                saga.setCurrentStep(SagaStep.CREATE_SHIPMENT);
                sagaRepository.save(saga);
 
                InventoryReservedReply inventoryReply = deserialize(reply.payload(), InventoryReservedReply.class);
                sendCreateShipment(saga, inventoryReply);
                log.info("Saga {}: Inventory reserved, sent CreateShipmentCommand", saga.getSagaId());
            }
 
            case CREATE_SHIPMENT -> {
                // Shipment created, now confirm order
                saga.setCurrentStep(SagaStep.CONFIRM_ORDER);
                sagaRepository.save(saga);
 
                ShipmentCreatedReply shipmentReply = deserialize(reply.payload(), ShipmentCreatedReply.class);
                ConfirmOrderCommand command = new ConfirmOrderCommand(
                    saga.getSagaId(),
                    saga.getOrderId(),
                    shipmentReply.shipmentId(),
                    shipmentReply.trackingNumber()
                );
                sendCommand(CommandTopics.ORDER_SERVICE, saga, command);
                log.info("Saga {}: Shipment created, sent ConfirmOrderCommand", saga.getSagaId());
            }
 
            case CONFIRM_ORDER -> {
                // Saga completed successfully
                saga.setCurrentStep(SagaStep.COMPLETED);
                saga.setStatus(SagaStatus.COMPLETED);
                saga.setCompletedAt(Instant.now());
                sagaRepository.save(saga);
 
                log.info("SAGA COMPLETED SUCCESSFULLY: sagaId={}, orderId={}",
                    saga.getSagaId(), saga.getOrderId());
            }
 
            default -> log.error("Unexpected step during success handling: {}", saga.getCurrentStep());
        }
    }
 
    // ==================== FAILURE TRANSITIONS (COMPENSATION) ====================
 
    private void handleFailureReply(SagaInstance saga, SagaReplyEnvelope reply) {
        log.warn("Saga step failed: sagaId={}, step={}, reason={}",
            saga.getSagaId(), saga.getCurrentStep(), reply.errorMessage());
 
        saga.setFailureReason(reply.errorMessage());
 
        // Status is set per case: only failures that need compensation move to COMPENSATING
        switch (saga.getCurrentStep()) {
 
            case CREATE_ORDER -> {
                // Order creation itself failed - nothing to compensate
                saga.setStatus(SagaStatus.FAILED);
                saga.setCurrentStep(SagaStep.FAILED);
                saga.setCompletedAt(Instant.now());
                sagaRepository.save(saga);
                log.error("Saga FAILED at CREATE_ORDER: sagaId={}", saga.getSagaId());
            }
 
            case PROCESS_PAYMENT -> {
                // Payment failed - the order is the only completed step, so cancel it
                saga.setStatus(SagaStatus.COMPENSATING);
                saga.setCurrentStep(SagaStep.COMPENSATE_ORDER);
                sagaRepository.save(saga);
 
                CancelOrderCommand command = new CancelOrderCommand(
                    saga.getSagaId(),
                    saga.getOrderId(),
                    "Payment failed: " + reply.errorMessage()
                );
                sendCommand(CommandTopics.ORDER_SERVICE, saga, command);
            }
 
            case RESERVE_INVENTORY -> {
                // Inventory failed - refund the payment first; the order is cancelled afterwards
                saga.setStatus(SagaStatus.COMPENSATING);
                saga.setCurrentStep(SagaStep.COMPENSATE_PAYMENT);
                sagaRepository.save(saga);
 
                RefundPaymentCommand command = new RefundPaymentCommand(
                    saga.getSagaId(),
                    saga.getOrderId(),
                    "Inventory reservation failed: " + reply.errorMessage()
                );
                sendCommand(CommandTopics.PAYMENT_SERVICE, saga, command);
            }
 
            case CREATE_SHIPMENT -> {
                // Shipment failed AFTER the pivot: retry instead of compensating, so the saga
                // stays IN_PROGRESS. When retries run out, flag it for manual intervention.
                saga.setRetryCount(saga.getRetryCount() + 1);
                if (saga.getRetryCount() >= MAX_SHIPMENT_RETRIES) {
                    log.error("CRITICAL: Shipment creation exhausted retries for sagaId={}. Manual intervention needed.",
                        saga.getSagaId());
                    saga.setStatus(SagaStatus.FAILED);
                    saga.setCurrentStep(SagaStep.FAILED);
                } else {
                    log.warn("Retrying CreateShipmentCommand for sagaId={}, retry={}",
                        saga.getSagaId(), saga.getRetryCount());
                    // Immediate re-send; production code would usually add a backoff delay
                    sendCreateShipment(saga, lastInventoryReply(saga));
                }
                sagaRepository.save(saga);
            }
 
            default -> log.error("Unexpected step during failure handling: {}", saga.getCurrentStep());
        }
    }
 
    // ==================== COMPENSATION REPLIES ====================
 
    // Called from processReply while the saga is COMPENSATING: continue the compensation chain
    private void handleCompensationReply(SagaInstance saga, SagaReplyEnvelope reply) {
        if (!reply.success()) {
            // Per SagaStatus.FAILED ("critical failure even in compensation"): stop and escalate
            log.error("CRITICAL: Compensation step {} failed for sagaId={}: {}. Manual intervention needed.",
                saga.getCurrentStep(), saga.getSagaId(), reply.errorMessage());
            saga.setStatus(SagaStatus.FAILED);
            saga.setCurrentStep(SagaStep.FAILED);
            sagaRepository.save(saga);
            return;
        }
 
        switch (saga.getCurrentStep()) {
 
            case COMPENSATE_PAYMENT -> {
                // Payment refunded, now cancel the order
                saga.setCurrentStep(SagaStep.COMPENSATE_ORDER);
                sagaRepository.save(saga);
 
                CancelOrderCommand command = new CancelOrderCommand(
                    saga.getSagaId(),
                    saga.getOrderId(),
                    saga.getFailureReason()
                );
                sendCommand(CommandTopics.ORDER_SERVICE, saga, command);
            }
 
            case COMPENSATE_ORDER -> {
                // Order cancelled - compensation complete (terminal step; the status records COMPENSATED)
                saga.setStatus(SagaStatus.COMPENSATED);
                saga.setCurrentStep(SagaStep.COMPLETED);
                saga.setCompletedAt(Instant.now());
                sagaRepository.save(saga);
 
                log.info("SAGA COMPENSATED: sagaId={}, orderId={}", saga.getSagaId(), saga.getOrderId());
            }
 
            default -> log.error("Unexpected step during compensation: {}", saga.getCurrentStep());
        }
    }
 
    // ==================== HELPERS ====================
 
    private void sendCommand(String topic, SagaInstance saga, Object command) {
        // Participants consume a SagaCommandEnvelope (Section 8). Its definition is not shown;
        // this assumes a record with the three fields they read: sagaId, commandType
        // (the command's simple class name) and the command serialized as JSON.
        SagaCommandEnvelope envelope = new SagaCommandEnvelope(
            saga.getSagaId(),
            command.getClass().getSimpleName(),
            serialize(command)
        );
        // Keyed by orderId so all commands for one order go to the same partition.
        // Note: this send is not atomic with the DB commit (dual write).
        kafkaTemplate.send(topic, saga.getOrderId(), envelope);
        log.debug("Sent command to topic={}: type={}", topic, envelope.commandType());
    }
 
    private void sendCreateShipment(SagaInstance saga, InventoryReservedReply inventoryReply) {
        CreateShipmentCommand command = new CreateShipmentCommand(
            saga.getSagaId(),
            saga.getOrderId(),
            inventoryReply.reservationId(),
            getDeliveryAddressFromSaga(saga)
        );
        sendCommand(CommandTopics.SHIPPING_SERVICE, saga, command);
    }
 
    // A shipment retry needs the reservationId again: recover it from the most recent
    // successful RESERVE_INVENTORY reply recorded in the step execution log
    private InventoryReservedReply lastInventoryReply(SagaInstance saga) {
        return saga.getStepExecutions().stream()
            .filter(e -> e.getStep() == SagaStep.RESERVE_INVENTORY && e.getResult() == StepResult.SUCCESS)
            .reduce((first, second) -> second)
            .map(e -> deserialize(e.getResponsePayload(), InventoryReservedReply.class))
            .orElseThrow(() -> new IllegalStateException(
                "No successful RESERVE_INVENTORY reply logged for saga " + saga.getSagaId()));
    }
 
    private static boolean isTerminal(SagaStatus status) {
        return status == SagaStatus.COMPLETED
            || status == SagaStatus.COMPENSATED
            || status == SagaStatus.FAILED;
    }
 
    private void logStepExecution(SagaInstance saga, SagaReplyEnvelope reply) {
        SagaStepExecution stepExec = SagaStepExecution.builder()
            .executionId(UUID.randomUUID().toString())
            .sagaInstance(saga)
            .step(saga.getCurrentStep())
            .result(reply.success() ? StepResult.SUCCESS : StepResult.FAILURE)
            .responsePayload(reply.payload())
            .errorMessage(reply.errorMessage())
            .executedAt(Instant.now())
            .completedAt(Instant.now())
            .build();
        saga.getStepExecutions().add(stepExec);
    }
 
    private String serialize(Object obj) {
        try {
            return objectMapper.writeValueAsString(obj);
        } catch (Exception e) {
            throw new RuntimeException("Serialization failed", e);
        }
    }
 
    private <T> T deserialize(String json, Class<T> type) {
        try {
            return objectMapper.readValue(json, type);
        } catch (Exception e) {
            throw new RuntimeException("Deserialization failed", e);
        }
    }
 
    // Helpers to extract data from saga payload (stored as JSON in saga_instances)
    private String getCustomerIdFromSaga(SagaInstance saga) {
        return deserialize(saga.getPayload(), StartOrderSagaRequest.class).customerId();
    }
 
    private java.math.BigDecimal getTotalAmountFromSaga(SagaInstance saga) {
        return deserialize(saga.getPayload(), StartOrderSagaRequest.class).totalAmount();
    }
 
    private java.util.List<OrderItemCommand> getItemsFromSaga(SagaInstance saga) {
        return deserialize(saga.getPayload(), StartOrderSagaRequest.class).items();
    }
 
    private String getDeliveryAddressFromSaga(SagaInstance saga) {
        return deserialize(saga.getPayload(), StartOrderSagaRequest.class).deliveryAddress();
    }
}
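Kafka delivers at least once, so the orchestrator may see the same reply twice, or a late reply for a step it has already left. Because handleSuccessReply branches on the saga's current step rather than on the reply's stepName, such a reply could be applied to the wrong step. A minimal in-memory guard is sketched below. ReplyGuard is hypothetical, and it assumes every reply carries a unique replyId (as SagaReplyEnvelope does) and a stepName equal to the SagaStep name of the command it answers. In a real orchestrator the processed replyIds would live in a database table written in the same transaction as the saga update.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical guard deciding whether an incoming reply should be applied.
final class ReplyGuard {

    record Reply(String replyId, String sagaId, String stepName) {}

    enum Decision { APPLY, DUPLICATE, STALE }

    // In production: a table written in the same DB transaction as the saga update
    private final Set<String> processedReplyIds = new HashSet<>();

    Decision check(Reply reply, String currentStep) {
        if (processedReplyIds.contains(reply.replyId())) {
            return Decision.DUPLICATE;   // exact redelivery of a reply already applied
        }
        if (!reply.stepName().equals(currentStep)) {
            return Decision.STALE;       // answers a step the saga is no longer waiting on
        }
        processedReplyIds.add(reply.replyId());
        return Decision.APPLY;
    }
}
```

Only APPLY should reach handleSuccessReply or handleFailureReply; DUPLICATE and STALE replies can simply be logged and acknowledged.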

8. Participant Services (Command Side)

Services in orchestration receive COMMANDS and reply to a central reply topic.

// PaymentCommandHandler.java
package com.example.paymentservice.command;
 
import com.example.paymentservice.service.PaymentService;
import com.example.sagaorchestrator.command.*;
import com.example.sagaorchestrator.reply.*;
import com.example.shared.kafka.CommandTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentCommandHandler {
 
    private final PaymentService paymentService;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;
 
    /**
     * Handles payment commands from the orchestrator (AckMode.MANUAL).
     * paymentService operations should be idempotent: a command is redelivered
     * whenever handling or the reply send fails.
     */
    @KafkaListener(
        topics = CommandTopics.PAYMENT_SERVICE,
        groupId = "payment-command-handler-group"
    )
    public void handleCommand(@Payload SagaCommandEnvelope command, Acknowledgment ack) {
        log.info("PaymentService received command: type={}, sagaId={}",
            command.commandType(), command.sagaId());
 
        SagaReplyEnvelope reply;
        try {
            reply = switch (command.commandType()) {
                case "ProcessPaymentCommand" -> {
                    ProcessPaymentCommand cmd = deserialize(command.payload(), ProcessPaymentCommand.class);
                    PaymentResult result = paymentService.processPayment(cmd);
                    yield SagaReplyEnvelope.success(
                        command.sagaId(),
                        stepNameFor(command.commandType()),
                        serialize(new PaymentProcessedReply(result.paymentId(), result.amount()))
                    );
                }
                case "RefundPaymentCommand" -> {
                    RefundPaymentCommand cmd = deserialize(command.payload(), RefundPaymentCommand.class);
                    paymentService.refundPayment(cmd.orderId(), cmd.reason());
                    yield SagaReplyEnvelope.success(command.sagaId(), stepNameFor(command.commandType()), "{}");
                }
                default -> throw new IllegalArgumentException("Unknown command: " + command.commandType());
            };
 
        } catch (PaymentDeclinedException e) {
            // Business failure - do not retry; a failure reply is a valid saga result
            reply = SagaReplyEnvelope.failure(
                command.sagaId(),
                stepNameFor(command.commandType()),
                e.getMessage(),
                e.getDeclineCode()
            );
 
        } catch (Exception e) {
            log.error("Technical failure processing command: sagaId={}", command.sagaId(), e);
            // Do NOT acknowledge - the command will be redelivered
            throw e;
        }
 
        // Send the reply and wait for the broker to accept it BEFORE acknowledging the command;
        // acknowledging first would lose the reply on a failed send and stall the saga.
        kafkaTemplate.send(CommandTopics.SAGA_REPLIES, command.sagaId(), reply).join();
        ack.acknowledge();
    }
 
    // Reply step names match the orchestrator's SagaStep names
    private static String stepNameFor(String commandType) {
        return switch (commandType) {
            case "ProcessPaymentCommand" -> "PROCESS_PAYMENT";
            case "RefundPaymentCommand" -> "COMPENSATE_PAYMENT";
            default -> commandType;
        };
    }
 
    private <T> T deserialize(String json, Class<T> type) {
        try {
            return objectMapper.readValue(json, type);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
 
    private String serialize(Object obj) {
        try {
            return objectMapper.writeValueAsString(obj);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
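Because a technical failure leaves the command unacknowledged, Kafka redelivers it, and paymentService.processPayment can run twice for the same saga. A common remedy is to make the participant idempotent: remember the outcome per (sagaId, commandType) and return it on repeats. The in-memory sketch below (IdempotentHandler is hypothetical) shows the idea; a real service would store the key under a unique constraint in its own database, in the same local transaction as the payment.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical idempotency wrapper for a saga participant (in-memory for illustration).
final class IdempotentHandler<R> {

    private final Map<String, R> results = new ConcurrentHashMap<>();

    /**
     * Runs the action once per (sagaId, commandType); later calls with the same
     * key return the stored result instead of repeating the side effect.
     */
    R handleOnce(String sagaId, String commandType, Function<String, R> action) {
        String key = sagaId + "|" + commandType;
        return results.computeIfAbsent(key, k -> action.apply(sagaId));
    }
}
```

If the action throws, computeIfAbsent records nothing, so a redelivered command runs the action again, which is the desired behaviour for technical failures.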
// SagaReplyEnvelope.java
package com.example.sagaorchestrator.reply;
 
import java.time.Instant;
 
public record SagaReplyEnvelope(
    String replyId,
    String sagaId,
    String stepName,
    boolean success,
    String payload,
    String errorMessage,
    String errorCode,
    Instant occurredAt
) {
    public static SagaReplyEnvelope success(String sagaId, String stepName, String payload) {
        return new SagaReplyEnvelope(
            java.util.UUID.randomUUID().toString(),
            sagaId, stepName, true, payload, null, null, Instant.now()
        );
    }
 
    public static SagaReplyEnvelope failure(String sagaId, String stepName,
                                             String errorMessage, String errorCode) {
        return new SagaReplyEnvelope(
            java.util.UUID.randomUUID().toString(),
            sagaId, stepName, false, null, errorMessage, errorCode, Instant.now()
        );
    }
}
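PaymentCommandHandler reads commandType(), sagaId() and payload() from a SagaCommandEnvelope that this article never shows. A minimal sketch, assuming only those three fields; the null checks, the "{}" default and the of(...) factory are assumptions added for illustration. commandType is the command's simple class name, which is what the handler's switch matches on ("ProcessPaymentCommand", "RefundPaymentCommand").

```java
import java.util.Objects;

// Sketch of the command envelope the participants consume.
// Field names match what PaymentCommandHandler reads; everything else is assumed.
record SagaCommandEnvelope(String sagaId, String commandType, String payload) {

    SagaCommandEnvelope {
        Objects.requireNonNull(sagaId, "sagaId");
        Objects.requireNonNull(commandType, "commandType");
        // payload is the command serialized as JSON; "{}" when there is nothing to send
        payload = payload == null ? "{}" : payload;
    }

    /** Builds an envelope whose commandType is the command's simple class name. */
    static SagaCommandEnvelope of(String sagaId, Object command, String payloadJson) {
        return new SagaCommandEnvelope(sagaId, command.getClass().getSimpleName(), payloadJson);
    }
}
```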
// CommandTopics.java
package com.example.shared.kafka;
 
public final class CommandTopics {
    private CommandTopics() {}
 
    public static final String ORDER_SERVICE     = "command.order-service";
    public static final String PAYMENT_SERVICE   = "command.payment-service";
    public static final String INVENTORY_SERVICE = "command.inventory-service";
    public static final String SHIPPING_SERVICE  = "command.shipping-service";
    public static final String SAGA_REPLIES      = "saga.replies";
}

9. Saga State Persistence (MySQL)

-- ==========================================================
-- SAGA ORCHESTRATOR DATABASE
-- ==========================================================
 
CREATE TABLE saga_instances (
    saga_id        VARCHAR(36)   NOT NULL,
    saga_type      VARCHAR(100)  NOT NULL,
    order_id       VARCHAR(36),
    status         VARCHAR(30)   NOT NULL DEFAULT 'STARTED',
    current_step   VARCHAR(50)   NOT NULL,
    payload        JSON,
    failure_reason VARCHAR(1000),
    retry_count    INT           NOT NULL DEFAULT 0,
    created_at     DATETIME(6)   NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at     DATETIME(6)   NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
    completed_at   DATETIME(6),
    version        BIGINT        NOT NULL DEFAULT 0,
    PRIMARY KEY (saga_id),
    INDEX idx_saga_status       (status, created_at),
    INDEX idx_saga_order_id     (order_id),
    INDEX idx_saga_type_status  (saga_type, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
CREATE TABLE saga_step_executions (
    execution_id     VARCHAR(36)  NOT NULL,
    saga_id          VARCHAR(36)  NOT NULL,
    step             VARCHAR(50)  NOT NULL,
    result           VARCHAR(20),
    command_payload  JSON,
    response_payload JSON,
    error_message    VARCHAR(1000),
    executed_at      DATETIME(6),
    completed_at     DATETIME(6),
    PRIMARY KEY (execution_id),
    INDEX idx_step_exec_saga_id (saga_id),
    FOREIGN KEY (saga_id) REFERENCES saga_instances(saga_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- For finding stuck sagas (monitoring query).
-- MySQL does not support partial (filtered) indexes, so this is a plain composite
-- index; the status filter is applied by the query itself.
CREATE INDEX idx_saga_stuck ON saga_instances (status, updated_at);
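The status column only stays meaningful if the orchestrator refuses illegal transitions, such as moving a COMPLETED saga back into COMPENSATING. The status names below come from the schema and queries in this section. The set of allowed transitions is an assumption for illustration, not something the schema itself enforces.

```java
// Hypothetical transition rules for saga_instances.status.
enum SagaStatus {
    STARTED, IN_PROGRESS, COMPENSATING, COMPLETED, COMPENSATED, FAILED;

    boolean canTransitionTo(SagaStatus next) {
        return switch (this) {
            case STARTED      -> next == IN_PROGRESS || next == FAILED;
            case IN_PROGRESS  -> next == COMPLETED || next == COMPENSATING || next == FAILED;
            case COMPENSATING -> next == COMPENSATED || next == FAILED;
            case COMPLETED, COMPENSATED, FAILED -> false; // terminal states
        };
    }

    boolean isTerminal() {
        return this == COMPLETED || this == COMPENSATED || this == FAILED;
    }

    /** Statuses the stuck-saga query looks at. */
    boolean isStuckCandidate() {
        return this == IN_PROGRESS || this == COMPENSATING;
    }

    SagaStatus transitionTo(SagaStatus next) {
        if (!canTransitionTo(next)) {
            throw new IllegalStateException(this + " -> " + next + " is not allowed");
        }
        return next;
    }
}
```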
// SagaInstanceRepository.java
package com.example.sagaorchestrator.repository;
 
import com.example.sagaorchestrator.domain.*;
import jakarta.persistence.LockModeType;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
 
public interface SagaInstanceRepository extends JpaRepository<SagaInstance, String> {
 
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT s FROM SagaInstance s WHERE s.sagaId = :sagaId")
    Optional<SagaInstance> findBySagaIdWithLock(@Param("sagaId") String sagaId);
 
    Optional<SagaInstance> findByOrderId(String orderId);
 
    List<SagaInstance> findByStatus(SagaStatus status);
 
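    // JPQL needs fully qualified enum literals; comparing the enum attribute with string
    // literals may be rejected by stricter providers (e.g. Hibernate 6).
    @Query("SELECT s FROM SagaInstance s WHERE s.status IN ("
        + "com.example.sagaorchestrator.domain.SagaStatus.IN_PROGRESS, "
        + "com.example.sagaorchestrator.domain.SagaStatus.COMPENSATING) "
        + "AND s.updatedAt < :threshold")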
    List<SagaInstance> findStuckSagas(@Param("threshold") Instant threshold);
 
    @Query("SELECT COUNT(s) FROM SagaInstance s WHERE s.status = :status AND s.createdAt >= :since")
    long countByStatusSince(@Param("status") SagaStatus status, @Param("since") Instant since);
}
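The version column in saga_instances supports optimistic locking, which is an alternative to the PESSIMISTIC_WRITE query above. With JPA's @Version, the provider issues UPDATE ... WHERE saga_id = ? AND version = ? and treats zero updated rows as a conflict. The following in-memory sketch models only that compare-and-set behavior. It uses hypothetical SagaRow and VersionedSagaStore types and is not the JPA mechanism itself.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical row shape: only the fields needed to show version checks.
record SagaRow(String sagaId, String currentStep, long version) {}

final class VersionedSagaStore {
    private final ConcurrentMap<String, SagaRow> rows = new ConcurrentHashMap<>();

    void insert(String sagaId, String step) {
        rows.putIfAbsent(sagaId, new SagaRow(sagaId, step, 0L));
    }

    Optional<SagaRow> find(String sagaId) {
        return Optional.ofNullable(rows.get(sagaId));
    }

    /** Models: UPDATE ... SET current_step = ?, version = version + 1 WHERE saga_id = ? AND version = ? */
    boolean updateStep(String sagaId, long expectedVersion, String newStep) {
        SagaRow current = rows.get(sagaId);
        if (current == null || current.version() != expectedVersion) {
            return false; // "0 rows updated": someone else changed the saga first
        }
        // Atomic replace succeeds only if the row is still exactly what we read
        return rows.replace(sagaId, current, new SagaRow(sagaId, newStep, expectedVersion + 1));
    }
}
```

A writer that loses the race, such as a recovery replay competing with a late reply, would re-read the row and decide again rather than overwrite the newer state.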

10. AWS Step Functions Integration

AWS Step Functions is a fully managed workflow service that can act as the saga orchestrator, so you do not have to build and operate one yourself.

Key Benefits

  • No orchestrator service to maintain
  • Built-in retry logic with exponential backoff
  • Visual workflow editor in AWS Console
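  • Execution history for every saga instance (Standard Workflows; Express Workflows can log execution history to CloudWatch Logs instead)
  • Native AWS service integration (Lambda, SQS, SNS, DynamoDB)
  • Express Workflows for high-throughput, short-lived sagas (Express Workflows do not support the .waitForTaskToken callback pattern, so the definition in Section 11 requires a Standard Workflow)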
// StepFunctionsOrderSagaService.java
package com.example.orderservice.saga;
 
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.services.sfn.SfnClient;
import software.amazon.awssdk.services.sfn.model.*;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import java.util.Map;
import java.util.UUID;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class StepFunctionsOrderSagaService {
 
    private final SfnClient sfnClient;
    private final ObjectMapper objectMapper;
 
    @Value("${aws.stepfunctions.order-saga-arn}")
    private String orderSagaStateMachineArn;
 
    /**
     * Start a new order saga execution in AWS Step Functions.
     */
    public String startOrderSaga(PlaceOrderRequest request) {
        String executionName = "order-saga-" + UUID.randomUUID();
 
        Map<String, Object> input = Map.of(
            "orderId", UUID.randomUUID().toString(),
            "customerId", request.customerId(),
            "items", request.items(),
            "totalAmount", request.totalAmount(),
            "deliveryAddress", request.deliveryAddress()
        );
 
        try {
            String inputJson = objectMapper.writeValueAsString(input);
 
            StartExecutionRequest sfnRequest = StartExecutionRequest.builder()
                .stateMachineArn(orderSagaStateMachineArn)
                .name(executionName)
                .input(inputJson)
                .build();
 
            StartExecutionResponse response = sfnClient.startExecution(sfnRequest);
            log.info("Started Step Functions execution: arn={}", response.executionArn());
 
            return response.executionArn();
 
        } catch (Exception e) {
            log.error("Failed to start Step Functions execution", e);
            throw new SagaStartException("Could not start order saga", e);
        }
    }
 
    /**
     * Get current status of a saga execution.
     */
    public SagaExecutionStatus getSagaStatus(String executionArn) {
        DescribeExecutionRequest request = DescribeExecutionRequest.builder()
            .executionArn(executionArn)
            .build();
 
        DescribeExecutionResponse response = sfnClient.describeExecution(request);
        return new SagaExecutionStatus(
            response.statusAsString(),
            response.startDate().toString(),
            response.stopDate() != null ? response.stopDate().toString() : null,
            response.input(),
            response.output()
        );
    }
 
    /**
     * Report successful completion of a step back to Step Functions.
     * Participant services call this with the task token they received in the
     * SQS command message; the output becomes the task's result in the state machine.
     */
    public void sendTaskSuccess(String taskToken, Object result) {
        try {
            SendTaskSuccessRequest request = SendTaskSuccessRequest.builder()
                .taskToken(taskToken)
                .output(objectMapper.writeValueAsString(result))
                .build();
            sfnClient.sendTaskSuccess(request);
        } catch (Exception e) {
            log.error("Failed to send task success", e);
            throw new RuntimeException(e);
        }
    }
 
    /**
     * Send a task failure result.
     */
    public void sendTaskFailure(String taskToken, String error, String cause) {
        SendTaskFailureRequest request = SendTaskFailureRequest.builder()
            .taskToken(taskToken)
            .error(error)
            .cause(cause)
            .build();
        sfnClient.sendTaskFailure(request);
    }
}
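On the participant side, each service polls its SQS queue and reads the taskToken, commandType, and data fields that the state machine placed in the message body. It then does the work and reports back. The error name passed to SendTaskFailure is what the state machine's Retry and Catch blocks match against, so it must line up with names such as PaymentDeclinedError and TransientError.

The sketch below keeps the AWS SDK out of the picture by using a hypothetical TaskCallback interface; a real implementation would delegate to sendTaskSuccess and sendTaskFailure above. PaymentDeclinedException, PaymentCommandHandler, and the charge function are also hypothetical.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical abstraction over SendTaskSuccess / SendTaskFailure.
interface TaskCallback {
    void success(String taskToken, String outputJson);
    void failure(String taskToken, String error, String cause);
}

// Hypothetical business exception for a declined payment.
final class PaymentDeclinedException extends RuntimeException {
    PaymentDeclinedException(String message) {
        super(message);
    }
}

// Hypothetical handler for ProcessPaymentCommand messages from the payment queue.
final class PaymentCommandHandler {
    private final TaskCallback callback;
    private final Function<String, String> chargeOrder; // orderId -> paymentId (assumed)

    PaymentCommandHandler(TaskCallback callback, Function<String, String> chargeOrder) {
        this.callback = callback;
        this.chargeOrder = chargeOrder;
    }

    /** message: the SQS body already parsed into string fields (a simplification). */
    void handle(Map<String, String> message) {
        String taskToken = message.get("taskToken");
        String orderId = message.get("orderId");
        try {
            String paymentId = chargeOrder.apply(orderId);
            // Stored under $.paymentResult by the state machine's ResultPath
            callback.success(taskToken, "{\"paymentId\":\"" + paymentId + "\"}");
        } catch (PaymentDeclinedException e) {
            // Matches the "PaymentDeclinedError" Catch, which routes to CompensateOrder
            callback.failure(taskToken, "PaymentDeclinedError", e.getMessage());
        } catch (RuntimeException e) {
            // Assumption: treat anything else as transient so the Retry block applies
            callback.failure(taskToken, "TransientError", String.valueOf(e.getMessage()));
        }
    }
}
```

When Step Functions retries after a TransientError, it re-runs the task. That sends a new SQS message with a fresh token, so the participant should make the charge idempotent per orderId.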

11. Step Functions State Machine Definition

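This is the AWS Step Functions state machine definition in Amazon States Language (JSON). Placeholders such as ${OrderServiceQueueUrl} are filled in at deploy time, for example through the DefinitionSubstitutions property of a CloudFormation AWS::StepFunctions::StateMachine resource.

Each .waitForTaskToken task pauses until a participant returns the token with SendTaskSuccess or SendTaskFailure. In production you would normally also set TimeoutSeconds or HeartbeatSeconds on these tasks. Without them, a lost message leaves the execution waiting until the workflow's maximum duration.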

{
  "Comment": "Order SAGA Orchestration with Compensation",
  "StartAt": "CreateOrder",
 
  "States": {
    "CreateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${OrderServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "CreateOrderCommand",
          "orderId.$": "$.orderId",
          "customerId.$": "$.customerId",
          "items.$": "$.items",
          "totalAmount.$": "$.totalAmount"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["TransientError"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "SagaFailed",
          "ResultPath": "$.error"
        }
      ],
      "Next": "ProcessPayment",
      "ResultPath": "$.orderResult"
    },
 
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${PaymentServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "ProcessPaymentCommand",
          "orderId.$": "$.orderId",
          "customerId.$": "$.customerId",
          "amount.$": "$.totalAmount"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["TransientError"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["PaymentDeclinedError"],
          "Next": "CompensateOrder",
          "ResultPath": "$.paymentError"
        },
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "CompensateOrder",
          "ResultPath": "$.error"
        }
      ],
      "Next": "ReserveInventory",
      "ResultPath": "$.paymentResult"
    },
 
    "ReserveInventory": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${InventoryServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "ReserveInventoryCommand",
          "orderId.$": "$.orderId",
          "items.$": "$.items"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["TransientError"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 1.5
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["InsufficientStockError"],
          "Next": "CompensatePayment",
          "ResultPath": "$.inventoryError"
        },
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "CompensatePayment",
          "ResultPath": "$.error"
        }
      ],
      "Next": "CreateShipment",
      "ResultPath": "$.inventoryResult"
    },
 
    "CreateShipment": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${ShippingServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "CreateShipmentCommand",
          "orderId.$": "$.orderId",
          "reservationId.$": "$.inventoryResult.reservationId"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["CarrierUnavailableError", "TransientError"],
          "IntervalSeconds": 5,
          "MaxAttempts": 5,
          "BackoffRate": 2.0,
          "MaxDelaySeconds": 60
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "ShipmentFailedManualReview",
          "ResultPath": "$.shippingError"
        }
      ],
      "Next": "ConfirmOrder",
      "ResultPath": "$.shipmentResult"
    },
 
    "ConfirmOrder": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${OrderServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "ConfirmOrderCommand",
          "orderId.$": "$.orderId",
          "shipmentId.$": "$.shipmentResult.shipmentId"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 2,
          "MaxAttempts": 5,
          "BackoffRate": 2.0
        }
      ],
      "Next": "SagaCompleted"
    },
 
    "SagaCompleted": {
      "Type": "Succeed"
    },
 
    "SagaFailed": {
      "Type": "Fail",
      "Error": "SagaFailed",
      "Cause": "Saga failed at an unrecoverable step"
    },
 
    "ShipmentFailedManualReview": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "${ManualReviewTopicArn}",
        "Message.$": "States.Format('Order saga needs manual review for orderId: {}', $.orderId)"
      },
      "Next": "SagaFailed"
    },
 
    "CompensatePayment": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${PaymentServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "RefundPaymentCommand",
          "orderId.$": "$.orderId"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 5,
          "MaxAttempts": 10,
          "BackoffRate": 2.0,
          "MaxDelaySeconds": 300
        }
      ],
      "Next": "CompensateOrder",
      "ResultPath": "$.compensationResult"
    },
 
    "CompensateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${OrderServiceQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "commandType": "CancelOrderCommand",
          "orderId.$": "$.orderId"
        }
      },
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 5,
          "MaxAttempts": 10,
          "BackoffRate": 2.0
        }
      ],
      "Next": "SagaCompensated",
      "ResultPath": "$.orderCancellationResult"
    },
 
    "SagaCompensated": {
      "Type": "Fail",
      "Error": "SagaCompensated",
      "Cause": "Saga was compensated due to business failure"
    }
  }
}
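The Retry fields combine as follows. Before retry n (1-based), Step Functions waits IntervalSeconds * BackoffRate^(n-1) seconds, capped at MaxDelaySeconds when that field is set, for at most MaxAttempts retries. A small helper makes the resulting schedules concrete. The name retryDelays is hypothetical, and the sketch ignores the optional JitterStrategy field.

```java
import java.util.ArrayList;
import java.util.List;

final class StepFunctionsRetry {
    /**
     * Delay in seconds before each retry attempt, mirroring a Retry block.
     * maxDelaySeconds <= 0 means "no MaxDelaySeconds set".
     */
    static List<Double> retryDelays(double intervalSeconds, double backoffRate,
                                    int maxAttempts, double maxDelaySeconds) {
        List<Double> delays = new ArrayList<>();
        double delay = intervalSeconds;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            delays.add(maxDelaySeconds > 0 ? Math.min(delay, maxDelaySeconds) : delay);
            delay *= backoffRate; // grows geometrically after each retry
        }
        return delays;
    }
}
```

For CreateShipment (5, 2.0, 5 attempts, cap 60), this gives waits of 5, 10, 20, 40 and 60 seconds. For CompensatePayment (5, 2.0, 10 attempts, cap 300), the waits add up to 1515 seconds, so a refund is retried for roughly 25 minutes before the execution fails.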

12. Axon Framework Orchestration

Axon Framework provides first-class SAGA support with a clean annotation-based model.

// OrderManagementSaga.java (Axon Framework)
package com.example.saga;
 
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.modelling.saga.*;
import org.axonframework.spring.stereotype.Saga;
import org.springframework.beans.factory.annotation.Autowired;
import lombok.extern.slf4j.Slf4j;
 
@Saga
@Slf4j
public class OrderManagementSaga {
 
    @Autowired
    private transient CommandGateway commandGateway;
 
    private String orderId;
    private String paymentId;
    private String reservationId;
    private boolean orderConfirmed = false;
 
    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void on(OrderCreatedEvent event) {
        this.orderId = event.orderId();
 
        log.info("Saga started for orderId={}", orderId);
 
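        // @StartSaga already associated this saga with orderId, so this call is redundant
        // but harmless; associateWith is how you would add further keys (e.g. paymentId).
        SagaLifecycle.associateWith("orderId", event.orderId());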
 
        // Send command to payment service
        commandGateway.send(new ProcessPaymentCommand(
            event.orderId(),
            event.customerId(),
            event.totalAmount()
        ));
    }
 
    @SagaEventHandler(associationProperty = "orderId")
    public void on(PaymentProcessedEvent event) {
        this.paymentId = event.paymentId();
 
        log.info("Payment processed, reserving inventory: orderId={}", orderId);
        commandGateway.send(new ReserveInventoryCommand(event.orderId(), event.items()));
    }
 
    @SagaEventHandler(associationProperty = "orderId")
    public void on(PaymentFailedEvent event) {
        log.warn("Payment failed, compensating: orderId={}, reason={}", orderId, event.reason());
        commandGateway.send(new CancelOrderCommand(orderId, "Payment failed: " + event.reason()));
    }
 
    @SagaEventHandler(associationProperty = "orderId")
    public void on(InventoryReservedEvent event) {
        this.reservationId = event.reservationId();
 
        log.info("Inventory reserved, creating shipment: orderId={}", orderId);
        commandGateway.send(new CreateShipmentCommand(event.orderId(), event.reservationId()));
    }
 
    @SagaEventHandler(associationProperty = "orderId")
    public void on(InventoryReservationFailedEvent event) {
        log.warn("Inventory failed, compensating payment: orderId={}", orderId);
        commandGateway.send(new RefundPaymentCommand(orderId, event.reason()));
    }
 
    @SagaEventHandler(associationProperty = "orderId")
    public void on(PaymentRefundedEvent event) {
        log.info("Payment refunded, cancelling order: orderId={}", orderId);
        commandGateway.send(new CancelOrderCommand(orderId, "Compensated"));
    }
 
    @SagaEventHandler(associationProperty = "orderId")
    public void on(ShipmentCreatedEvent event) {
        log.info("Shipment created, confirming order: orderId={}", orderId);
        commandGateway.send(new ConfirmOrderCommand(orderId, event.shipmentId()));
    }
 
    @EndSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void on(OrderConfirmedEvent event) {
        this.orderConfirmed = true;
        log.info("SAGA COMPLETED: orderId={}", orderId);
    }
 
    @EndSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void on(OrderCancelledEvent event) {
        log.info("SAGA COMPENSATED: orderId={}", orderId);
    }
}
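Axon routes each event to saga instances through association values:

  • @StartSaga creates an instance and associates it with the event's orderId.
  • Every @SagaEventHandler(associationProperty = "orderId") looks up instances by that value.
  • @EndSaga removes the instance together with its associations.

The following is a deliberately simplified, framework-free model of that lookup, using hypothetical AssociationRouter and SagaState types. It is not Axon's API, and it leaves out the saga store, serialization, and concurrency control that Axon provides.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a saga instance's state.
final class SagaState {
    final String orderId;
    String lastEvent;

    SagaState(String orderId) {
        this.orderId = orderId;
    }
}

// Simplified model of association-value routing (not Axon's implementation).
final class AssociationRouter {
    private final Map<String, SagaState> byAssociation = new HashMap<>();

    /** Like @StartSaga: create the instance and associate it with orderId. */
    SagaState start(String orderId) {
        SagaState saga = new SagaState(orderId);
        associate(saga, "orderId", orderId);
        return saga;
    }

    /** Like SagaLifecycle.associateWith(property, value). */
    void associate(SagaState saga, String property, String value) {
        byAssociation.put(property + "=" + value, saga);
    }

    /** Like @SagaEventHandler(associationProperty = property): find the instance, if any. */
    Optional<SagaState> route(String property, String value, String eventName) {
        SagaState saga = byAssociation.get(property + "=" + value);
        if (saga != null) {
            saga.lastEvent = eventName;
        }
        return Optional.ofNullable(saga);
    }

    /** Like @EndSaga: drop every association pointing at this instance. */
    void end(SagaState saga) {
        byAssociation.values().removeIf(s -> s == saga);
    }
}
```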

13. Production Configuration

# application.yml for Saga Orchestrator Service
spring:
  application:
    name: saga-orchestrator
 
  datasource:
    url: jdbc:mysql://${DB_HOST}:3306/saga_db?useSSL=true&serverTimezone=UTC
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: SagaOrchestratorPool
      minimum-idle: 5
      maximum-pool-size: 30 # Higher than order service - orchestrator is busy
      connection-timeout: 30000
      idle-timeout: 600000
      max-lifetime: 1800000
 
  jpa:
    hibernate:
      ddl-auto: validate
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
 
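  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
    consumer:
      group-id: saga-orchestrator-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 5 # Lower - saga processing is complex
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.example.shared.kafka # assumed package of SagaReplyEnvelope
    producer:
      acks: all
      retries: 5
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        enable.idempotence: true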
 
# AWS Configuration
aws:
  region: ${AWS_REGION:us-east-1}
  stepfunctions:
    order-saga-arn: ${ORDER_SAGA_ARN}
  sns:
    manual-review-topic-arn: ${MANUAL_REVIEW_SNS_ARN}
 
# Stuck saga recovery
saga:
  recovery:
    enabled: true
    check-interval-minutes: 5
    stuck-threshold-minutes: 30 # Saga stuck > 30 min triggers alert
 
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,saga-status

14. Error Handling in Orchestration

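// SagaRecoveryService.java - finds and recovers stuck sagas
import com.example.sagaorchestrator.domain.*;
import com.example.sagaorchestrator.repository.SagaInstanceRepository;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@RequiredArgsConstructor
@Slf4j
public class SagaRecoveryService {
 
    private final SagaInstanceRepository sagaRepository;
    private final OrderSagaOrchestrator orchestrator;
    private final AlertService alertService;

    // Non-final, so Lombok leaves it out of the constructor; injected from application.yml
    @Value("${saga.recovery.stuck-threshold-minutes:30}")
    private long stuckThresholdMinutes;
 
    // The interval property is in minutes; the timeUnit attribute needs Spring 5.3.10+
    @Scheduled(fixedDelayString = "${saga.recovery.check-interval-minutes:5}",
               timeUnit = TimeUnit.MINUTES)
    @Transactional
    public void recoverStuckSagas() {
        Instant threshold = Instant.now()
            .minus(Duration.ofMinutes(stuckThresholdMinutes));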
 
        List<SagaInstance> stuckSagas = sagaRepository.findStuckSagas(threshold);
 
        if (!stuckSagas.isEmpty()) {
            log.warn("Found {} stuck sagas, attempting recovery", stuckSagas.size());
        }
 
        for (SagaInstance saga : stuckSagas) {
            try {
                log.info("Attempting to recover stuck saga: sagaId={}, step={}, lastUpdate={}",
                    saga.getSagaId(), saga.getCurrentStep(), saga.getUpdatedAt());
 
                if (saga.getRetryCount() >= 10) {
                    // Escalate to manual intervention
                    alertService.sendCriticalAlert(
                        "SAGA STUCK - MANUAL INTERVENTION REQUIRED",
                        String.format("SagaId: %s, OrderId: %s, Step: %s, LastUpdate: %s",
                            saga.getSagaId(), saga.getOrderId(),
                            saga.getCurrentStep(), saga.getUpdatedAt())
                    );
                    saga.setStatus(SagaStatus.FAILED);
                    sagaRepository.save(saga);
                    continue;
                }
 
                // Re-send the current step's command
                orchestrator.replayCurrentStep(saga);
                saga.setRetryCount(saga.getRetryCount() + 1);
                sagaRepository.save(saga);
 
            } catch (Exception e) {
                log.error("Failed to recover saga: sagaId={}", saga.getSagaId(), e);
            }
        }
    }
}
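The per-saga decision inside recoverStuckSagas can be pulled out into a pure function, so the thresholds can be unit-tested without a database or a scheduler. The sketch below assumes a hypothetical RecoveryAction enum and RecoveryPolicy class, and it mirrors the rules above. Sagas updated within the threshold are left alone, sagas with 10 or more retries are escalated, and everything else has its current step replayed.

```java
import java.time.Duration;
import java.time.Instant;

enum RecoveryAction { NOT_STUCK, REPLAY_CURRENT_STEP, ESCALATE_TO_MANUAL }

// Hypothetical policy object extracted from the scheduled job.
final class RecoveryPolicy {
    private final Duration stuckThreshold;
    private final int maxRetries;

    RecoveryPolicy(Duration stuckThreshold, int maxRetries) {
        this.stuckThreshold = stuckThreshold;
        this.maxRetries = maxRetries;
    }

    RecoveryAction decide(String status, Instant updatedAt, int retryCount, Instant now) {
        // Same filter as findStuckSagas: active status and updatedAt < now - threshold
        boolean active = status.equals("IN_PROGRESS") || status.equals("COMPENSATING");
        Instant threshold = now.minus(stuckThreshold);
        if (!active || !updatedAt.isBefore(threshold)) {
            return RecoveryAction.NOT_STUCK;
        }
        // Same escalation rule as recoverStuckSagas: stop replaying after maxRetries
        return retryCount >= maxRetries
            ? RecoveryAction.ESCALATE_TO_MANUAL
            : RecoveryAction.REPLAY_CURRENT_STEP;
    }
}
```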

15. Monitoring and Observability

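// SagaMetrics.java
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Duration;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class SagaMetrics {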
 
    private final MeterRegistry meterRegistry;
 
    public void recordSagaStarted(String sagaType) {
        meterRegistry.counter("saga.started", "type", sagaType).increment();
    }
 
    public void recordSagaCompleted(String sagaType, Duration duration) {
        meterRegistry.counter("saga.completed", "type", sagaType).increment();
        meterRegistry.timer("saga.duration", "type", sagaType, "outcome", "success")
            .record(duration);
    }
 
    public void recordSagaCompensated(String sagaType, String failedStep) {
        meterRegistry.counter("saga.compensated",
            "type", sagaType, "failed_step", failedStep).increment();
    }
 
    public void recordSagaFailed(String sagaType, String failedStep) {
        meterRegistry.counter("saga.failed",
            "type", sagaType, "failed_step", failedStep).increment();
    }
 
    public void recordStepLatency(String sagaType, String step, Duration duration) {
        meterRegistry.timer("saga.step.duration",
            "type", sagaType, "step", step).record(duration);
    }
}

16. Testing Orchestration SAGAs

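// OrderSagaOrchestratorTest.java
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.kafka.core.KafkaTemplate;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@Testcontainers
class OrderSagaOrchestratorTest {
 
    // @ServiceConnection (Spring Boot 3.1+) wires the datasource and Kafka settings
    // to the containers; older Boot versions need @DynamicPropertySource instead.
    @Container
    @ServiceConnection
    static MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
        .withDatabaseName("saga_test")
        .withUsername("test")
        .withPassword("test");
 
    @Container
    @ServiceConnection
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );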
 
    @Autowired
    private OrderSagaOrchestrator orchestrator;
 
    @Autowired
    private SagaInstanceRepository sagaRepository;
 
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    @Test
    @DisplayName("Complete saga: all steps succeed")
    void testCompleteSaga() throws Exception {
        // Start saga
        StartOrderSagaRequest request = buildTestRequest();
        SagaInstance saga = orchestrator.startOrderSaga(request);
 
        assertThat(saga.getStatus()).isEqualTo(SagaStatus.STARTED);
        assertThat(saga.getCurrentStep()).isEqualTo(SagaStep.CREATE_ORDER);
 
        // Simulate each reply
        simulateReply(saga.getSagaId(), "CREATE_ORDER", true,
            "{\"orderId\":\"" + saga.getOrderId() + "\"}");
 
        await().atMost(5, TimeUnit.SECONDS)
            .until(() -> sagaRepository.findById(saga.getSagaId())
                .map(s -> s.getCurrentStep() == SagaStep.PROCESS_PAYMENT)
                .orElse(false));

        simulateReply(saga.getSagaId(), "PROCESS_PAYMENT", true,
            "{\"paymentId\":\"PAY-001\",\"amount\":99.99}");

        // ... continue through all steps

        await().atMost(30, TimeUnit.SECONDS)
            .until(() -> sagaRepository.findById(saga.getSagaId())
                .map(s -> s.getStatus() == SagaStatus.COMPLETED)
                .orElse(false));
 
        SagaInstance completed = sagaRepository.findById(saga.getSagaId()).orElseThrow();
        assertThat(completed.getStatus()).isEqualTo(SagaStatus.COMPLETED);
        assertThat(completed.getCompletedAt()).isNotNull();
        assertThat(completed.getStepExecutions()).hasSize(5);
    }
 
    @Test
    @DisplayName("Compensation saga: inventory fails, payment refunded, order cancelled")
    void testInventoryFailureCompensation() throws Exception {
        SagaInstance saga = orchestrator.startOrderSaga(buildTestRequest());
 
        // Order created OK
        simulateReply(saga.getSagaId(), "CREATE_ORDER", true, "{}");
        // Payment OK
        simulateReply(saga.getSagaId(), "PROCESS_PAYMENT", true,
            "{\"paymentId\":\"PAY-001\"}");
        // Inventory FAILS
        simulateReply(saga.getSagaId(), "RESERVE_INVENTORY", false,
            null, "InsufficientStock", "Out of stock");
 
        await().atMost(30, TimeUnit.SECONDS)
            .until(() -> sagaRepository.findById(saga.getSagaId())
                .map(s -> s.getStatus() == SagaStatus.COMPENSATED)
                .orElse(false));
 
        SagaInstance compensated = sagaRepository.findById(saga.getSagaId()).orElseThrow();
        assertThat(compensated.getStatus()).isEqualTo(SagaStatus.COMPENSATED);
    }
 
    private void simulateReply(String sagaId, String step, boolean success,
                                String payload, String... errorInfo) {
        SagaReplyEnvelope reply = success
            ? SagaReplyEnvelope.success(sagaId, step, payload)
            : SagaReplyEnvelope.failure(sagaId, step,
                errorInfo.length > 0 ? errorInfo[0] : "Error",
                errorInfo.length > 1 ? errorInfo[1] : "ERR");
        kafkaTemplate.send("saga.replies", sagaId, reply);
    }
}
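The Testcontainers test exercises the whole stack. The step sequencing can also be checked in a plain unit test if it lives in a pure function. Below is a hypothetical OrderSagaFlow reducer that returns the next step given the current step and whether the reply succeeded. CREATE_ORDER, PROCESS_PAYMENT and RESERVE_INVENTORY follow the test above. CREATE_SHIPMENT, CONFIRM_ORDER and the compensation step names are assumptions. The failure routing mirrors the Step Functions definition in Section 11.

```java
// Hypothetical step enum: forward steps, compensation steps, and end states.
enum Step {
    CREATE_ORDER, PROCESS_PAYMENT, RESERVE_INVENTORY, CREATE_SHIPMENT, CONFIRM_ORDER,
    REFUND_PAYMENT, CANCEL_ORDER,
    COMPLETED, COMPENSATED, FAILED
}

final class OrderSagaFlow {
    /** success=false means the step failed after its own retries were exhausted. */
    static Step next(Step current, boolean success) {
        return switch (current) {
            case CREATE_ORDER      -> success ? Step.PROCESS_PAYMENT   : Step.FAILED;
            case PROCESS_PAYMENT   -> success ? Step.RESERVE_INVENTORY : Step.CANCEL_ORDER;
            case RESERVE_INVENTORY -> success ? Step.CREATE_SHIPMENT   : Step.REFUND_PAYMENT;
            case CREATE_SHIPMENT   -> success ? Step.CONFIRM_ORDER     : Step.FAILED; // manual review
            case CONFIRM_ORDER     -> success ? Step.COMPLETED         : Step.FAILED;
            case REFUND_PAYMENT    -> success ? Step.CANCEL_ORDER      : Step.FAILED;
            case CANCEL_ORDER      -> success ? Step.COMPENSATED       : Step.FAILED;
            case COMPLETED, COMPENSATED, FAILED ->
                throw new IllegalStateException("Saga already finished: " + current);
        };
    }
}
```

A unit test can then replay a sequence of simulated replies, such as ok, ok, fail, ok, ok, and assert that the saga ends in COMPENSATED. No broker or database is involved.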

17. Choreography vs Orchestration Decision Matrix

| Factor | Favors Choreography | Favors Orchestration |
|---|---|---|
| Number of services | Fewer than 5 | 5 or more |
| Flow complexity | Simple, linear | Complex, branching |
| Team independence | High (teams own their events) | Medium (orchestrator needs coordination) |
| Need for debuggability | Low priority | High priority |
| Visibility requirements | Not critical | Critical |
| Throughput | Higher (no orchestrator) | Slightly lower |
| Ease of testing | Harder | Easier |
| Failure tracing | Difficult | Easy |
| Adding a new step | Downstream services must change | Only the orchestrator changes |
| Operational complexity | Lower | Higher (extra orchestrator service) |
| AWS Step Functions available | No advantage | Big advantage |

When to Definitely Use Orchestration:

  1. Compliance and Audit: You need to prove exactly what happened at each step for regulatory compliance
  2. Complex branching: "If payment method is BNPL, route to credit check service first"
  3. Long-running processes: Sagas that span hours or days need visible state management
  4. Cross-team sagas: When different teams own different services and coordination is complex
  5. Compensation order matters: Some compensations must happen in a specific order
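Item 2 shows where an explicit orchestrator pays off: the branch is a single conditional in one place, instead of logic spread across event subscribers. The following is a minimal sketch that assumes a hypothetical PaymentMethod enum and a hypothetical CREDIT_CHECK step. Neither appears elsewhere in this article.

```java
import java.util.ArrayList;
import java.util.List;

enum PaymentMethod { CARD, BNPL }

final class BranchingPlan {
    /** Forward steps for one order; BNPL inserts a credit check before payment. */
    static List<String> stepsFor(PaymentMethod method) {
        List<String> steps = new ArrayList<>();
        steps.add("CREATE_ORDER");
        if (method == PaymentMethod.BNPL) {
            steps.add("CREDIT_CHECK"); // hypothetical extra participant
        }
        steps.add("PROCESS_PAYMENT");
        steps.add("RESERVE_INVENTORY");
        steps.add("CREATE_SHIPMENT");
        steps.add("CONFIRM_ORDER");
        return List.copyOf(steps);
    }
}
```

In the Step Functions version, the same branch would be a Choice state placed before ProcessPayment.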

18. Advantages and Disadvantages

Orchestration Advantages

| Advantage | Details |
|---|---|
| Full visibility | Always know exactly which step the saga is on |
| Easy debugging | One place to look: the orchestrator logs and saga state table |
| Explicit flow | Business logic is readable as a state machine |
| Easier testing | Test the orchestrator with mock services |
| Flexible compensation | Orchestrator decides compensation order |
| Monitoring | Single source of truth for saga metrics |
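In practice, "flexible compensation" usually means undoing completed steps in reverse order and skipping steps that have nothing to undo. The sketch below uses a hypothetical mapping from forward steps to compensating commands. CancelOrderCommand and RefundPaymentCommand mirror the state machine in Section 11, while ReleaseInventoryCommand and CancelShipmentCommand are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CompensationPlanner {
    // Hypothetical forward step -> compensating command.
    // Steps missing from the map (e.g. CONFIRM_ORDER) have nothing to undo.
    private static final Map<String, String> COMPENSATIONS = Map.of(
        "CREATE_ORDER",      "CancelOrderCommand",
        "PROCESS_PAYMENT",   "RefundPaymentCommand",
        "RESERVE_INVENTORY", "ReleaseInventoryCommand",
        "CREATE_SHIPMENT",   "CancelShipmentCommand"
    );

    /** Compensating commands for the completed steps, most recent first. */
    static List<String> plan(List<String> completedSteps) {
        List<String> commands = new ArrayList<>();
        for (int i = completedSteps.size() - 1; i >= 0; i--) {
            String compensation = COMPENSATIONS.get(completedSteps.get(i));
            if (compensation != null) {
                commands.add(compensation);
            }
        }
        return commands;
    }
}
```

For a saga that failed at RESERVE_INVENTORY, the completed steps are CREATE_ORDER and PROCESS_PAYMENT. The plan is therefore RefundPaymentCommand followed by CancelOrderCommand, the same chain the Step Functions definition runs.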

Orchestration Disadvantages

| Disadvantage | Details |
|---|---|
| Central orchestrator | If the orchestrator is down, no new sagas start and in-flight sagas stall until it recovers |
| Service coupling | Services must implement the orchestrator's command interfaces |
| Additional service | More infrastructure to build and maintain |
| Orchestrator complexity | As sagas grow, the orchestrator itself becomes complex |
| Throughput ceiling | The orchestrator can become a bottleneck at very high scale |

19. Summary

| Topic | Key Takeaway |
|---|---|
| Orchestration concept | Central coordinator sends commands, receives replies, drives saga state |
| Communication | Commands (orchestrator -> service), replies (service -> orchestrator) |
| State machine | Explicit transitions between saga steps, persisted in MySQL |
| Failure handling | Orchestrator decides which compensation chain to execute |
| AWS Step Functions | Fully managed orchestration, a natural fit for AWS-native systems |
| Axon Framework | Annotation-based sagas with persistent saga state |
| Command channels | One Kafka command topic per service, plus a single shared reply topic |
| Recovery | Scheduled job finds stuck sagas and replays the current step |
| Testing | Mock services, simulate replies, verify state transitions |

Next: Part 4 - Deep Dive Implementation

Explore the critical production patterns: Transactional Outbox, Idempotency, Retry with
Dead Letter Queues, Distributed Tracing, and complete MySQL schema design.

