FlinkXはどのようにClickhouseを読み取り、書き込みますか?


概要:本文は主にFlinkXがClickhouseを読み取り、書き込む過程と関連パラメータを紹介し、核心内容は以下の3つの問題をめぐっている:1.FlinkXがClickhouseを読み書きするのはどのバージョンをサポートしますか?、2.ClickHouseの読み書きClickhouseにはどのようなパラメータがありますか?3.ClickHouseのClickhouseパラメータの読み書きにはどのような説明がありますか?
ここでは、主にFlinkXがClickhouseを読み取り、書き込むプロセスと関連パラメータについて説明します.コアコンテンツは、次の3つの問題をめぐって、FlinkXプラグインがダウンロードされます.
https://github.com/DTStack/flinkx
  • FlinkX読み書きClickhouseはどのバージョンをサポートしますか?
  • ClickHouse読み書きClickhouseにはどのようなパラメータがありますか?
  • ClickHouse読み書きClickhouseパラメータにはどのような説明がありますか?

  • ClickHouse読み込み
    一、プラグイン名
    名称:clickhousereader
    二、サポートするデータソースバージョン
    ClickHouse 19.x以上
    三、パラメータ説明
    「jdbcUrl」
  • 記述:リレーショナル・データベースのjdbc接続文字列
  • jdbcUrlリファレンスドキュメント:clickhouse-jdbc公式ドキュメント
  • 必須:
  • デフォルト:
  • なし
    「username」
  • 説明:データソースのユーザー名
  • 必須:
  • デフォルト:
  • なし
    「password」
  • 説明:データソース指定ユーザ名のパスワード
  • 必須:
  • デフォルト:
  • なし
    「where」
  • 説明:フィルタ条件、readerプラグインは指定されたcolumn、table、where条件に従ってSQLを接続し、このSQLに基づいてデータ抽出を行う.実際のビジネスシーンでは、where条件をgmt_に指定できる当日のデータを同期することがよくあります.create > time.
  • 注意:where条件をlimit 10として指定することはできません.limitはSQLの合法的なwhere句ではありません.
  • 必須:No
  • デフォルト:
  • なし
    「splitPk」
  • の説明:speed構成のchannelが1より大きい場合にこのパラメータを指定し、Readerプラグインは、コンカレント数とこのパラメータで指定されたフィールドに基づいてsqlを接続し、各コンカレントに異なるデータを読み出し、読み取り速度を向上させる.注意:splitPkはテーブルプライマリ・キーを使用することをお勧めします.テーブル・プライマリ・キーは通常均一であるため、切り分けたスライスにもデータ・ホットスポットが現れにくいからです.現在splitPkは整形データの分割のみをサポートしており、浮動小数点、文字列、日付などの他のタイプはサポートされていません.ユーザーが他のサポートされていないタイプを指定した場合、FlinkXはエラーを報告します.channelが1より大きいがこのパラメータが構成されていない場合、タスクは失敗します.
  • 必須:No
  • デフォルト:
  • なし
    「fetchSize」
  • には、読み出し時にロット毎に読み出されるデータの数が記載されている.
  • 注意:このパラメータの値を設定しすぎてはいけません.そうしないと、読み取りがタイムアウトし、タスクが失敗します.
  • 必須:No
  • デフォルト:1000
  • 「queryTimeOut」
  • の説明:クエリーのタイムアウト時間、単位秒.
  • 注意:データ量が大きい場合、またはビューからクエリーする場合、またはsqlクエリーをカスタマイズする場合、このパラメータでタイムアウト時間を指定できます.
  • 必須:No
  • デフォルト:1000
  • 「customSql」
  • の説明:カスタムクエリー文は、フィールドのみを指定して需要を満たすことができない場合、このパラメータを使用してクエリーのsqlを指定できます.任意の複雑なクエリー文です.注:クエリー文のみです.そうしないと、タスクが失敗します.クエリ文が返すフィールドはcolumnリストのフィールドと厳格に対応する必要があります.このパラメータを指定した場合、connectionで指定したtableは無効です.このパラメータを指定する場合、columnは特定のフィールド情報を指定する必要があります.*番号で置き換えることはできません.
  • 必須:No
  • デフォルト:
  • なし
    「column」
  • には、読み取りが必要なフィールドが記載されています.
  • フォーマット:3つのフォーマット
  • をサポート
    1.すべてのフィールドを読み込みます.フィールドの数が多い場合は、次の書き方を使用します.
    "column":["*"]

    2.フィールド名のみを指定します.
    "column":["id","name"]

    3.具体的な情報を指定する:
    "column": [{
        "name": "col",
        "type": "datetime",
        "format": "yyyy-MM-dd hh:mm:ss",
        "value": "value"
    }]

    属性の説明:
  • name:フィールド名
  • type:フィールドタイプ、データベース内のフィールドタイプとは異なり、プログラムは1回のタイプ変換
  • を行うことができます.
  • format:フィールドが時間文字列の場合、時間のフォーマットを指定し、フィールドタイプを日付フォーマットに変換して
  • を返します.
  • value:指定したフィールドがデータベースに存在しない場合は、エラーが発生します.指定したフィールドが存在する場合、指定したフィールドの値がnullの場合、このvalue値はデフォルト値として
  • を返します.
  • 必須:
  • デフォルト:
  • なし
    「polling」
  • では、間隔ポーリングをオンにするかどうかについて説明します.オンにすると、pollingIntervalポーリング間隔に基づいてデータベースからデータが周期的に引き寄せられます.間隔ポーリングを開くには、パラメータpollingInterval,increColumnを構成する必要があります.構成パラメータstartLocationを選択できます.パラメータstartLocationを構成しないと、タスクの開始時にデータベースからインクリメンタルフィールドの最大値がポーリングの開始位置としてクエリーされます.
  • 必須:No
  • デフォルト:false
  • 「pollingInterval」
    説明:ポーリング間隔、データベースからデータを抽出する間隔、デフォルトは5000ミリ秒です.必須:Noデフォルト:5000
    「requestAccumulatorInterval」
  • には、クエリアキュムレータ要求を送信する間隔が記載されている.
  • 必須:No
  • デフォルト:2
  • 構成例
    1、基礎配置
    {
      "job": {
        "content": [{
          "reader": {
            "parameter" : {
              "column" : [ {
                "name" : "id",
                "type" : "bigint",
                "key" : "id"
              }, {
                "name" : "user_id",
                "type" : "bigint",
                "key" : "user_id"
              }, {
                "name" : "name",
                "type" : "varchar",
                "key" : "name"
              } ],
              "username" : "username",
              "password" : "password",
              "connection" : [ {
                "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
                "table" : [ "tableTest" ]
              } ],
              "where": "id > 1",
              "splitPk": "id",
              "fetchSize": 1000,
              "queryTimeOut": 1000,
              "customSql": "",
              "requestAccumulatorInterval": 2
            },
            "name" : "clickhousereader"
          },
          "writer": {
            "name": "streamwriter",
            "parameter": {
              "print": true
            }
          }
        }],
        "setting": {
          "speed": {
            "channel": 1,
            "bytes": 0
          },
          "errorLimit": {
            "record": 100
          }
        }
      }
    }

    2、マルチチャネル
    {
      "job": {
        "content": [{
          "reader": {
            "parameter" : {
              "column" : [ {
                "name" : "id",
                "type" : "bigint",
                "key" : "id"
              }, {
                "name" : "user_id",
                "type" : "bigint",
                "key" : "user_id"
              }, {
                "name" : "name",
                "type" : "varchar",
                "key" : "name"
              } ],
              "username" : "username",
              "password" : "password",
              "connection" : [ {
                "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
                "table" : [ "tableTest" ]
              } ],
              "where": "id > 1",
              "splitPk": "id",
              "fetchSize": 1000,
              "queryTimeOut": 1000,
              "customSql": "",
              "requestAccumulatorInterval": 2
            },
            "name" : "clickhousereader"
          },
          "writer": {
            "name": "streamwriter",
            "parameter": {
              "print": true
            }
          }
        }],
        "setting": {
          "speed": {
            "channel": 3,
            "bytes": 0
          },
          "errorLimit": {
            "record": 100
          }
        }
      }
    }

    3、customSqlを指定する
    {
      "job": {
        "content": [{
          "reader": {
            "parameter" : {
              "column" : [ {
                "name" : "id",
                "type" : "bigint",
                "key" : "id"
              }, {
                "name" : "user_id",
                "type" : "bigint",
                "key" : "user_id"
              }, {
                "name" : "name",
                "type" : "varchar",
                "key" : "name"
              } ],
              "username" : "username",
              "password" : "password",
              "connection" : [ {
                "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
                "table" : [ "tableTest" ]
              } ],
              "where": "id > 1",
              "splitPk": "id",
              "fetchSize": 1000,
              "queryTimeOut": 1000,
              "customSql": "select id from tableTest",
              "requestAccumulatorInterval": 2
            },
            "name" : "clickhousereader"
          },
          "writer": {
            "name": "streamwriter",
            "parameter": {
              "print": true
            }
          }
        }],
        "setting": {
          "speed": {
            "channel": 1,
            "bytes": 0
          },
          "errorLimit": {
            "record": 100
          }
        }
      }
    }

    4、インクリメンタル同期指定startLocation
    {
      "job": {
        "content": [{
          "reader": {
            "parameter" : {
              "column" : [ {
                "name" : "id",
                "type" : "bigint",
                "key" : "id"
              }, {
                "name" : "user_id",
                "type" : "bigint",
                "key" : "user_id"
              }, {
                "name" : "name",
                "type" : "varchar",
                "key" : "name"
              } ],
              "username" : "username",
              "password" : "password",
              "connection" : [ {
                "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
                "table" : [ "tableTest" ]
              } ],
              "where": "id > 1",
              "splitPk": "id",
              "fetchSize": 1000,
              "queryTimeOut": 1000,
              "customSql": "",
              "increColumn": "id",
              "startLocation": "20",
              "requestAccumulatorInterval": 2
            },
            "name" : "clickhousereader"
          },
          "writer": {
            "name": "streamwriter",
            "parameter": {
              "print": true
            }
          }
        }],
        "setting": {
          "speed": {
            "channel": 1,
            "bytes": 0
          },
          "errorLimit": {
            "record": 100
          }
        }
      }
    }

    5、間隔ポーリング
    {
      "job": {
        "content": [{
          "reader": {
            "parameter" : {
              "column" : [ {
                "name" : "id",
                "type" : "bigint",
                "key" : "id"
              }, {
                "name" : "user_id",
                "type" : "bigint",
                "key" : "user_id"
              }, {
                "name" : "name",
                "type" : "varchar",
                "key" : "name"
              } ],
              "username" : "username",
              "password" : "password",
              "connection" : [ {
                "jdbcUrl" : [ "jdbc:clickhouse://0.0.0.1:8123/dtstack" ],
                "table" : [ "tableTest" ]
              } ],
              "where": "id > 1",
              "splitPk": "id",
              "fetchSize": 1000,
              "queryTimeOut": 1000,
              "customSql": "",
              "requestAccumulatorInterval": 2,
              "polling": true,
              "pollingInterval": 3000
            },
            "name" : "clickhousereader"
          },
          "writer": {
            "name": "streamwriter",
            "parameter": {
              "print": true
            }
          }
        }],
        "setting": {
          "speed": {
            "channel": 1,
            "bytes": 0
          },
          "errorLimit": {
            "record": 100
          }
        }
      }
    }

    ClickHouse書き込み
    一、プラグイン名
    名称:clickhousewriter
    二、サポートするデータソースバージョン
    ClickHouse 19.x以上
    三、パラメータ説明
    「jdbcUrl」
  • 記述:リレーショナル・データベースのjdbc接続文字列
  • 必須:
  • デフォルト:
  • なし
    「username」
  • 説明:データソースのユーザー名
  • 必須:
  • デフォルト:
  • なし
    「password」
  • 説明:データソース指定ユーザ名のパスワード
  • 必須:
  • デフォルト:
  • なし
    「column」
  • 説明:宛先テーブルには、英語のカンマで区切られたデータを書き込むフィールドが必要です.たとえば、「column」:["id","name","age"]です.
  • 必須:
  • デフォルト:No
  • デフォルト:
  • なし
    「preSql」
  • 説明:目的のテーブルにデータを書き込む前に、ここでの標準文
  • のセットが先に実行される
  • 必須:No
  • デフォルト:
  • なし
    「postSql」
  • 説明:目的のテーブルにデータを書き込むと、ここでの標準文
  • のセットが実行されます.
  • 必須:No
  • デフォルト:
  • なし
    「table」
  • 説明:宛先テーブルのテーブル名.現在は単一のテーブルの構成のみがサポートされており、その後、マルチテーブル
  • がサポートされます.
  • 必須:
  • デフォルト:
  • なし
    「writeMode」
  • 説明:ターゲットテーブルへのデータの書き込みを制御するにはinsert into文を採用し、insert操作
  • のみをサポートする.
  • 必須:
  • すべてのオプション:insert
  • デフォルト:insert
  • 「batchSize」
  • では、FlinkXとデータベースのネットワーク・インタラクションの回数を大幅に削減し、全体的なスループットを向上させることができる、一括コミットされたレコード数のサイズについて説明しています.ただし、この値が大きすぎると、FlinkX実行プロセスOOMが発生する可能性があります.
  • 必須:No
  • デフォルト:1024
  • 文章の出所は以下の通りで、興味のある学生は原文を見ることができます.https://www.aboutyun.com/forum.php?mod=viewthread&tid=29271
    もっとFlink技術の問題は釘群で交流できる
    テキストリンク:https://developer.aliyun.com/article/770821?
    本文の内容はアリクラウドの実名登録ユーザーが自発的に貢献し、著作権は原作者の所有に帰し、アリクラウド開発者コミュニティはその著作権を持たず、相応の法律責任も負わない.具体的なルールは「アリクラウド開発者コミュニティユーザーサービス協定」と「アリクラウド開発者コミュニティ知的財産権保護ガイドライン」を参照してください.当コミュニティに盗作の疑いのある内容があることを発見した場合、権利侵害苦情フォームに記入して通報し、確認されると、当コミュニティはすぐに権利侵害の疑いのある内容を削除します.