flink学習(1)ファイルを読み出して和を求める

2371 ワード

今日flink読み取りファイルのテストプログラムの作成を試みましたが、コードは比較的簡単で、途中で問題に遭遇しました.カスタムパッケージタイプの場合、コンパイルは以下の異常を放出します.
Error:(10, 20) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[whTest.readFile.SexAndAge]
     dataStream.map(formatPerson(_)).print()

この問題は,プログラムに暗黙的なパラメータ(implicit parameter)が必要であるためである.上記のプログラムで使用したmapのFlinkでの実装を見ることができます.
def map[R: TypeInformation](fun: T => R): DataStream[R] = {
  if (fun == null) {
    throw new NullPointerException("Map function must not be null.")
  }
  val cleanFun = clean(fun)
  val mapper = new MapFunction[T, R] {
    def map(in: T): R = cleanFun(in)
  }
 
  map(mapper)
}

mapの定義に[R:TypeInformation]がありますが、暗黙的なパラメータに関する定義は指定されていません.この場合、コンパイルコードはTypeInformationを作成できないため、上記の異常情報が表示されます.この問題を解決するには以下の2つの方法がある.

(1)、コードに直接以下のコードを加えることができます。


implicit val typeInfo = TypeInformation.of(classOf[SexAndAge])コードをコンパイルしても上記の異常は発生しません.

(2)、公式に推奨される方法は、コードにパッケージを導入することです。


import org.apache.flink.streaming.api.scala._ 静的データセットの場合、次のパッケージを導入できます.
import org.apache.flink.api.scala._ 問題は解決された
テストのコードは次のとおりです.
package whTest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
  *   org.apache.flink.streaming.api.scala._
  *   could not find implicit value for evidence parameter of type
  * (implicit parameter)
  */
import org.apache.flink.streaming.api.scala._
object ReadFile {
// 
  case class SexAndAge (sex:String,age:Int)
  def main(args: Array[String]): Unit = {
    val env : StreamExecutionEnvironment  = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream:DataStream[String]  = env.readTextFile("C:\\Users\\wh\\Desktop\\test.txt")
    val personDataStream = dataStream.map(formatPerson(_))
    // 
    // reduce 
    val counts = personDataStream.keyBy("sex").sum("age")
    counts.print()
    // execute program
    env.execute("Streaming WordCount")
  }
  def formatPerson(mess:String)={
    val split = mess.split(",")
    SexAndAge (split(1),split(2).toInt)
  }
}


入力ファイルは次のとおりです.
wh,m,12
wc,m,11
xds,m,11
hr,f,11
cxy,f,11

テスト結果:
3> SexAndAge(m,34)

1> SexAndAge(f,22)