Spring Webflux、Redis Pubsub、RSocketとAuth 0でスケーラブルなライブストリームチャットサービスを構築すること


導入


このポストは、どのようにtwitchチャットと同様の単一のリアルタイムチャットルームを構築する方法を探る.有効性と需要の増加を維持するために水平にスケールできるサービスを開発することに焦点を当てます.このサービスはJava Spring Bootで開発されます.
リポジトリをチェックしてくださいGitHub .

要件


このポストの主な目標として、リアルタイムのチャットスケーラビリティを探索するチャットルーム機能は最小限に抑えられます.
  • ユーザーは、単一のグローバルチャットルームに参加することができます.選択する余地はありません.
  • サービスメッセージを単純化するためには、テキストだけです.
  • つぶやきに似て、ユーザーは、彼らが部屋に参加したときからチャットメッセージを見ることができるだけです.チャットの歴史はありません.
  • ユーザーは、匿名のメッセージを表示することができますが、唯一の場合は、一意のユーザー名にサインアップしているチャットにメッセージを送信することができます.
  • メッセージ送付者は、ユーザー名だけによって特定されます.
  • 建築


    リアルタイムコミュニケーションを促進するために、我々はクライアントとサービスの間でメッセージを輸送するためにWebSocketプロトコルを使用しています.水平にスケールするために、複数のサーバインスタンスが必要であるので、新しいWebSocket接続はそれらの間でバランスをとることができます.
    WebSocket接続負荷のインスタンス間でバランスをとって、我々はメッセージブローカーを使用してすべてのクライアントにメッセージをファンアウトすることができます.このプロジェクトのために、我々はメッセージを出していますRedis PubSub .

    レッドリス


    Redis Pubsubは、メッセージの高いスループットが最小限の待ち時間でその加入者に追い出されるのを許します.また、スループットと冗長性を高めるために複数のインスタンスに分散することもできます.Shahar Morによる素晴らしい話は、より詳細に入ります.

    制限


    Redis Pubsubによる1つのトレードオフは、それが加入者に「火をつけて、忘れる」だけで、メッセージ配信を保証する方法を提供しないということです.これは、サーバーインスタンスが再起動され、ユーザーが再接続する必要がある場合、メッセージがドロップされる可能性があります.
    保証されたメッセージ配送を加えることはこのポストの範囲の外にあります.また、いくつかのユーザーのためのまれなメッセージ低下がライブストリームチャットの経験に影響しないという議論もなされるかもしれません.
    あなたがメッセージ信頼性のより高い程度を必要とするならば、それは他のメッセージブローカーをチェックアウトする価値がありますKafka , RabbitMQ あるいはRedis Streams .

    テスト


    後のポストにおいて、我々は我々のチャットサービスに接続する反応クライアントを開発しています、しかし、それまで、我々は要件が春フレームワークの中で統合テストを開発することによって満たされることを確認しています.
    私たちはPlaytika testcontainers-springboot ローカルの統合テストを実行しながらライブラリを簡単にredis Dockerコンテナをスピンします.これは、接続サーバーの機能をテストするように、私たちのPubsubメッセージブローカーとして使用することができます.
    要件を満たすために使用されるすべての統合テストが見つかりますhere .

    実装


    警告!これは情報ダンプのビットになるので、カバーする多くの領域があります.私は、詳細をクリアするために質問に答えることが常に幸せです.

    スプリングウエブフラックス


    我々は、接続サーバーを開発するための非ブロッキングスプリングWebfluxフレームワークを利用する予定です.これは、接続の数が多いのレイテンシを減らすのに役立ちます.Webfluxはまた、プロジェクトのリアクターの反応ストリームの実装を利用して、理由がわかりやすい非同期コードを書くことを可能にします.
    反応プログラミングの利点に関する情報についてはSpring or Project Reactor サイト.
    Spring Webfluxを使用するには、この依存関係が必要です.
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    
    可能性のあるリンクのセットアップガイドと反応性プログラミングの詳細情報

    PubSubサービス


    PubSub機能を一般的なメッセージクラスとインターフェイスでサービスに抽象化します.
    @Data  
    @AllArgsConstructor
    @NoArgsConstructor
    public class Message {
        String username;
        String message;
    }
    
    public interface PubSubService {
        Mono<Void> publish(Message message); 
        Flux<Message> subscribe(); 
    }
    

    REDISの設定


    REDISサーバとのインタフェースをするためには、Redis依存性が必要です.
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    
    Redisはまた、そのリアクティブAPIを利用できるように設定する必要があります.必要な設定BeanをredispubSubconfigクラスに追加します.
    ReactiEventEditPlate Beanはメッセージを公開する必要があり、メッセージオブジェクトをJSONにシリアル化できるように構成されています.
    @Bean
    public ReactiveRedisTemplate<String, Message> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
    
            StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<Message> valueSerializer = 
                            new Jackson2JsonRedisSerializer<>(Message.class);
    
        RedisSerializationContext.RedisSerializationContextBuilder<String, Message> builder =
                RedisSerializationContext.newSerializationContext(keySerializer);
    
        RedisSerializationContext<String, Message> context =
                builder.value(valueSerializer).build();
    
        return new ReactiveRedisTemplate<>(factory, context);
    }
    
    メッセージチャネルを購読するためには、ReactiVelessMessageSchemaContainer Beanが必要です.
    @Bean
    ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) {
        return new ReactiveRedisMessageListenerContainer(factory);
    }
    

    レッドサブサービス


    これで、PubSubServiceインターフェイスを実装し、構成されているRedis Beanを注入することによってRedispuBSubサービスを作成できます.
    @Service
    public class RedisPubSubService implements PubSubService {
    
        private final ReactiveRedisTemplate<String, Message> reactiveTemplate;
        private final ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer;
    
        private final ChannelTopic channelTopic = new ChannelTopic("broadcast"); // channel used to send and recieve messages
    
        public RedisPubSubService(ReactiveRedisTemplate<String, Message> reactiveTemplate,
                                  ReactiveRedisMessageListenerContainer reactiveMsgListenerContainer) {
            this.reactiveMsgListenerContainer = reactiveMsgListenerContainer;
            this.reactiveTemplate = reactiveTemplate;
        }
    
    発行インターフェイスメソッドはRedisテンプレートを使用してメッセージをトピック“ブロードキャスト”に送信します.空のモノラルは、メッセージが正常に送信されたシグナルに返されます.
    @Override
    public Mono<Void> publish(Message message) {
        return this.reactiveTemplate
                .convertAndSend(channelTopic.getTopic(), message)
                .then(Mono.empty());
    }
    
    チャネルへの登録は少し冗長であり、チャネルをiterableとして渡す必要があります.また、手動でRedisテンプレートからSerializationContextを渡し、着信メッセージを逆シリアル化する必要があります.
    @Override
    public Flux<Message> subscribe() {
        return reactiveMsgListenerContainer
                .receive(Collections.singletonList(channelTopic),
                        reactiveTemplate.getSerializationContext().getKeySerializationPair(),
                        reactiveTemplate.getSerializationContext().getValueSerializationPair())
                .map(ReactiveSubscription.Message::getMessage);
    }
    
    現在、接続サーバインスタンス間でメッセージをブロードキャストすることができるPubSubサービスがあります.
    このサービスの単体テストを見つけることができますhere .

    ソケット


    クライアントをチャットサービスに接続するには、WSocket接続の上にRSocketプロトコルを使用します.RSocketは、我々がネットワーク境界の上で反応性ストリーム標準を利用するのを許して、バックプレッシャーとルーティングのような若干の役に立つ特徴を持ちます.
    我々がクライアントが受け入れるメッセージの数を制限することができるメッセージの高い波があるならば、バックプレッシャーは特に我々のクライアントアプリケーションの役に立ちます.
    rsocketを利用するには、spring - boot - rsocket依存関係を含める必要があります.
    implementation 'org.springframework.boot:spring-boot-starter-rsocket'
    
    また、WSocketを使用してクライアントに接続するエンドポイントを設定するには、rsocketを設定する必要があります.
    spring.rsocket.server.transport=websocket
    spring.rsocket.server.mapping-path=/rs
    
    次に、クライアントからrsocketメッセージを処理するためのPubSubControllerを実装できます.このコントローラは以前に作成したPubSubServiceを注入します.
    @Controller
    @Slf4j
    public class PubSubController {
    
            private final PubSubService messagingService;
    
        public PubSubController(PubSubService messagingService) {
            this.messagingService = messagingService;
        }
    
    // .....
    
    rsocket springでは、2つの有用な注釈メソッドを使用できます.
  • @ConnectMapping -新しい接続を行うときに使用する.
  • @MessageMapping -特定のルートのメッセージを処理するために使用します.
  • 新しい接続が行われるか閉じられるとき、我々は@ connectmappingを利用するでしょう.
    @ConnectMapping
    void onConnect(RSocketRequester requester) {
        Objects.requireNonNull(requester.rsocket(), "rsocket connection should not be null")
                .onClose()
                .doOnError(error -> log.warn(requester.rsocketClient() + " Closed"))
                .doFinally(consumer -> log.info(requester.rsocketClient() + " Disconnected"))
                .subscribe();
    }
    
    @ messagemapping(「公開」)は、PubSubServiceにメッセージを送ります.
    @MessageMapping("publish")
    Mono<Void> publish(Message message) {
        return messagingService.publish(message);
    }
    
    @ messagemapping ("subscribe ")は、PubSubServiceからクライアントへのメッセージをストリームします.
    @MessageMapping("subscribe")
    Flux<Message> subscribe() {
        return messagingService.subscribe();
    }
    

    認証


    サードパーティサービスを利用しますAuth0 ユーザーのサインアップと認証.The JWT Author 0によって提供されたユーザアクセストークンもrsocketと互換性があります.これは、個々のメッセージングルートを確保することができます.
    スプリングブートを使用したAuthor 0を使用するチュートリアルhere .
    サインアップで入力されたユニークなユーザ名は、アクセストークンにエンコードされ、メッセージが送られるたびにデコードされます.これは、チャットサービスにユーザー名を格納する必要性を削除します.この符号化のために、アクセストークンは妥協する危険を減らすために短命であるべきです.
    デコードし、我々のアプリでJWTトークンを確認できるようにするには、この依存性が必要になります.
    implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'
    
    認証サーバーのエンドポイントを設定し、正しいAuth 0オーディエンスとユーザー名をエンコードするために使用するカスタムクレームのプロパティを追加できます.
    spring.security.oauth2.resourceserver.jwt.issuer-uri=https://{YOUR_AUTH0_DOMAIN}.us.auth0.com/
    auth0.audience={YOUR_AUTH0_API_AUDIENCE}
    auth0.username-claim={YOUR_AUTH0_USERNAME_CLAIM}
    

    セキュリティー


    rsocketを確保するには、RSocketのセキュリティ依存性が含まれます.
    implementation 'org.springframework.security:spring-security-rsocket'
    
    デコードされたJWTトークンをAuthenticationPrinciple 我々は、春のセキュリティメッセージングの依存関係が必要になります.
    implementation 'org.springframework.security:spring-security-messaging
    
    以前に追加されたAuth 0プロパティを注入することでRSocketSecurityConfigクラスを作成できます.@ EnableSocketSecurityと@ EnableActiveMethodSecurtiy注釈をアクティブなセキュリティサポートに追加することもできます.
    @Configuration
    @EnableRSocketSecurity
    @EnableReactiveMethodSecurity
    public class RSocketSecurityConfig {
    
        @Value("${auth0.audience}")
        String audience;
        @Value("${spring.security.oauth2.resourceserver.jwt.issuer-uri}")
        String issuer;
        @Value("${auth0.username-claim}")
        String usernameClaim;
    
    //......
    
    JWTトークンをデコードするにはReactiveJwtDecoder ビーン.カスタムOAuth2TokenValidators また、観衆とユーザー名主張がトークンに含まれていることを確認するために加えられます.
    @Bean
    public ReactiveJwtDecoder reactiveJwtDecoder() {
    
        var reactiveJwtDecoder = (NimbusReactiveJwtDecoder) ReactiveJwtDecoders.fromOidcIssuerLocation(issuer);
    
        OAuth2TokenValidator<Jwt> audienceValidator = (jwt) -> {
            OAuth2Error error = new OAuth2Error("invalid_token", "The required audience is missing", null);
            if (jwt.getAudience().contains(audience)) {
                return OAuth2TokenValidatorResult.success();
            }
            return OAuth2TokenValidatorResult.failure(error);
        };
    
        OAuth2TokenValidator<Jwt> usernameValidator = (jwt) -> {
            OAuth2Error error = new OAuth2Error("invalid_token", "The required username is missing", null);
            if (jwt.getClaimAsString(usernameClaim) != null) {
                return OAuth2TokenValidatorResult.success();
            }
            return OAuth2TokenValidatorResult.failure(error);
        };
    
        OAuth2TokenValidator<Jwt> withIssuer = JwtValidators.createDefaultWithIssuer(issuer);
        OAuth2TokenValidator<Jwt> compositeValidator = new DelegatingOAuth2TokenValidator<>(withIssuer, audienceValidator, usernameValidator);
    
        reactiveJwtDecoder.setJwtValidator(compositeValidator);
    
        return reactiveJwtDecoder;
    }
    
    権限のないユーザーがメッセージを送信するのを防ぐために、JWTトークンを必要とするためにPayloadSocketAcceptorInterceptor Beanを設定することで、「発行」ルートを確保することができます.
    @Bean
    public PayloadSocketAcceptorInterceptor rsocketInterceptor(RSocketSecurity rSocketSecurity) {
        return rSocketSecurity.authorizePayload(authorize ->
                        authorize.route("publish").authenticated() 
                                .anyExchange().permitAll()) // everything else is permitted
                .jwt(Customizer.withDefaults())
                .build();
    }
    
    RSocketMessageHandler Beanも、認証ソケットをRSocketハンドラーメソッドで解決できるように構成されています.
    @Bean
    RSocketMessageHandler messageHandler(RSocketStrategies strategies) {
        RSocketMessageHandler mh = new RSocketMessageHandler();
        mh.getArgumentResolverConfigurer().addCustomResolver(
                new AuthenticationPrincipalArgumentResolver());
        mh.setRouteMatcher(new PathPatternRouteMatcher());
        mh.setRSocketStrategies(strategies);
        return mh;
    }
    
    メッセージを処理するとき、我々は現在デコードされたユーザー名主張にアクセスすることができます.
    @Value("${auth0.username-claim}")
    String usernameClaim;
    
    //......
    
    @MessageMapping("publish")
    Mono<Void> publish(String message, @AuthenticationPrincipal Mono<Jwt> token) {
        return token.map(jwt -> jwt.getClaimAsString(usernameClaim))
                .flatMap(username -> messagingService.publish(new Message(username, message)));
    }
    

    結論


    これは実装の概要をまとめます.もう一度、私はこのポストについてのどんな質問または選ばれたテクノロジーの後の推論にでも必ず満足しています.
    無料のフルソースをチェックしてくださいGitHub .