Storm DRPC
4583 ワード
DRPCの概要
DRPCは同時RPCアーキテクチャを実現するために実現され、DはDistributedであり、Storm分散、同時能力を利用してRPCの高性能を実現する.
DRPCアーキテクチャ
DRPCのアーキテクチャは以下の通りである.
注意:Topologyのemitのtupleで、最初のfieldは「id」で、値は要求IDでなければなりません.
私たちはTopology部分を実現するだけでいいです.
Storm DPRC APIの紹介
まずDRPCクライアントのAPIを見てみましょう.
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");
まず、DRPCクライアントを表すDRPCClientオブジェクトを構築します.1番目のパラメータはホストアドレスで、2番目のパラメータはポート番号です.その後、このclientでexecute関数を呼び出し、リモートで関数を実行します.executeの最初のパラメータは関数名を表し、2番目のパラメータは関数パラメータです.
DRPCサーバ側のAPIを見てみましょう.
ローカルモードのDRPC:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
builder.addBolt(new ExclaimBolt(), 3);
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
cluster.shutdown();
drpc.shutdown();
リモートモードのDRPC:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
リモートモードのDRPCとローカルモードのDRPCの違いは以下の点である.
Trident DRPC APIの紹介
TridentTopology topology = new TridentTopology();
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
TridentTopologyでtopologyを構築し、newDRPCStreamを呼び出してDRPCストリームを構築します.パラメータは関数名です.後の内容は通常のTrident APIと同じです.