6/6/2026 Admin Post


Series: Sharding Demystified

Sharding in Java Spring Boot - Part 2: Advanced Patterns and Production Hardening

Snowflake IDs, Outbox Pattern, Scatter-Gather, Resilience, Migrations, Monitoring, and Testing


Table of Contents

  1. Snowflake ID Generation
  2. The Outbox Pattern: Reliable Cross-Service Events
  3. Cross-Shard Scatter-Gather Service
  4. Resilience4j: Circuit Breaker Per Shard
  5. Flyway: Schema Migrations Across All Shards
  6. Health Indicators Per Shard
  7. Micrometer Metrics Per Shard
  8. Integration Testing with Testcontainers
  9. The Complete Request Flow: End-to-End Trace

1. Snowflake ID Generation

Why Auto-Increment Is Broken for Sharding

-- Each shard has its own independent sequence
Shard 0: INSERT INTO users -> id = 1
Shard 1: INSERT INTO users -> id = 1  <- COLLISION
Shard 2: INSERT INTO users -> id = 1  <- COLLISION
 
SELECT * FROM users WHERE id = 1
  -> Which shard? You have no idea.
  -> You must scatter-gather ALL shards.

The Snowflake ID Bit Layout

 1 bit      41 bits              10 bits         12 bits
[0][milliseconds since epoch][machine/pod ID][per-ms sequence]
 ^                                                           ^
 Sign (always 0)                              Resets each millisecond

Example: 7,352,876,654,878,720
Binary:  0 00000000001101000011111011001110000000100 0101110110 000000000000
         | |                                         |          |
         | timestamp: 1,753,062,404 ms since epoch   |          sequence: 0
         sign: 0                                     machine ID: 374
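
The layout can be checked with a few shifts and masks. The following is a minimal sketch, assuming the 41/10/12 split shown above; the class and method names (SnowflakeLayout, pack, timestampOf, machineOf, sequenceOf) are illustrative and not part of the article's code.

```java
/** Packs and unpacks the 1 + 41 + 10 + 12 bit layout (illustrative helper, not the generator itself). */
final class SnowflakeLayout {
    static final int SEQUENCE_BITS = 12;
    static final int MACHINE_BITS = 10;
    static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;  // 4095
    static final long MAX_MACHINE = (1L << MACHINE_BITS) - 1;    // 1023

    /** Combines the three fields into one 64-bit value; callers must pass in-range fields. */
    static long pack(long millisSinceEpoch, long machineId, long sequence) {
        return (millisSinceEpoch << (MACHINE_BITS + SEQUENCE_BITS))
             | (machineId << SEQUENCE_BITS)
             | sequence;
    }

    /** Milliseconds since the custom epoch (the top 41 bits below the sign bit). */
    static long timestampOf(long id) {
        return id >>> (MACHINE_BITS + SEQUENCE_BITS);
    }

    /** The 10-bit machine ID in the middle of the value. */
    static long machineOf(long id) {
        return (id >>> SEQUENCE_BITS) & MAX_MACHINE;
    }

    /** The 12-bit per-millisecond sequence in the lowest bits. */
    static long sequenceOf(long id) {
        return id & MAX_SEQUENCE;
    }
}
```

For instance, SnowflakeLayout.pack(1000, 5, 7) yields 4,194,324,487, which is (1000 << 22) + (5 << 12) + 7, and the three accessor methods recover 1000, 5, and 7 from that value.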

SnowflakeIdGenerator.java

/**
 * Thread-safe Snowflake ID generator.
 *
 * Properties of generated IDs:
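 * - 64-bit (fits in a Java long and a PostgreSQL BIGINT; values can exceed JavaScript's
 *   safe integer limit of 2^53 - 1, so serialize IDs as strings in JSON sent to browsers)
 * - Globally unique across all machines, provided each machine ID is unique (no runtime coordination)
 * - Time-sortable (IDs generated later are numerically greater, within clock skew across machines)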
 * - Contains machine ID (helps identify origin of any ID)
 * - 4,096 unique IDs per millisecond per machine
 *   (= 4 million per second per machine - far exceeds any realistic need)
 */
@Component
@Slf4j
public class SnowflakeIdGenerator {
 
    // Custom epoch: 2024-01-01T00:00:00Z in milliseconds
    // Using a recent epoch keeps the 41-bit timestamp usable for ~69 years from this date
    private static final long CUSTOM_EPOCH = 1_704_067_200_000L;
 
    private static final int  MACHINE_ID_BITS = 10;
    private static final int  SEQUENCE_BITS   = 12;
 
    private static final long MAX_MACHINE_ID  = ~(-1L << MACHINE_ID_BITS);  // 1023
    private static final long MAX_SEQUENCE    = ~(-1L << SEQUENCE_BITS);    // 4095
 
    private static final int  MACHINE_SHIFT   = SEQUENCE_BITS;                       // 12
    private static final int  TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_ID_BITS;     // 22
 
    private final long machineId;
    private long lastTimestampMs = -1L;
    private long sequence = 0L;
 
    /**
     * machineId must be unique per running instance.
     *
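     * Kubernetes deployment: use the StatefulSet pod ordinal as the machine ID.
     * Recent Kubernetes versions (1.28+) expose the ordinal as a pod label, which the
     * Downward API can map to an environment variable in the StatefulSet spec:
     *   env:
     *   - name: POD_INDEX
     *     valueFrom:
     *       fieldRef:
     *         fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
     * Then bind it to the property, e.g. app.snowflake.machine-id=${POD_INDEX}.
     *
     * Non-Kubernetes: use a ZooKeeper-assigned ID, or the host IP's last octet
     * (unique only within a single /24 subnet).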
     */
    public SnowflakeIdGenerator(
            @Value("${app.snowflake.machine-id:0}") long machineId) {
 
        if (machineId < 0 || machineId > MAX_MACHINE_ID) {
            throw new IllegalArgumentException(
                "Machine ID must be between 0 and " + MAX_MACHINE_ID +
                ". Got: " + machineId
            );
        }
 
        this.machineId = machineId;
        log.info("SnowflakeIdGenerator initialized. " +
                 "machineId={}, maxIdsPerMs={}", machineId, MAX_SEQUENCE + 1);
    }
 
    /**
     * Generates the next globally unique ID.
     * Synchronized: thread-safe, one ID at a time per machine.
     *
     * For throughput beyond ~4M IDs/sec per machine, run several generator instances
     * (for example one per CPU core), each with its own distinct machine ID.
     * Instances that share a machine ID would produce duplicate IDs.
     */
    public synchronized long nextId() {
        long now = System.currentTimeMillis();
 
        if (now == lastTimestampMs) {
            // Same millisecond: increment the per-ms sequence
            sequence = (sequence + 1) & MAX_SEQUENCE;
 
            if (sequence == 0) {
                // Sequence wrapped (all 4096 IDs in this ms used).
                // Spin-wait for the next millisecond.
                now = waitForNextMs(lastTimestampMs);
            }
        } else if (now > lastTimestampMs) {
            // New millisecond: reset sequence
            sequence = 0L;
        } else {
            // Clock went backwards. This can happen due to NTP adjustments.
            // Refuse to generate: a backwards clock creates duplicate IDs.
            long drift = lastTimestampMs - now;
            throw new SystemClockException(
                "System clock moved backwards by " + drift + "ms. " +
                "Refusing to generate IDs until clock catches up."
            );
        }
 
        lastTimestampMs = now;
 
        return ((now - CUSTOM_EPOCH) << TIMESTAMP_SHIFT)
             | (machineId << MACHINE_SHIFT)
             | sequence;
    }
 
    private long waitForNextMs(long lastMs) {
        long current = System.currentTimeMillis();
        while (current <= lastMs) {
            current = System.currentTimeMillis();
        }
        return current;
    }
 
    /**
     * Extract the creation timestamp from any Snowflake ID.
     * Useful for: debugging, TTL checks, audit logs.
     */
    public Instant extractTimestamp(long snowflakeId) {
        long timestampMs = (snowflakeId >>> TIMESTAMP_SHIFT) + CUSTOM_EPOCH;
        return Instant.ofEpochMilli(timestampMs);
    }
 
    /**
     * Extract the machine ID from any Snowflake ID.
     * Useful for: identifying which pod generated an ID, incident debugging.
     */
    public long extractMachineId(long snowflakeId) {
        return (snowflakeId >>> MACHINE_SHIFT) & MAX_MACHINE_ID;
    }
 
    /**
     * Extract the sequence number from any Snowflake ID.
     */
    public long extractSequence(long snowflakeId) {
        return snowflakeId & MAX_SEQUENCE;
    }
}
 
// Custom exception for clock issues
public class SystemClockException extends RuntimeException {
    public SystemClockException(String message) { super(message); }
}
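
Where the pod-index label is not available, the ordinal can also be read from the pod name, because StatefulSet pods are named <statefulset-name>-<ordinal>. The sketch below assumes the pod name is passed in by the caller (for example from the HOSTNAME environment variable); MachineIds and fromPodName are hypothetical names, not part of the generator above.

```java
/** Derives a Snowflake machine ID from a StatefulSet-style pod name (illustrative helper). */
final class MachineIds {
    private static final long MAX_MACHINE_ID = 1023;  // 10 bits, matching the generator above

    /** Parses the trailing ordinal from names such as "orders-7" and range-checks it. */
    static long fromPodName(String podName) {
        int dash = podName.lastIndexOf('-');
        if (dash < 0 || dash == podName.length() - 1) {
            throw new IllegalArgumentException("No ordinal suffix in pod name: " + podName);
        }
        long ordinal;
        try {
            ordinal = Long.parseLong(podName.substring(dash + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Non-numeric ordinal in pod name: " + podName, e);
        }
        if (ordinal < 0 || ordinal > MAX_MACHINE_ID) {
            throw new IllegalArgumentException(
                "Ordinal " + ordinal + " does not fit in a 10-bit machine ID");
        }
        return ordinal;
    }
}
```

This only guarantees uniqueness within one StatefulSet. If several StatefulSets or services write to the same tables, their ordinals overlap and a different assignment scheme is needed.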

Instagram-Style ID: Shard ID Embedded in the ID

An alternative to Snowflake that makes routing even faster: the shard number is baked into every ID.

/**
 * Instagram-style ID generation using PostgreSQL sequences.
 * Bit layout:
 *   [41 bits: milliseconds since epoch][13 bits: shard ID][10 bits: per-shard sequence]
 *
 * Key benefit: Given any entity ID, you can compute its shard without a lookup table.
 *   shard = (id >> 10) & 0x1FFF
 *
 * Requires a PostgreSQL sequence on EACH shard (each named "global_id_seq").
 * The sequence mod 1024 is the per-shard sequence number.
 */
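@Component
@Slf4j
@RequiredArgsConstructor  // Lombok: generates the constructor Spring uses to inject the final fields
public class InstagramStyleIdGenerator {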
 
    private static final long EPOCH = 1_704_067_200_000L;  // 2024-01-01
    private static final int  SHARD_ID_BITS  = 13;   // 8192 max shards
    private static final int  SEQUENCE_BITS  = 10;   // 1024 per millisecond per shard
 
    private final ShardContextHolder shardContextHolder;
    private final JdbcTemplate jdbcTemplate;
    private final ShardKeyResolver shardKeyResolver;
 
    /**
     * Generate an ID for a new entity that will be placed on the shard for the given key.
     * The shard ID is embedded in the returned ID.
     */
    public long nextId(long shardKey) {
        int shardId = shardKeyResolver.resolve(shardKey);
        return shardContextHolder.withShard(shardId, () -> generateForCurrentShard(shardId));
    }
 
    private long generateForCurrentShard(int shardId) {
        // Fetch next sequence value from this shard's PostgreSQL sequence
        Long seq = jdbcTemplate.queryForObject(
            "SELECT nextval('global_id_seq') % 1024",
            Long.class
        );
 
        long nowMs = System.currentTimeMillis();
        long timestampPart = (nowMs - EPOCH) << (SHARD_ID_BITS + SEQUENCE_BITS);
        long shardPart = (long) shardId << SEQUENCE_BITS;
        long seqPart = (seq != null ? seq : 0L);
 
        return timestampPart | shardPart | seqPart;
    }
 
    /**
     * Extract the shard ID from any ID generated by this generator.
     * Zero network calls. Zero database lookups.
     */
    public int extractShardId(long id) {
        return (int)((id >> SEQUENCE_BITS) & ((1L << SHARD_ID_BITS) - 1));
    }
 
    /**
     * Route directly to the correct shard using the ID itself.
     * This eliminates the need to know the user_id to route an order lookup.
     */
    public <T> T withIdRouting(long entityId, Supplier<T> operation) {
        int shardId = extractShardId(entityId);
        return shardContextHolder.withShard(shardId, operation);
    }
}

Usage with Instagram-style IDs:

// Find order by orderId alone - no userId needed for routing
public Optional<Order> getOrderById(Long orderId) {
    // Shard ID is embedded in the orderId itself
    return instagramIdGenerator.withIdRouting(orderId,
        () -> orderRepository.findById(orderId)
    );
}
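
The routing trick rests only on bit arithmetic, so it can be exercised without a database. Below is a minimal sketch assuming the 41/13/10 layout described above; ShardEmbeddedIds, compose, and shardOf are illustrative names, and the sequence is supplied by the caller instead of a PostgreSQL sequence.

```java
/** Composes and decodes shard-embedded IDs: [41 bits time][13 bits shard][10 bits sequence]. */
final class ShardEmbeddedIds {
    static final int SHARD_ID_BITS = 13;
    static final int SEQUENCE_BITS = 10;
    static final long SHARD_MASK = (1L << SHARD_ID_BITS) - 1;     // 0x1FFF
    static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;  // 0x3FF

    /** Builds an ID; the sequence is masked to 10 bits, mirroring the "% 1024" in the SQL above. */
    static long compose(long millisSinceEpoch, int shardId, long sequence) {
        return (millisSinceEpoch << (SHARD_ID_BITS + SEQUENCE_BITS))
             | ((long) shardId << SEQUENCE_BITS)
             | (sequence & SEQUENCE_MASK);
    }

    /** Recovers the shard from the ID alone, with no lookup table. */
    static int shardOf(long id) {
        return (int) ((id >> SEQUENCE_BITS) & SHARD_MASK);
    }
}
```

One consequence of this design is that the shard is fixed at creation time: moving a row to another shard later means either changing its ID or adding a mapping from the embedded (logical) shard number to a physical database.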

2. The Outbox Pattern: Reliable Cross-Service Events

The Problem It Solves

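// UNRELIABLE - A dual write: the database commit and the Kafka publish are not atomic.
// If the send fails after the order is saved, the event is lost; if the send succeeds
// but the transaction then rolls back, consumers see an order that never existed.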
@Transactional
public Order placeOrder(...) {
    Order order = orderRepository.save(new Order(...));
    kafkaTemplate.send("order.created", event);  // Can fail independently!
    return order;
}
 
// RELIABLE - Both the order and the outbox event are written in ONE transaction.
// If the transaction commits, the event WILL eventually be published (by the relay).
// If the transaction rolls back, neither the order nor the outbox event is saved.
@Transactional
public Order placeOrder(...) {
    Order order = orderRepository.save(new Order(...));
    outboxRepository.save(new OutboxEvent("OrderCreated", toJson(order)));
    return order;
    // Kafka publish happens LATER by the outbox relay, outside this transaction
}

OutboxEvent.java

@Entity
@Table(
    name = "outbox_events",
    indexes = {
        @Index(name = "idx_outbox_status_created", columnList = "status, created_at")
    }
)
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutboxEvent {
 
    @Id
    private Long id;  // Snowflake ID
 
    @Column(name = "aggregate_id",   nullable = false, length = 100)
    private String aggregateId;   // The ID of the entity (order_id, user_id)
 
    @Column(name = "aggregate_type", nullable = false, length = 100)
    private String aggregateType; // "Order", "User", "Payment"
 
    @Column(name = "event_type",     nullable = false, length = 100)
    private String eventType;     // "OrderCreated", "UserRegistered"
 
    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;       // JSON serialized event data
 
    @Enumerated(EnumType.STRING)
    @Column(nullable = false, length = 20)
    private OutboxStatus status;  // PENDING, PUBLISHED, DEAD_LETTER
 
    @Column(name = "retry_count", nullable = false)
    @Builder.Default
    private int retryCount = 0;
 
    @Column(name = "last_error", length = 1000)
    private String lastError;
 
    @Column(name = "created_at", nullable = false, updatable = false)
    private Instant createdAt;
 
    @Column(name = "published_at")
    private Instant publishedAt;
 
    @PrePersist
    void prePersist() {
        if (createdAt == null) createdAt = Instant.now();
    }
}
 
public enum OutboxStatus {
    PENDING,
    PUBLISHED,
    DEAD_LETTER  // Failed after max retries - needs manual intervention
}

OutboxRepository.java

@Repository
public interface OutboxRepository extends JpaRepository<OutboxEvent, Long> {
 
    // Fetch a batch of pending events for processing
    List<OutboxEvent> findTop50ByStatusOrderByCreatedAtAsc(OutboxStatus status);
 
    // Count per shard for monitoring
    long countByStatus(OutboxStatus status);
 
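    // Cleanup old published events (run periodically).
    // Declared query methods get no transaction by default, and a @Modifying query
    // needs one, so the method is marked @Transactional explicitly.
    @Transactional
    @Modifying
    @Query("DELETE FROM OutboxEvent o WHERE o.status = 'PUBLISHED' AND o.publishedAt < :cutoff")
    int deletePublishedEventsBefore(@Param("cutoff") Instant cutoff);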
}

OutboxEventRelay.java - Polls All Shards and Publishes to Kafka

/**
 * Runs every 200ms, polls each shard for PENDING outbox events, and publishes to Kafka.
 *
 * This is the "polling" approach to the Outbox Pattern.
 * Alternative (better for low latency): Use Debezium CDC to react to DB changes instantly.
 *
 * The polling approach adds up to 200ms latency to event publication.
 * For most use cases (notifications, downstream async processing), this is acceptable.
 */
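@Component
@Slf4j
@RequiredArgsConstructor  // Lombok: constructor injection for the final fields below
public class OutboxEventRelay {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ShardContextHolder shardContextHolder;
    private final ShardProperties shardProperties;
    private final MeterRegistry meterRegistry;
    // Assumes a TransactionTemplate bean bound to the shard-routing transaction manager
    private final TransactionTemplate transactionTemplate;

    // Topic routing: event type -> Kafka topic name
    private static final Map<String, String> TOPIC_MAP = Map.of(
        "UserCreated",      "user.events",
        "UserDeleted",      "user.events",
        "OrderCreated",     "order.events",
        "OrderCancelled",   "order.events",
        "PaymentProcessed", "payment.events",
        "PaymentFailed",    "payment.events"
    );

    @Scheduled(fixedDelay = 200)  // Next run starts 200ms after the previous one finishes
    public void relayPendingEvents() {
        int numShards = shardProperties.getNumShards();

        for (int shardId = 0; shardId < numShards; shardId++) {
            final int id = shardId;
            try {
                // Select the shard first, then open the transaction inside it.
                // TransactionTemplate replaces @Transactional here because a call to a
                // method on "this" bypasses the Spring proxy, so an annotation on
                // processShardOutbox would be silently ignored.
                shardContextHolder.withShard(id, () -> {
                    transactionTemplate.executeWithoutResult(status -> processShardOutbox(id));
                    return null;
                });
            } catch (Exception e) {
                log.error("Outbox relay failed on shard {}: {}", id, e.getMessage());
                // Continue to next shard - one failing shard does not block others
            }
        }
    }

    // Runs inside the transaction opened by relayPendingEvents().
    // Delivery is at-least-once: if the commit fails after a successful Kafka send,
    // the event stays PENDING and is published again, so consumers must be idempotent.
    private void processShardOutbox(int shardId) {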
        List<OutboxEvent> events = outboxRepository
            .findTop50ByStatusOrderByCreatedAtAsc(OutboxStatus.PENDING);
 
        if (events.isEmpty()) return;
 
        log.debug("Processing {} outbox events on shard {}", events.size(), shardId);
 
        for (OutboxEvent event : events) {
            try {
                String topic = resolveTopic(event.getEventType());
 
                // Synchronous Kafka send with ACK - we need confirmation before marking done
                kafkaTemplate.send(
                    topic,
                    event.getAggregateId(),  // Partition key (keeps related events ordered)
                    event.getPayload()
                ).get(5, TimeUnit.SECONDS);  // Timeout: if Kafka is slow, fail fast
 
                event.setStatus(OutboxStatus.PUBLISHED);
                event.setPublishedAt(Instant.now());
 
                Counter.builder("outbox.events.published")
                    .tag("shard", String.valueOf(shardId))
                    .tag("event_type", event.getEventType())
                    .register(meterRegistry)
                    .increment();
 
            } catch (Exception e) {
                event.setRetryCount(event.getRetryCount() + 1);
                event.setLastError(truncate(e.getMessage(), 900));
 
                if (event.getRetryCount() >= 5) {
                    event.setStatus(OutboxStatus.DEAD_LETTER);
                    log.error("DEAD LETTER: Outbox event {} on shard {} failed {} times. " +
                              "Event type: {}. Last error: {}",
                        event.getId(), shardId, event.getRetryCount(),
                        event.getEventType(), e.getMessage());
 
                    // Alert: dead letter requires human investigation
                    Counter.builder("outbox.dead_letters")
                        .tag("shard", String.valueOf(shardId))
                        .tag("event_type", event.getEventType())
                        .register(meterRegistry)
                        .increment();
                } else {
                    log.warn("Outbox event {} failed (attempt {}): {}",
                        event.getId(), event.getRetryCount(), e.getMessage());
                }
            }
 
            outboxRepository.save(event);
        }
    }
 
    private String resolveTopic(String eventType) {
        String topic = TOPIC_MAP.get(eventType);
        if (topic == null) {
            throw new IllegalArgumentException("No Kafka topic configured for event type: " + eventType);
        }
        return topic;
    }

    private String truncate(String s, int maxLen) {
        if (s == null) return null;
        return s.length() > maxLen ? s.substring(0, maxLen) : s;
    }
 
    /**
     * Periodic cleanup: remove published events older than 7 days.
     * Prevents the outbox table from growing indefinitely.
     */
    @Scheduled(cron = "0 0 2 * * *")  // Daily at 2 AM
    public void cleanupPublishedEvents() {
        Instant cutoff = Instant.now().minus(7, ChronoUnit.DAYS);
        int numShards = shardProperties.getNumShards();
 
        for (int shardId = 0; shardId < numShards; shardId++) {
            final int id = shardId;
            shardContextHolder.withShard(id, () -> {
                int deleted = outboxRepository.deletePublishedEventsBefore(cutoff);
                log.info("Cleaned up {} published outbox events from shard {}", deleted, id);
                return null;
            });
        }
    }
}
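
As written, a failing event is retried on every poll, so five attempts can be used up in about a second during a short Kafka outage. One possible refinement is to space retries out exponentially. The sketch below shows only the delay calculation; OutboxBackoff and delayBeforeRetry are hypothetical names, and wiring it in would need an extra column (for example a next-attempt timestamp) that the article's schema does not have.

```java
import java.time.Duration;

/** Exponential backoff schedule for outbox retries (illustrative, not part of the relay above). */
final class OutboxBackoff {

    /**
     * Delay to wait before the given retry: base, 2*base, 4*base, ... capped at max.
     * retryCount is the number of attempts that have already failed.
     */
    static Duration delayBeforeRetry(int retryCount, Duration base, Duration max) {
        if (retryCount <= 0) {
            return Duration.ZERO;  // first attempt: publish immediately
        }
        // Cap the exponent so the shift cannot overflow for realistic base delays
        int exponent = Math.min(retryCount - 1, 20);
        long millis = base.toMillis() << exponent;
        return millis > max.toMillis() ? max : Duration.ofMillis(millis);
    }
}
```

With a 200 ms base and a 30 s cap, the first three retries wait 200 ms, 400 ms, and 800 ms, and later retries level off at 30 s.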

3. Cross-Shard Scatter-Gather Service

/**
 * Service for queries that cannot be answered from a single shard.
 * These queries require hitting ALL shards and aggregating results.
 *
 * USE SPARINGLY: Every scatter-gather query consumes resources on ALL shards.
 * For high-frequency analytics, use a separate OLAP pipeline (Kafka + Flink + Redis).
 * Reserve scatter-gather for: admin dashboards, low-frequency reports, one-off queries.
 */
@Service
@Slf4j
public class CrossShardQueryService {
 
    private final UserRepository userRepository;
    private final OrderRepository orderRepository;
    private final ShardContextHolder shardContextHolder;
    private final ShardProperties shardProperties;
    private final ExecutorService scatterGatherPool;
    private final MeterRegistry meterRegistry;
 
    public CrossShardQueryService(
            UserRepository userRepository,
            OrderRepository orderRepository,
            ShardContextHolder shardContextHolder,
            ShardProperties shardProperties,
            MeterRegistry meterRegistry) {
 
        this.userRepository = userRepository;
        this.orderRepository = orderRepository;
        this.shardContextHolder = shardContextHolder;
        this.shardProperties = shardProperties;
        this.meterRegistry = meterRegistry;
 
        // Fixed-size pool: one thread per shard.
        // All shards queried in parallel. Bounded parallelism (won't exceed num shards).
        int numShards = shardProperties.getNumShards();
        this.scatterGatherPool = Executors.newFixedThreadPool(numShards, r -> {
            Thread t = new Thread(r, "scatter-gather-" + System.nanoTime());
            t.setDaemon(true);
            return t;
        });
    }
 
    /**
     * Count active users across all shards.
     * Uses parallel scatter-gather for speed.
     *
     * Latency: max(slowest shard query) + aggregation overhead
     * Typical: 5-20ms for a simple COUNT query
     */
    public long countAllActiveUsers() {
        return executeScatterGather(
            shardId -> userRepository.countActiveUsers(),
            "count_active_users"
        ).stream().mapToLong(Long::longValue).sum();
    }
 
    /**
     * Count pending orders across all shards.
     */
    public long countPendingOrdersAllShards() {
        return executeScatterGather(
            shardId -> orderRepository.countPendingOrders(),
            "count_pending_orders"
        ).stream().mapToLong(Long::longValue).sum();
    }
 
    /**
     * Find users created in a time range - inherently cross-shard.
     * Result is sorted by createdAt.
     *
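     * WARNING: Can return a large result set. maxPerShard caps the rows fetched from
     * each shard, so the merged list holds at most numShards * maxPerShard users.
     */
    public List<User> findUsersCreatedBetween(Instant from, Instant to, int maxPerShard) {
        // Assumption: the repository method takes a Pageable and orders by createdAt,
        // so the cap is applied in SQL rather than after loading every row.
        Pageable firstPage = PageRequest.of(0, maxPerShard);
        List<List<User>> shardResults = executeScatterGather(
            shardId -> userRepository.findUsersCreatedBetween(from, to, firstPage),
            "users_by_date_range"
        );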
 
        return shardResults.stream()
            .flatMap(Collection::stream)
            .sorted(Comparator.comparing(User::getCreatedAt))
            .toList();
    }
 
    /**
     * Aggregate total order revenue across all shards.
     */
    public BigDecimal getTotalRevenueAllShards() {
        return executeScatterGather(
            shardId -> orderRepository.sumAllOrderAmounts().orElse(BigDecimal.ZERO),
            "total_revenue"
        ).stream()
            .filter(Objects::nonNull)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
    }
 
    /**
     * Core scatter-gather executor.
     * Sends the query to all shards in parallel, waits for all results, returns.
     *
     * Partial failure handling: failed shards return null. Caller receives partial results.
     * The metric "scatter_gather_partial_failures" tracks how often this happens.
     */
    private <T> List<T> executeScatterGather(
            Function<Integer, T> shardOperation,
            String operationName) {
 
        int numShards = shardProperties.getNumShards();
        Timer.Sample timer = Timer.start(meterRegistry);
        // AtomicInteger (java.util.concurrent.atomic): the counter is incremented from
        // several pool threads, so a plain int[] would lose updates.
        AtomicInteger failedShards = new AtomicInteger();

        List<CompletableFuture<T>> futures = IntStream.range(0, numShards)
            .mapToObj(shardId ->
                CompletableFuture.supplyAsync(
                    () -> shardContextHolder.withShard(
                        shardId,
                        () -> shardOperation.apply(shardId)
                    ),
                    scatterGatherPool
                )
                .orTimeout(10, TimeUnit.SECONDS)
                .exceptionally(ex -> {
                    failedShards.incrementAndGet();
                    log.error("Scatter-gather shard {} failed for operation {}: {}",
                        shardId, operationName, ex.getMessage());
                    return null;  // Partial result for this shard
                })
            )
            .toList();

        // Wait for all futures to complete (or timeout)
        List<T> results = futures.stream()
            .map(f -> {
                try { return f.get(); }
                catch (Exception e) { return null; }
            })
            .filter(Objects::nonNull)
            .toList();

        // Record metrics
        timer.stop(Timer.builder("scatter_gather.duration")
            .tag("operation", operationName)
            .tag("shards_queried", String.valueOf(numShards))
            .register(meterRegistry));

        int failures = failedShards.get();
        if (failures > 0) {
            log.warn("Scatter-gather {} completed with {}/{} shard failures",
                operationName, failures, numShards);
            Counter.builder("scatter_gather.partial_failures")
                .tag("operation", operationName)
                .register(meterRegistry)
                .increment(failures);
        }
 
        return results;
    }
}
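
Because failed shards are dropped silently, a caller of the service above cannot tell a complete count from a partial one. A possible variation is to return the failed shard IDs alongside the results. The following is a self-contained sketch using in-memory stand-ins for shards; ScatterGatherSketch, Outcome, and query are illustrative names, and the pool is created per call only to keep the example standalone (a real service would reuse one pool).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;

final class ScatterGatherSketch {

    /** Results from the shards that answered, plus the IDs of the shards that did not. */
    record Outcome<T>(List<T> results, List<Integer> failedShards) {}

    /** Runs shardQuery once per shard in parallel and separates successes from failures. */
    static <T> Outcome<T> query(int numShards, IntFunction<T> shardQuery, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(numShards);
        try {
            List<CompletableFuture<T>> futures = new ArrayList<>();
            for (int shard = 0; shard < numShards; shard++) {
                final int id = shard;
                futures.add(CompletableFuture
                    .supplyAsync(() -> shardQuery.apply(id), pool)
                    .orTimeout(timeoutMs, TimeUnit.MILLISECONDS));
            }

            List<T> results = new ArrayList<>();
            List<Integer> failed = new ArrayList<>();
            for (int shard = 0; shard < numShards; shard++) {
                try {
                    results.add(futures.get(shard).join());
                } catch (RuntimeException e) {
                    // join() wraps query failures and timeouts in an unchecked exception
                    failed.add(shard);
                }
            }
            return new Outcome<>(results, failed);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A caller can then decide per use case whether a partial answer is acceptable: a dashboard might display the sum with a warning, while a billing report would more likely reject any outcome whose failedShards list is not empty.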

4. Resilience4j: Circuit Breaker Per Shard

When a database shard goes down, every request to that shard hangs until it times out (typically 30s). With a pool of 20 threads, 20 simultaneous requests that each hang for 30s exhaust the pool and take down the entire application, not just requests for that shard.

The circuit breaker prevents this by failing fast once it detects a shard is down.
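
To make the fail-fast behaviour concrete, here is a deliberately small state machine in plain Java. It is a teaching sketch, not how Resilience4j is implemented: it counts consecutive failures instead of a failure rate over a sliding window, and TinyCircuitBreaker and its parameters are made-up names. The clock is injected so the timing can be controlled in a test.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class TinyCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;  // consecutive failures that open the circuit
    private final long openMillis;       // time to stay OPEN before allowing a trial call
    private final LongSupplier clock;    // current time in ms, injectable for tests

    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0L;

    TinyCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Current state; an OPEN circuit becomes HALF_OPEN once the wait time has elapsed. */
    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    /** Runs the action unless the circuit is OPEN; uses the fallback on rejection or failure. */
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state() == State.OPEN) {
            return fallback.get();  // fail fast: the shard is never contacted
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback.get();
        }
    }

    private synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        consecutiveFailures++;
        // A failed trial call in HALF_OPEN reopens the circuit immediately
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }
}
```

The important property is in call(): while the circuit is OPEN the action is never invoked, so no thread waits on a dead shard's timeout.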

/**
 * Resilient user service with per-shard circuit breakers.
 *
 * States of a circuit breaker:
 *   CLOSED  (normal): Requests pass through to the database shard.
 *   OPEN    (shard down): Requests fail immediately with CallNotPermittedException.
 *                          No waiting for timeouts. Falls back to cache.
 *   HALF_OPEN (recovering): A few test requests are sent. If they succeed, CLOSED.
 *                            If they fail, back to OPEN.
 */
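@Service
@Slf4j
@RequiredArgsConstructor  // Lombok: constructor injection for the final fields below
public class ResilientUserService {

    private final UserRepository userRepository;
    private final ShardContextHolder shardContextHolder;
    private final ShardKeyResolver shardKeyResolver;
    private final ShardProperties shardProperties;  // used when registering listeners
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final RedisTemplate<String, User> redisTemplate;
    // Assumed alerting abstraction; its type is not shown in this article
    private final AlertingService alertingService;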
 
    /**
     * Approach 1: Programmatic circuit breaker (fine-grained control per shard)
     *
     * Each shard has its own named circuit breaker (shard-0, shard-1, etc.).
     * If shard-2 goes down, only shard-2's circuit opens.
     * Requests for shard-0, shard-1, shard-3 are completely unaffected.
     */
    public Optional<User> getUserById(Long userId) {
        int shardId = shardKeyResolver.resolve(userId);
        String cbName = "shard-" + shardId;
 
        CircuitBreaker cb = circuitBreakerRegistry.circuitBreaker(cbName);
 
        try {
            return cb.executeSupplier(() ->
                shardContextHolder.withShard(shardId,
                    () -> userRepository.findById(userId)
                )
            );
        } catch (CallNotPermittedException e) {
            // Circuit is OPEN (shard is down). Fail fast with fallback.
            log.warn("Circuit OPEN for shard {}. Serving user {} from cache.", shardId, userId);
            return getUserFromCache(userId);
 
        } catch (Exception e) {
            // Other DB exception (not circuit related)
            log.error("DB error for user {} on shard {}: {}", userId, shardId, e.getMessage());
            return getUserFromCache(userId);
        }
    }
 
    private Optional<User> getUserFromCache(Long userId) {
        User cached = redisTemplate.opsForValue().get("user:" + userId);
        return Optional.ofNullable(cached);
    }
 
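    /**
     * Approach 2: Declarative via @CircuitBreaker annotation (simpler for uniform handling)
     *
     * Limitation: All shards share the same circuit breaker (name="default-shard").
     * One bad shard opens the circuit for ALL shards. Use Approach 1 for per-shard isolation.
     *
     * The annotation is written with its fully qualified name because this class also
     * uses the CircuitBreaker interface, which has the same simple name.
     */
    @io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker(
        name = "default-shard", fallbackMethod = "getUserFallback")
    @ShardedOperation(shardKeyParam = "userId")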
    public Optional<User> getUserByIdDeclarative(Long userId) {
        return userRepository.findById(userId);
    }
 
    private Optional<User> getUserFallback(Long userId, Exception ex) {
        log.warn("Fallback triggered for user {}. Cause: {}", userId, ex.getMessage());
        return getUserFromCache(userId);
    }
 
    /**
     * Register event listeners to monitor circuit breaker state changes.
     * In production, these should trigger PagerDuty/OpsGenie alerts.
     */
    @PostConstruct
    public void registerCircuitBreakerListeners() {
        int numShards = shardProperties.getNumShards();
 
        for (int i = 0; i < numShards; i++) {
            String cbName = "shard-" + i;
            CircuitBreaker cb = circuitBreakerRegistry.circuitBreaker(cbName);
            final int shardId = i;
 
            cb.getEventPublisher()
                .onStateTransition(event -> {
                    log.warn("CIRCUIT BREAKER STATE CHANGE: shard={}, from={}, to={}",
                        shardId,
                        event.getStateTransition().getFromState(),
                        event.getStateTransition().getToState()
                    );
 
                    if (event.getStateTransition().getToState() == CircuitBreaker.State.OPEN) {
                        // Alert: shard is down. With evenly distributed keys, roughly
                        // 1/numShards of users live on the affected shard.
                        alertingService.sendAlert(
                            "SHARD_DOWN",
                            "Database shard " + shardId + " circuit breaker OPEN. " +
                            "Roughly " + (100 / numShards) + "% of users affected. Investigate immediately."
                        );
                    }
                });
        }
    }
}

application.yml Circuit Breaker Configuration

resilience4j:
  circuitbreaker:
    configs:
      # Shared settings, inherited by every shard instance through baseConfig
      shard-defaults:
        # Open circuit after 50% failure rate over last 10 calls
        slidingWindowSize: 10
        failureRateThreshold: 50
        # Keep circuit open for 30s before trying again
        waitDurationInOpenState: 30s
        # Allow 3 test calls in HALF_OPEN state before deciding to CLOSE or re-OPEN
        permittedNumberOfCallsInHalfOpenState: 3
        # Automatically transition from OPEN to HALF_OPEN after waitDuration
        automaticTransitionFromOpenToHalfOpenEnabled: true
        # Ignore these exceptions when calculating failure rate
        ignoreExceptions:
          - com.example.sharding.exception.UserNotFoundException
          - com.example.sharding.exception.OrderNotFoundException
    instances:
      # One instance per shard, so a failing shard only opens its own circuit
      shard-0:
        baseConfig: shard-defaults
      shard-1:
        baseConfig: shard-defaults
      shard-2:
        baseConfig: shard-defaults
      shard-3:
        baseConfig: shard-defaults

5. Flyway: Schema Migrations Across All Shards

The Challenge

A single @SpringBootApplication with Flyway auto-configuration migrates only one DataSource.
With 4 shards, you need Flyway to run on all 4 databases.
But they must all succeed or the application must not start (partial migration = inconsistent cluster).

ShardedFlywayMigrationRunner.java

/**
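 * Runs Flyway migrations on ALL shards during application startup.
 * Note: ApplicationRunner beans run after the application context has refreshed, so
 * beans that touch the schema at startup (for example Hibernate schema validation)
 * initialize before these migrations run. Disable such checks, or run the migrations
 * from an earlier lifecycle hook, if that matters for your setup.
 *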
 * Execution model:
 * - Sequential: one shard at a time (avoids simultaneous lock conflicts)
 * - Fail-fast: if any shard fails migration, application startup aborts
 * - Idempotent: Flyway tracks which migrations ran per shard (flyway_schema_history table)
 *
 * Migration files location: classpath:db/migration/
 * Example: V1__create_tables.sql, V2__add_indexes.sql, V3__add_outbox_table.sql
 */
@Component
@Slf4j
@DependsOn("shardDataSources")  // Ensure DataSources are created before this runs
public class ShardedFlywayMigrationRunner implements ApplicationRunner {
 
    // The qualifier belongs on the constructor parameter, where injection happens
    private final Map<Integer, DataSource> shardDataSources;
 
    public ShardedFlywayMigrationRunner(
            @Qualifier("shardDataSources") Map<Integer, DataSource> shardDataSources) {
        this.shardDataSources = shardDataSources;
    }
 
    @Override
    public void run(ApplicationArguments args) {
        log.info("Starting Flyway migrations for {} shards", shardDataSources.size());
        long startTime = System.currentTimeMillis();
 
        // Sort by shard ID for deterministic ordering
        new TreeMap<>(shardDataSources).forEach((shardId, dataSource) -> {
            migrateShard(shardId, dataSource);
        });
 
        long elapsed = System.currentTimeMillis() - startTime;
        log.info("All shard migrations completed successfully in {}ms", elapsed);
    }
 
    private void migrateShard(int shardId, DataSource dataSource) {
        log.info("Migrating shard {}...", shardId);
 
        try {
            Flyway flyway = Flyway.configure()
                .dataSource(dataSource)
                .locations("classpath:db/migration")   // Shared migration scripts
                .table("flyway_schema_history")         // Same tracking table name per shard
                .validateOnMigrate(true)                // Fail if checksums change
                .outOfOrder(false)                      // Enforce sequential migrations
                .baselineOnMigrate(false)               // Do not auto-baseline non-empty schemas that lack a history table
                .load();
 
            MigrateResult result = flyway.migrate();
 
            if (result.migrationsExecuted > 0) {
                log.info("Shard {}: Applied {} migration(s). Schema version: {}",
                    shardId, result.migrationsExecuted, result.targetSchemaVersion);
            } else {
                log.info("Shard {}: Schema up-to-date at version {}",
                    shardId, result.targetSchemaVersion);
            }
 
        } catch (FlywayException e) {
            // FAIL FAST: migration failure means the shard's schema is wrong.
            // The application must not start with inconsistent schema across shards.
            throw new RuntimeException(
                String.format("FATAL: Flyway migration failed on shard %d. " +
                              "Application startup aborted. Fix the migration and restart. " +
                              "Error: %s", shardId, e.getMessage()),
                e
            );
        }
    }
}
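
One consequence of the sequential, fail-fast model above is worth making explicit: when a shard fails, the shards before it have already been migrated and stay that way. The sketch below isolates that loop in plain Java, with no Flyway dependency, so the behavior is easy to see. FailFastShardLoop, ShardMigrationException, and the BiConsumer migrator are hypothetical names used for illustration; they are not part of the article's codebase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Illustrative sketch: sequential, fail-fast iteration over shards in shard-ID order. */
final class FailFastShardLoop {

    /** Hypothetical exception that reports which shards succeeded before the failure. */
    static final class ShardMigrationException extends RuntimeException {
        final int failedShard;
        final List<Integer> alreadyMigrated;

        ShardMigrationException(int failedShard, List<Integer> alreadyMigrated, Throwable cause) {
            super("Migration failed on shard " + failedShard
                    + "; already migrated: " + alreadyMigrated, cause);
            this.failedShard = failedShard;
            this.alreadyMigrated = List.copyOf(alreadyMigrated);
        }
    }

    /**
     * Applies the migrator to each shard in ascending shard-ID order.
     * Stops at the first failure; earlier shards are NOT rolled back.
     */
    static <T> List<Integer> migrateAll(Map<Integer, T> shards, BiConsumer<Integer, T> migrator) {
        List<Integer> migrated = new ArrayList<>();
        for (Map.Entry<Integer, T> entry : new TreeMap<>(shards).entrySet()) {
            try {
                migrator.accept(entry.getKey(), entry.getValue());
                migrated.add(entry.getKey());
            } catch (RuntimeException e) {
                throw new ShardMigrationException(entry.getKey(), migrated, e);
            }
        }
        return migrated;
    }
}
```

Because Flyway records applied migrations in each shard's own flyway_schema_history table, a restart after the fix skips the shards that already succeeded and resumes with the one that failed.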

Example Migration Files

-- V1__create_tables.sql - Runs on every shard
-- Every shard has an identical schema; only the data differs.
 
CREATE TABLE IF NOT EXISTS users (
    id           BIGINT        PRIMARY KEY,
    email        VARCHAR(255)  NOT NULL UNIQUE,
    full_name    VARCHAR(255)  NOT NULL,
    phone_number VARCHAR(20),
    status       VARCHAR(20)   NOT NULL DEFAULT 'ACTIVE',
    created_at   TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at   TIMESTAMPTZ   NOT NULL DEFAULT NOW()
);
 
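-- The UNIQUE constraint on email already creates an index, so no separate one is needed.
-- Note: the constraint is enforced within each shard only, not across shards.
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);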
 
CREATE TABLE IF NOT EXISTS orders (
    id               BIGINT          PRIMARY KEY,
    user_id          BIGINT          NOT NULL,
    total_amount     NUMERIC(12, 2)  NOT NULL,
    status           VARCHAR(30)     NOT NULL DEFAULT 'PENDING',
    delivery_address VARCHAR(500),
    created_at       TIMESTAMPTZ     NOT NULL DEFAULT NOW()
);
 
CREATE INDEX IF NOT EXISTS idx_orders_user_id    ON orders(user_id);
CREATE INDEX IF NOT EXISTS idx_orders_created_at ON orders(created_at);
 
CREATE TABLE IF NOT EXISTS outbox_events (
    id             BIGINT        PRIMARY KEY,
    aggregate_id   VARCHAR(100)  NOT NULL,
    aggregate_type VARCHAR(100)  NOT NULL,
    event_type     VARCHAR(100)  NOT NULL,
    payload        TEXT          NOT NULL,
    status         VARCHAR(20)   NOT NULL DEFAULT 'PENDING',
    retry_count    INT           NOT NULL DEFAULT 0,
    last_error     VARCHAR(1000),
    created_at     TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    published_at   TIMESTAMPTZ
);
 
CREATE INDEX IF NOT EXISTS idx_outbox_status_created ON outbox_events(status, created_at);

-- V2__add_delivery_notes.sql - Backward-compatible column addition
-- Safe to run while application is live (nullable column, no default value)
 
ALTER TABLE orders ADD COLUMN IF NOT EXISTS delivery_notes TEXT;
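
Since the same scripts run on every shard, all shards should report the same schema version once startup completes. A small check along the following lines can flag drift, for example after a partially failed rollout. This is a sketch in plain Java: SchemaDriftCheck is a hypothetical name, and how the versions get collected into a map is left as an assumption (they might be read from each shard's flyway_schema_history table).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Illustrative sketch: find shards whose schema version differs from the expected one. */
final class SchemaDriftCheck {

    /**
     * @param versionByShard schema version reported by each shard (null if unknown)
     * @param expected       version every shard should be at
     * @return shard IDs, in ascending order, whose version is not the expected one
     */
    static List<Integer> divergentShards(Map<Integer, String> versionByShard, String expected) {
        return new TreeMap<>(versionByShard).entrySet().stream()
                .filter(e -> !expected.equals(e.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```

An empty result means every shard is at the expected version; anything else names the shards to investigate before routing traffic to them.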

6. Health Indicators Per Shard

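Spring Boot Actuator automatically includes this indicator in its health endpoint. The per-shard details appear in the response only if management.endpoint.health.show-details is set to always or when-authorized: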
GET /actuator/health

/**
 * Custom health indicator that checks each shard independently.
 *
 * Response when all shards are healthy:
 * {
 *   "status": "UP",
 *   "components": {
 *     "shards": {
 *       "status": "UP",
 *       "details": {
 *         "shard-0": "UP (latency: 1ms)",
 *         "shard-1": "UP (latency: 2ms)",
 *         "shard-2": "UP (latency: 1ms)",
 *         "shard-3": "UP (latency: 1ms)"
 *       }
 *     }
 *   }
 * }
 *
 * Response when shard-2 is down:
 * {
 *   "status": "DOWN",
 *   "components": {
 *     "shards": {
 *       "status": "DOWN",
 *       "details": {
 *         "shard-0": "UP (latency: 1ms)",
 *         "shard-1": "UP (latency: 2ms)",
 *         "shard-2": "DOWN: Connection refused",
 *         "shard-3": "UP (latency: 1ms)"
 *       }
 *     }
 *   }
 * }
 */
@Component("shards")
@Slf4j
public class ShardHealthIndicator implements HealthIndicator {
 
    private final Map<Integer, DataSource> shardDataSources;
 
    public ShardHealthIndicator(
            @Qualifier("shardDataSources") Map<Integer, DataSource> shardDataSources) {
        this.shardDataSources = shardDataSources;
    }
 
    @Override
    public Health health() {
        Map<String, Object> details = new LinkedHashMap<>();
        boolean anyDown = false;
 
        for (Map.Entry<Integer, DataSource> entry : new TreeMap<>(shardDataSources).entrySet()) {
            int shardId = entry.getKey();
            String key = "shard-" + shardId;
 
            ShardHealthResult result = checkShard(entry.getValue());
            details.put(key, result.message());
 
            if (!result.healthy()) {
                anyDown = true;
                log.warn("Health check failed for shard {}: {}", shardId, result.message());
            }
        }
 
        return anyDown
            ? Health.down().withDetails(details).build()
            : Health.up().withDetails(details).build();
    }
 
    private ShardHealthResult checkShard(DataSource dataSource) {
        long start = System.currentTimeMillis();
 
        try (Connection conn = dataSource.getConnection()) {
            if (conn.isValid(2)) {  // 2-second connection validation timeout
                long latency = System.currentTimeMillis() - start;
                return new ShardHealthResult(true, "UP (latency: " + latency + "ms)");
            }
            return new ShardHealthResult(false, "DOWN: Connection invalid");
        } catch (Exception e) {
            return new ShardHealthResult(false, "DOWN: " + e.getMessage());
        }
    }
 
    record ShardHealthResult(boolean healthy, String message) {}
}
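
The indicator above probes shards one after another, so a single unresponsive shard delays the whole health response. One possible variation, sketched here with JDK classes only, runs the probes in parallel under a shared time budget. ParallelShardProbe and the Callable<Boolean> probes are hypothetical; in the real indicator each probe would wrap the getConnection() and isValid(...) calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch: probe all shards in parallel with one overall timeout. */
final class ParallelShardProbe {

    /** Returns a status message per shard, keyed by shard ID in ascending order. */
    static Map<Integer, String> probeAll(Map<Integer, Callable<Boolean>> probes, long timeoutMillis) {
        TreeMap<Integer, Callable<Boolean>> ordered = new TreeMap<>(probes);
        List<Integer> shardIds = new ArrayList<>(ordered.keySet());
        List<Callable<Boolean>> tasks = new ArrayList<>(ordered.values());

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            // invokeAll returns futures in task order and cancels tasks still running at the deadline
            List<Future<Boolean>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            Map<Integer, String> result = new LinkedHashMap<>();
            for (int i = 0; i < shardIds.size(); i++) {
                result.put(shardIds.get(i), describe(futures.get(i)));
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while probing shards", e);
        } finally {
            pool.shutdownNow();
        }
    }

    private static String describe(Future<Boolean> future) {
        if (future.isCancelled()) {
            return "DOWN: timed out";
        }
        try {
            return Boolean.TRUE.equals(future.get()) ? "UP" : "DOWN: Connection invalid";
        } catch (ExecutionException e) {
            return "DOWN: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "DOWN: interrupted";
        }
    }
}
```

The sketch creates a thread pool per call to stay self-contained; a long-lived indicator would more likely reuse a shared executor.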

7. Micrometer Metrics Per Shard

/**
 * AOP aspect that records per-shard operation metrics.
 * Every service method execution is timed and tagged with the shard ID.
 *
 * Metrics emitted (available via /actuator/prometheus):
 *   db_shard_operation_duration_seconds{shard="0", method="getUserById", status="success"}
 *   db_shard_operation_duration_seconds{shard="2", method="placeOrder",  status="error"}
 *   db_shard_errors_total{shard="1", method="getOrderById", exception="DataAccessException"}
 *
 * These metrics power Grafana dashboards like:
 *   - Per-shard query rate (ops/sec)
 *   - Per-shard error rate (%)
 *   - Per-shard P50/P95/P99 latency
 *   - Hot shard detection (one shard significantly higher rate than others)
 */
@Aspect
@Component
@Slf4j
public class ShardMetricsAspect {
 
    private final MeterRegistry meterRegistry;
 
    public ShardMetricsAspect(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }
 
    @Around("execution(* com.example.sharding.service..*(..))")
    public Object recordShardMetrics(ProceedingJoinPoint joinPoint) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        String className  = joinPoint.getTarget().getClass().getSimpleName();
        String shardTag   = resolveShardTag();
 
        Timer.Sample sample = Timer.start(meterRegistry);
        String status = "success";
 
        try {
            return joinPoint.proceed();
 
        } catch (Throwable t) {
            // Catch Throwable so that Errors are also recorded as failures
            status = "error";

            // Record error counter with shard, class, method, exception type
            Counter.builder("db.shard.errors")
                .tag("shard",     shardTag)
                .tag("class",     className)
                .tag("method",    methodName)
                .tag("exception", t.getClass().getSimpleName())
                .register(meterRegistry)
                .increment();

            throw t;

        } finally {
            // Always record the duration, tagged with the final outcome
            sample.stop(
                Timer.builder("db.shard.operation.duration")
                    .tag("shard",  shardTag)
                    .tag("class",  className)
                    .tag("method", methodName)
                    .tag("status", status)
                    .description("Duration of sharded DB operations in the service layer")
                    .register(meterRegistry)
            );
        }
    }
 
    private String resolveShardTag() {
        Integer shardId = ShardContext.get();
        return shardId != null ? String.valueOf(shardId) : "unrouted";
    }
}
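
The Javadoc above lists hot shard detection as one use of these metrics. The idea can be sketched independently of Micrometer or Grafana: compare each shard's operation count over some window against the mean across all shards. HotShardDetector and the factor threshold are illustrative assumptions, not something the article prescribes.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative sketch: flag shards whose operation count is far above the mean. */
final class HotShardDetector {

    /**
     * @param opsByShard operations observed per shard in the same time window
     * @param factor     how many times the mean a shard must exceed to count as hot
     * @return hot shard IDs in ascending order
     */
    static List<Integer> hotShards(Map<Integer, Long> opsByShard, double factor) {
        double mean = opsByShard.values().stream()
                .mapToLong(Long::longValue)
                .average()
                .orElse(0.0);
        return opsByShard.entrySet().stream()
                .filter(e -> e.getValue() > factor * mean)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

With counts of 100, 110, 500, and 90 on shards 0 to 3, the mean is 200, so a factor of 2.0 flags only shard 2.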

HikariCP Pool Metrics Per Shard (Automatic)

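HikariCP ships with Micrometer support, and Spring Boot binds pool metrics automatically for the HikariDataSource beans it manages. Pools created by hand, such as per-shard pools held in a Map, may need the registry set explicitly via setMetricRegistry(meterRegistry).
Because each pool has a distinct poolName, its metrics carry a distinguishing pool tag: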

hikaricp_connections_active{pool="HikariPool-Shard-0"}
hikaricp_connections_active{pool="HikariPool-Shard-1"}
hikaricp_connections_pending{pool="HikariPool-Shard-2"}  <- Connections waiting
hikaricp_connections_timeout_total{pool="HikariPool-Shard-3"}  <- Pool exhaustion

Grafana alert rule for pool exhaustion (PromQL expression; the 1-minute duration is configured on the alert rule itself):

# Fires when any shard has more than 5 pending connections
hikaricp_connections_pending{pool=~"HikariPool-Shard-.*"} > 5

8. Integration Testing with Testcontainers

TestcontainersShardConfig.java

/**
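 * Test configuration that starts one PostgreSQL container per shard.
 * The containers are static, so they start ONCE per JVM and are shared by every
 * test that imports this configuration (they are not restarted per test).
 * The shardDataSources bean is built directly from each container's JDBC URL and
 * credentials, so no @DynamicPropertySource is needed.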
 */
@TestConfiguration
public class TestcontainersShardConfig {
 
    private static final int NUM_TEST_SHARDS = 4;
 
    // Static: containers are shared across all test instances in this class
    static final List<PostgreSQLContainer<?>> SHARD_CONTAINERS;
 
    static {
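        SHARD_CONTAINERS = IntStream.range(0, NUM_TEST_SHARDS)
            // Explicit type witness so the stream element type is PostgreSQLContainer<?>
            .<PostgreSQLContainer<?>>mapToObj(i ->
                new PostgreSQLContainer<>("postgres:15-alpine")
                    .withDatabaseName("shard_" + i + "_test")
                    .withUsername("test_user")
                    .withPassword("test_pass")
                    // Reuse across test runs; takes effect only when
                    // testcontainers.reuse.enable=true is set in ~/.testcontainers.properties
                    .withReuse(true)
            )
            .toList();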
 
        // Start all containers in parallel - faster than sequential start
        SHARD_CONTAINERS.parallelStream().forEach(PostgreSQLContainer::start);
    }
 
    @Bean("shardDataSources")
    public Map<Integer, DataSource> shardDataSources() {
        Map<Integer, DataSource> map = new LinkedHashMap<>();
 
        for (int i = 0; i < NUM_TEST_SHARDS; i++) {
            PostgreSQLContainer<?> container = SHARD_CONTAINERS.get(i);
 
            HikariDataSource ds = new HikariDataSource();
            ds.setJdbcUrl(container.getJdbcUrl());
            ds.setUsername(container.getUsername());
            ds.setPassword(container.getPassword());
            ds.setMaximumPoolSize(5);
            ds.setPoolName("TestPool-Shard-" + i);
 
            map.put(i, ds);
        }
 
        return map;
    }
}

ShardingIntegrationTest.java

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Import(TestcontainersShardConfig.class)
@ActiveProfiles("test")
// @Transactional is deliberately NOT used here: cross-shard tests should not run in one transaction
class ShardingIntegrationTest {
 
    @Autowired private UserService userService;
    @Autowired private OrderService orderService;
    @Autowired private CrossShardQueryService crossShardQueryService;
    @Autowired private ShardKeyResolver shardKeyResolver;
    @Autowired private SnowflakeIdGenerator idGenerator;
 
    // ============================================================
    // Test 1: Basic CRUD on correct shard
    // ============================================================
 
    @Test
    @DisplayName("Create user and retrieve by ID - must land on correct shard")
    void createAndRetrieveUser_shouldWorkOnCorrectShard() {
        CreateUserRequest request = new CreateUserRequest("alice@test.com", "Alice Smith");
 
        User created = userService.createUser(request);
 
        assertThat(created.getId()).isNotNull();
        assertThat(created.getEmail()).isEqualTo("alice@test.com");
        assertThat(created.getStatus()).isEqualTo(UserStatus.ACTIVE);
 
        // Retrieve the same user - should find it
        Optional<User> retrieved = userService.getUserById(created.getId());
 
        assertThat(retrieved).isPresent();
        assertThat(retrieved.get().getFullName()).isEqualTo("Alice Smith");
    }
 
    // ============================================================
    // Test 2: Distribution verification
    // ============================================================
 
    @Test
    @DisplayName("100 users should distribute across all shards, not just shard 0")
    void multipleUsers_shouldDistributeAcrossAllShards() {
        Map<Integer, Integer> shardDistribution = new HashMap<>();
 
        for (int i = 0; i < 100; i++) {
            User user = userService.createUser(
                new CreateUserRequest("user" + i + "@test.com", "User " + i)
            );
            int shard = shardKeyResolver.resolve(user.getId());
            shardDistribution.merge(shard, 1, Integer::sum);
        }
 
        // All 4 shards must have been used
        assertThat(shardDistribution).hasSize(4);
 
        // No shard should hold more than 60% of users (would indicate poor distribution)
        shardDistribution.values().forEach(count ->
            assertThat(count)
                .as("Shard distribution too uneven - hot shard detected")
                .isLessThanOrEqualTo(60)
        );

        // Each shard should hold at least 10% of users
        shardDistribution.values().forEach(count ->
            assertThat(count)
                .as("Shard distribution too uneven - under-utilized shard detected")
                .isGreaterThanOrEqualTo(10)
        );
    }
 
    // ============================================================
    // Test 3: @Transactional rollback stays on one shard
    // ============================================================
 
    @Test
    @DisplayName("@Transactional rollback must not affect other shards")
    void transactionRollback_shouldAffectOnlyOneShard() {
        // Create a user successfully
        User user = userService.createUser(
            new CreateUserRequest("rollback@test.com", "Rollback User")
        );
 
        // Attempt an update that will fail (null fullName violates NOT NULL constraint)
        assertThatThrownBy(() ->
            userService.updateUserProfile(
                user.getId(),
                UpdateProfileRequest.builder().fullName(null).build()
            )
        ).isInstanceOf(DataIntegrityViolationException.class);
 
        // User should be unchanged
        User unchanged = userService.getUserById(user.getId()).orElseThrow();
        assertThat(unchanged.getFullName()).isEqualTo("Rollback User");
    }
 
    // ============================================================
    // Test 4: Order co-located on same shard as user
    // ============================================================
 
    @Test
    @DisplayName("Orders should be co-located on the same shard as their user")
    void order_shouldBeOnSameShardAsUser() {
        User user = userService.createUser(
            new CreateUserRequest("order.user@test.com", "Order User")
        );
 
        Order order = orderService.placeOrder(
            user.getId(),
            new PlaceOrderRequest(new BigDecimal("599.00"), "123 Main St")
        );
 
        // Both user and order should resolve to the same shard
        int userShard  = shardKeyResolver.resolve(user.getId());
        int orderShard = shardKeyResolver.resolve(order.getUserId()); // userId is the shard key
 
        assertThat(orderShard).isEqualTo(userShard);
 
        // Order should be retrievable (confirms it is on the correct shard)
        Optional<Order> retrieved = orderService.getOrderById(order.getId(), user.getId());
        assertThat(retrieved).isPresent();
        assertThat(retrieved.get().getTotalAmount()).isEqualByComparingTo("599.00");
    }
 
    // ============================================================
    // Test 5: Scatter-gather aggregation
    // ============================================================
 
    @Test
    @DisplayName("Scatter-gather count must aggregate correctly from all shards")
    void scatterGather_countActiveUsers_shouldSumAllShards() {
        // Record the count first, then create exactly 12 users
        long countBefore = crossShardQueryService.countAllActiveUsers();
 
        for (int i = 0; i < 12; i++) {
            userService.createUser(
                new CreateUserRequest("sg" + i + "@test.com", "SG User " + i)
            );
        }
 
        long countAfter = crossShardQueryService.countAllActiveUsers();
 
        assertThat(countAfter - countBefore).isEqualTo(12);
    }
 
    // ============================================================
    // Test 6: ShardContext is always cleared (ThreadLocal leak prevention)
    // ============================================================

    @Test
    @DisplayName("ShardContext must always be cleared after operation, even on exception")
    void shardContext_mustAlwaysBeCleared_evenOnException() {
        // Confirm context starts clean
        assertThat(ShardContext.isSet()).isFalse();

        // This will likely not find a user but must still clear context
        userService.getUserById(Long.MAX_VALUE);
        assertThat(ShardContext.isSet()).isFalse();

        // Trigger an exception scenario - context must still clear
        try {
            userService.getUserById(null);  // null will cause exception
        } catch (Exception ignored) {}

        assertThat(ShardContext.isSet()).isFalse();
    }

    // ============================================================
    // Test 7: Snowflake ID properties
    // ============================================================
 
    @Test
    @DisplayName("Snowflake IDs must be globally unique and time-ordered")
    void snowflakeIds_shouldBeUniqueAndTimeOrdered() {
        Set<Long> ids = new HashSet<>();
        List<Long> orderedIds = new ArrayList<>();
 
        for (int i = 0; i < 10_000; i++) {
            long id = idGenerator.nextId();
            ids.add(id);
            orderedIds.add(id);
        }
 
        // All IDs must be unique
        assertThat(ids).hasSize(10_000);
 
        // IDs must be monotonically increasing (time-sortable)
        for (int i = 1; i < orderedIds.size(); i++) {
            assertThat(orderedIds.get(i)).isGreaterThan(orderedIds.get(i - 1));
        }
 
        // Timestamp extraction must work
        Instant ts = idGenerator.extractTimestamp(orderedIds.get(0));
        assertThat(ts).isAfter(Instant.now().minus(1, ChronoUnit.MINUTES));
        assertThat(ts).isBefore(Instant.now().plus(1, ChronoUnit.MINUTES));
    }
 
    // ============================================================
    // Test 8: @Async does not leak ShardContext
    // ============================================================
 
    @Test
    @DisplayName("Async operations must set their own ShardContext, not inherit ThreadLocal")
    void asyncOperation_shouldNotInheritShardContext() throws Exception {
        // Set context on the main thread
        ShardContext.set(3);

        try {
            AtomicInteger capturedShardInAsyncThread = new AtomicInteger(-1);
            CountDownLatch latch = new CountDownLatch(1);

            // Simulate what @Async does - the work runs on a different thread
            Thread asyncThread = new Thread(() -> {
                // New thread should NOT see the parent thread's ShardContext
                Integer shard = ShardContext.get();
                capturedShardInAsyncThread.set(shard != null ? shard : -99);
                latch.countDown();
            });
            asyncThread.start();

            // Fail explicitly if the thread never reported back
            assertThat(latch.await(2, TimeUnit.SECONDS)).isTrue();

            // Async thread should have seen null (no context), not 3 (parent's context)
            assertThat(capturedShardInAsyncThread.get())
                .as("Async thread must not inherit parent thread's ShardContext (ThreadLocal is per-thread)")
                .isEqualTo(-99);  // -99 means null was returned (correct)
        } finally {
            // Cleanup main thread, even if an assertion above fails
            ShardContext.clear();
        }
    }
}

9. The Complete Request Flow: End-to-End Trace

This is what happens, in exact order, when a user calls POST /api/orders:

1. HTTP Request arrives at Spring DispatcherServlet
   POST /api/orders
   Body: { "userId": 5042, "totalAmount": 599.00 }

2. OrderController.placeOrder() is called
   - No shard logic here. Just delegates to service.

3. Spring AOP intercepts before entering OrderService.placeOrder()

   3a. ShardingAspect runs (@Order(1)):
       - Reads @ShardedOperation(shardKeyParam = "userId")
       - Extracts userId = 5042 from method arguments
       - Calls shardKeyResolver.resolve(5042L)
         -> MurmurHash3(5042) mod 4 = 2
       - Calls ShardContext.set(2)
       - Calls joinPoint.proceed() -> enters OrderService.placeOrder()

   3b. @Transactional AOP runs (@Order(MAX_VALUE)):
       - Calls routingDataSource.getConnection()
       - ShardRoutingDataSource.determineCurrentLookupKey() returns 2 (from ShardContext)
       - AbstractRoutingDataSource selects DataSource for shard 2
       - HikariCP pool "HikariPool-Shard-2" provides a connection
       - Spring binds this connection to the current thread via TransactionSynchronizationManager
       - Transaction starts: BEGIN on shard 2

4. OrderService.placeOrder() body executes:
   - userRepository.findById(5042) -> JDBC -> shard 2 connection -> PostgreSQL shard 2
   - orderRepository.save(order)   -> JDBC -> shard 2 connection -> PostgreSQL shard 2
   - outboxRepository.save(event)  -> JDBC -> shard 2 connection -> PostgreSQL shard 2
   All three use the SAME connection (bound to thread). All on shard 2.

5. @Transactional AOP commits:
   - COMMIT on shard 2 connection
   - Connection returned to HikariPool-Shard-2

6. ShardMetricsAspect records:
   - Timer: db.shard.operation.duration{shard="2", method="placeOrder", status="success"}

7. ShardingAspect finally block runs:
   - ShardContext.clear()  <- ThreadLocal removed

8. HTTP Response returned: 201 Created

---

200ms later (OutboxEventRelay runs):

9. @Scheduled relayPendingEvents():
   - Iterates shards 0, 1, 2, 3
   - On shard 2:
     - ShardContext.set(2)
     - Queries outbox_events WHERE status = 'PENDING'
     - Finds the event from step 4 above
     - kafkaTemplate.send("order.events", "5042", payload)
     - Updates event.status = PUBLISHED
     - ShardContext.clear()

10. Kafka consumers receive "order.events":
    - InventoryService: reserves items
    - PaymentService: processes payment
    - NotificationService: sends order confirmation
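
Steps 3a and 7 of the trace form a bracket around everything in between: the shard is set before the transaction opens and cleared in a finally block after it completes. Stripped of Spring AOP, that bracket looks roughly like the sketch below. ShardScopeDemo is a hypothetical stand-in that holds its own ThreadLocal; it is not the article's ShardContext or ShardingAspect.

```java
import java.util.function.Supplier;

/** Illustrative sketch of the set / proceed / clear bracket around a sharded operation. */
final class ShardScopeDemo {

    // Hypothetical stand-in for the per-thread shard context
    private static final ThreadLocal<Integer> CURRENT = new ThreadLocal<>();

    /** Shard bound to the calling thread, or null when nothing is bound. */
    static Integer current() {
        return CURRENT.get();
    }

    /** Binds the shard, runs the body, and always unbinds, even if the body throws. */
    static <T> T withShard(int shardId, Supplier<T> body) {
        CURRENT.set(shardId);
        try {
            return body.get();
        } finally {
            CURRENT.remove();
        }
    }
}
```

Inside the body, anything that asks for the current shard sees the bound value; once withShard returns or throws, the thread is clean again and safe to hand back to a pool.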

Quick Reference: The Four Rules of Spring Boot Sharding

Rule 1: @Order(1) on ShardingAspect is non-negotiable.
        It must run BEFORE @Transactional opens the DB connection.

Rule 2: ShardContext.clear() must ALWAYS run in a finally block.
        Never rely on it being called normally. Thread pool contamination is silent.

Rule 3: @Transactional is single-shard only.
        Once a transaction opens on shard X, all DB calls in that transaction
        go to shard X regardless of any ShardContext changes inside the method.

Rule 4: @Async means a new thread. ThreadLocal is not propagated.
        Async methods must call ShardContextHolder.withKey() themselves.
        Do not assume they inherit the calling thread's shard context.
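
Rule 4 in practice: the calling thread reads its shard ID, hands it to the task as plain data, and the task sets and clears its own context on the worker thread. The sketch below uses JDK classes only. AsyncShardPropagation and its ThreadLocal are hypothetical stand-ins for the article's shard context, and withShard and runOnWorker are illustrative helpers rather than existing APIs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative sketch: pass the shard ID to async work explicitly. */
final class AsyncShardPropagation {

    // Hypothetical stand-in for the per-thread shard context
    private static final ThreadLocal<Integer> CONTEXT = new ThreadLocal<>();

    static void set(int shardId) { CONTEXT.set(shardId); }
    static Integer current()     { return CONTEXT.get(); }
    static void clear()          { CONTEXT.remove(); }

    /** Wraps a task so it binds the given shard on whichever thread ends up running it. */
    static <T> Callable<T> withShard(int shardId, Callable<T> task) {
        return () -> {
            CONTEXT.set(shardId);
            try {
                return task.call();
            } finally {
                CONTEXT.remove();   // never leave the worker thread contaminated
            }
        };
    }

    /** Runs the task on a separate worker thread and waits for its result. */
    static <T> T runOnWorker(Callable<T> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(task).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A task submitted as-is sees no shard on the worker thread, while the same task wrapped with withShard(shardId, ...) sees the shard the caller captured.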

See sharding-demystified.md for conceptual foundations and sharding-interview-questions.md for interview preparation.