JdbcSinkの概要

7820 ワード

1、JdbcSink
  DataStreamはJdbcのSink出力を増加させるために使用され、主に2つのインタフェース:sink()とexactlyOnceSink()である.ここでexactlyOnceSink()は、トランザクションをサポートする13バージョンのインタフェースで、今回はsink()インタフェースについて説明します.
public static  SinkFunction sink(
        String sql,
        JdbcStatementBuilder statementBuilder,
        JdbcExecutionOptions executionOptions,
        JdbcConnectionOptions connectionOptions) {
    return new GenericJdbcSinkFunction<>(
            new JdbcBatchingOutputFormat<>(
                    new SimpleJdbcConnectionProvider(connectionOptions),
                    executionOptions,
                    context -> {
                        Preconditions.checkState(
                                !context.getExecutionConfig().isObjectReuseEnabled(),
                                "objects can not be reused with JDBC sink function");
                        return JdbcBatchStatementExecutor.simple(
                                sql, statementBuilder, Function.identity());
                    },
                    JdbcBatchingOutputFormat.RecordExtractor.identity()));
}

1.1、パラメータ
  インタフェースには4つのパラメータがあり、そのうち3番目のパラメータexecutionOptionsはデフォルト値の使用を省略することができ、具体的なサンプルは1、JdbcSink方式を参照する.
  • sqlStringタイプ、SQL文テンプレートは、通常使用されるPreparedStatementのような形式です.例えば、insert into wordcount(wordcl,countcl)values(??)
  • statementBuilderJdbcStatementBuilderタイプは、ストリームデータとSQLの具体的な列の対応を完了する役割を果たし、前のパラメータのPreparedStatement形式に基づいて、対応関係
  • を完了する
  • executionOptionsFlink Jdbc出力の実行規則は、主に実行トリガメカニズムを設定し、主に3つのパラメータを設定します:データ量トリガしきい値、時間トリガしきい値、最大再試行回数.ここで、データ量トリガはデフォルトで5000、時間トリガはデフォルトで0、すなわちオフ時間トリガです.注意トリガしきい値を設定しすぎないでください.そうしないと、データベースがブロックされる可能性があります.
  • connectionOptionsJdbcConnectionOptionsタイプUrl、Driver、Username、Passwordなどの
  • を含むデータベース接続プロパティの設定
    1.2、戻る
      インタフェースはSinkFunctionに基づいて実装されたGenericJdbcSinkFunctionクラスを返し、そのコアはパラメータJdbcBatchingOutputFormatである.
      GenericJdbcSinkFunctionの結果コアメソッドは以下の通りであり,いずれもJdbcBatchingOutputFormatに基づく動作である.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }
    
    @Override
    public void invoke(T value, Context context) throws IOException {
        outputFormat.writeRecord(value);
    }
    
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        outputFormat.flush();
    }

    2、JdbcBatchingOutputFormat
     JdbcBatchingOutputFormatは、Jdbcインタラクションを行う実装クラスであり、Jdbcへ出力する前にデータ集約を行う
    2.1、パラメータ
      インタフェースには4つのパラメータがあります
  • JdbcConnectionProvider提供Jdbc接続
  • JdbcExecutionOptions実行パラメータ
  • S t e m e n t ExecutorFactoryStatement実行ファクトリ、すなわちストリームデータとデータベースフィールドとの関係の処理
  • RecordExtractorデータ抽出の実行クラス
  • 2.2、open方法
      Openメソッドは,データベース接続の初期化および前期準備を行うインタフェースであり,呼び出し関係がある.
    Task.doRun()
        ->invokable.invoke()->DataSinkTask.invoke()
            ->format.open()->JdbcBatchingOutputFormat.open()

    2.2.1、接続データベース
     Openメソッドの最初のステップは、データベースへの接続です.上位メソッドAbstractJdbcOutputFormatのopenメソッドを呼び出します.その後、JdbcConnectionProviderの実装クラスSimpleJdbcConnectionProviderのgetOrEstablishConnection()メソッドを呼び出して接続を確立します.getOrEstablishConnectionの具体的な操作は次のとおりです.
    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
        if (connection != null) {
            return connection;
        }
        if (jdbcOptions.getDriverName() == null) {
            connection =
                    DriverManager.getConnection(
                            jdbcOptions.getDbURL(),
                            jdbcOptions.getUsername().orElse(null),
                            jdbcOptions.getPassword().orElse(null));
        } else {
            Driver driver = getLoadedDriver();
            Properties info = new Properties();
            jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
            jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
            connection = driver.connect(jdbcOptions.getDbURL(), info);
            if (connection == null) {
                // Throw same exception as DriverManager.getConnection when no driver found to match
                // caller expectation.
                throw new SQLException(
                        "No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
            }
        }
        return connection;
    }

      ここでは、Driveが設定されているかどうかによって2つの処理があります.設定されていない場合は、設定されたURLに基づいて自動的に解析され、JavaのDriverManagerが使用されます.このクラスはJdbcドライバを管理するために使用されます.DriverManagerはclasspathのDriverを自動的に認識し、URLに基づいてペアリングを自動的に解析できます.Driverが設定されている場合は、このDriverを直接ロードして接続処理を行います.
    2.2.2、JdbcExec
     これはStatementExecutorFactoryに基づいて作成され、ここで最後に使用される実装クラスはJdbcBatchStatementExecutorでsink()インタフェースで設定されています.このステップの実際の操作はprepareStatementsを作ることです
    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        this.st = connection.prepareStatement(sql);
    }

    2.2.3、scheduler
      データベースのパフォーマンスは限られているため、Flink書き込みデータベースは通常バッチ方式を採用しています.ここでは時間スケジューリングを設定します.具体的なパラメータは第1章を参照してください.2つの特殊な構成値に注意してください.時間が0または1の場合、このスケジューラは作成されません.
    if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {

    ここで作成したスケジューリングスレッドプールには、1つのスレッドしか含まれていません.
    this.scheduler =
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("jdbc-upsert-output-format"))

    スケジューラが最終的に実行する操作は、クラス全体で最大の点であり、flushデータはデータベースに送信されます.
    synchronized (JdbcBatchingOutputFormat.this) {
        if (!closed) {
            try {
                flush();
            } catch (Exception e) {
                flushException = e;
            }
        }
    }

    2.3、writeRecord方法
      writeRecordはクラスのコアメソッドであり,データの書き込みを行う.主に2つの操作を行い、データをリストに追加し、条件に達したときにflushをデータベースに追加します.
    try {
        addToBatch(record, jdbcRecordExtractor.apply(record));
        batchCount++;
        if (executionOptions.getBatchSize() > 0
                && batchCount >= executionOptions.getBatchSize()) {
            flush();
        }
    } catch (Exception e) {
        throw new IOException("Writing records to JDBC failed.", e);
    }

    2.3.1、データのキャッシュ
     キャッシュデータはSimpleBatchStatementExecutorで定義された簡単なArrayListを使用する
    SimpleBatchStatementExecutor(
            String sql, JdbcStatementBuilder statementBuilder, Function valueTransformer) {
        this.sql = sql;
        this.parameterSetter = statementBuilder;
        this.valueTransformer = valueTransformer;
        this.batch = new ArrayList<>();
    }

    以上のようにbatchはデータをキャッシュするためのものであり、データを追加する操作は以下の通りである.
    @Override
    public void addToBatch(T record) {
        batch.add(valueTransformer.apply(record));
    }

      valueTransformerの役割は入力を返し、sinkの初期に定義します.
    return JdbcBatchStatementExecutor.simple(
            sql, statementBuilder, Function.identity());
            
    /**
     * Returns a function that always returns its input argument.
     *
     * @param  the type of the input and output objects to the function
     * @return a function that always returns its input argument
     */
    static  Function identity() {
        return t -> t;
    }

    2.3.2、flush
      flushとはキャッシュデータをデータベースに消去することであり,最終的に呼び出されるのはSimpleBatchStatementExecutorのexecuteBatchメソッドである.
    @Override
    public void executeBatch() throws SQLException {
        if (!batch.isEmpty()) {
            for (V r : batch) {
                parameterSetter.accept(st, r);
                st.addBatch();
            }
            st.executeBatch();
            batch.clear();
        }
    }