6/6/2026 | Admin Post


Consistency Models - Part 3: Java and Spring Boot Implementation


Table of Contents

  1. Spring Boot Transaction Management Deep Dive
  2. Optimistic Locking with JPA @Version
  3. Pessimistic Locking with @Lock
  4. Read/Write Separation with AbstractRoutingDataSource
  5. The Outbox Pattern - Complete Implementation
  6. SAGA Pattern - Choreography-Based
  7. SAGA Pattern - Orchestration-Based
  8. Idempotency - Complete Implementation
  9. Distributed Locking with Redis (Redisson)
  10. Cache Consistency Patterns
  11. Fencing Tokens for Safe Writes
  12. Change Data Capture (CDC) Integration
  13. Production Configuration Reference

1. Spring Boot Transaction Management Deep Dive

The @Transactional Annotation

@Transactional is the main tool for enforcing consistency against a single database in Spring Boot. Knowing how it works internally explains most of its surprising behavior.

What happens internally:

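  1. Spring wraps the bean in a proxy (JDK dynamic proxy or CGLIB subclass)
  2. Before method execution: the proxy asks the transaction manager to begin a transaction, which acquires a connection
  3. Method executes
  4. On success: commits transaction, releases connection
  5. On exception: rolls back for RuntimeException and Error, then releases connection; checked exceptions still commit unless rollbackFor is set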
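The steps above can be imitated without Spring. The following is a minimal sketch using a plain JDK dynamic proxy; `OrderOperations`, `TransactionProxySketch`, and the `LOG` list are hypothetical names made up for illustration, and the sketch rolls back on any exception, which is simpler than Spring's default rollback rules.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface; stands in for a Spring-managed service.
interface OrderOperations {
    void placeOrder(boolean fail);
}

class TransactionProxySketch {

    // Records what the pretend transaction manager did, so the flow is observable.
    static final List<String> LOG = new ArrayList<>();

    // Wraps a target in a JDK dynamic proxy that brackets every call with
    // begin / commit / rollback, mirroring steps 2-5 of the list above.
    static OrderOperations transactional(OrderOperations target) {
        return (OrderOperations) Proxy.newProxyInstance(
            OrderOperations.class.getClassLoader(),
            new Class<?>[] {OrderOperations.class},
            (proxy, method, args) -> {
                LOG.add("BEGIN");
                try {
                    Object result = method.invoke(target, args);
                    LOG.add("COMMIT");
                    return result;
                } catch (InvocationTargetException e) {
                    // Simplification: roll back on every exception type.
                    LOG.add("ROLLBACK");
                    throw e.getCause();
                }
            });
    }

    public static void main(String[] args) {
        OrderOperations service = transactional(fail -> {
            LOG.add("work");
            if (fail) {
                throw new IllegalStateException("boom");
            }
        });

        service.placeOrder(false);
        try {
            service.placeOrder(true);
        } catch (IllegalStateException expected) {
            // the proxy has already recorded the rollback
        }
        System.out.println(LOG); // [BEGIN, work, COMMIT, BEGIN, work, ROLLBACK]
    }
}
```

The caller only ever holds the proxy, which is why the transactional behavior depends on how the method is reached rather than on the method body itself.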

Propagation Behaviors

@Service
public class OrderService {
 
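    @Autowired
    private PaymentService paymentService;
 
    @Autowired
    private OrderRepository orderRepository;
 
    @Autowired
    private AuditRepository auditRepository;      // used by logAuditEvent (type name assumed)
 
    @Autowired
    private ProductRepository productRepository;  // used by searchProducts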
 
    // REQUIRED (default): Join existing transaction or create new one
    @Transactional(propagation = Propagation.REQUIRED)
    public void createOrder(CreateOrderRequest request) {
        Order order = new Order(request);
        orderRepository.save(order);
        paymentService.processPayment(order); // joins THIS transaction
    }
 
    // REQUIRES_NEW: Always create a new independent transaction
    // Even if called from within an existing transaction
    // Use for audit logs, metrics -- should commit regardless of outer transaction
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void logAuditEvent(AuditEvent event) {
        auditRepository.save(event);
        // This commits even if the outer transaction rolls back, provided the call
        // comes through the Spring proxy (i.e. from another bean)
    }
 
    // MANDATORY: Must be called within an existing transaction, throw otherwise
    @Transactional(propagation = Propagation.MANDATORY)
    public void updateOrderStatus(Long orderId, OrderStatus status) {
        // Forces callers to manage transaction context explicitly
    }
 
    // NOT_SUPPORTED: Execute without transaction (suspends current if exists)
    // Use for read-heavy operations where you explicitly don't want transaction overhead
    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    public List<Product> searchProducts(String query) {
        return productRepository.searchByKeyword(query);
    }
 
    // NEVER: Must NOT be called within a transaction
    @Transactional(propagation = Propagation.NEVER)
    public void longRunningReport() {
        // Explicitly prevents accidental transaction holding during long ops
    }
 
    // NESTED: Savepoint-based nested transaction (rollback within without rolling back outer)
    @Transactional(propagation = Propagation.NESTED)
    public void optionalStep(Long orderId) {
        // If this fails, outer transaction can continue from savepoint
    }
}

Isolation Levels in Production

@Service
public class AccountService {
 
    // READ_COMMITTED: Good default for most OLTP operations
    // Prevents dirty reads, allows non-repeatable reads
    @Transactional(isolation = Isolation.READ_COMMITTED)
    public AccountSummary getAccountSummary(Long accountId) {
        return accountRepository.findSummary(accountId);
    }
 
    // REPEATABLE_READ: Required when reading same data multiple times
    // MySQL default -- use when consistency within a transaction matters
    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public void reconcileBalance(Long accountId) {
        BigDecimal balance1 = accountRepository.findBalance(accountId);
        // ... complex processing ...
        BigDecimal balance2 = accountRepository.findBalance(accountId);
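        // Both reads see the same MVCC snapshot (e.g. InnoDB consistent reads)
        // compareTo ignores scale; equals() would treat 10.0 and 10.00 as different
        if (balance1.compareTo(balance2) != 0) {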
            throw new ReconciliationException("This should never happen!");
        }
    }
 
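    // SERIALIZABLE: Only for the most critical operations
    // Prevents all anomalies including write skew, at the cost of more blocking;
    // expect deadlocks or serialization failures under contention and retry the call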
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void transferFunds(Long fromId, Long toId, BigDecimal amount) {
        Account from = accountRepository.findById(fromId)
            .orElseThrow(() -> new AccountNotFoundException(fromId));
        Account to = accountRepository.findById(toId)
            .orElseThrow(() -> new AccountNotFoundException(toId));
 
        if (from.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException(fromId, amount);
        }
 
        from.setBalance(from.getBalance().subtract(amount));
        to.setBalance(to.getBalance().add(amount));
 
        accountRepository.save(from);
        accountRepository.save(to);
    }
}

Transaction Timeout Configuration

// Prevent long-running transactions that cause undo log bloat
@Transactional(timeout = 30)  // 30 seconds max
public void processLargeOrder(LargeOrderRequest request) {
    // The deadline is checked when a statement runs after 30s, which then fails with
    // TransactionTimedOutException; non-database work is not interrupted
}

# application.yml
spring:
  transaction:
    default-timeout: 30 # global default: 30 seconds

Common Pitfall: @Transactional in Same Class

@Service
public class UserService {
 
    // WRONG: calling transactional method from same class bypasses proxy
    public void createUserAndSendEmail(CreateUserRequest request) {
        createUser(request);           // @Transactional IGNORED -- no proxy
        emailService.sendWelcome(request.getEmail());
    }
 
    @Transactional
    public User createUser(CreateUserRequest request) {
        // This transaction is ONLY active when called from OUTSIDE the class
        return userRepository.save(new User(request));
    }
}
 
// CORRECT: Use self-injection or extract to separate service
@Service
public class UserService {
 
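    @Lazy       // defers resolution so the self-reference is not rejected as a circular dependency (Spring Boot 2.6+)
    @Autowired
    private UserService self;  // self-injection: "self" is the proxy, "this" is the raw object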
 
    public void createUserAndSendEmail(CreateUserRequest request) {
        self.createUser(request);  // Goes through proxy, @Transactional works
        emailService.sendWelcome(request.getEmail());
    }
 
    @Transactional
    public User createUser(CreateUserRequest request) {
        return userRepository.save(new User(request));
    }
}
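Why the internal call skips the proxy can be reproduced in plain Java. This is a minimal sketch, not Spring code: `UserOperations`, `PlainUserService`, and `SelfInvocationSketch` are hypothetical names, and the interceptor only records which calls it sees.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for the UserService API.
interface UserOperations {
    void createUser();
    void createUserAndSendEmail();
}

class SelfInvocationSketch {

    static final List<String> CALLS = new ArrayList<>();

    static class PlainUserService implements UserOperations {
        @Override
        public void createUser() {
            CALLS.add("createUser body");
        }

        @Override
        public void createUserAndSendEmail() {
            // Ordinary Java call on "this": the proxy is not involved.
            this.createUser();
        }
    }

    // Proxy that records every call it intercepts before delegating.
    static UserOperations proxied(UserOperations target) {
        return (UserOperations) Proxy.newProxyInstance(
            UserOperations.class.getClassLoader(),
            new Class<?>[] {UserOperations.class},
            (proxy, method, args) -> {
                CALLS.add("intercepted " + method.getName());
                return method.invoke(target, args);
            });
    }

    public static void main(String[] args) {
        UserOperations service = proxied(new PlainUserService());
        service.createUserAndSendEmail();
        // Only the outer call was intercepted; createUser ran without interception.
        System.out.println(CALLS); // [intercepted createUserAndSendEmail, createUser body]
    }
}
```

Calling `service.createUser()` directly on the proxy would be intercepted, which is what routing the call through an injected reference or a separate bean achieves.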

2. Optimistic Locking with JPA @Version

When to Use

Use optimistic locking when:

  • Read operations far outnumber write operations (reads don't acquire locks)
  • Conflicts are rare but possible
  • You prefer failing with an error over waiting (non-blocking behavior)
  • Best for: e-commerce (low contention), user profile updates, CMS systems

Complete Entity Setup

@Entity
@Table(name = "products")
@EntityListeners(AuditingEntityListener.class)
public class Product {
 
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
 
    @Column(nullable = false)
    private String name;
 
    @Column(nullable = false, precision = 12, scale = 2)
    private BigDecimal price;
 
    @Column(nullable = false)
    private Integer stockQuantity;
 
    @Version
    @Column(nullable = false)
    private Long version;  // Auto-managed by JPA; incremented on every update
 
    @CreatedDate
    private Instant createdAt;
 
    @LastModifiedDate
    private Instant updatedAt;
 
    public void reduceStock(int quantity) {
        if (this.stockQuantity < quantity) {
            throw new InsufficientStockException(this.id, this.stockQuantity, quantity);
        }
        this.stockQuantity -= quantity;
    }
    // constructor, getters, setters
}

Repository

@Repository
public interface ProductRepository extends JpaRepository<Product, Long> {
 
    @Query("SELECT p FROM Product p WHERE p.id = :id")
    Optional<Product> findById(@Param("id") Long id);
 
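    // OPTIMISTIC lock mode: the version is re-checked at commit even if this
    // transaction only read the entity, so stale reads fail instead of passing silently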
    @Lock(LockModeType.OPTIMISTIC)
    @Query("SELECT p FROM Product p WHERE p.id = :id")
    Optional<Product> findByIdOptimistic(@Param("id") Long id);
}

Service with Retry Logic

@Service
@Slf4j
public class InventoryService {
 
    @Autowired
    private ProductRepository productRepository;
 
    // Retry on OptimisticLockingFailureException -- another transaction won the race
    @Retryable(
        retryFor = OptimisticLockingFailureException.class,
        maxAttempts = 3,
        backoff = @Backoff(delay = 50, multiplier = 2, maxDelay = 500)
    )
    @Transactional
    public void reserveStock(Long productId, int quantity) {
        Product product = productRepository.findById(productId)
            .orElseThrow(() -> new ProductNotFoundException(productId));
 
        product.reduceStock(quantity);  // throws if insufficient
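        productRepository.saveAndFlush(product);
        // Flushing here issues: UPDATE products SET stock_quantity=?, version=? WHERE id=? AND version=?
        // With plain save() the UPDATE (and any conflict) would surface only at flush/commit time
        // If version mismatch: throws OptimisticLockingFailureException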
        log.debug("Reserved {} units of product {}. New stock: {}, version: {}",
            quantity, productId, product.getStockQuantity(), product.getVersion());
    }
 
    @Recover
    public void recoverFromLockConflict(OptimisticLockingFailureException ex,
                                         Long productId, int quantity) {
        log.error("Failed to reserve stock for product {} after 3 attempts: {}",
            productId, ex.getMessage());
        throw new StockReservationException("Could not reserve stock due to high contention", ex);
    }
}

What the SQL Looks Like

-- JPA-generated UPDATE with version check
UPDATE products
SET name = ?,
    price = ?,
    stock_quantity = ?,
    updated_at = ?,
    version = 11          -- new version (old + 1)
WHERE id = ?
  AND version = 10;       -- optimistic lock check
 
-- If 0 rows affected: another transaction updated it first --> OptimisticLockingFailureException
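The version check can be tried out without a database. Below is a minimal in-memory sketch of the same compare-and-set idea; `OptimisticLockSketch`, `Row`, and the method names are hypothetical, and a `ConcurrentHashMap` stands in for the products table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OptimisticLockSketch {

    // Immutable row snapshot: the stock plus the version it was read at.
    record Row(int stock, long version) {}

    private final Map<Long, Row> table = new ConcurrentHashMap<>();

    void insert(long id, int stock) {
        table.put(id, new Row(stock, 0));
    }

    Row read(long id) {
        return table.get(id);
    }

    // Plays the role of "UPDATE ... SET stock = ?, version = version + 1
    // WHERE id = ? AND version = ?". Returns the number of rows affected.
    int update(long id, int newStock, long expectedVersion) {
        Row current = table.get(id);
        if (current == null || current.version() != expectedVersion) {
            return 0; // someone else updated the row first
        }
        Row next = new Row(newStock, expectedVersion + 1);
        return table.replace(id, current, next) ? 1 : 0;
    }

    // Read-modify-write with a bounded number of retries.
    boolean reserve(long id, int quantity, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Row current = read(id);
            if (current.stock() < quantity) {
                return false; // not enough stock, retrying will not help
            }
            if (update(id, current.stock() - quantity, current.version()) == 1) {
                return true;
            }
        }
        return false; // gave up after repeated conflicts
    }

    public static void main(String[] args) {
        OptimisticLockSketch db = new OptimisticLockSketch();
        db.insert(1L, 10);

        Row stale = db.read(1L);   // first writer reads version 0
        db.reserve(1L, 3, 3);      // second writer commits first: version becomes 1

        // The first writer now tries to write using the stale version 0.
        int affected = db.update(1L, stale.stock() - 2, stale.version());
        System.out.println(affected + " " + db.read(1L)); // 0 Row[stock=7, version=1]
    }
}
```

Zero rows affected is the in-memory counterpart of the exception JPA raises; the retry loop in `reserve` re-reads the row before trying again, which is the important part of any retry strategy here.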

3. Pessimistic Locking with @Lock

When to Use

Use pessimistic locking when:

  • Conflicts are frequent (high-contention scenarios)
  • You cannot tolerate retries (e.g., payment processing where idempotency is complex)
  • Critical operations: bank transfers, inventory decrements for flash sales

Repository with Lock Types

@Repository
public interface AccountRepository extends JpaRepository<Account, Long> {
 
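    // PESSIMISTIC_WRITE: SELECT ... FOR UPDATE
    // Exclusive row lock -- other transactions block on writes and locking reads;
    // plain MVCC reads (MySQL InnoDB, PostgreSQL) still see the last committed version
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @QueryHints({
        // Milliseconds. Use "javax.persistence.lock.timeout" on pre-Jakarta stacks (Spring Boot 2.x).
        // Not every database honors this hint; see the MySQL setting under Lock Timeout Configuration.
        @QueryHint(name = "jakarta.persistence.lock.timeout", value = "5000")
    })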
    @Query("SELECT a FROM Account a WHERE a.id = :id")
    Optional<Account> findByIdForUpdate(@Param("id") Long id);
 
    // PESSIMISTIC_READ: SELECT ... FOR SHARE (or LOCK IN SHARE MODE)
    // Shared lock -- others can read but not write
    @Lock(LockModeType.PESSIMISTIC_READ)
    @Query("SELECT a FROM Account a WHERE a.id = :id")
    Optional<Account> findByIdForRead(@Param("id") Long id);
 
    // Lock multiple accounts in one query (avoids N+1 lock acquisition)
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT a FROM Account a WHERE a.id IN :ids ORDER BY a.id")
    List<Account> findByIdsForUpdate(@Param("ids") List<Long> ids);
}

Service with Deadlock Prevention

@Service
@Slf4j
public class BankTransferService {
 
    @Autowired
    private AccountRepository accountRepository;
 
    @Autowired
    private TransactionRecordRepository transactionRecordRepository;
 
    @Transactional(isolation = Isolation.READ_COMMITTED, timeout = 30)
    public TransferResult transfer(Long fromAccountId, Long toAccountId,
                                    BigDecimal amount, String reference) {
 
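        // A self-transfer would match only one row below and be misreported as "not found"
        if (fromAccountId.equals(toAccountId)) {
            throw new IllegalArgumentException("Source and destination accounts must differ");
        }
 
        // CRITICAL: Always acquire locks in consistent order (by ID) to prevent deadlock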
        // If T1 locks account 5 then 10, and T2 locks account 10 then 5 --> deadlock
        // If both always lock the lower ID first --> no deadlock
        Long firstLockId = Math.min(fromAccountId, toAccountId);
        Long secondLockId = Math.max(fromAccountId, toAccountId);
 
        // Acquire both locks in order
        List<Long> sortedIds = List.of(firstLockId, secondLockId);
        List<Account> lockedAccounts = accountRepository.findByIdsForUpdate(sortedIds);
 
        if (lockedAccounts.size() != 2) {
            throw new AccountNotFoundException("One or both accounts not found");
        }
 
        Account from = lockedAccounts.stream()
            .filter(a -> a.getId().equals(fromAccountId)).findFirst().orElseThrow();
        Account to = lockedAccounts.stream()
            .filter(a -> a.getId().equals(toAccountId)).findFirst().orElseThrow();
 
        if (from.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException(fromAccountId, amount, from.getBalance());
        }
 
        from.debit(amount);
        to.credit(amount);
 
        accountRepository.save(from);
        accountRepository.save(to);
 
        // Record the transaction
        TransactionRecord record = TransactionRecord.builder()
            .fromAccountId(fromAccountId)
            .toAccountId(toAccountId)
            .amount(amount)
            .reference(reference)
            .status(TransactionStatus.COMPLETED)
            .executedAt(Instant.now())
            .build();
        transactionRecordRepository.save(record);
 
        log.info("Transfer completed: {} -> {} amount={} ref={}",
            fromAccountId, toAccountId, amount, reference);
 
        return new TransferResult(record.getId(), TransactionStatus.COMPLETED);
    }
}
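The lock-ordering rule used in the service above is independent of the database. A minimal sketch with in-process locks, assuming hypothetical names (`OrderedLockTransferSketch`, `open`, `demo`) and `ReentrantLock` in place of row locks:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class OrderedLockTransferSketch {

    private final Map<Long, Long> balances = new ConcurrentHashMap<>();
    private final Map<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    void open(long id, long balance) {
        balances.put(id, balance);
        locks.put(id, new ReentrantLock());
    }

    long balance(long id) {
        return balances.get(id);
    }

    void transfer(long fromId, long toId, long amount) {
        if (fromId == toId) {
            throw new IllegalArgumentException("accounts must differ");
        }
        // Always lock the lower ID first, whatever the transfer direction.
        ReentrantLock first = locks.get(Math.min(fromId, toId));
        ReentrantLock second = locks.get(Math.max(fromId, toId));
        first.lock();
        try {
            second.lock();
            try {
                if (balances.get(fromId) < amount) {
                    throw new IllegalStateException("insufficient funds");
                }
                balances.merge(fromId, -amount, Long::sum);
                balances.merge(toId, amount, Long::sum);
            } finally {
                second.unlock();
            }
        } finally {
            first.unlock();
        }
    }

    // Runs transfers in opposite directions on two threads and returns both balances.
    static long[] demo(int rounds) {
        OrderedLockTransferSketch bank = new OrderedLockTransferSketch();
        bank.open(5L, 1_000);
        bank.open(10L, 1_000);
        Thread a = new Thread(() -> {
            for (int i = 0; i < rounds; i++) bank.transfer(5L, 10L, 1);
        });
        Thread b = new Thread(() -> {
            for (int i = 0; i < rounds; i++) bank.transfer(10L, 5L, 1);
        });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {bank.balance(5L), bank.balance(10L)};
    }

    public static void main(String[] args) {
        long[] result = demo(1_000);
        System.out.println(result[0] + " " + result[1]); // 1000 1000
    }
}
```

If `transfer` locked `fromId` first instead, the two threads could each hold one lock while waiting for the other, which is the deadlock described in the comments of `BankTransferService`.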

Lock Timeout Configuration

# application.yml
spring:
  jpa:
    properties:
      jakarta:      # "javax" on Spring Boot 2.x
        persistence:
          lock:
            timeout: 5000 # 5 seconds -- throw LockTimeoutException after this

-- MySQL: Set innodb lock wait timeout
SET SESSION innodb_lock_wait_timeout = 5;  -- 5 seconds

4. Read/Write Separation with AbstractRoutingDataSource

Why Read/Write Separation Matters

Aurora MySQL supports up to 15 read replicas. Routing read traffic to replicas:

  • Reduces load on the primary writer
  • Scales read throughput horizontally
  • But introduces eventual consistency (replicas typically lag by tens of milliseconds, and more under heavy write load)

DataSource Configuration

@Configuration
@Slf4j
public class DataSourceConfig {
 
    @Bean
    @ConfigurationProperties("spring.datasource.write")
    public DataSourceProperties writeDataSourceProperties() {
        return new DataSourceProperties();
    }
 
    @Bean
    @ConfigurationProperties("spring.datasource.read")
    public DataSourceProperties readDataSourceProperties() {
        return new DataSourceProperties();
    }
 
    @Bean("writeDataSource")
    @ConfigurationProperties("spring.datasource.write.hikari")
    public HikariDataSource writeDataSource(
            @Qualifier("writeDataSourceProperties") DataSourceProperties properties) {
        HikariDataSource ds = properties.initializeDataSourceBuilder()
            .type(HikariDataSource.class)
            .build();
        ds.setPoolName("WritePool");
        ds.setMaximumPoolSize(20);
        ds.setMinimumIdle(5);
        ds.setConnectionTimeout(3000);
        ds.setIdleTimeout(600000);
        ds.setMaxLifetime(1800000);
        return ds;
    }
 
    @Bean("readDataSource")
    @ConfigurationProperties("spring.datasource.read.hikari")
    public HikariDataSource readDataSource(
            @Qualifier("readDataSourceProperties") DataSourceProperties properties) {
        HikariDataSource ds = properties.initializeDataSourceBuilder()
            .type(HikariDataSource.class)
            .build();
        ds.setPoolName("ReadPool");
        ds.setMaximumPoolSize(40);  // More connections for read-heavy workloads
        ds.setMinimumIdle(10);
        ds.setReadOnly(true);       // Mark pool as read-only
        return ds;
    }
 
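    // Not @Primary: only the lazy proxy below may be the primary DataSource,
    // otherwise injection fails with two primary candidates
    @Bean("routingDataSource")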
    public DataSource routingDataSource(
            @Qualifier("writeDataSource") DataSource writeDataSource,
            @Qualifier("readDataSource") DataSource readDataSource) {
 
        RoutingDataSource routingDataSource = new RoutingDataSource();
        Map<Object, Object> dataSources = new HashMap<>();
        dataSources.put(DataSourceType.WRITE, writeDataSource);
        dataSources.put(DataSourceType.READ, readDataSource);
 
        routingDataSource.setTargetDataSources(dataSources);
        routingDataSource.setDefaultTargetDataSource(writeDataSource);
        return routingDataSource;
    }
 
    // Wrap in LazyConnectionDataSourceProxy to defer connection acquisition
    // until actually needed -- avoids grabbing connections for @Transactional methods
    // that end up not needing the DB
    @Bean
    @Primary
    public DataSource dataSource(@Qualifier("routingDataSource") DataSource routing) {
        return new LazyConnectionDataSourceProxy(routing);
    }
}

Routing Logic

public enum DataSourceType {
    WRITE, READ
}
 
public class DataSourceContextHolder {
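    // Plain ThreadLocal: an InheritableThreadLocal would leak the routing choice
    // into child threads and pooled workers
    private static final ThreadLocal<DataSourceType> context =
        new ThreadLocal<>();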
 
    public static void setDataSourceType(DataSourceType type) {
        context.set(type);
    }
 
    public static DataSourceType getDataSourceType() {
        return context.get();
    }
 
    public static void clearDataSourceType() {
        context.remove();
    }
}
 
public class RoutingDataSource extends AbstractRoutingDataSource {
 
    @Override
    protected Object determineCurrentLookupKey() {
        // Route to READ replica if:
        //   1. Explicitly set in context, OR
        //   2. Current transaction is read-only
        DataSourceType type = DataSourceContextHolder.getDataSourceType();
        if (type != null) {
            return type;
        }
        return TransactionSynchronizationManager.isCurrentTransactionReadOnly()
            ? DataSourceType.READ
            : DataSourceType.WRITE;
    }
}

Usage in Services

@Service
public class ProductService {
 
    @Autowired
    private ProductRepository productRepository;
 
    // Routes to READ replica automatically (readOnly = true)
    @Transactional(readOnly = true)
    public Product findProduct(Long id) {
        return productRepository.findById(id)
            .orElseThrow(() -> new ProductNotFoundException(id));
    }
 
    // Routes to WRITE primary (not readOnly)
    @Transactional
    public Product updateProduct(Long id, UpdateProductRequest request) {
        Product product = productRepository.findById(id)
            .orElseThrow(() -> new ProductNotFoundException(id));
        product.update(request);
        return productRepository.save(product);
    }
 
    // Force primary read even though it's read-only (for read-your-writes)
    @Transactional(readOnly = true)
    public Product findProductFresh(Long id) {
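        // Override routing to always use primary. This works only because
        // LazyConnectionDataSourceProxy defers the lookup until the first statement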
        DataSourceContextHolder.setDataSourceType(DataSourceType.WRITE);
        try {
            return productRepository.findById(id)
                .orElseThrow(() -> new ProductNotFoundException(id));
        } finally {
            DataSourceContextHolder.clearDataSourceType();
        }
    }
}

application.yml for Read/Write Separation

spring:
  datasource:
    write:
      url: jdbc:mysql://aurora-cluster-writer.cluster-xxxx.us-east-1.rds.amazonaws.com:3306/yourdb
      username: ${DB_WRITE_USERNAME}
      password: ${DB_WRITE_PASSWORD}
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: WritePool
        maximum-pool-size: 20
        minimum-idle: 5
        connection-timeout: 3000
 
    read:
      url: jdbc:mysql://aurora-cluster-reader.cluster-ro-xxxx.us-east-1.rds.amazonaws.com:3306/yourdb
      username: ${DB_READ_USERNAME}
      password: ${DB_READ_PASSWORD}
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        pool-name: ReadPool
        maximum-pool-size: 40
        minimum-idle: 10
        connection-timeout: 3000

5. The Outbox Pattern - Complete Implementation

Why the Outbox Pattern

The dual-write problem: You update your database AND publish an event to Kafka. These are two separate systems. If:

  • DB commit succeeds, Kafka publish fails: event lost, downstream services never notified
  • Kafka publish succeeds, DB commit fails: phantom events for data that doesn't exist

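The Outbox Pattern: Write the event to a DB table (outbox) in the same transaction as your business data. A separate process reads the outbox and publishes to Kafka. The database transaction makes the two writes atomic; delivery to Kafka is then at-least-once, so consumers must tolerate duplicates.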
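The core idea fits in a few lines of plain Java. This is a minimal in-memory sketch under stated assumptions: `OutboxSketch`, `Tx`, and the three lists are hypothetical stand-ins for the orders table, the outbox table, and Kafka, and the "transaction" simply stages writes and applies them together.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OutboxSketch {

    final List<String> orders = new ArrayList<>();  // business table
    final List<String> outbox = new ArrayList<>();  // pending events
    final List<String> broker = new ArrayList<>();  // stands in for Kafka

    // Writes staged inside one pretend transaction.
    static class Tx {
        final List<String> orders = new ArrayList<>();
        final List<String> events = new ArrayList<>();
    }

    // Applies all staged writes together; if the work throws, nothing is applied.
    void inTransaction(Consumer<Tx> work) {
        Tx tx = new Tx();
        work.accept(tx);          // an exception here skips the "commit" below
        orders.addAll(tx.orders);
        outbox.addAll(tx.events);
    }

    void createOrder(String orderId, boolean failBeforeCommit) {
        inTransaction(tx -> {
            tx.orders.add(orderId);
            tx.events.add("ORDER_CREATED:" + orderId);
            if (failBeforeCommit) {
                throw new IllegalStateException("simulated failure");
            }
        });
    }

    // Relay: an event leaves the outbox only after the broker has accepted it.
    void relay() {
        while (!outbox.isEmpty()) {
            broker.add(outbox.get(0));
            outbox.remove(0);
        }
    }

    public static void main(String[] args) {
        OutboxSketch app = new OutboxSketch();
        app.createOrder("o-1", false);
        try {
            app.createOrder("o-2", true);
        } catch (IllegalStateException expected) {
            // neither the order nor its event was written
        }
        app.relay();
        System.out.println(app.orders + " " + app.broker); // [o-1] [ORDER_CREATED:o-1]
    }
}
```

A crash between `broker.add` and `outbox.remove` would cause the event to be sent again on the next run, which is why the pattern gives at-least-once rather than exactly-once delivery.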

Schema

CREATE TABLE outbox_events (
    id           VARCHAR(36)  NOT NULL PRIMARY KEY,
    aggregate_type VARCHAR(100) NOT NULL,   -- e.g., ORDER, USER, PAYMENT
    aggregate_id VARCHAR(100) NOT NULL,     -- e.g., order ID
    event_type   VARCHAR(200) NOT NULL,     -- e.g., ORDER_CREATED
    payload      TEXT         NOT NULL,     -- JSON event body
    status       ENUM('PENDING', 'PROCESSING', 'SENT', 'FAILED') NOT NULL DEFAULT 'PENDING',
    retry_count  INT          NOT NULL DEFAULT 0,
    created_at   DATETIME(3)  NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    processed_at DATETIME(3),
    INDEX idx_outbox_status_created (status, created_at),
    INDEX idx_outbox_aggregate (aggregate_type, aggregate_id)
);

Entity and Repository

@Entity
@Table(name = "outbox_events")
@Getter
@Setter  // the publisher updates status, retryCount and processedAt
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutboxEvent {
 
    @Id
    private String id;
 
    @Column(nullable = false, length = 100)
    private String aggregateType;
 
    @Column(nullable = false, length = 100)
    private String aggregateId;
 
    @Column(nullable = false, length = 200)
    private String eventType;
 
    @Column(nullable = false, columnDefinition = "TEXT")
    private String payload;
 
    @Column(nullable = false)
    @Enumerated(EnumType.STRING)
    private OutboxStatus status;
 
    @Column(nullable = false)
    private Integer retryCount;
 
    @Column(nullable = false)
    private Instant createdAt;
 
    private Instant processedAt;
}
 
@Repository
public interface OutboxEventRepository extends JpaRepository<OutboxEvent, String> {
 
    // Fetch pending events in order, with a limit -- avoids large result sets
    @Query("SELECT e FROM OutboxEvent e WHERE e.status = 'PENDING' " +
           "ORDER BY e.createdAt ASC")
    List<OutboxEvent> findPendingEvents(Pageable pageable);
 
    // For retry: fetch failed events within retry limit
    @Query("SELECT e FROM OutboxEvent e WHERE e.status = 'FAILED' " +
           "AND e.retryCount < :maxRetries ORDER BY e.createdAt ASC")
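    List<OutboxEvent> findRetryableEvents(@Param("maxRetries") int maxRetries, Pageable pageable);
 
    // Derived delete query used by the cleanup job
    long deleteByStatusAndProcessedAtBefore(OutboxStatus status, Instant cutoff);
}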

Service Layer - Writing to Outbox

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderService {
 
    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;
 
    @Transactional  // Both order save and outbox write happen atomically
    public Order createOrder(CreateOrderRequest request) {
        Order order = Order.builder()
            .customerId(request.getCustomerId())
            .items(request.getItems())
            .totalAmount(calculateTotal(request.getItems()))
            .status(OrderStatus.PENDING)
            .createdAt(Instant.now())
            .build();
 
        orderRepository.save(order);
 
        // Write event to outbox IN SAME TRANSACTION
        publishOrderCreatedEvent(order);
 
        log.info("Order {} created, outbox event enqueued", order.getId());
        return order;
    }
 
    @Transactional
    public Order confirmOrder(Long orderId) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));
 
        order.setStatus(OrderStatus.CONFIRMED);
        orderRepository.save(order);
 
        publishOrderConfirmedEvent(order);
        return order;
    }
 
    private void publishOrderCreatedEvent(Order order) {
        try {
            OrderCreatedEventPayload payload = OrderCreatedEventPayload.builder()
                .orderId(order.getId())
                .customerId(order.getCustomerId())
                .totalAmount(order.getTotalAmount())
                .items(order.getItems())
                .build();
 
            OutboxEvent event = OutboxEvent.builder()
                .id(UUID.randomUUID().toString())
                .aggregateType("ORDER")
                .aggregateId(order.getId().toString())
                .eventType("ORDER_CREATED")
                .payload(objectMapper.writeValueAsString(payload))
                .status(OutboxStatus.PENDING)
                .retryCount(0)
                .createdAt(Instant.now())
                .build();
 
            outboxEventRepository.save(event);
 
        } catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize ORDER_CREATED event", e);
        }
    }
 
    private void publishOrderConfirmedEvent(Order order) {
        // Similar pattern...
    }
}

Outbox Publisher - Polling-Based

@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxEventPublisher {
 
    private final OutboxEventRepository outboxEventRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final OutboxTopicMapper topicMapper;
    private final AlertService alertService;  // used in processEvent (type name assumed)
 
    private static final int BATCH_SIZE = 50;
    private static final int MAX_RETRIES = 5;
 
    // Runs every second, processes pending events
    @Scheduled(fixedDelay = 1000, initialDelay = 5000)
    public void publishPendingEvents() {
        processBatch(outboxEventRepository.findPendingEvents(
            PageRequest.of(0, BATCH_SIZE)));
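        // Note: processEvent() puts failed events back to PENDING until MAX_RETRIES is
        // reached, so this query only matches rows marked FAILED by other code paths
        processBatch(outboxEventRepository.findRetryableEvents(
            MAX_RETRIES, PageRequest.of(0, BATCH_SIZE)));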
    }
 
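    // No @Transactional here: the call above is a self-invocation, so the annotation
    // would be bypassed anyway (see the pitfall in section 1). Each save() below commits
    // in the repository's own transaction, and no DB transaction is held during the Kafka send.
    private void processBatch(List<OutboxEvent> events) {
        for (OutboxEvent event : events) {
            processEvent(event);
        }
    }
 
    private void processEvent(OutboxEvent event) {
        // Mark as PROCESSING so later polls skip it. This is not a lock: two publisher
        // instances that read the same PENDING row can still both send it, so run a single
        // publisher or claim rows with a locking query, and keep consumers idempotent.
        event.setStatus(OutboxStatus.PROCESSING);
        outboxEventRepository.save(event);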
 
        try {
            String topic = topicMapper.getTopic(event.getAggregateType(), event.getEventType());
 
            // Use aggregate ID as Kafka partition key for ordering
            kafkaTemplate.send(topic, event.getAggregateId(), event.getPayload())
                .get(5, TimeUnit.SECONDS);  // synchronous -- wait for ack
 
            event.setStatus(OutboxStatus.SENT);
            event.setProcessedAt(Instant.now());
            outboxEventRepository.save(event);
 
            log.debug("Published {} event for {} {}",
                event.getEventType(), event.getAggregateType(), event.getAggregateId());
 
        } catch (Exception e) {
            log.error("Failed to publish event {} of type {}: {}",
                event.getId(), event.getEventType(), e.getMessage());
 
            event.setRetryCount(event.getRetryCount() + 1);
            if (event.getRetryCount() >= MAX_RETRIES) {
                event.setStatus(OutboxStatus.FAILED);
                log.error("Outbox event {} permanently failed after {} retries",
                    event.getId(), MAX_RETRIES);
                // Alert on this! Something needs manual intervention
                alertService.sendOutboxFailureAlert(event);
            } else {
                event.setStatus(OutboxStatus.PENDING);  // Will be retried
            }
            outboxEventRepository.save(event);
        }
    }
}

Cleanup Old Sent Events

@Component
@RequiredArgsConstructor
public class OutboxCleanupJob {
 
    private final OutboxEventRepository outboxEventRepository;
 
    // Run daily -- clean up sent events older than 7 days
    @Scheduled(cron = "0 0 2 * * *")  // 2 AM daily
    @Transactional
    public void cleanupSentEvents() {
        Instant cutoff = Instant.now().minus(Duration.ofDays(7));
        outboxEventRepository.deleteByStatusAndProcessedAtBefore(OutboxStatus.SENT, cutoff);
    }
}

6. SAGA Pattern - Choreography-Based

What Is a SAGA?

A SAGA is a sequence of local transactions that together form a distributed transaction. Each local transaction updates a single service's database and publishes an event. If any step fails, compensating transactions are executed to undo previous steps.

Choreography: No central coordinator. Services react to events and emit new events.

Order Processing SAGA Example

[Order Service]       [Inventory Service]      [Payment Service]
     |                       |                       |
  Create Order               |                       |
     |-- ORDER_CREATED ------>|                       |
     |               Reserve Inventory               |
     |                       |-- INVENTORY_RESERVED ->|
     |                       |               Process Payment
     |<----------------------|<-- PAYMENT_PROCESSED --|
  Confirm Order              |
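The flow in the diagram, plus a compensation step for a failed payment, can be simulated with an in-memory event bus. This is a simplified sketch, not the implementation that follows: `ChoreographySagaSketch`, `Event`, `wire`, and the INVENTORY_RELEASED event name are hypothetical, and each lambda stands in for a whole service.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class ChoreographySagaSketch {

    record Event(String type, String orderId) {}

    private final Map<String, List<Consumer<Event>>> subscribers = new HashMap<>();
    private final Deque<Event> queue = new ArrayDeque<>();
    final List<String> history = new ArrayList<>();

    void subscribe(String type, Consumer<Event> handler) {
        subscribers.computeIfAbsent(type, t -> new ArrayList<>()).add(handler);
    }

    void publish(Event event) {
        queue.add(event);
    }

    // Delivers queued events in order until no service emits anything new.
    void drain() {
        Event event;
        while ((event = queue.poll()) != null) {
            history.add(event.type());
            for (Consumer<Event> handler : subscribers.getOrDefault(event.type(), List.of())) {
                handler.accept(event);
            }
        }
    }

    // Wires three pretend services together; there is no central coordinator.
    static ChoreographySagaSketch wire(boolean paymentSucceeds) {
        ChoreographySagaSketch bus = new ChoreographySagaSketch();
        // Inventory service: reserve on order creation, release as compensation.
        bus.subscribe("ORDER_CREATED",
            e -> bus.publish(new Event("INVENTORY_RESERVED", e.orderId())));
        bus.subscribe("PAYMENT_FAILED",
            e -> bus.publish(new Event("INVENTORY_RELEASED", e.orderId())));
        // Payment service: charge once inventory is reserved.
        bus.subscribe("INVENTORY_RESERVED",
            e -> bus.publish(new Event(
                paymentSucceeds ? "PAYMENT_PROCESSED" : "PAYMENT_FAILED", e.orderId())));
        // Order service: record the final outcome.
        bus.subscribe("PAYMENT_PROCESSED",
            e -> bus.history.add("order " + e.orderId() + " CONFIRMED"));
        bus.subscribe("PAYMENT_FAILED",
            e -> bus.history.add("order " + e.orderId() + " FAILED"));
        return bus;
    }

    public static void main(String[] args) {
        ChoreographySagaSketch bus = wire(false);
        bus.publish(new Event("ORDER_CREATED", "o-1"));
        bus.drain();
        System.out.println(bus.history);
        // [ORDER_CREATED, INVENTORY_RESERVED, PAYMENT_FAILED, order o-1 FAILED, INVENTORY_RELEASED]
    }
}
```

Each handler knows only the events it consumes and emits, so adding a step means adding a subscriber rather than changing a coordinator; the price is that the overall flow is visible only by following the events.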

Implementation

// ORDER SERVICE
 
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderSagaService {
 
    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxEventRepository;
    private final ObjectMapper objectMapper;
 
    @Transactional
    public Order startOrderSaga(CreateOrderRequest request) {
        Order order = Order.builder()
            .customerId(request.getCustomerId())
            .items(request.getItems())
            .totalAmount(calculateTotal(request.getItems()))
            .status(OrderStatus.PENDING)
            .build();
        orderRepository.save(order);
 
        // Trigger inventory reservation via outbox
        emitEvent("ORDER", order.getId().toString(), "ORDER_CREATED",
            new OrderCreatedEvent(order.getId(), order.getItems(), order.getTotalAmount()));
 
        return order;
    }
 
    // Step 3: Inventory reserved -- proceed to payment
    @KafkaListener(topics = "inventory-events", groupId = "order-service")
    @Transactional
    public void handleInventoryEvent(ConsumerRecord<String, String> record) throws Exception {
        BaseEvent event = objectMapper.readValue(record.value(), BaseEvent.class);
 
        if ("INVENTORY_RESERVED".equals(event.getType())) {
            InventoryReservedEvent payload = objectMapper.readValue(
                record.value(), InventoryReservedEvent.class);
            onInventoryReserved(payload);
        } else if ("INVENTORY_RESERVATION_FAILED".equals(event.getType())) {
            InventoryFailedEvent payload = objectMapper.readValue(
                record.value(), InventoryFailedEvent.class);
            onInventoryReservationFailed(payload);
        }
    }
 
    private void onInventoryReserved(InventoryReservedEvent event) {
        Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
        order.setStatus(OrderStatus.INVENTORY_RESERVED);
        orderRepository.save(order);
 
        // Trigger payment
        emitEvent("ORDER", order.getId().toString(), "PAYMENT_REQUESTED",
            new PaymentRequestedEvent(order.getId(), order.getCustomerId(),
                order.getTotalAmount()));
    }
 
    private void onInventoryReservationFailed(InventoryFailedEvent event) {
        Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
        order.setStatus(OrderStatus.FAILED);
        order.setFailureReason("Inventory not available: " + event.getReason());
        orderRepository.save(order);
        // No compensation needed -- nothing was reserved
    }
 
    @KafkaListener(topics = "payment-events", groupId = "order-service")
    @Transactional
    public void handlePaymentEvent(ConsumerRecord<String, String> record) throws Exception {
        BaseEvent event = objectMapper.readValue(record.value(), BaseEvent.class);
 
        if ("PAYMENT_PROCESSED".equals(event.getType())) {
            PaymentProcessedEvent payload = objectMapper.readValue(
                record.value(), PaymentProcessedEvent.class);
            onPaymentProcessed(payload);
        } else if ("PAYMENT_FAILED".equals(event.getType())) {
            PaymentFailedEvent payload = objectMapper.readValue(
                record.value(), PaymentFailedEvent.class);
            onPaymentFailed(payload);
        }
    }
 
    private void onPaymentProcessed(PaymentProcessedEvent event) {
        Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
        order.setStatus(OrderStatus.CONFIRMED);
        orderRepository.save(order);
        log.info("Order {} successfully completed", event.getOrderId());
    }
 
    private void onPaymentFailed(PaymentFailedEvent event) {
        Order order = orderRepository.findById(event.getOrderId()).orElseThrow();
        order.setStatus(OrderStatus.PAYMENT_FAILED);
        orderRepository.save(order);
 
        // COMPENSATING TRANSACTION: Release the reserved inventory
        emitEvent("ORDER", order.getId().toString(), "INVENTORY_RELEASE_REQUESTED",
            new InventoryReleaseEvent(order.getId(), order.getItems(),
                "Payment failed: " + event.getFailureReason()));
    }
 
    private void emitEvent(String aggregateType, String aggregateId,
                            String eventType, Object payload) {
        try {
            OutboxEvent outboxEvent = OutboxEvent.builder()
                .id(UUID.randomUUID().toString())
                .aggregateType(aggregateType)
                .aggregateId(aggregateId)
                .eventType(eventType)
                .payload(objectMapper.writeValueAsString(payload))
                .status(OutboxStatus.PENDING)
                .retryCount(0)
                .createdAt(Instant.now())
                .build();
            outboxEventRepository.save(outboxEvent);
        } catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize " + eventType, e);
        }
    }
}

7. SAGA Pattern - Orchestration-Based

Orchestration vs Choreography

| Aspect | Choreography | Orchestration |
|---|---|---|
| Coordination | Services react to events | Central orchestrator directs |
| Coupling | Loose -- services don't know each other | Orchestrator knows all steps |
| Visibility | Hard to track overall saga state | Saga state is centralized |
| Failure handling | Distributed -- each service handles | Centralized in orchestrator |
| Recommended for | Simple flows (2-3 steps) | Complex flows (5+ steps) |

Orchestration Implementation

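@Entity
@Table(name = "order_sagas")
@Getter @Setter @Builder @NoArgsConstructor @AllArgsConstructor  // Lombok: the orchestrator uses builder(), getters and setters
public class OrderSaga {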
    @Id
    private String sagaId;
    private Long orderId;
 
    @Enumerated(EnumType.STRING)
    private OrderSagaState state;
 
    private String failureReason;
    private Instant startedAt;
    private Instant completedAt;
    private Integer compensationStep;  // which compensation step we're on
}
 
public enum OrderSagaState {
    STARTED,
    INVENTORY_RESERVING,
    INVENTORY_RESERVED,
    PAYMENT_PROCESSING,
    COMPLETED,
    COMPENSATING_INVENTORY,
    COMPENSATING_PAYMENT,
    FAILED
}
 
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderSagaOrchestrator {
 
    private final OrderSagaRepository sagaRepository;
    private final OrderRepository orderRepository;
    private final InventoryServiceClient inventoryClient;  // Feign or RestTemplate
    private final PaymentServiceClient paymentClient;
    private final ApplicationEventPublisher eventPublisher;
 
    @Transactional
    public String startSaga(Long orderId) {
        Order order = orderRepository.findById(orderId).orElseThrow();
 
        OrderSaga saga = OrderSaga.builder()
            .sagaId(UUID.randomUUID().toString())
            .orderId(orderId)
            .state(OrderSagaState.STARTED)
            .startedAt(Instant.now())
            .build();
        sagaRepository.save(saga);
 
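        // Step 1: Reserve inventory
        // NOTE: this remote call runs inside the DB transaction. If the commit fails
        // afterwards, inventory stays reserved for a saga that was never persisted, so
        // reserve() must be idempotent on sagaId (or be triggered through the outbox).
        transitionTo(saga, OrderSagaState.INVENTORY_RESERVING);
        inventoryClient.reserve(new ReserveRequest(saga.getSagaId(), order.getItems()));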
 
        return saga.getSagaId();
    }
 
    @Transactional
    public void onInventoryReserved(String sagaId) {
        OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
        if (saga.getState() != OrderSagaState.INVENTORY_RESERVING) {
            log.warn("Unexpected state {} for saga {}", saga.getState(), sagaId);
            return;
        }
 
        Order order = orderRepository.findById(saga.getOrderId()).orElseThrow();
        transitionTo(saga, OrderSagaState.INVENTORY_RESERVED);
 
        // Step 2: Process payment
        transitionTo(saga, OrderSagaState.PAYMENT_PROCESSING);
        paymentClient.process(new PaymentRequest(sagaId, order.getCustomerId(),
            order.getTotalAmount()));
    }
 
    @Transactional
    public void onPaymentProcessed(String sagaId) {
        OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
        Order order = orderRepository.findById(saga.getOrderId()).orElseThrow();
 
        order.setStatus(OrderStatus.CONFIRMED);
        orderRepository.save(order);
 
        transitionTo(saga, OrderSagaState.COMPLETED);
        saga.setCompletedAt(Instant.now());
        sagaRepository.save(saga);
 
        log.info("Saga {} completed successfully for order {}", sagaId, saga.getOrderId());
    }
 
    @Transactional
    public void onPaymentFailed(String sagaId, String reason) {
        OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
        saga.setFailureReason(reason);
 
        // Start compensation
        transitionTo(saga, OrderSagaState.COMPENSATING_INVENTORY);
        inventoryClient.release(new ReleaseRequest(sagaId, "Payment failed: " + reason));
    }
 
    @Transactional
    public void onInventoryReleased(String sagaId) {
        OrderSaga saga = sagaRepository.findBySagaId(sagaId).orElseThrow();
        Order order = orderRepository.findById(saga.getOrderId()).orElseThrow();
 
        order.setStatus(OrderStatus.FAILED);
        order.setFailureReason(saga.getFailureReason());
        orderRepository.save(order);
 
        transitionTo(saga, OrderSagaState.FAILED);
        saga.setCompletedAt(Instant.now());
        sagaRepository.save(saga);
    }
 
    private void transitionTo(OrderSaga saga, OrderSagaState newState) {
        log.info("Saga {} transitioning from {} to {}", saga.getSagaId(), saga.getState(), newState);
        saga.setState(newState);
        sagaRepository.save(saga);
    }
}
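
The orchestrator above checks the current state only in onInventoryReserved, so a duplicate or late callback (for example a second onPaymentFailed after the saga has already completed) would still be applied. One way to guard every handler is a table of allowed transitions. The sketch below is a hypothetical helper, not part of the original code: the class name SagaTransitions and its methods are assumptions, and its State enum mirrors only the states the orchestrator actually moves through.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: validates saga state changes against a fixed transition table.
final class SagaTransitions {

    // Mirrors the states used by the orchestrator's happy path and compensation path.
    enum State {
        STARTED, INVENTORY_RESERVING, INVENTORY_RESERVED, PAYMENT_PROCESSING,
        COMPLETED, COMPENSATING_INVENTORY, FAILED
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        ALLOWED.put(State.STARTED, EnumSet.of(State.INVENTORY_RESERVING));
        ALLOWED.put(State.INVENTORY_RESERVING, EnumSet.of(State.INVENTORY_RESERVED));
        ALLOWED.put(State.INVENTORY_RESERVED, EnumSet.of(State.PAYMENT_PROCESSING));
        ALLOWED.put(State.PAYMENT_PROCESSING,
            EnumSet.of(State.COMPLETED, State.COMPENSATING_INVENTORY));
        ALLOWED.put(State.COMPENSATING_INVENTORY, EnumSet.of(State.FAILED));
        // COMPLETED and FAILED are terminal: no entry means no outgoing transitions.
    }

    static boolean canTransition(State from, State to) {
        Set<State> targets = ALLOWED.get(from);
        return targets != null && targets.contains(to);
    }

    // Returns the new state, or throws if the move is not in the table.
    static State transition(State from, State to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal saga transition " + from + " -> " + to);
        }
        return to;
    }

    public static void main(String[] args) {
        System.out.println(transition(State.PAYMENT_PROCESSING, State.COMPLETED));
        System.out.println(canTransition(State.COMPLETED, State.COMPENSATING_INVENTORY));
    }
}
```

In the orchestrator, transitionTo could call such a check before saving, so that a stale callback fails loudly instead of silently overwriting a terminal state.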

8. Idempotency - Complete Implementation

Why Idempotency Is Critical for Consistency

In distributed systems, operations may be retried due to:

  • Network timeouts
  • Service restarts during processing
  • Load balancer retries
  • Client retry logic

Without idempotency, retried operations cause duplicate effects (double charges, duplicate orders, etc.).
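
The core idea can be shown without any infrastructure: the first request for a key runs the operation and stores its result, and every retry with the same key gets the stored result back. The sketch below is an in-memory illustration only. The class name IdempotentExecutor is an assumption, and a real service would keep the keys in a database table rather than a map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical in-memory illustration of idempotent execution.
final class IdempotentExecutor {

    private final Map<String, String> results = new ConcurrentHashMap<>();

    // Runs the operation at most once per key; retries get the stored result.
    // ConcurrentHashMap.computeIfAbsent applies the function atomically per key.
    String execute(String idempotencyKey, Supplier<String> operation) {
        return results.computeIfAbsent(idempotencyKey, key -> operation.get());
    }

    public static void main(String[] args) {
        AtomicInteger charges = new AtomicInteger();
        IdempotentExecutor executor = new IdempotentExecutor();

        // The same key is sent twice, as a client retry after a timeout would do.
        String first = executor.execute("order-42", () -> "charge-" + charges.incrementAndGet());
        String retry = executor.execute("order-42", () -> "charge-" + charges.incrementAndGet());

        System.out.println(first + " " + retry + " charges=" + charges.get());
    }
}
```

The retry returns the first result and the side effect (the charge counter) runs once.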

Database Schema for Idempotency Keys

CREATE TABLE idempotency_keys (
    idempotency_key  VARCHAR(100) NOT NULL PRIMARY KEY,
    request_hash     VARCHAR(64)  NOT NULL,   -- SHA-256 of request body
    operation_type   VARCHAR(100) NOT NULL,
    status           ENUM('PROCESSING', 'COMPLETED', 'FAILED') NOT NULL,
    response_body    TEXT,                    -- Cached response for duplicate requests
    response_status  INT,
    created_at       DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    completed_at     DATETIME(3),
    expires_at       DATETIME(3) NOT NULL,
    INDEX idx_expires (expires_at)
);
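
The request_hash column exists so that a key reused with a different request body can be detected and rejected instead of being answered with a cached response for another request. A minimal sketch of computing it with the JDK follows. The class and method names are assumptions; the hex output is 64 characters, which matches the VARCHAR(64) column.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Hypothetical helper for filling and checking idempotency_keys.request_hash.
final class RequestHasher {

    // Lowercase hex SHA-256 of the request body (64 characters).
    static String sha256Hex(String body) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(body.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);  // HexFormat requires Java 17+
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    // True when a retried request carries the same body as the stored one.
    static boolean sameRequest(String storedHash, String incomingBody) {
        return storedHash.equals(sha256Hex(incomingBody));
    }

    public static void main(String[] args) {
        String stored = sha256Hex("{\"amount\":100}");
        System.out.println(stored.length());
        System.out.println(sameRequest(stored, "{\"amount\":100}"));
        System.out.println(sameRequest(stored, "{\"amount\":999}"));
    }
}
```

A filter could respond with 422 Unprocessable Entity when sameRequest returns false for an existing key; the exact status code is a design choice.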

Idempotency Filter (HTTP)

@Component
@RequiredArgsConstructor
@Slf4j
public class IdempotencyFilter extends OncePerRequestFilter {
 
    private final IdempotencyKeyRepository idempotencyKeyRepository;
    private final ObjectMapper objectMapper;
    private static final String IDEMPOTENCY_KEY_HEADER = "Idempotency-Key";
 
    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                     HttpServletResponse response,
                                     FilterChain filterChain)
            throws ServletException, IOException {
 
        // Only apply to mutating operations
        if (!isMutatingMethod(request.getMethod())) {
            filterChain.doFilter(request, response);
            return;
        }
 
        String idempotencyKey = request.getHeader(IDEMPOTENCY_KEY_HEADER);
        if (idempotencyKey == null || idempotencyKey.isBlank()) {
            response.setStatus(HttpStatus.BAD_REQUEST.value());
            writeError(response, "Idempotency-Key header is required");
            return;
        }
 
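        // Wrap the request so the body can be hashed later. ContentCachingRequestWrapper
        // only captures bytes as downstream code reads them, so getContentAsByteArray()
        // is empty until filterChain.doFilter(...) has run.
        ContentCachingRequestWrapper wrappedRequest = new ContentCachingRequestWrapper(request);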
 
        // Check if key already exists
        Optional<IdempotencyRecord> existing =
            idempotencyKeyRepository.findByKey(idempotencyKey);
 
        if (existing.isPresent()) {
            IdempotencyRecord record = existing.get();
 
            if (record.getStatus() == IdempotencyStatus.PROCESSING) {
                // Still processing -- return 409 Conflict
                response.setStatus(HttpStatus.CONFLICT.value());
                writeError(response, "Request is still processing");
                return;
            }
 
            if (record.getStatus() == IdempotencyStatus.COMPLETED) {
                // Already processed -- return cached response
                log.debug("Returning cached response for idempotency key: {}", idempotencyKey);
                response.setStatus(record.getResponseStatus());
                response.setContentType("application/json");
                response.getWriter().write(record.getResponseBody());
                return;
            }
        }
 
        // Mark as processing
        IdempotencyRecord record = IdempotencyRecord.builder()
            .idempotencyKey(idempotencyKey)
            .operationType(request.getRequestURI())
            .status(IdempotencyStatus.PROCESSING)
            .createdAt(Instant.now())
            .expiresAt(Instant.now().plus(Duration.ofHours(24)))
            .build();
 
        try {
            idempotencyKeyRepository.save(record);
        } catch (DataIntegrityViolationException e) {
            // Race condition: another request just inserted the same key
            response.setStatus(HttpStatus.CONFLICT.value());
            writeError(response, "Duplicate request processing");
            return;
        }
 
        // Capture response
        ContentCachingResponseWrapper wrappedResponse =
            new ContentCachingResponseWrapper(response);
 
        filterChain.doFilter(wrappedRequest, wrappedResponse);
 
        // Store response for future duplicate requests
        String responseBody = new String(wrappedResponse.getContentAsByteArray(),
            StandardCharsets.UTF_8);
        record.setStatus(IdempotencyStatus.COMPLETED);
        record.setResponseBody(responseBody);
        record.setResponseStatus(wrappedResponse.getStatus());
        record.setCompletedAt(Instant.now());
        idempotencyKeyRepository.save(record);
 
        wrappedResponse.copyBodyToResponse();
    }
 
    private boolean isMutatingMethod(String method) {
        return "POST".equals(method) || "PUT".equals(method) || "PATCH".equals(method);
    }
 
    private void writeError(HttpServletResponse response, String message) throws IOException {
        response.setContentType("application/json");
        response.getWriter().write("{\"error\":\"" + message + "\"}");
    }
}

Service-Level Idempotency

@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentService {
 
    private final PaymentRepository paymentRepository;
    private final ExternalPaymentGateway paymentGateway;
 
    @Transactional
    public PaymentResult processPayment(String idempotencyKey, PaymentRequest request) {
        // Check for existing payment with this key
        Optional<Payment> existing = paymentRepository
            .findByIdempotencyKey(idempotencyKey);
 
        if (existing.isPresent()) {
            log.info("Returning existing payment for idempotency key: {}", idempotencyKey);
            return PaymentResult.from(existing.get());  // Return same result
        }
 
        // Process new payment
        ExternalPaymentResponse externalResult = paymentGateway.charge(
            request.getCustomerId(),
            request.getAmount(),
            request.getCurrency()
        );
 
        Payment payment = Payment.builder()
            .idempotencyKey(idempotencyKey)
            .customerId(request.getCustomerId())
            .amount(request.getAmount())
            .status(externalResult.isSuccess() ? PaymentStatus.SUCCESS : PaymentStatus.FAILED)
            .externalTransactionId(externalResult.getTransactionId())
            .processedAt(Instant.now())
            .build();
 
        paymentRepository.save(payment);
        return PaymentResult.from(payment);
    }
}

9. Distributed Locking with Redis (Redisson)

Why Distributed Locks

When multiple instances of your Spring Boot application run concurrently, a local synchronized block is not enough. You need a lock that spans all instances.

Use cases: Preventing duplicate scheduled job execution, protecting critical sections across pods.

Redisson Configuration

@Configuration
public class RedissonConfig {
 
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        // Cluster mode for production
        config.useClusterServers()
            .setScanInterval(2000)
            .addNodeAddress(
                "redis://redis-node-1.your-cluster.cache.amazonaws.com:6379",
                "redis://redis-node-2.your-cluster.cache.amazonaws.com:6379",
                "redis://redis-node-3.your-cluster.cache.amazonaws.com:6379"
            )
            .setPassword(System.getenv("REDIS_PASSWORD"))
            .setConnectTimeout(3000)
            .setTimeout(3000)
            .setRetryAttempts(3)
            .setRetryInterval(1500);
        return Redisson.create(config);
    }
}

Usage in Service

@Service
@RequiredArgsConstructor
@Slf4j
public class PaymentProcessingService {
 
    private final RedissonClient redissonClient;
    private final PaymentRepository paymentRepository;
 
    public PaymentResult processPaymentExclusively(String orderId,
                                                    PaymentRequest request) {
        // Distributed lock key scoped to the specific order
        String lockKey = "payment:lock:order:" + orderId;
        RLock lock = redissonClient.getLock(lockKey);
 
        try {
            // Try to acquire lock -- wait up to 10 seconds, hold for up to 30 seconds
            boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
 
            if (!acquired) {
                throw new PaymentConcurrencyException(
                    "Another payment is being processed for order " + orderId);
            }
 
            return doProcessPayment(orderId, request);
 
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PaymentProcessingException("Lock acquisition interrupted", e);
        } finally {
            // Always release the lock -- even on exception
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
                log.debug("Released lock for order {}", orderId);
            }
        }
    }
 
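    // NOTE: @Transactional would have no effect on this method. Spring's proxy-based
    // AOP ignores private methods and self-invocation, so the call above runs without
    // a transaction. Move this logic to a separate bean with a public @Transactional
    // method so the transaction commits before the lock is released.
    private PaymentResult doProcessPayment(String orderId, PaymentRequest request) {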
        // Check for duplicate (belt and suspenders with the lock)
        if (paymentRepository.existsByOrderId(orderId)) {
            return PaymentResult.fromExisting(paymentRepository.findByOrderId(orderId));
        }
        // ... actual payment processing
    }
}

10. Cache Consistency Patterns

Cache-Aside (Lazy Loading) -- Most Common

Read: Check cache first. If miss, read from DB, populate cache, return.
Write: Write to DB. Invalidate or update cache.

@Service
@RequiredArgsConstructor
public class ProductCatalogService {
 
    private final ProductRepository productRepository;
    private final RedisTemplate<String, Product> redisTemplate;
    private static final Duration CACHE_TTL = Duration.ofMinutes(15);
 
    public Product getProduct(Long productId) {
        String cacheKey = "product:" + productId;
 
        // 1. Try cache first
        Product cached = redisTemplate.opsForValue().get(cacheKey);
        if (cached != null) {
            return cached;
        }
 
        // 2. Cache miss -- read from database
        Product product = productRepository.findById(productId)
            .orElseThrow(() -> new ProductNotFoundException(productId));
 
        // 3. Populate cache with TTL
        redisTemplate.opsForValue().set(cacheKey, product, CACHE_TTL);
 
        return product;
    }
 
    @Transactional
    public Product updateProduct(Long productId, UpdateProductRequest request) {
        Product product = productRepository.findById(productId).orElseThrow();
        product.update(request);
        productRepository.save(product);
 
        // Invalidate cache -- next read will fetch fresh from DB
        redisTemplate.delete("product:" + productId);
 
        return product;
    }
}

Write-Through -- Strong Consistency for Writes

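Write to the DB and the cache synchronously in the same operation, so reads after a successful write see the new value. The two writes are not atomic: the code has to decide what happens when one of them fails.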

@Service
@RequiredArgsConstructor
public class UserSessionService {
 
    private final UserRepository userRepository;
    private final RedisTemplate<String, UserSession> redisTemplate;
 
    @Transactional
    public UserSession updateUserSession(String userId, SessionUpdateRequest request) {
        // 1. Write to database
        User user = userRepository.findById(userId).orElseThrow();
        user.updateSession(request);
        userRepository.save(user);
 
        // 2. Simultaneously write to cache (write-through)
        UserSession session = UserSession.from(user);
        redisTemplate.opsForValue().set(
            "session:" + userId,
            session,
            Duration.ofHours(2)
        );
 
        // If cache write fails, should we roll back DB write?
        // Option 1: Accept temporary inconsistency (cache will expire)
        // Option 2: Throw exception and let @Transactional roll back DB write
 
        return session;
    }
}

Write-Behind (Write-Back) -- High Performance

Write to cache immediately. Asynchronously flush to DB. Highest performance but risk of data loss.

@Service
@RequiredArgsConstructor
@Slf4j
public class CounterService {
 
    private final RedisTemplate<String, Long> redisTemplate;
    private final CounterRepository counterRepository;
 
    // Increment counter in cache -- very fast
    public Long increment(String counterKey) {
        String redisKey = "counter:" + counterKey;
        return redisTemplate.opsForValue().increment(redisKey);
    }
 
    // Flush dirty counters to DB periodically (write-behind)
    @Scheduled(fixedDelay = 5000)  // Every 5 seconds
    public void flushCountersToDB() {
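        // NOTE: keys() issues the Redis KEYS command, which scans the whole keyspace and
        // blocks the server while it runs. Prefer SCAN or a dedicated set of dirty keys in production.
        Set<String> dirtyCounters = redisTemplate.keys("counter:*");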
        if (dirtyCounters == null || dirtyCounters.isEmpty()) {
            return;
        }
        for (String key : dirtyCounters) {
            try {
                Long value = redisTemplate.opsForValue().get(key);
                if (value != null) {
                    String counterKey = key.substring("counter:".length());
                    counterRepository.upsertCounter(counterKey, value);
                }
            } catch (Exception e) {
                log.error("Failed to flush counter {}: {}", key, e.getMessage());
            }
        }
    }
 
    // WARNING: Data between flushes is in memory only.
    // If Redis crashes before flush, counter increments since last flush are LOST.
    // Only use for non-critical counters (view counts, like counts).
}

Cache Stampede Prevention (Probabilistic Early Expiry)

@Service
@RequiredArgsConstructor
public class TrendingProductService {
 
    private final ProductRepository productRepository;
    private final RedisTemplate<String, CachedProduct> redisTemplate;
    private final RedissonClient redissonClient;  // used for the refresh lock below
 
    // Probabilistic early expiry: recalculate before actual expiry
    // to avoid all clients hitting DB simultaneously at expiry time
    public List<Product> getTrendingProducts() {
        String key = "trending:products";
        CachedProduct cached = redisTemplate.opsForValue().get(key);
 
        if (cached != null && !shouldRefreshEarly(cached)) {
            return cached.getProducts();
        }
 
        // Refresh from DB (with lock to prevent stampede)
        RLock lock = redissonClient.getLock("trending:lock");
        try {
            if (lock.tryLock(0, 5, TimeUnit.SECONDS)) {
                // Double-check after acquiring lock
                cached = redisTemplate.opsForValue().get(key);
                if (cached != null && !shouldRefreshEarly(cached)) {
                    return cached.getProducts();
                }
                // Refresh
                List<Product> products = productRepository.findTrending();
                redisTemplate.opsForValue().set(key,
                    new CachedProduct(products, Instant.now(), Duration.ofMinutes(10)),
                    Duration.ofMinutes(10));
                return products;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) lock.unlock();
        }
        // Could not get lock -- return stale if available
        return cached != null ? cached.getProducts() : productRepository.findTrending();
    }
 
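    // XFetch algorithm: probabilistic early refresh.
    // Refresh when remaining <= -delta * beta * ln(rand), where delta is the time it
    // takes to recompute the value (not the TTL; scaling by the TTL would trigger a
    // refresh on over a third of reads even for a freshly cached entry).
    private boolean shouldRefreshEarly(CachedProduct cached) {
        double beta = 1.0;  // tuning parameter (higher = more aggressive prefetch)
        double deltaSeconds = 2.0;  // ASSUMED recompute cost of findTrending(); measure it in practice
        long ttlSeconds = cached.getTtl().getSeconds();
        long ageSeconds = Duration.between(cached.getCachedAt(), Instant.now()).getSeconds();
        long remainingSeconds = ttlSeconds - ageSeconds;
        return remainingSeconds <= -deltaSeconds * beta * Math.log(Math.random() + 1e-10);
    }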
}
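
The XFetch decision is easier to reason about, and to unit test, when it is isolated as a pure function that takes the random draw as a parameter. In the published algorithm the early-refresh window is -delta * beta * ln(rand), where delta is the time needed to recompute the value. The sketch below assumes that definition; the class name and parameter names are illustrative.

```java
// Hypothetical pure-function form of the XFetch early-refresh check.
final class XFetch {

    /**
     * @param remainingSeconds time left until the cached entry expires
     * @param deltaSeconds     time it takes to recompute the value
     * @param beta             tuning factor; values above 1 refresh earlier
     * @param rand             uniform random draw in (0, 1]
     * @return true if this caller should recompute the value now
     */
    static boolean shouldRefreshEarly(double remainingSeconds, double deltaSeconds,
                                      double beta, double rand) {
        // ln(rand) is <= 0, so the window is >= 0 and grows as rand approaches 0.
        double earlyWindow = -deltaSeconds * beta * Math.log(rand);
        return remainingSeconds <= earlyWindow;
    }

    public static void main(String[] args) {
        // rand = 1.0 gives a zero window: refresh only once the entry has expired.
        System.out.println(shouldRefreshEarly(10, 2, 1.0, 1.0));
        // rand = e^-1 gives a window of about delta * beta = 2 seconds.
        System.out.println(shouldRefreshEarly(1, 2, 1.0, Math.exp(-1)));
    }
}
```

Because most random draws produce a small window, only a few callers refresh early, and the chance rises as expiry approaches. That is what spreads the recomputation out instead of concentrating it at the expiry instant.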

11. Fencing Tokens for Safe Writes

Fencing tokens prevent "zombie writers": processes that still believe they hold a lock after a long pause (a GC stall or network delay, for example) even though the lock has expired and been granted to another process.

@Entity
@Table(name = "distributed_locks")
public class DistributedLock {
    @Id
    private String resourceId;
 
    private String ownerId;
 
    @Version  // Used as fencing token
    private Long fencingToken;
 
    private Instant acquiredAt;
    private Instant expiresAt;
}
 
@Service
@RequiredArgsConstructor
public class FencedWriteService {
 
    private final DistributedLockRepository lockRepository;
    private final ResourceRepository resourceRepository;
 
    @Transactional
    public LockResult acquireLock(String resourceId, String ownerId) {
        DistributedLock lock = lockRepository.findById(resourceId)
            .orElse(new DistributedLock(resourceId));
 
        // Reject if someone else's lock is still valid (owner set and not yet expired)
        if (lock.getOwnerId() != null && lock.getExpiresAt().isAfter(Instant.now())) {
            throw new LockAlreadyHeldException(resourceId, lock.getOwnerId());
        }
 
        lock.setOwnerId(ownerId);
        lock.setAcquiredAt(Instant.now());
        lock.setExpiresAt(Instant.now().plus(Duration.ofSeconds(30)));
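        // saveAndFlush (assumes a JpaRepository) forces the UPDATE now, so the returned
        // entity carries the incremented @Version rather than the pre-flush value
        DistributedLock saved = lockRepository.saveAndFlush(lock);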
 
        return new LockResult(resourceId, ownerId, saved.getFencingToken());
    }
 
    // The fencing token is included in every write operation
    // If the token doesn't match what's in DB (because lock was re-acquired), write is rejected
    @Transactional
    public void writeWithFence(String resourceId, String ownerId,
                                Long fencingToken, ResourceUpdate update) {
        DistributedLock lock = lockRepository.findById(resourceId)
            .orElseThrow(() -> new LockNotFoundException(resourceId));
 
        // Verify we still hold the lock and token matches
        if (!ownerId.equals(lock.getOwnerId())) {
            throw new LockExpiredException("Lock was taken by " + lock.getOwnerId());
        }
        if (!fencingToken.equals(lock.getFencingToken())) {
            throw new StaleTokenException("Fencing token is stale. Expected " +
                lock.getFencingToken() + " but got " + fencingToken);
        }
 
        // Safe to write -- we verified our token is current
        resourceRepository.update(resourceId, update);
    }
}
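
Fencing gives the strongest guarantee when the resource being written enforces it: the resource remembers the highest token it has accepted and rejects anything lower. The following in-memory sketch illustrates that rule only. FencedStore is a hypothetical class, and a real implementation would do the comparison inside the storage system itself, for example as a conditional UPDATE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical resource that enforces fencing tokens on every write.
final class FencedStore {

    private final Map<String, Long> highestToken = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();

    // Accepts the write only if the token is at least as new as any token seen so far.
    synchronized boolean write(String resourceId, long fencingToken, String value) {
        long seen = highestToken.getOrDefault(resourceId, Long.MIN_VALUE);
        if (fencingToken < seen) {
            return false;  // stale token: the lock was re-acquired by someone else
        }
        highestToken.put(resourceId, fencingToken);
        values.put(resourceId, value);
        return true;
    }

    synchronized String read(String resourceId) {
        return values.get(resourceId);
    }

    public static void main(String[] args) {
        FencedStore store = new FencedStore();
        System.out.println(store.write("report", 33, "from client A"));
        System.out.println(store.write("report", 34, "from client B"));
        // Client A wakes up after a pause and retries with its old token.
        System.out.println(store.write("report", 33, "late write from A"));
        System.out.println(store.read("report"));
    }
}
```

Client A's late write is rejected because token 33 is lower than 34, so client B's value survives.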

12. Change Data Capture (CDC) Integration

CDC reads the database binary log to capture every change and publish it as an event. This is the production-grade alternative to the polling-based outbox.

Debezium + Kafka (Production Setup)

debezium-connector-config.json (Kafka Connect configuration; deploy via the Kafka Connect REST API):

{
  "name": "orders-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "aurora-cluster.cluster-xxxx.us-east-1.rds.amazonaws.com",
    "database.port": "3306",
    "database.user": "debezium_user",
    "database.password": "${DEBEZIUM_PASSWORD}",
    "database.server.id": "1",
    "topic.prefix": "myapp",
    "database.include.list": "orders_db",
    "table.include.list": "orders_db.orders,orders_db.outbox_events",
    "include.schema.changes": "false",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events"
  }
}

Spring Boot Kafka Consumer for CDC Events

@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCDCConsumer {
 
    private final OrderProjectionService projectionService;
    private final ObjectMapper objectMapper;
 
    @KafkaListener(
        topics = "myapp.orders_db.orders",
        groupId = "order-projector",
        containerFactory = "kafkaListenerContainerFactory"
    )
    @Transactional
    public void handleOrderChange(ConsumerRecord<String, String> record) throws Exception {
        DebeziumChangeEvent changeEvent = objectMapper.readValue(
            record.value(), DebeziumChangeEvent.class);
 
        switch (changeEvent.getOp()) {
            case "c":  // CREATE
                projectionService.handleOrderCreated(changeEvent.getAfter());
                break;
            case "u":  // UPDATE
                projectionService.handleOrderUpdated(changeEvent.getBefore(),
                    changeEvent.getAfter());
                break;
            case "d":  // DELETE
                projectionService.handleOrderDeleted(changeEvent.getBefore());
                break;
            default:
                log.debug("Ignoring operation type: {}", changeEvent.getOp());
        }
    }
}
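
The switch above handles "c", "u" and "d". Debezium also uses "r" for rows read during the initial snapshot, so a projection that ignores "r" starts without the pre-existing rows. The sketch below shows one way to map the op codes to an enum and treat snapshot reads like creates. DebeziumOps and ChangeOp are hypothetical names, not Debezium classes.

```java
// Hypothetical mapping of Debezium "op" codes to an enum.
final class DebeziumOps {

    enum ChangeOp { CREATE, UPDATE, DELETE, SNAPSHOT_READ, OTHER }

    static ChangeOp fromCode(String op) {
        if (op == null) {
            return ChangeOp.OTHER;
        }
        switch (op) {
            case "c": return ChangeOp.CREATE;
            case "u": return ChangeOp.UPDATE;
            case "d": return ChangeOp.DELETE;
            case "r": return ChangeOp.SNAPSHOT_READ;  // row read during the initial snapshot
            default:  return ChangeOp.OTHER;          // e.g. "t" for truncate
        }
    }

    // True when the event's "after" image should be written to the projection.
    static boolean carriesAfterImage(ChangeOp op) {
        return op == ChangeOp.CREATE || op == ChangeOp.UPDATE || op == ChangeOp.SNAPSHOT_READ;
    }

    public static void main(String[] args) {
        System.out.println(fromCode("r"));
        System.out.println(carriesAfterImage(fromCode("r")));
        System.out.println(carriesAfterImage(fromCode("d")));
    }
}
```

A consumer may also need to skip records whose value is null before parsing, since Debezium can emit tombstone records after deletes (controlled by the tombstones.on.delete connector option).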

13. Production Configuration Reference

application.yml for Production

spring:
  datasource:
    url: jdbc:mysql://${DB_WRITE_HOST}:3306/${DB_NAME}?useSSL=true&requireSSL=true&serverTimezone=UTC&rewriteBatchedStatements=true&cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    hikari:
      pool-name: MainPool
      maximum-pool-size: 20
      minimum-idle: 5
      idle-timeout: 600000 # 10 minutes
      max-lifetime: 1800000 # 30 minutes
      connection-timeout: 30000 # 30 seconds
      keepalive-time: 60000 # 1 minute -- prevents AWS firewall from closing idle connections
      validation-timeout: 5000
 
  jpa:
    open-in-view: false # CRITICAL: Do not hold DB connection for the entire HTTP request
    hibernate:
      ddl-auto: validate # Never use create/update in production
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        jdbc:
          batch_size: 50
          fetch_size: 100
        order_inserts: true
        order_updates: true
        batch_versioned_data: true
        connection:
          provider_disables_autocommit: true
        query:
          in_clause_parameter_padding: true
 
  transaction:
    default-timeout: 30 # seconds
 
logging:
  level:
    org.springframework.transaction: INFO # Raise to DEBUG only while troubleshooting
    org.hibernate.SQL: WARN
    com.zaxxer.hikari: INFO

pom.xml Dependencies

<dependencies>
    <!-- Spring Boot Starters -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
 
    <!-- MySQL Connector -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
 
    <!-- HikariCP (included in spring-boot-starter-data-jpa) -->
 
    <!-- Redisson for distributed locking -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson-spring-boot-starter</artifactId>
        <version>3.25.2</version>
    </dependency>
 
    <!-- Spring Retry for optimistic lock retries -->
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
    </dependency>
 
    <!-- AWS SDK v2 for DynamoDB, SQS -->
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>dynamodb-enhanced</artifactId>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>sqs</artifactId>
    </dependency>
</dependencies>

Next: Part 4: AWS Production Configurations -- Infrastructure-level consistency configuration for RDS, Aurora, DynamoDB, and ElastiCache.


Part of the Consistency Models Demystified series
Stack: Java 17, Spring Boot 3.x, MySQL 8.0, AWS