Sparkによるカラム移行
1999 ワード
1、列転行
行転列と列転行は、次のcsvファイルがある場合によく使用される機能です.
次のフォーマットに変換します.
まず、sparkを使用して以下のcsvファイルを読み込みます.
次に、入力した1行のデータについてaddressに基づいて複数行に分割し、次の関数を書きます.
最後に、主な関数です.
行転列と列転行は、次のcsvファイルがある場合によく使用される機能です.
+---+----+---------+
|id |name|address |
+---+----+---------+
|1 |a |add1,add2|
|2 |b |add3 |
|3 |c |add4 |
|4 |d |add5 |
+---+----+---------+
次のフォーマットに変換します.
+---+----+----+
| id|name|addr|
+---+----+----+
| 1| a|add1|
| 1| a|add2|
| 2| b|add3|
| 3| c|add4|
| 4| d|add5|
+---+----+----+
まず、sparkを使用して以下のcsvファイルを読み込みます.
def getCsv(spark:SparkSession,file:String) = {
import spark.implicits._
val df = spark.read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.option("header", true)
.option("delimiter", ",")
.option("encoding","gbk")
.load(file)
df
}
次に、入力した1行のデータについてaddressに基づいて複数行に分割し、次の関数を書きます.
def splitByAddr(id:String,name:String,addrs:String)={
val addrsList = addrs.split(",")
var resSeq = Seq[Row]()
for(t
最後に、主な関数です.
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.enableHiveSupport()
//.config("spark.some.config.option", "some-value")
.getOrCreate()
import spark.implicits._
//implicit val sEncoder = org.apache.spark.sql.Encoders.kryo[String]
val df = getCsv(spark,"test.csv")
df.show(false)
val df1 = df.rdd.flatMap(line => {var s = line.getAs[String]("address");var id = line.getAs[String]("id");var name = line.getAs[String]("name");splitByAddr(id,name,s)})
df1.collect().foreach(println)
val schema = StructType(List(
StructField("id", StringType, nullable = false),
StructField("name", StringType, nullable = true),
StructField("addr", StringType, nullable = true)
))
println(df1)
spark.createDataFrame(df1,schema).show()