FlinkSQLソース読み-schema管理

6978 ワード

Flink SQLでは、メタデータの管理は3層に分けられています。catalog->database->tableは、Flink SQLがcaciteフレームに依存してSQLを実行する木の生産、検査、最適化などを知っています。ここでは、FlinkSQLがどのようにCalciteと結合してメタデータ管理を行うかを紹介します。
caciteオープンインターフェース
public interface Schema {
    Table getTable(String name);

    Schema getSubSchema(String name);

    ....
}
インターフェースに示されているように、Schemaインターフェースは、テーブルを介してテーブルを得ることができます。schema名来を通じて、サブschemaを得ることができます。
public interface Table {
    RelDataType getRowType(RelDataTypeFactory typeFactory);
    ....
}
テーブルのインターフェースを見ると、主にテーブルに戻るRelDataTypeです。
Flinkの関連実現
次に、Flankはどのようにこれらのインターフェースを実現しているかを見ます。
public class CatalogManagerCalciteSchema extends FlinkSchema {
	@Override
	public Schema getSubSchema(String schemaName) {
		if (catalogManager.schemaExists(name)) {
			return new CatalogCalciteSchema(name, catalogManager, isStreamingMode);
		} else {
			return null;
		}
	}
}

public class CatalogCalciteSchema extends FlinkSchema {
    @Override
    public Schema getSubSchema(String schemaName) {
        if (catalogManager.schemaExists(catalogName, schemaName)) {
            return new DatabasecalciteSchema(schemaName, catalogNmae, catalogManager, isStreamingMode);
        }
    }
}
public class DatabaseCalciteSchema extends FlinkSchema {
    private final String databaseName;
    private final String catalogName;
    private final CatalogManager catalogManager;

    @Override
    public Table getTable(String tableName) {
		ObjectIdentifier identifier = ObjectIdentifier.of(catalogName, databaseName, tableName);
		return catalogManager.getTable(identifier)
			.map(result -> {
				CatalogBaseTable table = result.getTable();
				FlinkStatistic statistic = getStatistic(result.isTemporary(), table, identifier);
				return new CatalogSchemaTable(identifier,
					table,
					statistic,
					catalogManager.getCatalog(catalogName)
						.flatMap(Catalog::getTableFactory)
						.orElse(null),
					isStreamingMode,
					result.isTemporary());
			})
			.orElse(null);
    }

    @Override
    public Schema getSubSchema(String name) {
        return null;
    }
}
CatalogSchemaがDatabaseSchemaに戻り、DatabaseSchemaがTableに戻ると分かりやすくなります。Flankの三層構造はどうなりますか?また、具体的なメタデータは実際にはcatalogManagerにあります。
DatabaseSchemaで戻ってきたTableタイプはCatalogSchemaTableであり、具体的な接合構造はどのようなものかを見てみたい。Tableインターフェースは主にgetRowType関数であり、あるtableのtype情報を返すために用いられる。Table Schemaは、Flink内部で各フィールドのタイプ情報を保存するクラスであり、関連する変換関数により、caciteのtypeタイプに変換される。
public class CatalogSchemaTable extends AbstractTable implements TemporalTable {
    
