← Back to Articles
6/6/2026 · Admin Post

websockets part3 advanced patterns

WebSockets Demystified - Part 3: Advanced Patterns

Series: Index | Part 1 | Part 2 | Part 3 | Part 4 | Part 5 | Part 6


Table of Contents

  1. Pattern 1 - Real-Time Chat System
  2. Pattern 2 - Live Dashboard with Stock Prices
  3. Pattern 3 - User Presence System
  4. Pattern 4 - Real-Time Order Tracking
  5. Pattern 5 - Collaborative Editing (Operational Transform)
  6. Pattern 6 - Notification System
  7. Pattern 7 - Rate Limiting Per Connection
  8. Pattern 8 - Message Acknowledgment and Reliability
  9. Reconnection Strategy with Exponential Backoff
  10. Database Entities and Repositories

1. Pattern 1 - Real-Time Chat System

Architecture

Client A ----SEND msg----> /app/chat.send/{roomId}
                                    |
                           ChatController
                                    |
                           ChatService.processAndBroadcast()
                            |              |
                     Save to MySQL    Publish to /topic/room.{roomId}
                                           |
                               (all subscribers receive)
                                   |            |
                               Client A      Client B

Room Service

// src/main/java/com/example/ws/service/ChatRoomService.java
 
package com.example.ws.service;
 
import com.example.ws.entity.ChatRoom;
import com.example.ws.entity.ChatRoomMember;
import com.example.ws.exception.RoomNotFoundException;
import com.example.ws.repository.ChatRoomMemberRepository;
import com.example.ws.repository.ChatRoomRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.List;
import java.util.UUID;
 
@Service
@RequiredArgsConstructor
public class ChatRoomService {

    private final ChatRoomRepository roomRepository;
    private final ChatRoomMemberRepository memberRepository;
    private final SimpMessagingTemplate messagingTemplate;

    /**
     * Persists a new room under a freshly generated UUID and enrolls the
     * creator in it with the "ADMIN" role.
     *
     * @param name      name stored on the room
     * @param creatorId id of the creating user; also becomes the first member
     * @param isPrivate privacy flag stored on the room
     * @return the room instance returned by the repository's save call
     */
    @Transactional
    public ChatRoom createRoom(String name, String creatorId, boolean isPrivate) {
        ChatRoom persisted = roomRepository.save(
            ChatRoom.builder()
                .roomId(UUID.randomUUID().toString())
                .name(name)
                .createdBy(creatorId)
                .isPrivate(isPrivate)
                .createdAt(Instant.now())
                .build());

        // The creator joins through the regular path, so the join event is broadcast too.
        addMember(persisted.getRoomId(), creatorId, "ADMIN");

        return persisted;
    }

    /**
     * Stores a membership row for the user and publishes a join event on the
     * room's events topic.
     *
     * @param roomId id of the room to join
     * @param userId id of the joining user
     * @param role   role string stored on the membership
     * @throws RoomNotFoundException if no room exists for {@code roomId}
     */
    @Transactional
    public void addMember(String roomId, String userId, String role) {
        // Existence check only - the loaded room itself is not needed afterwards.
        roomRepository.findById(roomId)
            .orElseThrow(() -> new RoomNotFoundException(roomId));

        memberRepository.save(
            ChatRoomMember.builder()
                .roomId(roomId)
                .userId(userId)
                .role(role)
                .joinedAt(Instant.now())
                .build());

        messagingTemplate.convertAndSend(
            eventsTopic(roomId),
            new UserJoinedEvent(roomId, userId, Instant.now()));
    }

    /**
     * Deletes the user's membership in the room and publishes a leave event
     * on the room's events topic.
     *
     * @param roomId id of the room being left
     * @param userId id of the leaving user
     */
    @Transactional
    public void removeMember(String roomId, String userId) {
        memberRepository.deleteByRoomIdAndUserId(roomId, userId);

        messagingTemplate.convertAndSend(
            eventsTopic(roomId),
            new UserLeftEvent(roomId, userId, Instant.now()));
    }

    /**
     * Looks up the rooms for which the repository reports the user as a member.
     *
     * @param userId id of the user
     * @return the repository's result list
     */
    @Transactional(readOnly = true)
    public List<ChatRoom> getUserRooms(String userId) {
        return roomRepository.findByMemberId(userId);
    }

    /** Builds the destination on which join/leave events for a room are published. */
    private static String eventsTopic(String roomId) {
        return "/topic/room." + roomId + ".events";
    }
}

Read Receipts

// src/main/java/com/example/ws/controller/ReadReceiptController.java
 
package com.example.ws.controller;
 
import com.example.ws.dto.ReadReceiptRequest;
import com.example.ws.service.ReadReceiptService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
 
import java.security.Principal;
 
@Controller
@RequiredArgsConstructor
public class ReadReceiptController {

    private final ReadReceiptService readReceiptService;

    /**
     * Handles read receipts sent to /app/chat.read/{roomId}.
     *
     * The reader is the authenticated principal. Note that the room id and the
     * last-read message id are both taken from the payload; the {roomId}
     * segment of the destination is not bound here.
     *
     * @param request   payload carrying the room id and last-read message id
     * @param principal authenticated user sending the receipt
     */
    @MessageMapping("/chat.read/{roomId}")
    public void markAsRead(
            @Payload ReadReceiptRequest request,
            Principal principal) {

        final String readerId = principal.getName();
        final String roomId = request.getRoomId();
        final String lastReadMessageId = request.getLastReadMessageId();

        // The service persists the read state and publishes the receipt event.
        readReceiptService.markMessagesRead(readerId, roomId, lastReadMessageId);
    }
}
// src/main/java/com/example/ws/service/ReadReceiptService.java
 
package com.example.ws.service;
 
import com.example.ws.dto.ReadReceiptEvent;
import com.example.ws.repository.ChatMessageRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
@Service
@RequiredArgsConstructor
public class ReadReceiptService {

    private final ChatMessageRepository messageRepository;
    private final SimpMessagingTemplate messagingTemplate;

    /**
     * Records that a reader has read a room's messages up to the given message,
     * then publishes a receipt event on the room's receipts topic.
     *
     * @param readerId          id of the user who read the messages
     * @param roomId            id of the room the messages belong to
     * @param lastReadMessageId id of the last message the reader has seen
     */
    @Transactional
    public void markMessagesRead(String readerId, String roomId, String lastReadMessageId) {
        // Persist first: every message up to lastReadMessageId is flagged as read.
        messageRepository.markAsReadUpTo(roomId, readerId, lastReadMessageId);

        // Then tell the room's subscribers how far this reader has got.
        String receiptsTopic = "/topic/room." + roomId + ".receipts";
        messagingTemplate.convertAndSend(
            receiptsTopic,
            new ReadReceiptEvent(readerId, roomId, lastReadMessageId));
    }
}

2. Pattern 2 - Live Dashboard with Stock Prices

Architecture

MySQL/External API --> StockPriceScheduler --> /topic/prices --> All subscribed clients
                   --> PortfolioScheduler  --> /user/queue/portfolio --> Each specific user

Price Update DTO

// src/main/java/com/example/ws/dto/PriceUpdate.java
 
package com.example.ws.dto;
 
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.math.BigDecimal;
import java.time.Instant;
 
/**
 * Payload describing one price update for one symbol.
 *
 * Lombok generates the accessors, builder and both constructors. Note that
 * ThrottledPriceBroadcaster.onPriceTick only populates symbol, price and
 * timestamp; the remaining fields keep their defaults (null / 0) unless some
 * other producer sets them.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PriceUpdate {

    // Symbol the update belongs to; the broadcaster keys its latest-price map by this value
    private String symbol;
    // Price carried by this update
    private BigDecimal price;
    // Presumably the absolute price change versus a reference price - TODO confirm with the producer
    private BigDecimal change;
    // Presumably the relative change in percent - TODO confirm with the producer
    private BigDecimal changePercent;
    // Presumably traded volume; defaults to 0 when not set - TODO confirm with the producer
    private long volume;
    // Moment the update was created (Instant.now() in onPriceTick)
    private Instant timestamp;
}

Throttled Price Broadcaster

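A critical pattern: do not push every single price tick to clients. Instead, throttle updates to a rate a human can actually follow; the broadcaster below keeps only the latest price per symbol and sends one batch roughly every 500 ms.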

// src/main/java/com/example/ws/scheduler/ThrottledPriceBroadcaster.java
 
package com.example.ws.scheduler;
 
import com.example.ws.dto.PriceUpdate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.math.BigDecimal;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
@Component
@RequiredArgsConstructor
@Slf4j
public class ThrottledPriceBroadcaster {
 
    private final SimpMessagingTemplate messagingTemplate;
 
    // Accumulate latest prices here - overwrites on each tick
    // Only the latest price per symbol is kept
    private final ConcurrentHashMap<String, PriceUpdate> latestPrices = new ConcurrentHashMap<>();
 
    /**
     * Receives price updates from exchange - could be 100s per second per symbol.
     * Stores only the latest, does NOT push immediately.
     */
    public void onPriceTick(String symbol, BigDecimal price) {
        PriceUpdate update = PriceUpdate.builder()
            .symbol(symbol)
            .price(price)
            .timestamp(Instant.now())
            .build();
        latestPrices.put(symbol, update);
    }
 
    /**
     * Pushes latest prices to clients at a controlled rate: once per 500ms.
     * This prevents flooding clients with thousands of updates per second.
     *
     * Pattern: "Last-Value Cache" or "Conflation" - industry standard in
     * financial systems (Bloomberg, Reuters use this pattern).
     */
    @Scheduled(fixedDelay = 500)
    public void broadcastLatestPrices() {
        if (latestPrices.isEmpty()) return;
 
        List<PriceUpdate> snapshot = new ArrayList<>(latestPrices.values());
        latestPrices.clear();
 
        try {
            messagingTemplate.convertAndSend("/topic/prices", snapshot);
        } catch (Exception e) {
            log.error("Failed to broadcast prices: {}", e.getMessage());
        }
    }
}

3. Pattern 3 - User Presence System

Presence Service with Redis

// src/main/java/com/example/ws/service/PresenceService.java
 
package com.example.ws.service;
 
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
 
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
 
/**
 * Tracks user online/offline status using Redis Sets.
 *
 * Redis Data Model:
 *   presence:user:{userId}:sessions --> Set of active sessionIds for this user
 *   presence:online --> Set of all currently online userIds
 *
 * Using Redis (not in-memory) is CRITICAL for clustered deployments:
 * - User may connect to Server A but their contacts may be on Server B.
 * - Redis gives a global view of presence.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class PresenceService {
 
    private final StringRedisTemplate redisTemplate;
 
    private static final String PRESENCE_KEY = "presence:online";
    private static final String SESSION_KEY_PREFIX = "presence:user:";
    private static final Duration SESSION_TTL = Duration.ofHours(24);
 
    /**
     * Marks a user online with a specific session.
     * A user can have multiple sessions (multiple tabs/devices).
     */
    public void markOnline(String userId, String sessionId) {
        String sessionKey = SESSION_KEY_PREFIX + userId + ":sessions";
 
        redisTemplate.opsForSet().add(sessionKey, sessionId);
        redisTemplate.expire(sessionKey, SESSION_TTL);
        redisTemplate.opsForSet().add(PRESENCE_KEY, userId);
 
        log.debug("User {} is now online (session={})", userId, sessionId);
    }
 
    /**
     * Removes a specific session. Only marks user offline if no sessions remain.
     */
    public void markOffline(String userId, String sessionId) {
        String sessionKey = SESSION_KEY_PREFIX + userId + ":sessions";
 
        redisTemplate.opsForSet().remove(sessionKey, sessionId);
        Long remaining = redisTemplate.opsForSet().size(sessionKey);
 
        if (remaining <mark class="obsidian-highlight"> null || remaining </mark> 0) {
            redisTemplate.opsForSet().remove(PRESENCE_KEY, userId);
            redisTemplate.delete(sessionKey);
            log.debug("User {} is now offline", userId);
        } else {
            log.debug("User {} still has {} active sessions", userId, remaining);
        }
    }
 
    /**
     * Checks if a user is currently online.
     */
    public boolean isOnline(String userId) {
        Boolean member = redisTemplate.opsForSet().isMember(PRESENCE_KEY, userId);
        return Boolean.TRUE.equals(member);
    }
 
    /**
     * Gets all currently online users from a given list.
     * Useful for checking presence of a user's contacts.
     */
    public Set<String> getOnlineUsers(List<String> userIds) {
        return userIds.stream()
            .filter(this::isOnline)
            .collect(Collectors.toSet());
    }
 
    /**
     * Returns the count of active sessions for a user.
     * Useful for showing "3 devices" in UI.
     */
    public long getActiveSessionCount(String userId) {
        String sessionKey = SESSION_KEY_PREFIX + userId + ":sessions";
        Long count = redisTemplate.opsForSet().size(sessionKey);
        return count != null ? count : 0;
    }
}

4. Pattern 4 - Real-Time Order Tracking

Order Status Controller

// src/main/java/com/example/ws/controller/OrderTrackingController.java
 
package com.example.ws.controller;
 
import com.example.ws.service.OrderTrackingService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.stereotype.Controller;
 
import java.security.Principal;
 
@Controller
@RequiredArgsConstructor
public class OrderTrackingController {

    private final OrderTrackingService orderTrackingService;

    /**
     * Answers a subscription to /app/order.track/{orderId} with the order's
     * current status, so the client has data right away.
     *
     * Client code:
     *   stompClient.subscribe('/app/order.track/ORD-123', callback)
     *
     * Later status changes are not delivered through this mapping; they are
     * pushed to /user/queue/order.status.
     *
     * @param orderId   order id taken from the subscription destination
     * @param principal authenticated subscriber
     * @return the order's current status
     */
    @SubscribeMapping("/order.track/{orderId}")
    public OrderStatus onSubscribeToOrder(
            @DestinationVariable String orderId,
            Principal principal) {

        final String subscriberId = principal.getName();

        // Ownership is checked first; the service throws if the order is not the subscriber's.
        orderTrackingService.validateOrderOwnership(subscriberId, orderId);

        // Only then is the subscriber recorded as a tracker of this order.
        orderTrackingService.startTracking(subscriberId, orderId);

        OrderStatus currentStatus = orderTrackingService.getCurrentStatus(orderId);
        return currentStatus;
    }
}

Order Status Pusher (Event-Driven)

// src/main/java/com/example/ws/service/OrderTrackingService.java
 
package com.example.ws.service;
 
import com.example.ws.dto.OrderStatus;
import com.example.ws.entity.Order;
import com.example.ws.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
 
import java.time.Duration;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderTrackingService {

    private final OrderRepository orderRepository;
    private final SimpMessagingTemplate messagingTemplate;
    private final StringRedisTemplate redisTemplate;

    // Redis key: "order:tracking:{orderId}" --> Set of userIds tracking this order
    private static final String TRACKING_KEY_PREFIX = "order:tracking:";

    /**
     * Adds the user to the order's tracker set and (re)sets a 24 hour expiry on that set.
     *
     * @param userId  id of the tracking user
     * @param orderId id of the tracked order
     */
    public void startTracking(String userId, String orderId) {
        String trackingKey = TRACKING_KEY_PREFIX + orderId;
        redisTemplate.opsForSet().add(trackingKey, userId);
        redisTemplate.expire(trackingKey, Duration.ofHours(24));
    }

    /**
     * Removes the user from the order's tracker set.
     *
     * @param userId  id of the user that stops tracking
     * @param orderId id of the order
     */
    public void stopTracking(String userId, String orderId) {
        String trackingKey = TRACKING_KEY_PREFIX + orderId;
        redisTemplate.opsForSet().remove(trackingKey, userId);
    }

    /**
     * Loads the order and converts it to its status DTO.
     *
     * @param orderId id of the order
     * @return status built from the stored order
     * @throws RuntimeException if no order exists for {@code orderId}
     */
    @Transactional(readOnly = true)
    public OrderStatus getCurrentStatus(String orderId) {
        return orderRepository.findById(orderId)
            .map(this::buildOrderStatus)
            .orElseThrow(() -> new RuntimeException("Order not found: " + orderId));
    }

    /**
     * Reacts to an order status change (from payment service, logistics, etc.).
     *
     * Registered for the AFTER_COMMIT phase, so the status is read and pushed
     * only once the transaction that published the event has committed; clients
     * therefore never see e.g. "SHIPPED" before it is stored.
     *
     * @param event event carrying the id of the changed order
     */
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderStatusChanged(OrderStatusChangedEvent event) {
        String orderId = event.getOrderId();

        java.util.Set<String> trackers =
            redisTemplate.opsForSet().members(TRACKING_KEY_PREFIX + orderId);

        boolean someoneIsTracking = trackers != null && !trackers.isEmpty();
        if (!someoneIsTracking) {
            return; // Nothing to push - skip the status lookup as well
        }

        OrderStatus status = getCurrentStatus(orderId);
        for (String trackerId : trackers) {
            pushStatus(trackerId, orderId, status);
        }
    }

    /**
     * Checks that the order belongs to the user.
     *
     * @param userId  id of the user claiming the order
     * @param orderId id of the order
     * @throws SecurityException if the repository finds no such order for the user
     */
    public void validateOrderOwnership(String userId, String orderId) {
        boolean ownsOrder = orderRepository.existsByOrderIdAndUserId(orderId, userId);
        if (ownsOrder) {
            return;
        }
        throw new SecurityException("User " + userId + " does not own order " + orderId);
    }

    /**
     * Sends the status to one user's order-status queue. Failures are logged
     * and swallowed so that the remaining trackers are still notified.
     */
    private void pushStatus(String userId, String orderId, OrderStatus status) {
        try {
            messagingTemplate.convertAndSendToUser(userId, "/queue/order.status", status);
            log.debug("Pushed order {} status {} to user {}", orderId, status.getStatus(), userId);
        } catch (Exception e) {
            log.warn("Failed to push order status to user {}: {}", userId, e.getMessage());
        }
    }

    /** Copies the client-visible fields of an order into a status DTO. */
    private OrderStatus buildOrderStatus(Order order) {
        return OrderStatus.builder()
            .orderId(order.getOrderId())
            .status(order.getStatus())
            .estimatedDelivery(order.getEstimatedDelivery())
            .currentLocation(order.getCurrentLocation())
            .lastUpdated(order.getUpdatedAt())
            .build();
    }
}

5. Pattern 5 - Collaborative Editing (Operational Transform)

The Challenge

When two users edit the same document simultaneously, their changes can conflict. The Operational Transform (OT) algorithm resolves conflicts by transforming operations against each other.

Simple OT Implementation

// src/main/java/com/example/ws/controller/DocumentController.java
 
package com.example.ws.controller;
 
import com.example.ws.dto.DocumentOperation;
import com.example.ws.service.DocumentService;
import lombok.RequiredArgsConstructor;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
 
import java.security.Principal;
 
@Controller
@RequiredArgsConstructor
public class DocumentController {

    private final DocumentService documentService;

    /**
     * Receives one edit operation (insert/delete at a position) for a document
     * and hands it to the service, which transforms it against concurrent
     * operations and broadcasts the result.
     *
     * Destination: /app/doc.operation/{docId}
     * Broadcast to: /topic/doc.{docId}
     *
     * @param docId     document id taken from the destination
     * @param operation the client's edit operation
     * @param principal authenticated user submitting the operation
     */
    @MessageMapping("/doc.operation/{docId}")
    public void applyOperation(
            @DestinationVariable String docId,
            @Payload DocumentOperation operation,
            Principal principal) {

        // The author is always the authenticated principal.
        final String authorId = principal.getName();

        documentService.applyAndBroadcast(docId, authorId, operation);
    }
}
// src/main/java/com/example/ws/dto/DocumentOperation.java
 
package com.example.ws.dto;
 
import lombok.Data;
 
/**
 * One edit operation on a shared document, as sent by a client and as
 * re-broadcast by the server after transformation.
 * Lombok's @Data generates the accessors used by DocumentService.
 */
@Data
public class DocumentOperation {

    // Operation type: INSERT or DELETE
    // (DocumentService compares against exactly these two strings; other values are not applied)
    private String type;

    // Position in document where operation applies
    private int position;

    // For INSERT: the text to insert
    private String text;

    // For DELETE: number of characters to delete
    private int length;

    // Client's version number when this operation was created
    // Used for OT conflict resolution: the server transforms the operation against
    // every history entry from this index onwards
    private int revision;

    // Assigned by server after transformation
    // (set to the size of the document's operation history before this operation is appended)
    private int serverRevision;

    // Set by the server to a random UUID when the operation is processed
    private String operationId;
    // Set by the server to the id of the user who submitted the operation
    private String userId;
}
// src/main/java/com/example/ws/service/DocumentService.java
 
package com.example.ws.service;
 
import com.example.ws.dto.DocumentOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
 
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentService {

    private static final String INSERT = "INSERT";
    private static final String DELETE = "DELETE";

    private final SimpMessagingTemplate messagingTemplate;

    // Document content per docId
    private final ConcurrentHashMap<String, StringBuilder> documents = new ConcurrentHashMap<>();

    // Operation history per docId (for OT transformation)
    private final ConcurrentHashMap<String, List<DocumentOperation>> operationHistory =
        new ConcurrentHashMap<>();

    // Per-document lock to serialize concurrent operations
    private final ConcurrentHashMap<String, ReentrantLock> documentLocks =
        new ConcurrentHashMap<>();

    /**
     * Transforms a client operation against the server operations it has not
     * seen yet, applies it to the document, appends it to the history and
     * broadcasts the result to /topic/doc.{docId}.
     *
     * All steps run under the document's lock, so operations on one document
     * are processed one at a time. The incoming operation is not modified;
     * a transformed copy is stored and broadcast. That copy carries the values
     * that were actually applied (position/length clamped to the document).
     *
     * @param docId     id of the document being edited
     * @param userId    id of the user who submitted the operation
     * @param operation operation as sent by the client
     */
    public void applyAndBroadcast(String docId, String userId, DocumentOperation operation) {

        ReentrantLock lock = documentLocks.computeIfAbsent(docId, k -> new ReentrantLock());
        lock.lock();

        try {
            List<DocumentOperation> history = operationHistory.computeIfAbsent(
                docId, k -> new ArrayList<>()
            );

            // Transform the incoming operation against all server operations
            // that happened since the client's revision
            DocumentOperation transformed = transformOperation(operation, history);
            transformed.setOperationId(UUID.randomUUID().toString());
            transformed.setUserId(userId);
            transformed.setServerRevision(history.size());

            // Apply to the document
            applyToDocument(docId, transformed);
            history.add(transformed);

            // Broadcast transformed operation to all collaborators
            messagingTemplate.convertAndSend("/topic/doc." + docId, transformed);

        } finally {
            lock.unlock();
        }
    }

    /**
     * Transform operation against concurrent operations using OT.
     * This is a simplified implementation for INSERT/DELETE operations.
     * Production systems use libraries like ShareDB or ot.js.
     *
     * The client-supplied revision is clamped to [0, history size]: a negative
     * value would otherwise index the history out of bounds, and a value past
     * the end simply means there is nothing to transform against.
     */
    private DocumentOperation transformOperation(
            DocumentOperation op,
            List<DocumentOperation> concurrentOps) {

        DocumentOperation result = cloneOperation(op);
        int firstUnseen = Math.max(0, Math.min(op.getRevision(), concurrentOps.size()));

        // Apply transformation for each server operation that happened
        // after the client's revision
        for (int i = firstUnseen; i < concurrentOps.size(); i++) {
            DocumentOperation serverOp = concurrentOps.get(i);
            result = transform(result, serverOp);
        }

        return result;
    }

    /**
     * Transform operation A against operation B.
     * Returns a new operation A' that achieves the same intent
     * when applied after B.
     */
    private DocumentOperation transform(DocumentOperation a, DocumentOperation b) {
        DocumentOperation result = cloneOperation(a);

        if (INSERT.equals(b.getType())) {
            if (a.getPosition() >= b.getPosition()) {
                // B inserted before A's position - shift A's position right
                result.setPosition(a.getPosition() + insertedLength(b));
            }
        } else if (DELETE.equals(b.getType())) {
            if (a.getPosition() > b.getPosition()) {
                // B deleted before A's position - shift A's position left
                result.setPosition(Math.max(b.getPosition(),
                    a.getPosition() - b.getLength()));
            }
        }

        return result;
    }

    /**
     * Applies the operation to the document text and writes the effective
     * values back into the operation.
     *
     * Position and length come from the client and may be negative or larger
     * than the document, so both are clamped to the document bounds. The
     * clamped values are stored on the operation so that the history and the
     * broadcast describe exactly the edit that was performed. Operations whose
     * type is neither INSERT nor DELETE leave the document untouched.
     */
    private void applyToDocument(String docId, DocumentOperation op) {
        StringBuilder doc = documents.computeIfAbsent(docId, k -> new StringBuilder());

        if (INSERT.equals(op.getType())) {
            int pos = clampToDocument(op.getPosition(), doc);
            // A null text is treated as empty; StringBuilder.insert would insert "null".
            String text = op.getText() == null ? "" : op.getText();
            doc.insert(pos, text);
            op.setPosition(pos);
            op.setText(text);
        } else if (DELETE.equals(op.getType())) {
            int start = clampToDocument(op.getPosition(), doc);
            // long arithmetic: start + length must not overflow for huge lengths
            long requestedEnd = (long) start + Math.max(0, op.getLength());
            int end = (int) Math.min(requestedEnd, doc.length());
            doc.delete(start, end);
            op.setPosition(start);
            op.setLength(end - start);
        }
    }

    /** Limits a position to the valid range [0, document length]. */
    private static int clampToDocument(int position, StringBuilder doc) {
        return Math.max(0, Math.min(position, doc.length()));
    }

    /** Number of characters an INSERT operation adds; 0 when it carries no text. */
    private static int insertedLength(DocumentOperation insert) {
        return insert.getText() == null ? 0 : insert.getText().length();
    }

    /**
     * Copies the client-visible fields of an operation.
     * serverRevision is not copied; applyAndBroadcast assigns it afterwards.
     */
    private DocumentOperation cloneOperation(DocumentOperation original) {
        DocumentOperation clone = new DocumentOperation();
        clone.setType(original.getType());
        clone.setPosition(original.getPosition());
        clone.setText(original.getText());
        clone.setLength(original.getLength());
        clone.setRevision(original.getRevision());
        clone.setOperationId(original.getOperationId());
        clone.setUserId(original.getUserId());
        return clone;
    }
}

6. Pattern 6 - Notification System

Notification Service

// src/main/java/com/example/ws/service/NotificationService.java
 
package com.example.ws.service;
 
import com.example.ws.dto.Notification;
import com.example.ws.dto.NotificationType;
import com.example.ws.entity.NotificationEntity;
import com.example.ws.repository.NotificationRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
 
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
 
@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {

    private final SimpMessagingTemplate messagingTemplate;
    private final NotificationRepository notificationRepository;
    private final PresenceService presenceService;

    /**
     * Delivers a notification to one user.
     *
     * The notification is always stored in the DB, so a user who is offline
     * can fetch it later through the REST API. When the presence service
     * reports the user as online, it is additionally pushed over WebSocket to
     * the user's notification queue for instant delivery.
     *
     * @param targetUserId id of the receiving user
     * @param type         notification type; its name is stored on the entity
     * @param message      notification text
     * @param referenceId  optional id of the related object (may be null)
     */
    @Transactional
    public void sendNotification(String targetUserId, NotificationType type, String message,
                                  String referenceId) {

        NotificationEntity stored = NotificationEntity.builder()
            .notificationId(UUID.randomUUID().toString())
            .userId(targetUserId)
            .type(type.name())
            .message(message)
            .referenceId(referenceId)
            .isRead(false)
            .createdAt(Instant.now())
            .build();
        notificationRepository.save(stored);

        // Offline users get nothing pushed; they pick the notification up from the DB.
        if (!presenceService.isOnline(targetUserId)) {
            log.debug("User {} is offline, notification saved to DB", targetUserId);
            return;
        }

        Notification payload = buildNotification(stored);
        messagingTemplate.convertAndSendToUser(targetUserId, "/queue/notifications", payload);
        log.debug("Pushed notification to online user {}", targetUserId);
    }

    /**
     * Sends the same notification (without a reference id) to every user in the list.
     * Example: "New feature available" to all users in a group.
     *
     * @param userIds ids of the receiving users
     * @param type    notification type
     * @param message notification text
     */
    public void broadcastToGroup(List<String> userIds, NotificationType type, String message) {
        userIds.forEach(recipientId -> sendNotification(recipientId, type, message, null));
    }

    /**
     * Marks a notification as read and sends a confirmation to the user's
     * read-confirmation queue, so the user's other tabs/devices can update their UI.
     *
     * @param userId         id of the user who read the notification
     * @param notificationId id of the notification
     */
    @Transactional
    public void markAsRead(String userId, String notificationId) {
        notificationRepository.markAsRead(notificationId, userId);

        messagingTemplate.convertAndSendToUser(
            userId,
            "/queue/notification.read",
            new ReadConfirmation(notificationId, Instant.now()));
    }

    /**
     * Returns the user's unread notifications as DTOs.
     * Called via REST API on initial page load.
     *
     * @param userId id of the user
     * @return unread notifications in the order the repository returns them
     */
    @Transactional(readOnly = true)
    public List<Notification> getUnreadNotifications(String userId) {
        List<NotificationEntity> unread = notificationRepository.findUnreadByUserId(userId);
        return unread.stream()
            .map(entity -> buildNotification(entity))
            .collect(Collectors.toList());
    }

    /** Maps a stored notification to the DTO sent to clients. */
    private Notification buildNotification(NotificationEntity entity) {
        return Notification.builder()
            .notificationId(entity.getNotificationId())
            .type(entity.getType())
            .message(entity.getMessage())
            .referenceId(entity.getReferenceId())
            .isRead(entity.isRead())
            .createdAt(entity.getCreatedAt())
            .build();
    }
}

7. Pattern 7 - Rate Limiting Per Connection

Rate Limiter using Token Bucket (per user)

// src/main/java/com/example/ws/ratelimit/WebSocketRateLimiter.java
 
package com.example.ws.ratelimit;
 
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
 
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * Token Bucket rate limiter for WebSocket messages per user.
 *
 * Each user gets a bucket of tokens.
 * - Sending a message consumes 1 token.
 * - Tokens refill at a fixed rate.
 * - If bucket is empty, message is rejected.
 *
 * This prevents a single user from flooding the server.
 *
 * Refill timing is measured with the monotonic System.nanoTime() clock, so
 * adjustments of the system wall clock neither stall nor accelerate refills.
 */
@Component
@Slf4j
public class WebSocketRateLimiter {

    private final ConcurrentHashMap<String, TokenBucket> userBuckets = new ConcurrentHashMap<>();

    // 30 messages per 10 seconds per user
    private static final int MAX_TOKENS = 30;
    private static final long REFILL_INTERVAL_MS = 10000; // 10 seconds
    private static final int REFILL_AMOUNT = 30;

    /**
     * Checks if a user is allowed to send a message.
     * Creates the user's bucket on first use and consumes one token from it.
     *
     * @param userId id of the sending user
     * @return true if allowed, false if rate limit exceeded
     */
    public boolean isAllowed(String userId) {
        TokenBucket bucket = userBuckets.computeIfAbsent(userId,
            k -> new TokenBucket(MAX_TOKENS, REFILL_INTERVAL_MS, REFILL_AMOUNT)
        );
        return bucket.consume();
    }

    /**
     * Cleans up bucket when user disconnects.
     * The next message from the user starts with a fresh, full bucket.
     *
     * @param userId id of the user whose bucket is discarded
     */
    public void cleanup(String userId) {
        userBuckets.remove(userId);
    }

    /**
     * Bucket for a single user. All mutable state is guarded by the bucket's
     * monitor; consume() is the only entry point.
     */
    private static class TokenBucket {

        private static final long NANOS_PER_MILLI = 1_000_000L;

        private final int maxTokens;
        private final long refillIntervalNanos;
        private final int refillAmount;

        // Guarded by "this"
        private int tokens;
        private long lastRefillNanos;

        TokenBucket(int maxTokens, long refillIntervalMs, int refillAmount) {
            this.maxTokens = maxTokens;
            this.refillIntervalNanos = refillIntervalMs * NANOS_PER_MILLI;
            this.refillAmount = refillAmount;
            this.tokens = maxTokens;
            this.lastRefillNanos = System.nanoTime();
        }

        /** Takes one token if available; returns false when the bucket is empty. */
        synchronized boolean consume() {
            refillIfNeeded();
            if (tokens > 0) {
                tokens--;
                return true;
            }
            return false;
        }

        /** Adds refillAmount tokens (capped at maxTokens) once a full interval has elapsed. */
        private void refillIfNeeded() {
            long now = System.nanoTime();
            // Subtract before comparing: nanoTime values are only meaningful as differences.
            if (now - lastRefillNanos >= refillIntervalNanos) {
                tokens = (int) Math.min((long) maxTokens, (long) tokens + refillAmount);
                lastRefillNanos = now;
            }
        }
    }
}

Integrating Rate Limiter in Channel Interceptor

// Add this to WebSocketChannelInterceptor.preSend():
 
} else if (StompCommand.SEND.equals(command)) {
    // SEND frames must come from an authenticated session.
    Principal user = accessor.getUser();
    if (user == null) {
        throw new SecurityException("Unauthenticated SEND");
    }
 
    // Rate limit check, keyed by the principal's name so the limit is per user.
    // Throwing here aborts the send; the refusal is logged first.
    if (!rateLimiter.isAllowed(user.getName())) {
        log.warn("Rate limit exceeded for user={}", user.getName());
        throw new RuntimeException("Rate limit exceeded. Please slow down.");
    }
}

8. Pattern 8 - Message Acknowledgment and Reliability

The Problem

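WebSocket itself gives no end-to-end delivery guarantee: the protocol has no application-level acknowledgment or redelivery. If a client disconnects briefly and then reconnects, any messages published during that gap are lost unless the application tracks acknowledgments and replays what the client missed.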

Acknowledgment Pattern

// src/main/java/com/example/ws/reliability/MessageAckService.java
 
package com.example.ws.reliability;
 
import com.example.ws.dto.ChatMessageResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
 
import java.time.Duration;
import java.util.Set;
 
/**
 * Tracks messages that are awaiting acknowledgment and replays them on reconnect.
 *
 * <p>Pending message ids are kept per room in a Redis sorted set, scored by
 * timestamp. A reconnecting client supplies the timestamp of the last message
 * it saw, and every pending id with a later score is pushed to that user's
 * replay queue. The intent is "at-least-once" delivery for WebSocket messages.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageAckService {

    // Redis sorted set: key="unacked:{roomId}" value=messageId score=timestamp
    private static final String UNACKED_KEY_PREFIX = "unacked:";
    private static final Duration UNACKED_TTL = Duration.ofHours(24);

    private final SimpMessagingTemplate messagingTemplate;
    private final StringRedisTemplate redisTemplate;

    /**
     * Records a message as pending acknowledgment for a room and (re)sets the
     * expiry of the room's pending set to {@code UNACKED_TTL}.
     *
     * @param roomId    room the message belongs to
     * @param messageId id stored as the sorted-set member
     * @param timestamp value stored as the member's score
     */
    public void storeForAck(String roomId, String messageId, long timestamp) {
        final String pendingSetKey = unackedKey(roomId);
        redisTemplate.opsForZSet().add(pendingSetKey, messageId, timestamp);
        redisTemplate.expire(pendingSetKey, UNACKED_TTL);
    }

    /**
     * Handles a client acknowledgment by removing the message id from the
     * room's pending set.
     *
     * @param roomId    room the message belongs to
     * @param messageId id of the acknowledged message
     */
    public void acknowledge(String roomId, String messageId) {
        redisTemplate.opsForZSet().remove(unackedKey(roomId), messageId);
    }

    /**
     * Replay buffer: sends the user every pending message id of the room whose
     * score is strictly greater than {@code sinceTimestamp}.
     *
     * <p>Only the message id and room id are sent, wrapped in a
     * {@code ReplayMessage}, to the user destination {@code /queue/replay}.
     *
     * @param userId         user to replay to
     * @param roomId         room whose pending set is read
     * @param sinceTimestamp timestamp of the last message the client received
     */
    public void replayMissedMessages(String userId, String roomId, long sinceTimestamp) {
        // Lower bound is sinceTimestamp + 1, i.e. scores > sinceTimestamp
        Set<String> pendingIds = redisTemplate.opsForZSet()
            .rangeByScore(unackedKey(roomId), sinceTimestamp + 1, Double.MAX_VALUE);

        boolean nothingToReplay = pendingIds == null || pendingIds.isEmpty();
        if (nothingToReplay) {
            log.debug("No missed messages for user={} in room={}", userId, roomId);
            return;
        }

        log.info("Replaying {} missed messages to user={}", pendingIds.size(), userId);

        pendingIds.forEach(pendingId ->
            messagingTemplate.convertAndSendToUser(
                userId,
                "/queue/replay",
                new ReplayMessage(pendingId, roomId)
            )
        );
    }

    /** Builds the Redis key of the pending set for a room. */
    private static String unackedKey(String roomId) {
        return UNACKED_KEY_PREFIX + roomId;
    }
}

9. Reconnection Strategy with Exponential Backoff

JavaScript Client Reconnection (for reference)

// Frontend reconnection logic - placed here for documentation
// Demonstrates what a well-implemented JS client looks like
 
/**
 * STOMP-over-SockJS client that reconnects with exponential backoff and
 * restores its subscriptions after every successful (re)connect.
 *
 * Relies on the globals `SockJS` and `Stomp` being loaded by the page.
 */
class WebSocketClient {

    /**
     * @param {string} url   SockJS endpoint URL
     * @param {string} token auth token, sent as the `token` query parameter
     */
    constructor(url, token) {
        this.url = url;
        this.token = token;
        this.stompClient = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 10;
        this.baseDelay = 1000;      // 1 second
        this.maxDelay = 60000;      // 60 seconds max
        this.subscriptions = new Map(); // destination -> callback, replayed on reconnect
    }

    /** Opens the SockJS transport and starts the STOMP handshake. */
    connect() {
        // Percent-encode the token so characters such as '+', '/', '=' or '&'
        // cannot corrupt the query string, and append with '&' when the URL
        // already carries a query.
        const separator = this.url.includes('?') ? '&' : '?';
        const socket = new SockJS(
            this.url + separator + 'token=' + encodeURIComponent(this.token)
        );
        this.stompClient = Stomp.over(socket);

        // Disable STOMP debug logs in production
        this.stompClient.debug = null;

        this.stompClient.connect(
            {},
            (frame) => this.onConnected(frame),
            (error) => this.onError(error)
        );
    }

    /** Connect callback: resets the backoff, resubscribes, and requests a replay. */
    onConnected(frame) {
        console.log('Connected to WebSocket');
        this.reconnectAttempts = 0; // Reset on successful connect

        // Re-subscribe to all previous destinations
        this.subscriptions.forEach((callback, destination) => {
            this.stompClient.subscribe(destination, callback);
        });

        // Request replay of missed messages since last disconnect
        const lastTimestamp = localStorage.getItem('lastMessageTimestamp');
        if (lastTimestamp) {
            this.stompClient.send('/app/replay', {}, JSON.stringify({ since: lastTimestamp }));
        }
    }

    /** Error callback: schedules another connect() until the attempt budget is used up. */
    onError(error) {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error('Max reconnect attempts reached');
            return;
        }

        // Exponential backoff with jitter, rounded to whole milliseconds
        const delay = Math.round(Math.min(
            this.baseDelay * Math.pow(2, this.reconnectAttempts) +
            Math.random() * 1000, // Add jitter to prevent thundering herd
            this.maxDelay
        ));

        this.reconnectAttempts++;
        console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);

        setTimeout(() => this.connect(), delay);
    }

    /**
     * Remembers the subscription so it is restored after reconnects and, if
     * currently connected, subscribes immediately.
     *
     * @returns the STOMP subscription handle when subscribed immediately
     *          (so the caller can unsubscribe), otherwise `undefined`
     */
    subscribe(destination, callback) {
        this.subscriptions.set(destination, callback);
        if (this.stompClient && this.stompClient.connected) {
            return this.stompClient.subscribe(destination, callback);
        }
        return undefined;
    }

    /** Sends `body` as JSON to `destination`; logs a warning and drops it when not connected. */
    send(destination, body) {
        if (this.stompClient && this.stompClient.connected) {
            this.stompClient.send(destination, {}, JSON.stringify(body));
        } else {
            console.warn('Cannot send: WebSocket not connected');
        }
    }
}

10. Database Entities and Repositories

Core JPA Entities

// src/main/java/com/example/ws/entity/ChatMessage.java
 
package com.example.ws.entity;
 
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
 
import java.time.Instant;
 
@Entity
@Table(
    name = "chat_messages",
    indexes = {
        @Index(name = "idx_room_created", columnList = "room_id, created_at DESC"),
        @Index(name = "idx_sender", columnList = "sender_id")
    }
)
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
 
    @Id
    @Column(name = "message_id", length = 36)
    private String messageId;
 
    @Column(name = "room_id", nullable = false, length = 36)
    private String roomId;
 
    @Column(name = "sender_id", nullable = false, length = 36)
    private String senderId;
 
    @Column(name = "content", nullable = false, columnDefinition = "TEXT")
    private String content;
 
    @Column(name = "message_type", length = 20)
    private String messageType;
 
    @Column(name = "status", length = 20)
    private String status;
 
    @Column(name = "created_at", nullable = false)
    private Instant createdAt;
 
    @Column(name = "updated_at")
    private Instant updatedAt;
}
// src/main/java/com/example/ws/repository/ChatMessageRepository.java
 
package com.example.ws.repository;
 
import com.example.ws.entity.ChatMessage;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
 
import java.util.List;
 
/**
 * Spring Data repository for {@link ChatMessage} rows, keyed by message id.
 */
@Repository
public interface ChatMessageRepository extends JpaRepository<ChatMessage, String> {

    /**
     * Fetches messages of a room ordered newest first; the page size of
     * {@code pageable} bounds how many are returned.
     * The caller reverses the list to display oldest-first.
     *
     * @param roomId   room whose messages are read
     * @param pageable paging information limiting the result
     * @return messages of the room, newest first
     */
    @Query("""
        SELECT m FROM ChatMessage m
        WHERE m.roomId = :roomId
        ORDER BY m.createdAt DESC
        """)
    List<ChatMessage> findTopByRoomIdOrderByCreatedAtDesc(
        @Param("roomId") String roomId,
        Pageable pageable
    );

    /**
     * Convenience overload returning at most {@code limit} of the newest
     * messages of a room, newest first.
     *
     * @param roomId room whose messages are read
     * @param limit  maximum number of messages; passed to {@link Pageable#ofSize(int)}
     * @return messages of the room, newest first
     */
    default List<ChatMessage> findTopByRoomIdOrderByCreatedAtDesc(String roomId, int limit) {
        return findTopByRoomIdOrderByCreatedAtDesc(roomId, Pageable.ofSize(limit));
    }

    /**
     * Sets {@code status = 'READ'} on every message of the room that was not
     * sent by {@code readerId} and was created at or before the message
     * identified by {@code lastReadMessageId}.
     *
     * <p>The status lives on the message row itself, so this is not a
     * per-reader marker: once set, the message reads as READ for everyone.
     *
     * <p>The cutoff message must belong to the same room; an id from another
     * room matches nothing instead of supplying a foreign timestamp.
     *
     * <p>NOTE(review): the subquery selects from the entity being updated.
     * MySQL restricts updating a table that is also read in a subquery of the
     * same statement - verify the SQL generated for the target dialect.
     *
     * @param roomId            room whose messages are updated
     * @param readerId          user doing the reading; their own messages are skipped
     * @param lastReadMessageId id of the newest message the reader has seen
     */
    @Modifying
    @Query("""
        UPDATE ChatMessage m SET m.status = 'READ'
        WHERE m.roomId = :roomId
          AND m.senderId != :readerId
          AND m.createdAt <= (
              SELECT m2.createdAt FROM ChatMessage m2
              WHERE m2.messageId = :lastReadMessageId
                AND m2.roomId = :roomId
          )
        """)
    void markAsReadUpTo(
        @Param("roomId") String roomId,
        @Param("readerId") String readerId,
        @Param("lastReadMessageId") String lastReadMessageId
    );
}

MySQL Schema

-- V1__create_websocket_tables.sql (Flyway migration)
 
-- Chat rooms. Ids are 36-character strings; is_private defaults to 0.
CREATE TABLE chat_rooms (
    room_id VARCHAR(36) NOT NULL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    created_by VARCHAR(36) NOT NULL,
    is_private TINYINT(1) NOT NULL DEFAULT 0,
    created_at DATETIME(6) NOT NULL,
    updated_at DATETIME(6),
    INDEX idx_created_by (created_by),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- Room membership. uk_room_user allows each user at most one row per room;
-- rows are deleted together with their room (ON DELETE CASCADE).
CREATE TABLE chat_room_members (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    room_id VARCHAR(36) NOT NULL,
    user_id VARCHAR(36) NOT NULL,
    role VARCHAR(20) NOT NULL DEFAULT 'MEMBER',
    joined_at DATETIME(6) NOT NULL,
    UNIQUE KEY uk_room_user (room_id, user_id),
    INDEX idx_user_id (user_id),
    FOREIGN KEY (room_id) REFERENCES chat_rooms(room_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- Chat messages. idx_room_created covers "messages of a room, newest first";
-- rows are deleted together with their room (ON DELETE CASCADE).
-- message_type and status are NOT NULL with defaults 'TEXT' and 'SENT'.
CREATE TABLE chat_messages (
    message_id VARCHAR(36) NOT NULL PRIMARY KEY,
    room_id VARCHAR(36) NOT NULL,
    sender_id VARCHAR(36) NOT NULL,
    content TEXT NOT NULL,
    message_type VARCHAR(20) NOT NULL DEFAULT 'TEXT',
    status VARCHAR(20) NOT NULL DEFAULT 'SENT',
    created_at DATETIME(6) NOT NULL,
    updated_at DATETIME(6),
    INDEX idx_room_created (room_id, created_at DESC),
    INDEX idx_sender (sender_id),
    FOREIGN KEY (room_id) REFERENCES chat_rooms(room_id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
 
-- Per-user notifications. idx_user_unread is ordered for lookups by user and
-- read flag, newest first. No foreign keys are declared on this table.
CREATE TABLE notifications (
    notification_id VARCHAR(36) NOT NULL PRIMARY KEY,
    user_id VARCHAR(36) NOT NULL,
    type VARCHAR(50) NOT NULL,
    message TEXT NOT NULL,
    reference_id VARCHAR(36),
    is_read TINYINT(1) NOT NULL DEFAULT 0,
    created_at DATETIME(6) NOT NULL,
    INDEX idx_user_unread (user_id, is_read, created_at DESC)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Next: Part 4 - Production and AWS