Binlog+FlinkCDCによるデータのリアルタイム監視

12939 ワード

一、MySqlのBinlog
1、Binlogとは
1)binlogはバイナリ・ログであり、トランザクション・セキュリティである
2)binlogはすべてのDDLとDML(データ問合せ文を除く)文を記録し、イベント形式で記録し、文の実行にかかる時間も含む
3)一般的にバイナリ・ログをオンにすると1%のパフォーマンス損失が発生します.
2、Binlog使用シーン
1)binlogでデータを復元する
2)mysqlで変更されたデータをプロジェクトで動的にリスニングする
3、Binlogオープン
1)MySQLのプロファイル(Linux:/etc/my.cnf,Windows:my.ini)で、[mysqld]ブロック設定/追加で構成を変更する
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2019
binlog-do-db=gmall2020
binlog-do-db=gmall2021

2)mysqlの再起動
sudo systemctl restart mysqld

4、プロファイルパラメータ解析
マシンIDの設定
複数台の機械を繰り返してはいけません
server-id=1

binlogを開く
log-bin=mysql-bin

Binlog分類設定
MySQL Binlogのフォーマットは、STATEMENT、MIXED、ROWの3種類です.
コンフィギュレーションファイルでコンフィギュレーションを選択します.一般的にrowに設定されます.
binlog_format=row

3つの分類の違い:
1)statement
文レベルでbinlogは、書き込みを実行するたびに文を記録します.
rowモードに比べてスペースが節約されますが、たとえば不一致が発生する可能性があります.
​ update tt set create_date=now()
binlogログでリカバリを行うと、実行時間によって生成される可能性のあるデータが異なります.
利点:省スペース
欠点:データが一致しない可能性があります.
2)row(常用)
行レベル、binlogは、各操作後の各行のレコードの変化を記録します.
利点:データの絶対的な一貫性を維持します.sqlが何であれ、どんな関数を参照しているかにかかわらず、実行後の効果だけを記録しているからです.
欠点:大きなスペースを占有する.
3)mixed
statementのアップグレード版は、いくつかの状況によるstatementモードの不一致の問題をある程度解決しました.

関数にUUID()が含まれている場合.
       AUTO_INCREMENT         ;

       INSERT DELAYED    ;

      UDF  ;

        ROW       

利点:スペースを節約し、一定の一貫性を兼ね備えています.
欠点:まだ一部のごく一部の状況が依然として不一致をもたらすことがあります.
またstatementやmixedはbinlogのモニタリングが必要な場合に不便です.
データベースの設定
リスニングするデータベースを設定し、複数のライブラリに同時に書き込むことができます.
binlog-do-db=gmall2021
binlog-do-db=gmall2022
binlog-do-db=gmall2023

二、FlinkCDC
1、CDCとは
CDCは、Change Data Capture(変更データ取得)の略称である.核心思想は、データベースの変動(データやデータテーブルの挿入、更新、削除などを含む)を監視し、キャプチャし、これらの変更を発生した順序で完全に記録し、他のサービスの購読と消費のためにメッセージミドルウェアに書き込むことである.
2、CDCの種類
CDCは主にクエリーベースとBinlogベースの2つの方法に分けられ、この2つの違いを主に理解します.
クエリベースCDC
BinlogベースCDC
オープンソース製品
Sqoop、Kafka JDBC Source
Canal、Maxwell、Debezium
実行モード
Batch
Streaming
すべてのデータの変更をキャプチャできるかどうか
いいえ
はい
ちえんせい
こうちえん
ていちえん
データベースの圧力を増加するかどうか
はい
いいえ
3、FlinkCDC
FlinkにはDebeziumが内蔵されています
FlinkCDC 1.11リリース
Canalはbinlogデータの全量の読み取りをサポートしていませんが、FlinkCDCはこの問題を完全に回避しています.
Flinkコミュニティはflink-cdc-connectorsコンポーネントを開発しました.これはMySQL、PostgreSQLなどのデータベースから全量のデータと増分変更データを直接読み取ることができるsourceコンポーネントです.現在もオープンソース、オープンソースアドレス:https://github.com/ververica/...
3.CDC実例実操
1)依存のインポート

    
        org.apache.flink
        flink-java
        1.12.0
    

    
        org.apache.flink
        flink-streaming-java_2.12
        1.12.0
    

    
        org.apache.flink
        flink-clients_2.12
        1.12.0
    

    
        org.apache.hadoop
        hadoop-client
        3.1.3
    

    
        mysql
        mysql-connector-java
        5.1.49
    

    
        com.alibaba.ververica
        flink-connector-mysql-cdc
        1.2.0
    


        com.alibaba
        fastjson
        1.2.75
    



    
        
            org.apache.maven.plugins
            maven-assembly-plugin
            3.0.0
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    


2)コード作成
package com.haoziqi;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**************************************************************
 * @Author: haoziqi
 * @Date: Created in 9:27 2021/3/15
 * @Description: TODO   DataStream  mysql,                    :flink  mysql  binlog  
 *                  
 *   linux :sudo vim /etc/my.cnf
 *   2、         hdfs
 *   3、  mysql,
 *
 **************************************************************/
public class FlinkCDC1 {
    private static Properties properties;

