[Spring Integration] サーバ機能を介して生成したソケットを使ってクライアント送信を行う


タイトルだけでやりたい事が伝わる自信が全くないので・・・、絵を使ってやりたい事を補足します。

やりたいこと

私が参画している案件は下記の図だと「Gateway」としている部分で、クライアントからHTTP経由でJSONメッセージを受け取り、固定項目長メッセージへ変換してからTCP経由でサーバへ送信する必要があります。

Spring Integrationでの実現性は?

現在参画している案件ではSpring Integrationを使っているのですが、ざっと調べた感じだと・・・標準機能の組み合わせだけでは無理そうでした。

標準機能だけでは無理そうでしたが、以下のように送信コンポーネントを呼び出す前に「あること」を行うコンポーネントを挟み込むことで実現することができそうです。

「あること」=「利用するコネクションのコネクションIDをメッセージヘッダに設定すること」です。

実際に試してみる

Bean設定およびHeaderEnricherの実装は以下のような感じ。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean // サーバからの接続を受けるためのコネクションファクトリ
    public AbstractServerConnectionFactory serverConnectionFactory() {
        return Tcp.nioServer(9999).get();
    }

    @Bean // サーバからメッセージが送られてきた時の処理(※今回はとりあえずログ出力してメッセージを破棄するフローを定義しておく)
    public IntegrationFlow receiver(AbstractConnectionFactory serverConnectionFactory) {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverConnectionFactory))
                .<byte[], String>transform(String::new)
                .log(LoggingHandler.Level.INFO, "client-res")
                .nullChannel();
    }

    @Bean // サーバへメッセージを送信する処理
    public IntegrationFlow sender(AbstractConnectionFactory serverConnectionFactory) {
        return IntegrationFlows.from(MessageChannels.direct("senderChannel"))
                // 【今回のポイント】 送信処理で利用するコネクションのIDをメッセージヘッダへ設定しておく
                // TODO 利用するコネクションの決定方法(⇨実アプリだとラウンドロビン方式で使うコネクションを振り分ける必要あり)
                .enrich(x -> x.headerFunction(IpHeaders.CONNECTION_ID,
                        m -> serverConnectionFactory.getOpenConnectionIds().get(0)))
                .log(LoggingHandler.Level.INFO, "client-req")
                .<String, byte[]>transform(String::getBytes)
                .handle(Tcp.outboundAdapter(serverConnectionFactory))
                .get();
    }

    @Bean // テストケース側でsenderにメッセージを送信する時に使う用
    public MessagingTemplate messagingTemplate() {
        return new MessagingTemplate();
    }

}

テストケースクラスを作って、やりたいことができているか検証してみます。

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private MessagingTemplate messagingTemplate;

    @Test
    void sending() throws IOException, InterruptedException {

        // サーバからGatewayへ接続してソケットを開く
        try (Socket socket = new Socket("localhost", 9999)) {
            socket.setSoTimeout(2000);
            TimeUnit.SECONDS.sleep(1);

            // サーバへメッセージを送信
            // 本来であればクライアントシステムから送信されたものを固定項目長のメッセージに変換してから送る
            messagingTemplate.send("senderChannel", MessageBuilder.withPayload("abc").build());
            messagingTemplate.send("senderChannel", MessageBuilder.withPayload("def").build());

            // サーバでメッセージを受信
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                {
                    String message = reader.readLine();
                    System.out.println("server-req : " + message);
                    Assertions.assertThat(message).isEqualTo("abc");
                }
                {
                    String message = reader.readLine();
                    System.out.println("server-req : " + message);
                    Assertions.assertThat(message).isEqualTo("def");
                }
            }
        }

    }

}

できた!っぽい

検証バージョン

  • Spring Integration 5.2.1.RELEASE
  • Spring Boot 2.2.1.RELEASE

まとめ

Spring Integrationでは、以下のようにGwatewayから接続を行ってソケットを開くパターンはサポートしています。

今回のようなケースは特殊な接続仕様?なのかもしれませんが、少し工夫することで実現できそうなのでよかったです。

参考ページ