SparkSqlパフォーマンステストケース
3823 ワード
前言
前回SparkSqlが引き起こした災難を考慮して、私は小さなテストをすることにしました:異なる方法で数量を統計することにしました.
データセットの準備
infoA:
13111111111,Tom
13222222222,Jack
13333333333,Lily
13444444444,Lucy
13555555555,Allen
13666666666,White
13777777777,Rivers
13888888888,John
13999999999,Robert
infoB:
15111111111,Emma
15222222222,Mary
13555555555,Allen
13666666666,White
15333333333,Kevin
15444444444,Rose
13888888888,John
13999999999,Robert
15555555555,Kelly
15666666666,Steve
15777777777,David
15888888888,Amy
15999999999,Ruby
需要(前回の実例をシミュレートすること)
infoAにあるがinfoBにないユーザーの数を統計する
Sparkエントリプログラムとデータコードの読み出し
val spark = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.master(master = "local[*]")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val dfA = spark.read.textFile(path = "./data/infoA")
.map(_.split(","))
.map(x => (x(0), x(1)))
.toDF("tel_number", "name")
val dfB = spark.read.textFile(path = "./data/infoB")
.map(_.split(","))
.map(x => (x(0), x(1)))
.toDF("tel_number", "name")
dfA.cache()
dfB.cache()
dfB.createTempView(viewName = "viewB")
方法1
コード:
val start_time1 = System.currentTimeMillis()
dfA.filter("tel_number not in (select tel_number from viewB)")
.agg(count($"tel_number")).show()
val end_time1 = System.currentTimeMillis()
println(" :" + (end_time1 - start_time1))
説明:
ここではin(サブクエリ)を使用していますが、今回はデータセットが少ないため、前回の災害は発生しませんでした.前回の実際の環境テストを経て、明らかにこの方法は、in(‘A’,‘B’,‘C’)など、サブクエリの検出数が少ない場合に適しています.
結果:
+-----------------+
|count(tel_number)|
+-----------------+
| 5|
+-----------------+
:770
方法2
コード:
val start_time2 = System.currentTimeMillis()
dfA.join(dfB, usingColumns = Seq("tel_number"), joinType = "left_outer")
.where(dfB("name").isNull)
.agg(count($"tel_number")).show()
val end_time2 = System.currentTimeMillis()
println(" :" + (end_time2 - start_time2))
説明:
ここではjoin演算子左関連の2つのテーブルを使用していますが、関連していない右変表のnameはnullで表示され、この方法では大きな災害は発生しませんが、joinはshuffleを発生させる演算子なので、パフォーマンスが消耗します.SparkでShuffle演算子を生成するには
結果:
+-----------------+
|count(tel_number)|
+-----------------+
| 5|
+-----------------+
:2755
方法3
コード:
val start_time3 = System.currentTimeMillis()
val telSet: Set[String] = HashSet() ++ dfB.select("tel_number")
.map(row => row.mkString).collect()
val telBroadcast = spark.sparkContext.broadcast(telSet)
dfA.select("tel_number").map(row => row.mkString)
.filter(x => !telBroadcast.value.contains(x))
.toDF("tel_number")
.agg(count($"tel_number")).show()
val end_time3 = System.currentTimeMillis()
println(" :" + (end_time3 - start_time3))
説明:
Broadcastを使用してMapperエンドShuffleの集約機能を実現する原理(mapJoinを行う場合、一般的なデータ構造はHashsetを使用し、もちろん使用しなくてもよい)
結果:
+-----------------+
|count(tel_number)|
+-----------------+
| 5|
+-----------------+
:713
後記
3つの方法の比較から,最も性能が優れているのは3つ目の方法,すなわちブロードキャスト変数を用いる方法であり,最も性能が悪いのはShuffleを生じたためJoin演算子を用いることであることが分かった.ある友达は聞くことができて、それでは後で直接第3の方法を使えばいいのではないでしょうか?ブロードキャスト変数の使用にも限界があり、データcollectをDriver側にする必要があります.この場合、Driver側のリソースが十分であることを確保しなければなりません.放送変数の詳細については、ぜひこのブログを参考にしてください.1つ目の方法は前に述べたように、限界もありますので、3つの方法を使うときは適宜考えてみてください.