Sharding Demystified
Understanding Database Sharding from First Principles to Production Reality
Table of Contents
- The Real-World Analogy That Makes It Click
- The Problem Sharding Solves
- What is Sharding? Core Terminology
- Partitioning vs Sharding - The Distinction Everyone Confuses
- The Four Types of Sharding
- Consistent Hashing - The Algorithm That Makes Sharding Practical
- The Shard Key - The Most Critical Decision You Will Ever Make
- The Hot Shard Problem
- Where Does the Sharding Logic Live?
- Real Companies and How They Shard
- Cross-Shard Operations - The Painful Reality
- Resharding - Adding and Removing Shards
- Global ID Generation Across Shards
- Schema Migrations Across Shards
- Production Pitfalls
- Sharding Decision Checklist
1. The Real-World Analogy That Makes It Click
The Phone Directory Analogy
Think about a city telephone directory from the 1990s. One massive book. Millions of names. Alphabetically sorted.
Now imagine the city grows to 50 million people. The book becomes so heavy no one can lift it. Finding a name takes too long. So the publisher splits it:
- Book 1 (Server 1): Names A through M
- Book 2 (Server 2): Names N through Z
Immediately, the benefits appear:
- Each book is half the size and half the weight
- Two librarians can serve people simultaneously (parallelism)
- Looking for "Rahul Sharma" means you go directly to Book 2 — you never touch Book 1
- If Book 1 is being reprinted, people looking for N-Z names can still use Book 2
This is sharding. Exactly this. Just with database rows instead of names, and database servers instead of books.
Extending the Analogy
Now the city keeps growing. Two books are not enough. You split again:
- Book 1: A through F
- Book 2: G through L
- Book 3: M through R
- Book 4: S through Z
But now there is a new problem. Surnames starting with "N" ("Naidu", "Nair", "Nayak") are extremely common in South India, so the "N" section is enormous. Book 3 (M through R) is 3x fatter than Book 1. One librarian is overwhelmed while the others sit idle.
This is the hot shard problem. Your data split is uneven.
This is why shard key selection (how you decide to split) is the most important decision in sharding design.
2. The Problem Sharding Solves
The Journey of Every Growing Application
Stage 1: The Happy Beginning
[Application Servers]
|
v
[Single Database Server]
- 100 GB of data
- 500 queries/second
- All running fine
Every startup starts here. Simple. Easy to debug. Easy to back up. One server, one database.
Stage 2: First Signs of Stress
[Application Servers (scaled to 20 instances)]
|
v
[Single Database Server]
- 2 TB of data
- 5,000 queries/second
- CPU: 85% utilized
- Slow queries appearing
You scaled your application servers horizontally (20 instances behind a load balancer), but your database is still one server. The application tier is no longer the bottleneck; the database is.
Stage 3: Vertical Scaling — The First Escape Attempt
You buy a bigger server:
- 64 CPU cores
- 512 GB RAM
- 10 TB NVMe SSD
This buys time. But vertical scaling has hard limits:
| Problem | Why It Exists | Why You Cannot Just Throw Money At It |
|---|---|---|
| CPU ceiling | The fastest server hardware already exists and costs $50K+ | More cores do not make a single query proportionally faster; 100 CPUs do not give 100x throughput |
| RAM ceiling | The largest servers top out around 12-24 TB of RAM | Beyond a point, most workloads stop benefiting from more RAM |
| Single point of failure | If the one machine crashes, everything goes down | Replicas help with reads, but all writes still go to one primary |
| Connection ceiling | PostgreSQL: roughly 10,000 connections is a practical maximum | Each connection is a separate backend process using up to ~10 MB of memory |
| Index size | Indexes must largely fit in RAM for fast queries | At 1 billion rows, indexes outgrow available memory |
| Write throughput | One disk controller, one write-ahead log | Sequential writes on one device max out around 1-3 GB/s |
Stage 4: The Need for Sharding
[Application Servers]
|
v
[Router / Shard Map]
/ | \
v v v
[DB 1] [DB 2] [DB 3]
Users Users Users
1-1M 1M-2M 2M-3M
Now:
- Each server holds 1/3 of the data
- Each server handles 1/3 of the queries
- You can keep adding servers as you grow
- One server crashing affects only 1/3 of users (not all)
3. What is Sharding? Core Terminology
Formal Definition
Sharding is the practice of horizontally partitioning data across multiple independent database instances (called shards), where each shard holds a distinct subset of the total data, and together all shards hold the complete dataset.
The Vocabulary You Must Know
Shard (also called Partition or Node):
One individual database instance that holds a subset of the data. Think of it as one "slice" of the whole.
Shard Key (also called Partition Key):
The column or set of columns whose value determines which shard a given row lives in. This is the most important design decision. Example: user_id, customer_id, region, order_date.
Shard Map (also called Partition Map or Routing Table):
A lookup table or algorithm that maps a shard key value to a specific shard. Answers: "Given user_id = 5043, which shard should I talk to?"
Routing Layer:
The component that intercepts every database query, consults the shard map, and forwards the query to the correct shard. Can live in the application, a middleware proxy, or the database itself.
Logical Shard vs Physical Shard:
- A logical shard is an abstract unit of partitioning (e.g., "shard 0042")
- A physical shard is an actual database server
- Multiple logical shards can live on the same physical shard
- Example: You define 1,000 logical shards but have only 10 physical servers, so each server hosts 100 logical shards. When you add an 11th server, you move some whole logical shards to it. This is much easier than redistributing data row by row, because no key ever changes its logical shard.
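The two-level mapping can be sketched in Java, the language of this article's other examples. This is a minimal illustration under stated assumptions: 1,000 logical shards, a key's logical shard fixed by `Long.hashCode(userId)` mod 1,000, and a hypothetical `LogicalShardMap` class with made-up server names `db0`, `db1`, and so on. Only the second level changes when a server is added.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-level shard map:
//   level 1: key -> logical shard (fixed forever)
//   level 2: logical shard -> physical server (changes when servers are added)
final class LogicalShardMap {
    static final int LOGICAL_SHARDS = 1_000;
    private final String[] physicalFor = new String[LOGICAL_SHARDS];
    private final List<String> servers = new ArrayList<>();

    LogicalShardMap(List<String> initialServers) {
        servers.addAll(initialServers);
        // Initial placement: logical shard i lives on server (i mod serverCount)
        for (int i = 0; i < LOGICAL_SHARDS; i++) {
            physicalFor[i] = servers.get(i % servers.size());
        }
    }

    // Level 1: never changes, so adding servers never rehashes keys
    static int logicalShard(long userId) {
        return Math.floorMod(Long.hashCode(userId), LOGICAL_SHARDS);
    }

    // Level 2: which physical server currently hosts that logical shard
    String physicalServer(long userId) {
        return physicalFor[logicalShard(userId)];
    }

    // Give the new server its fair share of whole logical shards, taking an
    // equal quota from each existing server. Returns how many logical shards moved.
    int addServer(String newServer) {
        int oldCount = servers.size();
        servers.add(newServer);
        int target = LOGICAL_SHARDS / servers.size();
        int quotaPerOldServer = (target + oldCount - 1) / oldCount; // ceiling division
        Map<String, Integer> given = new HashMap<>();
        int moved = 0;
        for (int i = 0; i < LOGICAL_SHARDS && moved < target; i++) {
            String owner = physicalFor[i];
            int alreadyGiven = given.getOrDefault(owner, 0);
            if (alreadyGiven < quotaPerOldServer) {
                physicalFor[i] = newServer; // copy this logical shard's data, then flip the pointer
                given.put(owner, alreadyGiven + 1);
                moved++;
            }
        }
        return moved;
    }
}
```

With 10 servers holding 100 logical shards each, `addServer("db10")` moves 90 whole logical shards (9 from each server). Every key keeps its logical shard, and a key's server either stays the same or becomes `db10`.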
Rebalancing:
The process of moving data from one shard to another, typically because shards have become uneven in size or load.
Resharding:
The process of changing the number of shards, requiring data to be redistributed across the new shard count.
4. Partitioning vs Sharding
This distinction confuses many engineers. They are related but different.
Partitioning
Splitting data within a single database instance. All partitions live on the same machine, same database process.
One Database Server
+-----------------------------------------+
| orders_2022 (partition 1) |
| orders_2023 (partition 2) |
| orders_2024 (partition 3) |
+-----------------------------------------+
PostgreSQL range partitioning example:
CREATE TABLE orders (
order_id BIGINT,
user_id BIGINT,
created_at TIMESTAMP,
amount DECIMAL
) PARTITION BY RANGE (created_at);
CREATE TABLE orders_2023
PARTITION OF orders
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
CREATE TABLE orders_2024
PARTITION OF orders
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
Benefits of partitioning:
- Queries that filter on created_at within 2024 only scan orders_2024 (partition pruning)
- Old partitions can be archived or dropped cheaply
- Still one database connection string from your app
Limitations:
- Still one server — you hit the same CPU, RAM, and disk limits
- Does not help with write throughput (still one WAL)
Sharding
Splitting data across multiple independent database instances on different machines.
Server 1 (Shard A) Server 2 (Shard B)
+-------------------+ +-------------------+
| orders | | orders |
| user_id: 1-500K | | user_id: 500K-1M |
+-------------------+ +-------------------+
| Dimension | Partitioning | Sharding |
|---|---|---|
| Location | Same server | Different servers |
| Scaling | Helps query performance | Helps capacity AND performance |
| Write throughput | No improvement | Scales with shard count |
| Complexity | Low (database-native) | High (routing logic, cross-shard ops) |
| Failure isolation | No (single server) | Yes (one shard failure = partial outage) |
| When to use | Large tables with time-based or range queries | Data that has grown beyond one server's capacity |
Summary: Partitioning is a performance optimization. Sharding is a scaling architecture.
5. The Four Types of Sharding
Type 1: Range-Based Sharding
How it works: Divide the shard key value range into continuous segments. Each segment maps to one shard.
Shard 1: user_id 1 to 2,000,000
Shard 2: user_id 2,000,001 to 4,000,000
Shard 3: user_id 4,000,001 to 6,000,000
Shard 4: user_id 6,000,001 to 8,000,000
Routing logic:
public int getShardForUserId(long userId) {
if (userId <= 2_000_000) return 1;
if (userId <= 4_000_000) return 2;
if (userId <= 6_000_000) return 3;
return 4;
}
Strengths:
- Range queries are efficient — "all users with ID between 1M and 2M" hits exactly one shard
- Easy to understand and reason about
- Easy to add new ranges for new shards
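The if-chain above hard-codes its boundaries. A sorted map keyed by each range's lower bound is one common way to make adding a range a data change rather than a code change; the `RangeRouter` class below is a hypothetical sketch, not part of any library. Unlike the if-chain, it rejects IDs below the first range instead of silently routing them to shard 1.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical range router: each entry maps the *lower bound* (inclusive)
// of a user_id range to a shard. A range ends where the next one starts;
// the last range is open-ended, like the "return 4" branch above.
final class RangeRouter {
    private final TreeMap<Long, Integer> lowerBoundToShard = new TreeMap<>();

    void addRange(long lowerBound, int shard) {
        lowerBoundToShard.put(lowerBound, shard);
    }

    int shardFor(long userId) {
        // floorEntry returns the greatest lower bound <= userId
        Map.Entry<Long, Integer> entry = lowerBoundToShard.floorEntry(userId);
        if (entry == null) {
            throw new IllegalArgumentException("No range covers user_id " + userId);
        }
        return entry.getValue();
    }
}
```

Calling `addRange(8_000_001L, 5)` carves a fifth shard out of the open-ended top range. Users above 8,000,000 who already live on shard 4 would still need to be copied over before the new entry goes live.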
Weaknesses:
- Uneven load: If most active users happen to have IDs in a certain range, that shard is overloaded
- Sequential inserts are hot: If user_id is auto-incrementing, all new users go to the highest shard — that shard gets 100% of writes while others sit idle
- Requires range planning upfront — difficult to change ranges later
Real-world use:
- HBase: Row key range partitioning, called "regions"
- MongoDB: Chunk-based range sharding
- Time-series data: Most natural fit — data sharded by time range (this month, last month, last year)
Type 2: Hash-Based Sharding
How it works: Apply a hash function to the shard key, then use modulo to determine the shard number.
shard_number = hash(shard_key) mod number_of_shards
Example with 4 shards:
user_id = 1001 -> hash(1001) = 9823441 -> 9823441 mod 4 = 1 -> Shard 1
user_id = 1002 -> hash(1002) = 3741982 -> 3741982 mod 4 = 2 -> Shard 2
user_id = 1003 -> hash(1003) = 6182731 -> 6182731 mod 4 = 3 -> Shard 3
user_id = 1004 -> hash(1004) = 2847560 -> 2847560 mod 4 = 0 -> Shard 0
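A minimal Java sketch of `hash(shard_key) mod number_of_shards`, assuming the MurmurHash3 64-bit finalizer (`fmix64`) as the hash function; the example hash values above are illustrative and will not match its output. `HashModRouter` is a hypothetical name.

```java
// Hypothetical hash-mod router.
final class HashModRouter {
    private final int numberOfShards;

    HashModRouter(int numberOfShards) {
        this.numberOfShards = numberOfShards;
    }

    // MurmurHash3 fmix64 finalizer: spreads even sequential IDs across all 64 bits
    static long mix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // shard_number = hash(shard_key) mod number_of_shards
    int shardFor(long userId) {
        return (int) Math.floorMod(mix64(userId), (long) numberOfShards);
    }
}
```

`Math.floorMod` matters here: Java's `%` returns a negative result for a negative hash, which would produce an invalid shard number.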
Strengths:
- Even data distribution — hash functions distribute uniformly (no hot shards from uneven key ranges)
- Works for any data type — hash strings, numbers, UUIDs
- No manual range planning needed
Weaknesses:
- Range queries span all shards — "users registered between Jan and March" requires querying ALL shards (scatter-gather)
- Resharding is painful — adding a 5th shard changes mod from 4 to 5, most existing data must move
- No data locality — related records for the same user might end up on different shards if you use different shard keys
Real-world use:
- Cassandra: Murmur3 hash of the partition key
- Redis Cluster: CRC16 hash of the key mapped to 16,384 hash slots
- DynamoDB: Hash-based partition key
Type 3: Directory-Based Sharding (Lookup Table)
How it works: Maintain an explicit mapping table: "entity X lives on shard Y". The routing layer consults this table for every request.
Lookup Table (stored in a fast cache like Redis):
+--------------------+--------+
| entity_id | shard |
+--------------------+--------+
| tenant_id = acme | shard3 |
| tenant_id = google | shard7 |
| tenant_id = apple | shard2 |
| tenant_id = tcs | shard5 |
+--------------------+--------+
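The directory lookup reduces to a map from tenant to shard. In this hypothetical `DirectoryRouter` sketch, a `ConcurrentHashMap` stands in for the replicated store or Redis cache that would hold the table in production; cache invalidation and the data copy that must precede a move are not shown.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical directory router: an explicit tenant -> shard table.
final class DirectoryRouter {
    private final Map<String, String> tenantToShard = new ConcurrentHashMap<>();

    void assign(String tenantId, String shard) {
        tenantToShard.put(tenantId, shard);
    }

    String shardFor(String tenantId) {
        String shard = tenantToShard.get(tenantId);
        if (shard == null) {
            throw new IllegalStateException("Tenant not in directory: " + tenantId);
        }
        return shard;
    }

    // Moving a tenant is a single directory update, performed only after
    // the tenant's rows have been copied to newShard.
    void move(String tenantId, String newShard) {
        if (tenantToShard.replace(tenantId, newShard) == null) {
            throw new IllegalStateException("Unknown tenant: " + tenantId);
        }
    }
}
```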
Strengths:
- Maximum flexibility — you can move any entity to any shard at any time by updating the lookup
- Great for multi-tenant SaaS — each customer (tenant) on their own shard
- Can isolate hot tenants — if one tenant is huge, give them a dedicated shard
Weaknesses:
- The lookup table becomes a bottleneck — every single read/write must first check the directory
- The directory is a single point of failure — if it goes down, nothing works
- Stale cache — if routing table changes and cache is not invalidated immediately, queries go to wrong shard
Real-world use:
- Salesforce: Large tenants get dedicated shards, small tenants share shards
- Shopify: Each shop is a tenant, with a directory mapping shops to database pods
- Any multi-tenant B2B SaaS application
Type 4: Geographic / List-Based Sharding
How it works: Assign data to shards based on a list of values or geographic region. Similar to range-based but with discrete categories rather than numeric ranges.
Shard 1 (India DC): region IN ('IN', 'LK', 'BD', 'NP')
Shard 2 (Europe DC): region IN ('DE', 'FR', 'UK', 'NL', 'IT')
Shard 3 (Americas DC): region IN ('US', 'CA', 'BR', 'MX')
Shard 4 (APAC DC): region IN ('JP', 'KR', 'SG', 'AU')
Strengths:
- Data residency compliance: GDPR restricts transferring EU personal data outside the EU, so many companies keep EU user data in EU datacenters
- Low latency — users in India read from India shard, not a US datacenter
- Natural business grouping — operations teams in each region manage their shard
Weaknesses:
- Uneven load: some regions are much larger than others (a shard holding US users typically gets far more load than one serving only smaller markets)
- Cross-region queries — global analytics requires querying all shards
- Regulatory changes — adding a new country requires routing table updates
Real-world use:
- WhatsApp / Meta: User data stored in region of origin for data sovereignty
- Uber: Driver and rider data co-located by city/region
- Stripe: Financial data sharded by country for regulatory compliance
Comparing All Four Types
| Type | Distribution | Range Queries | Resharding Ease | Best Use Case |
|---|---|---|---|---|
| Range-based | Uneven (depends on data) | Excellent (single shard) | Moderate | Time-series, ordered data |
| Hash-based | Even | Poor (scatter-gather) | Hard with mod, easy with consistent hashing | General purpose, user data |
| Directory-based | Flexible | Poor | Easy (update lookup table) | Multi-tenant SaaS, large clients |
| Geographic | Uneven (by region) | Poor (cross-region) | Moderate | Data sovereignty, latency optimization |
6. Consistent Hashing
Why Simple Modulo Hashing Breaks When You Rescale
Imagine you have 4 shards and use hash(key) mod 4 to route:
Key "user:1001" -> hash = 9823441 -> 9823441 mod 4 = 1 -> Shard 1
Key "user:1002" -> hash = 3741982 -> 3741982 mod 4 = 2 -> Shard 2
Now you add a 5th shard because you need more capacity. Now it is hash(key) mod 5:
Key "user:1001" -> hash = 9823441 -> 9823441 mod 5 = 1 -> Shard 1 (lucky, same)
Key "user:1002" -> hash = 3741982 -> 3741982 mod 5 = 2 -> Shard 2 (lucky, same)
Key "user:1003" -> hash = 6182731 -> 6182731 mod 5 = 1 -> Shard 1 (WAS Shard 3!)
Going from mod 4 to mod 5, approximately 80% of all keys map to a different shard, so 80% of your data must be physically moved from one server to another. During that migration, the system must either tolerate serving stale or missing data, or be taken offline.
For a 1 TB database, that is 800 GB of data movement. This is a catastrophic resharding event.
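The 80% figure can be checked empirically. This sketch (hypothetical `ModReshardDemo` class, with MurmurHash3's `fmix64` finalizer standing in for `hash()`) counts how many of one million keys land on a different shard when the modulus changes from 4 to 5. Only keys whose hash mod 20 is 0, 1, 2, or 3 keep their shard, so it should report a figure very close to 80%.

```java
// Hypothetical demo: fraction of keys whose shard changes when
// hash-mod routing goes from oldShards to newShards.
final class ModReshardDemo {
    // MurmurHash3 fmix64 finalizer, standing in for hash()
    static long mix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    static double fractionMoved(int keys, int oldShards, int newShards) {
        int moved = 0;
        for (long key = 0; key < keys; key++) {
            long h = mix64(key);
            if (Math.floorMod(h, (long) oldShards) != Math.floorMod(h, (long) newShards)) {
                moved++;
            }
        }
        return (double) moved / keys;
    }

    public static void main(String[] args) {
        double f = fractionMoved(1_000_000, 4, 5);
        System.out.printf("mod 4 -> mod 5: %.1f%% of keys change shard%n", 100 * f);
    }
}
```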
Consistent Hashing Solves This
The Core Idea:
Imagine a circle (a hash ring) numbered from 0 to 2^32 - 1 (about 4.29 billion points). Both servers AND data keys are placed on this ring by hashing them.
Hash Ring (positions increase clockwise from 0 and wrap around from ~4.29B back to 0):
0 -> 500M -> 1B -> 1.5B -> 2B -> 2.5B -> 3B -> 3.5B -> 4B -> (back to 0)
Placing servers on the ring:
hash("Server-A") = 500M -> Server A sits at position 500M
hash("Server-B") = 1.5B -> Server B sits at position 1.5B
hash("Server-C") = 2.5B -> Server C sits at position 2.5B
hash("Server-D") = 3.8B -> Server D sits at position 3.8B (near 0)
Routing a key:
hash("user:1001") = 700M
Walk clockwise on the ring from 700M until you hit a server.
First server clockwise from 700M is Server-B at 1.5B.
Therefore: user:1001 belongs to Server-B.
Adding a new server:
hash("Server-E") = 1B -> New server sits at position 1B
Now only keys between 500M and 1B (previously served by Server-B) move to Server-E.
All other keys are unaffected.
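On average, only about 1/5 of keys (one server's fair share) need to move, not 80%. In this particular example even fewer move: the arc from 500M to 1B is roughly 12% of the ring.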
Virtual Nodes
The problem without virtual nodes:
Servers land at random positions on the ring. By chance, one server might cover 40% of the ring and another only 5%. Uneven load again.
Solution — virtual nodes (vnodes):
Instead of placing each server once on the ring, place it multiple times (100-200 times). Each placement is a virtual node. The server has 150 virtual positions spread around the ring.
Server-A: positions 100M, 350M, 820M, 1.2B, 1.9B, 2.4B, 3.1B ... (150 positions)
Server-B: positions 200M, 480M, 750M, 1.4B, 2.1B, 2.7B, 3.5B ... (150 positions)
With 150 vnodes per server, the load is statistically much more even.
When a server is added, its vnodes take over a small slice of the key space from every existing server, so the load stays roughly balanced.
When a server is removed, the key ranges of its vnodes spread roughly evenly across all remaining servers.
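A hash ring with virtual nodes maps naturally onto a sorted map: vnode positions are keys, and "walk clockwise" becomes `ceilingEntry`, wrapping to `firstEntry`. This `ConsistentHashRing` is a hypothetical sketch that assumes 64-bit positions (FNV-1a followed by the `fmix64` finalizer) rather than the 0 to 2^32 ring described above; production systems such as Cassandra use their own partitioners.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistent-hash ring with virtual nodes (vnodes).
// Positions are signed longs; the ring simply starts at Long.MIN_VALUE.
final class ConsistentHashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>(); // vnode position -> server
    private final int vnodesPerServer;

    ConsistentHashRing(int vnodesPerServer) {
        this.vnodesPerServer = vnodesPerServer;
    }

    // FNV-1a over the UTF-8 bytes, then the MurmurHash3 fmix64 finalizer so that
    // similar strings ("Server-A#vnode1", "Server-A#vnode2") scatter around the ring.
    static long position(String s) {
        long h = 0xcbf29ce484222325L;             // FNV-1a 64-bit offset basis
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;                  // FNV-1a 64-bit prime
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    void addServer(String server) {
        for (int i = 0; i < vnodesPerServer; i++) {
            ring.put(position(server + "#vnode" + i), server);
        }
    }

    void removeServer(String server) {
        for (int i = 0; i < vnodesPerServer; i++) {
            ring.remove(position(server + "#vnode" + i), server); // only if still owned by this server
        }
    }

    // Walk clockwise: the first vnode at or after the key's position,
    // wrapping around to the smallest position at the end of the ring.
    String serverFor(String key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no servers");
        }
        Map.Entry<Long, String> e = ring.ceilingEntry(position(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }
}
```

Adding `Server-E` to a four-server ring built with `new ConsistentHashRing(150)` should move roughly one fifth of keys, and every moved key moves to `Server-E`; no key shuffles between the old servers. Removing `Server-E` again restores the original assignment.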
How Redis Cluster Uses This Concept
Redis Cluster does not use pure consistent hashing but a similar deterministic slot approach:
- The keyspace is divided into 16,384 hash slots: slot = CRC16(key) mod 16384
- Each Redis node is assigned a range of slots (not a consistent hashing ring, but the same principle)
- Adding a node: some slots from existing nodes move to the new node
- Removing a node: its slots redistribute to remaining nodes
Node 1: slots 0 to 5460
Node 2: slots 5461 to 10922
Node 3: slots 10923 to 16383
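Adding Node 4 (one possible rebalancing; the exact slots moved depend on the tool used):
Node 1: slots 1365 to 5460 (gave away slots 0 to 1364: 1365 slots)
Node 2: slots 6827 to 10922 (gave away slots 5461 to 6826: 1366 slots)
Node 3: slots 12288 to 16383 (gave away slots 10923 to 12287: 1365 slots)
Node 4: slots 0 to 1364, 5461 to 6826, 10923 to 12287 (received 4096 slots total)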
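The slot rule can be sketched in Java. This follows the key-to-slot algorithm described in the Redis Cluster specification: the CRC16 XMODEM variant, `mod 16384`, plus the `{hash tag}` rule that lets related keys share a slot. `RedisSlot` is a hypothetical helper class, not a Redis client API.

```java
import java.nio.charset.StandardCharsets;

// Sketch of the Redis Cluster key -> slot rule.
final class RedisSlot {
    static final int SLOT_COUNT = 16_384;

    // CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no reflection
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xff) << 8;
            for (int i = 0; i < 8; i++) {
                crc = (crc & 0x8000) != 0 ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xffff;
            }
        }
        return crc;
    }

    static int slotFor(String key) {
        // Hash tag: if the key contains "{...}" with a non-empty body,
        // only that body is hashed, so "{user1000}.a" and "{user1000}.b" share a slot.
        int open = key.indexOf('{');
        if (open != -1) {
            int close = key.indexOf('}', open + 1);
            if (close > open + 1) {
                key = key.substring(open + 1, close);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % SLOT_COUNT;
    }
}
```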
7. The Shard Key
Why This Decision Is Irreversible (Practically)
Choosing the wrong shard key is the most expensive mistake in database design. Changing a shard key after data is written means:
- Reading every row in every shard
- Computing the new shard assignment for every row
- Moving rows to their new shards
- Updating all application code and queries
- Doing all of this without downtime
For a company like Flipkart with billions of rows, this is a months-long project. Choose carefully upfront.
What Makes a Good Shard Key
A good shard key satisfies four criteria:
1. High Cardinality
The shard key must have many distinct values. Otherwise, you cannot create enough shards.
- gender (M/F/Other) = terrible shard key. Only 3 distinct values, so you can have at most 3 shards.
- country = poor shard key for global apps. About 200 values, and 'US' alone might hold 30% of all data.
- user_id = excellent. Millions of distinct values, distributes evenly.
- order_id = excellent. Every order has a unique ID.
2. Even Write Distribution
Writes must spread evenly across shards. No single shard should be the bottleneck.
- created_at (date) = poor for write distribution. All new records go to today's shard, so 100% of writes hit one shard.
- order_status = terrible. Status = 'PENDING' might cover 90% of all rows.
- user_id with hash = excellent. Writes distribute evenly.
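Criteria 1 and 2 can be checked on a sample of real values before committing to a key. The hypothetical `ShardKeyCheck` helper below counts distinct values and reports the share of rows the busiest shard would receive under hash sharding, using `String.hashCode` mixed through the `fmix64` finalizer (an arbitrary choice for illustration).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical helper for sizing up a candidate shard key from sample values.
final class ShardKeyCheck {
    // MurmurHash3 fmix64 finalizer, used to spread String.hashCode values
    static long mix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // Criterion 1: distinct values cap the number of shards that can hold data
    static int cardinality(List<String> sample) {
        return new HashSet<>(sample).size();
    }

    // Criterion 2: share of sampled rows that the busiest shard would receive
    static double busiestShardShare(List<String> sample, int shards) {
        if (sample.isEmpty()) {
            return 0.0;
        }
        Map<Long, Integer> rowsPerShard = new HashMap<>();
        for (String value : sample) {
            long shard = Math.floorMod(mix64(value.hashCode()), (long) shards);
            rowsPerShard.merge(shard, 1, Integer::sum);
        }
        int busiest = 0;
        for (int rows : rowsPerShard.values()) {
            busiest = Math.max(busiest, rows);
        }
        return (double) busiest / sample.size();
    }
}
```

With 16 shards, a well-behaved key puts about 1/16 (roughly 6%) of rows on the busiest shard. A gender column with a 50/40/10 split puts at least half of all rows on one shard, and at most 3 of the 16 shards receive any data.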
3. Query Locality
The most frequent queries should be answerable from a single shard, not require scatter-gather across all shards.
Ask yourself: "What does my application query most often?"
- "Give me all orders for user X" -> shard by user_id. All of user X's orders are on one shard.
- "Give me all orders placed today" -> shard by date. All of today's orders are on one shard.
- If you shard orders by user_id but frequently query "give me all orders placed today", you need to hit every shard.
4. Immutability
The shard key value must never change for a given record. If it changes, the record must physically move to a different shard.
- email = terrible shard key. Users change emails, and moving a record means reading from the old shard, writing to the new shard, and deleting from the old shard, all atomically.
- user_id = excellent. User IDs are assigned once and never change.
- country_of_residence = dangerous. Users move countries.
Common Shard Key Patterns
Pattern 1: Entity Owner ID
Shard by the ID of the entity that owns the data. All data for that entity lives on one shard.
Orders table: shard key = customer_id
Result: All orders for customer 5042 are on one shard.
"Give me all orders for customer 5042" = single shard query.
"Give me all orders placed today" = scatter-gather (all shards).
When to use: When your most common query is "give me everything for customer/user/tenant X".
Pattern 2: Compound / Composite Key
Combine two fields into a compound shard key.
Messages table: shard key = (conversation_id, message_date truncated to the month)
Result: All messages in a conversation, for a given month, are on one shard.
Fast: "Give me messages in conversation 999 from this month"
Slow: "Give me all conversations where user X participated"
Used by: WhatsApp, Slack (conversation-based sharding).
Pattern 3: Hash of Primary Key
If no single business field works well, hash the primary key.
shard = MD5(order_id) mod num_shards
Even distribution, but no range query locality whatsoever.
Pattern 4: Geo-Based Compound Key
shard key = (region_code, user_id)
All US user data on US shards. All IN user data on IN shards.
Within each region, user_id provides further distribution.
Used by: Uber, WhatsApp for data sovereignty.
Shard Key Anti-Patterns
| Bad Shard Key | Why It Is Bad | Better Alternative |
|---|---|---|
| gender | Only 2-3 values, cannot have more shards than values | user_id |
| created_at (date only) | All new writes go to one shard | Hash of user_id or order_id |
| order_status | Skewed distribution (most orders are DELIVERED) | user_id or order_id |
| email | Mutable (users change email), PII in routing layer | user_id (immutable) |
| country | US gets 30-40% of all data | Hash of user_id with geo as secondary dimension |
| product_category | Low cardinality (100 categories), some categories huge | product_id hash |
| auto_increment_id (sequential) | All inserts go to the latest shard | Random UUID or snowflake ID |
8. The Hot Shard Problem
What Is a Hot Shard?
A hot shard is a shard that receives disproportionately more load than other shards. While other shards are serving 1,000 queries/second at 20% CPU, the hot shard is serving 15,000 queries/second at 100% CPU. It becomes the bottleneck for the entire system.
Hot shards arise from two broad causes:
- Uneven data distribution (bad shard key with low cardinality or skewed values)
- Behavioral skew (data is evenly distributed, but some entities are accessed far more than others)
Cause 1: The Celebrity Problem
Imagine you shard Twitter posts by user_id. User IDs are uniformly distributed across 100 shards. The data is evenly distributed.
But then a celebrity like Sachin Tendulkar (40 million followers) posts a message. All 40 million followers open the app and read that post. All those reads go to the one shard that holds Sachin's data. That one shard is overwhelmed. The other 99 shards are completely fine.
This is the celebrity problem. Even data distribution does not prevent hot shards if the access pattern is skewed.
Solutions for the Celebrity Problem:
Solution A: Replicate hot data to all shards
For identified "celebrity" entities, copy their data to every shard. When reading, any shard can serve the request.
Normal users: data lives on exactly 1 shard (routed by user_id hash)
Celebrity users: data replicated to ALL 100 shards
Read for celebrity Sachin: any shard can serve it (spread the load)
Write for celebrity Sachin: must update all 100 shards (write amplification)
Used by: Twitter, which maintains a list of "hot accounts" and uses this approach for reads.
Solution B: Dedicated hot shard
Move the celebrity entity to a dedicated shard with more hardware resources. The shard map has a special entry for this entity.
Routing logic:
if user_id = CELEBRITY_SACHIN: route to high-capacity-shard-7
else: route to hash(user_id) mod 100
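The routing rule above can be sketched as an override table consulted before the hash. `HotKeyRouter`, the shard names, and the use of `Long.hashCode` as the hash are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical router: known hot entities get a dedicated shard,
// everyone else falls back to hash(user_id) mod shardCount.
final class HotKeyRouter {
    private final Map<Long, String> overrides;
    private final int shardCount;

    HotKeyRouter(Map<Long, String> overrides, int shardCount) {
        this.overrides = new HashMap<>(overrides); // defensive copy
        this.shardCount = shardCount;
    }

    String shardFor(long userId) {
        String dedicated = overrides.get(userId);
        if (dedicated != null) {
            return dedicated; // e.g. "high-capacity-shard-7"
        }
        return "shard-" + Math.floorMod(Long.hashCode(userId), shardCount);
    }
}
```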
Solution C: Add a read cache layer
Put Redis in front of the database shard. Celebrity posts are cached in Redis. Database shard only receives a trickle of cache-miss requests.
Read request for Sachin's post:
-> Check Redis cache
-> Cache hit (95% of the time): return from Redis
-> Cache miss (5% of the time): fetch from DB shard, populate cache
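This read path is the cache-aside pattern. In the minimal sketch below, a `HashMap` stands in for Redis and a caller-supplied function stands in for the shard query. `CachedPostReader` is hypothetical and omits TTLs, eviction, thread safety, and protection against many concurrent misses for the same key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical cache-aside reader.
final class CachedPostReader {
    private final Map<Long, String> cache = new HashMap<>();   // stand-in for Redis
    private final LongFunction<String> loadFromShard;          // stand-in for the DB query
    int shardReads = 0;                                        // requests that reached the shard

    CachedPostReader(LongFunction<String> loadFromShard) {
        this.loadFromShard = loadFromShard;
    }

    String readPost(long postId) {
        String cached = cache.get(postId);
        if (cached != null) {
            return cached;                          // cache hit: shard untouched
        }
        shardReads++;
        String post = loadFromShard.apply(postId);  // cache miss: query the shard
        cache.put(postId, post);                    // populate for the next reader
        return post;
    }
}
```

In this sketch, 1,000 reads of the same post reach the shard once; every later read is served from the cache.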
Cause 2: Monotonically Increasing Shard Key
This is the most common hot shard bug in real production systems.
You shard orders by order_id. Order IDs are assigned sequentially (1, 2, 3, 4...). You use range-based sharding:
Shard 1: order_id 1 to 1,000,000 (old data, mostly reads)
Shard 2: order_id 1M to 2M (older data, mostly reads)
Shard 3: order_id 2M to 3M (recent data, some reads + writes)
Shard 4: order_id 3M+ (ALL new orders go here)
Shard 4 is a permanent hot shard. Every single new order, forever, goes to Shard 4. The other shards receive only historical reads.
Solutions:
Solution A: Hash the sequential ID
shard = hash(order_id) mod num_shards
Now order_id 3,000,001 might go to shard 2. The monotonic sequence is broken. Even distribution restored.
Solution B: UUIDs or Snowflake IDs
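Generate non-sequential IDs. Random UUIDs (v4) are unordered, so they spread on their own. Snowflake IDs embed a timestamp, a machine ID, and a sequence number, so they are roughly time-ordered and would still cluster under range sharding; hash them, and the hash distributes evenly.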
Solution C: Shard by a different key
If you shard orders by customer_id instead of order_id, new orders distribute evenly across customers.
Cause 3: Time-Based Sharding Hot Spot
Sharding by event_date or created_at (date only) means the shard for today receives 100% of writes. All historical shards receive only reads. The "today" shard is always hot.
Solution:
Use a composite shard key that adds a bucketing dimension:
shard_key = date || (user_id mod bucket_count)
Or shard by something other than date entirely (user_id, entity_id) and use date only for partitioning within a shard.
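A minimal sketch of the bucketed composite key, assuming the date and bucket are joined with `#` (an arbitrary separator) and that the resulting string is then hashed to a shard by whatever router is in use. `BucketedDateKey` is a hypothetical name. The trade-off is on the read side: a query for one day must now read every bucket.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

// Hypothetical composite key: date || (user_id mod bucket_count).
final class BucketedDateKey {
    // Each day's writes are spread over bucketCount keys instead of one
    static String shardKey(LocalDate date, long userId, int bucketCount) {
        return date + "#" + Math.floorMod(userId, (long) bucketCount);
    }

    // "Everything on this date" now means reading every bucket for that date
    static List<String> keysForDate(LocalDate date, int bucketCount) {
        List<String> keys = new ArrayList<>(bucketCount);
        for (int b = 0; b < bucketCount; b++) {
            keys.add(date + "#" + b);
        }
        return keys;
    }
}
```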
9. Where Does the Sharding Logic Live?
Option 1: Application-Level Sharding
Your application code contains the routing logic. The app decides which database shard to connect to.
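import java.util.Map;
import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Repository;
@Repository
public class OrderRepository {
    private final Map<Integer, DataSource> shards; // shard_number -> DataSource
    private final RowMapper<Order> orderRowMapper; // Order and its RowMapper are defined elsewhere
    public OrderRepository(Map<Integer, DataSource> shards, RowMapper<Order> orderRowMapper) {
        this.shards = shards;
        this.orderRowMapper = orderRowMapper;
    }
    public DataSource getShardForOrder(long orderId) {
        // floorMod keeps the index in [0, size) even for negative hash codes
        // (Math.abs(Integer.MIN_VALUE) is still negative)
        int shardNumber = Math.floorMod(Long.hashCode(orderId), shards.size());
        return shards.get(shardNumber);
    }
    public Order findOrderById(long orderId) {
        JdbcTemplate jdbc = new JdbcTemplate(getShardForOrder(orderId));
        return jdbc.queryForObject(
            "SELECT * FROM orders WHERE order_id = ?",
            orderRowMapper,
            orderId
        );
    }
}
Spring Boot multi-datasource configuration: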
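import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
@Configuration
public class ShardingDataSourceConfig {
    @Bean("shard0")
    public DataSource shard0DataSource() {
        return DataSourceBuilder.create()
            .url("jdbc:postgresql://shard0-db.internal:5432/orders")
            .username("app")
            .password("secret") // illustration only; load real credentials from config
            .build();
    }
    @Bean("shard1")
    public DataSource shard1DataSource() {
        return DataSourceBuilder.create()
            .url("jdbc:postgresql://shard1-db.internal:5432/orders")
            .username("app")
            .password("secret")
            .build();
    }
    // Thread-local routing: set ShardContext before any DB call.
    // @Primary so that components injecting a plain DataSource get the router.
    @Bean
    @Primary
    public AbstractRoutingDataSource routingDataSource(
        @Qualifier("shard0") DataSource shard0,
        @Qualifier("shard1") DataSource shard1
    ) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(0, shard0);
        targetDataSources.put(1, shard1);
        AbstractRoutingDataSource routing = new AbstractRoutingDataSource() {
            @Override
            protected Object determineCurrentLookupKey() {
                // No default target is set, so an unset (null) shard fails fast
                return ShardContext.getCurrentShard();
            }
        };
        routing.setTargetDataSources(targetDataSources);
        return routing;
    }
}
// Minimal ThreadLocal holder: set the shard before a DB call, clear it afterwards
final class ShardContext {
    private static final ThreadLocal<Integer> CURRENT = new ThreadLocal<>();
    static void setCurrentShard(int shard) { CURRENT.set(shard); }
    static Integer getCurrentShard() { return CURRENT.get(); }
    static void clear() { CURRENT.remove(); }
}
Strengths: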
- Full control over routing logic
- No additional infrastructure component
- Can implement custom routing rules per query type
Weaknesses:
- Every team and service must implement sharding logic — code duplication
- Hard to change sharding strategy without deploying all services
- Application code becomes tightly coupled to infrastructure decisions
Who uses this: Early Flickr, Pinterest, early Twitter, many Indian startups
Option 2: Proxy / Middleware Sharding
A dedicated middleware component sits between the application and the database. The application connects to the proxy as if it were a single database. The proxy routes transparently.
[Application]
|
| (thinks it is talking to one database)
v
[Sharding Proxy / Middleware]
- Parses SQL
- Determines shard key from query
- Routes to correct shard(s)
| | |
v v v
[Shard 1] [Shard 2] [Shard 3]
Popular middleware sharding solutions:
| Tool | Database | Used By |
|---|---|---|
| Vitess | MySQL | YouTube, Slack, GitHub, PlanetScale |
| ProxySQL | MySQL | Various |
| ShardingSphere | MySQL, PostgreSQL | Many Chinese tech companies |
| Citus | PostgreSQL | Microsoft Azure, Instacart |
| Amazon Aurora PostgreSQL Limitless Database | PostgreSQL | AWS customers |
Vitess example — YouTube uses this at massive scale:
Vitess is transparent to the application. Your Spring Boot app connects to Vitess using a standard MySQL JDBC URL. Vitess handles sharding, resharding, replication, and failover invisibly.
Strengths:
- Application knows nothing about sharding
- Centralized routing logic — change sharding strategy without touching application code
- Built-in resharding tools (Vitess Reshard, MoveTables, and Materialize workflows)
- Handles cross-shard scatter-gather automatically
Weaknesses:
- One more infrastructure component to operate and monitor
- Proxy can become a bottleneck if undersized
- Not all SQL features work across shards (complex JOINs, stored procedures)
Option 3: Database-Native Sharding
The database engine itself handles partitioning and distribution. The application uses a single connection and issues standard queries.
MongoDB Sharding Architecture:
Application
|
v
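[mongos router process] (lightweight, stateless query router)
| (routes using a cached copy of the shard map from the Config Servers)
/ | \
v v v
[Shard1] [Shard2] [Shard3]
(each shard is a replica set: primary + 2 replicas)
[Config Servers] (store the shard map: which chunks live on which shard; consulted by mongos, not on the data path)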
MongoDB sharding concepts:
- Chunk: A contiguous range of shard key values (default 128 MB)
- Balancer: Background process that moves chunks between shards to keep them even
- mongos: The query router — your application connects only to mongos
Cassandra Distribution Architecture:
Cassandra is distributed by design. There is no separate routing component:
- Every Cassandra node knows which nodes own which token ranges (cluster state is shared via the gossip protocol)
- A client can connect to ANY node
- That node acts as the coordinator, forwarding the request to the replica nodes that own the data
- Data is distributed using consistent hashing (Murmur3 by default)
Strengths:
- Zero application changes for sharding
- Database handles rebalancing automatically
- Purpose-built for distributed operation
Weaknesses:
- You are locked into that database technology
- Operational complexity of the database itself is high
- Fewer SQL features (Cassandra is CQL, not full SQL)
10. Real Companies and How They Shard
YouTube / Google - Vitess (MySQL at Scale)
The Problem: YouTube stores billions of video metadata rows. MySQL is the database. But MySQL was never designed for billions of rows across hundreds of servers.
The Solution: YouTube built Vitess in 2010 to shard MySQL transparently.
How it works:
- Shard key: user_id and video_id (different tables use different keys)
- Routing: Vitess parses SQL, extracts the shard key, and routes to the correct MySQL shard
- Resharding: Vitess can split a shard into two (online resharding) without downtime
- Scale: Thousands of MySQL instances managed as one logical database
Key insight: Vitess organizes data into keyspaces (logical databases), and each keyspace is split into shards by keyspace ID range. A keyspace can start on a single shard; when it gets too large, you split it into 2, 4, or 8 shards, and Vitess handles the data movement transparently.
Twitter - MySQL Sharding + Gizzard
The Problem: Hundreds of millions of tweets, users, and relationships.
The Solution: Twitter built Gizzard, a middleware sharding framework for MySQL.
Sharding decisions:
- Users: sharded by user_id hash
- Tweets: sharded by user_id (all tweets by a user on the same shard, which makes "get all tweets by user" efficient)
- Social graph (followers): sharded by user_id
The Fan-Out Problem: When a user tweets, every follower (up to 50 million for the biggest accounts) needs to see it in their timeline. Twitter does NOT compute the timeline on read by querying every followed user's shard (too slow). Instead, at write time, the tweet is fanned out and written to each follower's timeline (push model). For celebrities (>1M followers), a pull model is used instead to avoid write storms: their tweets are merged into followers' timelines at read time.
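The hybrid push/pull model can be sketched with in-memory maps standing in for the timeline and tweet shards. `TimelineService` and its `celebrityThreshold` parameter are hypothetical. The sketch ignores ordering by time and assumes an account's celebrity status does not change between write and read (otherwise a tweet could be both pushed and pulled).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical hybrid fan-out: push for normal accounts, pull for celebrities.
final class TimelineService {
    private final int celebrityThreshold;
    private final Map<String, Set<String>> followers = new HashMap<>();       // author -> followers
    private final Map<String, Set<String>> following = new HashMap<>();       // reader -> authors
    private final Map<String, List<String>> pushedTimelines = new HashMap<>(); // reader -> pushed tweets
    private final Map<String, List<String>> tweetsByAuthor = new HashMap<>();

    TimelineService(int celebrityThreshold) {
        this.celebrityThreshold = celebrityThreshold;
    }

    void follow(String reader, String author) {
        followers.computeIfAbsent(author, k -> new HashSet<>()).add(reader);
        following.computeIfAbsent(reader, k -> new HashSet<>()).add(author);
    }

    private boolean isCelebrity(String author) {
        return followers.getOrDefault(author, Set.of()).size() > celebrityThreshold;
    }

    void tweet(String author, String text) {
        tweetsByAuthor.computeIfAbsent(author, k -> new ArrayList<>()).add(text);
        if (!isCelebrity(author)) {
            // Push: one write into each follower's timeline
            for (String f : followers.getOrDefault(author, Set.of())) {
                pushedTimelines.computeIfAbsent(f, k -> new ArrayList<>()).add(text);
            }
        }
    }

    List<String> homeTimeline(String reader) {
        List<String> result = new ArrayList<>(pushedTimelines.getOrDefault(reader, List.of()));
        // Pull: merge celebrity tweets at read time
        for (String author : following.getOrDefault(reader, Set.of())) {
            if (isCelebrity(author)) {
                result.addAll(tweetsByAuthor.getOrDefault(author, List.of()));
            }
        }
        return result;
    }
}
```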
Facebook - MySQL at Extreme Scale
The Problem: Facebook has roughly 3 billion users. No single MySQL instance can hold that.
The Solution:
- Application-level sharding of MySQL
- Later: TAO (graph database) for social graph
- Later still: MyRocks (RocksDB-based MySQL storage engine) for better space efficiency
Sharding specifics:
- User data sharded by user_id
- Logical sharding: 10,000 logical shards mapped to ~500 physical MySQL servers
- Each physical server hosts ~20 logical shards
- Adding capacity: Move whole logical shards to new servers. The moved shard's data is still copied, but no keys are rehashed and no shard has to be split.
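The logical-to-physical indirection can be sketched as follows. This is a hypothetical illustration: it assumes user IDs are reduced to a logical shard with a plain modulo (hash first if IDs are not well distributed), and the class name, server names, and round-robin initial placement are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical directory: user -> logical shard is fixed, logical -> physical is a table.
final class ShardDirectory {
    static final int LOGICAL_SHARDS = 10_000;
    private final Map<Integer, String> logicalToPhysical = new HashMap<>();

    ShardDirectory(String... servers) {
        // Initial placement: spread logical shards round-robin over the servers.
        for (int ls = 0; ls < LOGICAL_SHARDS; ls++) {
            logicalToPhysical.put(ls, servers[ls % servers.length]);
        }
    }

    // Fixed forever: a user's logical shard never changes, so no key is rehashed.
    static int logicalShard(long userId) {
        return (int) Math.floorMod(userId, (long) LOGICAL_SHARDS);
    }

    String serverFor(long userId) {
        return logicalToPhysical.get(logicalShard(userId));
    }

    // Adding capacity: copy this logical shard's data to newServer, then flip one entry.
    void move(int logicalShard, String newServer) {
        logicalToPhysical.put(logicalShard, newServer);
    }
}
```

Rebalancing becomes a data copy plus a single directory update per logical shard; the hash function and the logical shard count never change.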
Amazon DynamoDB - Native Adaptive Sharding
The Problem: DynamoDB needs to serve millions of tables for millions of customers with unpredictable access patterns.
The Solution: DynamoDB partitions tables automatically.
How it works:
- You choose a Partition Key (= shard key) when creating a table
- DynamoDB hashes the partition key and stores the item in the partition that owns that range of the hash space
- DynamoDB automatically splits hot partitions (adaptive capacity)
- You never manage shards — AWS does it
The Partition Key Problem:
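If you choose a bad partition key (low cardinality, like status), every item with the same key value lands in the same partition. No matter how many partitions the table has, traffic concentrates on a few of them, and the same hot partition problem applies.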
DynamoDB best practices for shard key:
- Use high-cardinality keys such as user_id, device_id, order_id
- Add a random suffix for super-hot keys: write to celebrity_user_id#shard_0 through celebrity_user_id#shard_9, then read all 10 keys and merge the results in the application
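The suffix technique can be sketched with an in-memory map standing in for the table. This is not the AWS SDK; the class is hypothetical and only illustrates the key layout: writes pick one of 10 suffixed keys at random, and reads must visit all 10.

```java
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

// In-memory stand-in for a key-value table (hypothetical; not the DynamoDB SDK).
final class WriteSharding {
    static final int SUFFIXES = 10;  // celebrity_user_id#shard_0 .. #shard_9

    private final Map<String, List<String>> table = new HashMap<>();

    // Each write lands on a random suffixed key, spreading load over 10 partition keys.
    void write(String hotKey, String item) {
        int suffix = ThreadLocalRandom.current().nextInt(SUFFIXES);
        table.computeIfAbsent(hotKey + "#shard_" + suffix, k -> new ArrayList<>()).add(item);
    }

    // Reads must query every suffix and merge the results in the application.
    List<String> readAll(String hotKey) {
        List<String> merged = new ArrayList<>();
        for (int s = 0; s < SUFFIXES; s++) {
            merged.addAll(table.getOrDefault(hotKey + "#shard_" + s, List.of()));
        }
        return merged;
    }

    int distinctKeysUsed() { return table.size(); }
}
```

The trade-off is visible in readAll: one logical read becomes ten key lookups, so the technique pays off only for keys whose write rate would otherwise overwhelm a single partition.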
Cassandra - Netflix, Apple, Instagram
The Problem: Netflix has billions of viewing history records. Instagram has billions of photos.
How Cassandra shards:
- Every row has a partition key (= shard key)
- Cassandra applies MurmurHash3 to the partition key
- The hash maps to a position on the consistent hash ring
- The data goes to the token range owner at that position
Netflix use of Cassandra:
- Viewing history: partition key = (user_id, content_id), so all viewing records for one user/title combination are on one partition
- User preferences: partition key = user_id
- Scale: 10,000+ Cassandra nodes globally
Zerodha - PostgreSQL Sharding for Trading Data
The Challenge: SEBI mandates 7 years of trade data retention. Zerodha has millions of active traders. Each trade must be queryable by regulatory auditors instantly.
The Solution:
- Time-based partitioning within PostgreSQL (not full sharding) for historical data
- Instrument master data and current positions in dedicated separate databases
- Archive strategy: old trade data moved to cheaper storage, hot recent data in main PostgreSQL
11. Cross-Shard Operations
Cross-Shard Queries: The Scatter-Gather Pattern
When a query cannot be routed to a single shard, it must be sent to ALL shards. Each shard executes the query on its local data. The results are collected and merged. This is called scatter-gather.
Query: SELECT COUNT(*) FROM orders WHERE status = 'PENDING'
Routing: status is not the shard key. Cannot determine which shard has PENDING orders.
Scatter: Send query to all 100 shards simultaneously.
Shard 1 returns: 150
Shard 2 returns: 230
...
Shard 100 returns: 89
Gather: Sum all results: total PENDING orders = 12,450
Total time: max(per-shard latency) + aggregation overhead
Cost of scatter-gather:
- Latency is bounded by the SLOWEST shard (if shard 73 is slow, you wait)
- Every query consumes resources on ALL shards simultaneously
- 10-shard scatter-gather = 10x more load on the cluster than a single-shard query
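A sketch of scatter-gather for the COUNT(*) example, assuming each shard is wrapped in a hypothetical function that runs the SQL and returns a count (in practice, a JDBC call against that shard's DataSource). The timeout is a common guard against the slow-shard problem listed above; the class name is invented.

```java
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;

final class ScatterGather {
    // Scatter: send the same query to every shard in parallel.
    // Gather: wait for all of them (bounded by timeoutMs) and sum the partial counts.
    static long countAcrossShards(List<Function<String, Long>> shards, String sql,
                                  ExecutorService pool, long timeoutMs) {
        List<CompletableFuture<Long>> futures = shards.stream()
                .map(shard -> CompletableFuture.supplyAsync(() -> shard.apply(sql), pool))
                .toList();
        try {
            // Latency is that of the slowest shard, capped by the timeout.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while gathering", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("a shard failed or timed out", e);
        }
        long total = 0;
        for (CompletableFuture<Long> f : futures) total += f.join();
        return total;
    }
}
```

Failing the whole query on any shard error is one design choice; dashboards sometimes prefer returning a partial total flagged as incomplete.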
Solutions:
Solution A: Accept it for infrequent analytics queries
If this query runs once per hour for a dashboard, scatter-gather is acceptable.
Solution B: Maintain a denormalized aggregate table
A background job runs every minute, counts pending orders on each shard, writes the total to a central aggregate table:
central_stats table:
metric = 'pending_orders_count'
value = 12450
updated_at = 2026-05-01 10:45:00
Reads are instant. The count is slightly stale (up to 1 minute). For most dashboards, this is fine.
Solution C: Separate analytics database (OLAP)
Pipe all data changes from all shards into a data warehouse (BigQuery, Redshift, Snowflake). Run all analytical queries there. The OLTP sharded database handles only transactional queries. The OLAP warehouse handles analytics.
This is what most large companies do. It is not a workaround; it is the standard architecture.
Cross-Shard Transactions: The Hardest Problem
A transaction that must atomically update data on two different shards.
Example: User A (on Shard 1) transfers money to User B (on Shard 2).
BEGIN TRANSACTION
UPDATE accounts SET balance = balance - 1000 WHERE user_id = A -- Shard 1
UPDATE accounts SET balance = balance + 1000 WHERE user_id = B -- Shard 2
COMMIT
In a single database, @Transactional handles this. Across shards, there is no single transaction coordinator.
Option 1: Two-Phase Commit (2PC)
Phase 1 - Prepare:
Coordinator -> Shard 1: "Prepare to debit User A by 1000. Lock the row."
Shard 1 -> Coordinator: "Ready"
Coordinator -> Shard 2: "Prepare to credit User B by 1000. Lock the row."
Shard 2 -> Coordinator: "Ready"
Phase 2 - Commit:
Coordinator -> Shard 1: "Commit"
Coordinator -> Shard 2: "Commit"
Problems with 2PC:
- Both shards hold locks during Phase 1 AND Phase 2 — lock held for 2x network round trips
- If the coordinator crashes between Phase 1 and Phase 2, the shards are stuck in limbo holding their locks (in-doubt transactions) until the coordinator recovers
- Does not scale — every cross-shard write needs coordination
- Used in: XA transactions (Java JTA), distributed databases like CockroachDB, Spanner (internally)
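The two phases can be sketched as a coordinator loop. This is a simplified illustration: the Participant interface and the InMemoryShard test double are hypothetical, and a real coordinator must also write its commit decision to a durable log before Phase 2 so that in-doubt transactions can be resolved after a crash.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract: prepare = lock rows, persist intent, vote.
interface Participant {
    boolean prepare(String txId);
    void commit(String txId);
    void abort(String txId);
}

final class TwoPhaseCommit {
    static boolean run(String txId, List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        // Phase 1: every participant must vote yes.
        for (Participant p : participants) {
            boolean yes;
            try {
                yes = p.prepare(txId);
            } catch (RuntimeException e) {
                yes = false; // an unreachable participant counts as a "no" vote
            }
            if (!yes) {
                for (Participant q : prepared) q.abort(txId); // release locks already taken
                return false;
            }
            prepared.add(p);
        }
        // A real coordinator durably logs "COMMIT" here; crashing at this point
        // leaves participants in doubt, still holding their locks.
        // Phase 2: the decision is commit.
        for (Participant p : prepared) p.commit(txId);
        return true;
    }
}

// Toy participant that records its final state; a "no" vote aborts locally.
final class InMemoryShard implements Participant {
    private final boolean voteYes;
    String state = "INIT";

    InMemoryShard(boolean voteYes) { this.voteYes = voteYes; }

    public boolean prepare(String txId) { state = voteYes ? "PREPARED" : "ABORTED"; return voteYes; }
    public void commit(String txId) { state = "COMMITTED"; }
    public void abort(String txId) { state = "ABORTED"; }
}
```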
Option 2: Saga Pattern (Preferred for microservices)
Break the transaction into a series of local transactions with compensating actions:
Step 1: Debit User A on Shard 1 (local transaction, fast)
On success: Publish "UserADebited" event
On failure: Publish "DebitFailed" event (nothing to compensate)
Step 2: Credit User B on Shard 2 (local transaction, fast)
On success: Done
On failure: Publish "CreditFailed" event -> triggers Step 3
Step 3 (Compensation): If credit failed, Refund User A on Shard 1
No locks held across shards. Eventual consistency. If User B's shard is down, User A's debit will be reversed. The operation eventually succeeds or is fully reversed.
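A minimal orchestrated version of the three steps, assuming each shard exposes a local, atomic balance update. ShardStore, Outcome, and the in-memory balances are hypothetical stand-ins; event publishing and retries are left out so the control flow stays visible.

```java
import java.util.HashMap;
import java.util.Map;

// Orchestrated saga: each step is a local transaction on exactly one shard.
final class TransferSaga {
    static final class ShardStore {
        final Map<String, Long> balances = new HashMap<>();
        boolean available = true;

        // One local, atomic transaction on this shard.
        synchronized void adjust(String user, long delta) {
            if (!available) throw new IllegalStateException("shard unavailable");
            long next = balances.getOrDefault(user, 0L) + delta;
            if (next < 0) throw new IllegalStateException("insufficient funds");
            balances.put(user, next);
        }
    }

    enum Outcome { COMPLETED, DEBIT_FAILED, COMPENSATED }

    static Outcome transfer(ShardStore fromShard, String from, ShardStore toShard, String to, long amount) {
        try {
            fromShard.adjust(from, -amount);   // Step 1: debit User A
        } catch (IllegalStateException e) {
            return Outcome.DEBIT_FAILED;       // nothing to compensate
        }
        try {
            toShard.adjust(to, amount);        // Step 2: credit User B
            return Outcome.COMPLETED;
        } catch (IllegalStateException e) {
            // Step 3: compensating refund. A real saga retries this until it succeeds.
            fromShard.adjust(from, amount);
            return Outcome.COMPENSATED;
        }
    }
}
```

Note what the saga gives up: between Step 1 and Step 3, other readers can see User A's balance already debited. Sagas trade isolation for not holding locks across shards.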
Option 3: Design Data to Avoid Cross-Shard Transactions
The best cross-shard transaction is one that never happens. Design your data model so that all data that participates in a single transaction lives on the same shard.
For money transfers: Instead of having User A on Shard 1 and User B on Shard 2, model the transfer as a transaction record on User A's shard:
transactions table (sharded by from_user_id):
from_user_id = A (on Shard 1)
to_user_id = B
amount = 1000
status = COMPLETED
The debit of User A's balance and the creation of this transaction record
are on the SAME shard (Shard 1). No cross-shard transaction needed.
User B's balance is updated asynchronously via event.
This is a common pattern in financial applications. Eventual consistency for balance updates is acceptable in many contexts.
12. Resharding
The Resharding Challenge
Resharding is the process of changing the shard count. You have 4 shards, each getting full. You want to split to 8 shards.
With simple modulo hashing (hash mod 4 vs hash mod 8):
- Keys that were on Shard 0 (hash mod 4 = 0) now could be on Shard 0 or Shard 4 (hash mod 8 = 0 or 4)
- Approximately 50% of all data must move to new shards (for doubling)
- During the migration, which copy of the data is authoritative?
- How do you handle writes that arrive during migration?
This is why resharding is feared. Done wrong, it causes data corruption or downtime.
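The 50% figure is easy to check by counting. The sketch below assumes keys are already well-mixed hash values (sequential integers stand in for them) and counts how many keys change shard when the modulus changes; the class name is hypothetical.

```java
// Counts how many keys land on a different shard when hash mod N changes.
final class ModuloResharding {
    static long movedKeys(long numKeys, int oldShards, int newShards) {
        long moved = 0;
        for (long key = 0; key < numKeys; key++) {
            // Sequential keys stand in for uniformly distributed hash values.
            if (key % oldShards != key % newShards) moved++;
        }
        return moved;
    }
}
```

For 80,000 keys, going from 4 to 8 shards moves 40,000 keys (50%). Adding a single shard (4 to 5) is worse: 64,000 keys (80%) move, because a key stays put only when key mod 4 equals key mod 5.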
Safe Online Resharding Strategy
Step 1: Double-write to old and new shards
Before migrating any data, configure the application to write to BOTH the old shard configuration AND the new configuration simultaneously. The old shards are authoritative (reads go to old shards).
Step 2: Backfill new shards
Copy existing data from old shards to new shards. Since writes are going to both, new shards are not stale for new data. Only historical data is being backfilled.
Step 3: Verify consistency
Compare checksums / row counts between old and new shards. Ensure new shards are caught up.
Step 4: Switch reads to new shards
With new shards fully populated and receiving writes, switch reads to the new shards. Old shards still receive writes for a brief period (safety net).
Step 5: Stop writes to old shards
Once confident, route all writes only to new shards. Old shards are now stale.
Step 6: Decommission old shards (or repurpose)
This process is operationally complex. Vitess automates it with its Reshard workflow (MoveTables is the equivalent workflow for moving tables between keyspaces), using SwitchTraffic for the cutover.
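The routing side of Steps 1 to 5 can be sketched as a phase switch. This is a single-threaded, hypothetical model: two maps stand in for the old and new shard layouts, and in practice the phase would be flipped through configuration rather than a field.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical router for an online resharding migration.
final class ReshardingRouter {
    enum Phase { DUAL_WRITE_READ_OLD, DUAL_WRITE_READ_NEW, NEW_ONLY }

    // Stand-ins for the old and new shard layouts; each would route by key internally.
    final Map<Long, String> oldLayout = new HashMap<>();
    final Map<Long, String> newLayout = new HashMap<>();
    Phase phase = Phase.DUAL_WRITE_READ_OLD;

    void write(long key, String value) {
        if (phase != Phase.NEW_ONLY) oldLayout.put(key, value); // Steps 1-4: old layout stays current
        newLayout.put(key, value);                              // new layout receives every write
    }

    String read(long key) {
        // Step 4 flips reads; until then the old layout is authoritative.
        return phase == Phase.DUAL_WRITE_READ_OLD ? oldLayout.get(key) : newLayout.get(key);
    }

    // Step 2: copy historical rows, never overwriting a newer dual-written value.
    void backfill() {
        oldLayout.forEach(newLayout::putIfAbsent);
    }

    // Step 3: crude check; real systems compare checksums per key range.
    boolean consistent() {
        return oldLayout.equals(newLayout);
    }
}
```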
Vitess Online Resharding Example
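```shell
# Commands follow the vtctldclient Reshard workflow of recent Vitess releases;
# older releases used different vtctlclient commands.

# Step 1: Start resharding: copy data from source shard -80 into -40 and 40-80
vtctldclient Reshard --target-keyspace orders --workflow order_split create \
  --source-shards '-80' --target-shards '-40,40-80'

# Step 2: Check copy progress, then verify that source and target data match
vtctldclient Reshard --target-keyspace orders --workflow order_split show
vtctldclient VDiff --target-keyspace orders --workflow order_split create

# Step 3: Switch reads to new shards (non-disruptive, reads just move)
vtctldclient Reshard --target-keyspace orders --workflow order_split switchtraffic \
  --tablet-types "rdonly,replica"

# Step 4: Switch writes to new shards (brief cutover)
vtctldclient Reshard --target-keyspace orders --workflow order_split switchtraffic \
  --tablet-types "primary"

# Step 5: Cleanup: complete the workflow and clean up the old shard
vtctldclient Reshard --target-keyspace orders --workflow order_split complete
```

No planned downtime and no data loss: the copy and the read switch happen while the application keeps running, and writes pause only briefly during the Step 4 cutover.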
13. Global ID Generation Across Shards
The Problem
In a single database, AUTO_INCREMENT generates unique IDs. Across shards, each shard would independently generate ID 1, 2, 3... You would have collisions (User A on Shard 1 has ID 42, AND User B on Shard 2 also has ID 42).
You need a globally unique ID that can be generated independently on each node without coordination.
Option 1: UUID v4 (Random)
```java
import java.util.UUID;

String id = UUID.randomUUID().toString();
// Example: "550e8400-e29b-41d4-a716-446655440000"
```

Pros: No coordination needed. Globally unique.
Cons: 128-bit (16 bytes) vs a 64-bit integer. Random values scatter inserts across the whole B-tree index, causing page splits, fragmentation, and poor cache locality, which slows writes. Not sortable by creation time.
Option 2: Snowflake ID (Twitter, 2010)
Twitter open-sourced a 64-bit ID generation scheme:
Bits: [1 unused][41 bit timestamp ms][10 bit machine ID][12 bit sequence]
Timestamp: milliseconds since a custom epoch (41 bits gives ~69 years of IDs)
Machine ID: 10 bits = 1024 distinct machines, each generates IDs independently
Sequence: 12 bits = 4096 IDs per millisecond per machine
Total: 1 + 41 + 10 + 12 = 64 bits (fits in a Java long; the unused sign bit keeps IDs positive)
Properties:
- 64-bit integer — fits in a database BIGINT column
- Monotonically increasing within a machine (good for B-tree locality)
- Sortable by time globally (roughly)
- No coordination between machines
- You can extract the timestamp from any ID:
timestamp = (id >> 22) + EPOCH
Used by: Twitter, Discord, Instagram, Zerodha (for order IDs)
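A minimal Snowflake-style generator following the bit layout above. The epoch is Twitter's published Snowflake epoch (any fixed past instant works); the class name is hypothetical, and a clock that moves backwards is handled by refusing to issue IDs rather than by any smarter recovery.

```java
// Snowflake-style IDs: [1 unused][41 timestamp ms][10 machine ID][12 sequence].
final class SnowflakeIdGenerator {
    static final long EPOCH = 1288834974657L;      // Twitter's Snowflake epoch (Nov 2010)
    private static final int MACHINE_BITS = 10;
    private static final int SEQUENCE_BITS = 12;
    private static final long MAX_SEQUENCE = (1L << SEQUENCE_BITS) - 1;  // 4095

    private final long machineId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeIdGenerator(long machineId) {
        if (machineId < 0 || machineId >= (1L << MACHINE_BITS)) {
            throw new IllegalArgumentException("machineId must fit in 10 bits");
        }
        this.machineId = machineId;
    }

    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            throw new IllegalStateException("clock moved backwards; refusing to issue IDs");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & MAX_SEQUENCE;
            if (sequence == 0) {
                // All 4096 IDs for this millisecond are used: spin until the next one.
                while (now <= lastTimestamp) now = System.currentTimeMillis();
            }
        } else {
            sequence = 0;
        }
        lastTimestamp = now;
        return ((now - EPOCH) << (MACHINE_BITS + SEQUENCE_BITS))
                | (machineId << SEQUENCE_BITS)
                | sequence;
    }

    // Same as the formula above: timestamp = (id >> 22) + EPOCH.
    static long timestampOf(long id) { return (id >> (MACHINE_BITS + SEQUENCE_BITS)) + EPOCH; }

    static long machineOf(long id) { return (id >> SEQUENCE_BITS) & ((1L << MACHINE_BITS) - 1); }
}
```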
Option 3: Instagram ID (2012)
Instagram needed IDs that could be generated on any database shard independently without coordination:
Bits: [41 bit timestamp ms][13 bit logical shard ID][10 bit per-shard sequence]
Total: 41 + 13 + 10 = 64 bits
The key innovation: the logical shard ID is embedded in the ID itself. Given any Instagram photo ID, you can compute which shard it lives on:
```sql
-- PostgreSQL function to generate Instagram-style IDs
-- Sequence backing the 10-bit per-shard counter (one per shard database)
CREATE SEQUENCE IF NOT EXISTS global_id_sequence;

CREATE OR REPLACE FUNCTION next_id(OUT result BIGINT)
AS $$
DECLARE
    our_epoch BIGINT := 1314220021721;  -- Instagram epoch (ms)
    seq_id BIGINT;
    now_millis BIGINT;
    shard_id INT := 42;                 -- This shard's ID, set per shard
BEGIN
    SELECT nextval('global_id_sequence') % 1024 INTO seq_id;
    SELECT FLOOR(EXTRACT(EPOCH FROM clock_timestamp()) * 1000) INTO now_millis;
    result := (now_millis - our_epoch) << 23;  -- 41 bits timestamp
    result := result | (shard_id << 10);       -- 13 bits shard id
    result := result | (seq_id);               -- 10 bits sequence
END;
$$ LANGUAGE plpgsql;
```

Routing from ID:
```java
public int getShardFromPhotoId(long photoId) {
    return (int) ((photoId >> 10) & 0x1FFF); // Extract 13-bit shard ID
}
```

You never need a lookup table to find which shard a photo lives on. The shard ID is in the photo ID itself.
Option 4: ULID (Universally Unique Lexicographically Sortable Identifier)
ULID: 01ARZ3NDEKTSV4RRFFQ69G5FAV
Format: [10 char timestamp][16 char random]
[48 bits time] [80 bits random]
Better than UUID v4 for indexing (time-sortable, and a 26-character string instead of 36). Less common than Snowflake for high-volume systems.
14. Schema Migrations Across Shards
The Challenge
You have 100 MySQL shards. You need to add a column:
```sql
ALTER TABLE orders ADD COLUMN delivery_notes VARCHAR(500);
```

In a single database: Run the migration, it takes a few minutes, done.
Across 100 shards:
- You need to run this on all 100 shards
- Before MySQL 5.6 introduced online DDL, ALTER TABLE locked the table for the whole migration (and some operations still block writes today)
- If you run all 100 simultaneously, 100 tables are locked simultaneously — your service is down
- If you run one at a time, it takes hours, and during that time different shards have different schemas
Strategy 1: Sequential Roll with Compatible Changes
Run the migration shard-by-shard, one at a time. Ensure the migration is backward compatible (adding a nullable column is safe — old code ignores it, new code uses it).
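```shell
for shard in shard{1..100}; do
  # Stop the roll at the first failure so shards don't drift further apart
  mysql -h "$shard" orders < add_delivery_notes.sql || { echo "migration failed on $shard" >&2; exit 1; }
  sleep 5  # Brief pause between shards
done
```

The new column is nullable and no application code writes to it yet, so the migration can run without downtime.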
Strategy 2: Expand-Contract Pattern
For non-backward-compatible changes:
- Expand: Add new schema alongside old schema (dual write). Deploy application that writes to both old and new.
- Migrate: Backfill old data to new schema format.
- Contract: Once all shards have new schema and all data is migrated, remove old schema.
Strategy 3: pt-online-schema-change (Percona Toolkit)
For MySQL, pt-osc performs online schema changes without table locks:
- Creates a new empty table with the new schema
- Copies rows from old to new table in small batches
- Uses triggers on the old table to apply any writes to the new table during copy
- When copy is done, renames tables atomically
No downtime. Can be run on each shard independently.
Strategy 4: Liquibase / Flyway with Shard Awareness
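Flyway and Liquibase can be configured to run against multiple data sources. Each shard records its applied migrations in its own tracking table (flyway_schema_history for Flyway, DATABASECHANGELOG for Liquibase), so re-running the migration step applies only what is still pending on that shard.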
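```java
import java.util.List;
import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.springframework.boot.autoconfigure.flyway.FlywayMigrationStrategy;
import org.springframework.context.annotation.Bean;

// Inside a @Configuration class. Assumes each shard DataSource is a Spring bean,
// injected here as a list (use a qualifier if a non-shard DataSource also exists).
@Bean
public FlywayMigrationStrategy flywayStrategy(List<DataSource> allShardDataSources) {
    return flyway -> {
        // Skip the default single-database migration; migrate all shards in sequence
        for (DataSource shardDs : allShardDataSources) {
            Flyway.configure()
                    .dataSource(shardDs)
                    .locations("classpath:db/sharded-migrations")
                    .load()
                    .migrate();
        }
    };
}
```

15. Production Pitfalls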
Pitfall 1: Choosing the Wrong Shard Key (Unrecoverable)
The most common and most expensive mistake. Engineers often shard by what seems logical ("shard orders by order_date") instead of what provides even distribution and query locality.
Detection: Monitor per-shard query rate, CPU, and data size. One shard consistently higher than others = wrong shard key.
Fix: Expensive resharding project. Sometimes companies run old and new sharding schemes in parallel for months during migration.
Prevention: Before choosing a shard key, analyze your actual query patterns. The most frequent query type should be satisfiable from a single shard.
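Detection can be automated with a simple skew check over the per-shard metrics mentioned above (query rate, CPU, or data size). The threshold factor is an assumption to tune, not a standard value, and the class is hypothetical; a shard that is flagged interval after interval points at the shard key rather than a transient spike.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Flags shards whose load exceeds `factor` times the cluster mean.
final class ShardSkew {
    static List<String> hotShards(Map<String, Double> loadByShard, double factor) {
        double mean = loadByShard.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0);
        List<String> hot = new ArrayList<>();
        for (Map.Entry<String, Double> e : loadByShard.entrySet()) {
            if (e.getValue() > factor * mean) hot.add(e.getKey());
        }
        hot.sort(null); // natural order, for stable reporting
        return hot;
    }
}
```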
Pitfall 2: Forgetting to Handle Cross-Shard Queries in the Application
Team builds a sharded system. Works great for "get order by order_id" (single shard). Six months later, analytics team needs "count of orders per city this month". Query hits all shards, returns in 30 seconds, timeouts start appearing.
Fix: Separate OLAP pipeline. Never run analytical queries on sharded OLTP databases.
Pitfall 3: No Global ID Scheme — ID Collisions
Team shards users table. Shard 1 auto-increments from 1. Shard 2 auto-increments from 1. User ID 45 exists on both shards. Now JOIN between orders (sharded by user_id) and users returns wrong data.
Fix: Migrate to Snowflake IDs or UUID before sharding. Non-negotiable.
Pitfall 4: Backup Complexity
Single database: pg_dump mydb > backup.sql. Done.
100 shards: Need to coordinate backups across 100 servers. Point-in-time recovery requires all 100 backups to be consistent to the same timestamp. One shard backup fails silently. You discover this when you need to restore after a disaster.
Fix: Automated backup verification across all shards. Test restores quarterly. Use database-native backup solutions (Vitess has built-in backup management).
Pitfall 5: Transactions That Silently Become Non-Atomic
```java
@Transactional // THIS DOES NOT WORK ACROSS SHARDS
public void transferMoney(long fromUserId, long toUserId, BigDecimal amount) {
    accountRepo.debit(fromUserId, amount);  // Goes to Shard 1
    accountRepo.credit(toUserId, amount);   // Goes to Shard 2
}
```

With a standard local transaction manager, @Transactional controls only ONE database connection (one shard). The Spring @Transactional annotation does NOT magically create a distributed transaction. This code will:
- Debit from Shard 1 successfully (committed)
- Crash on Shard 2
- Money is lost — debit happened, credit did not
Fix: Use the Saga pattern. Never assume @Transactional protects you across shards.
Pitfall 6: Testing Only On a Single Shard
Development and QA environments use a single database for simplicity. Sharded behavior (cross-shard queries, scatter-gather, ID generation) is never tested. Production sharded behavior is different from what was tested.
Fix: Run integration tests against a multi-shard setup. Use Docker Compose to spin up 4 PostgreSQL containers for testing.
Pitfall 7: Ignoring the Resharding Plan
Teams shard a database thinking "4 shards will be enough for years". 18 months later, growth exceeded projections. Now you need 8 shards urgently. No resharding plan exists. Emergency project with significant risk.
Fix: Use logical sharding from day one. Pre-create 100-1000 logical shards even if you only have 4 physical servers initially. Moving logical shards to new physical servers is easy. Changing the number of logical shards is the painful operation.
16. Sharding Decision Checklist
Use this checklist before deciding to shard:
Do You Actually Need Sharding?
| Question | If YES | Action |
|---|---|---|
| Is your database > 5 TB? | Strong candidate for sharding | Continue checklist |
| Is your write throughput > 50,000 writes/second? | Strong candidate | Continue checklist |
| Have you maxed out vertical scaling? | Sharding needed | Continue checklist |
| Can you solve it with read replicas? | Sharding may not be needed yet | Try read replicas first |
| Can you solve it with better indexes? | Sharding may not be needed yet | Profile queries first |
| Can you solve it with caching? | Sharding may not be needed yet | Add caching layer first |
If You Decide to Shard:
- Chosen shard key has high cardinality (millions of distinct values)
- Chosen shard key is immutable (value never changes for a given record)
- Write distribution is even (tested with production data sample)
- Most frequent queries can be answered from a single shard
- Cross-shard query cases identified and addressed (OLAP pipeline or scatter-gather accepted)
- Cross-shard transaction cases identified and Saga pattern designed
- Global ID generation scheme chosen (Snowflake recommended)
- Hot shard scenarios identified (celebrities, trending content, sequential IDs)
- Logical sharding planned (more logical than physical shards)
- Resharding strategy documented
- Backup and recovery tested across all shards
- Monitoring per-shard (query rate, size, CPU) in place
- Schema migration strategy defined
- @Transactional usage audited: no cross-shard transaction assumptions
This document is Part 1 of the Sharding series. See sharding-interview-questions.md for deep-dive interview preparation including tricky questions, common mistakes, and scenario-based design problems.