flinkのscalaバージョンのwordcount+flinkが結果を出力しなかったいくつかの原因
5903 ワード
################################################################################
①クラスタの起動
$FLINK_HOME/bin/start-cluster.sh
②ソケットを開く
nc -lk 9999
mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 muamua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua
③コンパイルコマンド:
mvn scala:compile package
④flinkタスクをコミットし、別のsocketを生成
flink run -c WordCount target/scala-module-dependency-sample-1.0-SNAPSHOT.jar --port 9999
そして上記2つのsocketはtcpで通信します
####################################################実験結果
出力結果:
$FLINK_HOME/log/flink-appleyuchi-taskexecutor-2-Desktop.out
3> WordWithCount(mua3,2) 4> WordWithCount(mua2,4) 4> WordWithCount(mua2,14) 3> WordWithCount(mua3,7) 3> WordWithCount(mua3,12) 4> WordWithCount(mua2,24) 3> WordWithCount(muamua,1) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,24) 3> WordWithCount(mua3,12) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,24) 3> WordWithCount(mua3,12) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,20) 3> WordWithCount(mua3,10) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,10) 3> WordWithCount(mua3,5)
################################################################################
├── pom.xml ├── src │ └── main │ └── scala │ └── WordCount.scala└——運行方法.txt
#############################################################
完全なpom.xml:
WordCount.scala
#################################################################################
Flinkが実験を実行しても出力結果が見つからない理由は次のとおりです.
①前のjobはcancelが次に影響しないので、flink listを入力してflink cancelで削除できます.
②yarnのqueueリソースがいっぱいです.(flink on yarnモード)
③出力結果yarnのインタフェースにあるノードまたはあるノードの#$FLINK_HOME/ロゴの下
④$FLINK/logの下にあるflink-ユーザ名-taskexecutor-2-desktopを削除しないでください.out,これは新しいjob時に自動的に生成されずstart-allのみである.sh起動時に生成し,削除した場合flinkクラスタを再起動しない限り実験結果は見られない.
⑤クラスタ起動前6123ポートがオフになっていないため、再起動後、タスクをコミットできない
Caused by: java.net.BindException: Could not start actor system on any port in port range 6123
注意タスクをコミットする前に、まず$FLINK_を確認してください.HOME/log下にgrep-ri errorを入力するとerrorがありません
①クラスタの起動
$FLINK_HOME/bin/start-cluster.sh
②ソケットを開く
nc -lk 9999
mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 muamua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua
③コンパイルコマンド:
mvn scala:compile package
④flinkタスクをコミットし、別のsocketを生成
flink run -c WordCount target/scala-module-dependency-sample-1.0-SNAPSHOT.jar --port 9999
そして上記2つのsocketはtcpで通信します
####################################################実験結果
出力結果:
$FLINK_HOME/log/flink-appleyuchi-taskexecutor-2-Desktop.out
3> WordWithCount(mua3,2) 4> WordWithCount(mua2,4) 4> WordWithCount(mua2,14) 3> WordWithCount(mua3,7) 3> WordWithCount(mua3,12) 4> WordWithCount(mua2,24) 3> WordWithCount(muamua,1) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,24) 3> WordWithCount(mua3,12) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,24) 3> WordWithCount(mua3,12) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,20) 3> WordWithCount(mua3,10) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,10) 3> WordWithCount(mua3,5)
################################################################################
├── pom.xml ├── src │ └── main │ └── scala │ └── WordCount.scala└——運行方法.txt
#############################################################
完全なpom.xml:
4.0.0
sample
scala-module-dependency-sample
1.0-SNAPSHOT
org.scala-lang
scala-library
2.11.8
org.apache.flink
flink-scala_2.11
1.6.2
org.apache.flink
flink-streaming-scala_2.11
1.6.2
org.apache.flink
flink-clients_2.11
1.6.2
org.scala-tools
maven-scala-plugin
2.15.2
scala-compile-first
compile
**/*.scala
scala-test-compile
testCompile
org.apache.maven.plugins
maven-compiler-plugin
1.8
1.8
WordCount.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object WordCount {
// Data type for words with count
case class WordWithCount(word: String, count: Long)
def main(args: Array[String]): Unit = {
//
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// : socket
val textDataStream = env.socketTextStream("Desktop", 9999, '
')
val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1))
//groupby:
val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))// bsize=5 , slid=1s
windowDstram.sum("count").print()
// ,
env.execute("Socket Window WordCount")
}
}
#################################################################################
Flinkが実験を実行しても出力結果が見つからない理由は次のとおりです.
①前のjobはcancelが次に影響しないので、flink listを入力してflink cancelで削除できます.
②yarnのqueueリソースがいっぱいです.(flink on yarnモード)
③出力結果yarnのインタフェースにあるノードまたはあるノードの#$FLINK_HOME/ロゴの下
④$FLINK/logの下にあるflink-ユーザ名-taskexecutor-2-desktopを削除しないでください.out,これは新しいjob時に自動的に生成されずstart-allのみである.sh起動時に生成し,削除した場合flinkクラスタを再起動しない限り実験結果は見られない.
⑤クラスタ起動前6123ポートがオフになっていないため、再起動後、タスクをコミットできない
Caused by: java.net.BindException: Could not start actor system on any port in port range 6123
注意タスクをコミットする前に、まず$FLINK_を確認してください.HOME/log下にgrep-ri errorを入力するとerrorがありません