WebSockets Demystified - Part 3: Advanced Patterns
Series: Index | Part 1 | Part 2 | Part 3 | Part 4 | Part 5 | Part 6
Table of Contents
- Pattern 1 - Real-Time Chat System
- Pattern 2 - Live Dashboard with Stock Prices
- Pattern 3 - User Presence System
- Pattern 4 - Real-Time Order Tracking
- Pattern 5 - Collaborative Editing (Operational Transform)
- Pattern 6 - Notification System
- Pattern 7 - Rate Limiting Per Connection
- Pattern 8 - Message Acknowledgment and Reliability
- Reconnection Strategy with Exponential Backoff
- Database Entities and Repositories
1. Pattern 1 - Real-Time Chat System
Architecture
Client A ----SEND msg----> /app/chat.send/{roomId}
                                   |
                            ChatController
                                   |
                  ChatService.processAndBroadcast()
                       |                     |
                 Save to MySQL    Publish to /topic/room.{roomId}
                                             |
                                  (all subscribers receive)
                                       |            |
                                   Client A     Client B
Room Service
// src/main/java/com/example/ws/service/ChatRoomService.java
package com.example.ws.service;
import com.example.ws.entity.ChatRoom;
import com.example.ws.entity.ChatRoomMember;
import com.example.ws.exception.RoomNotFoundException;
import com.example.ws.repository.ChatRoomMemberRepository;
import com.example.ws.repository.ChatRoomRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
@Service
@RequiredArgsConstructor
public class ChatRoomService {
private final ChatRoomRepository roomRepository;
private final ChatRoomMemberRepository memberRepository;
private final SimpMessagingTemplate messagingTemplate;
/**
 * Persists a new chat room and enrols its creator as the first member.
 *
 * @param name      display name stored on the room
 * @param creatorId id of the creating user; stored as {@code createdBy}
 * @param isPrivate value stored in the room's {@code isPrivate} flag
 * @return the room instance returned by the repository's {@code save}
 */
@Transactional
public ChatRoom createRoom(String name, String creatorId, boolean isPrivate) {
    String newRoomId = UUID.randomUUID().toString();
    ChatRoom persisted = roomRepository.save(
            ChatRoom.builder()
                    .roomId(newRoomId)
                    .name(name)
                    .createdBy(creatorId)
                    .isPrivate(isPrivate)
                    .createdAt(Instant.now())
                    .build());
    // The creator joins their own room with the "ADMIN" role.
    addMember(persisted.getRoomId(), creatorId, "ADMIN");
    return persisted;
}
/**
 * Registers a user as a member of an existing room and publishes a
 * {@code UserJoinedEvent} on the room's events topic.
 *
 * @param roomId id of the room to join
 * @param userId id of the joining user
 * @param role   role string stored on the membership record
 * @throws RoomNotFoundException if no room with {@code roomId} exists
 */
@Transactional
public void addMember(String roomId, String userId, String role) {
    // Lookup is an existence check only; the loaded room is not used afterwards.
    roomRepository.findById(roomId)
            .orElseThrow(() -> new RoomNotFoundException(roomId));

    memberRepository.save(
            ChatRoomMember.builder()
                    .roomId(roomId)
                    .userId(userId)
                    .role(role)
                    .joinedAt(Instant.now())
                    .build());

    // Tell everyone subscribed to the room's events topic about the new member.
    String eventsTopic = "/topic/room." + roomId + ".events";
    messagingTemplate.convertAndSend(eventsTopic, new UserJoinedEvent(roomId, userId, Instant.now()));
}
/**
 * Deletes a user's membership record for a room and publishes a
 * {@code UserLeftEvent} on the room's events topic.
 *
 * @param roomId id of the room being left
 * @param userId id of the leaving user
 */
@Transactional
public void removeMember(String roomId, String userId) {
    memberRepository.deleteByRoomIdAndUserId(roomId, userId);
    // NOTE(review): the event is published regardless of whether a row was deleted - confirm that is intended.
    String eventsTopic = "/topic/room." + roomId + ".events";
    messagingTemplate.convertAndSend(eventsTopic, new UserLeftEvent(roomId, userId, Instant.now()));
}
/**
 * Looks up the rooms for a user via {@code roomRepository.findByMemberId}.
 *
 * @param userId id of the user whose rooms are requested
 * @return the list returned by the repository query
 */
@Transactional(readOnly = true)
public List<ChatRoom> getUserRooms(String userId) {
    List<ChatRoom> memberships = roomRepository.findByMemberId(userId);
    return memberships;
}
}
Read Receipts
// src/main/java/com/example/ws/controller/ReadReceiptController.java
package com.example.ws.controller;
import com.example.ws.dto.ReadReceiptRequest;
import com.example.ws.service.ReadReceiptService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import java.security.Principal;
/**
 * STOMP controller that accepts read receipts from chat clients and hands
 * them to {@link ReadReceiptService}.
 */
@Controller
@RequiredArgsConstructor
public class ReadReceiptController {

    private final ReadReceiptService readReceiptService;

    /**
     * Handles a read receipt sent by a client to {@code /app/chat.read/{roomId}}.
     *
     * @param request   payload carrying the room id and the id of the last message read
     * @param principal the sender of the frame; its name is used as the reader id
     */
    @MessageMapping("/chat.read/{roomId}")
    public void markAsRead(@Payload ReadReceiptRequest request, Principal principal) {
        String readerId = principal.getName();
        // NOTE(review): the {roomId} destination variable is not bound here; the room id
        // comes from the payload instead - confirm the two cannot diverge.
        String roomId = request.getRoomId();
        String lastReadMessageId = request.getLastReadMessageId();
        readReceiptService.markMessagesRead(readerId, roomId, lastReadMessageId);
    }
}
// src/main/java/com/example/ws/service/ReadReceiptService.java
package com.example.ws.service;
import com.example.ws.dto.ReadReceiptEvent;
import com.example.ws.repository.ChatMessageRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@RequiredArgsConstructor
public class ReadReceiptService {
private final ChatMessageRepository messageRepository;
private final SimpMessagingTemplate messagingTemplate;
/**
 * Records that a reader has read a room's messages up to a given message and
 * publishes a {@code ReadReceiptEvent} on the room's receipts topic.
 *
 * @param readerId          id of the user who read the messages
 * @param roomId            id of the room the messages belong to
 * @param lastReadMessageId id of the last message the reader has read
 */
@Transactional
public void markMessagesRead(String readerId, String roomId, String lastReadMessageId) {
    // Persist first: the repository marks everything up to lastReadMessageId as read.
    messageRepository.markAsReadUpTo(roomId, readerId, lastReadMessageId);

    // Then let the room's subscribers know how far this reader has got.
    String receiptsTopic = "/topic/room." + roomId + ".receipts";
    messagingTemplate.convertAndSend(
            receiptsTopic,
            new ReadReceiptEvent(readerId, roomId, lastReadMessageId));
}
}
2. Pattern 2 - Live Dashboard with Stock Prices
Architecture
MySQL/External API --> StockPriceScheduler --> /topic/prices --> All subscribed clients
                   --> PortfolioScheduler --> /user/queue/portfolio --> Each specific user