	private final ObjectIdentifier tableIdentifier;
	private final CatalogBaseTable catalogBaseTable;
	private final FlinkStatistic statistic;
	private final boolean isStreamingMode;
	private final boolean isTemporary;
    ...
	private static RelDataType getRowType(RelDataTypeFactory typeFactory,
			CatalogBaseTable catalogBaseTable,
			boolean isStreamingMode) {
		final FlinkTypeFactory flinkTypeFactory = (FlinkTypeFactory) typeFactory;
		TableSchema tableSchema = catalogBaseTable.getSchema();
		final DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
		if (!isStreamingMode
			&& catalogBaseTable instanceof ConnectorCatalogTable
			&& ((ConnectorCatalogTable) catalogBaseTable).getTableSource().isPresent()) {
			// If the table source is bounded, materialize the time attributes to normal TIMESTAMP type.
			// Now for ConnectorCatalogTable, there is no way to
			// deduce if it is bounded in the table environment, so the data types in TableSchema
			// always patched with TimeAttribute.
			// See ConnectorCatalogTable#calculateSourceSchema
			// for details.

			// Remove the patched time attributes type to let the TableSourceTable handle it.
			// We should remove this logic if the isBatch flag in ConnectorCatalogTable is fixed.
			// TODO: Fix FLINK-14844.
			for (int i = 0; i < fieldDataTypes.length; i++) {
				LogicalType lt = fieldDataTypes[i].getLogicalType();
				if (lt instanceof TimestampType
					&& (((TimestampType) lt).getKind() == TimestampKind.PROCTIME
					|| ((TimestampType) lt).getKind() == TimestampKind.ROWTIME)) {
					int precision = ((TimestampType) lt).getPrecision();
					fieldDataTypes[i] = DataTypes.TIMESTAMP(precision);
				}
			}
		}
		return TableSourceUtil.getSourceRowType(flinkTypeFactory,
			tableSchema,
			scala.Option.empty(),
			isStreamingMode);
	}
}
CatalogBaseTableインターフェースは、FlinkのTableのパラメータ(schemaパラメータ、connectorパラメータ)を最終的にmapとして表すことができます。
public interface CatalogBaseTable {
	/**
	 * Get the properties of the table.
	 *
	 * @return property map of the table/view
	 */
	Map getProperties();

	/**
	 * Get the schema of the table.
	 *
	 * @return schema of the table/view.
	 */
	TableSchema getSchema();

	/**
	 * Get comment of the table or view.
	 *
	 * @return comment of the table/view.
	 */
	String getComment();

	/**
	 * Get a deep copy of the CatalogBaseTable instance.
	 *
	 * @return a copy of the CatalogBaseTable instance
	 */
	CatalogBaseTable copy();

	/**
	 * Get a brief description of the table or view.
	 *
	 * @return an optional short description of the table/view
	 */
	Optional getDescription();

	/**
	 * Get a detailed description of the table or view.
	 *
	 * @return an optional long description of the table/view
	 */
	Optional getDetailedDescription();
}

FlinkSchemaの使用
上記のすべての関連インターフェースは、Flinkがcalciteフレームのメタデータに適応するための関連した実現である。これらの種類は具体的にどこで呼び出しますか?いつ呼び出しられますか?caciteの中のschemaは主にvalidate過程でテーブルに対応するフィールド情報を得て、対応するfunctionの戻り値情報を確保して、SQLのフィールド名を確保して、フィールドタイプは正しいです。クラスの依存関係は:validator-->schemaReader-->schemaReader-->schema
Flink Planneranner Impl.scala中
  private def createSqlValidator(catalogReader: CatalogReader) = {
    val validator = new FlinkCalciteSqlValidator(
      operatorTable,
      catalogReader,
      typeFactory)
    validator.setIdentifierExpansion(true)
    // Disable implicit type coercion for now.
    validator.setEnableTypeCoercion(false)
    validator
  }
PlanningConfigrationBuilder.java
	private CatalogReader createCatalogReader(
			boolean lenientCaseSensitivity,
			String currentCatalog,
			String currentDatabase) {
		SqlParser.Config sqlParserConfig = getSqlParserConfig();
		final boolean caseSensitive;
		if (lenientCaseSensitivity) {
			caseSensitive = false;
		} else {
			caseSensitive = sqlParserConfig.caseSensitive();
		}

		SqlParser.Config parserConfig = SqlParser.configBuilder(sqlParserConfig)
			.setCaseSensitive(caseSensitive)
			.build();

		return new CatalogReader(
			rootSchema,
			asList(
				asList(currentCatalog, currentDatabase),
				singletonList(currentCatalog)
			),
			typeFactory,
			CalciteConfig.connectionConfig(parserConfig));
	}
以上より、Flankがどのようにcaciteのschemaを利用してFlinkのtable情報を管理しているかが分かりました。