← Back to Articles
6/6/2026Admin Post

message queues supplement2 production challenges

Message Queues - Supplement 2: Production Challenges & Real-World Solutions

Series Navigation:
Main Index |
Supplement 1 - Anti-Patterns Extended |
Supplement 3 - Trade-Offs & Decision Guide |
Supplement 4 - Real-World Architecture

Each challenge in this supplement represents a class of real production incidents encountered
across companies at scale. Every solution includes: root cause, detection method,
immediate mitigation, permanent fix, and post-mortem lessons.


Table of Contents

  1. The Consumer Group Rebalancing Storm
  2. The Hot Partition Bottleneck
  3. The Duplicate Payment Incident
  4. Consumer Lag That Never Recovers
  5. The Schema Registry Cascade Failure
  6. Cross-Datacenter Message Ordering Violations
  7. The Broker Disk Full Outage
  8. Silent Data Loss from Broker Leader Election
  9. The Offset Commit Race Condition
  10. Thundering Herd After Planned Maintenance
  11. The Ghost Consumer - Zombie Consumer Group
  12. Certificate Expiry Killing All Producers
  13. Message Ordering Violation in Financial State Machine
  14. Backpressure Cascade into Full System Outage
  15. The Multi-Region Active-Active Split-Brain

1. The Consumer Group Rebalancing Storm

Incident Description

A payments processing service started experiencing 60-second processing gaps every 3-5 minutes during peak hours. Transactions were being delayed, causing SLA violations. The on-call team initially suspected a database issue.

Root Cause Analysis

Timeline:
  T+0:00  Consumer group "payment-processor" stabilizes after deployment
  T+0:03  Traffic peaks: each consumer takes 250ms per message
  T+0:05  max.poll.interval.ms=300000ms (5 min) - fine
  T+0:10  BUT: each poll fetches max.poll.records=500 messages
           500 messages x 250ms = 125 seconds to process one poll batch
           max.poll.interval.ms = 300,000ms (5 minutes)
           ...seems fine?
  T+0:15  Payment service makes external API call: card authorization
           External API slow today: 2 seconds per call (normally 200ms)
           500 messages x 2000ms = 1000 seconds to process one batch
           EXCEEDS max.poll.interval.ms = 300 seconds
  T+0:20  Kafka broker marks consumer as DEAD (poll interval exceeded)
  T+0:20  REBALANCE TRIGGERED: all partitions reassigned
           During rebalance: NO processing for 45-60 seconds
  T+0:25  Rebalance completes, consumer restarts from last committed offset
  T+0:25  Some messages re-delivered (the ones being processed when consumer died)
  T+0:28  Consumer polls 500 messages again, including re-delivered ones
  T+0:33  External API still slow: same cycle repeats
  T+0:35  REBALANCE AGAIN
  This pattern repeats every 3-5 minutes indefinitely

Detection Signals

Metrics that would have caught this earlier:
  1. kafka_consumer_fetch_rate{group="payment-processor"} drops to 0 every 3-5 minutes
  2. kafka_consumer_rebalance_rate{group="payment-processor"} > 0 during business hours
  3. kafka_consumer_group_coordinator_request_total{type="JoinGroup"} increasing
  4. Application logs: "Rebalance happened: partitions assigned/revoked" repeating
  5. External payment API latency P99 > 500ms

Immediate Mitigation

// Emergency config change (zero-downtime via Spring Cloud Config)
management.kafka.consumer.max-poll-records=10  // Reduced from 500 to 10
// This reduces per-poll processing time from 1000s to 20s
// Rebalancing stops immediately after config propagates

Permanent Fix

// Fix 1: Decouple poll frequency from processing time
// Use separate thread for processing - consumer thread only polls
 
@Configuration
public class ResilientConsumerConfig {
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> factory(
            ConsumerFactory<String, PaymentEvent> consumerFactory) {
 
        ConcurrentKafkaListenerContainerFactory<String, PaymentEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
 
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
 
        // CRITICAL: Set max.poll.records small
        // Process fewer messages per poll to stay within max.poll.interval.ms
        // Rule: max.poll.records * worst_case_processing_time_ms < max.poll.interval.ms
        factory.getContainerProperties().setConsumerProperties(
            propertiesBuilder()
                .put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50)           // Reduced
                .put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000)   // 5 minutes
                .put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000)      // 45 seconds
                .put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000)   // 15 seconds
                .build()
        );
 
        return factory;
    }
}
 
// Fix 2: Async processing with bounded queue
@KafkaListener(topics = "payment-commands", containerFactory = "resilientFactory")
public void handlePayment(PaymentEvent event, Acknowledgment ack) {
    // Submit to bounded executor - never block the poll thread
    CompletableFuture.runAsync(() -> {
        try {
            paymentProcessor.processWithTimeout(event, Duration.ofSeconds(10));
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Payment processing failed: {}", event.getPaymentId(), e);
            // Do NOT ack - message redelivered after visibility timeout
        }
    }, paymentExecutor);  // paymentExecutor has bounded queue and rejection policy
}

Post-Mortem Lessons

1. max.poll.records must be sized for worst-case, not average-case latency
2. External API timeouts must be factored into poll interval sizing
3. Alert on rebalance frequency (> 1 per hour during steady state = investigate)
4. Every deployment should calculate: max_poll_records × worst_case_ms < max_poll_interval_ms
5. Add circuit breaker for external payment API to fast-fail instead of timing out

2. The Hot Partition Bottleneck

Incident Description

An e-commerce platform's order processing throughput mysteriously caps at 2,000 orders/minute despite having 12 Kafka partitions and 12 consumer instances. Scaling to 24 consumers has zero effect.

Root Cause Analysis

kafka-consumer-groups.sh --describe --group order-processor

GROUP            TOPIC         PARTITION  CURRENT-OFFSET  OFFSET   LAG
order-processor  order-events  0          5,234,891       5,234,891  0
order-processor  order-events  1          5,234,892       5,234,892  0
order-processor  order-events  2          98,234,891      124,891,234  26,656,343  ← HOT!
order-processor  order-events  3          5,234,893       5,234,893  0
...
order-processor  order-events  11         5,234,890       5,234,890  0

Partition 2 has 26 MILLION message lag while all others have zero.
One consumer is processing Partition 2 alone - cannot parallelize within a partition.

Why Partition 2 is Hot

// Producer code (the culprit)
public void publishOrder(Order order) {
    String partitionKey = order.getMerchantId();  // Used as partition key
    kafkaTemplate.send("order-events", partitionKey, event);
}
 
// Problem: Top 3 merchants by order volume:
// Merchant ID "AMAZON-MARKETPLACE": 40% of all orders
// Merchant ID "WALMART-MARKETPLACE": 25% of all orders
// Merchant ID "EBAY-MARKETPLACE": 15% of all orders
 
// hash("AMAZON-MARKETPLACE") % 12 = 2  (maps to Partition 2!)
// 40% of all orders → single partition → single consumer → bottleneck
 
// Adding more consumers does nothing:
// Extra consumers for partition 2 = impossible (one consumer per partition in Kafka)
// Other 11 partitions are idle

Detection Signals

# Detect hot partitions
kafka-consumer-groups.sh --describe --group order-processor \
  | awk '{print $4, $5}' | sort -k2 -rn | head -5
# Shows partitions sorted by lag - hot partition will stand out
 
# Per-partition throughput via JMX/Prometheus
kafka_partition_records_count{topic="order-events"}
# One partition showing 10x the count of others = hot partition

Solution: Multi-Level Partition Keys

// Fix 1: Compound partition key (merchant + random suffix)
// Groups merchant orders together for ordering, but distributes across multiple partitions
 
