Spark Streamingの動作確認 ローカル及びクラスタ環境


Spark入門の7章及び以下のサイトを参考にSparkStreamingの動作確認を行なった際のメモ。
http://spark.apache.org/docs/latest/streaming-programming-guide.html

ローカル環境でSpark Streamingの動作確認

ポート9999番で待ち受けるデータサーバ(データを書き込む)がある。このネットワークソケットに接続して、1秒毎にテキストデータをストリームデータとして読み込み、そのストリームデータに含まれる単語数をカウントするプログラムの動作確認を行う。
ソースは上記で参照しているチュートリアルから参照可能。

作成したプログラムは以下のとおり。

chap7-1.py
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999,StorageLevel.MEMORY_AND_DISK_SER) 

words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()             
ssc.awaitTermination() 

StreamingContextを生成

StreamingContextはストリーミング機能のメインエントリーポイントであり、StreamingContextを利用して入力ストリームデータの定義などを行う。

class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)

以下の例では1秒毎のバッチ処理を繰り返すStreamingContextを生成。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

DStreamの作成

このコンテキスト(StreamingContext)を使用して、DStreamを定義する。DStreamは時々刻々と生成されるRDDみたいなもので、データサーバから受信するストリームデータである。DStreamに含まれるそれぞれのレコードは1行のテキストである。
以下の例ではsocketTextStreamメソッドを使用し、localhostの9999番ポートに接続するように定義。

lines = ssc.socketTextStream("localhost", 9999)

SocketTextStreamは、TCPソース(hostname:port)からinputを生成する。データはTCPソケットを使用して受け取とられ、UTF-8でエンコードし、¥nをデリミタとした行単位でバイトで受け取る。

socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))

StorageLevelは次のURLを参照。 http://spark.apache.org/docs/latest/api/python/pyspark.html

word count処理

ストリームデータとして得られたテキストラインをスペース区切りで文字に分割する。

words = lines.flatMap(lambda line: line.split(" "))

この例では、それぞれのテキストラインが複数の語(word)に分けられ、そしてwordのストリームがDStreamとなる。

次にwordをカウントする。
words DStreamを(word,1)ペアのDStreamにマップし、それからwordの頻度を得るためにreduce処理を行う。

pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

これで単語毎の出現回数を保持するDStreamが定義できた。

次に毎秒生成されるカウントをpprintメソッドで出力する。

wordCounts.pprint()

ここまででセットアップは完了したので、以下のコードを追加してストリーム処理を開始する。

ssc.start()             # 計算処理を開始
ssc.awaitTermination()  # stopメソッドによるcontextの終了や例外が発生するまで待つ

動作確認

作成したSparkStreamingのアプリを実行

$ spark-submit --master local chap7-1.py

別ターミナルにて、データ書き込み用に、NetCatを使って9999番ポートで待ち受けるようにし、テキストデータを書き込む。

$ nc -lk 9999
hello world

プログラムを実行したターミナルにて以下の出力があることを確認。

・・・
------------------------------------------
Time: 2015-12-22 08:28:02
-------------------------------------------
('world', 1)
('hello', 1)
・・・

HDFS環境でSpark Streamingの動作確認

HDFS上のディレクトリを監視し、新しいファイルが生成されたら、入力データとして取り込み、ワードをカウントするアプリを実行する。
※既存のファイルを更新しても入力データとして認識されないので注意。

変更点

先ほどのコードとの違いは以下の点。

  • localの代わりにyarn-clientをmasterとして動作するように指定 sc = SparkContext("yarn-client", "Chap7-2")
  • socketTextStreamメソッドの代わりにtextFileStreamメソッドを使用し、監視対象のhdfsディレクトリを引数で指定。textFileStreamはディレクトリに含まれるファイルを監視し、新しいファイルが生成されたらストリームデータとして取得する。
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/") 

動作確認

アプリの実行

$ spark-submit --master yarn-client ./chap7-2.py

HDFS上にファイルを生成

$ hdfs dfs -put /opt/spark/README.md /user/y_tadayasu/data/

アプリを実行しているターミナルに以下の出力があることを確認。

-------------------------------------------
Time: 2015-12-23 04:15:50
-------------------------------------------
('', 67)
('guide,', 1)
('APIs', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...

この結果だと空白もカウントされているので、filterを使用して空白は取り除くようにする。

words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
-------------------------------------------
Time: 2015-12-23 04:25:30
-------------------------------------------
('guide,', 1)
('APIs', 1)
('optimized', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...

コード全体は以下のとおり。

chap7-2.py
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel
sc = SparkContext("yarn-client", "Chap7-2")
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/") 
words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()             
ssc.awaitTermination()