Flinkブロードキャストストリーム-BroadcastStream
通常のFlink開発では、実行中のFlinkプログラムにパラメータを変更する必要がある場合があります.たとえば、フィールド値のフィルタリングなどです.これらの値はMysqlに設定されている場合がありますが、高スループット計算を行うFunctionでダイナミッククエリープロファイルを使用すると、タスクがブロックされたり、タスクが失敗したりする可能性があります.
上記のシナリオに遭遇した場合、ブロードキャストストリームクエリープロファイルを介してoperatorのすべての同時インスタンスにブロードキャストし、別のストリームデータ接続と計算することを考慮できます.
実装手順:
1、まずMysqlのエンティティークラスを定義します.次の例の属性名は実際のMysqlの表名によって自分で変えることができます.
2、ブロードキャストのデータフォーマットを記述するMapStateDescriptorを定義する
3、ソースStreamを作成して下流のoperatorを放送する
4、データソースを追加し、データソースをブロードキャストストリームに登録する
5.ブロードキャストストリームと処理データを接続するストリーム
注意すべき問題:データソース送信時にデータが集合である場合、スレッドセキュリティの集合クラス を使用する必要がある.で取得したBroadcastStateはmapで、同じKEYで、putが入ると をカバーします.
上記のシナリオに遭遇した場合、ブロードキャストストリームクエリープロファイルを介してoperatorのすべての同時インスタンスにブロードキャストし、別のストリームデータ接続と計算することを考慮できます.
実装手順:
1、まずMysqlのエンティティークラスを定義します.次の例の属性名は実際のMysqlの表名によって自分で変えることができます.
class Flow {
var flowId = 0
//
var mode: Int = 0
//
var databaseName: String = ""
//mysql
var tableName: String = ""
//hbase
var hbaseTable: String = ""
//Column Family
var family: String = ""
// , true
var uppercaseQualifier: Boolean = true
// ,ETL
var commitBatch: Int = 0;
// rowkey ,
var rowKey: String = ""
//
var status: Int = 0
var kuduTable: String = ""
var tidbTable: String = ""
var mask_fields: String = ""
}
2、ブロードキャストのデータフォーマットを記述するMapStateDescriptorを定義する
private val flowStateDescriptor: MapStateDescriptor[String, Flow] = new MapStateDescriptor[String, Flow](
"flowBroadCastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint[Flow] {}))
3、ソースStreamを作成して下流のoperatorを放送する
class FlowSource extends RichSourceFunction[Flow]{
private val log: Logger = LoggerFactory.getLogger(Class[FlowSource].getClass)
val serialVersionUID: Long = 3519222623348229907L
private val flow = new Flow
//
var isRunning: Boolean = true
override def run(ctx: SourceFunction.SourceContext[Flow]): Unit = {
// flow , Flow
while (isRunning) {
val conn = MysqlJdbcUtils.getConnection(
"jdbc:mysql://10.101.40.197:3306/hdb_data_warehouse?useUnicode=true&characterEncoding=utf8",
"canal",
"canal"
)
val statement = conn.createStatement()
val rs = statement.executeQuery("select * from data_warehouse_cfg")
while (rs.next()) {
flow.flowId = rs.getInt("flow_id")
flow.databaseName = rs.getString("mysql_db")
flow.tableName = rs.getString("mysql_table")
flow.hbaseTable = rs.getString("hbase_table")
flow.family = rs.getString("hbase_col_family")
flow.commitBatch = rs.getInt("status")
flow.status = rs.getInt("status")
flow.rowKey = rs.getString("hbase_rowkey")
flow.kuduTable = rs.getString("kudu_table")
flow.tidbTable = rs.getString("tidb_table")
flow.mask_fields = rs.getString("mask_fields")
log.debug("load flow: " + flow.toString)
ctx.collect(flow)
}
// ,
Thread.sleep(60 * 1000L)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
4、データソースを追加し、データソースをブロードキャストストリームに登録する
val broadcast: BroadcastStream[Flow] = env.addSource(new FlowSource).broadcast(flowStateDescriptor)
5.ブロードキャストストリームと処理データを接続するストリーム
val connectedStream: DataStream[(FlatMessage, Flow)] = keyedMessage.connect(broadcast).process(new KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)] {
override def processElement(
message: FlatMessage,
ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#ReadOnlyContext,
out: Collector[(FlatMessage, Flow)]): Unit = {
//
val flow = ctx.getBroadcastState(flowStateDescriptor).get(message.getDatabase + message.getTable)
if (null != flow) {
out.collect((message, flow))
}
}
override def processBroadcastElement(
flow: Flow,
ctx: KeyedBroadcastProcessFunction[String, FlatMessage, Flow, (FlatMessage, Flow)]#Context,
out: Collector[(FlatMessage, Flow)]): Unit = {
val broadcast: BroadcastState[String, Flow] = ctx.getBroadcastState(flowStateDescriptor)
...
broadcast.put(key, flow)
}
})
注意すべき問題: