最初のSparkStreamingプログラムの実行(およびプロセス中の問題解決)

5791 ワード

公式の例の説明
公式ドキュメントのこの例の説明に従って、この例をローカルのspark-shell環境で簡単にテストできます.例として、より良い入門のために、もう一度説明します.この統計単語を実行する方法は3つあり、前の2つは公式ドキュメントのガイドラインで、3つ目はscalaプログラムで実行されます.
  • 第1の態様、run-demo
  • 端末を開き、端末を開き、コマンドnc -lk 9999を入力し、一時的に「nc端末」と呼ぶようにしましょう
  • 端末を再び開く、Spark HOMEディレクトリに切り替え、コマンドbin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999を実行し、毎秒ログループ出力
  • に類似する.
    ------------------------------------------- Time: 1415701382000 ms ------------------------------------------- ------------------------------------------- Time: 1415701383000 ms -------------------------------------------
  • nc端末に任意に文字列を入力し、aaaa bb cのようなスペースで区切られ、車に戻る.上のSpark端末では、新しいコンテンツ出力
  • が見られる.
    ------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
    OK、成功!
  • 第2のspark-shellモード
  • 次にspark-shellにscalaコードを入力して実行する方法について説明します.
  • 上記の最初のステップと同じように、1つの端末を開き、1つの端末を開き、コマンドnc -lk 9999を入力し、一時的に「nc端末」と呼ぶようにしましょう
  • .
  • もう1つの端末を開き、Spark HOMEディレクトリの下に切り替え、bin/spark-shell(Sparkがインストールされている場合は、spark-shellを直接入力すればよい)を入力し、Sparkの起動が成功するのを待って、情報
  • を印刷します.
    Spark context available as sc. scala>
    次の文を入力します.
    import org.apache.spark.streaming._ 
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.api.java.function._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.api._
     
    // Create a StreamingContext with a local master
    val ssc = new StreamingContext(sc, Seconds(1))
    
    // Create a DStream that will connect to serverIP:serverPort, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    import org.apache.spark.streaming.StreamingContext._
    
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Print a few of the counts to the console
    wordCounts.print()
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    

    次の情報が印刷されます.
    14/11/11 18:07:23 INFO MemoryStore: ensureFreeSpace(2216) called with curMem=100936, maxMem=278019440 ...... 14/11/11 18:07:23 INFO DAGScheduler: Stage 91 (take at DStream.scala:608) finished in 0.004 s 14/11/11 18:07:23 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.007531701 s ------------------------------------------- Time: 1415700443000 ms -------------------------------------------
  • は第1の方法の第3のステップと同じで、勝手にいくつかの文字列を入力して、スペースで隔てて、aaaa bbcのように車に戻ります.上のSpark端末では、新しいコンテンツ出力
  • が見られる.
    ------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
    OK、成功!
  • 第3種scala-ideプログラミング方式
  • このような方法でこのdemoコードを実行するとき、多くの問題に遭遇し、記録して参考にします.この例では、まずここに記録されている方法に従って操作して、実行可能なプログラムを得てください.後で私が遭遇した問題を記録します.
  • scala-ideをダウンロードし、リンクをダウンロードし、For Scala 2.10.4の対応プラットフォームのideをダウンロードし、解凍し、実行します.
  • インストールsbt、ダウンロードリンク、
  • sbteclipse,githubアドレスをインストールし、~/.sbt/0.13/plugins/plugins.sbtファイルを編集し、pluginsディレクトリとpluginsがない場合は、以下の内容addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")を追加します.sbt、自分で作成します.
  • ウィザードでscalaプロジェクトを作成し、プロジェクトルートディレクトリの下にbuildを作成します.sbtファイル、次の内容を追加(各行の正式な文の後に改行することに注意)
  • name := "spark-test"
    version := "1.0"
    scalaVersion := "2.10.4"
    //set the main class for the main 'run' task//change Compile to Test to set it for 'test:run' mainClass in (Compile, run) := Some("test.SparkTest")
    libraryDependencies += "org.apache.spark"% "spark-streaming_2.10"% "1.1.0"
  • testを作成します.SparkTest.scalaファイル、次のコード
  • を追加
    package test import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext import org.apache.spark.api.java.function._ import org.apache.spark.streaming._ import org.apache.spark.streaming.api._
    object SparkTest { def main(args: Array[String]): Unit = {//Create a StreamingContext with a local master//Spark Streaming needs at least two working thread val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10))//Create a DStream that will connect to serverIP:serverPort, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)//Split each line into words val words = lines.flatMap(.split(""))//Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey( + _) wordCounts.print ssc.start ssc.awaitTermination } }
  • 端末でこのプロジェクトルートディレクトリにディレクトリを切り替え、コマンドsbtを入力し、コマンドの実行に成功した後、eclipseを叩いてeclipseプロジェクトとプロジェクトに必要な依存
  • を生成する.
  • は、第1の態様の第1、3のステップと同様に、もう1つの端末を開き、コマンドnc -lk 9999を入力する.次に、さっき書いたmainプログラムを実行し、nc端末に文字列を入力し、aa aa bb cのようなスペースで区切って、車に戻ります.
  • はideコンソールで観察できます
    ------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
    OK、成功!
    次に、問題と解決方法を示します.
    1.メインクラスが見つからないとプログラムを実行
    解:sbtファイルにプライマリクラスがどれであるかを構成していません.build.sbtファイルに以下のコードを追加します.
    mainClass in (Compile, run) := Some("test.SparkTest")
    Someではメインクラスのパスです
    2. java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    この問題は長い間困っていたので,ずっとどうやって解決するか見つからなかった.その後、scalaがバージョンごとにアップグレードするたびに以前のバージョンでコンパイルされたライブラリと互換性がないということを見て、対応するバージョンのideを交換して正常に動作しました.解:scala-ideバージョンと現在使用されているsparkパッケージ依存コンパイルのscalaバージョンが一致しません.上記のscala-ide For Scala 2.10.4バージョンをダウンロードしてください.