Spark現在のパーティションのpartitionIdを取得
私のオリジナルアドレス:https://dongkelun.com/2018/06/28/sparkGetPartitionId/
ここではSparkが現在のパーティションのpartitionIdを取得する方法について説明します.これはTaskContext.を通じてget.partitionId(公式サイトで見たものです)については、以下に例を示します.
次のコードは主にSparkSession、SparkContextが作成したrddとdfがサポートされているかどうかをテストします.
前言
ここではSparkが現在のパーティションのpartitionIdを取得する方法について説明します.これはTaskContext.を通じてget.partitionId(公式サイトで見たものです)については、以下に例を示します.
1、コード
次のコードは主にSparkSession、SparkContextが作成したrddとdfがサポートされているかどうかをテストします.
package com.dkl.leanring.partition
import org.apache.spark.sql.SparkSession
import org.apache.spark.TaskContext
/**
* partitionId
*/
object GetPartitionIdDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("GetPartitionIdDemo").master("local").getOrCreate()
val sc = spark.sparkContext
val data = Seq(1, 2, 3, 4)
// rdd,
val rdd = sc.parallelize(data, 3)
rdd.foreach(i => {
println("partitionId:" + TaskContext.get.partitionId)
})
import spark.implicits._
// df,
val df = rdd.toDF("id")
df.show
df.foreach(row => {
println("partitionId:" + TaskContext.get.partitionId)
})
// df,
val data1 = Array((1, 2), (3, 4))
val df1 = spark.createDataFrame(data1).repartition(2)
df1.show()
df1.foreach(row => {
println("partitionId:" + TaskContext.get.partitionId)
})
}
}