public void publishOrder(Order order) {
    // Instead of just merchantId, use merchantId + bucket number
    // Each merchant gets up to 4 "virtual lanes"
    int bucket = order.getOrderId().hashCode() & 3;  // 0, 1, 2, or 3
    String partitionKey = order.getMerchantId() + "-" + bucket;
 
    kafkaTemplate.send("order-events", partitionKey, event);
}
 
// "AMAZON-MARKETPLACE" now maps to 4 different keys:
// "AMAZON-MARKETPLACE-0" → any partition
// "AMAZON-MARKETPLACE-1" → any partition
// "AMAZON-MARKETPLACE-2" → any partition
// "AMAZON-MARKETPLACE-3" → any partition
// 40% of orders now spread across 4 partitions instead of 1
 
// Trade-off: within-merchant ordering is NOT guaranteed across buckets
// If order-level ordering matters within a merchant: use orderId as partition key instead
// Fix 2: Custom Partitioner for known hot keys
@Component
public class LoadBalancedPartitioner implements Partitioner {
 
    private static final Map<String, Integer> PARTITION_COUNT_FOR_HOT_MERCHANTS = Map.of(
        "AMAZON-MARKETPLACE", 4,    // Give Amazon 4 partition slots
        "WALMART-MARKETPLACE", 2,   // Give Walmart 2 partition slots
        "EBAY-MARKETPLACE", 2       // Give eBay 2 partition slots
    );
 
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String merchantId = extractMerchantId(key.toString());
        int totalPartitions = cluster.partitionCountForTopic(topic);
 
        if (PARTITION_COUNT_FOR_HOT_MERCHANTS.containsKey(merchantId)) {
            int slots = PARTITION_COUNT_FOR_HOT_MERCHANTS.get(merchantId);
            // Distribute hot merchant across multiple partitions
            int basePartition = Math.abs(merchantId.hashCode()) % totalPartitions;
            int slot = Math.abs(key.toString().hashCode()) % slots;
            return (basePartition + slot) % totalPartitions;
        }
 
        // Default: standard key-based hashing for all other merchants
        return Math.abs(key.toString().hashCode()) % totalPartitions;
    }
}

Repartitioning Without Downtime

# If topic must be repartitioned (more partitions for better distribution):
# Step 1: Create new topic with more partitions
kafka-topics.sh --create --topic order-events-v2 --partitions 48
 
# Step 2: Run dual-write (producer publishes to both topics) for 24 hours
# Step 3: Start new consumer groups on order-events-v2
# Step 4: Validate new consumers are caught up and healthy
# Step 5: Stop old consumer groups (they're caught up to 0 lag)
# Step 6: Remove dual-write from producer
# Step 7: Delete old topic after retention period

3. The Duplicate Payment Incident

Incident Description

A fintech company's payment processing system charged 12,000 customers twice during a Kafka broker rolling restart. Each affected customer was charged the correct amount twice. Total duplicate charges: $847,000.

Root Cause: The Double Delivery Chain

Timeline:
  T+0:00  Kafka broker-1 (partition 3 leader) begins rolling restart
  T+0:02  Broker-1 goes offline, partition 3 leader election begins
  T+0:03  Consumer "payment-processor-instance-7" has partition 3
           It polled 200 messages at T+0:01 (before broker restart)
           It is mid-processing: 150 messages processed, 50 in progress
  T+0:04  Partition 3 leader election completes: broker-2 becomes leader
  T+0:04  Consumer-7 tries to commit offset for the 150 processed messages
           Offset commit fails: leader has changed, consumer loses partition assignment
           REBALANCE triggered
  T+0:06  Consumer-8 takes over partition 3
           Consumer-8 starts from last COMMITTED offset (150 messages ago)
           Consumer-8 re-processes the 150 already-processed messages
  T+0:06  + Consumer-7 was mid-processing 50 messages AND now Consumer-8 also processes them
           For a brief window: concurrent processing of same messages by consumer-7 and 8
  T+0:06  chargeCustomerCard() called TWICE for 200 payments

Root cause stack:
  1. Offset commit failed during leader election
  2. Non-idempotent consumer (no deduplication)
  3. External payment processor (Stripe) does not deduplicate by Kafka message ID
  4. No distributed lock preventing concurrent charge for same payment ID

Detection: After the Fact

Detected by: Fraud alert at T+2:30 (2.5 hours later)
Trigger: Anomaly detection flagged unusually high duplicate transactions
Alert: "Duplicate charge rate 3.2% (baseline: 0.01%)"
By then: 12,000 customers charged twice

Detection could have been immediate with:
  - Real-time duplicate transaction monitoring at Stripe API level
  - kafka_consumer_rebalance_total alert during broker restart
  - Idempotency key collision counter in payment processor

Solution: Idempotent Payment Consumer with Distributed Lock

@Service
@Slf4j
public class IdempotentPaymentConsumer {
 
    private final PaymentIdempotencyRepository idempotencyRepo;
    private final StripePaymentGateway stripe;
    private final RedissonClient redisson;
 
