Flink Data Stream内外部データソースの諸事情まとめ
9649 ワード
一、内蔵データソース
(1)ファイルデータソースStreamExecutionEnvironmentではreadTextFileメソッドを使用してテキストファイルを直接読み込むこともできますし、readFileメソッドを使用して指定ファイルInputFormatを使用して特定のデータ型のファイル、例えばCsvInputFormatを読み取ることもできます.
次のコードでは、readTextFileを使用してテキストファイルを読み込む方法を示します.
(2)Socketデータソース
Unix環境ではnc-lk 9999コマンドを実行し、ポートを起動し、クライアントにデータを入力することでflinkがデータを受信できます.
(3)集合データソース
JavaまたはScalaプログラムの集合クラスを直接DataStreamデータセットに変換できます.本質的には、ローカル集合のデータをリモートパラレル実行ノードに配布します.
二、外部データソース
前述のデータソースタイプは、ファイル、Socketポートからデータにアクセスするなど、非常に基礎的なデータアクセス方式であり、本質的には異なるSourceFunctionを実現し、Flinkはそれを高度なAPIにカプセル化し、ユーザーの使用コストを削減している.
企業のほとんどは、Kafka、Elasticsearch、RabbitMQなど、高性能のサードパーティ製ストレージメディアとミドルウェアを使用しています.次に、Kafkaを例に、kafkaを入力元として使用する方法について説明する.
まずpom.xmlファイルに依存を導入する必要があります
(ここではkafka 0.10、flink 1.8.0バージョンを使用しています)
maven構成を導入すると、Flinkアプリケーションエンジニアリングで対応するConnectorを作成して使用できます.主なパラメータはkafkatopic、bootstrap.servers、zookeeper.connectです.また、Schemaパラメータの主な役割は、事前に定義されたSchema情報に基づいてこのSchema定義のデータ型にデータをシーケンス化することです.デフォルトはSimpleStreamSchemaです.KafkaからアクセスしたデータをStringタイプに変換
同時に,kafkaから読み出されたデータを逆シーケンス化し,主にDeserializationSchemaを実装して完成させることができる.Flinkでは多くの主流のデータソースコネクタが実装されていますが、Flinkの全体的なアーキテクチャは非常に開放的で、ユーザーはコネクタをカスタマイズして、異なるデータソースのアクセスニーズを満たすことができます.SourceFunctionを実装して単一スレッドのデータ・アクセサを定義するか、ParallelSourceFunctionインタフェースを実装するか、RichParallelSourceFunctionクラス定義の同時データ・ソース・アクセサを継承することができます.
(1)ファイルデータソースStreamExecutionEnvironmentではreadTextFileメソッドを使用してテキストファイルを直接読み込むこともできますし、readFileメソッドを使用して指定ファイルInputFormatを使用して特定のデータ型のファイル、例えばCsvInputFormatを読み取ることもできます.
次のコードでは、readTextFileを使用してテキストファイルを読み込む方法を示します.
import org.apache.flink.streaming.api.scala._
object Flink9 extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.readTextFile("d://1.txt")
.print()
env.execute("read_textfile")
}
(2)Socketデータソース
env.socketTextStream("localhost",9999)
Unix環境ではnc-lk 9999コマンドを実行し、ポートを起動し、クライアントにデータを入力することでflinkがデータを受信できます.
(3)集合データソース
JavaまたはScalaプログラムの集合クラスを直接DataStreamデータセットに変換できます.本質的には、ローカル集合のデータをリモートパラレル実行ノードに配布します.
val dataStream = env.fromElements(Tuple2(1L,3L),Tuple2(1L,5L))
val dataStream2 = env.fromCollection(List(1,2,3))
二、外部データソース
前述のデータソースタイプは、ファイル、Socketポートからデータにアクセスするなど、非常に基礎的なデータアクセス方式であり、本質的には異なるSourceFunctionを実現し、Flinkはそれを高度なAPIにカプセル化し、ユーザーの使用コストを削減している.
企業のほとんどは、Kafka、Elasticsearch、RabbitMQなど、高性能のサードパーティ製ストレージメディアとミドルウェアを使用しています.次に、Kafkaを例に、kafkaを入力元として使用する方法について説明する.
まずpom.xmlファイルに依存を導入する必要があります
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka-0.10_2.11artifactId>
<version>1.8.0version>
dependency>
(ここではkafka 0.10、flink 1.8.0バージョンを使用しています)
maven構成を導入すると、Flinkアプリケーションエンジニアリングで対応するConnectorを作成して使用できます.主なパラメータはkafkatopic、bootstrap.servers、zookeeper.connectです.また、Schemaパラメータの主な役割は、事前に定義されたSchema情報に基づいてこのSchema定義のデータ型にデータをシーケンス化することです.デフォルトはSimpleStreamSchemaです.KafkaからアクセスしたデータをStringタイプに変換
package com.dsj361
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
object Flink10 extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers","192.168.17.21:9092")
properties.setProperty("zookeeper.connect","192.168.17.22:2181")
properties.setProperty("group.id","test")
val input =
env.addSource(
new FlinkKafkaConsumer010[String](
"topic1",
new SimpleStringSchema(),
properties))
input.print()
env.execute("connect_kafka")
}
同時に,kafkaから読み出されたデータを逆シーケンス化し,主にDeserializationSchemaを実装して完成させることができる.Flinkでは多くの主流のデータソースコネクタが実装されていますが、Flinkの全体的なアーキテクチャは非常に開放的で、ユーザーはコネクタをカスタマイズして、異なるデータソースのアクセスニーズを満たすことができます.SourceFunctionを実装して単一スレッドのデータ・アクセサを定義するか、ParallelSourceFunctionインタフェースを実装するか、RichParallelSourceFunctionクラス定義の同時データ・ソース・アクセサを継承することができます.