CDCの迅速な使用:Lakesoulからの新しいデモを簡単に環境を設定することができます


変更データキャプチャ(CDC)は、データ同期、データ配布、およびデータ収集に適用され、データベース内のデータの変更をキャプチャするために使用されるデータベース指向の技術です.前者はオフラインであり、オフラインスケジューリングを通して問い合わせることができ、テーブルは、データの一貫性とリアルタイム性能を保証することができないクエリを通じて最新のデータを取得するために他のシステムに同期されます.データは、クエリプロセスで複数回変更することができます.Lakesoul(24679152 CDC技術)はログベースのCDCタイプで、データの整合性とリアルタイム性を確保するために消費ログを実装することができます.
数日前、* *[ Lakesoul ](https://github.com/meta-soul/LakeSoul)'s)はギタブにデモをアップロードしました.MySQLとOracleのような関係データベースの追加、削除、および変更の操作は、CDCを介してLakesoulにアクセスすることができますし、リアルタイムに格納されます.以下のようにします.完全なフレームワークを構築した後、システムは、リアルタイムでデータを追加、削除、変更することができます.[upsert](https://github.com/meta-soul/LakeSoul)使用時に必要です.
下のデモを見ましょう.または、Lakesoul(https://github.com/meta-soul/LakeSoul/wiki/03.-Usage-Doc#311-code-examples)でそれをチェックしてください.
KAKAKAにCDCストリームを書き、Lakesoul(既にサポートされている)に変換して書き込むためにスパークストリーミングを使用します2 ) linkファイルに直接リンクを書き込む.
このデモでは、Lakesoulチームは最初の方法を示しました.彼らは、MySQLのインスタンスを設定し、スクリプトを使用してDBの変更を生成し、Kafkaにし、Lakesoulにそれらを同期するdebeziumを使用します.
1 .セットアップ
1.1データベースとテーブルを作成する
Create database cdc;
CREATE TABLE test(
 id int primary key,
 rangeid int,
 value varchar(100) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
** CDCベンチマークジェネレータを使用します.
*
我々は、テストとベンチマークCDCの同期のためのMySQLデータジェネレータを提供します.発電機はdiretoryの下にある
examples/cdc_ingestion_debezium/MysqlBenchmark.
1 . mysqlcdcを変更します.必要に応じて
user=user name of mysql
 passwd=password of mysql
 host=host of mysql
 port=port of mysql
2 .データをテーブルに挿入する
# Inside () are comments of parameters, remove them before execution
 bash MysqlCdcBenchmark.sh  insert  cdc(db name) test(table name) 10(lines to insert) 1(thread number)
テーブルへのデータの更新
bash MysqlCdcBenchmark.sh  update  cdc test id(primary key) value(column to update) 10(lines to update) 
テーブルからデータを削除
bash mysqlccDocXmark.sh delete cdc test 10 (削除する行)
https://github.com/meta-soul/LakeSoul/tree/main/examples/cdc_ingestion_debezium
kubectl create -f install/cluster-operator -n my-cluster-operator-namespace
kubectl apply -f examples/kafka/kafka-persistent-single.yaml
3 . debeziumを設定します(既に持っている場合は無視してください).
debeziumをインストールする3.1
K 8 SのDeBeziumの実行中のコンテナをすばやく設定するには、次の手順に従います.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: dbz-pod-claim
spec:
  accessModes:
    - ReadWriteOnce
  # replace to actual StorageClass in your cluster
  storageClassName: 
  resources:
    requests:
      storage: 10Gi
---
apiVersion: v1
kind: Pod
metadata:
  name: dbz-pod
  namespace: dmetasoul
spec:
  restartPolicy: Never
  containers:
  - name: dbs
    image: debezium/connect:latest
    env:
      - name: BOOTSTRAP_SERVERS
        # replace to actual kafka host
        value: ${kafka_host}:9092
      - name: GROUP_ID
        value: "1"
      - name: CONFIG_STORAGE_TOPIC
        value: my_connect_configs
      - name: OFFSET_STORAGE_TOPIC
        value: my_connect_offsets
      - name: STATUS_STORAGE_TOPIC
        value: my_connect_statuses
    resources:
      requests:
        cpu: 500m
        memory: 4Gi
      limits:
        cpu: 4
        memory: 8Gi
    volumeMounts:
      - mountPath: "/kafka/data"
        name: dbz-pv-storage

  volumes:
    - name: dbz-pv-storage
      persistentVolumeClaim:
        claimName: dbz-pod-claim
次に、このYAMLファイルを適用します.
kubectl apply -f pod.yaml
3.2セットアップdebezium同期タスク
# remember to replace {dbzhost} to actual dbz deployment ip address
# replace database parameters accordingly
curl -X POST http://{dbzhost}:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
    "name": "cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1",
        "database.hostname": "mysqlhost",
        "database.port": "mysqlport",
        "database.user": "mysqluser",
        "database.password": "mysqlpassword",
        "database.server.id": "1",
        "database.server.name": "cdcserver",
        "database.include.list": "cdc",
        "database.history.kafka.bootstrap.servers": "kafkahost:9092",
        "database.history.kafka.topic": "schema-changes.cdc",
        "decimal.handling.mode": "double",
        "table.include.list":"cdc.test" 
    }
}'
次に、同期タスクがうまく作成されているかどうかを確認します.
curl -H "Accept:application/json" dbzhost:8083 -X GET http://dbzhost:8083/connectors/
テスト終了後、同期タスクを削除できます.
curl -i  -X DELETE http://dbzhost:8083/connectors/cdc
スタートスパークシンクシンク
4.1設定
Lakesoulを設定し、環境をスパークする方法をクイックスタートを参照してください.
4.2スタートスパークシェル
スパークシェルはKAFKA依存関係から始めなければなりません.
> ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.dmetasoul.lakesoul.meta.host=localhost --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.dmetasoul.lakesoul.meta.database.name=test_lakesoul_meta --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
4.3 Lakesoul表を作成する
MySQLCDCTestという名前のLakeSoulテーブルを作成します.これはMySQLテーブルと同期します.LakeSoulテーブルには主キーIDもあり、CDC OPSを表す余分なフィールドopが必要です.
インポートCOM.ダメタアスレイクスル表.レイクテーブル
>val path="/opt/spark/cdctest"
>val data=Seq((1L,1L,"hello world","insert")).toDF("id","rangeid","value","op")
>LakeSoulTable.createTable(data, path).shortTableName("cdc").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "op").create()
> 5.4 Start spark streaming to sync Debezium CDC data into LakeSoul
> import com.dmetasoul.lakesoul.tables.LakeSoulTable
> val path="/opt/spark/cdctest"
> val lakeSoulTable = LakeSoulTable.forPath(path)
> var strList = List.empty[String]
> //js1 is just a fake data to help generate the schema
> val js1 = """{
>           |  "before": {
>           |    "id": 2,
>           |    "rangeid": 2,
>           |    "value": "sms"
>           |  },
>           |  "after": {
>           |    "id": 2,
>           |    "rangeid": 2,
>           |    "value": "sms"
>           |  },
>           |  "source": {
>           |    "version": "1.8.0.Final",
>           |    "connector": "mysql",
>           |    "name": "cdcserver",
>           |    "ts_ms": 1644461444000,
>           |    "snapshot": "false",
>           |    "db": "cdc",
>           |    "sequence": null,
>           |    "table": "sms",
>           |    "server_id": 529210004,
>           |    "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
>           |    "file": "binlog.000033",
>           |    "pos": 54831329,
>           |    "row": 0,
>           |    "thread": null,
>           |    "query": null
>           |  },
>           |  "op": "c",
>           |  "ts_ms": 1644461444777,
>           |  "transaction": null
>           |}""".stripMargin
> strList = strList :+ js1
> val rddData = spark.sparkContext.parallelize(strList)
> val resultDF = spark.read.json(rddData)
> val sche = resultDF.schema
> import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
> // Specify kafka settings
> val kfdf = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "kafkahost:9092")
>   .option("subscribe", "cdcserver.cdc.test")
>   .option("startingOffsets", "latest")
>   .load()
> // parse CDC json from debezium, and transform `op` field into one of 'insert', 'update', 'delete' into LakeSoul
> val kfdfdata = kfdf
>   .selectExpr("CAST(value AS STRING) as value")
>   .withColumn("payload", from_json($"value", sche))
>   .filter("value is not null")
>   .drop("value")
>   .select("payload.after", "payload.before", "payload.op")
>   .withColumn(
>     "op",
>     when($"op" === "c", "insert")
>       .when($"op" === "u", "update")
>       .when($"op" === "d", "delete")
>       .otherwise("unknown")
>   )
>   .withColumn(
>     "data",
>     when($"op" === "insert" || $"op" === "update", $"after")
>       .when($"op" === "delete", $"before")
>   )
>   .drop($"after")
>   .drop($"before")
>   .select("data.*", "op")
> // upsert into LakeSoul with microbatch
> kfdfdata.writeStream
>   .foreachBatch { (batchDF: DataFrame, _: Long) =>
>     {
>       lakeSoulTable.upsert(batchDF)
>       batchDF.show
>     }
>   }
>   .start()
>   .awaitTermination()
4.5同期データを表示するには、Lakesoulから読みます.
import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
lakeSoulTable.toDF.select("*").show()
これはすぐにCDCを使用して環境を設定するための非常に詳細なデモです.次に、flink cdc、lakesoul - cdc、debezium、datax、やかんなどのオープンソースのCDCソリューションを比較します.