Message Queues Demystified - Part 4: Advanced Concepts
The difference between a system that works in a demo and a system that works in production
often comes down to these advanced concepts. This is where most teams get tripped up.
Table of Contents
- Message Delivery Semantics
- Idempotent Consumers
- Message Deduplication
- Message Ordering - Guarantees, Trade-offs, and Patterns
- The Saga Pattern - Distributed Transactions
- The Outbox Pattern - Solving the Dual-Write Problem
- Transactional Messaging
- Backpressure - Detection and Handling
- Schema Evolution
- Consumer Offset Management and Replay
1. Message Delivery Semantics
This is one of the most frequently asked topics in interviews and one of the most misunderstood in practice.
At-Most-Once Delivery
Definition: A message is delivered at most one time. It may be delivered zero times (lost), but NEVER more than once.
How it works:
- Producer sends message and does not wait for acknowledgment
- Or: Consumer acknowledges (commits offset) BEFORE processing
At-most-once flow:
Producer --[message]--> Broker (no ACK wait)
|
Consumer receives
Consumer commits offset <-- ACK happens BEFORE processing
Consumer processes
Consumer crashes HERE <-- Message is lost, offset already committed
When it is acceptable:
- Metrics and telemetry data (losing an occasional metric is fine)
- Analytics events where approximate counts are acceptable
- Real-time sensor readings where the latest reading supersedes old ones
- Log streaming where completeness is not critical
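Implementation (Kafka - commit before processing):
// WARNING: This configuration loses messages on consumer crash
@KafkaListener(topics = "telemetry", containerFactory = "atMostOnceFactory")
public void handleTelemetry(SensorReading reading, Acknowledgment ack) {
// Commit the offset FIRST, before any processing
ack.acknowledge();
// If this method throws or the consumer crashes here, the offset is already committed - the message is lost
metricsService.record(reading);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SensorReading> atMostOnceFactory(
ConsumerFactory<String, SensorReading> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, SensorReading> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory); // consumer config: enable.auto.commit=false
// MANUAL_IMMEDIATE: ack.acknowledge() commits the offset synchronously, at the moment it is called.
// (enable.auto.commit is not a reliable at-most-once setting: it commits during a later poll,
// usually after the records have already been processed.)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
At-Least-Once Delivery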
Definition: A message is guaranteed to be delivered. It may be delivered multiple times.
This is the default for almost all production messaging systems.
How it works:
- Producer sends and waits for broker acknowledgment. If no ACK, producer retries.
- Consumer processes FIRST, then commits offset. If consumer crashes before committing, message is redelivered.
At-least-once flow:
Producer --[message]--> Broker
Producer <--[ACK]------ Broker (if no ACK, producer retries - potential duplicate)
Consumer receives message
Consumer processes the message
Consumer crashes HERE <-- before committing the offset
On restart:
Consumer restarts from last committed offset
Consumer receives the SAME message again
Consumer processes it AGAIN <-- DUPLICATE PROCESSING
Consumer commits offset
Why duplicates happen:
- Producer retries on network timeout (broker received it the first time, but ACK was lost)
- Consumer processes but crashes before ACKing (broker redelivers)
- Consumer takes too long and visibility timeout expires (SQS redelivers while first consumer is still working)
At-least-once is the right default when:
- You can make your consumer idempotent (safe to process same message twice)
- The cost of duplicate processing is acceptable and manageable
- You need guaranteed delivery over all other concerns
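To make the difference between the two commit orders concrete, here is a minimal, broker-free sketch. It assumes a single partition, a consumer that crashes exactly once at a chosen offset, and a restart that resumes from the last committed offset. DeliverySemanticsDemo and its run method are hypothetical names, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates one consumer reading a single partition, crashing once, then restarting
// from its last committed offset. No broker is involved; all names are hypothetical.
final class DeliverySemanticsDemo {

    /** Returns the offsets that were processed, in order, including any duplicates. */
    static List<Integer> run(int messageCount, int crashAtOffset, boolean commitBeforeProcessing) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0;        // where the consumer resumes after a restart
        boolean crashed = false;

        for (int attempt = 0; attempt < 2; attempt++) {        // run 1 crashes, run 2 recovers
            for (int offset = committed; offset < messageCount; offset++) {
                // First half of the step: commit (at-most-once) or process (at-least-once)
                if (commitBeforeProcessing) {
                    committed = offset + 1;
                } else {
                    processed.add(offset);
                }
                if (!crashed && offset == crashAtOffset) {
                    crashed = true;                             // crash between the two halves
                    break;
                }
                // Second half of the step: process (at-most-once) or commit (at-least-once)
                if (commitBeforeProcessing) {
                    processed.add(offset);
                } else {
                    committed = offset + 1;
                }
            }
        }
        return processed;
    }
}
```

With five messages and a crash at offset 2, run(5, 2, true) returns [0, 1, 3, 4] (offset 2 is lost), while run(5, 2, false) returns [0, 1, 2, 2, 3, 4] (offset 2 is processed twice).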
Exactly-Once Delivery
Definition: A message is delivered and processed exactly one time - no loss, no duplicates.
The hard truth: True exactly-once delivery is extremely difficult to achieve end-to-end. The "Two Generals Problem" shows that over an unreliable network a sender can never be certain its message (or the receiver's acknowledgment) arrived, so it must either risk losing the message or risk sending it twice. What we achieve in practice is "effectively exactly-once": at-least-once delivery combined with deduplication and idempotent processing.
Exactly-Once in Kafka (Broker Level)
Kafka provides exactly-once semantics at the broker level using:
- Idempotent Producer: The broker assigns each producer a unique Producer ID (PID), and each message gets a sequence number. If the same message is retried (due to ACK loss), the broker detects the duplicate PID + sequence number and ignores it.
- Transactional Producer: Allows atomic writes across multiple topics/partitions. Either ALL messages in a transaction are committed, or NONE are.
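@Configuration
public class ExactlyOnceKafkaConfig {
@Bean
public ProducerFactory<String, Object> exactlyOnceProducerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // Idempotent producer
// Enables transactions; Spring's DefaultKafkaProducerFactory treats this value as a transaction ID prefix
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx-1");
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // must be <= 5 with idempotence
return new DefaultKafkaProducerFactory<>(config);
}
}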
// Transactional producer usage
@Service
public class ExactlyOnceOrderService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void processAndPublish(Order order) {
// Begin transaction
kafkaTemplate.executeInTransaction(operations -> {
// All publishes in this block are atomic
operations.send("order-events", order.getId(), new OrderPlacedEvent(order));
operations.send("inventory-commands", order.getId(), new ReserveInventoryCommand(order));
operations.send("payment-commands", order.getId(), new ChargePaymentCommand(order));
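// Either ALL are committed or the transaction aborts; consumers reading with
// isolation.level=read_committed never see messages from an aborted transaction
return true;
});
}
}
- Consume-Transform-Produce pattern (read-process-write atomically):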
// Read from input topic, transform, write to output topic - atomically
// Assumes the listener container factory is configured with the KafkaTransactionManager:
// the container starts a Kafka transaction before invoking the listener and adds the consumed offset to it
@KafkaListener(topics = "raw-orders")
@Transactional("kafkaTransactionManager")
public void processOrder(OrderEvent rawOrder, Acknowledgment ack) {
// 1. Transform
EnrichedOrder enrichedOrder = enrichmentService.enrich(rawOrder);
// 2. Publish to output topic (joins the same Kafka transaction)
kafkaTemplate.send("enriched-orders", rawOrder.getOrderId(), enrichedOrder);
// 3. Acknowledge: the offset commit is part of the transaction, so when the transaction
// commits, the publish and the offset commit happen together (or neither does)
ack.acknowledge();
}
The Reality of End-to-End Exactly-Once
Even with Kafka exactly-once at the broker level, your end-to-end processing can still have duplicates if:
- Your consumer writes to a database AND publishes to another topic (dual-write)
- Your downstream service is not idempotent
- Network issues cause partial operations
True end-to-end exactly-once requires:
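- Idempotent producer (enable.idempotence=true)
- Transactional consume-transform-produce (transactional.id), with downstream consumers reading with isolation.level=read_committed
- Idempotent consumer (database deduplication)
- Outbox pattern for external side effects
The cost: higher latency (every transaction commit adds broker round trips, and read_committed consumers cannot read past a transaction that is still open) and lower throughput. Use exactly-once only when the cost of duplicates truly exceeds this cost.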
Delivery Semantics Summary
| Semantic | Message Loss? | Duplicates? | Complexity | Performance | Use When |
|---|---|---|---|---|---|
| At-most-once | Possible | Never | Low | Highest | Metrics, telemetry |
| At-least-once | Never | Possible | Medium | High | Most production systems |
| Exactly-once | Never | Never | Very High | Lower | Financial, payment, critical |
2. Idempotent Consumers
Why Idempotency Is Not Optional
With at-least-once delivery (the default for nearly all systems), your consumer WILL receive the same message more than once. Not "might." Will.
An idempotent operation produces the same result when executed one time or many times. An idempotent consumer gives you the safety of at-least-once delivery without the risk of duplicates corrupting your data.
The Non-Idempotent Problem
// DANGEROUS - not idempotent
@KafkaListener(topics = "order-payment-events")
public void handlePaymentReceived(PaymentEvent event) {
// This will debit the account EVERY time this message is delivered
// If message is delivered twice, account is debited twice
walletService.debitFunds(event.getCustomerId(), event.getAmount());
// This will insert a duplicate row on second delivery
paymentRepository.insert(new Payment(event.getPaymentId(), event.getAmount()));
}
Strategy 1: Idempotency via Unique Constraint
The database enforces uniqueness. A duplicate operation fails gracefully.
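@KafkaListener(topics = "order-payment-events")
public void handlePaymentReceived(PaymentEvent event) {
try {
// The INSERT and the debit must commit together: if they were separate, a crash after the
// INSERT would make the redelivery look like a duplicate and the debit would never happen
paymentTxService.recordAndDebit(event);
} catch (DataIntegrityViolationException e) {
// Payment with this ID already exists - this is a duplicate delivery
// The transaction rolled back, so nothing was debited twice. Safe to ignore.
log.warn("Duplicate payment event ignored: {}", event.getPaymentId());
}
}
// Lives in a separate bean (hypothetical PaymentTxService) so Spring's @Transactional proxy applies
@Transactional
public void recordAndDebit(PaymentEvent event) {
// Database has UNIQUE constraint on payment_id
Payment payment = new Payment(
event.getPaymentId(), // Natural deduplication key
event.getCustomerId(),
event.getAmount(),
event.getProcessedAt()
);
// saveAndFlush issues the INSERT now, so a duplicate fails BEFORE the debit runs
paymentRepository.saveAndFlush(payment);
walletService.debitFunds(event.getCustomerId(), event.getAmount());
}
Strategy 2: Processed Event Table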
Maintain a table of processed message IDs. Check before processing.
@Service
@RequiredArgsConstructor
public class IdempotentEventProcessor {
private final ProcessedMessageRepository processedRepo;
private final OrderService orderService;
@KafkaListener(topics = "order-events")
@Transactional
public void handleOrderEvent(OrderEvent event, Acknowledgment ack) {
// Check if already processed
if (processedRepo.existsByMessageId(event.getMessageId())) {
log.info("Skipping duplicate message: {}", event.getMessageId());
ack.acknowledge();
return;
}
// Process the event
orderService.handleOrderPlaced(event);
// Record as processed within the same transaction
processedRepo.save(ProcessedMessage.builder()
.messageId(event.getMessageId())
.topic("order-events")
.processedAt(Instant.now())
.build());
ack.acknowledge();
}
}
-- The processed_messages table
CREATE TABLE processed_messages (
message_id VARCHAR(255) PRIMARY KEY,
topic VARCHAR(255) NOT NULL,
processed_at TIMESTAMP NOT NULL,
consumer_group VARCHAR(255)
);
-- Clean up old entries periodically (no need to keep forever)
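DELETE FROM processed_messages WHERE processed_at < NOW() - INTERVAL '7 days';
Important: The business logic and the processed-message insert must commit in the same database transaction. Otherwise a crash between them leaves the event processed but not recorded, and it will be processed again on redelivery. The PRIMARY KEY on message_id is what finally rejects a concurrent duplicate that slips past the existsByMessageId check. Keep entries for longer than any period in which a message can still be redelivered.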
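A minimal in-memory sketch of Strategy 2, assuming a single consumer instance: a synchronized method stands in for the database transaction, and a HashSet stands in for the processed_messages table. IdempotentWalletConsumer and its methods are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// In-memory stand-in for the processed-message-table strategy. The lock plays the role
// of a single database transaction covering both the business change and the ID record.
final class IdempotentWalletConsumer {
    private final Set<String> processedMessageIds = new HashSet<>();
    private long balanceCents;

    IdempotentWalletConsumer(long openingBalanceCents) {
        this.balanceCents = openingBalanceCents;
    }

    /** Returns true if the message was applied, false if it was a duplicate delivery. */
    synchronized boolean handle(String messageId, long debitCents) {
        if (processedMessageIds.contains(messageId)) {
            return false;                       // duplicate delivery: skip all side effects
        }
        balanceCents -= debitCents;             // business logic
        processedMessageIds.add(messageId);     // recorded in the same "transaction"
        return true;
    }

    synchronized long balanceCents() {
        return balanceCents;
    }
}
```

Delivering the same message ID twice debits the account once: the second call returns false and leaves the balance unchanged.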
Strategy 3: Conditional Updates (Optimistic Locking)
Instead of detecting duplicates, design your operations to be naturally idempotent.
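The idea can be modelled in plain Java as a compare-and-set on the last applied payment ID. This is a hypothetical in-memory sketch, not database code: ConditionalDebitAccount is an illustrative name, Objects.equals provides the null-safe comparison the SQL needs, and the balance check mirrors a "balance >= amount" condition.

```java
import java.util.Objects;

// Models a conditional UPDATE: debit only if this payment is not the last one applied.
final class ConditionalDebitAccount {
    private long balance;
    private String lastPaymentId;   // null until the first payment is applied

    ConditionalDebitAccount(long balance) {
        this.balance = balance;
    }

    /** Returns the number of "rows affected": 1 if the debit was applied, 0 if skipped. */
    synchronized int debit(long amount, String paymentId) {
        // Null-safe comparison, the equivalent of IS DISTINCT FROM in SQL
        if (Objects.equals(lastPaymentId, paymentId) || balance < amount) {
            return 0;
        }
        balance -= amount;
        lastPaymentId = paymentId;
        return 1;
    }

    synchronized long balance() {
        return balance;
    }
}
```

Running debit(100, "pay-456") twice on an account holding 500 returns 1 and then 0, leaving a balance of 400.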
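-- Non-idempotent:
UPDATE account SET balance = balance - 100 WHERE account_id = 123;
-- Running twice debits 200 total
-- Idempotent:
UPDATE account
SET balance = balance - 100, last_payment_id = 'pay-456'
WHERE account_id = 123
AND (last_payment_id IS NULL OR last_payment_id <> 'pay-456'); -- Only debit if this payment not already applied
-- Running twice only debits 100 total - second execution affects 0 rows
-- The IS NULL check matters: NULL <> 'pay-456' is not true, so without it the first payment would never apply
-- Caveat: only the LAST payment ID is remembered, so a redelivery of pay-456 that arrives after a different payment would debit again
@Service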
public class IdempotentWalletService {
private final JdbcTemplate jdbcTemplate;
public IdempotentWalletService(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
public boolean debitFunds(String accountId, BigDecimal amount, String paymentId) {
int rowsAffected = jdbcTemplate.update(
"""
UPDATE account
SET balance = balance - ?,
last_processed_payment = ?,
updated_at = NOW()
WHERE account_id = ?
AND (last_processed_payment IS NULL OR last_processed_payment <> ?)
AND balance >= ?
""",
amount, paymentId, accountId, paymentId, amount
);
// rowsAffected = 0 means either:
// - Already processed (idempotent - this is fine)
// - Insufficient funds (need to check separately)
return rowsAffected > 0;
}
}
Strategy 4: Event Sourcing with Natural Idempotency
In event sourcing, you append events to an event store. You never update state directly. The system state is derived by replaying events.
// Event store with event-ID-based deduplication
public void appendEvent(DomainEvent event) {
// Event ID is unique - duplicate events violate the constraint
eventStore.append(
event.getAggregateId(),
event.getEventId(), // UNIQUE constraint
event.getEventType(),
event.getPayload(),
event.getOccurredAt()
);
}
// Processing is idempotent because the same event ID cannot be inserted twice
// (the caller catches the constraint violation and treats the event as already applied)
3. Message Deduplication
Producer-Side Deduplication (Kafka Idempotent Producer)
The Kafka broker assigns each producer a ProducerID and tracks sequence numbers per partition. If the same sequence number arrives again, it is silently discarded.
Producer sends: ProducerID=42, Partition=0, Seq=100, Message="order-123"
Network fails, producer retries
Producer sends: ProducerID=42, Partition=0, Seq=100, Message="order-123"
Broker sees: same ProducerID + Seq already received
Broker discards the duplicate
Broker sends ACK as if it were new
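This only protects against duplicates from the SAME producer session. A restarted non-transactional producer gets a new ProducerID, so its retries are not deduplicated against the old instance's writes. With a transactional.id, the restarted producer is recognized: the broker fences the old instance, and any transaction the old instance left in progress is aborted.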
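The broker-side check in the diagram can be sketched as below. This is a simplified model under stated assumptions: one partition, sequence numbers starting at 0 for each producer ID (the diagram's Seq=100 is just a mid-stream value), and only the last accepted sequence remembered per producer, whereas the real broker keeps a small window of recent batches. PartitionLogWithDedup is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of idempotent-producer deduplication on ONE partition.
final class PartitionLogWithDedup {
    enum Result { APPENDED, DUPLICATE, OUT_OF_SEQUENCE }

    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();

    Result append(long producerId, int sequence, String message) {
        int lastSeq = lastSeqByProducer.getOrDefault(producerId, -1);
        if (sequence <= lastSeq) {
            return Result.DUPLICATE;          // already written: drop it, but still ACK
        }
        if (sequence != lastSeq + 1) {
            return Result.OUT_OF_SEQUENCE;    // a gap: an earlier message is missing
        }
        log.add(message);
        lastSeqByProducer.put(producerId, sequence);
        return Result.APPENDED;
    }

    List<String> contents() {
        return List.copyOf(log);
    }
}
```

A retry with the same producer ID and sequence number is reported as DUPLICATE and not appended, while the same payload from a new producer ID (a restarted, non-transactional producer) is appended again, which is exactly the limitation described above.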
Consumer-Side Deduplication (Universal Approach)
Regardless of the broker technology, consumer-side deduplication is the most reliable approach:
@Component
public class DeduplicationFilter {
// In-memory cache for very recent deduplication (fast, not durable)
private final Cache<String, Boolean> recentlyProcessed = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(Duration.ofMinutes(10))
.build();
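// Persistent store for long-term deduplication
private final ProcessedMessageRepository persistentStore;
public DeduplicationFilter(ProcessedMessageRepository persistentStore) {
this.persistentStore = persistentStore;
}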
public boolean isDuplicate(String messageId) {
// Check fast in-memory cache first
if (recentlyProcessed.getIfPresent(messageId) != null) {
return true;
}
// Fall back to database for messages outside the cache window
if (persistentStore.existsByMessageId(messageId)) {
recentlyProcessed.put(messageId, true); // Cache for future
return true;
}
return false;
}
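// Call only after processing has succeeded. isDuplicate + markProcessed is not atomic:
// concurrent redeliveries can both pass isDuplicate, so the persistent store should
// enforce a unique key on messageId as the final guard.
public void markProcessed(String messageId) {
// Persist first: if the save fails, the cache must not claim the message was processed
persistentStore.save(new ProcessedMessage(messageId, Instant.now()));
recentlyProcessed.put(messageId, true);
}
}
SQS FIFO Built-in Deduplication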
SQS FIFO queues have built-in message deduplication within a 5-minute window:
SendMessageRequest request = SendMessageRequest.builder()
.queueUrl(fifoQueueUrl)
.messageBody(objectMapper.writeValueAsString(orderEvent))
.messageGroupId(orderId) // Messages in same group are ordered
.messageDeduplicationId( // Dedup key - same ID within 5 min = ignored
"order-" + orderId + "-placed"
)
.build();
sqsClient.sendMessage(request);
// Sending again with the same messageDeduplicationId within 5 minutes succeeds, but the duplicate is not delivered
4. Message Ordering
The Ordering Problem
Most systems have at least some ordering requirements:
- All events for a given order must be processed in the order they occurred
- User account updates must be applied in sequence
- Payment events must be applied before refund events for the same payment
Ordering in Kafka (Partition-Level Ordering)
Kafka guarantees strict ordering within a partition. This is both the constraint and the solution.
Strategy: Route messages that need relative ordering to the SAME partition
by using a consistent partition key.
Partition key = orderId
All events for order-123 always go to Partition 0
All events for order-456 always go to Partition 2
Result: Events for the same order are always processed in order.
Events for different orders can be processed in parallel.
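A small sketch of key-based routing, assuming a stand-in hash: Kafka's default partitioner applies murmur2 to the serialized key, whereas this hypothetical KeyedPartitionRouter uses Arrays.hashCode over the UTF-8 bytes, so its partition numbers will not match Kafka's. The property it illustrates holds for any deterministic hash: records with the same key land in one partition, in send order.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Routes keyed records to partitions. The hash is a stand-in for Kafka's murmur2.
final class KeyedPartitionRouter {
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);   // non-negative even for negative hashes
    }

    /** Appends each "key:event" record to its key's partition, preserving send order. */
    static List<List<String>> route(List<String> keys, List<String> events, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            partitions.get(partitionFor(key, numPartitions)).add(key + ":" + events.get(i));
        }
        return partitions;
    }
}
```

Interleaved events for order-123 and order-456 come out of their partitions with each order's events still in placed, paid, shipped order, even though the two orders may be spread across different partitions.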
// All events for the same orderId go to the same partition
kafkaTemplate.send("order-events", orderId.toString(), event);
// Behind the scenes (Kafka's default partitioner, for a non-null key):
// partition = toPositive(murmur2(serializedKey)) % numberOfPartitions
// Same orderId = same partition = one consumer in the group at a time = same processing order
When you need global ordering (all messages in a topic, not just per key):
- Use a topic with exactly ONE partition
- Throughput is limited to what one consumer can handle
- This is a deliberate trade-off: ordering vs scalability
Ordering in RabbitMQ
RabbitMQ preserves message order within a single queue when:
- Only one consumer is active
- Messages are not rejected and requeued (a requeued message can be redelivered after messages that were dispatched after it)
With competing consumers, ordering is NOT guaranteed because:
- Consumer A might process message 2 before Consumer B finishes with message 1
- Requeued messages are interleaved with new messages
RabbitMQ ordering solution - Single Active Consumer:
@Bean
public Queue orderedQueue() {
return QueueBuilder.durable("ordered-processing")
.withArgument("x-single-active-consumer", true)
// Only one consumer is active at a time
// If it dies, the next consumer takes over from where it left off
.build();
}
Limitation: One active consumer means no parallelism for this queue.
Ordering in SQS FIFO with Message Groups
SQS FIFO uses message groups to maintain ordering within a group while allowing parallelism across groups:
// Order events - use orderId as group
SendMessageRequest orderEvent = SendMessageRequest.builder()
.queueUrl(fifoQueueUrl)
.messageBody(eventBody)
.messageGroupId(orderId) // All order-123 events are ordered relative to each other
.messageDeduplicationId(deduplicationKey)
.build();
// Events for different orders are processed in parallel,
// but events for the SAME orderId are strictly ordered
How FIFO message groups work:
Group "order-123": [placed, paid, shipped, delivered] --> processed in order by one consumer
Group "order-456": [placed, paid, shipped] --> processed in order by another consumer
Groups process in parallel, but within each group, FIFO order is maintained.
The Out-of-Order Message Problem
Even with ordering mechanisms, out-of-order messages can arrive due to:
- Network reordering between producer and broker
- Multiple producer instances writing to the same partition
- Message retries inserting messages out of order
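One way to handle the gaps these causes create is a resequencer: drop anything older than the next expected sequence number, buffer anything that arrives early, and release events once the gap is filled. The sketch below is a hypothetical, single-threaded illustration; a production version would also need a timeout for gaps that never fill and a bound on the buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Releases events strictly in sequence order. Old or duplicate events are dropped,
// early events are buffered until the gap before them is filled.
final class Resequencer {
    private long nextExpected;
    private final TreeMap<Long, String> buffered = new TreeMap<>();

    Resequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    /** Accepts one arriving event; returns the events that can now be applied, in order. */
    List<String> accept(long sequence, String event) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready;                          // duplicate or stale: already applied
        }
        buffered.putIfAbsent(sequence, event);     // early, or exactly the one we wait for
        while (!buffered.isEmpty() && buffered.firstKey() == nextExpected) {
            ready.add(buffered.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }
}
```

If "paid" (sequence 2) arrives before "placed" (sequence 1), nothing is released until "placed" shows up; then both come out as [placed, paid], and a later redelivery of sequence 1 is ignored.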
Handling out-of-order messages with sequence numbers:
@KafkaListener(topics = "order-events")
public void handleOrderEvent(OrderEvent event) {
Order currentOrder = orderRepository.findById(event.getOrderId()).orElseThrow();
// Check if this event is out of order
if (event.getSequenceNumber() <= currentOrder.getLastProcessedSequence()) {
log.warn("Out-of-order or duplicate event: expected {}, got {}",
currentOrder.getLastProcessedSequence() + 1,
event.getSequenceNumber());
return; // Skip - already processed or older than current state
}
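// The check above only rejects old events. A gap (sequence > last + 1) means an earlier
// event has not arrived yet; depending on the domain, buffer or retry this one instead
// Process the event and update the sequence
applyEvent(currentOrder, event);
currentOrder.setLastProcessedSequence(event.getSequenceNumber());
orderRepository.save(currentOrder);
}
5. The Saga Pattern - Distributed Transactions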
The Problem: ACID Transactions Do Not Cross Service Boundaries
In a monolith, you can wrap multiple operations in a single database transaction:
// Works perfectly in a monolith
@Transactional
public void placeOrder(OrderRequest request) {
paymentRepository.debit(request.getPaymentId(), request.getAmount()); // Step 1
inventoryRepository.reserve(request.getItems()); // Step 2
orderRepository.save(new Order(request)); // Step 3
// If ANY step fails, ALL are rolled back atomically
}
In microservices, each service has its own database. There is no shared transaction. If Step 2 succeeds but Step 3 fails, you have:
- Payment debited (Step 1: done)
- Inventory reserved (Step 2: done)
- Order NOT created (Step 3: failed)
This is an inconsistent state. The Saga pattern solves this.
What is a Saga?
A Saga is a sequence of local transactions where each transaction is followed by an event/message. If any transaction fails, compensating transactions are executed in reverse order to undo the completed steps.
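The compensation rule in this definition can be sketched as a tiny in-process runner, assuming each step is a local transaction that reports success or failure and carries a compensating action. SagaRunner is hypothetical; in a real saga the steps run in different services and communicate through messages rather than method calls.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

final class SagaRunner {
    /** One local transaction plus the compensating action that undoes it. */
    static final class Step {
        final String name;
        final BooleanSupplier action;      // returns false if the local transaction failed
        final Runnable compensation;

        Step(String name, BooleanSupplier action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs the steps in order; on the first failure, compensates completed steps in reverse. */
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            if (!step.action.getAsBoolean()) {
                // The failed step rolled back its own local transaction; undo the earlier ones
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();   // most recently completed first
                }
                return false;
            }
            completed.push(step);
        }
        return true;
    }
}
```

If payment and inventory succeed but fulfillment fails, the runner releases the inventory and then refunds the payment, the reverse of the order in which they were completed.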
Saga Pattern 1: Choreography-Based Saga
Each service publishes events and listens for events. There is no central coordinator.
Order Placement Saga (Choreography):
OrderService:
1. Creates order (status: PENDING)
2. Publishes: OrderCreated event
PaymentService (listens to OrderCreated):
3. Charges customer card
4a. Success: Publishes PaymentSucceeded event
4b. Failure: Publishes PaymentFailed event
InventoryService (listens to PaymentSucceeded):
5. Reserves inventory
6a. Success: Publishes InventoryReserved event
6b. Failure: Publishes InventoryReservationFailed event
FulfillmentService (listens to InventoryReserved):
7. Creates shipment
8. Publishes ShipmentCreated event
OrderService (listens to ShipmentCreated):
9. Updates order status to CONFIRMED
--- Failure path ---
OrderService (listens to PaymentFailed):
Updates order status to FAILED
Publishes: OrderCancelled event
OrderService (listens to InventoryReservationFailed):
Updates order status to FAILED
Publishes: OrderCancelled event (compensating event)
PaymentService (listens to OrderCancelled):
Refunds the payment (compensating transaction)
Implementation:
// Payment Service - choreography participant
@Service
public class PaymentSagaParticipant {
@KafkaListener(topics = "order-events", groupId = "payment-saga-group")
public void handleOrderCreated(OrderEvent event) {
if (!"OrderCreated".equals(event.getEventType())) return;
try {
PaymentResult result = paymentGateway.charge(
event.getCustomerId(),
event.getTotalAmount(),
event.getOrderId()
);
if (result.isSuccessful()) {
kafkaTemplate.send("payment-events",
event.getOrderId(),
PaymentEvent.succeeded(event.getOrderId(), result.getTransactionId())
);
} else {
kafkaTemplate.send("payment-events",
event.getOrderId(),
PaymentEvent.failed(event.getOrderId(), result.getFailureReason())
);
}
} catch (Exception e) {
kafkaTemplate.send("payment-events",
event.getOrderId(),
PaymentEvent.failed(event.getOrderId(), e.getMessage())
);
}
}
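// Compensating transaction handler
// Separate consumer group: two @KafkaListener methods sharing one groupId on the same topic
// split its partitions between them, so each method would see only part of the events
@KafkaListener(topics = "order-events", groupId = "payment-compensation-group")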
public void handleOrderCancelled(OrderEvent event) {
if (!"OrderCancelled".equals(event.getEventType())) return;
paymentGateway.refund(event.getOrderId());
}
}
Saga Pattern 2: Orchestration-Based Saga
A central Saga Orchestrator manages the workflow and tells each service what to do.
@Service
@Slf4j
public class OrderSagaOrchestrator {
private final Map<String, SagaState> activeSagas = new ConcurrentHashMap<>();
// Entry point - triggered when order is created
public void startOrderSaga(Order order) {
SagaState saga = SagaState.builder()
.sagaId(UUID.randomUUID().toString())
.orderId(order.getId())
.currentStep(SagaStep.PAYMENT_PENDING)
.build();
activeSagas.put(saga.getSagaId(), saga);
sagaRepository.save(saga);
// Step 1: Command the Payment Service
commandGateway.send("payment-commands",
ChargePaymentCommand.builder()
.sagaId(saga.getSagaId())
.orderId(order.getId())
.customerId(order.getCustomerId())
.amount(order.getTotalAmount())
.build()
);
}
// Payment Service replies via event
@KafkaListener(topics = "saga-replies", groupId = "saga-orchestrator")
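public void handleSagaReply(SagaReplyEvent reply) {
SagaState saga = sagaRepository.findBySagaId(reply.getSagaId())
.orElseThrow(() -> new SagaNotFoundException(reply.getSagaId()));
// Replies arrive at-least-once: ignore a duplicate or stale reply for an earlier step.
// Assumes each reply carries the step it answers (hypothetical getStep())
if (reply.getStep() != saga.getCurrentStep()) {
return;
}
switch (saga.getCurrentStep()) {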
case PAYMENT_PENDING:
if (reply.isSuccess()) {
saga.setCurrentStep(SagaStep.INVENTORY_PENDING);
sagaRepository.save(saga);
// Step 2: Command Inventory Service
commandGateway.send("inventory-commands",
ReserveInventoryCommand.of(saga.getSagaId(), saga.getOrderId())
);
} else {
// Payment failed - cancel the order, no compensation needed (nothing to undo)
saga.setCurrentStep(SagaStep.FAILED);
sagaRepository.save(saga);
orderService.cancelOrder(saga.getOrderId(), reply.getFailureReason());
}
break;
case INVENTORY_PENDING:
if (reply.isSuccess()) {
saga.setCurrentStep(SagaStep.FULFILLMENT_PENDING);
sagaRepository.save(saga);
// Step 3: Command Fulfillment Service
commandGateway.send("fulfillment-commands",
CreateShipmentCommand.of(saga.getSagaId(), saga.getOrderId())
);
} else {
// Inventory failed - compensate: refund the payment
saga.setCurrentStep(SagaStep.COMPENSATING);
sagaRepository.save(saga);
commandGateway.send("payment-commands",
RefundPaymentCommand.of(saga.getSagaId(), saga.getOrderId())
);
orderService.cancelOrder(saga.getOrderId(), "Inventory unavailable");
}
break;
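case FULFILLMENT_PENDING:
if (reply.isSuccess()) {
saga.setCurrentStep(SagaStep.COMPLETED);
sagaRepository.save(saga);
orderService.confirmOrder(saga.getOrderId());
} else {
// Fulfillment failed - compensate in reverse order: release inventory, then refund payment
// (ReleaseInventoryCommand is a hypothetical command, by analogy with the others)
saga.setCurrentStep(SagaStep.COMPENSATING);
sagaRepository.save(saga);
commandGateway.send("inventory-commands",
ReleaseInventoryCommand.of(saga.getSagaId(), saga.getOrderId())
);
commandGateway.send("payment-commands",
RefundPaymentCommand.of(saga.getSagaId(), saga.getOrderId())
);
orderService.cancelOrder(saga.getOrderId(), "Fulfillment failed");
}
break;
}
}
}
6. The Outbox Pattern - Solving the Dual-Write Problem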
The Dual-Write Problem
This is one of the most critical problems in distributed systems with message queues.
// The dangerous code that appears in many codebases
@Transactional
public void placeOrder(OrderRequest request) {
Order order = orderRepository.save(Order.from(request)); // Write 1: Database
kafkaTemplate.send("order-events", new OrderPlacedEvent(order)); // Write 2: Kafka
// PROBLEM: These two writes are NOT atomic.
// Scenario A: App crashes after DB save, before Kafka publish
// --> Order in DB, but event never published = silent data loss
// Scenario B: Kafka is temporarily unavailable
// --> send() is asynchronous: the failure may surface only in the returned future,
// after the DB transaction has already committed = order saved, event lost
// --> If the failure is thrown synchronously instead, the transaction rolls back
// and the customer sees an error for an otherwise valid order
// Scenario C: Kafka publish succeeds but the DB transaction rolls back (e.g., the commit fails)
// --> Event published but no order exists = consumers process ghost orders
}
The Outbox Pattern Solution
Write to an outbox table in the SAME database transaction as your business logic. A separate outbox relay reads from this table and publishes to the message queue.
Step 1: Business Transaction
+-------------------------------------------+
| BEGIN TRANSACTION |
| INSERT INTO orders (...) VALUES (...) |
| INSERT INTO outbox ( |
| event_type = 'OrderPlaced', |
| payload = '{"orderId": ...}', |
| published = false |
| ) |
| COMMIT | <-- Atomic: either both succeed or both fail
+-------------------------------------------+
Step 2: Outbox Relay (separate process/thread)
Poll outbox WHERE published = false
|
v
For each pending entry:
Publish to Kafka
If success: UPDATE outbox SET published = true WHERE id = ?
If failure: Leave as unpublished, retry on next poll
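This guarantees that if the order is saved, the event WILL eventually be published, and that if the save fails, no event is published. Delivery is at-least-once: if the relay crashes after publishing but before marking the row as published, the event is published again on the next poll, so consumers must be idempotent.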
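A minimal in-memory model of the two steps, assuming a single process: the synchronized placeOrder method stands in for the database transaction, a list stands in for the broker, and the brokerAccepts predicate simulates the broker being up or down. OutboxDemo and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// In-memory model of the outbox pattern: one lock stands in for the DB transaction.
final class OutboxDemo {
    static final class OutboxRow {
        final String payload;
        boolean published;

        OutboxRow(String payload) {
            this.payload = payload;
        }
    }

    final List<String> orders = new ArrayList<>();
    final List<OutboxRow> outbox = new ArrayList<>();
    final List<String> broker = new ArrayList<>();

    /** Step 1: the order row and the outbox row are written atomically. */
    synchronized void placeOrder(String orderId) {
        orders.add(orderId);
        outbox.add(new OutboxRow("OrderPlaced:" + orderId));
    }

    /** Step 2: publish unpublished rows; a failed publish leaves the row pending for the next run. */
    synchronized int relay(Predicate<String> brokerAccepts) {
        int published = 0;
        for (OutboxRow row : outbox) {
            if (row.published) {
                continue;
            }
            if (brokerAccepts.test(row.payload)) {
                broker.add(row.payload);   // a crash right here would cause a re-publish later
                row.published = true;
                published++;
            }
        }
        return published;
    }
}
```

With the broker down, relay publishes nothing and both rows stay pending; once it is back, the next relay run publishes them in insertion order, and a further run finds nothing left to do.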
Implementation
// Domain service - uses outbox for reliable event publishing
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
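// Single transaction covers BOTH writes. rollbackFor: Spring does not roll back on checked
// exceptions by default, and writeValueAsString throws JsonProcessingException after the order is saved
@Transactional(rollbackFor = Exception.class)
public Order placeOrder(PlaceOrderRequest request) throws JsonProcessingException {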
// Business operation
Order order = Order.from(request);
orderRepository.save(order);
// Write to outbox IN THE SAME TRANSACTION
OutboxEvent outboxEvent = OutboxEvent.builder()
.id(UUID.randomUUID())
.aggregateType("Order")
.aggregateId(order.getId().toString())
.eventType("OrderPlaced")
.payload(objectMapper.writeValueAsString(OrderPlacedEvent.from(order)))
.createdAt(Instant.now())
.published(false)
.build();
outboxRepository.save(outboxEvent);
// No Kafka call here. The relay handles it.
return order;
}
}
// Outbox table schema
// CREATE TABLE outbox_events (
// id UUID PRIMARY KEY,
// aggregate_type VARCHAR(100) NOT NULL,
// aggregate_id VARCHAR(255) NOT NULL,
// event_type VARCHAR(100) NOT NULL,
// payload TEXT NOT NULL,
// created_at TIMESTAMP NOT NULL,
// published BOOLEAN NOT NULL DEFAULT FALSE,
// published_at TIMESTAMP,
// error_message TEXT
// );
// The Outbox Relay - publishes pending events
@Component
@RequiredArgsConstructor
@Slf4j
public class OutboxRelay {
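private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final TopicResolver topicResolver; // maps event type -> topic name
@Scheduled(fixedDelay = 1000) // Runs 1 second after the previous run finishes
@Transactional
public void relayPendingEvents() {
// With more than one relay instance, lock the rows (e.g., SELECT ... FOR UPDATE SKIP LOCKED)
// so that two instances do not publish the same event
List<OutboxEvent> pending = outboxRepository.findTop100ByPublishedFalseOrderByCreatedAtAsc();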
for (OutboxEvent event : pending) {
try {
// Determine topic from event type
String topic = topicResolver.resolve(event.getEventType());
// Publish to Kafka
kafkaTemplate.send(topic, event.getAggregateId(), event.getPayload()).get();
// .get() makes it synchronous - wait for broker ACK
// Mark as published
event.setPublished(true);
event.setPublishedAt(Instant.now());
outboxRepository.save(event);
} catch (Exception e) {
log.error("Failed to relay outbox event {}: {}", event.getId(), e.getMessage());
event.setErrorMessage(e.getMessage());
outboxRepository.save(event);
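// Will retry on next poll. Later events are still attempted in this run,
// so one failure can reorder events for the same aggregate.
}
}
}
}
Production-Grade Outbox with Debezium (CDC Approach)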
For high-throughput systems, polling the outbox table every second is inefficient. Use Debezium (Change Data Capture) to stream the database's transaction log directly to Kafka:
# Debezium connector configuration
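{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "orders",
"topic.prefix": "orders-db",
"table.include.list": "public.outbox_events",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
}
}
Debezium reads the PostgreSQL WAL (Write-Ahead Log) and publishes new outbox rows to Kafka in near real time, without polling the outbox table. topic.prefix is required by Debezium 2.x (the value here is arbitrary), and the table.field.event.key and route.by.field settings point the EventRouter at this table's aggregate_id and aggregate_type columns instead of its default column names (aggregateid, aggregatetype).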
7. Transactional Messaging
Kafka Transactions (Full Consume-Transform-Produce)
The canonical exactly-once pattern in Kafka:
@Configuration
public class KafkaTransactionalConfig {
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
ProducerFactory<String, Object> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
}
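@Service
@RequiredArgsConstructor
public class OrderEnrichmentService {
private final EnrichmentService enrichmentService; // hypothetical transformation service
private final KafkaTemplate<String, Object> kafkaTemplate;
// The container factory for this listener must be configured with kafkaTransactionManager;
// the container then runs each delivery inside a Kafka transaction
@KafkaListener(topics = "raw-orders", groupId = "enrichment-group")
@Transactional("kafkaTransactionManager")
public void enrichOrder(RawOrderEvent rawOrder, Acknowledgment ack) {
// Read from "raw-orders"
// Transform
EnrichedOrder enriched = enrichmentService.enrich(rawOrder);
// Write to "enriched-orders"
// This ENTIRE operation is atomic:
// - Produce to enriched-orders AND
// - Commit offset on raw-orders
// Either both happen or neither does (never one without the other)
kafkaTemplate.send("enriched-orders", rawOrder.getOrderId(), enriched);
ack.acknowledge();
}
}
8. Backpressure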
What is Backpressure?
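Backpressure is the condition where a consumer cannot process messages as fast as they are being produced. The backlog keeps growing, eventually causing:
- Memory or disk exhaustion on the broker (RabbitMQ, for example, raises memory and disk alarms and blocks publishers)
- Message loss through expiry: per-message TTLs in RabbitMQ, or retention periods in SQS and Kafka deleting messages that were never consumed
- Delayed processing that breaks SLAs
- Cascading failures as producers block or time out against a saturated broker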
Detecting Backpressure
The key metric is consumer lag (the difference between the latest produced offset and the latest consumed offset in Kafka):
Kafka Topic: order-events, Partition 0
Latest produced offset: 100,000
Latest consumed offset: 95,000
Consumer lag: 5,000 messages
If this lag is growing over time, you have backpressure.
If it is stable or shrinking, consumers are keeping up.
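A sketch of the lag calculation and a simple growth check. It assumes you already have the log-end offset and the committed offset for each partition; in a real deployment these would come from Kafka's Admin client (listOffsets and listConsumerGroupOffsets). ConsumerLag is a hypothetical helper, a partition with no committed offset is treated as starting from 0, and the "strictly increasing" trend test is deliberately naive.

```java
import java.util.Map;

final class ConsumerLag {
    /** Lag = log-end offset minus committed offset, summed over all partitions. */
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: no committed offset yet means the group starts from offset 0
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committed);
        }
        return total;
    }

    /** True if every lag sample is larger than the previous one (lag is growing). */
    static boolean isGrowing(long[] lagSamples) {
        if (lagSamples.length < 2) {
            return false;
        }
        for (int i = 1; i < lagSamples.length; i++) {
            if (lagSamples[i] <= lagSamples[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```

Using the numbers above, a partition at offset 100,000 with a committed offset of 95,000 contributes a lag of 5,000; samples of 1,000, 3,000, 5,000 taken over time indicate backpressure, while 5,000, 5,000, 4,000 do not.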
Handling Backpressure
Strategy 1: Scale out consumers
# Scale up Kubernetes deployment
kubectl scale deployment email-service --replicas=20
# The new consumer instances will be assigned partitions during rebalance
# More parallelism = faster message consumption, but only up to the partition count:
# instances beyond the number of partitions sit idle
Strategy 2: Increase batch size and optimize consumer throughput
// Process messages in batches instead of one at a time
@KafkaListener(topics = "order-events", batch = "true")
public void handleBatch(List<OrderEvent> events) {
// Handle the whole poll batch (up to max.poll.records) with batched INSERTs
// instead of one database call per event
orderService.bulkProcess(events);
}
// Batch database operation
@Repository
@RequiredArgsConstructor
public class OrderRepository {
private final JdbcTemplate jdbcTemplate;
public void bulkProcess(List<OrderEvent> events) {
jdbcTemplate.batchUpdate(
"INSERT INTO processed_orders(order_id, status) VALUES (?, ?)",
events,
100, // JDBC batch size
(ps, event) -> {
ps.setString(1, event.getOrderId());
ps.setString(2, "PROCESSED");
}
);
}
}
Strategy 3: Circuit Breaker on the Consumer
If the downstream system (database, external API) is slow, stop consuming temporarily to avoid overwhelming it:
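// Assumes Resilience4j and a listener container with AckMode.MANUAL (or MANUAL_IMMEDIATE)
@Component
public class CircuitBreakerAwareConsumer {
    private final CircuitBreaker circuitBreaker;
    private final OrderService orderService;

    public CircuitBreakerAwareConsumer(CircuitBreakerRegistry registry, OrderService orderService) {
        // Resilience4j tracks CLOSED / OPEN / HALF_OPEN itself, so no extra flag is needed
        this.circuitBreaker = registry.circuitBreaker("order-downstream"); // example name
        this.orderService = orderService;
    }

    @KafkaListener(topics = "order-events")
    public void handleOrder(OrderEvent event, Acknowledgment ack) {
        try {
            circuitBreaker.executeRunnable(() -> orderService.process(event));
            ack.acknowledge();
        } catch (CallNotPermittedException e) {
            // Circuit is open. Kafka does not redeliver un-ACKed records on its own:
            // merely skipping the ACK lets a later commit move past this record.
            // nack() re-seeks the partitions so this record is polled again after the sleep.
            ack.nack(Duration.ofSeconds(30));
        }
    }
}
Strategy 4: Rate Limiting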
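Guava's RateLimiter, used in the consumer below, hands out permits at a steady rate. The underlying idea can be sketched as a plain token bucket. This is an illustration under stated assumptions (a hypothetical TokenBucket class, an injectable nanosecond clock, non-blocking tryAcquire), not Guava's actual algorithm.

```java
import java.util.function.LongSupplier;

// Hypothetical token bucket: holds at most `capacity` tokens and refills
// continuously at ratePerSecond. Similar in spirit to Guava's RateLimiter,
// NOT its implementation.
final class TokenBucket {
    private final double capacity;
    private final double tokensPerNano;
    private final LongSupplier nanoClock;   // injectable so the logic is testable
    private double tokens;
    private long lastRefill;

    TokenBucket(double ratePerSecond, double capacity, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.tokensPerNano = ratePerSecond / 1_000_000_000.0;
        this.nanoClock = nanoClock;
        this.tokens = capacity;              // start full, allowing an initial burst
        this.lastRefill = nanoClock.getAsLong();
    }

    // Non-blocking admission: true if a token was available and has been consumed
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Refill for the elapsed time, never beyond capacity
        tokens = Math.min(capacity, tokens + (now - lastRefill) * tokensPerNano);
        lastRefill = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

Real code would pass System::nanoTime as the clock. The design choice here is non-blocking admission: the caller decides whether to wait, nack, or drop. Guava's acquire() blocks instead, which is simpler but ties up the listener thread.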
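@Component
public class RateLimitedConsumer {
    // Guava RateLimiter: about 1000 permits per second, per consumer instance
    // (10 instances = up to ~10,000 calls/s in total against the downstream API)
    private final RateLimiter rateLimiter = RateLimiter.create(1000.0);
    private final ExternalApiService externalApiService;

    public RateLimitedConsumer(ExternalApiService externalApiService) {
        this.externalApiService = externalApiService;
    }

    @KafkaListener(topics = "high-volume-events")
    public void handleEvent(HighVolumeEvent event, Acknowledgment ack) {
        // Blocks the listener thread until a permit is available. Keep the total wait
        // per poll well under max.poll.interval.ms, or the consumer is removed from the group
        rateLimiter.acquire();
        externalApiService.process(event);
        ack.acknowledge();
    }
}
Strategy 5: Increase partition count (Kafka)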
More partitions allow more consumer instances in the same group to work in parallel:
# Add partitions (the count can only be increased, never decreased)
kafka-topics.sh --alter --topic order-events --partitions 12 --bootstrap-server kafka:9092
# Then scale the consumer group to 12 instances.
# One instance per partition: if the topic previously had 3 partitions, that is up to
# 4x the consumer parallelism (as long as the downstream systems keep up)
Warning: Increasing the partition count changes which partition a given key maps to. Records already written stay where they are, but new records for the same key may land on a different partition than that key's older records. Until the older records have been consumed, per-key ordering between old and new records is not guaranteed.
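The remapping behind this warning is easy to see in isolation. A keyed record's partition is hash(key) mod partitionCount, so the same key can map elsewhere once the count changes. The sketch below uses String.hashCode as a stand-in hash. Kafka's default partitioner applies murmur2 to the serialized key bytes, so actual placements differ. KeyPlacement is a hypothetical name.

```java
import java.util.List;

// Hypothetical helper illustrating key -> partition placement.
// Stand-in hash: String.hashCode(); Kafka itself uses murmur2 on the key bytes.
final class KeyPlacement {

    private KeyPlacement() { }

    static int partitionFor(String key, int partitionCount) {
        // Mask the sign bit (as Kafka does) so the result is never negative
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }

    // Counts keys whose partition differs between the old and new partition counts
    static long movedKeys(List<String> keys, int oldCount, int newCount) {
        return keys.stream()
                .filter(k -> partitionFor(k, oldCount) != partitionFor(k, newCount))
                .count();
    }
}
```

Because 12 is a multiple of 3, a key's new partition is always congruent to its old one modulo 3. A key stays put only when its masked hash mod 12 is below 3, so with a well-mixed hash roughly three quarters of the keys move.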
9. Schema Evolution
The Problem
Your system is live. You have 50 consumers reading from a Kafka topic. You need to add a new field to your message schema. How do you do this without taking down all consumers simultaneously?
Schema Evolution Strategies
Strategy 1: Backward Compatibility (Safe Default)
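Consumers using the NEW schema can read messages written with the OLD schema, so you upgrade consumers first and producers afterwards. (Old consumers reading NEW messages is the opposite guarantee: forward compatibility, below.)
Adding optional fields with defaults is backward compatible. It is also forward compatible, as long as old consumers ignore fields they do not recognize: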
// Version 1 - Original schema
public class OrderEvent {
private String orderId; // required
private String customerId; // required
private BigDecimal amount; // required
}
// Version 2 - Adding optional field (backward compatible)
public class OrderEvent {
private String orderId; // required
private String customerId; // required
private BigDecimal amount; // required
private String couponCode; // NEW - optional, old consumers ignore it safely
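private String shippingMethod = "STANDARD"; // NEW - optional, defaults to "STANDARD" (example value)
}
Old consumers simply ignore the new couponCode and shippingMethod fields during deserialization, provided their deserializer tolerates unknown properties. With Jackson, DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES must be disabled; it is enabled by default on a plain ObjectMapper. New consumers can use the fields. When they read messages written before the fields existed, they see null or the default value.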
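The behavior of a tolerant reader can be modeled without any serialization library. This sketch rests on stated assumptions: the message has already been decoded into a Map, and the OrderEventV2 record, its fromFields method, and the "STANDARD" shipping default are all hypothetical.

```java
import java.math.BigDecimal;
import java.util.Map;

// Hypothetical tolerant reader over an already-decoded message (a Map standing
// in for parsed JSON). It mimics what a lenient deserializer does:
//   - keys it does not know about are ignored (e.g. fields added by a newer producer)
//   - optional fields missing from the message get defaults (older producer)
record OrderEventV2(String orderId, String customerId, BigDecimal amount,
                    String couponCode, String shippingMethod) {

    static final String DEFAULT_SHIPPING = "STANDARD"; // hypothetical default

    static OrderEventV2 fromFields(Map<String, Object> fields) {
        return new OrderEventV2(
                required(fields, "orderId"),
                required(fields, "customerId"),
                new BigDecimal(required(fields, "amount")),
                (String) fields.get("couponCode"),                                 // optional: null if absent
                (String) fields.getOrDefault("shippingMethod", DEFAULT_SHIPPING)); // optional with default
    }

    // Required fields have no default, so their absence is an error
    private static String required(Map<String, Object> fields, String name) {
        Object value = fields.get(name);
        if (value == null) {
            throw new IllegalArgumentException("missing required field " + name);
        }
        return value.toString();
    }
}
```

Reading a version 1 message yields a null couponCode and the default shipping method. A message carrying a field this reader has never heard of still decodes, because unknown keys are never looked up.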
Strategy 2: Forward Compatibility
Old schema can read messages written with the NEW schema. Useful when you want to deploy producers first.
Strategy 3: Avro with Schema Registry
Apache Avro is a binary serialization format with a formal schema definition. The Confluent Schema Registry stores all schema versions and enforces compatibility rules.
// OrderEvent.avsc - Avro schema
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.company.events",
"fields": [
{ "name": "orderId", "type": "string" },
{ "name": "customerId", "type": "string" },
{
"name": "amount",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
},
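{ "name": "couponCode", "type": ["null", "string"], "default": null },
{ "name": "schemaVersion", "type": "int", "default": 2 }
]
}
couponCode is optional. It is a union with "null" listed first, because Avro requires a union field's default to match the first type in the union.
How Schema Registry works: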
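- The producer registers the schema (or looks up its existing ID) and sends each message with that schema ID prefixed to the payload, not the full schema
- The consumer fetches the schema for that ID from Schema Registry if it has not cached it yet
- The consumer deserializes the message using the fetched schema
- Schema Registry enforces a compatibility level whenever a new schema version is registered. BACKWARD is the default; the other levels are FORWARD, FULL, NONE, and the _TRANSITIVE variants of the first three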
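Sending only the schema ID works because of the Confluent wire format. Each serialized value begins with a magic byte of 0, then the schema ID as a 4-byte big-endian integer, then the Avro-encoded payload. Below is a sketch of framing and unframing such a value with java.nio. The FramedValue name is hypothetical, and the payload bytes in a test would be placeholders rather than real Avro data.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for the Schema Registry framing:
// [magic byte 0][schema ID: 4-byte big-endian int][serialized payload...]
record FramedValue(int schemaId, byte[] payload) {

    static final byte MAGIC_BYTE = 0;

    static FramedValue parse(byte[] value) {
        if (value == null || value.length < 5 || value[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("not a Schema Registry framed value");
        }
        ByteBuffer buf = ByteBuffer.wrap(value);   // ByteBuffer is big-endian by default
        buf.get();                                 // skip the magic byte
        int schemaId = buf.getInt();               // the ID a consumer looks up (and caches)
        byte[] payload = Arrays.copyOfRange(value, 5, value.length);
        return new FramedValue(schemaId, payload);
    }

    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }
}
```

The 5-byte header is the only per-message overhead, which is why the full schema never needs to travel with the data.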
@Configuration
public class SchemaRegistryKafkaConfig {

    // OrderEvent here is the Avro-generated SpecificRecord class for OrderEvent.avsc
    @Bean
    public ProducerFactory<String, OrderEvent> avroProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://schema-registry:8081");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class);
        // Let the serializer register new schema versions on first use. This does NOT
        // set the compatibility mode: the registry applies the subject's configured
        // level (BACKWARD by default) and rejects incompatible versions.
        config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, true);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, OrderEvent> avroConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://schema-registry:8081");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaAvroDeserializer.class);
        // Deserialize into the generated OrderEvent class instead of GenericRecord
        config.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}
Schema Evolution Rules (Avro Compatibility Matrix)
| Change | BACKWARD | FORWARD | FULL |
|---|---|---|---|
| Add optional field (null default) | Safe | Safe | Safe |
| Remove optional field | Safe | Safe | Safe |
| Add required field (no default) | Unsafe | Safe | Unsafe |
| Remove required field | Safe | Unsafe | Unsafe |
| Change field type | Usually Unsafe | Usually Unsafe | Usually Unsafe |
| Rename field | Unsafe (use alias) | Unsafe | Unsafe |
Golden rule: Always add fields as optional with a sensible default. Never remove fields unless you have confirmed all consumers have been updated.
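The add and remove cases follow from one rule of Avro schema resolution. Fields present only in the writer's schema are skipped, and fields present only in the reader's schema must declare a default. The model below is deliberately simplified: it considers field presence and defaults only, with no type promotion, aliases, or unions, and CompatCheck is a hypothetical name. It checks those cases directly.

```java
import java.util.Map;

// Hypothetical, simplified model of Avro-style schema resolution.
// A schema is modeled as: field name -> true if the field declares a default.
final class CompatCheck {

    private CompatCheck() { }

    // Can a reader using `reader` decode data written with `writer`?
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        // Writer-only fields are skipped; reader-only fields need a default
        return reader.entrySet().stream()
                .allMatch(f -> writer.containsKey(f.getKey()) || f.getValue());
    }

    // BACKWARD: consumers on the new schema can read data written with the old one
    static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: consumers on the old schema can read data written with the new one
    static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions
    static boolean full(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return backward(oldSchema, newSchema) && forward(oldSchema, newSchema);
    }
}
```

Run against a two-field schema, it reports that adding a field with a default is safe in both directions. Removing a field without a default is backward compatible only, which is why consumers must be upgraded before such a removal.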
10. Consumer Offset Management and Replay
Offset Management in Kafka
Kafka stores committed offsets in a special internal topic: __consumer_offsets.
Consumer Group: "email-service-group"
Topic: order-events
Committed offsets:
Partition 0: offset 1500 (offsets up to 1499 are done; the next message to consume is at offset 1500)
Partition 1: offset 2300
Partition 2: offset 1750
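When a consumer restarts, it resumes from the committed offsets. Anything processed after the last commit is delivered again, but as long as offsets are committed only after processing, no message is skipped.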
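The "next offset" convention and the restart behavior fit in a small in-memory model. This is a hypothetical sketch with no Kafka client, and PartitionModel and pollAndProcess are invented names. Every poll starts from the committed offset, as a freshly restarted consumer would, and a crash is simulated by skipping the commit. A live Kafka consumer would instead continue from its in-memory position between polls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition and one group's committed offset.
// Kafka convention: the committed offset is the NEXT offset to read (last processed + 1).
final class PartitionModel {
    private final List<String> log = new ArrayList<>();
    private long committed = 0L;   // as if the group starts at offset 0 (untrimmed log, "earliest")

    void append(String record) {
        log.add(record);
    }

    long committedOffset() {
        return committed;
    }

    // Reads up to maxRecords starting at the committed offset and "processes" them.
    // Commits afterwards only if commitAfter is true; false simulates a crash before the commit.
    List<String> pollAndProcess(int maxRecords, boolean commitAfter) {
        List<String> processed = new ArrayList<>();
        long position = committed;   // a restarted consumer resumes here
        while (position < log.size() && processed.size() < maxRecords) {
            processed.add(log.get((int) position));
            position++;
        }
        if (commitAfter) {
            committed = position;    // next offset to read, not the last one read
        }
        return processed;
    }
}
```

Processing offsets 0 and 1 and committing leaves the committed offset at 2. If the consumer then processes offsets 2 and 3 but crashes before committing, it receives those same two records again: at-least-once, with nothing skipped.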
auto.offset.reset - What to Do When There Is No Committed Offset
// Applies when the group has no committed offset for a partition (e.g. a brand-new
// group) or the committed offset is no longer valid (e.g. deleted by retention).
// Set ONE of the following:
// "earliest" - start from the oldest message still retained in the partition
// Good for: replaying all retained historical events, new analytics consumer
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// "latest" (Kafka's default) - start from the current end of the partition (ignore historical messages)
// Good for: real-time processing where historical messages are irrelevant
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// "none" - throw an exception if no committed offset exists
// Good for: strict environments where an unintended new consumer group would be a bug
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
Replaying Messages - A Kafka Superpower
# Scenario: Bug found in Analytics Service. Need to reprocess the last 24 hours of events.
# Step 1: Stop the consumer group (offsets can only be reset while the group has no active members)
kubectl scale deployment analytics-service --replicas=0
# Step 2: Reset the offsets to 24 hours ago (example timestamp)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group analytics-service-group \
  --topic order-events \
  --reset-offsets \
  --to-datetime 2026-06-04T10:00:00.000 \
  --execute
# (--by-duration PT24H is a relative alternative to --to-datetime; run with
#  --dry-run instead of --execute first to preview the new offsets)
# Step 3: Deploy the fixed consumer
kubectl set image deployment/analytics-service analytics=analytics:v2.1
# Step 4: Scale back up - it will reprocess from the reset offset
kubectl scale deployment analytics-service --replicas=3
This is not possible with classic RabbitMQ queues or SQS, which delete a message once it has been acknowledged (RabbitMQ Streams are the exception). Replay is one of Kafka's most powerful operational capabilities.
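A timestamp-based reset or seek has to turn a wall-clock time into an offset. The rule documented for KafkaConsumer.offsetsForTimes is "the earliest offset whose record timestamp is greater than or equal to the target". Below is a linear-scan sketch of that rule over one partition's timestamps. TimestampSeek is hypothetical, offsets are assumed to start at 0, and Kafka itself uses a per-segment time index rather than a scan.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical sketch of timestamp-based replay on one partition:
// earliest offset whose record timestamp is >= the target, or empty if none yet.
final class TimestampSeek {

    private TimestampSeek() { }

    // timestamps.get(i) is the timestamp (epoch millis) of the record at offset i
    static OptionalLong offsetForTime(List<Long> timestamps, long targetMillis) {
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (timestamps.get(offset) >= targetMillis) {
                return OptionalLong.of(offset);
            }
        }
        return OptionalLong.empty();
    }
}
```

Because the scan returns the first match, duplicate timestamps resolve to the earliest record carrying them. A target later than every record yields no offset, and the caller must choose a fallback such as the log end.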
Seeking to Specific Offset Programmatically
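@Component
public class ReplayableKafkaConsumer implements ConsumerSeekAware {
    // Hypothetical switch; in practice bind it from configuration
    enum ReplayMode { FROM_BEGINNING, LAST_24_HOURS, SKIP_TO_END, RESUME }
    private final ReplayMode mode = ReplayMode.LAST_24_HOURS;

    @Override
    public void onPartitionsAssigned(
            Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // Pick ONE seek per partition: queued seeks run in order, so the last one wins.
        // This callback also runs after every rebalance; a one-off replay should
        // remember which partitions it has already reset.
        assignments.forEach((partition, position) -> {
            switch (mode) {
                // Option 1: replay from the beginning (oldest retained message)
                case FROM_BEGINNING -> callback.seekToBeginning(partition.topic(), partition.partition());
                // Option 2: replay from a specific timestamp
                case LAST_24_HOURS -> callback.seekToTimestamp(partition.topic(), partition.partition(),
                        Instant.now().minus(Duration.ofHours(24)).toEpochMilli());
                // Option 3: skip to end (ignore all historical messages)
                case SKIP_TO_END -> callback.seekToEnd(partition.topic(), partition.partition());
                // Otherwise keep the default: continue from the committed offset
                case RESUME -> { }
            }
        });
    }

    // The seeks apply to the @KafkaListener methods of this same bean
    @KafkaListener(topics = "order-events", groupId = "analytics-service-group")
    public void handle(OrderEvent event) {
        // ... reprocess the event ...
    }
}
Summary of Advanced Concepts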
| Concept | Core Principle | Common Mistake |
|---|---|---|
| Delivery Semantics | Know which you have; design for it | Assuming exactly-once when you have at-least-once |
| Idempotent Consumers | Same message processed twice = same result | Not designing for duplicate delivery |
| Deduplication | Track processed message IDs | Forgetting to clean up dedup table |
| Ordering | Partition key determines order group | Expecting global ordering without single partition |
| Saga Pattern | Compensating transactions for rollback | Not implementing compensation |
| Outbox Pattern | Same DB transaction for business + event | Dual-write creates inconsistency windows |
| Backpressure | Consumer lag is the signal | Ignoring queue depth metrics |
| Schema Evolution | Add optional fields; never remove without plan | Breaking change deployed to all consumers at once |
| Offset Management | Commit only after successful processing | Auto-commit before processing causes message loss |
Previous: Part 3 - Technologies Deep Dive
Next: Part 5 - Operations and Performance
Index: Message Queues Demystified - Index