spark 2.0系Tips #1 Jupyterでpyspark


概要

みんな大好きJupyter notebook(python)上で、Pyspark/Cythonを使っていろんなことをやる。とかいう記事を書こうと思ったけど、1記事に詰め込みすぎても醜いし、時間かかって書きかけで放置してしまうので、分割して初歩的なことからはじめようとおもった。

ということで、今回は、Jupyter起動して、sparkSession作るだけにしてみる。

使用バージョン

  • Python == 3.5.1
  • Spark == 2.0系最新(branch-2.0をビルドしたもの)
  • notebook == 4.2.1

Sparkの最新安定バージョンは、2016-07-01現在1.6.2なんだけど、もうgithubには2.0.0-rc1出てたりする。しかもrc1出て以降も、バグフィックスとかcommitされているので、結局今使っているのは、branch-2.0をビルドしたもの。
ちなみに、2.0で結構APIが変わっています。

Jupyter起動の前にやること

Jupyter起動前に、いろいろ環境変数をセットしておく。Jupyterの設定ファイルに書いといてもいいけど、書き方よくわかっていないし、毎回設定変えたりするので、環境変数でやってしまう。

$ export SPARK_HOME=/opt/local/spark
$ export PYSPARK_PYTHON=/opt/local/python-3.5.1/bin/python3
$ export PYSPARK_DRIVER_PYTHON=/opt/local/python-3.5.1/bin/python3
export PYTHONPATH=$(ls -a ${SPARK_HOME}/python/lib/py4j-*-src.zip):${SPARK_HOME}/python:$PYTHONPATH
$ export PYSPARK_SUBMIT_ARGS="
--packages com.amazonaws:aws-java-sdk-pom:1.11.8,org.apache.hadoop:hadoop-aws:2.7.2
 --conf 'spark.local.dir=/mnt/ephemeral/tmp/spark'
 --driver-java-options '-XX:+UseG1GC -XX:G1HeapRegionSize=32m -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35'
 --driver-library-path '/opt/local/hadoop/lib/native'
 --conf 'spark.driver.memory=2g'
 --conf 'spark.driver.maxResultSize=2g'
 --conf 'spark.executor.memory=45g'
 --conf 'spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:G1HeapRegionSize=32m -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=300 -XX:InitiatingHeapOccupancyPercent=35'
 --conf 'spark.executor.extraLibraryPath=/opt/local/hadoop/lib/native'
 --conf 'spark.executorEnv.LD_PRELOAD=/usr/lib/libjemalloc.so'
 --conf 'spark.network.timeout=600s'
 --conf 'spark.io.compression.codec=lz4'
 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
 --conf 'spark.kryo.referenceTracking=false'
 --conf 'spark.shuffle.io.numConnectionsPerPeer=4'
 --conf 'spark.sql.inMemoryColumnarStorage.batchSize=20000'
 --conf 'spark.sql.autoBroadcastJoinThreshold=104857600'
 --conf 'spark.sql.shuffle.partitions=800'
 pyspark-shell
"

環境変数説明

Sparkドキュメント見ればわかるけど一応。インストールパスとかは、自分の環境に合わせてね。これ以外にも、必要に応じてHADOOP_HOMEとかも。

環境変数名 適用
SPARK_HOME ま、これはそのまんま。spark-env.shとかで設定してもいい。
PYSPARK_PYTHON Workerが使うPython executable。指定しなければOSデフォルトのpython
PYSPARK_DRIVER_PYTHON Driverが使うPython executable。指定しなければOSデフォルトのpython
PYTHONPATH Jupyter上でimport pysparkできるように。py4jのバージョン変わったりするのでわざわざlsしてます
PYSPARK_SUBMIT_ARGS pysparkの起動オプション。aws関連のパッケージを読んだりしている。好きなように変えてください。メモリをたくさん使う設定にしているので、このまま張り付けたりしても、メモリ足りないと動きません。最後のpyspark-shellは必要。

複数notebook使う時、メモリなどの設定をnotebookごとに変えたい場合は、notebook上でsparkSessionを作る前に、os.environを使ってPYSPARK_SUBMIT_ARGSを上書きしてもいいよ。

Jupyter起動

上記環境変数とともに、こんな感じで。

$ /opt/local/python-3.5/bin/jupyter notebook --ip=0.0.0.0 --no-browser

これ以降は、Jupyter上で作業。以下は、Jupyterでつくったnotebookをmarkdown変換して張り付けただけ。

必要なモジュールのimport

import os

