A Practical Introduction to Solving Data Skew in Spark SQL (also applicable to Hive SQL)


Part 1: In what situations does data skew occur?
Data skew occurs in the following situations:
  • 1. During a shuffle, if the field the shuffle is keyed on contains empty values, data skew occurs.
  • 2. There are many keys but the number of partitions is set too low, so many keys pile up in one partition and data skew occurs.
  • 3. A table has especially many records for certain keys, and using group by causes data skew.
  • 4. A large table is joined with a small table, and one of the two tables has noticeably more data for one key or a few keys, so data skew appears.
  • 5. A large table is joined with a large table; one table is fairly evenly distributed while the other has especially many records for one or a few keys, so data skew also appears.
  • 6. A large table is joined with a large table; one table is fairly evenly distributed while the other has especially many records for many keys, so data skew also appears.
Part 2: How do we solve data skew using SQL?
    1. Case 1: during a shuffle, when the field the shuffle is keyed on contains empty values, data skew occurs
    Solution: filter out the empty values
    import org.apache.spark.sql.SparkSession
    
    //1. When the field used for the shuffle contains empty values, data skew occurs
    //Solution: filter out the empty values
    object ProcessData {
      def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder().appName("parcess").master("local[4]").getOrCreate()
        import spark.implicits._
        // Data source 1: the student table
        spark.sparkContext.parallelize(Seq[(Int,String,Int,String)](
          (1,"aa",20,""), (2,"bb",20,""), (3,"vv",20,""), (4,"dd",20,""), (5,"ee",20,""),
          (6,"ss",20,""), (7,"uu",20,""), (8,"qq",20,""), (9,"ww",20,""), (10,"rr",20,""),
          (11,"tt",20,""), (12,"xx",20,"class_02"), (13,"kk",20,"class_03"), (14,"oo",20,"class_01"),
          (15,"pp",20,"class_01")
        )).toDF("id","name","age","clazzId")
          // Filter out the empty clazzId values to avoid the skew
            .filter("clazzId != ''")
          .createOrReplaceTempView("student")
    
        // Data source 2: the class table
        spark.sparkContext.parallelize(Seq[(String,String)](
          ("class_01","java"), ("class_02","python"), ("class_03","hive")
        )).toDF("clazzId","name")
          .createOrReplaceTempView("class")
    
        // Join the two tables
        spark.sql(
          """
            |select s.id,s.name,s.age,c.name
            | from student s left join class c
            | on s.clazzId = c.clazzId
          """.stripMargin).show()
    
        /**
          * Result without filtering the empty values (every row with an empty clazzId joins to null):
          * +---+----+---+------+
            | id|name|age|  name|
            +---+----+---+------+
            | 14|  oo| 20|  java|
            | 15|  pp| 20|  java|
            | 12|  xx| 20|python|
            | 13|  kk| 20|  hive|
            |  1|  aa| 20|  null|
            |  2|  bb| 20|  null|
            |  3|  vv| 20|  null|
            |  4|  dd| 20|  null|
            |  5|  ee| 20|  null|
            |  6|  ss| 20|  null|
            |  7|  uu| 20|  null|
            |  8|  qq| 20|  null|
            |  9|  ww| 20|  null|
            | 10|  rr| 20|  null|
            | 11|  tt| 20|  null|
            +---+----+---+------+

          Result after filtering the empty values:
          +---+----+---+------+
          | id|name|age|  name|
          +---+----+---+------+
          | 14|  oo| 20|  java|
          | 15|  pp| 20|  java|
          | 12|  xx| 20|python|
          | 13|  kk| 20|  hive|
          +---+----+---+------+
    
          */
    
      }
    
    }
    
    

    2. Case 2: there are many keys but the number of partitions is set too low, so many keys pile up in one partition and data skew occurs
    Solution: change the configuration parameters that control the number of partitions (a Scala sketch follows the parameters below)
    Shuffle parallelism parameters:
    # Number of partitions used by Spark SQL shuffles
    spark.sql.shuffle.partitions=200

    # Default parallelism for Spark Core (RDD) shuffles
    spark.default.parallelism=200
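
    These parameters can also be set in code. A minimal sketch (assuming a Spark 2.x+ SparkSession; the value 500 is only an illustration and should be chosen according to the data volume):
    import org.apache.spark.sql.SparkSession

    // Set the partition counts when building the session
    val spark = SparkSession.builder()
      .appName("skew-partitions")
      .master("local[4]")
      // number of partitions used by Spark SQL shuffles (joins, group by, ...)
      .config("spark.sql.shuffle.partitions", "500")
      // default parallelism for Spark Core (RDD) shuffles
      .config("spark.default.parallelism", "500")
      .getOrCreate()

    // The SQL shuffle partition count can also be changed at runtime
    spark.conf.set("spark.sql.shuffle.partitions", "500")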
    
    

    3. Case 3: a table has especially many records for certain keys, and using group by on that table causes data skew.
    Solution: local aggregation + global aggregation
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    import scala.util.Random
    
    //3. Case 3: a table has especially many records for certain keys, so group by causes data skew
    //Solution: local aggregation + global aggregation
    object ProcessData {
      def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder().appName("parcess").master("local[4]").getOrCreate()
        import spark.implicits._
        // Data source 1: the student table
        val source: DataFrame = spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
          (1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
          (6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
          (11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
          (15, "pp", 20, "class_01")
        )).toDF("id", "name", "age", "clazzId")
    
        //2. Define a UDF that appends "#" plus a random number 0-9 to clazzId (salting)
        val addPrefix=(clazzId:String) => s"${clazzId}#${Random.nextInt(10)}"
    
        //3. Register the UDF
        spark.udf.register("addPrefix",addPrefix)
    
        //4. Use the UDF to add a random salt to every key
        source.selectExpr("id","name","age","addPrefix(clazzId) clazzId").createOrReplaceTempView("temp")
    
        //5. Local aggregation on the salted keys
        val source2: DataFrame = spark.sql(
          """
    select clazzId ,count(1) num
    from temp
      group by clazzId
          """.stripMargin)
    
        //6. Define a UDF that strips the salt from clazzId, restoring the original key
        val delPrefix = (clazzId:String)=>{
          val head: String = clazzId.split("#").head
          head
        }
    
        //7. Register the UDF
        spark.udf.register("delPrefix",delPrefix)
    
        //8. Use delPrefix to restore the original keys
        source2.selectExpr("delPrefix(clazzId) clazzId","num").createOrReplaceTempView("temp2")
    
        // Show the result of the local aggregation
        spark.sql(
          """
            |select * from temp2
          """.stripMargin).show()
    
        //9. Global aggregation
        spark.sql(
          """
            |select clazzId , sum(num)
            | from temp2
            |  group by clazzId
          """.stripMargin).show()
    
        /**
          * Result of the local aggregation:
          * +--------+---+
            | clazzId|num|
            +--------+---+
            |class_01|  1|
            |class_01|  2|
            |class_01|  2|
            |class_03|  1|
            |class_01|  1|
            |class_01|  1|
            |class_01|  3|
            |class_01|  1|
            |class_01|  1|
            |class_01|  1|
            |class_02|  1|
            +--------+---+

          Result of the global aggregation:
          +--------+--------+
          | clazzId|sum(num)|
          +--------+--------+
          |class_01|      13|
          |class_02|       1|
          |class_03|       1|
          +--------+--------+
    
          */
    
      }
    }
    
    

    4. Case 4: large table join small table, where one of the two tables has noticeably more data for one key or a few keys, and data skew appears
    Solution: broadcast the small table; the original reduce join becomes a map join, which avoids the shuffle and therefore the data skew.
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    import scala.util.Random
    
    //4. Case 4: large table join small table, where one of the tables has noticeably more data for one or a few keys, causing data skew
    //Solution: broadcast the small table so the reduce join becomes a map join, avoiding the shuffle and the skew
    object ProcessData {
      def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder()
            .appName("parcess")
            .master("local[4]")
            .config("spark.sql.autoBroadcastJoinThreshold","10485760")
            .getOrCreate()
        import spark.implicits._
        // Data source 1: the student table
        spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
          (1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
          (6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
          (11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
          (15, "pp", 20, "class_01")
        )).toDF("id", "name", "age", "clazzId")
            .createOrReplaceTempView("student")
    
        // Data source 2: the class table (the small table)
        spark.sparkContext.parallelize(Seq[(String,String)](
          ("class_01","java"), ("class_02","python"), ("class_03","hive")
        )).toDF("clazzId","name")
          .createOrReplaceTempView("class")
    
        // Cache the small table so its size statistics are available and it can be broadcast
        spark.sql("cache table class")
    
        // Join
        spark.sql(
          """
            |select s.id,s.name,s.age,c.name
            | from student s left join class c
            |  on s.clazzId = c.clazzId
          """.stripMargin)
          // Convert to an RDD and print each partition's contents to inspect the distribution
              .rdd
                  .mapPartitionsWithIndex((index,it)=>{
                    println(s"index_${index}  data_${it.toBuffer}")
                    it
                  }).collect()
    
        /**
          * Partition contents without the broadcast join (all matching rows land in one shuffle partition):
          * index_51  data_ArrayBuffer([1,aa,20,java], [2,bb,20,java],[3,vv,20,java], [4,dd,20,java],[5,ee,20,java], [6,ss,20,java],
          *                            [7,uu,20,java],[8,qq,20,java], [9,ww,20,java], [10,rr,20,java], [11,tt,20,java], [14,oo,20,java], [15,pp,20,java])
          *
          * Partition contents with the broadcast join (the original partitioning is preserved):
          * index_0  data_ArrayBuffer([1,aa,20,java], [2,bb,20,java], [3,vv,20,java])
            index_1  data_ArrayBuffer([4,dd,20,java], [5,ee,20,java], [6,ss,20,java], [7,uu,20,java])
            index_2  data_ArrayBuffer([8,qq,20,java], [9,ww,20,java], [10,rr,20,java], [11,tt,20,java])
            index_3  data_ArrayBuffer([12,xx,20,python], [13,kk,20,hive], [14,oo,20,java], [15,pp,20,java])
          */
    
      }
    }
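
    A note on the broadcast: the example above relies on spark.sql.autoBroadcastJoinThreshold, so it only works while the small table stays below that threshold. The broadcast can also be requested explicitly. A minimal sketch, assuming the same student and class temporary views as above (the BROADCAST hint is available in Spark 2.2+):
    import org.apache.spark.sql.functions.broadcast

    // SQL hint: ask the optimizer to broadcast the class table
    spark.sql(
      """
        |select /*+ BROADCAST(c) */ s.id, s.name, s.age, c.name
        | from student s left join class c
        |  on s.clazzId = c.clazzId
      """.stripMargin).show()

    // DataFrame API equivalent: wrap the small side in broadcast()
    val studentDf = spark.table("student")
    val classDf = spark.table("class")
    studentDf.join(broadcast(classDf), Seq("clazzId"), "left").show()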
    
    

    5. Case 5: large table join large table, where one table is fairly evenly distributed and the other has especially many records for one or a few keys, so data skew also appears
    Solution steps:
    1. Filter out the keys that cause the skew and process them separately: salt them and expand the other side (since only one or a few keys are skewed, an expansion factor of 10-50x is acceptable)
    2. Process the rows whose keys are not skewed in the usual way
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    import scala.util.Random
    
    //5. Case 5: large table join large table; one table is fairly evenly distributed, the other has especially many records for one or a few keys, so data skew appears
    //Solution steps:
      //  1. Filter out the skewed key and process it separately: salt it and expand the other side
      //  2. Process the rows without the skewed key in the usual way
    object ProcessData {
      def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder()
            .appName("parcess")
            .master("local[4]")
            //.config("spark.sql.autoBroadcastJoinThreshold","10485760")
            .getOrCreate()
        import spark.implicits._
        // Data source 1: the student table
        val source1 = spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
          (1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
          (6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
          (11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
          (15, "pp", 20, "class_01")
        )).toDF("id", "name", "age", "clazzId")
    
        // Data source 2: the class table
        val source2 = spark.sparkContext.parallelize(Seq[(String,String)](
          ("class_01","java"), ("class_02","python"), ("class_03","hive")
        )).toDF("clazzId","name")
    
        // Sample the data to find the skewed key
        /**
          * Parameter 1: whether to sample with replacement; false means without replacement
          * Parameter 2: the sampling fraction; 0.1~0.2 is usually enough in production, 0.5 is used here because the data set is tiny
          */
    
        source1.sample(false,0.5).show()
    
        /**
          * Sample result: class_01 appears far more often than the other keys, so class_01 is the skewed key
          * +---+----+---+--------+
            | id|name|age| clazzId|
            +---+----+---+--------+
            |  2|  bb| 20|class_01|
            |  5|  ee| 20|class_01|
            |  7|  uu| 20|class_01|
            |  8|  qq| 20|class_01|
            | 10|  rr| 20|class_01|
            | 11|  tt| 20|class_01|
            +---+----+---+--------+
    
          */
    
        // Processing steps:
        //1. From the skewed table, keep the rows with the skewed key
        val source1Yes: Dataset[Row] = source1.filter("clazzId = 'class_01' ")
        //2. From the skewed table, keep the rows without the skewed key
        val source1No: Dataset[Row] = source1.filter("clazzId != 'class_01' ")
        //3. From the evenly distributed table, keep the rows with the skewed key
        val source2Yes = source2.filter("clazzId = 'class_01'")
        //4. From the evenly distributed table, keep the rows without the skewed key
        val source2No: Dataset[Row] = source2.filter("clazzId != 'class_01'")
    
        // Join the two parts that do not contain the skewed key in the normal way
        source1No.createOrReplaceTempView("studentNo")
        source2No.createOrReplaceTempView("classNo")
    
        spark.sql(
          """
            |select s.id,s.name,s.age,c.name
            |from studentNo s left join  classNo c
            |on s.clazzId = c.clazzId
          """.stripMargin).createOrReplaceTempView("temp1")
    
        // Define the UDF functions
        // UDF: append "#" plus a random number 0-9 to clazzId (salting)
        val addPrefix=(clazzId:String) => s"${clazzId}#${Random.nextInt(10)}"
        // UDF: append "#" plus a fixed number to clazzId (used for the expansion)
        val fixed=(i:Int,clazzId:String)=> s"${clazzId}#${i}"
        // UDF: strip the "#" suffix from clazzId, restoring the original key
        val delPrefix = (clazzId:String)=>{
          val head: String = clazzId.split("#").head
          head
        }
        // Register the UDFs
        spark.udf.register("addPrefix",addPrefix)
        spark.udf.register("fixed",fixed)
        spark.udf.register("delPrefix",delPrefix)
    
        // Add a random salt to the skewed part of the student table
        source1Yes.selectExpr("id","name","age","addPrefix(clazzId) clazzId").createOrReplaceTempView("student_temp")
    
        // Expansion function: produce 10 copies of the table, one per salt value
        def capacity(source:DataFrame): DataFrame ={
          // Empty DataFrame used as the accumulator for the union
          val rdd: RDD[Row] = spark.sparkContext.emptyRDD[Row]
          var emptyDataframe: DataFrame = spark.createDataFrame(rdd,source.schema)
          // Loop over the salt values 0-9 and append each one to clazzId
          for (i <- 0 until(10)){
            emptyDataframe = emptyDataframe.union(source.selectExpr(s"fixed(${i},clazzId)","name"))
          }
          emptyDataframe
        }
    
        // Only one key is skewed, so only that key's rows of the class table need to be expanded
        // Expand the table
        capacity(source2Yes).createOrReplaceTempView("clazz_temp")
        // Join the salted skewed part with the expanded table
      spark.sql(
          """
            |select s.id,s.name,s.age,c.name
            |from student_temp s left join clazz_temp c
            |on s.clazzId = c.clazzId
          """.stripMargin)
            .createOrReplaceTempView("temp2")
    
        spark.sql(
          """
            |select * from temp1
            |union
            |select * from temp2
          """.stripMargin)
          .show()
    
        /**
          * Results:
          *
          * 1. Join of the parts without the skewed key
          * +---+----+---+------+
            | id|name|age|  name|
            +---+----+---+------+
            | 12|  xx| 20|python|
            | 13|  kk| 20|  hive|
            +---+----+---+------+
    
          2. Join of the skewed part (salted student rows against the expanded class rows)
          +---+----+---+----+
          | id|name|age|name|
          +---+----+---+----+
          |  3|  vv| 20|java|
          |  7|  uu| 20|java|
          |  8|  qq| 20|java|
          | 14|  oo| 20|java|
          |  4|  dd| 20|java|
          |  2|  bb| 20|java|
          |  6|  ss| 20|java|
          | 10|  rr| 20|java|
          |  1|  aa| 20|java|
          |  9|  ww| 20|java|
          | 11|  tt| 20|java|
          | 15|  pp| 20|java|
          |  5|  ee| 20|java|
          +---+----+---+----+
    
          3. Final result after the union
          +---+----+---+------+
          | id|name|age|  name|
          +---+----+---+------+
          | 10|  rr| 20|  java|
          |  9|  ww| 20|  java|
          | 15|  pp| 20|  java|
          |  1|  aa| 20|  java|
          |  3|  vv| 20|  java|
          |  7|  uu| 20|  java|
          |  4|  dd| 20|  java|
          | 11|  tt| 20|  java|
          | 12|  xx| 20|python|
          |  6|  ss| 20|  java|
          | 13|  kk| 20|  hive|
          |  5|  ee| 20|  java|
          |  2|  bb| 20|  java|
          | 14|  oo| 20|  java|
          |  8|  qq| 20|  java|
          +---+----+---+------+
    
          */
    
      }
    }
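
    One small note on the final step: in Spark SQL, union deduplicates the result (it behaves like UNION DISTINCT) and therefore adds an extra shuffle. Since temp1 and temp2 contain disjoint keys here, union all would return the same rows without that extra cost. An optional tweak, not part of the original example:
        spark.sql(
          """
            |select * from temp1
            |union all
            |select * from temp2
          """.stripMargin)
          .show()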
    
    

    6. Case 6: large table join large table, where one table is fairly evenly distributed and the other has especially many records for many keys, so data skew also appears
    Solution: salt the skewed table directly with a random number in [0-9] and expand the other table; because many keys are skewed, expand by at most 10x, otherwise memory is wasted
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    import scala.util.Random
    
    //6. Large table join large table; one table is fairly evenly distributed, the other has especially many records for many keys, so data skew appears
    //* Solution:
      //* Salt the skewed table directly with a random number in [0-9] and expand the other table (by at most 10x)
    object ProcessData {
      def main(args: Array[String]): Unit = {
        // Create the SparkSession
        val spark = SparkSession.builder()
            .appName("parcess")
            .master("local[4]")
            //.config("spark.sql.autoBroadcastJoinThreshold","10485760")
            .getOrCreate()
        import spark.implicits._
        // Data source 1: the student table
        val source1 = spark.sparkContext.parallelize(Seq[(Int, String, Int, String)](
          (1, "aa", 20, "class_01"), (2, "bb", 20, "class_01"), (3, "vv", 20, "class_01"), (4, "dd", 20, "class_01"), (5, "ee", 20, "class_01"),
          (6, "ss", 20, "class_01"), (7, "uu", 20, "class_01"), (8, "qq", 20, "class_01"), (9, "ww", 20, "class_01"), (10, "rr", 20, "class_01"),
          (11, "tt", 20, "class_01"), (12, "xx", 20, "class_02"), (13, "kk", 20, "class_03"), (14, "oo", 20, "class_01"),
          (15, "pp", 20, "class_01")
        )).toDF("id", "name", "age", "clazzId")
    
        // Data source 2: the class table
        val source2 = spark.sparkContext.parallelize(Seq[(String,String)](
          ("class_01","java"), ("class_02","python"), ("class_03","hive")
        )).toDF("clazzId","name")
    
        // Sample the data to find the skewed keys
        /**
          * Parameter 1: whether to sample with replacement; false means without replacement
          * Parameter 2: the sampling fraction; 0.1~0.2 is usually enough in production, 0.5 is used here because the data set is tiny
          */
    
        source1.sample(false,0.5).show()
        // Assume that many keys in source1 are skewed
    
        // Define the UDF functions
        // UDF: append "#" plus a random number 0-9 to clazzId (salting)
        val addPrefix=(clazzId:String) => s"${clazzId}#${Random.nextInt(10)}"
        // UDF: append "#" plus a fixed number to clazzId (used for the expansion)
        val fixed=(i:Int,clazzId:String)=> s"${clazzId}#${i}"
    
        // Register the UDFs
        spark.udf.register("addPrefix",addPrefix)
        spark.udf.register("fixed",fixed)
    
        // Add a random salt to every row of the skewed table
        source1.selectExpr("id","name","age","addPrefix(clazzId) clazzId").createOrReplaceTempView("student_temp")
    
        // Expansion function: produce 10 copies of the table, one per salt value
        def capacity(source:DataFrame): DataFrame ={
          // Empty DataFrame used as the accumulator for the union
          val rdd: RDD[Row] = spark.sparkContext.emptyRDD[Row]
          var emptyDataframe: DataFrame = spark.createDataFrame(rdd,source.schema)
          // Loop over the salt values 0-9 and append each one to clazzId
          for (i <- 0 until(10)){
            emptyDataframe = emptyDataframe.union(source.selectExpr(s"fixed(${i},clazzId)","name"))
          }
          emptyDataframe
        }
    
        // Many keys are skewed, so the whole class table is expanded
        // Expand the table
        capacity(source2).createOrReplaceTempView("clazz_temp")
    
      // Join the salted table with the expanded table
      spark.sql(
          """
            |select s.id,s.name,s.age,c.name
            |from student_temp s left join clazz_temp c
            |on s.clazzId = c.clazzId
          """.stripMargin)
              .show()
    
        /**
          * Result:
          *+---+----+---+------+
          | id|name|age|  name|
          +---+----+---+------+
          |  7|  uu| 20|  java|
          |  5|  ee| 20|  java|
          |  8|  qq| 20|  java|
          |  2|  bb| 20|  java|
          |  3|  vv| 20|  java|
          | 13|  kk| 20|  hive|
          | 12|  xx| 20|python|
          |  1|  aa| 20|  java|
          |  4|  dd| 20|  java|
          |  6|  ss| 20|  java|
          | 10|  rr| 20|  java|
          | 11|  tt| 20|  java|
          | 14|  oo| 20|  java|
          | 15|  pp| 20|  java|
          |  9|  ww| 20|  java|
          +---+----+---+------+
          */
    
      }
    }