Price Update DTO
// src/main/java/com/example/ws/dto/PriceUpdate.java
package com.example.ws.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
import java.time.Instant;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PriceUpdate {
private String symbol;
private BigDecimal price;
private BigDecimal change;
private BigDecimal changePercent;
private long volume;
private Instant timestamp;
}
Throttled Price Broadcaster
A critical pattern: do not push every single price tick to clients. Throttle to a human-readable rate.
// src/main/java/com/example/ws/scheduler/ThrottledPriceBroadcaster.java
package com.example.ws.scheduler;
import com.example.ws.dto.PriceUpdate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@RequiredArgsConstructor
@Slf4j
public class ThrottledPriceBroadcaster {
private final SimpMessagingTemplate messagingTemplate;
// Accumulate latest prices here - overwrites on each tick
// Only the latest price per symbol is kept
private final ConcurrentHashMap<String, PriceUpdate> latestPrices = new ConcurrentHashMap<>();
/**
 * Records the most recent price for a symbol without pushing anything to clients.
 * Any previously stored update for the same symbol is overwritten.
 *
 * @param symbol the symbol the tick belongs to; used as the map key
 * @param price  the price carried by the tick
 */
public void onPriceTick(String symbol, BigDecimal price) {
    latestPrices.put(
            symbol,
            PriceUpdate.builder()
                    .symbol(symbol)
                    .price(price)
                    .timestamp(Instant.now())
                    .build());
}
/**
 * Pushes the accumulated latest prices to {@code /topic/prices}, scheduled with a
 * fixed delay of 500 ms between runs, so clients are not flooded with every tick.
 *
 * Pattern: "Last-Value Cache" / "Conflation" - only the newest value per symbol
 * is sent, however many ticks arrived since the previous run.
 *
 * Entries are drained one key at a time with {@code remove(key)} instead of
 * copying the values and then calling {@code clear()}: a tick that lands between
 * the copy and the clear would otherwise be discarded without ever being sent.
 */
@Scheduled(fixedDelay = 500)
public void broadcastLatestPrices() {
    if (latestPrices.isEmpty()) {
        return;
    }
    List<PriceUpdate> snapshot = new ArrayList<>();
    for (String symbol : latestPrices.keySet()) {
        // remove() hands back the newest value atomically; a tick arriving afterwards
        // is simply stored again and goes out with the next run.
        PriceUpdate drained = latestPrices.remove(symbol);
        if (drained != null) {
            snapshot.add(drained);
        }
    }
    if (snapshot.isEmpty()) {
        return;
    }
    try {
        messagingTemplate.convertAndSend("/topic/prices", snapshot);
    } catch (Exception e) {
        // Keep the scheduler alive on failure, but log the cause with its stack trace.
        log.error("Failed to broadcast prices: {}", e.getMessage(), e);
    }
}
}
3. Pattern 3 - User Presence System
Presence Service with Redis
// src/main/java/com/example/ws/service/PresenceService.java
package com.example.ws.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Tracks user online/offline status using Redis Sets.
*
* Redis Data Model:
* presence:user:{userId}:sessions --> Set of active sessionIds for this user
* presence:online --> Set of all currently online userIds
*
* Using Redis (not in-memory) is CRITICAL for clustered deployments:
* - User may connect to Server A but their contacts may be on Server B.
* - Redis gives a global view of presence.
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class PresenceService {
private final StringRedisTemplate redisTemplate;
private static final String PRESENCE_KEY = "presence:online";
private static final String SESSION_KEY_PREFIX = "presence:user:";
private static final Duration SESSION_TTL = Duration.ofHours(24);
/**
 * Registers a session for a user and flags the user as online.
 * One user may hold several sessions at once (multiple tabs/devices).
 *
 * @param userId    id of the user who connected
 * @param sessionId id of the session that was opened
 */
public void markOnline(String userId, String sessionId) {
    final String userSessionsKey = SESSION_KEY_PREFIX + userId + ":sessions";

    // Track the session, and refresh the TTL on the per-user session set.
    redisTemplate.opsForSet().add(userSessionsKey, sessionId);
    redisTemplate.expire(userSessionsKey, SESSION_TTL);

    // Add the user to the global online set.
    redisTemplate.opsForSet().add(PRESENCE_KEY, userId);

    log.debug("User {} is now online (session={})", userId, sessionId);
}
/**
* Removes a specific session. Only marks user offline if no sessions remain.
*/
public void markOffline(String userId, String sessionId) {
String sessionKey = SESSION_KEY_PREFIX + userId + ":sessions";
redisTemplate.opsForSet().remove(sessionKey, sessionId);
Long remaining = redisTemplate.opsForSet().size(sessionKey);
if (remaining <mark class="obsidian-highlight"> null || remaining </mark> 0) {
redisTemplate.opsForSet().remove(PRESENCE_KEY, userId);
redisTemplate.delete(sessionKey);
log.debug("User {} is now offline", userId);
} else {
log.debug("User {} still has {} active sessions", userId, remaining);
}
}
/**
 * Reports whether a user id is present in the global online set.
 *
 * @param userId id of the user to check
 * @return {@code true} only when Redis answers {@code TRUE}; a {@code null}
 *         or {@code FALSE} answer yields {@code false}
 */
public boolean isOnline(String userId) {
    return Boolean.TRUE.equals(redisTemplate.opsForSet().isMember(PRESENCE_KEY, userId));
}
/**
 * Filters a list of user ids down to those currently online, e.g. to show
 * presence for a user's contacts. Each id is checked with {@link #isOnline}.
 *
 * @param userIds candidate user ids
 * @return the subset of {@code userIds} that is online
 */
public Set<String> getOnlineUsers(List<String> userIds) {
    return userIds.stream()
            .filter(candidate -> isOnline(candidate))
            .collect(Collectors.toSet());
}
/**
 * Counts the sessions currently recorded for a user (e.g. for a "3 devices" label).
 *
 * @param userId id of the user
 * @return size of the user's session set, or 0 when Redis returns {@code null}
 */
public long getActiveSessionCount(String userId) {
    Long sessions = redisTemplate.opsForSet().size(SESSION_KEY_PREFIX + userId + ":sessions");
    if (sessions == null) {
        return 0;
    }
    return sessions;
}
}
4. Pattern 4 - Real-Time Order Tracking
Order Status Controller
// src/main/java/com/example/ws/controller/OrderTrackingController.java
package com.example.ws.controller;
import com.example.ws.service.OrderTrackingService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
import java.security.Principal;
@Controller
@RequiredArgsConstructor
public class OrderTrackingController {
private final OrderTrackingService orderTrackingService;
/**
 * Answers a subscription to {@code /app/order.track/{orderId}} with the order's
 * current status, after checking ownership and registering the subscriber as a
 * tracker of the order.
 *
 * Client code:
 * stompClient.subscribe('/app/order.track/ORD-123', callback)
 *
 * Subsequent updates are pushed to: /user/queue/order.status
 *
 * @param orderId   order id taken from the subscription destination
 * @param principal the subscribing user
 * @return the order's current status, delivered as the immediate reply
 */
@SubscribeMapping("/order.track/{orderId}")
public OrderStatus onSubscribeToOrder(@DestinationVariable String orderId, Principal principal) {
    String subscriber = principal.getName();

    // Ownership is checked before anything is registered or returned.
    orderTrackingService.validateOrderOwnership(subscriber, orderId);
    orderTrackingService.startTracking(subscriber, orderId);

    OrderStatus current = orderTrackingService.getCurrentStatus(orderId);
    return current;
}
}
Order Status Pusher (Event-Driven)
// src/main/java/com/example/ws/service/OrderTrackingService.java
package com.example.ws.service;
import com.example.ws.dto.OrderStatus;
import com.example.ws.entity.Order;
import com.example.ws.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import java.time.Duration;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderTrackingService {
private final OrderRepository orderRepository;
private final SimpMessagingTemplate messagingTemplate;
private final StringRedisTemplate redisTemplate;
// Redis key: "order:tracking:{orderId}" --> Set of userIds tracking this order
private static final String TRACKING_KEY_PREFIX = "order:tracking:";
/**
 * Adds a user to the Redis set of users tracking an order and sets the
 * set's expiry to 24 hours.
 *
 * @param userId  id of the tracking user
 * @param orderId id of the tracked order
 */
public void startTracking(String userId, String orderId) {
    final String trackingKey = TRACKING_KEY_PREFIX + orderId;
    redisTemplate.opsForSet().add(trackingKey, userId);
    redisTemplate.expire(trackingKey, Duration.ofHours(24));
}
/**
 * Removes a user from the Redis set of users tracking an order.
 *
 * @param userId  id of the user who stops tracking
 * @param orderId id of the order
 */
public void stopTracking(String userId, String orderId) {
    final String trackingKey = TRACKING_KEY_PREFIX + orderId;
    redisTemplate.opsForSet().remove(trackingKey, userId);
}
/**
 * Loads an order and converts it to the status DTO sent to clients.
 *
 * @param orderId id of the order
 * @return the order's status DTO
 * @throws RuntimeException if the repository has no order with that id
 */
@Transactional(readOnly = true)
public OrderStatus getCurrentStatus(String orderId) {
    return orderRepository.findById(orderId)
            .map(this::buildOrderStatus)
            .orElseThrow(() -> new RuntimeException("Order not found: " + orderId));
}
/**
 * Reacts to an order status change (raised by e.g. payment or logistics code)
 * by pushing the fresh status to every user tracking the order.
 *
 * Bound to {@code AFTER_COMMIT}, so the push only happens once the change is
 * committed - a client never sees "SHIPPED" before the database does.
 *
 * @param event the change event; only its order id is read here
 */
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onOrderStatusChanged(OrderStatusChangedEvent event) {
    final String orderId = event.getOrderId();
    final java.util.Set<String> watchers =
            redisTemplate.opsForSet().members(TRACKING_KEY_PREFIX + orderId);

    boolean nobodyWatching = watchers == null || watchers.isEmpty();
    if (nobodyWatching) {
        return;
    }

    final OrderStatus status = getCurrentStatus(orderId);
    watchers.forEach(userId -> {
        // A failure for one user is logged and must not stop delivery to the others.
        try {
            messagingTemplate.convertAndSendToUser(userId, "/queue/order.status", status);
            log.debug("Pushed order {} status {} to user {}", orderId, status.getStatus(), userId);
        } catch (Exception e) {
            log.warn("Failed to push order status to user {}: {}", userId, e.getMessage());
        }
    });
}
/**
 * Verifies that an order belongs to a user.
 *
 * @param userId  id of the user claiming the order
 * @param orderId id of the order
 * @throws SecurityException if the repository finds no order with that id for that user
 */
public void validateOrderOwnership(String userId, String orderId) {
    boolean ownsOrder = orderRepository.existsByOrderIdAndUserId(orderId, userId);
    if (ownsOrder) {
        return;
    }
    throw new SecurityException("User " + userId + " does not own order " + orderId);
}
/**
 * Maps an {@code Order} entity to the {@code OrderStatus} DTO, copying the
 * order id, status, estimated delivery, current location and last-updated
 * timestamp.
 */
private OrderStatus buildOrderStatus(Order order) {
return OrderStatus.builder()
.orderId(order.getOrderId())
.status(order.getStatus())
.estimatedDelivery(order.getEstimatedDelivery())
.currentLocation(order.getCurrentLocation())
// The DTO's lastUpdated is taken from the entity's updatedAt.
.lastUpdated(order.getUpdatedAt())
.build();
}
}
5. Pattern 5 - Collaborative Editing (Operational Transform)
The Challenge
When two users edit the same document simultaneously, their changes can conflict. The Operational Transform (OT) algorithm resolves conflicts by transforming operations against each other.
Simple OT Implementation
// src/main/java/com/example/ws/controller/DocumentController.java
package com.example.ws.controller;
import com.example.ws.dto.DocumentOperation;
import com.example.ws.service.DocumentService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import java.security.Principal;
/**
 * STOMP entry point for collaborative-editing operations; all work is
 * delegated to {@link DocumentService}.
 */
