スパークテクニック
1827 ワード
1.メッセージサイズの最大値の設定
2.yarnとの結合時にキューを設定する
3.実行時にyarnを使用してリソースを割り当て、--num-executorsパラメータを設定
4.impalaのparquetを読み出し、String列の処理
5.parquetfileの書き込み
6.parquetfileの読み取り
7.ファイルを書くとき、すべての結果を1つのファイルにまとめる
8.rddを繰り返し使用する場合はcacheキャッシュを使用する
9.spark-shell依存パッケージの追加
10.spark-shell yarnモードを使用し、キューを使用
def main(args: Array[String]) {
System.setProperty("spark.akka.frameSize", "1024")
}
2.yarnとの結合時にキューを設定する
val conf=new SparkConf().setAppName("WriteParquet")
conf.set("spark.yarn.queue","wz111")
val sc=new SparkContext(conf)
3.実行時にyarnを使用してリソースを割り当て、--num-executorsパラメータを設定
nohup /home/SASadm/spark-1.4.1-bin-hadoop2.4/bin/spark-submit
--name mergePartition
--class main.scala.week2.mergePartition
--num-executors 30
--master yarn
mergePartition.jar >server.log 2>&1 &
4.impalaのparquetを読み出し、String列の処理
sqlContext.setConf("spark.sql.parquet.binaryAsString","true")
5.parquetfileの書き込み
case class ParquetFormat(usr_id:BigInt , install_ids:String )
val appRdd=sc.textFile("hdfs://").map(_.split("\t")).map(r=>ParquetFormat(r(0).toLong,r(1)))
sqlContext.createDataFrame(appRdd).repartition(1).write.parquet("hdfs://")
6.parquetfileの読み取り
val parquetFile=sqlContext.read.parquet("hdfs://")
parquetFile.registerTempTable("install_running")
val data=sqlContext.sql("select user_id,install_ids from install_running")
data.map(t=>"user_id:"+t(0)+" install_ids:"+t(1)).collect().foreach(println)
7.ファイルを書くとき、すべての結果を1つのファイルにまとめる
repartition(1)
8.rddを繰り返し使用する場合はcacheキャッシュを使用する
cache()
9.spark-shell依存パッケージの追加
spark-1.4.1-bin-hadoop2.4/bin/spark-shell local[4] --jars code.jar
10.spark-shell yarnモードを使用し、キューを使用
spark-1.4.1-bin-hadoop2.4/bin/spark-shell --master yarn-client --queue wz111