    public static void main(String[] args) throws Exception {
        //TODO 1.         
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1Checkpoint  
/*    binlog    ,      ,           。        (   )。       (    )。    :  binlog       ,        (     )    。 flink           Checkpoint 。  Checkpoint   flink   (  )        Checkpoint */
        env.enableCheckpointing(5000L);//5s    Checkpoint
        //  Checkpoint   :    
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //           checkpoint。           CK  ,       。           checkpoint    。RETAIN_ON_CANCELLATION         ,       checkpoint。          ,        RETAIN
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //        :           ,      Integer    ,      1s
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //         jobManager。     yarn    ,jobManager          ,    jobManager   。            。    checkpoint,           。              :hdfs 
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flink/ck/"));
        //    
        System.setProperty("HADOOP_USER_NAME", "atguigu");



        //TODO 2.  mysql       MySQL      
        Properties properties = new Properties(); //                    
        DebeziumSourceFunction sourceFunction = MySQLSource.builder() //  builder  MySQLsource  ,         。
                .hostname("hadoop102") //          (MySQL     )
                .port(3306) //MySQL      
                .username("root") //  
                .password("123456")//  
                .databaseList("gmall_flink_0923") //list:       
                .tableList("gmall_flink_0923.z_user_info") //             ,    【  .  】
                //debezium        。           
                //.debeziumProperties(properties)
                .deserializer(new StringDebeziumDeserializationSchema()) //     binlog  ,     ,    
                .startupOptions(StartupOptions.initial()) //     :              。initial(  +     )、latest-offset(     )。timestamp(                )
                .build();

        DataStreamSource streamSource = env.addSource(sourceFunction);

        //TODO 3.    
        streamSource.print();
        //        ,    

        SingleOutputStreamOperator map = streamSource.map(data -> data);
        SingleOutputStreamOperator slotgroup = map.slotSharingGroup("123");

        slotgroup.print();



        //TODO 4.    

        env.execute();
    }
}

3)ケーススタディ:
1)依存するjarパッケージにパッケージ化
2)MySQLbinlogをオンにしてMysqlを再起動する
4)HDFSクラスタ+yarnの起動
start-yarn.sh
start-dfs.sh

5)起動プログラム(yarnベースpre-jobモード)
bin/flink run -t yarn-per-job -c com.haoziqi.FlinkCDC1 flink-1.0-SNAPSHOT-jar-with-dependencies.jar

6)MySQLでのgmall-flink.z_user_infoテーブルでのデータの追加、変更または削除
7)コンソールで出力を表示する
4)CDCデータフォーマット変換(必見)
上記のデータ収集により、SourceRecord形式のデータが得られました.
SourceRecord{
    sourcePartition={server=mysql_binlog_source},
    sourceOffset={ts_sec=1616030398, file=mysql-bin.000009, pos=519, row=1, server_id=1, event=2}
    }
ConnectRecord{
    topic='mysql_binlog_source.gmall_flink_0923.z_user_info', kafkaPartition=null, key=Struct{id=8}, 

    keySchema=Schema{
    mysql_binlog_source.gmall_flink_0923.z_user_info.Key:STRUCT
    }, 

value=Struct{
        before=Struct{id=8,name=haoziqi},
        after=Struct{id=8,name=haoziqi,phone_num=123456},
        source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1616030398000,db=gmall_flink_0923,table=z_user_info,server_id=1,file=mysql-bin.000009,pos=675,row=0,thread=2
        },op=u,ts_ms=1616030399282
        },
    valueSchema=Schema{
        mysql_binlog_source.gmall_flink_0923.z_user_info.Envelope:STRUCT
        
        },
     timestamp=null, headers=ConnectHeaders(headers=)
}

上記で取得したデータでは、更新後のデータを取得するだけで、次のコードを使用してデータをフィルタできます.
更新されたデータはvalueのStructでafterと表記される
package com.haoziqi.app.func;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * description
 * created by A on 2021/3/15
 */
//  DebeziumDeserializationSchema            
public class MyDeserializationSchemaFunction implements DebeziumDeserializationSchema {
   @Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
    //  JSON              
 JSONObject result = new JSONObject();
    //       
 String topic = sourceRecord.topic();
    String[] split = topic.split(".");
    String database = split[1];
    String table = split[2];
    //      
 Envelope.Operation operation = Envelope.operationFor(sourceRecord);
    //      
 Struct struct = (Struct) sourceRecord.value();
    Struct after = struct.getStruct("after");
    JSONObject value = new JSONObject();
    if (after != null) {
        Schema schema = after.schema();
        for (Field field : schema.fields()) {
            value.put(field.name(), after.get(field.name()));
        }
    }
    //     JSON  
 result.put("database", database);
    result.put("table", table);
    String type = operation.toString().toLowerCase();
    if ("create".equals(type)) {
        type = "insert";
    }
    result.put("type", type);
    result.put("data", value);
    //       
 collector.collect(result.toJSONString());
}
@Override
public TypeInformation getProducedType() {
    return TypeInformation.of(String.class);
}
}

フォーマット変換クラスを作成したら、Flinkオブジェクトの構築時に.deserializerパラメータを設定すればよい
  DebeziumSourceFunction sourceFunction = MySQLSource.builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall_flink_0923")
                .deserializer(new MyDeserializationSchemaFunction())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource mySqlDS = env.addSource(sourceFunction);

これで、CDCで取得したSourceRecord形式をJSON文字列に変換することに成功しました.