@Controller
@RequiredArgsConstructor
public class DocumentController {

    private final DocumentService documentService;

    /**
     * Receives one edit operation (an insert or delete at a position) for a document.
     * The service transforms it against concurrent operations and broadcasts the result.
     *
     * Destination: /app/doc.operation/{docId}
     * Broadcast to: /topic/doc.{docId}
     *
     * @param docId     document id taken from the destination
     * @param operation the operation sent by the client
     * @param principal the sending user; its name is passed on as the user id
     */
    @MessageMapping("/doc.operation/{docId}")
    public void applyOperation(@DestinationVariable String docId,
                               @Payload DocumentOperation operation,
                               Principal principal) {
        documentService.applyAndBroadcast(docId, principal.getName(), operation);
    }
}
// src/main/java/com/example/ws/dto/DocumentOperation.java
package com.example.ws.dto;
import lombok.Data;
/**
 * A single edit operation on a shared document, as sent by a client and as
 * re-broadcast by the server after transformation.
 */
@Data
public class DocumentOperation {
    // Operation type; DocumentService recognises the strings "INSERT" and "DELETE".
    private String type;
    // Position in the document where the operation applies.
    private int position;
    // For INSERT: the text to insert.
    private String text;
    // For DELETE: number of characters to delete.
    private int length;
    // Client's version number when this operation was created.
    // DocumentService transforms the operation against every history entry
    // from this index onwards.
    private int revision;
    // Assigned by the server after transformation (DocumentService sets it to the
    // history size at the time the operation is applied).
    private int serverRevision;
    // Assigned by the server: a random UUID per applied operation.
    private String operationId;
    // Assigned by the server from the sending user's id.
    private String userId;
}
// src/main/java/com/example/ws/service/DocumentService.java
package com.example.ws.service;
import com.example.ws.dto.DocumentOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentService {
private final SimpMessagingTemplate messagingTemplate;
// Document content per docId
private final ConcurrentHashMap<String, StringBuilder> documents = new ConcurrentHashMap<>();
// Operation history per docId (for OT transformation)
private final ConcurrentHashMap<String, List<DocumentOperation>> operationHistory =
new ConcurrentHashMap<>();
// Per-document lock to serialize concurrent operations
private final ConcurrentHashMap<String, ReentrantLock> documentLocks =
new ConcurrentHashMap<>();
/**
 * Transforms a client operation against the document's history, applies it,
 * records it and broadcasts the transformed operation to {@code /topic/doc.{docId}}.
 *
 * The whole sequence runs under a per-document lock, so operations on one
 * document are processed one at a time.
 *
 * @param docId     id of the document being edited
 * @param userId    id of the user who sent the operation
 * @param operation the operation as received from the client
 */
public void applyAndBroadcast(String docId, String userId, DocumentOperation operation) {
    ReentrantLock docLock = documentLocks.computeIfAbsent(docId, id -> new ReentrantLock());
    docLock.lock();
    try {
        List<DocumentOperation> applied =
                operationHistory.computeIfAbsent(docId, id -> new ArrayList<>());

        // Rebase onto everything the server applied after the client's revision.
        DocumentOperation rebased = transformOperation(operation, applied);
        rebased.setServerRevision(applied.size());
        rebased.setUserId(userId);
        rebased.setOperationId(UUID.randomUUID().toString());

        applyToDocument(docId, rebased);
        applied.add(rebased);

        messagingTemplate.convertAndSend("/topic/doc." + docId, rebased);
    } finally {
        docLock.unlock();
    }
}
/**
 * Transforms an incoming operation against every server operation recorded
 * at or after the client's revision, using OT.
 * This is a simplified implementation for INSERT/DELETE operations.
 * Production systems use libraries like ShareDB or ot.js.
 *
 * @param op            the operation as received from the client
 * @param concurrentOps the document's full server-side history, oldest first
 * @return a transformed copy of {@code op}; {@code op} itself is not modified
 */
private DocumentOperation transformOperation(
        DocumentOperation op,
        List<DocumentOperation> concurrentOps) {
    DocumentOperation result = cloneOperation(op);
    // The revision is client-supplied. A negative value would index before the
    // start of the history and throw, so it is treated as "has seen nothing".
    int firstUnseen = Math.max(0, op.getRevision());
    // A revision beyond the history size simply means there is nothing to transform against.
    for (int i = firstUnseen; i < concurrentOps.size(); i++) {
        DocumentOperation serverOp = concurrentOps.get(i);
        result = transform(result, serverOp);
    }
    return result;
}
/**
 * Transform operation A against operation B.
 * Returns a new operation A' that achieves the same intent
 * when applied after B.
 *
 * Malformed fields on B are neutralised the same way {@code applyToDocument}
 * neutralises them: an INSERT without text counts as inserting nothing and a
 * negative DELETE length counts as deleting nothing.
 *
 * @param a the operation to adjust
 * @param b an operation that has already been applied
 * @return an adjusted copy of {@code a}
 */
private DocumentOperation transform(DocumentOperation a, DocumentOperation b) {
    DocumentOperation result = cloneOperation(a);
    if ("INSERT".equals(b.getType())) {
        int insertedLength = b.getText() == null ? 0 : b.getText().length();
        if (a.getPosition() >= b.getPosition()) {
            // B inserted at or before A's position - shift A's position right
            result.setPosition(a.getPosition() + insertedLength);
        }
    } else if ("DELETE".equals(b.getType())) {
        int deletedLength = Math.max(0, b.getLength());
        if (a.getPosition() > b.getPosition()) {
            // B deleted before A's position - shift A's position left,
            // but never further left than where B's deletion started
            result.setPosition(Math.max(b.getPosition(),
                    a.getPosition() - deletedLength));
        }
    }
    return result;
}

/**
 * Applies an operation to the in-memory document, creating the document on
 * first use. Operation fields come from clients, so they are clamped rather
 * than trusted: the position is forced into {@code [0, doc.length()]}, a
 * missing INSERT text inserts nothing, and a DELETE removes at most the
 * characters that exist after the start position.
 *
 * @param docId id of the document to modify
 * @param op    the (already transformed) operation to apply
 */
private void applyToDocument(String docId, DocumentOperation op) {
    StringBuilder doc = documents.computeIfAbsent(docId, k -> new StringBuilder());
    int start = Math.max(0, Math.min(op.getPosition(), doc.length()));
    if ("INSERT".equals(op.getType())) {
        // StringBuilder.insert would otherwise insert the literal text "null".
        if (op.getText() != null) {
            doc.insert(start, op.getText());
        }
    } else if ("DELETE".equals(op.getType())) {
        // Clamp the count rather than computing start + length, which can overflow int.
        int count = Math.max(0, Math.min(op.getLength(), doc.length() - start));
        doc.delete(start, start + count);
    }
}
/**
 * Creates a field-by-field copy of an operation so transformations never
 * mutate the instance they were given.
 *
 * @param original the operation to copy
 * @return a new operation carrying the same field values, including {@code serverRevision}
 */
private DocumentOperation cloneOperation(DocumentOperation original) {
    DocumentOperation clone = new DocumentOperation();
    clone.setType(original.getType());
    clone.setPosition(original.getPosition());
    clone.setText(original.getText());
    clone.setLength(original.getLength());
    clone.setRevision(original.getRevision());
    // Previously omitted, which silently reset the copy's server revision to 0.
    clone.setServerRevision(original.getServerRevision());
    clone.setOperationId(original.getOperationId());
    clone.setUserId(original.getUserId());
    return clone;
}
}
6. Pattern 6 - Notification System
Notification Service
// src/main/java/com/example/ws/service/NotificationService.java
package com.example.ws.service;
import com.example.ws.dto.Notification;
import com.example.ws.dto.NotificationType;
import com.example.ws.entity.NotificationEntity;
import com.example.ws.repository.NotificationRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {
private final SimpMessagingTemplate messagingTemplate;
private final NotificationRepository notificationRepository;
private final PresenceService presenceService;
/**
 * Sends a notification to a user.
 *
 * Strategy:
 * 1. Always persist to DB (for users who are offline)
 * 2. If user is online, also push via WebSocket for instant delivery
 *
 * The client, on connect, fetches unread notifications from REST API.
 * WebSocket is used only for real-time push when user is online.
 *
 * The real-time push is best-effort: a failure in the presence lookup or the
 * push is logged and swallowed so that it cannot propagate out of this
 * transactional method and undo the save.
 *
 * @param targetUserId id of the user to notify
 * @param type         notification type; its {@code name()} is stored
 * @param message      notification text
 * @param referenceId  id of a related object, may be {@code null}
 */
@Transactional
public void sendNotification(String targetUserId, NotificationType type, String message,
        String referenceId) {
    // Always save to DB - offline users will fetch on next login
    NotificationEntity entity = NotificationEntity.builder()
            .notificationId(UUID.randomUUID().toString())
            .userId(targetUserId)
            .type(type.name())
            .message(message)
            .referenceId(referenceId)
            .isRead(false)
            .createdAt(Instant.now())
            .build();
    notificationRepository.save(entity);

    try {
        // If user is online, push immediately
        if (presenceService.isOnline(targetUserId)) {
            Notification notification = buildNotification(entity);
            messagingTemplate.convertAndSendToUser(
                    targetUserId,
                    "/queue/notifications",
                    notification
            );
            log.debug("Pushed notification to online user {}", targetUserId);
        } else {
            log.debug("User {} is offline, notification saved to DB", targetUserId);
        }
    } catch (RuntimeException e) {
        // The notification is already stored; the client picks it up on its next fetch.
        log.warn("Failed to push notification to user {}: {}", targetUserId, e.getMessage(), e);
    }
}
/**
 * Sends the same notification to every user in a list, one
 * {@link #sendNotification} call per user and without a reference id.
 * Example: "New feature available" to all users in a group.
 *
 * @param userIds recipients
 * @param type    notification type
 * @param message notification text
 */
public void broadcastToGroup(List<String> userIds, NotificationType type, String message) {
    userIds.forEach(recipient -> sendNotification(recipient, type, message, null));
}
/**
 * Marks one notification as read for a user and sends a confirmation to the
 * user's {@code /queue/notification.read} destination, so that the user's
 * other tabs/devices can update their UI as well.
 *
 * @param userId         id of the user who read the notification
 * @param notificationId id of the notification
 */
@Transactional
public void markAsRead(String userId, String notificationId) {
    notificationRepository.markAsRead(notificationId, userId);

    messagingTemplate.convertAndSendToUser(
            userId,
            "/queue/notification.read",
            new ReadConfirmation(notificationId, Instant.now()));
}
/**
 * Fetches a user's unread notifications and converts them to DTOs.
 * Called via REST API on initial page load.
 *
 * @param userId id of the user
 * @return the unread notifications as DTOs
 */
@Transactional(readOnly = true)
public List<Notification> getUnreadNotifications(String userId) {
    return notificationRepository.findUnreadByUserId(userId)
            .stream()
            .map(entity -> buildNotification(entity))
            .collect(Collectors.toList());
}
/**
 * Maps a stored {@code NotificationEntity} to the {@code Notification} DTO.
 * The entity's user id is not copied into the DTO.
 */
private Notification buildNotification(NotificationEntity entity) {
return Notification.builder()
.notificationId(entity.getNotificationId())
.type(entity.getType())
.message(entity.getMessage())
.referenceId(entity.getReferenceId())
.isRead(entity.isRead())
.createdAt(entity.getCreatedAt())
.build();
}
}
7. Pattern 7 - Rate Limiting Per Connection
Rate Limiter using Token Bucket (per user)
// src/main/java/com/example/ws/ratelimit/WebSocketRateLimiter.java
package com.example.ws.ratelimit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Token Bucket rate limiter for WebSocket messages per user.
*
* Each user gets a bucket of tokens.
* - Sending a message consumes 1 token.
* - Tokens refill at a fixed rate.
* - If bucket is empty, message is rejected.
*
* This prevents a single user from flooding the server.
*/
@Component
@Slf4j
public class WebSocketRateLimiter {
private final ConcurrentHashMap<String, TokenBucket> userBuckets = new ConcurrentHashMap<>();
// 30 messages per 10 seconds per user
private static final int MAX_TOKENS = 30;
private static final long REFILL_INTERVAL_MS = 10000; // 10 seconds
private static final int REFILL_AMOUNT = 30;
/**
 * Decides whether a user may send one more message, consuming a token from
 * the user's bucket if so. A bucket is created on the user's first call.
 *
 * @param userId id of the sending user
 * @return {@code true} if a token was available, {@code false} if the limit is exceeded
 */
public boolean isAllowed(String userId) {
    return userBuckets
            .computeIfAbsent(userId,
                    id -> new TokenBucket(MAX_TOKENS, REFILL_INTERVAL_MS, REFILL_AMOUNT))
            .consume();
}
/**
 * Discards the user's token bucket; intended to be called when the user disconnects.
 * The next {@code isAllowed} call for this user creates a fresh, full bucket.
 *
 * NOTE(review): buckets are keyed by user id, not session id - if a user can hold
 * several sessions, closing one of them resets the limit for the others; confirm
 * against the caller.
 *
 * @param userId id of the user whose bucket is removed
 */
public void cleanup(String userId) {
userBuckets.remove(userId);
}
private static class TokenBucket {
private final AtomicInteger tokens;
private final int maxTokens;
private final long refillIntervalMs;
private final int refillAmount;
private volatile long lastRefillTime;
TokenBucket(int maxTokens, long refillIntervalMs, int refillAmount) {
this.maxTokens = maxTokens;
this.refillIntervalMs = refillIntervalMs;
this.refillAmount = refillAmount;
this.tokens = new AtomicInteger(maxTokens);
this.lastRefillTime = Instant.now().toEpochMilli();
}
synchronized boolean consume() {
refillIfNeeded();
if (tokens.get() > 0) {
tokens.decrementAndGet();
return true;
}
return false;
}
private void refillIfNeeded() {
long now = Instant.now().toEpochMilli();
if (now - lastRefillTime >= refillIntervalMs) {
tokens.set(Math.min(maxTokens, tokens.get() + refillAmount));
lastRefillTime = now;
}
}
}
}
Integrating Rate Limiter in Channel Interceptor
// Add this to WebSocketChannelInterceptor.preSend():
} else if (StompCommand.SEND.equals(command)) {
Principal user = accessor.getUser();
if (user == null) {
throw new SecurityException("Unauthenticated SEND");
}
// Rate limit check
if (!rateLimiter.isAllowed(user.getName())) {
log.warn("Rate limit exceeded for user={}", user.getName());
throw new RuntimeException("Rate limit exceeded. Please slow down.");
}
}
8. Pattern 8 - Message Acknowledgment and Reliability
The Problem
WebSocket does not guarantee message delivery. If a client is briefly disconnected and reconnects, messages sent during that gap are lost unless you implement acknowledgment.
Acknowledgment Pattern
// src/main/java/com/example/ws/reliability/MessageAckService.java
package com.example.ws.reliability;
import com.example.ws.dto.ChatMessageResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.util.Set;
/**
* Implements message acknowledgment and replay for reliability.
*
* On reconnect, the client sends the timestamp of the last message it received.
* Server replays all messages recorded after that timestamp.
*
* This ensures "at-least-once" delivery for WebSocket messages.
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageAckService {
private final SimpMessagingTemplate messagingTemplate;
private final StringRedisTemplate redisTemplate;
// Redis sorted set: key="unacked:{roomId}" value=messageId score=timestamp
private static final String UNACKED_KEY_PREFIX = "unacked:";
private static final Duration UNACKED_TTL = Duration.ofHours(24);
/**
 * Records a message as pending acknowledgment in the room's Redis sorted set,
 * scored by its timestamp, and refreshes the set's expiry.
 *
 * @param roomId    id of the room the message was sent to
 * @param messageId id of the message
 * @param timestamp timestamp used as the sorted-set score
 */
public void storeForAck(String roomId, String messageId, long timestamp) {
    final String pendingKey = UNACKED_KEY_PREFIX + roomId;
    redisTemplate.opsForZSet().add(pendingKey, messageId, timestamp);
    redisTemplate.expire(pendingKey, UNACKED_TTL);
}
/**
 * Handles a client's acknowledgment by removing the message from the room's
 * pending set.
 *
 * @param roomId    id of the room
 * @param messageId id of the acknowledged message
 */
public void acknowledge(String roomId, String messageId) {
    redisTemplate.opsForZSet().remove(UNACKED_KEY_PREFIX + roomId, messageId);
}
/**
 * Replay buffer: on reconnect, re-sends to one user every pending message of a
 * room whose score (timestamp) is greater than {@code sinceTimestamp}. Each is
 * delivered as a {@code ReplayMessage} on the user's {@code /queue/replay} destination.
 *
 * @param userId         id of the reconnecting user
 * @param roomId         id of the room to replay
 * @param sinceTimestamp timestamp of the last message the client received
 */
public void replayMissedMessages(String userId, String roomId, long sinceTimestamp) {
    // Lower bound is sinceTimestamp + 1, i.e. strictly after the client's last message.
    Set<String> pendingIds = redisTemplate.opsForZSet()
            .rangeByScore(UNACKED_KEY_PREFIX + roomId, sinceTimestamp + 1, Double.MAX_VALUE);

    boolean nothingMissed = pendingIds == null || pendingIds.isEmpty();
    if (nothingMissed) {
        log.debug("No missed messages for user={} in room={}", userId, roomId);
        return;
    }

    log.info("Replaying {} missed messages to user={}", pendingIds.size(), userId);
    // Only the message id and room id are sent; no message body is loaded here.
    pendingIds.forEach(missedId ->
            messagingTemplate.convertAndSendToUser(
                    userId,
                    "/queue/replay",
                    new ReplayMessage(missedId, roomId)));
}
}
9. Reconnection Strategy with Exponential Backoff
JavaScript Client Reconnection (for reference)
// Frontend reconnection logic - placed here for documentation
// Demonstrates what a well-implemented JS client looks like
class WebSocketClient {
constructor(url, token) {
this.url = url;
this.token = token;
this.stompClient = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.baseDelay = 1000; // 1 second
this.maxDelay = 60000; // 60 seconds max
this.subscriptions = new Map();
}
connect() {
const socket = new SockJS(this.url + '?token=' + this.token);
this.stompClient = Stomp.over(socket);
// Disable STOMP debug logs in production
this.stompClient.debug = null;
this.stompClient.connect(
{},
(frame) => this.onConnected(frame),
(error) => this.onError(error)
);
}
onConnected(frame) {
console.log('Connected to WebSocket');
this.reconnectAttempts = 0; // Reset on successful connect
// Re-subscribe to all previous destinations
this.subscriptions.forEach((callback, destination) => {
this.stompClient.subscribe(destination, callback);
});
// Request replay of missed messages since last disconnect
const lastTimestamp = localStorage.getItem('lastMessageTimestamp');
if (lastTimestamp) {
this.stompClient.send('/app/replay', {}, JSON.stringify({ since: lastTimestamp }));
}
}
/**
 * Failure callback for connect(): schedules another connect() after a delay that
 * doubles with every attempt, or gives up once the attempt limit is reached.
 */
onError(error) {
    const attemptsSoFar = this.reconnectAttempts;
    if (attemptsSoFar >= this.maxReconnectAttempts) {
        console.error('Max reconnect attempts reached');
        return;
    }

    // Up to one second of random jitter keeps many clients from retrying in lockstep.
    const jitter = Math.random() * 1000;
    const uncapped = this.baseDelay * Math.pow(2, attemptsSoFar) + jitter;
    // The cap is applied after the jitter, so the delay never exceeds maxDelay.
    const delay = Math.min(uncapped, this.maxDelay);

    this.reconnectAttempts = attemptsSoFar + 1;
    console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
    setTimeout(() => this.connect(), delay);
}
subscribe(destination, callback) {
this.subscriptions.set(destination, callback);
if (this.stompClient && this.stompClient.connected) {
this.stompClient.subscribe(destination, callback);
}
}
send(destination, body) {
if (this.stompClient && this.stompClient.connected) {
this.stompClient.send(destination, {}, JSON.stringify(body));
} else {
console.warn('Cannot send: WebSocket not connected');
}
}
}
10. Database Entities and Repositories
Core JPA Entities
// src/main/java/com/example/ws/entity/ChatMessage.java
package com.example.ws.entity;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
@Entity
@Table(
name = "chat_messages",
indexes = {
@Index(name = "idx_room_created", columnList = "room_id, created_at DESC"),
@Index(name = "idx_sender", columnList = "sender_id")
}
)
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
@Id
@Column(name = "message_id", length = 36)
private String messageId;
@Column(name = "room_id", nullable = false, length = 36)
private String roomId;
@Column(name = "sender_id", nullable = false, length = 36)
private String senderId;
@Column(name = "content", nullable = false, columnDefinition = "TEXT")
private String content;
@Column(name = "message_type", length = 20)
private String messageType;
@Column(name = "status", length = 20)
private String status;
@Column(name = "created_at", nullable = false)
private Instant createdAt;
@Column(name = "updated_at")
private Instant updatedAt;
}// src/main/java/com/example/ws/repository/ChatMessageRepository.java
package com.example.ws.repository;
import com.example.ws.entity.ChatMessage;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface ChatMessageRepository extends JpaRepository<ChatMessage, String> {
/**
 * Selects the messages of one room, newest {@code createdAt} first.
 *
 * <p>The JPQL itself carries no row limit; how many messages come back is decided by
 * the supplied {@code Pageable}. Callers that want to display oldest-first are expected
 * to reverse the returned list.
 *
 * @param roomId   id of the room whose messages are selected
 * @param pageable paging request that bounds the result
 * @return the selected messages ordered by {@code createdAt} descending
 */
@Query("""
SELECT m FROM ChatMessage m
WHERE m.roomId = :roomId
ORDER BY m.createdAt DESC
""")
List<ChatMessage> findTopByRoomIdOrderByCreatedAtDesc(
@Param("roomId") String roomId,
Pageable pageable
);
/**
 * Convenience overload: wraps {@code limit} in a {@code Pageable} of that size and
 * delegates to the paged query.
 *
 * @param roomId id of the room whose messages are selected
 * @param limit  page size passed to {@code Pageable.ofSize}
 * @return the selected messages ordered by {@code createdAt} descending
 */
default List<ChatMessage> findTopByRoomIdOrderByCreatedAtDesc(String roomId, int limit) {
    final Pageable sizedPage = Pageable.ofSize(limit);
    return findTopByRoomIdOrderByCreatedAtDesc(roomId, sizedPage);
}
/**
 * Sets {@code status = 'READ'} on every message of a room that was not sent by the
 * reader and was created at or before the message identified by
 * {@code lastReadMessageId}.
 *
 * <p>Written as a native statement because MySQL rejects an UPDATE whose subquery
 * selects from the table being updated (error 1093). The cut-off timestamp is therefore
 * read through a derived table. The {@code LIMIT 1} stops the optimizer from merging
 * that derived table back into the subquery; {@code message_id} is the primary key, so
 * the limit never discards a row.
 *
 * <p>When no message has the given id the subquery yields no row, the comparison is
 * not satisfied, and nothing is updated. The status is a single column on the message,
 * so it is not tracked per reader.
 *
 * @param roomId            room whose messages are updated
 * @param readerId          user doing the reading; that user's own messages are skipped
 * @param lastReadMessageId id of the newest message the reader has seen
 */
@Modifying
@Query(
    value = """
        UPDATE chat_messages
        SET status = 'READ'
        WHERE room_id = :roomId
          AND sender_id <> :readerId
          AND created_at <= (
              SELECT cutoff.created_at
              FROM (
                  SELECT created_at
                  FROM chat_messages
                  WHERE message_id = :lastReadMessageId
                  LIMIT 1
              ) AS cutoff
          )
        """,
    nativeQuery = true
)
void markAsReadUpTo(
    @Param("roomId") String roomId,
    @Param("readerId") String readerId,
    @Param("lastReadMessageId") String lastReadMessageId
);
}
MySQL Schema
-- V1__create_websocket_tables.sql (Flyway migration)
-- chat_rooms: one row per chat room.
CREATE TABLE chat_rooms (
-- String key of at most 36 characters (the service layer fills it with a UUID string).
room_id VARCHAR(36) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
-- Id of the creating user; indexed below but not declared as a foreign key.
created_by VARCHAR(36) NOT NULL,
-- Privacy flag stored as 0/1; a room is non-private (0) unless stated otherwise.
is_private TINYINT(1) NOT NULL DEFAULT 0,
-- DATETIME(6) keeps six fractional-second digits.
created_at DATETIME(6) NOT NULL,
-- Nullable: no value is required at insert time.
updated_at DATETIME(6),
-- Secondary indexes for lookups by creator and by creation time.
INDEX idx_created_by (created_by),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- chat_room_members: one row per (room, user) membership.
CREATE TABLE chat_room_members (
-- Surrogate auto-increment key; the natural key is enforced by uk_room_user below.
id BIGINT AUTO_INCREMENT PRIMARY KEY,
room_id VARCHAR(36) NOT NULL,
user_id VARCHAR(36) NOT NULL,
-- Role within the room; rows inserted without a role get 'MEMBER'.
role VARCHAR(20) NOT NULL DEFAULT 'MEMBER',
joined_at DATETIME(6) NOT NULL,
-- A user can appear at most once per room.
UNIQUE KEY uk_room_user (room_id, user_id),
-- Supports lookups by user alone, which the (room_id, user_id) key cannot serve as a prefix.
INDEX idx_user_id (user_id),
-- Deleting a room deletes its membership rows.
FOREIGN KEY (room_id) REFERENCES chat_rooms(room_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- chat_messages: one row per message; mapped by the ChatMessage entity.
CREATE TABLE chat_messages (
message_id VARCHAR(36) NOT NULL PRIMARY KEY,
room_id VARCHAR(36) NOT NULL,
-- Sender's user id; indexed below but not declared as a foreign key.
sender_id VARCHAR(36) NOT NULL,
content TEXT NOT NULL,
-- Both columns are NOT NULL; the defaults apply only when an INSERT omits the column.
message_type VARCHAR(20) NOT NULL DEFAULT 'TEXT',
status VARCHAR(20) NOT NULL DEFAULT 'SENT',
created_at DATETIME(6) NOT NULL,
updated_at DATETIME(6),
-- Same column order as the repository's per-room, newest-first query.
INDEX idx_room_created (room_id, created_at DESC),
INDEX idx_sender (sender_id),
-- Deleting a room deletes its messages.
FOREIGN KEY (room_id) REFERENCES chat_rooms(room_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
-- notifications: one row per notification addressed to a user.
CREATE TABLE notifications (
notification_id VARCHAR(36) NOT NULL PRIMARY KEY,
-- Recipient's user id; no foreign key is declared on this table.
user_id VARCHAR(36) NOT NULL,
type VARCHAR(50) NOT NULL,
message TEXT NOT NULL,
-- Optional id of a related record; nullable.
reference_id VARCHAR(36),
-- Read flag stored as 0/1; new notifications start unread (0).
is_read TINYINT(1) NOT NULL DEFAULT 0,
created_at DATETIME(6) NOT NULL,
-- Composite index ordered for "this user's notifications by read state, newest first".
INDEX idx_user_unread (user_id, is_read, created_at DESC)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;