Akkaガイドの「クラスタ感知ルータ」


お知らせ:Akka中国語ガイドのGitHubアドレスは「akka-guide」です.StarFork、誤り訂正を歓迎します.
クラスタ感知ルータ
すべての「routers」は、クラスタ内のメンバーノード、すなわち新しいルーティング(routees)を配備するか、クラスタ内のノード上でルーティングを検索するかを知ることができる.ノードがクラスタにアクセスまたは離れることができない場合、ノードのルーティングは自動的にルータからログアウトされます.新しいノードがクラスタに追加されると、構成に従ってルータに追加のルーティングが追加されます.1つのノードがアクセス不可になった後に再びアクセス可能になった場合、ルーティングも追加されます.
クラスタセンシングルーティング(Cluster aware routers)は、WeaklyUpステータスのメンバー(この機能が有効である場合)を使用することができる.
2つの異なるタイプのルータがあります.
  • Group、Actor selectionを使用して指定されたパスのルータにメッセージを送信します.ルーティングは、クラスタ内の異なるノードで実行できるルータ間で共有できます.このタイプのルータの一例は、クラスタ内のいくつかのバックエンドノード上で実行されるサービスであり、クラスタ内のフロントエンドノード上で実行されるルータによって使用され得る.
  • Poolは、ルーティングをサブActorとして作成し、各ルータに独自のルーティングインスタンスがあるリモートノードに配備します.たとえば、10ノードクラスタ内の3つのノードでルータを起動する場合、ルータが各ノードに1つのインスタンスを使用するように構成されている場合、合計30のルーティングがあります.異なるルータで作成されたルーティングは、ルータ間で共有されません.このタイプのルータの一例は、ジョブを調整し、クラスタ内の他のノードで実行されるルーティングに実際の動作を委任する個別のmasterである.

  • に頼る
    クラスタセンシングルータ(Cluster Aware Routers)を使用するには、プロジェクトに次の依存度を追加する必要があります.
    
    <dependency>
      <groupId>com.typesafe.akkagroupId>
      <artifactId>akka-cluster_2.12artifactId>
      <version>2.5.21version>
    dependency>
    
    
    dependencies {
      compile group: 'com.typesafe.akka', name: 'akka-cluster_2.12', version: '2.5.21'
    }
    
    
    libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.5.21"
    

    グループルータGroupを使用する場合は、クラスタメンバーノード上でルーティングActorを起動する必要があります.これはルータによって行われたものではありません.グループの構成は次のとおりです.
    akka.actor.deployment {
      /statsService/workerRouter {
          router = consistent-hashing-group
          routees.paths = ["/user/statsWorker"]
          cluster {
            enabled = on
            allow-local-routees = on
            use-roles = ["compute"]
          }
        }
    }
    
  • 注意:メンバーステータスがUpに変更されると、ルータがそれらを使用しようとするため、Actorシステムが起動されると、ルーティングActorはできるだけ早く起動する必要があります.
  • routees.pathsで定義されたActorパスは、ルータによってメッセージが転送されるActorを選択するために使用される.パスは、クラスタメンバー(membership)から動的に取得されるため、プロトコルおよびアドレス情報を含むべきではない.メッセージは「ActorSelection」を使用してルーティングに転送されるので、同じ伝達意味を使用する必要があります.use-rolesを指定することにより、ルーティングの検索を特定のロールセットがマークされたメンバーノードに制限することができる.max-total-nr-of-instancesは、クラスタ内のルーティングの合計数を定義します.デフォルトでは、max-total-nr-of-instancesは高い値(10000)に設定されており、ノードがクラスタに追加されると、新しいルーティングがルータに追加されます.ルーティングの合計数を制限する場合は、その合計数を低い値に設定します.
    同じタイプのルータもコードで定義できます.
    int totalInstances = 100;
    Iterable<String> routeesPaths = Collections.singletonList("/user/statsWorker");
    boolean allowLocalRoutees = true;
    Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
    ActorRef workerRouter =
        getContext()
            .actorOf(
                new ClusterRouterGroup(
                        new ConsistentHashingGroup(routeesPaths),
                        new ClusterRouterGroupSettings(
                            totalInstances, routeesPaths, allowLocalRoutees, useRoles))
                    .props(),
                "workerRouter2");
    

    設定の詳細については、「構成参照」を参照してください.
    ルーティンググループ付きルータの例
    クラスタセンシングルータをルーティングのセット(すなわち、ルータパスに送信されるルーティング)と一緒に使用する方法を見てみましょう.
    サンプル・アプリケーションは、テキスト統計を計算するサービスを提供します.一部のテキストがサービスに送信されると、それを単語に分割し、各単語の文字数を計算するために、タスクを個別のワークプロセス(ルータのルーティング)に割り当てます.各ワードの文字数は、すべての結果を収集するときに各ワードの平均文字数を計算する集約器(aggregator)に送られる.
    メッセージ:
    public interface StatsMessages {
    
      public static class StatsJob implements Serializable {
        private final String text;
    
        public StatsJob(String text) {
          this.text = text;
        }
    
        public String getText() {
          return text;
        }
      }
    
      public static class StatsResult implements Serializable {
        private final double meanWordLength;
    
        public StatsResult(double meanWordLength) {
          this.meanWordLength = meanWordLength;
        }
    
        public double getMeanWordLength() {
          return meanWordLength;
        }
    
        @Override
        public String toString() {
          return "meanWordLength: " + meanWordLength;
        }
      }
    
      public static class JobFailed implements Serializable {
        private final String reason;
    
        public JobFailed(String reason) {
          this.reason = reason;
        }
    
        public String getReason() {
          return reason;
        }
    
        @Override
        public String toString() {
          return "JobFailed(" + reason + ")";
        }
      }
    }
    

    各ワードの文字数を計算する作業者(worker):
    public class StatsWorker extends AbstractActor {
    
      Map<String, Integer> cache = new HashMap<String, Integer>();
    
      @Override
      public Receive createReceive() {
        return receiveBuilder()
            .match(
                String.class,
                word -> {
                  Integer length = cache.get(word);
                  if (length == null) {
                    length = word.length();
                    cache.put(word, length);
                  }
                  getSender().tell(length, getSelf());
                })
            .build();
      }
    }
    

    ユーザからテキストを受信し、それを単語に分割し、workersおよび集約(aggregates)に委任するサービス:
    public class StatsService extends AbstractActor {
    
      // This router is used both with lookup and deploy of routees. If you
      // have a router with only lookup of routees you can use Props.empty()
      // instead of Props.create(StatsWorker.class).
      ActorRef workerRouter =
          getContext()
              .actorOf(FromConfig.getInstance().props(Props.create(StatsWorker.class)), "workerRouter");
    
      @Override
      public Receive createReceive() {
        return receiveBuilder()
            .match(
                StatsJob.class,
                job -> !job.getText().isEmpty(),
                job -> {
                  String[] words = job.getText().split(" ");
                  ActorRef replyTo = getSender();
    
                  // create actor that collects replies from workers
                  ActorRef aggregator =
                      getContext().actorOf(Props.create(StatsAggregator.class, words.length, replyTo));
    
                  // send each word to a worker
                  for (String word : words) {
                    workerRouter.tell(new ConsistentHashableEnvelope(word, word), aggregator);
                  }
                })
            .build();
      }
    }
    
    public class StatsAggregator extends AbstractActor {
    
      final int expectedResults;
      final ActorRef replyTo;
      final List<Integer> results = new ArrayList<Integer>();
    
      public StatsAggregator(int expectedResults, ActorRef replyTo) {
        this.expectedResults = expectedResults;
        this.replyTo = replyTo;
      }
    
      @Override
      public void preStart() {
        getContext().setReceiveTimeout(Duration.ofSeconds(3));
      }
    
      @Override
      public Receive createReceive() {
        return receiveBuilder()
            .match(
                Integer.class,
                wordCount -> {
                  results.add(wordCount);
                  if (results.size() == expectedResults) {
                    int sum = 0;
                    for (int c : results) {
                      sum += c;
                    }
                    double meanWordLength = ((double) sum) / results.size();
                    replyTo.tell(new StatsResult(meanWordLength), getSelf());
                    getContext().stop(getSelf());
                  }
                })
            .match(
                ReceiveTimeout.class,
                x -> {
                  replyTo.tell(new JobFailed("Service unavailable, try again later"), getSelf());
                  getContext().stop(getSelf());
                })
            .build();
      }
    }
    

    なお,これまで特定のクラスタはなく,通常のActorであった.
    すべてのノードは、StatsServiceおよびStatsWorker Actorを起動します.この場合、ルーティングはworkerであることを覚えておいてください.ルータはroutees.pathsを構成しています.
    akka.actor.deployment {
      /statsService/workerRouter {
        router = consistent-hashing-group
        routees.paths = ["/user/statsWorker"]
        cluster {
          enabled = on
          allow-local-routees = on
          use-roles = ["compute"]
        }
      }
    }
    

    これは、ユーザ要求が任意のノード上のStatsServiceに送信され、すべてのノード上でStatsWorkerが使用されることを意味する.
    最も簡単なルータの例を実行する方法は、ルーティンググループを使用してルータの例を実行する方法についての説明を含む「Akka Cluster Sample with Java」をダウンロードすることです.この例のソースコードは、「Akka Samples Repository」にもあります.
    リモート配置ルーティングプール付きルータPoolをクラスタメンバーノード上で作成および配置されたルーティングとともに使用する場合、ルータの構成は次のようになります.
    akka.actor.deployment {
      /statsService/singleton/workerRouter {
          router = consistent-hashing-pool
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 3
            allow-local-routees = on
            use-roles = ["compute"]
          }
        }
    }
    

    ルーティング(use-roles)の配置は、routeesを指定することによって、特定のロールセットがマークされたメンバーノードに制限することができる.max-total-nr-of-instancesは、クラスタ内のルーティングの合計数を定義しますが、各ノードのルーティング数、max-nr-of-instances-per-nodeを超えません.デフォルトでは、max-total-nr-of-instancesは高い値(10000)に設定されており、ノードがクラスタに追加されると、新しいルーティングがルータに追加されます.ルーティングの合計数を制限する場合は、その合計数を低い値に設定します.
    同じタイプのルータもコードで定義できます.
    int totalInstances = 100;
    int maxInstancesPerNode = 3;
    boolean allowLocalRoutees = false;
    Set<String> useRoles = new HashSet<>(Arrays.asList("compute"));
    ActorRef workerRouter =
        getContext()
            .actorOf(
                new ClusterRouterPool(
                        new ConsistentHashingPool(0),
                        new ClusterRouterPoolSettings(
                            totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles))
                    .props(Props.create(StatsWorker.class)),
                "workerRouter3");
    

    設定の詳細については、「構成参照」を参照してください.
    リモート配置ルーティングプールを備えたルータの例workersの単一のプライマリノード(master node)を作成および配置する上でクラスタセンシングルータを使用する方法を見てみましょう.単一のプライマリノードを追跡するために、クラスタツールモジュールのクラスタ単一例を使用します.ClusterSingletonManagerは各ノードで起動します.
    ClusterSingletonManagerSettings settings =
        ClusterSingletonManagerSettings.create(system).withRole("compute");
    system.actorOf(
        ClusterSingletonManager.props(
            Props.create(StatsService.class), PoisonPill.getInstance(), settings),
        "statsService");
    

    また、各ノードにActorがあり、現在の単一のプライマリノードの位置を追跡し、StatsServiceにジョブを委任する必要があります.ClusterSingletonProxyによって提供されます.
    ClusterSingletonProxySettings proxySettings =
        ClusterSingletonProxySettings.create(system).withRole("compute");
    system.actorOf(
        ClusterSingletonProxy.props("/user/statsService", proxySettings), "statsServiceProxy");
    
    ClusterSingletonProxyは、ユーザからのテキストを受信し、現在のStatsService(シングルマスター)に委任する.クラスタイベントをリスニングして、最も古いノードのStatsServiceを検索します.
    すべてのノードは、ClusterSingletonProxyおよびClusterSingletonManagerを起動します.ルータは次のように構成されます.
    akka.actor.deployment {
      /statsService/singleton/workerRouter {
        router = consistent-hashing-pool
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 3
          allow-local-routees = on
          use-roles = ["compute"]
        }
      }
    }
    

    最も簡単なリモート配置ルーティングプールを持つルータの例を実行する方法は、リモート配置ルーティングプールを使用してルータを実行する方法の例について説明する「Akka Cluster Sample with Java」をダウンロードすることです.この例のソースコードは、「Akka Samples Repository」にもあります.
    英文原文リンク:Closter Aware Routers.
    ——————☆☆――戻る->Akka中国語ガイド