WebSocket Implementation
Overview
Build scalable WebSocket systems for real-time communication with proper connection management, message routing, error handling, and horizontal scaling support.
When to Use
- Building real-time chat and messaging
- Implementing live notifications
- Creating collaborative editing tools
- Broadcasting live data updates
- Building real-time dashboards
- Streaming events to clients
- Live multiplayer games
Instructions
1. Node.js WebSocket Server (Socket.IO)
const express = require("express");
const http = require("http");
const socketIo = require("socket.io");
const redis = require("redis");
const app = express();
const server = http.createServer(app);
const io = socketIo(server, {
cors: { origin: "*" },
transports: ["websocket", "polling"],
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 5,
});
// Redis adapter for horizontal scaling
const redisClient = redis.createClient();
const { createAdapter } = require("@socket.io/redis-adapter");
io.adapter(createAdapter(redisClient, redisClient.duplicate()));
// Connection management
const connectedUsers = new Map();
io.on("connection", (socket) => {
console.log(`User connected: ${socket.id}`);
// Store user connection
socket.on("auth", (userData) => {
connectedUsers.set(socket.id, {
userId: userData.id,
username: userData.username,
socketId: socket.id,
connectedAt: new Date(),
});
// Join user-specific room
socket.join(`user:${userData.id}`);
socket.join("authenticated_users");
// Notify others user is online
io.to("authenticated_users").emit("user:online", {
userId: userData.id,
username: userData.username,
timestamp: new Date(),
});
console.log(`User authenticated: ${userData.username}`);
});
// Chat messaging
socket.on("chat:message", (message) => {
const user = connectedUsers.get(socket.id);
if (!user) {
socket.emit("error", { message: "Not authenticated" });
return;
}
const chatMessage = {
id: `msg_${Date.now()}`,
senderId: user.userId,
senderName: user.username,
text: message.text,
roomId: message.roomId,
timestamp: new Date(),
status: "delivered",
};
// Save to database
Message.create(chatMessage);
// Broadcast to room
io.to(`room:${message.roomId}`).emit("chat:message", chatMessage);
// Update message status
setTimeout(() => {
socket.emit("chat:message:ack", {
messageId: chatMessage.id,
status: "read",
});
}, 100);
});
// Room management
socket.on("room:join", (roomId) => {
socket.join(`room:${roomId}`);
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit("room:user:joined", {
userId: user.userId,
username: user.username,
timestamp: new Date(),
});
});
socket.on("room:leave", (roomId) => {
socket.leave(`room:${roomId}`);
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit("room:user:left", {
userId: user.userId,
timestamp: new Date(),
});
});
// Typing indicator
socket.on("typing:start", (roomId) => {
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit("typing:indicator", {
userId: user.userId,
username: user.username,
isTyping: true,
});
});
socket.on("typing:stop", (roomId) => {
const user = connectedUsers.get(socket.id);
io.to(`room:${roomId}`).emit("typing:indicator", {
userId: user.userId,
isTyping: false,
});
});
// Handle disconnection
socket.on("disconnect", () => {
const user = connectedUsers.get(socket.id);
if (user) {
connectedUsers.delete(socket.id);
io.to("authenticated_users").emit("user:offline", {
userId: user.userId,
timestamp: new Date(),
});
console.log(`User disconnected: ${user.username}`);
}
});
// Error handling
socket.on("error", (error) => {
console.error(`Socket error: ${error}`);
socket.emit("error", { message: "An error occurred" });
});
});
// Server methods
const broadcastUserUpdate = (userId, data) => {
io.to(`user:${userId}`).emit("user:update", data);
};
const notifyRoom = (roomId, event, data) => {
io.to(`room:${roomId}`).emit(event, data);
};
const sendDirectMessage = (userId, event, data) => {
io.to(`user:${userId}`).emit(event, data);
};
server.listen(3000, () => {
console.log("WebSocket server listening on port 3000");
});
2. Browser WebSocket Client
class WebSocketClient {
constructor(url, options = {}) {
this.url = url;
this.socket = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectDelay = options.reconnectDelay || 1000;
this.listeners = new Map();
this.messageQueue = [];
this.isAuthenticated = false;
this.connect();
}
connect() {
this.socket = io(this.url, {
reconnection: true,
reconnectionDelay: this.reconnectDelay,
reconnectionAttempts: this.maxReconnectAttempts,
});
this.socket.on("connect", () => {
console.log("Connected to server");
this.reconnectAttempts = 0;
this.processMessageQueue();
});
this.socket.on("disconnect", () => {
console.log("Disconnected from server");
});
this.socket.on("error", (error) => {
console.error("Socket error:", error);
this.emit("error", error);
});
this.socket.on("connect_error", (error) => {
console.error("Connection error:", error);
});
}
authenticate(userData) {
this.socket.emit("auth", userData, (response) => {
if (response.success) {
this.isAuthenticated = true;
this.emit("authenticated");
}
});
}
on(event, callback) {
this.socket.on(event, callback);
if (!this.listeners.has(event)) {
this.listeners.set(event, []);
}
this.listeners.get(event).push(callback);
}
emit(event, data, callback) {
if (!this.socket.connected) {
this.messageQueue.push({ event, data, callback });
return;
}
this.socket.emit(event, data, callback);
}
processMessageQueue() {
while (this.messageQueue.length > 0) {
const { event, data, callback } = this.messageQueue.shift();
this.socket.emit(event, data, callback);
}
}
joinRoom(roomId) {
this.emit("room:join", roomId);
}
leaveRoom(roomId) {
this.emit("room:leave", roomId);
}
sendMessage(roomId, text) {
this.emit("chat:message", { roomId, text });
}
setTypingIndicator(roomId, isTyping) {
if (isTyping) {
this.emit("typing:start", roomId);
} else {
this.emit("typing:stop", roomId);
}
}
disconnect() {
this.socket.disconnect();
}
}
// Usage
const client = new WebSocketClient("http://localhost:3000");
client.on("chat:message", (message) => {
console.log("Received message:", message);
displayMessage(message);
});
client.on("typing:indicator", (data) => {
updateTypingIndicator(data);
});
client.on("user:online", (user) => {
updateUserStatus(user.userId, "online");
});
client.authenticate({ id: "user123", username: "john" });
client.joinRoom("room1");
client.sendMessage("room1", "Hello everyone!");
3. Python WebSocket Server (aiohttp)
from aiohttp import web
import aiohttp
import json
from datetime import datetime
from typing import Set
class WebSocketServer:
def __init__(self):
self.app = web.Application()
self.rooms = {}
self.users = {}
self.setup_routes()
def setup_routes(self):
self.app.router.add_get('/ws', self.websocket_handler)
self.app.router.add_post('/api/message', self.send_message_api)
async def websocket_handler(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
user_id = None
room_id = None
async for msg in ws.iter_any():
if isinstance(msg, aiohttp.WSMessage):
data = json.loads(msg.data)
event_type = data.get('type')
try:
if event_type == 'auth':
user_id = data.get('userId')
self.users[user_id] = ws
await ws.send_json({
'type': 'authenticated',
'timestamp': datetime.now().isoformat()
})
elif event_type == 'join_room':
room_id = data.get('roomId')
if room_id not in self.rooms:
self.rooms[room_id] = set()
self.rooms[room_id].add(user_id)
# Notify others
await self.broadcast_to_room(room_id, {
'type': 'user_joined',
'userId': user_id,
'timestamp': datetime.now().isoformat()
}, exclude=user_id)
elif event_type == 'message':
message = {
'id': f'msg_{datetime.now().timestamp()}',
'userId': user_id,
'text': data.get('text'),
'roomId': room_id,
'timestamp': datetime.now().isoformat()
}
# Save to database
await self.save_message(message)
# Broadcast to room
await self.broadcast_to_room(room_id, message)
elif event_type == 'leave_room':
if room_id in self.rooms:
self.rooms[room_id].discard(user_id)
except Exception as error:
await ws.send_json({
'type': 'error',
'message': str(error)
})
# Cleanup on disconnect
if user_id:
del self.users[user_id]
if room_id and user_id:
if room_id in self.rooms:
self.rooms[room_id].discard(user_id)
return ws
async def broadcast_to_room(self, room_id, message, exclude=None):
if room_id not in self.rooms:
return
for user_id in self.rooms[room_id]:
if user_id != exclude and user_id in self.users:
try:
await self.users[user_id].send_json(message)
except Exception as error:
print(f'Error sending message: {error}')
async def save_message(self, message):
# Save to database
pass
async def send_message_api(self, request):
data = await request.json()
room_id = data.get('roomId')
await self.broadcast_to_room(room_id, {
'type': 'message',
'text': data.get('text'),
'timestamp': datetime.now().isoformat()
})
return web.json_response({'sent': True})
def create_app():
server = WebSocketServer()
return server.app
if __name__ == '__main__':
app = create_app()
web.run_app(app, port=3000)
4. Message Types and Protocols
// Authentication
{
"type": "auth",
"userId": "user123",
"token": "jwt_token_here"
}
// Chat Message
{
"type": "message",
"roomId": "room123",
"text": "Hello everyone!",
"timestamp": "2025-01-15T10:30:00Z"
}
// Typing Indicator
{
"type": "typing",
"roomId": "room123",
"isTyping": true
}
// Presence
{
"type": "presence",
"status": "online|away|offline"
}
// Notification
{
"type": "notification",
"title": "New message",
"body": "You have a new message",
"data": {}
}
5. Scaling with Redis
const redis = require("redis");
const { createAdapter } = require("@socket.io/redis-adapter");
const { createClient } = require("redis");
const pubClient = createClient({ host: "redis", port: 6379 });
const subClient = pubClient.duplicate();
io.adapter(createAdapter(pubClient, subClient));
// Publish to multiple servers
io.emit("user:action", { userId: 123, action: "login" });
// Subscribe to events from other servers
redisClient.subscribe("notifications", (message) => {
const notification = JSON.parse(message);
io.to(`user:${notification.userId}`).emit("notification", notification);
});
Best Practices
✅ DO
- Implement proper authentication
- Handle reconnection gracefully
- Manage rooms/channels effectively
- Persist messages appropriately
- Monitor active connections
- Implement presence features
- Use Redis for scaling
- Add message acknowledgment
- Implement rate limiting
- Handle errors properly
❌ DON'T
- Send unencrypted sensitive data
- Keep unlimited message history in memory
- Allow arbitrary room/channel creation
- Forget to clean up disconnected connections
- Send large messages frequently
- Ignore network failures
- Store passwords in messages
- Skip authentication/authorization
- Create unbounded growth of connections
- Ignore scalability from day one
Monitoring
// Track active connections
io.engine.on("connection_error", (err) => {
console.log(err.req); // the request object
console.log(err.code); // the error code, e.g. 1
console.log(err.message); // the error message
console.log(err.context); // some additional error context
});
app.get("/metrics/websocket", (req, res) => {
res.json({
activeConnections: io.engine.clientsCount,
connectedSockets: io.sockets.sockets.size,
rooms: Object.keys(io.sockets.adapter.rooms),
});
});