pysparkでデータ加工するための自分用チートシート


案件でビッグデータを加工するジョブをそれなりに作成してきました。
その中で使ってきたpysparkの備忘録です。
こういう記事は他にもたくさんあるけれど、自分用にメモ書きしていきます。

★ファイルの操作

■ファイルの読み込み

#読み込みたいファイルが格納されているディレクトリパス(単一ファイル指定可)
dir_path = '(ディレクトリパス)'
#読み込み形式(csv or parquet or json)
file_type = 'csv'
#headerの有無(true or false)
necessity= 'true'
### 読み込み
df = spark.read.format(file_type).option('header', necessity).load(dir_path)

■ファイルの書き出し

#書き出し先ディレクトリパス
dir_path = '(ディレクトリパス)'
#書き出し形式(csv or parquet or json)
file_type = 'csv'
#書き出しモード(overwrite or append)
write_mode = 'overwrite'
#headerの有無(true or false)
necessity= 'true'
#分割ファイル数(1以上の整数)
file_num = 1
※1ファイルが100MBになるように設定すると早いらしい
###書き込み
df.coalesce(file_num).write.format(file_type).option('header', necessity).mode(write_mode).save(dir_path)

★データの操作

■新規カラムの作成

from pyspark.sql.functions import col
#新規作成カラム名(既存と同じものにすると更新される)
new_col_name = 'new_col'
#参照するカラム
old_col_name = 'old_col'
###カラムを追加
df = df.withColumn(new_col_name, col(old_col_name) )
###カラム名の変換
df = df.withColumnRenamed(old_col_name, new_col_name)

■フィルタリング

df = df.filter( (条件式) )
※戻り値がbooleanなら使える

■指定列の抽出

#列指定
col_list = ['hoge', 'fuga']
###列抽出
select_df = df.select( col_list )
###リスト化
col_data_list = df.select(col_list[0]).rdd.flatMap(lambda x: x).collect()

■キャスト

#型指定
cast_kind = 'string'
###キャスト
df = df.withColumn(new_col_name, F.col(old_col_name).cast(cast_kind) )

■正規表現(戻り値:true or false)

#正規表現パターン
pattern = '^\d+$'
###正規表現
df = df.withColumn(new_col_name, F.col(old_col_name).rlike(pattern) )

■null値の扱い

###nullをチェック(戻り値:true or false)
df = df.withColumn(new_col_name, F.col(old_col_name).isNull() )
###null値の補完
#nullの置換が必要なカラムを指定
target_col = 'col_name'
df = df.fillna('hoge', subset=target_col)
※カラム指定なしなら、全カラムが対象になる
###nullを含むレコードを削除
#nullのチェックが必要なカラムを指定
target_col = 'col_name'
df = df.dropna(subset=target_col)
※カラム指定なしなら、全カラムが対象になる

■テーブル名の付与(ジョインする前によく使う)

#テーブル名の定義
table_name = 'hoge'
###エイリアスの付与
df = df.alias(table_name)

■ジョイン

###横結合
#ジョインキー
join_key = 'col_name'
※異なるカラムどうしでジョインする場合はSQLのように書く
#結合方式(inner,cross,outer,left,right,left_anti)
how = 'left'
※left_antiジョインは、null値を同値として扱ってくれないので注意
df = df1.join(df2, join_key, how)
###縦結合(ただのunionは事故るので列名参照する)
df = df1.unionByName(df2)

■集計

###カウント
df = df.groupBy(col_name).count()
###最大値
df = df.groupBy(col_name)agg({col_name : 'max'})
###最小値
df = df.groupBy(col_name)agg({col_name : 'min'})
###平均値
df = df.groupBy(col_name)agg({col_name : 'ave'})

■ソート

df = df.sort(col_name, ascending=True)

■データ加工のお供"pyspark.sql.functions"
公式:https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html

#functionsモジュールをインポート
from pyspark.sql import functions as F
###列を複製
df = df.withColumn(new_col_name, F.col(old_col_name) )
###同じ値で埋めた列を作成
df = df.withColumn(new_col_name, F.lit(1) )
###条件によって値を変化させた列を作成
df = df.withColumn(new_col_name, 
                   F.when( (条件式) ,
                          (trueの場合の値)
                         ).otherwise( (falseの場合の値) ) 
                   )
※whenはチェーン可→F.when().when().otherwise()
###文字列の連結
df = df.withColumn(new_col_name, 
                   F.concat( F.col(old_col_name), F.lit('hoge') )
                  )
###文字列の置換
df = df.withColumn(new_col_name, 
                   F.regexp_replace( old_col_name, (置換対象文字列), (置換後文字列) )
                  )

#####window関数(集計関数)を使う#####
#windowモジュールをインポート
from pyspark.sql.window import Window
#順位付け対象カラム名
col_name = 'hoge'
#集積関数の定義
myWindow = Window.orderBy(col_name)
#パーティションあり
partition_col = 'fuga'
myWindow = Window.partitionBy(partition_col).orderBy(col_name)
###順位付け
df = df.withColumn(new_col_name, F.rank().over(myWindow) )
#denseRank:同率ランクがある場合の順位飛ばしあり
#rank:同率ランクがある場合の順位飛ばしなし
#rowNumber:順位の重複なし

#####日付系#####
#日付形式
pattern = 'yyyyMMdd'
###指定の日付形式の文字列からdate型へ変更
df = df.withColumn(new_col_name, F.to_date( F.col(old_col_name), pattern)
###指定の日付形式の文字列からtimestamp型へ変更
df = df.withColumn(new_col_name, F.to_timestamp( F.col(old_col_name), pattern )
###指定した形式のdate型へ変更
df = df.withColumn(new_col_name, F.date_format( F.col(old_col_name), pattern )

★検証

#レコード数のカウント
df.count()
#1行目を表示
df.head()
#上から指定した分だけ表示
num = 5
df.show(num)
#カラム一覧表示
df.columns
#データフレームどうしの比較(カラムは一致すること)
df = df1.subsract(df2)
※戻り値はleft anti joinの結果