ゼロからSpark(一)環境構成を学び、WordCountを実現

6512 ワード

環境構成、Spark実現WordCount
私はテンセントの実習に参加するつもりで、ビッグデータと機械について勉強します.本人はビッグデータについて何も知らないので、Sparkを切り口として独学で始め、各アルゴリズムのSparkへの応用を一歩一歩完成させるつもりです.独学の過程の中の少しはすべて記録して、みんなと交流することができることを望んで、共に勉強します.
環境を配合するのは永遠に1つの新しい分野の最も難しい一部を学ぶことを始めて、私は2日の時間を費やして配置してMacOSの下のSpark開発環境に成功して、Scalaとsbtに基づくWordCountを実現して、次に一歩一歩ステップを記録します.
ステップ1:sbtのIntelliJでのプログラミング環境の構成
terminalを開く
Javaバージョンを表示するには、MacOSにjavaが付属しているため、インストールする必要はありません.
$ java -version

sbtをインストールします.これはscalaをコンパイルするツールです.
$ brew install sbt

sbtとscala情報の表示
$ sbt about

IntelliJのダウンロードインストール
Scala Pluginのインストール:IntelliJを開き、プロジェクトインタフェースを選択し、Configure→Plugins→Install JetBrains Pluginsを選択し、Scalaを検索してインストールする
デフォルトSDK:Configure→Project defaults→Project structure、SDK Java 1を選択します.8
これでscalaのIntelliJでの開発環境構成が完了しました
ステップ2:Sparkキットの構成
Sparkのダウンロード:アドレスをダウンロードして、もしすでにHadoopをインストールしたならば対応するバージョンをダウンロードして、以下のコマンドはHadoopバージョンを表示することができます
$ hadoop version

ダウンロードが完了したら解凍して1つのディレクトリの下に置き、/usr/shar/spark-2.1.0-bin-hadoop2.7に置くと仮定すると、環境変数にSparkを追加して使いやすくなります.
$ vim .bash_profile

行を追加し、保存してterminalを再起動すればいいです.
export SPARK_HOME=/usr/shar/spark-2.1.0-bin-hadoop2.7

これでSpark環境の構成が完了し、非常に便利ではないでしょうか.
ステップ3:コマンドライン形式でSparkを操作する
(1) Python Spark
terminalでコマンドを実行する
$ $SPARK_HOME/bin/pyspark

かっこいいSpark logoを見たら成功した
足りないのは、持参したpython shellに自動補完などの機能がなく、ipythonを使って完璧に解決できることです.
まず、ipythonをインストールします
$ pip install ipython

Sparkの実行
$ PYSPARK_DRIVER_PYTHON=ipython $SPARK_HOME/bin/pyspark

SparkのAPIを使ってコマンドを試してみましょう
>>> lines = sc.textFile("README.md") #       lines RDD
>>> lines.count() #   RDD       
127
>>> lines.first()

(2) Scala Spark Shell
$ $SPARK_HOME/bin/spark-shell

同じように行数統計の小さな応用を完成します
scala> val lines = sc.textFile("README.md") //       lines RDD
lines: spark.RDD[String] = MappedRDD[...]
scala> lines.count() //   RDD       
res0: Long = 127
scala> lines.first() //   RDD       ,   README.md     
res1: String = # Apache Spark

ステップ4:Spark独立アプリケーションの構築、WordCount
上はshell形式でSparkを呼び出していますが、今はもっと重要な独立プロジェクトに入っています.私は多くのチュートリアルを見ましたが、各チュートリアルには1歩2歩のあいまいさがあります.あるいは、バージョンが古すぎて、多くの穴が残っています.今、私は走ることができる例をまとめました.
まず、IntelliJでsbtプロジェクトを作成します.IntelliJ→CreateNewProject→Scala→sbt→ProjectName=wordcount→Createを開きます.
修正sbtは、最後にSparkのパッケージを1行追加します.注意scalaVersionは必ず2.11に変更しなければならない.Spark 2のためだ.1.0はScala 2に基づく.11の、デフォルトの2.12は間違っています!
name := "wordcount"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