    @KafkaListener(topics = "payment-commands", groupId = "payment-processor")
    @Transactional
    public void processPayment(
            PaymentCommandEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {
 
        String idempotencyKey = event.getPaymentId();
        String lockKey = "payment-lock:" + idempotencyKey;
 
        // Distributed lock: only ONE instance can process this paymentId at a time
        // Prevents concurrent consumer-7 + consumer-8 race condition
        RLock lock = redisson.getLock(lockKey);
 
        try {
            boolean locked = lock.tryLock(5, 30, TimeUnit.SECONDS);
            if (!locked) {
                log.warn("Could not acquire lock for paymentId={}, will retry", idempotencyKey);
                // Do NOT ack - Kafka will redeliver
                return;
            }
 
            // Check if already processed (handles redelivery after offset commit failure)
            Optional<ProcessedPayment> existing = idempotencyRepo.findById(idempotencyKey);
            if (existing.isPresent()) {
                log.info("Duplicate payment command ignored: paymentId={}, " +
                         "originally processed at partition={}, offset={}",
                         idempotencyKey,
                         existing.get().getOriginalPartition(),
                         existing.get().getOriginalOffset());
                ack.acknowledge(); // Ack so we don't keep reprocessing
                return;
            }
 
            // Process (with idempotency key passed to Stripe)
            ChargeResult result = stripe.charge(
                StripeChargeRequest.builder()
                    .amount(event.getAmount())
                    .customerId(event.getCustomerId())
                    .idempotencyKey(idempotencyKey)  // Stripe-level idempotency
                    .build()
            );
 
            // Record as processed BEFORE acking
            idempotencyRepo.save(ProcessedPayment.builder()
                .paymentId(idempotencyKey)
                .stripeChargeId(result.getChargeId())
                .originalPartition(partition)
                .originalOffset(offset)
                .processedAt(Instant.now())
                .build());
 
            ack.acknowledge();
            log.info("Payment processed: paymentId={}, chargeId={}", idempotencyKey, result.getChargeId());
 
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for lock: paymentId={}", idempotencyKey);
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

Three Layers of Defense

Layer 1: Kafka Idempotent Producer
  - enable.idempotence=true on producer
  - Prevents broker from accepting duplicate messages from the same producer sequence

Layer 2: Consumer-Level Deduplication
  - idempotencyRepo check at consumer entry point
  - Distributed lock prevents concurrent processing

Layer 3: Payment Gateway Idempotency
  - Stripe/Braintree idempotency key passed with every charge request
  - Even if Layer 1 and 2 fail, Stripe will not double-charge

At least 2 layers must fail simultaneously for a duplicate charge to occur

4. Consumer Lag That Never Recovers

Incident Description

An analytics pipeline's Kafka consumer group accumulated 8 billion message lag over 3 weeks, growing steadily. Adding more consumers had diminishing returns. The team could not determine why.

Root Cause: Coordinated Slow Processing

Consumer group: analytics-pipeline
Topic: clickstream-events (24 partitions)
Consumer count: 24 (one per partition - maximally scaled)

Observation: Consumer lag growing 1 million messages/hour despite 24 consumers running
Throughput per consumer: ~4,000 messages/minute (normal is 20,000/minute)

Consumer processing time breakdown (profiled):
  - Deserialize JSON: 2ms
  - Enrich with user metadata (Redis call): 180ms  ← BOTTLENECK
  - Aggregate into time window: 5ms
  - Write to ClickHouse: 15ms
  Total: ~202ms per message = ~295 messages/minute per consumer

But Redis call was 5ms in dev/staging. Why 180ms in production?

Root cause: Redis connection pool exhausted
  - 24 consumer instances x 1 thread x 1 Redis call per message
  - Redis connection pool max size: 20 connections
  - 24 consumers competing for 20 connections: 4 consumers always waiting ~160ms
  - Average wait time = (24/20 - 1) × 5ms / (1/24) = ~160ms added to every call
  - This math compounds: more consumers = more pool contention = slower per consumer
  - Adding consumers actually MADE IT WORSE after 20 consumers

Detection and Diagnosis

# Step 1: Calculate actual consumer throughput
kafka-consumer-groups.sh --describe --group analytics-pipeline | \
  awk 'NR>1 {print $1, $2, $6}' | \
  awk '{lag[$2]+=$6} END {for(t in lag) print t, lag[t]}'
# Shows total lag per topic
 
# Step 2: Consumer processing time (application metrics)
# histogram_quantile(0.99, kafka_consumer_fetch_latency_seconds)
# This shows HOW LONG consumers spend processing, not just polling
 
# Step 3: Throughput trend
# rate(kafka_consumer_records_consumed_total[5m])
# If this is flat or declining as you add consumers: contention somewhere
 
# Step 4: Identify the bottleneck - thread dump of consumer
jstack <consumer-pid> | grep -A 20 "WAITING\|TIMED_WAITING"
# Shows threads blocked on Redis connection pool

Solution: Batching + Async Enrichment + Connection Pool Right-Sizing

// Fix 1: Batch Redis calls (one pipeline call for 100 messages instead of 100 individual calls)
@KafkaListener(
    topics = "clickstream-events",
    containerFactory = "batchConsumerFactory"  // Batch listener mode
)
public void handleBatch(List<ConsumerRecord<String, ClickEvent>> records, Acknowledgment ack) {
    // Collect all user IDs from the batch
    Set<String> userIds = records.stream()
        .map(r -> r.value().getUserId())
        .collect(toSet());
 
    // One batched Redis call for all users in the batch
    // Replaces N individual calls with 1 pipeline call
    Map<String, UserMetadata> userMetadata = redisClient.pipeline(pipe -> {
        userIds.forEach(id -> pipe.get("user:" + id));
    });
 
    // Process records with pre-fetched metadata
    List<EnrichedClickEvent> enriched = records.stream()
        .map(r -> enrich(r.value(), userMetadata.get(r.value().getUserId())))
        .collect(toList());
 
    clickhouseWriter.writeBatch(enriched);
    ack.acknowledge();
}
 
// Fix 2: Right-size connection pool for the actual concurrency
@Bean
public RedisConnectionFactory redisConnectionFactory() {
    LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
        .clientResources(DefaultClientResources.builder()
            .ioThreadPoolSize(8)
            .computationThreadPoolSize(8)
            .build())
        .build();
 
    // Pool size = number of consumer threads × 1.5 (to avoid starvation)
    LettucePoolingClientConfiguration poolingConfig = LettucePoolingClientConfiguration.builder()
        .poolConfig(new GenericObjectPoolConfig<>()
            .maxTotal(40)     // 24 consumer threads × 1.5 = 36, rounded up
            .maxIdle(24)
            .minIdle(10)
            .maxWaitMillis(2000))
        .build();
 
    return new LettuceConnectionFactory(redisStandaloneConfig, poolingConfig);
}
 
// Result: 180ms Redis call → 5ms average (batch eliminated per-message call overhead)
// Throughput: 295 msg/min → 18,000 msg/min per consumer (60x improvement)
// Lag recovery: 8 billion messages recovered in 18 hours instead of never

5. The Schema Registry Cascade Failure

Incident Description

A Schema Registry outage of 12 minutes caused every producer in the system to stop publishing messages. 47 services stopped processing orders. The Schema Registry had a 1% SLA impact but caused 100% of ordering system unavailability.

Why One Service Knocked Out All Producers

Schema Registry interaction on EVERY produce call:
  Producer.send(record):
    1. Serialize record using Avro serializer
    2. Avro serializer checks: "Is schema ID 1234 cached locally?"
       - If YES (cached): skip registry call, use cached schema
       - If NO (not cached): call Schema Registry GET /schemas/ids/1234
    3. If Schema Registry is down: serialization THROWS exception
    4. Exception propagates: producer.send() fails
    5. Application code receives exception
    6. Application either crashes or enters error loop

Default cache behavior:
  - Cache size: up to 1000 schemas (Confluent default)
  - Cache TTL: NO EXPIRATION (schemas cached until JVM restart)

Actual incident:
  - Schema Registry pod restarted due to OOM kill
  - Took 12 minutes to restart (startup probe + GC warm-up)
  - All producers had started within the past 6 hours (fresh deployment)
  - Cache was EMPTY (first call after deployment always hits registry)
  - Every first message in those 12 minutes: Schema Registry call → failure
  - Producers fell into retry loops
  - Retry exhausted → circuit breaker opened → producers stopped
  - 47 services stopped publishing in < 30 seconds

Detection and Recovery

Alert fired at: T+2 minutes (Prometheus alert on schema_registry_health == 0)
Incident declared at: T+4 minutes
Root cause identified at: T+9 minutes
Schema Registry restarted: T+12 minutes
Producers recovered: T+15 minutes (schema cache re-populated)

Post-mortem finding: Schema Registry was a single point of failure

Solution: Schema Registry High Availability and Graceful Degradation

// Fix 1: Schema Registry cluster (multi-instance, multi-AZ)
# docker-compose / Kubernetes - NOT a single instance
 
schema-registry-1:
  image: confluentinc/cp-schema-registry:7.4.0
  environment:
    SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
    SCHEMA_REGISTRY_HOST_NAME: schema-registry-1
  deploy:
    replicas: 3                    # 3 instances
    placement:
      constraints:
        - node.labels.az != same   # Each in different AZ
 
# Load balancer in front of schema-registry instances
# A single registry instance failing does not cause service interruption
 
// Fix 2: Producer-side graceful degradation
@Configuration
public class ResilientAvroProducerConfig {
 
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
 
        // Schema Registry with timeout and fallback
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://schema-registry-1:8081,http://schema-registry-2:8081,http://schema-registry-3:8081");
 
        // Aggressive local caching - reduces Registry calls to near-zero at steady state
        config.put(AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG, 100);
 
        // Use latest cached schema if Registry temporarily unavailable
        config.put("schema.registry.request.timeout.ms", "3000");
 
        // Pre-warm cache at startup (not lazily)
        config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
 
        return new DefaultKafkaProducerFactory<>(config);
    }
}
 
// Fix 3: Startup schema pre-loading
@Component
public class SchemaRegistryCacheWarmer implements ApplicationRunner {
 
    private final SchemaRegistryClient schemaRegistryClient;
    private final List<String> criticalSchemaSubjects;
 
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Pre-loading schemas from registry...");
        for (String subject : criticalSchemaSubjects) {
            try {
                // Force-populate local cache
                schemaRegistryClient.getLatestSchemaMetadata(subject);
                log.info("Loaded schema: {}", subject);
            } catch (Exception e) {
                // Log but do not fail startup - use locally bundled fallback schema
                log.warn("Could not load schema {} from registry, using bundled fallback", subject);
                schemaRegistryClient.register(subject, bundledSchema(subject));
            }
        }
    }
}

6. Cross-Datacenter Message Ordering Violations

Incident Description

A financial services company ran Kafka MirrorMaker 2 to replicate topics from US-East to EU-West for regulatory data residency. Users in the EU region started receiving incorrect account balances. Debits appeared before credits in the replicated stream.

Root Cause: Replication Lag Differential

US-East Kafka (primary):
  Partition 3:
    Offset 1000: CreditEvent(accountId=ACC-123, amount=+500)   at T+0:00:00
    Offset 1001: DebitEvent(accountId=ACC-123,  amount=-200)   at T+0:00:01

MirrorMaker 2 replication to EU-West:
  Replication throughput: 80MB/s (under load)

  Credit event (offset 1000): 800 bytes, replicated at T+0:00:05
  Debit event (offset 1001): 2100 bytes (larger payload), replicated at T+0:00:12

  EU-West Consumer (processes as messages arrive):
    T+0:00:05: Processes Credit(+500) → balance = 500
    T+0:00:06: DebitEvent NOT YET REPLICATED
    T+0:00:06: New balance shown to EU customer: 500 ✓
    ...
    T+0:00:12: Processes Debit(-200) → balance = 300
    T+0:00:12: Balance updated: 300 ✓

  Seems fine? Not always:

  What if Consumer polls between T+0:00:05 and T+0:00:12 AND
  what if the Debit was actually published FIRST (replication reorder)?

  MirrorMaker 2 uses multiple worker threads for throughput
  Thread 1 replicates small messages faster
  Thread 2 replicates large messages slower
  If Credit(small) is replicated by Thread 1 and Debit(large) by Thread 2:
    Thread 2 lags behind Thread 1
    EU-West sees: Debit before Credit (reordering!)

  For account balance = 0 (starting):
    Wrong order: Debit(-200) first → balance = -200 (rejected: insufficient funds!)
    Then: Credit(+500) → balance = +300
    Customer's initial transaction was REJECTED despite having incoming funds

Solution: Timestamp-Ordered Consumption + Source Offset Preservation

// Fix 1: Ensure MirrorMaker 2 preserves source ordering
# MirrorMaker 2 configuration
clusters = source, target
source.bootstrap.servers = us-east-kafka:9092
target.bootstrap.servers = eu-west-kafka:9092
 
# CRITICAL: Use same number of replication tasks as source partitions
# One task per partition ensures in-order replication within partitions
source->target.tasks.max = 12  # Equal to partition count
 
# Fix 2: EU-West consumers use source-timestamp ordering, not arrival order
@Service
public class FinancialEventConsumer {
 
    private final FinancialEventBuffer eventBuffer;
 
    @KafkaListener(topics = "account-events.mirror", groupId = "eu-balance-processor")
    public void handle(AccountEvent event,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long kafkaTimestamp) {
 
        // Use source timestamp (from US-East), NOT EU Kafka arrival timestamp
        long sourceTimestamp = event.getSourceTimestamp(); // Embedded in event payload
 
        // Buffer events with out-of-order tolerance window (500ms)
        eventBuffer.add(event, sourceTimestamp);
    }
}
 
@Component
public class FinancialEventBuffer {
 
    private final PriorityQueue<BufferedEvent> buffer =
        new PriorityQueue<>(Comparator.comparingLong(BufferedEvent::getSourceTimestamp));
 
    private final Duration OUT_OF_ORDER_TOLERANCE = Duration.ofMillis(500);
    private volatile Instant lastProcessed = Instant.now();
 
    public void add(AccountEvent event, long sourceTimestamp) {
        buffer.offer(new BufferedEvent(event, sourceTimestamp));
        drainSafeEvents();
    }
 
    private void drainSafeEvents() {
        long nowMs = Instant.now().toEpochMilli();
        while (!buffer.isEmpty()) {
            BufferedEvent head = buffer.peek();
            // Only process events that are "old enough" to be considered ordered
            // Any events timestamped within the last 500ms might still be followed by
            // earlier-timestamped events still in transit from US-East
            if (nowMs - head.getSourceTimestamp() > OUT_OF_ORDER_TOLERANCE.toMillis()) {
                buffer.poll();
                processEvent(head.getEvent());
            } else {
                break; // Head of queue is too recent - wait for more events
            }
        }
    }
}

7. The Broker Disk Full Outage

Incident Description

A production Kafka cluster's disk filled to 100% at 3am on a Sunday. Brokers stopped accepting new messages, throwing RecordTooLargeException and eventually TimeoutException. All downstream services that published to Kafka failed. The on-call engineer discovered this when SLA alerts fired 45 minutes into the outage.

Root Cause Chain

Root cause 1: A new analytics topic created without retention limit
  - Team creating topic: "We'll set retention later" (they never did)
  - Default retention: none specified = server default
  - Server default (incorrectly configured): retention.bytes=-1, retention.ms=-1
  - This means: KEEP FOREVER, NO SIZE LIMIT

Root cause 2: Topic producing at 100MB/minute
  - 100MB/min × 60 min × 24 hours × 7 days = 1TB per week
  - Kafka cluster disk: 2TB (3 brokers × 2 replicas = 6TB total, ~2TB usable per topic)
  - With 2 weeks of uncontrolled retention: disk full

Root cause 3: No disk utilization alert configured
  - Memory alert: ✓ 80% threshold
  - CPU alert: ✓ 70% threshold
  - Disk alert: MISSING

Immediate Mitigation Procedure

# EMERGENCY: Kafka brokers rejecting all new messages
 
# Step 1: Identify which topics are consuming most disk
kafka-log-dirs.sh --bootstrap-server broker:9092 --describe \
  | grep -v '^{' | sort -k3 -rn | head -20
# Shows topics sorted by disk usage
 
# Step 2: Emergency retention reduction on the runaway topic
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name analytics-raw-events \
  --add-config retention.ms=3600000  # Drop to 1 hour IMMEDIATELY
# Kafka's log cleaner will start deleting old segments within minutes
 
# Step 3: Monitor disk recovery
watch -n 5 'df -h /var/kafka-logs'
# Should see disk usage declining as log cleaner works
 
# Step 4: Once disk below 80%, restore reasonable retention
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name analytics-raw-events \
  --add-config retention.ms=86400000  # 24 hours (appropriate for analytics)
  --add-config retention.bytes=52428800000  # 50GB cap regardless of time
 
# Step 5: Verify producers can send again (test message)
kafka-console-producer.sh --broker-list broker:9092 \
  --topic analytics-raw-events <<< "test"

Prevention: Infrastructure as Code + Policy Enforcement

// Enforce retention policy at topic creation time
@Component
public class TopicCreationEnforcer {
 
    private static final Map<String, RetentionPolicy> POLICIES = Map.of(
        "financial",   RetentionPolicy.of(Duration.ofDays(90), 100_000_000_000L),  // 90 days, 100GB
        "order",       RetentionPolicy.of(Duration.ofDays(30),  50_000_000_000L),  // 30 days, 50GB
        "analytics",   RetentionPolicy.of(Duration.ofDays(7),   20_000_000_000L),  // 7 days, 20GB
        "audit",       RetentionPolicy.of(Duration.ofDays(730), Long.MAX_VALUE),    // 2 years, unlimited
        "default",     RetentionPolicy.of(Duration.ofDays(3),   10_000_000_000L)   // 3 days, 10GB
    );
 
    @EventListener
    public void onTopicCreation(TopicCreationRequestedEvent event) {
        String topicCategory = inferCategory(event.getTopicName());
        RetentionPolicy policy = POLICIES.getOrDefault(topicCategory, POLICIES.get("default"));
 
        // BLOCK topic creation without retention policy
        if (!event.hasRetentionPolicy()) {
            throw new TopicPolicyViolationException(
                "Topic " + event.getTopicName() + " must specify retention. " +
                "Recommended policy for '" + topicCategory + "': " + policy
            );
        }
 
        // Warn if retention exceeds policy
        if (event.getRetentionMs() > policy.getMaxRetentionMs()) {
            alertingService.sendWarning(
                "Topic " + event.getTopicName() + " requests " +
                event.getRetentionDays() + " day retention. " +
                "Policy max for '" + topicCategory + "' is " + policy.getMaxDays() + " days."
            );
        }
    }
}
# Disk utilization alert - add to monitoring
groups:
  - name: kafka-disk-alerts
    rules:
      - alert: KafkaBrokerDiskUsageHigh
        expr: (node_filesystem_size_bytes{mountpoint="/var/kafka-logs"}
          - node_filesystem_avail_bytes{mountpoint="/var/kafka-logs"})
          / node_filesystem_size_bytes{mountpoint="/var/kafka-logs"} > 0.70
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka broker disk {{ $labels.instance }} at {{ $value | humanizePercentage }}"
 
      - alert: KafkaBrokerDiskUsageCritical
        expr: ... > 0.85
        labels:
          severity: critical
        annotations:
          summary: "CRITICAL: Kafka broker disk nearly full - broker will stop accepting messages at 100%"

8. Silent Data Loss from Broker Leader Election

Incident Description

During a planned Kafka broker rolling upgrade, an engineering team lost 3 minutes of order events. The events were published by producers, received acks=1 success responses, but were never delivered to consumers. The lost messages were never recoverable.

Root Cause: acks=1 + Unclean Leader Election

Configuration used (a common but dangerous default):
  acks = 1           (leader-only acknowledgment)
  min.insync.replicas = 1   (default, no enforcement)
  unclean.leader.election.enable = true (dangerous default in older Kafka versions)

Timeline of the incident:
  T+0:00  Broker-1 is partition leader (5 replicas: Broker-1, 2, 3)
  T+0:00  Replication status: Broker-1 = leader, Broker-2 = in-sync, Broker-3 = lagging (2s behind)
  T+0:01  Rolling upgrade begins: Broker-1 taken offline
  T+0:01  Leader election needed: choose between Broker-2 (in-sync) and Broker-3 (lagging)
  T+0:01  Broker-2 is available: elected as new leader ✓

  But this is not what happened. There was a network partition:
  T+0:00  Broker-2 loses network to ZooKeeper (briefly - network hiccup)
  T+0:00  Broker-2 removed from ISR (not in-sync anymore)
  T+0:00  Broker-3 becomes the only available replica (but 2 seconds behind)
  T+0:01  Broker-1 goes offline for upgrade
  T+0:01  Leader election: only Broker-3 available (not in ISR)
  T+0:01  unclean.leader.election.enable=true → Broker-3 elected despite being out of ISR
  T+0:01  Broker-3 becomes leader but is 2 SECONDS of messages behind Broker-1

  The messages that Broker-1 received between T-2s and T+0:01:
    - Were ack'd to producers with acks=1 (producers got success)
    - Were stored on Broker-1 but NOT replicated to Broker-3
    - Are NOW GONE because Broker-1 went offline
    - Broker-3 has no knowledge of these messages

  Result: 2 seconds × (order volume during that window) = ~3,000 orders lost
  Orders are in DB (placed), but no downstream events fired (warehouse, email, payment)

Production-Safe Configuration

# NEVER use these settings for critical data:
# acks=1 (leader only - loses data on leader failure before replication)
# unclean.leader.election.enable=true (elects out-of-sync replicas, losing messages)
 
# ALWAYS use for critical data:
 
# Producer configuration
acks = all                    # Wait for ALL in-sync replicas to acknowledge
enable.idempotence = true      # Prevent duplicates from producer retries
max.in.flight.requests.per.connection = 1  # Strict ordering with idempotence
 
# Broker configuration (set globally and per-topic)
default.replication.factor = 3
min.insync.replicas = 2       # At least 2 replicas must acknowledge before success
unclean.leader.election.enable = false  # NEVER elect out-of-sync replica as leader
 
# Topic-level override for financial topics
kafka-configs.sh --alter \
  --entity-type topics \
  --entity-name order-events \
  --add-config min.insync.replicas=2 \
  --add-config unclean.leader.election.enable=false

The Trade-Off Explained

acks=1 + unclean.leader.election=true:
  Pros: Higher throughput (faster acknowledgment), more availability (any replica elected)
  Cons: Data loss possible on broker failure (exactly what happened)
  Use for: Metrics, telemetry, analytics - where losing a message is acceptable

acks=all + unclean.leader.election=false + min.insync.replicas=2:
  Pros: Strong durability guarantee - messages survive leader failures
  Cons: Slightly lower throughput (waits for 2 replicas), reduced availability
        (write fails if < 2 replicas available - but this is the RIGHT behavior)
  Use for: Financial data, orders, user actions, anything you cannot afford to lose

Think of it this way:
  If the broker crashes and you lose messages:
    - With acks=1: you return success to the producer before the message is safe
    - With acks=all: you only return success after the message cannot be lost
  The availability vs durability trade-off is real - choose based on your data's value

9. The Offset Commit Race Condition

Incident Description

A consumer application that processed payments was found to have a race condition where, under high load, offsets were committed before all in-flight messages completed processing. Some payments were processed, but their offsets committed, causing them to be lost if the consumer restarted.

Root Cause: Async Processing + Auto-Commit

// BROKEN: The actual production code that caused the incident
 
@KafkaListener(topics = "payment-events")
public void handlePayments(List<PaymentEvent> events) {
    // Process each event asynchronously for throughput
    List<CompletableFuture<Void>> futures = events.stream()
        .map(event -> CompletableFuture.runAsync(
            () -> paymentService.processPayment(event),
            processingExecutor
        ))
        .collect(toList());
 
    // WRONG: This does NOT wait for all futures to complete
    // The @KafkaListener method returns immediately
    // Spring Kafka sees the method returned without exception
    // If using auto-commit or BATCH ack mode: offsets committed NOW
    // The async tasks are still running in processingExecutor
    // If the application restarts in the next 500ms:
    //   - Offsets were committed (consumer thinks these are done)
    //   - But 30% of async tasks haven't completed yet
    //   - Those payments are LOST (offset committed, won't be redelivered)
}

The Fix: Synchronize Async Processing with Offset Commit

// CORRECT: Batch listener with explicit acknowledgment after all async work completes
 
@KafkaListener(
    topics = "payment-events",
    containerFactory = "batchManualAckFactory"
)
public void handlePayments(
        List<ConsumerRecord<String, PaymentEvent>> records,
        Acknowledgment ack) {
 
    // Launch all async tasks
    List<CompletableFuture<PaymentResult>> futures = records.stream()
        .map(record -> CompletableFuture.supplyAsync(
            () -> paymentService.processPayment(record.value()),
            processingExecutor
        ))
        .collect(toList());
 
    // Wait for ALL futures to complete before acknowledging
    CompletableFuture<Void> allDone = CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[0])
    );
 
    try {
        // CRITICAL: Block until ALL async processing is done
        // Only then: commit the offset
        allDone.get(60, TimeUnit.SECONDS);
        ack.acknowledge();  // Safe to commit now - all processing complete
 
    } catch (TimeoutException e) {
        log.error("Batch processing timed out - NOT committing offset, will retry");
        // Do NOT ack - Kafka redelivers the entire batch
        // Some might be processed twice - must be idempotent
 
    } catch (ExecutionException e) {
        log.error("Batch processing failed - checking individual results");
        // Check which futures succeeded and which failed
        handlePartialBatchFailure(records, futures, ack);
    }
}
 
private void handlePartialBatchFailure(
        List<ConsumerRecord<String, PaymentEvent>> records,
        List<CompletableFuture<PaymentResult>> futures,
        Acknowledgment ack) {
 
    // Find the first failed record
    for (int i = 0; i < futures.size(); i++) {
        if (futures.get(i).isCompletedExceptionally()) {
            // Commit offsets only up to the failed record
            // Records after the failed one will be redelivered
            if (i > 0) {
                ack.nack(i - 1, Duration.ofSeconds(1)); // Commit up to i-1
            }
            return;
        }
    }
}

10. Thundering Herd After Planned Maintenance

Incident Description

After a 20-minute scheduled maintenance window on a high-traffic notification service's Kafka brokers, all consumers came online simultaneously and began processing 20 minutes of backlogged messages. The downstream notification service (SendGrid) received 40x its normal message rate, hit rate limits, and caused cascading failures including the main application losing its SendGrid connection pool.

