JdbcSinkの概要
7820 ワード
1、JdbcSink
DataStreamはJdbcのSink出力を増加させるために使用され、主に2つのインタフェース:sink()とexactlyOnceSink()である.ここでexactlyOnceSink()は、トランザクションをサポートする13バージョンのインタフェースで、今回はsink()インタフェースについて説明します.
1.1、パラメータ
インタフェースには4つのパラメータがあり、そのうち3番目のパラメータexecutionOptionsはデフォルト値の使用を省略することができ、具体的なサンプルは1、JdbcSink方式を参照する. sqlStringタイプ、SQL文テンプレートは、通常使用されるPreparedStatementのような形式です.例えば、insert into wordcount(wordcl,countcl)values(??) statementBuilderJdbcStatementBuilderタイプは、ストリームデータとSQLの具体的な列の対応を完了する役割を果たし、前のパラメータのPreparedStatement形式に基づいて、対応関係 を完了する executionOptionsFlink Jdbc出力の実行規則は、主に実行トリガメカニズムを設定し、主に3つのパラメータを設定します:データ量トリガしきい値、時間トリガしきい値、最大再試行回数.ここで、データ量トリガはデフォルトで5000、時間トリガはデフォルトで0、すなわちオフ時間トリガです.注意トリガしきい値を設定しすぎないでください.そうしないと、データベースがブロックされる可能性があります. connectionOptionsJdbcConnectionOptionsタイプUrl、Driver、Username、Passwordなどの を含むデータベース接続プロパティの設定
1.2、戻る
インタフェースはSinkFunctionに基づいて実装されたGenericJdbcSinkFunctionクラスを返し、そのコアはパラメータJdbcBatchingOutputFormatである.
GenericJdbcSinkFunctionの結果コアメソッドは以下の通りであり,いずれもJdbcBatchingOutputFormatに基づく動作である.
2、JdbcBatchingOutputFormat
JdbcBatchingOutputFormatは、Jdbcインタラクションを行う実装クラスであり、Jdbcへ出力する前にデータ集約を行う
2.1、パラメータ
インタフェースには4つのパラメータがあります JdbcConnectionProvider提供Jdbc接続 JdbcExecutionOptions実行パラメータ S t e m e n t ExecutorFactoryStatement実行ファクトリ、すなわちストリームデータとデータベースフィールドとの関係の処理 RecordExtractorデータ抽出の実行クラス 2.2、open方法
Openメソッドは,データベース接続の初期化および前期準備を行うインタフェースであり,呼び出し関係がある.
2.2.1、接続データベース
Openメソッドの最初のステップは、データベースへの接続です.上位メソッドAbstractJdbcOutputFormatのopenメソッドを呼び出します.その後、JdbcConnectionProviderの実装クラスSimpleJdbcConnectionProviderのgetOrEstablishConnection()メソッドを呼び出して接続を確立します.getOrEstablishConnectionの具体的な操作は次のとおりです.
ここでは、Driveが設定されているかどうかによって2つの処理があります.設定されていない場合は、設定されたURLに基づいて自動的に解析され、JavaのDriverManagerが使用されます.このクラスはJdbcドライバを管理するために使用されます.DriverManagerはclasspathのDriverを自動的に認識し、URLに基づいてペアリングを自動的に解析できます.Driverが設定されている場合は、このDriverを直接ロードして接続処理を行います.
2.2.2、JdbcExec
これはStatementExecutorFactoryに基づいて作成され、ここで最後に使用される実装クラスはJdbcBatchStatementExecutorでsink()インタフェースで設定されています.このステップの実際の操作はprepareStatementsを作ることです
2.2.3、scheduler
データベースのパフォーマンスは限られているため、Flink書き込みデータベースは通常バッチ方式を採用しています.ここでは時間スケジューリングを設定します.具体的なパラメータは第1章を参照してください.2つの特殊な構成値に注意してください.時間が0または1の場合、このスケジューラは作成されません.
ここで作成したスケジューリングスレッドプールには、1つのスレッドしか含まれていません.
スケジューラが最終的に実行する操作は、クラス全体で最大の点であり、flushデータはデータベースに送信されます.
2.3、writeRecord方法
writeRecordはクラスのコアメソッドであり,データの書き込みを行う.主に2つの操作を行い、データをリストに追加し、条件に達したときにflushをデータベースに追加します.
2.3.1、データのキャッシュ
キャッシュデータはSimpleBatchStatementExecutorで定義された簡単なArrayListを使用する
以上のようにbatchはデータをキャッシュするためのものであり、データを追加する操作は以下の通りである.
valueTransformerの役割は入力を返し、sinkの初期に定義します.
2.3.2、flush
flushとはキャッシュデータをデータベースに消去することであり,最終的に呼び出されるのはSimpleBatchStatementExecutorのexecuteBatchメソッドである.
DataStreamはJdbcのSink出力を増加させるために使用され、主に2つのインタフェース:sink()とexactlyOnceSink()である.ここでexactlyOnceSink()は、トランザクションをサポートする13バージョンのインタフェースで、今回はsink()インタフェースについて説明します.
public static SinkFunction sink(
String sql,
JdbcStatementBuilder statementBuilder,
JdbcExecutionOptions executionOptions,
JdbcConnectionOptions connectionOptions) {
return new GenericJdbcSinkFunction<>(
new JdbcBatchingOutputFormat<>(
new SimpleJdbcConnectionProvider(connectionOptions),
executionOptions,
context -> {
Preconditions.checkState(
!context.getExecutionConfig().isObjectReuseEnabled(),
"objects can not be reused with JDBC sink function");
return JdbcBatchStatementExecutor.simple(
sql, statementBuilder, Function.identity());
},
JdbcBatchingOutputFormat.RecordExtractor.identity()));
}
1.1、パラメータ
インタフェースには4つのパラメータがあり、そのうち3番目のパラメータexecutionOptionsはデフォルト値の使用を省略することができ、具体的なサンプルは1、JdbcSink方式を参照する.
1.2、戻る
インタフェースはSinkFunctionに基づいて実装されたGenericJdbcSinkFunctionクラスを返し、そのコアはパラメータJdbcBatchingOutputFormatである.
GenericJdbcSinkFunctionの結果コアメソッドは以下の通りであり,いずれもJdbcBatchingOutputFormatに基づく動作である.
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
outputFormat.setRuntimeContext(ctx);
outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
@Override
public void invoke(T value, Context context) throws IOException {
outputFormat.writeRecord(value);
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}
2、JdbcBatchingOutputFormat
JdbcBatchingOutputFormatは、Jdbcインタラクションを行う実装クラスであり、Jdbcへ出力する前にデータ集約を行う
2.1、パラメータ
インタフェースには4つのパラメータがあります
Openメソッドは,データベース接続の初期化および前期準備を行うインタフェースであり,呼び出し関係がある.
Task.doRun()
->invokable.invoke()->DataSinkTask.invoke()
->format.open()->JdbcBatchingOutputFormat.open()
2.2.1、接続データベース
Openメソッドの最初のステップは、データベースへの接続です.上位メソッドAbstractJdbcOutputFormatのopenメソッドを呼び出します.その後、JdbcConnectionProviderの実装クラスSimpleJdbcConnectionProviderのgetOrEstablishConnection()メソッドを呼び出して接続を確立します.getOrEstablishConnectionの具体的な操作は次のとおりです.
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (connection != null) {
return connection;
}
if (jdbcOptions.getDriverName() == null) {
connection =
DriverManager.getConnection(
jdbcOptions.getDbURL(),
jdbcOptions.getUsername().orElse(null),
jdbcOptions.getPassword().orElse(null));
} else {
Driver driver = getLoadedDriver();
Properties info = new Properties();
jdbcOptions.getUsername().ifPresent(user -> info.setProperty("user", user));
jdbcOptions.getPassword().ifPresent(password -> info.setProperty("password", password));
connection = driver.connect(jdbcOptions.getDbURL(), info);
if (connection == null) {
// Throw same exception as DriverManager.getConnection when no driver found to match
// caller expectation.
throw new SQLException(
"No suitable driver found for " + jdbcOptions.getDbURL(), "08001");
}
}
return connection;
}
ここでは、Driveが設定されているかどうかによって2つの処理があります.設定されていない場合は、設定されたURLに基づいて自動的に解析され、JavaのDriverManagerが使用されます.このクラスはJdbcドライバを管理するために使用されます.DriverManagerはclasspathのDriverを自動的に認識し、URLに基づいてペアリングを自動的に解析できます.Driverが設定されている場合は、このDriverを直接ロードして接続処理を行います.
2.2.2、JdbcExec
これはStatementExecutorFactoryに基づいて作成され、ここで最後に使用される実装クラスはJdbcBatchStatementExecutorでsink()インタフェースで設定されています.このステップの実際の操作はprepareStatementsを作ることです
@Override
public void prepareStatements(Connection connection) throws SQLException {
this.st = connection.prepareStatement(sql);
}
2.2.3、scheduler
データベースのパフォーマンスは限られているため、Flink書き込みデータベースは通常バッチ方式を採用しています.ここでは時間スケジューリングを設定します.具体的なパラメータは第1章を参照してください.2つの特殊な構成値に注意してください.時間が0または1の場合、このスケジューラは作成されません.
if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
ここで作成したスケジューリングスレッドプールには、1つのスレッドしか含まれていません.
this.scheduler =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("jdbc-upsert-output-format"))
スケジューラが最終的に実行する操作は、クラス全体で最大の点であり、flushデータはデータベースに送信されます.
synchronized (JdbcBatchingOutputFormat.this) {
if (!closed) {
try {
flush();
} catch (Exception e) {
flushException = e;
}
}
}
2.3、writeRecord方法
writeRecordはクラスのコアメソッドであり,データの書き込みを行う.主に2つの操作を行い、データをリストに追加し、条件に達したときにflushをデータベースに追加します.
try {
addToBatch(record, jdbcRecordExtractor.apply(record));
batchCount++;
if (executionOptions.getBatchSize() > 0
&& batchCount >= executionOptions.getBatchSize()) {
flush();
}
} catch (Exception e) {
throw new IOException("Writing records to JDBC failed.", e);
}
2.3.1、データのキャッシュ
キャッシュデータはSimpleBatchStatementExecutorで定義された簡単なArrayListを使用する
SimpleBatchStatementExecutor(
String sql, JdbcStatementBuilder statementBuilder, Function valueTransformer) {
this.sql = sql;
this.parameterSetter = statementBuilder;
this.valueTransformer = valueTransformer;
this.batch = new ArrayList<>();
}
以上のようにbatchはデータをキャッシュするためのものであり、データを追加する操作は以下の通りである.
@Override
public void addToBatch(T record) {
batch.add(valueTransformer.apply(record));
}
valueTransformerの役割は入力を返し、sinkの初期に定義します.
return JdbcBatchStatementExecutor.simple(
sql, statementBuilder, Function.identity());
/**
* Returns a function that always returns its input argument.
*
* @param the type of the input and output objects to the function
* @return a function that always returns its input argument
*/
static Function identity() {
return t -> t;
}
2.3.2、flush
flushとはキャッシュデータをデータベースに消去することであり,最終的に呼び出されるのはSimpleBatchStatementExecutorのexecuteBatchメソッドである.
@Override
public void executeBatch() throws SQLException {
if (!batch.isEmpty()) {
for (V r : batch) {
parameterSetter.accept(st, r);
st.addBatch();
}
st.executeBatch();
batch.clear();
}
}