Message Queues - Supplement 2: Production Challenges & Real-World Solutions
Each challenge in this supplement represents a class of real production incidents encountered
across companies at scale. Every solution includes: root cause, detection method,
immediate mitigation, permanent fix, and post-mortem lessons.
Table of Contents
- The Consumer Group Rebalancing Storm
- The Hot Partition Bottleneck
- The Duplicate Payment Incident
- Consumer Lag That Never Recovers
- The Schema Registry Cascade Failure
- Cross-Datacenter Message Ordering Violations
- The Broker Disk Full Outage
- Silent Data Loss from Broker Leader Election
- The Offset Commit Race Condition
- Thundering Herd After Planned Maintenance
- The Ghost Consumer - Zombie Consumer Group
- Certificate Expiry Killing All Producers
- Message Ordering Violation in Financial State Machine
- Backpressure Cascade into Full System Outage
- The Multi-Region Active-Active Split-Brain
1. The Consumer Group Rebalancing Storm
Incident Description
A payments processing service started experiencing 60-second processing gaps every 3-5 minutes during peak hours. Transactions were being delayed, causing SLA violations. The on-call team initially suspected a database issue.
Root Cause Analysis
Timeline:
T+0:00 Consumer group "payment-processor" stabilizes after deployment
T+0:03 Traffic peaks: each consumer takes 250ms per message
T+0:05 max.poll.interval.ms=300000ms (5 min) - fine
T+0:10 BUT: each poll fetches max.poll.records=500 messages
500 messages x 250ms = 125 seconds to process one poll batch
max.poll.interval.ms = 300,000ms (5 minutes)
...seems fine?
T+0:15 Payment service makes external API call: card authorization
External API slow today: 2 seconds per call (normally 200ms)
500 messages x 2000ms = 1000 seconds to process one batch
EXCEEDS max.poll.interval.ms = 300 seconds
T+0:20 Consumer exceeds max.poll.interval.ms and leaves the group; the group coordinator treats it as DEAD
T+0:20 REBALANCE TRIGGERED: all partitions reassigned
During rebalance: NO processing for 45-60 seconds
T+0:25 Rebalance completes, consumer restarts from last committed offset
T+0:25 Some messages re-delivered (the ones being processed when consumer died)
T+0:28 Consumer polls 500 messages again, including re-delivered ones
T+0:33 External API still slow: same cycle repeats
T+0:35 REBALANCE AGAIN
This pattern repeats every 3-5 minutes indefinitely
Detection Signals
Metrics that would have caught this earlier:
1. kafka_consumer_fetch_rate{group="payment-processor"} drops to 0 every 3-5 minutes
2. kafka_consumer_rebalance_rate{group="payment-processor"} > 0 during business hours
3. kafka_consumer_group_coordinator_request_total{type="JoinGroup"} increasing
4. Application logs: "Rebalance happened: partitions assigned/revoked" repeating
5. External payment API latency P99 > 500ms
Immediate Mitigation
# Emergency config change (zero-downtime via Spring Cloud Config)
# Reduced from 500 to 10
spring.kafka.consumer.max-poll-records=10
# This reduces worst-case per-poll processing time from 1000s to 20s (10 x 2s)
# Rebalancing stops once the consumers pick up the new value
Permanent Fix
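// Fix 1: Size each poll batch for worst-case processing time
// This bounds the work done per poll; moving work off the poll thread is covered in Fix 2
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class ResilientConsumerConfig {
    // Bean name matches containerFactory = "resilientFactory" on the listener
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> resilientFactory(
            ConsumerFactory<String, PaymentEvent> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        // The listener receives an Acknowledgment, so offsets are committed manually
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // CRITICAL: Set max.poll.records small
        // Process fewer messages per poll to stay within max.poll.interval.ms
        // Rule: max.poll.records * worst_case_processing_time_ms < max.poll.interval.ms
        // Here: 50 x 2000ms = 100s, inside the 300s interval
        Properties overrides = new Properties();
        overrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");          // Reduced from 500
        overrides.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes
        overrides.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");    // 45 seconds
        overrides.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "15000"); // 15 seconds
        factory.getContainerProperties().setKafkaConsumerProperties(overrides);
        return factory;
    }
}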
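// Fix 2: Async processing with bounded queue
// Acks arrive from worker threads and may be out of order, so the container needs
// manual ack mode with ContainerProperties.setAsyncAcks(true) (Spring Kafka 2.8+)
@KafkaListener(topics = "payment-commands", containerFactory = "resilientFactory")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {
    // Submit to bounded executor - never block the poll thread
    CompletableFuture.runAsync(() -> {
        try {
            paymentProcessor.processWithTimeout(event, Duration.ofSeconds(10));
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Payment processing failed: {}", event.getPaymentId(), e);
            // Do NOT ack. Kafka has no visibility timeout: an un-acked record is only
            // re-read after a rebalance or restart, so failures should also be routed
            // to a retry or dead-letter topic instead of relying on redelivery
        }
    }, paymentExecutor); // paymentExecutor has bounded queue and rejection policy
}
Post-Mortem Lessons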
1. max.poll.records must be sized for worst-case, not average-case latency
2. External API timeouts must be factored into poll interval sizing
3. Alert on rebalance frequency (> 1 per hour during steady state = investigate)
4. Every deployment should calculate: max_poll_records × worst_case_ms < max_poll_interval_ms
5. Add circuit breaker for external payment API to fast-fail instead of timing out
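The sizing rule in lessons 1 and 4 can be checked mechanically before each deployment. The following is a minimal sketch, assuming a hypothetical helper class named PollBudget (not part of Kafka or Spring); it only encodes the inequality max_poll_records × worst_case_ms < max_poll_interval_ms.

```java
/** Hypothetical helper for the rule: max.poll.records * worst_case_ms < max.poll.interval.ms. */
final class PollBudget {
    private PollBudget() {}

    /** True when a full batch at worst-case latency finishes before the poll deadline. */
    static boolean fitsPollInterval(int maxPollRecords, long worstCaseMsPerRecord, long maxPollIntervalMs) {
        return (long) maxPollRecords * worstCaseMsPerRecord < maxPollIntervalMs;
    }

    /** Largest max.poll.records that still satisfies the strict inequality (0 if none does). */
    static int maxSafePollRecords(long worstCaseMsPerRecord, long maxPollIntervalMs) {
        if (worstCaseMsPerRecord <= 0 || maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("latency and interval must be positive");
        }
        long limit = (maxPollIntervalMs - 1) / worstCaseMsPerRecord;
        return (int) Math.min(Integer.MAX_VALUE, limit);
    }
}
```

With the incident's numbers, fitsPollInterval(500, 2000, 300000) is false (1000s of work against a 300s deadline), and maxSafePollRecords(2000, 300000) returns 149. In practice a margin well below that ceiling is advisable, since the worst case observed so far is rarely the true worst case.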
2. The Hot Partition Bottleneck
Incident Description
An e-commerce platform's order processing throughput mysteriously caps at 2,000 orders/minute despite having 12 Kafka partitions and 12 consumer instances. Scaling to 24 consumers has zero effect.
Root Cause Analysis
kafka-consumer-groups.sh --describe --group order-processor
GROUP            TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
order-processor  order-events  0          5,234,891       5,234,891       0
order-processor  order-events  1          5,234,892       5,234,892       0
order-processor  order-events  2          98,234,891      124,891,234     26,656,343  ← HOT!
order-processor  order-events  3          5,234,893       5,234,893       0
...
order-processor  order-events  11         5,234,890       5,234,890       0
Partition 2 has 26 MILLION message lag while all others have zero.
One consumer is processing Partition 2 alone - cannot parallelize within a partition.
Why Partition 2 is Hot
// Producer code (the culprit)
public void publishOrder(Order order) {
String partitionKey = order.getMerchantId(); // Used as partition key
kafkaTemplate.send("order-events", partitionKey, event);
}
// Problem: Top 3 merchants by order volume:
// Merchant ID "AMAZON-MARKETPLACE": 40% of all orders
// Merchant ID "WALMART-MARKETPLACE": 25% of all orders
// Merchant ID "EBAY-MARKETPLACE": 15% of all orders
// hash("AMAZON-MARKETPLACE") % 12 = 2 (maps to Partition 2!)
// 40% of all orders → single partition → single consumer → bottleneck
// Adding more consumers does nothing:
// Extra consumers for partition 2 = impossible (one consumer per partition in Kafka)
// Other 11 partitions are idle
Detection Signals
# Detect hot partitions (columns: $3 = PARTITION, $6 = LAG)
kafka-consumer-groups.sh --bootstrap-server broker:9092 --describe --group order-processor \
| awk '$6 ~ /^[0-9]+$/ {print $3, $6}' | sort -k2 -rn | head -5
# Shows partitions sorted by lag - hot partition will stand out
# Per-partition throughput via JMX/Prometheus
kafka_partition_records_count{topic="order-events"}
# One partition showing 10x the count of others = hot partition
Solution: Multi-Level Partition Keys
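As a minimal, self-contained sketch of the bucketing idea: a hot merchant's single key is replaced by a small, fixed set of compound keys. The class and method names are hypothetical, and partitionFor uses String.hashCode() purely as a stand-in; Kafka's default partitioner hashes the serialized key bytes with murmur2, so real partition numbers will differ.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of merchant + bucket compound keys. */
final class BucketedKeys {
    private BucketedKeys() {}

    /** Compound key: merchant id plus a bucket derived from the order id (stable per order). */
    static String compoundKey(String merchantId, String orderId, int buckets) {
        int bucket = Math.floorMod(orderId.hashCode(), buckets);
        return merchantId + "-" + bucket;
    }

    /** Stand-in partitioner; NOT Kafka's murmur2-based default. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Distinct compound keys generated for one merchant over synthetic order ids. */
    static Set<String> distinctKeys(String merchantId, int orders, int buckets) {
        Set<String> keys = new TreeSet<>();
        for (int i = 0; i < orders; i++) {
            keys.add(compoundKey(merchantId, "order-" + i, buckets));
        }
        return keys;
    }
}
```

With 4 buckets, one merchant produces exactly 4 distinct keys, so its traffic can land on up to 4 partitions (fewer if two keys hash to the same partition). Math.floorMod with 4 buckets gives the same result as a bitwise AND with 3, and also works for bucket counts that are not powers of two.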
// Fix 1: Compound partition key (merchant + bucket suffix derived from the order ID)
// Spreads one merchant's orders over up to 4 keys; ordering is preserved only within a bucket
public void publishOrder(Order order) {
// Instead of just merchantId, use merchantId + bucket number
// Each merchant gets up to 4 "virtual lanes"
int bucket = order.getOrderId().hashCode() & 3; // 0, 1, 2, or 3
String partitionKey = order.getMerchantId() + "-" + bucket;
kafkaTemplate.send("order-events", partitionKey, event);
}
// "AMAZON-MARKETPLACE" now maps to 4 different keys:
// "AMAZON-MARKETPLACE-0" → any partition
// "AMAZON-MARKETPLACE-1" → any partition
// "AMAZON-MARKETPLACE-2" → any partition
// "AMAZON-MARKETPLACE-3" → any partition
// 40% of orders now spread across 4 partitions instead of 1
// Trade-off: within-merchant ordering is NOT guaranteed across buckets
// If only per-order ordering matters (events for the same order stay in sequence): use orderId as the partition key instead
// Fix 2: Custom Partitioner for known hot keys
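// Registered through the producer property partitioner.class. Kafka instantiates it
// reflectively, so it needs a public no-arg constructor and is not a Spring bean
public class LoadBalancedPartitioner implements Partitioner {
    private static final Map<String, Integer> PARTITION_COUNT_FOR_HOT_MERCHANTS = Map.of(
        "AMAZON-MARKETPLACE", 4, // Give Amazon 4 partition slots
        "WALMART-MARKETPLACE", 2, // Give Walmart 2 partition slots
        "EBAY-MARKETPLACE", 2 // Give eBay 2 partition slots
    );

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Assumes non-null compound keys in the Fix 1 format, e.g. "AMAZON-MARKETPLACE-3"
        String fullKey = key.toString();
        String merchantId = extractMerchantId(fullKey);
        int totalPartitions = cluster.partitionCountForTopic(topic);
        Integer slots = PARTITION_COUNT_FOR_HOT_MERCHANTS.get(merchantId);
        if (slots != null) {
            // Distribute hot merchant across `slots` consecutive partitions.
            // floorMod avoids the negative value Math.abs() returns for Integer.MIN_VALUE
            int basePartition = Math.floorMod(merchantId.hashCode(), totalPartitions);
            int slot = Math.floorMod(fullKey.hashCode(), slots);
            return (basePartition + slot) % totalPartitions;
        }
        // Default: key-based hashing for all other merchants
        return Math.floorMod(fullKey.hashCode(), totalPartitions);
    }

    // Strips the trailing "-<bucket>" suffix added by the producer
    private static String extractMerchantId(String compoundKey) {
        int dash = compoundKey.lastIndexOf('-');
        return dash < 0 ? compoundKey : compoundKey.substring(0, dash);
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
Repartitioning Without Downtime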
# If topic must be repartitioned (more partitions for better distribution):
# Step 1: Create new topic with more partitions
kafka-topics.sh --bootstrap-server broker:9092 --create --topic order-events-v2 --partitions 48
# Step 2: Run dual-write (producer publishes to both topics) for 24 hours
# Step 3: Start new consumer groups on order-events-v2
# Step 4: Validate new consumers are caught up and healthy
# Step 5: Stop old consumer groups (they're caught up to 0 lag)
# Step 6: Remove dual-write from producer
# Step 7: Delete old topic after retention period
3. The Duplicate Payment Incident
Incident Description
A fintech company's payment processing system charged 12,000 customers twice during a Kafka broker rolling restart. Each affected customer was charged the correct amount twice. Total duplicate charges: $847,000.
Root Cause: The Double Delivery Chain
Timeline:
T+0:00 Kafka broker-1 (partition 3 leader) begins rolling restart
T+0:02 Broker-1 goes offline, partition 3 leader election begins
T+0:03 Consumer "payment-processor-instance-7" has partition 3
It polled 200 messages at T+0:01 (before broker restart)
It is mid-processing: 150 messages processed, 50 in progress
T+0:04 Partition 3 leader election completes: broker-2 becomes leader
T+0:04 Consumer-7 tries to commit offset for the 150 processed messages
Offset commit fails: leader has changed, consumer loses partition assignment
REBALANCE triggered
T+0:06 Consumer-8 takes over partition 3
Consumer-8 starts from last COMMITTED offset (150 messages ago)
Consumer-8 re-processes the 150 already-processed messages
T+0:06 + Consumer-7 was mid-processing 50 messages AND now Consumer-8 also processes them
For a brief window: concurrent processing of same messages by consumer-7 and 8
T+0:06 chargeCustomerCard() called TWICE for 200 payments
Root cause stack:
1. Offset commit failed during leader election
2. Non-idempotent consumer (no deduplication)
3. External payment processor (Stripe) does not deduplicate by Kafka message ID
4. No distributed lock preventing concurrent charge for same payment ID
Detection: After the Fact
Detected by: Fraud alert at T+2:30 (2.5 hours later)
Trigger: Anomaly detection flagged unusually high duplicate transactions
Alert: "Duplicate charge rate 3.2% (baseline: 0.01%)"
By then: 12,000 customers charged twice
Detection could have been immediate with:
- Real-time duplicate transaction monitoring at Stripe API level
- kafka_consumer_rebalance_total alert during broker restart
- Idempotency key collision counter in payment processor
Solution: Idempotent Payment Consumer with Distributed Lock
@Service
@Slf4j
@RequiredArgsConstructor // Lombok: constructor injection for the final fields below
public class IdempotentPaymentConsumer {
private final PaymentIdempotencyRepository idempotencyRepo;
private final StripePaymentGateway stripe;
private final RedissonClient redisson;
@KafkaListener(topics = "payment-commands", groupId = "payment-processor")
@Transactional
public void processPayment(
PaymentCommandEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
String idempotencyKey = event.getPaymentId();
String lockKey = "payment-lock:" + idempotencyKey;
// Distributed lock: only ONE instance can process this paymentId at a time
// Prevents concurrent consumer-7 + consumer-8 race condition
RLock lock = redisson.getLock(lockKey);
try {
boolean locked = lock.tryLock(5, 30, TimeUnit.SECONDS);
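if (!locked) {
log.warn("Could not acquire lock for paymentId={}, will retry", idempotencyKey);
// Returning without an ack does NOT make Kafka redeliver the record.
// Throw so the container's error handler re-seeks and retries it
throw new IllegalStateException("Payment lock busy for paymentId=" + idempotencyKey);
}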
// Check if already processed (handles redelivery after offset commit failure)
Optional<ProcessedPayment> existing = idempotencyRepo.findById(idempotencyKey);
if (existing.isPresent()) {
log.info("Duplicate payment command ignored: paymentId={}, " +
"originally processed at partition={}, offset={}",
idempotencyKey,
existing.get().getOriginalPartition(),
existing.get().getOriginalOffset());
ack.acknowledge(); // Ack so we don't keep reprocessing
return;
}
// Process (with idempotency key passed to Stripe)
ChargeResult result = stripe.charge(
StripeChargeRequest.builder()
.amount(event.getAmount())
.customerId(event.getCustomerId())
.idempotencyKey(idempotencyKey) // Stripe-level idempotency
.build()
);
// Record as processed BEFORE acking
idempotencyRepo.save(ProcessedPayment.builder()
.paymentId(idempotencyKey)
.stripeChargeId(result.getChargeId())
.originalPartition(partition)
.originalOffset(offset)
.processedAt(Instant.now())
.build());
ack.acknowledge();
log.info("Payment processed: paymentId={}, chargeId={}", idempotencyKey, result.getChargeId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting for lock: paymentId={}", idempotencyKey);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}
Three Layers of Defense
Layer 1: Kafka Idempotent Producer
- enable.idempotence=true on producer
- Prevents the broker from appending duplicates caused by producer retries (it does not cover consumer redelivery)
Layer 2: Consumer-Level Deduplication
- idempotencyRepo check at consumer entry point
- Distributed lock prevents concurrent processing
Layer 3: Payment Gateway Idempotency
- Stripe/Braintree idempotency key passed with every charge request
- Even if Layer 1 and 2 fail, Stripe will not double-charge
A redelivered command causes a duplicate charge only if Layers 2 and 3 both fail at the same time
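Layer 2 can be illustrated with a minimal in-memory sketch. DedupingCharger is a hypothetical stand-in for the idempotency repository plus lock: ConcurrentHashMap.computeIfAbsent runs the charging function at most once per paymentId, even under concurrent calls. A production version would need a durable store (for example a unique constraint on paymentId), since this map is lost on restart.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory model of consumer-level deduplication. */
final class DedupingCharger {
    private final ConcurrentHashMap<String, String> chargeIdByPayment = new ConcurrentHashMap<>();
    private final AtomicLong totalChargedCents = new AtomicLong();

    /** Charges once per paymentId; repeated deliveries return the original charge id. */
    String charge(String paymentId, long amountCents) {
        return chargeIdByPayment.computeIfAbsent(paymentId, id -> {
            totalChargedCents.addAndGet(amountCents); // stands in for the gateway call
            return "ch_" + id;                        // placeholder charge id
        });
    }

    long totalChargedCents() {
        return totalChargedCents.get();
    }
}
```

Delivering the same command twice leaves the total unchanged after the first call, which is the property the redelivery scenario in this incident needed.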
4. Consumer Lag That Never Recovers
Incident Description
An analytics pipeline's Kafka consumer group accumulated 8 billion message lag over 3 weeks, growing steadily. Adding more consumers had diminishing returns. The team could not determine why.
Root Cause: Coordinated Slow Processing
Consumer group: analytics-pipeline
Topic: clickstream-events (24 partitions)
Consumer count: 24 (one per partition - maximally scaled)
Observation: Consumer lag growing 1 million messages/hour despite 24 consumers running
Throughput per consumer: ~4,000 messages/minute (normal is 20,000/minute)
Consumer processing time breakdown (profiled):
- Deserialize JSON: 2ms
- Enrich with user metadata (Redis call): 180ms ← BOTTLENECK
- Aggregate into time window: 5ms
- Write to ClickHouse: 15ms
Total: ~202ms per message = ~295 messages/minute per consumer
But Redis call was 5ms in dev/staging. Why 180ms in production?
Root cause: Redis connection pool exhausted
- 24 consumer instances x 1 thread x 1 Redis call per message
- Redis connection pool max size: 20 connections
- 24 consumers competing for 20 connections: at any moment at least 4 callers are queued for a connection
- The extra ~175ms per call (180ms observed vs. 5ms in staging) is time spent waiting for a pooled connection, not Redis execution time
- This math compounds: more consumers = more pool contention = slower per consumer
- Adding consumers actually MADE IT WORSE after 20 consumers
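The throughput arithmetic above, and the condition under which lag can never recover, can be written down in a few lines. This is a sketch with hypothetical names; it assumes one message is processed at a time per consumer and constant rates.

```java
/** Hypothetical helper for per-consumer throughput and backlog drain estimates. */
final class ThroughputMath {
    private ThroughputMath() {}

    /** Messages per minute for a consumer that handles one message at a time. */
    static double messagesPerMinute(double msPerMessage) {
        return 60_000.0 / msPerMessage;
    }

    /** Hours to drain a backlog; infinite when consumption does not outpace production. */
    static double hoursToDrain(long backlog, double consumedPerMinute, double producedPerMinute) {
        double netPerMinute = consumedPerMinute - producedPerMinute;
        if (netPerMinute <= 0) {
            return Double.POSITIVE_INFINITY; // lag grows or stays flat forever
        }
        return backlog / netPerMinute / 60.0;
    }
}
```

messagesPerMinute(202) is roughly 297, in line with the ~295 figure in the breakdown. hoursToDrain returns infinity whenever the group's total consume rate is at or below the produce rate, which is the "never recovers" case regardless of how many consumers are added.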
Detection and Diagnosis
# Step 1: Measure total lag per topic (columns: $2 = TOPIC, $6 = LAG)
kafka-consumer-groups.sh --bootstrap-server broker:9092 --describe --group analytics-pipeline | \
awk '$6 ~ /^[0-9]+$/ {lag[$2]+=$6} END {for(t in lag) print t, lag[t]}'
# Shows total lag per topic
# Step 2: Consumer processing time (application metrics)
# histogram_quantile(0.99, rate(consumer_record_processing_seconds_bucket[5m]))
# consumer_record_processing_seconds is a placeholder for an application-level histogram:
# it must time record PROCESSING, not the fetch from the broker
# Step 3: Throughput trend
# rate(kafka_consumer_records_consumed_total[5m])
# If this is flat or declining as you add consumers: contention somewhere
# Step 4: Identify the bottleneck - thread dump of consumer
jstack <consumer-pid> | grep -A 20 "WAITING\|TIMED_WAITING"
# Shows threads blocked on Redis connection pool
Solution: Batching + Async Enrichment + Connection Pool Right-Sizing
// Fix 1: Batch Redis calls (one pipeline call for 100 messages instead of 100 individual calls)
@KafkaListener(
topics = "clickstream-events",
containerFactory = "batchConsumerFactory" // Batch listener mode
)
public void handleBatch(List<ConsumerRecord<String, ClickEvent>> records, Acknowledgment ack) {
// Collect all user IDs from the batch
Set<String> userIds = records.stream()
.map(r -> r.value().getUserId())
.collect(toSet());
// One batched Redis call for all users in the batch
// Replaces N individual calls with 1 pipeline call
Map<String, UserMetadata> userMetadata = redisClient.pipeline(pipe -> {
userIds.forEach(id -> pipe.get("user:" + id));
});
// Process records with pre-fetched metadata
List<EnrichedClickEvent> enriched = records.stream()
.map(r -> enrich(r.value(), userMetadata.get(r.value().getUserId())))
.collect(toList());
clickhouseWriter.writeBatch(enriched);
ack.acknowledge();
}
// Fix 2: Right-size connection pool for the actual concurrency
@Bean
public RedisConnectionFactory redisConnectionFactory() {
    // Pool size = number of consumer threads × 1.5 (to avoid starvation)
    // GenericObjectPoolConfig uses setters; it has no fluent builder methods
    GenericObjectPoolConfig<StatefulConnection<?, ?>> poolConfig = new GenericObjectPoolConfig<>();
    poolConfig.setMaxTotal(40); // 24 consumer threads × 1.5 = 36, rounded up
    poolConfig.setMaxIdle(24);
    poolConfig.setMinIdle(10);
    poolConfig.setMaxWaitMillis(2000); // fail after 2s instead of queueing indefinitely
    // Client resources belong on the same configuration object that is passed to the factory
    LettucePoolingClientConfiguration poolingConfig = LettucePoolingClientConfiguration.builder()
        .poolConfig(poolConfig)
        .clientResources(DefaultClientResources.builder()
            .ioThreadPoolSize(8)
            .computationThreadPoolSize(8)
            .build())
        .build();
    return new LettuceConnectionFactory(redisStandaloneConfig, poolingConfig);
}
// Result: 180ms Redis call → 5ms average (batch eliminated per-message call overhead)
// Throughput: 295 msg/min → 18,000 msg/min per consumer (60x improvement)
// Lag recovery: 8 billion messages recovered in 18 hours instead of never
5. The Schema Registry Cascade Failure
Incident Description
A Schema Registry outage of 12 minutes caused every producer in the system to stop publishing messages. 47 services stopped processing orders. The Schema Registry had a 1% SLA impact but caused 100% of ordering system unavailability.
Why One Service Knocked Out All Producers
Schema Registry interaction on EVERY produce call:
Producer.send(record):
1. Serialize record using Avro serializer
2. Avro serializer checks: "Is the ID for this record's schema cached locally?"
- If YES (cached): skip registry call, use cached schema ID
- If NO (not cached): call Schema Registry to register or look up the schema and obtain its ID
3. If Schema Registry is down: serialization THROWS exception
4. Exception propagates: producer.send() fails
5. Application code receives exception
6. Application either crashes or enters error loop
Default cache behavior:
- Cache size: up to 1000 schemas (Confluent default)
- Cache TTL: NO EXPIRATION (schemas cached until JVM restart)
Actual incident:
- Schema Registry pod restarted due to OOM kill
- Took 12 minutes to restart (startup probe + GC warm-up)
- All producers had started within the past 6 hours (fresh deployment)
- Cache was EMPTY (first call after deployment always hits registry)
- Every first message in those 12 minutes: Schema Registry call → failure
- Producers fell into retry loops
- Retry exhausted → circuit breaker opened → producers stopped
- 47 services stopped publishing in < 30 seconds
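The difference between a warm and a cold cache during a registry outage can be shown with a small sketch. CachingSchemaIdResolver is a hypothetical model, not the Confluent client: the registry is represented by a plain function that throws when the service is down.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache-first schema ID lookup; the registry is contacted only on a cache miss. */
final class CachingSchemaIdResolver {
    private final Map<String, Integer> idBySubject = new ConcurrentHashMap<>();
    private final Function<String, Integer> registryLookup; // stands in for the remote call

    CachingSchemaIdResolver(Function<String, Integer> registryLookup) {
        this.registryLookup = registryLookup;
    }

    /** Returns the cached ID if present; otherwise asks the registry and caches the answer. */
    int resolve(String subject) {
        Integer cached = idBySubject.get(subject);
        if (cached != null) {
            return cached;
        }
        Integer id = registryLookup.apply(subject); // throws if the registry is unreachable
        idBySubject.put(subject, id);
        return id;
    }

    /** Eagerly resolves the given subjects so later calls stay in-process. */
    void warm(Iterable<String> subjects) {
        for (String subject : subjects) {
            resolve(subject);
        }
    }
}
```

A subject resolved before the outage keeps working while the registry is down; a subject first seen during the outage fails. That is the situation the freshly deployed producers were in.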
Detection and Recovery
Alert fired at: T+2 minutes (Prometheus alert on schema_registry_health == 0)
Incident declared at: T+4 minutes
Root cause identified at: T+9 minutes
Schema Registry restarted: T+12 minutes
Producers recovered: T+15 minutes (schema cache re-populated)
Post-mortem finding: Schema Registry was a single point of failure
Solution: Schema Registry High Availability and Graceful Degradation
# Fix 1: Schema Registry cluster (multi-instance, multi-AZ)
# docker-compose (Swarm mode) / Kubernetes - NOT a single instance
schema-registry-1:
  image: confluentinc/cp-schema-registry:7.4.0
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
    SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
  deploy:
    replicas: 3 # 3 instances
    placement:
      preferences:
        - spread: node.labels.az # Spread replicas across AZs (nodes must carry an "az" label)
# Load balancer in front of schema-registry instances
# A single registry instance failing does not cause service interruption
// Fix 2: Producer-side graceful degradation
@Configuration
public class ResilientAvroProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
// Schema Registry endpoints: the client can fail over across the listed URLs
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://schema-registry-1:8081,http://schema-registry-2:8081,http://schema-registry-3:8081");
// Capacity of the local schema cache; at steady state cached schemas need no Registry call
config.put(AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG, 100);
// Intended to bound how long a Registry call may block send(); confirm this property
// name against your serializer version before relying on it
config.put("schema.registry.request.timeout.ms", "3000");
// Producers only look up already-registered schemas and never register new ones at runtime
config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
return new DefaultKafkaProducerFactory<>(config);
}
}
// Fix 3: Startup schema pre-loading
// Only useful if this is the same SchemaRegistryClient instance the serializer uses.
// Whether getLatestSchemaMetadata() fills the cache the serializer reads depends on the
// client version; verify it, or look up each produced schema's ID explicitly
@Component
@Slf4j
@RequiredArgsConstructor
public class SchemaRegistryCacheWarmer implements ApplicationRunner {
    private final SchemaRegistryClient schemaRegistryClient;
    private final List<String> criticalSchemaSubjects;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Pre-loading schemas from registry...");
        for (String subject : criticalSchemaSubjects) {
            try {
                // Force a Registry round trip now rather than on the first produce call
                schemaRegistryClient.getLatestSchemaMetadata(subject);
                log.info("Loaded schema: {}", subject);
            } catch (Exception e) {
                // Log but do not fail startup. Calling register() here would not help:
                // it is itself a Registry call and fails while the Registry is down
                log.warn("Could not pre-load schema {} from registry", subject, e);
            }
        }
    }
}
6. Cross-Datacenter Message Ordering Violations
Incident Description
A financial services company ran Kafka MirrorMaker 2 to replicate topics from US-East to EU-West for regulatory data residency. Users in the EU region started receiving incorrect account balances. Debits appeared before credits in the replicated stream.
Root Cause: Replication Lag Differential
US-East Kafka (primary):
Partition 3:
Offset 1000: CreditEvent(accountId=ACC-123, amount=+500) at T+0:00:00
Offset 1001: DebitEvent(accountId=ACC-123, amount=-200) at T+0:00:01
MirrorMaker 2 replication to EU-West:
Replication throughput: 80MB/s (under load)
Credit event (offset 1000): 800 bytes, replicated at T+0:00:05
Debit event (offset 1001): 2100 bytes (larger payload), replicated at T+0:00:12
EU-West Consumer (processes as messages arrive):
T+0:00:05: Processes Credit(+500) → balance = 500
T+0:00:06: DebitEvent NOT YET REPLICATED
T+0:00:06: New balance shown to EU customer: 500 ✓
...
T+0:00:12: Processes Debit(-200) → balance = 300
T+0:00:12: Balance updated: 300 ✓
Seems fine? Not always:
What if Consumer polls between T+0:00:05 and T+0:00:12 AND
what if the Debit was actually published FIRST (replication reorder)?
MirrorMaker 2 uses multiple worker threads for throughput
Thread 1 replicates small messages faster
Thread 2 replicates large messages slower
If Credit(small) is replicated by Thread 1 and Debit(large) by Thread 2:
Thread 2 lags behind Thread 1
EU-West sees: Debit before Credit (reordering!)
For account balance = 0 (starting):
Wrong order: Debit(-200) first → balance = -200 (rejected: insufficient funds!)
Then: Credit(+500) → balance = +300
Customer's initial transaction was REJECTED despite having incoming funds
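The effect of the reordering can be reproduced with a tiny sketch. LedgerReplay is hypothetical and models only the insufficient-funds check: events are signed amounts, and a debit that would take the balance below zero is rejected.

```java
import java.util.List;

/** Hypothetical model of applying credits and debits with an insufficient-funds check. */
final class LedgerReplay {
    private LedgerReplay() {}

    /** Applies signed amounts in the given order and counts rejected debits. */
    static int countRejections(long startingBalance, List<Long> signedAmounts) {
        long balance = startingBalance;
        int rejected = 0;
        for (long amount : signedAmounts) {
            if (balance + amount < 0) {
                rejected++; // debit would overdraw the account
                continue;
            }
            balance += amount;
        }
        return rejected;
    }
}
```

Starting from a zero balance, the source order (credit +500, then debit -200) produces no rejections, while the reordered stream (debit first) rejects the debit. The events are identical; only the arrival order differs.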
Solution: Timestamp-Ordered Consumption + Source Offset Preservation
// Fix 1: Ensure MirrorMaker 2 preserves source ordering
# MirrorMaker 2 configuration
clusters = source, target
source.bootstrap.servers = us-east-kafka:9092
target.bootstrap.servers = eu-west-kafka:9092
# CRITICAL: Use same number of replication tasks as source partitions
# One task per partition ensures in-order replication within partitions
# 12 = source partition count (kept on its own line: .properties files have no trailing comments)
source->target.tasks.max = 12
# Fix 2: EU-West consumers use source-timestamp ordering, not arrival order
@Service
public class FinancialEventConsumer {
private final FinancialEventBuffer eventBuffer;
@KafkaListener(topics = "account-events.mirror", groupId = "eu-balance-processor")
public void handle(AccountEvent event,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long kafkaTimestamp) {
// Use source timestamp (from US-East), NOT EU Kafka arrival timestamp
long sourceTimestamp = event.getSourceTimestamp(); // Embedded in event payload
// Buffer events with out-of-order tolerance window (500ms)
eventBuffer.add(event, sourceTimestamp);
}
}
@Component
public class FinancialEventBuffer {
private final PriorityQueue<BufferedEvent> buffer =
new PriorityQueue<>(Comparator.comparingLong(BufferedEvent::getSourceTimestamp));
private static final Duration OUT_OF_ORDER_TOLERANCE = Duration.ofMillis(500);
// synchronized: PriorityQueue is not thread-safe and several listener threads may call add()
public synchronized void add(AccountEvent event, long sourceTimestamp) {
buffer.offer(new BufferedEvent(event, sourceTimestamp));
drainSafeEvents();
}
private void drainSafeEvents() {
long nowMs = Instant.now().toEpochMilli();
while (!buffer.isEmpty()) {
BufferedEvent head = buffer.peek();
// Only process events that are "old enough" to be considered ordered
// Any events timestamped within the last 500ms might still be followed by
// earlier-timestamped events still in transit from US-East
if (nowMs - head.getSourceTimestamp() > OUT_OF_ORDER_TOLERANCE.toMillis()) {
buffer.poll();
processEvent(head.getEvent());
} else {
break; // Head of queue is too recent - wait for more events
}
}
}
}
7. The Broker Disk Full Outage
Incident Description
A production Kafka cluster's disk filled to 100% at 3am on a Sunday. Brokers stopped accepting new messages, throwing RecordTooLargeException and eventually TimeoutException. All downstream services that published to Kafka failed. The on-call engineer discovered this when SLA alerts fired 45 minutes into the outage.
Root Cause Chain
Root cause 1: A new analytics topic created without retention limit
- Team creating topic: "We'll set retention later" (they never did)
- Default retention: none specified = server default
- Server default (incorrectly configured): retention.bytes=-1, retention.ms=-1
- This means: KEEP FOREVER, NO SIZE LIMIT
Root cause 2: Topic producing at 100MB/minute
- 100MB/min × 60 min × 24 hours × 7 days = 1TB per week
- Kafka cluster disk: 2TB per broker (3 brokers × 2TB = 6TB raw, ~2TB usable after replication)
- With 2 weeks of uncontrolled retention: disk full
Root cause 3: No disk utilization alert configured
- Memory alert: ✓ 80% threshold
- CPU alert: ✓ 70% threshold
- Disk alert: MISSING
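The time-to-full estimate behind root cause 2 is simple enough to automate as a capacity check. This is a sketch with a hypothetical class name; it assumes a constant ingest rate, no retention-based deletion, and decimal units (1MB = 10^6 bytes, 1TB = 10^12 bytes).

```java
/** Hypothetical capacity check: days until a volume fills at a constant ingest rate. */
final class DiskFillEstimate {
    private DiskFillEstimate() {}

    static double daysUntilFull(double usableBytes, double usedBytes, double ingestBytesPerMinute) {
        if (ingestBytesPerMinute <= 0) {
            return Double.POSITIVE_INFINITY; // nothing is being written
        }
        double bytesPerDay = ingestBytesPerMinute * 60 * 24;
        return (usableBytes - usedBytes) / bytesPerDay;
    }
}
```

With 2TB usable and 100MB/minute of ingest, daysUntilFull(2e12, 0, 1e8) is about 13.9 days, which matches the two-week window in the root cause chain.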
Immediate Mitigation Procedure
# EMERGENCY: Kafka brokers rejecting all new messages
# Step 1: Identify which partitions are consuming most disk (the tool prints JSON; requires jq)
kafka-log-dirs.sh --bootstrap-server broker:9092 --describe \
| grep '^{' \
| jq -r '.brokers[].logDirs[].partitions[] | "\(.size)\t\(.partition)"' \
| sort -rn | head -20
# Shows topic partitions sorted by size in bytes
# Step 2: Emergency retention reduction on the runaway topic (drop to 1 hour IMMEDIATELY)
kafka-configs.sh --bootstrap-server broker:9092 --alter \
--entity-type topics \
--entity-name analytics-raw-events \
--add-config retention.ms=3600000
# The broker's retention check (log.retention.check.interval.ms, default 5 minutes)
# deletes expired segments; the log cleaner is only involved for compacted topics
# Step 3: Monitor disk recovery
watch -n 5 'df -h /var/kafka-logs'
# Should see disk usage declining as expired segments are deleted
# Step 4: Once disk below 80%, restore reasonable retention
# 24 hours (appropriate for analytics) plus a ~50GB size cap; retention.bytes applies per partition
kafka-configs.sh --bootstrap-server broker:9092 --alter \
--entity-type topics \
--entity-name analytics-raw-events \
--add-config retention.ms=86400000,retention.bytes=52428800000
# Step 5: Verify producers can send again (test message)
kafka-console-producer.sh --bootstrap-server broker:9092 \
--topic analytics-raw-events <<< "test"
Prevention: Infrastructure as Code + Policy Enforcement
// Enforce retention policy at topic creation time
@Component
public class TopicCreationEnforcer {
private static final Map<String, RetentionPolicy> POLICIES = Map.of(
"financial", RetentionPolicy.of(Duration.ofDays(90), 100_000_000_000L), // 90 days, 100GB
"order", RetentionPolicy.of(Duration.ofDays(30), 50_000_000_000L), // 30 days, 50GB
"analytics", RetentionPolicy.of(Duration.ofDays(7), 20_000_000_000L), // 7 days, 20GB
"audit", RetentionPolicy.of(Duration.ofDays(730), Long.MAX_VALUE), // 2 years, unlimited
"default", RetentionPolicy.of(Duration.ofDays(3), 10_000_000_000L) // 3 days, 10GB
);
@EventListener
public void onTopicCreation(TopicCreationRequestedEvent event) {
String topicCategory = inferCategory(event.getTopicName());
RetentionPolicy policy = POLICIES.getOrDefault(topicCategory, POLICIES.get("default"));
// BLOCK topic creation without retention policy
if (!event.hasRetentionPolicy()) {
throw new TopicPolicyViolationException(
"Topic " + event.getTopicName() + " must specify retention. " +
"Recommended policy for '" + topicCategory + "': " + policy
);
}
// Warn if retention exceeds policy
if (event.getRetentionMs() > policy.getMaxRetentionMs()) {
alertingService.sendWarning(
"Topic " + event.getTopicName() + " requests " +
event.getRetentionDays() + " day retention. " +
"Policy max for '" + topicCategory + "' is " + policy.getMaxDays() + " days."
);
}
}
}
# Disk utilization alert - add to monitoring
groups:
  - name: kafka-disk-alerts
    rules:
      - alert: KafkaBrokerDiskUsageHigh
        expr: |
          (node_filesystem_size_bytes{mountpoint="/var/kafka-logs"}
          - node_filesystem_avail_bytes{mountpoint="/var/kafka-logs"})
          / node_filesystem_size_bytes{mountpoint="/var/kafka-logs"} > 0.70
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka broker disk {{ $labels.instance }} at {{ $value | humanizePercentage }}"
      - alert: KafkaBrokerDiskUsageCritical
        expr: ... > 0.85
        labels:
          severity: critical
        annotations:
          summary: "CRITICAL: Kafka broker disk nearly full - broker will stop accepting messages at 100%"
8. Silent Data Loss from Broker Leader Election
Incident Description
During a planned Kafka broker rolling upgrade, an engineering team lost 3 minutes of order events. The events were published by producers, received acks=1 success responses, but were never delivered to consumers. The lost messages were never recoverable.
Root Cause: acks=1 + Unclean Leader Election
Configuration used (a common but dangerous default):
acks = 1 (leader-only acknowledgment)
min.insync.replicas = 1 (default, no enforcement)
unclean.leader.election.enable = true (dangerous default in older Kafka versions)
Timeline of the incident:
T+0:00 Broker-1 is partition leader (3 replicas: Broker-1, 2, 3)
T+0:00 Replication status: Broker-1 = leader, Broker-2 = in-sync, Broker-3 = lagging (2s behind)
T+0:01 Rolling upgrade begins: Broker-1 taken offline
T+0:01 Leader election needed: choose between Broker-2 (in-sync) and Broker-3 (lagging)
T+0:01 Broker-2 is available: elected as new leader ✓
But this is not what happened. There was a network partition:
T+0:00 Broker-2 loses network to ZooKeeper (briefly - network hiccup)
T+0:00 Broker-2 removed from ISR (not in-sync anymore)
T+0:00 Broker-3 becomes the only available replica (but 2 seconds behind)
T+0:01 Broker-1 goes offline for upgrade
T+0:01 Leader election: only Broker-3 available (not in ISR)
T+0:01 unclean.leader.election.enable=true → Broker-3 elected despite being out of ISR
T+0:01 Broker-3 becomes leader but is 2 SECONDS of messages behind Broker-1
The messages that Broker-1 received between T-2s and T+0:01:
- Were ack'd to producers with acks=1 (producers got success)
- Were stored on Broker-1 but NOT replicated to Broker-3
- Are NOW GONE because Broker-1 went offline
- Broker-3 has no knowledge of these messages
Result: 2 seconds × (order volume during that window) = ~3,000 orders lost
Orders are in DB (placed), but no downstream events fired (warehouse, email, payment)
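The loss mechanism in the timeline above can be reduced to a toy model. The sketch below is illustrative only: the class name `UncleanElectionDemo`, the method `lostAfterFailover`, and the message ids are hypothetical, and the model does not reflect how Kafka stores or replicates data. It only shows that any message acknowledged by the old leader but absent from the newly elected leader's log is gone.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition: each replica's log is a list of message ids.
// Hypothetical illustration, not Kafka's storage or replication protocol.
class UncleanElectionDemo {

    /** Returns the messages acked by the old leader that the new leader never received. */
    static List<Integer> lostAfterFailover(List<Integer> ackedByOldLeader,
                                           List<Integer> newLeaderLog) {
        List<Integer> lost = new ArrayList<>(ackedByOldLeader);
        lost.removeAll(newLeaderLog);
        return lost;
    }

    public static void main(String[] args) {
        // Broker-1 (old leader) acknowledged messages 1..10 under acks=1.
        List<Integer> broker1 = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // Broker-3 was lagging and had replicated only 1..7 when it was elected.
        List<Integer> broker3 = List.of(1, 2, 3, 4, 5, 6, 7);

        System.out.println("Acked but lost: " + lostAfterFailover(broker1, broker3));
    }
}
```

With acks=all and a healthy in-sync replica, the producer would not have received success for messages 8 to 10 until a second replica held them.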
Production-Safe Configuration
# NEVER use these settings for critical data:
# acks=1 (leader only - loses data on leader failure before replication)
# unclean.leader.election.enable=true (elects out-of-sync replicas, losing messages)
# ALWAYS use for critical data:
# Producer configuration
acks = all # Wait for ALL in-sync replicas to acknowledge
enable.idempotence = true # Prevent duplicates from producer retries
max.in.flight.requests.per.connection = 1 # Most conservative; with idempotence enabled, ordering is also preserved for values up to 5
# Broker configuration (set globally and per-topic)
default.replication.factor = 3
min.insync.replicas = 2 # At least 2 replicas must acknowledge before success
unclean.leader.election.enable = false # NEVER elect out-of-sync replica as leader
# Topic-level override for financial topics
kafka-configs.sh --bootstrap-server kafka:9092 --alter \
--entity-type topics \
--entity-name order-events \
--add-config min.insync.replicas=2,unclean.leader.election.enable=false
The Trade-Off Explained
acks=1 + unclean.leader.election=true:
Pros: Higher throughput (faster acknowledgment), more availability (any replica elected)
Cons: Data loss possible on broker failure (exactly what happened)
Use for: Metrics, telemetry, analytics - where losing a message is acceptable
acks=all + unclean.leader.election=false + min.insync.replicas=2:
Pros: Strong durability guarantee - messages survive leader failures
Cons: Slightly lower throughput (waits for 2 replicas), reduced availability
(write fails if < 2 replicas available - but this is the RIGHT behavior)
Use for: Financial data, orders, user actions, anything you cannot afford to lose
Think of it this way:
If the leader crashes right after acknowledging a message:
- With acks=1: success was returned to the producer before the message was replicated
- With acks=all: success was returned only after every in-sync replica had the message
The availability vs durability trade-off is real - choose based on your data's value
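The availability side of this trade-off can be written down as a small rule. This is a minimal sketch with hypothetical names (`DurabilityRule`, `acceptsWrite`); it encodes only the behaviour described above, namely that min.insync.replicas is enforced for acks=all writes and not for acks=1.

```java
// Hypothetical helper that models when a partition leader accepts a produce request.
class DurabilityRule {

    /**
     * With acks=all, the write is rejected when fewer than min.insync.replicas
     * replicas are in sync. With acks=0 or acks=1 that check is not applied.
     */
    static boolean acceptsWrite(String acks, int inSyncReplicas, int minInsyncReplicas) {
        if (inSyncReplicas < 1) {
            return false; // no leader is available at all
        }
        if (acks.equals("all") || acks.equals("-1")) {
            return inSyncReplicas >= minInsyncReplicas;
        }
        return true;
    }

    public static void main(String[] args) {
        // One replica left in sync, min.insync.replicas=2
        System.out.println("acks=1,   ISR=1: accepted=" + acceptsWrite("1", 1, 2));
        System.out.println("acks=all, ISR=1: accepted=" + acceptsWrite("all", 1, 2));
        System.out.println("acks=all, ISR=2: accepted=" + acceptsWrite("all", 2, 2));
    }
}
```

The rejected write in the second case is the "reduced availability" cost: the producer sees an error instead of a success that might later turn out to be false.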
9. The Offset Commit Race Condition
Incident Description
A payment-processing consumer was found to have a race condition: under high load, offsets were committed before all in-flight messages had finished processing. Some payments were still unprocessed when their offsets were committed, so they were lost if the consumer restarted.
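The race can be reproduced without Kafka. The following sketch is a simplified stand-in, not the incident code: `CommitRaceDemo` and `finishedAtCommit` are hypothetical names, the latch stands in for slow payment work, and "commit" is simulated by counting how many tasks are done at the point where the listener method would return.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CommitRaceDemo {

    /** Returns how many of 4 async tasks had finished at the simulated commit point. */
    static int finishedAtCommit(boolean waitBeforeCommit) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch slowWork = new CountDownLatch(1); // stands in for slow payment processing
        try {
            List<CompletableFuture<Void>> futures = IntStream.range(0, 4)
                    .mapToObj(i -> CompletableFuture.runAsync(() -> awaitQuietly(slowWork), pool))
                    .collect(Collectors.toList());

            if (waitBeforeCommit) {
                slowWork.countDown(); // let the work finish...
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // ...and wait for it
            }

            // Simulated commit point: in a fire-and-forget listener this is "method returned".
            int finished = (int) futures.stream().filter(CompletableFuture::isDone).count();

            slowWork.countDown(); // release tasks that are still blocked so the pool can stop
            return finished;
        } finally {
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("fire-and-forget: " + finishedAtCommit(false) + " of 4 finished at commit");
        System.out.println("wait for allOf:  " + finishedAtCommit(true) + " of 4 finished at commit");
    }
}
```

In the fire-and-forget case nothing has finished when the commit happens, so a crash at that moment loses every record in the batch.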
Root Cause: Async Processing + Auto-Commit
// BROKEN: The actual production code that caused the incident
@KafkaListener(topics = "payment-events")
public void handlePayments(List<PaymentEvent> events) {
// Process each event asynchronously for throughput
List<CompletableFuture<Void>> futures = events.stream()
.map(event -> CompletableFuture.runAsync(
() -> paymentService.processPayment(event),
processingExecutor
))
.collect(toList());
// WRONG: This does NOT wait for all futures to complete
// The @KafkaListener method returns immediately
// Spring Kafka sees the method returned without exception
// If using auto-commit or BATCH ack mode: offsets committed NOW
// The async tasks are still running in processingExecutor
// If the application restarts in the next 500ms:
// - Offsets were committed (consumer thinks these are done)
// - But 30% of async tasks haven't completed yet
// - Those payments are LOST (offset committed, won't be redelivered)
}
The Fix: Synchronize Async Processing with Offset Commit
// CORRECT: Batch listener with explicit acknowledgment after all async work completes
@KafkaListener(
topics = "payment-events",
containerFactory = "batchManualAckFactory"
)
public void handlePayments(
List<ConsumerRecord<String, PaymentEvent>> records,
Acknowledgment ack) {
// Launch all async tasks
List<CompletableFuture<PaymentResult>> futures = records.stream()
.map(record -> CompletableFuture.supplyAsync(
() -> paymentService.processPayment(record.value()),
processingExecutor
))
.collect(toList());
// Wait for ALL futures to complete before acknowledging
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
try {
// CRITICAL: Block until ALL async processing is done
// Only then: commit the offset
allDone.get(60, TimeUnit.SECONDS);
ack.acknowledge(); // Safe to commit now - all processing complete
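} catch (TimeoutException e) {
log.error("Batch processing timed out - NOT committing offset, will retry");
// Skipping ack alone does not trigger redelivery; nack(0, ...) re-seeks so the
// entire batch is redelivered after the pause
// Some records might be processed twice - processing must be idempotent
ack.nack(0, Duration.ofSeconds(1));
} catch (ExecutionException e) {
log.error("Batch processing failed - checking individual results");
// Check which futures succeeded and which failed
handlePartialBatchFailure(records, futures, ack);
} catch (InterruptedException e) {
// get() also throws InterruptedException; restore the flag and do not ack
Thread.currentThread().interrupt();
}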
}
private void handlePartialBatchFailure(
List<ConsumerRecord<String, PaymentEvent>> records,
List<CompletableFuture<PaymentResult>> futures,
Acknowledgment ack) {
// Find the first failed record
for (int i = 0; i < futures.size(); i++) {
if (futures.get(i).isCompletedExceptionally()) {
// nack(i, ...) commits the offsets of the records before index i and
// redelivers record i and every record after it
ack.nack(i, Duration.ofSeconds(1));
return;
}
}
}
10. Thundering Herd After Planned Maintenance
Incident Description
After a 20-minute scheduled maintenance window on the Kafka brokers of a high-traffic notification service, all consumers came online simultaneously and began processing 20 minutes of backlogged messages. The downstream email provider (SendGrid) received 8x its normal message rate (40,000 emails/minute against a normal 5,000) and started rate limiting. The resulting retries cascaded into further failures, including exhaustion of the SendGrid connection pool that live traffic depended on.
Root Cause and Cascading Failure Chain
Normal throughput: 5,000 notification events/minute
Maintenance duration: 20 minutes
Messages accumulated: 100,000 messages
Post-maintenance sequence:
T+0:00 All 12 consumer instances restart simultaneously
T+0:05 All 12 consumers polling full speed
T+0:05 Processing rate: 12 consumers × 1000 msg/min = 12,000/min (2.4x normal)
But they have 100,000 backlogged messages - processing at max speed
T+0:05 SendGrid API: receives 40,000 emails/minute (normal: 5,000)
T+0:06 SendGrid returns HTTP 429 (rate limited)
T+0:06 Consumer retry logic kicks in: retry 3 times with 1-second backoff
T+0:07 SendGrid HTTP connection pool on consumers exhausted (all connections in retry state)
T+0:07 New email requests from live traffic: "Connection pool timeout"
T+0:08 Live user notification processing STOPS (pool exhausted by retries)
T+0:15 Consumer lag grows DESPITE maintenance window being over
More messages arriving than being processed
T+0:20 Full outage: notification service unavailable
Original cause: planned maintenance
Actual impact: 30-minute full notification outage
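The figures above also show why throttling has a cost: the slower the backlog is drained, the longer stale notifications keep arriving. A rough calculation, assuming constant arrival and processing rates (the class `BacklogDrain` and its method are hypothetical helpers, not part of the incident tooling):

```java
import java.util.Locale;

class BacklogDrain {

    /**
     * Minutes needed to clear a backlog while new messages keep arriving.
     * Returns positive infinity when processing does not outpace arrivals.
     */
    static double minutesToDrain(long backlog, double processedPerMinute, double arrivingPerMinute) {
        double net = processedPerMinute - arrivingPerMinute;
        if (net <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return backlog / net;
    }

    public static void main(String[] args) {
        long backlog = 100_000;   // accumulated during the 20-minute window
        double arriving = 5_000;  // normal live traffic per minute

        // Unthrottled consumers at 12,000/min versus a cap at 120% of normal (6,000/min)
        System.out.printf(Locale.ROOT, "at 12,000/min: %.1f minutes%n",
                minutesToDrain(backlog, 12_000, arriving));
        System.out.printf(Locale.ROOT, "at  6,000/min: %.1f minutes%n",
                minutesToDrain(backlog, 6_000, arriving));
    }
}
```

Under these assumptions, a cap at 120% of normal traffic takes 100 minutes to clear the backlog, so the cap should be chosen against what the downstream provider can actually absorb rather than set as low as possible.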
Solution: Controlled Consumer Ramp-Up
// Consumer with startup rate limiting
@Component
public class ThrottledNotificationConsumer {
private static final int NORMAL_RATE_PER_SECOND = 83; // 5000/minute ÷ 60
private static final int MAX_SENDGRID_RATE = 150; // Never exceed this
private final AtomicBoolean rampUpComplete = new AtomicBoolean(false);
private volatile RateLimiter currentRateLimiter;
@PostConstruct
public void initRampUp() {
// Start at 20% of normal rate; x1.5 every 30 seconds reaches the 120% target after about 2.5 minutes
currentRateLimiter = RateLimiter.create(NORMAL_RATE_PER_SECOND * 0.20);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(
this::increaseRate,
30, // Start increasing after 30 seconds
30, // Increase every 30 seconds
TimeUnit.SECONDS
);
}
private void increaseRate() {
double currentRate = currentRateLimiter.getRate();
double targetRate = Math.min(NORMAL_RATE_PER_SECOND * 1.2, MAX_SENDGRID_RATE);
if (currentRate < targetRate) {
double newRate = Math.min(currentRate * 1.5, targetRate);
currentRateLimiter = RateLimiter.create(newRate);
log.info("Consumer ramp-up: rate increased to {}/second", newRate);
if (newRate >= targetRate) {
rampUpComplete.set(true);
log.info("Consumer ramp-up complete: steady state rate {}/second", newRate);
}
}
}
@KafkaListener(topics = "notification-commands", groupId = "notification-processor")
public void handleNotification(NotificationCommand command, Acknowledgment ack) {
// Acquire rate limit token before processing
// This naturally throttles the consumer during ramp-up
currentRateLimiter.acquire();
try {
notificationSender.send(command);
ack.acknowledge();
} catch (SendGridRateLimitException e) {
// Even with our rate limiter, hit SendGrid limit
// Kafka has no visibility timeout: nack() re-seeks so this record is redelivered after the pause
log.warn("SendGrid rate limit hit, reducing consumer rate");
currentRateLimiter = RateLimiter.create(currentRateLimiter.getRate() * 0.5);
ack.nack(Duration.ofSeconds(1));
}
}
}
11. The Ghost Consumer - Zombie Consumer Group
Incident Description
A decommissioned service's consumer group was never cleaned up. 8 months later, a new service was deployed that happened to use the same consumer group name. On first startup, it resumed from the old group's last committed offset (where the old service left off) and consumed 8 months of stale messages rather than starting from the current position.
The Zombie Consumer Group
Consumer group "analytics-processor" history:
March 2025: Service "analytics-v1" created, uses group "analytics-processor"
July 2025: analytics-v1 decommissioned
July 2025: Consumer group "analytics-processor" left in Kafka
Committed offset: 28,000,000 (last position when service was alive)
Topic had 250,000,000 messages at time of decommission
November 2025: analytics-v2 created
Developer assumes a brand-new service has no committed offsets
Developer creates consumer with group "analytics-processor" (same name, reused)
Developer expects: "It'll start from the latest offset"
Actual behavior:
Kafka finds existing group "analytics-processor" with committed offset 28,000,000
New consumer starts from offset 28,000,000 (8 months ago)
Topic has grown to 400,000,000 messages
Consumer begins processing 372,000,000 messages from July 2025
Impact:
- 372 million stale events processed
- Analytics dashboards show data from July 2025 as "current"
- New event stream mixed with 8-month-old events
- $12,000 in BigQuery write costs processing stale data
- 6 hours to identify and stop the zombie consumption
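The behaviour that surprised the developer follows from how a consumer picks its starting position: a committed offset for the group takes precedence, and auto.offset.reset only applies when there is no usable committed offset. A minimal sketch of that rule, with hypothetical names (`StartPosition`, `startingOffset`) and a single partition for simplicity:

```java
import java.util.OptionalLong;

class StartPosition {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Where a consumer starts reading one partition.
     * A committed offset that is still within the log wins; the reset policy is
     * consulted only when no committed offset exists or it is out of range.
     */
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()
                && committed.getAsLong() >= logStartOffset
                && committed.getAsLong() <= logEndOffset) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        long logEnd = 400_000_000L;

        // Reused group name: the old commit at 28,000,000 is honoured
        System.out.println("reused group: "
                + startingOffset(OptionalLong.of(28_000_000L), ResetPolicy.LATEST, 0L, logEnd));

        // Fresh group name: nothing committed, so the reset policy decides
        System.out.println("fresh group:  "
                + startingOffset(OptionalLong.empty(), ResetPolicy.LATEST, 0L, logEnd));
    }
}
```

Setting auto.offset.reset=latest therefore offers no protection when the group name is reused.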
Prevention and Cleanup Procedures
# Step 1: List all consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# Step 2: Identify zombie (inactive) groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups --describe \
| awk '$6 == "-" || $6 == "0"' | grep -v "^GROUP"
# Groups with STATE=Empty or Dead and no active members are zombies
# Step 3: Archive the group's offsets before deletion
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group analytics-processor > /archive/analytics-processor-offsets.txt
# Step 4: Delete zombie consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--delete --group analytics-processor
# Prevention: Consumer group naming convention
# Format: <service>.<environment>.<version>
# analytics-v1.prod.2025-03 (includes version and date)
# analytics-v2.prod.2025-11 (new service = new name, can never conflict)
// Prevention: Code-level safeguard
@Configuration
public class ConsumerGroupValidation {
@Value("${spring.application.name}")
private String serviceName;
@Value("${spring.application.version}")
private String version;
@Value("${spring.profiles.active}")
private String environment;
@Bean
public String consumerGroupId() {
// Auto-generate consumer group from service identity
// Cannot reuse old group by accident
return String.format("%s.%s.%s", serviceName, environment, version);
}
}
12. Certificate Expiry Killing All Producers
Incident Description
At 2:14am on a Monday, all 38 services that produced to Kafka stopped publishing simultaneously. The cause: a wildcard TLS certificate used for Kafka broker authentication expired. No alerts had been configured for certificate expiry. By the time the on-call team woke up, the outage had lasted 47 minutes.
Impact Chain
T+0:00 Kafka broker TLS certificate expires
T+0:00 All producer connections: SSL handshake fails
javax.net.ssl.SSLHandshakeException: certificate_expired
T+0:01 Kafka producers enter reconnection backoff loop
T+0:05 Reconnection backoff maxes out at 10 seconds
T+0:05 New connection attempts every 10 seconds, all failing
T+0:05 Producer local buffers start filling up (messages not being sent)
T+0:15 Producer send buffer full (32MB default)
T+0:15 kafkaTemplate.send() starts throwing: BufferExhaustedException
T+0:15 Application-level error handling: varies by service
- Some: restart and re-attempt (cycling, not helping)
- Some: drop messages silently (fire-and-forget paths)
- Some: propagate exception to caller (HTTP 500 cascade)
T+0:47 On-call engineer wakes up, diagnoses, rotates certificate
T+1:30 All services reconnected, processing backlog
T+2:00 Services that buffered unsent messages in an outbox: no data loss
T+2:00 Services that dropped messages silently: data loss
Prevention: Certificate Lifecycle Management
# Alert 30 days before expiry
# Using Prometheus blackbox exporter for certificate monitoring
- job_name: 'kafka-cert-check'
metrics_path: /probe
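params:
module: [tcp_connect] # This module must have TLS enabled (tcp: tls: true), otherwise no probe_ssl_* metrics are exported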
static_configs:
- targets:
- kafka-broker-1:9093
- kafka-broker-2:9093
- kafka-broker-3:9093
relabel_configs:
- source_labels: [__address__]
target_label: __param_target
- source_labels: [__param_target]
target_label: instance
- target_label: __address__
replacement: blackbox-exporter:9115
# Alert rule
- alert: KafkaCertificateExpiringSoon
expr: probe_ssl_earliest_cert_expiry - time() < 30 * 24 * 3600
labels:
severity: warning
annotations:
summary: "Kafka TLS certificate expires in {{ $value | humanizeDuration }}"
- alert: KafkaCertificateExpiryCritical
expr: probe_ssl_earliest_cert_expiry - time() < 7 * 24 * 3600
labels:
severity: critical
annotations:
summary: "CRITICAL: Kafka TLS certificate expires in {{ $value | humanizeDuration }}"
// Producer-side graceful handling of certificate expiry
@Component
public class ResilientKafkaProducer {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
private volatile KafkaProducer<String, byte[]> producer;
public void publishWithFallback(String topic, String key, Object event) {
byte[] payload = serialize(event);
// send() is asynchronous: TLS and authentication failures are reported through
// the callback (or the returned Future), not thrown at the call site
producer.send(new ProducerRecord<>(topic, key, payload), (metadata, exception) -> {
if (exception instanceof AuthenticationException) {
// Includes SslAuthenticationException (failed TLS handshake) - save to outbox for later
log.error("Kafka auth failure (possible cert expiry): {}", exception.getMessage());
outboxRepository.save(OutboxEntry.builder()
.topic(topic)
.key(key)
.payload(payload)
.failureReason("KAFKA_AUTH_FAILURE")
.createdAt(Instant.now())
.build());
// Alert immediately - this needs human intervention
alertingService.sendCritical("Kafka authentication failure - possible certificate expiry");
}
});
}
}
13. Message Ordering Violation in Financial State Machine
Incident Description
A loan origination system processed loan state transitions via Kafka events. Due to a consumer deployment that briefly ran two versions simultaneously, some loans entered invalid states: receiving "Disbursed" events before receiving "Approved" events.
State Machine Violation
Valid loan state machine:
APPLIED → CREDIT_CHECKED → APPROVED → DISBURSED → REPAYING → COMPLETED
↘ REJECTED
What happened:
Consumer v1 (old) processing Partition 5:
Offset 1000: LoanApproved(loanId=L-123) → state: APPROVED ✓
Offset 1001: LoanDisbursed(loanId=L-123) → state: DISBURSED ✓
During rolling deployment (both v1 and v2 running simultaneously):
v1 consumer commits offset up to 999 on Partition 5
v2 consumer takes over Partition 5 during rebalance
v2 starts from offset 1000 (LoanApproved - same as v1 was processing)
Race condition during rebalance:
v1 processes LoanApproved(L-123) → state saved as APPROVED
REBALANCE OCCURS
v2 assigned Partition 5, starts from offset 1000 (re-delivered)
But v2 has a bug: it processes LoanDisbursed before re-processing LoanApproved
Because v2 uses an async event handler that does not guarantee order within a batch
v2 receives batch: [LoanApproved(1000), LoanDisbursed(1001)]
v2 async handler processes LoanDisbursed FIRST (faster path)
LoanDisbursed arrives at state machine: current state is CREDIT_CHECKED
CREDIT_CHECKED → DISBURSED is INVALID
State machine guard: "Cannot disburse a loan that is not approved"
Exception thrown → event sent to DLQ
LoanApproved arrives: state still CREDIT_CHECKED → APPROVED ✓
LoanDisbursed is now in DLQ
Loan is stuck in APPROVED state permanently
Customer received loan funds but the system shows APPROVED, not DISBURSED
Loan management dashboard shows incorrect state
Collections team gets confused - no repayment schedule created
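The guard that rejected the out-of-order event can be sketched as a transition table. This is an illustration under assumptions: the class `LoanTransitions` and its methods are hypothetical, and the REJECTED branch is assumed to leave from CREDIT_CHECKED, which the diagram above does not state explicitly.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class LoanTransitions {

    enum State { APPLIED, CREDIT_CHECKED, APPROVED, REJECTED, DISBURSED, REPAYING, COMPLETED }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.APPLIED, EnumSet.of(State.CREDIT_CHECKED));
        // Assumption: rejection happens after the credit check
        ALLOWED.put(State.CREDIT_CHECKED, EnumSet.of(State.APPROVED, State.REJECTED));
        ALLOWED.put(State.APPROVED, EnumSet.of(State.DISBURSED));
        ALLOWED.put(State.DISBURSED, EnumSet.of(State.REPAYING));
        ALLOWED.put(State.REPAYING, EnumSet.of(State.COMPLETED));
    }

    static boolean isAllowed(State from, State to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }

    /** Applies the target states in the given order and returns the final state. */
    static State applyAll(State start, State... targets) {
        State current = start;
        for (State target : targets) {
            if (!isAllowed(current, target)) {
                throw new IllegalStateException(current + " -> " + target + " is not allowed");
            }
            current = target;
        }
        return current;
    }

    public static void main(String[] args) {
        // Partition order: Approved (offset 1000), then Disbursed (offset 1001)
        System.out.println(applyAll(State.CREDIT_CHECKED, State.APPROVED, State.DISBURSED));

        // Async handler order: Disbursed arrives first and is rejected by the guard
        try {
            applyAll(State.CREDIT_CHECKED, State.DISBURSED, State.APPROVED);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The guard behaved correctly in the incident. The defect was that the events reached it in a different order from the partition order.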
Solution: Ordered State Machine with Version-Locked Transitions
// Financial state machine with strict ordering enforcement
@Service
public class LoanStateMachineConsumer {
@KafkaListener(
topics = "loan-events",
groupId = "loan-state-machine",
containerFactory = "orderedConsumerFactory" // BATCH mode, manual ack
)
public void processLoanEvents(
List<ConsumerRecord<String, LoanEvent>> records,
Acknowledgment ack) {
// Process sequentially - NEVER async within a batch
// Ordering within a partition is guaranteed by Kafka
// Sequential processing ensures we respect that ordering
for (ConsumerRecord<String, LoanEvent> record : records) {
try {
processInOrder(record.value(), record.offset());
} catch (InvalidStateTransitionException e) {
// State transition guard violated - this is a serious consistency error
log.error("CRITICAL: Invalid state transition loan={}, event={}, currentState={}",
record.value().getLoanId(),
record.value().getType(),
e.getCurrentState());
// Move to investigation DLQ with full context
Map<String, String> headers = Map.of(
"violation-type", "INVALID_STATE_TRANSITION",
"current-state", e.getCurrentState(),
"attempted-transition", record.value().getType().name(),
"offset", String.valueOf(record.offset()),
"partition", String.valueOf(record.partition())
);
dlqPublisher.sendWithHeaders("loan-events.state-violations", record, headers);
// Continue processing other loans - do not block on one invalid transition
}
}
ack.acknowledge();
}
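// NOTE: Spring's proxy-based @Transactional is not applied to private methods or to calls
// made from within the same class; in practice this method belongs in a separate bean
// (or should run inside a TransactionTemplate) so that load, validate and save share one transaction
@Transactional
private void processInOrder(LoanEvent event, long offset) {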
Loan loan = loanRepository.findById(event.getLoanId())
.orElseThrow(() -> new LoanNotFoundException(event.getLoanId()));
// Version-based idempotency: if we've already processed this or a later event, skip
if (loan.getLastProcessedOffset() >= offset) {
log.info("Already processed offset {} for loan {}, skipping", offset, event.getLoanId());
return;
}
// Validate transition BEFORE applying
stateMachine.validateTransition(loan.getStatus(), event.getType());
// Apply transition and record offset
loan.apply(event);
loan.setLastProcessedOffset(offset);
loanRepository.save(loan);
}
}
14. Backpressure Cascade into Full System Outage
Incident Description
An order management system's database became slow due to an unrelated index corruption. This caused consumers to process slowly, which filled the broker queues, which caused producers to block, which eventually cascaded into a full system outage affecting APIs that had nothing to do with the original slow database.
The Cascade
T+0:00 Database index corruption: query time increases from 5ms to 500ms
T+0:01 Order consumer processing time: 5ms → 500ms per message
T+0:05 Consumer processing rate: 1000/sec → 10/sec (100x slower)
T+0:05 Consumer lag starts building: 990 messages/sec accumulating
T+0:20 Consumer lag: 1,000,000 messages
T+0:20 Memory pressure on Kafka brokers from unacked in-flight messages
T+0:30 Kafka producer buffer (32MB) full for order-service
T+0:30 kafkaTemplate.send("order-events") blocks producer threads
T+0:30 Order service HTTP thread pool: threads blocked on kafkaTemplate.send()
T+0:35 Order service HTTP thread pool exhausted: new HTTP requests return 503
T+0:35 API gateway returns 503 for ALL order endpoints
T+0:40 Circuit breaker on API gateway opens for order-service
T+0:40 ALL order creation attempts fail with 503 - including checkout endpoint
T+0:40 FULL OUTAGE on checkout for entire website
Original cause: database index corruption
Direct cause: producer buffer full
Cascaded to: complete e-commerce checkout unavailability
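The lag figure in the timeline can be checked with simple arithmetic, assuming the arrival and processing rates stay constant after the slowdown. The class `LagGrowth` and its method are hypothetical helpers for this calculation only.

```java
class LagGrowth {

    /** Consumer lag accumulated after a number of seconds at constant rates. */
    static long lagAfter(long seconds, long arrivingPerSecond, long processedPerSecond) {
        return Math.max(0, (arrivingPerSecond - processedPerSecond) * seconds);
    }

    public static void main(String[] args) {
        // 1000 msg/sec arriving, 10 msg/sec processed, from T+0:05 to T+0:20 (15 minutes)
        long lag = lagAfter(15 * 60, 1000, 10);
        System.out.println("Lag after 15 minutes: " + lag + " messages");
    }
}
```

990 messages/sec over 15 minutes gives 891,000 messages, which is consistent with the roughly 1,000,000 reported at T+0:20.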
Solution: Backpressure Isolation with Circuit Breaker at Producer Level
// Fix 1: Producer-level circuit breaker + async fallback
@Service
public class BackpressureAwareOrderPublisher {
private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("kafka-order-publisher");
private final Bulkhead bulkhead = Bulkhead.of("kafka-publish",
BulkheadConfig.custom().maxConcurrentCalls(50).build());
public void publishOrderEvent(OrderEvent event) {
try {
// Bulkhead prevents thread pool exhaustion on the CALLER side
// Max 50 concurrent publish attempts
// The checked decorators are needed because Future.get() throws checked exceptions
Bulkhead.decorateCheckedRunnable(bulkhead,
CircuitBreaker.decorateCheckedRunnable(circuitBreaker, () ->
kafkaTemplate.send("order-events", event.getOrderId(), event)
.get(2, TimeUnit.SECONDS) // Never block longer than 2s
)
).run();
} catch (BulkheadFullException e) {
// 50 threads already waiting - Kafka is backed up
// Save to outbox immediately, don't queue more
log.warn("Kafka publish bulkhead full - routing to outbox");
outboxRepository.save(OutboxEntry.from(event));
} catch (CallNotPermittedException e) {
// Circuit is open - Kafka unavailable
// Save to outbox, circuit will close when Kafka recovers
log.warn("Kafka circuit open - routing to outbox");
outboxRepository.save(OutboxEntry.from(event));
} catch (Throwable e) {
// Send failed or timed out (TimeoutException) - a backpressure signal
// The circuit breaker decorator has already recorded this failure,
// so frequent timeouts open the circuit without a manual onError() call
log.warn("Kafka publish failed - routing to outbox", e);
outboxRepository.save(OutboxEntry.from(event));
}
}
}
// Fix 2: Consumer circuit breaker - slow down consumption when DB is slow
// This signals backpressure upstream rather than building up retry storms
@KafkaListener(topics = "order-events")
public void processOrder(OrderEvent event, Acknowledgment ack) {
try {
CircuitBreaker.decorateRunnable(dbCircuitBreaker, () -> {
orderRepository.save(Order.from(event)); // The slow DB call
}).run();
ack.acknowledge();
} catch (CallNotPermittedException e) {
// DB circuit is open - do not ack; nack() pauses consumption and redelivers this record
// This naturally reduces consumer throughput when DB is unavailable
// Consumer lag builds up in Kafka (buffered on disk, bounded by topic retention) rather than
// cascading into producer thread pool exhaustion
log.warn("DB circuit open - not processing, will retry when DB recovers");
// Back off before redelivery; Thread.sleep() here would need InterruptedException handling
ack.nack(Duration.ofSeconds(5));
}
}
15. The Multi-Region Active-Active Split-Brain
Incident Description
A global SaaS company ran Kafka in active-active mode across US-East and EU-West. A transient network partition between regions caused both regions to independently process the same user actions, resulting in conflicting state in the user profile database that was irreconcilable without manual intervention.
The Split-Brain Scenario
Normal operation:
User action in US → US Kafka → US consumer → updates US DB + replicates event to EU
User action in EU → EU Kafka → EU consumer → updates EU DB + replicates event to US
Network partition at T+0:00 (US ↔ EU unreachable for 8 minutes):
T+0:01 US user updates profile: email changed to new-us@email.com
US Kafka: processes event, updates US DB
Replication to EU: FAILS (network partition)
T+0:03 EU user (same account, same user, opened app on different device in EU)
Updates profile: email changed to new-eu@email.com
EU Kafka: processes event, updates EU DB
Replication to US: FAILS (network partition)
T+0:08 Network partition heals
T+0:08 MirrorMaker replication resumes
T+0:08 US sees EU's "email=new-eu@email.com" event
US DB currently has "email=new-us@email.com"
CONFLICT: Which one wins?
Last-write-wins? Both wrote at T+0:01 and T+0:03 - US was first by 2 seconds
US timestamp: T+0:01 (earlier)
EU timestamp: T+0:03 (later)
Last-write-wins = EU event wins
US DB updated to new-eu@email.com
T+0:08 EU sees US's "email=new-us@email.com" event
EU DB currently has "email=new-eu@email.com"
Last-write-wins = US event (T+0:01) loses to EU event (T+0:03)
EU DB keeps new-eu@email.com
Final state:
US DB: new-eu@email.com (LWW resolution)
EU DB: new-eu@email.com (LWW resolution)
Outcome: user's US change (new-us) is LOST
User: "I updated my email in the US app, but it reverted to something I typed in EU"
In 8 minutes of partition: 1,247 conflicting writes
Manual reconciliation: 6 person-days
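The two writes in this scenario are concurrent in the causal sense: neither region had seen the other's update. Wall-clock last-write-wins hides that by always picking a winner. A minimal sketch of a vector clock comparison makes it visible; the class `VectorClockDemo`, the region keys "us" and "eu", and the counter values are all hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class VectorClockDemo {

    enum Order { BEFORE, AFTER, EQUAL, CONCURRENT }

    /** Compares two vector clocks; a missing entry counts as 0. */
    static Order compare(Map<String, Integer> a, Map<String, Integer> b) {
        boolean aBehind = false;
        boolean bBehind = false;
        Set<String> nodes = new HashSet<>(a.keySet());
        nodes.addAll(b.keySet());
        for (String node : nodes) {
            int x = a.getOrDefault(node, 0);
            int y = b.getOrDefault(node, 0);
            if (x < y) aBehind = true;
            if (x > y) bBehind = true;
        }
        if (aBehind && bBehind) return Order.CONCURRENT;
        if (aBehind) return Order.BEFORE;
        if (bBehind) return Order.AFTER;
        return Order.EQUAL;
    }

    public static void main(String[] args) {
        // Both regions start from the same profile version
        Map<String, Integer> base = Map.of("us", 3, "eu", 2);
        // During the partition each region increments only its own counter
        Map<String, Integer> usWrite = Map.of("us", 4, "eu", 2);
        Map<String, Integer> euWrite = Map.of("us", 3, "eu", 3);

        System.out.println("base vs usWrite:    " + compare(base, usWrite));
        System.out.println("usWrite vs euWrite: " + compare(usWrite, euWrite));
    }
}
```

A CONCURRENT result is the signal to merge or ask the user, instead of silently discarding one side.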
Solution: Conflict Resolution Strategy Selection
// Active-active is fundamentally hard. Choose your strategy based on data type:
// Strategy 1: Causal ordering with vector clocks (instead of wall-clock Last Write Wins)
// Use for: User preferences, profile data that users understand can conflict
@Service
public class ProfileConflictResolver {
public UserProfile resolve(UserProfile local, UserProfile remote) {
// Use vector clocks, not wall-clock time
// Vector clock comparison is causally correct; wall clock is not
VectorClock localClock = local.getVectorClock();
VectorClock remoteClock = remote.getVectorClock();
if (localClock.happensBefore(remoteClock)) {
return remote; // Remote is causally after local - remote wins
} else if (remoteClock.happensBefore(localClock)) {
return local; // Local is causally after remote - local wins
} else {
// Concurrent writes - this is the true conflict
// Strategy: merge non-conflicting fields, flag conflicting ones for user resolution
return mergeWithUserNotification(local, remote);
}
}
}
// Strategy 2: Region-Owned Data (recommended for most cases)
// Each user is "owned" by one region - only that region accepts writes for their data
// Other regions redirect writes to the owning region
// No conflicts possible - at the cost of latency for cross-region writes
@Service
public class RegionOwnershipRouter {
public void updateProfile(String userId, ProfileUpdate update) {
String owningRegion = userOwnershipRegistry.getOwningRegion(userId);
if (owningRegion.equals(currentRegion)) {
profileService.applyUpdate(userId, update);
} else {
// Proxy to owning region (accept slightly higher latency)
regionRouter.forward(owningRegion, userId, update);
}
}
}
// Strategy 3: CRDT (Conflict-free Replicated Data Types)
// For data that naturally merges: shopping carts, feature flags, counters
// Mathematical properties guarantee convergence regardless of update order
@Service
public class CartCRDTConsumer {
// G-Set (grow-only set) for cart items - no conflicts possible
// Adding the same item twice = idempotent (set semantics)
// Removing items requires 2P-Set (two-phase set) approach
}
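For the CRDT strategy, a two-phase set (2P-Set) is one of the simplest structures that supports removal. The sketch below is a minimal in-memory version with hypothetical names and no Kafka integration: each replica keeps an add set and a tombstone set, and merging is a union of both, so replicas converge regardless of merge order. The known limitation is that a removed item cannot be added again.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class TwoPhaseSet<T> {

    private final Set<T> added = new HashSet<>();
    private final Set<T> removed = new HashSet<>(); // tombstones, never deleted

    void add(T item) {
        added.add(item);
    }

    void remove(T item) {
        if (added.contains(item)) {
            removed.add(item);
        }
    }

    /** Visible elements: everything added that has not been tombstoned. */
    Set<T> elements() {
        Set<T> result = new HashSet<>(added);
        result.removeAll(removed);
        return result;
    }

    /** Merge is a union of both sets, so it is commutative, associative and idempotent. */
    TwoPhaseSet<T> merge(TwoPhaseSet<T> other) {
        TwoPhaseSet<T> merged = new TwoPhaseSet<>();
        merged.added.addAll(this.added);
        merged.added.addAll(other.added);
        merged.removed.addAll(this.removed);
        merged.removed.addAll(other.removed);
        return merged;
    }

    public static void main(String[] args) {
        TwoPhaseSet<String> usCart = new TwoPhaseSet<>();
        usCart.add("book");
        usCart.add("pen");

        TwoPhaseSet<String> euCart = new TwoPhaseSet<>();
        euCart.add("book");
        euCart.remove("book");
        euCart.add("lamp");

        Set<String> usView = usCart.merge(euCart).elements();
        Set<String> euView = euCart.merge(usCart).elements();

        System.out.println("merged cart: " + new TreeSet<>(usView));
        System.out.println("replicas converge: " + usView.equals(euView));
    }
}
```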