Spark Javaプログラムケース入門+Spark Window環境構築

31863 ワード

spark+windows環境構築
リンクのダウンロードhttp://spark.apache.org/downloads.html
Spark Java程序案例入门+Spark Window环境搭建_第1张图片
まずscala+windows環境を構築してください
ダウンロード後の環境変数の設定
Spark Java程序案例入门+Spark Window环境搭建_第2张图片
Spark Java程序案例入门+Spark Window环境搭建_第3张图片
Spark Java程序案例入门+Spark Window环境搭建_第4张图片
Spark Java程序案例入门+Spark Window环境搭建_第5张图片
spark-shellエラーの解決方法を実行します.
hadoop各バージョンダウンロードリンクhttps://archive.apache.org/dist/hadoop/common/
hadoop+windows環境変数の構成
Spark Java程序案例入门+Spark Window环境搭建_第6张图片
Spark Java程序案例入门+Spark Window环境搭建_第7张图片
winutils.exeファイルが見つかりません.winutils.exeファイルをダウンロードします.
https://github.com/steveloughran/winutils
Spark Java程序案例入门+Spark Window环境搭建_第8张图片
winutils.exeファイルをhadoopのbinディレクトリにコピー
Spark Java程序案例入门+Spark Window环境搭建_第9张图片
spark-shellの再実行
Spark Java程序案例入门+Spark Window环境搭建_第10张图片
Spark運転モード:
local
standalone
yarn
mesos
Spark実行プロセス:
SparkのApplicationは実行時、まずdriverプログラムでsparkcontextを作成し、これをスケジューリングの総エントリとし、その初期化中にDAGSchedular(Stageスケジューリング)とTaskSchedular(Taskスケジューリング)の2つのモジュールをそれぞれ作成します.DAGSchedularは、Stageベースのスケジューリングモジュールであり、各Spark Jobに対して依存関係を持つ複数のStageタスクフェーズを計算し、各Stageを特定のタスクのセットに分割してTaskSetとして最下位のTaskSchedulerに提出して具体的に実行する.TaskSchedulerは具体的な起動タスクを担当します.
Sparkカーネルのコア用語の解析:
  • Application:SparkContextインスタンスオブジェクトを作成するSparkユーザー.Driverプログラムも含まれます.
  • RDD:Sparkの基本計算ユニットは、パーティション化され、シーケンス化され、可変で、フォールトトレランス機構があり、並列に動作可能なデータの集合である.
  • Job:Sparkのactionに対応し、各actionはマルチタスクの並列計算を含むJobインスタンスに対応します.1つのJobはn個のtransformationと1個のactionを含む.
  • Driver Program:main関数を実行し、SparkContextインスタンスを新規作成するプログラム.
  • 共有変数:Spark Applicationの実行時に、TaskまたはDriverに提供する変数を共有する必要がある場合があります.Sparkは,各ノードにキャッシュできるブロードキャスト変数と,加算のみをサポートし,加算を実現できる累積変数の2種類を提供する.
  • DAGScheduler:Jobに基づいてStageベースのDAGを構築し、TaskschedulerにStageをコミットします.その分割ステージの根拠はRDD間の依存関係である.
  • TaskScheduler:TaskSetをWorker実行にコミットし、各Executorが実行するTaskはここで割り当てられます.
  • Cluster Manager:クラスタリソース管理の外部サービス.Sparkには主にSrandaloneとYarnとMesosの3種類のクラスタリソースマネージャ
  • がある.
  • Worker Node:クラスタ内でApplicationコードを実行できるワークノード.Hadoopのslaveノードに相当します.
  • Executor:ワークノード上でアプリケーションのために開始されたワークプロセスで、タスクの実行を担当し、メモリまたはディスクにデータを格納します.各アプリケーションはWorkerノード上に1つのExecutorしか存在せず、Executor内部でアプリケーションのタスクをマルチスレッドで同時処理します.
  • Task:DriverによってExecutor上の作業ユニットに送られ、Applicationを実行する基本単位です.通常、TaskはSplitのデータを処理します.各Splitは一般的にBlockの速いサイズです.
  • Stage:ジョブは実行前にシステムによって1つまたは複数のタスクに分割され、各タスクはStageと呼ばれます.
  • Persist/Cache:rddのpersistメソッドによりrddのパーティションデータをメモリまたはハードディスクに永続化し、cacheメソッドによりメモリにキャッシュします.
  • Checkpoint:rddを呼び出すcheckpointメソッドは、ハードディスク(HDD)などの外部ストレージにrddを保存します.Sparkがcheckpointメカニズムを導入したのは,永続化したrddデータが失われたり置換されたりする可能性があるためであり,checkpointはこの時点で機能し,再計算を避けることができる.checkpointの作成は、現在のjobが完了した後に別の専門的なjobによって完了します.つまり、checkpointが必要なrddは2回再計算されます.したがってrdd.checkpoint()を使用する場合はrdd.cache()を追加することをお勧めします.これにより、2回目のjobはrddを計算する必要がなくなります.
  • Shuffle:一部のtransformationまたはactionはrddに広い依存を生じさせ、このプロセスは親rddのすべてのパーティションのrecordをshuffleシャッフルしたように、データが分散再編成され、例えばtransformation操作に属するjoinやaction操作に属するreduceなどがshuffleを生じる.
  • ClusterManager:クラスタ上でリソースを取得する外部サービス.現在、
  • の3種類があります.
  • Standalone:Sparkオリジナルのリソースマネージャで、Masterがリソースの割り当てを担当します.
  • Apache Mesos:Hadoop MapReduceとの互換性に優れたリソーススケジューリングフレームワーク.
  • Hadoop Yarn:主にYarnのResourceManagerを指します.

  • Spark快速原因:
  • 統一RDD抽象と操作
  • メモリベースの反復計算
  • DAG
  • 優れたフォールトトレランスメカニズム
  • RDD:フレックス分散データセットResilient Distributed Data set.SparkではRDDの1つが分散オブジェクトの集合である.
    RDDを作成する3つの方法:外部データセットのロード
     ドライバでのコレクションの平行化
    新しいRDDを1つのRDDで作成
    RDDの2種類の操作:
  • 変換Transformation(元のRDDを新しいRDDに構築し、(不活性、lineage血統))
  • 動作Action(RDDにより計算する結果をドライバに返すかhdfsなどの外部ストレージシステムに保存する)
  • .
    注意:SparkのRDDは、デフォルトでは実行するたびに計算を再実行します.複数のアクションで再利用する必要がある場合は、その永続化メソッドRDD.persist()を使用して、最初の計算後、SparkはRDDコンテンツをメモリに格納します.
    不活性:RDDを変換する方法は不活性であり、動作で使用する場合にのみlineageで計算されます.
    Lineage:RDDが新しいRDDに変換されると,Sparkは記録を追跡し,異なるRDD間の依存関係を設定し,この関係が血統図である.この情報は、各RDDを必要に応じて計算し、持続的なRDD損失のデータの一部を復元するために使用される.注意:新しいアクションを呼び出すたびに、RDD全体が最初から計算を開始する必要があります.効率を向上させるために、ユーザーはこれらの中間結果を永続化することができます.
    Spark Streamingリアルタイムストリーム処理フレームワーク
    計算フロー:Spark Streamingは、フロー計算を一連の短いバッチ処理ジョブに分解する.すなわち、Spark Streamingの入力データをbatch size(例えば1秒)に従って一段一段のデータ(DStream)に分割し、各一段のデータをRDDに変換し、Spark StreamingのチームDStreamのTransformation操作をSparkのRDDに対するTransformation操作に変更する.
    1つのRDDには複数のパーティションがあり、各パーティションは1つのdatasetフラグメントである.
    狭依存Narrow Dependency:親RDDの各パーティションは、最大1つの子RDDの1つのパーティションでのみ使用できます.
    ワイド依存Wide Dependency:親RDDの各パーティションは、布団RDDの複数のパーティションによって使用することができる.
    eclipseでローカルモードでSpark API実戦
    //     RDD          RDD
    private static void map() {
    		//       spark
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		//   java7 try-with-resources          javasparkcontext
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			//   RDD    :       
    			// JavaRDD javaRDD = sc.textFile("src/com/wf/main/map.txt");
    			//   RDD    :     
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(new String[] { "1", "2", "22", "2" }));
    			// Function:String  javaRDD     , Integer          
    			JavaRDD map = javaRDD.map(new Function() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Integer call(String arg0) throws Exception {
    					return Integer.parseInt(arg0) + 1;
    				}
    			});
    			//   map   :2->3->23->3->
    			map.foreach(new VoidFunction() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public void call(Integer arg0) throws Exception {
    					System.out.print(arg0 + "->");
    				}
    			});
    		}
    	}
    
    
    //   RDD           :        
    private static void reduce() {
    		SparkConf conf = new SparkConf().setMaster("local").setAppName("reduce");
    		//   java7 try-with-resources      javasparkcontext
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    			// Function2 Integer javaRDD     
    			Integer reduce = javaRDD.reduce(new Function2() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Integer call(Integer arg0, Integer arg1) throws Exception {
    					return arg0 + arg1;
    				}
    			});
    			System.out.println(reduce);
    		}
    	}
    
    //  RDD          ,  true   ,  false   ,      RDD
    private static void filter() {
    		SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
    			JavaRDD filterRDD = javaRDD.filter(new Function() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Boolean call(Integer arg0) throws Exception {
    					return arg0 % 2 == 0;
    				}
    			});
    			filterRDD.collect().forEach(new Consumer() {
    				@Override
    				public void accept(Integer t) {
    					System.out.println(t);
    				}
    			});
    		}
    	}
    
    
    // RDD        ,    map
    private static void countByValue() {
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4));
    			Map countByValue = javaRDD.countByValue();
    			/**
    			 *  5 - 1
    			 *	1 - 2
    			 *	2 - 1
    			 *	3 - 1
    			 *	4 - 1
    			 * 
    			 */
    			for (Map.Entry entry : countByValue.entrySet()) {
    				System.out.println(entry.getKey() + " - " + entry.getValue());
    			}
    		}
    	}
    
    //    map,                 , map      
    private static void flatMap() {
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
    		JavaRDD flatMapRDD = javaRDD.flatMap(new FlatMapFunction() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public Iterator call(Integer arg0) throws Exception {
    				List list = new ArrayList();
    				list.add(arg0 + 1 + " abc");
    				return list.iterator();
    			}
    		});
    		//[6 abc, 2 abc, 4 abc, 3 abc, 2 abc, 5 abc]
    		System.out.println(flatMapRDD.collect());
    		sc.close();
    	}
    
    
    
    private static void flatMapToDouble() {
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
    		JavaDoubleRDD flatMapToDoubleRDD = javaRDD.flatMapToDouble(new DoubleFlatMapFunction() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public Iterator call(Integer arg0) throws Exception {
    				Set set = new HashSet();
    				set.add(Double.parseDouble(String.valueOf(arg0 + 5)));
    				return set.iterator();
    			}
    		});
    		//[10.0, 6.0, 8.0, 7.0, 6.0, 9.0]
    		System.out.println(flatMapToDoubleRDD.collect());
    		sc.close();
    	}
    
    private static void groupBy() {
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
    		JavaPairRDD> groupByPairRDD = javaRDD.groupBy(new Function() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public String call(Integer arg0) throws Exception {
    				return arg0 + 1 + "";
    			}
    		});
    		List>> collect = groupByPairRDD.collect();
    		/*
    		 *  4 : 3 
    		 *	6 : 5 
    		 *	2 : 1 1 
    		 *	5 : 4 
    		 *	3 : 2 
    		 */
    		for (Tuple2> tuple2 : collect) {
    			System.out.print(tuple2._1 + " : ");
    			Iterable iterable = tuple2._2;
    			for (Integer integer : iterable) {
    				System.out.print(integer + " ");
    			}
    			System.out.println();
    		}
    		sc.close();
    	}
    
    
    private static void flatMapToPair() {
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 2, 1, 4), 2);
    		JavaPairRDD flatMapToPairRDD = javaRDD.flatMapToPair(new PairFlatMapFunction() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public Iterator> call(Integer arg0) throws Exception {
    				Set> set = new HashSet>();
    				Tuple2 tuple2 = new Tuple2(Float.parseFloat(arg0 + 1 + ""), arg0 + 2 + "");
    				set.add(tuple2);
    				return set.iterator();
    			}
    		});
    		/*
    		 * 6.0 7
    		 *	2.0 3
    		 *	4.0 5
    		 *	3.0 4
    		 *	2.0 3
    		 *	5.0 6
    		 */
    		flatMapToPairRDD.collect().forEach(new Consumer>() {
    			@Override
    			public void accept(Tuple2 t) {
    				System.out.println(t._1 + " " + t._2);
    			}
    		});
    		sc.close();
    	}
    
    
    //  RDD         RDD
    private static void intersection() {
    		System.out.println("intersection:");
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD1 = sc.parallelize(Arrays.asList(5, 1, 3, 5, 1, 4), 2);
    		JavaRDD javaRDD2 = sc.parallelize(Arrays.asList(5, 2, 3, 1, 5, 5, 6), 4);
    		JavaRDD intersectionRDD = javaRDD1.intersection(javaRDD2); //   
    		//[1, 5, 3]
    		System.out.println(intersectionRDD.collect());
    		sc.close();
    	}
    
    
    	//  RDD         RDD
    private static void distinct() {
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 5, 1, 4), 2);
    		JavaRDD distinct = javaRDD.distinct();
    		distinct.foreach(new VoidFunction() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public void call(Integer arg0) throws Exception {
    				System.out.print(arg0 + " ");
    			}
    		});
    		sc.close();
    	}
    
    
    private static void takeOrdered() {
    		SparkConf conf = new SparkConf().setAppName("takeOrdered").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 3, 5, 1, 4), 2);
    		//takeOrdered   1:      ,   2:    
    		List takeOrderedList = javaRDD.takeOrdered(2, new TakeOrderedComparator());
    		takeOrderedList.forEach(new Consumer() {
    			@Override
    			public void accept(Integer t) {
    				System.out.print(t + " ");
    			}
    		});
    		sc.close();
    	}
    
    private static class TakeOrderedComparator implements Comparator,
    			Serializable {
    		private static final long serialVersionUID = 1L;
    		@Override
    		public int compare(Integer o1, Integer o2) {
    			return o1.compareTo(o2);
    		}
    	}
    
    private static void sortBy() {
    		SparkConf conf = new SparkConf().setAppName("sortBy").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
    		final Random r = new Random(100);
    		JavaRDD mapRDD = javaRDD.map(new Function() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public String call(Integer arg0) throws Exception {
    				return arg0.toString() + "_" + r.nextInt(100);
    			}
    		});
    		//sortBy    :             : ascending    ,  true      : numPartitions    ,         
    		JavaRDD sortByRDD = mapRDD.sortBy(new Function() {
    			private static final long serialVersionUID = 1L;
    			@Override
    			public Object call(String arg0) throws Exception {
    				return arg0.split("_")[1];
    			}
    		}, false, 3);
    		System.out.println(sortByRDD.collect());
    		sc.close();
    	}
    
    //    RDD  , sql  union  
    private static void union() {
    		System.out.println("union:");
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
    		JavaRDD unionRDD = javaRDD.union(javaRDD);//    rdd       ,   partition    
    		System.out.println(unionRDD.collect());
    		sc.close();
    	}
    
    private static void zipWithIndex() {
    		System.out.println("zipWithIndex:");
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
    			JavaPairRDD zipWithIndexPairRDD = javaRDD.zipWithIndex();
    			// [(5,0),(1,1),(1,2),(4,3),(4,4),(2,5),(8,6)]
    			System.out.println(zipWithIndexPairRDD.collect());
    		}
    	}
    
    
    private static void zipWithUniqueId() {
    		System.out.println("zipWithUniqueId:");
    		SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			//javaRDD     :
    			//  1:5 1  
    			//  2:1 4
    			//  3:4 2 3
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(5, 1, 1, 4, 4, 2, 2), 3);
    			//zipWithUniqueIdPairRDD key    RDD   , value =        +     *            
    			JavaPairRDD zipWithUniqueIdPairRDD = javaRDD.zipWithUniqueId();
    			// [(5,0),(1,3),(1,1),(4,4),(4,2),(2,5),(2,8)]
    			//    :
    			// 0 + 3 * 0 = 0
    			// 0 + 3 * 1 = 3
    			
    			// 1 + 3 * 0 = 1
    			// 1 + 3 * 1 = 4
    			
    			// 2 + 3 * 0 = 2
    			// 2 + 3 * 1 = 5
    			// 2 + 3 * 2 = 8
    			System.out.println(zipWithUniqueIdPairRDD.collect());
    		}
    	}
    
    private static void mapToPair() {
    		System.out.println("mapToPair:");
    		SparkConf conf = new SparkConf().setAppName("mapToPair").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 5, 6, 7, 1, 2));
    			//PairFunction 
    			//     Integer:    RDD     
    			//     Integer:    JavaPairRDD key  
    			//     String:    JavaPairRDD value  
    			JavaPairRDD mapToPairRDD = javaRDD.mapToPair(new PairFunction() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Tuple2 call(Integer arg0) throws Exception {
    					return new Tuple2(arg0, "1");
    				}
    			});
    			// [(1,1), (2,1), (4,1), (3,1), (5,1), (6,1), (7,1), (1,1), (2,1)]
    			System.out.println(mapToPairRDD.collect());
    		}
    	}
    
    //  pair  key  group By  
    private static void groupByKey() {
    		System.out.println("groupByKey:");
    		SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 1, 3, 6, 3));
    			JavaPairRDD javaPairRDD = javaRDD.mapToPair(new PairFunction() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Tuple2 call(Integer a)
    						throws Exception {
    					return new Tuple2(a, 1);	
    				}
    			});
    			int numPartitions = 2;
    			JavaPairRDD> groupByKeyRDD = javaPairRDD.groupByKey(numPartitions);
    			// [(4,[1]), (6,[1]), (2,[1]), (1,[1, 1]), (3,[1, 1])]
    			System.out.println(groupByKeyRDD.collect());
    			//    partitioner
    			JavaPairRDD> groupByKeyRDD2 = javaPairRDD.groupByKey(new Partitioner() {
    				private static final long serialVersionUID = 1L;
    				// partition  
    				@Override
    				public int numPartitions() {
    					return 10;
    				}
    				// partition  
    				@Override
    				public int getPartition(Object o) {
    					return o.toString().hashCode() % numPartitions();
    				}
    			});
    			//[(2,[1]), (3,[1, 1]), (4,[1]), (6,[1]), (1,[1, 1])]
    			System.out.println(groupByKeyRDD2.collect());
    		}
    	}
    
    private static void coalesce() {//   
    		System.out.println("coalesce:");
    		SparkConf conf = new SparkConf().setAppName("coalesce").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 5, 6, 7));
    			int numPartitions = 2;
    			JavaRDD coalesceRDD = javaRDD.coalesce(numPartitions); // shuffle   false
    			System.out.println(coalesceRDD.collect());// [1, 2, 4, 3, 5, 6, 7]
    			JavaRDD coalesceRDD2 = javaRDD.coalesce(numPartitions, true);
    			System.out.println(coalesceRDD2.collect());// [1, 4, 5, 7, 2, 3, 6]
    		}
    	}
    
    private static void repartition() {//     
    		SparkConf conf = new SparkConf().setAppName("repartition").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList(1, 2, 4, 3, 5, 6, 7));
    			//   coalesce(numPartitions,shuffle=true)
    			int numPartitions = 2;
    			JavaRDD coalesceRDD = javaRDD.repartition(numPartitions); // shuffle   false
    			System.out.println(coalesceRDD.collect());// [1, 2, 4, 3, 5, 6, 7]
    		}
    	}
    
    
    private static void mapPartitionsWithIndex() {
    		SparkConf conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList("5", "1", "1", "3", "6", "2", "2"), 3);
    			boolean preservesPartitioning = false;
    			JavaRDD mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Iterator call(Integer v1, Iterator v2) throws Exception {
    					//v1      v2         
    					LinkedList linkedList = new LinkedList();
    					while (v2.hasNext()) {
    						linkedList.add(v1 + "=" + v2.next());
    					}
    					return linkedList.iterator();
    				}
    			}, preservesPartitioning);
    			// [0=5, 0=1, 1=1, 1=3, 2=6, 2=2, 2=2]
    			System.out.println(mapPartitionsWithIndexRDD.collect());
    		}
    	}
    
    
    private static void countByKey() {
    		System.out.println("countByKey:");
    		SparkConf conf = new SparkConf().setAppName("countByKey").setMaster("local");
    		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
    			JavaRDD javaRDD = sc.parallelize(Arrays.asList("5", "1", "1", "3", "6", "2", "2"), 5);
    			boolean preservesPartitioning = false;
    			JavaRDD mapPartitionsWithIndexRDD = javaRDD.mapPartitionsWithIndex(new Function2, Iterator>() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Iterator call(Integer v1, Iterator v2) throws Exception {
    					//v1      v2         
    					LinkedList linkedList = new LinkedList();
    					while (v2.hasNext()) {
    						linkedList.add(v1 + "=" + v2.next());
    					}
    					return linkedList.iterator();
    				}
    			}, preservesPartitioning);
    			// [0=5, 1=1, 2=1, 2=3, 3=6, 4=2, 4=2]
    			System.out.println(mapPartitionsWithIndexRDD.collect());
    			JavaPairRDD mapToPairRDD = javaRDD.mapToPair(new PairFunction() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Tuple2 call(String s) throws Exception {
    					return new Tuple2(s, s);
    				}
    			});
    			// [(5,5), (1,1), (1,1),(3,3), (6,6), (2,2), (2,2)]
    			System.out.println(mapToPairRDD.collect());
    			// {5=1, 6=1, 1=2, 2=2, 3=1}
    			System.out.println(mapToPairRDD.countByKey());
    		}
    	}
    
    
    //  pair  value           
    private static void mapValues() {
    		System.out.println("mapValues:");
    		try (JavaSparkContext sc = new JavaSparkContext("local", "mapValues")) {
    			List> list = new ArrayList>();
    			list.add(new Tuple2(1, "once"));
    			list.add(new Tuple2(3, "third"));
    			list.add(new Tuple2(2, "twice"));
    			JavaPairRDD rdd = sc.parallelizePairs(list);
    			JavaPairRDD mapValuesPairRDD = rdd.mapValues(new Function() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Object call(String arg0) throws Exception {
    					return arg0 + "..V";
    				}
    			});
    			// [(1,once..V), (3,third..V), (2,twice..V)]
    			System.out.println(mapValuesPairRDD.collect());
    		}
    	}
    
    
    //  sql join  ,   RDD key     join    
    private static void join() {
    		System.out.println("join:");
    		try (JavaSparkContext sc = new JavaSparkContext("local", "join")) {
    			List> list1 = new ArrayList>();
    			list1.add(new Tuple2("a", "11"));
    			list1.add(new Tuple2("b", "22"));
    			list1.add(new Tuple2("a", "13"));
    			list1.add(new Tuple2("c", "4"));
    			JavaPairRDD rdd1 = sc.parallelizePairs(list1);
    
    			List> list2 = new ArrayList>();
    			list2.add(new Tuple2("a", "11"));
    			list2.add(new Tuple2("b", "22"));
    			list2.add(new Tuple2("a", "13"));
    			list2.add(new Tuple2("c", "4"));
    			JavaPairRDD rdd2 = sc.parallelizePairs(list2);
    
    			JavaPairRDD> joinPairRDD = rdd1.join(rdd2);
    			// [(a,(11,11)), (a,(11,13)), (a,(13,11)), (a,(13,13)), (b,(22,22)), (c,(4,4))]
    			System.out.println(joinPairRDD.collect());
    		}
    	}
    
    
    //    RDD key  groupBy,    RDD value    groupBy
    private static void cogroup() {
    		try (JavaSparkContext sc = new JavaSparkContext("local", "cogroup")) {
    			List> list1 = new ArrayList>();
    			list1.add(new Tuple2("a", "11"));
    			list1.add(new Tuple2("b", "22"));
    			list1.add(new Tuple2("a", "13"));
    			list1.add(new Tuple2("d", "4"));
    			JavaPairRDD rdd1 = sc.parallelizePairs(list1);
    
    			List> list2 = new ArrayList>();
    			list2.add(new Tuple2("a", "11"));
    			list2.add(new Tuple2("b", "22"));
    			list2.add(new Tuple2("a", "13"));
    			list2.add(new Tuple2("c", "4"));
    			list2.add(new Tuple2("e", "4"));
    			JavaPairRDD rdd2 = sc.parallelizePairs(list2);
    
    			JavaPairRDD, Iterable>> cogroupPairRDD = rdd1.cogroup((rdd2));
    			// [(d,([4],[])), (e,([],[4])), (a,([11, 13],[11, 13])), (b,([22],[22])), (c,([],[4]))]
    			System.out.println(cogroupPairRDD.collect());
    		}
    	}
    
    //  pair  key   group By  ,                 
    private static void reduceByKey() {
    		System.out.println("cogroup:");
    		try (JavaSparkContext sc = new JavaSparkContext("local", "cogroup")) {
    			List> list = new ArrayList>();
    			list.add(new Tuple2("a", "11"));
    			list.add(new Tuple2("b", "22"));
    			list.add(new Tuple2("a", "13"));
    			list.add(new Tuple2("d", "4"));
    			JavaPairRDD rdd = sc.parallelizePairs(list);
    			JavaPairRDD reduceByKeyPairRDD = rdd.reduceByKey(new Function2() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public String call(String arg0, String arg1) throws Exception {
    					return arg0 + "|" + arg1;
    				}
    			});
    			// [(d,4), (a,11|13), (b,22)]
    			System.out.println(reduceByKeyPairRDD.collect());
    		}
    	}
    
    private static void accumulator() {
    		System.out.println("accumulator:");
    		try (JavaSparkContext sc = new JavaSparkContext("local", "accumulator")) {
    			JavaRDD rdd = sc.parallelize(Arrays.asList("5", "1", "1", "3", "6", "2", "2"), 5);
    			Accumulator blankLines = sc.accumulator(1);
    			JavaRDD flatMap = rdd.flatMap(new FlatMapFunction() {
    				private static final long serialVersionUID = 1L;
    				@Override
    				public Iterator call(String line)
    						throws Exception {
    					if (line.equals("")) {
    						blankLines.add(1);
    					}
    					return Arrays.asList(line.split(" ")).iterator();
    				}
    			});
    			System.out.println(flatMap.collect());
    			System.out.println(blankLines.value());
    		}
    	}