Root Cause and Cascading Failure Chain

Normal throughput: 5,000 notification events/minute
Maintenance duration: 20 minutes
Messages accumulated: 100,000 messages

Post-maintenance sequence:
  T+0:00  All 12 consumer instances restart simultaneously
  T+0:05  All 12 consumers polling full speed
  T+0:05  Processing rate: 12 consumers × 1000 msg/min = 12,000/min (2.4x normal)
           But they have 100,000 backlogged messages - processing at max speed
  T+0:05  SendGrid API: receives 40,000 emails/minute (normal: 5,000)
  T+0:06  SendGrid returns HTTP 429 (rate limited)
  T+0:06  Consumer retry logic kicks in: retry 3 times with 1-second backoff
  T+0:07  SendGrid HTTP connection pool on consumers exhausted (all connections in retry state)
  T+0:07  New email requests from live traffic: "Connection pool timeout"
  T+0:08  Live user notification processing STOPS (pool exhausted by retries)
  T+0:15  Consumer lag grows DESPITE maintenance window being over
           More messages arriving than being processed
  T+0:20  Full outage: notification service unavailable
           Original cause: planned maintenance
           Actual impact: 30-minute full notification outage

Solution: Controlled Consumer Ramp-Up

// Consumer with startup rate limiting
 
