java言語を使ってsparkのwodCount入門プログラムを実現します.
3541 ワード
sparkはscalaを使って書いているので、scalaを使ってsparkプログラムを作るのは正常ですが、ほとんどの場合はjava言語を使ってsparkプログラムを作成します.
scalaを学んだ後にsparkのソースコードを読むことができるためです!
以下はプログラムです
場合によっては、後のフォローが必要です.ネット上にある答えはそのままです.return Arays.asList;
scalaを学んだ後にsparkのソースコードを読むことができるためです!
以下はプログラムです
package com.john.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.junit.Test;
import scala.Serializable;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class JavaSpark01 implements Serializable {
@Test
public void testJavaSpark () {
//1. Spark SparkConf, spark
SparkConf sparkConf = new SparkConf().setAppName("spark_java01").setMaster("local[2]");
//2. SparkContext ,SparkContext Spark (java:JavaSparkContext)
JavaSparkContext sc = new JavaSparkContext(sparkConf);
//
sc.setLogLevel("WARN");
//3.
JavaRDD lineRdd = sc.textFile("D:\\Develop\\spark\\javaSpark\\input");
/**
* 4.
* new FlatMapFunction string
* Override call , Iterable
*
* flatmap spark , rdd rdd
*
*/
JavaRDD wordsRdd = lineRdd.flatMap(new FlatMapFunction() {
public Iterator call(String s) throws Exception {
String[] words = s.split(" ");
return Arrays.asList(words).iterator();
}
});
/**
* 5.
* map , MR map
* pairFunction: T: ;K,V:
* call
*/
JavaPairRDD wordAndOneRdd = wordsRdd.mapToPair(new PairFunction() {
public Tuple2 call(String s) throws Exception {
// :
return new Tuple2(s, 1);
}
});
/**
* 6.
* reduceByKey , MR reduce
* ( ones) KV , key ,
*/
JavaPairRDD wordAndCountRdd = wordAndOneRdd.reduceByKey(new Function2() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//7.
JavaPairRDD sortedRdd = wordAndCountRdd.sortByKey();
//8.collect spark RDD java
List> finalResult = sortedRdd.collect();
//9.
for (Tuple2 tuple2 : finalResult) {
System.out.println(tuple2._1 + "---->" + tuple2._2);
}
//10.
sortedRdd.saveAsTextFile("D:\\Develop\\spark\\javaSpark\\output");
//11. sparkContxt
sc.stop();
}
}
注意が必要です.flatMapメソッドにおけるコール方法におけるリターン結果(return Arays.asList(words).iterator();場合によっては、後のフォローが必要です.ネット上にある答えはそのままです.return Arays.asList;