Sparkプログラミング環境構築およびWordCountの例

42378 ワード

Intellij IDEAに基づくSpark開発環境構築
Intellij IDEAに基づくSpark開発環境構築——参考文書
●参考文書http://spark.apache.org/docs/latest/programming-guide.html
●操作手順
・a)mavenプロジェクトの作成
・b)導入依存(Spark依存、パッケージプラグイン等)
Intellij IDEAに基づいてSpark開発環境を構築する-maven vs sbt
●どれを使いこなせるか
●Mavenはscalaプロジェクトの構築も可能
Intellij IDEAに基づくSpark開発環境構築-maven構築scalaプロジェクト
●参考文書http://docs.scala-lang.org/tutorials/scala-with-maven.html
●操作手順
a)mavenでscalaプロジェクトを構築する(net.alchim 31.maven:scala-archetype-simpleに基づく)
Spark编程环境搭建及WordCount实例_第1张图片
Spark编程环境搭建及WordCount实例_第2张图片
Spark编程环境搭建及WordCount实例_第3张图片
Spark编程环境搭建及WordCount实例_第4张图片
Spark编程环境搭建及WordCount实例_第5张图片
b)pom.xml導入依存(spark依存、パッケージプラグインなど)
pom.xmlファイルの適切な場所には、次の内容が追加されます.
<dependencies>
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-core_2.11artifactId>
        <version>2.2.0version>
        <scope>providedscope> //     ,                 
    dependency>
dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.pluginsgroupId>
            <artifactId>maven-shade-pluginartifactId>
            <version>2.4.1version>
            <executions>
                <execution>
                    <phase>packagephase>
                    <goals>
                        <goal>shadegoal>
                    goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">transformer>
                            transformers>
                            <createDependencyReducedPom>falsecreateDependencyReducedPom>
                        configuration>
                execution>
            executions>
        plugin>
    plugins>
build>

パッケージングを1回行い、正常に動作しているかどうかを確認します.Spark编程环境搭建及WordCount实例_第6张图片
Terminalにコマンドを入力してパッケージングを行います.
mvn clean package
      :
D:\Code\JavaCode\sparkMaven>mvn clean package
 [INFO] Scanning for projects...
[INFO]
[INFO] ---------------------< com.zimo.spark:scala-spark >---------------------
[INFO] Building scala-spark 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ scala-spark ---
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ scala-spark ---
[WARNING] Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory D:\Code\JavaCode\sparkMaven\src\main\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ scala-spark ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ scala-spark ---
[WARNING] Using platform encoding (GBK actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory D:\Code\JavaCode\sparkMaven\src\test\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ scala-spark ---
[INFO] No sources to compile
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ scala-spark ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ scala-spark ---
[WARNING] JAR will be empty - no content was marked for inclusion!
[INFO] Building jar: D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-shade-plugin:2.4.1:shade (default) @ scala-spark ---
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT.jar with D:\Code\JavaCode\sparkMaven\target\scala-spark-1.0-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.675 s
[INFO] Finished at: 2018-09-11T15:33:53+08:00
[INFO] ------------------------------------------------------------------------

BUILD SUCCESSが登場し、すべてが正常であることを示した.以下のScalaプログラミングの大まかな流れと,このフレームワークの下で同様にJavaで実現するためにどのように操作すべきかを示す.
Scalaプログラミング実現WordCount
Spark编程环境搭建及WordCount实例_第7张图片
Spark编程环境搭建及WordCount实例_第8张图片
注意:ここではObjectを選択する必要があります.そうしないとmainメソッドはありません.
次に、次のコードを入力し、パッケージング操作を行います.
def main(args: Array[String]): Unit = {
  println("hello spark")
}

Spark编程环境搭建及WordCount实例_第9张图片
完了すると、プロジェクトディレクトリの下にtargetディレクトリが1つ追加されていることがわかります.これがScalaプログラミングを用いた大まかな流れです.次はWordCountプログラムを書きます.(後でJavaプログラミングのバージョンも提供されます)
まず、クラスタに次のディレクトリとテストファイルを作成します.
[hadoop@masternode ~]$ cd /home/hadoop/

[hadoop@masternode ~]$ ll

total 68

drwxr-xr-x. 9 hadoop hadoop  4096 Sep 10 22:15 app

drwxrwxr-x. 6 hadoop hadoop  4096 Aug 17 10:42 data

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Desktop

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Documents

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Downloads

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Music

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Pictures

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Public

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Templates

drwxrwxr-x. 3 hadoop hadoop  4096 Apr 18 10:11 tools

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Videos

-rw-rw-r--. 1 hadoop hadoop 20876 Apr 20 18:03 zookeeper.out

[hadoop@masternode ~]$ mkdir testSpark/

[hadoop@masternode ~]$ ll

total 72

drwxr-xr-x. 9 hadoop hadoop  4096 Sep 10 22:15 app

drwxrwxr-x. 6 hadoop hadoop  4096 Aug 17 10:42 data

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Desktop

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Documents

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Downloads

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Music

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Pictures

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Public

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Templates

drwxrwxr-x. 2 hadoop hadoop  4096 Sep 12 10:23 testSpark

drwxrwxr-x. 3 hadoop hadoop  4096 Apr 18 10:11 tools

drwxr-xr-x. 2 hadoop hadoop  4096 Apr 17 10:03 Videos

-rw-rw-r--. 1 hadoop hadoop 20876 Apr 20 18:03 zookeeper.out

[hadoop@masternode ~]$ cd testSpark/

[hadoop@masternode testSpark]$ vi word.txt

apache hadoop spark scala

apache hadoop spark scala

apache hadoop spark scala

apache hadoop spark scala

  WordCount.scalaコードは次のとおりです(右クリックNewの下に「Scala Class」オプションがない場合は、IDEAにscalaプラグインが追加されているかどうかを確認します).
package com.zimo.spark

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Zimo on 2018/9/11
  */
object MyWordCount {
  def main(args: Array[String]): Unit = {
    //    
    if (args.length < 2) {
      System.err.println("Usage: myWordCount  ")
      System.exit(1)
    }

    //    
    val input = args(0)
    val output = args(1)

    //  Scala   SparkContext
    val conf = new SparkConf().setAppName("myWordCount")
    val sc = new SparkContext(conf)

    //    
    val lines = sc.textFile(input)

    //      
    lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println) 

    //    

    sc.stop()
  }
}

