6/6/2026 | Admin Post


Message Queues - Supplement 1: Anti-Patterns Extended Deep Dive

Series Navigation:
Main Index |
Part 6 - Pitfalls & Best Practices (15 original pitfalls) |
Supplement 2 - Production Challenges |
Supplement 3 - Trade-Offs & Decision Guide |
Supplement 4 - Real-World Architecture

Part 6 covered 15 foundational pitfalls. This supplement adds 30 more advanced anti-patterns
across five categories, each with production-grade broken examples, fixes, and real consequences.


Table of Contents

Architectural Anti-Patterns

  1. The Chatty Queue
  2. Event Soup - Publishing Everything as Events
  3. The Synchronous Disguise
  4. The God Consumer
  5. The Invisible Contract
  6. Queue as RPC Proxy
  7. The Temporal Coupling Trap
  8. Wrong Level of Granularity Events

Infrastructure Anti-Patterns

  9. The Shared Broker Monolith
  10. The Fan-Out Bomb
  11. The Retention Cliff
  12. No-Schema Schema Evolution
  13. The Consumer Race Condition
  14. The Offset Reset Catastrophe

Operational Anti-Patterns

  15. The Silent DLQ
  16. The Reprocessing Roulette
  17. The Uncontrolled Fanout Migration
  18. Blind Broker Upgrades
  19. The Missing Runbook
  20. The Noisy Neighbor Queue

Security Anti-Patterns

  21. The Open Exchange
  22. Message Payload PII Leakage
  23. The Unauthenticated Consumer
  24. Audit Log Bypass via Async
  25. The Insecure DLQ

Testing and Reliability Anti-Patterns

  26. The Fake In-Memory Broker Test
  27. The Stubbed Consumer Test
  28. No Chaos in CI/CD
  29. The One-Time Migration Script
  30. Load Testing the Happy Path Only


Architectural Anti-Patterns


AP-1: The Chatty Queue

What it looks like:

Publishing a separate message for every small state change instead of one message per meaningful domain event.

// BROKEN: One message per field change
public void updateOrderStatus(String orderId, String newStatus) {
    orderRepository.updateStatus(orderId, newStatus);
    kafkaTemplate.send("order-status-changed", orderId, new StatusChangedEvent(orderId, newStatus));
}
 
public void updateOrderAddress(String orderId, Address newAddress) {
    orderRepository.updateAddress(orderId, newAddress);
    kafkaTemplate.send("order-address-changed", orderId, new AddressChangedEvent(orderId, newAddress));
}
 
public void updateOrderItem(String orderId, OrderItem item) {
    orderRepository.updateItem(orderId, item);
    kafkaTemplate.send("order-item-changed", orderId, new ItemChangedEvent(orderId, item));
}
 
// An order going through fulfillment publishes 15 separate events
// Each has its own consumer logic, retry handling, DLQ setup
// Consumers now need to JOIN 15 event streams to understand a single order

Why it is dangerous:

  • Consumers must correlate events across time windows to reconstruct state
  • Related events land on different topics, so their relative order is not guaranteed
  • A single business transaction spawns 10-20 broker roundtrips
  • Broker and consumer infrastructure overhead multiplies with each topic
  • Causal ordering across fine-grained events is very hard to reason about

The Fix: Domain Events at Business Grain

// CORRECT: Publish meaningful business state transitions
// An event represents a business fact, not a field mutation
 
@Service
public class OrderService {
 
    public void fulfillOrder(OrderFulfillmentRequest request) {
        Order order = orderRepository.findById(request.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException("Order not found: " + request.getOrderId()));
 
        // Multiple field changes happen atomically
        order.setStatus("FULFILLING");
        order.setWarehouseId(request.getWarehouseId());
        order.setEstimatedDelivery(request.getDeliveryDate());
        order.setFulfillmentNotes(request.getNotes());
 
        orderRepository.save(order);
 
        // ONE meaningful event captures the complete state transition
        OrderFulfilledEvent event = OrderFulfilledEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            .warehouseId(request.getWarehouseId())
            .estimatedDelivery(request.getDeliveryDate())
            .lineItems(order.getLineItems())
            .totalAmount(order.getTotalAmount())
            .timestamp(Instant.now())
            .build();
 
        kafkaTemplate.send("order-events", order.getId(), event);
    }
}

Rule of thumb: An event should represent a meaningful thing that happened in the business domain. "Order Fulfilled" is an event. "Order.warehouseId changed from null to WH-42" is a database audit log, not a domain event.

Real-world consequence: At a mid-size e-commerce company, chatty events caused Kafka consumer lag to climb to 50 million messages because downstream services had to read 12 events to process a single order. Broker storage tripled, monitoring dashboards drowned in noise, and the system had to be redesigned from scratch.


AP-2: Event Soup

What it looks like:

Publishing undifferentiated "events" that mix commands, state changes, and notifications in the same topic without clear semantic meaning.

// BROKEN: Everything is an "event" - no semantic distinction
kafkaTemplate.send("app-events", new Event("CREATE_ORDER", payload));         // Command
kafkaTemplate.send("app-events", new Event("ORDER_CREATED", payload));         // State change
kafkaTemplate.send("app-events", new Event("NOTIFY_WAREHOUSE", payload));      // Command
kafkaTemplate.send("app-events", new Event("USER_LOGIN", payload));            // Audit
kafkaTemplate.send("app-events", new Event("PAYMENT_PROCESS", payload));       // Command
kafkaTemplate.send("app-events", new Event("EMAIL_SEND_REQUEST", payload));    // Command
kafkaTemplate.send("app-events", new Event("ANALYTICS_TRACK", payload));       // Side-effect
 
// One consumer must handle all 7 semantic types
// No clear ownership: who owns "app-events"?
// No clear retention: financial audit events need 7 years; analytics need 7 days
// No clear schema contract: all events share a single schema registry subject

Why it is dangerous:

  • No clear ownership of the topic across teams
  • Retention policies conflict (audit log vs transient notification)
  • One team's schema change breaks unrelated consumers
  • Impossible to grant fine-grained security access
  • In a consumer that handles every type, a slow analytics handler delays the financial events queued behind it in the same partition

The Fix: Topic Taxonomy by Domain and Semantic Type

Topic naming convention: <domain>.<semantic-type>.<entity>

Domains:       orders, payments, users, inventory, notifications
Semantic types: events (state changes), commands (requests to act),
                audit (compliance), notifications (outbound)

Examples:
  orders.events.order-lifecycle          - OrderPlaced, OrderFulfilled, OrderCancelled
  payments.events.payment-lifecycle      - PaymentInitiated, PaymentAuthorized, PaymentFailed
  payments.audit.payment-transactions    - Immutable audit trail, 7-year retention
  users.events.user-lifecycle            - UserRegistered, UserDeactivated
  notifications.commands.email-dispatch  - Commands to send emails
  inventory.events.stock-changes         - StockReserved, StockReplenished
// Each topic has a clear owner, schema, retention policy, and security policy
// Producers publish to domain-specific topics
// Consumers subscribe only to topics relevant to their domain
@KafkaListener(topics = "orders.events.order-lifecycle", groupId = "warehouse-service")
public void handleOrderEvent(OrderLifecycleEvent event) {
    if (event.getType() == OrderEventType.ORDER_FULFILLED) {
        warehouseService.schedulePickup(event);
    }
    // Ignore other types - well-defined filter based on event type enum
}
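The naming convention is easy to enforce mechanically, for example in a CI check or a topic-provisioning script. The sketch below is a minimal validator, assuming the domain and semantic-type lists above are the complete allowed sets and that entity names are lowercase kebab-case; `TopicName` is a hypothetical helper, not part of any Kafka API.

```java
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses and validates <domain>.<semantic-type>.<entity> topic names.
record TopicName(String domain, String semanticType, String entity) {

    // Allowed values mirror the taxonomy above; extend them as new domains are approved.
    static final Set<String> DOMAINS =
        Set.of("orders", "payments", "users", "inventory", "notifications");
    static final Set<String> SEMANTIC_TYPES =
        Set.of("events", "commands", "audit", "notifications");

    // Three dot-separated parts; the entity is assumed to be lowercase kebab-case.
    private static final Pattern FORMAT =
        Pattern.compile("([a-z]+)\\.([a-z]+)\\.([a-z0-9]+(?:-[a-z0-9]+)*)");

    // Returns the parsed name, or empty if the topic breaks the convention.
    static Optional<TopicName> parse(String topic) {
        Matcher m = FORMAT.matcher(topic);
        if (!m.matches()) {
            return Optional.empty();
        }
        if (!DOMAINS.contains(m.group(1)) || !SEMANTIC_TYPES.contains(m.group(2))) {
            return Optional.empty();
        }
        return Optional.of(new TopicName(m.group(1), m.group(2), m.group(3)));
    }
}
```

A provisioning job could refuse to create any topic for which `parse` returns empty, which keeps catch-all names like `app-events` out of the cluster.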

AP-3: The Synchronous Disguise

What it looks like:

Using a message queue but blocking synchronously on the response, converting async into sync with extra latency.

// BROKEN: Request-Reply that is synchronous under the hood
public PaymentResult processPayment(PaymentRequest request) throws InterruptedException, ExecutionException {
    String correlationId = UUID.randomUUID().toString();
 
    // Register a future for the reply BEFORE publishing, so a fast reply is not missed
    CompletableFuture<PaymentResult> future = new CompletableFuture<>();
    pendingRequests.put(correlationId, future);  // completed by the reply-topic listener
 
    // Publish request
    kafkaTemplate.send("payment-commands", correlationId, request);
 
    try {
        // BLOCK the current thread for up to 30 seconds waiting for the reply
        // If payment service is slow: caller thread is blocked
        // 100 concurrent requests = 100 blocked threads
        // Thread pool exhaustion at 200 concurrent users (Tomcat's default maximum is 200 request threads)
        return future.get(30, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
        throw new PaymentServiceUnavailableException();
    } finally {
        pendingRequests.remove(correlationId);  // avoid leaking entries on timeout
    }
}
 
// The worst of both worlds:
// - None of the decoupling benefits of async (caller still blocked)
// - All of the latency costs of async (extra serialization, broker roundtrip)
// - Thread pool exhaustion risk
// - No backpressure: blocked threads pile up faster than queue drains

Why it is dangerous:

The synchronous disguise takes the added latency of async messaging (serialization, broker roundtrip, deserialization) without delivering any of its benefits (decoupling, resilience, load leveling). Thread pools are exhausted during slowdowns.

The Fix: Genuine Async with Callback or Reactive Approach

// CORRECT OPTION A: Truly async - return a 202 Accepted with tracking ID
@PostMapping("/payments")
public ResponseEntity<PaymentTrackingResponse> initiatePayment(@RequestBody PaymentRequest request) {
    String paymentId = UUID.randomUUID().toString();
 
    kafkaTemplate.send("payment-commands", paymentId, request);
 
    // Return immediately - no blocking
    return ResponseEntity.accepted().body(
        PaymentTrackingResponse.builder()
            .paymentId(paymentId)
            .status("PROCESSING")
            .statusCheckUrl("/payments/" + paymentId + "/status")
            .estimatedCompletionSeconds(5)
            .build()
    );
}
 
// Client polls /payments/{id}/status OR uses webhooks/WebSocket for result
 
// CORRECT OPTION B: If synchronous truly required, use direct HTTP
// Do NOT use a message queue to simulate synchronous RPC between critical services
// Message queues add latency vs direct HTTP - use them when async benefits matter
@FeignClient(name = "payment-service")
public interface PaymentServiceClient {
    @PostMapping("/process")
    PaymentResult processPayment(PaymentRequest request);
}
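When a reply genuinely has to come back over the broker, the callback route means the caller attaches work to a future instead of parking a thread in `future.get(...)`. The sketch below is a minimal, broker-agnostic correlator under stated assumptions: `ReplyCorrelator` is a hypothetical class, the reply-topic listener is expected to call `complete(...)`, and `CompletableFuture.orTimeout` (Java 9+) replaces the blocking 30-second wait. It also removes each entry as soon as its future settles, so the pending map cannot grow without bound.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical non-blocking request-reply correlator (not a Spring or Kafka API).
class ReplyCorrelator<R> {
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Register BEFORE publishing the request so a fast reply cannot arrive unmatched.
    // The returned future fails with a TimeoutException if no reply arrives in time.
    CompletableFuture<R> register(String correlationId, long timeout, TimeUnit unit) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future.orTimeout(timeout, unit)
                     // Clean up on success, failure, or timeout
                     .whenComplete((reply, error) -> pending.remove(correlationId));
    }

    // Called from the reply-topic listener. Returns false for unknown or expired IDs.
    boolean complete(String correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A caller would compose on the returned future, for example `register(id, 30, TimeUnit.SECONDS).thenApply(...)` with a hypothetical response mapper, and hand that future to a framework that accepts asynchronous return types, so no request thread sits waiting on the broker.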

AP-4: The God Consumer

What it looks like:

One consumer that handles all event types for a domain, growing to thousands of lines and dozens of dependencies.

// BROKEN: A single consumer handling everything
@Service
public class OrderEventConsumer {
 
    // 47 injected dependencies
    private final WarehouseService warehouseService;
    private final EmailService emailService;
    private final SMSService smsService;
    private final AnalyticsService analyticsService;
    private final LoyaltyService loyaltyService;
    private final InvoiceService invoiceService;
    private final AuditService auditService;
    private final FraudDetectionService fraudDetectionService;
    private final TaxService taxService;
    private final InventoryService inventoryService;
    // ... 37 more ...
 
    @KafkaListener(topics = "order-events")
    public void handleOrderEvent(OrderEvent event) {
        switch (event.getType()) {
            case ORDER_PLACED:
                warehouseService.reserve(event);
                emailService.sendConfirmation(event);
                smsService.sendSMS(event);
                analyticsService.trackOrderPlaced(event);
                fraudDetectionService.screenOrder(event);
                inventoryService.decrementStock(event);
                // 50 more lines ...
                break;
            case ORDER_CANCELLED:
                warehouseService.cancelReservation(event);
                emailService.sendCancellation(event);
                inventoryService.restockItems(event);
                loyaltyService.reversePoints(event);
                // 40 more lines ...
                break;
            // 12 more cases ...
        }
    }
    // This class is 1200 lines and breaks 3x per week
}

Why it is dangerous:

  • Deploy the consumer to add warehouse functionality? Every other function gets redeployed too.
  • An exception in fraud detection aborts everything after it in the handler (such as the stock decrement), and a retry re-runs everything before it (such as the confirmation email).
  • Cannot scale warehouse processing independently from email sending.
  • Testing requires mocking 47 dependencies.
  • Single point of failure: if this consumer goes down, ALL downstream effects stop until it recovers.

The Fix: Consumer-Per-Concern Pattern

// Each consumer handles ONE concern and owns ONE downstream dependency
 
@Service
public class WarehouseEventConsumer {
    private final WarehouseService warehouseService;
 
    @KafkaListener(topics = "order-events", groupId = "warehouse-group")
    public void handle(OrderEvent event) {
        if (event.getType() == ORDER_PLACED) warehouseService.reserve(event);
        if (event.getType() == ORDER_CANCELLED) warehouseService.cancelReservation(event);
    }
}
 
@Service
public class NotificationEventConsumer {
    private final EmailService emailService;
    private final SMSService smsService;
 
    @KafkaListener(topics = "order-events", groupId = "notification-group")
    public void handle(OrderEvent event) {
        emailService.send(event);  // Handles all notification logic
        smsService.send(event);    // Can be further split if needed
    }
}
 
@Service
public class AnalyticsEventConsumer {
    private final AnalyticsService analyticsService;
 
    @KafkaListener(topics = "order-events", groupId = "analytics-group")
    public void handle(OrderEvent event) {
        analyticsService.track(event);
    }
}
 
// Benefits:
// - Deploy warehouse changes without touching notifications
// - Scale analytics consumers independently (can lag behind, no SLA)
// - Notification failure does not affect warehouse processing
// - Each consumer has 1-3 dependencies: easy to test
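The isolation comes from each consumer group tracking its own offset over the same topic. The toy model below is hypothetical and is not Kafka's API: it keeps one committed offset per group and stops a group at its first failing message, so a broken notification handler stalls only its own group while the warehouse group keeps consuming.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of one topic read by independent consumer groups.
class InMemoryTopic<E> {
    private final List<E> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();
    private final Map<String, Consumer<E>> handlers = new HashMap<>();

    // Registers (or replaces) a group's handler; a new group starts at offset 0.
    void subscribe(String groupId, Consumer<E> handler) {
        handlers.put(groupId, handler);
        committed.putIfAbsent(groupId, 0);
    }

    void publish(E event) {
        log.add(event);
    }

    // Delivers unread events to every group. A group stops at its first failing event
    // and retries from that same offset on the next poll; other groups are unaffected.
    void pollAll() {
        for (Map.Entry<String, Consumer<E>> group : handlers.entrySet()) {
            int offset = committed.get(group.getKey());
            try {
                while (offset < log.size()) {
                    group.getValue().accept(log.get(offset));
                    offset++;   // advance only after the handler succeeded
                }
            } catch (RuntimeException e) {
                // A real consumer would log, retry with backoff, or route to a DLQ here
            } finally {
                committed.put(group.getKey(), offset);
            }
        }
    }

    int offsetOf(String groupId) {
        return committed.get(groupId);
    }
}
```

With Kafka, giving each concern its own `groupId` (as in the consumers above) provides this per-group offset tracking; the model only makes it visible.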

AP-5: The Invisible Contract

What it looks like:

Producers and consumers share message structure via implicit convention, copy-paste, or undocumented JSON fields, with no enforced schema.

// BROKEN: Producer side
public void publishOrder(Order order) {
    Map<String, Object> message = new HashMap<>();
    message.put("order_id", order.getId());
    message.put("customer_id", order.getCustomerId());
    message.put("total", order.getTotalAmount());
    message.put("items", order.getItems());
    kafkaTemplate.send("order-events", JSON.toJson(message));
}
 
// Consumer side (different team, different codebase)
public void handleOrder(String json) {
    Map<String, Object> message = JSON.fromJson(json);
    String orderId = (String) message.get("orderId");    // BUG: producer used "order_id"
    double total = (double) message.get("amount");       // BUG: producer used "total"
    // These null pointer issues appear in production, not in unit tests
    // because unit tests mock the message directly
}

Why it is dangerous:

  • A producer-side field rename breaks all consumers silently.
  • No validation at publish time means bad messages reach consumers.
  • Schema drift: teams evolve their side independently until incompatibility.
  • Impossible to know all current consumers from the producer's perspective.
  • Debugging requires cross-team investigation to find the contract mismatch.

The Fix: Schema Registry with Enforced Compatibility

// Define a canonical Avro schema for the message contract
// avro/order-event.avsc
{
  "type": "record",
  "name": "OrderPlacedEvent",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "eventId",     "type": "string"},
    {"name": "orderId",     "type": "string"},
    {"name": "customerId",  "type": "string"},
    {"name": "totalAmount", "type": "double"},
    {"name": "status",      "type": {"type": "enum", "name": "OrderStatus",
                                     "symbols": ["PLACED", "FULFILLED", "CANCELLED"]}}
  ]
}
 
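// Schema Registry compatibility modes (Confluent semantics):
// - BACKWARD (the default): consumers on the NEW schema can read data written with the previous schema
//     Adding a field WITH a default: ALLOWED
//     Deleting a field: ALLOWED (upgrade consumers before producers)
//     Adding a field WITHOUT a default (a rename is a delete plus such an add): REJECTED
// - FULL: BACKWARD and FORWARD; deleting a field that has no default is also REJECTED
// An incompatible schema is rejected when it is registered (with auto-registration, on the
// producer's first send), so the bad message never reaches any consumer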
 
// Spring Boot Avro producer with Schema Registry
@Bean
public ProducerFactory<String, OrderPlacedEvent> avroProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
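    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);  // required: no default
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);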
    return new DefaultKafkaProducerFactory<>(config);
}
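To see why some changes are rejected, it helps to model how a reader resolves data written with another schema. The sketch below is a deliberately simplified, hypothetical model of Avro-style resolution (flat records, exact type matches, no type promotion, aliases, or unions), so it is not Schema Registry's actual checker. A reader field missing from the written data needs a default, and fields present only in the written data are ignored. Under these rules a rename fails BACKWARD because the new name has no default, while deleting a field that has no default passes BACKWARD and fails FORWARD (and therefore FULL).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// One field of a flat record schema (simplified model, not the Avro library).
record FieldSpec(String type, boolean hasDefault) {}

final class CompatibilityCheck {

    // Can a reader using `reader` decode data written with `writer`?
    // Returns the violations found; an empty list means compatible.
    static List<String> canRead(Map<String, FieldSpec> reader, Map<String, FieldSpec> writer) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, FieldSpec> field : reader.entrySet()) {
            FieldSpec written = writer.get(field.getKey());
            if (written == null && !field.getValue().hasDefault()) {
                problems.add("reader field '" + field.getKey() + "' is absent and has no default");
            } else if (written != null && !written.type().equals(field.getValue().type())) {
                problems.add("field '" + field.getKey() + "' changed type from "
                        + written.type() + " to " + field.getValue().type());
            }
        }
        // Fields that exist only in the writer schema are skipped by the reader: allowed.
        return problems;
    }

    // BACKWARD: consumers on the new schema read data written with the old one.
    static List<String> backward(Map<String, FieldSpec> newer, Map<String, FieldSpec> older) {
        return canRead(newer, older);
    }

    // FORWARD: consumers still on the old schema read data written with the new one.
    static List<String> forward(Map<String, FieldSpec> newer, Map<String, FieldSpec> older) {
        return canRead(older, newer);
    }

    // FULL: both directions must hold.
    static List<String> full(Map<String, FieldSpec> newer, Map<String, FieldSpec> older) {
        List<String> problems = new ArrayList<>(backward(newer, older));
        problems.addAll(forward(newer, older));
        return problems;
    }
}
```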

AP-6: Queue as RPC Proxy

What it looks like:

Routing all inter-service communication through a message queue as a generic transport layer, even for operations that require immediate responses.

// BROKEN: Using Kafka as an RPC bus for everything
// UserService asks InventoryService "is item X in stock?" via Kafka
 
// UserService (caller)
public boolean isItemAvailable(String itemId) {
    String requestId = UUID.randomUUID().toString();
    kafkaTemplate.send("inventory-rpc-requests", requestId,
        new RpcRequest("CHECK_STOCK", itemId));
 
    // Wait for response on reply topic
    return rpcResponseTracker.waitFor(requestId, 5, TimeUnit.SECONDS);
}
 
// InventoryService (server)
@KafkaListener(topics = "inventory-rpc-requests")
public void handleRpcRequest(RpcRequest request) {
    boolean available = inventoryRepository.isAvailable(request.getItemId());
    kafkaTemplate.send("inventory-rpc-responses", request.getRequestId(), available);
}
 
// Problems:
// - 2x broker roundtrip overhead for a simple query
// - Timeout handling complexity across two topics
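// - Circuit breakers and fallbacks must be rebuilt around the reply wait instead of wrapping a direct call
// - Correlation IDs live in the caller's memory, so each reply must reach the same instance that sent the request
// - Failure modes multiply: the broker and both topics can fail independently of the two services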
// - If response queue backs up, request ID correlation table grows without bound

The Fix: Right Tool for the Right Job

// RPC-style synchronous queries: use HTTP/gRPC directly
// Message queues: use for events and commands that do not need immediate responses
 
// For "is item in stock?" - use HTTP (synchronous, response needed immediately)
@Component
public class InventoryClient {
    @CircuitBreaker(name = "inventory")
    public StockStatus checkStock(String itemId) {
        return restTemplate.getForObject(
            "http://inventory-service/stock/{itemId}", StockStatus.class, itemId);
    }
}
 
// For "reserve item for order" - use Kafka (command, can be async, has DLQ fallback)
@Service
public class OrderService {
    public void reserveInventory(Order order) {
        kafkaTemplate.send("inventory-commands", order.getId(),
            new ReserveInventoryCommand(order.getId(), order.getItems()));
        // Does not wait - inventory will confirm via InventoryReservedEvent
    }
}
 
// Decision rule:
// Need a result NOW? -> HTTP/gRPC
// Issuing a command that will take effect eventually? -> Message Queue
// Publishing something that happened? -> Message Queue

AP-7: The Temporal Coupling Trap

What it looks like:

Designing consumers that require messages to arrive in a specific time window or order across different topics.

// BROKEN: Consumer assumes OrderPlaced arrives before PaymentReceived
// (both are produced concurrently by different services)
 
@KafkaListener(topics = "payment-events")
public void handlePayment(PaymentReceivedEvent event) {
    // Assumes order exists in DB already
    Order order = orderRepository.findById(event.getOrderId())
        .orElseThrow(() -> new OrderNotFoundException("Order not found: " + event.getOrderId()));
    // This throws if payment event arrives before the order is saved to DB
    // This can happen due to:
    // - Different Kafka partitions processed at different speeds
    // - Payment service faster to publish than Order service
    // - Consumer group lag differences
    order.markPaymentReceived(event.getPaymentId(), event.getAmount());
    orderRepository.save(order);
}

Why it is dangerous:

In distributed systems, message arrival order across topics is never guaranteed. Payment events can arrive before order events due to partition imbalances, consumer lag differences, or simply faster processing in the payment service.

The Fix: Handle Out-of-Order with Stateful Aggregation

// CORRECT: Accept that messages arrive out of order
// Use a staging table or cache for partially-arrived events
 
@Service
public class OrderPaymentAggregator {
 
    private final OrderPaymentStagingRepository stagingRepo;
    private final OrderRepository orderRepo;
 
    @KafkaListener(topics = "payment-events")
    public void handlePayment(PaymentReceivedEvent event) {
        // Try to find the order
        Optional<Order> order = orderRepo.findById(event.getOrderId());
 
        if (order.isPresent()) {
            // Order already exists - process immediately
            applyPayment(order.get(), event);
        } else {
            // Order not here yet - stage the payment event
            stagingRepo.save(StagedPaymentEvent.builder()
                .orderId(event.getOrderId())
                .paymentEvent(event)
                .stagedAt(Instant.now())
                .expiresAt(Instant.now().plus(10, ChronoUnit.MINUTES))
                .build());
        }
    }
 
    @KafkaListener(topics = "order-events")
    public void handleOrder(OrderPlacedEvent event) {
        Order order = createOrder(event);
        orderRepo.save(order);
 
        // Drain any staged payment events for this order
        stagingRepo.findByOrderId(event.getOrderId()).forEach(staged -> {
            applyPayment(order, staged.getPaymentEvent());
            stagingRepo.delete(staged);
        });
    }
}
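The staging idea can be exercised without Kafka or a database. The in-memory sketch below is hypothetical (`PaymentOrderAggregator` and its method names are illustrative). It assumes payment and order events are handled on separate threads and therefore serializes both handlers with `synchronized`: without that, a payment could be staged just after its order had already drained the staging area, and it would never be applied. It also gives the `expiresAt` timestamp a job: a periodic `expire` call hands back payments whose order never arrived, for alerting or a DLQ.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory aggregator for out-of-order payment and order events.
final class PaymentOrderAggregator {

    private record StagedPayment(String paymentId, Instant expiresAt) {}

    private final Map<String, List<String>> paymentsByOrder = new HashMap<>(); // known orders
    private final Map<String, List<StagedPayment>> staged = new HashMap<>();   // early payments
    private final Duration stagingTtl;

    PaymentOrderAggregator(Duration stagingTtl) {
        this.stagingTtl = stagingTtl;
    }

    // synchronized: the existence check and the staging write must not interleave with a drain.
    synchronized void onPaymentReceived(String orderId, String paymentId, Instant now) {
        List<String> applied = paymentsByOrder.get(orderId);
        if (applied != null) {
            applied.add(paymentId);                  // order already known: apply now
        } else {
            staged.computeIfAbsent(orderId, id -> new ArrayList<>())
                  .add(new StagedPayment(paymentId, now.plus(stagingTtl)));
        }
    }

    synchronized void onOrderPlaced(String orderId) {
        List<String> applied = paymentsByOrder.computeIfAbsent(orderId, id -> new ArrayList<>());
        List<StagedPayment> waiting = staged.remove(orderId);   // drain early arrivals
        if (waiting != null) {
            waiting.forEach(p -> applied.add(p.paymentId()));
        }
    }

    // Removes staged payments whose TTL has passed and returns their IDs for alerting or a DLQ.
    synchronized List<String> expire(Instant now) {
        List<String> expired = new ArrayList<>();
        Iterator<List<StagedPayment>> it = staged.values().iterator();
        while (it.hasNext()) {
            List<StagedPayment> payments = it.next();
            payments.removeIf(p -> {
                boolean due = !p.expiresAt().isAfter(now);
                if (due) {
                    expired.add(p.paymentId());
                }
                return due;
            });
            if (payments.isEmpty()) {
                it.remove();
            }
        }
        return expired;
    }

    synchronized List<String> paymentsFor(String orderId) {
        return List.copyOf(paymentsByOrder.getOrDefault(orderId, List.of()));
    }
}
```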

AP-8: Wrong Level of Granularity Events

What it looks like:

Events so coarse (whole aggregate dump) or so fine (every field change) that they are unusable.

// BROKEN: Too coarse - dumping the entire domain object
public class OrderEvent {
    // 200 fields - entire order snapshot including every nested object
    private String orderId;
    private Customer customer;           // 50 fields
    private List<OrderItem> items;       // 30 fields each, 20 items = 600 fields
    private Payment payment;             // 40 fields
    private Address shippingAddress;     // 20 fields
    private List<AuditEntry> auditLog;   // All historical changes embedded
    // ... message is 50KB
}
 
// BROKEN: Too fine - a separate event per change
public class OrderStatusChangedEvent { String orderId; String oldStatus; String newStatus; }
public class OrderPaymentIdChangedEvent { String orderId; String paymentId; }
public class OrderItemAddedEvent { String orderId; OrderItem item; }
public class OrderShippingAddressChangedEvent { String orderId; Address address; }
// Consumers must join 12 event streams to understand order state

The Fix: Task-Relevant Projection Events

// CORRECT: Events contain exactly what consumers need for their task
// Think: "What does each consumer actually need from this event?"
 
// For warehouse (needs items to pick and where to ship)
@Value // Lombok
public class OrderReadyToFulfillEvent {
    String eventId;
    String orderId;
    List<FulfillmentItem> items;    // itemId, sku, quantity, warehouseLocation
    Address shippingAddress;
    String warehouseId;
    Instant requiredByDate;
    // NOT: payment details, customer profile, audit log
}
 
// For email notification (needs what to say in the email)
@Value
public class OrderConfirmationEvent {
    String eventId;
    String orderId;
    String customerEmail;
    String customerName;
    List<OrderSummaryItem> items;   // name, quantity, price
    Money totalAmount;
    String estimatedDeliveryDate;
    // NOT: internal warehouse IDs, payment tokens
}
 
// Both events sourced from the same business action (order placed)
// Each carries only what its consumer needs
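Both projections can be derived from the same aggregate at the moment the business action completes. The sketch below uses hypothetical, trimmed-down records (`Order`, `OrderLine`, and the two event shapes are illustrative, not the classes above) to show the mapping: each builder copies only the fields its consumer needs, so the payment token never reaches the warehouse event and warehouse locations never reach the email event. Amounts are kept in integer cents here to avoid floating-point money.

```java
import java.util.List;

// Hypothetical aggregate: everything the order service knows.
record OrderLine(String sku, String name, int quantity, long unitPriceCents, String warehouseLocation) {}

record Order(String orderId, String customerEmail, String customerName,
             String paymentToken, String shippingAddress, List<OrderLine> lines) {}

// Projection for the warehouse: what to pick and where to ship.
record FulfillmentItem(String sku, int quantity, String warehouseLocation) {}
record ReadyToFulfill(String orderId, List<FulfillmentItem> items, String shippingAddress) {}

// Projection for the email service: what to tell the customer.
record ConfirmationLine(String name, int quantity, long lineTotalCents) {}
record Confirmation(String orderId, String customerEmail, String customerName,
                    List<ConfirmationLine> lines, long totalCents) {}

final class OrderProjections {

    static ReadyToFulfill forWarehouse(Order order) {
        List<FulfillmentItem> items = order.lines().stream()
            .map(l -> new FulfillmentItem(l.sku(), l.quantity(), l.warehouseLocation()))
            .toList();
        return new ReadyToFulfill(order.orderId(), items, order.shippingAddress());
    }

    static Confirmation forEmail(Order order) {
        List<ConfirmationLine> lines = order.lines().stream()
            .map(l -> new ConfirmationLine(l.name(), l.quantity(), l.quantity() * l.unitPriceCents()))
            .toList();
        long total = lines.stream().mapToLong(ConfirmationLine::lineTotalCents).sum();
        return new Confirmation(order.orderId(), order.customerEmail(), order.customerName(), lines, total);
    }
}
```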

Infrastructure Anti-Patterns


AP-9: The Shared Broker Monolith

What it looks like:

All teams, environments, and workloads sharing a single Kafka cluster or RabbitMQ instance with no isolation.

BROKEN topology:
Single Kafka Cluster (3 brokers, 8 cores, 64GB RAM)
    |-- team-A development topics (300 topics)
    |-- team-B development topics (250 topics)
    |-- team-C staging topics
    |-- team-A production topics (critical financial data)
    |-- team-B production topics
    |-- team-C production topics
    |-- shared analytics firehose (10TB/day)
    |-- IoT sensor data (1M messages/sec)
    |-- legacy app migration topics

Problems:
  - Dev team's traffic spike degrades production latency
  - One team's mis-configured producer fills disk for all teams
  - Security: team-A can accidentally consume team-B's financial data
  - Retention needs diverge (analytics wants 1 day, audit needs 7 years), and long-retention topics eat the disk every team shares
  - Single cluster upgrade window affects all teams simultaneously
  - One team's network saturation affects ALL teams

The Fix: Cluster Isolation by Environment and Criticality

CORRECT topology:
Kafka infrastructure (four independent clusters)
    |-- Cluster A: Production (Tier 1 - Financial, Orders)
    |   |-- Isolated: dedicated brokers, strict ACLs, encryption at rest
    |   |-- Replication: MirrorMaker to DR region
    |-- Cluster B: Production (Tier 2 - Analytics, Notifications)
    |   |-- Slightly relaxed durability (can accept some loss)
    |-- Cluster C: Staging
    |   |-- Mirror of production configs, smaller scale
    |-- Cluster D: Development
        |-- Shared but sandboxed per team with quotas
        |-- Short retention to keep disk usage small

Each cluster has:
  - Independent disk pools
  - Independent ACL administration
  - Independent upgrade windows
  - Independent alert policies
  - Independent retention policies
// Spring Boot: environment-specific Kafka configuration
@Configuration
@Profile("production")
public class ProductionKafkaConfig {
    // Dedicated production brokers, SSL, authentication
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
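        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "prod-kafka-1:9093,prod-kafka-2:9093,prod-kafka-3:9093");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);  // Spring Kafka JSON
        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/secrets/kafka-keystore.jks");
        // Passwords and the truststore path are illustrative; inject real values from a secrets store
        config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_KEYSTORE_PASSWORD"));
        config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/secrets/kafka-truststore.jks");
        config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, System.getenv("KAFKA_TRUSTSTORE_PASSWORD"));
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(config);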
    }
}

AP-10: The Fan-Out Bomb

What it looks like:

A single event publishes to N topics, each of which triggers M consumers, each of which calls P downstream services. One upstream event therefore causes O(N × M × P) downstream work.

BROKEN topology:
ProductCatalogUpdated event
    |-- Topic: search-update-events
    |       --> SearchIndexer (re-indexes 10M products)
    |-- Topic: cache-invalidation-events
    |       --> CacheService (invalidates 500K cache entries)
    |-- Topic: recommendation-update-events
    |       --> RecommendationEngine (re-scores 50M user profiles)
    |-- Topic: notification-events
    |       --> NotificationService --> sends emails to 5M subscribed users
    |-- Topic: analytics-events
    |       --> 8 separate analytics consumers
    |-- Topic: cdn-purge-events
    |       --> CDN purges 100K cached pages

One catalog update triggers:
  - 500,000 cache invalidations
  - 50 million recommendation updates
  - 5 million notification emails
  - Cascading failures across 6 downstream systems simultaneously

The Fix: Rate-Limited Fan-Out with Batching Gates

// CORRECT: Use a fan-out coordinator with rate limiting and priority tiers
 
// Priority tiers for fan-out work:
// Tier 1: Critical (search index) - process immediately
// Tier 2: Important (cache) - process within 1 minute
// Tier 3: Deferrable (recommendations) - process over 1 hour
// Tier 4: Low-priority (emails to subscribers) - process over 24 hours
 
@Service
public class CatalogUpdateFanOutCoordinator {
 
    @KafkaListener(topics = "product-catalog-updated")
    public void handleCatalogUpdate(ProductCatalogUpdatedEvent event) {
 
        // Tier 1: Immediate - single product update to search
        kafkaTemplate.send("search-index-updates.priority-high", event.getProductId(), event);
 
        // Tier 2: Batched cache invalidation - aggregate 1000 updates before invalidating
        cacheInvalidationBuffer.add(event.getProductId());
 
        // Tier 3: Recommendation update - deferred via rate-limited queue
        kafkaTemplate.send("recommendation-updates.priority-low", event.getProductId(), event);
 
        // Tier 4: User notifications - ONLY if product has changes subscribers care about
        if (event.hasPriceChange() || event.hasNewAvailability()) {
            kafkaTemplate.send("subscriber-notifications.priority-lowest",
                event.getProductId(), event);
        }
        // If no price/availability change: skip subscriber notifications entirely
    }
}
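The coordinator leaves `cacheInvalidationBuffer` and the rate-limited queues unspecified. The sketch below shows one plausible shape for each, assuming time is passed in explicitly as epoch milliseconds, which keeps both classes testable: `BatchingGate` releases a de-duplicated batch once it holds `maxBatch` keys or its oldest key has waited `maxWaitMillis`, and `TokenBucket` admits on average at most `ratePerSecond` items per second with bursts up to `capacity`. Both class names and their parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical batching gate for Tier 2 cache invalidation.
final class BatchingGate {
    private final int maxBatch;
    private final long maxWaitMillis;
    private final Set<String> pending = new LinkedHashSet<>(); // de-duplicates repeated product IDs
    private long oldestAddedAt;

    BatchingGate(int maxBatch, long maxWaitMillis) {
        this.maxBatch = maxBatch;
        this.maxWaitMillis = maxWaitMillis;
    }

    // Buffers a key; returns a batch to invalidate, or an empty list if the gate stays closed.
    synchronized List<String> add(String key, long nowMillis) {
        if (pending.isEmpty()) {
            oldestAddedAt = nowMillis;
        }
        pending.add(key);
        return readyAt(nowMillis) ? drain() : List.of();
    }

    // Call from a timer so a partial batch is still flushed during quiet periods.
    synchronized List<String> tick(long nowMillis) {
        return !pending.isEmpty() && readyAt(nowMillis) ? drain() : List.of();
    }

    private boolean readyAt(long nowMillis) {
        return pending.size() >= maxBatch || nowMillis - oldestAddedAt >= maxWaitMillis;
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}

// Hypothetical token bucket for the deferrable tiers (recommendations, subscriber emails).
final class TokenBucket {
    private final double capacity;
    private final double ratePerSecond;
    private double tokens;
    private long lastRefillMillis;

    TokenBucket(double capacity, double ratePerSecond, long nowMillis) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.tokens = capacity;          // start full, allowing an initial burst
        this.lastRefillMillis = nowMillis;
    }

    // Returns true if one item may be processed now; otherwise the caller waits or re-queues.
    synchronized boolean tryAcquire(long nowMillis) {
        double refill = (nowMillis - lastRefillMillis) * ratePerSecond / 1000.0;
        tokens = Math.min(capacity, tokens + refill);
        lastRefillMillis = nowMillis;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```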

AP-11: The Retention Cliff

What it looks like:

Setting short retention on topics that consumers depend on for replay and recovery, causing irrecoverable data loss during incidents.

# BROKEN: Short retention set without thinking about recovery scenarios
kafka-configs.sh --bootstrap-server "$KAFKA_BOOTSTRAP" --alter --entity-type topics --entity-name order-events \
  --add-config retention.ms=86400000  # 1 day retention
 
# Scenario:
# Friday 5pm: Consumer service deployed with a critical bug
# Saturday: Bug noticed, messages being processed incorrectly
# Sunday: Team fixes the bug and wants to replay Friday's messages
# Sunday: Messages are already deleted - only 1 day of retention
# Sunday 11pm: Everything from Friday 5pm to Saturday 11pm (about 30 hours of orders) is permanently lost from the queue
# Only hope: check if DB has them (maybe not - consumer bug corrupted DB too)

The Fix: Retention Policy by Criticality and Recovery Time

# Financial/Order events - 30 days minimum
# Allows recovery from a 2-week undetected bug
kafka-configs.sh --bootstrap-server "$KAFKA_BOOTSTRAP" --alter --entity-type topics --entity-name orders.events.order-lifecycle \
  --add-config retention.ms=2592000000  # 30 days
 
# User activity events - 7 days
kafka-configs.sh --bootstrap-server "$KAFKA_BOOTSTRAP" --alter --entity-type topics --entity-name users.events.activity \
  --add-config retention.ms=604800000   # 7 days
 
# Analytics events - 3 days (replay rarely needed, so short retention keeps storage costs low)
kafka-configs.sh --bootstrap-server "$KAFKA_BOOTSTRAP" --alter --entity-type topics --entity-name analytics.events.clicks \
  --add-config retention.ms=259200000   # 3 days
 
# Audit/Compliance events - 2 years (regulatory requirement)
# Multiple settings go in ONE comma-separated --add-config; retention.bytes=-1 means no size limit
kafka-configs.sh --bootstrap-server "$KAFKA_BOOTSTRAP" --alter --entity-type topics --entity-name payments.audit.transactions \
  --add-config retention.ms=63072000000,retention.bytes=-1  # 2 years, no size limit
 
# General rule: Set retention to 2x your worst-case recovery time
# Recovery time = time to detect bug + time to fix + time to re-deploy
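The rule of thumb is simple arithmetic, but the millisecond value is easy to get wrong by hand. A minimal sketch, with the hypothetical helper `RetentionPlanner` and purely illustrative durations: if detecting a bug can take 10 days, fixing it 3 days, and redeploying 1 day, recovery time is 14 days, so retention should be 28 days, which is `retention.ms=2419200000`.

```java
import java.time.Duration;

// Hypothetical helper encoding the rule: retention = 2 x (detect + fix + redeploy).
final class RetentionPlanner {

    static Duration recommendedRetention(Duration detect, Duration fix, Duration redeploy) {
        Duration recoveryTime = detect.plus(fix).plus(redeploy);
        return recoveryTime.multipliedBy(2);
    }

    // Value for the topic-level retention.ms setting.
    static long retentionMs(Duration detect, Duration fix, Duration redeploy) {
        return recommendedRetention(detect, fix, redeploy).toMillis();
    }
}
```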

AP-12: No-Schema Schema Evolution

What it looks like:

Teams add/modify/remove fields in JSON messages without any coordination mechanism, breaking consumers across teams.

// BROKEN: Version 1 - OrderEvent (team A ships this)
{
    "orderId": "ORD-123",
    "status": "PLACED",
    "totalAmount": 99.99,
    "customerId": "CUST-456"
}
 
// Version 2 - Team A renames field WITHOUT coordination
{
    "orderId": "ORD-123",
    "orderStatus": "PLACED",        // "status" is now "orderStatus" - BREAKING CHANGE
    "amount": 99.99,                // "totalAmount" is now "amount" - BREAKING CHANGE
    "customer": {                   // "customerId" is now nested object - BREAKING CHANGE
        "id": "CUST-456",
        "email": "user@example.com"
    }
}
 
// Team B's consumer (still expecting V1 schema):
@KafkaListener(topics = "order-events")
public void handleOrder(Map<String, Object> event) {
    String status = (String) event.get("status");           // null - field was renamed
    double amount = (double) event.get("totalAmount");      // NullPointerException - unboxing null
    String customerId = (String) event.get("customerId");   // null - field was restructured
    // Mixed failure: the unboxing NPE crashes the consumer (retry loop or DLQ), while the
    // String fields silently become null. Had amount been read as a nullable Double, every
    // order would flow through with null status, null amount and null customer.
}

The Fix: Schema Registry with Compatibility Enforcement + Versioning Headers

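// Confluent Schema Registry enforces compatibility:
// BACKWARD: consumers on the new schema can read data written with the old schema (the default)
// FORWARD: consumers on the old schema can read data written with the new schema
// FULL: both directions compatible
 
// When a producer using a registry-aware serializer (Avro, Protobuf, JSON Schema)
// tries to register a schema with a BREAKING change:
// Schema Registry rejects it with HTTP 409 Conflict, the send fails,
// and the message NEVER reaches consumers
 
// Safe evolution example - adding optional fields (in Avro: fields with default values)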
{
    "orderId": "ORD-123",
    "status": "PLACED",             // KEPT: existing field
    "orderStatus": "PLACED",        // ADDED: new name (consumers migrate gradually)
    "totalAmount": 99.99,           // KEPT: existing field
    "amount": 99.99,                // ADDED: duplicate with new name
    "customerId": "CUST-456",       // KEPT: existing field
    "customerDetails": {            // ADDED: enriched version (optional/nullable)
        "id": "CUST-456",
        "email": "user@example.com"
    },
    "schemaVersion": 2              // ALWAYS include schema version
}
 
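// After all consumers have migrated to the V2 names: remove the deprecated V1 fields in V3
// This expand-migrate-contract sequence avoids breaking changes, provided V3 ships only
// after the last consumer reading the V1 names is gone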
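During the migration window, consumers have to accept every generation of the event. Below is a minimal sketch of a tolerant reader. It assumes the payload is JSON parsed into a `Map`, as in the broken consumer above, and `OrderEventReader` is a hypothetical name. The reader prefers the new field names, falls back to the V1 names, and fails loudly instead of letting nulls through.

```java
import java.util.Map;

// Hypothetical tolerant reader for OrderEvent V1/V2/V3 payloads (JSON parsed into a Map)
final class OrderEventReader {

    private OrderEventReader() {}

    static String status(Map<String, Object> event) {
        return firstPresent(event, "orderStatus", "status").toString();
    }

    static double amount(Map<String, Object> event) {
        // JSON parsers may produce Integer, Long or Double for numbers
        return ((Number) firstPresent(event, "amount", "totalAmount")).doubleValue();
    }

    static String customerId(Map<String, Object> event) {
        Object details = event.get("customerDetails");
        if (details instanceof Map) {
            Object id = ((Map<?, ?>) details).get("id");
            if (id != null) {
                return id.toString();
            }
        }
        return firstPresent(event, "customerId").toString();
    }

    /** Value of the first field that is present; fails loudly instead of returning null. */
    private static Object firstPresent(Map<String, Object> event, String... names) {
        for (String name : names) {
            Object value = event.get(name);
            if (value != null) {
                return value;
            }
        }
        throw new IllegalArgumentException("Missing field, tried: " + String.join(", ", names));
    }
}
```

With this reader in place, Team B can deploy before or after Team A's V2 and V3 releases. A payload missing every known name for a field fails immediately, so it goes to the DLQ instead of corrupting data.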

AP-13: The Consumer Race Condition

What it looks like:

Multiple consumer instances concurrently processing messages for the same entity, causing data corruption through concurrent writes.

// BROKEN: Two consumer instances get messages for the same userId
// (happens in Kafka when messages are not keyed by userId, or in RabbitMQ when multiple workers consume from the same queue)
 
// Consumer Instance A processes: UpdateUserBalance(userId=42, delta=+100)
// Consumer Instance B processes: UpdateUserBalance(userId=42, delta=-50)
// Both read current balance = 1000 concurrently
// Instance A writes: 1000 + 100 = 1100
// Instance B writes: 1000 - 50 = 950  (OVERWRITES Instance A's write)
// Correct final balance should be: 1000 + 100 - 50 = 1050
// Actual final balance: 950  (Lost Update - a classic race condition)

Why it happens:

  • RabbitMQ: Multiple workers consuming from the same queue can receive different messages for the same user.
  • Kafka: If the partition key is not the user ID, messages for the same user land on different partitions and are consumed by different instances.
  • Any system: Consumer rebalancing can cause two instances to briefly process the same partition.

The Fix: Partition-Per-Entity Key + Optimistic Locking

// Fix 1: Partition key ensures all events for userId=42 go to the same partition
// and are consumed by the same consumer instance
 
kafkaTemplate.send("user-balance-events", userId.toString(), event);
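// Default partitioner: hash(key) % numPartitions (murmur2), so the same userId always lands
// on the same partition, as long as the partition count is never changed
// Within a consumer group each partition is owned by one consumer at a time
// = no concurrent processing for the same user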
 
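// Fix 2: Database-level optimistic locking as defense-in-depth
@Service
public class UserBalanceConsumer {
 
    private final UserRepository userRepository;
 
    public UserBalanceConsumer(UserRepository userRepository) {
        this.userRepository = userRepository;
    }
 
    @KafkaListener(topics = "user-balance-events")
    @Retryable(maxAttempts = 3, backoff = @Backoff(delay = 100))  // requires @EnableRetry; 3 attempts in total
    public void handleBalanceUpdate(BalanceUpdateEvent event) {
        // Read the current balance together with its version
        UserBalance current = userRepository.findByUserId(event.getUserId());
 
        int rowsUpdated = userRepository.updateBalanceWithVersion(
            event.getUserId(),
            current.getBalance() + event.getDelta(),
            current.getVersion()  // Optimistic lock: the version we just read
        );
 
        if (rowsUpdated == 0) {
            // Version changed between our read and our write: a concurrent update happened.
            // @Retryable calls the method again, which re-reads the fresh balance and version
            // (a version carried inside the event would fail the same way on every retry)
            throw new OptimisticLockException("Concurrent balance update for user: " + event.getUserId());
        }
    }
}
 
// SQL with optimistic locking
// UPDATE user_balance
// SET balance = :newBalance, version = version + 1
// WHERE user_id = :userId AND version = :expectedVersion
// Returns 0 rows if the version changed since the read - triggering a retry
// (A single atomic "SET balance = balance + :delta" also avoids the lost update;
//  the version check matters whenever the new value is computed in application code)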
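The interleaving from the broken example can be replayed deterministically without a database. Below is a minimal in-memory sketch, assuming a single versioned row. `VersionedBalance`, `compareAndSet` and `applyDelta` are hypothetical names, and `compareAndSet` plays the role of the `UPDATE ... WHERE version = :expectedVersion` statement.

```java
// In-memory stand-in for one row of user_balance (balance, version); names are hypothetical
final class VersionedBalance {

    static final class Snapshot {
        final long balance;
        final long version;

        Snapshot(long balance, long version) {
            this.balance = balance;
            this.version = version;
        }
    }

    private long balance;
    private long version;

    VersionedBalance(long initialBalance) {
        this.balance = initialBalance;
    }

    synchronized Snapshot read() {
        return new Snapshot(balance, version);
    }

    /** Mirrors: UPDATE ... SET balance = :new, version = version + 1 WHERE version = :expected */
    synchronized boolean compareAndSet(long expectedVersion, long newBalance) {
        if (version != expectedVersion) {
            return false; // "0 rows updated": another writer got there first
        }
        balance = newBalance;
        version++;
        return true;
    }

    /** Read-modify-write with retries; every attempt re-reads the current row. */
    static boolean applyDelta(VersionedBalance row, long delta, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Snapshot current = row.read();
            if (row.compareAndSet(current.version, current.balance + delta)) {
                return true;
            }
        }
        return false;
    }
}
```

Replaying the broken scenario works as follows:

- Both instances read 1000 at version 0.
- Instance A's `compareAndSet(0, 1100)` succeeds.
- Instance B's `compareAndSet(0, 950)` is rejected instead of silently overwriting A's write.
- B's retry through `applyDelta(row, -50, 3)` re-reads 1100 at version 1 and writes the correct 1050.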

AP-14: The Offset Reset Catastrophe

What it looks like:

Running kafka-consumer-groups --reset-offsets --to-earliest on a production consumer group without understanding the consequences.

# BROKEN: "Let's replay all messages since the beginning to fix the issue"
kafka-consumer-groups.sh \
  --bootstrap-server prod-kafka:9092 \
  --group payment-processor \
  --topic payment-events \
  --reset-offsets --to-earliest --execute
 
# What actually happens:
# payment-processor has already processed 50 MILLION payment events
# Resetting to earliest causes it to process ALL 50 MILLION again
# Each event calls:
#   - chargeCustomerCard() - every customer is charged a second time
#   - updateAccountBalance() - every balance change is applied twice
#   - sendPaymentConfirmation() - 50M duplicate confirmation emails go out
# Financial system is now in a completely corrupt state
# Recovery: weeks of manual reconciliation

The Fix: Safe Offset Management Practices

# RULE 1: NEVER run offset reset on production without a full impact analysis
 
# Step 1: Identify what messages would be replayed
kafka-consumer-groups.sh --describe --group payment-processor \
  --bootstrap-server prod-kafka:9092
# See current committed offsets and lag
 
# Step 2: Use --dry-run to see the impact WITHOUT applying
kafka-consumer-groups.sh \
  --bootstrap-server prod-kafka:9092 \
  --group payment-processor \
  --topic payment-events \
  --reset-offsets --to-datetime 2026-06-01T00:00:00.000 \
  --dry-run
# Shows what would happen WITHOUT executing
 
# Step 3: If replay is truly needed, use a NEW consumer group for replay
# The new group processes the replay; existing production group continues normally
kafka-consumer-groups.sh \
  --bootstrap-server prod-kafka:9092 \
  --group payment-processor-replay-2026-06-05 \
  --topic payment-events \
  --reset-offsets --to-datetime 2026-06-01T00:00:00.000 \
  --execute
 
# Step 4: Replay consumer must be IDEMPOTENT
# Ensure it skips already-applied events using event ID deduplication
# NEVER replay with the same consumer group as production
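Step 4 is what makes any replay safe. Below is a minimal sketch of event-ID deduplication, assuming every payment event carries a unique `eventId`. `IdempotentPaymentHandler` and the in-memory set are hypothetical stand-ins. In production, the processed-ID record would be written in the same database transaction as the side effect.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical idempotent handler: side effects run at most once per eventId
final class IdempotentPaymentHandler {

    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();
    private final AtomicInteger charges = new AtomicInteger();

    /** Returns true if the event was applied, false if it was a duplicate and skipped. */
    boolean handle(String eventId, long amountCents) {
        // add() is atomic and returns false if the ID was already recorded.
        // Recording BEFORE the side effect means a crash can lose a charge but never
        // double-charge; a real implementation makes both steps one transaction.
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        chargeCustomerCard(amountCents);
        return true;
    }

    private void chargeCustomerCard(long amountCents) {
        charges.incrementAndGet(); // stand-in for the real payment call
    }

    int chargeCount() {
        return charges.get();
    }
}
```

Replaying a window that overlaps already-processed events then becomes a series of cheap "skip" decisions instead of duplicate charges.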

Operational Anti-Patterns


AP-15: The Silent DLQ

What it looks like:

A DLQ exists, messages are routed to it on failure, but nobody monitors it and it silently accumulates failures for months.

# Team set up DLQ - good!
# But no alerts configured for it
 
# 3 months after go-live:
# DLQ depth: 2,847,223 messages
# First noticed when: disk full alert fired
# Root cause: a message schema change 6 weeks earlier broke deserialization in the email-notification consumer
# Impact: 2.8 million orders processed without sending confirmation emails
#         2.8 million customers never notified about their orders
 
# How it was discovered: a customer complained on Twitter
# How long it took to notice: 6 weeks

The Fix: DLQ Monitoring with Severity-Tiered Alerts

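// Micrometer gauges for DLQ monitoring (exported to Prometheus as kafka_dlq_depth)
@Component
public class DLQDepthMetricsCollector {
 
    private static final Logger log = LoggerFactory.getLogger(DLQDepthMetricsCollector.class);
 
    private final MeterRegistry meterRegistry;
    // One AtomicLong per DLQ topic: each gauge is registered ONCE and then updated in place.
    // Re-registering on every poll keeps returning the first gauge (bound to a stale value),
    // and a gauge over a boxed Long is only weakly referenced, so it can report NaN.
    private final Map<String, AtomicLong> dlqDepths = new ConcurrentHashMap<>();
 
    public DLQDepthMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
 
    @Scheduled(fixedDelay = 30000) // every 30 seconds
    public void collectDLQMetrics() {
        // Helper: messages waiting per topic (end offset minus the DLQ processor's
        // committed offset), computed with the Kafka Admin client
        Map<String, Long> topicDepths = calculateTopicDepths();
 
        topicDepths.forEach((topicName, depth) -> {
            if (topicName.endsWith(".dlq") || topicName.endsWith("-dlq")) {
                dlqDepths.computeIfAbsent(topicName, t ->
                        meterRegistry.gauge("kafka.dlq.depth", Tags.of("topic", t), new AtomicLong()))
                    .set(depth);
 
                // Severity is NOT a gauge tag: a changing tag value creates a new time series.
                // The alert rules below apply the thresholds; here they are only logged.
                String severity = classifyDLQDepth(topicName, depth);
                if (!"INFO".equals(severity)) {
                    log.warn("DLQ {} depth={} severity={}", topicName, depth, severity);
                }
            }
        });
    }
 
    private String classifyDLQDepth(String topic, long depth) {
        if (topic.contains("payment") || topic.contains("financial")) {
            if (depth > 0) return "CRITICAL";    // ANY financial failure is critical
        }
        if (topic.contains("order")) {
            if (depth > 100) return "CRITICAL";  // >100 order failures
            if (depth > 10) return "WARNING";
        }
        if (depth > 10000) return "CRITICAL";
        if (depth > 1000) return "WARNING";
        return "INFO";
    }
}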
# Alert rules - Prometheus/AlertManager
groups:
  - name: dlq-alerts
    rules:
      - alert: FinancialDLQNonEmpty
        expr: kafka_dlq_depth{topic=~".*payment.*|.*financial.*"} > 0
        for: 1m
        labels:
          severity: critical
          team: payments
        annotations:
          summary: "CRITICAL: Financial DLQ has {{ $value }} messages"
          runbook: "https://wiki.company.com/runbooks/financial-dlq"
 
      - alert: OrdersDLQDepthWarning
        expr: kafka_dlq_depth{topic=~".*order.*"} > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Orders DLQ has {{ $value }} failed messages"

AP-16: The Reprocessing Roulette

What it looks like:

Reprocessing DLQ messages without making the processing operation idempotent first, causing repeated side effects.

// BROKEN: Someone drains the DLQ to fix failed messages
// But the consumer is not idempotent
 
// DLQ has 5000 messages from 2 days ago
// Team replays them directly into the main topic
// Consumer processes each and:
//   1. Sends a "Your order has shipped!" email
//   2. Deducts inventory
//   3. Creates a shipment record
 
// But these orders WERE processed originally - the DLQ was populated
// due to a downstream analytics service failure, not a shipment failure
// Result:
//   - Customers receive second "Your order has shipped!" email
//   - Inventory deducted twice for each order
//   - Duplicate shipment records created

The Fix: DLQ Triage Before Reprocessing

// Step 1: Categorize DLQ messages BEFORE replaying
// Not all DLQ messages should be replayed
 
@Service
public class DLQTriageService {
 
    public DLQTriageReport analyzeDeadLetters(String dlqTopic, int sampleSize) {
        List<DeadLetter> samples = fetchSamples(dlqTopic, sampleSize);
 
        Map<String, Long> errorsByType = samples.stream()
            .collect(Collectors.groupingBy(dl -> dl.getHeader("exception-type"), counting()));
 
        Map<String, Long> errorsByEventType = samples.stream()
            .collect(Collectors.groupingBy(dl -> dl.getHeader("event-type"), counting()));
 
        return DLQTriageReport.builder()
            .totalMessages(getTotalDepth(dlqTopic))
            .errorDistribution(errorsByType)
            .eventTypeDistribution(errorsByEventType)
            .oldestMessage(findOldest(samples))
            .newestMessage(findNewest(samples))
            .recommendation(generateRecommendation(errorsByType))
            .build();
    }
}
 
// Step 2: Fix the consumer to be idempotent BEFORE replaying
// Step 3: Replay to a test environment first
// Step 4: Use a controlled replay strategy with rate limiting
 
@Component
public class ControlledDLQReplayService {
 
    private final RateLimiter replayRateLimiter = RateLimiter.create(100); // 100/sec max
 
    public void replayWithControls(String dlqTopic, String targetTopic,
                                   ReplayFilter filter, Duration timeWindow) {
        List<DeadLetter> messages = fetchMessagesInWindow(dlqTopic, timeWindow);
 
        List<DeadLetter> toReplay = messages.stream()
            .filter(filter::shouldReplay)
            .collect(toList());
 
        log.info("DLQ replay: {} total messages, {} qualify for replay",
            messages.size(), toReplay.size());
 
        for (DeadLetter message : toReplay) {
            replayRateLimiter.acquire(); // Never flood the system
            kafkaTemplate.send(targetTopic, message.getKey(), message.getPayload());
        }
    }
}
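The `filter::shouldReplay` call above is where the triage decisions become code. Below is a minimal sketch of one possible `ReplayFilter` for the analytics-failure scenario described earlier. Several parts are hypothetical assumptions:

- The exception type names.
- The `alreadyShipped` lookup.
- `DeadLetter`, reduced here to the two fields the filter needs.

```java
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified dead letter: the business key and the failure-type header
final class DeadLetter {
    final String orderId;
    final String exceptionType;

    DeadLetter(String orderId, String exceptionType) {
        this.orderId = orderId;
        this.exceptionType = exceptionType;
    }
}

// Replays only messages whose failure means the core work did NOT happen
final class ReplayFilter {

    // Failures in non-critical downstream steps: the order itself was already processed
    private static final Set<String> SKIP_TYPES = Set.of("AnalyticsServiceUnavailableException");

    private final Predicate<String> alreadyShipped; // e.g. a lookup against the shipment table

    ReplayFilter(Predicate<String> alreadyShipped) {
        this.alreadyShipped = alreadyShipped;
    }

    boolean shouldReplay(DeadLetter letter) {
        if (SKIP_TYPES.contains(letter.exceptionType)) {
            return false; // replay would only repeat the email, inventory and shipment effects
        }
        return !alreadyShipped.test(letter.orderId); // defense in depth: check actual state
    }
}
```

Checking the real downstream state, not just the error type, protects against mislabelled failures. Even so, the consumer should still be made idempotent before any replay.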

AP-17: The Uncontrolled Fanout Migration

What it looks like:

Migrating from one broker (e.g., RabbitMQ) to another (e.g., Kafka) by running dual publish and swapping consumers overnight.

BROKEN migration plan:
Week 1: Add Kafka publish alongside existing RabbitMQ publish (dual write)
Week 2: Switch all consumers to Kafka
Week 3: Remove RabbitMQ publish

Problems:
  - Week 1: Dual publish doubles the publishing work (and the failure points) in every producer
  - Weeks 1-2: Dual-write gap: if a producer crashes between the RabbitMQ publish and the
    Kafka publish, consumers on one broker see the message and consumers on the other
    never do = inconsistent state
  - Week 2 switchover night: All consumers switched at midnight
    New Kafka consumer groups start from auto.offset.reset (earliest or latest), not from
    where the RabbitMQ consumers stopped, so messages are skipped or processed twice
  - No rollback plan if Kafka consumers fail at 2am

The Fix: Strangler-Fig Migration with Shadow Consumers

// Phase 1: Production keeps processing from RabbitMQ; a shadow consumer reads the same
// events from Kafka WITHOUT processing them, validating that they arrive correctly
 
@Component
public class KafkaShadowConsumer {
 
    @KafkaListener(topics = "order-events", groupId = "shadow-validation-group")
    public void shadowConsume(OrderEvent kafkaEvent) {
        // Do NOT process - only validate
        shadowValidator.compareWithRabbitMQBaseline(kafkaEvent);
        // Alert if discrepancy found: message in Kafka but not RabbitMQ, or vice versa
    }
}
 
// Phase 2: Migrate one consumer group at a time (not all at once)
// Start with analytics (low risk) -> notifications -> orders -> payments (highest risk)
// Each migration has a 48-hour validation period before moving to next
 
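// Phase 3: Maintain RabbitMQ as fallback for 2 weeks after full Kafka migration
// One feature flag decides which broker's consumer actually processes orders:
@KafkaListener(topics = "order-events", groupId = "order-processor-kafka")
public void handleFromKafka(OrderEvent event) {
    if (featureFlags.isEnabled("kafka-consumers-order-processor")) {
        processOrder(event);
    }
    // If feature flag off: consume but discard (prevent lag buildup)
}
 
@RabbitListener(queues = "order-events")  // queue name assumed for illustration
public void handleFromRabbitMQ(OrderEvent event) {
    if (!featureFlags.isEnabled("kafka-consumers-order-processor")) {
        processOrder(event);  // exact inverse condition: only one path processes each order
    }
}
 
// Rollback: flip the feature flag off and the RabbitMQ consumer takes over in < 1 minute
// Messages in flight at the moment of the flip can be processed twice or skipped,
// so make processOrder idempotent and reconcile the flip window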
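The `compareWithRabbitMQBaseline` check in Phase 1 amounts to reconciling the message IDs seen on each broker over a time window. Below is a minimal sketch, assuming the dual publish writes the same message ID to both brokers. `ShadowReconciler` is a hypothetical name, and the two ID sets would be filled by the RabbitMQ consumer and the Kafka shadow consumer.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical reconciliation of message IDs observed on each broker in one window
final class ShadowReconciler {

    static final class Report {
        final Set<String> missingInKafka;
        final Set<String> missingInRabbit;

        Report(Set<String> missingInKafka, Set<String> missingInRabbit) {
            this.missingInKafka = missingInKafka;
            this.missingInRabbit = missingInRabbit;
        }

        boolean consistent() {
            return missingInKafka.isEmpty() && missingInRabbit.isEmpty();
        }
    }

    static Report compare(Set<String> rabbitIds, Set<String> kafkaIds) {
        Set<String> missingInKafka = new TreeSet<>(rabbitIds);
        missingInKafka.removeAll(kafkaIds);      // published to RabbitMQ, never seen in Kafka
        Set<String> missingInRabbit = new TreeSet<>(kafkaIds);
        missingInRabbit.removeAll(rabbitIds);    // published to Kafka, never seen in RabbitMQ
        return new Report(missingInKafka, missingInRabbit);
    }
}
```

Only compare windows that are old enough for in-flight messages to have reached both brokers. Otherwise, normal delivery delay shows up as false discrepancies.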

AP-18: Blind Broker Upgrades

What it looks like:

Upgrading Kafka or RabbitMQ version in production without testing protocol compatibility with existing clients.

Scenario:
  Production: Kafka 2.6 (running fine for 18 months)
  Plan: Upgrade to Kafka 3.4 as the first step toward KRaft (no ZooKeeper)
  Method: Upgrade all brokers in a rolling update, 1 at a time

  Problem 1: Some services use kafka-clients 2.0 and were never tested against a 3.x cluster
  Problem 2: Nobody reviewed the release notes for changed defaults and timeout settings
  Problem 3: inter.broker.protocol.version was not pinned, so each broker switched to the
             3.4 protocol as soon as it restarted - downgrade is no longer possible

  After upgrade:
    - 3 services fail to connect: "Unsupported API version"
    - Consumer groups rebalance continuously (unreviewed timeout settings)
    - Cannot roll back brokers (inter-broker protocol already bumped)
    - 4 hour production outage for critical payment processing

The Fix: Compatibility Matrix Testing Before Any Broker Upgrade

# Step 1: Check all client versions against new broker version
# Kafka compatibility matrix: https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
 
# Step 2: Test upgrade in staging with production traffic replay (not synthetic traffic)
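# Step 3: Upgrade brokers and clients in separate steps, each validated on its own
# (since Kafka 0.10.2, clients and brokers negotiate API versions in both directions,
#  so mixed versions are supported, but only the combinations you tested are proven)
 
# Step 4: Pin the inter-broker protocol so the software upgrade and the protocol bump are separate
# In server.properties on EVERY broker, before the rolling restart:
inter.broker.protocol.version=2.6
# (keep log.message.format.version at its current value too, if it is set explicitly)
# Brokers then run 3.4 binaries but still speak the 2.6 protocol, so rolling back is still possible
 
# Step 5: Only after all brokers and clients are validated, set
# inter.broker.protocol.version=3.4 and perform a second rolling restart
 
# Step 6: Document rollback procedure BEFORE starting upgrade
# For Kafka: downgrade is IMPOSSIBLE once inter.broker.protocol.version has been bumped
# Rollback then requires restoring from broker disk snapshots taken before upgrade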

AP-19: The Missing Runbook

What it looks like:

Teams with complex messaging infrastructure but no documented procedures for common incidents. First response is always "wake up the original developer at 3am."

Common incidents without runbooks:
  1. Consumer lag spike: how do we scale? Where is the consumer config?
  2. DLQ filling up: which team owns it? How do we triage?
  3. Broker disk full: which topics have largest retention? How to safely truncate?
  4. Rebalancing storm: what are the symptoms? How to stabilize?
  5. Schema registry down: which services break? How to bypass temporarily?
  6. Single partition hot: how to redistribute? What is the re-keying process?
  7. Consumer stuck in rebalancing: how to force reassignment?

Without runbooks: each incident takes 4-8 hours
With runbooks: same incidents take 20-40 minutes

The Fix: Runbook Template for Every Critical Topic/Queue

# Runbook: order-events Consumer Lag Spike
 
## Symptom
 
kafka_consumer_group_lag{topic="order-events"} > 100000
 
## Immediate Response (0-5 minutes)
 
1. Check if consumers are running:
   kubectl get pods -l app=order-processor -n production
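2. Check for errors in consumer logs (with -l, kubectl prints only the last 10 lines per pod unless --tail is set):
   kubectl logs -l app=order-processor -n production --since=5m --tail=-1 | grep ERROR
3. Check partition count vs consumer count:
   kafka-consumer-groups.sh --bootstrap-server prod-kafka:9092 --describe --group order-processor-group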
 
## Scale-Up Procedure (5-15 minutes)
 
If consumers running but lagging:
 
1. Check current replica count: kubectl get hpa order-processor -n production
2. Manual scale: kubectl scale deployment order-processor -n production --replicas=12
   (if the HPA manages this deployment, raise its minReplicas instead; the HPA overrides manual scaling)
3. Max consumers = partition count (currently 12 for order-events)
4. Do NOT scale beyond partition count - extra pods will be idle
 
## DLQ Check (15-20 minutes)
 
Check if lag is due to processing failures sending to DLQ:
kafka-consumer-groups.sh --bootstrap-server prod-kafka:9092 --describe --group order-processor-dlq-group
 
If DLQ growing: Do NOT replay without engineering team review (see DLQ playbook)
 
## Escalation
 
If lag > 1,000,000 or not improving after 30 minutes:
Page: on-call-platform-team@company.com (Severity 1)
 
## Rollback
 
If new deployment caused the lag:
kubectl rollout undo deployment/order-processor -n production

AP-20: The Noisy Neighbor Queue

What it looks like:

Low-priority bulk processing shares broker partitions or threads with high-priority critical processing, causing SLA violations.

BROKEN: Order processing and bulk report generation on same Kafka topic
Topic: business-events (12 partitions)
  - Consumer Group A: order-processor (critical, SLA: 100ms)
  - Consumer Group B: report-generator (non-critical, batch, SLA: 24 hours)

When report-generator triggers bulk processing:
  - All 12 partitions saturated with report generation commands
  - Order processor must read past every report message (its group consumes the whole topic), so order messages queue behind them
  - Order confirmation emails delayed by 20 minutes
  - SLA violations on 40,000 orders

Also:
  - report-generator consumer causing 10,000 database reads/sec
  - Shared DB instance: order-processor reads now timing out

The Fix: Traffic Class Isolation

// Separate topics by traffic class, not just by event type
 
// CRITICAL path - small messages, low volume, strict SLA
@KafkaListener(
    topics = "orders.critical.payment-commands",
    groupId = "payment-processor",
    containerFactory = "criticalConsumerFactory"  // Dedicated thread pool
)
public void handleCriticalPayment(PaymentCommand event) { ... }
 
// BULK path - large volume, relaxed SLA, isolated infrastructure
@KafkaListener(
    topics = "reports.bulk.generation-commands",
    groupId = "report-generator",
    containerFactory = "bulkConsumerFactory"  // Separate, larger thread pool
)
public void handleBulkReport(ReportGenerationCommand event) { ... }
 
// Bean configuration: separate listener containers prevent cross-contamination
@Bean("criticalConsumerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> criticalFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    factory.setConsumerFactory(consumerFactory);  // required: the container factory has no default
    factory.setConcurrency(4);  // 4 dedicated consumer threads
    factory.getContainerProperties().setPollTimeout(100); // Fast poll
    return factory;
}
 
@Bean("bulkConsumerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> bulkFactory(
        ConsumerFactory<String, Object> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(20); // Many threads for bulk (threads beyond the partition count sit idle)
    factory.getContainerProperties().setPollTimeout(5000); // Relaxed poll
    return factory;
}

Security Anti-Patterns


AP-21: The Open Exchange

What it looks like:

Kafka topics or RabbitMQ exchanges with no ACL configuration. Any service in the network can read from or write to any topic.

# BROKEN: No ACLs configured (Kafka's default with no authorizer: every client that can connect may do anything)
# Any service that can reach the Kafka broker can:
#   - Consume from payment-events (read all customer payment data)
#   - Publish to user-commands (issue arbitrary user deletion commands)
#   - Consume from audit-events (read the security audit trail)
 
# Real scenario: A logging service misconfiguration caused it to consume
# from the payment-events topic and write raw payment data to log files.
# Log files were publicly accessible via a misconfigured S3 bucket.
# 500,000 customer payment records exposed.

The Fix: Topic ACLs with Service Identity

# Every service gets its own service account with minimal permissions
 
# Payment service: can produce to payment-events, consume from order-events
kafka-acls.sh --add \
  --allow-principal User:payment-service \
  --operation WRITE --topic payment-events \
  --bootstrap-server prod-kafka:9092
 
kafka-acls.sh --add \
  --allow-principal User:payment-service \
  --operation READ --topic order-events \
  --group payment-processor-group \
  --bootstrap-server prod-kafka:9092
 
# Order service: can produce to order-events only
kafka-acls.sh --add \
  --allow-principal User:order-service \
  --operation WRITE --topic order-events \
  --bootstrap-server prod-kafka:9092
 
# Analytics service: READ ONLY, never write
kafka-acls.sh --add \
  --allow-principal User:analytics-service \
  --operation READ --topic order-events \
  --group analytics-consumer-group \
  --bootstrap-server prod-kafka:9092
 
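# Default DENY ALL (must explicitly grant) only holds when every broker runs an authorizer:
#   authorizer.class.name=kafka.security.authorizer.AclAuthorizer  (ZooKeeper mode)
#   authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer  (KRaft mode)
#   allow.everyone.if.no.acl.found=false  (the default; true re-opens every topic without ACLs)
# Verify: kafka-acls.sh --list --bootstrap-server prod-kafka:9092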
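The ACL commands above only lock things down if unmatched requests are refused. To show why, here is a deliberately simplified model of topic ACL evaluation. It is a sketch, not Kafka's authorizer: it ignores hosts, prefixed patterns, super users and implied operations, and `AclModel` is a hypothetical name. It encodes two rules Kafka's authorizer does follow:

- A matching DENY beats any ALLOW.
- With `allow.everyone.if.no.acl.found=false`, a topic with no ACLs at all is closed to everyone.

```java
import java.util.ArrayList;
import java.util.List;

// Deliberately simplified model of topic ACL checks (NOT Kafka's authorizer code)
final class AclModel {

    enum Permission { ALLOW, DENY }

    static final class Acl {
        final String principal;   // e.g. "User:payment-service", or "User:*" for everyone
        final String operation;   // e.g. "READ", "WRITE"
        final String topic;
        final Permission permission;

        Acl(String principal, String operation, String topic, Permission permission) {
            this.principal = principal;
            this.operation = operation;
            this.topic = topic;
            this.permission = permission;
        }

        boolean matches(String who, String op, String t) {
            return topic.equals(t) && operation.equals(op)
                && (principal.equals(who) || principal.equals("User:*"));
        }
    }

    private final List<Acl> acls = new ArrayList<>();
    private final boolean allowEveryoneIfNoAclFound;

    AclModel(boolean allowEveryoneIfNoAclFound) {
        this.allowEveryoneIfNoAclFound = allowEveryoneIfNoAclFound;
    }

    AclModel add(Acl acl) {
        acls.add(acl);
        return this;
    }

    boolean authorize(String who, String op, String topic) {
        if (acls.stream().noneMatch(a -> a.topic.equals(topic))) {
            return allowEveryoneIfNoAclFound; // true here = the "open exchange"
        }
        if (acls.stream().anyMatch(a -> a.permission == Permission.DENY && a.matches(who, op, topic))) {
            return false; // a matching DENY always wins
        }
        return acls.stream().anyMatch(a -> a.permission == Permission.ALLOW && a.matches(who, op, topic));
    }
}
```

In this model, the logging service from the incident above is refused READ on `payment-events`, and the analytics service cannot WRITE to `order-events`. Flipping the constructor flag to `true` reproduces the open exchange for any topic that has no ACLs.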

AP-22: Message Payload PII Leakage

What it looks like:

Embedding personally identifiable information (PII) directly in message payloads that are stored in broker logs.

// BROKEN: Full PII in every message
OrderPlacedEvent event = OrderPlacedEvent.builder()
    .orderId(order.getId())
    .customerEmail("john.doe@example.com")     // PII
    .customerName("John Doe")                   // PII
    .creditCardNumber("4111111111111111")        // PCI-DSS sensitive data
    .cvv("123")                                 // Extremely sensitive
    .billingAddress(customer.getFullAddress())  // PII
    .phoneNumber(customer.getPhone())           // PII
    .dateOfBirth(customer.getDob())             // PII
    .build();
 
// This message is:
//   - Stored in Kafka broker log on disk for 30 days
//   - Replicated across 3 brokers
//   - Potentially readable by anyone with READ ACL on this topic
//   - Included in any broker backup
//   - Printed in logs if consumer logs the message
// GDPR: right-to-erasure is impractical (Kafka cannot delete an individual record
//       from a regular, non-compacted topic)
// PCI-DSS: storing the CVV (sensitive authentication data) after authorization is
//          prohibited, even if encrypted

The Fix: Reference Tokens + Field-Level Encryption

// CORRECT: Message contains IDs/references, not raw PII
OrderPlacedEvent event = OrderPlacedEvent.builder()
    .orderId(order.getId())
    .customerId(customer.getId())    // Reference ID only - no PII
    .paymentTokenId(paymentToken.getId())  // Tokenized payment reference
    .orderAmount(order.getTotal())
    .itemCount(order.getItems().size())
    .build();
 
// If some fields must contain PII (e.g., for routing/filtering):
// Use field-level envelope encryption
 
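@Service
public class PIIAwareEventPublisher {
 
    private final FieldLevelEncryptor encryptor; // KMS-backed encryption
    private final CustomerRepository customerRepository;
    private final KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate;
 
    public PIIAwareEventPublisher(FieldLevelEncryptor encryptor,
                                  CustomerRepository customerRepository,
                                  KafkaTemplate<String, OrderPlacedEvent> kafkaTemplate) {
        this.encryptor = encryptor;
        this.customerRepository = customerRepository;
        this.kafkaTemplate = kafkaTemplate;
    }
 
    public void publishOrderEvent(Order order) {
        // Load the customer: the event carries only its ID plus encrypted fields
        Customer customer = customerRepository.findById(order.getCustomerId()).orElseThrow();
 
        OrderPlacedEvent event = OrderPlacedEvent.builder()
            .orderId(order.getId())
            .customerId(order.getCustomerId())
            // Only the notification service needs the email, and it is encrypted
            .encryptedCustomerEmail(encryptor.encrypt(
                customer.getEmail(),
                "orders.events",
                customer.getId()  // Key ID for per-customer key rotation (and key deletion)
            ))
            .build();
 
        kafkaTemplate.send("orders.events.order-lifecycle", order.getId(), event);
    }
}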
 
// Right-to-erasure compliance: delete the customer's encryption key
// All messages containing their data are now cryptographically inaccessible
// without touching the Kafka logs
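The key-deletion approach above (often called crypto-shredding) can be demonstrated with nothing but the JDK. Below is a minimal sketch, assuming one AES-256-GCM key per customer held in an in-memory map that stands in for a KMS. `CryptoShredder` is a hypothetical name, not the `FieldLevelEncryptor` from the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical crypto-shredding helper; the map stands in for per-customer keys in a KMS
final class CryptoShredder {

    private static final int IV_BYTES = 12;   // recommended GCM nonce size
    private static final int TAG_BITS = 128;  // GCM authentication tag length

    private final Map<String, SecretKey> keysByCustomer = new ConcurrentHashMap<>();
    private final SecureRandom random = new SecureRandom();

    /** Encrypts one field; returns nonce || ciphertext, i.e. what goes into the message. */
    byte[] encrypt(String customerId, String plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, keyFor(customerId), new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = Arrays.copyOf(iv, IV_BYTES + ciphertext.length);
            System.arraycopy(ciphertext, 0, out, IV_BYTES, ciphertext.length);
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Decrypts a field, or returns empty once the customer's key has been shredded. */
    Optional<String> decrypt(String customerId, byte[] blob) {
        SecretKey key = keysByCustomer.get(customerId);
        if (key == null) {
            return Optional.empty();
        }
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, blob, 0, IV_BYTES));
            byte[] plaintext = cipher.doFinal(blob, IV_BYTES, blob.length - IV_BYTES);
            return Optional.of(new String(plaintext, StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Right-to-erasure: delete the key; every stored ciphertext for this customer is unreadable. */
    void shred(String customerId) {
        keysByCustomer.remove(customerId);
    }

    private SecretKey keyFor(String customerId) {
        return keysByCustomer.computeIfAbsent(customerId, id -> {
            try {
                KeyGenerator generator = KeyGenerator.getInstance("AES");
                generator.init(256);
                return generator.generateKey();
            } catch (GeneralSecurityException e) {
                throw new IllegalStateException(e);
            }
        });
    }
}
```

The ciphertext bytes can stay in the Kafka log, its replicas and its backups indefinitely. After `shred(customerId)`, no copy of them can be decrypted.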

AP-23: The Unauthenticated Consumer

What it looks like:

Consumer services connecting to Kafka or RabbitMQ without TLS or authentication, transmitting data over plaintext connections.

// BROKEN: No authentication, no TLS
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
    // No security.protocol
    // No SSL configuration
    // No SASL authentication
    // Messages transmitted in plaintext over network
    // Any network packet capture reveals full message contents
    return new DefaultKafkaConsumerFactory<>(config);
}

The Fix: Mutual TLS + SASL Authentication

@Bean
public ConsumerFactory<String, Object> secureConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9093");
 
    // Transport security (TLS)
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/certs/truststore.jks");
    // Secrets come from the environment: a Java string such as "${X}" is NOT expanded
    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, System.getenv("TRUSTSTORE_PASSWORD"));
 
    // Client certificate (mTLS): needed when the broker sets ssl.client.auth=required
    config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/certs/keystore.jks");
    config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, System.getenv("KEYSTORE_PASSWORD"));
 
    // SASL for identity (OAUTHBEARER with OIDC client credentials, kafka-clients 3.1+)
    config.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
    config.put(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, System.getenv("OAUTH_TOKEN_URL"));
    // Also set SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS to the OIDC login callback
    // handler shipped with your kafka-clients version
    config.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
        "clientId=\"payment-service\" clientSecret=\"" + System.getenv("CLIENT_SECRET") + "\";");
 
    // Encrypt at rest (broker-side configuration):
    // log.dirs on encrypted EBS volumes (AWS) or dm-crypt (on-prem)
 
    return new DefaultKafkaConsumerFactory<>(config);
}

AP-24: Audit Log Bypass via Async

What it looks like:

Using asynchronous message processing to skip audit logging for regulatory compliance because "the async path doesn't go through the audit filter."

// BROKEN: HTTP path has audit logging; async Kafka path does not
// Synchronous API path (audited):
@PostMapping("/payments")
@AuditLog(action = "PAYMENT_INITIATED")  // AOP interceptor logs all HTTP calls
public ResponseEntity<?> initiatePayment(@RequestBody PaymentRequest request) {
    kafkaTemplate.send("payment-commands", request);  // Async for performance
    return ResponseEntity.accepted().build();
}
 
// Kafka consumer (NOT audited):
@KafkaListener(topics = "payment-commands")
public void processPayment(PaymentRequest request) {
    // AOP audit interceptor does NOT apply to Kafka listeners
    // This is where the actual payment processing happens
    // But it is NOT audited
    // For PCI-DSS, SOX, GDPR compliance: every payment action must be audited
    paymentProcessor.charge(request);
    // Auditors find: "Who authorized 10,000 charges at 2am on a Tuesday?"
    // Answer: "The Kafka consumer... but we have no audit logs for it"
    // Result: Compliance violation, potential regulatory action
}

The Fix: Consumer-Level Audit with Message Lineage

// Every Kafka consumer that performs regulated actions must log to the audit system
 
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface KafkaAudited {
    String action();
}
 
@Aspect      // without @Aspect, the @Around advice is never applied
@Component
public class AuditedKafkaConsumerAspect {
 
    private final AuditLogRepository auditRepo;
 
    public AuditedKafkaConsumerAspect(AuditLogRepository auditRepo) {
        this.auditRepo = auditRepo;
    }
 
    // Binding the annotation as a parameter avoids a reflective lookup and a package-qualified pointcut
    @Around("@annotation(audited)")
    public Object auditKafkaConsumer(ProceedingJoinPoint pjp, KafkaAudited audited) throws Throwable {
        AuditEntry entry = AuditEntry.builder()
            .action(audited.action())
            .actor("kafka-consumer:" + pjp.getTarget().getClass().getSimpleName())
            .messageId(getCurrentMessageId())          // From Kafka header
            .correlationId(getCurrentCorrelationId())  // From Kafka header: links back to the HTTP request
            .timestamp(Instant.now())
            .build();
 
        try {
            Object result = pjp.proceed();
            auditRepo.save(entry.withStatus("SUCCESS"));
            return result;
        } catch (Exception e) {
            auditRepo.save(entry.withStatus("FAILED").withError(e.getMessage()));
            throw e;
        }
    }
}
 
@Service
public class PaymentConsumer {
 
    @KafkaListener(topics = "payment-commands")
    @KafkaAudited(action = "PAYMENT_PROCESSED")  // Audits this consumer call
    public void processPayment(PaymentRequest request) {
        paymentProcessor.charge(request);
    }
}

AP-25: The Insecure DLQ

What it looks like:

Applying strict security policies to production topics but leaving the DLQ with open access, creating a data exfiltration vector.

Production topic "payment-events": restricted, TLS, ACL, encrypted
DLQ "payment-events-dlq": same payment data, but:
  - No ACL configuration (open)
  - Consumed by "dlq-monitor" service which logs entire messages to stdout
  - dlq-monitor service has permissive logging that writes to unencrypted files
  - "dlq-monitor" logs are accessed by 15 developers for debugging

A DLQ contains EXACTLY the same sensitive data as the production topic.
Often MORE sensitive because it has failure context and stack traces attached.

The Fix: Apply Security Policies Uniformly to DLQs

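// DLQ infrastructure setup with same security posture as production topic
@Configuration
public class DLQSecurityConfiguration {
 
    private final KafkaAdmin kafkaAdmin; // Spring Kafka admin, configured with an admin principal
 
    public DLQSecurityConfiguration(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }
 
    @PostConstruct
    public void configureDLQACLs() throws Exception {
        // DLQ ACLs mirror production topic ACLs:
        // only the DLQ processor service can read from the payment DLQ,
        // NOT available to all developers for casual debugging
        String principal = "User:dlq-processor-service";
        List<AclBinding> bindings = List.of(
            new AclBinding(
                new ResourcePattern(ResourceType.TOPIC, "payment-events.dlq", PatternType.LITERAL),
                new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)),
            new AclBinding(
                new ResourcePattern(ResourceType.GROUP, "dlq-processor-group", PatternType.LITERAL),
                new AccessControlEntry(principal, "*", AclOperation.READ, AclPermissionType.ALLOW)));
 
        try (Admin admin = Admin.create(kafkaAdmin.getConfigurationProperties())) {
            admin.createAcls(bindings).all().get();
        }
 
        // Do NOT add a wildcard DENY for User:* - Kafka evaluates DENY before ALLOW, so it
        // would lock out dlq-processor-service too. Every other principal is already denied
        // because no ACL grants it access (with allow.everyone.if.no.acl.found=false).
    }
}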
 
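// DLQ processor must mask PII before logging
@KafkaListener(topics = "payment-events.dlq", groupId = "dlq-processor-group")
public void processDLQ(ConsumerRecord<String, PaymentEvent> record) {
    PaymentEvent event = record.value();
 
    // MASK before logging - never log raw PII or payment data
    // (exception messages can quote payload fields too, so mask them as well)
    log.error("DLQ message: messageId={}, eventType={}, errorReason={}",
        event.getEventId(),
        event.getType(),
        piiMasker.mask(headerAsString(record, "exception-message")));
    // NO: log.error("DLQ message: {}", event) - would log full payment details
 
    // Store masked audit record
    dlqAuditRepository.save(DLQAuditRecord.builder()
        .messageId(event.getEventId())
        .maskedPayload(piiMasker.mask(event))
        .errorType(headerAsString(record, "exception-type"))
        .build());
}
 
// lastHeader() returns a Header (or null) whose value is raw bytes, not a String
private static String headerAsString(ConsumerRecord<?, ?> record, String name) {
    Header header = record.headers().lastHeader(name);
    return header == null ? "unknown" : new String(header.value(), StandardCharsets.UTF_8);
}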
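`piiMasker.mask(...)` above is left abstract. Below is a minimal sketch of string-level masking, assuming the payload or header has already been turned into text. `PiiMasker` and its two regular expressions are illustrative assumptions, not a complete PII detector. The expressions cover e-mail addresses and card numbers of 13 to 19 digits.

```java
import java.util.regex.Pattern;

// Illustrative masker: hides e-mail local parts and all but the last 4 digits of card numbers
final class PiiMasker {

    private static final Pattern EMAIL =
        Pattern.compile("[A-Za-z0-9._%+-]+@([A-Za-z0-9.-]+\\.[A-Za-z]{2,})");
    // 9-15 digits followed by the 4 digits we keep = 13-19 digit card numbers
    private static final Pattern CARD = Pattern.compile("\\b\\d{9,15}(\\d{4})\\b");

    private PiiMasker() {}

    static String mask(String text) {
        String masked = EMAIL.matcher(text).replaceAll("***@$1");  // keep only the domain
        return CARD.matcher(masked).replaceAll("************$1");  // keep only the last 4 digits
    }
}
```

For example, `PiiMasker.mask("card=4111111111111111 email=john.doe@example.com")` yields `card=************1111 email=***@example.com`. Regex masking is a last line of defense. The stronger fix is the reference-token design from AP-22, which keeps such fields out of the payload in the first place.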

Testing and Reliability Anti-Patterns


AP-26: The Fake In-Memory Broker Test

What it looks like:

Using in-memory mock implementations of message brokers in tests that do not replicate the actual behavior of the broker.

// BROKEN: Test mocks KafkaTemplate, so no message ever reaches a broker (real or embedded)
 
@SpringBootTest
public class OrderProcessingTest {
 
    @MockBean
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    @Test
    void shouldPublishOrderEventOnOrderPlacement() {
        orderService.placeOrder(testOrder);
        verify(kafkaTemplate).send(eq("order-events"), any(), any());
    }
}
 
// What this test does NOT verify:
// - Message serialization (Avro schema compatibility)
// - Message key routing (partitioning by orderId)
// - Consumer deserialization
// - Idempotent processing on duplicate delivery
// - Ordering guarantees
// - Retry behavior on broker unavailability
// - DLQ routing on deserialization failure
// - Schema registry compatibility
//
// The test passes. Production burns.

The Fix: EmbeddedKafka Tests + Testcontainers for Full Integration

// CORRECT: Use EmbeddedKafka for unit-level integration tests
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    topics = {"order-events", "order-events.dlq"},
    brokerProperties = {"log.dirs=/tmp/kafka-test", "auto.create.topics.enable=true"},
    bootstrapServersProperty = "spring.kafka.bootstrap-servers"  // point the app at the embedded broker
)
public class OrderEventIntegrationTest {
 
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
 
    @Autowired
    private ProcessedOrderRepository processedOrderRepository;  // Records what the consumer processed
 
    @Test
    void shouldDeduplicateAndNotProcessDuplicateEvent() throws Exception {
        OrderPlacedEvent event = createTestOrderEvent("order-123");
 
        // Publish TWICE (simulating an at-least-once duplicate)
        kafkaTemplate.send("order-events", "order-123", event);
        kafkaTemplate.send("order-events", "order-123", event);
 
        // processCount is already 1 after the FIRST record, so a plain untilAsserted()
        // could pass before the duplicate is consumed. during() requires the
        // assertion to keep holding for 3 seconds.
        await().during(3, SECONDS).atMost(15, SECONDS).untilAsserted(() -> {
            // Should be processed ONCE (idempotency)
            assertThat(processedOrderRepository.findById("order-123"))
                .isPresent()
                .hasValueSatisfying(o -> assertThat(o.getProcessCount()).isEqualTo(1));
        });
    }
}
 
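// CORRECT: Testcontainers for a production-equivalent broker
@Testcontainers
@SpringBootTest
public class FullBrokerIntegrationTest {
 
    static Network network = Network.newNetwork();
 
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withKraft()  // Use KRaft mode, same as production
        .withNetwork(network);
 
    // Testcontainers has no Schema Registry module; run Confluent's image as a GenericContainer
    @Container
    static GenericContainer<?> schemaRegistry =
        new GenericContainer<>(DockerImageName.parse("confluentinc/cp-schema-registry:7.4.0"))
            .withNetwork(network)
            .withExposedPorts(8081)
            .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
            .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS",
                "PLAINTEXT://" + kafka.getNetworkAliases().get(0) + ":9092")
            .dependsOn(kafka)
            .waitingFor(Wait.forHttp("/subjects").forStatusCode(200));
 
    // Point the application at the containers
    @DynamicPropertySource
    static void brokerProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.kafka.properties.schema.registry.url",
            () -> "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(8081));
    }
 
    @Test
    void shouldHandlePoisonMessageRoutingToDLQ() {
        // Test with REAL Kafka + REAL Schema Registry:
        // serialization, partitioning and DLQ routing behave as they will in production
    }
}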
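
The EmbeddedKafka test above only passes if the consumer deduplicates. Here is a minimal sketch of that consumer-side idempotency, assuming each event carries a unique event ID. `IdempotentOrderHandler` is hypothetical, and its in-memory set stands in for a processed-events table. In production that table would be written in the same database transaction as the business change.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical handler: applies each event's side effect at most once per event ID
final class IdempotentOrderHandler {

    // Event IDs already applied (stand-in for a processed_events table)
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();

    // Per-order counter standing in for the business side effect under test
    private final Map<String, Integer> processCount = new ConcurrentHashMap<>();

    /** Returns true if the event was applied, false if it was a duplicate delivery. */
    boolean handle(String eventId, String orderId) {
        // Set.add() is atomic here: only the first delivery of an ID returns true
        if (!processedEventIds.add(eventId)) {
            return false; // at-least-once redelivery: skip the side effect
        }
        processCount.merge(orderId, 1, Integer::sum);
        return true;
    }

    int processCountFor(String orderId) {
        return processCount.getOrDefault(orderId, 0);
    }
}
```

The deduplication key is the event ID, not the order ID. Two distinct events for the same order must both be applied; only redeliveries of the same event are dropped.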

AP-27: The Stubbed Consumer Test

What it looks like:

Testing the producer side by verifying the message was sent, but never testing that the consumer can actually deserialize and process it.

// BROKEN: Producer test only
@Test
void orderServicePublishesEvent() {
    orderService.placeOrder(testRequest);
    verify(kafkaTemplate).send(eq("order-events"), eq("order-123"), any(OrderPlacedEvent.class));
    // ✓ Producer test passes
}
 
// BROKEN: Consumer test uses directly constructed objects, not deserialized messages
@Test
void orderConsumerProcessesEvent() {
    OrderPlacedEvent event = new OrderPlacedEvent("order-123", ...); // Directly constructed
    orderConsumer.handleOrder(event);  // Directly called
    // ✓ Consumer test passes
    //
    // NEVER TESTED: Can the consumer deserialize the exact bytes the producer sends?
    // NEVER TESTED: Are the field names in the producer's Avro schema the same as the consumer's?
    // NEVER TESTED: Do they use the same Schema Registry subject?
}

The Fix: End-to-End Contract Tests

// CORRECT: Contract test that exercises the full serialization/deserialization cycle
 
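// mock:// gives producer and consumer one shared in-memory Schema Registry (Confluent Avro serdes)
@SpringBootTest(properties = "spring.kafka.properties.schema.registry.url=mock://contract-test")
@EmbeddedKafka(topics = "order-events", bootstrapServersProperty = "spring.kafka.bootstrap-servers")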
public class OrderEventContractTest {
 
    @Autowired OrderService orderService;        // Producer side
    @Autowired TestOrderEventReceiver receiver;  // Test consumer that captures events
 
    @Test
    void producerAndConsumerShareCompatibleSchema() throws InterruptedException {
        // ACT: Let the producer publish through REAL Kafka (embedded)
        OrderRequest request = new OrderRequest("CUST-1", List.of(new Item("SKU-001", 2)));
        orderService.placeOrder(request);
 
        // ASSERT: The consumer receives a correctly deserialized event
        // This test fails if:
        // - Field names don't match between producer and consumer
        // - Avro schema versions are incompatible
        // - Serialization/deserialization code has bugs
        // - Schema registry subjects don't align
        OrderPlacedEvent received = receiver.waitForEvent(10, SECONDS);
        assertThat(received).isNotNull();
        assertThat(received.getCustomerId()).isEqualTo("CUST-1");
        assertThat(received.getItems()).hasSize(1);
        assertThat(received.getItems().get(0).getSku()).isEqualTo("SKU-001");
    }
}
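
The property this contract test protects is that the bytes the producer writes decode into what the consumer expects. The following stdlib-only sketch shows the failure mode. It uses a deliberately simple `field=value` encoding in place of Avro. `WireFormat`, `ReceivedOrder` and the field names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical wire format: "field=value" pairs joined by ';' (values must not contain ';' or '=')
final class WireFormat {

    static byte[] encode(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (sb.length() > 0) sb.append(';');
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    static Map<String, String> decode(byte[] bytes) {
        Map<String, String> fields = new LinkedHashMap<>();
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (text.isEmpty()) return fields;
        for (String pair : text.split(";")) {
            int eq = pair.indexOf('=');
            fields.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return fields;
    }
}

// The consumer's view of the event: it requires the fields it was written against
record ReceivedOrder(String customerId, String sku) {

    static ReceivedOrder fromBytes(byte[] bytes) {
        Map<String, String> f = WireFormat.decode(bytes);
        String customerId = f.get("customerId");
        String sku = f.get("sku");
        if (customerId == null || sku == null) {
            throw new IllegalArgumentException("Incompatible payload, fields present: " + f.keySet());
        }
        return new ReceivedOrder(customerId, sku);
    }
}
```

If the producer renames `customerId` to `customer_id`, a producer-only test (verify the send) still passes. So does a consumer-only test that constructs `ReceivedOrder` directly. Only the round trip through bytes fails.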

AP-28: No Chaos in CI/CD

What it looks like:

Every CI/CD pipeline test assumes the broker is healthy. No tests verify the system behaves correctly when the broker is unavailable, slow, or partitioned.

// BROKEN: All tests assume healthy Kafka
// Zero tests for:
// - Kafka broker down for 30 seconds
// - Kafka slow (added 500ms latency to broker responses)
// - Message deserialization failures
// - Consumer group rebalancing during processing
// - Schema registry unavailable
// - Broker disk full (publish attempts start failing)
 
// Result: Teams discover all these failure modes in production
// Often at 2am on a weekend

The Fix: Chaos Tests in CI/CD with Toxiproxy

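// Use Toxiproxy to inject network failures in tests
// (assumes Testcontainers 1.19+, the toxiproxy-java client, and Spring Kafka 3.x)
@SpringBootTest
@Testcontainers
public class KafkaChaosTest {
 
    static Network network = Network.newNetwork();
 
    @Container
    static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
        .withNetwork(network)
        .withNetworkAliases("kafka");
 
    @Container
    static ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
        .withNetwork(network);
 
    private static ToxiproxyClient toxiproxyClient;
    private static Proxy kafkaProxy;
 
    @Autowired private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired private OrderService orderService;
    @Autowired private OutboxRepository outboxRepository;
 
    @BeforeAll
    static void setup() throws IOException {
        toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
        // 8666 is the first proxy port the Toxiproxy container exposes; kafka:9092 is the
        // broker's in-network listener. The application bootstraps from
        // toxiproxy.getHost():toxiproxy.getMappedPort(8666).
        // CAUTION: after bootstrap, Kafka clients connect to the broker's ADVERTISED
        // listener. Unless the broker advertises the proxy address, traffic bypasses
        // Toxiproxy and the toxics below have no effect.
        kafkaProxy = toxiproxyClient.createProxy("kafka", "0.0.0.0:8666", "kafka:9092");
    }
 
    @Test
    void shouldDeliverMessagesWhenBrokerIsSlow() throws Exception {
        // Arrange: add a latency toxic (simulates a slow broker)
        Toxic latency = kafkaProxy.toxics()
            .latency("kafka-latency", ToxicDirection.DOWNSTREAM, 2000); // 2s latency
        try {
            // Act: publish during degraded conditions
            CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("order-events", event);
 
            // Assert: message eventually delivered (delayed, not dropped)
            RecordMetadata result = future.get(10, SECONDS).getRecordMetadata();
            assertThat(result).isNotNull();
        } finally {
            latency.remove(); // Restore even if the assertion fails
        }
    }
 
    @Test
    void shouldRouteToOutboxWhenBrokerIsDown() throws Exception {
        // Black-hole all traffic through the proxy - simulates a broker outage
        Toxic outage = kafkaProxy.toxics().timeout("kafka-timeout", ToxicDirection.DOWNSTREAM, 0);
        try {
            orderService.placeOrder(testOrder);
 
            // The event must be persisted in the outbox, not lost
            assertThat(outboxRepository.findByAggregateId(testOrder.getId()))
                .isPresent();
        } finally {
            outage.remove();
        }
    }
}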
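
The outage test assumes a transactional outbox. The order and its event are written together, and a separate relay publishes pending rows whenever the broker is reachable. Below is a minimal in-memory sketch of that relay loop. The `Publisher` interface is a hypothetical stand-in for `KafkaTemplate`, and the `List` stands in for the outbox table.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the broker client; throws when the broker is unreachable
interface Publisher {
    void publish(String topic, String key, String payload) throws Exception;
}

// One outbox row, written in the same database transaction as the order itself
final class OutboxRow {
    final String aggregateId;
    final String payload;
    boolean published;

    OutboxRow(String aggregateId, String payload) {
        this.aggregateId = aggregateId;
        this.payload = payload;
    }
}

// Hypothetical relay: publishes pending rows in insertion order, keeps the rest on failure
final class OutboxRelay {
    private final List<OutboxRow> outbox = new ArrayList<>();

    // Called inside the business transaction: no broker call, so it succeeds during an outage
    void append(String aggregateId, String payload) {
        outbox.add(new OutboxRow(aggregateId, payload));
    }

    // Called periodically. Stops at the first failure to preserve order; unsent rows stay pending.
    int relay(Publisher publisher, String topic) {
        int sent = 0;
        for (OutboxRow row : outbox) {
            if (row.published) continue;
            try {
                publisher.publish(topic, row.aggregateId, row.payload);
                row.published = true; // mark only after the broker acknowledged
                sent++;
            } catch (Exception e) {
                break; // broker down: retry on the next relay run
            }
        }
        return sent;
    }

    long pendingCount() {
        return outbox.stream().filter(r -> !r.published).count();
    }
}
```

The row is marked published only after the broker acknowledges it. A crash between the acknowledgment and the mark therefore re-sends that row, so the relay is at-least-once and downstream consumers still need idempotency.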

AP-29: The One-Time Migration Script

What it looks like:

Writing a "one-time" script to migrate messages from one topic/schema to another, running it in production once, then deleting it.

# BROKEN: "One-time" migration script
 
#!/bin/bash
# migrate-orders-v1-to-v2.sh
# Written: 2025-01-15
# Run: 2025-01-16 in production
# Deleted after running: "It was one-time, don't need it anymore"
 
kafka-console-consumer --topic order-events-v1 --from-beginning |
  python3 transform.py |
  kafka-console-producer --topic order-events-v2
 
# 6 months later:
# New region spun up - needs to migrate old topics again
# "One-time" script is gone
# Original author has left the company
# Team spends 2 weeks recreating the migration logic
# v2 schema has evolved since - migration produces wrong format
# $200,000 in engineering time to redo a "one-time" migration

The Fix: Version-Controlled, Idempotent Migration Jobs

// CORRECT: Migration as a first-class, versioned, idempotent job
 
@Component
@ConditionalOnProperty("migration.order-events-v1-to-v2.enabled")
public class OrderEventV1ToV2Migration {
 
    private static final String MIGRATION_ID = "order-events-v1-to-v2-20250116";
    private static final String CONSUMER_GROUP = "migration-" + MIGRATION_ID;
 
    @PostConstruct
    public void runMigration() {
        // Check if already completed (idempotent)
        if (migrationStateRepository.isCompleted(MIGRATION_ID)) {
            log.info("Migration {} already completed, skipping", MIGRATION_ID);
            return;
        }
 
        log.info("Starting migration: {}", MIGRATION_ID);
        long count = 0;
        long errors = 0;
 
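        // createV1Consumer must set enable.auto.commit=false and auto.offset.reset=earliest:
        // a fresh group starts at the beginning, a re-run resumes from committed offsets.
        // (seekToBeginning(consumer.assignment()) right after subscribe() does nothing,
        // because no partitions are assigned until the first poll.)
        try (KafkaConsumer<String, OrderEventV1> consumer = createV1Consumer(CONSUMER_GROUP)) {
            consumer.subscribe(List.of("order-events-v1"));
 
            while (true) {
                ConsumerRecords<String, OrderEventV1> records = consumer.poll(Duration.ofSeconds(1));
 
                for (ConsumerRecord<String, OrderEventV1> record : records) {
                    try {
                        OrderEventV2 v2 = V1ToV2Transformer.transform(record.value());
                        kafkaTemplate.send("order-events-v2", record.key(), v2).get();
                        count++;
                    } catch (Exception e) {
                        log.error("Migration error at partition {} offset {}: {}",
                            record.partition(), record.offset(), e.getMessage());
                        errors++;
                    }
                }
                consumer.commitSync();
 
                // An empty poll does NOT mean "caught up": early polls return nothing while
                // the group is still joining. Stop only when every assigned partition's
                // position has reached its end offset.
                Set<TopicPartition> assigned = consumer.assignment();
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assigned);
                if (!assigned.isEmpty()
                        && assigned.stream().allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp))) {
                    break;
                }
            }
        }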
 
        migrationStateRepository.markCompleted(MIGRATION_ID,
            MigrationResult.builder().migratedCount(count).errorCount(errors).build());
        log.info("Migration {} complete: {} migrated, {} errors", MIGRATION_ID, count, errors);
    }
}
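// Re-runnable: the completion flag makes a finished migration a no-op, and committed
// offsets let a crashed run resume where it stopped. Records sent after the last
// commitSync() are re-sent on resume, so order-events-v2 consumers must be idempotent.
// Failed records are counted, not retried, and a completed run is skipped next time,
// so review errorCount after every run.
// It is version-controlled, documented, and reusable for new regions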
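
The same re-run semantics can be modeled without a broker, which makes them easy to unit-test. The sketch below is a hypothetical `MigrationRunner` with three behaviors:

- it checkpoints the next offset to process;
- it resumes from that checkpoint after a crash;
- it becomes a no-op once marked complete.

The `List` stands in for one partition of `order-events-v1`, and the maps stand in for `migrationStateRepository`. These are simplifying assumptions, not the article's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, broker-free model of a resumable, idempotent migration over one partition
final class MigrationRunner<V1, V2> {

    private final Map<String, Long> checkpoints = new HashMap<>(); // migrationId -> next offset
    private final Map<String, Boolean> completed = new HashMap<>();

    /** Returns the number of records migrated by this run. */
    int run(String migrationId, List<V1> source, Function<V1, V2> transform, Consumer<V2> sink) {
        if (completed.getOrDefault(migrationId, false)) {
            return 0; // already done: re-running is a no-op
        }
        long offset = checkpoints.getOrDefault(migrationId, 0L);
        int migrated = 0;
        while (offset < source.size()) {
            sink.accept(transform.apply(source.get((int) offset))); // may throw (simulated crash)
            offset++;
            checkpoints.put(migrationId, offset); // "commit" after every record
            migrated++;
        }
        completed.put(migrationId, true); // reached the end offset
        return migrated;
    }
}
```

Here the checkpoint is written after the sink call. A crash between the two would therefore re-send one record, which is the same at-least-once window the Kafka version has between `send()` and `commitSync()`.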

AP-30: Load Testing the Happy Path Only

What it looks like:

Load testing the message queue with perfectly formed messages at the expected rate, and never testing with realistic failure rates, large message sizes, or burst traffic patterns.

BROKEN load test:
  - 1000 messages/second (normal expected load)
  - All messages valid JSON, all exactly 1KB
  - Consumer processes successfully every time
  - Test runs for 10 minutes
  - Result: "System handles 1000 msg/sec ✓"

What this misses:
  - 5% of messages in production are 50KB (claim-check pattern not enforced)
  - 0.1% of messages fail deserialization (causes DLQ routing overhead)
  - Peak load is 10x normal during flash sales (10,000 msg/sec)
  - The consumer's downstream database is 3x slower at 10,000 msg/sec because of connection-pool contention
  - At 5,000 msg/sec, consumer lag builds faster than it drains
  - At 10,000 msg/sec, the broker itself becomes the bottleneck
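
Consumer lag is simple arithmetic. It grows at (produce rate minus consume capacity) while producers outpace consumers, and drains at the reverse difference afterwards. The small sketch below shows why a 5-minute 10x burst needs an explicit recovery budget. It uses an assumed consumer capacity of 4,000 msg/sec, which is not a figure from the article, and it ignores ramp-up and ramp-down.

```java
// Back-of-the-envelope consumer lag model (all rates in messages/second)
final class LagModel {

    /** Lag accumulated while producing faster than consuming for the given duration. */
    static long lagAfter(long produceRate, long consumeCapacity, long seconds) {
        return Math.max(0, produceRate - consumeCapacity) * seconds;
    }

    /** Seconds to drain an existing lag once the produce rate drops below capacity. */
    static long drainSeconds(long lag, long produceRate, long consumeCapacity) {
        long netDrainRate = consumeCapacity - produceRate;
        if (netDrainRate <= 0) {
            throw new IllegalArgumentException("lag never drains at this produce rate");
        }
        return (lag + netDrainRate - 1) / netDrainRate; // round up to whole seconds
    }
}
```

Under these assumptions, `lagAfter(10_000, 4_000, 300)` gives 1,800,000 messages. Once traffic falls back to 1,000 msg/sec, `drainSeconds(1_800_000, 1_000, 4_000)` gives 600 seconds, so draining takes ten minutes. Any capacity drop at peak, such as the database slowdown listed above, enlarges the lag, and a happy-path test never exercises that.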

The Fix: Realistic Load Testing Scenarios

// Load test scenarios that match production realities
// (LoadScenario, runLoadTest, startLoad and restartConsumer belong to your own load-test harness)
 
public class KafkaLoadTestSuite {
 
    // Scenario 1: Normal load (baseline)
    @Test void normalLoadBaseline() {
        runLoadTest(LoadScenario.builder()
            .messagesPerSecond(1000)
            .messageSizeKB(1)
            .failureRate(0.001)  // 0.1% poison messages
            .durationMinutes(10)
            .build());
    }
 
    // Scenario 2: Flash sale burst
    @Test void flashSaleBurst() {
        runLoadTest(LoadScenario.builder()
            .messagesPerSecond(10000)  // 10x normal
            .rampUpSeconds(30)         // Ramp up over 30 seconds (realistic)
            .sustainSeconds(300)       // 5 minutes at peak
            .rampDownSeconds(60)
            .assertConsumerLagRecoveryWithin(Duration.ofMinutes(15))
            .build());
    }
 
    // Scenario 3: Poison message storm (5% failure rate - simulates schema migration issue)
    @Test void poisonMessageStorm() {
        runLoadTest(LoadScenario.builder()
            .messagesPerSecond(1000)
            .failureRate(0.05)  // 5% poison messages
            .assertDLQDepth("order-events.dlq", lessThan(500))
            .assertMainTopicContinuesProcessing() // DLQ routing must not block main topic
            .build());
    }
 
    // Scenario 4: Consumer restart during peak (rolling deployment simulation)
    @Test void rollingConsumerRestart() throws InterruptedException {
        LoadRunner runner = startLoad(1000);
        // Restart consumers one at a time, as a rolling deployment does
        for (int i = 0; i < consumerCount; i++) {
            restartConsumer(i);
            Thread.sleep(30_000);  // 30 seconds between restarts
        }
        // Zero duplicates is only achievable if consumers commit processed offsets
        // before their partitions are revoked (graceful shutdown) or deduplicate on receipt
        assertThat(runner.getDroppedMessages()).isZero();
        assertThat(runner.getDuplicateMessages()).isZero();
    }
}
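
The zero-duplicates assertion in Scenario 4 holds only if a consumer commits what it has processed before its partitions move to another instance. The sketch below is a simplified, broker-free model of that redelivery window. It assumes a consumer that commits after every `commitInterval` records. `RestartModel` is hypothetical.

```java
// Simplified model of at-least-once redelivery when a consumer instance stops
final class RestartModel {

    /**
     * Records the next owner of the partition will redeliver after this consumer stops.
     * processed: records handled since the partition was assigned
     * commitInterval: offsets are committed after every N records
     * commitOnRevoke: whether the final position is committed during a graceful shutdown
     */
    static long redelivered(long processed, long commitInterval, boolean commitOnRevoke) {
        if (commitOnRevoke) {
            return 0; // final position committed before the partition moves
        }
        long committed = (processed / commitInterval) * commitInterval;
        return processed - committed; // processed but uncommitted: replayed by the next owner
    }
}
```

A hard kill behaves like `commitOnRevoke = false`. So if the load test kills consumers instead of stopping them gracefully, assert zero duplicate side effects (idempotent handling) rather than zero duplicate deliveries.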
