Akkaガイドの「クラスタ感知ルータ」
お知らせ:Akka中国語ガイドのGitHubアドレスは「akka-guide」です.
クラスタ感知ルータ
すべての「routers」は、クラスタ内のメンバーノード、すなわち新しいルーティング(
クラスタセンシングルーティング(
2つの異なるタイプのルータがあります.
に頼る
クラスタセンシングルータ(
グループルータ注意:メンバーステータスが
同じタイプのルータもコードで定義できます.
設定の詳細については、「構成参照」を参照してください.
ルーティンググループ付きルータの例
クラスタセンシングルータをルーティングのセット(すなわち、ルータパスに送信されるルーティング)と一緒に使用する方法を見てみましょう.
サンプル・アプリケーションは、テキスト統計を計算するサービスを提供します.一部のテキストがサービスに送信されると、それを単語に分割し、各単語の文字数を計算するために、タスクを個別のワークプロセス(ルータのルーティング)に割り当てます.各ワードの文字数は、すべての結果を収集するときに各ワードの平均文字数を計算する集約器(
メッセージ:
各ワードの文字数を計算する作業者(
ユーザからテキストを受信し、それを単語に分割し、
なお,これまで特定のクラスタはなく,通常のActorであった.
すべてのノードは、
これは、ユーザ要求が任意のノード上の
最も簡単なルータの例を実行する方法は、ルーティンググループを使用してルータの例を実行する方法についての説明を含む「Akka Cluster Sample with Java」をダウンロードすることです.この例のソースコードは、「Akka Samples Repository」にもあります.
リモート配置ルーティングプール付きルータ
ルーティング(
同じタイプのルータもコードで定義できます.
設定の詳細については、「構成参照」を参照してください.
リモート配置ルーティングプールを備えたルータの例
また、各ノードにActorがあり、現在の単一のプライマリノードの位置を追跡し、
すべてのノードは、
最も簡単なリモート配置ルーティングプールを持つルータの例を実行する方法は、リモート配置ルーティングプールを使用してルータを実行する方法の例について説明する「Akka Cluster Sample with Java」をダウンロードすることです.この例のソースコードは、「Akka Samples Repository」にもあります.
英文原文リンク:Closter Aware Routers.
——————☆☆――戻る->Akka中国語ガイド
Star
、Fork
、誤り訂正を歓迎します.クラスタ感知ルータ
すべての「routers」は、クラスタ内のメンバーノード、すなわち新しいルーティング(
routees
)を配備するか、クラスタ内のノード上でルーティングを検索するかを知ることができる.ノードがクラスタにアクセスまたは離れることができない場合、ノードのルーティングは自動的にルータからログアウトされます.新しいノードがクラスタに追加されると、構成に従ってルータに追加のルーティングが追加されます.1つのノードがアクセス不可になった後に再びアクセス可能になった場合、ルーティングも追加されます.クラスタセンシングルーティング(
Cluster aware routers
)は、WeaklyUp
ステータスのメンバー(この機能が有効である場合)を使用することができる.2つの異なるタイプのルータがあります.
Group
、Actor selection
を使用して指定されたパスのルータにメッセージを送信します.ルーティングは、クラスタ内の異なるノードで実行できるルータ間で共有できます.このタイプのルータの一例は、クラスタ内のいくつかのバックエンドノード上で実行されるサービスであり、クラスタ内のフロントエンドノード上で実行されるルータによって使用され得る.Pool
は、ルーティングをサブActorとして作成し、各ルータに独自のルーティングインスタンスがあるリモートノードに配備します.たとえば、10ノードクラスタ内の3つのノードでルータを起動する場合、ルータが各ノードに1つのインスタンスを使用するように構成されている場合、合計30のルーティングがあります.異なるルータで作成されたルーティングは、ルータ間で共有されません.このタイプのルータの一例は、ジョブを調整し、クラスタ内の他のノードで実行されるルーティングに実際の動作を委任する個別のmaster
である.に頼る
クラスタセンシングルータ(
Cluster Aware Routers
)を使用するには、プロジェクトに次の依存度を追加する必要があります.
<dependency>
<groupId>com.typesafe.akkagroupId>
<artifactId>akka-cluster_2.12artifactId>
<version>2.5.21version>
dependency>
dependencies {
compile group: 'com.typesafe.akka', name: 'akka-cluster_2.12', version: '2.5.21'
}
libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.5.21"
グループルータ
Group
を使用する場合は、クラスタメンバーノード上でルーティングActorを起動する必要があります.これはルータによって行われたものではありません.グループの構成は次のとおりです.akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing-group
routees.paths = ["/user/statsWorker"]
cluster {
enabled = on
allow-local-routees = on
use-roles = ["compute"]
}
}
}
Up
に変更されると、ルータがそれらを使用しようとするため、Actorシステムが起動されると、ルーティングActorはできるだけ早く起動する必要があります.routees.paths
で定義されたActorパスは、ルータによってメッセージが転送されるActorを選択するために使用される.パスは、クラスタメンバー(membership
)から動的に取得されるため、プロトコルおよびアドレス情報を含むべきではない.メッセージは「ActorSelection」を使用してルーティングに転送されるので、同じ伝達意味を使用する必要があります.use-roles
を指定することにより、ルーティングの検索を特定のロールセットがマークされたメンバーノードに制限することができる.max-total-nr-of-instances
は、クラスタ内のルーティングの合計数を定義します.デフォルトでは、max-total-nr-of-instances
は高い値(10000
)に設定されており、ノードがクラスタに追加されると、新しいルーティングがルータに追加されます.ルーティングの合計数を制限する場合は、その合計数を低い値に設定します.同じタイプのルータもコードで定義できます.
int totalInstances = 100;
Iterable<String> routeesPaths = Collections.singletonList("/user/statsWorker");
boolean allowLocalRoutees = true;
Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
ActorRef workerRouter =
getContext()
.actorOf(
new ClusterRouterGroup(
new ConsistentHashingGroup(routeesPaths),
new ClusterRouterGroupSettings(
totalInstances, routeesPaths, allowLocalRoutees, useRoles))
.props(),
"workerRouter2");
設定の詳細については、「構成参照」を参照してください.
ルーティンググループ付きルータの例
クラスタセンシングルータをルーティングのセット(すなわち、ルータパスに送信されるルーティング)と一緒に使用する方法を見てみましょう.
サンプル・アプリケーションは、テキスト統計を計算するサービスを提供します.一部のテキストがサービスに送信されると、それを単語に分割し、各単語の文字数を計算するために、タスクを個別のワークプロセス(ルータのルーティング)に割り当てます.各ワードの文字数は、すべての結果を収集するときに各ワードの平均文字数を計算する集約器(
aggregator
)に送られる.メッセージ:
public interface StatsMessages {
public static class StatsJob implements Serializable {
private final String text;
public StatsJob(String text) {
this.text = text;
}
public String getText() {
return text;
}
}
public static class StatsResult implements Serializable {
private final double meanWordLength;
public StatsResult(double meanWordLength) {
this.meanWordLength = meanWordLength;
}
public double getMeanWordLength() {
return meanWordLength;
}
@Override
public String toString() {
return "meanWordLength: " + meanWordLength;
}
}
public static class JobFailed implements Serializable {
private final String reason;
public JobFailed(String reason) {
this.reason = reason;
}
public String getReason() {
return reason;
}
@Override
public String toString() {
return "JobFailed(" + reason + ")";
}
}
}
各ワードの文字数を計算する作業者(
worker
):public class StatsWorker extends AbstractActor {
Map<String, Integer> cache = new HashMap<String, Integer>();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
String.class,
word -> {
Integer length = cache.get(word);
if (length == null) {
length = word.length();
cache.put(word, length);
}
getSender().tell(length, getSelf());
})
.build();
}
}
ユーザからテキストを受信し、それを単語に分割し、
workers
および集約(aggregates
)に委任するサービス:public class StatsService extends AbstractActor {
// This router is used both with lookup and deploy of routees. If you
// have a router with only lookup of routees you can use Props.empty()
// instead of Props.create(StatsWorker.class).
ActorRef workerRouter =
getContext()
.actorOf(FromConfig.getInstance().props(Props.create(StatsWorker.class)), "workerRouter");
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
StatsJob.class,
job -> !job.getText().isEmpty(),
job -> {
String[] words = job.getText().split(" ");
ActorRef replyTo = getSender();
// create actor that collects replies from workers
ActorRef aggregator =
getContext().actorOf(Props.create(StatsAggregator.class, words.length, replyTo));
// send each word to a worker
for (String word : words) {
workerRouter.tell(new ConsistentHashableEnvelope(word, word), aggregator);
}
})
.build();
}
}
public class StatsAggregator extends AbstractActor {
final int expectedResults;
final ActorRef replyTo;
final List<Integer> results = new ArrayList<Integer>();
public StatsAggregator(int expectedResults, ActorRef replyTo) {
this.expectedResults = expectedResults;
this.replyTo = replyTo;
}
@Override
public void preStart() {
getContext().setReceiveTimeout(Duration.ofSeconds(3));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
Integer.class,
wordCount -> {
results.add(wordCount);
if (results.size() == expectedResults) {
int sum = 0;
for (int c : results) {
sum += c;
}
double meanWordLength = ((double) sum) / results.size();
replyTo.tell(new StatsResult(meanWordLength), getSelf());
getContext().stop(getSelf());
}
})
.match(
ReceiveTimeout.class,
x -> {
replyTo.tell(new JobFailed("Service unavailable, try again later"), getSelf());
getContext().stop(getSelf());
})
.build();
}
}
なお,これまで特定のクラスタはなく,通常のActorであった.
すべてのノードは、
StatsService
およびStatsWorker
Actorを起動します.この場合、ルーティングはworker
であることを覚えておいてください.ルータはroutees.paths
を構成しています.akka.actor.deployment {
/statsService/workerRouter {
router = consistent-hashing-group
routees.paths = ["/user/statsWorker"]
cluster {
enabled = on
allow-local-routees = on
use-roles = ["compute"]
}
}
}
これは、ユーザ要求が任意のノード上の
StatsService
に送信され、すべてのノード上でStatsWorker
が使用されることを意味する.最も簡単なルータの例を実行する方法は、ルーティンググループを使用してルータの例を実行する方法についての説明を含む「Akka Cluster Sample with Java」をダウンロードすることです.この例のソースコードは、「Akka Samples Repository」にもあります.
リモート配置ルーティングプール付きルータ
Pool
をクラスタメンバーノード上で作成および配置されたルーティングとともに使用する場合、ルータの構成は次のようになります.akka.actor.deployment {
/statsService/singleton/workerRouter {
router = consistent-hashing-pool
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-roles = ["compute"]
}
}
}
ルーティング(
use-roles
)の配置は、routees
を指定することによって、特定のロールセットがマークされたメンバーノードに制限することができる.max-total-nr-of-instances
は、クラスタ内のルーティングの合計数を定義しますが、各ノードのルーティング数、max-nr-of-instances-per-node
を超えません.デフォルトでは、max-total-nr-of-instances
は高い値(10000
)に設定されており、ノードがクラスタに追加されると、新しいルーティングがルータに追加されます.ルーティングの合計数を制限する場合は、その合計数を低い値に設定します.同じタイプのルータもコードで定義できます.
int totalInstances = 100;
int maxInstancesPerNode = 3;
boolean allowLocalRoutees = false;
Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
ActorRef workerRouter =
getContext()
.actorOf(
new ClusterRouterPool(
new ConsistentHashingPool(0),
new ClusterRouterPoolSettings(
totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles))
.props(Props.create(StatsWorker.class)),
"workerRouter3");
設定の詳細については、「構成参照」を参照してください.
リモート配置ルーティングプールを備えたルータの例
workers
の単一のプライマリノード(master node
)を作成および配置する上でクラスタセンシングルータを使用する方法を見てみましょう.単一のプライマリノードを追跡するために、クラスタツールモジュールのクラスタ単一例を使用します.ClusterSingletonManager
は各ノードで起動します.ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create(system).withRole("compute");
system.actorOf(
ClusterSingletonManager.props(
Props.create(StatsService.class), PoisonPill.getInstance(), settings),
"statsService");
また、各ノードにActorがあり、現在の単一のプライマリノードの位置を追跡し、
StatsService
にジョブを委任する必要があります.ClusterSingletonProxy
によって提供されます.ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create(system).withRole("compute");
system.actorOf(
ClusterSingletonProxy.props("/user/statsService", proxySettings), "statsServiceProxy");
ClusterSingletonProxy
は、ユーザからのテキストを受信し、現在のStatsService
(シングルマスター)に委任する.クラスタイベントをリスニングして、最も古いノードのStatsService
を検索します.すべてのノードは、
ClusterSingletonProxy
およびClusterSingletonManager
を起動します.ルータは次のように構成されます.akka.actor.deployment {
/statsService/singleton/workerRouter {
router = consistent-hashing-pool
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-roles = ["compute"]
}
}
}
最も簡単なリモート配置ルーティングプールを持つルータの例を実行する方法は、リモート配置ルーティングプールを使用してルータを実行する方法の例について説明する「Akka Cluster Sample with Java」をダウンロードすることです.この例のソースコードは、「Akka Samples Repository」にもあります.
英文原文リンク:Closter Aware Routers.
——————☆☆――戻る->Akka中国語ガイド