Qualkusと角度10で最も速いウェブソケット
46335 ワード
Qualkusは、両方のメッセージングとWebSockets本当に始めるとすぐに役立ちます.しかし、あなたが一緒に2つの技術と結婚するとき、何が起こりますか?
この節で使用されているソースコードは次のようにします.
https://github.com/cloudy-engineering/quarkus-chat-api
https://github.com/cloudy-engineering/angular-chat-ui
Qualkus WebSocketsについては、以下のように簡単です.
あなたの最初の統合を取得し、実際には非常に簡単です. WebSocketエンドポイントを表す新しいクラスを作成する 標準のWebSocketライフサイクルメソッドを実装する WebSocketエンドポイントと統合するUIを作成します 機能の実装
クライアントがサーバに接続する サーバは歓迎メッセージを送ります クライアントはサーバにメッセージを送ります クライアントがメッセージを受信します. 私たちのテストは、サーバー側の相互作用とクライアント側の相互作用を追跡します.
UIの作成
WebSocket状態を管理するサービス 相互作用を表示するコンポーネント
src/環境/環境.TS
サーバー側のコンポーネントに接続して相互作用するには、いくつかの組み込みのrxjsクラスを利用します.
Rxjsは、それが実装するためにServerEndpointに接続するのと同じくらい簡単になります.
The
The
実装する必要があるライフサイクルの3つの部分があります. 接続する クローズ/デストロイ 送信する チャット.サービスTS
後でconnect ()メソッドを呼び出し、メッセージを送受信できる初期接続を確認します.
メッセージを送信するには、次のようなメッセージをキューに入れる必要があります.
つの最後のことは、我々はサーバーから切断する場合は、我々は接続を閉じるとバックエンドのイベントをトリガすることを確認したい
チャット.サービスTS
ご覧のように、標準的な観測可能なパターンを使用する、非常に直接的な実装があります.このサービスを使用するには、接続を開始し、WebSocket接続を介してデータを送信するコンポーネントを作成する必要があります.
アプリケーションルーティング.モジュールです.TS
サービスと角度のユーザーインターフェイスを起動し、あなたはhttp://localhost:4200/chat
あなたがページにアクセスするとき、あなたは我々の最初のメッセージ「ショーAngularuserへの歓迎」と入力ボックスを見るべきです.
ログをチェックするなら、最初の接続が行われるのを見るべきです.
この節で使用されているソースコードは次のようにします.
https://github.com/cloudy-engineering/quarkus-chat-api
https://github.com/cloudy-engineering/angular-chat-ui
Qualkus WebSocketsについては、以下のように簡単です.
$ mvn io.quarkus:quarkus-maven-plugin:1.7.0.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=websockets-quickstart \
-Dextensions="undertow-websockets"
$ cd websockets-quickstart
これは典型的なMavenベースのソースコード構造を作成します.quarkus-undertow-websockets
依存性quarkus-resteasy
依存性onOpen
, onError
, onMessage
, onClose
) 機能の実装
インターフェイスを実装するか、基本クラスを拡張するのではなく、Qualkus WebSocketは注釈を使用してライフサイクルを実装します.
@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {
private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
}
@OnError
public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
log.error("{} encountered an error", username);
}
@OnMessage
public void onMessage(String message, @PathParam("username") String username) {
log.debug("{} has just sent us a message: {}", username, message);
}
@OnClose
public void onClose(Session session, @PathParam("username") String username) {
log.debug("{} has now disconnected", username);
}
}
サーバー側コンポーネントで覚えておくべきことはSession
. これはエンドユーザーと通信する方法です.この記事のために、asyncremoteを使用して、オブジェクトをユーザに返します.
@ServerEndpoint("/chat/{username}")
@ApplicationScoped
public class SocketEndpoint {
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
}
...
}
フロントエンドを介して接続するとonOpen
メソッドをインスタンス化します.ここでは、ユーザーの相互作用を設定し、任意のアクションを確認してメッセージを送信することができます.ここで返信を送ります.
ログとcors
我々が続ける前に、我々は我々が我々のデバッグメッセージを見ることができるようにログを構成するつもりです.
にsrc/main/resources/application.properties
file以下のエントリを追加します.
quarkus.log.category."com.brightfield.streams".level=ALL
また、CORSを有効にする必要があります.
quarkus.http.cors.enabled=true
quarkus.http.cors.origins=http://localhost:4200
quarkus.http.cors.methods=get,post,put,head,options
そして、私は非常に多くのアプリケーションを実行しているので8080
ポートを交換するつもりです8011
:
quarkus.http.port=8011
テストするには単体テストを作りましょう.
ServerdEndpointest.ジャバ
package com.brightfield.streams;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@QuarkusTest
public class SocketEndpointTest {
private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();
@TestHTTPResource("/chat/testuser")
URI uri;
@Test
public void testWebSocketChat() throws Exception {
try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
}
}
@ClientEndpoint
public static class Client {
private final Logger log = LoggerFactory.getLogger(Client.class);
@OnOpen
public void open(final Session session) {
log.debug("Connecting to server");
String toSend = "Connecting to central control...";
session.getAsyncRemote().sendText(toSend);
}
@OnMessage
void message(final String message) {
log.debug("Incoming message: {}", message);
MESSAGES.add(message);
}
}
}
それで、我々はここで何をしましたか?
まず、パイプラインを通過しているメッセージを格納するキューを設定します.我々がメッセージを送るか、クライアント側でそれを受け取るとき、我々は彼らが到着する命令を検証するためにメッセージを待ちたいです.
この場合、最初のメッセージはメッセージを送るときに送られますClient.class
最初の接続:「ショーへの歓迎」
クライアントが接続するとき、我々は最初のメッセージを送ります.これは、シーケンス内の2番目のメッセージになります.
コードをコンパイルして実行するならば、以下のようにデバッグを行います.
INFO [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
INFO [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
INFO [io.quarkus] (main) Profile test activated.
INFO [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
イベントのシーケンスを考えると、
@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {
private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
}
@OnError
public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
log.error("{} encountered an error", username);
}
@OnMessage
public void onMessage(String message, @PathParam("username") String username) {
log.debug("{} has just sent us a message: {}", username, message);
}
@OnClose
public void onClose(Session session, @PathParam("username") String username) {
log.debug("{} has now disconnected", username);
}
}
@ServerEndpoint("/chat/{username}")
@ApplicationScoped
public class SocketEndpoint {
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
}
...
}
quarkus.log.category."com.brightfield.streams".level=ALL
quarkus.http.cors.enabled=true
quarkus.http.cors.origins=http://localhost:4200
quarkus.http.cors.methods=get,post,put,head,options
quarkus.http.port=8011
package com.brightfield.streams;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.websocket.*;
import java.net.URI;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@QuarkusTest
public class SocketEndpointTest {
private static final LinkedBlockingDeque<String> MESSAGES = new LinkedBlockingDeque<>();
@TestHTTPResource("/chat/testuser")
URI uri;
@Test
public void testWebSocketChat() throws Exception {
try (Session session = ContainerProvider.getWebSocketContainer().connectToServer(Client.class, uri)) {
Assertions.assertEquals("Connecting to central control...", MESSAGES.poll(10, TimeUnit.SECONDS));
Assertions.assertEquals("Welcome to the show testuser", MESSAGES.poll(10, TimeUnit.SECONDS));
}
}
@ClientEndpoint
public static class Client {
private final Logger log = LoggerFactory.getLogger(Client.class);
@OnOpen
public void open(final Session session) {
log.debug("Connecting to server");
String toSend = "Connecting to central control...";
session.getAsyncRemote().sendText(toSend);
}
@OnMessage
void message(final String message) {
log.debug("Incoming message: {}", message);
MESSAGES.add(message);
}
}
}
INFO [io.und.web.jsr] (main) UT026004: Adding annotated client endpoint class com.brightfield.streams.SocketEndpointTest$Client
INFO [io.und.web.jsr] (main) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (main) Quarkus 1.7.2.Final on JVM started in 1.791s. Listening on: http://0.0.0.0:8081
INFO [io.quarkus] (main) Profile test activated.
INFO [io.quarkus] (main) Installed features: [cdi, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpointTest$Client] (main) Connecting to server
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-2) testuser has just sent us a message: Connecting to central control...
DEBUG [com.bri.str.SocketEndpointTest$Client] (nioEventLoopGroup-2-1) Incoming message: Welcome to the show testuser
UIの作成
より良い絵を得るために角度10でUIを作成するのを見ましょう.
あなたの角度のアプリケーションを作成して起動します:
$ ng new chat-ui
? Would you like to add Angular routing? Yes
? Which stylesheet format would you like to use? CSS
...
Installing packages...
✔ Packages installed successfully.
Successfully initialized git.
次に、私たちは、反応性フォームがapp.module.ts
:
@NgModule({
...
imports: [
BrowserModule,
AppRoutingModule,
BrowserAnimationsModule,
ReactiveFormsModule,
],
});
2つのクラスを作成します.
$ ng new chat-ui
? Would you like to add Angular routing? Yes
? Which stylesheet format would you like to use? CSS
...
Installing packages...
✔ Packages installed successfully.
Successfully initialized git.
@NgModule({
...
imports: [
BrowserModule,
AppRoutingModule,
BrowserAnimationsModule,
ReactiveFormsModule,
],
});
$ ng g s _services/socket
CREATE src/app/_services/socket.service.spec.ts (357 bytes)
CREATE src/app/_services/socket.service.ts (135 bytes)
$ ng g c chat
CREATE src/app/chat/chat.component.css (0 bytes)
CREATE src/app/chat/chat.component.html (19 bytes)
CREATE src/app/chat/chat.component.spec.ts (612 bytes)
CREATE src/app/chat/chat.component.ts (267 bytes)
UPDATE src/app/app.module.ts (388 bytes)
ベストプラクティスのために、まずQualkusアプリケーションエンドポイントの環境変数を設定しましょう.src/環境/環境.TS
export const environment = {
production: false,
socket_endpoint: 'ws://localhost:8011'
};
サービスの実装
サーバー側のコンポーネントに接続して相互作用するには、いくつかの組み込みのrxjsクラスを利用します.
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
RXJSとWebSocketsRxjsは、それが実装するためにServerEndpointに接続するのと同じくらい簡単になります.
The
WebSocketSubject
クライアントとサーバ間の通信の状態を表します.ちょっと同じようにBehaviorSubject
我々は、メッセージをプッシュし、上の応答を購読するつもりですWebSocketSubject
.The
webSocket
クラスを作成するファクトリを表しますWebSocketSubject
サーバへの接続.我々は、URLを我々のサービスに渡して、それを返しますWebSocketSubject
私たちのプッシュと購読する.実装する必要があるライフサイクルの3つの部分があります.
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';
@Injectable({
providedIn: 'root'
})
export class SocketService {
connection$: WebSocketSubject<any>;
constructor() { }
connect(): Observable<any> {
this.connection$ = webSocket({
url: `${env.socket_endpoint}/chat/angularuser`,
deserializer: ({data}) => data,
serializer: ({data}) => data,
});
return this.connection$;
}
...
}
RXJSとのWebSocket接続を作成する場合、デフォルトのシリアル化/逆シリアル化はJSON.parse
. サーバー側のコンポーネントでプレーンテストを使用しているので、データを解析することなくserdeをオーバーライドします.後でconnect ()メソッドを呼び出し、メッセージを送受信できる初期接続を確認します.
メッセージを送信するには、次のようなメッセージをキューに入れる必要があります.
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';
@Injectable({
providedIn: 'root'
})
export class SocketService {
connection$: WebSocketSubject<any>;
constructor() { }
connect(): Observable<any> {
this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
return this.connection$;
}
send(data: any): void {
if (this.connection$) {
this.connection$.next(data);
} else {
console.log('Did not send data, unable to open connection');
}
}
}
我々の間connection$
パイプは開いていますnext()
オブジェクトをサーバーに送信する方法.私たちが接続性を失ったならば、今のところ、ちょうどメッセージを記録します.つの最後のことは、我々はサーバーから切断する場合は、我々は接続を閉じるとバックエンドのイベントをトリガすることを確認したい
@OnClose
, では、Aを実装しましょうcloseConnection()
メソッドを呼び出し、onDestroy()
イベントチャット.サービスTS
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { Observable } from 'rxjs';
import { environment as env } from '../../environments/environment';
@Injectable({
providedIn: 'root'
})
export class SocketService {
connection$: WebSocketSubject<any>;
constructor() { }
connect(): Observable<any> {
this.connection$ = webSocket(`${env.socket_endpoint}/angularuser`);
return this.connection$;
}
send(data: any): void {
if (this.connection$) {
this.connection$.next(data);
} else {
console.log('Did not send data, unable to open connection');
}
}
closeConnection(): void {
if (this.connection$) {
this.connection$.complete();
this.connection$= null;
}
}
ngOnDestroy() {
this.closeConnection();
}
}
角コンポーネントの作成
ご覧のように、標準的な観測可能なパターンを使用する、非常に直接的な実装があります.このサービスを使用するには、接続を開始し、WebSocket接続を介してデータを送信するコンポーネントを作成する必要があります.
import { Component, OnInit } from '@angular/core';
import { SocketService } from '../_services/socket.service';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { FormControl } from '@angular/forms';
@Component({
selector: 'app-chat',
templateUrl: './chat.component.html',
styleUrls: ['./chat.component.css']
})
export class ChatComponent implements OnInit {
messages: string[] = [];
msgControl = new FormControl('');
destroyed$ = new Subject();
constructor(private chatService: SocketService) { }
ngOnInit(): void {
const chatSub$ = this.chatService.connect().pipe(
takeUntil(this.destroyed$),
);
chatSub$.subscribe(message => this.messages.push(message));
}
sendMessage(): void {
this.chatService.send(this.msgControl.value);
this.msgControl.setValue('');
}
ngOnDestroy(): void {
this.destroyed$.next();
}
}
チャット.コンポーネント.HTML<ul>
<li *ngFor="let message of messages">{{ message }}</li>
</ul>
<input placeholder="Send a message..." [formControl]="msgControl">
<button (click)="sendMessage()">Send</button>
また、新しいコンポーネントのルートをすばやく追加しましょう.アプリケーションルーティング.モジュールです.TS
import { NgModule } from '@angular/core';
import { Routes, RouterModule } from '@angular/router';
import { ChatComponent } from './chat/chat.component';
const routes: Routes = [
{ path: 'chat', component: ChatComponent }
];
@NgModule({
imports: [RouterModule.forRoot(routes)],
exports: [RouterModule]
})
export class AppRoutingModule { }
私たちのコンポーネントで見ることができるように、私たちはSocketService
WebSocket接続のライフサイクルを実行します.ユーザーインターフェイスは、戻ってくるメッセージのリストを持つシンプルなフォームコントロールです.サービスと角度のユーザーインターフェイスを起動し、あなたはhttp://localhost:4200/chat
あなたがページにアクセスするとき、あなたは我々の最初のメッセージ「ショーAngularuserへの歓迎」と入力ボックスを見るべきです.
ログをチェックするなら、最初の接続が行われるのを見るべきです.
__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
WARN [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
INFO [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
メッセージを入力して送信すると、サーバー側でログメッセージが表示されます.__ ____ __ _____ ___ __ ____ ______
--/ __ \/ / / / _ | / _ \/ //_/ / / / __/
-/ /_/ / /_/ / __ |/ , _/ ,< / /_/ /\ \
--\___\_\____/_/ |_/_/|_/_/|_|\____/___/
WARN [io.qua.kub.dep.KubernetesProcessor] (build-15) No registry was set for the container image, so 'ImagePullPolicy' is being force-set to 'IfNotPresent'.
INFO [io.und.web.jsr] (Quarkus Main Thread) UT026003: Adding annotated server endpoint class com.brightfield.streams.SocketEndpoint for path /chat/{username}
INFO [io.quarkus] (Quarkus Main Thread) chat-service 1.0-SNAPSHOT on JVM (powered by Quarkus 1.7.2.Final) started in 3.055s. Listening on: http://0.0.0.0:8011
INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kubernetes, resteasy, servlet, undertow-websockets]
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-18) angularuser has just sent us a message: "Good morning"
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-8) angularuser has just connected
これまではとても良いですが、我々はこれがよりインタラクティブにしたい.この記事のために、ユーザーが送るものを反響しましょう.@ServerEndpoint("/chat/{username}")
public class SocketEndpoint {
private final Logger log = LoggerFactory.getLogger(SocketEndpoint.class);
private Map<String, Session> socketSessions = new HashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
log.debug("{} has just connected", username);
session.getAsyncRemote().sendText(String.format("Welcome to the show %s", username));
socketSessions.put(username, session);
}
@OnError
public void onError(Session session, @PathParam("username") String username, Throwable throwable) {
log.error("{} encountered an error", username);
}
@OnMessage
public void onMessage(String message, @PathParam("username") String username) {
log.debug("{} has just sent us a message: {}", username, message);
Session session = socketSessions.get(username);
session.getAsyncRemote().sendText(message);
}
public void onClose(Session session, @PathParam("username") String username) {
log.debug("{} has now disconnected", username);
}
}
コードへのアップデートでは、ユーザーが接続すると、我々はSession
インHashMap
ユーザ名にインデックスを付けます.メッセージが入ると、セッションを検索し、メッセージを送信します.DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just connected
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "Glad to be here"
DEBUG [com.bri.str.SocketEndpoint] (vert.x-eventloop-thread-12) angularuser has just sent us a message: "What day is it?"
次の記事では、KafkaキューにKafkaをWebSocketセッションとブロードキャストメッセージに配線する方法を説明します.Reference
この問題について(Qualkusと角度10で最も速いウェブソケット), 我々は、より多くの情報をここで見つけました https://dev.to/anthonyikeda/quickest-websockets-with-quarkus-and-angular-10-1e2cテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol