Flink使用(一)-kafkaから読み出したデータをHBASEに書き込む

4839 ワード

1、はじめに
本稿では『リアルタイムヒット商品の計算方法』[1]で行った拡張であり、Flinkを利用してKafkaデータを消費し、処理後のデータをHBAseに書き込むプロセスのみを機能的に検証したが、その具体的な性能はチューニングされていない.また、Flink処理ロジックについてはあまり分析されていません.引用文(特に説明しなければ、本文の引用文はすべて「リアルタイムの人気商品をどのように計算するか」という文)に詳しく書かれているため、ブロガーのデバッグが犯した間違いだけが与えられます.文の中でもし間違いがあるならば、みんなの伝言を歓迎して指摘して、ありがとうございます!
ソースコードはGitHub、アドレス:https://github.com/L-Wg/flinkExample;
環境:Flink 1.6+Kafka 1.1+HBAse 1.2
       OpenJDK 1.8+Maven 3.5.2
2、データの取得
本稿では,Kafkaをデータソースとして(現在業界で流行している手法),データのフォーマットと引用文のフォーマットが一致し,データ型がPOJOである.ソースを追加するには、インタフェースSourceFunctionを実装するのが一般的ですが、FlinkとKafkaのリンク(connector)、Flinkコミュニティはすでに作成されており、pomファイルに対応する依存を加えるだけでいいです.ここで注目すべき点は、flink-connector-kafka-*.jarにはバージョン要件があり、その具体的な要件はFlink公式サイトconnectorのセクションに参加することができる[2].コードは次のとおりです.
DataStream dataStream=env.addSource(new FlinkKafkaConsumer010(
                topic,
                new UserBehaviorSerial(),
                properties
        ).setStartFromEarliest());

 ここで、コードで指定する必要があるのは、消費するtopic、データシーケンス化されたオブジェクト、および構成です.ここで、構成はbootstrap.serversを指定できます.その他の構成は必要に応じて設定できます.setStarFromEarliest()を呼び出すのは、Flinkがtopicのデータを最初から消費するように指定するためで、Kafka topicにデータが存在する限り、テスト時にkafkaにデータを書き直す必要はありません.もちろん、このメソッドを呼び出すのは、この役割だけでなく、ビジネス上の使用は必要に応じて行われます.また、Flinkにはkafkaを消費する方法を指定する方法が多く、詳細は公式サイト[2]を参照してください.
ここで言うべき点は、データを取得した後もdataStreamの値は変わらず、flatmapなどの操作をしたからといって変更することはありません.
3、データ変換
Flinkコードの分析手順については、以下の点について説明します.
1.kafkaのデータが自分でデータ形式でランダムに生成されている場合は、ブロガーコードのcustomWaterExtractor()クラスの書き方に従ってwatermarkとtimestampを定義しないでください.コードのcurrentTimeStampの値もランダムである可能性があるため、プログラムが間違っていないがカード死待ちの場合があります.
2.timestampの値は、データソース内のデータと同じデータレベルを維持します.
public static class customWaterExtractor implements AssignerWithPeriodicWatermarks{

        private static final long serialVersionUID = 298015256202705122L;

        private final long maxOutOrderness=3500;
        private long currentTimeStamp=Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTimeStamp-maxOutOrderness);
        }

        @Override
        public long extractTimestamp(UserBehaviorSchema element, long previousElementTimestamp) {
//                  :timestamp        1000     ,           timestamp     。
            long timeStamp=element.timestamp*1000;
            currentTimeStamp=Math.max(timeStamp,currentTimeStamp);
            return timeStamp;
        }
    }

3.返される結果クラスResultEventで、sinkingフィールドを使用してHotTopNの順位を保存します.デフォルト値は0です.  
 4、データストレージ
本稿では,extends RichSinkFunctionによりHBAseへのデータ書き込みを実現しているが,@Overrideのinvoke()メソッドは各データに対して呼び出され,残りのopen(),close()メソッドは,ログから見ると各データに対して呼び出されるかどうかである.Open()メソッドでは、リンクを開くために使用されます.接続プールは、リンクが多すぎることを避けることが望ましいです.ここで、HBAseのconnectionは、単独で実装する必要はありません.
HBAseへの書き込みには、次の2つの推奨事項があります.
1.HBAseの表にデータを書き込む場合は、表の事前分割作業を先に行い、後期に表のsplitによる性能低下やメンテナンス上の困難を避けることが望ましい.
2.HBAseのクエリー速度を速めるために、作成フィールドをHBAseテーブルのrowkeyとすることができ、ここではタイムスタンプとランキングをテーブルとして指定したrowkeyであり、2次インデックスなどはここでは議論しない.
5、参考文献リンク:
  [1]http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
  [2]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html