Spring Cloud StreamとRabitMQの統合を紹介します.
Spring Cloud Streamは、Spring BootとSpring Integrationの上に構築されたフレームワークであり、イベント駆動やメッセージ駆動のマイクロサービスの作成に役立ちます.本論文ではSpring Cloud Streamの概念と構造をいくつかの簡単な例で紹介します.
1 Maven依存
開始前に、私たちはSpring Cloud StreamとRabbiitMQメッセージの中間デバイスの依存性を追加する必要があります.
マイクロサービスアーキテクチャは「スマートエンドポイントとダミーパイプ」の原則に従う.エンドポイント間の通信は、ラビットMQまたはApache Kafkaのようなメッセージ中間体によって駆動される.サービスは、これらのエンドポイントまたはチャネルを介してイベントをリリースし、通信する.
このメッセージ駆動サービスの基本的な構成例を通して、Spring Cloud Streamフレームのいくつかの主要な概念を見てみましょう.
2.1サービスクラス
Spring Cloud Streamを介して簡単なアプリケーションを確立し、Inputチャネルからメッセージを傍受し、Outputチャネルに応答する.
これらの概念の定義を見てみましょう. Bindings-入出力チャネルのインターフェースセットを宣言します. Binder−メッセージ中間実装、例えばKafkaまたはRabbiitMQ Chanel-メッセージ中間部とアプリケーション間の通信パイプライン を表します. Stream Listeners-beanにおけるメッセージ処理方法は、中間デバイスのMessage Coverter特定イベントにおいてオブジェクトの順序付け/逆順序化を行った後、チャネル上のメッセージ上で自動的にメッセージ処理方法を呼び出す. Message Schemasは、メッセージの序文化および逆序文化のために使用され、これらのモードは、オブジェクトタイプの発展を静的に読み取ることができ、または動的にロードすることができる. メッセージを指定の目的地に配信するのは、購読メッセージモードをリリースすることによって行われる.発表者はメッセージをテーマに分類し、各テーマは名称によって識別される.購読者は一つ以上のテーマに興味を示します.中間デバイスフィルタメッセージは、関心のあるテーマを購読サーバに転送します.加入者は、グループIDによって識別される加入者または消費者のグループであり、主題または主題のパーティションからメッセージが負荷バランスで配信される.
2.2試験クラス
テストクラスは、チャネルとの対話とメッセージのチェックを可能にするバインディング装置の実装である.上のenrichLogMessageサービスにメッセージを送り、応答にテキストが含まれているかどうか確認します.
私たちは工程src/main/resourceディレクトリのappication.ymlファイルにRabitMQバインドの配置を追加する必要があります.
3カスタムチャンネル
上記の例では、Spring Cloudで提供されたProcessorインターフェースを使って、このインターフェースにはinputチャネルとout putチャネルがあります.
もし私たちがいくつかの異なるチャンネルを作りたいなら、例えばinputチャンネルとoutputチャンネルを二つ作って、新しいカスタムプロセッサを作ることができます.
Springはこのインターフェースの実現を提供してくれます.チャンネル名は、注釈を用いて@Output(「myOutput」)のように設定することができます.もし設定されていないなら、Springは通路名として名来を使用します.ここには三つの通路があります.myInput、myOutput、another Output.
現在、私達はいくつかのルーティングルールを追加できます.もし受信した値が10未満なら、outputチャネルを通ります.受信した値が10以上の場合は、他のout putチャネルを通ります.
異なるメッセージを送信し、戻り値が異なるチャネルで得られるかどうかを判定する.
@Stream Listenerのコメントを使用して、ユーザーが望むメッセージをフィルタリングするためにカスタムSpEL式を使用することもできます.以下の例では、条件スケジュールを用いてメッセージを異なる出力にルーティングする.
本教程では、Spring Cloud Streamの主な概念を紹介し、RabitMQ上のいくつかの簡単な例を通してどのように使用するかを示した.プロジェクトコードは参照できます.https://download.csdn.net/download/peterwanghao/10412121
1 Maven依存
開始前に、私たちはSpring Cloud StreamとRabbiitMQメッセージの中間デバイスの依存性を追加する必要があります.
<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-starter-stream-rabbitartifactId>
dependency>
Junnitユニットテストに対応するために、pom.xmlファイルに追加します.<dependency>
<groupId>org.springframework.cloudgroupId>
<artifactId>spring-cloud-stream-test-supportartifactId>
<scope>testscope>
dependency>
2主要概念マイクロサービスアーキテクチャは「スマートエンドポイントとダミーパイプ」の原則に従う.エンドポイント間の通信は、ラビットMQまたはApache Kafkaのようなメッセージ中間体によって駆動される.サービスは、これらのエンドポイントまたはチャネルを介してイベントをリリースし、通信する.
このメッセージ駆動サービスの基本的な構成例を通して、Spring Cloud Streamフレームのいくつかの主要な概念を見てみましょう.
2.1サービスクラス
Spring Cloud Streamを介して簡単なアプリケーションを確立し、Inputチャネルからメッセージを傍受し、Outputチャネルに応答する.
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}
このアプリケーションはINPUTとOUTを結合しているとコメントしました.この2つのチャネルはインターフェースProcessorで定義されている(Spring Cloud Streamデフォルト設定).すべてのチャネルは、特定のメッセージ中間またはバインディング内に配置されている.これらの概念の定義を見てみましょう.
2.2試験クラス
テストクラスは、チャネルとの対話とメッセージのチェックを可能にするバインディング装置の実装である.上のenrichLogMessageサービスにメッセージを送り、応答にテキストが含まれているかどうか確認します.
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationIntegrationTest {
@Autowired
private Processor pipe;
@Autowired
private MessageCollector messageCollector;
@Test
public void whenSendMessage_thenResponseShouldUpdateText() {
pipe.input().send(MessageBuilder.withPayload(new LogMessage("This is my message")).build());
Object payload = messageCollector.forChannel(pipe.output()).poll().getPayload();
assertEquals("[1]: This is my message", payload.toString());
}
}
2.3 RabbiitMQ構成私たちは工程src/main/resourceディレクトリのappication.ymlファイルにRabitMQバインドの配置を追加する必要があります.
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
group: logMessageConsumers
output:
destination: queue.pretty.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
inputバインディングは、queue.log.messagesというメッセージスイッチを使用して、outputバインディングは、queue.pretty.logs.messageというメッセージスイッチを使用しています.すべてのバインディングにはlocal_という名前が使われています.ラビットのバインド.RabbitmQスイッチやキューを事前に作成する必要はありませんので、ご注意ください.アプリケーションを実行すると、二つのスイッチが自動的に作成されます.3カスタムチャンネル
上記の例では、Spring Cloudで提供されたProcessorインターフェースを使って、このインターフェースにはinputチャネルとout putチャネルがあります.
もし私たちがいくつかの異なるチャンネルを作りたいなら、例えばinputチャンネルとoutputチャンネルを二つ作って、新しいカスタムプロセッサを作ることができます.
public interface MyProcessor {
String INPUT = "myInput";
@Input
SubscribableChannel myInput();
@Output("myOutput")
MessageChannel anOutput();
@Output
MessageChannel anotherOutput();
}
3.1サービス類Springはこのインターフェースの実現を提供してくれます.チャンネル名は、注釈を用いて@Output(「myOutput」)のように設定することができます.もし設定されていないなら、Springは通路名として名来を使用します.ここには三つの通路があります.myInput、myOutput、another Output.
現在、私達はいくつかのルーティングルールを追加できます.もし受信した値が10未満なら、outputチャネルを通ります.受信した値が10以上の場合は、他のout putチャネルを通ります.
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
if (val < 10) {
processor.anOutput().send(message(val));
} else {
processor.anotherOutput().send(message(val));
}
}
private static final Message message(T val) {
return MessageBuilder.withPayload(val).build();
}
3.2試験類異なるメッセージを送信し、戻り値が異なるチャネルで得られるかどうかを判定する.
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
@DirtiesContext
public class MultipleOutputsServiceApplicationIntegrationTest {
@Autowired
private MyProcessor pipe;
@Autowired
private MessageCollector messageCollector;
@Test
public void whenSendMessage_thenResponseIsInAOutput() {
whenSendMessage(1);
thenPayloadInChannelIs(pipe.anOutput(), 1);
}
@Test
public void whenSendMessage_thenResponseIsInAnotherOutput() {
whenSendMessage(11);
thenPayloadInChannelIs(pipe.anotherOutput(), 11);
}
private void whenSendMessage(Integer val) {
pipe.myInput().send(MessageBuilder.withPayload(val).build());
}
private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
Object payload = messageCollector.forChannel(channel).poll().getPayload();
assertEquals(expectedValue, payload);
}
}
4条件に応じて分配する.@Stream Listenerのコメントを使用して、ユーザーが望むメッセージをフィルタリングするためにカスタムSpEL式を使用することもできます.以下の例では、条件スケジュールを用いてメッセージを異なる出力にルーティングする.
@Autowired
private MyProcessor processor;
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
processor.anOutput().send(message(val));
}
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
processor.anotherOutput().send(message(val));
}
5まとめ本教程では、Spring Cloud Streamの主な概念を紹介し、RabitMQ上のいくつかの簡単な例を通してどのように使用するかを示した.プロジェクトコードは参照できます.https://download.csdn.net/download/peterwanghao/10412121