Storm Trident DRPC C/Sクラスタモード

5162 ワード

Storm Trident DRPC C/Sクラスタモード
概要
Storm Tridentフレームワークに基づいて、簡単なDRPCサービス側とクライアントが実現します.具体的にはstorm-starter、基本概念:Stormは分布式リアルタイム処理フレームワークである.DRPCは同時のRPCアーキテクチャを実現するために実現され、その中でDはDistributedであり、Stormの分布式、同時の能力を利用して、RPCの高性能を実現する.TridentはStorm 0.8である.0バージョンで導入された新しい特性は、Stormメタ語に基づいてリアルタイム計算を行うユーザーに、高スループット(毎秒百万レベルのメッセージ)と低処理遅延を同時に満たすより高度な抽象メタ語を提供します.
インプリメンテーション
サービス側
LinearderDRPCTopologyBuilderが廃止されたため、TridentTopologyでDRPCSteamストリームを生成することを推奨します.
public class DrpcTopology {
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            LocalDRPC drpc = new LocalDRPC();
            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
            System.out.println("DRPC RESULT: " + drpc.execute("data", "cat the dog jumped"));
            drpc.shutdown();
            cluster.shutdown();
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
        }
    }
    public static StormTopology buildTopology(LocalDRPC drpc) {
        TridentTopology topology = new TridentTopology();
        topology.newDRPCStream("data", drpc)
                .each(new Fields("args"), new Split(), new Fields("result"))
                .project(new Fields("result"));
        return topology.build();
    }
}

上記のコードは、TridentTopologyを使用してDRPCStreamストリームを生成し、drpc関数としてdataと命名します.Splitで文字列をスペースで分割し、複数の単語フィールドを下に送り、クライアントに返します.
クライアント
public class DrpcClient {
    public static void main(String[] args) throws TException, DRPCExecutionException {
        DRPCClient client = new DRPCClient(args[0], args[1], null);
        System.out.println(client.execute("data","cat the dog jumped"));
    }
}

args[0]、args[1]はそれぞれサービス側アドレスおよびポートであり、その値はstormである.yamlの設定値.executeメソッドの最初のパラメータは、サービス側DRPCSteamストリームによって定義された関数名であり、ここではdataである.サーバトポロジをコミットした後(コミットプロセスは通常のトポロジと同じ)、クライアントを実行し、返されるデータは次のように表示されます.
[["cat"],["the"],["dog"],["jumped"]]

クラスタ構成
各ノードのプロファイルstormを更新する.yaml増加パラメータdrpc.servers: - "10.0.3.31" drpc.port: 3772 DRPCサービスコマンドstorm drpcを起動