@Component
public class ThrottledNotificationConsumer {
 
    private static final int NORMAL_RATE_PER_SECOND = 83;  // 5000/minute ÷ 60
    private static final int MAX_SENDGRID_RATE = 150;       // Never exceed this
 
    private final AtomicBoolean rampUpComplete = new AtomicBoolean(false);
    private volatile RateLimiter currentRateLimiter;
 
    @PostConstruct
    public void initRampUp() {
        // Start at 20% of normal rate, ramp up to 120% over 5 minutes
        currentRateLimiter = RateLimiter.create(NORMAL_RATE_PER_SECOND * 0.20);
 
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(
            this::increaseRate,
            30,   // Start increasing after 30 seconds
            30,   // Increase every 30 seconds
            TimeUnit.SECONDS
        );
    }
 
    private void increaseRate() {
        double currentRate = currentRateLimiter.getRate();
        double targetRate = Math.min(NORMAL_RATE_PER_SECOND * 1.2, MAX_SENDGRID_RATE);
 
        if (currentRate < targetRate) {
            double newRate = Math.min(currentRate * 1.5, targetRate);
            currentRateLimiter = RateLimiter.create(newRate);
            log.info("Consumer ramp-up: rate increased to {}/second", newRate);
 
            if (newRate >= targetRate) {
                rampUpComplete.set(true);
                log.info("Consumer ramp-up complete: steady state rate {}/second", newRate);
            }
        }
    }
 
    @KafkaListener(topics = "notification-commands", groupId = "notification-processor")
    public void handleNotification(NotificationCommand command, Acknowledgment ack) {
        // Acquire rate limit token before processing
        // This naturally throttles the consumer during ramp-up
        currentRateLimiter.acquire();
 
        try {
            notificationSender.send(command);
            ack.acknowledge();
        } catch (SendGridRateLimitException e) {
            // Even with our rate limiter, hit SendGrid limit
            // Back off and do not ack (redelivery after visibility timeout)
            log.warn("SendGrid rate limit hit, reducing consumer rate");
            currentRateLimiter = RateLimiter.create(currentRateLimiter.getRate() * 0.5);
        }
    }
}

11. The Ghost Consumer - Zombie Consumer Group

Incident Description

A decommissioned service's consumer group was never cleaned up. 8 months later, a new service was deployed that happened to use the same consumer group name. On first startup, it consumed 8 months of messages from offset 0 (where the old service left off) rather than from the current position.

The Zombie Consumer Group

Consumer group "analytics-processor" history:
  March 2025: Service "analytics-v1" created, uses group "analytics-processor"
  July 2025: analytics-v1 decommissioned
  July 2025: Consumer group "analytics-processor" left in Kafka
             Committed offset: 28,000,000 (last position when service was alive)
             Topic had 250,000,000 messages at time of decommission

November 2025: analytics-v2 created
  Developer assumes offset 0 = "start from beginning"
  Developer creates consumer with group "analytics-processor" (same name, reused)
  Developer expects: "It'll start from the latest offset"

  Actual behavior:
    Kafka finds existing group "analytics-processor" with committed offset 28,000,000
    New consumer starts from offset 28,000,000 (8 months ago)
    Topic has grown to 400,000,000 messages
    Consumer begins processing 372,000,000 messages from July 2025

Impact:
  - 372 million stale events processed
  - Analytics dashboards show data from July 2025 as "current"
  - New event stream mixed with 8-month-old events
  - $12,000 in BigQuery write costs processing stale data
  - 6 hours to identify and stop the zombie consumption

Prevention and Cleanup Procedures

# Step 1: List all consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
 
# Step 2: Identify zombie (inactive) groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --all-groups --describe \
  | awk '$6 <mark class="obsidian-highlight"> "-" || $6 </mark> "0"' | grep -v "^GROUP"
# Groups with STATE=Empty or Dead and no active members are zombies
 
# Step 3: Archive the group's offsets before deletion
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group analytics-processor > /archive/analytics-processor-offsets.txt
 
# Step 4: Delete zombie consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --delete --group analytics-processor
 
# Prevention: Consumer group naming convention
# Format: <service>.<environment>.<version>
# analytics-v1.prod.2025-03  (includes version and date)
# analytics-v2.prod.2025-11  (new service = new name, can never conflict)
 
// Prevention: Code-level safeguard
@Configuration
public class ConsumerGroupValidation {
 
    @Value("${spring.application.name}")
    private String serviceName;
 
    @Value("${spring.application.version}")
    private String version;
 
    @Value("${spring.profiles.active}")
    private String environment;
 
    @Bean
    public String consumerGroupId() {
        // Auto-generate consumer group from service identity
        // Cannot reuse old group by accident
        return String.format("%s.%s.%s", serviceName, environment, version);
    }
}

12. Certificate Expiry Killing All Producers

Incident Description

At 2:14am on a Monday, all 38 services that produced to Kafka stopped publishing simultaneously. The cause: a wildcard TLS certificate used for Kafka broker authentication expired. No alerts had been configured for certificate expiry. By the time the on-call team woke up, the outage had lasted 47 minutes.

Impact Chain

T+0:00  Kafka broker TLS certificate expires
T+0:00  All producer connections: SSL handshake fails
         javax.net.ssl.SSLHandshakeException: certificate_expired
T+0:01  Kafka producers enter reconnection backoff loop
T+0:05  Reconnection backoff maxes out at 10 seconds
T+0:05  New connection attempts every 10 seconds, all failing
T+0:05  Producer local buffers start filling up (messages not being sent)
T+0:15  Producer send buffer full (32MB default)
T+0:15  kafkaTemplate.send() starts throwing: BufferExhaustedException
T+0:15  Application-level error handling: varies by service
         - Some: restart and re-attempt (cycling, not helping)
         - Some: drop messages silently (fire-and-forget paths)
         - Some: propagate exception to caller (HTTP 500 cascade)
T+0:47  On-call engineer wakes up, diagnoses, rotates certificate
T+1:30  All services reconnected, processing backlog
T+2:00  Some services that dropped messages in outbox: ok
T+2:00  Some services that dropped messages silently: data loss

Prevention: Certificate Lifecycle Management

# Alert 30 days before expiry
# Using Prometheus blackbox exporter for certificate monitoring
 
- job_name: 'kafka-cert-check'
  metrics_path: /probe
  params:
    module: [tcp_connect]
  static_configs:
    - targets:
      - kafka-broker-1:9093
      - kafka-broker-2:9093
      - kafka-broker-3:9093
  relabel_configs:
    - source_labels: [__address__]
      target_label: __param_target
    - source_labels: [__param_target]
      target_label: instance
    - target_label: __address__
      replacement: blackbox-exporter:9115
 
# Alert rule
- alert: KafkaCertificateExpiringSoon
  expr: probe_ssl_earliest_cert_expiry - time() < 30 * 24 * 3600
  labels:
    severity: warning
  annotations:
    summary: "Kafka TLS certificate expires in {{ $value | humanizeDuration }}"
 
- alert: KafkaCertificateExpiryCritical
  expr: probe_ssl_earliest_cert_expiry - time() < 7 * 24 * 3600
  labels:
    severity: critical
  annotations:
    summary: "CRITICAL: Kafka TLS certificate expires in {{ $value | humanizeDuration }}"
// Producer-side graceful handling of certificate expiry
@Component
public class ResilientKafkaProducer {
 
    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;
 
    private volatile KafkaProducer<String, byte[]> producer;
 
    public void publishWithFallback(String topic, String key, Object event) {
        try {
            producer.send(new ProducerRecord<>(topic, key, serialize(event)));
        } catch (AuthenticationException | SSLException e) {
            // Certificate-related failure - save to outbox for later
            log.error("Kafka auth failure (possible cert expiry): {}", e.getMessage());
            outboxRepository.save(OutboxEntry.builder()
                .topic(topic)
                .key(key)
                .payload(serialize(event))
                .failureReason("KAFKA_AUTH_FAILURE")
                .createdAt(Instant.now())
                .build());
            // Alert immediately - this needs human intervention
            alertingService.sendCritical("Kafka authentication failure - possible certificate expiry");
        }
    }
}

13. Message Ordering Violation in Financial State Machine

Incident Description

A loan origination system processed loan state transitions via Kafka events. Due to a consumer deployment that briefly ran two versions simultaneously, some loans entered invalid states: receiving "Disbursed" events before receiving "Approved" events.

State Machine Violation

Valid loan state machine:
  APPLIED → CREDIT_CHECKED → APPROVED → DISBURSED → REPAYING → COMPLETED
                          ↘ REJECTED

What happened:
  Consumer v1 (old) processing Partition 5:
    Offset 1000: LoanApproved(loanId=L-123)    → state: APPROVED ✓
    Offset 1001: LoanDisbursed(loanId=L-123)   → state: DISBURSED ✓

  During rolling deployment (both v1 and v2 running simultaneously):
    v1 consumer commits offset up to 999 on Partition 5
    v2 consumer takes over Partition 5 during rebalance
    v2 starts from offset 1000 (LoanApproved - same as v1 was processing)

  Race condition during rebalance:
    v1 processes LoanApproved(L-123) → state saved as APPROVED
    REBALANCE OCCURS
    v2 assigned Partition 5, starts from offset 1000 (re-delivered)

    But v2 has a bug: it processes LoanDisbursed before re-processing LoanApproved
    Because v2 uses an async event handler that does not guarantee order within a batch

    v2 receives batch: [LoanApproved(1000), LoanDisbursed(1001)]
    v2 async handler processes LoanDisbursed FIRST (faster path)
    LoanDisbursed arrives at state machine: current state is CREDIT_CHECKED
    CREDIT_CHECKED → DISBURSED is INVALID
    State machine guard: "Cannot disburse a loan that is not approved"
    Exception thrown → event sent to DLQ
    LoanApproved arrives: state still CREDIT_CHECKED → APPROVED ✓

    LoanDisbursed is now in DLQ
    Loan is stuck in APPROVED state permanently
    Customer received loan funds but the system shows APPROVED, not DISBURSED
    Loan management dashboard shows incorrect state
    Collections team gets confused - no repayment schedule created

Solution: Ordered State Machine with Version-Locked Transitions

// Financial state machine with strict ordering enforcement
 
@Service
public class LoanStateMachineConsumer {
 
    @KafkaListener(
        topics = "loan-events",
        groupId = "loan-state-machine",
        containerFactory = "orderedConsumerFactory"  // BATCH mode, manual ack
    )
    public void processLoanEvents(
            List<ConsumerRecord<String, LoanEvent>> records,
            Acknowledgment ack) {
 
        // Process sequentially - NEVER async within a batch
        // Ordering within a partition is guaranteed by Kafka
        // Sequential processing ensures we respect that ordering
        for (ConsumerRecord<String, LoanEvent> record : records) {
            try {
                processInOrder(record.value(), record.offset());
            } catch (InvalidStateTransitionException e) {
                // State transition guard violated - this is a serious consistency error
                log.error("CRITICAL: Invalid state transition loan={}, event={}, currentState={}",
                    record.value().getLoanId(),
                    record.value().getType(),
                    e.getCurrentState());
 
                // Move to investigation DLQ with full context
                Map<String, String> headers = Map.of(
                    "violation-type", "INVALID_STATE_TRANSITION",
                    "current-state", e.getCurrentState(),
                    "attempted-transition", record.value().getType().name(),
                    "offset", String.valueOf(record.offset()),
                    "partition", String.valueOf(record.partition())
                );
                dlqPublisher.sendWithHeaders("loan-events.state-violations", record, headers);
                // Continue processing other loans - do not block on one invalid transition
            }
        }
 
        ack.acknowledge();
    }
 
    @Transactional
    private void processInOrder(LoanEvent event, long offset) {
        Loan loan = loanRepository.findById(event.getLoanId())
            .orElseThrow(() -> new LoanNotFoundException(event.getLoanId()));
 
        // Version-based idempotency: if we've already processed this or a later event, skip
        if (loan.getLastProcessedOffset() >= offset) {
            log.info("Already processed offset {} for loan {}, skipping", offset, event.getLoanId());
            return;
        }
 
        // Validate transition BEFORE applying
        stateMachine.validateTransition(loan.getStatus(), event.getType());
 
        // Apply transition and record offset
        loan.apply(event);
        loan.setLastProcessedOffset(offset);
        loanRepository.save(loan);
    }
}

14. Backpressure Cascade into Full System Outage

Incident Description

An order management system's database became slow due to an unrelated index corruption. This caused consumers to process slowly, which filled the broker queues, which caused producers to block, which eventually cascaded into a full system outage affecting APIs that had nothing to do with the original slow database.

The Cascade