まずsbtプロジェクトのディレクトリ構造を見てみましょう
├── build.sbt
├── project
│   ├── build.properties
│   ├── plugins.sbt
│   ├── project
│   └── target
├── src
│   ├── main
│   │   ├── java
│   │   ├── resources
│   │   ├── scala
│   │   └── scala-2.12
│   └── test
│       ├── java
│       ├── resources
│       ├── scala
│       └── scala-2.12
└── target
    ├── resolution-cache
    ├── scala-2.12
    └── streams

私たちが書くコードは主に/src/main/scala に置いてあります.
次に、私たちは私たちのコードを書き始めました.具体的な詳細は深く研究する必要はありません.この章は環境を調整するためだけです.
新規ディレクトリ/src/main/scala/com/oreilly/learningsparkexamples/mini/scala最初のファイル/src/main/scala/com/oreilly/learningsparkexamples/mini/scala/BasicMap.scalaを追加
/**
 * Illustrates a simple map in Scala
 */
package com.oreilly.learningsparkexamples.scala

import org.apache.spark._

object BasicMap {
    def main(args: Array[String]) {
      val master = args.length match {
        case x: Int if x > 0 => args(0)
        case _ => "local"
      }
      val sc = new SparkContext(master, "BasicMap", System.getenv("SPARK_HOME"))
      val input = sc.parallelize(List(1,2,3,4))
      val result = input.map(x => x*x)
      println(result.collect().mkString(","))
    }
}

2番目のファイル/src/main/scala/com/oreilly/learningsparkexamples/mini/scala/WordCount.scalaを追加
/**
 * Illustrates flatMap + countByValue for wordcount.
 */
package com.oreilly.learningsparkexamples.mini.scala

import org.apache.spark._
import org.apache.spark.SparkContext._

object WordCount {
    def main(args: Array[String]) {
      val inputFile = args(0)
      val outputFile = args(1)
      val conf = new SparkConf().setAppName("wordCount")
      // Create a Scala Spark Context.
      val sc = new SparkContext(conf)
      // Load our input data.
      val input =  sc.textFile(inputFile)
      // Split up into words.
      val words = input.flatMap(line => line.split(" "))
      // Transform into word and count.
      val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
      // Save the word count back out to a text file, causing evaluation.
      counts.saveAsTextFile(outputFile)
    }
}

右上のBuild Projectアイコンをクリックするとコンパイルに成功します.エラーがなければ、おめでとうございます.環境構成に成功しました.
ステップ5:spark-submitを使用してアプリケーションを実行する
Spark-submitスクリプトは、Sparkが使用する環境変数の一連を構成します.
まず、私たちがコンパイルしたプロジェクトをパッケージ化する必要があります.最も便利な方法はwordcountディレクトリの下に入って、入力することです.
$ sbt package

パッケージされたファイルは/wordcount/target/scala-2.11/wordcount_2.11-1.0.jarです
次にSparkが提供してくれたspark-submitを利用してアプリケーションを実行し、wordcountディレクトリの下に入ります.
$ $SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount  \
./target/scala-2.11/wc_2.11-1.0.jar \
./input.txt ./wordcounts

上のコマンドを簡単に説明します.--classは使用するClassで、後ろはjarパッケージのパスで、最後の2つはwordcountの2つのパラメータで、それぞれ入力ファイルと出力ファイルのパスです.
私たちの入力ファイル\wordcount\input.txtはこうです.
one two three four
four five six
one five six
one one three

実行後、成功すると\wordcount\wordcounts\part-00000に表示されます.
(two,1)
(one,4)
(six,2)
(three,2)
(five,2)
(four,2)

これで、私たちの環境全体の構成は成功しました.問題があれば、伝言を残してください.
参考資料
Spark公式ドキュメントQuick-start
SBTでSparkのWordCountプログラムをコンパイルする
Big Data Analysis with Scala and Sparkローザンヌ連邦工科大学-Coursera