Flinkノート
Flinkを使用する4つのステップ実行環境 を作成する. Source を追加 Transform Sink
実行環境の作成 Flinkは、実行環境を作成するAPIとは異なり、バッチenvを作成するコードは次のとおりです.
ストリーム処理envを作成するコードは次のとおりです.
Sourceの追加集合ベースsource ファイルベースsource ネットワークソケットに基づくsource カスタムsource kafka source
カスタムテストソース
Transform
Flinkのtranformはsparkのtransformよりも分類でき、変換を行うAPIでもありますが、詳細は異なる場合があります.
Transformation
へんかん
説明
map
DataStream → DataStream
エレメントを取得してエレメントを生成
flatMap
DataStream → DataStream
1つの要素を取り、0以上の要素を生成します.
filter
DataStream → DataStream
データ・ストリームのフィルタリング
keyBy
DataStream → KeyedStream
類似データベースのgroupby
reduce
KeyedStream → DataStream
つの汎用Tがあって、2つのTタイプのパラメータを受け取って、1つのTタイプのパラメータを返して、戻り値は次回実行する第1のパラメータとして、第2のパラメータはデータストリームの中のデータです
fold
KeyedStream → DataStream
reduceのようなものは廃棄されていますが、2つの汎用型を受信すると、出入り流の汎用型が異なる場合があります.
Aggregations
KeyedStream → DataStream
KeyedStreamを集約し、sum、min、max、minBy、maxByを含む
union
DataStream*→ DataStream
同じタイプの複数のデータ・ストリームの接続
connect
DataStream,DataStream → ConnectedStreams
2つのデータ・ストリームを接続します.この2つのデータ・ストリームのタイプは異なります.
split
DataStream → SplitStream
いくつかの基準に基づいてストリームを2つ以上のストリームに分割
select
SplitStream → DataStream
分割ストリームから1つ以上のストリームを選択
window
KeyedStream → WindowedStream
すでにパーティション化されているKeyedStreamsでWindowを定義する
timeWindow
KeyedStream → WindowedStream
すでにパーティション化されているKeyedStreamsでの時間Windowの定義
countWindow
KeyedStream → WindowedStream
すでにパーティション化されているKeyedStreamsでカウントウィンドウを定義する
windowAll
DataStream → WindowedStream
一般的なDataStreamsでのWindowの定義
Window api待更
以上の操作では、MapFunction、RichMapFunctionなどのカスタム関数クラスの転送がサポートされています.
Sink
FlinkのSinkはSparkのactionに似ていて、主にデータの出力によく使われるSinkに使われています. kafka Sinkまず依存 を増加させる
しゅプログラム es Sink まず依存 を増加する.
しゅプログラムカスタムSinkカスタムSink Functionを実現すればよいが、一般的にRichSinkFunctionを実現すればより豊富な機能を提供する
実行環境の作成
val env = ExecutionEnvironment.getExecutionEnvironment
ストリーム処理envを作成するコードは次のとおりです.
val env = StreamExecutionEnvironment.getExecutionEnvironment
Sourceの追加
val stream = env.fromCollection(List(
TemperatureRecord("d1", 25.5, 100),
TemperatureRecord("d2", 25.5, 100),
TemperatureRecord("d3", 25.5, 100),
TemperatureRecord("d4", 25.5, 100),
TemperatureRecord("d5", 25.5, 100),
TemperatureRecord("d6", 25.5, 100),
TemperatureRecord("d7", 25.5, 100)
))
val stream = env.readTextFile("""/source.txt""")
val dataStream = env.socketTextStream("192.168.1.101", 7777)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.1.101:9092")
properties.setProperty("zookeeper.connect", "192.168.1.101:2181")
properties.setProperty("group.id", "kafkaStreamTest")
val kafka11 = new FlinkKafkaConsumer011[String]("kafkaStreamTest", new SimpleStringSchema(), properties)
val stream = env.addSource(kafka11)
カスタムテストソース
class TestSourceFunction extends SourceFunction[String] {
var running = true
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
var i = 0
while (running) {
i += 1
sourceContext.collect(i + " " + "v_" + Random.nextInt(5))
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
running = false
}
}
Transform
Flinkのtranformはsparkのtransformよりも分類でき、変換を行うAPIでもありますが、詳細は異なる場合があります.
Transformation
へんかん
説明
map
DataStream → DataStream
エレメントを取得してエレメントを生成
flatMap
DataStream → DataStream
1つの要素を取り、0以上の要素を生成します.
filter
DataStream → DataStream
データ・ストリームのフィルタリング
keyBy
DataStream → KeyedStream
類似データベースのgroupby
reduce
KeyedStream → DataStream
つの汎用Tがあって、2つのTタイプのパラメータを受け取って、1つのTタイプのパラメータを返して、戻り値は次回実行する第1のパラメータとして、第2のパラメータはデータストリームの中のデータです
fold
KeyedStream → DataStream
reduceのようなものは廃棄されていますが、2つの汎用型を受信すると、出入り流の汎用型が異なる場合があります.
Aggregations
KeyedStream → DataStream
KeyedStreamを集約し、sum、min、max、minBy、maxByを含む
union
DataStream*→ DataStream
同じタイプの複数のデータ・ストリームの接続
connect
DataStream,DataStream → ConnectedStreams
2つのデータ・ストリームを接続します.この2つのデータ・ストリームのタイプは異なります.
split
DataStream → SplitStream
いくつかの基準に基づいてストリームを2つ以上のストリームに分割
select
SplitStream → DataStream
分割ストリームから1つ以上のストリームを選択
window
KeyedStream → WindowedStream
すでにパーティション化されているKeyedStreamsでWindowを定義する
timeWindow
KeyedStream → WindowedStream
すでにパーティション化されているKeyedStreamsでの時間Windowの定義
countWindow
KeyedStream → WindowedStream
すでにパーティション化されているKeyedStreamsでカウントウィンドウを定義する
windowAll
DataStream → WindowedStream
一般的なDataStreamsでのWindowの定義
Window api待更
以上の操作では、MapFunction、RichMapFunctionなどのカスタム関数クラスの転送がサポートされています.
Sink
FlinkのSinkはSparkのactionに似ていて、主にデータの出力によく使われるSinkに使われています.
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka-0.10_2.11artifactId>
<version>1.3.1version>
dependency>
しゅプログラム
val env = StreamExecutionEnvironment.getExecutionEnvironment
//TestObjectSourceFunction Source
val stream = env.addSource(new TestObjectSourceFunction)
val producer = new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema())
stream.map(_.id)
.addSink(producer)
env.execute("kafkaSinkTest")
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-elasticsearch6_2.11artifactId>
<version>1.7.2version>
dependency>
しゅプログラム
val env = StreamExecutionEnvironment.getExecutionEnvironment
//TestObjectSourceFunction Source
val stream = env.addSource(new TestObjectSourceFunction)
val httpHosts = new util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("localhost", 9200))
val esSink = new ElasticsearchSink.Builder[ApplyInfo](httpHosts, new ElasticsearchSinkFunction[ApplyInfo] {
override def process(item: ApplyInfo, ctx: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
val esSource = new util.HashMap[String, String]()
esSource.put("id", item.id)
esSource.put("areaCode", item.areaCode)
val req = Requests.indexRequest("applyInfo").`type`("applyInfo").source(esSource)
requestIndexer.add(req)
}
}).build()
stream.addSink(esSink)
env.execute("esSinkTest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
//TestObjectSourceFunction Source
val stream = env.addSource(new TestObjectSourceFunction)
stream.addSink(new RichSinkFunction[ApplyInfo] {
var out: OutputStream = _
override def open(parameters: Configuration): Unit = {
out = new FileOutputStream("/tmp/customSink.txt")
}
override def invoke(value: ApplyInfo, context: SinkFunction.Context[_]): Unit = {
out.write((value.id + "," + value.areaCode + "\r
").getBytes(StandardCharsets.UTF_8))
}
override def close(): Unit = {
out.close()
}
})
env.execute("esSinkTest")