CDCの迅速な使用:Lakesoulからの新しいデモを簡単に環境を設定することができます
11149 ワード
変更データキャプチャ(CDC)は、データ同期、データ配布、およびデータ収集に適用され、データベース内のデータの変更をキャプチャするために使用されるデータベース指向の技術です.前者はオフラインであり、オフラインスケジューリングを通して問い合わせることができ、テーブルは、データの一貫性とリアルタイム性能を保証することができないクエリを通じて最新のデータを取得するために他のシステムに同期されます.データは、クエリプロセスで複数回変更することができます.Lakesoul(24679152 CDC技術)はログベースのCDCタイプで、データの整合性とリアルタイム性を確保するために消費ログを実装することができます.
数日前、* *[ Lakesoul ](https://github.com/meta-soul/LakeSoul)'s)はギタブにデモをアップロードしました.MySQLとOracleのような関係データベースの追加、削除、および変更の操作は、CDCを介してLakesoulにアクセスすることができますし、リアルタイムに格納されます.以下のようにします.完全なフレームワークを構築した後、システムは、リアルタイムでデータを追加、削除、変更することができます.[upsert](https://github.com/meta-soul/LakeSoul)使用時に必要です.
下のデモを見ましょう.または、Lakesoul(https://github.com/meta-soul/LakeSoul/wiki/03.-Usage-Doc#311-code-examples)でそれをチェックしてください.
KAKAKAにCDCストリームを書き、Lakesoul(既にサポートされている)に変換して書き込むためにスパークストリーミングを使用します2 ) linkファイルに直接リンクを書き込む.
このデモでは、Lakesoulチームは最初の方法を示しました.彼らは、MySQLのインスタンスを設定し、スクリプトを使用してDBの変更を生成し、Kafkaにし、Lakesoulにそれらを同期するdebeziumを使用します.
1 .セットアップ
1.1データベースとテーブルを作成する
*
我々は、テストとベンチマークCDCの同期のためのMySQLデータジェネレータを提供します.発電機はdiretoryの下にある
bash mysqlccDocXmark.sh delete cdc test 10 (削除する行)
https://github.com/meta-soul/LakeSoul/tree/main/examples/cdc_ingestion_debezium
debeziumをインストールする3.1
K 8 SのDeBeziumの実行中のコンテナをすばやく設定するには、次の手順に従います.
4.1設定
Lakesoulを設定し、環境をスパークする方法をクイックスタートを参照してください.
4.2スタートスパークシェル
スパークシェルはKAFKA依存関係から始めなければなりません.
MySQLCDCTestという名前のLakeSoulテーブルを作成します.これはMySQLテーブルと同期します.LakeSoulテーブルには主キーIDもあり、CDC OPSを表す余分なフィールドopが必要です.
インポートCOM.ダメタアスレイクスル表.レイクテーブル
数日前、* *[ Lakesoul ](https://github.com/meta-soul/LakeSoul)'s)はギタブにデモをアップロードしました.MySQLとOracleのような関係データベースの追加、削除、および変更の操作は、CDCを介してLakesoulにアクセスすることができますし、リアルタイムに格納されます.以下のようにします.完全なフレームワークを構築した後、システムは、リアルタイムでデータを追加、削除、変更することができます.[upsert](https://github.com/meta-soul/LakeSoul)使用時に必要です.
下のデモを見ましょう.または、Lakesoul(https://github.com/meta-soul/LakeSoul/wiki/03.-Usage-Doc#311-code-examples)でそれをチェックしてください.
KAKAKAにCDCストリームを書き、Lakesoul(既にサポートされている)に変換して書き込むためにスパークストリーミングを使用します2 ) linkファイルに直接リンクを書き込む.
このデモでは、Lakesoulチームは最初の方法を示しました.彼らは、MySQLのインスタンスを設定し、スクリプトを使用してDBの変更を生成し、Kafkaにし、Lakesoulにそれらを同期するdebeziumを使用します.
1 .セットアップ
1.1データベースとテーブルを作成する
Create database cdc;
CREATE TABLE test(
id int primary key,
rangeid int,
value varchar(100)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
** CDCベンチマークジェネレータを使用します.*
我々は、テストとベンチマークCDCの同期のためのMySQLデータジェネレータを提供します.発電機はdiretoryの下にある
examples/cdc_ingestion_debezium/MysqlBenchmark.
1 . mysqlcdcを変更します.必要に応じてuser=user name of mysql
passwd=password of mysql
host=host of mysql
port=port of mysql
2 .データをテーブルに挿入する# Inside () are comments of parameters, remove them before execution
bash MysqlCdcBenchmark.sh insert cdc(db name) test(table name) 10(lines to insert) 1(thread number)
テーブルへのデータの更新bash MysqlCdcBenchmark.sh update cdc test id(primary key) value(column to update) 10(lines to update)
テーブルからデータを削除bash mysqlccDocXmark.sh delete cdc test 10 (削除する行)
https://github.com/meta-soul/LakeSoul/tree/main/examples/cdc_ingestion_debezium
kubectl create -f install/cluster-operator -n my-cluster-operator-namespace
kubectl apply -f examples/kafka/kafka-persistent-single.yaml
3 . debeziumを設定します(既に持っている場合は無視してください).debeziumをインストールする3.1
K 8 SのDeBeziumの実行中のコンテナをすばやく設定するには、次の手順に従います.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: dbz-pod-claim
spec:
accessModes:
- ReadWriteOnce
# replace to actual StorageClass in your cluster
storageClassName:
resources:
requests:
storage: 10Gi
---
apiVersion: v1
kind: Pod
metadata:
name: dbz-pod
namespace: dmetasoul
spec:
restartPolicy: Never
containers:
- name: dbs
image: debezium/connect:latest
env:
- name: BOOTSTRAP_SERVERS
# replace to actual kafka host
value: ${kafka_host}:9092
- name: GROUP_ID
value: "1"
- name: CONFIG_STORAGE_TOPIC
value: my_connect_configs
- name: OFFSET_STORAGE_TOPIC
value: my_connect_offsets
- name: STATUS_STORAGE_TOPIC
value: my_connect_statuses
resources:
requests:
cpu: 500m
memory: 4Gi
limits:
cpu: 4
memory: 8Gi
volumeMounts:
- mountPath: "/kafka/data"
name: dbz-pv-storage
volumes:
- name: dbz-pv-storage
persistentVolumeClaim:
claimName: dbz-pod-claim
次に、このYAMLファイルを適用します.kubectl apply -f pod.yaml
3.2セットアップdebezium同期タスク# remember to replace {dbzhost} to actual dbz deployment ip address
# replace database parameters accordingly
curl -X POST http://{dbzhost}:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
"name": "cdc",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"tasks.max": "1",
"database.hostname": "mysqlhost",
"database.port": "mysqlport",
"database.user": "mysqluser",
"database.password": "mysqlpassword",
"database.server.id": "1",
"database.server.name": "cdcserver",
"database.include.list": "cdc",
"database.history.kafka.bootstrap.servers": "kafkahost:9092",
"database.history.kafka.topic": "schema-changes.cdc",
"decimal.handling.mode": "double",
"table.include.list":"cdc.test"
}
}'
次に、同期タスクがうまく作成されているかどうかを確認します.curl -H "Accept:application/json" dbzhost:8083 -X GET http://dbzhost:8083/connectors/
テスト終了後、同期タスクを削除できます.curl -i -X DELETE http://dbzhost:8083/connectors/cdc
スタートスパークシンクシンク4.1設定
Lakesoulを設定し、環境をスパークする方法をクイックスタートを参照してください.
4.2スタートスパークシェル
スパークシェルはKAFKA依存関係から始めなければなりません.
> ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 --conf spark.dmetasoul.lakesoul.meta.host=localhost --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.dmetasoul.lakesoul.meta.database.name=test_lakesoul_meta --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
4.3 Lakesoul表を作成するMySQLCDCTestという名前のLakeSoulテーブルを作成します.これはMySQLテーブルと同期します.LakeSoulテーブルには主キーIDもあり、CDC OPSを表す余分なフィールドopが必要です.
インポートCOM.ダメタアスレイクスル表.レイクテーブル
>val path="/opt/spark/cdctest"
>val data=Seq((1L,1L,"hello world","insert")).toDF("id","rangeid","value","op")
>LakeSoulTable.createTable(data, path).shortTableName("cdc").hashPartitions("id").hashBucketNum(2).rangePartitions("rangeid").tableProperty("lakesoul_cdc_change_column" -> "op").create()
> 5.4 Start spark streaming to sync Debezium CDC data into LakeSoul
> import com.dmetasoul.lakesoul.tables.LakeSoulTable
> val path="/opt/spark/cdctest"
> val lakeSoulTable = LakeSoulTable.forPath(path)
> var strList = List.empty[String]
> //js1 is just a fake data to help generate the schema
> val js1 = """{
> | "before": {
> | "id": 2,
> | "rangeid": 2,
> | "value": "sms"
> | },
> | "after": {
> | "id": 2,
> | "rangeid": 2,
> | "value": "sms"
> | },
> | "source": {
> | "version": "1.8.0.Final",
> | "connector": "mysql",
> | "name": "cdcserver",
> | "ts_ms": 1644461444000,
> | "snapshot": "false",
> | "db": "cdc",
> | "sequence": null,
> | "table": "sms",
> | "server_id": 529210004,
> | "gtid": "de525a81-57f6-11ec-9b60-fa163e692542:1621099",
> | "file": "binlog.000033",
> | "pos": 54831329,
> | "row": 0,
> | "thread": null,
> | "query": null
> | },
> | "op": "c",
> | "ts_ms": 1644461444777,
> | "transaction": null
> |}""".stripMargin
> strList = strList :+ js1
> val rddData = spark.sparkContext.parallelize(strList)
> val resultDF = spark.read.json(rddData)
> val sche = resultDF.schema
> import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
> // Specify kafka settings
> val kfdf = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "kafkahost:9092")
> .option("subscribe", "cdcserver.cdc.test")
> .option("startingOffsets", "latest")
> .load()
> // parse CDC json from debezium, and transform `op` field into one of 'insert', 'update', 'delete' into LakeSoul
> val kfdfdata = kfdf
> .selectExpr("CAST(value AS STRING) as value")
> .withColumn("payload", from_json($"value", sche))
> .filter("value is not null")
> .drop("value")
> .select("payload.after", "payload.before", "payload.op")
> .withColumn(
> "op",
> when($"op" === "c", "insert")
> .when($"op" === "u", "update")
> .when($"op" === "d", "delete")
> .otherwise("unknown")
> )
> .withColumn(
> "data",
> when($"op" === "insert" || $"op" === "update", $"after")
> .when($"op" === "delete", $"before")
> )
> .drop($"after")
> .drop($"before")
> .select("data.*", "op")
> // upsert into LakeSoul with microbatch
> kfdfdata.writeStream
> .foreachBatch { (batchDF: DataFrame, _: Long) =>
> {
> lakeSoulTable.upsert(batchDF)
> batchDF.show
> }
> }
> .start()
> .awaitTermination()
4.5同期データを表示するには、Lakesoulから読みます.import com.dmetasoul.lakesoul.tables.LakeSoulTable
val path="/opt/spark/cdctest"
val lakeSoulTable = LakeSoulTable.forPath(path)
lakeSoulTable.toDF.select("*").show()
これはすぐにCDCを使用して環境を設定するための非常に詳細なデモです.次に、flink cdc、lakesoul - cdc、debezium、datax、やかんなどのオープンソースのCDCソリューションを比較します.Reference
この問題について(CDCの迅速な使用:Lakesoulからの新しいデモを簡単に環境を設定することができます), 我々は、より多くの情報をここで見つけました https://dev.to/qazmkop/quick-use-of-cdc-a-new-demo-from-lakesoul-makes-it-easier-to-set-up-the-environment-o24テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol