SparkSQLデータ傾斜解決実戦紹介(HiveSQLに適用)
一:データの傾きはどのような状況ですか.
データの傾斜が発生する場合:1、shuffleの場合、このshuffleを生成するフィールドが空の場合、データ傾き が発生する.2、keyは多く、パーティション数の設定が少なすぎて、多くのkeyが1つのパーティションに集まってデータの傾き が発生した.3、あるテーブルのkeyデータが特に多い場合、groupbyを使用するとデータの傾き が発生する.4、大きい表join小さい表、この2つの表の中のある表はあるkeyあるいはあるいくつかのkeyデータが比較的に多くて、データの傾斜 が現れます5、大きい表join大きい表、その中のある表の分布は比較的に均一で、もう一つの表はある1つあるいはあるいくつかのkeyデータが特に多く、データの傾斜 も現れる.6、大きい表join大きい表、その中のある表の分布は比較的に均一で、もう一つの表は多くのkeyデータが特に多く存在し、データの傾斜 も現れる.
二:sqlを使用してデータの傾斜をどのように解決しますか?
1.第一のケース:shuffleの場合、このshuffleを生成するフィールドが空の場合、データの傾きが発生する
ソリューション:空の値をフィルタ
2.第2のケース:keyが多く、パーティション数の設定が少なすぎて、多くのkeyが1つのパーティションに集まってデータの傾きが発生する
ソリューション:パーティション数の構成パラメータの変更
3.第3のケース:あるテーブルのkeyデータが特に多い場合、groupbyを使用するとデータの傾きが発生します.
ソリューション:ローカル集約+グローバル集約
4.4つ目のケース:大きなテーブルjoin小さなテーブルで、この2つのテーブルのいずれかのテーブルに1つのkeyまたはいくつかのkeyデータが多く、データの傾きが発生します
ソリューション:小さなテーブルをブロードキャストすると、元のreduce joinからmap joinに変わり、shuffle操作を回避し、データの傾きを回避します.
5.第5のiの場合:大きい表join大きい表、その中のある表の分布は比較的に均一で、別の表はある1つあるいはあるいくつかのkeyデータが特に多くて、データの傾斜も現れる
ソリューションの手順:
1、データ傾斜を生じたkeyをろ過して単独処理[塩を加える]と拡容(1つまたは少量のkeyのみ傾斜が発生し、拡容可能な倍数10-50倍)2、データ傾斜keyのないデータは通常通り処理する
6.第6のケース:大きいテーブルjoin大きいテーブル、その中のあるテーブルの分布は比較的に均一で、別のテーブルは多くのkeyデータが特に多く存在して、データの傾斜も現れる
解決策:データの傾斜を生じたテーブルを直接塩[0-9]を加え、別のテーブルを拡張し、keyデータが特に多く[最大10倍拡張]、そうでなければメモリを浪費する
データの傾斜が発生する場合:
二:sqlを使用してデータの傾斜をどのように解決しますか?
1.第一のケース:shuffleの場合、このshuffleを生成するフィールドが空の場合、データの傾きが発生する
ソリューション:空の値をフィルタ
import org.apache.spark.sql.SparkSession
//1、shuffle , shuffle ,
// :
object ProcessData {
def main(args: Array[String]): Unit = {
//
val spark = SparkSession.builder().appName("parcess").master("local[4]").getOrCreate()
import spark.implicits._
// 1
spark.sparkContext.parallelize(Seq[(Int,String,Int,String)](
(1,"aa",20,""), (2,"bb",20,""), (3,"vv",20,""), (4,"dd",20,""), (5,"ee",20,""),
(6,"ss",20,""), (7,"uu",20,""), (8,"qq",20,""), (9,"ww",20,""), (10,"rr",20,""),
(11,"tt",20,""), (12,"xx",20,"class_02"), (13,"kk",20,"class_03"), (14,"oo",20,"class_01"),
(15,"pp",20,"class_01")
)).toDF("id","name","age","clazzId")
// ,
.filter("clazzId != ''")
.createOrReplaceTempView("student")
// 2
spark.sparkContext.parallelize(Seq[(String,String)](
("class_01","java"), ("class_02","python"), ("class_03","hive")
)).toDF("clazzId","name")
.createOrReplaceTempView("class")
//
spark.sql(
"""
|select s.id,s.name,s.age,c.name
| from student s left join class c
| on s.clazzId = c.clazzId
""".stripMargin).show()
/**
*
* +---+----+---+------+
| id|name|age| name|
+---+----+---+------+
| 14| oo| 20| java|
| 15| pp| 20| java|
| 12| xx| 20|python|
| 13| kk| 20| hive|
| 1| aa| 20| null|
| 2| bb| 20| null|
| 3| vv| 20| null|
| 4| dd| 20| null|
| 5| ee| 20| null|
| 6| ss| 20| null|
| 7| uu| 20| null|
| 8| qq| 20| null|
| 9| ww| 20| null|
| 10| rr| 20| null|
| 11| tt| 20| null|
+---+----+---+------+
| id|name|age| name|
+---+----+---+------+
| 14| oo| 20| java|
| 15| pp| 20| java|
| 12| xx| 20|python|
| 13| kk| 20| hive|
+---+----+---+------+
*/
}
}
2.第2のケース:keyが多く、パーティション数の設定が少なすぎて、多くのkeyが1つのパーティションに集まってデータの傾きが発生する
ソリューション:パーティション数の構成パラメータの変更
shuffle :
#spark sql shuffle
spark.sql.shuffle.partitions="200"
#spark core shuffle
spark.default.parallelism=200
3.第3のケース:あるテーブルのkeyデータが特に多い場合、groupbyを使用するとデータの傾きが発生します.
ソリューション:ローカル集約+グローバル集約
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random
//3. : key , group by
// : +
object ProcessData {
def main(args: Array[String]): Unit = {
//
val spark = SparkSession.builder().appName("parcess").master("local[4]").getOrCreate()
import spark.implicits._
// 1
val source: DataFrame = spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
(1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
(6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
(11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
(15, "pp", 20, "class_01")
)).toDF("id", "name", "age", "clazzId")
//2. udf , clazzId #
val addPrefix=(clazzId:String) => s"${clazzId}#${Random.nextInt(10)}"
//3. udf
spark.udf.register("addPrefix",addPrefix)
//4. udf
source.selectExpr("id","name","age","addPrefix(clazzId) clazzId").createOrReplaceTempView("temp")
//5.
val source2: DataFrame = spark.sql(
"""
select clazzId ,count(1) num
from temp
group by clazzId
""".stripMargin)
//6. udf , clazzId ,
val delPrefix = (clazzId:String)=>{
val head: String = clazzId.split("#").head
head
}
//7. udf
spark.udf.register("delPrefix",delPrefix)
//8. delPrefix
source2.selectExpr("delPrefix(clazzId) clazzId","num").createOrReplaceTempView("temp2")
//
spark.sql(
"""
|select * from temp2
""".stripMargin).show()
//9.
spark.sql(
"""
|select clazzId , sum(num)
| from temp2
| group by clazzId
""".stripMargin).show()
/**
*
* +--------+---+
| clazzId|num|
+--------+---+
|class_01| 1|
|class_01| 2|
|class_01| 2|
|class_03| 1|
|class_01| 1|
|class_01| 1|
|class_01| 3|
|class_01| 1|
|class_01| 1|
|class_01| 1|
|class_02| 1|
+--------+---+
+--------+--------+
| clazzId|sum(num)|
+--------+--------+
|class_01| 13|
|class_02| 1|
|class_03| 1|
+--------+--------+
*/
}
}
4.4つ目のケース:大きなテーブルjoin小さなテーブルで、この2つのテーブルのいずれかのテーブルに1つのkeyまたはいくつかのkeyデータが多く、データの傾きが発生します
ソリューション:小さなテーブルをブロードキャストすると、元のreduce joinからmap joinに変わり、shuffle操作を回避し、データの傾きを回避します.
import org.apache.spark.sql.{DataFrame, SparkSession}
import scala.util.Random
//4. : join , key key ,
// : , reduce join map join , shuffle ,
object ProcessData {
def main(args: Array[String]): Unit = {
//
val spark = SparkSession.builder()
.appName("parcess")
.master("local[4]")
.config("spark.sql.autoBroadcastJoinThreshold","10485760")
.getOrCreate()
import spark.implicits._
// 1
spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
(1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
(6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
(11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
(15, "pp", 20, "class_01")
)).toDF("id", "name", "age", "clazzId")
.createOrReplaceTempView("student")
// 2
spark.sparkContext.parallelize(Seq[(String,String)](
("class_01","java"), ("class_02","python"), ("class_03","hive")
)).toDF("clazzId","name")
.createOrReplaceTempView("class")
// ,
spark.sql("cache table class")
//
spark.sql(
"""
|select s.id,s.name,s.age,c.name
| from student s left join class c
| on s.clazzId = c.clazzId
""".stripMargin)
//
.rdd
.mapPartitionsWithIndex((index,it)=>{
println(s"index_${index} data_${it.toBuffer}")
it
}).collect()
/**
* :
* index_51 data_ArrayBuffer([1,aa,20,java], [2,bb,20,java],[3,vv,20,java], [4,dd,20,java],[5,ee,20,java], [6,ss,20,java],
* [7,uu,20,java],[8,qq,20,java], [9,ww,20,java], [10,rr,20,java], [11,tt,20,java], [14,oo,20,java], [15,pp,20,java])
*
* :
* index_0 data_ArrayBuffer([1,aa,20,java], [2,bb,20,java], [3,vv,20,java])
index_1 data_ArrayBuffer([4,dd,20,java], [5,ee,20,java], [6,ss,20,java], [7,uu,20,java])
index_2 data_ArrayBuffer([8,qq,20,java], [9,ww,20,java], [10,rr,20,java], [11,tt,20,java])
index_3 data_ArrayBuffer([12,xx,20,python], [13,kk,20,hive], [14,oo,20,java], [15,pp,20,java])
*/
}
}
5.第5のiの場合:大きい表join大きい表、その中のある表の分布は比較的に均一で、別の表はある1つあるいはあるいくつかのkeyデータが特に多くて、データの傾斜も現れる
ソリューションの手順:
1、データ傾斜を生じたkeyをろ過して単独処理[塩を加える]と拡容(1つまたは少量のkeyのみ傾斜が発生し、拡容可能な倍数10-50倍)2、データ傾斜keyのないデータは通常通り処理する
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.util.Random
//5. i : join , , key ,
// :
// 1、 key [ ]
// 2、 key
object ProcessData {
def main(args: Array[String]): Unit = {
//
val spark = SparkSession.builder()
.appName("parcess")
.master("local[4]")
//.config("spark.sql.autoBroadcastJoinThreshold","10485760")
.getOrCreate()
import spark.implicits._
// 1
val source1 = spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
(1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
(6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
(11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
(15, "pp", 20, "class_01")
)).toDF("id", "name", "age", "clazzId")
// 2
val source2 = spark.sparkContext.parallelize(Seq[(String,String)](
("class_01","java"), ("class_02","python"), ("class_03","hive")
)).toDF("clazzId","name")
// , key
/**
* 1: , false
* 2: , 0.1~0.2 , ,
*/
source1.sample(false,0.5).show()
/**
* : class_01 ,
* +---+----+---+--------+
| id|name|age| clazzId|
+---+----+---+--------+
| 2| bb| 20|class_01|
| 5| ee| 20|class_01|
| 7| uu| 20|class_01|
| 8| qq| 20|class_01|
| 10| rr| 20|class_01|
| 11| tt| 20|class_01|
+---+----+---+--------+
*/
// :
//1. key
val source1Yes: Dataset[Row] = source1.filter("clazzId = 'class_01' ")
//2. key
val source1No: Dataset[Row] = source1.filter("clazzId != 'class_01' ")
//3. key
val source2Yes = source2.filter("clazzId = 'class_01'")
//4. key
val source2No: Dataset[Row] = source2.filter("clazzId != 'class_01'")
// join
source1No.createOrReplaceTempView("studentNo")
source2No.createOrReplaceTempView("classNo")
spark.sql(
"""
|select s.id,s.name,s.age,c.name
|from studentNo s left join classNo c
|on s.clazzId = c.clazzId
""".stripMargin).createOrReplaceTempView("temp1")
// UDF
// udf , clazzId #
val addPrefix=(clazzId:String) => s"${clazzId}#${Random.nextInt(10)}"
// udf , clazzId
val fixed=(i:Int,clazzId:String)=> s"${clazzId}#${i}"
// udf , clazzId
val delPrefix = (clazzId:String)=>{
val head: String = clazzId.split("#").head
head
}
// udf
spark.udf.register("addPrefix",addPrefix)
spark.udf.register("fixed",fixed)
spark.udf.register("delPrefix",delPrefix)
//
source1Yes.selectExpr("id","name","age","addPrefix(clazzId) clazzId").createOrReplaceTempView("student_temp")
//
def capacity(source:DataFrame): DataFrame ={
// dataframe
val rdd: RDD[Row] = spark.sparkContext.emptyRDD[Row]
var emptyDataframe: DataFrame = spark.createDataFrame(rdd,source.schema)
// 1-10, clazzId
for (i <- 0 until(10)){
emptyDataframe = emptyDataframe.union(source.selectExpr(s"fixed(${i},clazzId)","name"))
}
emptyDataframe
}
//
//
capacity(source2Yes).createOrReplaceTempView("clazz_temp")
// join
spark.sql(
"""
|select s.id,s.name,s.age,c.name
|from student_temp s left join clazz_temp c
|on s.clazzId = c.clazzId
""".stripMargin)
.createOrReplaceTempView("temp2")
spark.sql(
"""
|select * from temp1
|union
|select * from temp2
""".stripMargin)
.show()
/**
* :
*
* 1. join
* +---+----+---+------+
| id|name|age| name|
+---+----+---+------+
| 12| xx| 20|python|
| 13| kk| 20| hive|
+---+----+---+------+
2. join
+---+----+---+----+
| id|name|age|name|
+---+----+---+----+
| 3| vv| 20|java|
| 7| uu| 20|java|
| 8| qq| 20|java|
| 14| oo| 20|java|
| 4| dd| 20|java|
| 2| bb| 20|java|
| 6| ss| 20|java|
| 10| rr| 20|java|
| 1| aa| 20|java|
| 9| ww| 20|java|
| 11| tt| 20|java|
| 15| pp| 20|java|
| 5| ee| 20|java|
+---+----+---+----+
3.union
+---+----+---+------+
| id|name|age| name|
+---+----+---+------+
| 10| rr| 20| java|
| 9| ww| 20| java|
| 15| pp| 20| java|
| 1| aa| 20| java|
| 3| vv| 20| java|
| 7| uu| 20| java|
| 4| dd| 20| java|
| 11| tt| 20| java|
| 12| xx| 20|python|
| 6| ss| 20| java|
| 13| kk| 20| hive|
| 5| ee| 20| java|
| 2| bb| 20| java|
| 14| oo| 20| java|
| 8| qq| 20| java|
+---+----+---+------+
*/
}
}
6.第6のケース:大きいテーブルjoin大きいテーブル、その中のあるテーブルの分布は比較的に均一で、別のテーブルは多くのkeyデータが特に多く存在して、データの傾斜も現れる
解決策:データの傾斜を生じたテーブルを直接塩[0-9]を加え、別のテーブルを拡張し、keyデータが特に多く[最大10倍拡張]、そうでなければメモリを浪費する
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.util.Random
//6. join , , key ,
//* :
//* [0-9], [ 10 ]
object ProcessData {
def main(args: Array[String]): Unit = {
//
val spark = SparkSession.builder()
.appName("parcess")
.master("local[4]")
//.config("spark.sql.autoBroadcastJoinThreshold","10485760")
.getOrCreate()
import spark.implicits._
// 1
val source1 = spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
(1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
(6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
(11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
(15, "pp", 20, "class_01")
)).toDF("id", "name", "age", "clazzId")
// 2
val source2 = spark.sparkContext.parallelize(Seq[(String,String)](
("class_01","java"), ("class_02","python"), ("class_03","hive")
)).toDF("clazzId","name")
// , key
/**
* 1: , false
* 2: , 0.1~0.2 , ,
*/
source1.sample(false,0.5).show()
// source1 key
// UDF
// udf , clazzId #
val addPrefix=(clazzId:String) => s"${clazzId}#${Random.nextInt(10)}"
// udf , clazzId
val fixed=(i:Int,clazzId:String)=> s"${clazzId}#${i}"
// udf
spark.udf.register("addPrefix",addPrefix)
spark.udf.register("fixed",fixed)
//
source1.selectExpr("id","name","age","addPrefix(clazzId) clazzId").createOrReplaceTempView("student_temp")
//
def capacity(source:DataFrame): DataFrame ={
// dataframe
val rdd: RDD[Row] = spark.sparkContext.emptyRDD[Row]
var emptyDataframe: DataFrame = spark.createDataFrame(rdd,source.schema)
// 1-10, clazzId
for (i <- 0 until(10)){
emptyDataframe = emptyDataframe.union(source.selectExpr(s"fixed(${i},clazzId)","name"))
}
emptyDataframe
}
//
//
capacity(source2).createOrReplaceTempView("clazz_temp")
//
spark.sql(
"""
|select s.id,s.name,s.age,c.name
|from student_temp s left join clazz_temp c
|on s.clazzId = c.clazzId
""".stripMargin)
.show()
/**
* :
*+---+----+---+------+
| id|name|age| name|
+---+----+---+------+
| 7| uu| 20| java|
| 5| ee| 20| java|
| 8| qq| 20| java|
| 2| bb| 20| java|
| 3| vv| 20| java|
| 13| kk| 20| hive|
| 12| xx| 20|python|
| 1| aa| 20| java|
| 4| dd| 20| java|
| 6| ss| 20| java|
| 10| rr| 20| java|
| 11| tt| 20| java|
| 14| oo| 20| java|
| 15| pp| 20| java|
| 9| ww| 20| java|
+---+----+---+------+
*/
}
}