Stormリアルタイムクラウドコンピューティング学習使用基本apiおよび高レベルapi tridentの基本使用を含む


ここでSparkとStormの違いを補足します.Stormは1 s以下のリアルタイムクエリーを実現できますが、Sparkはできません.Stormのより自由なスタイルの計算はboltノードで、Sparkはチェーンで、Spark streamはstormと同じようにCEPのクエリーSparkはML graphxマシン学習図計算SQLはhiveのようなリアルタイムクエリーStormがありません.Stormはリアルタイムストリームに適しています.Sparkは、メッセージ・ミドルウェアからでも他のソースからでも、バッファ・ストレージのあるロット・ソースに適しています.StormにはDRPCがリアルタイムのクエリを行い結果を返すタイプがある.
1.Storm基本API(Trident以外)
まず私が提案したポイントをお話しします
     1.BasicRichBolt/BasicBasicBolt
     2. Stream Joinの方式
     3.Groupの種類
     4.BaseBatchBoltロット処理
     5.Drpcの使用
     6.画像検索の実戦
1.まずBasicRichBoltとBasicBasicBoltの主な違いは、確認する必要がないことを確認することです.
すなわち,前者はackメソッドを呼び出し,後者は自動的に呼び出す必要がある.失敗すると最終的にソースspoutに到達します
spoutから削除されることを確認すると、自己実装を上書きできます. 
    
public void ack(Object msgId) {
System.out.println("OK:"+msgId);
}
public void close() {}
public void fail(Object msgId) {
System.out.println("FAIL:"+msgId);
}

どうやって使うか見てみましょう
まず
prepare                cleanup         ,  jdbc   
  execute bolt    , spark    spark       , storm     bolt   
      
  tuple              spout    bolt emit     ,      id           fieldName   
  byte[] datas=input.getBinary(3);
  input.getBinaryByField("datas");
         emit 
  collector.emit(new Values(uuid,imageid,1,1.0));
  emit      ,      bolt       stream id  , stream id          TimeCacheMap       join       join    。      join          spout     bolt        field        。
         uuid   imageid                
  declarer.declare(new Fields("id","imageid","type","score"));
           
                   。
public class ImageSurfCompareBolt  extends BaseBasicBolt {
	
	
    
    
    private QueryRunner qr;
    
   
	private Set<String> ids = Sets.newConcurrentHashSet();
    //LockFreeOrderedList<String> ids=new LockFreeOrderedList<String>();
	

	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		qr=new QueryRunner(DBHelper.getDateSource());
		context.getSharedExecutor().execute(new Runnable() {
			
			@Override
			public void run() {
				ImageSurf surf=ImageSurf.surf;
			}
		});
	 
	}



	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("id","imageid","type","score"));
		
	}



	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		String imageid=input.getString(1);
		Integer type=input.getInteger(2);
		byte[] datas=input.getBinary(3);
		final Object uuid=input.getValue(0);
		String key=uuid.toString()+"-"+imageid;
		if(!ids.contains(key)){
			
			
			//     
			
			//imagecache.put(key, imageid);
			ids.add(key);
			String sql="select path from image_path where id=?";
			try {
				String imagepath=qr.query(sql, new ScalarHandler<String>(1),imageid);
				String fileName=imagepath;
				if(imagepath!=null){
				BufferedImage imageA=ImageIO.read(new File(fileName));
				BufferedImage imageB=ImageIO.read(new ByteArrayInputStream(datas));
				double score=ImageSurf.compare(imageA, imageB);
				if(type==4){
					MatchModel matchModel=DetectUtility.matchImage(imageA, imageB);
					double mscore=matchModel.getScore();
					Integer sum=matchModel.getSum();
					if(sum>200&&mscore>0.6){
						collector.emit(new Values(uuid,imageid,3,String.valueOf(score)));
					}
				}else{
					collector.emit(new Values(uuid,imageid,type,String.valueOf(score)));
				}
				//MatchModel imatchscore=DetectUtility.matchImage(imageA, imageB);
				//System.out.println(fileName+"########################"+score);
				
				}
			} catch (Exception e) {
				System.out.println("=========imageSurfCompare=========="+e.getMessage());
			}
			
            if(ids.size()>90){ 
				
            	
				ids.removeIf(new java.util.function.Predicate<String>() {

					@Override
					public boolean test(String t) {
						if(t.startsWith(uuid.toString())){
							return false;
							}else{
								return true;
							}
					}
				});
			}
			
		}
		
	}

   2.上で述べたjoin方式は、ここでいくつか指摘されています.複数のspoutを提出しbolt上で判断する.emitはstream idをトップに置くstream idにデータemit(String streamId,Listtuple)emit(String streamId,Listtuple,String messageId)をコミットし、boltで対応するstream 3を受け入れるように設定.複数のspoutをコミットしてfinishBatchメソッドにTimeCacheMapを集合してjoinすることをbasicBatchBolt方式で行う
   3.グループ方式
 //1. Shuffle Group:ランダムにグループ化し、streamの中のtupleをランダムに配布し、boltごとに受信tupleの数が同じであることを保証する.
   //2. Fields Group:useridでグループ化するなど、フィールド別にグループ化すると、同じuseridを持つtupleは同じBoltsに分割され、異なるuseridは異なるBoltsに割り当てられます.
   //3. All Group:ブロードキャスト送信、各tupleについて、すべてのBoltsが受信します.
   //4. Global Group:stormの1つのboltの1つtaskに割り当てられるグローバルパケット.さらに具体的にはid値が最も低いtaskに割り当てる.
   //5. Non Group:グループ化しないで、streamは誰がそれを受け取るのか気にしないという意味です.現在、彼はShuffleグループと同じ効果で、stormがこのboltをこのboltの購読者と同じスレッドに置いて実行するのとは少し違います.
   //6. Direct Group:直接パケット、これは比較的特別なパケット方法であり、このパケットはメッセージの送信者がメッセージ受信者のどのtaskによってこのメッセージを処理するかを意味する.このパケット化方法は、Direct Streamとして宣言するメッセージストリームのみが宣言できる.また、このメッセージtupleは、emitDirect方式を用いる送信しなければならない.メッセージ処理者は、TopologyContextまたはそのメッセージを処理するtaskid(OutputCollector.emitメソッド)を使用してtaskidを返すことができます.
//トポロジは、メッセージ(メタグループ)が1つ以上のブランチを通過するツリー構造です.ツリー上の各ノードはack(tuple)またはfail(tuple)を呼び出すので、Stormはメッセージが失敗したかどうかを知り、これらのメッセージを製造したspout(s)を通知します.Stormトポロジが高度に並列化された環境で動作する以上、始発spoutインスタンスを追跡する最善の方法は、メッセージメタグループ内に始発spout参照を含むことである.このテクニックをアンカーと呼ぶ(原文はAnchoring)
     4.BaseBatchBoltバッチ処理は、対応するすべてのfieldGroup/他のgroupが最後の呼び出しを完了しfinishBatchメソッドを呼び出すのを待つため、redis zsortSetのホッピング構造などのTopN問題を求めるために使用できます.
次に例を示します
/**
 *   vladdistance                
 * @author zhuyuping
 * 2016 3 5 
 */
public class ImageVladIndexSearchBolt extends BaseBatchBolt{

	Object uuid;
	BatchOutputCollector collector;
	
	private double[][] distances;
	Map<Integer,double[][]> knns; 
	QueryRunner runner;
	byte[] imgdatas=null;
	
	@Override
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, Object id) {
		 this.uuid=id;
		 this.collector=collector;
		 this.runner=new QueryRunner(DBHelper.getDateSource());
		 this.distances=new double[16][256];
		 this.knns=AliyunOSSUtils.getKNNDatas();
	}

	@Override
	public void execute(Tuple tuple) {
		//declarer.declare(new Fields("id","key","query","k"));
		Integer key=tuple.getInteger(1);
		//System.out.println("======================"+key);
		if(imgdatas==null)
		imgdatas=tuple.getBinary(4);
		double[] qus=(double[]) tuple.getValue(2);
		//System.out.println("qu is $$$$$$ "+qus.length);
		Integer k=tuple.getInteger(3);
		final BoundedPriorityQueue<IntDoublePair> queue = new BoundedPriorityQueue<IntDoublePair>(
				k, IntDoublePair.SECOND_ITEM_ASCENDING_COMPARATOR);
		double[][] nn=knns.get(key);
		for (int j = 0; j < k; j++) {
			double score = DoubleFVComparison.SUM_SQUARE.compare(qus,nn[j]);
			int index = key;
			IntDoublePair wp = new IntDoublePair(index, score);
			wp = queue.offerItem(wp);
		}
		List<IntDoublePair> pair = queue.toOrderedListDestructive();// double[]
		double[] sort = new double[pair.size()];
		for (int j = 0; j < pair.size(); j++) {
			sort[j] = pair.get(j).second;
			//System.out.println("vlad sort is ********************"+sort[j]);
		}
		
		
		this.distances[key]=sort;
		System.out.println("=================="+this.distances[key][70]);
	}

	@Override
	public void finishBatch() {
		//                 mysql     count     10 
		System.out.println("=================="+this.distances[0][70]);
		try{
		List<VladIndex> vladIndexs=runner.query("select * from vladindex", new ResultSetHandler<List<VladIndex>>() {
			@Override
			public List<VladIndex> handle(ResultSet rs) throws SQLException {
				List<VladIndex> vladindexs=Lists.newArrayList();
				while(rs.next()){
				String id=rs.getString("id");
				String vlad=rs.getString("vlad");
				vladindexs.add(new VladIndex(id,vlad));
				}
				return vladindexs;
			}
		});
//		ByteArrayDataOutput out=ByteStreams.newDataOutput();
//		BinIO.storeDoubles(this.distances,out);
//		byte[] distancesdatas=out.toByteArray();
		AliyunOCSService.put(uuid+"distance", distances);
		for (VladIndex vladIndex : vladIndexs) {
			//System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!"+distancesdatas.length/1024);
			collector.emit(new Values(uuid.toString(),vladIndex.getId(),vladIndex.getDatas(),imgdatas));
		}
		}catch(Exception e){
			e.printStackTrace();
		}
		
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("id","imageid","datas","imagedatas"));
		
	}

}

    5.Drpcはリアルタイムの外部呼び出しと同じで、外部からのdrpc要求を受け入れ、呼び出しを入力します.デフォルトではdrpc Spoutがあります.彼は外部リンクを待っています.もしなければreturnも自分で書いてもいいです.私たちが使っているのはカスタマイズの良い方法です.ただし、毎回返信idを提出しなければならないことに注意してください.
