6/6/2026 - Admin Post

Message Queues Demystified - Part 4: Advanced Concepts

The difference between a system that works in a demo and a system that works in production
often comes down to these advanced concepts. This is where most teams get tripped up.


Table of Contents

  1. Message Delivery Semantics
  2. Idempotent Consumers
  3. Message Deduplication
  4. Message Ordering - Guarantees, Trade-offs, and Patterns
  5. The Saga Pattern - Distributed Transactions
  6. The Outbox Pattern - Solving the Dual-Write Problem
  7. Transactional Messaging
  8. Backpressure - Detection and Handling
  9. Schema Evolution
  10. Consumer Offset Management and Replay

1. Message Delivery Semantics

This is one of the most frequently asked topics in interviews and one of the most misunderstood in practice.

At-Most-Once Delivery

Definition: A message is delivered at most one time. It may be delivered zero times (lost), but NEVER more than once.

How it works:

  • Producer sends message and does not wait for acknowledgment
  • Or: Consumer acknowledges (commits offset) BEFORE processing

At-most-once flow:
Producer --[message]--> Broker (no ACK wait)
                              |
                        Consumer receives
                        Consumer commits offset   <-- ACK happens BEFORE processing
                        Consumer processes
                        Consumer crashes HERE     <-- Message is lost, offset already committed

When it is acceptable:

  • Metrics and telemetry data (losing an occasional metric is fine)
  • Analytics events where approximate counts are acceptable
  • Real-time sensor readings where the latest reading supersedes old ones
  • Log streaming where completeness is not critical

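Implementation (Kafka - commit before processing):

// WARNING: This configuration loses messages on consumer crash
@KafkaListener(topics = "telemetry", containerFactory = "atMostOnceFactory")
public void handleTelemetry(SensorReading reading, Acknowledgment ack) {
    // Commit the offset BEFORE doing the work
    // If the consumer crashes after this line, the message is lost
    ack.acknowledge();
    try {
        metricsService.record(reading);
    } catch (RuntimeException e) {
        // Swallow the failure so the container's error handler does not redeliver
        log.warn("Dropping telemetry reading after failure", e);
    }
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, SensorReading> atMostOnceFactory(
        ConsumerFactory<String, SensorReading> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, SensorReading> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // MANUAL_IMMEDIATE commits as soon as acknowledge() is called on the listener thread
    // (AckMode.COUNT would commit only AFTER processing, which is at-least-once)
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    // Do not use this for critical data
    return factory;
}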

At-Least-Once Delivery

Definition: A message is guaranteed to be delivered. It may be delivered multiple times.

This is the default for almost all production messaging systems.

How it works:

  • Producer sends and waits for broker acknowledgment. If no ACK, producer retries.
  • Consumer processes FIRST, then commits offset. If consumer crashes before committing, message is redelivered.

At-least-once flow:
Producer --[message]--> Broker
Producer <--[ACK]------ Broker  (if no ACK, producer retries - potential duplicate)

Consumer receives message
Consumer processes       <-- Consumer crashes HERE
Consumer crashes before committing offset

On restart:
Consumer restarts from last committed offset
Consumer receives the SAME message again
Consumer processes it AGAIN  <-- DUPLICATE PROCESSING
Consumer commits offset

Why duplicates happen:

  1. Producer retries on network timeout (broker received it the first time, but ACK was lost)
  2. Consumer processes but crashes before ACKing (broker redelivers)
  3. Consumer takes too long and visibility timeout expires (SQS redelivers while first consumer is still working)
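
The difference between the two flows comes down to where the commit sits relative to the work. The sketch below is an in-memory stand-in, not a Kafka client: `CommitOrderDemo`, `Crash`, and `simulate` are hypothetical names, and a thrown exception plays the role of a process crash. It replays three messages with one crash around message "b".

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory stand-in for one partition and one consumer; not a Kafka client. */
final class CommitOrderDemo {

    /** Thrown by the handler to simulate a consumer crash. */
    static final class Crash extends RuntimeException {}

    interface Handler { void handle(String message); }

    private final List<String> log;
    private int committedOffset = 0; // next offset to read after a restart

    CommitOrderDemo(List<String> log) { this.log = log; }

    /** Runs until the log is drained or the handler "crashes". */
    void run(boolean commitBeforeProcessing, Handler handler) {
        int position = committedOffset;
        try {
            while (position < log.size()) {
                String message = log.get(position);
                if (commitBeforeProcessing) {
                    committedOffset = position + 1; // at-most-once: commit first
                    handler.handle(message);
                } else {
                    handler.handle(message);
                    committedOffset = position + 1; // at-least-once: commit last
                }
                position++;
            }
        } catch (Crash crash) {
            // The process died; only committedOffset survives the restart.
        }
    }

    /** Processes "a", "b", "c" with a single crash around "b", then restarts once. */
    static List<String> simulate(boolean commitBeforeProcessing) {
        CommitOrderDemo consumer = new CommitOrderDemo(List.of("a", "b", "c"));
        List<String> processed = new ArrayList<>();
        boolean[] crashed = {false};
        Handler handler = message -> {
            if (message.equals("b") && !crashed[0]) {
                crashed[0] = true;
                if (commitBeforeProcessing) throw new Crash(); // dies before the work is done
                processed.add(message);
                throw new Crash();                              // dies after the work, before the commit
            }
            processed.add(message);
        };
        consumer.run(commitBeforeProcessing, handler); // first run ends in the crash
        consumer.run(commitBeforeProcessing, handler); // restart from the committed offset
        return processed;
    }
}
```

Under these assumptions, `simulate(true)` yields [a, c] (message "b" is lost) and `simulate(false)` yields [a, b, b, c] (message "b" is processed twice).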

At-least-once is the right default when:

  • You can make your consumer idempotent (safe to process same message twice)
  • The cost of duplicate processing is acceptable and manageable
  • You need guaranteed delivery over all other concerns

Exactly-Once Delivery

Definition: A message is delivered and processed exactly one time - no loss, no duplicates.

The hard truth: True exactly-once delivery is extremely difficult to achieve end-to-end. The "Two Generals Problem" shows that two parties communicating over an unreliable channel can never be certain a message arrived, so a sender that receives no ACK cannot know whether retrying will create a duplicate. What we achieve in practice is "effectively exactly-once" processing through a combination of deduplication and idempotency.

Exactly-Once in Kafka (Broker Level)

Kafka provides exactly-once semantics at the broker level using:

  1. Idempotent Producer: The broker assigns each producer a unique Producer ID (PID). Each message gets a sequence number. If the same message is retried (due to ACK loss), the broker detects the duplicate PID+sequence number and ignores it.

  2. Transactional Producer: Allows atomic writes across multiple topics/partitions. Either ALL messages in a transaction are committed, or NONE are.

@Configuration
public class ExactlyOnceKafkaConfig {
 
    @Bean
    public ProducerFactory<String, Object> exactlyOnceProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);   // Idempotent producer
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-1"); // Enable transactions
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        return new DefaultKafkaProducerFactory<>(config);
    }
}
 
// Transactional producer usage
@Service
public class ExactlyOnceOrderService {
 
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    public void processAndPublish(Order order) {
        // Begin transaction
        kafkaTemplate.executeInTransaction(operations -> {
            // All publishes in this block are atomic
            operations.send("order-events", order.getId(), new OrderPlacedEvent(order));
            operations.send("inventory-commands", order.getId(), new ReserveInventoryCommand(order));
            operations.send("payment-commands", order.getId(), new ChargePaymentCommand(order));
            // Either ALL succeed or ALL are rolled back
            return true;
        });
    }
}

  3. Consume-Transform-Produce pattern (read-process-write atomically):

// Read from input topic, transform, write to output topic - atomically
// Assumes the listener container itself is configured with the KafkaTransactionManager;
// only then does the container add the consumed offsets to the transaction
@KafkaListener(topics = "raw-orders")
@Transactional("kafkaTransactionManager")
public void processOrder(OrderEvent rawOrder, Acknowledgment ack) {
    // 1. Transform
    EnrichedOrder enrichedOrder = enrichmentService.enrich(rawOrder);

    // 2. Publish to output topic
    kafkaTemplate.send("enriched-orders", rawOrder.getOrderId(), enrichedOrder);

    // 3. Commit offset atomically with the publish
    // When the Kafka transaction commits, the message publish and the offset commit
    // become visible together; if it aborts, neither does
    ack.acknowledge();
}

The Reality of End-to-End Exactly-Once

Even with Kafka exactly-once at the broker level, your end-to-end processing can still have duplicates if:

  • Your consumer writes to a database AND publishes to another topic (dual-write)
  • Your downstream service is not idempotent
  • Network issues cause partial operations

True end-to-end exactly-once requires:

  1. Idempotent producer (Kafka transactional ID)
  2. Transactional consume-transform-produce
  3. Idempotent consumer (database deduplication)
  4. Outbox pattern for external side effects

The cost: significantly higher latency and lower throughput. Use exactly-once only when the cost of duplicates truly exceeds this cost.


Delivery Semantics Summary

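Semantic      | Message Loss? | Duplicates? | Complexity | Performance | Use When
--------------+---------------+-------------+------------+-------------+-----------------------------
At-most-once  | Possible      | Never       | Low        | Highest     | Metrics, telemetry
At-least-once | Never         | Possible    | Medium     | High        | Most production systems
Exactly-once  | Never         | Never       | Very High  | Lower       | Financial, payment, critical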

2. Idempotent Consumers

Why Idempotency Is Not Optional

With at-least-once delivery (the default for nearly all systems), your consumer WILL receive the same message more than once. Not "might." Will.

An idempotent operation produces the same result when executed one time or many times. An idempotent consumer gives you the safety of at-least-once delivery without the risk of duplicates corrupting your data.

The Non-Idempotent Problem

// DANGEROUS - not idempotent
@KafkaListener(topics = "order-payment-events")
public void handlePaymentReceived(PaymentEvent event) {
    // This will debit the account EVERY time this message is delivered
    // If message is delivered twice, account is debited twice
    walletService.debitFunds(event.getCustomerId(), event.getAmount());
 
    // This will insert a duplicate row on second delivery
    paymentRepository.insert(new Payment(event.getPaymentId(), event.getAmount()));
}

Strategy 1: Idempotency via Unique Constraint

The database enforces uniqueness. A duplicate operation fails gracefully.

@KafkaListener(topics = "order-payment-events")
public void handlePaymentReceived(PaymentEvent event) {
    try {
        // Database has UNIQUE constraint on payment_id
        // If duplicate: INSERT throws constraint violation, we catch and ignore
        Payment payment = new Payment(
            event.getPaymentId(),   // Natural deduplication key
            event.getCustomerId(),
            event.getAmount(),
            event.getProcessedAt()
        );
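        // NOTE: the insert and the debit must share one database transaction.
        // Otherwise a crash between them leaves a recorded payment that was never
        // debited, and the redelivery is then skipped as a "duplicate"
        paymentRepository.save(payment);
        walletService.debitFunds(event.getCustomerId(), event.getAmount());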
 
    } catch (DataIntegrityViolationException e) {
        // Payment with this ID already exists - this is a duplicate delivery
        // Safe to ignore
        log.warn("Duplicate payment event ignored: {}", event.getPaymentId());
    }
}

Strategy 2: Processed Event Table

Maintain a table of processed message IDs. Check before processing.

@Service
@RequiredArgsConstructor
public class IdempotentEventProcessor {
 
    private final ProcessedMessageRepository processedRepo;
    private final OrderService orderService;
 
    @KafkaListener(topics = "order-events")
    @Transactional
    public void handleOrderEvent(OrderEvent event, Acknowledgment ack) {
 
        // Check if already processed
        if (processedRepo.existsByMessageId(event.getMessageId())) {
            log.info("Skipping duplicate message: {}", event.getMessageId());
            ack.acknowledge();
            return;
        }
 
        // Process the event
        orderService.handleOrderPlaced(event);
 
        // Record as processed within the same transaction
        processedRepo.save(ProcessedMessage.builder()
            .messageId(event.getMessageId())
            .topic("order-events")
            .processedAt(Instant.now())
            .build());
 
        ack.acknowledge();
    }
}

-- The processed_messages table
CREATE TABLE processed_messages (
    message_id    VARCHAR(255) PRIMARY KEY,
    topic         VARCHAR(255) NOT NULL,
    processed_at  TIMESTAMP    NOT NULL,
    consumer_group VARCHAR(255)
);
 
-- Clean up old entries periodically (no need to keep forever)
DELETE FROM processed_messages WHERE processed_at < NOW() - INTERVAL '7 days';

Important: The check-and-insert must be in the same database transaction as the business logic. Otherwise you have a race condition.
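
The race is easiest to see by contrast with an atomic "insert if absent". The following is a minimal in-memory sketch, assuming a hypothetical `ClaimThenProcess` class; the concurrent set stands in for the PRIMARY KEY on `processed_messages`, and the counter stands in for the business logic. Unlike a database transaction, it cannot roll the claim back if the business logic fails, which is exactly why the real version needs both writes in one transaction.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory illustration of claiming a message ID atomically before doing the work. */
final class ClaimThenProcess {

    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Returns true if this call did the work, false if the ID had already been claimed. */
    boolean handle(String messageId) {
        // add() is a single atomic step: two threads can never both get "true"
        // for the same ID, which a separate exists() check followed by save() allows.
        if (!processedIds.add(messageId)) {
            return false;
        }
        sideEffects.incrementAndGet(); // stand-in for the business logic
        return true;
    }

    int sideEffectCount() {
        return sideEffects.get();
    }
}
```

Calling `handle("msg-1")` twice runs the side effect once; the second call returns false.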


Strategy 3: Conditional Updates (Optimistic Locking)

Instead of detecting duplicates, design your operations to be naturally idempotent.

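-- Non-idempotent:
UPDATE account SET balance = balance - 100 WHERE account_id = 123;
-- Running twice debits 200 total

-- Idempotent:
UPDATE account
SET balance = balance - 100, last_payment_id = 'pay-456'
WHERE account_id = 123
  AND last_payment_id IS DISTINCT FROM 'pay-456';  -- Only debit if this payment not already applied
-- Running twice only debits 100 total - second execution affects 0 rows
-- IS DISTINCT FROM (PostgreSQL) treats NULL as a comparable value; with a plain !=
-- an account whose last_payment_id is still NULL would never be debited
-- Caveat: this only guards against a repeat of the MOST RECENT payment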

@Service
@RequiredArgsConstructor
public class IdempotentWalletService {

    private final JdbcTemplate jdbcTemplate;

    public boolean debitFunds(String accountId, BigDecimal amount, String paymentId) {
        int rowsAffected = jdbcTemplate.update(
            """
            UPDATE account
            SET balance = balance - ?,
                last_processed_payment = ?,
                updated_at = NOW()
            WHERE account_id = ?
              AND last_processed_payment IS DISTINCT FROM ?
              AND balance >= ?
            """,
            amount, paymentId, accountId, paymentId, amount
        );

        // rowsAffected = 0 means one of:
        // - Already processed (idempotent - this is fine)
        // - Insufficient funds (need to check separately)
        // - No such account
        return rowsAffected > 0;
    }
}
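
A boolean return cannot tell "already applied" apart from "insufficient funds". One way to separate them is to return an explicit outcome. This is a sketch under stated assumptions: `WalletAccount` and `DebitOutcome` are hypothetical names, the state lives in memory, and `synchronized` plays the role of the row lock that the UPDATE statement would take.

```java
import java.math.BigDecimal;
import java.util.Objects;

/** In-memory model of the conditional debit, with an explicit result per case. */
final class WalletAccount {

    enum DebitOutcome { APPLIED, ALREADY_APPLIED, INSUFFICIENT_FUNDS }

    private BigDecimal balance;
    private String lastProcessedPayment; // null until the first debit

    WalletAccount(BigDecimal openingBalance) {
        this.balance = openingBalance;
    }

    synchronized DebitOutcome debit(BigDecimal amount, String paymentId) {
        // Objects.equals is null-safe, mirroring IS DISTINCT FROM in SQL.
        if (Objects.equals(lastProcessedPayment, paymentId)) {
            return DebitOutcome.ALREADY_APPLIED; // redelivery: change nothing
        }
        if (balance.compareTo(amount) < 0) {
            return DebitOutcome.INSUFFICIENT_FUNDS;
        }
        balance = balance.subtract(amount);
        lastProcessedPayment = paymentId;
        return DebitOutcome.APPLIED;
    }

    synchronized BigDecimal balance() {
        return balance;
    }
}
```

With an opening balance of 150, debiting 100 for "pay-456" twice leaves 50 and reports ALREADY_APPLIED the second time; a further debit of 100 for a new payment reports INSUFFICIENT_FUNDS. Like the SQL above, this only remembers the most recent payment ID.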

Strategy 4: Event Sourcing with Natural Idempotency

In event sourcing, you append events to an event store. You never update state directly. The system state is derived by replaying events.

// Event store with version-based deduplication
public void appendEvent(DomainEvent event) {
    // Event ID is unique - duplicate events violate the constraint
    eventStore.append(
        event.getAggregateId(),
        event.getEventId(),     // UNIQUE constraint
        event.getEventType(),
        event.getPayload(),
        event.getOccurredAt()
    );
}
 
// Processing is idempotent because the same event ID cannot be inserted twice

3. Message Deduplication

Producer-Side Deduplication (Kafka Idempotent Producer)

The Kafka broker assigns each producer a ProducerID and tracks sequence numbers per partition. If the same sequence number arrives again, it is silently discarded.

Producer sends: ProducerID=42, Partition=0, Seq=100, Message="order-123"
Network fails, producer retries
Producer sends: ProducerID=42, Partition=0, Seq=100, Message="order-123"
Broker sees: same ProducerID + Seq already received
Broker discards the duplicate
Broker sends ACK as if it were new

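This only protects against duplicate messages from the SAME producer session. After a restart the producer is assigned a new ProducerID, unless a transactional.id is configured so the broker can tie the new instance to the old one.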
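
The broker-side check traced above can be modelled in a few lines. This is a toy model, not Kafka's implementation: `SequenceDedupLog` is a hypothetical name, and it tracks single messages where a real broker tracks batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of broker-side duplicate detection by producer ID and sequence number. */
final class SequenceDedupLog {

    private final List<String> log = new ArrayList<>();
    // Highest sequence number appended so far, keyed by "producerId:partition".
    private final Map<String, Integer> lastSequence = new HashMap<>();

    /** Returns true if the message was appended, false if it was recognised as a retry. */
    boolean append(long producerId, int partition, int sequence, String message) {
        String key = producerId + ":" + partition;
        Integer last = lastSequence.get(key);
        if (last != null && sequence <= last) {
            return false; // already stored: acknowledge again, but do not append
        }
        lastSequence.put(key, sequence);
        log.add(message);
        return true;
    }

    List<String> contents() {
        return List.copyOf(log);
    }
}
```

Appending (42, 0, 100) twice stores the message once. Appending the same payload as producer 43 stores it again, which is the restart case: the new ProducerID makes the retry look like a new message.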

Consumer-Side Deduplication (Universal Approach)

Regardless of the broker technology, consumer-side deduplication is the most reliable approach:

@Component
@RequiredArgsConstructor  // Lombok: injects persistentStore, the only uninitialized final field
public class DeduplicationFilter {
 
    // In-memory cache for very recent deduplication (fast, not durable)
    private final Cache<String, Boolean> recentlyProcessed = Caffeine.newBuilder()
        .maximumSize(10_000)
        .expireAfterWrite(Duration.ofMinutes(10))
        .build();
 
    // Persistent store for long-term deduplication
    private final ProcessedMessageRepository persistentStore;
 
    public boolean isDuplicate(String messageId) {
        // Check fast in-memory cache first
        if (recentlyProcessed.getIfPresent(messageId) != null) {
            return true;
        }
 
        // Fall back to database for messages outside the cache window
        if (persistentStore.existsByMessageId(messageId)) {
            recentlyProcessed.put(messageId, true); // Cache for future
            return true;
        }
 
        return false;
    }
 
    public void markProcessed(String messageId) {
        recentlyProcessed.put(messageId, true);
        persistentStore.save(new ProcessedMessage(messageId, Instant.now()));
    }
}

SQS FIFO Built-in Deduplication

SQS FIFO queues have built-in message deduplication within a 5-minute window:

SendMessageRequest request = SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody(objectMapper.writeValueAsString(orderEvent))
    .messageGroupId(orderId)         // Messages in same group are ordered
    .messageDeduplicationId(         // Dedup key - same ID within 5 min = ignored
        "order-" + orderId + "-placed"
    )
    .build();
 
sqsClient.sendMessage(request);
// Sending again with same messageDeduplicationId within 5 minutes is silently ignored
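
To make the window semantics concrete, here is a minimal in-memory sketch of time-windowed deduplication. It is not how SQS is implemented: `WindowedDeduplicator` is a hypothetical class, and the current time is passed in explicitly so the behavior is easy to test.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Accepts each deduplication ID at most once per window. */
final class WindowedDeduplicator {

    private final Duration window;
    private final Map<String, Instant> firstSeen = new HashMap<>();

    WindowedDeduplicator(Duration window) {
        this.window = window;
    }

    /** Returns true if the message is accepted, false if it is a duplicate inside the window. */
    synchronized boolean accept(String deduplicationId, Instant now) {
        // Forget IDs whose window has expired so the map does not grow without bound.
        firstSeen.values().removeIf(seenAt -> !seenAt.plus(window).isAfter(now));
        // putIfAbsent returns null only when the ID was not being tracked.
        return firstSeen.putIfAbsent(deduplicationId, now) == null;
    }
}
```

With a 5-minute window, an ID accepted at time T is rejected at T plus 4 minutes and accepted again at T plus 5 minutes. A rejected duplicate does not extend the window.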

4. Message Ordering

The Ordering Problem

Most systems have at least some ordering requirements:

  • All events for a given order must be processed in the order they occurred
  • User account updates must be applied in sequence
  • Payment events must be applied before refund events for the same payment

Ordering in Kafka (Partition-Level Ordering)

Kafka guarantees strict ordering within a partition. This is both the constraint and the solution.

Strategy: Route messages that need relative ordering to the SAME partition
          by using a consistent partition key.

Partition key = orderId
All events for order-123 always go to Partition 0
All events for order-456 always go to Partition 2

Result: Events for the same order are always processed in order.
        Events for different orders can be processed in parallel.

// All events for the same orderId go to the same partition
kafkaTemplate.send("order-events", orderId.toString(), event);
 
// Behind the scenes:
// partition = hash(orderId) % numberOfPartitions
// Same orderId = same partition = same consumer = same processing order
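
The key-to-partition mapping can be sketched as follows. This is an illustration only: `KeyPartitioner` is a hypothetical class, and `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the partition numbers it produces will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Deterministic key-to-partition mapping: same key, same partition. */
final class KeyPartitioner {

    private KeyPartitioner() {}

    /** Maps a key to a partition in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }
}
```

Because the partition count is part of the formula, adding partitions to a topic can move a key to a different partition, which breaks per-key ordering across the change.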

When you need global ordering (all messages in a topic, not just per key):

  • Use a topic with exactly ONE partition
  • Throughput is limited to what one consumer can handle
  • This is a deliberate trade-off: ordering vs scalability

Ordering in RabbitMQ

RabbitMQ preserves message order within a single queue when:

  • Only one consumer is active
  • Messages are not rejected and requeued (requeued messages go back to the front)

With competing consumers, ordering is NOT guaranteed because:

  • Consumer A might process message 2 before Consumer B finishes with message 1
  • Requeued messages are interleaved with new messages

RabbitMQ ordering solution - Single Active Consumer:

@Bean
public Queue orderedQueue() {
    return QueueBuilder.durable("ordered-processing")
        .withArgument("x-single-active-consumer", true)
        // Only one consumer is active at a time
        // If it dies, the next consumer takes over from where it left off
        .build();
}

Limitation: One active consumer means no parallelism for this queue.


Ordering in SQS FIFO with Message Groups

SQS FIFO uses message groups to maintain ordering within a group while allowing parallelism across groups:

// Order events - use orderId as group
SendMessageRequest orderEvent = SendMessageRequest.builder()
    .queueUrl(fifoQueueUrl)
    .messageBody(eventBody)
    .messageGroupId(orderId)     // All order-123 events are ordered relative to each other
    .messageDeduplicationId(deduplicationKey)
    .build();
 
// Payment events for different orders processed in parallel
// But events for the SAME orderId are strictly ordered

How FIFO message groups work:

Group "order-123": [placed, paid, shipped, delivered]  --> processed in order by one consumer
Group "order-456": [placed, paid, shipped]             --> processed in order by another consumer
Groups process in parallel, but within each group, FIFO order is maintained.

The Out-of-Order Message Problem

Even with ordering mechanisms, out-of-order messages can arrive due to:

  • Network reordering between producer and broker
  • Multiple producer instances writing to the same partition
  • Message retries inserting messages out of order

Handling out-of-order messages with sequence numbers:

@KafkaListener(topics = "order-events")
public void handleOrderEvent(OrderEvent event) {
    Order currentOrder = orderRepository.findById(event.getOrderId()).orElseThrow();
 
    // Check if this event is out of order
    if (event.getSequenceNumber() <= currentOrder.getLastProcessedSequence()) {
        log.warn("Out-of-order or duplicate event: expected {}, got {}",
            currentOrder.getLastProcessedSequence() + 1,
            event.getSequenceNumber());
        return; // Skip - already processed or older than current state
    }
 
    // Process the event and update the sequence
    applyEvent(currentOrder, event);
    currentOrder.setLastProcessedSequence(event.getSequenceNumber());
    orderRepository.save(currentOrder);
}
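
The handler above drops anything older than the current state but applies an event that arrives early, skipping the gap. If gaps matter, one option is to park early events until the missing ones arrive. The sketch below assumes a hypothetical `Resequencer` class with per-entity sequence numbers that start at a known value and increase by one; a production version would also need a timeout or size limit for gaps that are never filled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Buffers early events and releases them strictly in sequence order. */
final class Resequencer {

    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long nextExpected;

    Resequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    /** Accepts one event and returns every event that is now deliverable, in order. */
    List<String> offer(long sequence, String event) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready; // duplicate or stale: already applied
        }
        pending.putIfAbsent(sequence, event); // park it until the gap before it is filled
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            ready.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }
}
```

Starting at sequence 1, offering (2, "paid") returns nothing; offering (1, "placed") then returns [placed, paid].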

5. The Saga Pattern - Distributed Transactions

The Problem: ACID Transactions Do Not Cross Service Boundaries

In a monolith, you can wrap multiple operations in a single database transaction:

// Works perfectly in a monolith
@Transactional
public void placeOrder(OrderRequest request) {
    paymentRepository.debit(request.getPaymentId(), request.getAmount()); // Step 1
    inventoryRepository.reserve(request.getItems());                      // Step 2
    orderRepository.save(new Order(request));                             // Step 3
    // If ANY step fails, ALL are rolled back atomically
}

In microservices, each service has its own database. There is no shared transaction. If Step 2 succeeds but Step 3 fails, you have:

  • Payment debited (Step 1: done)
  • Inventory reserved (Step 2: done)
  • Order NOT created (Step 3: failed)

This is an inconsistent state. The Saga pattern solves this.

What is a Saga?

A Saga is a sequence of local transactions where each transaction is followed by an event/message. If any transaction fails, compensating transactions are executed in reverse order to undo the completed steps.
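
Stripped of messaging, the definition reduces to "run steps in order; on failure, undo the completed ones in reverse". A minimal single-process sketch, assuming hypothetical `SagaRunner` and `Step` types and using a thrown exception as the failure signal:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Runs local steps in order and compensates in reverse when one fails. */
final class SagaRunner {

    /** One local transaction plus the action that semantically undoes it. */
    static final class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    private SagaRunner() {}

    /** Returns true if every step succeeded; records what happened in trace. */
    static boolean run(List<Step> steps, List<String> trace) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                trace.add(step.name);
                completed.push(step); // most recent step ends up on top
            } catch (RuntimeException failure) {
                trace.add(step.name + " FAILED");
                while (!completed.isEmpty()) {
                    Step done = completed.pop(); // reverse order of completion
                    done.compensation.run();
                    trace.add("undo " + done.name);
                }
                return false;
            }
        }
        return true;
    }
}
```

With steps charge, reserve, ship where ship throws, the trace reads: charge, reserve, ship FAILED, undo reserve, undo charge. In a real saga each step and compensation is a message to another service, and compensations must themselves be retried until they succeed.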

Saga Pattern 1: Choreography-Based Saga

Each service publishes events and listens for events. There is no central coordinator.

Order Placement Saga (Choreography):

OrderService:
  1. Creates order (status: PENDING)
  2. Publishes: OrderCreated event

PaymentService (listens to OrderCreated):
  3. Charges customer card
  4a. Success: Publishes PaymentSucceeded event
  4b. Failure: Publishes PaymentFailed event

InventoryService (listens to PaymentSucceeded):
  5. Reserves inventory
  6a. Success: Publishes InventoryReserved event
  6b. Failure: Publishes InventoryReservationFailed event

FulfillmentService (listens to InventoryReserved):
  7. Creates shipment
  8. Publishes ShipmentCreated event

OrderService (listens to ShipmentCreated):
  9. Updates order status to CONFIRMED

--- Failure path ---

OrderService (listens to PaymentFailed):
  Updates order status to FAILED
  Publishes: OrderCancelled event

OrderService (listens to InventoryReservationFailed):
  Updates order status to FAILED
  Publishes: OrderCancelled event (compensating event)

PaymentService (listens to OrderCancelled):
  Refunds the payment if one was charged (compensating transaction)

Implementation:

// Payment Service - choreography participant
@Service
public class PaymentSagaParticipant {
 
    @KafkaListener(topics = "order-events", groupId = "payment-saga-group")
    public void handleOrderCreated(OrderEvent event) {
        if (!"OrderCreated".equals(event.getEventType())) return;
 
        try {
            PaymentResult result = paymentGateway.charge(
                event.getCustomerId(),
                event.getTotalAmount(),
                event.getOrderId()
            );
 
            if (result.isSuccessful()) {
                kafkaTemplate.send("payment-events",
                    event.getOrderId(),
                    PaymentEvent.succeeded(event.getOrderId(), result.getTransactionId())
                );
            } else {
                kafkaTemplate.send("payment-events",
                    event.getOrderId(),
                    PaymentEvent.failed(event.getOrderId(), result.getFailureReason())
                );
            }
 
        } catch (Exception e) {
            kafkaTemplate.send("payment-events",
                event.getOrderId(),
                PaymentEvent.failed(event.getOrderId(), e.getMessage())
            );
        }
    }
 
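    // Compensating transaction handler
    // Uses its own consumer group: two listeners sharing one group on the same topic
    // would split the partitions between them, and each would miss events
    @KafkaListener(topics = "order-events", groupId = "payment-saga-compensation-group")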
    public void handleOrderCancelled(OrderEvent event) {
        if (!"OrderCancelled".equals(event.getEventType())) return;
        paymentGateway.refund(event.getOrderId());
    }
}

Saga Pattern 2: Orchestration-Based Saga

A central Saga Orchestrator manages the workflow and tells each service what to do.

@Service
@Slf4j
public class OrderSagaOrchestrator {
 
    private final Map<String, SagaState> activeSagas = new ConcurrentHashMap<>();
 
    // Entry point - triggered when order is created
    public void startOrderSaga(Order order) {
        SagaState saga = SagaState.builder()
            .sagaId(UUID.randomUUID().toString())
            .orderId(order.getId())
            .currentStep(SagaStep.PAYMENT_PENDING)
            .build();
 
        activeSagas.put(saga.getSagaId(), saga);
        sagaRepository.save(saga);
 
        // Step 1: Command the Payment Service
        commandGateway.send("payment-commands",
            ChargePaymentCommand.builder()
                .sagaId(saga.getSagaId())
                .orderId(order.getId())
                .customerId(order.getCustomerId())
                .amount(order.getTotalAmount())
                .build()
        );
    }
 
    // Payment Service replies via event
    @KafkaListener(topics = "saga-replies", groupId = "saga-orchestrator")
    public void handleSagaReply(SagaReplyEvent reply) {
        SagaState saga = sagaRepository.findBySagaId(reply.getSagaId())
            .orElseThrow(() -> new SagaNotFoundException(reply.getSagaId()));
 
        switch (saga.getCurrentStep()) {
            case PAYMENT_PENDING:
                if (reply.isSuccess()) {
                    saga.setCurrentStep(SagaStep.INVENTORY_PENDING);
                    sagaRepository.save(saga);
 
                    // Step 2: Command Inventory Service
                    commandGateway.send("inventory-commands",
                        ReserveInventoryCommand.of(saga.getSagaId(), saga.getOrderId())
                    );
                } else {
                    // Payment failed - cancel the order, no compensation needed (nothing to undo)
                    saga.setCurrentStep(SagaStep.FAILED);
                    sagaRepository.save(saga);
                    orderService.cancelOrder(saga.getOrderId(), reply.getFailureReason());
                }
                break;
 
            case INVENTORY_PENDING:
                if (reply.isSuccess()) {
                    saga.setCurrentStep(SagaStep.FULFILLMENT_PENDING);
                    sagaRepository.save(saga);
 
                    // Step 3: Command Fulfillment Service
                    commandGateway.send("fulfillment-commands",
                        CreateShipmentCommand.of(saga.getSagaId(), saga.getOrderId())
                    );
                } else {
                    // Inventory failed - compensate: refund the payment
                    saga.setCurrentStep(SagaStep.COMPENSATING);
                    sagaRepository.save(saga);
 
                    commandGateway.send("payment-commands",
                        RefundPaymentCommand.of(saga.getSagaId(), saga.getOrderId())
                    );
                    orderService.cancelOrder(saga.getOrderId(), "Inventory unavailable");
                }
                break;
 
            case FULFILLMENT_PENDING:
                if (reply.isSuccess()) {
                    saga.setCurrentStep(SagaStep.COMPLETED);
                    sagaRepository.save(saga);
                    orderService.confirmOrder(saga.getOrderId());
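                }
                // NOTE: a failed fulfillment reply is not handled in this example;
                // a complete saga would release the inventory and refund the payment here
                break;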
        }
    }
}

6. The Outbox Pattern - Solving the Dual-Write Problem

The Dual-Write Problem

This is one of the most critical problems in distributed systems with message queues.

// The dangerous code that appears in many codebases
@Transactional
public void placeOrder(OrderRequest request) {
    Order order = orderRepository.save(Order.from(request));  // Write 1: Database

    kafkaTemplate.send("order-events", new OrderPlacedEvent(order));  // Write 2: Kafka

    // PROBLEM: These two writes are NOT atomic.
    // Scenario A: send() is asynchronous. The DB transaction commits,
    //             then the publish fails in the background
    //             --> Order in DB, but event never published = silent data loss
    // Scenario B: Kafka is temporarily unavailable and send() throws
    //             --> Exception thrown, transaction rolled back, order not saved
    //             --> Customer sees an error even though the database was healthy
    //             --> Any work done before the publish is lost
    // Scenario C: Kafka publish succeeds but DB transaction rolls back
    //             (or the app crashes before the commit)
    //             --> Event published but no order exists = consumers process ghost orders
}

The Outbox Pattern Solution

Write to an outbox table in the SAME database transaction as your business logic. A separate outbox relay reads from this table and publishes to the message queue.

Step 1: Business Transaction
+-------------------------------------------+
| BEGIN TRANSACTION                         |
| INSERT INTO orders (...) VALUES (...)     |
| INSERT INTO outbox (                      |
|   event_type = 'OrderPlaced',             |
|   payload = '{"orderId": ...}',           |
|   published = false                       |
| )                                         |
| COMMIT                                    |  <-- Atomic: either both succeed or both fail
+-------------------------------------------+

Step 2: Outbox Relay (separate process/thread)
Poll outbox WHERE published = false
  |
  v
For each pending entry:
  Publish to Kafka
  If success: UPDATE outbox SET published = true WHERE id = ?
  If failure: Leave as unpublished, retry on next poll

This guarantees that if the order is saved, the event WILL eventually be published (at-least-once). If the save fails, no event is published.
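
The relay loop and its at-least-once behavior can be simulated without a database or broker. In this sketch `OutboxRelaySketch` and `OutboxRow` are hypothetical names, a list stands in for the outbox table, and the publisher is a function that returns true when the broker acknowledged the send. Stopping at the first failure is a design choice made here to preserve publish order, not something the pattern requires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** In-memory simulation of an outbox table and its polling relay. */
final class OutboxRelaySketch {

    static final class OutboxRow {
        final String payload;
        boolean published;

        OutboxRow(String payload) {
            this.payload = payload;
        }
    }

    private final List<OutboxRow> outbox = new ArrayList<>();

    /** Called inside the business transaction, next to the domain write. */
    void enqueue(String payload) {
        outbox.add(new OutboxRow(payload));
    }

    /** One polling pass; returns how many rows were published. */
    int relayOnce(Predicate<String> publisher) {
        int sent = 0;
        for (OutboxRow row : outbox) {
            if (row.published) {
                continue;
            }
            if (!publisher.test(row.payload)) {
                break; // leave unpublished, retry on the next poll
            }
            // A crash between the publish above and this line re-publishes the row
            // on the next pass: the relay is at-least-once, so consumers must be idempotent.
            row.published = true;
            sent++;
        }
        return sent;
    }

    long pendingCount() {
        return outbox.stream().filter(row -> !row.published).count();
    }
}
```

With two rows queued, a pass whose publisher always fails publishes nothing and leaves both rows pending; the next pass with a working publisher sends both in insertion order.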

Implementation

// Domain service - uses outbox for reliable event publishing
@Service
@RequiredArgsConstructor
public class OrderService {
 
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;
 
    @Transactional  // Single transaction covers BOTH writes
    public Order placeOrder(PlaceOrderRequest request) {
        // Business operation
        Order order = Order.from(request);
        orderRepository.save(order);
 
        // Write to outbox IN THE SAME TRANSACTION
        OutboxEvent outboxEvent = OutboxEvent.builder()
            .id(UUID.randomUUID())
            .aggregateType("Order")
            .aggregateId(order.getId().toString())
            .eventType("OrderPlaced")
            .payload(objectMapper.writeValueAsString(OrderPlacedEvent.from(order)))
            .createdAt(Instant.now())
            .published(false)
            .build();
 
        outboxRepository.save(outboxEvent);
 
        // No Kafka call here. The relay handles it.
        return order;
    }
}
 
// Outbox table schema
// CREATE TABLE outbox_events (
//   id            UUID PRIMARY KEY,
//   aggregate_type VARCHAR(100) NOT NULL,
//   aggregate_id   VARCHAR(255) NOT NULL,
//   event_type     VARCHAR(100) NOT NULL,
//   payload        TEXT NOT NULL,
//   created_at     TIMESTAMP NOT NULL,
//   published      BOOLEAN NOT NULL DEFAULT FALSE,
//   published_at   TIMESTAMP,
//   error_message  TEXT
// );
 
// The Outbox Relay - publishes pending events
@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxRelay {
 
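    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    // Application-specific helper (not shown) that maps an event type to a topic name
    private final TopicResolver topicResolver;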
 
    @Scheduled(fixedDelay = 1000)  // Run every second
    @Transactional
    public void relayPendingEvents() {
        List<OutboxEvent> pending = outboxRepository.findTop100ByPublishedFalseOrderByCreatedAtAsc();
 
        for (OutboxEvent event : pending) {
            try {
                // Determine topic from event type
                String topic = topicResolver.resolve(event.getEventType());
 
                // Publish to Kafka
                kafkaTemplate.send(topic, event.getAggregateId(), event.getPayload()).get();
                // .get() makes it synchronous - wait for broker ACK
 
                // Mark as published. A crash between the send and this update means the
                // event is sent again on the next poll, so delivery is at-least-once.
                event.setPublished(true);
                event.setPublishedAt(Instant.now());
                outboxRepository.save(event);
 
            } catch (Exception e) {
                log.error("Failed to relay outbox event {}: {}", event.getId(), e.getMessage());
                event.setErrorMessage(e.getMessage());
                outboxRepository.save(event);
                // Stop this run so later events are not published ahead of the failed one.
                // The next poll retries from this event.
                break;
            }
        }
    }
}
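
To make the relay's guarantees concrete, here is a small in-memory sketch that is independent of Spring and Kafka. The names OutboxSketch, Event, and relayOnce are hypothetical, and the publisher is a plain function that returns true when the broker acknowledges. This variant stops at the first failed publish so that later events never overtake an earlier one, and it marks an event as published only after the acknowledgment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// In-memory stand-in for the outbox table and the relay loop (illustrative only).
final class OutboxSketch {

    static final class Event {
        final String id;
        boolean published;

        Event(String id) {
            this.id = id;
        }
    }

    /**
     * Publishes pending events in insertion order. The publisher returns true on
     * broker ACK. The loop stops at the first failure so ordering is preserved.
     * Returns the ids published during this run.
     */
    static List<String> relayOnce(List<Event> outbox, Predicate<Event> publisher) {
        List<String> sent = new ArrayList<>();
        for (Event event : outbox) {
            if (event.published) {
                continue;
            }
            if (!publisher.test(event)) {
                break; // retry this event on the next run
            }
            event.published = true; // mark only after the ACK
            sent.add(event.id);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Event> outbox = new ArrayList<>(
                List.of(new Event("e1"), new Event("e2"), new Event("e3")));
        // First run: the broker rejects e2, so only e1 goes out.
        System.out.println(relayOnce(outbox, e -> !e.id.equals("e2"))); // [e1]
        // Second run: the broker is healthy again, so e2 and e3 follow in order.
        System.out.println(relayOnce(outbox, e -> true)); // [e2, e3]
    }
}
```

Because marking happens after the send, a crash between the two steps would resend the event on the next run. The relay is at-least-once, so consumers still need to be idempotent.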

Production-Grade Outbox with Debezium (CDC Approach)

For high-throughput systems, polling the outbox table every second adds query load and up to a second of latency. Debezium (Change Data Capture) avoids both by streaming the database's transaction log directly to Kafka:

# Debezium connector configuration
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orders",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}

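Debezium reads the PostgreSQL WAL (Write-Ahead Log) through logical replication and publishes new outbox rows to Kafka in near real time, without polling the table. The EventRouter transform expects columns named aggregatetype and aggregateid by default, so a table with other column names, such as the one above, has to map them through the transform's options.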


7. Transactional Messaging

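Kafka Transactions (Full Consume-Transform-Produce)

The canonical exactly-once pattern in Kafka reads from one topic, transforms the record, and writes to another topic, with the output record and the input offset committed in a single transaction: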

@Configuration
public class KafkaTransactionalConfig {
 
    @Bean
    public KafkaTransactionManager<String, Object> kafkaTransactionManager(
            ProducerFactory<String, Object> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
}
 
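@Service
@RequiredArgsConstructor
public class OrderEnrichmentService {
 
    private final EnrichmentService enrichmentService;        // type name assumed
    private final KafkaTemplate<String, Object> kafkaTemplate;
 
    @KafkaListener(topics = "raw-orders", groupId = "enrichment-group")
    @Transactional("kafkaTransactionManager")
    public void enrichOrder(RawOrderEvent rawOrder, Acknowledgment ack) {
        // Read from "raw-orders"
        // Transform
        EnrichedOrder enriched = enrichmentService.enrich(rawOrder);
 
        // Write to "enriched-orders"
        // The goal is for this ENTIRE operation to be atomic:
        //   - Produce to enriched-orders AND
        //   - Commit offset on raw-orders
        // Either both happen or neither does (never one without the other).
        // This holds only when the offset commit goes through the same Kafka transaction
        // as the produce. That is the listener container's job, so the container must be
        // configured with the KafkaTransactionManager as well.
        kafkaTemplate.send("enriched-orders", rawOrder.getOrderId(), enriched);
        ack.acknowledge();
    }
}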
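
The transaction only pays off if the clients on both sides are configured for it. As a rough sketch, the settings below use plain java.util.Properties with the standard Kafka client keys; the class name, the transactional id, and the group id are placeholders. In Spring Kafka the transactional id is normally derived from a transaction id prefix on the producer factory rather than set by hand.

```java
import java.util.Properties;

// Client settings that Kafka transactions depend on (values are illustrative).
final class TransactionalClientSettings {

    // Producer side: a transactional.id enables transactions and requires idempotence.
    static Properties producer(String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("transactional.id", transactionalId);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        return props;
    }

    // Consumer side (readers of "enriched-orders"): skip records from aborted transactions.
    static Properties downstreamConsumer(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level", "read_committed"); // the default is read_uncommitted
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producer("enrichment-tx-1"));
        System.out.println(downstreamConsumer("billing-group"));
    }
}
```

Without read_committed on the downstream consumer, records from aborted transactions are still visible to it, which defeats the purpose of the atomic write.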

8. Backpressure

What is Backpressure?

Backpressure builds up when a consumer cannot process messages as fast as they are being produced. Left unhandled, the queue keeps growing and eventually causes:

  • Memory or disk exhaustion on the broker
  • Message expiry (TTL violations)
  • Delayed processing that breaks SLAs
  • Cascading failures in upstream services once the broker starts blocking or rejecting producers

Detecting Backpressure

The key metric is consumer lag. In Kafka this is, per partition, the difference between the latest produced offset (the log end offset) and the consumer group's committed offset:

Kafka Topic: order-events, Partition 0

Latest produced offset: 100,000
Latest consumed offset: 95,000
Consumer lag:            5,000 messages

If this lag is growing over time, you have backpressure.
If it is stable or shrinking, consumers are keeping up.
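
The arithmetic above can be sketched in a few lines. This is a minimal illustration, assuming the end offsets and committed offsets have already been fetched (for example from the broker's admin API) into maps keyed by partition number. The class and method names are hypothetical, and a partition with no committed offset is treated as committed at 0.

```java
import java.util.List;
import java.util.Map;

final class ConsumerLag {

    // Lag per partition = log end offset - committed offset, summed over all partitions.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Assumption: a partition with no commit yet counts from offset 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            total += Math.max(0L, entry.getValue() - committed);
        }
        return total;
    }

    // True when every sample is strictly larger than the one before it.
    static boolean isGrowing(List<Long> lagSamples) {
        if (lagSamples.size() < 2) {
            return false;
        }
        for (int i = 1; i < lagSamples.size(); i++) {
            if (lagSamples.get(i) <= lagSamples.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long lag = totalLag(Map.of(0, 100_000L), Map.of(0, 95_000L));
        System.out.println("lag = " + lag);                                // lag = 5000
        System.out.println(isGrowing(List.of(5_000L, 7_500L, 12_000L)));   // true
        System.out.println(isGrowing(List.of(5_000L, 4_000L, 4_200L)));    // false
    }
}
```

A single lag reading says little on its own. Alerting on the trend across several samples is what separates a short burst from sustained backpressure.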

Handling Backpressure

Strategy 1: Scale out consumers

# Scale up Kubernetes deployment
kubectl scale deployment email-service --replicas=20
 
# The new consumer instances will be assigned partitions during rebalance
# More parallelism = faster message consumption, but only up to the partition count:
# instances beyond the number of partitions sit idle

Strategy 2: Increase batch size and optimize consumer throughput

// Process messages in batches instead of one at a time
@KafkaListener(topics = "order-events", batch = "true")
public void handleBatch(List<OrderEvent> events) {
    // One batched database call per poll instead of one call per event.
    // The number of events per poll is capped by max.poll.records.
    orderService.bulkProcess(events);  // delegates to OrderRepository.bulkProcess below
}
 
// Batch database operation
@Repository
@RequiredArgsConstructor
public class OrderRepository {
 
    private final JdbcTemplate jdbcTemplate;
 
    public void bulkProcess(List<OrderEvent> events) {
        jdbcTemplate.batchUpdate(
            "INSERT INTO processed_orders(order_id, status) VALUES (?, ?)",
            events,
            100,  // rows per JDBC batch
            (ps, event) -> {
                ps.setString(1, event.getOrderId());
                ps.setString(2, "PROCESSED");
            }
        );
    }
}

Strategy 3: Circuit Breaker on the Consumer

If the downstream system (database, external API) is slow, stop consuming temporarily to avoid overwhelming it:

@Component
@RequiredArgsConstructor
public class CircuitBreakerAwareConsumer {
 
    private final CircuitBreaker circuitBreaker;  // Resilience4j; it tracks open/closed state itself
    private final OrderService orderService;
 
    // Requires manual acknowledgment (AckMode.MANUAL or MANUAL_IMMEDIATE) on the container
    @KafkaListener(topics = "order-events")
    public void handleOrder(OrderEvent event, Acknowledgment ack) {
        try {
            circuitBreaker.executeRunnable(() -> orderService.process(event));
            ack.acknowledge();
 
        } catch (CallNotPermittedException e) {
            // Circuit is open - do not ACK. Kafka does not redeliver on its own, so
            // nack() seeks back to this record and pauses before it is read again.
            // Keep the pause shorter than max.poll.interval.ms.
            // (Older Spring Kafka versions take the sleep as a long in milliseconds.)
            ack.nack(Duration.ofSeconds(30));
        }
    }
}

Strategy 4: Rate Limiting

@Component
@RequiredArgsConstructor
public class RateLimitedConsumer {
 
    private final ExternalApiService externalApiService;  // type name assumed
 
    // Allow max 1000 messages per second per consumer instance (Guava RateLimiter)
    private final RateLimiter rateLimiter = RateLimiter.create(1000.0);
 
    @KafkaListener(topics = "high-volume-events")
    public void handleEvent(HighVolumeEvent event, Acknowledgment ack) {
        // Block until we have a token (rate limit admission)
        rateLimiter.acquire();
 
        externalApiService.process(event);
        ack.acknowledge();
    }
}

Strategy 5: Increase partition count (Kafka)

More partitions allow more consumer instances to work in parallel:

# Add partitions (can only increase, never decrease)
kafka-topics.sh --alter --topic order-events --partitions 12 --bootstrap-server kafka:9092
 
# Then scale consumer group to 12 instances
# Each instance handles one partition (for example, up to 4x the parallelism if the topic had 3 partitions)

Warning: Increasing partitions changes how keys map to partitions. Existing messages stay where they are, but new messages for a key that used to land in Partition 0 may now go to a different partition. Per-key ordering is therefore not guaranteed until the messages written before the change have been consumed.
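
The effect is easy to see with a toy partitioner. The sketch below is an illustration only: Kafka's default partitioner hashes the serialized key with murmur2, while this stand-in uses String.hashCode() to stay dependency-free. The class name and the sample keys are made up.

```java
final class PartitionShift {

    // Stand-in for the producer's default partitioner: hash(key) mod partition count.
    // Kafka uses murmur2 on the key bytes; String.hashCode() is an assumption made
    // here so the sketch needs no Kafka dependency.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"customer-17", "customer-42", "customer-99"}) {
            int before = partitionFor(key, 3);
            int after = partitionFor(key, 12);
            System.out.println(key + ": partition " + before + " -> " + after
                    + (before == after ? " (unchanged)" : " (moved)"));
        }
    }
}
```

Any key that moves has its older events in one partition and its newer events in another. Two different consumers may then process them concurrently, which is where the ordering risk comes from.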


9. Schema Evolution

The Problem

Your system is live. You have 50 consumers reading from a Kafka topic. You need to add a new field to your message schema. How do you do this without taking down all consumers simultaneously?

Schema Evolution Strategies

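Strategy 1: Backward Compatibility (Safe Default)

Consumers using the NEW schema can read messages written with the OLD schema, so consumers can be upgraded before producers.

Adding optional fields with defaults is backward compatible. It is forward compatible as well, because old consumers can still read the new messages: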

// Version 1 - Original schema
public class OrderEvent {
    private String orderId;       // required
    private String customerId;    // required
    private BigDecimal amount;    // required
}
 
// Version 2 - Adding optional field (backward compatible)
public class OrderEvent {
    private String orderId;           // required
    private String customerId;        // required
    private BigDecimal amount;        // required
    private String couponCode;        // NEW - optional, old consumers ignore it safely
    private String shippingMethod;    // NEW - optional with default
}

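Old consumers ignore the new couponCode and shippingMethod fields during deserialization, provided the deserializer is set up to skip unknown fields (with Jackson, FAIL_ON_UNKNOWN_PROPERTIES disabled). New consumers can use them.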

Strategy 2: Forward Compatibility

Consumers using the OLD schema can read messages written with the NEW schema. Useful when you want to deploy producers first.

Strategy 3: Avro with Schema Registry

Apache Avro is a binary serialization format with a formal schema definition. The Confluent Schema Registry stores all schema versions and enforces compatibility rules.

// OrderEvent.avsc - Avro schema (couponCode is optional: a union with null, defaulting to null)
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.company.events",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "customerId", "type": "string" },
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    { "name": "couponCode", "type": ["null", "string"], "default": null },
    { "name": "schemaVersion", "type": "int", "default": 2 }
  ]
}

How Schema Registry works:

  1. Producer registers the schema (or looks up its ID) and sends the message prefixed with the schema ID, not the full schema
  2. Consumer fetches schema by ID from Schema Registry if not cached
  3. Consumer deserializes message using the fetched schema
  4. Schema Registry enforces the configured compatibility mode (BACKWARD, FORWARD, FULL, NONE) whenever a new schema version is registered

@Configuration
public class SchemaRegistryKafkaConfig {
 
    @Bean
    public ProducerFactory<String, OrderEvent> avroProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://schema-registry:8081");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            KafkaAvroSerializer.class);
        // Register new schema versions automatically on first use. The compatibility mode
        // (for example BACKWARD) is configured in the registry per subject, not here.
        config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, true);
        return new DefaultKafkaProducerFactory<>(config);
    }
 
    @Bean
    public ConsumerFactory<String, OrderEvent> avroConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://schema-registry:8081");
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            KafkaAvroDeserializer.class);
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}
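
The schema ID mentioned in the steps above travels in a small header in front of the Avro payload. In the Confluent wire format this is one magic byte (0) followed by the schema ID as a 4-byte big-endian integer. KafkaAvroSerializer and KafkaAvroDeserializer handle this for you; the sketch below only illustrates the framing, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class WireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Prefix an already-serialized Avro payload with the magic byte and schema ID.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Read the schema ID that a consumer would look up in the registry.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.remaining() < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not in Schema Registry wire format");
        }
        return buffer.getInt();
    }

    // Return the Avro bytes that follow the header.
    static byte[] payloadOf(byte[] message) {
        schemaIdOf(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] message = frame(42, new byte[] {1, 2, 3});
        System.out.println(Arrays.toString(message));            // [0, 0, 0, 0, 42, 1, 2, 3]
        System.out.println(schemaIdOf(message));                 // 42
        System.out.println(Arrays.toString(payloadOf(message))); // [1, 2, 3]
    }
}
```

Five bytes of overhead per message is the price for not shipping the full schema with every record.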

Schema Evolution Rules (Avro Compatibility Matrix)

| Change | BACKWARD | FORWARD | FULL |
|---|---|---|---|
| Add optional field (null default) | Safe | Safe | Safe |
| Remove optional field | Safe | Safe | Safe |
| Add required field (no default) | Unsafe | Safe | Unsafe |
| Remove required field | Safe | Unsafe | Unsafe |
| Change field type | Usually Unsafe | Usually Unsafe | Usually Unsafe |
| Rename field | Unsafe (use alias) | Unsafe | Unsafe |

Golden rule: Always add fields as optional with a sensible default. Never remove fields unless you have confirmed all consumers have been updated.
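
The add-field rows of the matrix follow from one rule: a reader can decode a writer's data if every field the reader expects was either written or has a default. The sketch below is a deliberately simplified model of that rule, not the Schema Registry's actual checker. It ignores types, aliases, and unions, a schema is just a map from field name to "has a default", and the region field is made up for the example.

```java
import java.util.Map;

final class CompatibilitySketch {

    // A reader can decode data if each of its fields is present in the writer's
    // schema or has a default. Extra fields written by the writer are skipped.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
                .allMatch(field -> writer.containsKey(field.getKey()) || field.getValue());
    }

    // BACKWARD: consumers on the new schema read data written with the old one.
    static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: consumers on the old schema read data written with the new one.
    static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    static boolean full(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return backward(oldSchema, newSchema) && forward(oldSchema, newSchema);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = Map.of("orderId", false, "customerId", false, "amount", false);
        // v2 adds an optional field (has a default)
        Map<String, Boolean> v2 = Map.of(
                "orderId", false, "customerId", false, "amount", false, "couponCode", true);
        // v3 adds a required field (no default); "region" is a hypothetical field
        Map<String, Boolean> v3 = Map.of(
                "orderId", false, "customerId", false, "amount", false, "region", false);

        System.out.println(backward(v1, v2) + " " + forward(v1, v2) + " " + full(v1, v2)); // true true true
        System.out.println(backward(v1, v3) + " " + forward(v1, v3) + " " + full(v1, v3)); // false true false
    }
}
```

Adding a field with a default passes every check, which is why the golden rule leads with it.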


10. Consumer Offset Management and Replay

Offset Management in Kafka

Kafka stores committed offsets in a special internal topic: __consumer_offsets.

Consumer Group: "email-service-group"
Topic: order-events

Committed offsets:
  Partition 0: offset 1500  (next message to consume is at offset 1500; the last one processed was 1499)
  Partition 1: offset 2300
  Partition 2: offset 1750

When a consumer restarts, it resumes from the committed offsets, so no messages are missed. Anything it processed after the last commit is delivered again.

auto.offset.reset - What to Do When There Is No Committed Offset

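// When a new consumer group starts (no committed offsets exist yet), or when the committed
// offset no longer exists on the broker (removed by retention). Pick ONE of the following: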
 
// "earliest" - start from the very beginning of the topic
// Good for: replaying all historical events, new analytics consumer
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
// "latest" - start from the current end of the topic (ignore historical messages)
// Good for: real-time processing where historical messages are irrelevant
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
 
// "none" - throw an exception if no committed offset exists
// Good for: strict environments where an unintended new consumer group would be a bug
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
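
The decision the consumer makes on startup can be written as a small function. This is a sketch of the logic only, under the assumption that the committed offset and the partition's retained range are already known. The class, the enum, and the IllegalStateException are stand-ins; the real client raises its own exception types.

```java
import java.util.OptionalLong;

final class StartPosition {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    // Decide where a consumer starts reading on one partition.
    // beginning = first offset still retained, end = offset of the next record to be written.
    static long resolve(OptionalLong committed, long beginning, long end, ResetPolicy policy) {
        if (committed.isPresent()
                && committed.getAsLong() >= beginning
                && committed.getAsLong() <= end) {
            return committed.getAsLong(); // normal restart: the policy is not consulted
        }
        // No committed offset, or it points outside the retained range.
        switch (policy) {
            case EARLIEST:
                return beginning;
            case LATEST:
                return end;
            default:
                throw new IllegalStateException(
                        "No valid committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.of(1500), 0, 2000, ResetPolicy.LATEST));   // 1500
        System.out.println(resolve(OptionalLong.empty(), 0, 2000, ResetPolicy.EARLIEST));  // 0
        System.out.println(resolve(OptionalLong.empty(), 0, 2000, ResetPolicy.LATEST));    // 2000
        System.out.println(resolve(OptionalLong.of(10), 500, 2000, ResetPolicy.EARLIEST)); // 500
    }
}
```

The first case is the one people tend to forget: once a group has a valid committed offset, changing auto.offset.reset has no effect on it.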

Replaying Messages - A Kafka Superpower

# Scenario: Bug found in Analytics Service. Need to reprocess last 24 hours of events.
 
# Step 1: Stop the consumer group
kubectl scale deployment analytics-service --replicas=0
 
# Step 2: Reset the offset to 24 hours ago
# (run it with --dry-run in place of --execute first to preview the new offsets)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group analytics-service-group \
  --topic order-events \
  --reset-offsets \
  --to-datetime 2026-06-04T10:00:00.000 \
  --execute
 
# Step 3: Deploy the fixed consumer
kubectl set image deployment/analytics-service analytics=analytics:v2.1
 
# Step 4: Scale back up - it will reprocess from the reset offset
kubectl scale deployment analytics-service --replicas=3

This is not possible with classic RabbitMQ queues or SQS, where a message is deleted once it has been acknowledged (RabbitMQ streams are the exception). It is one of Kafka's most powerful operational capabilities.


Seeking to Specific Offset Programmatically

@Component
public class ReplayableKafkaConsumer implements ConsumerSeekAware {
 
    // Called on every partition assignment, including rebalances, so an unconditional
    // seek here replays again after each rebalance. Enable exactly ONE option:
    // if several run, the last seek wins.
    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
 
        // Option 1: Replay from beginning
        assignments.forEach((partition, offset) ->
            callback.seekToBeginning(partition.topic(), partition.partition()));
 
        // Option 2: Replay from specific timestamp
        // assignments.forEach((partition, offset) ->
        //     callback.seekToTimestamp(partition.topic(), partition.partition(),
        //         Instant.now().minus(Duration.ofHours(24)).toEpochMilli()));
 
        // Option 3: Skip to end (ignore all historical messages)
        // assignments.forEach((partition, offset) ->
        //     callback.seekToEnd(partition.topic(), partition.partition()));
    }
}

Summary of Advanced Concepts

| Concept | Core Principle | Common Mistake |
|---|---|---|
| Delivery Semantics | Know which you have; design for it | Assuming exactly-once when you have at-least-once |
| Idempotent Consumers | Same message processed twice = same result | Not designing for duplicate delivery |
| Deduplication | Track processed message IDs | Forgetting to clean up dedup table |
| Ordering | Partition key determines order group | Expecting global ordering without single partition |
| Saga Pattern | Compensating transactions for rollback | Not implementing compensation |
| Outbox Pattern | Same DB transaction for business + event | Dual-write creates inconsistency windows |
| Backpressure | Consumer lag is the signal | Ignoring queue depth metrics |
| Schema Evolution | Add optional fields; never remove without plan | Breaking change deployed to all consumers at once |
| Offset Management | Commit only after successful processing | Auto-commit before processing causes message loss |

Previous: Part 3 - Technologies Deep Dive
Next: Part 5 - Operations and Performance
Index: Message Queues Demystified - Index