Part 2: SAGA Choreography Pattern - Complete Implementation
Table of Contents
- What is Choreography?
- Mental Model: A Dance Without a Director
- Event Flow Diagrams
- Project Setup and Dependencies
- Shared Domain Models and Events
- Order Service - Full Implementation
- Payment Service - Full Implementation
- Inventory Service - Full Implementation
- Shipping Service - Full Implementation
- Kafka Configuration (AWS MSK)
- MySQL Schema Design
- Complete application.yml
- Handling the Outbox Pattern in Choreography
- Error Handling and Retry Strategies
- Testing Choreography SAGAs
- AWS Integration: SNS + SQS Pattern
- Advantages and Disadvantages
- When to Choose Choreography
- Summary
1. What is Choreography?
In choreography, services communicate by emitting events and listening to events.
There is NO central coordinator that knows the full saga flow. Each service acts
independently based on what events it receives.
Think of it as a pub/sub system where:
- Each service PUBLISHES events when something happens
- Each service SUBSCRIBES to events from other services
- Services react to events and perform their local work
- The saga progresses through cascading events
Order Service              Message Broker              Other Services
 (Producer)                 (Kafka / SQS)                (Consumers)
      |                           |                           |
      |-- OrderCreatedEvent ----->|                           |
      |                           |-- OrderCreatedEvent ----->|
      |                           |   (Payment Service subscribes)
      |                           |                           |
      |                           |<-- PaymentProcessedEvent -|
      |                           |                           |
      |                           |-- PaymentProcessedEvent ->|
      |                           |   (Inventory Service subscribes)
      |                           |                           |
... and so on
What Each Service "Knows"
- Order Service: Knows to create an order and publish OrderCreatedEvent. Knows to confirm the order when ShipmentCreatedEvent arrives, and to cancel it (publishing OrderCancelledEvent) when PaymentFailedEvent or PaymentRefundedEvent arrives.
- Payment Service: Knows to process payment when OrderCreatedEvent arrives. Knows to refund when InventoryReservationFailedEvent arrives.
- Inventory Service: Knows to reserve inventory when PaymentProcessedEvent arrives. Knows to release a reservation when OrderCancelledEvent arrives.
- Shipping Service: Knows to create a shipment when InventoryReservedEvent arrives.
NO service knows the complete order saga flow. The saga emerges from the interaction
of all services reacting to events.
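To make the emergent flow concrete, here is a minimal in-memory sketch in plain Java (no Kafka, no Spring). It assumes a toy synchronous bus (the hypothetical InMemoryBus class) keyed by event names rather than the real topic names, and it encodes only each service's local rule from the list above. No component holds the whole flow, yet running it reproduces both the happy path and the out-of-stock path. Inventory release on OrderCancelledEvent is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy broker: event name -> subscribers. Delivery here is synchronous and
// in-process, unlike Kafka, which is asynchronous and at-least-once.
final class InMemoryBus {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();
    private final List<String> trace = new ArrayList<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, String orderId) {
        trace.add(topic); // record every event in publication order
        for (Consumer<String> handler : subscribers.getOrDefault(topic, List.of())) {
            handler.accept(orderId);
        }
    }

    List<String> trace() {
        return List.copyOf(trace);
    }
}

record SagaResult(List<String> trace, String orderStatus) {}

final class ChoreographyDemo {
    static SagaResult run(boolean inStock) {
        InMemoryBus bus = new InMemoryBus();
        Map<String, String> orders = new HashMap<>();

        // Order Service: only knows its own reactions.
        bus.subscribe("ShipmentCreatedEvent", id -> orders.put(id, "CONFIRMED"));
        bus.subscribe("PaymentRefundedEvent", id -> {
            orders.put(id, "CANCELLED");
            bus.publish("OrderCancelledEvent", id);
        });
        // Payment Service
        bus.subscribe("OrderCreatedEvent", id -> bus.publish("PaymentProcessedEvent", id));
        bus.subscribe("InventoryReservationFailedEvent", id -> bus.publish("PaymentRefundedEvent", id));
        // Inventory Service (stock level simulated by the inStock flag)
        bus.subscribe("PaymentProcessedEvent", id -> bus.publish(
                inStock ? "InventoryReservedEvent" : "InventoryReservationFailedEvent", id));
        // Shipping Service
        bus.subscribe("InventoryReservedEvent", id -> bus.publish("ShipmentCreatedEvent", id));

        // Placing the order starts the chain; nothing else drives it.
        orders.put("order-1", "PENDING");
        bus.publish("OrderCreatedEvent", "order-1");
        return new SagaResult(bus.trace(), orders.get("order-1"));
    }
}
```

ChoreographyDemo.run(true) yields the four forward events ending in ShipmentCreatedEvent with the order CONFIRMED; run(false) yields the compensation chain ending in OrderCancelledEvent with the order CANCELLED. A real broker delivers asynchronously and at least once, which is why every handler in the services below has to be idempotent.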
2. Mental Model: A Dance Without a Director
Imagine a jazz band:
- There is NO conductor telling each musician what to play and when
- Each musician listens to what the others are playing
- Each musician plays their part in response to what they hear
- Together, they produce a coherent piece of music
That is choreography. Each service is a musician. Events are the music. The saga is
the coherent piece that emerges.
Compare this with an orchestra led by a conductor: that is orchestration (Part 3).
3. Event Flow Diagrams
Happy Path
Customer          Order Svc        Payment Svc      Inventory Svc     Shipping Svc
    |                 |                 |                 |                 |
    |---placeOrder--->|                 |                 |                 |
    |                 |-- INSERT order  |                 |                 |
    |                 | status=PENDING  |                 |                 |
    |<--202 Accepted--|                 |                 |                 |
    |                 |-- PUBLISH: OrderCreatedEvent      |                 |
    |                 |                 |                 |                 |
    |                 |                 | (receives event)|                 |
    |                 |                 |-- process pmt   |                 |
    |                 |                 |-- INSERT payment|                 |
    |                 |                 |-- PUBLISH: PaymentProcessedEvent  |
    |                 |                 |                 |                 |
    |                 |                 |                 | (receives event)|
    |                 |                 |                 |-- reserve stock |
    |                 |                 |                 |-- INSERT rsrv   |
    |                 |                 |                 |-- PUBLISH: InventoryReservedEvent
    |                 |                 |                 |                 |
    |                 |                 |                 |                 | (receives event)
    |                 |                 |                 |                 |-- create shipment
    |                 |                 |                 |                 |-- PUBLISH: ShipmentCreatedEvent
    |                 |                 |                 |                 |
    |                 | (receives event)|                 |                 |
    |                 |-- UPDATE order  |                 |                 |
    |                 | status=CONFIRMED|                 |                 |
    |                 |                 |                 |                 |
    |--- GET order -->|                 |                 |                 |
    |<-- CONFIRMED ---|                 |                 |                 |
Failure Path (Inventory Out of Stock)
Customer          Order Svc        Payment Svc      Inventory Svc     Shipping Svc
    |                 |                 |                 |                 |
    |---placeOrder--->|                 |                 |                 |
    |                 |-- INSERT order  |                 |                 |
    |                 | status=PENDING  |                 |                 |
    |<--202 Accepted--|                 |                 |                 |
    |                 |-- PUBLISH: OrderCreatedEvent      |                 |
    |                 |                 |                 |                 |
    |                 |                 | (receives event)|                 |
    |                 |                 |-- charge card   |                 |
    |                 |                 |-- INSERT payment|                 |
    |                 |                 | status=CHARGED  |                 |
    |                 |                 |-- PUBLISH: PaymentProcessedEvent  |
    |                 |                 |                 |                 |
    |                 |                 |                 | (receives event)|
    |                 |                 |                 |-- check stock: 0|
    |                 |                 |                 |-- ROLLBACK txn  |
    |                 |                 |                 |-- PUBLISH: InventoryReservationFailedEvent
    |                 |                 |                 |                 |
    |                 |                 | (receives event)|                 |
    |                 |                 |-- refund charge |                 |
    |                 |                 |-- UPDATE payment|                 |
    |                 |                 | status=REFUNDED |                 |
    |                 |                 |-- PUBLISH: PaymentRefundedEvent   |
    |                 |                 |                 |                 |
    |                 | (receives event)|                 |                 |
    |                 |-- UPDATE order  |                 |                 |
    |                 | status=CANCELLED|                 |                 |
    |                 |-- PUBLISH: OrderCancelledEvent    |                 |
    |                 |                 |                 |                 |
    |--- GET order -->|                 |                 |                 |
    |<-- CANCELLED ---|                 |                 |                 |
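Because Kafka delivers at least once, the Payment Service may see the same InventoryReservationFailedEvent twice (for example, after a consumer restart before the offset is committed). The sketch below reduces refundPayment to its idempotency check. FakeRefundGateway, RefundHandler and PaymentState are hypothetical names, state lives in a HashMap instead of MySQL, and the "not charged" branch is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum PaymentState { CHARGED, REFUNDED }

// Hypothetical gateway stub that counts how often a refund is issued.
final class FakeRefundGateway {
    int refundCalls = 0;

    String refund(String orderId) {
        refundCalls++;
        return "refund-" + orderId;
    }
}

// Payment Service compensation handler, reduced to the idempotency logic.
final class RefundHandler {
    private final Map<String, PaymentState> payments = new HashMap<>();
    private final List<String> published = new ArrayList<>();
    private final FakeRefundGateway gateway;

    RefundHandler(FakeRefundGateway gateway) {
        this.gateway = gateway;
    }

    void recordCharge(String orderId) {
        payments.put(orderId, PaymentState.CHARGED);
    }

    // Called for every delivery of InventoryReservationFailedEvent, duplicates included.
    void onInventoryReservationFailed(String orderId) {
        PaymentState state = payments.get(orderId);
        if (state == null) {
            throw new IllegalStateException("No payment for order " + orderId);
        }
        if (state == PaymentState.REFUNDED) {
            return; // already compensated: a redelivered event changes nothing
        }
        String refundTxn = gateway.refund(orderId);
        payments.put(orderId, PaymentState.REFUNDED);
        published.add("PaymentRefundedEvent:" + refundTxn);
    }

    PaymentState stateOf(String orderId) {
        return payments.get(orderId);
    }

    List<String> published() {
        return List.copyOf(published);
    }
}
```

In the real service, the gateway call and the status update are not atomic. If the process dies after the refund succeeds but before the transaction commits, the redelivered event reaches the gateway again. Passing a stable reference that the gateway can deduplicate on (if it supports one) is therefore a sensible extra safeguard.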
4. Project Setup and Dependencies
Maven Parent POM
<!-- pom.xml (parent) -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>order-saga-parent</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<properties>
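<java.version>17</java.version>
<!-- Importing the Boot BOM (rather than inheriting spring-boot-starter-parent) does not pass java.version to the compiler, so set it explicitly -->
<maven.compiler.release>${java.version}</maven.compiler.release>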
<spring.boot.version>3.2.3</spring.boot.version>
<spring.cloud.version>2023.0.1</spring.cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Order Service pom.xml
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- MySQL -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- AWS SDK for MSK IAM Auth -->
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>2.1.0</version>
</dependency>
<!-- AWS SDK for SQS (optional, if using SQS) -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<version>2.23.0</version>
</dependency>
<!-- Micrometer for Metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-cloudwatch2</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Jackson for JSON -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
5. Shared Domain Models and Events
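In a real system, the event records and topic constants below live in a shared library module published to your artifact repository. The JPA entities (Order, OrderItem, OrderStatus) belong to the Order Service only.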
Order Domain
// Order.java
package com.example.orderservice.domain;
import jakarta.persistence.*;
import lombok.*;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@Entity
@Table(name = "orders", indexes = {
@Index(name = "idx_orders_customer_id", columnList = "customer_id"),
@Index(name = "idx_orders_status", columnList = "status"),
@Index(name = "idx_orders_created_at", columnList = "created_at")
})
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {
@Id
@Column(name = "order_id", length = 36)
private String orderId;
@Column(name = "customer_id", nullable = false, length = 36)
private String customerId;
@Enumerated(EnumType.STRING)
@Column(name = "status", nullable = false, length = 30)
private OrderStatus status;
@Column(name = "total_amount", nullable = false, precision = 15, scale = 2)
private BigDecimal totalAmount;
@Column(name = "saga_id", length = 36)
private String sagaId;
@OneToMany(mappedBy = "order", cascade = CascadeType.ALL, orphanRemoval = true, fetch = FetchType.LAZY)
@Builder.Default
private List<OrderItem> items = new ArrayList<>();
@Column(name = "failure_reason", length = 500)
private String failureReason;
@Column(name = "created_at", nullable = false, updatable = false)
private Instant createdAt;
@Column(name = "updated_at", nullable = false)
private Instant updatedAt;
@Version
@Column(name = "version")
private Long version; // Optimistic locking
@PrePersist
public void prePersist() {
this.createdAt = Instant.now();
this.updatedAt = Instant.now();
}
@PreUpdate
public void preUpdate() {
this.updatedAt = Instant.now();
}
}
// OrderItem.java
package com.example.orderservice.domain;
import jakarta.persistence.*;
import lombok.*;
import java.math.BigDecimal;
@Entity
@Table(name = "order_items")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class OrderItem {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "order_id", nullable = false)
private Order order;
@Column(name = "product_id", nullable = false, length = 36)
private String productId;
@Column(name = "product_name", nullable = false, length = 255)
private String productName;
@Column(name = "quantity", nullable = false)
private Integer quantity;
@Column(name = "unit_price", nullable = false, precision = 15, scale = 2)
private BigDecimal unitPrice;
}
// OrderStatus.java
package com.example.orderservice.domain;
public enum OrderStatus {
PENDING, // Created, awaiting payment processing
PAYMENT_PENDING, // Saga started, payment being processed
PAYMENT_PROCESSED,// Payment done, awaiting inventory
CONFIRMED, // All saga steps completed successfully
CANCELLING, // Compensation in progress
CANCELLED, // Saga compensated, order cancelled
FAILED // Saga failed with non-compensable error
}
Event Classes (Shared Kafka Events)
Events are records (immutable) that get serialized to JSON and published to Kafka topics.
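OrderService.cancelOrder (section 6) constructs an OrderCancelledEvent, but the event listings below do not define it. The following is a minimal sketch with fields inferred from that constructor call. The of(...) factory mirrors OrderCreatedEvent.of and is an addition. The package declaration and public modifier are omitted so the snippet compiles standalone; in the shared module it would sit in com.example.shared.events next to the other records.

```java
import java.time.Instant;
import java.util.UUID;

// Fields match the constructor call in OrderService.cancelOrder:
// (eventId, sagaId, orderId, customerId, reason, occurredAt).
record OrderCancelledEvent(
        String eventId,
        String sagaId,
        String orderId,
        String customerId,
        String reason,
        Instant occurredAt
) {
    // Convenience factory in the style of OrderCreatedEvent.of (not used by OrderService as written).
    static OrderCancelledEvent of(String sagaId, String orderId, String customerId, String reason) {
        return new OrderCancelledEvent(
                UUID.randomUUID().toString(), sagaId, orderId, customerId, reason, Instant.now());
    }
}
```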
// OrderCreatedEvent.java
package com.example.shared.events;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
public record OrderCreatedEvent(
@JsonProperty("eventId") String eventId,
@JsonProperty("sagaId") String sagaId,
@JsonProperty("orderId") String orderId,
@JsonProperty("customerId") String customerId,
@JsonProperty("items") List<OrderItemDto> items,
@JsonProperty("totalAmount") BigDecimal totalAmount,
@JsonProperty("occurredAt") Instant occurredAt
) {
public static OrderCreatedEvent of(String sagaId, String orderId,
String customerId, List<OrderItemDto> items,
BigDecimal totalAmount) {
return new OrderCreatedEvent(
java.util.UUID.randomUUID().toString(),
sagaId,
orderId,
customerId,
items,
totalAmount,
Instant.now()
);
}
}
// OrderItemDto.java
package com.example.shared.events;
import java.math.BigDecimal;
public record OrderItemDto(
String productId,
String productName,
Integer quantity,
BigDecimal unitPrice
) {}
// PaymentProcessedEvent.java
package com.example.shared.events;
import java.math.BigDecimal;
import java.time.Instant;
public record PaymentProcessedEvent(
String eventId,
String sagaId,
String orderId,
String paymentId,
String customerId,
BigDecimal amount,
String paymentMethod,
String externalTransactionId,
Instant occurredAt
) {}
// PaymentFailedEvent.java
package com.example.shared.events;
import java.time.Instant;
public record PaymentFailedEvent(
String eventId,
String sagaId,
String orderId,
String customerId,
String failureReason,
String failureCode,
Instant occurredAt
) {}
// InventoryReservedEvent.java
package com.example.shared.events;
import java.time.Instant;
import java.util.List;
public record InventoryReservedEvent(
String eventId,
String sagaId,
String orderId,
String reservationId,
List<ReservedItemDto> reservedItems,
Instant occurredAt
) {}
// ReservedItemDto.java
package com.example.shared.events;
public record ReservedItemDto(
String productId,
Integer reservedQuantity,
String warehouseId
) {}
// InventoryReservationFailedEvent.java
package com.example.shared.events;
import java.time.Instant;
import java.util.List;
public record InventoryReservationFailedEvent(
String eventId,
String sagaId,
String orderId,
List<String> outOfStockProductIds,
String failureReason,
Instant occurredAt
) {}
// ShipmentCreatedEvent.java
package com.example.shared.events;
import java.time.Instant;
import java.time.LocalDate;
public record ShipmentCreatedEvent(
String eventId,
String sagaId,
String orderId,
String shipmentId,
String trackingNumber,
LocalDate estimatedDelivery,
Instant occurredAt
) {}
// PaymentRefundedEvent.java - for compensation
package com.example.shared.events;
import java.math.BigDecimal;
import java.time.Instant;
public record PaymentRefundedEvent(
String eventId,
String sagaId,
String orderId,
String paymentId,
BigDecimal refundedAmount,
String refundTransactionId,
Instant occurredAt
) {}
Kafka Topic Constants
// KafkaTopics.java
package com.example.shared.kafka;
public final class KafkaTopics {
private KafkaTopics() {}
// Forward flow topics
public static final String ORDER_CREATED = "order.events.order-created";
public static final String PAYMENT_PROCESSED = "payment.events.payment-processed";
public static final String PAYMENT_FAILED = "payment.events.payment-failed";
public static final String INVENTORY_RESERVED = "inventory.events.inventory-reserved";
public static final String INVENTORY_RESERVATION_FAILED = "inventory.events.reservation-failed";
public static final String SHIPMENT_CREATED = "shipping.events.shipment-created";
public static final String SHIPMENT_FAILED = "shipping.events.shipment-failed";
// Compensation flow topics
public static final String PAYMENT_REFUNDED = "payment.events.payment-refunded";
public static final String INVENTORY_RELEASED = "inventory.events.inventory-released";
public static final String ORDER_CANCELLED = "order.events.order-cancelled";
// Dead letter topics (suffix convention)
public static final String DLT_SUFFIX = ".DLT";
}
6. Order Service - Full Implementation
// OrderService.java
package com.example.orderservice.service;
import com.example.orderservice.domain.*;
import com.example.orderservice.dto.PlaceOrderRequest;
import com.example.orderservice.outbox.OutboxEvent;
import com.example.orderservice.outbox.OutboxEventRepository;
import com.example.orderservice.repository.OrderRepository;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
/**
* Step 1 of saga: Create order and publish OrderCreatedEvent.
* Uses Transactional Outbox pattern to ensure atomic DB write + event publication.
*/
@Transactional
public Order placeOrder(PlaceOrderRequest request) {
String orderId = UUID.randomUUID().toString();
String sagaId = UUID.randomUUID().toString();
// Build order items
List<OrderItem> items = request.items().stream()
.map(itemReq -> OrderItem.builder()
.productId(itemReq.productId())
.productName(itemReq.productName())
.quantity(itemReq.quantity())
.unitPrice(itemReq.unitPrice())
.build())
.collect(Collectors.toList());
// Create order
Order order = Order.builder()
.orderId(orderId)
.customerId(request.customerId())
.status(OrderStatus.PENDING)
.totalAmount(request.totalAmount())
.sagaId(sagaId)
.build();
items.forEach(item -> item.setOrder(order));
order.setItems(items);
orderRepository.save(order);
log.info("Order created: orderId={}, sagaId={}, customerId={}",
orderId, sagaId, request.customerId());
// Build event
List<OrderItemDto> itemDtos = items.stream()
.map(i -> new OrderItemDto(i.getProductId(), i.getProductName(),
i.getQuantity(), i.getUnitPrice()))
.collect(Collectors.toList());
OrderCreatedEvent event = OrderCreatedEvent.of(
sagaId, orderId, request.customerId(), itemDtos, request.totalAmount()
);
// Write to outbox (same transaction - atomic!)
saveToOutbox(KafkaTopics.ORDER_CREATED, orderId, event);
return order;
}
/**
* Compensation: Cancel order after all compensations upstream are done.
* Triggered by PaymentRefundedEvent or PaymentFailedEvent (if payment never processed).
*/
@Transactional
public void cancelOrder(String orderId, String reason) {
Order order = orderRepository.findByOrderIdForUpdate(orderId)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
if (order.getStatus() == OrderStatus.CANCELLED) {
log.info("Order {} already cancelled, skipping (idempotent)", orderId);
return; // Idempotent: already done
}
order.setStatus(OrderStatus.CANCELLED);
order.setFailureReason(reason);
orderRepository.save(order);
log.info("Order cancelled: orderId={}, reason={}", orderId, reason);
// Publish OrderCancelledEvent for downstream consumers
var cancelledEvent = new OrderCancelledEvent(
UUID.randomUUID().toString(),
order.getSagaId(),
orderId,
order.getCustomerId(),
reason,
Instant.now()
);
saveToOutbox(KafkaTopics.ORDER_CANCELLED, orderId, cancelledEvent);
}
/**
* Update order status when shipment is confirmed (saga complete).
*/
@Transactional
public void confirmOrder(String orderId, String shipmentId) {
Order order = orderRepository.findByOrderIdForUpdate(orderId)
.orElseThrow(() -> new OrderNotFoundException("Order not found: " + orderId));
if (order.getStatus() == OrderStatus.CONFIRMED) {
log.info("Order {} already confirmed (idempotent)", orderId);
return;
}
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.save(order);
log.info("Order confirmed: orderId={}, shipmentId={}", orderId, shipmentId);
}
private void saveToOutbox(String topic, String aggregateId, Object event) {
try {
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateId(aggregateId)
.topicName(topic)
.payload(objectMapper.writeValueAsString(event))
.eventType(event.getClass().getSimpleName())
.status(OutboxEvent.OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(outboxEvent);
} catch (Exception e) {
throw new RuntimeException("Failed to write to outbox", e);
}
}
}
// OrderSagaEventHandler.java - handles saga events for the Order Service
package com.example.orderservice.saga;
import com.example.orderservice.service.OrderService;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderSagaEventHandler {
private final OrderService orderService;
/**
* Listen for ShipmentCreatedEvent - saga success, update order to CONFIRMED.
*/
@KafkaListener(
topics = KafkaTopics.SHIPMENT_CREATED,
groupId = "order-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleShipmentCreated(
@Payload ShipmentCreatedEvent event,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
log.info("Received ShipmentCreatedEvent: orderId={}, shipmentId={}, partition={}, offset={}",
event.orderId(), event.shipmentId(), partition, offset);
try {
orderService.confirmOrder(event.orderId(), event.shipmentId());
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to process ShipmentCreatedEvent for orderId={}", event.orderId(), e);
// DO NOT acknowledge - will be retried via retry topic or DLT
throw e;
}
}
/**
* Compensation: Listen for PaymentRefundedEvent - cancel the order.
* This means payment was refunded because of a downstream failure.
*/
@KafkaListener(
topics = KafkaTopics.PAYMENT_REFUNDED,
groupId = "order-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handlePaymentRefunded(
@Payload PaymentRefundedEvent event,
Acknowledgment acknowledgment) {
log.info("Received PaymentRefundedEvent (compensation): orderId={}, sagaId={}",
event.orderId(), event.sagaId());
try {
orderService.cancelOrder(event.orderId(), "Payment refunded due to downstream failure");
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to process compensation for orderId={}", event.orderId(), e);
throw e;
}
}
/**
* Compensation: Listen for PaymentFailedEvent - cancel order directly
* (no payment was made, so no refund needed).
*/
@KafkaListener(
topics = KafkaTopics.PAYMENT_FAILED,
groupId = "order-service-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handlePaymentFailed(
@Payload PaymentFailedEvent event,
Acknowledgment acknowledgment) {
log.info("Received PaymentFailedEvent (direct cancel): orderId={}, reason={}",
event.orderId(), event.failureReason());
try {
orderService.cancelOrder(event.orderId(),
"Payment failed: " + event.failureReason());
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("Failed to handle PaymentFailedEvent for orderId={}", event.orderId(), e);
throw e;
}
}
}
// OrderRepository.java
package com.example.orderservice.repository;
import com.example.orderservice.domain.Order;
import com.example.orderservice.domain.OrderStatus;
import jakarta.persistence.LockModeType;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import java.util.Optional;
public interface OrderRepository extends JpaRepository<Order, String> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT o FROM Order o WHERE o.orderId = :orderId")
Optional<Order> findByOrderIdForUpdate(@Param("orderId") String orderId);
Optional<Order> findBySagaId(String sagaId);
@Query("SELECT o FROM Order o WHERE o.status = :status")
java.util.List<Order> findByStatus(@Param("status") OrderStatus status);
}
// OrderController.java - REST API
package com.example.orderservice.controller;
import com.example.orderservice.domain.Order;
import com.example.orderservice.dto.PlaceOrderRequest;
import com.example.orderservice.dto.OrderResponse;
import com.example.orderservice.service.OrderService;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/orders")
@RequiredArgsConstructor
public class OrderController {
private final OrderService orderService;
@PostMapping
public ResponseEntity<OrderResponse> placeOrder(@Valid @RequestBody PlaceOrderRequest request) {
Order order = orderService.placeOrder(request);
return ResponseEntity.status(HttpStatus.ACCEPTED)
.body(OrderResponse.from(order));
}
@GetMapping("/{orderId}")
public ResponseEntity<OrderResponse> getOrder(@PathVariable String orderId) {
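// Placeholder: a real implementation loads the order (e.g. via OrderRepository) and maps it with OrderResponse.from(order)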
return ResponseEntity.ok(OrderResponse.placeholder(orderId));
}
}
7. Payment Service - Full Implementation
// PaymentService.java
package com.example.paymentservice.service;
import com.example.paymentservice.domain.*;
import com.example.paymentservice.gateway.PaymentGatewayClient;
import com.example.paymentservice.outbox.OutboxEvent;
import com.example.paymentservice.outbox.OutboxEventRepository;
import com.example.paymentservice.repository.PaymentRepository;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentService {
private final PaymentRepository paymentRepository;
private final OutboxEventRepository outboxEventRepository;
private final PaymentGatewayClient gatewayClient;
private final ObjectMapper objectMapper;
/**
* Process payment for order. Triggered by OrderCreatedEvent.
* Idempotent: Checks if payment already processed for this order.
*/
@Transactional
public void processPayment(OrderCreatedEvent event) {
// Idempotency check: has this order already been processed?
if (paymentRepository.existsByOrderId(event.orderId())) {
log.info("Payment already processed for orderId={} (idempotent)", event.orderId());
return;
}
log.info("Processing payment: orderId={}, amount={}", event.orderId(), event.totalAmount());
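// Derive the payment id from the order id so a redelivered OrderCreatedEvent
// (e.g. after a rollback) reuses the same id. This only prevents a double charge
// if the gateway treats the third charge() argument as an idempotency key.
String paymentId = UUID.nameUUIDFromBytes(
("payment:" + event.orderId()).getBytes(java.nio.charset.StandardCharsets.UTF_8)).toString();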
Payment payment = Payment.builder()
.paymentId(paymentId)
.orderId(event.orderId())
.customerId(event.customerId())
.amount(event.totalAmount())
.status(PaymentStatus.PROCESSING)
.sagaId(event.sagaId())
.build();
paymentRepository.save(payment);
try {
// Call external payment gateway
PaymentGatewayResponse gatewayResponse = gatewayClient.charge(
event.customerId(),
event.totalAmount(),
paymentId
);
payment.setStatus(PaymentStatus.CHARGED);
payment.setExternalTransactionId(gatewayResponse.transactionId());
paymentRepository.save(payment);
// Publish success event to outbox
PaymentProcessedEvent successEvent = new PaymentProcessedEvent(
UUID.randomUUID().toString(),
event.sagaId(),
event.orderId(),
paymentId,
event.customerId(),
event.totalAmount(),
"CREDIT_CARD",
gatewayResponse.transactionId(),
Instant.now()
);
saveToOutbox(KafkaTopics.PAYMENT_PROCESSED, event.orderId(), successEvent);
log.info("Payment processed successfully: orderId={}, paymentId={}", event.orderId(), paymentId);
} catch (PaymentDeclinedException e) {
// Payment was declined - saga must compensate
payment.setStatus(PaymentStatus.DECLINED);
payment.setFailureReason(e.getMessage());
paymentRepository.save(payment);
PaymentFailedEvent failureEvent = new PaymentFailedEvent(
UUID.randomUUID().toString(),
event.sagaId(),
event.orderId(),
event.customerId(),
e.getMessage(),
e.getDeclineCode(),
Instant.now()
);
saveToOutbox(KafkaTopics.PAYMENT_FAILED, event.orderId(), failureEvent);
log.warn("Payment declined for orderId={}: {}", event.orderId(), e.getMessage());
}
}
/**
* Compensation: Refund payment. Triggered by InventoryReservationFailedEvent.
* Idempotent: Safe to call multiple times.
*/
@Transactional
public void refundPayment(String orderId, String reason) {
Payment payment = paymentRepository.findByOrderId(orderId)
.orElseThrow(() -> new PaymentNotFoundException("Payment not found for order: " + orderId));
// Idempotency: already refunded?
if (payment.getStatus() == PaymentStatus.REFUNDED) {
log.info("Payment for orderId={} already refunded (idempotent)", orderId);
return;
}
// Only refund if actually charged
if (payment.getStatus() != PaymentStatus.CHARGED) {
log.warn("Cannot refund payment in status {} for orderId={}", payment.getStatus(), orderId);
// Publish refunded event anyway to continue compensation chain
publishRefundEvent(payment, "SKIPPED - not charged");
return;
}
log.info("Refunding payment for orderId={}, amount={}", orderId, payment.getAmount());
try {
// Call gateway refund
String refundTransactionId = gatewayClient.refund(
payment.getExternalTransactionId(),
payment.getAmount()
);
payment.setStatus(PaymentStatus.REFUNDED);
payment.setRefundTransactionId(refundTransactionId);
payment.setRefundReason(reason);
paymentRepository.save(payment);
publishRefundEvent(payment, refundTransactionId);
log.info("Payment refunded: orderId={}, refundTxn={}", orderId, refundTransactionId);
} catch (Exception e) {
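// Refund failed: critical, may need manual intervention.
// Rethrow so the listener's retry/DLT handling takes over; the compensation chain
// does not continue until the refund succeeds or the event is handled from the DLT.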
log.error("CRITICAL: Refund failed for orderId={}, manual intervention required", orderId, e);
// In production: alert ops team, create support ticket
throw new CompensationFailedException("Refund failed for order " + orderId, e);
}
}
private void publishRefundEvent(Payment payment, String refundTxnId) {
PaymentRefundedEvent refundEvent = new PaymentRefundedEvent(
UUID.randomUUID().toString(),
payment.getSagaId(),
payment.getOrderId(),
payment.getPaymentId(),
payment.getAmount(),
refundTxnId,
Instant.now()
);
saveToOutbox(KafkaTopics.PAYMENT_REFUNDED, payment.getOrderId(), refundEvent);
}
private void saveToOutbox(String topic, String aggregateId, Object event) {
try {
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateId(aggregateId)
.topicName(topic)
.payload(objectMapper.writeValueAsString(event))
.eventType(event.getClass().getSimpleName())
.status(OutboxEvent.OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(outboxEvent);
} catch (Exception e) {
throw new RuntimeException("Failed to write to outbox", e);
}
}
}
// PaymentSagaEventHandler.java
package com.example.paymentservice.saga;
import com.example.paymentservice.service.PaymentService;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class PaymentSagaEventHandler {
private final PaymentService paymentService;
/**
* Forward flow: Process payment when order is created.
*/
@KafkaListener(
topics = KafkaTopics.ORDER_CREATED,
groupId = "payment-service-group"
)
public void handleOrderCreated(@Payload OrderCreatedEvent event, Acknowledgment ack) {
log.info("PaymentService received OrderCreatedEvent: orderId={}", event.orderId());
try {
paymentService.processPayment(event);
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to process OrderCreatedEvent: orderId={}", event.orderId(), e);
throw e; // Let retry/DLT handle it
}
}
/**
* Compensation: Refund payment when inventory reservation fails.
*/
@KafkaListener(
topics = KafkaTopics.INVENTORY_RESERVATION_FAILED,
groupId = "payment-service-group"
)
public void handleInventoryReservationFailed(
@Payload InventoryReservationFailedEvent event,
Acknowledgment ack) {
log.info("PaymentService received InventoryReservationFailedEvent: orderId={}", event.orderId());
try {
paymentService.refundPayment(event.orderId(), event.failureReason());
ack.acknowledge();
} catch (Exception e) {
log.error("Compensation failed for orderId={}", event.orderId(), e);
throw e;
}
}
}
8. Inventory Service - Full Implementation
// InventoryService.java
package com.example.inventoryservice.service;
import com.example.inventoryservice.domain.*;
import com.example.inventoryservice.outbox.OutboxEvent;
import com.example.inventoryservice.outbox.OutboxEventRepository;
import com.example.inventoryservice.repository.*;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@Slf4j
public class InventoryService {
private final InventoryItemRepository inventoryRepository;
private final ReservationRepository reservationRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
/**
* Reserve inventory items. Triggered by PaymentProcessedEvent.
* Uses pessimistic locking to prevent concurrent reservation issues.
* Idempotent: checks existing reservation.
*/
@Transactional
public void reserveInventory(PaymentProcessedEvent event) {
// Idempotency check
if (reservationRepository.existsByOrderId(event.orderId())) {
log.info("Inventory already reserved for orderId={} (idempotent)", event.orderId());
return;
}
log.info("Reserving inventory for orderId={}", event.orderId());
List<String> failedProducts = new ArrayList<>();
List<ReservedItemDto> reservedItems = new ArrayList<>();
String reservationId = UUID.randomUUID().toString();
// PaymentProcessedEvent does not carry the order lines here, so they are looked up by orderId.
// In a real system, either enrich the event with the items or read them from a CQRS read model
// (or call Order Service); see getOrderItems below
List<OrderItemDto> itemsToReserve = getOrderItems(event.orderId());
for (OrderItemDto item : itemsToReserve) {
// Pessimistic lock on the inventory row to prevent concurrent access
InventoryItem inventoryItem = inventoryRepository
.findByProductIdWithLock(item.productId())
.orElseThrow(() -> new ProductNotFoundException("Product not found: " + item.productId()));
if (inventoryItem.getAvailableQuantity() < item.quantity()) {
failedProducts.add(item.productId());
log.warn("Insufficient inventory: productId={}, available={}, requested={}",
item.productId(), inventoryItem.getAvailableQuantity(), item.quantity());
} else {
// Reserve the items
inventoryItem.setAvailableQuantity(
inventoryItem.getAvailableQuantity() - item.quantity()
);
inventoryItem.setReservedQuantity(
inventoryItem.getReservedQuantity() + item.quantity()
);
inventoryRepository.save(inventoryItem);
reservedItems.add(new ReservedItemDto(
item.productId(),
item.quantity(),
inventoryItem.getWarehouseId()
));
}
}
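if (!failedProducts.isEmpty()) {
// Rollback any partial reservations we already made
// Note: this path writes no reservation row, so the idempotency check above will not catch
// a redelivered PaymentProcessedEvent; persisting the failed outcome would close that gap
rollbackPartialReservations(reservedItems, reservationId);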
// Publish failure event - triggers compensation
InventoryReservationFailedEvent failureEvent = new InventoryReservationFailedEvent(
UUID.randomUUID().toString(),
event.sagaId(),
event.orderId(),
failedProducts,
"Insufficient stock for products: " + failedProducts,
Instant.now()
);
saveToOutbox(KafkaTopics.INVENTORY_RESERVATION_FAILED, event.orderId(), failureEvent);
log.warn("Inventory reservation failed for orderId={}, failedProducts={}",
event.orderId(), failedProducts);
} else {
// All items reserved successfully
// Save reservation record
Reservation reservation = Reservation.builder()
.reservationId(reservationId)
.orderId(event.orderId())
.sagaId(event.sagaId())
.status(ReservationStatus.RESERVED)
.createdAt(Instant.now())
.build();
reservationRepository.save(reservation);
// Publish success event
InventoryReservedEvent successEvent = new InventoryReservedEvent(
UUID.randomUUID().toString(),
event.sagaId(),
event.orderId(),
reservationId,
reservedItems,
Instant.now()
);
saveToOutbox(KafkaTopics.INVENTORY_RESERVED, event.orderId(), successEvent);
log.info("Inventory reserved successfully: orderId={}, reservationId={}",
event.orderId(), reservationId);
}
}
/**
* Compensation: Release reserved inventory when order is cancelled.
* Idempotent.
*/
@Transactional
public void releaseInventory(String orderId) {
Reservation reservation = reservationRepository.findByOrderId(orderId)
.orElse(null);
if (reservation == null) {
log.info("No reservation found for orderId={}, nothing to release", orderId);
return;
}
if (reservation.getStatus() == ReservationStatus.RELEASED) {
log.info("Reservation for orderId={} already released (idempotent)", orderId);
return;
}
log.info("Releasing inventory reservation: orderId={}, reservationId={}",
orderId, reservation.getReservationId());
// Release each reserved item back to available
List<OrderItemDto> items = getOrderItems(orderId);
for (OrderItemDto item : items) {
inventoryRepository.findByProductIdWithLock(item.productId())
.ifPresent(inventoryItem -> {
inventoryItem.setAvailableQuantity(
inventoryItem.getAvailableQuantity() + item.quantity()
);
inventoryItem.setReservedQuantity(
Math.max(0, inventoryItem.getReservedQuantity() - item.quantity())
);
inventoryRepository.save(inventoryItem);
});
}
reservation.setStatus(ReservationStatus.RELEASED);
reservationRepository.save(reservation);
log.info("Inventory released for orderId={}", orderId);
}
private void rollbackPartialReservations(List<ReservedItemDto> reservedItems, String reservationId) {
for (ReservedItemDto item : reservedItems) {
inventoryRepository.findByProductIdWithLock(item.productId())
.ifPresent(inventoryItem -> {
inventoryItem.setAvailableQuantity(
inventoryItem.getAvailableQuantity() + item.reservedQuantity()
);
inventoryItem.setReservedQuantity(
Math.max(0, inventoryItem.getReservedQuantity() - item.reservedQuantity())
);
inventoryRepository.save(inventoryItem);
});
}
log.info("Partial reservations rolled back for reservationId={}", reservationId);
}
private List<OrderItemDto> getOrderItems(String orderId) {
// Placeholder: a real system would call Order Service via REST or read a CQRS read model.
// As written it returns no items, so reserveInventory reserves nothing and always succeeds,
// and releaseInventory releases nothing
return List.of();
}
private void saveToOutbox(String topic, String aggregateId, Object event) {
try {
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateId(aggregateId)
.topicName(topic)
.payload(objectMapper.writeValueAsString(event))
.eventType(event.getClass().getSimpleName())
.status(OutboxEvent.OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(outboxEvent);
} catch (Exception e) {
throw new RuntimeException("Failed to write to outbox", e);
}
}
}
// InventorySagaEventHandler.java
package com.example.inventoryservice.saga;
import com.example.inventoryservice.service.InventoryService;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class InventorySagaEventHandler {
private final InventoryService inventoryService;
@KafkaListener(
topics = KafkaTopics.PAYMENT_PROCESSED,
groupId = "inventory-service-group"
)
public void handlePaymentProcessed(@Payload PaymentProcessedEvent event, Acknowledgment ack) {
log.info("InventoryService received PaymentProcessedEvent: orderId={}", event.orderId());
try {
inventoryService.reserveInventory(event);
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to reserve inventory for orderId={}", event.orderId(), e);
throw e;
}
}
@KafkaListener(
topics = KafkaTopics.ORDER_CANCELLED,
groupId = "inventory-service-group"
)
public void handleOrderCancelled(@Payload OrderCancelledEvent event, Acknowledgment ack) {
log.info("InventoryService received OrderCancelledEvent: orderId={}", event.orderId());
try {
inventoryService.releaseInventory(event.orderId());
ack.acknowledge();
} catch (Exception e) {
log.error("Failed to release inventory for orderId={}", event.orderId(), e);
throw e;
}
}
}
9. Shipping Service - Full Implementation
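The shipment step comes after the pivot, so it is retried forward rather than compensated. This assumes Spring Retry's exponential policy without jitter. Under that policy, `@Backoff(delay = 1000, multiplier = 2, maxDelay = 30000)` with `maxAttempts = 5` sleeps 1s, 2s, 4s and 8s between attempts, about 15s in total and never reaching the 30s cap.

The sketch below reproduces that schedule and the rule that only transient carrier errors are retried, in plain Java. `TransientCarrierException` and `ForwardRetry` are hypothetical names. The sleep is injected so the logic can be checked without waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** Hypothetical exception standing in for a transient carrier-API failure. */
class TransientCarrierException extends RuntimeException {
    TransientCarrierException(String message) { super(message); }
}

final class ForwardRetry {
    private ForwardRetry() {}

    /** Sleep intervals (ms) between attempts for a plain exponential back-off without jitter. */
    static List<Long> backoffSchedule(int maxAttempts, long initialMs, double multiplier, long maxMs) {
        List<Long> delays = new ArrayList<>();
        double next = initialMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) { // no sleep after the last attempt
            delays.add((long) Math.min(next, maxMs));
            next *= multiplier;
        }
        return delays;
    }

    /**
     * Runs the action, retrying only TransientCarrierException. Any other exception
     * propagates at once; when the attempts run out, the last failure is rethrown.
     */
    static <T> T callWithRetry(Supplier<T> action, List<Long> delays, LongConsumer sleeper) {
        for (int attempt = 0; ; attempt++) {
            try {
                return action.get();
            } catch (TransientCarrierException e) {
                if (attempt >= delays.size()) {
                    throw e; // exhausted: the listener's error handler / DLT takes over
                }
                sleeper.accept(delays.get(attempt));
            }
        }
    }
}
```

Rethrowing after the last attempt matters here: a shipment that cannot be booked must surface for operators (via the DLT) instead of being silently compensated.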
// ShippingService.java
package com.example.shippingservice.service;
import com.example.shippingservice.domain.*;
import com.example.shippingservice.client.ShippingCarrierClient;
import com.example.shippingservice.outbox.OutboxEvent;
import com.example.shippingservice.outbox.OutboxEventRepository;
import com.example.shippingservice.repository.ShipmentRepository;
import com.example.shared.events.*;
import com.example.shared.kafka.KafkaTopics;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.time.LocalDate;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@Slf4j
public class ShippingService {
private final ShipmentRepository shipmentRepository;
private final OutboxEventRepository outboxEventRepository;
private final ShippingCarrierClient carrierClient;
private final ObjectMapper objectMapper;
/**
* Schedule shipment. This is a RETRIABLE transaction (after pivot).
* If this fails, we retry - we do NOT compensate backward.
* The payment is taken, inventory is reserved. We MUST ship.
*/
@Transactional
@Retryable(
retryFor = {ShipmentCarrierException.class},
maxAttempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 30000)
)
public void createShipment(InventoryReservedEvent event) {
// Idempotency check
if (shipmentRepository.existsByOrderId(event.orderId())) {
log.info("Shipment already created for orderId={} (idempotent)", event.orderId());
return;
}
log.info("Creating shipment for orderId={}", event.orderId());
String shipmentId = UUID.randomUUID().toString();
Shipment shipment = Shipment.builder()
.shipmentId(shipmentId)
.orderId(event.orderId())
.sagaId(event.sagaId())
.status(ShipmentStatus.PENDING)
.reservationId(event.reservationId())
.createdAt(Instant.now())
.build();
shipmentRepository.save(shipment);
// Call carrier API (retriable)
CarrierBookingResponse booking = carrierClient.bookPickup(
event.orderId(),
event.reservedItems()
);
shipment.setTrackingNumber(booking.trackingNumber());
shipment.setCarrierCode(booking.carrierCode());
shipment.setEstimatedDelivery(booking.estimatedDelivery());
shipment.setStatus(ShipmentStatus.SCHEDULED);
shipmentRepository.save(shipment);
// Publish success event - this triggers order confirmation
ShipmentCreatedEvent successEvent = new ShipmentCreatedEvent(
UUID.randomUUID().toString(),
event.sagaId(),
event.orderId(),
shipmentId,
booking.trackingNumber(),
booking.estimatedDelivery(),
Instant.now()
);
saveToOutbox(KafkaTopics.SHIPMENT_CREATED, event.orderId(), successEvent);
log.info("Shipment created: orderId={}, trackingNumber={}", event.orderId(), booking.trackingNumber());
}
private void saveToOutbox(String topic, String aggregateId, Object event) {
try {
OutboxEvent outboxEvent = OutboxEvent.builder()
.eventId(UUID.randomUUID().toString())
.aggregateId(aggregateId)
.topicName(topic)
.payload(objectMapper.writeValueAsString(event))
.eventType(event.getClass().getSimpleName())
.status(OutboxEvent.OutboxStatus.PENDING)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(outboxEvent);
} catch (Exception e) {
throw new RuntimeException("Failed to write to outbox", e);
}
}
}
10. Kafka Configuration (AWS MSK)
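Ordering in this design comes from the record key. The outbox publisher sends each event with the aggregate id as its key, and Kafka guarantees order only within a partition. The sketch below models how a keyed record is mapped to a partition, following the murmur2-based scheme of Kafka's default partitioner. It is illustrative only and not a replacement for the client library's partitioner; `KeyPartitioning` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative key-to-partition mapping modeled on Kafka's default partitioner for keyed records. */
final class KeyPartitioning {
    private KeyPartitioning() {}

    /** 32-bit murmur2 hash of the key bytes. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // cases fall through on purpose, as in the reference algorithm
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Same key, same partition count: always the same partition. */
    static int partitionFor(String key, int numPartitions) {
        int hash = murmur2(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions; // clear the sign bit, then take the modulus
    }
}
```

Because the result depends on the partition count, per-key ordering only holds while that count stays fixed. Adding partitions to a live topic remaps existing keys.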
// KafkaConfig.java
package com.example.orderservice.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.util.backoff.FixedBackOff;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
// ==================== PRODUCER CONFIGURATION ====================
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// Reliability settings (production)
config.put(ProducerConfig.ACKS_CONFIG, "all"); // All ISR must acknowledge
config.put(ProducerConfig.RETRIES_CONFIG, 3);
config.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // Strictest ordering; with idempotence enabled, values up to 5 also preserve ordering
// Idempotent producer (prevents duplicate messages on retry)
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Performance settings
config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
config.put(ProducerConfig.LINGER_MS_CONFIG, 5);
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// AWS MSK IAM Authentication
if (bootstrapServers.contains("amazonaws.com")) {
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "AWS_MSK_IAM");
config.put("sasl.jaas.config",
"software.amazon.msk.auth.iam.IAMLoginModule required;");
config.put("sasl.client.callback.handler.class",
"software.amazon.msk.auth.iam.IAMClientCallbackHandler");
}
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// ==================== CONSUMER CONFIGURATION ====================
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// Manual offset commit for reliability
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// Fetch settings
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 5 minutes max processing
// Trusted packages for deserialization
config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.*");
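config.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
// With type headers ignored, every value is deserialized as a LinkedHashMap. Listener methods that
// declare typed payloads (e.g. OrderCreatedEvent) then need a converter that targets the parameter
// type, for example a StringDeserializer combined with a StringJsonMessageConverter on the factory
config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "java.util.Map");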
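// Used only when the group has no committed offset (a brand-new group or expired offsets):
// start from the earliest available record. Existing groups resume from their committed offsets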
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// AWS MSK IAM Authentication
if (bootstrapServers.contains("amazonaws.com")) {
config.put("security.protocol", "SASL_SSL");
config.put("sasl.mechanism", "AWS_MSK_IAM");
config.put("sasl.jaas.config",
"software.amazon.msk.auth.iam.IAMLoginModule required;");
config.put("sasl.client.callback.handler.class",
"software.amazon.msk.auth.iam.IAMClientCallbackHandler");
}
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Manual acknowledgment mode - we control when offset is committed
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// Concurrency: consumer threads per listener; values above the topic's partition count leave the extra consumers idle
factory.setConcurrency(3);
// Error handler with retry + Dead Letter Topic
factory.setCommonErrorHandler(errorHandler());
return factory;
}
@Bean
public DefaultErrorHandler errorHandler() {
// 1 initial attempt + up to 3 retries, 1 second apart; then publish the record to the DLT
FixedBackOff backOff = new FixedBackOff(1000L, 3L);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
deadLetterPublishingRecoverer(),
backOff
);
// Do not retry on these exceptions (business logic errors, not transient)
errorHandler.addNotRetryableExceptions(
IllegalArgumentException.class,
IllegalStateException.class
);
return errorHandler;
}
@Bean
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
// Default destination: {topic}.DLT, same partition number as the failed record
return new DeadLetterPublishingRecoverer(kafkaTemplate());
}
}
11. MySQL Schema Design
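The `orders`, `payments` and `inventory_items` tables below carry a `version BIGINT NOT NULL DEFAULT 0` column. With JPA's `@Version`, Hibernate adds `AND version = ?` to each UPDATE and increments the column. A write based on a stale read therefore matches zero rows and fails with an optimistic-locking error, instead of silently overwriting a concurrent change. The in-memory sketch below models only that check. `OptimisticTable` and `VersionedRow` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable snapshot of a row that has a version column. */
final class VersionedRow {
    final String status;
    final long version;

    VersionedRow(String status, long version) {
        this.status = status;
        this.version = version;
    }
}

/**
 * Models: UPDATE orders SET status = ?, version = version + 1
 *         WHERE order_id = ? AND version = ?
 */
final class OptimisticTable {
    private final Map<String, VersionedRow> rows = new HashMap<>();

    synchronized void insert(String id, String status) { rows.put(id, new VersionedRow(status, 0L)); }

    synchronized VersionedRow read(String id) { return rows.get(id); }

    /** Returns the number of rows updated: 1 on success, 0 when the row changed since it was read. */
    synchronized int update(String id, String newStatus, long expectedVersion) {
        VersionedRow current = rows.get(id);
        if (current == null || current.version != expectedVersion) {
            return 0; // stale read: the caller must re-read and decide again
        }
        rows.put(id, new VersionedRow(newStatus, current.version + 1));
        return 1;
    }
}
```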
-- ============================================================
-- ORDER SERVICE DATABASE
-- ============================================================
CREATE TABLE orders (
order_id VARCHAR(36) NOT NULL,
customer_id VARCHAR(36) NOT NULL,
status VARCHAR(30) NOT NULL DEFAULT 'PENDING',
total_amount DECIMAL(15, 2) NOT NULL,
saga_id VARCHAR(36),
failure_reason VARCHAR(500),
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
version BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (order_id),
INDEX idx_orders_customer_id (customer_id),
INDEX idx_orders_status (status),
INDEX idx_orders_saga_id (saga_id),
INDEX idx_orders_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE order_items (
id BIGINT NOT NULL AUTO_INCREMENT,
order_id VARCHAR(36) NOT NULL,
product_id VARCHAR(36) NOT NULL,
product_name VARCHAR(255) NOT NULL,
quantity INT NOT NULL,
unit_price DECIMAL(15, 2) NOT NULL,
PRIMARY KEY (id),
INDEX idx_order_items_order_id (order_id),
FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- Transactional Outbox Table (the same DDL goes into every service's own database)
CREATE TABLE outbox_events (
event_id VARCHAR(36) NOT NULL,
aggregate_id VARCHAR(36) NOT NULL, -- e.g., order_id
topic_name VARCHAR(255) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSON NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING, PUBLISHED, FAILED
retry_count INT NOT NULL DEFAULT 0,
error_message VARCHAR(1000),
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
published_at DATETIME(6),
PRIMARY KEY (event_id),
INDEX idx_outbox_status (status, created_at),
INDEX idx_outbox_aggregate (aggregate_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- ============================================================
-- PAYMENT SERVICE DATABASE
-- ============================================================
CREATE TABLE payments (
payment_id VARCHAR(36) NOT NULL,
order_id VARCHAR(36) NOT NULL, -- one payment per order (enforced by idx_payments_order_id)
customer_id VARCHAR(36) NOT NULL,
saga_id VARCHAR(36),
amount DECIMAL(15, 2) NOT NULL,
status VARCHAR(30) NOT NULL DEFAULT 'PROCESSING',
payment_method VARCHAR(50),
external_transaction_id VARCHAR(100),
refund_transaction_id VARCHAR(100),
refund_reason VARCHAR(500),
failure_reason VARCHAR(500),
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
version BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (payment_id),
UNIQUE INDEX idx_payments_order_id (order_id),
INDEX idx_payments_customer_id (customer_id),
INDEX idx_payments_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- ============================================================
-- INVENTORY SERVICE DATABASE
-- ============================================================
CREATE TABLE inventory_items (
product_id VARCHAR(36) NOT NULL,
product_name VARCHAR(255) NOT NULL,
available_quantity INT NOT NULL DEFAULT 0,
reserved_quantity INT NOT NULL DEFAULT 0,
warehouse_id VARCHAR(36) NOT NULL,
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
version BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (product_id),
INDEX idx_inventory_warehouse (warehouse_id),
CONSTRAINT chk_inventory_non_negative CHECK (available_quantity >= 0 AND reserved_quantity >= 0)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE reservations (
reservation_id VARCHAR(36) NOT NULL,
order_id VARCHAR(36) NOT NULL, -- one reservation per order (enforced by idx_reservations_order_id)
saga_id VARCHAR(36),
status VARCHAR(20) NOT NULL DEFAULT 'RESERVED', -- RESERVED, RELEASED, FULFILLED
created_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
PRIMARY KEY (reservation_id),
UNIQUE INDEX idx_reservations_order_id (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
12. Complete application.yml
# application.yml for Order Service
spring:
  application:
    name: order-service
  datasource:
    # verifyServerCertificate=false encrypts traffic but skips certificate checks;
    # in production prefer sslMode=VERIFY_CA with the database CA in a trust store
    url: jdbc:mysql://${DB_HOST:localhost}:3306/orders_db?useSSL=true&requireSSL=true&verifyServerCertificate=false&serverTimezone=UTC&createDatabaseIfNotExist=true
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: OrderServiceHikariPool
      minimum-idle: 5
      maximum-pool-size: 20
      idle-timeout: 600000 # 10 minutes
      max-lifetime: 1800000 # 30 minutes
      connection-timeout: 30000
      validation-timeout: 5000
      connection-test-query: SELECT 1
  jpa:
    hibernate:
      ddl-auto: validate # Flyway owns the schema; Hibernate only validates it (never create/update)
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        format_sql: true
        default_batch_fetch_size: 20
        jdbc:
          batch_size: 20
        order_inserts: true
        order_updates: true
  # KafkaConfig declares its own factories, so Boot's Kafka auto-configuration backs off;
  # of this block, only bootstrap-servers and consumer.group-id (read via @Value) take effect
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    consumer:
      group-id: order-service-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        max.poll.interval.ms: 300000
    producer:
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 1
  # Retry configuration for Spring Retry (kept under the single top-level "spring:" key;
  # a second "spring:" key would be a duplicate YAML key)
  retry:
    enabled: true

# Outbox publisher scheduler (custom keys; OutboxPublisher reads outbox.publisher.cron)
outbox:
  publisher:
    enabled: true
    cron: "*/5 * * * * *" # Every 5 seconds (Spring's six-field cron, seconds first)
    batch-size: 50
    max-retries: 3

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  endpoint:
    health:
      show-details: always
  metrics:
    tags:
      application: order-service
      environment: ${ENVIRONMENT:dev}

logging:
  level:
    com.example: INFO
    org.springframework.kafka: WARN
    org.hibernate.SQL: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level [%X{traceId},%X{spanId}] %logger{36} - %msg%n"
13. Handling the Outbox Pattern in Choreography
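The Outbox pattern is ESSENTIAL in choreography. Without it, you have a dual-write problem:
writing to the database and publishing the event are TWO separate operations that can fail independently.
A crash between them leaves either a committed change with no event (the saga silently stalls) or an event for a change that was rolled back.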
See Part 4 - Deep Dive Implementation for the
complete Outbox implementation. Here is the key concept:
// OutboxPublisher.java - scheduled job that reads outbox and publishes to Kafka
@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxPublisher {
private final OutboxEventRepository outboxEventRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Scheduled(cron = "${outbox.publisher.cron:*/5 * * * * *}")
@Transactional
public void publishPendingEvents() {
List<OutboxEvent> pendingEvents = outboxEventRepository
.findTop50ByStatusOrderByCreatedAtAsc(OutboxEvent.OutboxStatus.PENDING);
for (OutboxEvent outboxEvent : pendingEvents) {
try {
Object payload = objectMapper.readValue(outboxEvent.getPayload(), Object.class);
kafkaTemplate.send(
outboxEvent.getTopicName(),
outboxEvent.getAggregateId(), // Use aggregateId as Kafka key for ordering
payload
).get(5, TimeUnit.SECONDS); // Sync send with timeout for reliability
outboxEvent.setStatus(OutboxEvent.OutboxStatus.PUBLISHED);
outboxEvent.setPublishedAt(Instant.now());
outboxEventRepository.save(outboxEvent);
} catch (Exception e) {
log.error("Failed to publish outbox event: eventId={}, topic={}",
outboxEvent.getEventId(), outboxEvent.getTopicName(), e);
outboxEvent.setRetryCount(outboxEvent.getRetryCount() + 1);
outboxEvent.setErrorMessage(e.getMessage());
if (outboxEvent.getRetryCount() >= 3) {
outboxEvent.setStatus(OutboxEvent.OutboxStatus.FAILED);
// Alert: send notification to ops team
}
outboxEventRepository.save(outboxEvent);
}
}
}
}
14. Error Handling and Retry Strategies
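The listener error handler in section 10 combines `FixedBackOff(1000L, 3L)` with `DeadLetterPublishingRecoverer`. Each record gets one delivery plus up to three retries before it is published to `<topic>.DLT`. Exception types registered as not retryable skip the retries and go straight to the DLT.

The sketch below models only that decision logic in plain Java, with no broker and no sleeping. `RetryThenDeadLetter` and `DeadLetter` are hypothetical names, and the topic name in the usage is illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical stand-in for a record published to a dead-letter topic. */
final class DeadLetter {
    final String topic;
    final String payload;
    final String error;

    DeadLetter(String topic, String payload, String error) {
        this.topic = topic;
        this.payload = payload;
        this.error = error;
    }
}

/** Decision logic of "retry N times, then dead-letter"; no real back-off or broker. */
final class RetryThenDeadLetter {
    private final int maxRetries;
    private final Set<Class<? extends Throwable>> notRetryable;
    final List<DeadLetter> deadLetters = new ArrayList<>();

    RetryThenDeadLetter(int maxRetries, Set<Class<? extends Throwable>> notRetryable) {
        this.maxRetries = maxRetries;
        this.notRetryable = notRetryable;
    }

    private boolean isRetryable(Throwable t) {
        return notRetryable.stream().noneMatch(type -> type.isInstance(t));
    }

    /** Delivers one record to the listener and returns how many attempts were made. */
    int deliver(String topic, String payload, Consumer<String> listener) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(payload);
                return attempts; // success: the offset would be committed here
            } catch (RuntimeException e) {
                boolean retriesLeft = attempts <= maxRetries; // attempts include the first delivery
                if (!isRetryable(e) || !retriesLeft) {
                    deadLetters.add(new DeadLetter(topic + ".DLT", payload, e.getMessage()));
                    return attempts;
                }
                // A real error handler would wait for the back-off interval before redelivering
            }
        }
    }
}
```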
// RetryConfig.java
@Configuration
@EnableRetry
public class RetryConfig {
@Bean
public RetryTemplate retryTemplate() {
return RetryTemplate.builder()
.maxAttempts(3)
.exponentialBackoff(1000, 2, 10000) // Waits 1s, then 2s between the 3 attempts (capped at 10s)
// The builder accepts either retryOn(...) or notRetryOn(...), not both:
// retry everything except the business errors listed here
.notRetryOn(IllegalArgumentException.class)
.notRetryOn(PaymentDeclinedException.class) // Business error, no retry
.build();
}
}
Dead Letter Topic Consumer
// DeadLetterTopicConsumer.java
@Component
@RequiredArgsConstructor
@Slf4j
public class DeadLetterTopicConsumer {
private final AlertService alertService;
private final ManualInterventionRepository interventionRepo;
@KafkaListener(
topicPattern = ".*\\.DLT", // Escaped dot: only topics ending in ".DLT"
groupId = "dlt-consumer-group"
)
public void handleDeadLetterEvent(
@Payload String rawMessage,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
// DeadLetterPublishingRecoverer stores the failure details under the DLT_* header names
@Header(value = KafkaHeaders.DLT_EXCEPTION_MESSAGE, required = false) String exceptionMessage,
@Header(value = KafkaHeaders.DLT_EXCEPTION_STACKTRACE, required = false) String stackTrace,
Acknowledgment ack) {
log.error("Message in DLT: topic={}, error={}", topic, exceptionMessage);
// Save for manual review
ManualInterventionRecord record = ManualInterventionRecord.builder()
.topic(topic)
.rawMessage(rawMessage)
.errorMessage(exceptionMessage)
.stackTrace(stackTrace == null ? null : stackTrace.substring(0, Math.min(stackTrace.length(), 2000)))
.createdAt(Instant.now())
.status("PENDING_REVIEW")
.build();
interventionRepo.save(record);
// Alert the operations team
alertService.sendAlert(
"Dead Letter Topic Message",
String.format("Topic: %s, Error: %s", topic, exceptionMessage)
);
ack.acknowledge();
}
}
15. Testing Choreography SAGAs
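Because no coordinator owns the flow, the wiring of who reacts to which event is easy to get wrong. One cheap check, before reaching for containers, is to model the event chain with a synchronous in-memory bus. The sketch below is such a model.

The topic strings are hypothetical, since the values of the `KafkaTopics` constants are not shown here. `InMemoryBus` and `ChoreographyHarness` are illustrative names. Each service's reaction is reduced to a status update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Minimal synchronous pub/sub: every subscriber of a topic sees each published order id. */
final class InMemoryBus {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();
    final List<String> published = new ArrayList<>(); // "topic:orderId" log for assertions

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    void publish(String topic, String orderId) {
        published.add(topic + ":" + orderId);
        subscribers.getOrDefault(topic, List.of()).forEach(h -> h.accept(orderId));
    }
}

/** Wires hypothetical order, payment, inventory and shipping reactions onto the bus. */
final class ChoreographyHarness {
    final InMemoryBus bus = new InMemoryBus();
    final Map<String, String> orderStatus = new HashMap<>();
    final Map<String, String> paymentStatus = new HashMap<>();

    ChoreographyHarness(boolean inventoryAvailable) {
        // Payment Service: charge on order creation, refund when inventory fails
        bus.subscribe("order.created", id -> {
            paymentStatus.put(id, "CHARGED");
            bus.publish("payment.processed", id);
        });
        bus.subscribe("inventory.reservation-failed", id -> paymentStatus.put(id, "REFUNDED"));
        // Inventory Service: reserve (or fail) once payment is processed
        bus.subscribe("payment.processed", id ->
                bus.publish(inventoryAvailable ? "inventory.reserved" : "inventory.reservation-failed", id));
        // Shipping Service: ship once inventory is reserved
        bus.subscribe("inventory.reserved", id -> bus.publish("shipment.created", id));
        // Order Service: react to the terminal events
        bus.subscribe("shipment.created", id -> orderStatus.put(id, "CONFIRMED"));
        bus.subscribe("inventory.reservation-failed", id -> orderStatus.put(id, "CANCELLED"));
    }

    void placeOrder(String orderId) {
        orderStatus.put(orderId, "PENDING");
        bus.publish("order.created", orderId);
    }
}
```

This model catches missing or misrouted subscriptions. It says nothing about serialization, offsets or redelivery, which is what the Testcontainers test below is for.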
// OrderSagaChoreographyIntegrationTest.java
@SpringBootTest
@Testcontainers
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class OrderSagaChoreographyIntegrationTest {
@Container
static MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0")
.withDatabaseName("test_orders")
.withUsername("test")
.withPassword("test");
@Container
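static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
// Point the Spring context at the containers instead of the localhost defaults in application.yml
@DynamicPropertySource
static void containerProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", mysql::getJdbcUrl);
registry.add("spring.datasource.username", mysql::getUsername);
registry.add("spring.datasource.password", mysql::getPassword);
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}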
@Autowired
private OrderService orderService;
@Autowired
private OrderRepository orderRepository;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Test
@DisplayName("Happy path: Complete saga from order creation to confirmation")
void testHappyPath() throws InterruptedException {
// Given
PlaceOrderRequest request = buildTestOrderRequest();
// When
Order order = orderService.placeOrder(request);
// Then: simulate saga steps by publishing events
PaymentProcessedEvent paymentEvent = buildPaymentProcessedEvent(order);
kafkaTemplate.send(KafkaTopics.PAYMENT_PROCESSED, order.getOrderId(), paymentEvent);
InventoryReservedEvent inventoryEvent = buildInventoryReservedEvent(order);
kafkaTemplate.send(KafkaTopics.INVENTORY_RESERVED, order.getOrderId(), inventoryEvent);
ShipmentCreatedEvent shipmentEvent = buildShipmentCreatedEvent(order);
kafkaTemplate.send(KafkaTopics.SHIPMENT_CREATED, order.getOrderId(), shipmentEvent);
// Wait for async processing
await().atMost(30, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> {
Order updated = orderRepository.findById(order.getOrderId()).orElseThrow();
return updated.getStatus() == OrderStatus.CONFIRMED;
});
// Verify
Order finalOrder = orderRepository.findById(order.getOrderId()).orElseThrow();
assertThat(finalOrder.getStatus()).isEqualTo(OrderStatus.CONFIRMED);
}
@Test
@DisplayName("Compensation path: Order cancelled when inventory fails")
void testInventoryFailureCompensation() throws InterruptedException {
// Given
PlaceOrderRequest request = buildTestOrderRequest();
Order order = orderService.placeOrder(request);
// When: Payment succeeds but Inventory fails
PaymentProcessedEvent paymentEvent = buildPaymentProcessedEvent(order);
kafkaTemplate.send(KafkaTopics.PAYMENT_PROCESSED, order.getOrderId(), paymentEvent);
InventoryReservationFailedEvent failureEvent = new InventoryReservationFailedEvent(
UUID.randomUUID().toString(),
order.getSagaId(),
order.getOrderId(),
List.of("PROD-999"),
"Out of stock",
Instant.now()
);
kafkaTemplate.send(KafkaTopics.INVENTORY_RESERVATION_FAILED, order.getOrderId(), failureEvent);
// Then: Order Service cancels the order (the refund happens in Payment Service, which is not
// part of this test's context, so it is not asserted here)
await().atMost(30, TimeUnit.SECONDS)
.until(() -> {
Order updated = orderRepository.findById(order.getOrderId()).orElseThrow();
return updated.getStatus() == OrderStatus.CANCELLED;
});
Order cancelledOrder = orderRepository.findById(order.getOrderId()).orElseThrow();
assertThat(cancelledOrder.getStatus()).isEqualTo(OrderStatus.CANCELLED);
assertThat(cancelledOrder.getFailureReason()).contains("downstream failure");
}
}
16. AWS Integration: SNS + SQS Pattern
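For cross-region or cross-account saga communication, SNS + SQS fan-out is often simpler than Kafka. An SQS queue can subscribe to an SNS topic in another account or region. Each subscriber also gets its own queue with its own retries and dead-letter queue.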
              SNS Topic (order.events)
                         |
          +--------------+--------------+
          |              |              |
      SQS Queue      SQS Queue      SQS Queue
    (payment-svc) (inventory-svc)  (analytics)
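Each subscriber queue receives its own copy of every SNS message. SQS redelivers a message that was received but not deleted once its visibility timeout expires. With a redrive policy, a message received more than `maxReceiveCount` times is moved to the queue's dead-letter queue.

The sketch below models that behavior without clocks or AWS SDK calls. `TopicModel` and `QueueModel` are hypothetical names, and "visible again" simply means "returned on the next poll".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Simplified SQS queue: no timers; an undeleted message is simply visible on the next poll. */
final class QueueModel {
    private static final class Entry {
        final String body;
        int receiveCount;
        Entry(String body) { this.body = body; }
    }

    final String name;
    private final int maxReceiveCount;
    private final Deque<Entry> visible = new ArrayDeque<>();
    final List<String> deadLetterQueue = new ArrayList<>();

    QueueModel(String name, int maxReceiveCount) {
        this.name = name;
        this.maxReceiveCount = maxReceiveCount;
    }

    void send(String body) { visible.addLast(new Entry(body)); }

    int depth() { return visible.size(); }

    /**
     * One poll over the currently visible messages. A handler returning true plays the
     * role of DeleteMessage; false leaves the message for redelivery. A message that would
     * exceed maxReceiveCount is moved to the dead-letter queue instead of being delivered.
     */
    void poll(Predicate<String> handler) {
        int n = visible.size();
        for (int i = 0; i < n; i++) {
            Entry e = visible.pollFirst();
            if (e.receiveCount >= maxReceiveCount) {
                deadLetterQueue.add(e.body);
                continue;
            }
            e.receiveCount++;
            if (!handler.test(e.body)) {
                visible.addLast(e); // not deleted: comes back after the visibility timeout
            }
        }
    }
}

/** SNS-style fan-out: every subscribed queue gets its own copy of each message. */
final class TopicModel {
    private final List<QueueModel> subscriptions = new ArrayList<>();

    void subscribe(QueueModel queue) { subscriptions.add(queue); }

    void publish(String body) { subscriptions.forEach(q -> q.send(body)); }
}
```

Because every queue holds its own copy, a failing payment consumer never blocks analytics. The flip side is that each consumer must be idempotent, since redelivery is the normal failure path.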
// AwsSnsEventPublisher.java
@Component
@RequiredArgsConstructor
@Slf4j
public class AwsSnsEventPublisher {
private final SnsClient snsClient;
private final ObjectMapper objectMapper;
@Value("${aws.sns.order-events-topic-arn}")
private String orderEventsTopicArn;
public void publishOrderCreatedEvent(OrderCreatedEvent event) {
try {
String messageBody = objectMapper.writeValueAsString(event);
PublishRequest request = PublishRequest.builder()
.topicArn(orderEventsTopicArn)
.message(messageBody)
.subject("OrderCreatedEvent")
.messageAttributes(Map.of(
"eventType", MessageAttributeValue.builder()
.dataType("String")
.stringValue("OrderCreatedEvent")
.build(),
"sagaId", MessageAttributeValue.builder()
.dataType("String")
.stringValue(event.sagaId())
.build()
))
.build();
PublishResponse response = snsClient.publish(request);
log.info("Published OrderCreatedEvent to SNS: messageId={}, orderId={}",
response.messageId(), event.orderId());
} catch (Exception e) {
log.error("Failed to publish to SNS: orderId={}", event.orderId(), e);
throw new EventPublicationException("SNS publish failed", e);
}
}
}
// AwsSqsEventConsumer.java
@Component
@RequiredArgsConstructor
@Slf4j
public class AwsSqsEventConsumer {
private final SqsClient sqsClient;
private final ObjectMapper objectMapper;
private final PaymentService paymentService;
@Value("${aws.sqs.payment-service-queue-url}")
private String queueUrl;
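// Note: with Spring Boot's default single-thread scheduler, a 20-second long poll here
// also delays other @Scheduled jobs (such as an outbox publisher) in the same service
@Scheduled(fixedDelay = 1000)
public void pollMessages() {
ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(20) // Long polling
.visibilityTimeout(30) // Must cover processing of the whole batch (up to 10 messages, handled one by one)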
.messageAttributeNames("All")
.build();
ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);
for (Message message : response.messages()) {
try {
processMessage(message);
deleteMessage(message);
} catch (Exception e) {
log.error("Failed to process SQS message: messageId={}", message.messageId(), e);
// Message will return to queue after visibility timeout expires
// After maxReceiveCount, it goes to DLQ
}
}
}
private void processMessage(Message message) throws JsonProcessingException {
// Without raw message delivery, SNS wraps each notification in a JSON envelope:
// {"Type":"Notification","Message":"<event JSON>",
//  "MessageAttributes":{"eventType":{"Type":"String","Value":"OrderCreatedEvent"}}, ...}
// The SNS attributes therefore live inside the envelope, not in the SQS message attributes
JsonNode envelope = objectMapper.readTree(message.body());
String eventType = envelope.path("MessageAttributes").path("eventType").path("Value").asText("UNKNOWN");
String innerMessage = envelope.path("Message").asText();
switch (eventType) {
case "OrderCreatedEvent" -> {
OrderCreatedEvent event = objectMapper.readValue(innerMessage, OrderCreatedEvent.class);
paymentService.processPayment(event);
}
default -> log.warn("Unknown event type: {}", eventType);
}
}
private void deleteMessage(Message message) {
sqsClient.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build());
}
}
17. Advantages and Disadvantages
Advantages
| Advantage | Explanation |
|---|---|
| Loose coupling | Services only know about events, not other services |
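| High availability | No central coordinator to become a single point of failure (the broker itself must still be highly available) |
| Independent scaling | Each service scales based on its own event load |
| Technology diversity | Each service can choose its own language, framework, and database, as long as it uses the shared broker and event schemas |
| Simplicity per service | Each service has small, clearly scoped responsibilities |
| Natural resilience | A failing consumer does not block producers; its events wait in the broker until it recovers |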
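To make the "loose coupling" row concrete, here is a minimal in-memory sketch of the happy path. `InMemoryBus`, `Event`, and `wireHappyPath` are hypothetical names; the bus stands in for Kafka or SNS and delivers synchronously so the output is deterministic. Each subscriber lambda plays one service and names only the event it consumes and the event it emits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ChoreographyWiringSketch {

    // Simplified event: a type name plus the sagaId used for correlation.
    record Event(String type, String sagaId) {}

    // Hypothetical in-memory stand-in for Kafka/SNS; delivers synchronously for determinism.
    static final class InMemoryBus {
        private final Map<String, List<Consumer<Event>>> subscribers = new HashMap<>();
        private final List<String> published = new ArrayList<>();

        void subscribe(String eventType, Consumer<Event> handler) {
            subscribers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
        }

        void publish(Event event) {
            published.add(event.type());
            for (Consumer<Event> handler : subscribers.getOrDefault(event.type(), List.of())) {
                handler.accept(event);
            }
        }

        List<String> published() {
            return published;
        }
    }

    // Each lambda plays one service: it names only the event it consumes and the event it emits.
    static InMemoryBus wireHappyPath() {
        InMemoryBus bus = new InMemoryBus();
        // Payment Service
        bus.subscribe("OrderCreatedEvent",
                e -> bus.publish(new Event("PaymentProcessedEvent", e.sagaId())));
        // Inventory Service
        bus.subscribe("PaymentProcessedEvent",
                e -> bus.publish(new Event("InventoryReservedEvent", e.sagaId())));
        // Shipping Service
        bus.subscribe("InventoryReservedEvent",
                e -> bus.publish(new Event("ShipmentCreatedEvent", e.sagaId())));
        return bus;
    }

    public static void main(String[] args) {
        InMemoryBus bus = wireHappyPath();
        // Order Service starts the saga and never calls the other services directly
        bus.publish(new Event("OrderCreatedEvent", "saga-1"));
        System.out.println(bus.published());
    }
}
```

Running `main` prints `[OrderCreatedEvent, PaymentProcessedEvent, InventoryReservedEvent, ShipmentCreatedEvent]`. Adding a step means adding another subscription rather than editing an existing service, which is also how the "event explosion" cost listed below arises.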
Disadvantages
| Disadvantage | Explanation |
|---|---|
| Difficult to trace | No central view of where a saga currently is |
| Cyclic event dependencies | Service A reacts to B's events while B reacts to A's, which can create hidden feedback loops |
| Testing complexity | The entire event chain must be simulated end-to-end |
| Event explosion | Adding a step requires new event types and new subscriptions |
| Hard to enforce sequence | Kafka orders events only within a partition, so events on different topics or keys can arrive out of order |
| Debugging is painful | A single saga's history is spread across multiple service logs and topics |
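One common mitigation for out-of-order delivery is a per-saga guard that applies an event only once its prerequisite status has been reached, parks it otherwise, and ignores it after the saga has moved past it. The sketch below assumes a hypothetical Order-side read model with statuses PENDING, PAID, RESERVED, and SHIPPED; `SagaView`, `Transition`, and the in-memory parked list are illustrative only. Keying Kafka records by sagaId keeps one topic's events for a saga in order, but events on different topics can still race, which is the case this guard covers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class OutOfOrderGuardSketch {

    enum Status { PENDING, PAID, RESERVED, SHIPPED }

    // The status an event requires before it can be applied, and the status it produces.
    record Transition(Status from, Status to) {}

    static final Map<String, Transition> TRANSITIONS = Map.of(
            "PaymentProcessedEvent", new Transition(Status.PENDING, Status.PAID),
            "InventoryReservedEvent", new Transition(Status.PAID, Status.RESERVED),
            "ShipmentCreatedEvent", new Transition(Status.RESERVED, Status.SHIPPED));

    // One instance per sagaId; a real service would persist status and parked events.
    static final class SagaView {
        private Status status = Status.PENDING;
        private final List<String> parked = new ArrayList<>();

        Status status() { return status; }

        int parkedCount() { return parked.size(); }

        void onEvent(String eventType) {
            Transition t = TRANSITIONS.get(eventType);
            if (t == null) return;                      // not tracked by this view
            if (status.compareTo(t.to()) >= 0) return;  // duplicate or stale: already past this step
            if (status != t.from()) {                   // arrived before its prerequisite
                if (!parked.contains(eventType)) parked.add(eventType);
                return;
            }
            status = t.to();
            drainParked();
        }

        // Re-applies parked events whose prerequisite is now satisfied.
        private void drainParked() {
            boolean progressed = true;
            while (progressed) {
                progressed = false;
                for (Iterator<String> it = parked.iterator(); it.hasNext(); ) {
                    Transition t = TRANSITIONS.get(it.next());
                    if (status.compareTo(t.to()) >= 0) {
                        it.remove();                    // became stale while parked
                    } else if (status == t.from()) {
                        status = t.to();
                        it.remove();
                        progressed = true;
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        SagaView view = new SagaView();
        view.onEvent("ShipmentCreatedEvent");   // early: parked
        view.onEvent("InventoryReservedEvent"); // early: parked
        view.onEvent("PaymentProcessedEvent");  // unblocks both parked events
        view.onEvent("PaymentProcessedEvent");  // redelivery: ignored
        System.out.println(view.status() + " parked=" + view.parkedCount());
    }
}
```

`main` prints `SHIPPED parked=0`: the two early events wait until `PaymentProcessedEvent` arrives, and the redelivered payment event is dropped. A production version would persist parked events and alert on any that wait too long.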
18. When to Choose Choreography
Choose Choreography When:
- The saga spans roughly 3-5 services at most
- Services are truly independent teams that should not couple to a central orchestrator
- The saga flow is relatively simple and linear
- High throughput is required (no orchestrator bottleneck)
- You accept eventual consistency and want minimal extra infrastructure (no orchestrator to deploy and operate)
Choose Orchestration (Part 3) When:
- The saga involves more than 5 services
- Business logic is complex, with branching or conditional steps
- You need full visibility into saga state at any point
- Compensations must run in a strict or complex order
- Your team needs to debug and trace saga execution easily
19. Summary
| Topic | Key Points |
|---|---|
| Choreography concept | Services communicate via events, no central coordinator |
| Event flow | OrderCreated -> PaymentProcessed -> InventoryReserved -> ShipmentCreated |
| Compensation flow | A failure event triggers compensating actions in services that already completed their steps |
| Idempotency | EVERY handler must check whether its work was already done, since events can be redelivered |
| Outbox pattern | Always use the transactional outbox to avoid the dual-write problem |
| Kafka config | acks=all, idempotent producer, manual ACK on consumer |
| Error handling | Retry with backoff + Dead Letter Topic |
| MySQL schema | Pessimistic locking on inventory, optimistic versioning on orders |
| Testing | Testcontainers with full event chain simulation |
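The idempotency row is easiest to see with a duplicate delivery. This minimal sketch assumes each event carries a unique `eventId` (a hypothetical field, not part of the `OrderCreatedEvent` used earlier) and uses an in-memory set in place of a hypothetical MySQL `processed_events` table with a unique key on the event id.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IdempotentHandlerSketch {

    // Simplified, hypothetical event; eventId is assumed to be unique per published event.
    record OrderCreatedEvent(String eventId, String sagaId, String orderId, long amountCents) {}

    static final class PaymentHandler {
        // Stand-in for a hypothetical processed_events table with a UNIQUE key on event_id.
        private final Set<String> processedEventIds = new HashSet<>();
        // Stand-in for the payments table.
        private final Map<String, Long> chargedByOrder = new HashMap<>();

        // Returns true if the event did work, false if it was a duplicate delivery.
        synchronized boolean handle(OrderCreatedEvent event) {
            // Set.add returns false for a known id, mirroring a duplicate-key error on INSERT.
            if (!processedEventIds.add(event.eventId())) {
                return false;
            }
            chargedByOrder.merge(event.orderId(), event.amountCents(), Long::sum);
            return true;
        }

        long totalCharged(String orderId) {
            return chargedByOrder.getOrDefault(orderId, 0L);
        }
    }

    public static void main(String[] args) {
        PaymentHandler handler = new PaymentHandler();
        OrderCreatedEvent event = new OrderCreatedEvent("evt-1", "saga-1", "order-1", 4_999);
        System.out.println(handler.handle(event));  // first delivery
        System.out.println(handler.handle(event));  // redelivery, e.g. after a visibility timeout
        System.out.println(handler.totalCharged("order-1"));
    }
}
```

`main` prints `true`, `false`, then `4999`: the redelivered event is recognized and the order is charged once. In MySQL, the dedupe insert and the payment write should share one transaction so that a crash between them cannot leave the event half-handled.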
Next: Part 3 - Orchestration Pattern
Discover how to build a central saga orchestrator that provides full visibility and control,
including AWS Step Functions integration.