FlinkSQLソース読み-schema管理
6978 ワード
Flink SQLでは、メタデータの管理は3層に分けられています。catalog->database->tableは、Flink SQLがcaciteフレームに依存してSQLを実行する木の生産、検査、最適化などを知っています。ここでは、FlinkSQLがどのようにCalciteと結合してメタデータ管理を行うかを紹介します。
caciteオープンインターフェース
Flinkの関連実現
次に、Flankはどのようにこれらのインターフェースを実現しているかを見ます。
DatabaseSchemaで戻ってきたTableタイプはCatalogSchemaTableであり、具体的な接合構造はどのようなものかを見てみたい。Tableインターフェースは主にgetRowType関数であり、あるtableのtype情報を返すために用いられる。Table Schemaは、Flink内部で各フィールドのタイプ情報を保存するクラスであり、関連する変換関数により、caciteのtypeタイプに変換される。
上記のすべての関連インターフェースは、Flinkがcalciteフレームのメタデータに適応するための関連した実現である。これらの種類は具体的にどこで呼び出しますか?いつ呼び出しられますか?caciteの中のschemaは主にvalidate過程でテーブルに対応するフィールド情報を得て、対応するfunctionの戻り値情報を確保して、SQLのフィールド名を確保して、フィールドタイプは正しいです。クラスの依存関係は:validator-->schemaReader-->schemaReader-->schema
Flink Planneranner Impl.scala中
caciteオープンインターフェース
public interface Schema {
Table getTable(String name);
Schema getSubSchema(String name);
....
}
インターフェースに示されているように、Schemaインターフェースは、テーブルを介してテーブルを得ることができます。schema名来を通じて、サブschemaを得ることができます。public interface Table {
RelDataType getRowType(RelDataTypeFactory typeFactory);
....
}
テーブルのインターフェースを見ると、主にテーブルに戻るRelDataTypeです。Flinkの関連実現
次に、Flankはどのようにこれらのインターフェースを実現しているかを見ます。
public class CatalogManagerCalciteSchema extends FlinkSchema {
@Override
public Schema getSubSchema(String schemaName) {
if (catalogManager.schemaExists(name)) {
return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
} else {
return null;
}
}
}
public class CatalogCalciteSchema extends FlinkSchema {
@Override
public Schema getSubSchema(String schemaName) {
if (catalogManager.schemaExists(catalogName, schemaName)) {
return new DatabasecalciteSchema(schemaName, catalogNmae, catalogManager, isStreamingMode);
}
}
}
public class DatabaseCalciteSchema extends FlinkSchema {
private final String databaseName;
private final String catalogName;
private final CatalogManager catalogManager;
@Override
public Table getTable(String tableName) {
ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
return catalogManager.getTable(identifier)
.map(result -> {
CatalogBaseTable table = result.getTable();
FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
return new CatalogSchemaTable(identifier,
table,
statistic,
catalogManager.getCatalog(catalogName)
.flatMap(Catalog::getTableFactory)
.orElse(null),
isStreamingMode,
result.isTemporary());
})
.orElse(null);
}
@Override
public Schema getSubSchema(String name) {
return null;
}
}
CatalogSchemaがDatabaseSchemaに戻り、DatabaseSchemaがTableに戻ると分かりやすくなります。Flankの三層構造はどうなりますか?また、具体的なメタデータは実際にはcatalogManagerにあります。DatabaseSchemaで戻ってきたTableタイプはCatalogSchemaTableであり、具体的な接合構造はどのようなものかを見てみたい。Tableインターフェースは主にgetRowType関数であり、あるtableのtype情報を返すために用いられる。Table Schemaは、Flink内部で各フィールドのタイプ情報を保存するクラスであり、関連する変換関数により、caciteのtypeタイプに変換される。
public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
private final ObjectIdentifier tableIdentifier;
private final CatalogBaseTable catalogBaseTable;
private final FlinkStatistic statistic;
private final boolean isStreamingMode;
private final boolean isTemporary;
...
private static RelDataType getRowType(RelDataTypeFactory typeFactory,
CatalogBaseTable catalogBaseTable,
boolean isStreamingMode) {
final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
TableSchema tableSchema = catalogBaseTable.getSchema();
final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
if (!isStreamingMode
&& catalogBaseTable instanceof ConnectorCatalogTable
&& ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
// If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
// Now for ConnectorCatalogTable, there is no way to
// deduce if it is bounded in the table environment, so the data types in TableSchema
// always patched with TimeAttribute.
// See ConnectorCatalogTable#calculateSourceSchema
// for details.
// Remove the patched time attributes type to let the TableSourceTable handle it.
// We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
// TODO: Fix FLINK-14844.
for (int i = 0; i < fieldDataTypes.length; i++) {
LogicalType lt = fieldDataTypes[i].getLogicalType();
if (lt instanceof TimestampType
&& (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
|| ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
int precision = ((TimestampType) lt).getPrecision();
fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
}
}
}
return TableSourceUtil.getSourceRowType(flinkTypeFactory,
tableSchema,
scala.Option.empty(),
isStreamingMode);
}
}
CatalogBaseTableインターフェースは、FlinkのTableのパラメータ(schemaパラメータ、connectorパラメータ)を最終的にmapとして表すことができます。public interface CatalogBaseTable {
/**
* Get the properties of the table.
*
* @return property map of the table/view
*/
Map getProperties();
/**
* Get the schema of the table.
*
* @return schema of the table/view.
*/
TableSchema getSchema();
/**
* Get comment of the table or view.
*
* @return comment of the table/view.
*/
String getComment();
/**
* Get a deep copy of the CatalogBaseTable instance.
*
* @return a copy of the CatalogBaseTable instance
*/
CatalogBaseTable copy();
/**
* Get a brief description of the table or view.
*
* @return an optional short description of the table/view
*/
Optional getDescription();
/**
* Get a detailed description of the table or view.
*
* @return an optional long description of the table/view
*/
Optional getDetailedDescription();
}
FlinkSchemaの使用上記のすべての関連インターフェースは、Flinkがcalciteフレームのメタデータに適応するための関連した実現である。これらの種類は具体的にどこで呼び出しますか?いつ呼び出しられますか?caciteの中のschemaは主にvalidate過程でテーブルに対応するフィールド情報を得て、対応するfunctionの戻り値情報を確保して、SQLのフィールド名を確保して、フィールドタイプは正しいです。クラスの依存関係は:validator-->schemaReader-->schemaReader-->schema
Flink Planneranner Impl.scala中
private def createSqlValidator(catalogReader: CatalogReader) = {
val validator = new FlinkCalciteSqlValidator(
operatorTable,
catalogReader,
typeFactory)
validator.setIdentifierExpansion(true)
// Disable implicit type coercion for now.
validator.setEnableTypeCoercion(false)
validator
}
PlanningConfigrationBuilder.java private CatalogReader createCatalogReader(
boolean lenientCaseSensitivity,
String currentCatalog,
String currentDatabase) {
SqlParser.Config sqlParserConfig = getSqlParserConfig();
final boolean caseSensitive;
if (lenientCaseSensitivity) {
caseSensitive = false;
} else {
caseSensitive = sqlParserConfig.caseSensitive();
}
SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
.setCaseSensitive(caseSensitive)
.build();
return new CatalogReader(
rootSchema,
asList(
asList(currentCatalog, currentDatabase),
singletonList(currentCatalog)
),
typeFactory,
CalciteConfig.connectionConfig(parserConfig));
}
以上より、Flankがどのようにcaciteのschemaを利用してFlinkのtable情報を管理しているかが分かりました。