Stormリアルタイムクラウドコンピューティング学習使用基本apiおよび高レベルapi tridentの基本使用を含む
ここでSparkとStormの違いを補足します.Stormは1 s以下のリアルタイムクエリーを実現できますが、Sparkはできません.Stormのより自由なスタイルの計算はboltノードで、Sparkはチェーンで、Spark streamはstormと同じようにCEPのクエリーSparkはML graphxマシン学習図計算SQLはhiveのようなリアルタイムクエリーStormがありません.Stormはリアルタイムストリームに適しています.Sparkは、メッセージ・ミドルウェアからでも他のソースからでも、バッファ・ストレージのあるロット・ソースに適しています.StormにはDRPCがリアルタイムのクエリを行い結果を返すタイプがある.
1.Storm基本API(Trident以外)
まず私が提案したポイントをお話しします
1.BasicRichBolt/BasicBasicBolt
2. Stream Joinの方式
3.Groupの種類
4.BaseBatchBoltロット処理
5.Drpcの使用
6.画像検索の実戦
1.まずBasicRichBoltとBasicBasicBoltの主な違いは、確認する必要がないことを確認することです.
すなわち,前者はackメソッドを呼び出し,後者は自動的に呼び出す必要がある.失敗すると最終的にソースspoutに到達します
spoutから削除されることを確認すると、自己実装を上書きできます.
どうやって使うか見てみましょう
まず
2.上で述べたjoin方式は、ここでいくつか指摘されています.複数のspoutを提出しbolt上で判断する.emitはstream idをトップに置くstream idにデータemit(String streamId,List
1.Storm基本API(Trident以外)
まず私が提案したポイントをお話しします
1.BasicRichBolt/BasicBasicBolt
2. Stream Joinの方式
3.Groupの種類
4.BaseBatchBoltロット処理
5.Drpcの使用
6.画像検索の実戦
1.まずBasicRichBoltとBasicBasicBoltの主な違いは、確認する必要がないことを確認することです.
すなわち,前者はackメソッドを呼び出し,後者は自動的に呼び出す必要がある.失敗すると最終的にソースspoutに到達します
spoutから削除されることを確認すると、自己実装を上書きできます.
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}
どうやって使うか見てみましょう
まず
prepare cleanup , jdbc
execute bolt , spark spark , storm bolt
tuple spout bolt emit , id fieldName
byte[] datas=input.getBinary(3);
input.getBinaryByField("datas");
emit
collector.emit(new Values(uuid,imageid,1,1.0));
emit , bolt stream id , stream id TimeCacheMap join join 。 join spout bolt field 。
uuid imageid
declarer.declare(new Fields("id","imageid","type","score"));
。
public class ImageSurfCompareBolt extends BaseBasicBolt {
private QueryRunner qr;
private Set<String> ids = Sets.newConcurrentHashSet();
//LockFreeOrderedList<String> ids=new LockFreeOrderedList<String>();
@Override
public void prepare(Map stormConf, TopologyContext context) {
qr=new QueryRunner(DBHelper.getDateSource());
context.getSharedExecutor().execute(new Runnable() {
@Override
public void run() {
ImageSurf surf=ImageSurf.surf;
}
});
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","imageid","type","score"));
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String imageid=input.getString(1);
Integer type=input.getInteger(2);
byte[] datas=input.getBinary(3);
final Object uuid=input.getValue(0);
String key=uuid.toString()+"-"+imageid;
if(!ids.contains(key)){
//
//imagecache.put(key, imageid);
ids.add(key);
String sql="select path from image_path where id=?";
try {
String imagepath=qr.query(sql, new ScalarHandler<String>(1),imageid);
String fileName=imagepath;
if(imagepath!=null){
BufferedImage imageA=ImageIO.read(new File(fileName));
BufferedImage imageB=ImageIO.read(new ByteArrayInputStream(datas));
double score=ImageSurf.compare(imageA, imageB);
if(type==4){
MatchModel matchModel=DetectUtility.matchImage(imageA, imageB);
double mscore=matchModel.getScore();
Integer sum=matchModel.getSum();
if(sum>200&&mscore>0.6){
collector.emit(new Values(uuid,imageid,3,String.valueOf(score)));
}
}else{
collector.emit(new Values(uuid,imageid,type,String.valueOf(score)));
}
//MatchModel imatchscore=DetectUtility.matchImage(imageA, imageB);
//System.out.println(fileName+"########################"+score);
}
} catch (Exception e) {
System.out.println("=========imageSurfCompare=========="+e.getMessage());
}
if(ids.size()>90){
ids.removeIf(new java.util.function.Predicate<String>() {
@Override
public boolean test(String t) {
if(t.startsWith(uuid.toString())){
return false;
}else{
return true;
}
}
});
}
}
}
2.上で述べたjoin方式は、ここでいくつか指摘されています.複数のspoutを提出しbolt上で判断する.emitはstream idをトップに置くstream idにデータemit(String streamId,List