RedisのPUB/SUBを使ってみる


はじめに

チャットアプリが作りたくてRedisのpub/subやってみました
複数台のwebsocketサーバーをつなぐ役目をしてるみたいです
https://aws.amazon.com/jp/blogs/news/how-to-build-a-chat-application-with-amazon-elasticache-for-redis/

あと、slackのチャンネルみたいなの作るときも使えそうです

環境

必要になるのはRedis、Websocketサーバー、クライアントです
RedisはimageがあるのでWebsocketサーバーとクライアントをNimで作ります
Nimはver.1.2.0です

docker-compose.yml
version: '3'
services:
  ws:
    image: nimlang/nim:latest
    ports:
      - "12345:12345"
    volumes:
      - ./ws:/workspace
    working_dir: /workspace
    tty: true

  redis:
    image: redis:latest
    ports:
      - "6379:6379"

構成

Redisがデータを送信する機能がpub(publish)、データを受信する機能がsub(subscribe)です
pubはchannelというものを指定する必要があり、subするとき同じchannelを指定しないと、データを受け取れません
websocketサーバーを作りch01という名前のchannelをsubscribeしてみます

websoketサーバー

サーバーは公式サイトを参考にしました
Redisとの接続はこちらのパッケージを使いました
subできたデータをクライアントに配信します

server.nim
import asyncnet, asyncdispatch, strformat
import redis

var clients{.threadvar.}: seq[AsyncSocket]

proc serve() {.async.} =
  clients = @[]
  let server = newAsyncSocket()
  server.setSockOpt(OptReuseAddr, true)
  server.bindAddr(Port(12345))
  server.listen()

  while true:
    let client = await server.accept() # クライアントの接続を待つ
    echo "client add"
    clients.add client


proc `$`(msg: RedisMessage): string = fmt"ch: {msg.channel} msg: {msg.message}"

proc broadCast(subMsg: RedisMessage) {.async.} =
  echo fmt"broad cast! {subMsg}"
  for c in clients:
    await c.send($subMsg & "\c\L")

# Redisと接続
proc newRedisConnection(channel: string) {.async.} =
  let client = await openAsync(host = "redis")

  await client.subscribe(channel) # subするchannelの登録

  while true:
    let msg = await client.nextMessage()
    echo fmt"message received! from ch: {msg.channel}"
    await broadCast(msg)

# asyncCheck = {.async.}の関数に使うdiscard
asyncCheck serve()
asyncCheck newRedisConnection("ch01")
runForever()

クライアント

websocketサーバーと接続するクライアントを作ります
subできたデータをサーバーが接続されてる全クライアントに配信するので、それを表示してみます

クライアントはNim in Actionを参考にしました

client.nim
import asyncdispatch, asyncnet

proc newClient(socket: AsyncSocket, serverAddr: string) {.async.} =
  echo("Connecting to ", serverAddr)
  await socket.connect(serverAddr, Port(12345))
  echo("Connected!")
  while true:
    let line = await socket.recvLine()
    echo line


var socket = newAsyncSocket()
asyncCheck newClient(socket, "localhost")

while true:
  asyncdispatch.poll()

結果

サーバーがch01をsubscribeしているので
publishされたデータを受け取ることができます
ch02のようにsubscribeしていないチャンネルのデータは送信されてもサーバーは受け取れません

おわりに

かなり雑に作りましたがpub/subの動きがつかめました
次はjsでクライアントを作って、もっとチャットアプリっぽくしたいです

参考