flink学習(1)ファイルを読み出して和を求める
2371 ワード
今日flink読み取りファイルのテストプログラムの作成を試みましたが、コードは比較的簡単で、途中で問題に遭遇しました.カスタムパッケージタイプの場合、コンパイルは以下の異常を放出します.
この問題は,プログラムに暗黙的なパラメータ(implicit parameter)が必要であるためである.上記のプログラムで使用したmapのFlinkでの実装を見ることができます.
mapの定義に[R:TypeInformation]がありますが、暗黙的なパラメータに関する定義は指定されていません.この場合、コンパイルコードはTypeInformationを作成できないため、上記の異常情報が表示されます.この問題を解決するには以下の2つの方法がある.
implicit val typeInfo = TypeInformation.of(classOf[SexAndAge])コードをコンパイルしても上記の異常は発生しません.
import org.apache.flink.streaming.api.scala._ 静的データセットの場合、次のパッケージを導入できます.
import org.apache.flink.api.scala._ 問題は解決された
テストのコードは次のとおりです.
入力ファイルは次のとおりです.
テスト結果:
Error:(10, 20) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[whTest.readFile.SexAndAge]
dataStream.map(formatPerson(_)).print()
この問題は,プログラムに暗黙的なパラメータ(implicit parameter)が必要であるためである.上記のプログラムで使用したmapのFlinkでの実装を見ることができます.
def map[R: TypeInformation](fun: T => R): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Map function must not be null.")
}
val cleanFun = clean(fun)
val mapper = new MapFunction[T, R] {
def map(in: T): R = cleanFun(in)
}
map(mapper)
}
mapの定義に[R:TypeInformation]がありますが、暗黙的なパラメータに関する定義は指定されていません.この場合、コンパイルコードはTypeInformationを作成できないため、上記の異常情報が表示されます.この問題を解決するには以下の2つの方法がある.
(1)、コードに直接以下のコードを加えることができます。
implicit val typeInfo = TypeInformation.of(classOf[SexAndAge])コードをコンパイルしても上記の異常は発生しません.
(2)、公式に推奨される方法は、コードにパッケージを導入することです。
import org.apache.flink.streaming.api.scala._ 静的データセットの場合、次のパッケージを導入できます.
import org.apache.flink.api.scala._ 問題は解決された
テストのコードは次のとおりです.
package whTest
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
/**
* org.apache.flink.streaming.api.scala._
* could not find implicit value for evidence parameter of type
* (implicit parameter)
*/
import org.apache.flink.streaming.api.scala._
object ReadFile {
//
case class SexAndAge (sex:String,age:Int)
def main(args: Array[String]): Unit = {
val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream:DataStream[String] = env.readTextFile("C:\\Users\\wh\\Desktop\\test.txt")
val personDataStream = dataStream.map(formatPerson(_))
//
// reduce
val counts = personDataStream.keyBy("sex").sum("age")
counts.print()
// execute program
env.execute("Streaming WordCount")
}
def formatPerson(mess:String)={
val split = mess.split(",")
SexAndAge (split(1),split(2).toInt)
}
}
入力ファイルは次のとおりです.
wh,m,12
wc,m,11
xds,m,11
hr,f,11
cxy,f,11
テスト結果:
3> SexAndAge(m,34)
1> SexAndAge(f,22)