Storm Trident DRPC C/Sクラスタモード
5162 ワード
Storm Trident DRPC C/Sクラスタモード
概要
Storm Tridentフレームワークに基づいて、簡単なDRPCサービス側とクライアントが実現します.具体的にはstorm-starter、基本概念:Stormは分布式リアルタイム処理フレームワークである.DRPCは同時のRPCアーキテクチャを実現するために実現され、その中でDはDistributedであり、Stormの分布式、同時の能力を利用して、RPCの高性能を実現する.TridentはStorm 0.8である.0バージョンで導入された新しい特性は、Stormメタ語に基づいてリアルタイム計算を行うユーザーに、高スループット(毎秒百万レベルのメッセージ)と低処理遅延を同時に満たすより高度な抽象メタ語を提供します.
インプリメンテーション
サービス側
LinearderDRPCTopologyBuilderが廃止されたため、TridentTopologyでDRPCSteamストリームを生成することを推奨します.
上記のコードは、TridentTopologyを使用してDRPCStreamストリームを生成し、drpc関数として
クライアント
args[0]、args[1]はそれぞれサービス側アドレスおよびポートであり、その値はstormである.yamlの設定値.executeメソッドの最初のパラメータは、サービス側DRPCSteamストリームによって定義された関数名であり、ここでは
クラスタ構成
各ノードのプロファイルstormを更新する.yaml増加パラメータ
概要
Storm Tridentフレームワークに基づいて、簡単なDRPCサービス側とクライアントが実現します.具体的にはstorm-starter、基本概念:Stormは分布式リアルタイム処理フレームワークである.DRPCは同時のRPCアーキテクチャを実現するために実現され、その中でDはDistributedであり、Stormの分布式、同時の能力を利用して、RPCの高性能を実現する.TridentはStorm 0.8である.0バージョンで導入された新しい特性は、Stormメタ語に基づいてリアルタイム計算を行うユーザーに、高スループット(毎秒百万レベルのメッセージ)と低処理遅延を同時に満たすより高度な抽象メタ語を提供します.
インプリメンテーション
サービス側
LinearderDRPCTopologyBuilderが廃止されたため、TridentTopologyでDRPCSteamストリームを生成することを推奨します.
public class DrpcTopology {
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
LocalCluster cluster = new LocalCluster();
LocalDRPC drpc = new LocalDRPC();
cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
System.out.println("DRPC RESULT: " + drpc.execute("data", "cat the dog jumped"));
drpc.shutdown();
cluster.shutdown();
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
public static StormTopology buildTopology(LocalDRPC drpc) {
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("data", drpc)
.each(new Fields("args"), new Split(), new Fields("result"))
.project(new Fields("result"));
return topology.build();
}
}
上記のコードは、TridentTopologyを使用してDRPCStreamストリームを生成し、drpc関数として
data
と命名します.Splitで文字列をスペースで分割し、複数の単語フィールドを下に送り、クライアントに返します.クライアント
public class DrpcClient {
public static void main(String[] args) throws TException, DRPCExecutionException {
DRPCClient client = new DRPCClient(args[0], args[1], null);
System.out.println(client.execute("data","cat the dog jumped"));
}
}
args[0]、args[1]はそれぞれサービス側アドレスおよびポートであり、その値はstormである.yamlの設定値.executeメソッドの最初のパラメータは、サービス側DRPCSteamストリームによって定義された関数名であり、ここでは
data
である.サーバトポロジをコミットした後(コミットプロセスは通常のトポロジと同じ)、クライアントを実行し、返されるデータは次のように表示されます.[["cat"],["the"],["dog"],["jumped"]]
クラスタ構成
各ノードのプロファイルstormを更新する.yaml増加パラメータ
drpc.servers: - "10.0.3.31"
drpc.port: 3772
DRPCサービスコマンドstorm drpc
を起動