Mysqlインクリメンタル書き込みHdfs(二)--Storm+hdfsのフロー処理

13089 ワード

一.概要
前回はmysqlからkafkaにデータを投げ込む方法を紹介しましたが、今回はstormを利用してhdfsにデータを書き込む過程に専念しました.stormがhdfsに書き込むカスタマイズ可能なものが多いので、kafkaから読み取るのではなく、Spoutデータを自分で定義してデータソースとして使用し、次の章で統合します.ここではデフォルトでstormの知識の基礎を持っています.少なくともSpoutとboltが何なのか知っています.
hdfsへの書き込みには、次のカスタムポリシーがあります.
  • カスタム書き込みファイルの名前
  • 書き込みコンテンツフォーマット
  • を定義する.
  • 所定の条件が満たされた後に書き込むファイル
  • を変更する.
  • ファイルへの書き込み時にトリガーされるアクション
  • の変更
    この記事では、まずstormでHDFSを書き込む方法、いくつかのAPIを書き込む方法、および最後に与えられた例について説明します.
    stormは10個のTupleを受け取るたびにhdfs書き込みファイルを変更し、新しいファイルの名前は何回目に変更されます.
    ps:stormバージョン:1.1.1.Hadoopバージョン:2.7.4.
    次に、まずStormがHDFSにどのように書き込むかを見てみましょう.
    二.Storm書き込みHDFS
    Storm公式には対応するAPIが提供されており、使用できます.HDFSは、HdfsBoltを作成し、対応するルールを定義することで書き込むことができます.
    まずmavenで依存およびプラグインを構成します.
    
        
            1.1.1
        
    
        
    
            
                org.apache.storm
                storm-core
                ${storm.version}
                
                
                    
                        org.slf4j
                        log4j-over-slf4j
                    
                
            
            
                commons-collections
                commons-collections
                3.2.1
            
            
                com.google.guava
                guava
                15.0
            
    
            
            
                org.apache.hadoop
                hadoop-client
                2.7.4
                
                    
                        org.slf4j
                        slf4j-log4j12
                    
                
            
            
                org.apache.hadoop
                hadoop-hdfs
                2.7.4
                
                    
                        org.slf4j
                        slf4j-log4j12
                    
                
            
    
            
            
                org.apache.storm
                storm-hdfs
                1.1.1
                
            
    
        
    
    
        
            
                
                    org.apache.maven.plugins
                    maven-compiler-plugin
                    3.5.1
                    
                        1.8
                        1.8
                    
                
                
                    org.codehaus.mojo
                    exec-maven-plugin
                    1.2.1
                    
                        
                            
                                exec
                            
                        
                    
                    
                        java
                        true
                        false
                        compile
                        com.learningstorm.kafka.KafkaTopology
                    
                
       
                
                    org.apache.maven.plugins
                    maven-shade-plugin
                    1.7
                    
                        true
                    
                    
                        
                            package
                            
                                shade
                            
                            
                                
                                    
                                    
                                        
                                    
                                
                            
                        
                    
                
            
        
    

    ここでは、クラスタにパッケージ化する場合、パッケージ化されたプラグインはmaven-shade-pluginというプラグインを使用し、maven Lifecycleのpackageを使用してパッケージ化する必要があります.Maven-assembly-pluginプラグインでパッケージ化するのではなく、
    Maven-assembly-pluginを使用すると、依存するすべてのパッケージがunpackされ、packになるため、同じファイルが上書きされる場合があります.クラスタに公開されるとNo FileSystem for scheme:hdfsのエラーが報告されます.
    次にHdfsBoltを使用してHdfsを書き込みます.公式文書の例を見てみましょう.
    //    "|"     ",",       
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");
    
    //     1k         Hdfs  
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    
    //         5MB ,      ,           
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
    
    //        ,           
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/foo/");
    
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl("hdfs://localhost:9000")
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);
    
    //    bolt
    topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
            

    ここまでで終わりです.HdfsBoltは1つのStormの中の特殊ないくつかのboltとすることができる.このboltの役割は,受信情報に基づいてHdfsに書き込まれてもよい.
    新しいHdfsBoltでは、Stormはかなり柔軟性を提供しています.ある条件が達成されたときに書き込みファイルを変換したり、新しい書き込みファイルの名前を変換したり、書き込み時のセパレータを定義したりすることができます.
    使用を選択すると、Stormには一部のインタフェースが用意されていますが、豊富でないと思ったら、対応するクラスをカスタマイズすることもできます.これらの戦略をどのように制御するかを見てみましょう.
    RecordFormat
    これはインタフェースで、受信したコンテンツのフォーマットを自由に定義できます.
    public interface RecordFormat extends Serializable {
        byte[] format(Tuple tuple);
    }

    StormはDelimitedRecordFormatを提供しており、使用方法は上記のとおりです.このクラスのデフォルトの分割子はカンマ「,」であり、withFieldDelimiterメソッドで区切り子を変更できます.最初の区切り文字がカンマでない場合は、クラスインプリメンテーションRecordFormatインタフェースを書き換えることもできます.
    FileNameFormat
    同じインタフェースです.
    public interface FileNameFormat extends Serializable {
        void prepare(Map conf, TopologyContext topologyContext);
        String getName(long rotation, long timeStamp);
        String getPath();
    }

    Stormが提供するデフォルトはorgである.apache.storm.hdfs.format.DefaultFileNameFormat.デフォルトで使用される変換ファイル名は少し長いです.フォーマットは次のとおりです.
    {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
    例:
    MyBolt-5-7-1390579837830.txt
    デフォルトでは、接頭辞は空で、拡張識別子は「.txt」です.
    SyncPolicy
    同期ポリシーによりbuffered dataをHdfsファイルにバッファリング(クライアントがデータを読み出すことができる)orgを実装することができる.apache.storm.hdfs.sync.SyncPolicyインタフェース:
    public interface SyncPolicy extends Serializable {
        boolean mark(Tuple tuple, long offset);
        void reset();
    }

    FileRotationPolicy
    このインタフェースでは、書き込みファイルをどのように変換するかを制御できます.
    public interface FileRotationPolicy extends Serializable {
        boolean mark(Tuple tuple, long offset);
        void reset();
    }

    Stormには、インタフェースを実装する3つのクラスがあります.
  • 最も簡単なのは変換を行うorgである.apache.storm.hdfs.bolt.rotation.NoRotationPolicy、何もしない.
  • ファイルサイズにより変換をトリガorg.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.
  • は、時間条件により変換のorgをトリガ.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.

  • もっと複雑なニーズがあれば、自分で定義することもできます.
    RotationAction
    これは主に1つ以上のhookを提供し、追加しても追加しなくてもいいです.主に書き込みファイル変換がトリガーされると起動します.
    public interface RotationAction extends Serializable {
        void execute(FileSystem fileSystem, Path filePath) throws IOException;
    }

    三.一例を実現する
    上記の状況を理解すると、書き込み記録の数に応じて書き込み変換(書き込みファイルの変更)を制御し、変換後のファイルの名前が現在何回目の変換であるかを示す例を実現します.
    まずHdfsBoltの内容を見てみましょう.
            RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
            // sync the filesystem after every 1k tuples
            SyncPolicy syncPolicy = new CountSyncPolicy(1000);
    //        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
            /** rotate file with Date,every month create a new file
             * format:yyyymm.txt
             */
            FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
            FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
            RotationAction action = new NewFileAction();
            HdfsBolt bolt = new HdfsBolt()
                    .withFsUrl("hdfs://127.0.0.1:9000")
                    .withFileNameFormat(fileNameFormat)
                    .withRecordFormat(format)
                    .withRotationPolicy(rotationPolicy)
                    .withSyncPolicy(syncPolicy)
                    .addRotationAction(action);

    次に、各ポリシーのクラスをそれぞれ見てみましょう.
    FileRotationPolicy
    import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
    import org.apache.storm.tuple.Tuple;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     *      Hdfs       ,   10    ,       ,        “TimesFileNameFormat”
     *         
     */
    
    public class CountStrRotationPolicy implements FileRotationPolicy {
    
    
        private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");
    
        private String date =  null;
    
        private int count = 0;
    
        public CountStrRotationPolicy(){
            this.date =  df.format(new Date());
    //        this.date = df.format(new Date());
        }
    
    
        /**
         * Called for every tuple the HdfsBolt executes.
         *
         * @param tuple  The tuple executed.
         * @param offset current offset of file being written
         * @return true if a file rotation should be performed
         */
        @Override
        public boolean mark(Tuple tuple, long offset) {
            count ++;
            if(count == 10) {
                System.out.print("num :" +count + "   ");
                count = 0;
                return true;
    
            }
            else {
                return false;
            }
        }
    
        /**
         * Called after the HdfsBolt rotates a file.
         */
        @Override
        public void reset() {
    
        }
    
        @Override
        public FileRotationPolicy copy() {
            return new CountStrRotationPolicy();
        }
    
    
    }

    FileNameFormat
    
    import org.apache.storm.hdfs.bolt.format.FileNameFormat;
    import org.apache.storm.task.TopologyContext;
    
    import java.util.Map;
    
    /**
     *              
     *                ,           
     */
    public class TimesFileNameFormat implements FileNameFormat {
        //    
        private String path = "/storm";
        //    
        private String extension = ".txt";
        private Long times = new Long(0);
    
        public TimesFileNameFormat withPath(String path){
            this.path = path;
            return this;
        }
    
        @Override
        public void prepare(Map conf, TopologyContext topologyContext) {
        }
    
    
        @Override
        public String getName(long rotation, long timeStamp) {
            times ++ ;
            //     ,            
            return times.toString() + this.extension;
        }
    
        public String getPath(){
            return this.path;
        }
    }

    RotationAction
    
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.storm.hdfs.common.rotation.RotationAction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.net.URI;
    /**
                     hook ,       。
     */
    public class NewFileAction implements RotationAction {
        private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);
    
    
    
        @Override
        public void execute(FileSystem fileSystem, Path filePath) throws IOException {
            LOG.info("Hdfs change the written file!!");
    
            return;
        }
    }

    OK、これで大成功です.上のコードで、10個のTupleを受け取るたびに書き込みファイルが変換され、新しいファイルの名前は何回目の変換です.
    完全なコードには、私のgithubで表示できるランダムに生成された文字列のSpoutが含まれています.
    StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
    推荐阅读:Mysqlストリームインクリメンタル書込みHdfs(一)--mysqlからkafka Spark SQLまで、DataFrameをjsonフォーマットに変換する方法