コードからscalaの利点は簡潔であるが,可読性が悪いことがわかる.したがって,学習は後のjavaコードと比較できる.
そしてパッケージSpark编程环境搭建及WordCount实例_第10张图片
パッケージングが完了したら、上図のファイルをsparkクラスタにアップロードし、実行します.
[hadoop@masternode testSpark]$ rz



[hadoop@masternode testSpark]$ ll

total 8

-rw-r--r--. 1 hadoop hadoop 1936 Sep 12 10:59 scala-spark-1.0-SNAPSHOT.jar

-rw-rw-r--. 1 hadoop hadoop  104 Sep 12 10:26 word.txt

[hadoop@masternode testSpark]$ cd ../app/spark-2.2.0/

[hadoop@masternode spark-2.2.0]$ cd bin/

[hadoop@masternode bin]$ ll

total 92

-rwxr-xr-x. 1 hadoop hadoop 1089 Jul  1  2017 beeline

-rw-r--r--. 1 hadoop hadoop  899 Jul  1  2017 beeline.cmd

-rwxr-xr-x. 1 hadoop hadoop 1933 Jul  1  2017 find-spark-home

-rw-r--r--. 1 hadoop hadoop 1909 Jul  1  2017 load-spark-env.cmd

-rw-r--r--. 1 hadoop hadoop 2133 Jul  1  2017 load-spark-env.sh

-rwxr-xr-x. 1 hadoop hadoop 2989 Jul  1  2017 pyspark

-rw-r--r--. 1 hadoop hadoop 1493 Jul  1  2017 pyspark2.cmd

-rw-r--r--. 1 hadoop hadoop 1002 Jul  1  2017 pyspark.cmd

-rwxr-xr-x. 1 hadoop hadoop 1030 Jul  1  2017 run-example

-rw-r--r--. 1 hadoop hadoop  988 Jul  1  2017 run-example.cmd

-rwxr-xr-x. 1 hadoop hadoop 3196 Jul  1  2017 spark-class

-rw-r--r--. 1 hadoop hadoop 2467 Jul  1  2017 spark-class2.cmd

-rw-r--r--. 1 hadoop hadoop 1012 Jul  1  2017 spark-class.cmd

-rwxr-xr-x. 1 hadoop hadoop 1039 Jul  1  2017 sparkR

-rw-r--r--. 1 hadoop hadoop 1014 Jul  1  2017 sparkR2.cmd

-rw-r--r--. 1 hadoop hadoop 1000 Jul  1  2017 sparkR.cmd

-rwxr-xr-x. 1 hadoop hadoop 3017 Jul  1  2017 spark-shell

-rw-r--r--. 1 hadoop hadoop 1530 Jul  1  2017 spark-shell2.cmd

-rw-r--r--. 1 hadoop hadoop 1010 Jul  1  2017 spark-shell.cmd

-rwxr-xr-x. 1 hadoop hadoop 1065 Jul  1  2017 spark-sql

-rwxr-xr-x. 1 hadoop hadoop 1040 Jul  1  2017 spark-submit

