Consistency Models - Part 3: Java and Spring Boot Implementation
Table of Contents
- Spring Boot Transaction Management Deep Dive
- Optimistic Locking with JPA @Version
- Pessimistic Locking with @Lock
- Read/Write Separation with AbstractRoutingDataSource
- The Outbox Pattern - Complete Implementation
- SAGA Pattern - Choreography-Based
- SAGA Pattern - Orchestration-Based
- Idempotency - Complete Implementation
- Distributed Locking with Redis (Redisson)
- Cache Consistency Patterns
- Fencing Tokens for Safe Writes
- Change Data Capture (CDC) Integration
- Production Configuration Reference
1. Spring Boot Transaction Management Deep Dive
The @Transactional Annotation
@Transactional is the cornerstone of consistency in Spring Boot. Understanding how it works internally is critical.
What happens internally:
- Spring creates a proxy around your class/method
- Before method execution: acquires connection, begins transaction
- Method executes
- On success: commits transaction, releases connection
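- On exception: rolls back the transaction and releases the connection (by default only for unchecked exceptions and errors; a checked exception still commits unless it is listed in rollbackFor)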
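The steps above can be imitated with a JDK dynamic proxy. This is a minimal sketch, not Spring's implementation: the `Transfers` interface, `TransfersImpl`, and the `LOG` list are hypothetical names invented for the illustration, and "begin/commit/rollback" are only recorded as strings rather than performed against a database.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface; stands in for any @Transactional service.
interface Transfers {
    void transfer(boolean fail);
}

class TransfersImpl implements Transfers {
    @Override
    public void transfer(boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated failure");
        }
    }
}

class TxProxySketch {
    // Records what the pretend transaction manager did, in order.
    static final List<String> LOG = new ArrayList<>();

    static Transfers wrap(Transfers target) {
        return (Transfers) Proxy.newProxyInstance(
            Transfers.class.getClassLoader(),
            new Class<?>[] {Transfers.class},
            (proxy, method, args) -> {
                LOG.add("begin");
                try {
                    Object result = method.invoke(target, args);
                    LOG.add("commit");
                    return result;
                } catch (InvocationTargetException e) {
                    Throwable cause = e.getCause();
                    // Same default rule as Spring: roll back on unchecked exceptions and errors.
                    if (cause instanceof RuntimeException || cause instanceof Error) {
                        LOG.add("rollback");
                    } else {
                        LOG.add("commit");
                    }
                    throw cause;
                }
            });
    }

    public static void main(String[] args) {
        Transfers proxied = wrap(new TransfersImpl());
        proxied.transfer(false);
        try {
            proxied.transfer(true);
        } catch (IllegalStateException expected) {
            // the proxy rethrows after recording the rollback
        }
        System.out.println(LOG);
    }
}
```

Only calls that pass through the object returned by `wrap` are intercepted; a call made directly on `TransfersImpl` records nothing, which is the same reason self-invocation bypasses @Transactional.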
Propagation Behaviors
@Service
public class OrderService {
@Autowired
private PaymentService paymentService;
@Autowired
private OrderRepository orderRepository;
// REQUIRED (default): Join existing transaction or create new one
@Transactional(propagation = Propagation.REQUIRED)
public void createOrder(CreateOrderRequest request) {
Order order = new Order(request);
orderRepository.save(order);
paymentService.processPayment(order); // joins THIS transaction
}
// REQUIRES_NEW: Always create a new independent transaction
// Even if called from within an existing transaction
// Use for audit logs, metrics -- should commit regardless of outer transaction
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void logAuditEvent(AuditEvent event) {
auditRepository.save(event);
// This commits even if outer transaction rolls back
}
// MANDATORY: Must be called within an existing transaction, throw otherwise
@Transactional(propagation = Propagation.MANDATORY)
public void updateOrderStatus(Long orderId, OrderStatus status) {
// Forces callers to manage transaction context explicitly
}
// NOT_SUPPORTED: Execute without transaction (suspends current if exists)
// Use for read-heavy operations where you explicitly don't want transaction overhead
@Transactional(propagation = Propagation.NOT_SUPPORTED)
public List<Product> searchProducts(String query) {
return productRepository.searchByKeyword(query);
}
// NEVER: Must NOT be called within a transaction
@Transactional(propagation = Propagation.NEVER)
public void longRunningReport() {
// Explicitly prevents accidental transaction holding during long ops
}
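// NESTED: Savepoint-based nested transaction (rollback within without rolling back outer)
// Relies on JDBC savepoints; support depends on the transaction manager, and JPA setups may reject it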
@Transactional(propagation = Propagation.NESTED)
public void optionalStep(Long orderId) {
// If this fails, outer transaction can continue from savepoint
}
}
Isolation Levels in Production
@Service
public class AccountService {
// READ_COMMITTED: Good default for most OLTP operations
// Prevents dirty reads, allows non-repeatable reads
@Transactional(isolation = Isolation.READ_COMMITTED)
public AccountSummary getAccountSummary(Long accountId) {
return accountRepository.findSummary(accountId);
}
// REPEATABLE_READ: Required when reading same data multiple times
// MySQL default -- use when consistency within a transaction matters
@Transactional(isolation = Isolation.REPEATABLE_READ)
public void reconcileBalance(Long accountId) {
BigDecimal balance1 = accountRepository.findBalance(accountId);
// ... complex processing ...
BigDecimal balance2 = accountRepository.findBalance(accountId);
// balance1 == balance2 guaranteed (MVCC snapshot)
if (!balance1.equals(balance2)) {
throw new ReconciliationException("This should never happen!");
}
}
// SERIALIZABLE: Only for the most critical operations
// Prevents all anomalies including write skew
@Transactional(isolation = Isolation.SERIALIZABLE)
public void transferFunds(Long fromId, Long toId, BigDecimal amount) {
Account from = accountRepository.findById(fromId)
.orElseThrow(() -> new AccountNotFoundException(fromId));
Account to = accountRepository.findById(toId)
.orElseThrow(() -> new AccountNotFoundException(toId));
if (from.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException(fromId, amount);
}
from.setBalance(from.getBalance().subtract(amount));
to.setBalance(to.getBalance().add(amount));
accountRepository.save(from);
accountRepository.save(to);
}
}
Transaction Timeout Configuration
// Prevent long-running transactions that cause undo log bloat
@Transactional(timeout = 30) // 30 seconds max
public void processLargeOrder(LargeOrderRequest request) {
// Will throw TransactionTimedOutException after 30s
}

# application.yml
spring:
  transaction:
    default-timeout: 30 # global default: 30 seconds
Common Pitfall: @Transactional in Same Class
@Service
public class UserService {
// WRONG: calling transactional method from same class bypasses proxy
public void createUserAndSendEmail(CreateUserRequest request) {
createUser(request); // @Transactional IGNORED -- no proxy
emailService.sendWelcome(request.getEmail());
}
@Transactional
public User createUser(CreateUserRequest request) {
// This transaction is ONLY active when called from OUTSIDE the class
return userRepository.save(new User(request));
}
}
// CORRECT: Use self-injection or extract to separate service
@Service
public class UserService {
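@Autowired
@Lazy // resolves the self-reference lazily; newer Spring Boot versions reject circular references by default
private UserService self; // self-injection: this field holds the proxy, not `this`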
public void createUserAndSendEmail(CreateUserRequest request) {
self.createUser(request); // Goes through proxy, @Transactional works
emailService.sendWelcome(request.getEmail());
}
@Transactional
public User createUser(CreateUserRequest request) {
return userRepository.save(new User(request));
}
}
2. Optimistic Locking with JPA @Version
When to Use
Use optimistic locking when:
- Read operations far outnumber write operations (reads don't acquire locks)
- Conflicts are rare but possible
- You prefer failing with an error over waiting (non-blocking behavior)
- Typical fits: low-contention e-commerce catalogs, user profile updates, content management systems
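The non-blocking behavior described above can be sketched without a database. The following is an illustrative in-memory model, assuming a single "row" held in an `AtomicReference`; `VersionedStock` and `Row` are hypothetical names, and the compare-and-set stands in for an UPDATE guarded by a version predicate.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for one table row with a version column.
class VersionedStock {
    // Immutable snapshot of the row.
    static final class Row {
        final int quantity;
        final long version;

        Row(int quantity, long version) {
            this.quantity = quantity;
            this.version = version;
        }
    }

    private final AtomicReference<Row> row;

    VersionedStock(int initialQuantity) {
        this.row = new AtomicReference<>(new Row(initialQuantity, 0L));
    }

    Row read() {
        return row.get();
    }

    // Similar in spirit to: UPDATE ... SET quantity = ?, version = version + 1 WHERE version = ?
    // compareAndSet compares references; every successful update installs a new Row object,
    // so a snapshot read before that update can never match again.
    boolean updateIfVersionMatches(Row expected, int newQuantity) {
        Row next = new Row(newQuantity, expected.version + 1);
        return row.compareAndSet(expected, next);
    }

    // Read, modify, write; retry a bounded number of times when another writer got in first.
    boolean reserve(int quantity, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Row current = read();
            if (current.quantity < quantity) {
                throw new IllegalStateException("insufficient stock");
            }
            if (updateIfVersionMatches(current, current.quantity - quantity)) {
                return true;
            }
        }
        return false; // the caller decides how to report sustained contention
    }
}
```

Readers never wait here; a writer that loses the race simply re-reads and tries again, which is cheap only while conflicts stay rare.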
Complete Entity Setup
@Entity
@Table(name = "products")
@EntityListeners(AuditingEntityListener.class)
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String name;
@Column(nullable = false, precision = 12, scale = 2)
private BigDecimal price;
@Column(nullable = false)
private Integer stockQuantity;
@Version
@Column(nullable = false)
private Long version; // Auto-managed by JPA; incremented on every update
@CreatedDate
private Instant createdAt;
@LastModifiedDate
private Instant updatedAt;
public void reduceStock(int quantity) {
if (this.stockQuantity < quantity) {
throw new InsufficientStockException(this.id, this.stockQuantity, quantity);
}
this.stockQuantity -= quantity;
}
// constructor, getters, setters
}
Repository
@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
@Query("SELECT p FROM Product p WHERE p.id = :id")
Optional<Product> findById(@Param("id") Long id);
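// LockModeType.OPTIMISTIC re-checks the version at commit even if the entity was only read,
// so a read-then-decide flow fails if the row changed in the meantime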
@Lock(LockModeType.OPTIMISTIC)
@Query("SELECT p FROM Product p WHERE p.id = :id")
Optional<Product> findByIdOptimistic(@Param("id") Long id);
}
Service with Retry Logic
@Service
@Slf4j
public class InventoryService {
@Autowired
private ProductRepository productRepository;
// Retry on OptimisticLockingFailureException -- another transaction won the race
@Retryable(
retryFor = OptimisticLockingFailureException.class,
maxAttempts = 3,
backoff = @Backoff(delay = 50, multiplier = 2, maxDelay = 500)
)
@Transactional
public void reserveStock(Long productId, int quantity) {
Product product = productRepository.findById(productId)
.orElseThrow(() -> new ProductNotFoundException(productId));
product.reduceStock(quantity); // throws if insufficient
productRepository.save(product);
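// On flush (normally at commit), JPA generates:
// UPDATE products SET stock_quantity=?, version=? WHERE id=? AND version=?
// If version mismatch: throws OptimisticLockingFailureException, usually at commit time,
// which is why the retry has to wrap the whole transaction.
// Until the flush happens, getVersion() below still returns the pre-update version.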
log.debug("Reserved {} units of product {}. New stock: {}, version: {}",
quantity, productId, product.getStockQuantity(), product.getVersion());
}
@Recover
public void recoverFromLockConflict(OptimisticLockingFailureException ex,
Long productId, int quantity) {
log.error("Failed to reserve stock for product {} after 3 attempts: {}",
productId, ex.getMessage());
throw new StockReservationException("Could not reserve stock due to high contention", ex);
}
}
What the SQL Looks Like
-- JPA-generated UPDATE with version check
UPDATE products
SET name = ?,
price = ?,
stock_quantity = ?,
updated_at = ?,
version = 11 -- new version (old + 1)
WHERE id = ?
AND version = 10; -- optimistic lock check
-- If 0 rows affected: another transaction updated it first --> OptimisticLockingFailureException
3. Pessimistic Locking with @Lock
When to Use
Use pessimistic locking when:
- Conflicts are frequent (high-contention scenarios)
- You cannot tolerate retries (e.g., payment processing where idempotency is complex)
- Critical operations: bank transfers, inventory decrements for flash sales
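As a rough in-memory analogy for the bank-transfer case, the sketch below takes exclusive locks before touching balances and always acquires them in ascending ID order. `LockedAccount` and `OrderedLockTransfer` are hypothetical names, and a `ReentrantLock` stands in for a database row lock; it is an illustration of the blocking approach, not of JPA itself.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory account; the ReentrantLock plays the role of a row lock.
class LockedAccount {
    final long id;
    long balance;
    final ReentrantLock lock = new ReentrantLock();

    LockedAccount(long id, long balance) {
        this.id = id;
        this.balance = balance;
    }
}

class OrderedLockTransfer {
    // Waits up to timeoutMillis for the lock; an interrupt counts as a failed acquisition.
    private static boolean acquire(ReentrantLock lock, long timeoutMillis) {
        try {
            return lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Locks both accounts in ascending id order, then moves the money.
    // Returns false if a lock was not obtained in time or the source lacks funds.
    static boolean transfer(LockedAccount from, LockedAccount to, long amount, long timeoutMillis) {
        if (from.id == to.id) {
            throw new IllegalArgumentException("source and target must differ");
        }
        LockedAccount first = from.id < to.id ? from : to;
        LockedAccount second = from.id < to.id ? to : from;

        if (!acquire(first.lock, timeoutMillis)) {
            return false;
        }
        try {
            if (!acquire(second.lock, timeoutMillis)) {
                return false;
            }
            try {
                if (from.balance < amount) {
                    return false;
                }
                from.balance -= amount;
                to.balance += amount;
                return true;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }
}
```

Unlike the optimistic approach, a competing caller waits (up to the timeout) instead of failing and retrying, so the work inside the locked region is never thrown away.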
Repository with Lock Types
@Repository
public interface AccountRepository extends JpaRepository<Account, Long> {
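// PESSIMISTIC_WRITE: SELECT ... FOR UPDATE
// Exclusive lock -- other transactions cannot update the row or take a locking read on it;
// plain (non-locking) SELECTs still read a snapshot under MVCC (InnoDB, PostgreSQL)
@Lock(LockModeType.PESSIMISTIC_WRITE)
@QueryHints({
// Timeout in ms. With Jakarta Persistence (Spring Boot 3+) the hint name is
// "jakarta.persistence.lock.timeout"; whether it is honored depends on the database and dialect
@QueryHint(name = "javax.persistence.lock.timeout", value = "5000")
})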
@Query("SELECT a FROM Account a WHERE a.id = :id")
Optional<Account> findByIdForUpdate(@Param("id") Long id);
// PESSIMISTIC_READ: SELECT ... FOR SHARE (or LOCK IN SHARE MODE)
// Shared lock -- others can read but not write
@Lock(LockModeType.PESSIMISTIC_READ)
@Query("SELECT a FROM Account a WHERE a.id = :id")
Optional<Account> findByIdForRead(@Param("id") Long id);
// Lock multiple accounts in one query (avoids N+1 lock acquisition)
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("SELECT a FROM Account a WHERE a.id IN :ids ORDER BY a.id")
List<Account> findByIdsForUpdate(@Param("ids") List<Long> ids);
}
Service with Deadlock Prevention
@Service
@Slf4j
public class BankTransferService {
@Autowired
private AccountRepository accountRepository;
@Autowired
private TransactionRecordRepository transactionRecordRepository;
@Transactional(isolation = Isolation.READ_COMMITTED, timeout = 30)
public TransferResult transfer(Long fromAccountId, Long toAccountId,
BigDecimal amount, String reference) {
// CRITICAL: Always acquire locks in consistent order (by ID) to prevent deadlock
// If T1 locks account 5 then 10, and T2 locks account 10 then 5 --> deadlock
// If both always lock the lower ID first --> no deadlock
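// A self-transfer would return one row from the IN query and be misreported as "not found"
if (fromAccountId.equals(toAccountId)) {
throw new IllegalArgumentException("Cannot transfer to the same account");
}
Long firstLockId = Math.min(fromAccountId, toAccountId);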
Long secondLockId = Math.max(fromAccountId, toAccountId);
// Acquire both locks in order
List<Long> sortedIds = List.of(firstLockId, secondLockId);
List<Account> lockedAccounts = accountRepository.findByIdsForUpdate(sortedIds);
if (lockedAccounts.size() != 2) {
throw new AccountNotFoundException("One or both accounts not found");
}
Account from = lockedAccounts.stream()
.filter(a -> a.getId().equals(fromAccountId)).findFirst().orElseThrow();
Account to = lockedAccounts.stream()
.filter(a -> a.getId().equals(toAccountId)).findFirst().orElseThrow();
if (from.getBalance().compareTo(amount) < 0) {
throw new InsufficientFundsException(fromAccountId, amount, from.getBalance());
}
from.debit(amount);
to.credit(amount);
accountRepository.save(from);
accountRepository.save(to);
// Record the transaction
TransactionRecord record = TransactionRecord.builder()
.fromAccountId(fromAccountId)
.toAccountId(toAccountId)
.amount(amount)
.reference(reference)
.status(TransactionStatus.COMPLETED)
.executedAt(Instant.now())
.build();
transactionRecordRepository.save(record);
log.info("Transfer completed: {} -> {} amount={} ref={}",
fromAccountId, toAccountId, amount, reference);
return new TransferResult(record.getId(), TransactionStatus.COMPLETED);
}
}
Lock Timeout Configuration
# application.yml
spring:
  jpa:
    properties:
      javax:
        persistence:
          lock:
            timeout: 5000 # 5 seconds -- throw LockTimeoutException after this

-- MySQL: Set innodb lock wait timeout
SET SESSION innodb_lock_wait_timeout = 5; -- 5 seconds
4. Read/Write Separation with AbstractRoutingDataSource
Why Read/Write Separation Matters
Aurora MySQL supports up to 15 read replicas. Routing read traffic to replicas:
- Reduces load on the primary writer
- Scales read throughput horizontally
- But introduces eventual consistency (replica lag is typically well under 100 ms, but it can grow under heavy write load)
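One common way to live with replica lag is to keep a session's reads on the primary for a short window after that session writes. The sketch below illustrates the idea under stated assumptions: `ReplicaLagAwareRouter` is a hypothetical helper, `pinMillis` is an assumed upper bound on lag that you would have to choose from your own measurements, and the current time is passed in explicitly to keep the logic deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: decides whether a read may be served by a replica.
class ReplicaLagAwareRouter {
    enum Target { PRIMARY, REPLICA }

    private final long pinMillis; // assumed upper bound on replica lag
    private final Map<String, Long> lastWriteAt = new ConcurrentHashMap<>();

    ReplicaLagAwareRouter(long pinMillis) {
        this.pinMillis = pinMillis;
    }

    // Call after a successful write made on behalf of this session.
    void recordWrite(String sessionId, long nowMillis) {
        lastWriteAt.put(sessionId, nowMillis);
    }

    // Reads stay on the primary while the session's last write may not have replicated yet.
    Target routeRead(String sessionId, long nowMillis) {
        Long writtenAt = lastWriteAt.get(sessionId);
        if (writtenAt != null && nowMillis - writtenAt < pinMillis) {
            return Target.PRIMARY;
        }
        return Target.REPLICA;
    }
}
```

Sessions that have not written recently keep using replicas, so only the small fraction of reads that could observe stale data pays the cost of hitting the writer.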
DataSource Configuration
@Configuration
@Slf4j
public class DataSourceConfig {
@Bean
@ConfigurationProperties("spring.datasource.write")
public DataSourceProperties writeDataSourceProperties() {
return new DataSourceProperties();
}
@Bean
@ConfigurationProperties("spring.datasource.read")
public DataSourceProperties readDataSourceProperties() {
return new DataSourceProperties();
}
@Bean("writeDataSource")
@ConfigurationProperties("spring.datasource.write.hikari")
public HikariDataSource writeDataSource(
@Qualifier("writeDataSourceProperties") DataSourceProperties properties) {
HikariDataSource ds = properties.initializeDataSourceBuilder()
.type(HikariDataSource.class)
.build();
ds.setPoolName("WritePool");
ds.setMaximumPoolSize(20);
ds.setMinimumIdle(5);
ds.setConnectionTimeout(3000);
ds.setIdleTimeout(600000);
ds.setMaxLifetime(1800000);
return ds;
}
@Bean("readDataSource")
@ConfigurationProperties("spring.datasource.read.hikari")
public HikariDataSource readDataSource(
@Qualifier("readDataSourceProperties") DataSourceProperties properties) {
HikariDataSource ds = properties.initializeDataSourceBuilder()
.type(HikariDataSource.class)
.build();
ds.setPoolName("ReadPool");
ds.setMaximumPoolSize(40); // More connections for read-heavy workloads
ds.setMinimumIdle(10);
ds.setReadOnly(true); // Mark pool as read-only
return ds;
}
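// Not @Primary: only the lazy proxy defined below may be the primary DataSource,
// otherwise Spring finds two primary candidates and fails at startup
@Bean("routingDataSource")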
public DataSource routingDataSource(
@Qualifier("writeDataSource") DataSource writeDataSource,
@Qualifier("readDataSource") DataSource readDataSource) {
RoutingDataSource routingDataSource = new RoutingDataSource();
Map<Object, Object> dataSources = new HashMap<>();
dataSources.put(DataSourceType.WRITE, writeDataSource);
dataSources.put(DataSourceType.READ, readDataSource);
routingDataSource.setTargetDataSources(dataSources);
routingDataSource.setDefaultTargetDataSource(writeDataSource);
return routingDataSource;
}
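// Wrap in LazyConnectionDataSourceProxy to defer connection acquisition until the first
// statement. Spring sets the transaction's read-only flag only after the transaction has begun,
// so without this wrapper the routing decision runs too early and never selects READ.
// It also avoids grabbing connections for @Transactional methods that never touch the DB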
@Bean
@Primary
public DataSource dataSource(@Qualifier("routingDataSource") DataSource routing) {
return new LazyConnectionDataSourceProxy(routing);
}
}
Routing Logic
public enum DataSourceType {
WRITE, READ
}
public class DataSourceContextHolder {
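// Plain ThreadLocal: an InheritableThreadLocal copies the value into threads created while
// it is set, and pooled threads would then keep a stale routing hint
private static final ThreadLocal<DataSourceType> context =
new ThreadLocal<>();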
public static void setDataSourceType(DataSourceType type) {
context.set(type);
}
public static DataSourceType getDataSourceType() {
return context.get();
}
public static void clearDataSourceType() {
context.remove();
}
}
public class RoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
// Route to READ replica if:
// 1. Explicitly set in context, OR
// 2. Current transaction is read-only
DataSourceType type = DataSourceContextHolder.getDataSourceType();
if (type != null) {
return type;
}
return TransactionSynchronizationManager.isCurrentTransactionReadOnly()
? DataSourceType.READ
: DataSourceType.WRITE;
}
}
Usage in Services
@Service
public class ProductService {
@Autowired
private ProductRepository productRepository;
// Routes to READ replica automatically (readOnly = true)
@Transactional(readOnly = true)
public Product findProduct(Long id) {
return productRepository.findById(id)
.orElseThrow(() -> new ProductNotFoundException(id));
}
// Routes to WRITE primary (not readOnly)
@Transactional
public Product updateProduct(Long id, UpdateProductRequest request) {
Product product = productRepository.findById(id)
.orElseThrow(() -> new ProductNotFoundException(id));
product.update(request);
return productRepository.save(product);
}
// Force primary read even though it's read-only (for read-your-writes)
@Transactional(readOnly = true)
public Product findProductFresh(Long id) {
// Override routing to always use primary
DataSourceContextHolder.setDataSourceType(DataSourceType.WRITE);
try {
return productRepository.findById(id)
.orElseThrow(() -> new ProductNotFoundException(id));
} finally {
DataSourceContextHolder.clearDataSourceType();
}
}
}
application.yml for Read/Write Separation
spring:
  datasource:
    write:
      url: jdbc:mysql://aurora-cluster-writer.cluster-xxxx.us-east-1.rds.amazonaws.com:3306/yourdb
      username: ${DB_WRITE_USERNAME}
      password: ${DB_WRITE_PASSWORD}
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: WritePool
        maximum-pool-size: 20
        minimum-idle: 5
        connection-timeout: 3000
    read:
      url: jdbc:mysql://aurora-cluster-reader.cluster-ro-xxxx.us-east-1.rds.amazonaws.com:3306/yourdb
      username: ${DB_READ_USERNAME}
      password: ${DB_READ_PASSWORD}
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: ReadPool
        maximum-pool-size: 40
        minimum-idle: 10
        connection-timeout: 3000
5. The Outbox Pattern - Complete Implementation
Why the Outbox Pattern
The dual-write problem: You update your database AND publish an event to Kafka. These are two separate systems. If:
- DB commit succeeds, Kafka publish fails: event lost, downstream services never notified
- Kafka publish succeeds, DB commit fails: phantom events for data that doesn't exist
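The Outbox Pattern: Write the event to a DB table (outbox) in the same transaction as your business data. A separate process reads the outbox and publishes to Kafka. The business row and the event row then commit or roll back together, so atomicity is guaranteed at the DB level. Delivery to Kafka becomes at-least-once, so consumers must tolerate duplicates.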
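The mechanism can be illustrated with a small in-memory model before looking at tables and entities. This is a sketch under simplifying assumptions: two lists stand in for the business table and the outbox table, a staged write imitates a transaction, and `OutboxSketch`, `OutboxRow`, and the `broker` callback are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model; two lists stand in for the orders and outbox tables.
class OutboxSketch {
    static final class OutboxRow {
        final String eventType;
        final String payload;
        boolean sent;

        OutboxRow(String eventType, String payload) {
            this.eventType = eventType;
            this.payload = payload;
        }
    }

    final List<String> orders = new ArrayList<>();
    final List<OutboxRow> outbox = new ArrayList<>();

    // One pretend transaction: stage both writes, then apply them together or not at all.
    void createOrder(String orderId, boolean failBeforeCommit) {
        List<String> stagedOrders = new ArrayList<>();
        List<OutboxRow> stagedEvents = new ArrayList<>();
        stagedOrders.add(orderId);
        stagedEvents.add(new OutboxRow("ORDER_CREATED", "{\"orderId\":\"" + orderId + "\"}"));
        if (failBeforeCommit) {
            throw new IllegalStateException("rolled back: nothing was written");
        }
        orders.addAll(stagedOrders); // commit the business row
        outbox.addAll(stagedEvents); // and the event row in the same commit
    }

    // Relay: publish unsent rows and mark each one sent only after the broker call returns.
    // A crash between publish and mark would publish the row again (at-least-once delivery).
    int relay(Consumer<OutboxRow> broker) {
        int published = 0;
        for (OutboxRow row : outbox) {
            if (!row.sent) {
                broker.accept(row);
                row.sent = true;
                published++;
            }
        }
        return published;
    }
}
```

The business code never talks to the broker; the only component that does is the relay, and it can be retried freely because the outbox row is the durable record of what still needs to go out.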
Schema
CREATE TABLE outbox_events (
id VARCHAR(36) NOT NULL PRIMARY KEY,
aggregate_type VARCHAR(100) NOT NULL, -- e.g., ORDER, USER, PAYMENT
aggregate_id VARCHAR(100) NOT NULL, -- e.g., order ID
event_type VARCHAR(200) NOT NULL, -- e.g., ORDER_CREATED
payload TEXT NOT NULL, -- JSON event body
status ENUM('PENDING', 'PROCESSING', 'SENT', 'FAILED') NOT NULL DEFAULT 'PENDING',
retry_count INT NOT NULL DEFAULT 0,
created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
processed_at DATETIME(3),
INDEX idx_outbox_status_created (status, created_at),
INDEX idx_outbox_aggregate (aggregate_type, aggregate_id)
);
Entity and Repository
@Entity
@Table(name = "outbox_events")
@Getter
@Setter // the publisher mutates status, retryCount and processedAt
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutboxEvent {
@Id
private String id;
@Column(nullable = false, length = 100)
private String aggregateType;
@Column(nullable = false, length = 100)
private String aggregateId;
@Column(nullable = false, length = 200)
private String eventType;
@Column(nullable = false, columnDefinition = "TEXT")
private String payload;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private OutboxStatus status;
@Column(nullable = false)
private Integer retryCount;
@Column(nullable = false)
private Instant createdAt;
private Instant processedAt;
}
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
// Fetch pending events in order, with a limit -- avoids large result sets
@Query("SELECT e FROM OutboxEvent e WHERE e.status = 'PENDING' " +
"ORDER BY e.createdAt ASC")
List<OutboxEvent> findPendingEvents(Pageable pageable);
// For retry: fetch failed events within retry limit
@Query("SELECT e FROM OutboxEvent e WHERE e.status = 'FAILED' " +
"AND e.retryCount < :maxRetries ORDER BY e.createdAt ASC")
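List<OutboxEvent> findRetryableEvents(@Param("maxRetries") int maxRetries, Pageable pageable);
// Derived delete query used by the cleanup job
long deleteByStatusAndProcessedAtBefore(OutboxStatus status, Instant cutoff);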
}
Service Layer - Writing to Outbox
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
@Transactional // Both order save and outbox write happen atomically
public Order createOrder(CreateOrderRequest request) {
Order order = Order.builder()
.customerId(request.getCustomerId())
.items(request.getItems())
.totalAmount(calculateTotal(request.getItems()))
.status(OrderStatus.PENDING)
.createdAt(Instant.now())
.build();
orderRepository.save(order);
// Write event to outbox IN SAME TRANSACTION
publishOrderCreatedEvent(order);
log.info("Order {} created, outbox event enqueued", order.getId());
return order;
}
@Transactional
public Order confirmOrder(Long orderId) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.save(order);
publishOrderConfirmedEvent(order);
return order;
}
private void publishOrderCreatedEvent(Order order) {
try {
OrderCreatedEventPayload payload = OrderCreatedEventPayload.builder()
.orderId(order.getId())
.customerId(order.getCustomerId())
.totalAmount(order.getTotalAmount())
.items(order.getItems())
.build();
OutboxEvent event = OutboxEvent.builder()
.id(UUID.randomUUID().toString())
.aggregateType("ORDER")
.aggregateId(order.getId().toString())
.eventType("ORDER_CREATED")
.payload(objectMapper.writeValueAsString(payload))
.status(OutboxStatus.PENDING)
.retryCount(0)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(event);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize ORDER_CREATED event", e);
}
}
private void publishOrderConfirmedEvent(Order order) {
// Similar pattern...
}
}
Outbox Publisher - Polling-Based
@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxEventPublisher {
    private final OutboxEventRepository outboxEventRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final OutboxTopicMapper topicMapper;
    private final AlertService alertService; // used below; the AlertService type name is assumed
    private static final int BATCH_SIZE = 50;
    private static final int MAX_RETRIES = 5;

    // Runs one second after the previous run finishes and processes pending events
    @Scheduled(fixedDelay = 1000, initialDelay = 5000)
    public void publishPendingEvents() {
        processBatch(outboxEventRepository.findPendingEvents(
            PageRequest.of(0, BATCH_SIZE)));
        // As written, an event becomes FAILED only once retryCount >= MAX_RETRIES, so this
        // query matches nothing unless FAILED rows are reset by hand
        processBatch(outboxEventRepository.findRetryableEvents(
            MAX_RETRIES, PageRequest.of(0, BATCH_SIZE)));
    }

    // No @Transactional here: the call from publishPendingEvents() is a self-invocation and
    // would bypass the proxy anyway. Each save() below commits in the repository's own
    // transaction, so no DB transaction stays open while waiting on Kafka.
    private void processBatch(List<OutboxEvent> events) {
        for (OutboxEvent event : events) {
            processEvent(event);
        }
    }

    private void processEvent(OutboxEvent event) {
        // Mark as PROCESSING so other pollers skip it. This is a hint, not a lock: with several
        // publisher instances, claim rows atomically (e.g. SELECT ... FOR UPDATE SKIP LOCKED)
        event.setStatus(OutboxStatus.PROCESSING);
        outboxEventRepository.save(event);
        try {
            String topic = topicMapper.getTopic(event.getAggregateType(), event.getEventType());
            // Use aggregate ID as Kafka partition key for ordering
            kafkaTemplate.send(topic, event.getAggregateId(), event.getPayload())
                .get(5, TimeUnit.SECONDS); // synchronous -- wait for ack
            event.setStatus(OutboxStatus.SENT);
            event.setProcessedAt(Instant.now());
            outboxEventRepository.save(event);
            log.debug("Published {} event for {} {}",
                event.getEventType(), event.getAggregateType(), event.getAggregateId());
        } catch (Exception e) {
            log.error("Failed to publish event {} of type {}: {}",
                event.getId(), event.getEventType(), e.getMessage());
            event.setRetryCount(event.getRetryCount() + 1);
            if (event.getRetryCount() >= MAX_RETRIES) {
                event.setStatus(OutboxStatus.FAILED);
                log.error("Outbox event {} permanently failed after {} retries",
                    event.getId(), MAX_RETRIES);
                // Alert on this! Something needs manual intervention
                alertService.sendOutboxFailureAlert(event);
            } else {
                event.setStatus(OutboxStatus.PENDING); // Will be retried on the next poll
            }
            outboxEventRepository.save(event);
        }
    }
}
Cleanup Old Sent Events
@Component
@RequiredArgsConstructor
public class OutboxCleanupJob {
private final OutboxEventRepository outboxEventRepository;
// Run daily -- clean up sent events older than 7 days
@Scheduled(cron = "0 0 2 * * *") // 2 AM daily
@Transactional
public void cleanupSentEvents() {
Instant cutoff = Instant.now().minus(Duration.ofDays(7));
outboxEventRepository.deleteByStatusAndProcessedAtBefore(OutboxStatus.SENT, cutoff);
}
}
6. SAGA Pattern - Choreography-Based
What Is a SAGA?
A SAGA is a sequence of local transactions that together form a distributed transaction. Each local transaction updates a single service's database and publishes an event. If any step fails, compensating transactions are executed to undo previous steps.
Choreography: No central coordinator. Services react to events and emit new events.
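To make the "react to events and emit new events" idea concrete, here is a minimal in-process sketch. It assumes a synchronous, single-threaded event bus in place of a real broker; `ChoreographySketch`, the handler wiring, and the `INVENTORY_RELEASED` event name are hypothetical and chosen only for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-process event bus; a real system would use a broker such as Kafka.
class ChoreographySketch {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();
    final List<String> trace = new ArrayList<>();          // every event, in publish order
    final Map<String, String> orderStatus = new HashMap<>();

    void on(String eventType, Consumer<String> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    void publish(String eventType, String orderId) {
        trace.add(eventType);
        for (Consumer<String> handler : handlers.getOrDefault(eventType, List.of())) {
            handler.accept(orderId);
        }
    }

    // Wires three "services" together purely through events; none calls another directly.
    static ChoreographySketch wire(boolean paymentSucceeds) {
        ChoreographySketch bus = new ChoreographySketch();
        // Inventory service: reserve on order creation, release as compensation
        bus.on("ORDER_CREATED", id -> bus.publish("INVENTORY_RESERVED", id));
        bus.on("PAYMENT_FAILED", id -> bus.publish("INVENTORY_RELEASED", id));
        // Payment service: charge once inventory is reserved
        bus.on("INVENTORY_RESERVED", id ->
            bus.publish(paymentSucceeds ? "PAYMENT_PROCESSED" : "PAYMENT_FAILED", id));
        // Order service: track the final outcome
        bus.on("PAYMENT_PROCESSED", id -> bus.orderStatus.put(id, "CONFIRMED"));
        bus.on("PAYMENT_FAILED", id -> bus.orderStatus.put(id, "PAYMENT_FAILED"));
        return bus;
    }
}
```

Calling `wire(false).publish("ORDER_CREATED", "42")` walks the failure path, where the inventory handler reacts to `PAYMENT_FAILED` with a compensating release. Note that the overall flow exists only as the sum of the subscriptions, which is why choreographed sagas are harder to trace.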
Order Processing SAGA Example
[Order Service]          [Inventory Service]        [Payment Service]
      |                          |                          |
 Create Order                    |                          |
      |--- ORDER_CREATED ------->|                          |
      |                  Reserve Inventory                  |
      |                          |-- INVENTORY_RESERVED --->|
      |                          |                   Process Payment
      |<-------------------------|<-- PAYMENT_PROCESSED ----|
 Confirm Order                   |                          |
Implementation
// ORDER SERVICE
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderSagaService {
private final OrderRepository orderRepository;
private final OutboxEventRepository outboxEventRepository;
private final ObjectMapper objectMapper;
@Transactional
public Order startOrderSaga(CreateOrderRequest request) {
Order order = Order.builder()
.customerId(request.getCustomerId())
.items(request.getItems())
.totalAmount(calculateTotal(request.getItems()))
.status(OrderStatus.PENDING)
.build();
orderRepository.save(order);
// Trigger inventory reservation via outbox
emitEvent("ORDER", order.getId().toString(), "ORDER_CREATED",
new OrderCreatedEvent(order.getId(), order.getItems(), order.getTotalAmount()));
return order;
}
// Step 3: Inventory reserved -- proceed to payment
@KafkaListener(topics = "inventory-events", groupId = "order-service")
@Transactional
public void handleInventoryEvent(ConsumerRecord<String, String> record) throws Exception {
BaseEvent event = objectMapper.readValue(record.value(), BaseEvent.class);
if ("INVENTORY_RESERVED".equals(event.getType())) {
InventoryReservedEvent payload = objectMapper.readValue(
record.value(), InventoryReservedEvent.class);
onInventoryReserved(payload);
} else if ("INVENTORY_RESERVATION_FAILED".equals(event.getType())) {
InventoryFailedEvent payload = objectMapper.readValue(
record.value(), InventoryFailedEvent.class);
onInventoryReservationFailed(payload);
}
}
private void onInventoryReserved(InventoryReservedEvent event) {
Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
order.setStatus(OrderStatus.INVENTORY_RESERVED);
orderRepository.save(order);
// Trigger payment
emitEvent("ORDER", order.getId().toString(), "PAYMENT_REQUESTED",
new PaymentRequestedEvent(order.getId(), order.getCustomerId(),
order.getTotalAmount()));
}
private void onInventoryReservationFailed(InventoryFailedEvent event) {
Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
order.setStatus(OrderStatus.FAILED);
order.setFailureReason("Inventory not available: " + event.getReason());
orderRepository.save(order);
// No compensation needed -- nothing was reserved
}
@KafkaListener(topics = "payment-events", groupId = "order-service")
@Transactional
public void handlePaymentEvent(ConsumerRecord<String, String> record) throws Exception {
BaseEvent event = objectMapper.readValue(record.value(), BaseEvent.class);
if ("PAYMENT_PROCESSED".equals(event.getType())) {
PaymentProcessedEvent payload = objectMapper.readValue(
record.value(), PaymentProcessedEvent.class);
onPaymentProcessed(payload);
} else if ("PAYMENT_FAILED".equals(event.getType())) {
PaymentFailedEvent payload = objectMapper.readValue(
record.value(), PaymentFailedEvent.class);
onPaymentFailed(payload);
}
}
private void onPaymentProcessed(PaymentProcessedEvent event) {
Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.save(order);
log.info("Order {} successfully completed", event.getOrderId());
}
private void onPaymentFailed(PaymentFailedEvent event) {
Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
order.setStatus(OrderStatus.PAYMENT_FAILED);
orderRepository.save(order);
// COMPENSATING TRANSACTION: Release the reserved inventory
emitEvent("ORDER", order.getId().toString(), "INVENTORY_RELEASE_REQUESTED",
new InventoryReleaseEvent(order.getId(), order.getItems(),
"Payment failed: " + event.getFailureReason()));
}
private void emitEvent(String aggregateType, String aggregateId,
String eventType, Object payload) {
try {
OutboxEvent outboxEvent = OutboxEvent.builder()
.id(UUID.randomUUID().toString())
.aggregateType(aggregateType)
.aggregateId(aggregateId)
.eventType(eventType)
.payload(objectMapper.writeValueAsString(payload))
.status(OutboxStatus.PENDING)
.retryCount(0)
.createdAt(Instant.now())
.build();
outboxEventRepository.save(outboxEvent);
} catch (JsonProcessingException e) {
throw new EventSerializationException("Failed to serialize " + eventType, e);
}
}
}
7. SAGA Pattern - Orchestration-Based
Orchestration vs Choreography
| Aspect | Choreography | Orchestration |
|---|---|---|
| Coordination | Services react to events | Central orchestrator directs |
| Coupling | Loose -- services don't know each other | Orchestrator knows all steps |
| Visibility | Hard to track overall saga state | Saga state is centralized |
| Failure handling | Distributed -- each service handles | Centralized in orchestrator |
| Recommended for | Simple flows (2-3 steps) | Complex flows (5+ steps) |
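Because an orchestrator holds the saga state in one place, it can also enforce which state changes are legal. The sketch below shows one possible guard, assuming a simplified set of states; `SagaTransitions` and its transition table are illustrative and not taken from any framework.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical guard for saga state changes; the states model a simple order saga.
class SagaTransitions {
    enum State {
        STARTED, INVENTORY_RESERVING, INVENTORY_RESERVED, PAYMENT_PROCESSING,
        COMPLETED, COMPENSATING_INVENTORY, FAILED
    }

    // For each state, the set of states the orchestrator may move to next.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.STARTED, EnumSet.of(State.INVENTORY_RESERVING));
        ALLOWED.put(State.INVENTORY_RESERVING, EnumSet.of(State.INVENTORY_RESERVED, State.FAILED));
        ALLOWED.put(State.INVENTORY_RESERVED, EnumSet.of(State.PAYMENT_PROCESSING));
        ALLOWED.put(State.PAYMENT_PROCESSING,
            EnumSet.of(State.COMPLETED, State.COMPENSATING_INVENTORY));
        ALLOWED.put(State.COMPENSATING_INVENTORY, EnumSet.of(State.FAILED));
        ALLOWED.put(State.COMPLETED, EnumSet.noneOf(State.class)); // terminal
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));    // terminal
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }

    // Returns the new state, or throws if the move is not part of the saga definition.
    static State transition(State from, State to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("illegal saga transition " + from + " -> " + to);
        }
        return to;
    }
}
```

A guard like this turns duplicate or out-of-order callbacks into explicit errors instead of silent state corruption, which is much harder to achieve when the flow is spread across choreographed services.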
Orchestration Implementation
@Entity
@Table(name = "order_sagas")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderSaga {
@Id
private String sagaId;
private Long orderId;
@Enumerated(EnumType.STRING)
private OrderSagaState state;
private String failureReason;
private Instant startedAt;
private Instant completedAt;
private Integer compensationStep; // which compensation step we're on
}
public enum OrderSagaState {
STARTED,
INVENTORY_RESERVING,
INVENTORY_RESERVED,
PAYMENT_PROCESSING,
COMPLETED,
COMPENSATING_INVENTORY,
COMPENSATING_PAYMENT,
FAILED
}
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderSagaOrchestrator {
private final OrderSagaRepository sagaRepository;
private final OrderRepository orderRepository;
private final InventoryServiceClient inventoryClient; // Feign or RestTemplate
private final PaymentServiceClient paymentClient;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public String startSaga(Long orderId) {
Order order = orderRepository.findById(orderId).orElseThrow();
OrderSaga saga = OrderSaga.builder()
.sagaId(UUID.randomUUID().toString())
.orderId(orderId)
.state(OrderSagaState.STARTED)
.startedAt(Instant.now())
.build();
sagaRepository.save(saga);
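// Step 1: Reserve inventory
// Caution: the remote call runs inside the DB transaction. If the commit fails afterwards,
// the reservation has already been requested; sending the command via the outbox avoids this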
transitionTo(saga, OrderSagaState.INVENTORY_RESERVING);
inventoryClient.reserve(new ReserveRequest(saga.getSagaId(), order.getItems()));
return saga.getSagaId();
}
@Transactional
public void onInventoryReserved(String sagaId) {
OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
if (saga.getState() != OrderSagaState.INVENTORY_RESERVING) {
log.warn("Unexpected state {} for saga {}", saga.getState(), sagaId);
return;
}
Order order = orderRepository.findById(saga.getOrderId()).orElseThrow();
transitionTo(saga, OrderSagaState.INVENTORY_RESERVED);
// Step 2: Process payment
transitionTo(saga, OrderSagaState.PAYMENT_PROCESSING);
paymentClient.process(new PaymentRequest(sagaId, order.getCustomerId(),
order.getTotalAmount()));
}
@Transactional
public void onPaymentProcessed(String sagaId) {
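OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
if (saga.getState() != OrderSagaState.PAYMENT_PROCESSING) {
log.warn("Unexpected state {} for saga {}", saga.getState(), sagaId);
return; // duplicate or out-of-order callback
}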
Order order = orderRepository.findById(saga.getOrderId()).orElseThrow();
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.save(order);
transitionTo(saga, OrderSagaState.COMPLETED);
saga.setCompletedAt(Instant.now());
sagaRepository.save(saga);
log.info("Saga {} completed successfully for order {}", sagaId, saga.getOrderId());
}
@Transactional
public void onPaymentFailed(String sagaId, String reason) {
OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
saga.setFailureReason(reason);
// Start compensation
transitionTo(saga, OrderSagaState.COMPENSATING_INVENTORY);
inventoryClient.release(new ReleaseRequest(sagaId, "Payment failed: " + reason));
}
@Transactional
public void onInventoryReleased(String sagaId) {
OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
Order order = orderRepository.findById(saga.getOrderId()).orElseThrow();
order.setStatus(OrderStatus.FAILED);
order.setFailureReason(saga.getFailureReason());
orderRepository.save(order);
transitionTo(saga, OrderSagaState.FAILED);
saga.setCompletedAt(Instant.now());
sagaRepository.save(saga);
}
private void transitionTo(OrderSaga saga, OrderSagaState newState) {
log.info("Saga {} transitioning from {} to {}", saga.getSagaId(), saga.getState(), newState);
saga.setState(newState);
sagaRepository.save(saga);
}
}
8. Idempotency - Complete Implementation
Why Idempotency Is Critical for Consistency
In distributed systems, operations may be retried due to:
- Network timeouts
- Service restarts during processing
- Load balancer retries
- Client retry logic
Without idempotency, retried operations cause duplicate effects (double charges, duplicate orders, etc.).
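The core mechanism can be sketched without a database or HTTP layer: remember the outcome per idempotency key, run the operation at most once per key, and reject a key that is reused with a different request body. This is a minimal in-memory sketch under stated assumptions: `IdempotentExecutor` and its method names are hypothetical, the `ConcurrentHashMap` stands in for a durable store, and records never expire.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class IdempotentExecutor {
    // Stored outcome for a key: hash of the request that created it, plus the response.
    private record Entry(String requestHash, String response) {}

    // Stand-in for a durable store such as a database table.
    private final Map<String, Entry> store = new ConcurrentHashMap<>();

    // SHA-256 of the request body as 64 lowercase hex characters.
    static String sha256(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(digest.digest(body.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    // Runs the operation at most once per key; retries get the stored response back.
    String execute(String key, String requestBody, Supplier<String> operation) {
        String hash = sha256(requestBody);
        // computeIfAbsent invokes the function at most once per key, even under concurrency.
        Entry entry = store.computeIfAbsent(key, k -> new Entry(hash, operation.get()));
        if (!entry.requestHash().equals(hash)) {
            throw new IllegalArgumentException("Idempotency key reused with a different request body");
        }
        return entry.response();
    }
}
```

A retry with the same key and body returns the first response without charging again, while the same key with a changed body is treated as a client error rather than answered with a response that belongs to a different request.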
Database Schema for Idempotency Keys
CREATE TABLE idempotency_keys (
idempotency_key VARCHAR(100) NOT NULL PRIMARY KEY,
request_hash VARCHAR(64) NOT NULL, -- SHA-256 of request body
operation_type VARCHAR(100) NOT NULL,
status ENUM('PROCESSING', 'COMPLETED', 'FAILED') NOT NULL,
response_body TEXT, -- Cached response for duplicate requests
response_status INT,
created_at DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
completed_at DATETIME(3),
expires_at DATETIME(3) NOT NULL,
INDEX idx_expires (expires_at)
);
Idempotency Filter (HTTP)
@Component
@RequiredArgsConstructor
@Slf4j
public class IdempotencyFilter extends OncePerRequestFilter {
private final IdempotencyKeyRepository idempotencyKeyRepository;
private final ObjectMapper objectMapper;
private static final String IDEMPOTENCY_KEY_HEADER = "Idempotency-Key";
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain filterChain)
throws ServletException, IOException {
// Only apply to mutating operations
if (!isMutatingMethod(request.getMethod())) {
filterChain.doFilter(request, response);
return;
}
String idempotencyKey = request.getHeader(IDEMPOTENCY_KEY_HEADER);
if (idempotencyKey == null || idempotencyKey.isBlank()) {
response.setStatus(HttpStatus.BAD_REQUEST.value());
writeError(response, "Idempotency-Key header is required");
return;
}
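// Wrap the request so the body can be hashed.
// ContentCachingRequestWrapper only captures bytes as downstream code reads them,
// so getContentAsByteArray() stays empty until filterChain.doFilter(...) has run.
ContentCachingRequestWrapper wrappedRequest = new ContentCachingRequestWrapper(request);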
// Check if key already exists
Optional<IdempotencyRecord> existing =
idempotencyKeyRepository.findByKey(idempotencyKey);
if (existing.isPresent()) {
IdempotencyRecord record = existing.get();
if (record.getStatus() == IdempotencyStatus.PROCESSING) {
// Still processing -- return 409 Conflict
response.setStatus(HttpStatus.CONFLICT.value());
writeError(response, "Request is still processing");
return;
}
if (record.getStatus() == IdempotencyStatus.COMPLETED) {
// Already processed -- return cached response
log.debug("Returning cached response for idempotency key: {}", idempotencyKey);
response.setStatus(record.getResponseStatus());
response.setContentType("application/json");
response.getWriter().write(record.getResponseBody());
return;
}
}
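// Mark as processing
// NOTE: request_hash is NOT NULL in the schema above, so the record also needs
// .requestHash(...); computing it here requires reading the body before the handler runs.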
IdempotencyRecord record = IdempotencyRecord.builder()
.idempotencyKey(idempotencyKey)
.operationType(request.getRequestURI())
.status(IdempotencyStatus.PROCESSING)
.createdAt(Instant.now())
.expiresAt(Instant.now().plus(Duration.ofHours(24)))
.build();
try {
idempotencyKeyRepository.save(record);
} catch (DataIntegrityViolationException e) {
// Race condition: another request just inserted the same key
response.setStatus(HttpStatus.CONFLICT.value());
writeError(response, "Duplicate request processing");
return;
}
// Capture response
ContentCachingResponseWrapper wrappedResponse =
new ContentCachingResponseWrapper(response);
try {
filterChain.doFilter(wrappedRequest, wrappedResponse);
} catch (ServletException | IOException | RuntimeException e) {
// Do not leave the key stuck in PROCESSING if the handler throws
record.setStatus(IdempotencyStatus.FAILED);
record.setCompletedAt(Instant.now());
idempotencyKeyRepository.save(record);
throw e;
}
// Store response for future duplicate requests
String responseBody = new String(wrappedResponse.getContentAsByteArray(),
StandardCharsets.UTF_8);
record.setStatus(IdempotencyStatus.COMPLETED);
record.setResponseBody(responseBody);
record.setResponseStatus(wrappedResponse.getStatus());
record.setCompletedAt(Instant.now());
idempotencyKeyRepository.save(record);
wrappedResponse.copyBodyToResponse();
}
private boolean isMutatingMethod(String method) {
return "POST".equals(method) || "PUT".equals(method) || "PATCH".equals(method);
}
private void writeError(HttpServletResponse response, String message) throws IOException {
response.setContentType("application/json");
response.getWriter().write("{\"error\":\"" + message + "\"}");
}
}
Service-Level Idempotency
@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentService {
private final PaymentRepository paymentRepository;
private final ExternalPaymentGateway paymentGateway;
@Transactional
public PaymentResult processPayment(String idempotencyKey, PaymentRequest request) {
// Check for existing payment with this key
Optional<Payment> existing = paymentRepository
.findByIdempotencyKey(idempotencyKey);
if (existing.isPresent()) {
log.info("Returning existing payment for idempotency key: {}", idempotencyKey);
return PaymentResult.from(existing.get()); // Return same result
}
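// Process new payment
// NOTE: two concurrent calls can both pass the check above. A unique constraint on
// idempotency_key turns the second insert into an error, but the gateway may already
// have been charged twice; pass the key through to the gateway if it supports one.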
ExternalPaymentResponse externalResult = paymentGateway.charge(
request.getCustomerId(),
request.getAmount(),
request.getCurrency()
);
Payment payment = Payment.builder()
.idempotencyKey(idempotencyKey)
.customerId(request.getCustomerId())
.amount(request.getAmount())
.status(externalResult.isSuccess() ? PaymentStatus.SUCCESS : PaymentStatus.FAILED)
.externalTransactionId(externalResult.getTransactionId())
.processedAt(Instant.now())
.build();
paymentRepository.save(payment);
return PaymentResult.from(payment);
}
}
9. Distributed Locking with Redis (Redisson)
Why Distributed Locks
When multiple instances of your Spring Boot application run concurrently, a local synchronized block is not enough. You need a lock that spans all instances.
Use cases: Preventing duplicate scheduled job execution, protecting critical sections across pods.
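What such a cross-instance lock has to provide can be sketched without Redis: a shared record of who holds each lock and until when, an atomic "take it if free or expired" step, and a release that only the current owner can perform. The sketch below is an in-memory stand-in under stated assumptions: `LeaseLockStore`, `tryAcquire`, and `release` are hypothetical names, the `ConcurrentHashMap` plays the role of the shared store, the lock is not reentrant, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for a shared lock store; a real deployment keeps this state in Redis.
final class LeaseLockStore {
    private record Lease(String ownerId, Instant expiresAt) {}

    private final Map<String, Lease> leases = new ConcurrentHashMap<>();

    // Takes the lock if it is free or its lease has expired; compute() makes the check atomic.
    boolean tryAcquire(String lockKey, String ownerId, Duration leaseTime, Instant now) {
        Lease candidate = new Lease(ownerId, now.plus(leaseTime));
        Lease winner = leases.compute(lockKey, (key, current) ->
                (current == null || !current.expiresAt().isAfter(now)) ? candidate : current);
        return winner == candidate;
    }

    // Releases only if the caller still owns the lease; returns false otherwise.
    boolean release(String lockKey, String ownerId) {
        boolean[] released = {false};
        leases.computeIfPresent(lockKey, (key, current) -> {
            if (current.ownerId().equals(ownerId)) {
                released[0] = true;
                return null; // returning null removes the entry
            }
            return current;
        });
        return released[0];
    }
}
```

The lease time is what keeps a crashed instance from holding the lock forever. It also means an instance that is merely slow can lose the lock while it is still working, which is the problem fencing tokens (section 11) address.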
Redisson Configuration
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
// Cluster mode for production
config.useClusterServers()
.setScanInterval(2000)
.addNodeAddress(
"redis://redis-node-1.your-cluster.cache.amazonaws.com:6379",
"redis://redis-node-2.your-cluster.cache.amazonaws.com:6379",
"redis://redis-node-3.your-cluster.cache.amazonaws.com:6379"
)
.setPassword(System.getenv("REDIS_PASSWORD"))
.setConnectTimeout(3000)
.setTimeout(3000)
.setRetryAttempts(3)
.setRetryInterval(1500);
return Redisson.create(config);
}
}
Usage in Service
@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentProcessingService {
private final RedissonClient redissonClient;
private final PaymentRepository paymentRepository;
public PaymentResult processPaymentExclusively(String orderId,
PaymentRequest request) {
// Distributed lock key scoped to the specific order
String lockKey = "payment:lock:order:" + orderId;
RLock lock = redissonClient.getLock(lockKey);
try {
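// Try to acquire lock -- wait up to 10 seconds, hold for up to 30 seconds.
// An explicit lease time disables Redisson's watchdog renewal, so work that runs
// longer than 30 seconds continues after the lock has already expired.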
boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!acquired) {
throw new PaymentConcurrencyException(
"Another payment is being processed for order " + orderId);
}
return doProcessPayment(orderId, request);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PaymentProcessingException("Lock acquisition interrupted", e);
} finally {
// Always release the lock -- even on exception
if (lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug("Released lock for order {}", orderId);
}
}
}
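// NOTE: @Transactional has no effect as written. The method is private and is called
// through this.doProcessPayment(...), which bypasses the Spring proxy. Move it to a
// separate bean (or use TransactionTemplate) so that a transaction actually starts,
// and make sure it commits before the lock is released.
@Transactional
private PaymentResult doProcessPayment(String orderId, PaymentRequest request) {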
// Check for duplicate (belt and suspenders with the lock)
if (paymentRepository.existsByOrderId(orderId)) {
return PaymentResult.fromExisting(paymentRepository.findByOrderId(orderId));
}
// ... actual payment processing
}
}
10. Cache Consistency Patterns
Cache-Aside (Lazy Loading) -- Most Common
Read: Check cache first. If miss, read from DB, populate cache, return.
Write: Write to DB. Invalidate or update cache.
@Service
@RequiredArgsConstructor
public class ProductCatalogService {
private final ProductRepository productRepository;
private final RedisTemplate<String, Product> redisTemplate;
private static final Duration CACHE_TTL = Duration.ofMinutes(15);
public Product getProduct(Long productId) {
String cacheKey = "product:" + productId;
// 1. Try cache first
Product cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return cached;
}
// 2. Cache miss -- read from database
Product product = productRepository.findById(productId)
.orElseThrow(() -> new ProductNotFoundException(productId));
// 3. Populate cache with TTL
redisTemplate.opsForValue().set(cacheKey, product, CACHE_TTL);
return product;
}
@Transactional
public Product updateProduct(Long productId, UpdateProductRequest request) {
Product product = productRepository.findById(productId).orElseThrow();
product.update(request);
productRepository.save(product);
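// Invalidate cache -- next read will fetch fresh from DB.
// Caveat: this runs before the transaction commits, so a concurrent read can re-cache
// the old row in the gap; invalidating after commit closes that window.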
redisTemplate.delete("product:" + productId);
return product;
}
}
Write-Through -- Strong Consistency for Writes
Write to the DB and the cache synchronously in the same operation, so every successful write updates both.
@Service
@RequiredArgsConstructor
public class UserSessionService {
private final UserRepository userRepository;
private final RedisTemplate<String, UserSession> redisTemplate;
@Transactional
public UserSession updateUserSession(String userId, SessionUpdateRequest request) {
// 1. Write to database
User user = userRepository.findById(userId).orElseThrow();
user.updateSession(request);
userRepository.save(user);
// 2. Simultaneously write to cache (write-through)
UserSession session = UserSession.from(user);
redisTemplate.opsForValue().set(
"session:" + userId,
session,
Duration.ofHours(2)
);
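// If cache write fails, should we roll back DB write?
// Option 1: Accept temporary inconsistency (cache will expire)
// Option 2: Throw exception and let @Transactional roll back DB write
// The reverse case also exists: the cache is written before the transaction commits,
// so a failed commit leaves the cache ahead of the DB until the entry expires.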
return session;
}
}
Write-Behind (Write-Back) -- High Performance
Write to cache immediately. Asynchronously flush to DB. Highest performance but risk of data loss.
@Service
@RequiredArgsConstructor
@Slf4j
public class CounterService {
private final RedisTemplate<String, Long> redisTemplate;
private final CounterRepository counterRepository;
// Increment counter in cache -- very fast
public Long increment(String counterKey) {
String redisKey = "counter:" + counterKey;
return redisTemplate.opsForValue().increment(redisKey);
}
// Flush dirty counters to DB periodically (write-behind)
@Scheduled(fixedDelay = 5000) // Every 5 seconds
public void flushCountersToDB() {
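// KEYS scans the whole keyspace and blocks Redis while it runs; in production prefer
// SCAN or maintain an explicit set of dirty counter keys.
Set<String> dirtyCounters = redisTemplate.keys("counter:*");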
if (dirtyCounters == null || dirtyCounters.isEmpty()) {
return;
}
for (String key : dirtyCounters) {
try {
Long value = redisTemplate.opsForValue().get(key);
if (value != null) {
String counterKey = key.substring("counter:".length());
counterRepository.upsertCounter(counterKey, value);
}
} catch (Exception e) {
log.error("Failed to flush counter {}: {}", key, e.getMessage());
}
}
}
// WARNING: Data between flushes is in memory only.
// If Redis crashes before flush, counter increments since last flush are LOST.
// Only use for non-critical counters (view counts, like counts).
}
Cache Stampede Prevention (Probabilistic Early Expiry)
@Service
@RequiredArgsConstructor
public class TrendingProductService {
private final ProductRepository productRepository;
private final RedisTemplate<String, CachedProduct> redisTemplate;
private final RedissonClient redissonClient; // used below for the refresh lock
// Probabilistic early expiry: recalculate before actual expiry
// to avoid all clients hitting DB simultaneously at expiry time
public List<Product> getTrendingProducts() {
String key = "trending:products";
CachedProduct cached = redisTemplate.opsForValue().get(key);
if (cached != null && !shouldRefreshEarly(cached)) {
return cached.getProducts();
}
// Refresh from DB (with lock to prevent stampede)
RLock lock = redissonClient.getLock("trending:lock");
try {
if (lock.tryLock(0, 5, TimeUnit.SECONDS)) {
// Double-check after acquiring lock
cached = redisTemplate.opsForValue().get(key);
if (cached != null && !shouldRefreshEarly(cached)) {
return cached.getProducts();
}
// Refresh
List<Product> products = productRepository.findTrending();
redisTemplate.opsForValue().set(key,
new CachedProduct(products, Instant.now(), Duration.ofMinutes(10)),
Duration.ofMinutes(10));
return products;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}
// Could not get lock -- return stale if available
return cached != null ? cached.getProducts() : productRepository.findTrending();
}
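// XFetch-style probabilistic early refresh.
// XFetch scales the random term by the expected recompute time (delta), not by the TTL:
// scaling by the TTL would trigger a refresh on roughly 37% of reads of a brand-new entry.
private boolean shouldRefreshEarly(CachedProduct cached) {
double beta = 1.0; // tuning parameter (higher = more aggressive prefetch)
double deltaSeconds = 1.0; // assumed time to recompute the value; measure it in practice
long ttlSeconds = cached.getTtl().getSeconds();
long ageSeconds = Duration.between(cached.getCachedAt(), Instant.now()).getSeconds();
long remainingSeconds = ttlSeconds - ageSeconds;
// -ln(U) for uniform U is exponentially distributed, so the early-refresh window averages beta * delta
return remainingSeconds <= -deltaSeconds * beta * Math.log(Math.random() + 1e-10);
}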
}
11. Fencing Tokens for Safe Writes
Fencing tokens prevent "zombie writers": processes that still believe they hold a lock after being paused (by a long GC stall, for example), even though the lock has expired and been granted to another process.
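The core check can be sketched without any framework. Every lock grant comes with a strictly increasing number, and the protected resource refuses any write that carries a lower number than one it has already accepted. The sketch assumes hypothetical `TokenIssuer` and `FencedResource` classes. It also differs slightly from the JPA code in this section, which compares the token for equality against the lock row, whereas here the resource itself tracks the highest token seen.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock service: hands out a strictly increasing token with every grant.
final class TokenIssuer {
    private final AtomicLong counter = new AtomicLong();

    long nextToken() {
        return counter.incrementAndGet();
    }
}

// Hypothetical storage that remembers the highest token it has accepted.
final class FencedResource {
    private long highestTokenSeen = 0;
    private String value;

    // Accepts the write only if the token is not older than one already seen.
    synchronized boolean write(long fencingToken, String newValue) {
        if (fencingToken < highestTokenSeen) {
            return false; // stale writer: a newer lock holder has already written
        }
        highestTokenSeen = fencingToken;
        value = newValue;
        return true;
    }

    synchronized String read() {
        return value;
    }
}
```

If process A receives token 1, stalls, and process B then acquires the lock with token 2 and writes, A's late write with token 1 is rejected. The comparison is `<` rather than `<=` so that the current holder can write more than once with the same token.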
@Entity
@Table(name = "distributed_locks")
public class DistributedLock {
@Id
private String resourceId;
private String ownerId;
@Version // Used as fencing token
private Long fencingToken;
private Instant acquiredAt;
private Instant expiresAt;
}
@Service
@RequiredArgsConstructor
public class FencedWriteService {
private final DistributedLockRepository lockRepository;
private final ResourceRepository resourceRepository;
@Transactional
public LockResult acquireLock(String resourceId, String ownerId) {
DistributedLock lock = lockRepository.findById(resourceId)
.orElse(new DistributedLock(resourceId));
// Reject if the current lock is still held (it has an owner and has not expired yet)
if (lock.getOwnerId() != null && lock.getExpiresAt().isAfter(Instant.now())) {
throw new LockAlreadyHeldException(resourceId, lock.getOwnerId());
}
lock.setOwnerId(ownerId);
lock.setAcquiredAt(Instant.now());
lock.setExpiresAt(Instant.now().plus(Duration.ofSeconds(30)));
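// Hibernate increments @Version at flush time, so flush before reading the token
// (assumes the repository extends JpaRepository)
DistributedLock saved = lockRepository.saveAndFlush(lock);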
return new LockResult(resourceId, ownerId, saved.getFencingToken());
}
// The fencing token is included in every write operation
// If the token doesn't match what's in DB (because lock was re-acquired), write is rejected
@Transactional
public void writeWithFence(String resourceId, String ownerId,
Long fencingToken, ResourceUpdate update) {
DistributedLock lock = lockRepository.findById(resourceId)
.orElseThrow(() -> new LockNotFoundException(resourceId));
// Verify we still hold the lock and token matches
if (!ownerId.equals(lock.getOwnerId())) {
throw new LockExpiredException("Lock was taken by " + lock.getOwnerId());
}
if (!fencingToken.equals(lock.getFencingToken())) {
throw new StaleTokenException("Fencing token is stale. Expected " +
lock.getFencingToken() + " but got " + fencingToken);
}
// Safe to write -- we verified our token is current
resourceRepository.update(resourceId, update);
}
}
12. Change Data Capture (CDC) Integration
CDC reads the database binary log to capture every change and publish it as an event. This is the production-grade alternative to the polling-based outbox.
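How a stream of change events turns into an up-to-date read model can be sketched in plain Java. The example assumes a simplified event shape with only the operation code and the row images before and after the change. The `ChangeEvent` record and `OrderProjection` class are hypothetical, the primary key column is assumed to be `id`, and the operation codes follow Debezium's convention (`c` create, `u` update, `d` delete, `r` snapshot read).

```java
import java.util.HashMap;
import java.util.Map;

// Simplified change event; a real Debezium envelope also carries source metadata and timestamps.
record ChangeEvent(String op, Map<String, Object> before, Map<String, Object> after) {}

// Hypothetical read model keyed by the row's primary key column "id".
final class OrderProjection {
    private final Map<Object, Map<String, Object>> rows = new HashMap<>();

    // Applies one event as an upsert or delete, so redelivered events leave the same state.
    void apply(ChangeEvent event) {
        switch (event.op()) {
            // create, snapshot read, and update all carry the full new row in "after"
            case "c", "r", "u" -> rows.put(event.after().get("id"), event.after());
            // delete carries the old row in "before"
            case "d" -> rows.remove(event.before().get("id"));
            default -> { /* ignore unknown operation types */ }
        }
    }

    Map<String, Object> find(Object id) {
        return rows.get(id);
    }

    int size() {
        return rows.size();
    }
}
```

Treating every non-delete event as an upsert keeps the projection correct when Kafka redelivers a message, because applying the same event twice leaves the same row in place.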
Debezium + Kafka (Production Setup)
File: debezium-connector-config.json (Kafka Connect configuration), deployed via the Kafka Connect REST API.
{
  "name": "orders-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "aurora-cluster.cluster-xxxx.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "debezium_user",
    "database.password": "${DEBEZIUM_PASSWORD}",
    "database.server.id": "1",
    "topic.prefix": "myapp",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.outbox_events",
    "include.schema.changes": "false",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events"
  }
}
Spring Boot Kafka Consumer for CDC Events
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCDCConsumer {
private final OrderProjectionService projectionService;
private final ObjectMapper objectMapper;
@KafkaListener(
topics = "myapp.orders_db.orders",
groupId = "order-projector",
containerFactory = "kafkaListenerContainerFactory"
)
@Transactional
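public void handleOrderChange(ConsumerRecord<String, String> record) throws Exception {
if (record.value() == null) {
// By default Debezium follows each delete with a tombstone (null value); nothing to project
return;
}
DebeziumChangeEvent changeEvent = objectMapper.readValue(
record.value(), DebeziumChangeEvent.class);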
switch (changeEvent.getOp()) {
case "c": // CREATE
projectionService.handleOrderCreated(changeEvent.getAfter());
break;
case "u": // UPDATE
projectionService.handleOrderUpdated(changeEvent.getBefore(),
changeEvent.getAfter());
break;
case "d": // DELETE
projectionService.handleOrderDeleted(changeEvent.getBefore());
break;
default:
log.debug("Ignoring operation type: {}", changeEvent.getOp());
}
}
}
13. Production Configuration Reference
application.yml for Production
spring:
  datasource:
    url: jdbc:mysql://${DB_WRITE_HOST}:3306/${DB_NAME}?useSSL=true&requireSSL=true&serverTimezone=UTC&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: MainPool
      maximum-pool-size: 20
      minimum-idle: 5
      auto-commit: false # required when provider_disables_autocommit is true (see below)
      idle-timeout: 600000 # 10 minutes
      max-lifetime: 1800000 # 30 minutes
      connection-timeout: 30000 # 30 seconds
      keepalive-time: 60000 # 1 minute -- prevents AWS firewall from closing idle connections
      validation-timeout: 5000
  jpa:
    open-in-view: false # CRITICAL: Do not hold DB connection for the entire HTTP request
    hibernate:
      ddl-auto: validate # Never use create/update in production
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        order_inserts: true
        order_updates: true
        jdbc:
          batch_size: 50
          fetch_size: 100
          batch_versioned_data: true
        connection:
          provider_disables_autocommit: true
        query:
          in_clause_parameter_padding: true
  transaction:
    default-timeout: 30 # seconds
logging:
  level:
    org.springframework.transaction: DEBUG # Enable in troubleshooting only
    org.hibernate.SQL: WARN
    com.zaxxer.hikari: INFO
pom.xml Dependencies
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- MySQL Connector -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- HikariCP (included in spring-boot-starter-data-jpa) -->
<!-- Redisson for distributed locking -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.25.2</version>
</dependency>
<!-- Spring Retry for optimistic lock retries -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
</dependency>
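<!-- AWS SDK v2 for DynamoDB, SQS -->
<!-- Spring Boot does not manage these versions: import the AWS SDK BOM
(software.amazon.awssdk:bom) in dependencyManagement or add explicit versions -->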
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>
Next: Part 4: AWS Production Configurations -- Infrastructure-level consistency configuration for RDS, Aurora, DynamoDB, and ElastiCache.
Part of the Consistency Models Demystified series
Stack: Java 17, Spring Boot 3.x, MySQL 8.0, AWS