Qualkusと角度10で最も速いウェブソケット


Qualkusは、両方のメッセージングとWebSockets本当に始めるとすぐに役立ちます.しかし、あなたが一緒に2つの技術と結婚するとき、何が起こりますか?
この節で使用されているソースコードは次のようにします.
https://github.com/cloudy-engineering/quarkus-chat-api
https://github.com/cloudy-engineering/angular-chat-ui
Qualkus WebSocketsについては、以下のように簡単です.
$ mvn io.quarkus:quarkus-maven-plugin:1.7.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=websockets-quickstart \
    -Dextensions="undertow-websockets"
$ cd websockets-quickstart
これは典型的なMavenベースのソースコード構造を作成します.
  • quarkus-undertow-websockets 依存性
  • quarkus-resteasy 依存性
  • あなたの最初の統合を取得し、実際には非常に簡単です.
  • WebSocketエンドポイントを表す新しいクラスを作成する
  • 標準のWebSocketライフサイクルメソッドを実装するonOpen , onError , onMessage , onClose )
  • WebSocketエンドポイントと統合するUIを作成します
  • 機能の実装


    インターフェイスを実装するか、基本クラスを拡張するのではなく、Qualkus WebSocketは注釈を使用してライフサイクルを実装します.
    @ServerEndpoint("/chat/{username}")
    public class SocketEndpoint {
    
        private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
    
        @OnOpen
        public void onOpen(Session session, @PathParam("username") String username) {
            log.debug("{} has just connected", username);
        }
    
        @OnError
        public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
            log.error("{} encountered an error", username);
        }
    
        @OnMessage
        public void onMessage(String message, @PathParam("username") String username) {
            log.debug("{} has just sent us a message: {}", username, message);
        }
    
        @OnClose
        public void onClose(Session session, @PathParam("username") String username) {
            log.debug("{} has now disconnected", username);
        }
    }
    
    サーバー側コンポーネントで覚えておくべきことはSession . これはエンドユーザーと通信する方法です.この記事のために、asyncremoteを使用して、オブジェクトをユーザに返します.
    @ServerEndpoint("/chat/{username}")
    @ApplicationScoped
    public class SocketEndpoint {
    
        @OnOpen
        public void onOpen(Session session, @PathParam("username") String username) {
            log.debug("{} has just connected", username);
            session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
        }
    
    ...
    }
    
    フロントエンドを介して接続するとonOpen メソッドをインスタンス化します.ここでは、ユーザーの相互作用を設定し、任意のアクションを確認してメッセージを送信することができます.ここで返信を送ります.
    ログとcors
    我々が続ける前に、我々は我々が我々のデバッグメッセージを見ることができるようにログを構成するつもりです.
    src/main/resources/application.properties file以下のエントリを追加します.
    quarkus.log.category."com.brightfield.streams".level=ALL
    
    また、CORSを有効にする必要があります.
    quarkus.http.cors.enabled=true
    quarkus.http.cors.origins=http://localhost:4200
    quarkus.http.cors.methods=get,post,put,head,options
    
    そして、私は非常に多くのアプリケーションを実行しているので8080 ポートを交換するつもりです8011 :
    quarkus.http.port=8011
    
    テストするには単体テストを作りましょう.
    ServerdEndpointest.ジャバ
    package com.brightfield.streams;
    
    import io.quarkus.test.common.http.TestHTTPResource;
    import io.quarkus.test.junit.QuarkusTest;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.websocket.*;
    import java.net.URI;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.TimeUnit;
    
    @QuarkusTest
    public class SocketEndpointTest {
    
        private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();
    
        @TestHTTPResource("/chat/testuser")
        URI uri;
    
        @Test
        public void testWebSocketChat() throws Exception {
            try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
                Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
                Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
            }
        }
    
        @ClientEndpoint
        public static class Client {
            private final Logger log = LoggerFactory.getLogger(Client.class);
    
            @OnOpen
            public void open(final Session session) {
                log.debug("Connecting to server");
                String toSend = "Connecting to central control...";
                session.getAsyncRemote().sendText(toSend);
            }
    
            @OnMessage
            void message(final String message) {
                log.debug("Incoming message: {}", message);
                MESSAGES.add(message);
            }
        }
    }
    
    それで、我々はここで何をしましたか?
    まず、パイプラインを通過しているメッセージを格納するキューを設定します.我々がメッセージを送るか、クライアント側でそれを受け取るとき、我々は彼らが到着する命令を検証するためにメッセージを待ちたいです.
    この場合、最初のメッセージはメッセージを送るときに送られますClient.class 最初の接続:「ショーへの歓迎」
    クライアントが接続するとき、我々は最初のメッセージを送ります.これは、シーケンス内の2番目のメッセージになります.
    コードをコンパイルして実行するならば、以下のようにデバッグを行います.
    INFO  [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
    INFO  [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
    INFO  [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
    INFO  [io.quarkus] (main) Profile test activated.
    INFO  [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
    DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
    DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
    
    イベントのシーケンスを考えると、
  • クライアントがサーバに接続する
  • サーバは歓迎メッセージを送ります
  • クライアントはサーバにメッセージを送ります
  • クライアントがメッセージを受信します.
  • 私たちのテストは、サーバー側の相互作用とクライアント側の相互作用を追跡します.

    UIの作成


    より良い絵を得るために角度10でUIを作成するのを見ましょう.
    あなたの角度のアプリケーションを作成して起動します:
    $ ng new chat-ui
    ? Would you like to add Angular routing? Yes
    ? Which stylesheet format would you like to use? CSS
    ... 
    Installing packages...
    ✔ Packages installed successfully.
        Successfully initialized git.
    
    次に、私たちは、反応性フォームがapp.module.ts :
    @NgModule({
    ...
      imports: [
        BrowserModule,
        AppRoutingModule,
        BrowserAnimationsModule,
        ReactiveFormsModule,
      ],
    
    });
    
    2つのクラスを作成します.
  • WebSocket状態を管理するサービス
  • 相互作用を表示するコンポーネント
  • $ ng g s _services/socket
    CREATE src/app/_services/socket.service.spec.ts (357 bytes)
    CREATE src/app/_services/socket.service.ts (135 bytes)
    $ ng g c chat
    CREATE src/app/chat/chat.component.css (0 bytes)
    CREATE src/app/chat/chat.component.html (19 bytes)
    CREATE src/app/chat/chat.component.spec.ts (612 bytes)
    CREATE src/app/chat/chat.component.ts (267 bytes)
    UPDATE src/app/app.module.ts (388 bytes)
    
    ベストプラクティスのために、まずQualkusアプリケーションエンドポイントの環境変数を設定しましょう.
    src/環境/環境.TS
    export const environment = {
      production: false,
      socket_endpoint: 'ws://localhost:8011'
    };
    

    サービスの実装


    サーバー側のコンポーネントに接続して相互作用するには、いくつかの組み込みのrxjsクラスを利用します.
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    
    RXJSとWebSockets
    Rxjsは、それが実装するためにServerEndpointに接続するのと同じくらい簡単になります.
    The WebSocketSubject クライアントとサーバ間の通信の状態を表します.ちょっと同じようにBehaviorSubject 我々は、メッセージをプッシュし、上の応答を購読するつもりですWebSocketSubject .
    The webSocket クラスを作成するファクトリを表しますWebSocketSubject サーバへの接続.我々は、URLを我々のサービスに渡して、それを返しますWebSocketSubject 私たちのプッシュと購読する.
    実装する必要があるライフサイクルの3つの部分があります.
  • 接続する
  • クローズ/デストロイ
  • 送信する
  • チャット.サービスTS
    import { Injectable } from '@angular/core';
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    import { Observable } from 'rxjs';
    import { environment as env } from '../../environments/environment';
    
    @Injectable({
      providedIn: 'root'
    })
    export class SocketService {
    
      connection$: WebSocketSubject<any>;
    
      constructor() { }
    
      connect(): Observable<any> {
        this.connection$ = webSocket({
          url: `${env.socket_endpoint}/chat/angularuser`,
          deserializer: ({data}) => data,
          serializer: ({data}) => data,
        });
        return this.connection$;
      }
    ...
    }
    
    RXJSとのWebSocket接続を作成する場合、デフォルトのシリアル化/逆シリアル化はJSON.parse . サーバー側のコンポーネントでプレーンテストを使用しているので、データを解析することなくserdeをオーバーライドします.
    後でconnect ()メソッドを呼び出し、メッセージを送受信できる初期接続を確認します.
    メッセージを送信するには、次のようなメッセージをキューに入れる必要があります.
    import { Injectable } from '@angular/core';
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    import { Observable } from 'rxjs';
    import { environment as env } from '../../environments/environment';
    
    @Injectable({
      providedIn: 'root'
    })
    export class SocketService {
    
      connection$: WebSocketSubject<any>;
    
      constructor() { }
    
      connect(): Observable<any> {
        this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
        return this.connection$;
      }
    
      send(data: any): void {
        if (this.connection$) {
          this.connection$.next(data);
        } else {
          console.log('Did not send data, unable to open connection');
        }
      }
    
    }
    
    我々の間connection$ パイプは開いていますnext() オブジェクトをサーバーに送信する方法.私たちが接続性を失ったならば、今のところ、ちょうどメッセージを記録します.
    つの最後のことは、我々はサーバーから切断する場合は、我々は接続を閉じるとバックエンドのイベントをトリガすることを確認したい@OnClose , では、Aを実装しましょうcloseConnection() メソッドを呼び出し、onDestroy() イベント
    チャット.サービスTS
    import { Injectable } from '@angular/core';
    import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
    import { Observable } from 'rxjs';
    import { environment as env } from '../../environments/environment';
    
    @Injectable({
      providedIn: 'root'
    })
    export class SocketService {
    
      connection$: WebSocketSubject<any>;
    
      constructor() { }
    
      connect(): Observable<any> {
        this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
        return this.connection$;
      }
    
      send(data: any): void {
        if (this.connection$) {
          this.connection$.next(data);
        } else {
          console.log('Did not send data, unable to open connection');
        }
      }
    
      closeConnection(): void {
        if (this.connection$) {
          this.connection$.complete();
          this.connection$= null;
        }
      }
    
      ngOnDestroy() {
        this.closeConnection();
      }
    
    }
    

    角コンポーネントの作成


    ご覧のように、標準的な観測可能なパターンを使用する、非常に直接的な実装があります.このサービスを使用するには、接続を開始し、WebSocket接続を介してデータを送信するコンポーネントを作成する必要があります.
    import { Component, OnInit } from '@angular/core';
    import { SocketService } from '../_services/socket.service';
    import { Subject } from 'rxjs';
    import { takeUntil } from 'rxjs/operators';
    import { FormControl } from '@angular/forms';
    
    @Component({
      selector: 'app-chat',
      templateUrl: './chat.component.html',
      styleUrls: ['./chat.component.css']
    })
    export class ChatComponent implements OnInit {
    
      messages: string[] = [];
      msgControl = new FormControl('');
      destroyed$ = new Subject();
    
      constructor(private chatService: SocketService) { }
    
      ngOnInit(): void {
        const chatSub$ = this.chatService.connect().pipe(
          takeUntil(this.destroyed$),
        );
    
        chatSub$.subscribe(message => this.messages.push(message));
      }
    
      sendMessage(): void {
        this.chatService.send(this.msgControl.value);
        this.msgControl.setValue('');
      }
    
      ngOnDestroy(): void {
        this.destroyed$.next();
      }
    
    }
    
    チャット.コンポーネント.HTML
    <ul>
      <li *ngFor="let message of messages">{{ message }}</li>
    </ul>
    <input placeholder="Send a message..." [formControl]="msgControl">
    <button (click)="sendMessage()">Send</button>
    
    また、新しいコンポーネントのルートをすばやく追加しましょう.
    アプリケーションルーティング.モジュールです.TS
    import { NgModule } from '@angular/core';
    import { Routes, RouterModule } from '@angular/router';
    import { ChatComponent } from './chat/chat.component';
    
    const routes: Routes = [
      { path: 'chat', component: ChatComponent }
    ];
    
    @NgModule({
      imports: [RouterModule.forRoot(routes)],
      exports: [RouterModule]
    })
    export class AppRoutingModule { }
    
    私たちのコンポーネントで見ることができるように、私たちはSocketService WebSocket接続のライフサイクルを実行します.ユーザーインターフェイスは、戻ってくるメッセージのリストを持つシンプルなフォームコントロールです.
    サービスと角度のユーザーインターフェイスを起動し、あなたはhttp://localhost:4200/chat

    あなたがページにアクセスするとき、あなたは我々の最初のメッセージ「ショーAngularuserへの歓迎」と入力ボックスを見るべきです.
    ログをチェックするなら、最初の接続が行われるのを見るべきです.
    __  ____  __  _____   ___  __ ____  ______
     --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
     -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
    --\___\_\____/_/ |_/_/|_/_/|_|\____/___/
    WARN  [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
    INFO  [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
    INFO  [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
    INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
    INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
    
    メッセージを入力して送信すると、サーバー側でログメッセージが表示されます.
    __  ____  __  _____   ___  __ ____  ______
     --/ __ \/ / / / _ | / _ \/ //_/ / / / __/
     -/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
    --\___\_\____/_/ |_/_/|_/_/|_|\____/___/
    WARN  [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
    INFO  [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
    INFO  [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
    INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
    INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just sent us a message: "Good morning"
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-8) angularuser has just connected
    
    これまではとても良いですが、我々はこれがよりインタラクティブにしたい.この記事のために、ユーザーが送るものを反響しましょう.
    @ServerEndpoint("/chat/{username}")
    public class SocketEndpoint {
    
        private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
        private Map<String, Session> socketSessions = new HashMap<>();
    
        @OnOpen
        public void onOpen(Session session, @PathParam("username") String username) {
            log.debug("{} has just connected", username);
            session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
            socketSessions.put(username, session);
        }
    
        @OnError
        public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
            log.error("{} encountered an error", username);
        }
    
        @OnMessage
        public void onMessage(String message, @PathParam("username") String username) {
            log.debug("{} has just sent us a message: {}", username, message);
            Session session = socketSessions.get(username);
            session.getAsyncRemote().sendText(message);
        }
    
        public void onClose(Session session, @PathParam("username") String username) {
            log.debug("{} has now disconnected", username);
        }
    }
    
    コードへのアップデートでは、ユーザーが接続すると、我々はSession インHashMap ユーザ名にインデックスを付けます.メッセージが入ると、セッションを検索し、メッセージを送信します.

    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just connected
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "Glad to be here"
    DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "What day is it?"
    
    次の記事では、KafkaキューにKafkaをWebSocketセッションとブロードキャストメッセージに配線する方法を説明します.