flinkのscalaバージョンのwordcount+flinkが結果を出力しなかったいくつかの原因

5903 ワード

################################################################################
①クラスタの起動
$FLINK_HOME/bin/start-cluster.sh
②ソケットを開く
nc -lk 9999
mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua mua mua2 mua3 mua2 muamua mua2 mua3 mua2 mua mua mua2 mua3 mua2 mua
③コンパイルコマンド:
mvn scala:compile package
④flinkタスクをコミットし、別のsocketを生成
flink run -c WordCount target/scala-module-dependency-sample-1.0-SNAPSHOT.jar --port 9999
 
そして上記2つのsocketはtcpで通信します
####################################################実験結果
出力結果:
$FLINK_HOME/log/flink-appleyuchi-taskexecutor-2-Desktop.out
3> WordWithCount(mua3,2) 4> WordWithCount(mua2,4) 4> WordWithCount(mua2,14) 3> WordWithCount(mua3,7) 3> WordWithCount(mua3,12) 4> WordWithCount(mua2,24) 3> WordWithCount(muamua,1) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,24) 3> WordWithCount(mua3,12) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,24) 3> WordWithCount(mua3,12) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,20) 3> WordWithCount(mua3,10) 3> WordWithCount(muamua,1) 4> WordWithCount(mua2,10) 3> WordWithCount(mua3,5)
################################################################################
├── pom.xml ├── src │   └── main │       └── scala │           └── WordCount.scala└——運行方法.txt
#############################################################
完全なpom.xml:

  4.0.0
  sample
  scala-module-dependency-sample
  1.0-SNAPSHOT


        
            org.scala-lang
            scala-library
            2.11.8
        

        
            org.apache.flink
            flink-scala_2.11
            1.6.2
        
      
        
            org.apache.flink
            flink-streaming-scala_2.11
            1.6.2
        
        
            org.apache.flink
            flink-clients_2.11
            1.6.2
        

    








    
        
            
                org.scala-tools
                maven-scala-plugin
                2.15.2
                
                    
                        scala-compile-first
                        
                            compile
                        
                        
                            
                                **/*.scala
                            
                        
                    
                    
                        scala-test-compile
                        
                            testCompile
                        
                    
                
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.8
                    1.8
                
            
        
    





WordCount.scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCount {
  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    //         
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //    :  socket   
    val textDataStream = env.socketTextStream("Desktop", 9999, '
') val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_,1)) //groupby: val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))// bsize=5 , slid=1s windowDstram.sum("count").print() // , env.execute("Socket Window WordCount") } }

#################################################################################
Flinkが実験を実行しても出力結果が見つからない理由は次のとおりです.
①前のjobはcancelが次に影響しないので、flink listを入力してflink cancelで削除できます.
②yarnのqueueリソースがいっぱいです.(flink on yarnモード)
③出力結果yarnのインタフェースにあるノードまたはあるノードの#$FLINK_HOME/ロゴの下
④$FLINK/logの下にあるflink-ユーザ名-taskexecutor-2-desktopを削除しないでください.out,これは新しいjob時に自動的に生成されずstart-allのみである.sh起動時に生成し,削除した場合flinkクラスタを再起動しない限り実験結果は見られない.
⑤クラスタ起動前6123ポートがオフになっていないため、再起動後、タスクをコミットできない
Caused by: java.net.BindException: Could not start actor system on any port in port range 6123
注意タスクをコミットする前に、まず$FLINK_を確認してください.HOME/log下にgrep-ri errorを入力するとerrorがありません