Pulsar Node.jsクライアントライブラリを使ってみた


はじめに

こんにちは。

本稿では、PulsarのNode.jsクライアントライブラリ(以下、pulsar-client-node)を使って、簡単なメッセージの送受信をしていきます。
サーバには、standaloneモードのPulsarを起動させます。

今回は、CentOS 7.5.0 1台の上に以下の環境を構築して試していきます。

  • サーバ

    • Java 11
    • Pulsar(standaloneモード) v2.7.1
  • クライアント

    • Node.js v12.22.1
    • pulsar-client-node v1.3.0
    • pulsar-client-cpp(後述) v2.7.1

サーバの準備

ここでは、メッセージの送受信を行うためのサーバを準備していきます。

まず、Pulsarのstandaloneモードを動かすために、Javaが必要になります。
Javaのインストール後、Pulsarのバイナリファイルが入ったアーカイブファイルをダウンロードします。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/apache-pulsar-2.7.1-bin.tar.gz

ダウンロード完了後、アーカイブファイルを展開します。

$ tar xzf apache-pulsar-2.7.1-bin.tar.gz
$ cd apache-pulsar-2.7.1
$ ls
bin  conf  examples  instances  lib  LICENSE  licenses  NOTICE  README

これでサーバの準備は完了です。

クライアントの準備

ここでは、pulsar-client-nodeをインストールして、メッセージの送受信を行うためのクライアントの準備をしていきます。

pulsar-client-nodeは、ライブラリ内部でApache PulsarのC++クライアントライブラリ(以下、pulsar-client-cpp)を使っているので、下記の順番でインストールする必要があります。

  1. pulsar-client-cpp
  2. pulsar-client-node

pulsar-client-cppのインストール

こちらを参考に、pulsar-client-cppをインストールしていきます。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/RPMS/apache-pulsar-client-2.7.1-1.x86_64.rpm
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.7.1/RPMS/apache-pulsar-client-devel-2.7.1-1.x86_64.rpm
$ sudo rpm -ivh apache-pulsar-client*.rpm

pulsar-client-nodeのインストール

pulsar-client-nodeを動かすためには、Node.jsが必要になります。
Node.jsのインストール後、pulsar-client-nodeをインストールしていきます。

まず、pulsar-client-nodeをビルドするために使用されているnode-gypを使うために、gcc-c++をインストールします。

$ sudo yum install -y gcc-c++

次にnpmを使って、pulsar-client-nodeをインストールします。

$ npm install [email protected]

これでクライアントの準備は完了です。

メッセージの送受信

サーバとクライアントの準備ができたので、実際にメッセージの送受信をしていきます。

まず、Pulsar standaloneモードでサーバを起動させます。

# サーバ側のターミナル
$ ./bin/pulsar standalone

次に、メッセージを受け取る側(以下、Consumer)を起動させ、サーバに接続させ、メッセージを受け取る状態にさせます。
Consumerに使用するコードは、pulsar-client-nodeのリポジトリにある、サンプルコードを使用します。

# Consumer側のターミナル
$ node consumer.js

次に、メッセージを送る側(以下、Producer)を起動させ、サーバに接続させ、メッセージを送ります。

Producerに使用するコードもConsumer同様、サンプルコードを使用します。

# Producer側のターミナル
$ node producer.js

すると、Producerから送信されたメッセージがConsumerに流れ、Consumer側のターミナルで以下のように表示されました。

# Consumer側のターミナル
2021-05-12 18:17:45.650 INFO  [139877982844672] ConsumerImpl:216 | [persistent://public/default/my-topic, sub1, 0] Created consumer on broker [127.0.0.1:39562 -> 127.0.0.1:6650]
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9

TypeScriptからの利用

pulsar-client-nodeは、TypeScriptからでも利用できます。
TypeScriptで書かれたコードをコンパイルするために必要な型定義ファイルは、pulsar-client-nodeのリポジトリに用意されています。

まずは、TypeScriptとコンパイルに必要なパッケージをインストールします。

$ sudo npm install -g typescript
$ npm install --save-dev @types/node

次に、ProducerとConsumerのTypeScriptのコードを用意し、tscコマンドでコンパイルします。

producer_ts.ts
import Pulsar = require('pulsar-client');

(async () => {

  const client: Pulsar.Client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  const producer: Pulsar.Producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
  });

  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`;
    await producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
  }
  await producer.flush();

  await producer.close();
  await client.close();
})();
consumer_ts.ts
import Pulsar = require('pulsar-client');

(async () => {

  const client: Pulsar.Client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
    operationTimeoutSeconds: 30,
  });

  const consumer: Pulsar.Consumer = await client.subscribe({
    topic: 'persistent://public/default/my-topic',
    subscription: 'sub1',
  });

  for (let i = 0; i < 10; i += 1) {
    const msg: Pulsar.Message = await consumer.receive();
    console.log(msg.getData().toString());
    consumer.acknowledge(msg);
  }

  await consumer.close();
  await client.close();
})();
コンパイル
$ tsc producer_ts.ts consumer_ts.ts

コンパイルが完了すると、producer_ts.jsconsumer_ts.jsが作成され実行可能な状態になります。
以下のコマンドで実行すると、先ほどと同様、Producerから送信されたメッセージがConsumerに流れました。

# Producer側のターミナル
$ node producer_ts.js

# Consumer側のターミナル
$ node consumer_ts.js
2021-05-14 09:24:53.913 INFO  [140592544884480] ConsumerImpl:216 | [persistent://public/default/my-topic, sub1, 0] Created consumer on broker [127.0.0.1:41952 -> 127.0.0.1:6650]
my-message-0
my-message-1
my-message-2
my-message-3
my-message-4
my-message-5
my-message-6
my-message-7
my-message-8
my-message-9

おわりに

本稿では、pulsar-client-nodeを使ったメッセージの送受信を行いました。
また、TypeScriptからpulsar-client-nodeを利用することもできました。

今回、シンプルなProducer、Consumerを作成して試しましたが、他にも様々な機能が実装されています。
公式ドキュメントが参考になるかと思います。

参考URL