T+0:00  Database index corruption: query time increases from 5ms to 500ms
T+0:01  Order consumer processing time: 5ms → 500ms per message
T+0:05  Consumer processing rate: 1000/sec → 10/sec (100x slower)
T+0:05  Consumer lag starts building: 990 messages/sec accumulating
T+0:20  Consumer lag: 1,000,000 messages
T+0:20  Memory pressure on Kafka brokers from unacked in-flight messages
T+0:30  Kafka producer buffer (32MB) full for order-service
T+0:30  kafkaTemplate.send("order-events") blocks producer threads
T+0:30  Order service HTTP thread pool: threads blocked on kafkaTemplate.send()
T+0:35  Order service HTTP thread pool exhausted: new HTTP requests return 503
T+0:35  API gateway returns 503 for ALL order endpoints
T+0:40  Circuit breaker on API gateway opens for order-service
T+0:40  ALL order creation attempts fail with 503 - including checkout endpoint
T+0:40  FULL OUTAGE on checkout for entire website
         Original cause: database index corruption
         Direct cause: producer buffer full
         Cascaded to: complete e-commerce checkout unavailability

Solution: Backpressure Isolation with Circuit Breaker at Producer Level

// Fix 1: Producer-level circuit breaker + async fallback
@Service
public class BackpressureAwareOrderPublisher {
 
    private final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("kafka-order-publisher");
    private final Bulkhead bulkhead = Bulkhead.of("kafka-publish",
        BulkheadConfig.custom().maxConcurrentCalls(50).build());
 
    public void publishOrderEvent(OrderEvent event) {
        try {
            // Bulkhead prevents thread pool exhaustion on the CALLER side
            // Max 50 concurrent publish attempts
            Bulkhead.decorateRunnable(bulkhead, () ->
                CircuitBreaker.decorateRunnable(circuitBreaker, () ->
                    kafkaTemplate.send("order-events", event.getOrderId(), event)
                        .get(2, TimeUnit.SECONDS)  // Never block longer than 2s
                ).run()
            ).run();
 
        } catch (BulkheadFullException e) {
            // 50 threads already waiting - Kafka is backed up
            // Save to outbox immediately, don't queue more
            log.warn("Kafka publish bulkhead full - routing to outbox");
            outboxRepository.save(OutboxEntry.from(event));
 
        } catch (CallNotPermittedException e) {
            // Circuit is open - Kafka unavailable
            // Save to outbox, circuit will close when Kafka recovers
            log.warn("Kafka circuit open - routing to outbox");
            outboxRepository.save(OutboxEntry.from(event));
 
        } catch (TimeoutException e) {
            // Individual send timeout - slow consumer (backpressure signal)
            // Route to outbox and open circuit if this is frequent
            circuitBreaker.onError(2, TimeUnit.SECONDS, e);
            outboxRepository.save(OutboxEntry.from(event));
        }
    }
}
 
// Fix 2: Consumer circuit breaker - slow down consumption when DB is slow
// This signals backpressure upstream rather than building up retry storms
 
@KafkaListener(topics = "order-events")
public void processOrder(OrderEvent event, Acknowledgment ack) {
    try {
        CircuitBreaker.decorateRunnable(dbCircuitBreaker, () -> {
            orderRepository.save(Order.from(event)); // The slow DB call
        }).run();
        ack.acknowledge();
 
    } catch (CallNotPermittedException e) {
        // DB circuit is open - don't ack, let message be redelivered
        // This naturally reduces consumer throughput when DB is unavailable
        // Consumer lag builds up in Kafka (which has infinite buffer) rather than
        // cascading into producer thread pool exhaustion
        log.warn("DB circuit open - not processing, will retry when DB recovers");
        Thread.sleep(5000); // Back off before next poll
    }
}

15. The Multi-Region Active-Active Split-Brain

Incident Description

A global SaaS company ran Kafka in active-active mode across US-East and EU-West. A transient network partition between regions caused both regions to independently process the same user actions, resulting in conflicting state in the user profile database that was irreconcilable without manual intervention.

The Split-Brain Scenario

Normal operation:
  User action in US → US Kafka → US consumer → updates US DB + replicates event to EU
  User action in EU → EU Kafka → EU consumer → updates EU DB + replicates event to US

Network partition at T+0:00 (US ↔ EU unreachable for 8 minutes):

  T+0:01  US user updates profile: email changed to new-us@email.com
           US Kafka: processes event, updates US DB
           Replication to EU: FAILS (network partition)

  T+0:03  EU user (same account, same user, opened app on different device in EU)
           Updates profile: email changed to new-eu@email.com
           EU Kafka: processes event, updates EU DB
           Replication to US: FAILS (network partition)

  T+0:08  Network partition heals
  T+0:08  MirrorMaker replication resumes
  T+0:08  US sees EU's "email=new-eu@email.com" event
           US DB currently has "email=new-us@email.com"
           CONFLICT: Which one wins?
           Last-write-wins? Both wrote at T+0:01 and T+0:03 - US was first by 2 seconds
           US timestamp: T+0:01 (earlier)
           EU timestamp: T+0:03 (later)
           Last-write-wins = EU event wins
           US DB updated to new-eu@email.com

  T+0:08  EU sees US's "email=new-us@email.com" event
           EU DB currently has "email=new-eu@email.com"
           Last-write-wins = US event (T+0:01) loses to EU event (T+0:03)
           EU DB keeps new-eu@email.com

  Final state:
    US DB: new-eu@email.com (LWW resolution)
    EU DB: new-eu@email.com (LWW resolution)
    Outcome: user's US change (new-us) is LOST
    User: "I updated my email in the US app, but it reverted to something I typed in EU"

  In 8 minutes of partition: 1,247 conflicting writes
  Manual reconciliation: 6 person-days

Solution: Conflict Resolution Strategy Selection

// Active-active is fundamentally hard. Choose your strategy based on data type:
 
// Strategy 1: Last Write Wins (LWW) with vector clocks (not wall clock)
// Use for: User preferences, profile data that users understand can conflict
@Service
public class ProfileConflictResolver {
 
    public UserProfile resolve(UserProfile local, UserProfile remote) {
        // Use vector clocks, not wall-clock time
        // Vector clock comparison is causally correct; wall clock is not
        VectorClock localClock = local.getVectorClock();
        VectorClock remoteClock = remote.getVectorClock();
 
        if (localClock.happensBefore(remoteClock)) {
            return remote; // Remote is causally after local - remote wins
        } else if (remoteClock.happensBefore(localClock)) {
            return local;  // Local is causally after remote - local wins
        } else {
            // Concurrent writes - this is the true conflict
            // Strategy: merge non-conflicting fields, flag conflicting ones for user resolution
            return mergeWithUserNotification(local, remote);
        }
    }
}
 
// Strategy 2: Region-Owned Data (recommended for most cases)
// Each user is "owned" by one region - only that region accepts writes for their data
// Other regions redirect writes to the owning region
// No conflicts possible - at the cost of latency for cross-region writes
 
@Service
public class RegionOwnershipRouter {
    public void updateProfile(String userId, ProfileUpdate update) {
        String owningRegion = userOwnershipRegistry.getOwningRegion(userId);
        if (owningRegion.equals(currentRegion)) {
            profileService.applyUpdate(userId, update);
        } else {
            // Proxy to owning region (accept slightly higher latency)
            regionRouter.forward(owningRegion, userId, update);
        }
    }
}
 
// Strategy 3: CRDT (Conflict-free Replicated Data Types)
// For data that naturally merges: shopping carts, feature flags, counters
// Mathematical properties guarantee convergence regardless of update order
 
@Service
public class CartCRDTConsumer {
    // G-Set (grow-only set) for cart items - no conflicts possible
    // Adding the same item twice = idempotent (set semantics)
    // Removing items requires 2P-Set (two-phase set) approach
}

Previous: Supplement 1 - Anti-Patterns Extended
Next: Supplement 3 - Trade-Offs & Decision Guide
Index: Message Queues Demystified - Index