ITridentSpout、FirstN(TopNを取る)実装、ストリームマージ、join

2981 ワード

一、ITridentSpout
トランザクションベース
static interface ITridentSpout.BatchCoordinator
           
static interface ITridentSpout.Emitter
インタフェースクラスの実装は、以前のトランザクションITransactionalSpoutと非常に似ています.
二、複数の集約を実行するための呼び出しチェーン

topology.newDRPCStream("top", drpc).each(new Fields("args"), new Split(“ ”), new Fields("time")).parallelismHint(5).stateQuery(myStates,new Fields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt")).groupBy(new Fields("srcip")).chainedAgg().aggregate(new Fields("byt"), new Sum(), new Fields("yt")).aggregate(new Fields("pkt"), new Sum(), new Fields("kt")).chainEnd().applyAssembly(new FirstN(10, "yt", true));

同僚が複数の集約を実行したい場合は、次の呼び出しチェーンを使用します.
mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()
このコードは、各パーティションでcountとsumの集約を実行します.出力には、「count」、「sum」フィールドが含まれます.
三、投影(projection)
投影操作は、データ上の列の切り取りです.
ストリームに「a」、「b」、「c」、「d」の4つのフィールドがある場合は、次のコードを実行します.
mystream.project(new Fields("b","d"));
出力ストリームには、「b」、「d」の2つのフィールドしかありません.
四、再分割(repartition)操作
再パーティション化操作は、task間のメタグループの分布を1つの関数で変更し、再パーティション化(repatition)はネットワーク転送を必要とし、集約やクエリーを容易にすることを目的としています.再分割関数は次のとおりです.
1.      
Shuffle:hadoopと同じように、同期したtupleを1つのパーティションに配置します.
2.Broadcast:各メタグループは、すべてのターゲットパーティションに繰り返し送信されます.これはDRPCで役に立ちます.各パーティションでstatequeryを作りたい場合は.
3.paritionBy:一連の配布フィールド(fields)に基づいて意味のパーティションを作成します.これらのフィールドにhash値をとり、ターゲットパーティション数をモデリングしてターゲットパーティションを取得します.paritionByは、同じ配布フィールド(fields)が同じターゲットパーティションに配布されることを保証する.
4.      
global:すべてのtupleが同じパーティションに配布されます.
5.batchGobal:このロットのすべてのtupleは同じパーティションに送信され、不通ロットは不通のパーティションに送信されます.
6.patition:この関数は、ユーザーがカスタマイズしたパーティション関数を受け入れます.ユーザ定義関数事項backtype.storm.grouping.CustomStreamGroupインタフェース.
五、連結と関連
複数のストリームを結合して1つのストリームにするには、次のようにします.
topology.merge(stream1, stream2, stream3);
Tridentがマージしたストリームフィールドは、最初のストリームのフィールドに名前が付けられます.
もう一つのストリームをマージする方法はjoinです.SQLのようなjoinは固定入力です.ストリームの入力は固定されていないので、sqlの方法でjoinをすることはできません.
Tridentのjoinはspoutが発行する各ロット間でのみ行われます.
ストリームにフィールド「key」、「val 1」、「val 2」が含まれている場合、
別のストリームには、フィールド「x」、「val 1」が含まれます.
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));
Stream 1のkeyはstream 2のxに関連付けられており、Tridentはすべてのフィールドに名前を変更する必要があります.
1.まずjoinフィールドです.例のstream 1の「key」はstream 2の「x」に対応する.
2.次に、join以外のフィールドを順番に並べ、joinに渡される順番に並べます.例では「a」、「b」はstream 1の「val 1」と「wal 2」、「c」はstream 2の「val 1」に対応する.
六、FirstN
トップNを取る
使用方法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));
Tridentは要約型に適しており、脱重型には向いていない.