Spark Javaプログラムケース入門+Spark Window環境構築
31863 ワード
spark+windows環境構築
リンクのダウンロードhttp://spark.apache.org/downloads.html
まずscala+windows環境を構築してください
ダウンロード後の環境変数の設定
spark-shellエラーの解決方法を実行します.
hadoop各バージョンダウンロードリンクhttps://archive.apache.org/dist/hadoop/common/
hadoop+windows環境変数の構成
winutils.exeファイルが見つかりません.winutils.exeファイルをダウンロードします.
https://github.com/steveloughran/winutils
winutils.exeファイルをhadoopのbinディレクトリにコピー
spark-shellの再実行
Spark運転モード:
local
standalone
yarn
mesos
Spark実行プロセス:
SparkのApplicationは実行時、まずdriverプログラムでsparkcontextを作成し、これをスケジューリングの総エントリとし、その初期化中にDAGSchedular(Stageスケジューリング)とTaskSchedular(Taskスケジューリング)の2つのモジュールをそれぞれ作成します.DAGSchedularは、Stageベースのスケジューリングモジュールであり、各Spark Jobに対して依存関係を持つ複数のStageタスクフェーズを計算し、各Stageを特定のタスクのセットに分割してTaskSetとして最下位のTaskSchedulerに提出して具体的に実行する.TaskSchedulerは具体的な起動タスクを担当します.
Sparkカーネルのコア用語の解析: Application:SparkContextインスタンスオブジェクトを作成するSparkユーザー.Driverプログラムも含まれます. RDD:Sparkの基本計算ユニットは、パーティション化され、シーケンス化され、可変で、フォールトトレランス機構があり、並列に動作可能なデータの集合である. Job:Sparkのactionに対応し、各actionはマルチタスクの並列計算を含むJobインスタンスに対応します.1つのJobはn個のtransformationと1個のactionを含む. Driver Program:main関数を実行し、SparkContextインスタンスを新規作成するプログラム. 共有変数:Spark Applicationの実行時に、TaskまたはDriverに提供する変数を共有する必要がある場合があります.Sparkは,各ノードにキャッシュできるブロードキャスト変数と,加算のみをサポートし,加算を実現できる累積変数の2種類を提供する. DAGScheduler:Jobに基づいてStageベースのDAGを構築し、TaskschedulerにStageをコミットします.その分割ステージの根拠はRDD間の依存関係である. TaskScheduler:TaskSetをWorker実行にコミットし、各Executorが実行するTaskはここで割り当てられます. Cluster Manager:クラスタリソース管理の外部サービス.Sparkには主にSrandaloneとYarnとMesosの3種類のクラスタリソースマネージャ がある. Worker Node:クラスタ内でApplicationコードを実行できるワークノード.Hadoopのslaveノードに相当します. Executor:ワークノード上でアプリケーションのために開始されたワークプロセスで、タスクの実行を担当し、メモリまたはディスクにデータを格納します.各アプリケーションはWorkerノード上に1つのExecutorしか存在せず、Executor内部でアプリケーションのタスクをマルチスレッドで同時処理します. Task:DriverによってExecutor上の作業ユニットに送られ、Applicationを実行する基本単位です.通常、TaskはSplitのデータを処理します.各Splitは一般的にBlockの速いサイズです. Stage:ジョブは実行前にシステムによって1つまたは複数のタスクに分割され、各タスクはStageと呼ばれます. Persist/Cache:rddのpersistメソッドによりrddのパーティションデータをメモリまたはハードディスクに永続化し、cacheメソッドによりメモリにキャッシュします. Checkpoint:rddを呼び出すcheckpointメソッドは、ハードディスク(HDD)などの外部ストレージにrddを保存します.Sparkがcheckpointメカニズムを導入したのは,永続化したrddデータが失われたり置換されたりする可能性があるためであり,checkpointはこの時点で機能し,再計算を避けることができる.checkpointの作成は、現在のjobが完了した後に別の専門的なjobによって完了します.つまり、checkpointが必要なrddは2回再計算されます.したがってrdd.checkpoint()を使用する場合はrdd.cache()を追加することをお勧めします.これにより、2回目のjobはrddを計算する必要がなくなります. Shuffle:一部のtransformationまたはactionはrddに広い依存を生じさせ、このプロセスは親rddのすべてのパーティションのrecordをshuffleシャッフルしたように、データが分散再編成され、例えばtransformation操作に属するjoinやaction操作に属するreduceなどがshuffleを生じる. ClusterManager:クラスタ上でリソースを取得する外部サービス.現在、 の3種類があります. Standalone:Sparkオリジナルのリソースマネージャで、Masterがリソースの割り当てを担当します. Apache Mesos:Hadoop MapReduceとの互換性に優れたリソーススケジューリングフレームワーク. Hadoop Yarn:主にYarnのResourceManagerを指します.
Spark快速原因:統一RDD抽象と操作 メモリベースの反復計算 DAG 優れたフォールトトレランスメカニズム RDD:フレックス分散データセットResilient Distributed Data set.SparkではRDDの1つが分散オブジェクトの集合である.
RDDを作成する3つの方法:外部データセットのロード
ドライバでのコレクションの平行化
新しいRDDを1つのRDDで作成
RDDの2種類の操作:変換Transformation(元のRDDを新しいRDDに構築し、(不活性、lineage血統)) 動作Action(RDDにより計算する結果をドライバに返すかhdfsなどの外部ストレージシステムに保存する) .
注意:SparkのRDDは、デフォルトでは実行するたびに計算を再実行します.複数のアクションで再利用する必要がある場合は、その永続化メソッドRDD.persist()を使用して、最初の計算後、SparkはRDDコンテンツをメモリに格納します.
不活性:RDDを変換する方法は不活性であり、動作で使用する場合にのみlineageで計算されます.
Lineage:RDDが新しいRDDに変換されると,Sparkは記録を追跡し,異なるRDD間の依存関係を設定し,この関係が血統図である.この情報は、各RDDを必要に応じて計算し、持続的なRDD損失のデータの一部を復元するために使用される.注意:新しいアクションを呼び出すたびに、RDD全体が最初から計算を開始する必要があります.効率を向上させるために、ユーザーはこれらの中間結果を永続化することができます.
Spark Streamingリアルタイムストリーム処理フレームワーク
計算フロー:Spark Streamingは、フロー計算を一連の短いバッチ処理ジョブに分解する.すなわち、Spark Streamingの入力データをbatch size(例えば1秒)に従って一段一段のデータ(DStream)に分割し、各一段のデータをRDDに変換し、Spark StreamingのチームDStreamのTransformation操作をSparkのRDDに対するTransformation操作に変更する.
1つのRDDには複数のパーティションがあり、各パーティションは1つのdatasetフラグメントである.
狭依存Narrow Dependency:親RDDの各パーティションは、最大1つの子RDDの1つのパーティションでのみ使用できます.
ワイド依存Wide Dependency:親RDDの各パーティションは、布団RDDの複数のパーティションによって使用することができる.
eclipseでローカルモードでSpark API実戦
リンクのダウンロードhttp://spark.apache.org/downloads.html
まずscala+windows環境を構築してください
ダウンロード後の環境変数の設定
spark-shellエラーの解決方法を実行します.
hadoop各バージョンダウンロードリンクhttps://archive.apache.org/dist/hadoop/common/
hadoop+windows環境変数の構成
winutils.exeファイルが見つかりません.winutils.exeファイルをダウンロードします.
https://github.com/steveloughran/winutils
winutils.exeファイルをhadoopのbinディレクトリにコピー
spark-shellの再実行
Spark運転モード:
local
standalone
yarn
mesos
Spark実行プロセス:
SparkのApplicationは実行時、まずdriverプログラムでsparkcontextを作成し、これをスケジューリングの総エントリとし、その初期化中にDAGSchedular(Stageスケジューリング)とTaskSchedular(Taskスケジューリング)の2つのモジュールをそれぞれ作成します.DAGSchedularは、Stageベースのスケジューリングモジュールであり、各Spark Jobに対して依存関係を持つ複数のStageタスクフェーズを計算し、各Stageを特定のタスクのセットに分割してTaskSetとして最下位のTaskSchedulerに提出して具体的に実行する.TaskSchedulerは具体的な起動タスクを担当します.
Sparkカーネルのコア用語の解析:
Spark快速原因:
RDDを作成する3つの方法:外部データセットのロード
ドライバでのコレクションの平行化
新しいRDDを1つのRDDで作成
RDDの2種類の操作:
注意:SparkのRDDは、デフォルトでは実行するたびに計算を再実行します.複数のアクションで再利用する必要がある場合は、その永続化メソッドRDD.persist()を使用して、最初の計算後、SparkはRDDコンテンツをメモリに格納します.
不活性:RDDを変換する方法は不活性であり、動作で使用する場合にのみlineageで計算されます.
Lineage:RDDが新しいRDDに変換されると,Sparkは記録を追跡し,異なるRDD間の依存関係を設定し,この関係が血統図である.この情報は、各RDDを必要に応じて計算し、持続的なRDD損失のデータの一部を復元するために使用される.注意:新しいアクションを呼び出すたびに、RDD全体が最初から計算を開始する必要があります.効率を向上させるために、ユーザーはこれらの中間結果を永続化することができます.
Spark Streamingリアルタイムストリーム処理フレームワーク
計算フロー:Spark Streamingは、フロー計算を一連の短いバッチ処理ジョブに分解する.すなわち、Spark Streamingの入力データをbatch size(例えば1秒)に従って一段一段のデータ(DStream)に分割し、各一段のデータをRDDに変換し、Spark StreamingのチームDStreamのTransformation操作をSparkのRDDに対するTransformation操作に変更する.
1つのRDDには複数のパーティションがあり、各パーティションは1つのdatasetフラグメントである.
狭依存Narrow Dependency:親RDDの各パーティションは、最大1つの子RDDの1つのパーティションでのみ使用できます.
ワイド依存Wide Dependency:親RDDの各パーティションは、布団RDDの複数のパーティションによって使用することができる.
eclipseでローカルモードでSpark API実戦
// RDD RDD
private static void map() {
// spark
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// java7 try-with-resources javasparkcontext
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
// RDD :
// JavaRDD javaRDD = sc.textFile("src/com/wf/main/map.txt");
// RDD :
JavaRDD javaRDD = sc.parallelize(Arrays.asList(new String[] { "1", "2", "22", "2" }));
// Function:String javaRDD , Integer
JavaRDD map = javaRDD.map(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String arg0) throws Exception {
return Integer.parseInt(arg0) + 1;
}
});
// map :2->3->23->3->
map.foreach(new VoidFunction() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer arg0) throws Exception {
System.out.print(arg0 + "->");
}
});
}
}
// RDD :
private static void reduce() {
SparkConf conf = new SparkConf().setMaster("local").setAppName("reduce");
// java7 try-with-resources javasparkcontext
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// Function2 Integer javaRDD
Integer reduce = javaRDD.reduce(new Function2() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer arg0, Integer arg1) throws Exception {
return arg0 + arg1;
}
});
System.out.println(reduce);
}
}
// RDD , true , false , RDD
private static void filter() {
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
JavaRDD filterRDD = javaRDD.filter(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Integer arg0) throws Exception {
return arg0 % 2 == 0;
}
});
filterRDD.collect().forEach(new Consumer() {
@Override
public void accept(Integer t) {
System.out.println(t);
}
});
}
}
// RDD , map
private static void countByValue() {
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4));
Map countByValue = javaRDD.countByValue();
/**
* 5 - 1
* 1 - 2
* 2 - 1
* 3 - 1
* 4 - 1
*
*/
for (Map.Entry entry : countByValue.entrySet()) {
System.out.println(entry.getKey() + " - " + entry.getValue());
}
}
}
// map, , map
private static void flatMap() {
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
JavaRDD flatMapRDD = javaRDD.flatMap(new FlatMapFunction() {
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Integer arg0) throws Exception {
List list = new ArrayList();
list.add(arg0 + 1 + " abc");
return list.iterator();
}
});
//[6 abc, 2 abc, 4 abc, 3 abc, 2 abc, 5 abc]
System.out.println(flatMapRDD.collect());
sc.close();
}
private static void flatMapToDouble() {
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
JavaDoubleRDD flatMapToDoubleRDD = javaRDD.flatMapToDouble(new DoubleFlatMapFunction() {
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Integer arg0) throws Exception {
Set set = new HashSet();
set.add(Double.parseDouble(String.valueOf(arg0 + 5)));
return set.iterator();
}
});
//[10.0, 6.0, 8.0, 7.0, 6.0, 9.0]
System.out.println(flatMapToDoubleRDD.collect());
sc.close();
}
private static void groupBy() {
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
JavaPairRDD> groupByPairRDD = javaRDD.groupBy(new Function() {
private static final long serialVersionUID = 1L;
@Override
public String call(Integer arg0) throws Exception {
return arg0 + 1 + "";
}
});
List>> collect = groupByPairRDD.collect();
/*
* 4 : 3
* 6 : 5
* 2 : 1 1
* 5 : 4
* 3 : 2
*/
for (Tuple2> tuple2 : collect) {
System.out.print(tuple2._1 + " : ");
Iterable iterable = tuple2._2;
for (Integer integer : iterable) {
System.out.print(integer + " ");
}
System.out.println();
}
sc.close();
}
private static void flatMapToPair() {
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
JavaPairRDD flatMapToPairRDD = javaRDD.flatMapToPair(new PairFlatMapFunction() {
private static final long serialVersionUID = 1L;
@Override
public Iterator> call(Integer arg0) throws Exception {
Set> set = new HashSet>();
Tuple2 tuple2 = new Tuple2(Float.parseFloat(arg0 + 1 + ""), arg0 + 2 + "");
set.add(tuple2);
return set.iterator();
}
});
/*
* 6.0 7
* 2.0 3
* 4.0 5
* 3.0 4
* 2.0 3
* 5.0 6
*/
flatMapToPairRDD.collect().forEach(new Consumer>() {
@Override
public void accept(Tuple2 t) {
System.out.println(t._1 + " " + t._2);
}
});
sc.close();
}
// RDD RDD
private static void intersection() {
System.out.println("intersection:");
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD1 = sc.parallelize(Arrays.asList(5, 1, 3, 5, 1, 4), 2);
JavaRDD javaRDD2 = sc.parallelize(Arrays.asList(5, 2, 3, 1, 5, 5, 6), 4);
JavaRDD intersectionRDD = javaRDD1.intersection(javaRDD2); //
//[1, 5, 3]
System.out.println(intersectionRDD.collect());
sc.close();
}
// RDD RDD
private static void distinct() {
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 5, 1, 4), 2);
JavaRDD distinct = javaRDD.distinct();
distinct.foreach(new VoidFunction() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer arg0) throws Exception {
System.out.print(arg0 + " ");
}
});
sc.close();
}
private static void takeOrdered() {
SparkConf conf = new SparkConf().setAppName("takeOrdered").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 5, 1, 4), 2);
//takeOrdered 1: , 2:
List takeOrderedList = javaRDD.takeOrdered(2, new TakeOrderedComparator());
takeOrderedList.forEach(new Consumer() {
@Override
public void accept(Integer t) {
System.out.print(t + " ");
}
});
sc.close();
}
private static class TakeOrderedComparator implements Comparator,
Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
}
private static void sortBy() {
SparkConf conf = new SparkConf().setAppName("sortBy").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
final Random r = new Random(100);
JavaRDD mapRDD = javaRDD.map(new Function() {
private static final long serialVersionUID = 1L;
@Override
public String call(Integer arg0) throws Exception {
return arg0.toString() + "_" + r.nextInt(100);
}
});
//sortBy : : ascending , true : numPartitions ,
JavaRDD sortByRDD = mapRDD.sortBy(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Object call(String arg0) throws Exception {
return arg0.split("_")[1];
}
}, false, 3);
System.out.println(sortByRDD.collect());
sc.close();
}
// RDD , sql union
private static void union() {
System.out.println("union:");
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
JavaRDD unionRDD = javaRDD.union(javaRDD);// rdd , partition
System.out.println(unionRDD.collect());
sc.close();
}
private static void zipWithIndex() {
System.out.println("zipWithIndex:");
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
JavaPairRDD zipWithIndexPairRDD = javaRDD.zipWithIndex();
// [(5,0),(1,1),(1,2),(4,3),(4,4),(2,5),(8,6)]
System.out.println(zipWithIndexPairRDD.collect());
}
}
private static void zipWithUniqueId() {
System.out.println("zipWithUniqueId:");
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
//javaRDD :
// 1:5 1
// 2:1 4
// 3:4 2 3
JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
//zipWithUniqueIdPairRDD key RDD , value = + *
JavaPairRDD zipWithUniqueIdPairRDD = javaRDD.zipWithUniqueId();
// [(5,0),(1,3),(1,1),(4,4),(4,2),(2,5),(2,8)]
// :
// 0 + 3 * 0 = 0
// 0 + 3 * 1 = 3
// 1 + 3 * 0 = 1
// 1 + 3 * 1 = 4
// 2 + 3 * 0 = 2
// 2 + 3 * 1 = 5
// 2 + 3 * 2 = 8
System.out.println(zipWithUniqueIdPairRDD.collect());
}
}
private static void mapToPair() {
System.out.println("mapToPair:");
SparkConf conf = new SparkConf().setAppName("mapToPair").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2));
//PairFunction
// Integer: RDD
// Integer: JavaPairRDD key
// String: JavaPairRDD value
JavaPairRDD mapToPairRDD = javaRDD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Integer arg0) throws Exception {
return new Tuple2(arg0, "1");
}
});
// [(1,1), (2,1), (4,1), (3,1), (5,1), (6,1), (7,1), (1,1), (2,1)]
System.out.println(mapToPairRDD.collect());
}
}
// pair key group By
private static void groupByKey() {
System.out.println("groupByKey:");
SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 1, 3, 6, 3));
JavaPairRDD javaPairRDD = javaRDD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(Integer a)
throws Exception {
return new Tuple2(a, 1);
}
});
int numPartitions = 2;
JavaPairRDD> groupByKeyRDD = javaPairRDD.groupByKey(numPartitions);
// [(4,[1]), (6,[1]), (2,[1]), (1,[1, 1]), (3,[1, 1])]
System.out.println(groupByKeyRDD.collect());
// partitioner
JavaPairRDD> groupByKeyRDD2 = javaPairRDD.groupByKey(new Partitioner() {
private static final long serialVersionUID = 1L;
// partition
@Override
public int numPartitions() {
return 10;
}
// partition
@Override
public int getPartition(Object o) {
return o.toString().hashCode() % numPartitions();
}
});
//[(2,[1]), (3,[1, 1]), (4,[1]), (6,[1]), (1,[1, 1])]
System.out.println(groupByKeyRDD2.collect());
}
}
private static void coalesce() {//
System.out.println("coalesce:");
SparkConf conf = new SparkConf().setAppName("coalesce").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 5, 6, 7));
int numPartitions = 2;
JavaRDD coalesceRDD = javaRDD.coalesce(numPartitions); // shuffle false
System.out.println(coalesceRDD.collect());// [1, 2, 4, 3, 5, 6, 7]
JavaRDD coalesceRDD2 = javaRDD.coalesce(numPartitions, true);
System.out.println(coalesceRDD2.collect());// [1, 4, 5, 7, 2, 3, 6]
}
}
private static void repartition() {//
SparkConf conf = new SparkConf().setAppName("repartition").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 5, 6, 7));
// coalesce(numPartitions,shuffle=true)
int numPartitions = 2;
JavaRDD coalesceRDD = javaRDD.repartition(numPartitions); // shuffle false
System.out.println(coalesceRDD.collect());// [1, 2, 4, 3, 5, 6, 7]
}
}
private static void mapPartitionsWithIndex() {
SparkConf conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList("5", "1", "1", "3", "6", "2", "2"), 3);
boolean preservesPartitioning = false;
JavaRDD mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Integer v1, Iterator v2) throws Exception {
//v1 v2
LinkedList linkedList = new LinkedList();
while (v2.hasNext()) {
linkedList.add(v1 + "=" + v2.next());
}
return linkedList.iterator();
}
}, preservesPartitioning);
// [0=5, 0=1, 1=1, 1=3, 2=6, 2=2, 2=2]
System.out.println(mapPartitionsWithIndexRDD.collect());
}
}
private static void countByKey() {
System.out.println("countByKey:");
SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
JavaRDD javaRDD = sc.parallelize(Arrays.asList("5", "1", "1", "3", "6", "2", "2"), 5);
boolean preservesPartitioning = false;
JavaRDD mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Integer v1, Iterator v2) throws Exception {
//v1 v2
LinkedList linkedList = new LinkedList();
while (v2.hasNext()) {
linkedList.add(v1 + "=" + v2.next());
}
return linkedList.iterator();
}
}, preservesPartitioning);
// [0=5, 1=1, 2=1, 2=3, 3=6, 4=2, 4=2]
System.out.println(mapPartitionsWithIndexRDD.collect());
JavaPairRDD mapToPairRDD = javaRDD.mapToPair(new PairFunction() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(String s) throws Exception {
return new Tuple2(s, s);
}
});
// [(5,5), (1,1), (1,1),(3,3), (6,6), (2,2), (2,2)]
System.out.println(mapToPairRDD.collect());
// {5=1, 6=1, 1=2, 2=2, 3=1}
System.out.println(mapToPairRDD.countByKey());
}
}
// pair value
private static void mapValues() {
System.out.println("mapValues:");
try (JavaSparkContext sc = new JavaSparkContext("local", "mapValues")) {
List> list = new ArrayList>();
list.add(new Tuple2(1, "once"));
list.add(new Tuple2(3, "third"));
list.add(new Tuple2(2, "twice"));
JavaPairRDD rdd = sc.parallelizePairs(list);
JavaPairRDD mapValuesPairRDD = rdd.mapValues(new Function() {
private static final long serialVersionUID = 1L;
@Override
public Object call(String arg0) throws Exception {
return arg0 + "..V";
}
});
// [(1,once..V), (3,third..V), (2,twice..V)]
System.out.println(mapValuesPairRDD.collect());
}
}
// sql join , RDD key join
private static void join() {
System.out.println("join:");
try (JavaSparkContext sc = new JavaSparkContext("local", "join")) {
List> list1 = new ArrayList>();
list1.add(new Tuple2("a", "11"));
list1.add(new Tuple2("b", "22"));
list1.add(new Tuple2("a", "13"));
list1.add(new Tuple2("c", "4"));
JavaPairRDD rdd1 = sc.parallelizePairs(list1);
List> list2 = new ArrayList>();
list2.add(new Tuple2("a", "11"));
list2.add(new Tuple2("b", "22"));
list2.add(new Tuple2("a", "13"));
list2.add(new Tuple2("c", "4"));
JavaPairRDD rdd2 = sc.parallelizePairs(list2);
JavaPairRDD> joinPairRDD = rdd1.join(rdd2);
// [(a,(11,11)), (a,(11,13)), (a,(13,11)), (a,(13,13)), (b,(22,22)), (c,(4,4))]
System.out.println(joinPairRDD.collect());
}
}
// RDD key groupBy, RDD value groupBy
private static void cogroup() {
try (JavaSparkContext sc = new JavaSparkContext("local", "cogroup")) {
List> list1 = new ArrayList>();
list1.add(new Tuple2("a", "11"));
list1.add(new Tuple2("b", "22"));
list1.add(new Tuple2("a", "13"));
list1.add(new Tuple2("d", "4"));
JavaPairRDD rdd1 = sc.parallelizePairs(list1);
List> list2 = new ArrayList>();
list2.add(new Tuple2("a", "11"));
list2.add(new Tuple2("b", "22"));
list2.add(new Tuple2("a", "13"));
list2.add(new Tuple2("c", "4"));
list2.add(new Tuple2("e", "4"));
JavaPairRDD rdd2 = sc.parallelizePairs(list2);
JavaPairRDD, Iterable