java-jstorm
7998 ワード
jstormはアリババがオープンソースのstormに基づいてJavaで書き換えた分布式リアルタイムストリーム計算フレームワークであり、使用が簡単で、特徴は以下の通りである:開発が非常に迅速である:インタフェースが簡単で、使いやすく、Topology、Spout、Boltのプログラミング規範を守れば拡張性の優れた応用を開発することができ、下層rpc、worker間の冗長性、データの分流などの動作は全く考慮しない.拡張性は極めて良いです:1級の処理ユニットの速度、直接合併数を配置すると、線形拡張性能が丈夫になります:workerが失効したり、機械が故障したりした場合、自動的に新しいworkerを割り当てて失効したworkerデータの正確性を置き換えます:Ackerメカニズムを採用して、データが失われないことを保証することができます.精度にもっと要求がある場合は、トランザクションメカニズムを採用し、データの正確さを保証します.利点:NimbusはHAを実現してStorm雪崩問題を徹底的に解決する:下層RPCはnetty+disruptorを採用して送信速度と受け入れ速度が一致する新しいsupervisor、Supervisor shutdownの時、新しい任務を提出して、workerの数が足りない時、すべて自動的に任務rebalanceの新しいtopologyをトリガして既存の任務に影響しないことを保証して、新しい任務は古い任務のcpu、memoryを奪う必要はありませんdiskとnetはZKへのアクセスを減らす:無駄なwatchを大量に取り除く;taskの心拍数は2倍に延長された.Task心拍数検出は全ZK走査Worker内部の全流水線モードを必要としない:Spout nextTupleとack/failは異なるスレッド性能で運行する:Zeromqを採用し、stormより30%速い;nettyを採用するとstormと10%速く,非常に安定である.
タスク#タスク#
jstormは簡単に使えますが、Topology、Spout、Boltのプログラミング仕様に従えばいいので、次の例ではこれらを一歩一歩完成します.例も簡単で,spoutで自己増加int配列が絶えず生成され,boltは数値を受け取ってログを印刷しhbaseに挿入する.
インストール:
別のブログを参考に
上記の3つのステップを経て、最も簡単なjstormアプリケーションの開発が完了し、次にコンパイル、パッケージングが完了した後、jarファイルjstorm-hbase-demo-0.1.jarを生成し、このjarファイルをjstormクラスタのnimbusマシンに提出すればよい:jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties
タスク#タスク#
jstormは簡単に使えますが、Topology、Spout、Boltのプログラミング仕様に従えばいいので、次の例ではこれらを一歩一歩完成します.例も簡単で,spoutで自己増加int配列が絶えず生成され,boltは数値を受け取ってログを印刷しhbaseに挿入する.
インストール:
別のブログを参考に
public class TestSpout extends BaseRichSpout {
private static final Logger LOGGER = LoggerFactory.getLogger(TestSpout.class);
static AtomicInteger sAtomicInteger = new AtomicInteger(0);
static AtomicInteger pendNum = new AtomicInteger(0);
private int sqnum;
SpoutOutputCollector collector;
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
sqnum = sAtomicInteger.incrementAndGet();
this.collector = collector;
}
@Override
public void nextTuple() {
while (true) {
int a = pendNum.incrementAndGet();
LOGGER.info(String.format("spount %d,pendNum %d", sqnum, a));
this.collector.emit(new Values("xxxxx:"+a));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("log"));
}
/** * ack , :https://github.com/alibaba/jstorm/wiki/Ack-%E6%9C%BA%E5%88%B6 * @param msgId */
@Override
public void ack(Object msgId) {
super.ack(msgId);
}
/** * * @param msgId */
@Override
public void fail(Object msgId) {
super.fail(msgId);
LOGGER.info("ack fail,msgId"+msgId);
}
}
public class TestBolt extends BaseRichBolt {
private static final Logger LOGGER = CustomerLoggerFactory.LOGGER(TestBolt.class);
OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String xx = input.getString(0);
LOGGER.info(String.format("receive from spout ,num is : %d", xx));
// ack spout , hbase , hbase ack ,
this.collector.ack(input);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public class TestTopology implements ILogTopology {
@Override
public void start(Properties properties) throws AlreadyAliveException, InvalidTopologyException, InterruptedException, IOException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("testspout", new TestSpout(), 1);
builder.setBolt("testbolt", new TestBolt(), 2).shuffleGrouping("testspout");
Config conf = ConfigUtils.getStormConfig(properties);
conf.setNumAckers(1);
StormSubmitter.submitTopology("testtopology", conf, builder.createTopology());
System.out.println("storm cluster will start");
}
}
上記の3つのステップを経て、最も簡単なjstormアプリケーションの開発が完了し、次にコンパイル、パッケージングが完了した後、jarファイルjstorm-hbase-demo-0.1.jarを生成し、このjarファイルをjstormクラスタのnimbusマシンに提出すればよい:jstorm jar jstorm-hbase-demo-0.1.jar com.xirong.demo.BootStrap config.properties