Non-Socket.ioなプロセスからSocket.ioプロセスたちにメッセージを送信する


Socket.ioサーバをスケールアウトする(Redisを使った複数プロセス間でのブロードキャスト)という記事で、Redisを使ってSocket.ioサーバをスケールアウトする方法を書いたが、Redisを使うとNon-Socket.ioなプロセスからメッセージを送信することもできるようになる。

サーバとクライアントの実装を再掲。

サーバ

index.js
const cluster = require('cluster');
const io = require('socket.io')();
const sticky = require('sticky-session');
const http = require('http');
const redis = require('socket.io-redis');

const server = http.createServer((req, res) => {
  res.end('worker: ' + cluster.worker.id);
});

io.adapter(redis({host: '192.168.99.100', port: 6379}));
io.attach(server);
isWorker = sticky.listen(server, 3000);

if (isWorker) {
  io.on('connection', (socket) => {
    console.log(`worker: ${cluster.worker.id}, connected, id: ${socket.id}`);

    socket.on('chat message', (user, message) => {
      data = `${message} from ${user}`;
      console.log(data);
      socket.broadcast.emit('chat message', data);
    });

    socket.on('disconnect', () => {
      console.log(`disconnected, id: ${socket.id}`);
    });
  });
}

クライアント

client.js
const readline = require('readline');
const socket = require('socket.io-client')('http://localhost:3000');

const user = process.argv[2];

rl = readline.createInterface(process.stdin, process.stdout);
rl.setPrompt('');

rl.on('line', (line) => {
  socket.emit('chat message', user, line);
})
.on('close', () => {
  process.exit(0);
});

socket.on('connect', () => {
  console.log(`connected, id: ${socket.id}`);
});

socket.on('chat message', (data) => {
  console.log(data);
});

socket.on('error', (err) => {
  console.log('Error:', err);
});

socket.on('disconnect', () => {
  console.log('disconnected');
});

それぞれ実行する。

# クライアントA
$ node client.js A
connected, id: S1H5pJcm8Kun5lOoAAAA
# クライアントB
$ node client.js B
connected, id: 7HLiEmHioqbLuINtAAAA
# サーバ
$ node index.js
worker: 2, connected, id: S1H5pJcm8Kun5lOoAAAA
worker: 4, connected, id: 7HLiEmHioqbLuINtAAAA

クライアントAとBでチャットができる。

# クライアントA
$ node client.js A
connected, id: S1H5pJcm8Kun5lOoAAAA

Hello  # クライアントAが送信したメッセージ

Hi from B  # クライアントBが送信したメッセージ
# クライアントB
$ node client.js B
connected, id: 7HLiEmHioqbLuINtAAAA

Hello from A  # クライアントAが送信したメッセージ

Hi  # クライアントBが送信したメッセージ

ここで、第三者PからクライアントAとBにメッセージを送りたい。
このPはサーバとコネクションは張らず、一方的にクライアントAとBにメッセージを送る。

socket.io-emitterを使うと、この実装が可能になる。

emitter.js
const io = require('socket.io-emitter')({ host: 'localhost', port: 6379 });
const socketId = process.argv[2];

if (socketId) {
  io.to(socketId).emit('chat message', 'Hi guys from Emitter');
} else {
  io.emit('chat message', 'message from outside');
}

実行する。

$ node emitter.js

クライアントAとBのソケットにemitter.jsからのメッセージが送信される。

# クライアントA
$ node client.js A
connected, id: S1H5pJcm8Kun5lOoAAAA

Hello  # クライアントAが送信したメッセージ

Hi from B  # クライアントBが送信したメッセージ

Hi guys from Emitter  # emitter.jsが送信したメッセージ
# クライアントB
$ node client.js B
connected, id: 7HLiEmHioqbLuINtAAAA

Hello from A  # クライアントAが送信したメッセージ

Hi  # クライアントBが送信したメッセージ

Hi guys from Emitter  # emitter.jsが送信したメッセージ

クライアントAだけにメッセージを送信することもできる。

$ node emitter.js S1H5pJcm8Kun5lOoAAAA  # クライアントAが接続しているソケットのID

裏側で何が起きているか

Socket.ioサーバをスケールアウトする(Redisを使った複数プロセス間でのブロードキャスト)のときと同じで、こちらもRedisのPUB/SUBを利用している。

サーバを起動し、クライアントAとBが接続すると、サーバは以下のチャンネルをSUBSCRIBEした状態になっている。
以下はredis-cli monitorの出力。

# サーバ起動時
"subscribe" "socket.io#/#" "socket.io-request#/#" "socket.io-response#/#"

# クライアントA接続時
"subscribe" "socket.io#/#S1H5pJcm8Kun5lOoAAAA#"

# クライアントB接続時
"subscribe" "socket.io#/#7HLiEmHioqbLuINtAAAA#"

emitter.jsを実行すると、Redisに対して以下のコマンドが実行される。

"publish" "socket.io#/#" "\x93\xa7emitter\x83\xa4type\x02\xa4data\x92\xacchat message\xb4Hi guys from Emitter\xa3nsp\xa1/\x82\xa5rooms\x90\xa5flags\x80"

サーバはsocket.io#/#チャンネルをSUBSCRIBEしているため、このメッセージを受信し、それをクライアントに送信する。

クライアントAだけに送信するとした場合、以下のコマンドが実行される。

"publish" "socket.io#/#S1H5pJcm8Kun5lOoAAAA#" "\x93\xa7emitter\x83\xa4type\x02\xa4data\x92\xacchat message\xb4message from outside\xa3nsp\xa1/\x82\xa5rooms\x91\xb4S1H5pJcm8Kun5lOoAAAA\xa5flags\x80"

サーバはsocket.io#/#S1H5pJcm8Kun5lOoAAAA#チャンネルもSUBSCRIBEしているため、このメッセージを受信し、それをクライアントAに送信する。

なお、emitter.jsからはchat messageというイベントを発火しているが、サーバはこのイベントに反応しないようである。