最初のSparkStreamingプログラムの実行(およびプロセス中の問題解決)
5791 ワード
公式の例の説明
公式ドキュメントのこの例の説明に従って、この例をローカルのspark-shell環境で簡単にテストできます.例として、より良い入門のために、もう一度説明します.この統計単語を実行する方法は3つあり、前の2つは公式ドキュメントのガイドラインで、3つ目はscalaプログラムで実行されます.第1の態様、run-demo 端末を開き、端末を開き、コマンド 端末を再び開く、Spark HOMEディレクトリに切り替え、コマンド に類似する.
------------------------------------------- Time: 1415701382000 ms ------------------------------------------- ------------------------------------------- Time: 1415701383000 ms ------------------------------------------- nc端末に任意に文字列を入力し、aaaa bb cのようなスペースで区切られ、車に戻る.上のSpark端末では、新しいコンテンツ出力 が見られる.
------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK、成功!第2のspark-shellモード 次にspark-shellにscalaコードを入力して実行する方法について説明します.上記の最初のステップと同じように、1つの端末を開き、1つの端末を開き、コマンド .もう1つの端末を開き、Spark HOMEディレクトリの下に切り替え、 を印刷します.
Spark context available as sc. scala>
次の文を入力します.
次の情報が印刷されます.
14/11/11 18:07:23 INFO MemoryStore: ensureFreeSpace(2216) called with curMem=100936, maxMem=278019440 ...... 14/11/11 18:07:23 INFO DAGScheduler: Stage 91 (take at DStream.scala:608) finished in 0.004 s 14/11/11 18:07:23 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.007531701 s ------------------------------------------- Time: 1415700443000 ms -------------------------------------------は第1の方法の第3のステップと同じで、勝手にいくつかの文字列を入力して、スペースで隔てて、aaaa bbcのように車に戻ります.上のSpark端末では、新しいコンテンツ出力 が見られる.
------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK、成功!第3種scala-ideプログラミング方式 このような方法でこのdemoコードを実行するとき、多くの問題に遭遇し、記録して参考にします.この例では、まずここに記録されている方法に従って操作して、実行可能なプログラムを得てください.後で私が遭遇した問題を記録します. scala-ideをダウンロードし、リンクをダウンロードし、For Scala 2.10.4の対応プラットフォームのideをダウンロードし、解凍し、実行します. インストールsbt、ダウンロードリンク、 sbteclipse,githubアドレスをインストールし、 ウィザードでscalaプロジェクトを作成し、プロジェクトルートディレクトリの下にbuildを作成します.sbtファイル、次の内容を追加(各行の正式な文の後に改行することに注意) name := "spark-test"
version := "1.0"
scalaVersion := "2.10.4"
//set the main class for the main 'run' task//change Compile to Test to set it for 'test:run' mainClass in (Compile, run) := Some("test.SparkTest")
libraryDependencies += "org.apache.spark"% "spark-streaming_2.10"% "1.1.0" testを作成します.SparkTest.scalaファイル、次のコード を追加
package test import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext import org.apache.spark.api.java.function._ import org.apache.spark.streaming._ import org.apache.spark.streaming.api._
object SparkTest { def main(args: Array[String]): Unit = {//Create a StreamingContext with a local master//Spark Streaming needs at least two working thread val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10))//Create a DStream that will connect to serverIP:serverPort, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)//Split each line into words val words = lines.flatMap(.split(""))//Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey( + _) wordCounts.print ssc.start ssc.awaitTermination } }端末でこのプロジェクトルートディレクトリにディレクトリを切り替え、コマンド を生成する.は、第1の態様の第1、3のステップと同様に、もう1つの端末を開き、コマンド はideコンソールで観察できます
------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK、成功!
次に、問題と解決方法を示します.
1.メインクラスが見つからないとプログラムを実行
解:sbtファイルにプライマリクラスがどれであるかを構成していません.
mainClass in (Compile, run) := Some("test.SparkTest")
Someではメインクラスのパスです
2. java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
この問題は長い間困っていたので,ずっとどうやって解決するか見つからなかった.その後、scalaがバージョンごとにアップグレードするたびに以前のバージョンでコンパイルされたライブラリと互換性がないということを見て、対応するバージョンのideを交換して正常に動作しました.解:scala-ideバージョンと現在使用されているsparkパッケージ依存コンパイルのscalaバージョンが一致しません.上記の
公式ドキュメントのこの例の説明に従って、この例をローカルのspark-shell環境で簡単にテストできます.例として、より良い入門のために、もう一度説明します.この統計単語を実行する方法は3つあり、前の2つは公式ドキュメントのガイドラインで、3つ目はscalaプログラムで実行されます.
nc -lk 9999
を入力し、一時的に「nc端末」と呼ぶようにしましょうbin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999
を実行し、毎秒ログループ出力------------------------------------------- Time: 1415701382000 ms ------------------------------------------- ------------------------------------------- Time: 1415701383000 ms -------------------------------------------
------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK、成功!
nc -lk 9999
を入力し、一時的に「nc端末」と呼ぶようにしましょうbin/spark-shell
(Sparkがインストールされている場合は、spark-shell
を直接入力すればよい)を入力し、Sparkの起動が成功するのを待って、情報Spark context available as sc. scala>
次の文を入力します.
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._
// Create a StreamingContext with a local master
val ssc = new StreamingContext(sc, Seconds(1))
// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print a few of the counts to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
次の情報が印刷されます.
14/11/11 18:07:23 INFO MemoryStore: ensureFreeSpace(2216) called with curMem=100936, maxMem=278019440 ...... 14/11/11 18:07:23 INFO DAGScheduler: Stage 91 (take at DStream.scala:608) finished in 0.004 s 14/11/11 18:07:23 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.007531701 s ------------------------------------------- Time: 1415700443000 ms -------------------------------------------
------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK、成功!
~/.sbt/0.13/plugins/plugins.sbt
ファイルを編集し、pluginsディレクトリとpluginsがない場合は、以下の内容addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
を追加します.sbt、自分で作成します.version := "1.0"
scalaVersion := "2.10.4"
//set the main class for the main 'run' task//change Compile to Test to set it for 'test:run' mainClass in (Compile, run) := Some("test.SparkTest")
libraryDependencies += "org.apache.spark"% "spark-streaming_2.10"% "1.1.0"
package test import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext import org.apache.spark.api.java.function._ import org.apache.spark.streaming._ import org.apache.spark.streaming.api._
object SparkTest { def main(args: Array[String]): Unit = {//Create a StreamingContext with a local master//Spark Streaming needs at least two working thread val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(10))//Create a DStream that will connect to serverIP:serverPort, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999)//Split each line into words val words = lines.flatMap(.split(""))//Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey( + _) wordCounts.print ssc.start ssc.awaitTermination } }
sbt
を入力し、コマンドの実行に成功した後、eclipse
を叩いてeclipseプロジェクトとプロジェクトに必要な依存nc -lk 9999
を入力する.次に、さっき書いたmainプログラムを実行し、nc端末に文字列を入力し、aa aa bb cのようなスペースで区切って、車に戻ります.------------------------------------------- Time: 1415701670000 ms ------------------------------------------- (aa,2) (bb,1) (c,1)
OK、成功!
次に、問題と解決方法を示します.
1.メインクラスが見つからないとプログラムを実行
解:sbtファイルにプライマリクラスがどれであるかを構成していません.
build.sbt
ファイルに以下のコードを追加します.mainClass in (Compile, run) := Some("test.SparkTest")
Someではメインクラスのパスです
2. java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
この問題は長い間困っていたので,ずっとどうやって解決するか見つからなかった.その後、scalaがバージョンごとにアップグレードするたびに以前のバージョンでコンパイルされたライブラリと互換性がないということを見て、対応するバージョンのideを交換して正常に動作しました.解:scala-ideバージョンと現在使用されているsparkパッケージ依存コンパイルのscalaバージョンが一致しません.上記の
scala-ide For Scala 2.10.4
バージョンをダウンロードしてください.