SparkSqlパフォーマンステストケース

3823 ワード

前言


前回SparkSqlが引き起こした災難を考慮して、私は小さなテストをすることにしました:異なる方法で数量を統計することにしました.

データセットの準備


infoA:
13111111111,Tom
13222222222,Jack
13333333333,Lily
13444444444,Lucy
13555555555,Allen
13666666666,White
13777777777,Rivers
13888888888,John
13999999999,Robert

infoB:
15111111111,Emma
15222222222,Mary
13555555555,Allen
13666666666,White
15333333333,Kevin
15444444444,Rose
13888888888,John
13999999999,Robert
15555555555,Kelly
15666666666,Steve
15777777777,David
15888888888,Amy
15999999999,Ruby

需要(前回の実例をシミュレートすること)


infoAにあるがinfoBにないユーザーの数を統計する

Sparkエントリプログラムとデータコードの読み出し

val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master(master = "local[*]")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._
    val dfA = spark.read.textFile(path = "./data/infoA")
      .map(_.split(","))
      .map(x => (x(0), x(1)))
      .toDF("tel_number", "name")

    val dfB = spark.read.textFile(path = "./data/infoB")
      .map(_.split(","))
      .map(x => (x(0), x(1)))
      .toDF("tel_number", "name")
    dfA.cache()
    dfB.cache()
    dfB.createTempView(viewName = "viewB")

方法1


コード:
    val start_time1 = System.currentTimeMillis()
    dfA.filter("tel_number not in (select tel_number from viewB)")
      .agg(count($"tel_number")).show()
    val end_time1 = System.currentTimeMillis()
    println(" :" + (end_time1 - start_time1))

説明:
ここではin(サブクエリ)を使用していますが、今回はデータセットが少ないため、前回の災害は発生しませんでした.前回の実際の環境テストを経て、明らかにこの方法は、in(‘A’,‘B’,‘C’)など、サブクエリの検出数が少ない場合に適しています.
結果:
+-----------------+
|count(tel_number)|
+-----------------+
|                5|
+-----------------+

 :770

方法2


コード:
    val start_time2 = System.currentTimeMillis()
    dfA.join(dfB, usingColumns = Seq("tel_number"), joinType = "left_outer")
      .where(dfB("name").isNull)
      .agg(count($"tel_number")).show()
    val end_time2 = System.currentTimeMillis()
    println(" :" + (end_time2 - start_time2))

説明:
ここではjoin演算子左関連の2つのテーブルを使用していますが、関連していない右変表のnameはnullで表示され、この方法では大きな災害は発生しませんが、joinはshuffleを発生させる演算子なので、パフォーマンスが消耗します.SparkでShuffle演算子を生成するには
結果:
+-----------------+
|count(tel_number)|
+-----------------+
|                5|
+-----------------+

 :2755

方法3


コード:
    val start_time3 = System.currentTimeMillis()
    val telSet: Set[String] = HashSet() ++ dfB.select("tel_number")
      .map(row => row.mkString).collect()
    val telBroadcast = spark.sparkContext.broadcast(telSet)
    dfA.select("tel_number").map(row => row.mkString)
      .filter(x => !telBroadcast.value.contains(x))
      .toDF("tel_number")
      .agg(count($"tel_number")).show()

    val end_time3 = System.currentTimeMillis()
    println(" :" + (end_time3 - start_time3))

説明:
Broadcastを使用してMapperエンドShuffleの集約機能を実現する原理(mapJoinを行う場合、一般的なデータ構造はHashsetを使用し、もちろん使用しなくてもよい)
結果:
+-----------------+
|count(tel_number)|
+-----------------+
|                5|
+-----------------+

 :713

後記


3つの方法の比較から,最も性能が優れているのは3つ目の方法,すなわちブロードキャスト変数を用いる方法であり,最も性能が悪いのはShuffleを生じたためJoin演算子を用いることであることが分かった.ある友达は聞くことができて、それでは後で直接第3の方法を使えばいいのではないでしょうか?ブロードキャスト変数の使用にも限界があり、データcollectをDriver側にする必要があります.この場合、Driver側のリソースが十分であることを確保しなければなりません.放送変数の詳細については、ぜひこのブログを参考にしてください.1つ目の方法は前に述べたように、限界もありますので、3つの方法を使うときは適宜考えてみてください.