6/6/2026 | Admin Post


Part 2: SAGA Choreography Pattern - Complete Implementation

Series Navigation: Index | Part 1 - Fundamentals | Part 2 - Choreography | Part 3 - Orchestration | Part 4 - Implementation | Part 5 - Advanced | Part 6 - Pitfalls | Part 7 - Interview


Table of Contents

  1. What is Choreography?
  2. Mental Model: A Dance Without a Director
  3. Event Flow Diagrams
  4. Project Setup and Dependencies
  5. Shared Domain Models and Events
  6. Order Service - Full Implementation
  7. Payment Service - Full Implementation
  8. Inventory Service - Full Implementation
  9. Shipping Service - Full Implementation
  10. Kafka Configuration (AWS MSK)
  11. MySQL Schema Design
  12. Complete application.yml
  13. Handling the Outbox Pattern in Choreography
  14. Error Handling and Retry Strategies
  15. Testing Choreography SAGAs
  16. AWS Integration: SNS + SQS Pattern
  17. Advantages and Disadvantages
  18. When to Choose Choreography
  19. Summary

1. What is Choreography?

In choreography, services communicate by emitting events and listening to events.
There is NO central coordinator that knows the full saga flow. Each service acts
independently based on what events it receives.

Think of it as a pub/sub system where:

  • Each service PUBLISHES events when something happens
  • Each service SUBSCRIBES to events from other services
  • Services react to events and perform their local work
  • The saga progresses through cascading events

Order Service          Message Broker         Other Services
(Producer)             (Kafka / SQS)          (Consumers)
     |                       |                     |
     |-- OrderCreatedEvent-->|                     |
     |                       |--OrderCreatedEvent->|
     |                       |   (Payment Service subscribes)
     |                       |                     |
     |                       |<-- PaymentProcessedEvent
     |                       |                     |
     |                       |-- PaymentProcessedEvent -->
     |                       |   (Inventory Service subscribes)
     |                       |                     |
     ... and so on

What Each Service "Knows"

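  • Order Service: Knows to create an order and publish OrderCreatedEvent. Knows to
    confirm the order when ShipmentCreatedEvent arrives, and to cancel it (publishing
    OrderCancelledEvent) when PaymentFailedEvent or PaymentRefundedEvent arrives.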
  • Payment Service: Knows to process payment when OrderCreatedEvent arrives.
    Knows to refund when InventoryReservationFailedEvent arrives.
  • Inventory Service: Knows to reserve inventory when PaymentProcessedEvent arrives.
    Knows to release when OrderCancelledEvent arrives.
  • Shipping Service: Knows to create shipment when InventoryReservedEvent arrives.

NO service knows the complete order saga flow. The saga emerges from the interaction
of all services reacting to events.
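To make "the saga emerges" concrete, here is a minimal in-memory sketch, assuming a synchronous toy bus in place of Kafka (the InMemoryBus and ChoreographyDemo names are hypothetical). Each service registers only the reactions listed above, and running it with and without stock reproduces the happy path and the out-of-stock compensation path:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy stand-in for a message broker: event type -> subscribers.
// Delivery is synchronous and in-order here, unlike a real broker.
class InMemoryBus {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();
    final List<String> log = new ArrayList<>();

    void subscribe(String eventType, Consumer<String> handler) {
        subscribers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    void publish(String eventType, String orderId) {
        log.add(eventType);
        for (Consumer<String> handler : subscribers.getOrDefault(eventType, List.of())) {
            handler.accept(orderId);
        }
    }
}

class ChoreographyDemo {
    // Each service registers only the reactions it "knows"; nothing here holds the whole flow.
    static List<String> run(boolean inStock) {
        InMemoryBus bus = new InMemoryBus();

        // Payment Service: charge on order creation, refund on reservation failure
        bus.subscribe("OrderCreatedEvent", id -> bus.publish("PaymentProcessedEvent", id));
        bus.subscribe("InventoryReservationFailedEvent", id -> bus.publish("PaymentRefundedEvent", id));

        // Inventory Service: reserve stock after payment (fails when out of stock)
        bus.subscribe("PaymentProcessedEvent", id ->
            bus.publish(inStock ? "InventoryReservedEvent" : "InventoryReservationFailedEvent", id));

        // Shipping Service: create a shipment once stock is reserved
        bus.subscribe("InventoryReservedEvent", id -> bus.publish("ShipmentCreatedEvent", id));

        // Order Service: confirm on shipment; cancel and announce it on refund
        bus.subscribe("ShipmentCreatedEvent", id -> bus.log.add("order CONFIRMED"));
        bus.subscribe("PaymentRefundedEvent", id -> {
            bus.log.add("order CANCELLED");
            bus.publish("OrderCancelledEvent", id);
        });

        bus.publish("OrderCreatedEvent", "order-1");
        return bus.log;
    }
}
```

No class in the sketch stores the sequence of steps; it falls out of the subscriptions. A real broker delivers asynchronously and at least once, which is why the handlers later in this article check state before acting.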


2. Mental Model: A Dance Without a Director

Imagine a jazz band:

  • There is NO conductor telling each musician what to play and when
  • Each musician listens to what the others are playing
  • Each musician plays their part in response to what they hear
  • Together, they produce a coherent piece of music

That is choreography. Each service is a musician. Events are the music. The saga is
the coherent piece that emerges.

Compare this with an orchestra led by a conductor: that is orchestration (Part 3).


3. Event Flow Diagrams

Happy Path

Customer          Order Svc         Payment Svc      Inventory Svc     Shipping Svc
    |                 |                  |                 |                |
    |---placeOrder--->|                  |                 |                |
    |                 |-- INSERT order   |                 |                |
    |                 |  (status=PENDING)|                 |                |
    |                 |-- PUBLISH: OrderCreatedEvent       |                |
    |                 |                  |                 |                |
    |                 |          (receives event)          |                |
    |                 |                  |-- process pmt   |                |
    |                 |                  |-- INSERT payment|                |
    |                 |                  |-- PUBLISH: PaymentProcessedEvent |
    |                 |                  |                 |                |
    |                 |                  |      (receives event)            |
    |                 |                  |                 |-- reserve stock|
    |                 |                  |                 |-- INSERT rsrv  |
    |                 |                  |                 |-- PUBLISH: InventoryReservedEvent
    |                 |                  |                 |                |
    |                 |                  |                 |    (receives)  |
    |                 |                  |                 |                |-- create shipment
    |                 |                  |                 |                |-- PUBLISH: ShipmentCreatedEvent
    |                 |                  |                 |                |
    |                 | (receives event) |                 |                |
    |                 |-- UPDATE order   |                 |                |
    |                 |   status=CONFIRMED                 |                |
    |<--orderConfirmed|                  |                 |                |

Failure Path (Inventory Out of Stock)

Customer          Order Svc         Payment Svc      Inventory Svc     Shipping Svc
    |                 |                  |                 |
    |---placeOrder--->|                  |                 |
    |                 |-- INSERT order PENDING             |
    |                 |-- PUBLISH: OrderCreatedEvent       |
    |                 |                  |                 |
    |                 |          (receives event)          |
    |                 |                  |-- charge card   |
    |                 |                  |-- INSERT payment CHARGED
    |                 |                  |-- PUBLISH: PaymentProcessedEvent
    |                 |                  |                 |
    |                 |                  |      (receives event)
    |                 |                  |                 |-- check stock: 0 items
    |                 |                  |                 |-- ROLLBACK local txn
    |                 |                  |                 |-- PUBLISH: InventoryReservationFailedEvent
    |                 |                  |                 |
    |                 |                  |    (receives compensation event)
    |                 |                  |-- call refund API
    |                 |                  |-- UPDATE payment REFUNDED
    |                 |                  |-- PUBLISH: PaymentRefundedEvent
    |                 |                  |
    |                 |  (receives event)|
    |                 |-- UPDATE order CANCELLED
    |                 |-- PUBLISH: OrderCancelledEvent
    |<--orderCancelled|

4. Project Setup and Dependencies

Maven Parent POM

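<!-- pom.xml (parent) -->
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <!-- Required by Maven 3 -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>order-saga-parent</artifactId>
    <version>1.0.0</version>
    <packaging>pom</packaging>
 
    <properties>
        <java.version>17</java.version>
        <!-- Importing the Boot BOM (instead of inheriting spring-boot-starter-parent)
             does not configure the compiler, so set the release level explicitly -->
        <maven.compiler.release>${java.version}</maven.compiler.release>
        <spring.boot.version>3.2.3</spring.boot.version>
        <spring.cloud.version>2023.0.1</spring.cloud.version>
    </properties>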
 
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

Order Service pom.xml

<dependencies>
    <!-- Spring Boot Starters -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
 
    <!-- Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
 
    <!-- MySQL -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
 
    <!-- Amazon MSK IAM authentication library (IAM access control for Kafka clients) -->
    <dependency>
        <groupId>software.amazon.msk</groupId>
        <artifactId>aws-msk-iam-auth</artifactId>
        <version>2.1.0</version>
    </dependency>
 
    <!-- AWS SDK for SQS (optional, if using SQS) -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>sqs</artifactId>
        <version>2.23.0</version>
    </dependency>
 
    <!-- Micrometer for Metrics -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-cloudwatch2</artifactId>
    </dependency>
 
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
 
    <!-- Jackson for JSON -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
 
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>mysql</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5. Shared Domain Models and Events

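The event classes and topic constants are the contract between services; in a real system they live in a shared library module published to your artifact repository. The JPA entities (Order, OrderItem, OrderStatus) are private to the Order Service and are not shared.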

Order Domain

// Order.java
package com.example.orderservice.domain;
 
import jakarta.persistence.*;
import lombok.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
 
@Entity
@Table(name = "orders", indexes = {
    @Index(name = "idx_orders_customer_id", columnList = "customer_id"),
    @Index(name = "idx_orders_status", columnList = "status"),
    @Index(name = "idx_orders_created_at", columnList = "created_at")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {
 
    @Id
    @Column(name = "order_id", length = 36)
    private String orderId;
 
    @Column(name = "customer_id", nullable = false, length = 36)
    private String customerId;
 
    @Enumerated(EnumType.STRING)
    @Column(name = "status", nullable = false, length = 30)
    private OrderStatus status;
 
    @Column(name = "total_amount", nullable = false, precision = 15, scale = 2)
    private BigDecimal totalAmount;
 
    @Column(name = "saga_id", length = 36)
    private String sagaId;
 
    @OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true, fetch = FetchType.LAZY)
    @Builder.Default
    private List<OrderItem> items = new ArrayList<>();
 
    @Column(name = "failure_reason", length = 500)
    private String failureReason;
 
    @Column(name = "created_at", nullable = false, updatable = false)
    private Instant createdAt;
 
    @Column(name = "updated_at", nullable = false)
    private Instant updatedAt;
 
    @Version
    @Column(name = "version")
    private Long version;  // Optimistic locking
 
    @PrePersist
    public void prePersist() {
        this.createdAt = Instant.now();
        this.updatedAt = Instant.now();
    }
 
    @PreUpdate
    public void preUpdate() {
        this.updatedAt = Instant.now();
    }
}
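The @Version field turns on JPA optimistic locking: an update succeeds only if the row's version still matches the one the writer read. The plain-Java sketch below models that check; VersionedRow is a hypothetical name, not a JPA type:

```java
// Hypothetical model of a row guarded by a version column. JPA's @Version does the
// equivalent in SQL: UPDATE orders SET ..., version = version + 1
//                   WHERE order_id = ? AND version = ?
final class VersionedRow {
    private String status;
    private long version;

    VersionedRow(String status) {
        this.status = status;
        this.version = 0; // a freshly inserted row starts at version 0 in this model
    }

    synchronized long version() { return version; }

    synchronized String status() { return status; }

    // Applies the change only if nobody else updated the row since expectedVersion was read.
    // Returns false where JPA would report an optimistic-lock failure.
    synchronized boolean update(long expectedVersion, String newStatus) {
        if (version != expectedVersion) {
            return false;
        }
        status = newStatus;
        version++;
        return true;
    }
}
```

The saga handlers below also take a pessimistic lock through OrderRepository.findByOrderIdForUpdate; the version check can still act as a safety net for any update path that does not.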
// OrderItem.java
package com.example.orderservice.domain;
 
import jakarta.persistence.*;
import lombok.*;
import java.math.BigDecimal;
 
@Entity
@Table(name = "order_items")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderItem {
 
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
 
    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "order_id", nullable = false)
    private Order order;
 
    @Column(name = "product_id", nullable = false, length = 36)
    private String productId;
 
    @Column(name = "product_name", nullable = false, length = 255)
    private String productName;
 
    @Column(name = "quantity", nullable = false)
    private Integer quantity;
 
    @Column(name = "unit_price", nullable = false, precision = 15, scale = 2)
    private BigDecimal unitPrice;
}
// OrderStatus.java
package com.example.orderservice.domain;
 
public enum OrderStatus {
    PENDING,          // Created, awaiting payment processing
    PAYMENT_PENDING,  // Saga started, payment being processed
    PAYMENT_PROCESSED,// Payment done, awaiting inventory
    CONFIRMED,        // All saga steps completed successfully
    CANCELLING,       // Compensation in progress
    CANCELLED,        // Saga compensated, order cancelled
    FAILED            // Saga failed with non-compensable error
}
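Saga events can arrive late or out of order, so it can help to reject illegal status changes explicitly (for example, a ShipmentCreatedEvent arriving for an order that was already cancelled). A minimal sketch, assuming a hypothetical transition table over the three states that the Order Service code in this part actually assigns:

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Trimmed copy of OrderStatus: only the states set by placeOrder, confirmOrder and cancelOrder.
enum OrderStatus { PENDING, CONFIRMED, CANCELLED }

// Hypothetical transition table; terminal states accept no further transitions.
final class OrderTransitions {
    private static final Map<OrderStatus, Set<OrderStatus>> ALLOWED = Map.of(
        OrderStatus.PENDING, EnumSet.of(OrderStatus.CONFIRMED, OrderStatus.CANCELLED),
        OrderStatus.CONFIRMED, EnumSet.noneOf(OrderStatus.class),
        OrderStatus.CANCELLED, EnumSet.noneOf(OrderStatus.class)
    );

    private OrderTransitions() {}

    static boolean canTransition(OrderStatus from, OrderStatus to) {
        return ALLOWED.getOrDefault(from, Set.of()).contains(to);
    }
}
```

In the service, the "already in the target state" check comes first (that is the idempotent no-op); the transition check then catches genuinely conflicting events.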

Event Classes (Shared Kafka Events)

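Events are Java records (immutable) that are serialized to JSON and published to Kafka topics. Jackson 2.12 and later handles records natively, so the @JsonProperty annotations on OrderCreatedEvent are optional. The Instant and LocalDate fields need Jackson's JavaTimeModule (jackson-datatype-jsr310), which Spring Boot registers on its auto-configured ObjectMapper.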

// OrderCreatedEvent.java
package com.example.shared.events;
 
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
 
public record OrderCreatedEvent(
    @JsonProperty("eventId") String eventId,
    @JsonProperty("sagaId") String sagaId,
    @JsonProperty("orderId") String orderId,
    @JsonProperty("customerId") String customerId,
    @JsonProperty("items") List<OrderItemDto> items,
    @JsonProperty("totalAmount") BigDecimal totalAmount,
    @JsonProperty("occurredAt") Instant occurredAt
) {
    public static OrderCreatedEvent of(String sagaId, String orderId,
                                       String customerId, List<OrderItemDto> items,
                                       BigDecimal totalAmount) {
        return new OrderCreatedEvent(
            java.util.UUID.randomUUID().toString(),
            sagaId,
            orderId,
            customerId,
            items,
            totalAmount,
            Instant.now()
        );
    }
}
// OrderItemDto.java
package com.example.shared.events;
 
import java.math.BigDecimal;
 
public record OrderItemDto(
    String productId,
    String productName,
    Integer quantity,
    BigDecimal unitPrice
) {}
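OrderCreatedEvent carries both the item list and a totalAmount, and placeOrder copies request.totalAmount() without checking it against the items. If you want the two to agree before the event goes out, a check along these lines could run first (OrderTotals is a hypothetical helper; the record mirrors OrderItemDto so the sketch compiles on its own):

```java
import java.math.BigDecimal;
import java.util.List;

// Same shape as the shared OrderItemDto record above.
record OrderItemDto(String productId, String productName, Integer quantity, BigDecimal unitPrice) {}

final class OrderTotals {
    private OrderTotals() {}

    // Sum of quantity * unitPrice, computed exactly with BigDecimal (never double for money).
    static BigDecimal computeTotal(List<OrderItemDto> items) {
        return items.stream()
            .map(i -> i.unitPrice().multiply(BigDecimal.valueOf(i.quantity())))
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    }

    // compareTo ignores scale, so 24.98 and 24.980 count as equal (equals() would not).
    static boolean totalMatches(List<OrderItemDto> items, BigDecimal claimedTotal) {
        return computeTotal(items).compareTo(claimedTotal) == 0;
    }
}
```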
// PaymentProcessedEvent.java
package com.example.shared.events;
 
import java.math.BigDecimal;
import java.time.Instant;
 
public record PaymentProcessedEvent(
    String eventId,
    String sagaId,
    String orderId,
    String paymentId,
    String customerId,
    BigDecimal amount,
    String paymentMethod,
    String externalTransactionId,
    Instant occurredAt
) {}
// PaymentFailedEvent.java
package com.example.shared.events;
 
import java.time.Instant;
 
public record PaymentFailedEvent(
    String eventId,
    String sagaId,
    String orderId,
    String customerId,
    String failureReason,
    String failureCode,
    Instant occurredAt
) {}
// InventoryReservedEvent.java
package com.example.shared.events;
 
import java.time.Instant;
import java.util.List;
 
public record InventoryReservedEvent(
    String eventId,
    String sagaId,
    String orderId,
    String reservationId,
    List<ReservedItemDto> reservedItems,
    Instant occurredAt
) {}
// ReservedItemDto.java
package com.example.shared.events;
 
public record ReservedItemDto(
    String productId,
    Integer reservedQuantity,
    String warehouseId
) {}
// InventoryReservationFailedEvent.java
package com.example.shared.events;
 
import java.time.Instant;
import java.util.List;
 
public record InventoryReservationFailedEvent(
    String eventId,
    String sagaId,
    String orderId,
    List<String> outOfStockProductIds,
    String failureReason,
    Instant occurredAt
) {}
// ShipmentCreatedEvent.java
package com.example.shared.events;
 
import java.time.Instant;
import java.time.LocalDate;
 
public record ShipmentCreatedEvent(
    String eventId,
    String sagaId,
    String orderId,
    String shipmentId,
    String trackingNumber,
    LocalDate estimatedDelivery,
    Instant occurredAt
) {}
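// OrderCancelledEvent.java - published by the Order Service after cancellation
package com.example.shared.events;
 
import java.time.Instant;
 
public record OrderCancelledEvent(
    String eventId,
    String sagaId,
    String orderId,
    String customerId,
    String reason,
    Instant occurredAt
) {}
// PaymentRefundedEvent.java - for compensation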
package com.example.shared.events;
 
import java.math.BigDecimal;
import java.time.Instant;
 
public record PaymentRefundedEvent(
    String eventId,
    String sagaId,
    String orderId,
    String paymentId,
    BigDecimal refundedAmount,
    String refundTransactionId,
    Instant occurredAt
) {}
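Every event carries an eventId, and because Kafka delivers at least once, consumers can use it to drop duplicates. The sketch below is a hypothetical in-memory idempotent-consumer wrapper; in production the set of processed ids would typically be a database table with a unique key, written in the same transaction as the handler's side effects:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent-consumer wrapper keyed by eventId.
final class IdempotentHandler<E> {
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();
    private final Consumer<E> delegate;

    IdempotentHandler(Consumer<E> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the event was handled, false if it was a duplicate delivery.
    boolean handle(String eventId, E event) {
        if (!processedEventIds.add(eventId)) {
            return false; // already seen: a redelivery of the same event
        }
        try {
            delegate.accept(event);
        } catch (RuntimeException e) {
            processedEventIds.remove(eventId); // failed: let a redelivery try again
            throw e;
        }
        return true;
    }
}
```

The Payment Service below deduplicates on orderId instead, which works because it makes at most one payment per order; eventId-based deduplication is the generic form for handlers without such a natural key.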

Kafka Topic Constants

// KafkaTopics.java
package com.example.shared.kafka;
 
public final class KafkaTopics {
 
    private KafkaTopics() {}
 
    // Forward flow topics
    public static final String ORDER_CREATED = "order.events.order-created";
    public static final String PAYMENT_PROCESSED = "payment.events.payment-processed";
    public static final String PAYMENT_FAILED = "payment.events.payment-failed";
    public static final String INVENTORY_RESERVED = "inventory.events.inventory-reserved";
    public static final String INVENTORY_RESERVATION_FAILED = "inventory.events.reservation-failed";
    public static final String SHIPMENT_CREATED = "shipping.events.shipment-created";
    public static final String SHIPMENT_FAILED = "shipping.events.shipment-failed";
 
    // Compensation flow topics
    public static final String PAYMENT_REFUNDED = "payment.events.payment-refunded";
    public static final String INVENTORY_RELEASED = "inventory.events.inventory-released";
    public static final String ORDER_CANCELLED = "order.events.order-cancelled";
 
    // Dead letter topics: <original topic> + ".DLT", the default naming used by
    // Spring Kafka's DeadLetterPublishingRecoverer
    public static final String DLT_SUFFIX = ".DLT";
}
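Assuming the outbox relay uses the aggregateId (the orderId) as the Kafka record key, all events for one order on a given topic land on the same partition and are consumed in order. The sketch below illustrates only that property. It uses Arrays.hashCode as a stand-in; Kafka's default partitioner hashes the serialized key with murmur2, so real partition numbers will differ (KeyPartitioning is a hypothetical name):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified key-to-partition mapping: same key -> same partition, for a fixed partition count.
final class KeyPartitioning {
    private KeyPartitioning() {}

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes);        // stand-in for Kafka's murmur2
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit before the modulo
    }
}
```

Ordering holds only within one topic-partition. Because each event type here has its own topic, there is no ordering guarantee between, say, payment.events.payment-processed and payment.events.payment-refunded for the same order, which is one more reason every handler checks current state before acting. Adding partitions later also changes which partition a key maps to.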

6. Order Service - Full Implementation

// OrderService.java
package com.example.orderservice.service;
 
import com.example.orderservice.domain.*;
import com.example.orderservice.dto.PlaceOrderRequest;
import com.example.orderservice.outbox.OutboxEvent;
import com.example.orderservice.outbox.OutboxEventRepository;
import com.example.orderservice.repository.OrderRepository;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
 
    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;
 
    /**
     * Step 1 of saga: Create order and publish OrderCreatedEvent.
     * Uses the Transactional Outbox pattern: the order row and the outbox row commit in one
     * local transaction; a separate relay publishes the outbox row to Kafka afterwards.
     */
    @Transactional
    public Order placeOrder(PlaceOrderRequest request) {
        String orderId = UUID.randomUUID().toString();
        String sagaId = UUID.randomUUID().toString();
 
        // Build order items
        List<OrderItem> items = request.items().stream()
            .map(itemReq -> OrderItem.builder()
                .productId(itemReq.productId())
                .productName(itemReq.productName())
                .quantity(itemReq.quantity())
                .unitPrice(itemReq.unitPrice())
                .build())
            .collect(Collectors.toList());
 
        // Create order
        Order order = Order.builder()
            .orderId(orderId)
            .customerId(request.customerId())
            .status(OrderStatus.PENDING)
            .totalAmount(request.totalAmount())
            .sagaId(sagaId)
            .build();
 
        items.forEach(item -> item.setOrder(order));
        order.setItems(items);
 
        orderRepository.save(order);
        log.info("Order created: orderId={}, sagaId={}, customerId={}",
            orderId, sagaId, request.customerId());
 
        // Build event
        List<OrderItemDto> itemDtos = items.stream()
            .map(i -> new OrderItemDto(i.getProductId(), i.getProductName(),
                                       i.getQuantity(), i.getUnitPrice()))
            .collect(Collectors.toList());
 
        OrderCreatedEvent event = OrderCreatedEvent.of(
            sagaId, orderId, request.customerId(), itemDtos, request.totalAmount()
        );
 
        // Write to outbox (same transaction - atomic!)
        saveToOutbox(KafkaTopics.ORDER_CREATED, orderId, event);
 
        return order;
    }
 
    /**
     * Compensation: Cancel order after all compensations upstream are done.
     * Triggered by PaymentRefundedEvent or PaymentFailedEvent (if payment never processed).
     */
    @Transactional
    public void cancelOrder(String orderId, String reason) {
        Order order = orderRepository.findByOrderIdForUpdate(orderId)
            .orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
 
        if (order.getStatus() == OrderStatus.CANCELLED) {
            log.info("Order {} already cancelled, skipping (idempotent)", orderId);
            return;  // Idempotent: already done
        }
 
        order.setStatus(OrderStatus.CANCELLED);
        order.setFailureReason(reason);
        orderRepository.save(order);
 
        log.info("Order cancelled: orderId={}, reason={}", orderId, reason);
 
        // Publish OrderCancelledEvent for downstream consumers
        var cancelledEvent = new OrderCancelledEvent(
            UUID.randomUUID().toString(),
            order.getSagaId(),
            orderId,
            order.getCustomerId(),
            reason,
            Instant.now()
        );
        saveToOutbox(KafkaTopics.ORDER_CANCELLED, orderId, cancelledEvent);
    }
 
    /**
     * Update order status when shipment is confirmed (saga complete).
     */
    @Transactional
    public void confirmOrder(String orderId, String shipmentId) {
        Order order = orderRepository.findByOrderIdForUpdate(orderId)
            .orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
 
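        if (order.getStatus() == OrderStatus.CONFIRMED) {
            log.info("Order {} already confirmed (idempotent)", orderId);
            return;
        }
        if (order.getStatus() == OrderStatus.CANCELLED) {
            // A late ShipmentCreatedEvent must not resurrect a cancelled order
            log.warn("Order {} is CANCELLED; ignoring shipment {}", orderId, shipmentId);
            return;
        }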
 
        order.setStatus(OrderStatus.CONFIRMED);
        orderRepository.save(order);
 
        log.info("Order confirmed: orderId={}, shipmentId={}", orderId, shipmentId);
    }
 
    private void saveToOutbox(String topic, String aggregateId, Object event) {
        try {
            OutboxEvent outboxEvent = OutboxEvent.builder()
                .eventId(UUID.randomUUID().toString())
                .aggregateId(aggregateId)
                .topicName(topic)
                .payload(objectMapper.writeValueAsString(event))
                .eventType(event.getClass().getSimpleName())
                .status(OutboxEvent.OutboxStatus.PENDING)
                .createdAt(Instant.now())
                .build();
            outboxEventRepository.save(outboxEvent);
        } catch (Exception e) {
            throw new RuntimeException("Failed to write to outbox", e);
        }
    }
}
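saveToOutbox only inserts a row; something still has to move PENDING rows to Kafka. Section 13 returns to the outbox in more depth; as a preview, here is a minimal in-memory polling relay, assuming a SENT status and a Publisher interface that throws when the broker does not acknowledge (OutboxRow, Publisher and OutboxRelay are hypothetical names):

```java
import java.util.List;

// Hypothetical in-memory model of an outbox row.
final class OutboxRow {
    enum Status { PENDING, SENT }

    final String topic;
    final String key;
    final String payload;
    Status status = Status.PENDING;

    OutboxRow(String topic, String key, String payload) {
        this.topic = topic;
        this.key = key;
        this.payload = payload;
    }
}

interface Publisher {
    // Expected to throw if the broker does not acknowledge the send.
    void send(String topic, String key, String payload);
}

final class OutboxRelay {
    private final List<OutboxRow> table;
    private final Publisher publisher;

    OutboxRelay(List<OutboxRow> table, Publisher publisher) {
        this.table = table;
        this.publisher = publisher;
    }

    // One polling pass: publish PENDING rows in insertion order and mark each SENT
    // only after a successful send. On failure, stop so later rows are not published ahead of it.
    int pollOnce() {
        int sent = 0;
        for (OutboxRow row : table) {
            if (row.status != OutboxRow.Status.PENDING) {
                continue;
            }
            try {
                publisher.send(row.topic, row.key, row.payload);
            } catch (RuntimeException e) {
                break; // row stays PENDING and is retried on the next poll
            }
            row.status = OutboxRow.Status.SENT;
            sent++;
        }
        return sent;
    }
}
```

If the relay crashes after send() succeeds but before the row is marked SENT, the event is published again on restart. The outbox therefore gives at-least-once delivery, and consumers must deduplicate.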
// OrderSagaEventHandler.java - handles saga events for the Order Service
package com.example.orderservice.saga;
 
import com.example.orderservice.service.OrderService;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderSagaEventHandler {
 
    private final OrderService orderService;
 
    /**
     * Listen for ShipmentCreatedEvent - saga success, update order to CONFIRMED.
     */
    @KafkaListener(
        topics = KafkaTopics.SHIPMENT_CREATED,
        groupId = "order-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handleShipmentCreated(
            @Payload ShipmentCreatedEvent event,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {
 
        log.info("Received ShipmentCreatedEvent: orderId={}, shipmentId={}, partition={}, offset={}",
            event.orderId(), event.shipmentId(), partition, offset);
 
        try {
            orderService.confirmOrder(event.orderId(), event.shipmentId());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process ShipmentCreatedEvent for orderId={}", event.orderId(), e);
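            // DO NOT acknowledge: rethrow so the container's error handler retries the record
            // and, once retries are exhausted, hands it to the configured recoverer (e.g. the DLT)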
            throw e;
        }
    }
 
    /**
     * Compensation: Listen for PaymentRefundedEvent - cancel the order.
     * This means payment was refunded because of a downstream failure.
     */
    @KafkaListener(
        topics = KafkaTopics.PAYMENT_REFUNDED,
        groupId = "order-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handlePaymentRefunded(
            @Payload PaymentRefundedEvent event,
            Acknowledgment acknowledgment) {
 
        log.info("Received PaymentRefundedEvent (compensation): orderId={}, sagaId={}",
            event.orderId(), event.sagaId());
 
        try {
            orderService.cancelOrder(event.orderId(), "Payment refunded due to downstream failure");
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process compensation for orderId={}", event.orderId(), e);
            throw e;
        }
    }
 
    /**
     * Compensation: Listen for PaymentFailedEvent - cancel order directly
     * (no payment was made, so no refund needed).
     */
    @KafkaListener(
        topics = KafkaTopics.PAYMENT_FAILED,
        groupId = "order-service-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    public void handlePaymentFailed(
            @Payload PaymentFailedEvent event,
            Acknowledgment acknowledgment) {
 
        log.info("Received PaymentFailedEvent (direct cancel): orderId={}, reason={}",
            event.orderId(), event.failureReason());
 
        try {
            orderService.cancelOrder(event.orderId(),
                "Payment failed: " + event.failureReason());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Failed to handle PaymentFailedEvent for orderId={}", event.orderId(), e);
            throw e;
        }
    }
}
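When a listener rethrows instead of acknowledging, the listener container's error handler decides what happens next. In Spring Kafka that is typically a DefaultErrorHandler configured with a back-off and a DeadLetterPublishingRecoverer. The JDK-only simulation below mirrors only the control flow (a few attempts, then a record on <topic>.DLT); RetryThenDeadLetter is a hypothetical name and not part of Spring Kafka:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simulates "retry a failing record a few times, then dead-letter it".
final class RetryThenDeadLetter {
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;

    RetryThenDeadLetter(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns the number of delivery attempts made.
    int deliver(String topic, String record, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(record);
                return attempt; // success: this is where the offset would be acknowledged
            } catch (RuntimeException e) {
                // listener rethrew instead of acknowledging; real code would back off here
            }
        }
        deadLetters.add(topic + ".DLT:" + record);
        return maxAttempts;
    }
}
```

A transient failure (a brief DB outage) succeeds on a later attempt, while a poison record ends up on the DLT instead of blocking the partition forever.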
// OrderRepository.java
package com.example.orderservice.repository;
 
import com.example.orderservice.domain.Order;
import com.example.orderservice.domain.OrderStatus;
import jakarta.persistence.LockModeType;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import java.util.Optional;
 
public interface OrderRepository extends JpaRepository<Order, String> {
 
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT o FROM Order o WHERE o.orderId = :orderId")
    Optional<Order> findByOrderIdForUpdate(@Param("orderId") String orderId);
 
    Optional<Order> findBySagaId(String sagaId);
 
    @Query("SELECT o FROM Order o WHERE o.status = :status")
    java.util.List<Order> findByStatus(@Param("status") OrderStatus status);
}
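findByOrderIdForUpdate uses PESSIMISTIC_WRITE, which on MySQL typically becomes SELECT ... FOR UPDATE, so two deliveries that touch the same order run their status check and update one after the other. The in-memory analogue below uses a per-order monitor in place of the row lock (OrderStore is a hypothetical name):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory analogue of findByOrderIdForUpdate: a per-order lock serializes handlers,
// so the "already CANCELLED?" check and the update cannot interleave.
final class OrderStore {
    private final ConcurrentHashMap<String, String> statusById = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Object> locks = new ConcurrentHashMap<>();
    final AtomicInteger cancellationsApplied = new AtomicInteger();

    OrderStore(String orderId) {
        statusById.put(orderId, "PENDING");
    }

    void cancel(String orderId) {
        Object lock = locks.computeIfAbsent(orderId, id -> new Object());
        synchronized (lock) { // plays the role of SELECT ... FOR UPDATE on the order row
            if ("CANCELLED".equals(statusById.get(orderId))) {
                return; // idempotent: a concurrent or duplicate event got here first
            }
            statusById.put(orderId, "CANCELLED");
            cancellationsApplied.incrementAndGet();
        }
    }

    String status(String orderId) {
        return statusById.get(orderId);
    }
}
```

Unlike a JVM lock, the database row lock also serializes handlers running in different service instances, which is what a horizontally scaled consumer group needs.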
// OrderController.java - REST API
package com.example.orderservice.controller;
 
import com.example.orderservice.domain.Order;
import com.example.orderservice.dto.PlaceOrderRequest;
import com.example.orderservice.dto.OrderResponse;
import com.example.orderservice.service.OrderService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
 
@RestController
@RequestMapping("/api/v1/orders")
@RequiredArgsConstructor
public class OrderController {
 
    private final OrderService orderService;
 
    @PostMapping
    public ResponseEntity<OrderResponse> placeOrder(@Valid @RequestBody PlaceOrderRequest request) {
        Order order = orderService.placeOrder(request);
        return ResponseEntity.status(HttpStatus.ACCEPTED)
            .body(OrderResponse.from(order));
    }
 
    @GetMapping("/{orderId}")
    public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) {
        // Query side not shown in this part; returns a placeholder response
        return ResponseEntity.ok(OrderResponse.placeholder(orderId));
    }
}

7. Payment Service - Full Implementation

// PaymentService.java
package com.example.paymentservice.service;
 
import com.example.paymentservice.domain.*;
import com.example.paymentservice.gateway.PaymentGatewayClient;
import com.example.paymentservice.outbox.OutboxEvent;
import com.example.paymentservice.outbox.OutboxEventRepository;
import com.example.paymentservice.repository.PaymentRepository;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.UUID;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentService {
 
    private final PaymentRepository paymentRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final PaymentGatewayClient gatewayClient;
    private final ObjectMapper objectMapper;
 
    /**
     * Process payment for order. Triggered by OrderCreatedEvent.
     * Idempotent: Checks if payment already processed for this order.
     */
    @Transactional
    public void processPayment(OrderCreatedEvent event) {
        // Idempotency check: has this order already been processed?
        if (paymentRepository.existsByOrderId(event.orderId())) {
            log.info("Payment already processed for orderId={} (idempotent)", event.orderId());
            return;
        }
 
        log.info("Processing payment: orderId={}, amount={}", event.orderId(), event.totalAmount());
 
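        // Derive the id from the orderId so a redelivered OrderCreatedEvent (e.g. after a
        // rollback following a successful charge) reuses the same id; this only prevents a
        // double charge if the gateway treats this argument of charge() as an idempotency key.
        String paymentId = UUID.nameUUIDFromBytes(
            ("payment:" + event.orderId()).getBytes(java.nio.charset.StandardCharsets.UTF_8)).toString();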
        Payment payment = Payment.builder()
            .paymentId(paymentId)
            .orderId(event.orderId())
            .customerId(event.customerId())
            .amount(event.totalAmount())
            .status(PaymentStatus.PROCESSING)
            .sagaId(event.sagaId())
            .build();
        paymentRepository.save(payment);
 
        try {
            // Call external payment gateway
            PaymentGatewayResponse gatewayResponse = gatewayClient.charge(
                event.customerId(),
                event.totalAmount(),
                paymentId
            );
 
            payment.setStatus(PaymentStatus.CHARGED);
            payment.setExternalTransactionId(gatewayResponse.transactionId());
            paymentRepository.save(payment);
 
            // Publish success event to outbox
            PaymentProcessedEvent successEvent = new PaymentProcessedEvent(
                UUID.randomUUID().toString(),
                event.sagaId(),
                event.orderId(),
                paymentId,
                event.customerId(),
                event.totalAmount(),
                "CREDIT_CARD",
                gatewayResponse.transactionId(),
                Instant.now()
            );
            saveToOutbox(KafkaTopics.PAYMENT_PROCESSED, event.orderId(), successEvent);
 
            log.info("Payment processed successfully: orderId={}, paymentId={}", event.orderId(), paymentId);
 
        } catch (PaymentDeclinedException e) {
            // Payment was declined - saga must compensate
            payment.setStatus(PaymentStatus.DECLINED);
            payment.setFailureReason(e.getMessage());
            paymentRepository.save(payment);
 
            PaymentFailedEvent failureEvent = new PaymentFailedEvent(
                UUID.randomUUID().toString(),
                event.sagaId(),
                event.orderId(),
                event.customerId(),
                e.getMessage(),
                e.getDeclineCode(),
                Instant.now()
            );
            saveToOutbox(KafkaTopics.PAYMENT_FAILED, event.orderId(), failureEvent);
 
            log.warn("Payment declined for orderId={}: {}", event.orderId(), e.getMessage());
        }
    }
 
    /**
     * Compensation: Refund payment. Triggered by InventoryReservationFailedEvent.
     * Idempotent: Safe to call multiple times.
     */
    @Transactional
    public void refundPayment(String orderId, String reason) {
        Payment payment = paymentRepository.findByOrderId(orderId)
            .orElseThrow(() -> new PaymentNotFoundException("Payment not found for order: " + orderId));
 
        // Idempotency: already refunded?
        if (payment.getStatus() == PaymentStatus.REFUNDED) {
            log.info("Payment for orderId={} already refunded (idempotent)", orderId);
            return;
        }
 
        // Only refund if actually charged
        if (payment.getStatus() != PaymentStatus.CHARGED) {
            log.warn("Cannot refund payment in status {} for orderId={}", payment.getStatus(), orderId);
            // Publish refunded event anyway to continue compensation chain
            publishRefundEvent(payment, "SKIPPED - not charged");
            return;
        }
 
        log.info("Refunding payment for orderId={}, amount={}", orderId, payment.getAmount());
 
        try {
            // Call gateway refund
            String refundTransactionId = gatewayClient.refund(
                payment.getExternalTransactionId(),
                payment.getAmount()
            );
 
            payment.setStatus(PaymentStatus.REFUNDED);
            payment.setRefundTransactionId(refundTransactionId);
            payment.setRefundReason(reason);
            paymentRepository.save(payment);
 
            publishRefundEvent(payment, refundTransactionId);
 
            log.info("Payment refunded: orderId={}, refundTxn={}", orderId, refundTransactionId);
 
        } catch (Exception e) {
            // Refund failed: this is critical because the customer has been charged.
            // Rethrow so the transaction rolls back and the listener's retry/DLT handling
            // takes over; a record landing in the DLT is the trigger for manual intervention.
            log.error("CRITICAL: Refund failed for orderId={}, manual intervention required", orderId, e);
            // In production: alert the ops team and open a support ticket
            throw new CompensationFailedException("Refund failed for order " + orderId, e);
        }
    }
 
    private void publishRefundEvent(Payment payment, String refundTxnId) {
        PaymentRefundedEvent refundEvent = new PaymentRefundedEvent(
            UUID.randomUUID().toString(),
            payment.getSagaId(),
            payment.getOrderId(),
            payment.getPaymentId(),
            payment.getAmount(),
            refundTxnId,
            Instant.now()
        );
        saveToOutbox(KafkaTopics.PAYMENT_REFUNDED, payment.getOrderId(), refundEvent);
    }
 
    private void saveToOutbox(String topic, String aggregateId, Object event) {
        try {
            OutboxEvent outboxEvent = OutboxEvent.builder()
                .eventId(UUID.randomUUID().toString())
                .aggregateId(aggregateId)
                .topicName(topic)
                .payload(objectMapper.writeValueAsString(event))
                .eventType(event.getClass().getSimpleName())
                .status(OutboxEvent.OutboxStatus.PENDING)
                .createdAt(Instant.now())
                .build();
            outboxEventRepository.save(outboxEvent);
        } catch (Exception e) {
            throw new RuntimeException("Failed to write to outbox", e);
        }
    }
}
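The idempotency guards above combine two layers: `existsByOrderId` is only a fast path, and the UNIQUE index on `payments.order_id` (section 11) is what actually stops a second row when two deliveries race. Compensation follows the same idea: only a CHARGED payment is refunded, and REFUNDED is terminal. Here's a minimal in-memory sketch of those rules, assuming a `ConcurrentHashMap` stands in for the table (`putIfAbsent` for the unique insert, `replace(key, old, new)` for a conditional status update) and counters stand in for gateway calls. `PaymentLedgerSketch` and its methods are hypothetical names, not part of the service code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory model of the payments table plus gateway call counters;
// not the PaymentService or PaymentRepository API.
final class PaymentLedgerSketch {

    enum Status { PROCESSING, CHARGED, DECLINED, REFUNDED }

    // orderId -> status; putIfAbsent plays the role of the UNIQUE index on payments.order_id
    private final Map<String, Status> paymentsByOrder = new ConcurrentHashMap<>();
    final AtomicInteger gatewayCharges = new AtomicInteger();
    final AtomicInteger gatewayRefunds = new AtomicInteger();

    /** Handles one OrderCreatedEvent delivery; returns true only if this delivery charged. */
    boolean charge(String orderId, boolean gatewayAccepts) {
        // Only the first delivery for an orderId wins the insert; redeliveries stop here
        if (paymentsByOrder.putIfAbsent(orderId, Status.PROCESSING) != null) {
            return false;
        }
        if (gatewayAccepts) {
            gatewayCharges.incrementAndGet();
            paymentsByOrder.put(orderId, Status.CHARGED);
            return true;
        }
        paymentsByOrder.put(orderId, Status.DECLINED);
        return false;
    }

    /** Compensation: refunds only a CHARGED payment, so repeated calls are no-ops. */
    boolean refund(String orderId) {
        // Atomic CHARGED -> REFUNDED transition: two concurrent refunds cannot both win
        boolean refunded = paymentsByOrder.replace(orderId, Status.CHARGED, Status.REFUNDED);
        if (refunded) {
            gatewayRefunds.incrementAndGet();
        }
        return refunded;
    }

    Status statusOf(String orderId) {
        return paymentsByOrder.get(orderId);
    }
}
```

One difference from `refundPayment` above: the sketch simply reports `false` for a declined payment, while the service still publishes a `PaymentRefundedEvent` marked "SKIPPED - not charged" so the compensation chain keeps moving.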
// PaymentSagaEventHandler.java
package com.example.paymentservice.saga;
 
import com.example.paymentservice.service.PaymentService;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentSagaEventHandler {
 
    private final PaymentService paymentService;
 
    /**
     * Forward flow: Process payment when order is created.
     */
    @KafkaListener(
        topics = KafkaTopics.ORDER_CREATED,
        groupId = "payment-service-group"
    )
    public void handleOrderCreated(@Payload OrderCreatedEvent event, Acknowledgment ack) {
        log.info("PaymentService received OrderCreatedEvent: orderId={}", event.orderId());
        try {
            paymentService.processPayment(event);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process OrderCreatedEvent: orderId={}", event.orderId(), e);
            throw e;  // Let retry/DLT handle it
        }
    }
 
    /**
     * Compensation: Refund payment when inventory reservation fails.
     */
    @KafkaListener(
        topics = KafkaTopics.INVENTORY_RESERVATION_FAILED,
        groupId = "payment-service-group"
    )
    public void handleInventoryReservationFailed(
            @Payload InventoryReservationFailedEvent event,
            Acknowledgment ack) {
 
        log.info("PaymentService received InventoryReservationFailedEvent: orderId={}", event.orderId());
        try {
            paymentService.refundPayment(event.orderId(), event.failureReason());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Compensation failed for orderId={}", event.orderId(), e);
            throw e;
        }
    }
}
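With no orchestrator, the compensation path only exists as a chain of subscriptions spread across services. The sketch below wires hypothetical synchronous subscribers to lowercase topic names standing in for the `KafkaTopics` constants (their real string values are not shown here) and records the order in which events fire. It assumes, since this section does not show it, that Order Service reacts to the refund by publishing the cancellation that `InventorySagaEventHandler` listens for.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical synchronous stand-in for the broker: topic -> subscribers.
// Real Kafka delivery is asynchronous and at-least-once; this only traces the routing.
final class ChoreographyTrace {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();
    final List<String> published = new ArrayList<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, String orderId) {
        published.add(topic);
        for (Consumer<String> handler : subscribers.getOrDefault(topic, List.of())) {
            handler.accept(orderId);
        }
    }

    /** Wires each service's reactions; inventoryHasStock picks the saga branch. */
    static ChoreographyTrace wire(boolean inventoryHasStock) {
        ChoreographyTrace bus = new ChoreographyTrace();
        // Payment Service: forward step, plus refund on inventory failure
        bus.subscribe("order-created", id -> bus.publish("payment-processed", id));
        bus.subscribe("inventory-reservation-failed", id -> bus.publish("payment-refunded", id));
        // Inventory Service: reserve after payment; release on cancellation (nothing reserved here)
        bus.subscribe("payment-processed", id -> bus.publish(
            inventoryHasStock ? "inventory-reserved" : "inventory-reservation-failed", id));
        bus.subscribe("order-cancelled", id -> { /* releaseInventory: no reservation to release */ });
        // Order Service (assumed, not shown in this section): cancel once the refund is confirmed
        bus.subscribe("payment-refunded", id -> bus.publish("order-cancelled", id));
        // Shipping Service: book the shipment after the pivot
        bus.subscribe("inventory-reserved", id -> bus.publish("shipment-created", id));
        return bus;
    }
}
```

Publishing "order-created" on `wire(false)` records order-created, payment-processed, inventory-reservation-failed, payment-refunded, order-cancelled; on `wire(true)` the chain ends with inventory-reserved and shipment-created instead.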

8. Inventory Service - Full Implementation

// InventoryService.java
package com.example.inventoryservice.service;
 
import com.example.inventoryservice.domain.*;
import com.example.inventoryservice.outbox.OutboxEvent;
import com.example.inventoryservice.outbox.OutboxEventRepository;
import com.example.inventoryservice.repository.*;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class InventoryService {
 
    private final InventoryItemRepository inventoryRepository;
    private final ReservationRepository reservationRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;
 
    /**
     * Reserve inventory items. Triggered by PaymentProcessedEvent.
     * Locks each inventory row (findByProductIdWithLock) so concurrent sagas cannot
     * oversell the same product.
     * Idempotent: skips orders that already have a reservation.
     */
    @Transactional
    public void reserveInventory(PaymentProcessedEvent event) {
        // Idempotency check
        if (reservationRepository.existsByOrderId(event.orderId())) {
            log.info("Inventory already reserved for orderId={} (idempotent)", event.orderId());
            return;
        }
 
        log.info("Reserving inventory for orderId={}", event.orderId());
 
        // PaymentProcessedEvent, as built in PaymentService, carries no order lines. In a real
        // system, either enrich the event with the items or read them from a CQRS read model
        // or Order Service (see getOrderItems below).
        List<String> failedProducts = new ArrayList<>();
        List<ReservedItemDto> reservedItems = new ArrayList<>();
        String reservationId = UUID.randomUUID().toString();

        // Lock rows in a consistent order (by productId) so two sagas reserving the same
        // products cannot deadlock on each other's row locks
        List<OrderItemDto> itemsToReserve = new ArrayList<>(getOrderItems(event.orderId()));
        itemsToReserve.sort((a, b) -> a.productId().compareTo(b.productId()));
 
        for (OrderItemDto item : itemsToReserve) {
            // Pessimistic lock on the inventory row to prevent concurrent access
            InventoryItem inventoryItem = inventoryRepository
                .findByProductIdWithLock(item.productId())
                .orElseThrow(() -> new ProductNotFoundException("Product not found: " + item.productId()));
 
            if (inventoryItem.getAvailableQuantity() < item.quantity()) {
                failedProducts.add(item.productId());
                log.warn("Insufficient inventory: productId={}, available={}, requested={}",
                    item.productId(), inventoryItem.getAvailableQuantity(), item.quantity());
            } else {
                // Reserve the items
                inventoryItem.setAvailableQuantity(
                    inventoryItem.getAvailableQuantity() - item.quantity()
                );
                inventoryItem.setReservedQuantity(
                    inventoryItem.getReservedQuantity() + item.quantity()
                );
                inventoryRepository.save(inventoryItem);
 
                reservedItems.add(new ReservedItemDto(
                    item.productId(),
                    item.quantity(),
                    inventoryItem.getWarehouseId()
                ));
            }
        }
 
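        if (!failedProducts.isEmpty()) {
            // Undo the partial reservations made earlier in this loop. No Reservation row is
            // written on this path, so the existsByOrderId check above does not protect a
            // redelivered event after a failure; persisting a failure marker would close that gap.
            rollbackPartialReservations(reservedItems, reservationId);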
 
            // Publish failure event - triggers compensation
            InventoryReservationFailedEvent failureEvent = new InventoryReservationFailedEvent(
                UUID.randomUUID().toString(),
                event.sagaId(),
                event.orderId(),
                failedProducts,
                "Insufficient stock for products: " + failedProducts,
                Instant.now()
            );
            saveToOutbox(KafkaTopics.INVENTORY_RESERVATION_FAILED, event.orderId(), failureEvent);
 
            log.warn("Inventory reservation failed for orderId={}, failedProducts={}",
                event.orderId(), failedProducts);
        } else {
            // All items reserved successfully
            // Save reservation record
            Reservation reservation = Reservation.builder()
                .reservationId(reservationId)
                .orderId(event.orderId())
                .sagaId(event.sagaId())
                .status(ReservationStatus.RESERVED)
                .createdAt(Instant.now())
                .build();
            reservationRepository.save(reservation);
 
            // Publish success event
            InventoryReservedEvent successEvent = new InventoryReservedEvent(
                UUID.randomUUID().toString(),
                event.sagaId(),
                event.orderId(),
                reservationId,
                reservedItems,
                Instant.now()
            );
            saveToOutbox(KafkaTopics.INVENTORY_RESERVED, event.orderId(), successEvent);
 
            log.info("Inventory reserved successfully: orderId={}, reservationId={}",
                event.orderId(), reservationId);
        }
    }
 
    /**
     * Compensation: Release reserved inventory when order is cancelled.
     * Idempotent.
     */
    @Transactional
    public void releaseInventory(String orderId) {
        Reservation reservation = reservationRepository.findByOrderId(orderId)
            .orElse(null);
 
        if (reservation == null) {
            log.info("No reservation found for orderId={}, nothing to release", orderId);
            return;
        }
 
        if (reservation.getStatus() == ReservationStatus.RELEASED) {
            log.info("Reservation for orderId={} already released (idempotent)", orderId);
            return;
        }
 
        log.info("Releasing inventory reservation: orderId={}, reservationId={}",
            orderId, reservation.getReservationId());
 
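        // Release each reserved line back to available stock. The Reservation row does not
        // store its lines, so they are re-read via getOrderItems; storing the ReservedItemDto
        // list with the reservation would make the release independent of that lookup.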
        List<OrderItemDto> items = getOrderItems(orderId);
        for (OrderItemDto item : items) {
            inventoryRepository.findByProductIdWithLock(item.productId())
                .ifPresent(inventoryItem -> {
                    inventoryItem.setAvailableQuantity(
                        inventoryItem.getAvailableQuantity() + item.quantity()
                    );
                    inventoryItem.setReservedQuantity(
                        Math.max(0, inventoryItem.getReservedQuantity() - item.quantity())
                    );
                    inventoryRepository.save(inventoryItem);
                });
        }
 
        reservation.setStatus(ReservationStatus.RELEASED);
        reservationRepository.save(reservation);
 
        log.info("Inventory released for orderId={}", orderId);
    }
 
    private void rollbackPartialReservations(List<ReservedItemDto> reservedItems, String reservationId) {
        for (ReservedItemDto item : reservedItems) {
            inventoryRepository.findByProductIdWithLock(item.productId())
                .ifPresent(inventoryItem -> {
                    inventoryItem.setAvailableQuantity(
                        inventoryItem.getAvailableQuantity() + item.reservedQuantity()
                    );
                    inventoryItem.setReservedQuantity(
                        Math.max(0, inventoryItem.getReservedQuantity() - item.reservedQuantity())
                    );
                    inventoryRepository.save(inventoryItem);
                });
        }
        log.info("Partial reservations rolled back for reservationId={}", reservationId);
    }
 
    private List<OrderItemDto> getOrderItems(String orderId) {
        // Placeholder: in a real system, call Order Service via REST or read a CQRS read model.
        // Caution: as written, the empty list makes reserveInventory report every order as
        // fully reserved and makes releaseInventory release nothing.
        return List.of();
    }
 
    private void saveToOutbox(String topic, String aggregateId, Object event) {
        try {
            OutboxEvent outboxEvent = OutboxEvent.builder()
                .eventId(UUID.randomUUID().toString())
                .aggregateId(aggregateId)
                .topicName(topic)
                .payload(objectMapper.writeValueAsString(event))
                .eventType(event.getClass().getSimpleName())
                .status(OutboxEvent.OutboxStatus.PENDING)
                .createdAt(Instant.now())
                .build();
            outboxEventRepository.save(outboxEvent);
        } catch (Exception e) {
            throw new RuntimeException("Failed to write to outbox", e);
        }
    }
}
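`reserveInventory` decrements stock line by line and undoes partial work when a later line is short. An alternative is to lock every row first (in sorted `productId` order, which also prevents two sagas from deadlocking), validate all lines, and only then apply the decrements, so the failure path never writes. Here's a minimal plain-Java sketch of that two-phase approach, assuming a `ReentrantLock` per row stands in for `SELECT ... FOR UPDATE`; `StockRow`, `ReservationSketch`, and `reserveAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for an inventory_items row locked with SELECT ... FOR UPDATE.
final class StockRow {
    final ReentrantLock lock = new ReentrantLock();
    int available;
    int reserved;

    StockRow(int available) {
        this.available = available;
    }
}

final class ReservationSketch {
    /** Reserves every requested quantity or none; returns the short product ids (empty = success). */
    static List<String> reserveAll(Map<String, StockRow> rows, Map<String, Integer> request) {
        // Sorted lock order: concurrent callers always acquire rows in the same sequence
        List<String> productIds = new ArrayList<>(new TreeSet<>(request.keySet()));
        List<StockRow> locked = new ArrayList<>();
        try {
            for (String productId : productIds) {
                StockRow row = rows.get(productId);
                if (row == null) {
                    throw new IllegalArgumentException("Unknown product: " + productId);
                }
                row.lock.lock();
                locked.add(row);
            }
            // Phase 1: validate every line while holding all locks
            List<String> shortProducts = new ArrayList<>();
            for (String productId : productIds) {
                if (rows.get(productId).available < request.get(productId)) {
                    shortProducts.add(productId);
                }
            }
            if (!shortProducts.isEmpty()) {
                return shortProducts; // nothing was modified, so nothing to roll back
            }
            // Phase 2: apply all decrements
            for (String productId : productIds) {
                StockRow row = rows.get(productId);
                int qty = request.get(productId);
                row.available -= qty;
                row.reserved += qty;
            }
            return List.of();
        } finally {
            for (StockRow row : locked) {
                row.lock.unlock();
            }
        }
    }
}
```

In the JPA version the row locks are held until the transaction commits either way, so validating first mainly removes the compensating arithmetic in `rollbackPartialReservations`.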
// InventorySagaEventHandler.java
package com.example.inventoryservice.saga;
 
import com.example.inventoryservice.service.InventoryService;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class InventorySagaEventHandler {
 
    private final InventoryService inventoryService;
 
    @KafkaListener(
        topics = KafkaTopics.PAYMENT_PROCESSED,
        groupId = "inventory-service-group"
    )
    public void handlePaymentProcessed(@Payload PaymentProcessedEvent event, Acknowledgment ack) {
        log.info("InventoryService received PaymentProcessedEvent: orderId={}", event.orderId());
        try {
            inventoryService.reserveInventory(event);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to reserve inventory for orderId={}", event.orderId(), e);
            throw e;
        }
    }
 
    @KafkaListener(
        topics = KafkaTopics.ORDER_CANCELLED,
        groupId = "inventory-service-group"
    )
    public void handleOrderCancelled(@Payload OrderCancelledEvent event, Acknowledgment ack) {
        log.info("InventoryService received OrderCancelledEvent: orderId={}", event.orderId());
        try {
            inventoryService.releaseInventory(event.orderId());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to release inventory for orderId={}", event.orderId(), e);
            throw e;
        }
    }
}

9. Shipping Service - Full Implementation

// ShippingService.java
package com.example.shippingservice.service;
 
import com.example.shippingservice.domain.*;
import com.example.shippingservice.client.ShippingCarrierClient;
import com.example.shippingservice.outbox.OutboxEvent;
import com.example.shippingservice.outbox.OutboxEventRepository;
import com.example.shippingservice.repository.ShipmentRepository;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.UUID;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class ShippingService {
 
    private final ShipmentRepository shipmentRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ShippingCarrierClient carrierClient;
    private final ObjectMapper objectMapper;
 
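    /**
     * Schedule shipment. This is a RETRIABLE transaction (after the pivot):
     * if it fails, we retry; we do NOT compensate backward, because the payment
     * has been taken and the inventory reserved, so the order MUST ship.
     *
     * Requires @EnableRetry. In recent Spring Retry versions the retry advice is ordered
     * ahead of the transactional advice by default (see @EnableRetry's order attribute),
     * so each attempt runs in a fresh transaction. If that order were reversed, a retry
     * would see the PENDING row saved by the failed attempt and return early without
     * booking a carrier.
     */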
    @Transactional
    @Retryable(
        retryFor = {ShipmentCarrierException.class},
        maxAttempts = 5,
        backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 30000)
    )
    public void createShipment(InventoryReservedEvent event) {
        // Idempotency check
        if (shipmentRepository.existsByOrderId(event.orderId())) {
            log.info("Shipment already created for orderId={} (idempotent)", event.orderId());
            return;
        }
 
        log.info("Creating shipment for orderId={}", event.orderId());
 
        String shipmentId = UUID.randomUUID().toString();
        Shipment shipment = Shipment.builder()
            .shipmentId(shipmentId)
            .orderId(event.orderId())
            .sagaId(event.sagaId())
            .status(ShipmentStatus.PENDING)
            .reservationId(event.reservationId())
            .createdAt(Instant.now())
            .build();
        shipmentRepository.save(shipment);
 
        // Call carrier API (retriable)
        CarrierBookingResponse booking = carrierClient.bookPickup(
            event.orderId(),
            event.reservedItems()
        );
 
        shipment.setTrackingNumber(booking.trackingNumber());
        shipment.setCarrierCode(booking.carrierCode());
        shipment.setEstimatedDelivery(booking.estimatedDelivery());
        shipment.setStatus(ShipmentStatus.SCHEDULED);
        shipmentRepository.save(shipment);
 
        // Publish success event - this triggers order confirmation
        ShipmentCreatedEvent successEvent = new ShipmentCreatedEvent(
            UUID.randomUUID().toString(),
            event.sagaId(),
            event.orderId(),
            shipmentId,
            booking.trackingNumber(),
            booking.estimatedDelivery(),
            Instant.now()
        );
        saveToOutbox(KafkaTopics.SHIPMENT_CREATED, event.orderId(), successEvent);
 
        log.info("Shipment created: orderId={}, trackingNumber={}", event.orderId(), booking.trackingNumber());
    }
 
    private void saveToOutbox(String topic, String aggregateId, Object event) {
        try {
            OutboxEvent outboxEvent = OutboxEvent.builder()
                .eventId(UUID.randomUUID().toString())
                .aggregateId(aggregateId)
                .topicName(topic)
                .payload(objectMapper.writeValueAsString(event))
                .eventType(event.getClass().getSimpleName())
                .status(OutboxEvent.OutboxStatus.PENDING)
                .createdAt(Instant.now())
                .build();
            outboxEventRepository.save(outboxEvent);
        } catch (Exception e) {
            throw new RuntimeException("Failed to write to outbox", e);
        }
    }
}
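With `maxAttempts = 5` and `@Backoff(delay = 1000, multiplier = 2, maxDelay = 30000)`, Spring Retry makes one initial call plus up to four retries, sleeping before each retry with the delay doubling and capped at `maxDelay`. The helper below is a hypothetical plain-Java reproduction of that schedule (not a Spring API), handy for reading off the worst-case time one listener invocation can block.

```java
import java.util.ArrayList;
import java.util.List;

final class BackoffSchedule {
    /** Sleep before each retry for an exponential policy; maxAttempts counts the first call. */
    static List<Long> delaysMs(int maxAttempts, long initialDelayMs, double multiplier, long maxDelayMs) {
        List<Long> delays = new ArrayList<>();
        double next = initialDelayMs;
        for (int retry = 1; retry < maxAttempts; retry++) {
            delays.add((long) Math.min(next, maxDelayMs)); // capped like maxDelay
            next *= multiplier;
        }
        return delays;
    }
}
```

For the values on `createShipment` this gives 1000, 2000, 4000 and 8000 ms, or 15 s of backoff plus five carrier calls per record. Even ten such worst-case records in one poll (`max.poll.records = 10`) stay around 150 s of backoff, inside the 5-minute `max.poll.interval.ms` from section 10. If the Shipping Service's listener also uses a container error handler that retries, those attempts multiply.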

10. Kafka Configuration (AWS MSK)

// KafkaConfig.java
package com.example.orderservice.config;
 
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.*;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // ==================== PRODUCER CONFIGURATION ====================

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JsonSerializer writes event objects as JSON. If the outbox relay sends the stored
        // payload String instead, switch to StringSerializer: JsonSerializer would wrap an
        // already-serialized String in quotes (double encoding).
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        // Reliability settings (production)
        config.put(ProducerConfig.ACKS_CONFIG, "all");         // All in-sync replicas must acknowledge
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        // Strictest ordering; with idempotence enabled, up to 5 in-flight requests also keep order
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);

        // Idempotent producer: the broker drops duplicates caused by producer retries.
        // It does not dedupe an outbox row the relay publishes twice, so consumers stay idempotent.
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Performance settings
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        // AWS MSK IAM Authentication (requires the aws-msk-iam-auth library on the classpath)
        if (bootstrapServers.contains("amazonaws.com")) {
            config.put("security.protocol", "SASL_SSL");
            config.put("sasl.mechanism", "AWS_MSK_IAM");
            config.put("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
            config.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        }

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // ==================== CONSUMER CONFIGURATION ====================

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  // @KafkaListener(groupId = ...) overrides this
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Read the value as a raw JSON String; the JsonMessageConverter set on the listener
        // container factory converts it to each listener's @Payload type (OrderCreatedEvent,
        // PaymentProcessedEvent, ...). A JsonDeserializer whose default type is java.util.Map
        // would hand the typed listeners a LinkedHashMap they cannot bind.
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Manual offset commit for reliability
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        // Fetch settings
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // up to 5 minutes to process one poll's records

        // Only used when the group has no committed offset (a new group, or expired offsets);
        // existing groups resume from their committed offsets
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // AWS MSK IAM Authentication
        if (bootstrapServers.contains("amazonaws.com")) {
            config.put("security.protocol", "SASL_SSL");
            config.put("sasl.mechanism", "AWS_MSK_IAM");
            config.put("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
            config.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        }

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        // Convert each JSON String value into the @Payload parameter type of the listener method
        factory.setRecordMessageConverter(new JsonMessageConverter());

        // Manual acknowledgment mode - we control when offset is committed
        factory.getContainerProperties()
            .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        // Concurrency: consumer threads per listener. Threads beyond the topic's partition
        // count sit idle, so keep this at or below the number of partitions.
        factory.setConcurrency(3);

        // Error handler with retry + Dead Letter Topic
        factory.setCommonErrorHandler(errorHandler());

        return factory;
    }

    @Bean
    public DefaultErrorHandler errorHandler() {
        // 3 retries after the first failure (4 deliveries in total), 1 second apart,
        // then hand the record to the DLT recoverer
        FixedBackOff backOff = new FixedBackOff(1000L, 3L);

        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            deadLetterPublishingRecoverer(),
            backOff
        );

        // Do not retry these exceptions (business-logic errors, not transient failures)
        errorHandler.addNotRetryableExceptions(
            IllegalArgumentException.class,
            IllegalStateException.class
        );

        return errorHandler;
    }

    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
        // Failed records carry the raw JSON String read by StringDeserializer, so publish them
        // through a String-valued template; kafkaTemplate()'s JsonSerializer would quote them.
        Map<String, Object> dltConfig = new HashMap<>(producerFactory().getConfigurationProperties());
        dltConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaTemplate<String, String> dltTemplate =
            new KafkaTemplate<>(new DefaultKafkaProducerFactory<String, String>(dltConfig));

        // Default destination: {topic}.DLT on the same partition number as the failed record,
        // so each .DLT topic needs at least as many partitions as its source topic
        return new DeadLetterPublishingRecoverer(dltTemplate);
    }
}
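`FixedBackOff(1000L, 3L)` means three redeliveries after the first failure, so a poison record is attempted four times (about three seconds of backoff) before `DeadLetterPublishingRecoverer` writes it to `<topic>.DLT`; exceptions registered with `addNotRetryableExceptions` skip the retries entirely. The loop below is a simplified, hypothetical model of that decision (the real `DefaultErrorHandler` re-seeks the partition and redelivers rather than looping in place), included only to make the attempt count concrete. `RetryThenDltModel` and `deliver` are illustrative names.

```java
import java.util.Set;

// Simplified, hypothetical model of "retry N times, then dead-letter"; not Spring Kafka code.
final class RetryThenDltModel {

    record Outcome(int attempts, boolean sentToDlt, String dltTopic) {}

    /** Runs the handler up to 1 + maxRetries times; non-retryable exceptions skip the retries. */
    static Outcome deliver(String topic, Runnable handler, long maxRetries,
                           Set<Class<? extends Exception>> notRetryable) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.run();
                return new Outcome(attempts, false, null);
            } catch (RuntimeException e) {
                boolean retryable = notRetryable.stream().noneMatch(type -> type.isInstance(e));
                if (!retryable || attempts > maxRetries) {
                    // Mirrors the recoverer's default destination name: "<topic>.DLT"
                    return new Outcome(attempts, true, topic + ".DLT");
                }
                // A real container waits for the back-off interval here before redelivering
            }
        }
    }
}
```

With `maxRetries = 3`, a handler that always throws a retryable exception is attempted four times and dead-lettered, while one that throws `IllegalArgumentException` (registered as non-retryable in the config above) goes to the DLT after a single attempt.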

11. MySQL Schema Design

-- ==========================================================
-- ORDER SERVICE DATABASE
-- ==========================================================
 
CREATE TABLE orders (
    order_id     VARCHAR(36)    NOT NULL,
    customer_id  VARCHAR(36)    NOT NULL,
    status       VARCHAR(30)    NOT NULL DEFAULT 'PENDING',
    total_amount DECIMAL(15, 2) NOT NULL,
    saga_id      VARCHAR(36),
    failure_reason VARCHAR(500),
    created_at   DATETIME(6)    NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at   DATETIME(6)    NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
    version      BIGINT         NOT NULL DEFAULT 0,
    PRIMARY KEY (order_id),
    INDEX idx_orders_customer_id (customer_id),
    INDEX idx_orders_status      (status),
    INDEX idx_orders_saga_id     (saga_id),
    INDEX idx_orders_created_at  (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
CREATE TABLE order_items (
    id           BIGINT         NOT NULL AUTO_INCREMENT,
    order_id     VARCHAR(36)    NOT NULL,
    product_id   VARCHAR(36)    NOT NULL,
    product_name VARCHAR(255)   NOT NULL,
    quantity     INT            NOT NULL,
    unit_price   DECIMAL(15, 2) NOT NULL,
    PRIMARY KEY (id),
    INDEX idx_order_items_order_id (order_id),
    FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- Transactional outbox table (each service creates this same table in its own database)
CREATE TABLE outbox_events (
    event_id      VARCHAR(36)   NOT NULL,
    aggregate_id  VARCHAR(36)   NOT NULL,  -- e.g., order_id
    topic_name    VARCHAR(255)  NOT NULL,
    event_type    VARCHAR(100)  NOT NULL,
    payload       JSON          NOT NULL,
    status        VARCHAR(20)   NOT NULL DEFAULT 'PENDING', -- PENDING, PUBLISHED, FAILED
    retry_count   INT           NOT NULL DEFAULT 0,
    error_message VARCHAR(1000),
    created_at    DATETIME(6)   NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    published_at  DATETIME(6),
    PRIMARY KEY (event_id),
    INDEX idx_outbox_status      (status, created_at),
    INDEX idx_outbox_aggregate   (aggregate_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- ==========================================================
-- PAYMENT SERVICE DATABASE
-- ==========================================================
 
CREATE TABLE payments (
    payment_id              VARCHAR(36)    NOT NULL,
    order_id                VARCHAR(36)    NOT NULL,  -- one payment per order (idx_payments_order_id)
    customer_id             VARCHAR(36)    NOT NULL,
    saga_id                 VARCHAR(36),
    amount                  DECIMAL(15, 2) NOT NULL,
    status                  VARCHAR(30)    NOT NULL DEFAULT 'PROCESSING',
    payment_method          VARCHAR(50),
    external_transaction_id VARCHAR(100),
    refund_transaction_id   VARCHAR(100),
    refund_reason           VARCHAR(500),
    failure_reason          VARCHAR(500),
    created_at              DATETIME(6)    NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at              DATETIME(6)    NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
    version                 BIGINT         NOT NULL DEFAULT 0,
    PRIMARY KEY (payment_id),
    UNIQUE INDEX idx_payments_order_id (order_id),
    INDEX idx_payments_customer_id (customer_id),
    INDEX idx_payments_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- ==========================================================
-- INVENTORY SERVICE DATABASE
-- ==========================================================
 
CREATE TABLE inventory_items (
    product_id          VARCHAR(36)  NOT NULL,
    product_name        VARCHAR(255) NOT NULL,
    available_quantity  INT          NOT NULL DEFAULT 0,
    reserved_quantity   INT          NOT NULL DEFAULT 0,
    warehouse_id        VARCHAR(36)  NOT NULL,
    updated_at          DATETIME(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
    version             BIGINT       NOT NULL DEFAULT 0,
    PRIMARY KEY (product_id),
    INDEX idx_inventory_warehouse (warehouse_id),
    CONSTRAINT chk_inventory_non_negative CHECK (available_quantity >= 0 AND reserved_quantity >= 0)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
CREATE TABLE reservations (
    reservation_id VARCHAR(36)  NOT NULL,
    order_id       VARCHAR(36)  NOT NULL,  -- one reservation per order (enforced by idx_reservations_order_id)
    saga_id        VARCHAR(36),
    status         VARCHAR(20)  NOT NULL DEFAULT 'RESERVED',  -- RESERVED, RELEASED, FULFILLED
    created_at     DATETIME(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
    updated_at     DATETIME(6)  NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
    PRIMARY KEY (reservation_id),
    UNIQUE INDEX idx_reservations_order_id (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
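
The unique `order_id` on `reservations` and the `chk_inventory_non_negative` check give the Inventory Service two database-level guards: a redelivered event cannot create a second reservation, and stock can never go negative. The sketch below mimics those two rules in memory so the bookkeeping between `available_quantity` and `reserved_quantity` is easy to follow. It is an illustrative stand-in, not the service's repository code: `InventoryLedger` and its methods are hypothetical names, and a real implementation would rely on the MySQL constraints inside one local transaction.

```java
import java.util.HashMap;
import java.util.Map;

class InventoryLedgerDemo {
    public static void main(String[] args) {
        InventoryLedger ledger = new InventoryLedger();
        ledger.addStock("PROD-1", 5);
        System.out.println(ledger.reserve("order-1", "PROD-1", 3)); // true
        System.out.println(ledger.reserve("order-1", "PROD-1", 3)); // true: duplicate event, no second reservation
        System.out.println(ledger.reserve("order-2", "PROD-1", 3)); // false: only 2 units left
        ledger.release("order-1");                                  // compensation
        System.out.println(ledger.stockOf("PROD-1"));               // Stock[available=5, reserved=0]
    }
}

// Hypothetical in-memory stand-in for inventory_items + reservations (one product per order for brevity)
class InventoryLedger {
    record Stock(int available, int reserved) {}
    record Reservation(String productId, int quantity, String status) {}

    private final Map<String, Stock> stock = new HashMap<>();
    // Keyed by order id, like the unique order_id on the reservations table
    private final Map<String, Reservation> reservationsByOrder = new HashMap<>();

    void addStock(String productId, int quantity) {
        Stock s = stockOf(productId);
        stock.put(productId, new Stock(s.available() + quantity, s.reserved()));
    }

    /** Returns false only when stock is insufficient; a repeat call for the same order changes nothing. */
    boolean reserve(String orderId, String productId, int quantity) {
        if (reservationsByOrder.containsKey(orderId)) {
            return true;   // redelivered event: MySQL would reject a second row, so treat it as already done
        }
        Stock s = stockOf(productId);
        if (s.available() < quantity) {
            return false;  // would break chk_inventory_non_negative: publish a failure event instead
        }
        stock.put(productId, new Stock(s.available() - quantity, s.reserved() + quantity));
        reservationsByOrder.put(orderId, new Reservation(productId, quantity, "RESERVED"));
        return true;
    }

    /** Compensation: returns reserved units to available stock, at most once per order. */
    void release(String orderId) {
        Reservation r = reservationsByOrder.get(orderId);
        if (r == null || !r.status().equals("RESERVED")) {
            return;        // nothing reserved, or already released
        }
        Stock s = stockOf(r.productId());
        stock.put(r.productId(), new Stock(s.available() + r.quantity(), s.reserved() - r.quantity()));
        reservationsByOrder.put(orderId, new Reservation(r.productId(), r.quantity(), "RELEASED"));
    }

    Stock stockOf(String productId) {
        return stock.getOrDefault(productId, new Stock(0, 0));
    }
}
```

Keying reservations by order id is what makes the handler safe to run twice, which matters because Kafka and SQS both deliver at least once.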

12. Complete application.yml

# application.yml for Order Service
spring:
  application:
    name: order-service
 
  datasource:
    # sslMode=REQUIRED encrypts the connection; use VERIFY_IDENTITY (with a truststore) to also verify the server certificate
    url: jdbc:mysql://${DB_HOST:localhost}:3306/orders_db?sslMode=REQUIRED&serverTimezone=UTC&createDatabaseIfNotExist=true
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: OrderServiceHikariPool
      minimum-idle: 5
      maximum-pool-size: 20
      idle-timeout: 600000     # 10 minutes
      max-lifetime: 1800000    # 30 minutes
      connection-timeout: 30000
      validation-timeout: 5000
      connection-test-query: SELECT 1
 
  jpa:
    hibernate:
      ddl-auto: validate   # Flyway owns the schema; Hibernate only validates it, never creates or updates it
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        format_sql: true
        default_batch_fetch_size: 20
        jdbc:
          batch_size: 20
          order_inserts: true
          order_updates: true
 
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    consumer:
      group-id: order-service-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        max.poll.interval.ms: 300000
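    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer  # for KafkaTemplate<String, Object>
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 1
    listener:
      ack-mode: manual   # listeners commit offsets explicitly via Acknowledgment.acknowledge()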
 
# Outbox publisher scheduler
outbox:
  publisher:
    enabled: true
    cron: "*/5 * * * * *"    # Every 5 seconds
    batch-size: 50
    max-retries: 3
 
# Spring Retry needs no property here: it is enabled by @EnableRetry in RetryConfig
 
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    tags:
      application: order-service
      environment: ${ENVIRONMENT:dev}
 
logging:
  level:
    com.example: INFO
    org.springframework.kafka: WARN
    org.hibernate.SQL: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId},%X{spanId}] %logger{36} - %msg%n"
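
Why `enable.idempotence: true` lets the producer retry (`retries: 3`) without writing duplicates: every batch the producer sends to a partition carries its producer id and a sequence number, and the broker refuses to append a sequence it has already written. The model below is a deliberately simplified sketch of that idea (the real broker also tracks producer epochs and several in-flight batches); `PartitionLog` is a hypothetical name, not a Kafka class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdempotentProducerDemo {
    public static void main(String[] args) {
        PartitionLog partition = new PartitionLog();
        partition.append(7L, 0, "OrderCreated:order-1");
        // The ack was lost, so the producer retries the same batch with the same sequence number
        partition.append(7L, 0, "OrderCreated:order-1");
        partition.append(7L, 1, "OrderCancelled:order-1");
        System.out.println(partition.records()); // [OrderCreated:order-1, OrderCancelled:order-1]
    }
}

// Simplified model of per-partition duplicate detection for an idempotent producer
class PartitionLog {
    private final List<String> records = new ArrayList<>();
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();

    /** Appends unless this producer already wrote this sequence number; rejects gaps. */
    void append(long producerId, int sequence, String value) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return;  // duplicate from a retry: skipped instead of being appended a second time
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("Out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        records.add(value);
        lastSequenceByProducer.put(producerId, sequence);
    }

    List<String> records() {
        return List.copyOf(records);
    }
}
```

Producer idempotence only removes duplicates caused by producer retries; events re-sent by the outbox publisher after a crash are new sends, so consumers still need their own idempotency checks.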

13. Handling the Outbox Pattern in Choreography

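The Outbox pattern is ESSENTIAL in choreography. Without it, you have a dual-write problem:
writing to the DB and publishing the event are TWO separate operations that can fail independently.
If the commit succeeds but the publish fails, downstream services never hear about the order; if the
publish succeeds but the commit rolls back, they react to an order that does not exist. With an outbox,
the service writes its business row and an outbox row in the same local transaction, and a separate
publisher relays the outbox rows to the broker.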

See Part 4 - Deep Dive Implementation for the
complete Outbox implementation. Here is the key concept:

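// OutboxPublisher.java - scheduled job that reads the outbox and publishes to Kafka
// (requires @EnableScheduling on a configuration class)
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
 
@Component
@RequiredArgsConstructor
@Slf4j
@ConditionalOnProperty(prefix = "outbox.publisher", name = "enabled", havingValue = "true", matchIfMissing = true)
public class OutboxPublisher {
 
    // Matches outbox_events.error_message VARCHAR(1000)
    private static final int MAX_ERROR_MESSAGE_LENGTH = 1000;
 
    private final OutboxEventRepository outboxEventRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;
 
    @Value("${outbox.publisher.max-retries:3}")
    private int maxRetries;
 
    @Scheduled(cron = "${outbox.publisher.cron:*/5 * * * * *}")
    @Transactional
    public void publishPendingEvents() {
        // The batch size (50) is fixed by the derived query name; keep outbox.publisher.batch-size in sync.
        // With several instances, lock the batch (e.g. SELECT ... FOR UPDATE SKIP LOCKED)
        // so that two schedulers do not publish the same rows.
        List<OutboxEvent> pendingEvents = outboxEventRepository
            .findTop50ByStatusOrderByCreatedAtAsc(OutboxEvent.OutboxStatus.PENDING);
 
        for (OutboxEvent outboxEvent : pendingEvents) {
            try {
                // Payload is stored as a JSON string; the configured value serializer writes it out again
                Object payload = objectMapper.readValue(outboxEvent.getPayload(), Object.class);
 
                kafkaTemplate.send(
                    outboxEvent.getTopicName(),
                    outboxEvent.getAggregateId(),  // Use aggregateId as Kafka key for per-order ordering
                    payload
                ).get(5, TimeUnit.SECONDS);  // Sync send: mark PUBLISHED only after the broker acks
 
                outboxEvent.setStatus(OutboxEvent.OutboxStatus.PUBLISHED);
                outboxEvent.setPublishedAt(Instant.now());
                outboxEventRepository.save(outboxEvent);
 
            } catch (InterruptedException e) {
                // Shutting down: keep the interrupt flag and leave the remaining rows PENDING
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                log.error("Failed to publish outbox event: eventId={}, topic={}",
                    outboxEvent.getEventId(), outboxEvent.getTopicName(), e);
 
                outboxEvent.setRetryCount(outboxEvent.getRetryCount() + 1);
                // Truncate so an over-long message cannot fail the UPDATE and roll back the whole batch
                outboxEvent.setErrorMessage(truncate(e.getMessage()));
 
                if (outboxEvent.getRetryCount() >= maxRetries) {
                    outboxEvent.setStatus(OutboxEvent.OutboxStatus.FAILED);
                    // Alert: send notification to ops team
                }
                // Rows below maxRetries stay PENDING and are retried on the next run
                outboxEventRepository.save(outboxEvent);
            }
        }
    }
 
    private static String truncate(String message) {
        if (message == null || message.length() <= MAX_ERROR_MESSAGE_LENGTH) {
            return message;
        }
        return message.substring(0, MAX_ERROR_MESSAGE_LENGTH);
    }
}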
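
To see why the outbox removes the dual write, it helps to model both halves: the business row and the outbox row are committed together (or not at all), and the relay marks a row `PUBLISHED` only after the send succeeded, counting failed attempts the way `OutboxPublisher` does. The sketch below is an in-memory simulation under those assumptions; `OutboxStore`, `placeOrder`, and `relay` are hypothetical names, and a plain `Predicate` stands in for the Kafka send.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class OutboxSimulation {
    public static void main(String[] args) {
        OutboxStore store = new OutboxStore();
        store.placeOrder("order-1", false);
        store.placeOrder("order-2", true);   // business step fails: neither row is written
        store.relay(row -> false, 3);        // broker down: row stays PENDING with retryCount=1
        store.relay(row -> true, 3);         // broker back: row becomes PUBLISHED
        System.out.println(store.orders() + " " + store.outbox()); // {order-1=PENDING} [order-1:PUBLISHED:1]
    }
}

// Hypothetical in-memory model of the orders table plus the outbox_events table
class OutboxStore {
    static final class OutboxRow {
        final String aggregateId;
        String status = "PENDING";
        int retryCount = 0;

        OutboxRow(String aggregateId) {
            this.aggregateId = aggregateId;
        }

        @Override
        public String toString() {
            return aggregateId + ":" + status + ":" + retryCount;
        }
    }

    private final Map<String, String> orders = new LinkedHashMap<>();
    private final List<OutboxRow> outbox = new ArrayList<>();

    /** One local "transaction": the order row and its outbox row are committed together or not at all. */
    void placeOrder(String orderId, boolean failBeforeCommit) {
        Map<String, String> stagedOrders = new LinkedHashMap<>(orders);
        List<OutboxRow> stagedOutbox = new ArrayList<>(outbox);
        stagedOrders.put(orderId, "PENDING");
        stagedOutbox.add(new OutboxRow(orderId));
        if (failBeforeCommit) {
            return;  // rollback: the staged copies are discarded
        }
        orders.clear();
        orders.putAll(stagedOrders);   // commit both tables
        outbox.clear();
        outbox.addAll(stagedOutbox);
    }

    /** One relay run: publish PENDING rows, mark PUBLISHED only if the send succeeded. */
    void relay(Predicate<OutboxRow> send, int maxRetries) {
        for (OutboxRow row : outbox) {
            if (!row.status.equals("PENDING")) {
                continue;
            }
            if (send.test(row)) {
                row.status = "PUBLISHED";
            } else if (++row.retryCount >= maxRetries) {
                row.status = "FAILED";   // left for manual attention
            }
        }
    }

    Map<String, String> orders() { return orders; }
    List<OutboxRow> outbox() { return outbox; }
}
```

Because a row is only marked after a successful send, a crash between send and update re-sends the event on the next run: the outbox gives at-least-once delivery, not exactly-once.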

14. Error Handling and Retry Strategies

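// RetryConfig.java
import java.util.Map;
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.TransientDataAccessException;
import org.springframework.retry.annotation.EnableRetry;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
 
@Configuration
@EnableRetry
public class RetryConfig {
 
    @Bean
    public RetryTemplate retryTemplate() {
        // RetryTemplateBuilder rejects mixing retryOn() with notRetryOn(), so the allow and
        // deny rules go into one policy; the most specific matching class decides.
        Map<Class<? extends Throwable>, Boolean> retryable = Map.of(
            TransientDataAccessException.class, true,
            RuntimeException.class, true,
            IllegalArgumentException.class, false,
            PaymentDeclinedException.class, false   // Business error, no retry
        );
 
        return RetryTemplate.builder()
            .customPolicy(new SimpleRetryPolicy(3, retryable))  // 3 attempts in total
            .exponentialBackoff(1000, 2, 10000)  // waits 1s, then 2s (each wait capped at 10s)
            .build();
    }
}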
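
The wait schedule produced by `exponentialBackoff(1000, 2, 10000)` follows from simple arithmetic: with 3 attempts there are only 2 waits, each one the previous wait multiplied by 2 and capped at 10 seconds. A small helper (hypothetical, not part of Spring Retry) makes the schedule explicit for any setting.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {
    /** Waits between attempts for exponential backoff: initial, initial*m, ... each capped at max. */
    static List<Long> delays(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> delays = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {  // no wait after the last attempt
            delays.add((long) Math.min(next, maxMs));
            next *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delays(3, 1000, 2, 10000)); // [1000, 2000]
        System.out.println(delays(6, 1000, 2, 10000)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

The cap only starts to matter from the fifth wait onward with these values, so raising `maxAttempts` is what actually exercises the 10-second limit.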

Dead Letter Topic Consumer

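// DeadLetterTopicConsumer.java
import java.time.Instant;
 
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterTopicConsumer {
 
    private static final int MAX_STACK_TRACE_LENGTH = 2000;
 
    private final AlertService alertService;
    private final ManualInterventionRepository interventionRepo;
 
    @KafkaListener(
        topicPattern = ".*\\.DLT",   // escaped dot: matches topic names ending in ".DLT"
        groupId = "dlt-consumer-group"
    )
    public void handleDeadLetterEvent(
            @Payload String rawMessage,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            // Exception headers are added by DeadLetterPublishingRecoverer and may be absent
            @Header(name = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage,
            @Header(name = KafkaHeaders.EXCEPTION_STACKTRACE, required = false) String stackTrace,
            Acknowledgment ack) {  // requires spring.kafka.listener.ack-mode=manual
 
        log.error("Message in DLT: topic={}, error={}", topic, exceptionMessage);
 
        // Save for manual review (stack trace truncated to 2000 characters)
        ManualInterventionRecord record = ManualInterventionRecord.builder()
            .topic(topic)
            .rawMessage(rawMessage)
            .errorMessage(exceptionMessage)
            .stackTrace(stackTrace == null ? null
                : stackTrace.substring(0, Math.min(stackTrace.length(), MAX_STACK_TRACE_LENGTH)))
            .createdAt(Instant.now())
            .status("PENDING_REVIEW")
            .build();
        interventionRepo.save(record);
 
        // Alert the operations team
        alertService.sendAlert(
            "Dead Letter Topic Message",
            String.format("Topic: %s, Error: %s", topic, exceptionMessage)
        );
 
        ack.acknowledge();
    }
}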
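
Messages only reach a `.DLT` topic after the listener's error handler has given up. In Spring Kafka that is typically a `DefaultErrorHandler` configured with a `DeadLetterPublishingRecoverer`, whose default target is the original topic name plus `.DLT`. The plain-Java sketch below models that routing decision (retry a bounded number of times, then hand the record to the dead-letter topic with the last error attached) so the path into `DeadLetterTopicConsumer` is easy to trace; `DeadLetterRouter` and the topic name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DeadLetterRouterDemo {
    public static void main(String[] args) {
        DeadLetterRouter router = new DeadLetterRouter(2);
        router.deliver("payment.processed", "{\"orderId\":\"order-1\"}", payload -> {
            throw new IllegalStateException("Order not found");
        });
        System.out.println(router.deadLetters());
    }
}

// Hypothetical model of "retry N times, then publish to <topic>.DLT"
class DeadLetterRouter {
    record DeadLetter(String topic, String payload, String exceptionMessage, int attempts) {}

    private final int maxRetries;
    private final List<DeadLetter> deadLetters = new ArrayList<>();

    DeadLetterRouter(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Runs the handler; after 1 + maxRetries failed attempts the record is dead-lettered. */
    void deliver(String topic, String payload, Consumer<String> handler) {
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= 1 + maxRetries; attempt++) {
            try {
                handler.accept(payload);
                return;                      // processed: the offset can be committed
            } catch (RuntimeException e) {
                lastError = e;               // remember the error for the DLT record
            }
        }
        deadLetters.add(new DeadLetter(topic + ".DLT", payload, lastError.getMessage(), 1 + maxRetries));
    }

    List<DeadLetter> deadLetters() {
        return List.copyOf(deadLetters);
    }
}
```

Business errors such as a declined payment should not go through this path at all: they are normal saga outcomes and should be published as failure events instead.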

15. Testing Choreography SAGAs

// OrderSagaChoreographyIntegrationTest.java
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
 
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
 
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
 
@SpringBootTest
@Testcontainers
class OrderSagaChoreographyIntegrationTest {
 
    @Container
    static MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
        .withDatabaseName("test_orders")
        .withUsername("test")
        .withPassword("test");
 
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
 
    // Point the application at the containers instead of the DB_HOST / KAFKA_BOOTSTRAP_SERVERS settings
    @DynamicPropertySource
    static void containerProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", mysql::getJdbcUrl);
        registry.add("spring.datasource.username", mysql::getUsername);
        registry.add("spring.datasource.password", mysql::getPassword);
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
 
    @Autowired
    private OrderService orderService;
 
    @Autowired
    private OrderRepository orderRepository;
 
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    // build* test-data helpers are omitted for brevity
 
    @Test
    @DisplayName("Happy path: Complete saga from order creation to confirmation")
    void testHappyPath() {
        // Given
        PlaceOrderRequest request = buildTestOrderRequest();
 
        // When: place the order, then stand in for the downstream services by publishing their events
        Order order = orderService.placeOrder(request);
 
        PaymentProcessedEvent paymentEvent = buildPaymentProcessedEvent(order);
        kafkaTemplate.send(KafkaTopics.PAYMENT_PROCESSED, order.getOrderId(), paymentEvent);
 
        InventoryReservedEvent inventoryEvent = buildInventoryReservedEvent(order);
        kafkaTemplate.send(KafkaTopics.INVENTORY_RESERVED, order.getOrderId(), inventoryEvent);
 
        ShipmentCreatedEvent shipmentEvent = buildShipmentCreatedEvent(order);
        kafkaTemplate.send(KafkaTopics.SHIPMENT_CREATED, order.getOrderId(), shipmentEvent);
 
        // Then: wait for the asynchronous listeners to confirm the order
        await().atMost(30, TimeUnit.SECONDS)
            .pollInterval(500, TimeUnit.MILLISECONDS)
            .until(() -> {
                Order updated = orderRepository.findById(order.getOrderId()).orElseThrow();
                return updated.getStatus() == OrderStatus.CONFIRMED;
            });
 
        Order finalOrder = orderRepository.findById(order.getOrderId()).orElseThrow();
        assertThat(finalOrder.getStatus()).isEqualTo(OrderStatus.CONFIRMED);
    }
 
    @Test
    @DisplayName("Compensation path: Order cancelled when inventory fails")
    void testInventoryFailureCompensation() {
        // Given
        PlaceOrderRequest request = buildTestOrderRequest();
        Order order = orderService.placeOrder(request);
 
        // When: Payment succeeds but Inventory fails
        PaymentProcessedEvent paymentEvent = buildPaymentProcessedEvent(order);
        kafkaTemplate.send(KafkaTopics.PAYMENT_PROCESSED, order.getOrderId(), paymentEvent);
 
        InventoryReservationFailedEvent failureEvent = new InventoryReservationFailedEvent(
            UUID.randomUUID().toString(),
            order.getSagaId(),
            order.getOrderId(),
            List.of("PROD-999"),
            "Out of stock",
            Instant.now()
        );
        kafkaTemplate.send(KafkaTopics.INVENTORY_RESERVATION_FAILED, order.getOrderId(), failureEvent);
 
        // Then: the order is cancelled (the refund itself happens in Payment Service, which this test does not start)
        await().atMost(30, TimeUnit.SECONDS)
            .until(() -> {
                Order updated = orderRepository.findById(order.getOrderId()).orElseThrow();
                return updated.getStatus() == OrderStatus.CANCELLED;
            });
 
        Order cancelledOrder = orderRepository.findById(order.getOrderId()).orElseThrow();
        assertThat(cancelledOrder.getStatus()).isEqualTo(OrderStatus.CANCELLED);
        assertThat(cancelledOrder.getFailureReason()).contains("downstream failure");
    }
}
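
The Testcontainers test exercises the real Order Service, but the choreography itself (who reacts to which event, and which compensation fires) can also be checked in a fast unit test with an in-memory bus. The sketch below is a hypothetical, simplified model: `InMemoryBus`, the handler wiring, and the event names are illustrative assumptions that mirror the flow in this part (including the assumption that the Order Service cancels on a refund event), and every handler runs synchronously.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ChoreographySimulation {
    final InMemoryBus bus = new InMemoryBus();
    final Map<String, String> orderStatus = new HashMap<>();

    ChoreographySimulation(boolean inventoryAvailable) {
        // Payment Service: charge on OrderCreated, refund when inventory fails
        bus.subscribe("OrderCreated", e -> bus.publish(new InMemoryBus.Event("PaymentProcessed", e.orderId())));
        bus.subscribe("InventoryReservationFailed",
            e -> bus.publish(new InMemoryBus.Event("PaymentRefunded", e.orderId())));
        // Inventory Service: reserve after payment
        bus.subscribe("PaymentProcessed", e -> bus.publish(new InMemoryBus.Event(
            inventoryAvailable ? "InventoryReserved" : "InventoryReservationFailed", e.orderId())));
        // Shipping Service: ship after reservation
        bus.subscribe("InventoryReserved", e -> bus.publish(new InMemoryBus.Event("ShipmentCreated", e.orderId())));
        // Order Service: final state transitions
        bus.subscribe("ShipmentCreated", e -> orderStatus.put(e.orderId(), "CONFIRMED"));
        bus.subscribe("PaymentRefunded", e -> orderStatus.put(e.orderId(), "CANCELLED"));
    }

    String placeOrder(String orderId) {
        orderStatus.put(orderId, "PENDING");
        bus.publish(new InMemoryBus.Event("OrderCreated", orderId));
        return orderStatus.get(orderId);
    }

    public static void main(String[] args) {
        System.out.println(new ChoreographySimulation(true).placeOrder("order-1"));  // CONFIRMED
        System.out.println(new ChoreographySimulation(false).placeOrder("order-2")); // CANCELLED
    }
}

// Hypothetical synchronous event bus: handlers subscribe by event type
class InMemoryBus {
    record Event(String type, String orderId) {}

    private final Map<String, List<Consumer<Event>>> handlers = new HashMap<>();
    final List<String> published = new ArrayList<>();

    void subscribe(String type, Consumer<Event> handler) {
        handlers.computeIfAbsent(type, t -> new ArrayList<>()).add(handler);
    }

    void publish(Event event) {
        published.add(event.type());
        handlers.getOrDefault(event.type(), List.of()).forEach(h -> h.accept(event));
    }
}
```

Asserting on `published` documents the expected event chain in one place, which partly offsets the "no central view" drawback of choreography; it does not replace the broker-level integration test.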

16. AWS Integration: SNS + SQS Pattern

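For cross-region or cross-account saga communication, SNS + SQS fan-out is often preferred over Kafka:
SNS can deliver to SQS queues in other accounts and regions, and each subscribing service gets its own
durable queue, with its own retries and dead-letter queue, and no broker cluster to operate.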

                  SNS Topic (order.events)
                         |
          +--------------+--------------+
          |              |              |
     SQS Queue      SQS Queue      SQS Queue
   (payment-svc)  (inventory-svc) (analytics)
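
// AwsSnsEventPublisher.java
import java.util.Map;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.MessageAttributeValue;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class AwsSnsEventPublisher {
 
    private final SnsClient snsClient;
    private final ObjectMapper objectMapper;
 
    @Value("${aws.sns.order-events-topic-arn}")
    private String orderEventsTopicArn;
 
    // Call this from the outbox relay, not from inside the business transaction;
    // otherwise the dual-write problem from section 13 comes back.
    public void publishOrderCreatedEvent(OrderCreatedEvent event) {
        try {
            String messageBody = objectMapper.writeValueAsString(event);
 
            PublishRequest request = PublishRequest.builder()
                .topicArn(orderEventsTopicArn)
                .message(messageBody)
                .subject("OrderCreatedEvent")
                .messageAttributes(Map.of(
                    "eventType", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue("OrderCreatedEvent")
                        .build(),
                    "sagaId", MessageAttributeValue.builder()
                        .dataType("String")
                        .stringValue(event.sagaId())
                        .build()
                ))
                .build();
 
            PublishResponse response = snsClient.publish(request);
            log.info("Published OrderCreatedEvent to SNS: messageId={}, orderId={}",
                response.messageId(), event.orderId());
 
        } catch (Exception e) {
            log.error("Failed to publish to SNS: orderId={}", event.orderId(), e);
            throw new EventPublicationException("SNS publish failed", e);
        }
    }
}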
// AwsSqsEventConsumer.java
// Assumes the SNS subscription uses the default envelope (RawMessageDelivery disabled):
// the event JSON and the SNS message attributes both arrive inside the SQS message body.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class AwsSqsEventConsumer {
 
    private final SqsClient sqsClient;
    private final ObjectMapper objectMapper;
    private final PaymentService paymentService;
 
    @Value("${aws.sqs.payment-service-queue-url}")
    private String queueUrl;
 
    // Long polling blocks the calling thread for up to 20s: give the task scheduler more than one
    // thread (spring.task.scheduling.pool.size) so other @Scheduled jobs such as the outbox publisher are not delayed.
    @Scheduled(fixedDelay = 1000)
    public void pollMessages() {
        ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
            .queueUrl(queueUrl)
            .maxNumberOfMessages(10)
            .waitTimeSeconds(20)   // Long polling
            .visibilityTimeout(30) // 30 seconds to process
            .build();
 
        ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);
 
        for (Message message : response.messages()) {
            try {
                processMessage(message);
                deleteMessage(message);  // Delete only after successful processing
            } catch (Exception e) {
                log.error("Failed to process SQS message: messageId={}", message.messageId(), e);
                // Message will return to queue after visibility timeout expires
                // After maxReceiveCount, the queue's redrive policy moves it to the DLQ
            }
        }
    }
 
    private void processMessage(Message message) throws JsonProcessingException {
        // SNS envelope: {"Message": "<event JSON>", "MessageAttributes": {"eventType": {"Type": "String", "Value": "..."}}, ...}
        JsonNode envelope = objectMapper.readTree(message.body());
        String eventType = envelope.path("MessageAttributes").path("eventType").path("Value").asText("UNKNOWN");
        String innerMessage = envelope.path("Message").asText();
 
        switch (eventType) {
            case "OrderCreatedEvent" -> {
                OrderCreatedEvent event = objectMapper.readValue(innerMessage, OrderCreatedEvent.class);
                paymentService.processPayment(event);  // Must be idempotent: SQS delivers at least once
            }
            default -> log.warn("Unknown event type: {}", eventType);
        }
    }
 
    private void deleteMessage(Message message) {
        sqsClient.deleteMessage(DeleteMessageRequest.builder()
            .queueUrl(queueUrl)
            .receiptHandle(message.receiptHandle())
            .build());
    }
}
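
The failure comments in `pollMessages` rely on SQS redrive: a message that is received but not deleted becomes visible again, and once it has been received `maxReceiveCount` times the next receive moves it to the dead-letter queue instead. The simulation below models that together with the SNS fan-out from the diagram (one publish, one independent copy per subscribed queue). It is a simplified in-memory model in which the visibility timeout collapses into "the next poll"; `SimTopic` and `SimQueue` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

class SnsSqsSimulation {
    public static void main(String[] args) {
        SimQueue payment = new SimQueue(3);
        SimQueue inventory = new SimQueue(3);
        new SimTopic(List.of(payment, inventory)).publish("OrderCreatedEvent:order-1");

        for (int poll = 0; poll < 5; poll++) {
            payment.poll(body -> false);    // payment handler keeps failing
            inventory.poll(body -> true);   // inventory handler succeeds, so the message is deleted
        }
        System.out.println(payment.deadLetters() + " " + inventory.deadLetters()); // [OrderCreatedEvent:order-1] []
    }
}

// SNS topic: every publish is copied to each subscribed queue
class SimTopic {
    private final List<SimQueue> subscriptions;

    SimTopic(List<SimQueue> subscriptions) {
        this.subscriptions = subscriptions;
    }

    void publish(String body) {
        subscriptions.forEach(q -> q.enqueue(body));
    }
}

// SQS queue with a redrive policy (maxReceiveCount)
class SimQueue {
    private static final class Msg {
        final String body;
        int receiveCount = 0;

        Msg(String body) {
            this.body = body;
        }
    }

    private final int maxReceiveCount;
    private final Deque<Msg> visible = new ArrayDeque<>();
    private final List<String> dlq = new ArrayList<>();

    SimQueue(int maxReceiveCount) {
        this.maxReceiveCount = maxReceiveCount;
    }

    void enqueue(String body) {
        visible.addLast(new Msg(body));
    }

    /** Receives one message; a failed handler leaves it in the queue for a later poll. */
    void poll(Predicate<String> handler) {
        Msg msg = visible.pollFirst();
        if (msg == null) {
            return;
        }
        if (msg.receiveCount >= maxReceiveCount) {
            dlq.add(msg.body);          // redrive: already received maxReceiveCount times
            return;
        }
        msg.receiveCount++;
        if (!handler.test(msg.body)) {
            visible.addLast(msg);       // not deleted: visible again after the timeout
        }                               // on success the consumer deletes the message
    }

    List<String> deadLetters() {
        return List.copyOf(dlq);
    }

    int depth() {
        return visible.size();
    }
}
```

Because each subscriber owns its queue, the failing payment consumer ends up in its own DLQ without affecting the inventory consumer's copy of the same event.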

17. Advantages and Disadvantages

Advantages

| Advantage | Explanation |
|---|---|
| Loose coupling | Services only know about events, not about other services |
| High availability | No central coordinator = no single point of failure (the broker itself must still be highly available) |
| Independent scaling | Each service scales based on its own event load |
| Technology diversity | Each service can use its own language, framework, and database, as long as it honors the shared event contracts |
| Simplicity per service | Each service has simple, clear responsibilities |
| Natural resilience | Failures in one service do not block others |

Disadvantages

| Disadvantage | Explanation |
|---|---|
| Difficult to trace | No central view of where a saga currently is |
| Cyclic event dependencies | Service A reacts to B's events and B reacts to A's, which is easy to create and hard to spot |
| Testing complexity | Must simulate the entire event chain end-to-end |
| Event explosion | Adding a new step requires new events and new subscriptions |
| Hard to enforce sequence | Events can arrive out of order (Kafka only orders messages within a partition) |
| Debugging is painful | Evidence is spread across multiple service logs and topics |

18. When to Choose Choreography

Choose Choreography When:

  • The saga spans 3-5 services at most
  • Services are truly independent teams that should not couple to a central orchestrator
  • The saga flow is relatively simple and linear
  • High throughput is required (no orchestrator bottleneck)
  • You prefer eventual consistency with minimal infrastructure

Choose Orchestration (Part 3) When:

  • Saga involves more than 5 services
  • Business logic is complex, with branching or conditional steps
  • You need full visibility into saga state at any point
  • Rollback ordering is complex
  • Your team needs to debug and trace saga execution easily

19. Summary

| Topic | Key Points |
|---|---|
| Choreography concept | Services communicate via events, no central coordinator |
| Event flow | OrderCreated -> PaymentProcessed -> InventoryReserved -> ShipmentCreated |
| Compensation flow | Failure triggers reverse events for compensation |
| Idempotency | EVERY handler must check if work was already done |
| Outbox pattern | Always use outbox to avoid dual-write |
| Kafka config | ACKS=all, idempotent producer, manual ACK on consumer |
| Error handling | Retry with backoff + Dead Letter Topic |
| MySQL schema | Pessimistic locking on inventory, optimistic versioning on orders |
| Testing | Testcontainers with full event chain simulation |

Next: Part 3 - Orchestration Pattern

Discover how to build a central saga orchestrator that provides full visibility and control,
including AWS Step Functions integration.