-rw-r--r--. 1 hadoop hadoop 1128 Jul  1  2017 spark-submit2.cmd

-rw-r--r--. 1 hadoop hadoop 1012 Jul  1  2017 spark-submit.cmd
[hadoop@masternode testSpark]$ ./spark-submit --class com.zimo.spark.MyWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/

運転結果は下図の通りです:Spark编程环境搭建及WordCount实例_第11张图片
以上の操作は結果を直接印刷するので、次に結果をテキストに保存してみましょう.次のコードを変更します.
//      
//lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)
val resultRDD = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//    
resultRDD.saveAsTextFile(output)

再実行:
./spark-submit --class com.zimo.spark.MyWordCount ~/testSpark/scala-spark-1.0-SNAPSHOT.jar ~/testSpark/word.txt ~/testSpark/result
//              !

結果は次のとおりです.
[hadoop@masternode testSpark]$ ll

total 5460

drwxrwxr-x. 2 hadoop hadoop    4096 Sep 12 16:02 result

-rw-r--r--. 1 hadoop hadoop 5582827 Sep 12 16:00 scala-spark-1.0-SNAPSHOT.jar

-rw-rw-r--. 1 hadoop hadoop     104 Sep 12 15:52 word.txt

[hadoop@masternode testSpark]$ cd result/

[hadoop@masternode result]$ ll

total 4

-rw-r--r--. 1 hadoop hadoop 42 Sep 12 16:02 part-00000

-rw-r--r--. 1 hadoop hadoop  0 Sep 12 16:02 _SUCCESS

[hadoop@masternode result]$ cat part-00000

(scala,4)

(spark,4)

(hadoop,4)

(apache,4)

**
Javaプログラミング実現WordCount
**
同じディレクトリにjavaディレクトリを新規作成し、「Sources Root」に設定します.
Spark编程环境搭建及WordCount实例_第12张图片
ユニットテストディレクトリtestもjavaフォルダを作成する必要があります.
Spark编程环境搭建及WordCount实例_第13张图片
同様に「Test Sources Root」に設定します.その後、resourcesディレクトリ(プロファイルの保存用)をそれぞれ作成し、「Resources Root」と「Test Resources Root」に設定します.Spark编程环境搭建及WordCount实例_第14张图片
最後に、「com.zimo.spark」パッケージを作成し、下にMyJavaWordCountを新規作成します.Classクラス(右クリックNewの下に「Java Class」オプションがない場合は、ブログを参照してくださいhttps://blog.csdn.net/py_123456/article/details/82628612次の詳細について説明します).コードは次のとおりです.
package com.zimo.spark;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by Zimo on 2018/9/12
 */
public class MyJavaWordCount {
    public static void main(String[] args) {

        //    
        if (args.length < 2) {
            System.err.println("Usage: MyJavaWordCount  ");
            System.exit(1);
        }

        //    
        String input = args[0];
        String output = args[1];

        //  Java   SparkContext
        SparkConf conf = new SparkConf().setAppName("MyJavaWordCount");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //    
        JavaRDD inputRDD = sc.textFile(input);

        //      
        JavaRDD words = inputRDD.flatMap(new FlatMapFunction() {
            @Override
            public Iterator call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });


        JavaPairRDD result = words.mapToPair(new PairFunction() {
            @Override
            public Tuple2 call(String word) throws Exception {
                return new Tuple2(word, 1);
            }
        }).reduceByKey(new Function2() {
            @Override
            public Integer call(Integer x, Integer y) throws Exception {
                return x+y;
            }
        });

        //    
        result.saveAsTextFile(output);

        //  sc
        sc.stop();
    }
}

注意:ここでは少し修正します.コメントを削除するxmlファイルの下にあるここの内容
Spark编程环境搭建及WordCount实例_第15张图片ここではデフォルトのSource ROOTのパスなので、パッケージ化するときはScalaの下のコードしかパッケージ化できませんが、私たちが新しく作ったJavaディレクトリはパッケージ化されません.注釈の後は私たちの前のディレクトリ構成を主とします.その後、パッケージングとクラスタ上の実行操作を実行できます.Scalaプログラミングとそっくりですが、ここでは説明しません.上を参照してください.ただし、outputディレクトリは存在しないディレクトリでなければなりません.実行するたびに変更することを覚えておいてください.
以上はブロガーが皆さんに紹介したこのプレートの主な内容で、これはブロガー自身の学習過程で、皆さんに一定の指導作用をもたらすことを望んで、役に立つのはまたみんなに支持を頼んで、もしあなたに役に立たないならば許して、間違いがあれば指摘してください.期待があれば、ブログの更新を第一時間で取得することができます.ありがとうございます.