spark 2.x浅入深底シリーズ六のRDD java apiから詳しく説明する
sparkのどんな技術を学ぶ前に、まずsparkを正しく理解して、参考にすることができます:sparkを正しく理解します
以下に、RDDの3つの作成方法、単一タイプRDDの基本的なtransformation api、サンプリングApi、pipe操作についてjava apiについて説明する.
一、RDDの三つの作成方式安定したファイルストレージシステムから、local file SystemやhdfsなどのRDDが作成される.
2. transformation apiを介して既存のRDDから新しいRDDを作成できます.以下はmapという変換apiです.
3. メモリ内のリストデータからRDDを作成し、RDDのパーティション数を指定できます.指定しない場合は、Executorのすべてのcores数を取得します.
注意:第3のケースでは、scalaにはmakeRDD apiも用意されています.このapiは、spark core RDD scala apiを参照して、RDDの各パーティションを作成するマシンを指定することができます.
二、単一タイプRDD基本的なtransformation api
メモリ内のデータに基づいてRDDを作成する map操作は、integerJavaRDDの各要素にカスタム関数インタフェースを適用することを示し、以下に各要素に1: を追加する.
なお、mapオペレーションは、RDDとは異なるタイプのデータを返すことができ、以下のように、カスタムUserオブジェクトを返すことができる.
以下に、RDDの3つの作成方法、単一タイプRDDの基本的なtransformation api、サンプリングApi、pipe操作についてjava apiについて説明する.
一、RDDの三つの作成方式
// hdfs
JavaRDD textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");
// , file: ///, ,
// , RDD ,
//
JavaRDD textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt" 2 );
2. transformation apiを介して既存のRDDから新しいRDDを作成できます.以下はmapという変換apiです.
JavaRDD mapRDD = textFileRDD.map(new Function() {
@Override
public String call(String s) throws Exception {
return s + "test";
}
});
System.out.println("mapRDD = " + mapRDD.collect());
3. メモリ内のリストデータからRDDを作成し、RDDのパーティション数を指定できます.指定しない場合は、Executorのすべてのcores数を取得します.
// JavaRDD
JavaRDD integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2);
System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect());
// Double JavaRDD
JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6));
System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect());
// key-value RDD
import scala.Tuple2;
JavaPairRDD javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
System.out.println("javaPairRDD = " + javaPairRDD.collect());
注意:第3のケースでは、scalaにはmakeRDD apiも用意されています.このapiは、spark core RDD scala apiを参照して、RDDの各パーティションを作成するマシンを指定することができます.
二、単一タイプRDD基本的なtransformation api
メモリ内のデータに基づいてRDDを作成する
JavaRDD integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
JavaRDD mapRDD = integerJavaRDD.map(new Function() {
@Override
public Integer call(Integer element) throws Exception {
return element + 1;
}
});
// :[2, 3, 4, 4]
System.out.println("mapRDD = " + mapRDD.collect());
なお、mapオペレーションは、RDDとは異なるタイプのデータを返すことができ、以下のように、カスタムUserオブジェクトを返すことができる.
public class User implements Serializable {
private String userId;
private Integer amount;
public User(String userId, Integer amount) {
this.userId = userId;
this.amount = amount;
}
//getter setter....
@Override
public String toString() {
return "User{" +
"userId='" + userId + '\'' +
", amount=" + amount +
'}';
}
}
JavaRDD userJavaRDD = integerJavaRDD.map(new Function() {
@Override
public User call(Integer element) throws Exception {
if (element
2. flatMap , integerJavaRDD FlatMapFunction, ,flatMap
JavaRDD flatMapJavaRDD = integerJavaRDD.flatMap(new FlatMapFunction() {
@Override
public Iterator call(Integer element) throws Exception {
// list, list 0 element
List list = new ArrayList<>();
int i = 0;
while (i <= element) {
list.add(i);
i++;
}
return list.iterator();
}
});
// : [0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3]
System.out.println("flatMapJavaRDD = " + flatMapJavaRDD.collect());
3. filter では、integerJavaRDDの にカスタムフィルタ を し、 な をフィルタします. のように、1に しくない をフィルタします.JavaRDD filterJavaRDD = integerJavaRDD.filter(new Function() {
@Override
public Boolean call(Integer integer) throws Exception {
return integer != 1;
}
});
// :[2, 3, 3]
System.out.println("filterJavaRDD = " + filterJavaRDD.collect());
4. glom 、integerJavaRDD パーティションに する データを JavaRDD> glomRDD = integerJavaRDD.glom();
// : [[1, 2], [3, 3]], integerJavaRDD , 1 2, 3 3
System.out.println("glomRDD = " + glomRDD.collect());
5. mapPartitions は、integerJavaRDDの パーティションのデータにカスタマイズされた インタフェースメソッドを します. に を する があると します.この の には がかかります.この 、mapPartitionsを すると、 のように きなメリットがあります.// ,
public static Integer getInitNumber(String source) {
System.out.println("get init number from " + source + ", may be take much time........");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
JavaRDD mapPartitionTestRDD = integerJavaRDD.mapPartitions(new FlatMapFunction, Integer>() {
@Override
public Iterator call(Iterator integerIterator) throws Exception {
// ,integerJavaRDD , getInitNumber
// , , mapPartitions , map
Integer initNumber = getInitNumber("mapPartitions");
List list = new ArrayList<>();
while (integerIterator.hasNext()) {
list.add(integerIterator.next() + initNumber);
}
return list.iterator();
}
});
// : [2, 3, 4, 4]
System.out.println("mapPartitionTestRDD = " + mapPartitionTestRDD.collect());
JavaRDD mapInitNumberRDD = integerJavaRDD.map(new Function() {
@Override
public Integer call(Integer integer) throws Exception {
// , integerJavaRDD 4 , getInitNumber 4 , , mapPartitions
Integer initNumber = getInitNumber("map");
return integer + initNumber;
}
});
// :[2, 3, 4, 4]
System.out.println("mapInitNumberRDD = " + mapInitNumberRDD.collect());
6. mapPartitionsWithIndex はintegerJavaRDDの パーティションのデータに して のカスタマイズした インタフェースの を して、 インタフェースの を する パーティションの を って、つまりあなたが しているのが のパーティションのデータであることを っていますJavaRDD mapPartitionWithIndex = integerJavaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
@Override
public Iterator call(Integer partitionId, Iterator integerIterator) throws Exception {
//partitionId
System.out.println("partition id = " + partitionId);
List list = new ArrayList<>();
while (integerIterator.hasNext()) {
list.add(integerIterator.next() + partitionId);
}
return list.iterator();
}
}, false);
// [1, 2, 4, 4]
System.out.println("mapPartitionWithIndex = " + mapPartitionWithIndex.collect());
、サンプリングApi
メモリ のデータに づいてRDDを するJavaRDD listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
sample // withReplacement
// withReplacement=true ,
// withReplacement=false ,
// :fraction, ,
// 100 ,fraction=0.2, 100 * 0.2 = 20 ,
// 100 0.2; ,
// withReplacement=true fraction>=0
// withReplacement=false 0 sampleRDD = listRDD.sample(false, 0.5, 100);
// : [1, 3]
System.out.println("sampleRDD = " + sampleRDD.collect());
2. randomSplit // RDD , RDD
// , , RDD
JavaRDD[] splitRDDs = listRDD.randomSplit(new double[]{0.4, 0.6});
// 2
System.out.println("splitRDDs.length = " + splitRDDs.length);
// [2, 3]
System.out.println("splitRDD(0) = " + splitRDDs[0].collect());
// [1, 3]
System.out.println("splitRDD(1) = " + splitRDDs[1].collect());
3. takeSample //
// withReplacement
// withReplacement=true ,
// withReplacement=false ,
// ,
[2, 3]
System.out.println(listRDD.takeSample(false, 2));
4. サンプリング、key-valueタイプのRDDをサンプリング// key value RDD
import scala.Tuple2;
JavaPairRDD javaPairRDD =
sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3),
new Tuple2("kkk", 3), new Tuple2("kkk", 3)));
// key
Map fractions = new HashMap<>();
fractions.put("test", 0.5);
fractions.put("kkk", 0.4);
// key
// [(test,3), (kkk,3)]
//sampleByKey ,
System.out.println(javaPairRDD.sampleByKey(true, fractions).collect());
// [(test,3), (kkk,3)]
//sampleByKeyExtra , , 。
System.out.println(javaPairRDD.sampleByKeyExact(true, fractions).collect());
サンプリングの の は、spark core RDD apiを することができる.これらの なものは ではあまり しにくい.
、pipeは、RDD ストリームのあるステップでpythonやshellスクリプトなどの のスクリプトを することを すJavaRDD dataRDD = sc.parallelize(Arrays.asList("hi", "hello", "how", "are", "you"), 2);
// echo.py
Map env = new HashMap<>();
env.put("env", "envtest");
List commands = new ArrayList<>();
commands.add("python");
// spark , echo.py
commands.add("/Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py");
JavaRDD result = dataRDD.pipe(commands, env);
// : [slave1-hi-envtest, slave1-hello-envtest, slave1-how-envtest, slave1-are-envtest, slave1-you-envtest]
System.out.println(result.collect());
echo.pyの は のとおりです.import sys
import os
#input = "test"
input = sys.stdin
env_keys = os.environ.keys()
env = ""
if "env" in env_keys:
env = os.environ["env"]
for ele in input:
output = "slave1-" + ele.strip('
') + "-" + env
print (output)
input.close
pipeの 、およびどのように するかについて、 :spark core RDD api、この でまたどのように でスクリプトをすべての にコピーする を するかを らかにしました
システム spark:1、[ ]Spark 2.xの Spark Core:https://edu.51cto.com/sd/88429 2、[ ]Spark 2.xの Spark SQL :https://edu.51cto.com/sd/16f3d 3、[ ]Scala シリーズのテーマ:https://edu.51cto.com/sd/8e85b 4、[ ]Spark 2.xの Spark Streamig:https://edu.51cto.com/sd/8c525 5、[ ]Spark 2.x コース:https://edu.51cto.com/sd/ff9a4 6、ScalaからSpark 2.xまでのテーマ:https://edu.51cto.com/sd/d72af