import pyspark
from pyspark import StorageLevel
from pyspark.sql import (
    SparkSession,
    functions as sql_funcs,
)
from pyspark.sql.types import *

SparkSession作成

2.0.0からは、pyspark.sql.SparkSessionがこういう時のフロントAPIになっているみたいなので、それに従う。

SparkSession使用時に、SparkContextのAPIにアクセスしたい場合は、spark_session.sparkContextでSparkContextを取得できる。

try:
    spark_session.stop()
except NameError:
    pass

spark_session = SparkSession.builder.appName(
    name='spark-tips-1',
).master(
    master=os.environ.get('X_SPARK_MASTER', 'local[*]'),
).enableHiveSupport().getOrCreate()

DataFrame作成

pythonの欠点は遅いところ。pysparkのソース見ればわかるけど、特にrddのAPIは、「処理を速くしよう」という意思を微塵も感じさせないコードになってたりする。
なので、DataFrame(将来的にはDataSet?)で完結できる処理は、極力DataFrameでやろう。

今回は、最初の一歩なので、お手軽にプロセス内のlistからDataFrame作成。

local_list = (
    ('2016-07-01 00:00:00', 'jiba-nyan', 1,),
    ('2016-07-01 00:01:00', 'bushi-nyan', 1,),
    ('2016-07-01 00:02:00', 'koma-san', 1,),
    ('2016-07-01 00:03:00', 'komajiro', 1,),
)

schema = StructType(
    fields=(
        # 日時
        StructField(
            name='dt',
            dataType=StringType(),
            nullable=False,
        ),
        # user_name
        StructField(
            name='user_name',
            dataType=StringType(),
            nullable=False,
        ),
        # rate
        StructField(
            name='rate',
            dataType=LongType(),
            nullable=True,
        ),
    ),
)


df = spark_session.createDataFrame(
    data=local_list,
    schema=schema,
).persist(
    storageLevel=StorageLevel.MEMORY_ONLY_SER,
)

printSchema()でスキーマ確認

df.printSchema()
root
 |-- dt: string (nullable = false)
 |-- user_name: string (nullable = false)
 |-- rate: long (nullable = true)

show()で中身を確認

df.show()
+-------------------+----------+----+
|                 dt| user_name|rate|
+-------------------+----------+----+
|2016-07-01 00:00:00| jiba-nyan|   1|
|2016-07-01 00:01:00|bushi-nyan|   1|
|2016-07-01 00:02:00|  koma-san|   1|
|2016-07-01 00:03:00|  komajiro|   1|
+-------------------+----------+----+

take()で中身を確認

df.take(4)
[Row(dt='2016-07-01 00:00:00', user_name='jiba-nyan', rate=1),
 Row(dt='2016-07-01 00:01:00', user_name='bushi-nyan', rate=1),
 Row(dt='2016-07-01 00:02:00', user_name='koma-san', rate=1),
 Row(dt='2016-07-01 00:03:00', user_name='komajiro', rate=1)]

日付のカラムが文字列になってるので、Timestamp型に変換

この場合は、うまい具合に日時フォーマットになってるので、cast(TimestampType())するだけ。
フォーマットが違う場合も、文字列操作などのSQL関数で、(python使わずに)大体何とかなります。

df_parsed = df.withColumn(
    'dt',
    df['dt'].cast(TimestampType()),
).persist(
    storageLevel=StorageLevel.MEMORY_ONLY_SER,
)
df_parsed.printSchema()
root
 |-- dt: timestamp (nullable = true)
 |-- user_name: string (nullable = false)
 |-- rate: long (nullable = true)
df_parsed.show()
+--------------------+----------+----+
|                  dt| user_name|rate|
+--------------------+----------+----+
|2016-07-01 00:00:...| jiba-nyan|   1|
|2016-07-01 00:01:...|bushi-nyan|   1|
|2016-07-01 00:02:...|  koma-san|   1|
|2016-07-01 00:03:...|  komajiro|   1|
+--------------------+----------+----+
df_parsed.take(4)
[Row(dt=datetime.datetime(2016, 7, 1, 0, 0), user_name='jiba-nyan', rate=1),
 Row(dt=datetime.datetime(2016, 7, 1, 0, 1), user_name='bushi-nyan', rate=1),
 Row(dt=datetime.datetime(2016, 7, 1, 0, 2), user_name='koma-san', rate=1),
 Row(dt=datetime.datetime(2016, 7, 1, 0, 3), user_name='komajiro', rate=1)]

まとめ

今回は最初の一歩なので、簡単だよねん。