Converting between DataStream/DataSet and Table in Flink


I recently worked with the Flink Table API, so this post summarizes how to convert between DataStream/DataSet and Table.
1. Dependencies required in pom.xml

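<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>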
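2. Required imports
The Table API relies on Scala implicit conversions. To use them, be sure to import the following packages (the last line provides the Table type used in the examples):
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table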
3. DataStream or DataSet to Table
Define the data type:
// data type
case class Order(user: Long, product: String, amount: Int)

3.1 Register a DataStream or DataSet as a Table
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// DataStream
val orderA: DataStream[Order] = env.fromCollection(Seq(
  Order(2L, "pen", 3),
  Order(1L, "rubber", 3),
  Order(4L, "beer", 1)
))

// register DataStream as Table
tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)

// DataSet (batch) version
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

// DataSet
val orderB: DataSet[Order] = env.fromElements(
  Order(2L, "pen", 3),
  Order(1L, "rubber", 3),
  Order(4L, "beer", 1)
)

// register the DataSet as table
tEnv.registerDataSet("OrderB", orderB, 'user, 'product, 'amount)
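Registering a table is mostly useful because SQL queries can then refer to it by name. The sketch below assumes Flink 1.9/1.10 with the old planner and the org.apache.flink.table.api.scala package, matching the pom.xml and imports above; the object RegisteredTableSqlExample and the helper bigOrders are hypothetical names chosen for illustration.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// Same data type as above (assumes Flink 1.9/1.10, old planner)
case class Order(user: Long, product: String, amount: Int)

object RegisteredTableSqlExample {

  // Hypothetical helper: registers the stream as "OrderA", then queries it by name
  def bigOrders(tEnv: StreamTableEnvironment, orders: DataStream[Order]): Table = {
    tEnv.registerDataStream("OrderA", orders, 'user, 'product, 'amount)
    // SELECT * avoids having to quote the column `user`, which is a SQL keyword
    tEnv.sqlQuery("SELECT * FROM OrderA WHERE amount > 2")
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(1L, "rubber", 3),
      Order(4L, "beer", 1)
    ))

    // A filter over an append-only input stays append-only, so toAppendStream is allowed
    bigOrders(tEnv, orderA).toAppendStream[Order].print()

    env.execute("registered-table-sql")
  }
}
```

On Flink 1.10 registerDataStream still compiles but is deprecated in favor of createTemporaryView.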

3.2 Convert a DataStream or DataSet into a Table
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// DataStream
val orderA: DataStream[Order] = env.fromCollection(Seq(
  Order(2L, "pen", 3),
  Order(1L, "rubber", 3),
  Order(4L, "beer", 1)
))

// convert DataStream to Table
val tableA: Table = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
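// or, equivalently, via the implicit conversion from org.apache.flink.table.api.scala._
val tableA2: Table = orderA.toTable(tEnv)

// DataSet (batch) version
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)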

// DataSet
val orderB: DataSet[Order] = env.fromElements(
  Order(2L, "pen", 3),
  Order(1L, "rubber", 3),
  Order(4L, "beer", 1)
)

// convert the DataSet to a Table
val tableB: Table = orderB.toTable(tEnv, 'user, 'product, 'amount)
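Once converted, the Table can be queried with the Table API expression DSL, where a Scala symbol such as 'amount refers to a field. A minimal sketch under the same Flink 1.9/1.10 assumption; TableApiQueryExample and bigOrderProducts are hypothetical names.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

case class Order(user: Long, product: String, amount: Int)

object TableApiQueryExample {

  // Hypothetical helper: convert the stream, then filter and project with the Table API
  def bigOrderProducts(tEnv: StreamTableEnvironment, orders: DataStream[Order]): Table = {
    val tableA: Table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)
    tableA
      .where('amount > 2)      // keep only orders with amount > 2
      .select('user, 'product) // project two of the three fields
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(1L, "rubber", 3),
      Order(4L, "beer", 1)
    ))

    // Fields map to the tuple by position: (user, product)
    bigOrderProducts(tEnv, orderA).toAppendStream[(Long, String)].print()

    env.execute("table-api-query")
  }
}
```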

4. Convert a Table into a DataStream or DataSet
4.1 Convert a Table into a DataStream
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// DataStream
val orderA: DataStream[Order] = env.fromCollection(Seq(
  Order(2L, "pen", 3),
  Order(1L, "rubber", 3),
  Order(4L, "beer", 1)
))

// convert the DataStream to a Table
val tableA: Table = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)

// convert a Table into a DataStream
val ds1: DataStream[Order] = tableA.toAppendStream[Order]
// or
val ds2: DataStream[Order] = tEnv.toAppendStream[Order](tableA)
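toAppendStream only accepts append-only tables. A grouped aggregation keeps updating rows it has already emitted, so toAppendStream throws an exception for it; toRetractStream is the usual alternative, wrapping each record in a (Boolean, T) pair where true means insert and false means retraction of an earlier row. A sketch assuming Flink 1.9/1.10; RetractStreamExample and totalPerProduct are hypothetical, and the extra "pen" row is added only so that a retraction actually occurs.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

case class Order(user: Long, product: String, amount: Int)

object RetractStreamExample {

  // Hypothetical helper: per-product running total, an updating (non-append) table
  def totalPerProduct(tEnv: StreamTableEnvironment, orders: DataStream[Order]): Table = {
    val tableA: Table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)
    tableA
      .groupBy('product)
      .select('product, 'amount.sum as 'total)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(1L, "rubber", 3),
      Order(4L, "beer", 1),
      Order(3L, "pen", 2) // extra row: updates the "pen" total, causing a retraction
    ))

    // true = add this row, false = retract a previously emitted row
    val totals: DataStream[(Boolean, (String, Int))] =
      totalPerProduct(tEnv, orderA).toRetractStream[(String, Int)]
    totals.print()

    env.execute("retract-stream")
  }
}
```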

4.2 Convert a Table into a DataSet
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)

// DataSet
val orderB: DataSet[Order] = env.fromElements(
  Order(2L, "pen", 3),
  Order(1L, "rubber", 3),
  Order(4L, "beer", 1)
)

// convert the DataSet to a Table
val tableB: Table = orderB.toTable(tEnv, 'user, 'product, 'amount)

// convert a Table into a DataSet
val ds1: DataSet[Order] = tableB.toDataSet[Order]
// or
val ds2: DataSet[Order] = tEnv.toDataSet[Order](tableB)
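In batch mode the result of a query is finite, so even an aggregated Table converts directly with toDataSet; there is no append/retract distinction. A sketch assuming Flink 1.9/1.10 with BatchTableEnvironment; TableToDataSetExample and totalPerProduct are hypothetical, and the fourth order is an extra row added for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

case class Order(user: Long, product: String, amount: Int)

object TableToDataSetExample {

  // Hypothetical helper: aggregate with the Table API, then return to the DataSet API
  def totalPerProduct(env: ExecutionEnvironment): DataSet[(String, Int)] = {
    val tEnv = BatchTableEnvironment.create(env)

    val orderB: DataSet[Order] = env.fromElements(
      Order(2L, "pen", 3),
      Order(1L, "rubber", 3),
      Order(4L, "beer", 1),
      Order(3L, "pen", 2) // extra row so "pen" has two orders
    )

    val tableB: Table = orderB.toTable(tEnv, 'user, 'product, 'amount)
    tableB
      .groupBy('product)
      .select('product, 'amount.sum as 'total)
      .toDataSet[(String, Int)] // fields map to the tuple by position
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // print() on a DataSet triggers execution itself, so no env.execute() follows
    totalPerProduct(env).print()
  }
}
```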

 
If you spot any mistakes, please let me know. QQ group: 176098255