pysparkでデータ加工するための自分用チートシート
案件でビッグデータを加工するジョブをそれなりに作成してきました。
その中で使ってきたpysparkの備忘録です。
こういう記事は他にもたくさんあるけれど、自分用にメモ書きしていきます。
★ファイルの操作
■ファイルの読み込み
#読み込みたいファイルが格納されているディレクトリパス(単一ファイル指定可)
dir_path = '(ディレクトリパス)'
#読み込み形式(csv or parquet or json)
file_type = 'csv'
#headerの有無(true or false)
necessity= 'true'
### 読み込み
df = spark.read.format(file_type).option('header', necessity).load(dir_path)
■ファイルの書き出し
#書き出し先ディレクトリパス
dir_path = '(ディレクトリパス)'
#書き出し形式(csv or parquet or json)
file_type = 'csv'
#書き出しモード(overwrite or append)
write_mode = 'overwrite'
#headerの有無(true or false)
necessity= 'true'
#分割ファイル数(1以上の整数)
file_num = 1
※1ファイルが100MBになるように設定すると早いらしい
###書き込み
df.coalesce(file_num).write.format(file_type).option('header', necessity).mode(write_mode).save(dir_path)
★データの操作
■新規カラムの作成
from pyspark.sql.functions import col
#新規作成カラム名(既存と同じものにすると更新される)
new_col_name = 'new_col'
#参照するカラム
old_col_name = 'old_col'
###カラムを追加
df = df.withColumn(new_col_name, col(old_col_name) )
###カラム名の変換
df = df.withColumnRenamed(old_col_name, new_col_name)
■フィルタリング
df = df.filter( (条件式) )
※戻り値がbooleanなら使える
■指定列の抽出
#列指定
col_list = ['hoge', 'fuga']
###列抽出
select_df = df.select( col_list )
###リスト化
col_data_list = df.select(col_list[0]).rdd.flatMap(lambda x: x).collect()
■キャスト
#型指定
cast_kind = 'string'
###キャスト
df = df.withColumn(new_col_name, F.col(old_col_name).cast(cast_kind) )
■正規表現(戻り値:true or false)
#正規表現パターン
pattern = '^\d+$'
###正規表現
df = df.withColumn(new_col_name, F.col(old_col_name).rlike(pattern) )
■null値の扱い
###nullをチェック(戻り値:true or false)
df = df.withColumn(new_col_name, F.col(old_col_name).isNull() )
###null値の補完
#nullの置換が必要なカラムを指定
target_col = 'col_name'
df = df.fillna('hoge', subset=target_col)
※カラム指定なしなら、全カラムが対象になる
###nullを含むレコードを削除
#nullのチェックが必要なカラムを指定
target_col = 'col_name'
df = df.dropna(subset=target_col)
※カラム指定なしなら、全カラムが対象になる
■テーブル名の付与(ジョインする前によく使う)
#テーブル名の定義
table_name = 'hoge'
###エイリアスの付与
df = df.alias(table_name)
■ジョイン
###横結合
#ジョインキー
join_key = 'col_name'
※異なるカラムどうしでジョインする場合はSQLのように書く
#結合方式(inner,cross,outer,left,right,left_anti)
how = 'left'
※left_antiジョインは、null値を同値として扱ってくれないので注意
df = df1.join(df2, join_key, how)
###縦結合(ただのunionは事故るので列名参照する)
df = df1.unionByName(df2)
■集計
###カウント
df = df.groupBy(col_name).count()
###最大値
df = df.groupBy(col_name)agg({col_name : 'max'})
###最小値
df = df.groupBy(col_name)agg({col_name : 'min'})
###平均値
df = df.groupBy(col_name)agg({col_name : 'ave'})
■ソート
df = df.sort(col_name, ascending=True)
■データ加工のお供"pyspark.sql.functions"
公式:https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html
#functionsモジュールをインポート
from pyspark.sql import functions as F
###列を複製
df = df.withColumn(new_col_name, F.col(old_col_name) )
###同じ値で埋めた列を作成
df = df.withColumn(new_col_name, F.lit(1) )
###条件によって値を変化させた列を作成
df = df.withColumn(new_col_name,
F.when( (条件式) ,
(trueの場合の値)
).otherwise( (falseの場合の値) )
)
※whenはチェーン可→F.when().when().otherwise()
###文字列の連結
df = df.withColumn(new_col_name,
F.concat( F.col(old_col_name), F.lit('hoge') )
)
###文字列の置換
df = df.withColumn(new_col_name,
F.regexp_replace( old_col_name, (置換対象文字列), (置換後文字列) )
)
#####window関数(集計関数)を使う#####
#windowモジュールをインポート
from pyspark.sql.window import Window
#順位付け対象カラム名
col_name = 'hoge'
#集積関数の定義
myWindow = Window.orderBy(col_name)
#パーティションあり
partition_col = 'fuga'
myWindow = Window.partitionBy(partition_col).orderBy(col_name)
###順位付け
df = df.withColumn(new_col_name, F.rank().over(myWindow) )
#denseRank:同率ランクがある場合の順位飛ばしあり
#rank:同率ランクがある場合の順位飛ばしなし
#rowNumber:順位の重複なし
#####日付系#####
#日付形式
pattern = 'yyyyMMdd'
###指定の日付形式の文字列からdate型へ変更
df = df.withColumn(new_col_name, F.to_date( F.col(old_col_name), pattern)
###指定の日付形式の文字列からtimestamp型へ変更
df = df.withColumn(new_col_name, F.to_timestamp( F.col(old_col_name), pattern )
###指定した形式のdate型へ変更
df = df.withColumn(new_col_name, F.date_format( F.col(old_col_name), pattern )
★検証
#レコード数のカウント
df.count()
#1行目を表示
df.head()
#上から指定した分だけ表示
num = 5
df.show(num)
#カラム一覧表示
df.columns
#データフレームどうしの比較(カラムは一致すること)
df = df1.subsract(df2)
※戻り値はleft anti joinの結果
#レコード数のカウント
df.count()
#1行目を表示
df.head()
#上から指定した分だけ表示
num = 5
df.show(num)
#カラム一覧表示
df.columns
#データフレームどうしの比較(カラムは一致すること)
df = df1.subsract(df2)
※戻り値はleft anti joinの結果
Author And Source
この問題について(pysparkでデータ加工するための自分用チートシート), 我々は、より多くの情報をここで見つけました https://qiita.com/sho_oboegaki/items/58d8e5c3c0a4632d2ba9著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .