使用集群将 Socket.IO 扩展到多个 Node.js 进程

有没有人设法将 Socket.IO扩展到由 Node.js 的 集群模块产生的多个“工作者”进程?

假设我在 辅助进程中有以下内容(伪) :

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);


// socket.io
io.set('store', new socket.RedisStore);


// set-up connections...
io.sockets.on('connection', function(socket) {


socket.on('join', function(rooms) {
rooms.forEach(function(room) {
socket.join(room);
});
});


socket.on('leave', function(rooms) {
rooms.forEach(function(room) {
socket.leave(room);
});
});


});


// Emit a message every second
function send() {
io.sockets.in('room').emit('data', 'howdy');
}


setInterval(send, 1000);

在浏览器上。

// on the client
socket = io.connect();
socket.emit('join', ['room']);


socket.on('data', function(data){
console.log(data);
});

问题是: 每一秒钟,我都在接收 消息,这是由于四个独立的工作进程在发送这些消息。

如何确保消息只发送一次?

56006 次浏览

这实际上看起来像 Socket.IO 在扩展方面取得了成功。您可能希望来自一台服务器的消息传递到该房间中的所有套接字,而不管它们正好连接到哪台服务器。

最好的办法是有一个每秒发送一条消息的主进程。例如,只有在 cluster.isMaster的情况下才能运行它。

让主服务器处理心跳(下面的例子)或在内部不同端口上启动多个进程,并用 nginx 实现负载平衡(nginx 也支持从 V1.3开始的 websockets)。

和主人在一起

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;


// socket.io
io.set('store', new socket.RedisStore);


// set-up connections...
io.sockets.on('connection', function(socket) {
socket.on('join', function(rooms) {
rooms.forEach(function(room) {
socket.join(room);
});
});


socket.on('leave', function(rooms) {
rooms.forEach(function(room) {
socket.leave(room);
});
});


});


if (cluster.isMaster) {
// Fork workers.
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}


// Emit a message every second
function send() {
console.log('howdy');
io.sockets.in('room').emit('data', 'howdy');
}


setInterval(send, 1000);




cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
}

编辑: 在 Socket.IO 1.0 + 中,现在可以使用一个更简单的 Redis 适配器模块,而不是使用多个 Redis 客户端设置存储。

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

下面的例子看起来更像这样:

var cluster = require('cluster');
var os = require('os');


if (cluster.isMaster) {
// we create a HTTP server, but we do not use listen
// that way, we have a socket.io server that doesn't accept connections
var server = require('http').createServer();
var io = require('socket.io').listen(server);
var redis = require('socket.io-redis');


io.adapter(redis({ host: 'localhost', port: 6379 }));


setInterval(function() {
// all workers will receive this in Redis, and emit
io.emit('data', 'payload');
}, 1000);


for (var i = 0; i < os.cpus().length; i++) {
cluster.fork();
}


cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
}


if (cluster.isWorker) {
var express = require('express');
var app = express();


var http = require('http');
var server = http.createServer(app);
var io = require('socket.io').listen(server);
var redis = require('socket.io-redis');


io.adapter(redis({ host: 'localhost', port: 6379 }));
io.on('connection', function(socket) {
socket.emit('data', 'connected to worker: ' + cluster.worker.id);
});


app.listen(80);
}

如果有一个主节点需要发布到其他 Socket.IO 进程,但是它本身不接受套接字连接,那么使用 Socket.io 发射器而不是 Socket.io-redis

如果无法扩展,请使用 DEBUG=*运行 Node 应用程序。现在 Socket.IO 实现了 调试,它还将打印 Redis 适配器调试消息。输出示例:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

如果您的主进程和子进程都显示相同的解析器消息,则应用程序正在进行适当的伸缩。


如果您是从单个工作线程发出的,那么您的设置应该没有问题。您所做的是从所有四个工作者发出的,并且由于 Redis 发布/订阅,消息不是重复的,而是按照您要求应用程序所做的那样写了四次。下面是 Redis 的一个简单示意图:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

正如你所看到的,当你从一个工作者那里排放的时候,它会将排放量发布到 Redis,并且会从已经订阅了 Redis 数据库的其他工作者那里进行镜像。这也意味着您可以使用连接到同一个实例的多个套接字服务器,并且一个服务器上的发出将在所有连接的服务器上触发。

对于集群,当一个客户端连接时,它将连接到您的四个工作者之一,而不是所有四个工作者。这也意味着您从工作线程发出的任何信息只会显示给客户端一次。所以,是的,应用程序是可伸缩的,但是你这样做的方式,你是从所有四个工作者发出的,Redis 数据库让它看起来好像你在一个工作者身上调用了四次。如果一个客户端实际上连接到您的所有四个套接字实例,那么他们将每秒接收16条消息,而不是4条。

套接字处理的类型取决于应用程序的类型。如果要单独处理客户端,那么应该没有问题,因为每个客户端只会触发一个工作者的连接事件。如果您需要一个全局“心跳”,那么您可以在您的主进程中拥有一个套接字处理程序。由于工作进程在主进程终止时会终止,因此您应该抵消主进程的连接负载,并让子进程处理连接。这里有一个例子:

var cluster = require('cluster');
var os = require('os');


if (cluster.isMaster) {
// we create a HTTP server, but we do not use listen
// that way, we have a socket.io server that doesn't accept connections
var server = require('http').createServer();
var io = require('socket.io').listen(server);


var RedisStore = require('socket.io/lib/stores/redis');
var redis = require('socket.io/node_modules/redis');


io.set('store', new RedisStore({
redisPub: redis.createClient(),
redisSub: redis.createClient(),
redisClient: redis.createClient()
}));


setInterval(function() {
// all workers will receive this in Redis, and emit
io.sockets.emit('data', 'payload');
}, 1000);


for (var i = 0; i < os.cpus().length; i++) {
cluster.fork();
}


cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
}


if (cluster.isWorker) {
var express = require('express');
var app = express();


var http = require('http');
var server = http.createServer(app);
var io = require('socket.io').listen(server);


var RedisStore = require('socket.io/lib/stores/redis');
var redis = require('socket.io/node_modules/redis');


io.set('store', new RedisStore({
redisPub: redis.createClient(),
redisSub: redis.createClient(),
redisClient: redis.createClient()
}));


io.sockets.on('connection', function(socket) {
socket.emit('data', 'connected to worker: ' + cluster.worker.id);
});


app.listen(80);
}

在这个例子中,有五个 Socket.IO 实例,一个是主实例,四个是子实例。主服务器从不调用 listen(),因此该进程没有连接开销。但是,如果您对主进程调用一个发出,它将被发布到 Redis,并且四个工作进程将在它们的客户机上执行发出。这会将连接负载偏移到 worker,而且如果 worker 死亡,您的主应用程序逻辑将在 master 中保持不变。

请注意,使用 Redis,所有发出,甚至在一个名称空间或房间中的发出都将由其他工作进程处理,就好像您触发了该进程的发出一样。换句话说,如果您有两个 Socket.IO 实例和一个 Redis 实例,那么在第一个工作者的套接字上调用 emit()将把数据发送给它的客户机,而工作者2将执行与您从该工作者调用发出相同的操作。

行程间通讯不足以让 socket.io 1.4.5与集群一起工作。强制使用 websocket 模式也是必须的。参见 Node.JS、 Socket.IO 和集群中的 WebSocket 握手不起作用