6/6/2026 | Admin Post


Message Queues - Supplement 4: Real-World Architecture & Industry Practices

Series Navigation:
Main Index |
Supplement 1 - Anti-Patterns Extended |
Supplement 2 - Production Challenges |
Supplement 3 - Trade-Offs & Decision Guide

This supplement covers how leading companies actually use message queues in production:
complete architectures, industry-specific patterns, migration strategies, and
architectural decision records (ADRs) with concrete reasoning.


Table of Contents

  1. E-Commerce Order Processing Architecture
  2. Financial Transaction Processing Architecture
  3. Real-Time Notification System Architecture
  4. IoT Data Ingestion at Scale
  5. Event Sourcing with CQRS Architecture
  6. Multi-Region Active-Passive Architecture
  7. Gradual Migration from Synchronous to Async
  8. Migrating Between Brokers Without Downtime
  9. Industry-Specific Patterns
  10. Architectural Decision Records (ADRs)

1. E-Commerce Order Processing Architecture

The Problem

An e-commerce platform processes 50,000 orders/hour at peak. Each order triggers: payment processing, inventory reservation, warehouse notification, customer email, loyalty points, analytics, fraud checks, and invoice generation. All of these must be reliable but not all need to happen before the user gets their confirmation.

Architecture Design

┌─────────────────────────────────────────────────────────────────────────────┐
│                        E-COMMERCE ORDER PROCESSING                          │
│                                                                              │
│  Customer                                                                    │
│     │                                                                        │
│     ▼                                                                        │
│  [Order API]  ──── Sync ────► [Payment Service]  (must succeed before ACK)  │
│     │                               │ success                               │
│     │ save + publish                │                                        │
│     ▼                               ▼                                        │
│  [Order DB] ◄──Outbox──► [Kafka: orders.events.order-lifecycle]              │
│                                     │                                        │
│              ┌──────────────────────┼──────────────────────┐                │
│              ▼                      ▼                       ▼                │
│  [Inventory Service]    [Warehouse Service]   [Notification Service]         │
│  (reserve stock)        (create shipment)     (email + SMS)                  │
│         │                       │                      │                    │
│         │                       │                      ▼                    │
│    [inventory-events]   [warehouse-events]   [SendGrid / Twilio]            │
│                                                                              │
│              ┌──────────────────────┬──────────────────────┐                │
│              ▼                      ▼                       ▼                │
│  [Loyalty Service]      [Analytics Service]    [Invoice Service]             │
│  (award points)         (BI pipeline)          (generate PDF)                │
│                                                                              │
│  [Fraud Service] ◄── SYNCHRONOUS (inline with order placement, not async)   │
└─────────────────────────────────────────────────────────────────────────────┘

Critical Design Decisions Explained

Decision 1: Fraud check and payment are synchronous, everything else is async

@PostMapping("/orders")
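// Note: method-level @Transactional keeps a DB transaction open across the remote
// fraud and payment calls; in production, scope the transaction to Step 3 only.
@Transactional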
public ResponseEntity<OrderResponse> placeOrder(@RequestBody OrderRequest request) {
 
    // Step 1: Fraud check - SYNCHRONOUS (must block order if fraudulent)
    FraudAssessment fraud = fraudService.assess(request);  // 50ms
    if (fraud.isHighRisk()) {
        return ResponseEntity.status(402).body(OrderResponse.rejected("Fraud risk detected"));
    }
 
    // Step 2: Payment authorization - SYNCHRONOUS (must succeed before committing)
    PaymentResult payment = paymentGateway.authorize(request.getPaymentDetails()); // 200ms
    if (!payment.isSuccessful()) {
        return ResponseEntity.status(402).body(OrderResponse.rejected(payment.getDeclineReason()));
    }
 
    // Step 3: Save order and outbox event in ONE transaction (Outbox Pattern)
    Order order = Order.create(request, payment.getAuthorizationId());
    orderRepository.save(order);
    outboxRepository.save(OutboxEntry.orderPlaced(order));  // Same DB transaction
 
    // All other processing (inventory, email, warehouse, analytics) is ASYNC
    // These happen after the transaction commits via the Outbox relay
 
    return ResponseEntity.accepted().body(
        OrderResponse.accepted(order.getId(), "/orders/" + order.getId())
    );
    // Total response time: ~300ms (fraud + payment only)
    // User gets "Order confirmed" - everything else runs asynchronously
}
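
The handler above relies on an Outbox relay to move committed rows into Kafka. The article uses Debezium CDC for this elsewhere, so the following is only a minimal polling sketch of the same idea, with an in-memory list standing in for the outbox table and a `Consumer<String>` standing in for the Kafka producer. `OutboxRow` and `OutboxRelay` are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical outbox row: written in the same DB transaction as the order. */
final class OutboxRow {
    final long id;
    final String payload;
    boolean published;

    OutboxRow(long id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

/** In-memory stand-in for the outbox table plus its relay (illustrative only). */
final class OutboxRelay {
    private final List<OutboxRow> table = new ArrayList<>(); // rows kept in insertion (id) order
    private final Consumer<String> publisher;                // stand-in for a Kafka producer

    OutboxRelay(Consumer<String> publisher) {
        this.publisher = publisher;
    }

    void insert(OutboxRow row) {
        table.add(row);
    }

    /** Publishes unpublished rows in order; stops at the first failure so ordering is preserved. */
    int relayOnce() {
        int sent = 0;
        for (OutboxRow row : table) {
            if (row.published) continue;
            try {
                publisher.accept(row.payload);
            } catch (RuntimeException e) {
                break; // row stays unpublished and is retried on the next poll
            }
            row.published = true; // marked only after the publish succeeded
            sent++;
        }
        return sent;
    }
}
```

Because a crash between the publish and the `published = true` update would cause the row to be sent again, this relay is at-least-once; downstream consumers still need to be idempotent.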

Decision 2: SLA Tiers for Consumer Groups

// Tier 1: Must complete within 1 minute (business-critical)
@KafkaListener(
    topics = "orders.events.order-lifecycle",
    groupId = "inventory-reservation",  // SLA: 30 seconds
    containerFactory = "highPriorityConsumerFactory"
)
public void reserveInventory(OrderPlacedEvent event) { ... }
 
// Tier 2: Must complete within 10 minutes (important but not critical)
@KafkaListener(
    topics = "orders.events.order-lifecycle",
    groupId = "warehouse-fulfillment",  // SLA: 5 minutes
    containerFactory = "standardConsumerFactory"
)
public void notifyWarehouse(OrderPlacedEvent event) { ... }
 
@KafkaListener(
    topics = "orders.events.order-lifecycle",
    groupId = "customer-notifications",  // SLA: 2 minutes
    containerFactory = "standardConsumerFactory"
)
public void sendConfirmationEmail(OrderPlacedEvent event) { ... }
 
// Tier 3: Can lag by hours (non-critical)
@KafkaListener(
    topics = "orders.events.order-lifecycle",
    groupId = "analytics-pipeline",  // SLA: 24 hours
    containerFactory = "lowPriorityConsumerFactory"
)
public void trackOrderAnalytics(OrderPlacedEvent event) { ... }
 
// Tier 3 consumers use different consumer configuration:
// - Larger batch sizes (max.poll.records; efficiency over latency)
// - Higher fetch.min.bytes / fetch.max.wait.ms (linger.ms is a producer-side setting)
// - Alerts trigger only when lag > 1 hour (not immediately)
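
The tiering above implies that lag alerts should be evaluated per tier rather than with one global threshold. A minimal sketch of that check follows; the `SlaTier` enum is hypothetical, and the Tier 1 and Tier 2 thresholds are taken from the strictest SLA comments in the listeners as an assumption, while the 1-hour Tier 3 threshold comes from the note above.

```java
import java.time.Duration;

/** Hypothetical tiers mirroring the consumer groups above; thresholds are illustrative. */
enum SlaTier {
    TIER_1(Duration.ofSeconds(30)),
    TIER_2(Duration.ofMinutes(5)),
    TIER_3(Duration.ofHours(1));

    final Duration maxLag;

    SlaTier(Duration maxLag) {
        this.maxLag = maxLag;
    }

    /** True when the oldest unprocessed message is older than the tier allows. */
    boolean shouldAlert(Duration observedLag) {
        return observedLag.compareTo(maxLag) > 0;
    }
}
```

With these values, 45 seconds of lag pages for the inventory group but 45 minutes of lag on the analytics pipeline does not.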

Decision 3: Inventory Reservation with Saga on Failure

// Inventory reservation can fail (item out of stock)
// Must trigger order cancellation and refund
 
@Service
public class InventoryReservationConsumer {
 
    @KafkaListener(topics = "orders.events.order-lifecycle",
                   groupId = "inventory-reservation")
    @Transactional
    public void handleOrderPlaced(OrderPlacedEvent event, Acknowledgment ack) {
        for (OrderItem item : event.getItems()) {
            boolean reserved = inventoryService.reserve(
                item.getSku(), item.getQuantity(), event.getOrderId()
            );
 
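            if (!reserved) {
                // Release anything already reserved for earlier items of this order;
                // no exception is thrown here, so the transaction would otherwise commit them.
                // releaseAll is a hypothetical helper, not part of the original service.
                inventoryService.releaseAll(event.getOrderId());

                // Saga compensating action: cancel the order
                kafkaTemplate.send("orders.commands.cancel-order",
                    event.getOrderId(),
                    CancelOrderCommand.builder()
                        .orderId(event.getOrderId())
                        .reason("INVENTORY_UNAVAILABLE")
                        .failedSku(item.getSku())
                        .triggeredBy("inventory-reservation-service")
                        .build()
                );
                ack.acknowledge();
                return;
            }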
        }
        // All items reserved successfully - publish confirmation
        kafkaTemplate.send("orders.events.inventory-reserved",
            event.getOrderId(), InventoryReservedEvent.from(event));
        ack.acknowledge();
    }
}
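
One detail worth making explicit in a multi-item reservation is that a failure on a later item must give back what earlier items already took. The sketch below shows that all-or-nothing behaviour against an in-memory map; `InventoryStore` and `reserveAll` are hypothetical names used only for illustration and are not the article's `inventoryService`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** In-memory stand-in for the inventory store (illustrative only). */
final class InventoryStore {
    private final Map<String, Integer> stock = new HashMap<>();

    void setStock(String sku, int quantity) {
        stock.put(sku, quantity);
    }

    int available(String sku) {
        return stock.getOrDefault(sku, 0);
    }

    /**
     * Reserves every line of the order or none of them.
     * Returns the SKU that could not be reserved, or null on success.
     */
    String reserveAll(Map<String, Integer> orderLines) {
        Map<String, Integer> reserved = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> line : orderLines.entrySet()) {
            int have = available(line.getKey());
            if (have < line.getValue()) {
                // Compensate: give back everything reserved so far for this order
                reserved.forEach((sku, qty) -> stock.merge(sku, qty, Integer::sum));
                return line.getKey();
            }
            stock.put(line.getKey(), have - line.getValue());
            reserved.put(line.getKey(), line.getValue());
        }
        return null;
    }
}
```

The returned SKU corresponds to the `failedSku` field carried by the cancel command, so the order service can tell the customer which item was unavailable.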

2. Financial Transaction Processing Architecture

The Problem

A fintech company processes 10,000 transactions/minute with strict requirements: zero data loss, exactly-once processing, complete audit trail, and sub-5-second settlement.

Architecture

┌───────────────────────────────────────────────────────────────────────────┐
│                   FINANCIAL TRANSACTION PROCESSING                         │
│                                                                             │
│  Client                                                                     │
│    │                                                                        │
│    ▼                                                                        │
│  [Transaction API] ──Sync validate──► [Risk Engine] (synchronous check)    │
│    │                                                                        │
│    │ (Transactional Outbox - single DB transaction)                         │
│    ▼                                                                        │
│  [Transaction DB]                                                           │
│    │                                                                        │
│    ▼ (Outbox relay - CDC via Debezium)                                      │
│  [Kafka: payments.events.transaction-lifecycle]                             │
│    │              (partition key: accountId - strict per-account ordering)  │
│    │                                                                        │
│    ├──► [Ledger Service] (debit/credit accounts - idempotent, versioned)    │
│    │          │                                                             │
│    │          ▼                                                             │
│    │    [Account Balance DB] (optimistic locking, version per balance)      │
│    │                                                                        │
│    ├──► [Settlement Service] (batches for clearing house)                   │
│    │          │                                                             │
│    │          ▼                                                             │
│    │    [Settlement Batch Queue] (time-windowed, aggregated)                │
│    │                                                                        │
│    ├──► [Compliance Service] (AML/KYC checks)                               │
│    │          │                                                             │
│    │          ▼                                                             │
│    │    [payments.audit.transactions] (7-year retention, immutable)         │
│    │                                                                        │
│    └──► [Notification Service] (SMS/email receipt)                          │
│                                                                             │
│  [Reconciliation Service] reads BOTH Transaction DB and Ledger               │
│    │ runs every 5 minutes, alerts on any discrepancy                        │
└───────────────────────────────────────────────────────────────────────────┘
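
The Reconciliation Service in the diagram is not shown in code. As a rough sketch of what its comparison step might look like, the class below takes per-account totals from the Transaction DB and from the ledger and returns the accounts that disagree. `Reconciler` and the map-based inputs are assumptions made for illustration; a real job would query both stores and would likely ignore transactions still in flight.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class Reconciler {
    /** Returns account IDs whose totals differ between the two sources (a missing account counts as zero). */
    static Set<String> discrepancies(Map<String, BigDecimal> transactionTotals,
                                     Map<String, BigDecimal> ledgerTotals) {
        Set<String> accounts = new TreeSet<>(transactionTotals.keySet());
        accounts.addAll(ledgerTotals.keySet());

        Set<String> mismatched = new TreeSet<>();
        for (String account : accounts) {
            BigDecimal expected = transactionTotals.getOrDefault(account, BigDecimal.ZERO);
            BigDecimal actual = ledgerTotals.getOrDefault(account, BigDecimal.ZERO);
            // compareTo ignores scale, so 10.0 and 10.00 are treated as equal
            if (expected.compareTo(actual) != 0) {
                mismatched.add(account);
            }
        }
        return mismatched;
    }
}
```

Comparing with `compareTo` rather than `equals` avoids false alarms when the two stores keep amounts at different decimal scales.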

Key Implementation: Ordered Ledger Updates

@Service
public class LedgerUpdateConsumer {
 
    @KafkaListener(
        topics = "payments.events.transaction-lifecycle",
        groupId = "ledger-service",
        // CRITICAL: concurrency=1 per partition to maintain ordering
        // partition key = accountId ensures all transactions for same account
        // are processed in sequence
        containerFactory = "orderedFinancialConsumerFactory"
    )
    @Transactional(isolation = Isolation.SERIALIZABLE)
    public void updateLedger(
            TransactionEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment ack) {
 
        // Check if already applied (idempotency)
        if (ledgerRepository.isTransactionApplied(event.getTransactionId())) {
            log.info("Duplicate transaction ignored: {}", event.getTransactionId());
            ack.acknowledge();
            return;
        }
 
        Account account = accountRepository.findByIdWithLock(event.getAccountId())
            .orElseThrow(() -> new AccountNotFoundException(event.getAccountId()));
 
        // Validate business rules
        if (event.getType() == DEBIT && account.getBalance().compareTo(event.getAmount()) < 0) {
            // Insufficient funds - publish compensation event
            publishInsufficientFundsEvent(event);
            ack.acknowledge();
            return;
        }
 
        // Apply the balance change
        BigDecimal newBalance = event.getType() == DEBIT
            ? account.getBalance().subtract(event.getAmount())
            : account.getBalance().add(event.getAmount());
 
        // Update with version check (optimistic locking)
        int rowsUpdated = accountRepository.updateBalance(
            event.getAccountId(),
            newBalance,
            account.getVersion(),
            account.getVersion() + 1
        );
 
        if (rowsUpdated == 0) {
            // Concurrent modification - throw so the listener container's error handler
            // redelivers the record (Kafka itself does not redeliver on exceptions)
            throw new OptimisticLockException("Concurrent balance update: " + event.getAccountId());
        }
 
        // Record ledger entry (immutable)
        ledgerRepository.insertEntry(LedgerEntry.builder()
            .transactionId(event.getTransactionId())
            .accountId(event.getAccountId())
            .type(event.getType())
            .amount(event.getAmount())
            .balanceBefore(account.getBalance())
            .balanceAfter(newBalance)
            .kafkaPartition(partition)
            .kafkaOffset(offset)
            .processedAt(Instant.now())
            .build()
        );
 
        ack.acknowledge();
    }
}

Audit Trail Architecture

// Separate audit topic with 7-year retention
// Producers may only append to it; records are never modified or deleted
 
@Component
public class AuditEventPublisher {
 
    @KafkaListener(topics = "payments.events.transaction-lifecycle",
                   groupId = "audit-writer")
    public void writeAuditRecord(TransactionEvent event, Acknowledgment ack) {
        // Transform to immutable audit record (no raw PII - only hashed identifiers)
        AuditRecord auditRecord = AuditRecord.builder()
            .eventId(event.getTransactionId())
            .eventType(event.getType().name())
            .accountIdHash(hashPII(event.getAccountId()))  // Pseudonymized; hashing alone is not anonymization under GDPR
            .amount(event.getAmount())
            .currency(event.getCurrency())
            .timestamp(event.getTimestamp())
            .sourceRegion(event.getSourceRegion())
            .processorId(event.getProcessorId())
            .build();
 
        // Write to immutable audit topic (producer-only for most services)
        kafkaTemplate.send("payments.audit.transactions", event.getTransactionId(), auditRecord);
 
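        // Also write to WORM storage (for example S3 Object Lock or Glacier Vault Lock) for regulatory compliance
        s3Client.putObject("financial-audit-archive",
            "transactions/" + event.getTimestamp().toLocalDate() + "/" + event.getTransactionId(),
            serialize(auditRecord));  // serialize() is an assumed helper that renders the record as a String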
 
        ack.acknowledge();
    }
}
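
The `hashPII` helper used above is not defined in the article. One plausible way to implement it is a keyed hash (HMAC-SHA256), so that identifiers cannot be recovered by hashing a list of guessed account IDs without the key. The class below is a sketch under that assumption; `PiiHasher` is a hypothetical name, and key management is out of scope.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class PiiHasher {
    private final SecretKeySpec key;

    PiiHasher(byte[] secret) {
        this.key = new SecretKeySpec(secret, "HmacSHA256");
    }

    /** Returns the HMAC-SHA256 of the identifier as lowercase hex (64 characters). */
    String hash(String identifier) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256"); // Mac is not thread-safe, so create one per call
            mac.init(key);
            byte[] digest = mac.doFinal(identifier.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }
}
```

The same input always yields the same hash, so audit records for one account can still be correlated without storing the raw account ID.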

3. Real-Time Notification System Architecture

The Problem

A social media platform needs to deliver notifications (likes, comments, follows, mentions) to 50 million daily active users. Notifications should arrive within 3 seconds. Some users have 10 million followers (celebrities). Peak load: 500,000 notifications/second during major events.

The Celebrity Problem (Fan-Out Challenge)

Normal user (100 followers):
  User posts → 100 notification events → trivial

Celebrity (10 million followers):
  Celebrity posts → 10,000,000 notification events in < 1 second
  At 3-second delivery SLA: must deliver 10M notifications in 3 seconds
  = 3.3 million/second just for this one celebrity's post

Naive fan-out at publish time:
  - Synchronously generate 10M notification records in DB → 10+ minutes
  - 500 celebrities posting at same time → system collapses

Solution: Pull-based notification for high-follower users (fan-out on read)

Hybrid Push/Pull Architecture

┌──────────────────────────────────────────────────────────────────────────┐
│                    NOTIFICATION SYSTEM ARCHITECTURE                       │
│                                                                            │
│  User Action (post, like, follow)                                          │
│         │                                                                  │
│         ▼                                                                  │
│  [Activity Events Kafka Topic]                                             │
│  partition key: actorUserId                                                │
│         │                                                                  │
│         ▼                                                                  │
│  [Fan-Out Service Consumer]                                                │
│         │                                                                  │
│         ├── Celebrity? (>100K followers)                                   │
│         │   → Write activity to "celebrity-feed" cache (Redis)             │
│         │   → Followers PULL this on next app open (fan-out on read)       │
│         │                                                                  │
│         └── Normal user? (≤100K followers)                                 │
│             → Fetch follower list                                          │
│             → Publish one notification event per follower                  │
│             → [notification-events topic]                                  │
│                        │                                                   │
│                        ▼                                                   │
│            [Notification Delivery Service]                                 │
│                        │                                                   │
│             ┌──────────┼──────────┐                                        │
│             ▼          ▼          ▼                                        │
│          [APNS]    [FCM]     [WebSocket]                                   │
│       (iOS push) (Android) (web real-time)                                 │
└──────────────────────────────────────────────────────────────────────────┘

Implementation: Fan-Out Service

@Service
public class NotificationFanOutConsumer {
 
    private static final int CELEBRITY_THRESHOLD = 100_000;
 
    @KafkaListener(
        topics = "social.events.user-activity",
        groupId = "notification-fanout",
        containerFactory = "highThroughputConsumerFactory"
    )
    public void handleUserActivity(
            UserActivityEvent event,
            Acknowledgment ack) {
 
        int followerCount = followerCountCache.get(event.getActorId());
 
        if (followerCount > CELEBRITY_THRESHOLD) {
            handleCelebrityFanOut(event);
        } else {
            handleNormalFanOut(event);
        }
        ack.acknowledge();
    }
 
    private void handleCelebrityFanOut(UserActivityEvent event) {
        // Store activity in Redis - followers PULL when they open the app
        // Key: celebrity:{userId}:feed, Value: sorted set by timestamp
        redisTemplate.opsForZSet().add(
            "celebrity:" + event.getActorId() + ":feed",
            event,
            (double) event.getTimestamp().toEpochMilli()
        );
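        // TTL applies to the whole key: the feed expires 7 days after the celebrity's last activity.
        // Individual entries never expire on their own, so trim entries older than 7 days explicitly.
        redisTemplate.expire("celebrity:" + event.getActorId() + ":feed", 7, TimeUnit.DAYS);
        redisTemplate.opsForZSet().removeRangeByScore(
            "celebrity:" + event.getActorId() + ":feed",
            0, (double) Instant.now().minus(7, ChronoUnit.DAYS).toEpochMilli());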
 
        // Optionally: send push notification to a SAMPLE of active followers
        // (too expensive to notify all 10M followers; sample 1M most-active)
        List<String> activeFollowers = followerService.getMostActive(event.getActorId(), 1_000_000);
        publishNotificationsBatch(activeFollowers, event);
    }
 
    private void handleNormalFanOut(UserActivityEvent event) {
        List<String> followers = followerService.getFollowers(event.getActorId());
 
        // Build one record per follower; the producer groups them into per-partition batches on send
        List<ProducerRecord<String, NotificationEvent>> batch = followers.stream()
            .map(followerId -> new ProducerRecord<>(
                "social.notifications.delivery",
                followerId,  // Partition by recipient for ordering
                NotificationEvent.of(followerId, event)
            ))
            .collect(toList());
 
        // Publish batch (Kafka handles batching internally)
        batch.forEach(record -> kafkaTemplate.send(record));
    }
}
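
The service above covers the write side only. On the read side, a follower's client has to combine notifications that were pushed to it with the feeds of the celebrities it follows. A minimal sketch of that merge follows, using plain collections in place of Redis; `FeedItem` and `FeedReader` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical minimal feed item; the article's real event types carry more fields. */
final class FeedItem {
    final String actorId;
    final long timestampMillis;

    FeedItem(String actorId, long timestampMillis) {
        this.actorId = actorId;
        this.timestampMillis = timestampMillis;
    }
}

final class FeedReader {
    /**
     * Merges the items already pushed to this user with the feeds of the
     * celebrities they follow, newest first, keeping at most `limit` items.
     */
    static List<FeedItem> read(List<FeedItem> pushed,
                               List<String> followedCelebrities,
                               Map<String, List<FeedItem>> celebrityFeeds,
                               int limit) {
        List<FeedItem> merged = new ArrayList<>(pushed);
        for (String celebrityId : followedCelebrities) {
            merged.addAll(celebrityFeeds.getOrDefault(celebrityId, List.of()));
        }
        merged.sort(Comparator.comparingLong((FeedItem item) -> item.timestampMillis).reversed());
        return new ArrayList<>(merged.subList(0, Math.min(limit, merged.size())));
    }
}
```

The cost of this approach moves from the moment of posting to the moment of reading, and it scales with the number of celebrities a user follows rather than with the celebrity's follower count.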

WebSocket Real-Time Delivery

// Final delivery leg: Kafka notification → WebSocket to live users
 
@Service
public class WebSocketDeliveryConsumer {
 
    private final Map<String, WebSocketSession> activeSessions = new ConcurrentHashMap<>();
 
    @KafkaListener(
        topics = "social.notifications.delivery",
        groupId = "websocket-delivery"
    )
    public void deliverToConnectedUser(
            NotificationEvent notification,
            Acknowledgment ack) {
 
        WebSocketSession session = activeSessions.get(notification.getRecipientId());
 
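        boolean delivered = false;
        if (session != null && session.isOpen()) {
            try {
                // User is online on this instance - deliver via WebSocket for instant delivery
                session.sendMessage(new TextMessage(serialize(notification)));
                delivered = true;
                log.debug("Delivered via WebSocket: userId={}", notification.getRecipientId());
            } catch (IOException e) {
                // sendMessage throws a checked IOException; fall back to push below
                log.warn("WebSocket send failed: userId={}", notification.getRecipientId(), e);
            }
        }
        if (!delivered) {
            // User is offline (or connected to another instance) - send push notification instead
            pushNotificationService.send(notification);
            log.debug("Delivered via push: userId={}", notification.getRecipientId());
        }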
 
        ack.acknowledge();
    }
}

4. IoT Data Ingestion at Scale

The Problem

A smart infrastructure company collects sensor readings from 2 million IoT devices at 1-second intervals. That is 2 million messages/second sustained. Data must be stored for 5 years, be queryable for analytics, and trigger real-time alerts within 500ms.

Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                        IOT DATA INGESTION ARCHITECTURE                   │
│                                                                          │
│  2M IoT Devices (temperature, pressure, GPS, power)                      │
│         │                                                                │
│         ▼ (MQTT over TLS)                                                │
│  [MQTT Broker Cluster]  ← lightweight protocol for constrained devices   │
│  (EMQ X or HiveMQ)                                                       │
│         │                                                                │
│         ▼ (MQTT → Kafka bridge)                                          │
│  [Kafka: iot.events.sensor-readings]                                     │
│  48 partitions, partition key: deviceId                                  │
│         │                                                                │
│    ┌────┴────────────────────────────────┐                               │
│    ▼                                     ▼                               │
│  [Real-Time Alert Consumer]     [Batch Storage Consumer]                 │
│  (checks thresholds,            (writes to ClickHouse/                   │
│   publishes to alerts topic)     TimescaleDB/S3 Parquet)                 │
│         │                               │                               │
│         ▼                               ▼                               │
│  [alert.events.device-alerts]    [Time-Series DB]                        │
│         │                        (long-term analytics)                   │
│         ▼                                                                │
│  [On-Call Notification]                                                  │
│  [Dashboard Real-Time]                                                   │
└─────────────────────────────────────────────────────────────────────────┘

Key: Schema Optimization for High-Throughput IoT

// At 2M messages/second: every byte saved = significant network/storage savings
 
// BAD: JSON schema (bloated, slow)
{
  "deviceId": "device-4f8a3b2c",
  "timestamp": "2026-06-05T12:34:56.789Z",
  "temperature": 23.5,
  "humidity": 65.2,
  "pressure": 1013.25,
  "batteryLevel": 87
}
// Size: ~120 bytes per message
// 2M/sec × 120 bytes = 240 MB/second network load
 
// GOOD: Avro binary schema (compact, fast)
// Schema defined once in Schema Registry
{
  "type": "record",
  "name": "SensorReading",
  "namespace": "com.company.iot",
  "fields": [
    {"name": "deviceId",     "type": {"type": "fixed", "size": 16}},  // UUID as bytes
    {"name": "timestamp",    "type": "long"},                          // epoch millis
    {"name": "temperature",  "type": "float"},
    {"name": "humidity",     "type": "float"},
    {"name": "pressure",     "type": "float"},
    {"name": "batteryLevel", "type": "int"}
  ]
}
// Size: ~30 bytes per message (4× smaller)
// 2M/sec × 30 bytes = 60 MB/second (4× less network load)
// Daily volume: 60 MB/sec × 86,400 sec ≈ 5.2 TB/day, versus ≈ 20.7 TB/day for JSON (≈ 15.6 TB/day saved)
 
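// ALTERNATIVE for IoT: Protocol Buffers (field tags add a few bytes, but readers can skip unknown fields)
message SensorReading {
  bytes device_id = 1;       // 16 bytes + 2 bytes tag/length
  int64 timestamp = 2;       // varint: ~6 bytes for epoch millis + 1 byte tag
  float temperature = 3;     // 4 bytes + 1 byte tag
  float humidity = 4;        // 4 bytes + 1 byte tag
  float pressure = 5;        // 4 bytes + 1 byte tag
  int32 battery_level = 6;   // varint: 1 byte for values 0-100 + 1 byte tag
}
// Size: ~42 bytes with field tags (somewhat larger than the Avro encoding above, better for nested messages)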
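
The ~42 byte figure for the Protocol Buffers message can be checked by hand from the wire format: each field costs a one-byte tag (field numbers 1 to 15), `bytes` adds a length prefix, `int64`/`int32` are varints, and `float` is a fixed 4 bytes. The sketch below does that arithmetic; `WireSize` is a hypothetical helper, and it assumes every field is set to a non-zero value (proto3 omits default values).

```java
final class WireSize {
    /** Bytes needed to encode a non-negative value as a base-128 varint. */
    static int varintSize(long value) {
        int bytes = 1;
        while ((value >>>= 7) != 0) {
            bytes++;
        }
        return bytes;
    }

    /** Encoded size of SensorReading, assuming all six fields are present and non-zero. */
    static int sensorReadingSize(long timestampMillis, int batteryLevel) {
        int deviceId = 1 + 1 + 16;                       // tag + length prefix + 16 raw bytes
        int timestamp = 1 + varintSize(timestampMillis); // tag + varint
        int floats = 3 * (1 + 4);                        // three fixed32 fields, each tag + 4 bytes
        int battery = 1 + varintSize(batteryLevel);      // tag + varint
        return deviceId + timestamp + floats + battery;
    }
}
```

For the sample reading (timestamp 2026-06-05T12:34:56.789Z, which is 1780662896789 in epoch milliseconds, and battery level 87), `sensorReadingSize` returns 42: the timestamp needs a 6-byte varint and the battery level a single byte.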

Real-Time Alert Consumer with Windowing

// Kafka Streams for real-time alerting with sliding windows
 
@Configuration
public class IoTAlertingTopology {
 
    @Bean
    public KStream<String, SensorReading> buildAlertingTopology(StreamsBuilder builder) {
 
        KStream<String, SensorReading> readings = builder.stream(
            "iot.events.sensor-readings",
            Consumed.with(Serdes.String(), sensorReadingAvroSerde())
        );
 
        // Alert: Temperature exceeding threshold for 5 consecutive minutes
        // (single spike OK; sustained high temp = equipment problem)
        readings
            .filter((deviceId, reading) -> reading.getTemperature() > 80.0f) // Threshold
            .groupByKey()
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .count()
            .toStream()
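            // At 1 reading/sec a 5-minute window holds ~300 readings; requiring 270 (90%)
            // approximates "sustained" while tolerating dropped readings. Tune per device.
            .filter((windowedDeviceId, count) -> count >= 270)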
            .mapValues((windowedDeviceId, count) -> DeviceAlert.builder()
                .deviceId(windowedDeviceId.key())
                .alertType("SUSTAINED_HIGH_TEMPERATURE")
                .windowStart(windowedDeviceId.window().startTime())
                .windowEnd(windowedDeviceId.window().endTime())
                .readingCount(count)
                .build()
            )
            .to("alert.events.device-alerts");
 
        // Alert: battery level below 10% in the latest reading
        readings
            .filter((deviceId, reading) -> reading.getBatteryLevel() < 10)
            .to("alert.events.low-battery");
 
        return readings;
    }
}
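
To make the windowing logic easier to reason about outside Kafka Streams, here is a plain-Java sketch of the same idea for a single device: keep the timestamps of over-threshold readings inside a trailing window and fire once enough of them accumulate. `SustainedThresholdDetector` is a hypothetical class, it assumes readings arrive in timestamp order, and it is not equivalent to the Streams topology in every edge case.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Counts over-threshold readings inside a trailing time window for one device. */
final class SustainedThresholdDetector {
    private final long windowMillis;
    private final int minReadings;
    private final Deque<Long> hotTimestamps = new ArrayDeque<>();

    SustainedThresholdDetector(long windowMillis, int minReadings) {
        this.windowMillis = windowMillis;
        this.minReadings = minReadings;
    }

    /** Feed readings in timestamp order; returns true when an alert should fire. */
    boolean onReading(long timestampMillis, float temperature, float threshold) {
        if (temperature > threshold) {
            hotTimestamps.addLast(timestampMillis);
        }
        // Evict readings outside the window (timestampMillis - windowMillis, timestampMillis]
        while (!hotTimestamps.isEmpty()
                && hotTimestamps.peekFirst() <= timestampMillis - windowMillis) {
            hotTimestamps.removeFirst();
        }
        return hotTimestamps.size() >= minReadings;
    }
}
```

With a 5-second window and a minimum of 3 readings, hot readings at 0 s, 1 s and 3 s trigger the alert on the third one, and a cool reading at 9 s clears it because all earlier hot readings have left the window.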

Batch Storage Consumer with Micro-Batching

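// Write IoT readings to ClickHouse in micro-batches for efficiency
// Individual inserts at 2M/sec would overwhelm most databases
// Flush when 10,000 readings are buffered or every 100ms, whichever comes first
// Caveat: offsets can be committed before a buffered batch is written, so a crash may lose
// the in-memory buffer unless offsets are committed only after a successful flush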
 
@Component
public class IoTBatchStorageConsumer {
 
    private final List<SensorReading> batch = new ArrayList<>(10000);
    private final ScheduledExecutorService flusher = Executors.newScheduledThreadPool(1);
 
    @PostConstruct
    public void scheduleFlushing() {
        flusher.scheduleAtFixedRate(this::flush, 100, 100, TimeUnit.MILLISECONDS);
    }
 
    @KafkaListener(
        topics = "iot.events.sensor-readings",
        groupId = "iot-clickhouse-writer",
        containerFactory = "batchIoTConsumerFactory"
    )
    public void bufferReadings(List<SensorReading> readings) {
        synchronized (batch) {
            batch.addAll(readings);
            if (batch.size() >= 10000) {
                flush(); // Force flush if batch is large enough
            }
        }
    }
 
    private void flush() {
        List<SensorReading> toFlush;
        synchronized (batch) {
            if (batch.isEmpty()) return;
            toFlush = new ArrayList<>(batch);
            batch.clear();
        }
 
        try {
            clickhouseClient.insertBatch("iot_readings", toFlush);
            // ClickHouse ingests large batched inserts far more efficiently than row-by-row writes
        } catch (Exception e) {
            log.error("ClickHouse batch write failed: {} readings", toFlush.size(), e);
            // Write failed readings to error topic for retry
            toFlush.forEach(r -> kafkaTemplate.send("iot.events.storage-failures", r));
        }
    }
}
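
The buffering pattern above can be isolated from Kafka and ClickHouse to show its core: swap the buffer under a lock, then run the slow write outside the lock so new readings are not blocked while a batch is in flight. The generic `MicroBatcher` below is a hypothetical sketch; the `sink` stands in for the bulk insert, and the timer that would call `flush()` periodically is left out to keep it deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers items and hands them to a sink in batches; the sink stands in for a bulk insert. */
final class MicroBatcher<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    MicroBatcher(int maxBatchSize, Consumer<List<T>> sink) {
        this.maxBatchSize = maxBatchSize;
        this.sink = sink;
    }

    /** Adds one item and flushes once the size threshold is reached. */
    void add(T item) {
        List<T> full = null;
        synchronized (this) {
            buffer.add(item);
            if (buffer.size() >= maxBatchSize) {
                full = swap();
            }
        }
        if (full != null) {
            sink.accept(full); // slow I/O happens outside the lock
        }
    }

    /** Call from a timer (for example every 100 ms) so small batches do not wait indefinitely. */
    void flush() {
        List<T> pending;
        synchronized (this) {
            if (buffer.isEmpty()) return;
            pending = swap();
        }
        sink.accept(pending);
    }

    private List<T> swap() { // caller must hold the lock
        List<T> out = buffer;
        buffer = new ArrayList<>();
        return out;
    }
}
```

Swapping the list reference instead of copying and clearing it also avoids one full copy of the batch per flush.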

5. Event Sourcing with CQRS Architecture

The Problem

A banking application needs a complete, auditable history of all account changes. The current state must be derivable from the event history. Read and write performance requirements differ significantly.

Architecture

┌───────────────────────────────────────────────────────────────────────────┐
│                     EVENT SOURCING + CQRS ARCHITECTURE                    │
│                                                                            │
│  COMMAND SIDE (Write)                                                      │
│  ──────────────────────────────────────────────────────────               │
│  Client → [Command Handler] → validates → [Event Store (Kafka)]           │
│                                            (immutable event log)          │
│                                            (source of truth)              │
│                                                    │                      │
│                                                    ▼                      │
│  QUERY SIDE (Read) - Multiple Projections          │                      │
│  ─────────────────────────────────────────         │                      │
│  [Account Balance Projection] ◄───────────────────┤                      │
│  (relational DB, current balance only)             │                      │
│                                                    │                      │
│  [Transaction History Projection] ◄────────────────┤                      │
│  (optimized for statement generation)              │                      │
│                                                    │                      │
│  [Analytics Projection] ◄──────────────────────────┤                      │
│  (ClickHouse, aggregations)                        │                      │
│                                                    │                      │
│  [Fraud Detection Projection] ◄────────────────────┘                      │
│  (pattern matching, real-time)                                            │
│                                                                            │
│  Kafka Consumer Groups = different projections built independently        │
│  Each projection can be REBUILT from Kafka at any time                    │
└───────────────────────────────────────────────────────────────────────────┘

Command Handler and Event Store

// Command handler: validates, then publishes events to Kafka (the event store)
 
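@RestController  // @PostMapping is only picked up on controller beans, not on @Service
public class AccountCommandHandler {
 
    @PostMapping("/accounts/{accountId}/deposits")
    // No @Transactional: the only write is the Kafka send below, which a DB transaction would not cover
    public DepositResult handleDeposit(
            @PathVariable String accountId,
            @RequestBody DepositCommand command) throws Exception {  // Future.get declares checked exceptions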
 
        // Load current state by replaying events (or from snapshot + recent events)
        Account account = accountProjector.getCurrentState(accountId);
 
        // Validate the command against current state
        account.validateDeposit(command); // Throws if invalid
 
        // Create the event (the fact that will be stored)
        MoneyDepositedEvent event = MoneyDepositedEvent.builder()
            .eventId(UUID.randomUUID().toString())
            .accountId(accountId)
            .amount(command.getAmount())
            .currency(command.getCurrency())
            .reference(command.getReference())
            .depositedAt(Instant.now())
            .sequenceNumber(account.getEventSequenceNumber() + 1)
            .build();
 
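        // Publish to Kafka (the event store)
        // This is the ONLY write - Kafka IS the source of truth
        try {
            kafkaTemplate.send("accounts.events.account-lifecycle", accountId, event)
                .get(5, TimeUnit.SECONDS); // Synchronous for command handlers
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // get() throws checked exceptions; report failure rather than a false success
            if (e instanceof InterruptedException) Thread.currentThread().interrupt();
            throw new IllegalStateException("Deposit event was not acknowledged by Kafka", e);
        }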
 
        return DepositResult.success(event.getEventId(), event.getSequenceNumber());
    }
}
 
// Account state is built by replaying events
@Service
public class AccountProjector {
 
    public Account getCurrentState(String accountId) {
        // Try snapshot first (for performance with long event histories)
        Optional<AccountSnapshot> snapshot = snapshotRepository.findLatest(accountId);
 
        Account account;
        long fromSequenceNumber;
 
        if (snapshot.isPresent()) {
            account = Account.fromSnapshot(snapshot.get());
            fromSequenceNumber = snapshot.get().getSequenceNumber() + 1;
        } else {
            account = Account.empty(accountId);
            fromSequenceNumber = 0;
        }
 
        // Replay events after snapshot
        List<AccountEvent> events = accountEventRepository.findByAccountIdAndSequenceGte(
            accountId, fromSequenceNumber
        );
 
        for (AccountEvent event : events) {
            account.apply(event);
        }
 
        return account;
    }
}
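
The snapshot-plus-replay logic in getCurrentState reduces to a fold over the events that are newer than the snapshot. The following is a minimal, self-contained sketch of that idea, assuming a balance-only account and an event list sorted by sequence number starting at 1. ReplaySketch, Event, and Snapshot are hypothetical stand-ins, not the article's Account and AccountEvent types.

```java
import java.math.BigDecimal;
import java.util.List;

/** Hypothetical, simplified illustration of "snapshot + replay" state reconstruction. */
final class ReplaySketch {

    /** A deposit (positive amount) or withdrawal (negative amount) at a position in the stream. */
    static final class Event {
        final long sequenceNumber;
        final BigDecimal amount;

        Event(long sequenceNumber, BigDecimal amount) {
            this.sequenceNumber = sequenceNumber;
            this.amount = amount;
        }
    }

    /** State captured at a given sequence number so replay can skip older events. */
    static final class Snapshot {
        final long sequenceNumber;
        final BigDecimal balance;

        Snapshot(long sequenceNumber, BigDecimal balance) {
            this.sequenceNumber = sequenceNumber;
            this.balance = balance;
        }
    }

    /** Applies every event newer than the snapshot; a null snapshot means "start from empty". */
    static BigDecimal currentBalance(Snapshot snapshot, List<Event> log) {
        BigDecimal balance = snapshot == null ? BigDecimal.ZERO : snapshot.balance;
        long lastApplied = snapshot == null ? 0L : snapshot.sequenceNumber;
        for (Event event : log) {
            if (event.sequenceNumber <= lastApplied) {
                continue; // already reflected in the snapshot
            }
            balance = balance.add(event.amount);
            lastApplied = event.sequenceNumber;
        }
        return balance;
    }

    public static void main(String[] args) {
        List<Event> log = List.of(
            new Event(1, new BigDecimal("100")),
            new Event(2, new BigDecimal("-30")),
            new Event(3, new BigDecimal("5")));
        System.out.println(currentBalance(null, log));
        System.out.println(currentBalance(new Snapshot(2, new BigDecimal("70")), log));
    }
}
```

Both calls in main should agree, which is the property that makes snapshots a pure optimization: dropping one only makes the replay longer.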

Projection Rebuilding via Kafka Replay

// The killer feature of event sourcing + Kafka: rebuild any projection from scratch
 
@Service
public class AccountBalanceProjectionRebuilder {
 
    /**
     * Rebuild the balance projection from scratch.
     * Called when: schema change, bug fix, new analytics requirement.
     * Reads ALL historical events from Kafka.
     */
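    public void rebuildFromScratch() {
        log.info("Starting balance projection rebuild from Kafka offset 0");
 
        // Create a temporary consumer with a unique group ID for this rebuild
        String rebuildGroupId = "rebuild-balance-projection-" + Instant.now().toEpochMilli();
        String topic = "accounts.events.account-lifecycle";
 
        try (KafkaConsumer<String, AccountEvent> consumer = createConsumer(rebuildGroupId)) {
            // Assign partitions explicitly: after subscribe(), assignment() is empty until
            // the first poll(), so seekToBeginning(consumer.assignment()) would do nothing
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
 
            // Record the end offsets now; the rebuild is done when every partition reaches them
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
 
            // Clear existing projection (rebuild = fresh start)
            balanceProjectionRepository.truncate();
 
            Map<String, BigDecimal> inMemoryBalances = new HashMap<>();
            long count = 0;
            long lastFlushed = 0;
 
            // An empty poll() does not mean "end of topic", so compare positions instead
            while (!caughtUp(consumer, endOffsets)) {
                ConsumerRecords<String, AccountEvent> records = consumer.poll(Duration.ofSeconds(1));
 
                for (ConsumerRecord<String, AccountEvent> record : records) {
                    applyEventToProjection(record.value(), inMemoryBalances);
                    count++;
                }
 
                // Flush roughly every 100K events; count rarely lands on an exact multiple
                if (count - lastFlushed >= 100_000) {
                    flushToDatabase(inMemoryBalances);
                    lastFlushed = count;
                    log.info("Rebuild progress: {} events processed", count);
                }
            }
            // No offset commits: the projection was truncated, so a failed rebuild restarts from offset 0
 
            flushToDatabase(inMemoryBalances); // Final flush
            log.info("Rebuild complete: {} total events processed", count);
        }
    }
 
    private boolean caughtUp(KafkaConsumer<String, AccountEvent> consumer,
                             Map<TopicPartition, Long> endOffsets) {
        return endOffsets.entrySet().stream()
            .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
    }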
}

6. Multi-Region Active-Passive Architecture

The Problem

A company needs 99.99% uptime for its order processing system. They run in US-East as primary and EU-West as disaster recovery, requiring automatic failover in < 60 seconds.

Architecture

┌────────────────────────────────────────────────────────────────────────────┐
│                     MULTI-REGION ACTIVE-PASSIVE                             │
│                                                                             │
│  REGION: US-East (Active)                                                   │
│  ┌────────────────────────────────────────────────────────────────────┐    │
│  │  All writes → US Kafka Cluster                                       │    │
│  │  All consumers → US Kafka Cluster                                    │    │
│  │  Replication lag: 0ms (local)                                        │    │
│  └────────────────────┬───────────────────────────────────────────────┘    │
│                        │                                                    │
│                        │ MirrorMaker 2 (async replication)                  │
│                        │ Typical lag: 50–200ms                              │
│                        ▼                                                    │
│  REGION: EU-West (Passive / DR)                                             │
│  ┌────────────────────────────────────────────────────────────────────┐    │
│  │  Consumers ONLY WHEN US is unavailable                               │    │
│  │  Topic naming: us-east.order-events (mirrored from US)               │    │
│  │  Replication lag: 50–200ms behind US                                 │    │
│  └────────────────────────────────────────────────────────────────────┘    │
│                                                                             │
│  Failover: DNS switch + consumer group activation in EU                     │
│  RTO (Recovery Time Objective): < 60 seconds                               │
│  RPO (Recovery Point Objective): < 200ms (replication lag)                 │
└────────────────────────────────────────────────────────────────────────────┘

MirrorMaker 2 Configuration

# MirrorMaker 2 configuration (running in EU-West, replicating FROM US-East)
clusters = us-east, eu-west
 
us-east.bootstrap.servers = us-east-kafka-1:9093,us-east-kafka-2:9093,us-east-kafka-3:9093
eu-west.bootstrap.servers = eu-west-kafka-1:9093,eu-west-kafka-2:9093,eu-west-kafka-3:9093
 
# Replicate these critical topics
us-east->eu-west.topics = orders.events.order-lifecycle, \
                           payments.events.transaction-lifecycle, \
                           users.events.user-lifecycle
 
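# Enable the replication flow (flows are disabled unless explicitly enabled)
us-east->eu-west.enabled = true
 
# Translate consumer group offsets for failover (sync.group.offsets requires Kafka 2.7+)
# Offsets are translated and synced periodically, not preserved exactly: after failover,
# EU consumers resume close to the US position and may reprocess the most recent messages
us-east->eu-west.emit.checkpoints.enabled = true
us-east->eu-west.sync.group.offsets.enabled = true
us-east->eu-west.sync.group.offsets.interval.seconds = 10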
 
# Topic name in EU: us-east.orders.events.order-lifecycle
# Consumers use the prefix to distinguish replicated topics from local topics
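
The prefix convention and the offset sync interval both have consequences for consumer code. The sketch below shows how a mirrored topic name is derived under MirrorMaker 2's default naming ("source alias, dot, topic") and gives a rough upper estimate of how many messages a group might re-read after failover. MirrorTopicNames and its methods are hypothetical helpers, the 100 msg/sec rate is an assumed figure, and the estimate ignores the consumers' own commit interval.

```java
/** Hypothetical helpers for reasoning about mirrored topics; not part of any Kafka API. */
final class MirrorTopicNames {

    /** Mirrored topic name in the target cluster: "<source alias>.<topic>". */
    static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    /** True when the topic carries the source-cluster prefix, i.e. it was replicated, not produced locally. */
    static boolean isMirroredFrom(String sourceAlias, String topic) {
        return topic.startsWith(sourceAlias + ".");
    }

    /** Rough upper estimate of messages re-read after failover: arrival rate x offset sync interval. */
    static long maxReprocessed(long messagesPerSecond, long syncIntervalSeconds) {
        return messagesPerSecond * syncIntervalSeconds;
    }

    public static void main(String[] args) {
        String mirrored = remoteTopic("us-east", "orders.events.order-lifecycle");
        System.out.println(mirrored);
        System.out.println(isMirroredFrom("us-east", mirrored));
        System.out.println(maxReprocessed(100, 10)); // assumed 100 msg/sec, 10-second sync interval
    }
}
```

Because some reprocessing is expected after failover, the EU consumers need the same idempotency guarantees as any at-least-once consumer.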

Automated Failover Procedure

// Health monitor that triggers failover
 
@Component
public class RegionFailoverController {
 
    private static final Duration US_EAST_TIMEOUT = Duration.ofSeconds(30);
    private volatile boolean failoverActive = false;
 
    @Scheduled(fixedDelay = 5000) // Check every 5 seconds
    public void checkPrimaryRegionHealth() {
        if (failoverActive) return;
 
        boolean usEastHealthy = checkUSEastHealth();
        if (!usEastHealthy) {
            log.error("US-East region health check failed - initiating failover to EU-West");
            initiateFailover();
        }
    }
 
    private void initiateFailover() {
        failoverActive = true;
 
        // Step 1: Switch DNS to EU-West load balancer (< 60 seconds TTL)
        dnsService.updateRecord("orders.api.company.com", euWestLoadBalancer);
 
        // Step 2: Start EU-West consumers (they were idle/suspended)
        // EU consumers use the MIRRORED topic with translated consumer group offsets
        euWestConsumerManager.activateConsumerGroups(
            "order-processor",
            "payment-processor",
            "notification-service"
        );
        // These consumers start close to where the US consumers left off:
        // MirrorMaker syncs translated group offsets every 10 seconds, so up to
        // ~10 seconds of messages may be reprocessed (consumers must be idempotent)
 
        // Step 3: Alert on-call team
        alertingService.sendCritical(
            "FAILOVER ACTIVE: Orders processing shifted to EU-West",
            Map.of("region", "EU-West", "trigger", "US-East health check failure")
        );
 
        log.info("Failover complete: EU-West is now processing orders");
    }
 
    private boolean checkUSEastHealth() {
        try {
            // Check: Can we connect to US-East Kafka broker?
            // Check: Is consumer lag within acceptable bounds?
            // Check: Can we reach US-East API health endpoint?
            return usEastKafkaAdmin.listTopics().names()
                    .get(US_EAST_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS) != null
                && consumerLagMonitor.getMaxLag("order-processor") < 10_000
                && httpClient.get("https://us-east.internal/health").getStatusCode() == 200;
        } catch (Exception e) {
            log.warn("US-East health check failed: {}", e.getMessage());
            return false;
        }
    }
}
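
The controller above starts failover on the first failed check, so a single slow response could move traffic across regions. One common mitigation is to require several consecutive failures first. The sketch below is an assumption layered on top of the article's design, not part of it: FailoverDebouncer and the threshold of 3 are hypothetical. With the 5-second check interval, a threshold of 3 adds about 15 seconds of detection time, which has to fit inside the 60-second RTO together with the health-check timeout and the DNS TTL.

```java
/** Hypothetical debouncer: signals failover only after N consecutive failed health checks. */
final class FailoverDebouncer {

    private final int threshold;
    private int consecutiveFailures = 0;

    FailoverDebouncer(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    /** Records one health-check result; returns true exactly once, when the threshold is reached. */
    synchronized boolean record(boolean healthy) {
        if (healthy) {
            consecutiveFailures = 0; // any success resets the streak
            return false;
        }
        consecutiveFailures++;
        return consecutiveFailures == threshold;
    }

    public static void main(String[] args) {
        FailoverDebouncer debouncer = new FailoverDebouncer(3);
        boolean[] checks = {false, false, true, false, false, false};
        for (boolean healthy : checks) {
            System.out.println("healthy=" + healthy + " failover=" + debouncer.record(healthy));
        }
    }
}
```

In checkPrimaryRegionHealth, the call to initiateFailover() would then be guarded by the debouncer's return value instead of the raw health result.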

7. Gradual Migration from Synchronous to Async

The Problem

An existing monolith makes all downstream calls synchronously. The team wants to gradually introduce async messaging without a "big bang" rewrite.

The Strangler Fig Pattern for Async Migration

Migration stages:
Phase 0: Current state (fully synchronous)
  OrderService → direct call → InventoryService
  OrderService → direct call → EmailService
  OrderService → direct call → WarehouseService

Phase 1: Dual publish (no consumer changes)
  OrderService → direct call → InventoryService (unchanged)
  OrderService → direct call → EmailService (unchanged)
  OrderService → ALSO publishes to Kafka (new, no consumer yet)

  Goal: Validate that events are correct and complete
  Risk: Minimal (Kafka has no consumers; keep the publish best-effort so a Kafka outage cannot fail orders)
  Validation: Compare event content with actual service calls

Phase 2: Shadow consumers
  OrderService → direct call → InventoryService (unchanged)
  Kafka → shadow InventoryConsumer (processes but writes to shadow DB)

  Goal: Validate shadow consumer produces same result as synchronous call
  Risk: Shadow consumer writes do not affect production
  Validation: Compare shadow DB with production DB daily

Phase 3: Cut over lowest-risk service (email)
  OrderService → direct call → InventoryService (unchanged)
  OrderService → removes direct call to EmailService
  Kafka → EmailConsumer (now production, not shadow)

  Goal: First real async consumer in production
  Risk: Low (email delay is acceptable)
  Rollback: Re-enable direct call with feature flag

Phase 4: Cut over remaining services
  One service at a time, 1-week validation between each

Phase 5: Remove synchronous fallbacks
  After 90 days of stable async operation: remove the synchronous fallbacks and their feature flags (Kafka publishing stays as the only path)
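
The Phase 2 validation step ("compare shadow DB with production DB") can be as simple as diffing two keyed result sets. A minimal sketch, assuming both sides can be exported as a map from SKU to reserved quantity; ShadowDiff and the sample data are hypothetical.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical comparison of production results against shadow-consumer results. */
final class ShadowDiff {

    /** Keys whose value differs between the two sides, or that exist on only one side. */
    static Set<String> mismatches(Map<String, Integer> production, Map<String, Integer> shadow) {
        Set<String> keys = new TreeSet<>(production.keySet());
        keys.addAll(shadow.keySet());
        // Keep only keys where the two sides disagree (a missing key reads as null)
        keys.removeIf(key -> Objects.equals(production.get(key), shadow.get(key)));
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Integer> production = Map.of("sku-1", 5, "sku-2", 3);
        Map<String, Integer> shadow = Map.of("sku-1", 5, "sku-2", 4, "sku-3", 1);
        System.out.println(mismatches(production, shadow));
    }
}
```

An empty result over several days of traffic is the evidence that justifies moving a consumer from shadow to production in Phase 3 and Phase 4.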

Feature-Flag-Based Migration Implementation

@Service
public class OrderService {
 
    @Value("${feature.async.email:false}")
    private boolean asyncEmail;
 
    @Value("${feature.async.warehouse:false}")
    private boolean asyncWarehouse;
 
    @Value("${feature.async.inventory:false}")
    private boolean asyncInventory;
 
    @Value("${feature.dual-publish:true}")
    private boolean dualPublish;
 
    public OrderResult placeOrder(OrderRequest request) {
        Order order = createOrder(request);
 
        // Publish to Kafka while the flag is on (it must stay on whenever any async flag is enabled,
        // otherwise the Kafka consumers never see the order)
        if (dualPublish) {
            kafkaTemplate.send("order-events", order.getId(), OrderPlacedEvent.from(order));
        }
 
        // Service 1: Inventory (higher risk - migrate after email has proven stable)
        if (!asyncInventory) {
            inventoryService.reserve(order);  // Sync (old way)
        }
        // If asyncInventory=true: Kafka consumer handles this
 
        // Service 2: Email (lowest risk - first to be cut over, see Phase 3)
        if (!asyncEmail) {
            emailService.sendConfirmation(order);  // Sync fallback
        }
        // If asyncEmail=true: Kafka consumer handles this
 
        // Service 3: Warehouse (higher risk - migrate last)
        if (!asyncWarehouse) {
            warehouseService.notifyFulfillment(order);  // Sync fallback
        }
        // If asyncWarehouse=true: Kafka consumer handles this
 
        return OrderResult.success(order.getId());
    }
}

8. Migrating Between Brokers Without Downtime

The Problem

A company needs to migrate from RabbitMQ to Apache Kafka because they've outgrown RabbitMQ's throughput and need event replay. The system processes 200,000 messages/day. Zero downtime is required.

7-Phase Migration Plan

Phase 1: Infrastructure Setup (Week 1)
  → Provision Kafka cluster alongside existing RabbitMQ
  → Set up Schema Registry and monitoring
  → No code changes yet

Phase 2: Dual-Publish Producer (Week 2)
  → Add Kafka publishing to all producer services (alongside existing RabbitMQ)
  → Kafka consumers do NOT exist yet - messages accumulate in Kafka
  → Validate: Kafka topic contains same messages as RabbitMQ queues

Phase 3: Shadow Kafka Consumers (Week 3)
  → Add Kafka consumers that process messages but write to shadow tables
  → Compare shadow table with production table (should match)
  → Fix any discrepancies in Kafka consumer logic

Phase 4: Migrate Low-Risk Consumers (Week 4)
  → Start with analytics consumers (read-only, no side effects on failure)
  → Disable RabbitMQ consumer for analytics service
  → Enable Kafka consumer for analytics service
  → 72-hour monitoring period before next migration

Phase 5: Migrate Medium-Risk Consumers (Week 5-6)
  → Notification service
  → Report generation service
  → 72-hour monitoring period after each

Phase 6: Migrate High-Risk Consumers (Week 7-8)
  → Inventory service
  → Warehouse service
  → 1-week monitoring period after each

Phase 7: Decommission RabbitMQ (Week 10-12)
  → Remove dual-publish from producers (publish only to Kafka)
  → Monitor for 2 weeks
  → Drain and shut down RabbitMQ
  → Remove RabbitMQ from infrastructure

Dual-Publish Implementation

// During migration: publish to BOTH brokers
 
@Service
public class MigrationAwareEventPublisher {
 
    @Value("${migration.dual-publish.kafka.enabled:false}")
    private boolean kafkaPublishEnabled;
 
    @Value("${migration.dual-publish.rabbitmq.enabled:true}")
    private boolean rabbitPublishEnabled;
 
    public void publish(DomainEvent event) {
        Throwable rabbitError = null;
        Throwable kafkaError = null;
 
        if (rabbitPublishEnabled) {
            try {
                rabbitTemplate.convertAndSend(
                    event.getExchange(), event.getRoutingKey(), event);
            } catch (Exception e) {
                rabbitError = e;
            }
        }
 
        if (kafkaPublishEnabled) {
            try {
                kafkaTemplate.send(event.getKafkaTopic(), event.getEntityId(), event)
                    .get(3, TimeUnit.SECONDS);
            } catch (Exception e) {
                kafkaError = e;
            }
        }
 
        // Fail the request only if the currently-primary broker fails
        // RabbitMQ stays primary for as long as it is enabled
        // Kafka becomes primary once RabbitMQ publishing is switched off (Phase 7)
        boolean kafkaIsPrimary = kafkaPublishEnabled && !rabbitPublishEnabled;
        if (kafkaIsPrimary && kafkaError != null) {
            throw new EventPublishException("Kafka (primary) publish failed", kafkaError);
        }
        if (!kafkaIsPrimary && rabbitError != null) {
            throw new EventPublishException("RabbitMQ (primary) publish failed", rabbitError);
        }
 
        // Kafka failed while RabbitMQ is still primary: alert but don't fail the request
        // (a RabbitMQ failure never reaches this point, because RabbitMQ is primary whenever enabled)
        if (kafkaError != null) log.warn("Kafka (secondary) publish failed: {}", kafkaError.getMessage());
    }
}
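
The publisher above infers the primary broker from which flags are enabled, which means Kafka can only become primary after RabbitMQ is turned off. An alternative, sketched here as an assumption rather than the article's design, is to make the primary an explicit setting so it can be flipped while both brokers are still receiving traffic. DualPublishPolicy, Broker, and Outcome are hypothetical names.

```java
/** Hypothetical decision table for dual publishing with an explicitly configured primary broker. */
final class DualPublishPolicy {

    enum Broker { RABBITMQ, KAFKA }

    enum Outcome { OK, WARN_SECONDARY_FAILED, FAIL_REQUEST }

    /** Decides how to treat publish results: only a primary failure fails the request. */
    static Outcome decide(Broker primary, boolean rabbitFailed, boolean kafkaFailed) {
        boolean primaryFailed = primary == Broker.KAFKA ? kafkaFailed : rabbitFailed;
        boolean secondaryFailed = primary == Broker.KAFKA ? rabbitFailed : kafkaFailed;
        if (primaryFailed) {
            return Outcome.FAIL_REQUEST;
        }
        return secondaryFailed ? Outcome.WARN_SECONDARY_FAILED : Outcome.OK;
    }

    public static void main(String[] args) {
        // Early phases: RabbitMQ is primary, so a Kafka failure only warns
        System.out.println(decide(Broker.RABBITMQ, false, true));
        // Later phases: Kafka is primary, so the same failure fails the request
        System.out.println(decide(Broker.KAFKA, false, true));
    }
}
```

Keeping the decision in a pure function makes each phase of the migration easy to unit test before the configuration is changed in production.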

9. Industry-Specific Patterns

Fintech: Settlement Batch Window

// Financial settlement systems batch transactions for clearing house submission
// Window: 09:00 to 16:30 (market hours)
// Settlement: submitted at end of day for next-day clearing
 
@Service
public class SettlementWindowConsumer {
 
    @KafkaListener(topics = "payments.events.transaction-lifecycle",
                   groupId = "settlement-aggregator")
    public void accumulateForSettlement(TransactionEvent event) {
        // Calculate which settlement window this belongs to
        SettlementWindow window = windowCalculator.calculateWindow(event.getTimestamp());
 
        if (window.isClosed()) {
            // Transaction arrived after settlement window closed
            // Route to next-day settlement
            kafkaTemplate.send("payments.commands.next-day-settlement", event.getAccountId(), event);
            return;
        }
 
        // Accumulate into settlement batch for this window
        settlementBatchRepository.addTransaction(window.getWindowId(), event);
    }
 
    // Scheduled: end of each settlement window
    @Scheduled(cron = "0 30 16 * * MON-FRI")  // 4:30pm weekdays
    public void submitSettlementBatch() {
        SettlementWindow closedWindow = windowCalculator.closeCurrentWindow();
        List<TransactionEvent> batch = settlementBatchRepository.getAll(closedWindow.getWindowId());
 
        clearingHouseClient.submitBatch(SettlementBatch.of(closedWindow, batch));
        log.info("Settlement batch submitted: {} transactions, window: {}",
            batch.size(), closedWindow.getWindowId());
    }
}
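
The windowCalculator used above is not shown in the article. The sketch below illustrates one possible rule, under stated assumptions: a transaction timestamped before the 16:30 close on a weekday settles in that day's batch, anything later (or on a weekend) rolls to the next weekday, and public holidays are ignored. SettlementWindows and the choice of time zone are hypothetical.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Hypothetical settlement-date rule; a real calculator would also need a holiday calendar. */
final class SettlementWindows {

    static final LocalTime CLOSE = LocalTime.of(16, 30);

    static boolean isBusinessDay(LocalDate date) {
        DayOfWeek day = date.getDayOfWeek();
        return day != DayOfWeek.SATURDAY && day != DayOfWeek.SUNDAY;
    }

    /** Date of the batch a transaction belongs to, evaluated in the market's own time zone. */
    static LocalDate settlementDate(Instant timestamp, ZoneId marketZone) {
        ZonedDateTime local = timestamp.atZone(marketZone);
        LocalDate date = local.toLocalDate();
        if (isBusinessDay(date) && local.toLocalTime().isBefore(CLOSE)) {
            return date; // arrived before today's window closed
        }
        LocalDate next = date.plusDays(1);
        while (!isBusinessDay(next)) {
            next = next.plusDays(1); // skip the weekend
        }
        return next;
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("UTC"); // assumed market time zone
        System.out.println(settlementDate(Instant.parse("2026-06-05T15:00:00Z"), zone));
        System.out.println(settlementDate(Instant.parse("2026-06-05T16:30:00Z"), zone));
    }
}
```

Using the event's own timestamp rather than the consumer's clock keeps the assignment stable when the consumer lags or replays the topic.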

Healthcare: HIPAA-Compliant Event Processing

// Healthcare events contain PHI (Protected Health Information)
// Must meet HIPAA audit, encryption, and access control requirements
 
@Configuration
public class HIPAACompliantKafkaConfig {
 
    // Injected by Spring: a "${...}" placeholder inside a plain string literal is never resolved
    @Value("${HIPAA_KEY_ID}")
    private String hipaaKeyId;
 
    @Value("${KMS_URL}")
    private String kmsUrl;
 
    @Bean
    public ProducerFactory<String, PatientEvent> hipaaProducerFactory() {
        Map<String, Object> config = new HashMap<>();
 
        // Encryption in transit (required)
        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
 
        // Encryption at rest: Kafka log dirs on LUKS-encrypted volumes (broker config)
 
        // Field-level encryption for PHI fields (defense in depth)
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, HIPAAEncryptingSerializer.class);
        config.put("hipaa.field.encryption.key.id", hipaaKeyId);
        config.put("hipaa.field.encryption.kms.url", kmsUrl);
 
        return new DefaultKafkaProducerFactory<>(config);
    }
}
 
@Service
public class PatientEventPublisher {
 
    public void publishAppointmentBooked(AppointmentBookedEvent event) {
        // Encrypt PHI fields before publishing
        AppointmentBookedEvent encrypted = AppointmentBookedEvent.builder()
            .appointmentId(event.getAppointmentId())  // Not PHI - not encrypted
            .patientIdHash(hashPHI(event.getPatientId()))  // Keyed hash (e.g. HMAC) for correlation; a plain hash of an ID can be brute-forced
            .encryptedPatientName(kmsEncrypt(event.getPatientName()))
            .encryptedDateOfBirth(kmsEncrypt(event.getDateOfBirth()))
            .encryptedDiagnosis(kmsEncrypt(event.getDiagnosis()))
            .appointmentType(event.getAppointmentType())  // Not PHI
            .timestamp(event.getTimestamp())
            .hipaaRequestId(hipaaAuditLogger.logAccess(event)) // Audit every access
            .build();
 
        kafkaTemplate.send("healthcare.events.appointments", event.getAppointmentId(), encrypted);
    }
}
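
The kmsEncrypt and hashPHI helpers are not shown in the article. The sketch below is one way they might look using only JDK classes: AES-GCM with a random IV for field encryption, and HMAC-SHA256 for a correlation hash that cannot be reversed by enumerating patient IDs. It assumes the caller already holds a 128-bit key; in a real deployment that key would be a data key obtained from the KMS, which is outside this sketch. PhiFieldCrypto is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.Mac;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical field-level protection for PHI values; key management is out of scope here. */
final class PhiFieldCrypto {

    private static final int IV_BYTES = 12;   // recommended IV size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Encrypts with AES-GCM; output is Base64(IV || ciphertext || tag). */
    static String encrypt(byte[] key, String plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv); // a fresh IV per value: never reuse an IV with the same key
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = ByteBuffer.allocate(iv.length + ciphertext.length).put(iv).put(ciphertext).array();
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Encryption failed", e);
        }
    }

    /** Reverses encrypt(); fails if the ciphertext or tag was tampered with. */
    static String decrypt(byte[] key, String encoded) {
        try {
            byte[] in = Base64.getDecoder().decode(encoded);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
                new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
            byte[] plaintext = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
            return new String(plaintext, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Decryption failed", e);
        }
    }

    /** Deterministic keyed hash, usable as a join key across events without exposing the identifier. */
    static String keyedHash(byte[] key, String identifier) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return Base64.getEncoder().encodeToString(mac.doFinal(identifier.getBytes(StandardCharsets.UTF_8)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Hashing failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] key = new byte[16]; // all-zero demo key; never use a fixed key outside a demo
        String encrypted = encrypt(key, "Jane Doe");
        System.out.println(decrypt(key, encrypted));
        System.out.println(keyedHash(key, "patient-123"));
    }
}
```

Because the IV is random, encrypting the same name twice yields different ciphertexts, while the keyed hash stays stable, which is what makes it suitable for correlation.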

E-Learning: Live Session Coordination

// Live class session: instructor actions must be synchronized to all students in < 200ms
 
@Service
public class LiveSessionEventBus {
 
    // Instructor actions published here
    @KafkaListener(topics = "elearning.events.session-actions",
                   groupId = "session-broadcast")
    public void broadcastToStudents(SessionActionEvent event) {
        // Fetch all active session participants
        List<String> participantIds = sessionParticipantCache.get(event.getSessionId());
 
        // Publish individual student notification events
        for (String participantId : participantIds) {
            kafkaTemplate.send("elearning.events.student-actions", participantId,
                StudentActionEvent.of(participantId, event));
        }
    }
 
    // Student WebSocket consumers receive their individual events
    @KafkaListener(topics = "elearning.events.student-actions",
                   groupId = "websocket-delivery")
    public void deliverToStudent(StudentActionEvent event) {
        webSocketSessionManager.send(event.getStudentId(), event);
        // Sub-100ms end-to-end for online students
    }
}

10. Architectural Decision Records (ADRs)

ADRs document why architectural decisions were made, not just what was decided. They prevent teams from re-debating settled questions.

ADR-001: Choose Kafka over RabbitMQ for Primary Event Bus

Context:
Our order management platform is growing from 100,000 to 2,000,000 orders/day. We need an event bus for order lifecycle events that multiple teams will consume.

Decision:
Use Apache Kafka (MSK) as the primary event bus.

Rationale:

| Criterion | RabbitMQ | Kafka | Our Requirement |
|---|---|---|---|
| Throughput | 100K msg/sec | 1M+ msg/sec | 2M events/day ≈ 23 msg/sec on average (both fine) |
| Message replay | No | Yes (30-day retention) | REQUIRED: new services need to replay past orders |
| Consumer independence | Broker-tracked | Consumer-tracked | Needed: analytics and order teams deploy independently |
| Team familiarity | Medium | Low | Neither team has production experience |
| Operational complexity | Lower | Higher | We have SRE team capacity |

Why we chose Kafka despite being harder:
The replay requirement is non-negotiable. When we onboard our new ML team next quarter, they need 90 days of order history, so topic retention must be extended from 30 to 90 days before then. RabbitMQ cannot provide this. The operational complexity is an accepted cost.

Alternatives Considered:

  • AWS SQS: Rejected (no replay, fan-out requires separate SNS topology)
  • RabbitMQ: Rejected (no message replay for future consumers)

Consequences:

  • Investment required in Kafka expertise (training 2 engineers)
  • Schema Registry added to infrastructure
  • Retention cost: 30 days × 2M events/day × 2KB average = ~120GB retained at steady state (per replica, before compression)

Review Date: June 2027
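
The throughput and retention figures in ADR-001 are simple arithmetic and worth re-running whenever the inputs change. A small sketch, assuming decimal units (1KB = 1,000 bytes) and a single replica; CapacityEstimate is a hypothetical helper.

```java
/** Hypothetical back-of-the-envelope capacity check for the ADR-001 figures. */
final class CapacityEstimate {

    /** Average arrival rate, assuming traffic is spread evenly over 24 hours. */
    static double averageMessagesPerSecond(long eventsPerDay) {
        return eventsPerDay / 86_400.0;
    }

    /** Bytes held on disk once the topic has filled its retention window (one replica, uncompressed). */
    static long retainedBytes(int retentionDays, long eventsPerDay, long averageEventBytes) {
        return retentionDays * eventsPerDay * averageEventBytes;
    }

    public static void main(String[] args) {
        long eventsPerDay = 2_000_000L;
        System.out.printf("average rate: %.1f msg/sec%n", averageMessagesPerSecond(eventsPerDay));
        System.out.println("30-day retention: " + retainedBytes(30, eventsPerDay, 2_000L) / 1_000_000_000L + " GB");
        System.out.println("90-day retention: " + retainedBytes(90, eventsPerDay, 2_000L) / 1_000_000_000L + " GB");
    }
}
```

The average rate understates peak load, so any real sizing should multiply it by an observed peak-to-average ratio, and the storage figure by the topic's replication factor.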


ADR-002: Use Outbox Pattern Instead of Dual-Write

Context:
Order service writes to PostgreSQL (order DB) and also needs to publish to Kafka. Both must be consistent: if order saved, event must be published.

Options Evaluated:

Option A: Dual-write (write to DB then publish to Kafka)
  Risk: App crashes between DB write and Kafka publish
  Result: Order in DB but no event published
  Frequency: Could happen every deployment restart (5 times/week)

Option B: Publish to Kafka then write to DB
  Risk: App crashes between Kafka publish and DB write
  Result: Event published but order not in DB
  Frequency: Same risk, worse consequence (orphaned events)

Option C: Kafka Transactions (KIP-98)
  Limitation: Only atomic for Kafka-to-Kafka (not Kafka-to-PostgreSQL)
  Result: Does not solve our DB+Kafka atomic write requirement

Option D: Outbox Pattern
  Write to DB + outbox table in ONE PostgreSQL transaction
  Separate relay process reads outbox, publishes to Kafka, marks as published
  Result: Every committed order eventually produces its event (at-least-once: the relay may publish duplicates, so consumers must be idempotent)
  Cost: Additional outbox table + relay process (low complexity)

Decision:
Use the Outbox Pattern.

Implementation:
Debezium CDC reads PostgreSQL WAL (Write-Ahead Log), publishes outbox table inserts to Kafka.

-- Outbox table schema
CREATE TABLE order_outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(50) NOT NULL,  -- 'ORDER'
    aggregate_id VARCHAR(36) NOT NULL,     -- orderId
    event_type VARCHAR(100) NOT NULL,      -- 'OrderPlaced'
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ               -- Null = not yet published
);

-- PostgreSQL has no inline INDEX clause in CREATE TABLE; the partial index is a separate statement
CREATE INDEX idx_outbox_unpublished ON order_outbox (created_at) WHERE published_at IS NULL;

Consequences:

  • Debezium added to infrastructure (accepted: already used for CDC elsewhere)
  • Publish latency: 50–200ms delay (acceptable for our async use cases)
  • No dual-write inconsistency possible
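
The relay described in Option D can be illustrated without a database. The sketch below models the outbox as an in-memory list and shows why delivery is at-least-once: a row is marked published only after the publish call returns, so a failure leaves it in place for the next pass. OutboxRelaySketch and OutboxRow are hypothetical, and this polling style is an alternative to the Debezium CDC approach chosen in the ADR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory model of an outbox relay pass. */
final class OutboxRelaySketch {

    static final class OutboxRow {
        final String id;
        final String payload;
        boolean published; // mirrors "published_at IS NOT NULL"

        OutboxRow(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    /** Publishes every unpublished row in order and returns how many were sent in this pass. */
    static int relayOnce(List<OutboxRow> outbox, Consumer<OutboxRow> publisher) {
        int sent = 0;
        for (OutboxRow row : outbox) {
            if (row.published) {
                continue;
            }
            publisher.accept(row); // may throw: the row stays unpublished and is retried later
            row.published = true;  // a crash between these two steps causes a duplicate publish
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        List<OutboxRow> outbox = new ArrayList<>();
        outbox.add(new OutboxRow("e1", "{\"type\":\"OrderPlaced\"}"));
        outbox.add(new OutboxRow("e2", "{\"type\":\"OrderPlaced\"}"));
        System.out.println("first pass sent " + relayOnce(outbox, row -> System.out.println("publish " + row.id)));
        System.out.println("second pass sent " + relayOnce(outbox, row -> System.out.println("publish " + row.id)));
    }
}
```

The duplicate window between publishing and marking is the reason the consumers in this architecture deduplicate by event ID.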

ADR-003: At-Least-Once with Idempotent Consumer vs Exactly-Once Transactions

Context:
Payment processing consumer must never charge a customer twice. We debated Kafka transactions (exactly-once) vs. idempotent consumer (at-least-once delivery, exactly-once processing).

Analysis:

Kafka Transactions (exactly-once delivery):
  - Producer: enable.idempotence=true + transactional.id
  - Consumer: isolation.level=read_committed
  - Performance impact: up to 40-60% throughput reduction with small, frequent transactions (overhead is per commit, so it varies by workload)
  - Coverage: Only works for Kafka-to-Kafka pipelines
  - Our case: Consumer writes to PostgreSQL → Kafka transactions don't help

Idempotent Consumer (at-least-once + deduplication):
  - Consumer checks PostgreSQL for processed payment ID before processing
  - If already processed: skip (idempotent)
  - If not processed: charge via Stripe (with idempotency key), then save the payment record; the two steps cannot be atomic, so a retry must be safe
  - Performance impact: 1 DB SELECT per message (negligible at our scale)
  - Coverage: Works for any consumer (DB write, API call, etc.)
  - Stripe integration: Pass idempotency key to Stripe (third layer of protection)

Decision:
Use at-least-once delivery with three-layer idempotency:

  1. Kafka idempotent producer (prevents duplicate publish from producer retries)
  2. Consumer-side deduplication via PostgreSQL unique constraint on payment_id
  3. Stripe idempotency key in payment gateway call
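
Layer 2 of this decision can be sketched with an in-memory map standing in for the PostgreSQL unique constraint. This is a simplified illustration under assumptions: IdempotentPaymentConsumer is hypothetical, the gateway call is a counter rather than a real Stripe client, and putIfAbsent plays the role of an insert that fails on a duplicate payment_id.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical consumer that tolerates redelivery by deduplicating on payment ID. */
final class IdempotentPaymentConsumer {

    // Stands in for a table with a UNIQUE constraint on payment_id
    private final Map<String, Long> processedPayments = new ConcurrentHashMap<>();
    private final AtomicInteger gatewayCharges = new AtomicInteger();

    /** Returns true if this delivery performed the charge, false if it was a duplicate. */
    boolean handle(String paymentId, long amountCents) {
        // Atomic claim: only the first delivery of a payment ID gets past this line
        if (processedPayments.putIfAbsent(paymentId, amountCents) != null) {
            return false;
        }
        chargeGateway(paymentId, amountCents);
        return true;
    }

    private void chargeGateway(String idempotencyKey, long amountCents) {
        // A real gateway call would pass idempotencyKey so a retried request cannot double-charge
        gatewayCharges.incrementAndGet();
    }

    int chargeCount() {
        return gatewayCharges.get();
    }

    public static void main(String[] args) {
        IdempotentPaymentConsumer consumer = new IdempotentPaymentConsumer();
        System.out.println(consumer.handle("pay-1", 1999));
        System.out.println(consumer.handle("pay-1", 1999)); // redelivered message
        System.out.println("charges: " + consumer.chargeCount());
    }
}
```

The sketch claims the ID before charging, so a crash between the two steps would leave a claimed but uncharged payment. A production version would typically store a status with the claim and retry the charge with the same gateway idempotency key, which is why layer 3 exists.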

Why not Kafka exactly-once:
Kafka transactions provide exactly-once for Kafka→Kafka only. Our consumer calls PostgreSQL and Stripe - outside Kafka's transaction boundary. Exactly-once transactions would add 50% overhead but still require idempotency on the non-Kafka side. We get all the same guarantees with less complexity by using idempotency directly.


ADR-004: Partition Key Strategy for Order Events

Context:
We need to decide how to partition order events. Options: by orderId, customerId, or no key (round-robin).

Requirements:

  1. All events for the same order must be processed in order (OrderPlaced before OrderShipped)
  2. Multiple orders should be processed in parallel (throughput requirement)
  3. No hot partitions (even distribution)

Analysis:

Option A: No key (round-robin)
  Distribution: Perfect
  Ordering: None (events for same order can go to different partitions)
  Result: Violates requirement 1

Option B: customerId
  Distribution: Risk of hot partitions for high-volume customers (B2B accounts)
  Ordering: Per-customer ordering guaranteed
  Problem: Requirement is per-order, not per-customer; customers create thousands of orders
  Result: Overly strict ordering (per-customer when per-order needed)

Option C: orderId
  Distribution: Excellent (UUIDs have uniform hash distribution)
  Ordering: Per-order ordering guaranteed
  Result: Meets all requirements

Option D: customerId + orderBucket (1-4)
  Spreads one hot customer across 4 partitions; only relevant if we keyed by customerId
  Result: Added complexity without benefit, since orderId (Option C) already distributes evenly

Decision:
Use orderId as partition key.

Implementation:

kafkaTemplate.send("orders.events.order-lifecycle", order.getId(), event);
// hash(orderId) % partitionCount = partition assignment
// All events for the same orderId always go to the same partition
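
The mapping in the comments above can be demonstrated with a small sketch. Note the assumption: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses Arrays.hashCode as a stand-in, so the partition numbers it prints will not match a real cluster. PartitionSketch is a hypothetical name; the point is only the "non-negative hash modulo partition count" shape and its consequences.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration of key-based partition assignment (not Kafka's actual hash). */
final class PartitionSketch {

    /** Maps a key to a partition: non-negative hash of the key bytes, modulo the partition count. */
    static int partitionFor(String key, int partitionCount) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)); // stand-in for murmur2
        return (hash & 0x7fffffff) % partitionCount; // mask the sign bit so the result is never negative
    }

    public static void main(String[] args) {
        String orderId = "3f2b8c1e-order";
        // Same key, same partition count: always the same partition, which preserves per-order ordering
        System.out.println(partitionFor(orderId, 12));
        System.out.println(partitionFor(orderId, 12));
        // A different partition count can move the key, so ordering across a resize is not guaranteed
        System.out.println(partitionFor(orderId, 24));
    }
}
```

The last call is the practical caveat for this ADR: increasing the partition count later changes where existing order IDs land, so the count is best chosen with headroom up front.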

ADR-005: Consumer Group Per Downstream Service

Context:
Should we use one consumer group for all order event consumers, or a separate consumer group per downstream service?

Option A: One consumer group (shared processing)

Consumer Group: "order-events-processor"
  - Instance 1: processes inventory + email + warehouse
  - Instance 2: processes inventory + email + warehouse
  Problem: if email service is slow, it blocks inventory updates
  Problem: cannot scale email without scaling inventory
  Problem: email failure causes retry → blocks warehouse notification

Option B: One consumer group per service

Consumer Group: "inventory-service" - only inventory logic
Consumer Group: "email-service"     - only email logic
Consumer Group: "warehouse-service" - only warehouse logic
Consumer Group: "analytics-service" - only analytics logic

Benefit: Each service scales independently
Benefit: Email failure has zero impact on inventory processing
Benefit: Analytics lag does not affect critical services
Cost: More consumer groups to monitor (manageable)

Decision:
One consumer group per downstream service.

Reasoning:
Service isolation outweighs operational overhead. A consumer failure in email must not impact the inventory reservation SLA (30-second SLA). With shared consumer groups, one slow downstream creates head-of-line blocking for all others.

Configuration:

// Each service defines its own consumer group
@KafkaListener(topics = "orders.events.order-lifecycle",
               groupId = "inventory-service")   // Dedicated group
public void handleForInventory(OrderPlacedEvent event) { ... }
 
@KafkaListener(topics = "orders.events.order-lifecycle",
               groupId = "email-service") // Dedicated group
public void handleForEmail(OrderPlacedEvent event) { ... }