Triendleにもdrpcがあります.
前述のidのように.
declarer.declare(new Fields("id"," 


	LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("search");
		builder.addBolt(new IndexSearchTaskBolt());
		builder.addBolt(new ImageSearchBolt(), 10).fieldsGrouping(new Fields("type"));
		builder.addBolt(new ImageSurfCompareBolt(), 30).fieldsGrouping(new Fields("imageid"));
		//builder.addBolt(new ImageTopScoreBolt()).globalGrouping();
		builder.addBolt(new ImageTopScoreBolt()).fieldsGrouping(new Fields("id"));
		return builder;
		main    
		LinearDRPCTopologyBuilder builder = construct();
		Config conf = new Config();
		if (args == null || args.length == 0) {
			conf.setMaxTaskParallelism(5);
			LocalDRPC drpc = new LocalDRPC();
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("search", conf, builder.createLocalTopology(drpc));
			try {
				for (int i = 0; i < 4; i++) {
					String result=drpc.execute("search", Base64.encodeBase64String(Files.toByteArray(new File("c:/baidu/test3.jpg"))));
					System.out.println(i+" ------------------------------------- "+result);
				}
			} catch (IOException e) {
			}
			cluster.shutdown();
			drpc.shutdown();
		}else {
			conf.put(Config.TOPOLOGY_NAME, "search");
			conf.put(Config.DRPC_PORT, 3772);
			conf.put(Config.DRPC_SERVERS,Lists.newArrayList("10.47.50.235","10.47.49.206"));
			conf.put(Config.NIMBUS_HOST,  "10.47.50.235");
			conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
			conf.setMaxSpoutPending(5000);
			conf.setNumWorkers(10);
			conf.setNumAckers(5);
			conf.put("topology.spout.max.batch.size", 1000 /* x1000 i.e. every tuple has 1000 feature vectors*/);
		    conf.put("topology.trident.batch.emit.interval.millis", 1000);
			conf.put(Config.STORM_CLUSTER_MODE, "distributed");
		    conf.put(Config.NIMBUS_TASK_TIMEOUT_SECS, 10);
			StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createRemoteTopology());
		}
		
	}

ローカル実行
    
2.Stormより高いレベルのチェーンAPI Trident
文字数が最大制限を超えているようですここで説明してカットします
   1.eachは前述のsparkのmapに類似して変換処理をカスタマイズできるのが基本的な処理である.
    
次のコードは、次の行のコードのようなコードの一部を抽出します.最初のパラメータは、使用するストリームデータのfieldを表します.
中間は処理される方法であり,最後尾の行はlengthを生成して前の既存のfieldの後ろに加算することを示す.
    
each(new Fields("word"), new StringLength(), new Fields("length"))

2.projectこれはeachが毎回新しいfieldを生成できることですが、以前のfieldデータはprojectを削除していません.projectに必要なfieldを保持します.
    
   3.Aggregateさらにby xxは、ソースコードの集約、すなわちspark reduce aggreageteのようなものを別途見ることができる
   4.partitionByパーティションは実は上のstormの基本タイプのgroupと同じで、リダイレクトストリーム
    5.簡単だよジョニーjoin(hashtags, new Fields("tweetId"), urls, new Fields("tweetId"), new Fields("tweetId", "hashtag", "url"))
それぞれストリームのjoinのフィールド、例えばid-age id-nameはid joinを通じてid name ageである
   
    5.テスト
まず例を貼り付けます
    
 private static StormTopology buildTopology() {
	        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
	                new Values("the cow jumped over the moon"),
	                new Values("the man went to the store and bought some candy"),
	                new Values("four score and seven years ago"),
	                new Values("how many apples can you eat"));
	        spout.setCycle(true);

	        TridentTopology topology = new TridentTopology();
	        topology.newStream("spout", spout)
	                //no name
	                .each(new Fields("sentence"), new Split(), new Fields("word"))
	                .partitionBy(new Fields("word"))
	                .name("abc")
	                .each(new Fields("word"), new StringLength(), new Fields("length"))
	                .partitionBy(new Fields("length"))
	                .name("def")
	                .aggregate(new Fields("length"), new Count(), new Fields("count"))
	                .partitionBy(new Fields("count"))
	                .name("ghi")
	                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
	        return topology.build();
	    }
	  public static void main(String[] args) throws Exception  {
		  StormTopology topology = buildTopology();
		  Config conf = new Config();
		  LocalCluster cluster = new LocalCluster();
		  cluster.submitTopology("search", conf, topology);
		  //StormSubmitter.submitTopology("search", conf, topology);
	}

ローカル実行結果