Spark Streamingの動作確認 ローカル及びクラスタ環境
Spark入門の7章及び以下のサイトを参考にSparkStreamingの動作確認を行なった際のメモ。
http://spark.apache.org/docs/latest/streaming-programming-guide.html
ローカル環境でSpark Streamingの動作確認
ポート9999番で待ち受けるデータサーバ(データを書き込む)がある。このネットワークソケットに接続して、1秒毎にテキストデータをストリームデータとして読み込み、そのストリームデータに含まれる単語数をカウントするプログラムの動作確認を行う。
ソースは上記で参照しているチュートリアルから参照可能。
作成したプログラムは以下のとおり。
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999,StorageLevel.MEMORY_AND_DISK_SER)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
StreamingContextを生成
StreamingContextはストリーミング機能のメインエントリーポイントであり、StreamingContextを利用して入力ストリームデータの定義などを行う。
class pyspark.streaming.StreamingContext(sparkContext, batchDuration=None, jssc=None)
以下の例では1秒毎のバッチ処理を繰り返すStreamingContextを生成。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
DStreamの作成
このコンテキスト(StreamingContext)を使用して、DStreamを定義する。DStreamは時々刻々と生成されるRDDみたいなもので、データサーバから受信するストリームデータである。DStreamに含まれるそれぞれのレコードは1行のテキストである。
以下の例ではsocketTextStreamメソッドを使用し、localhostの9999番ポートに接続するように定義。
lines = ssc.socketTextStream("localhost", 9999)
SocketTextStreamは、TCPソース(hostname:port)からinputを生成する。データはTCPソケットを使用して受け取とられ、UTF-8でエンコードし、¥nをデリミタとした行単位でバイトで受け取る。
socketTextStream(hostname, port, storageLevel=StorageLevel(True, True, False, False, 2))
StorageLevelは次のURLを参照。 http://spark.apache.org/docs/latest/api/python/pyspark.html
word count処理
ストリームデータとして得られたテキストラインをスペース区切りで文字に分割する。
words = lines.flatMap(lambda line: line.split(" "))
この例では、それぞれのテキストラインが複数の語(word)に分けられ、そしてwordのストリームがDStreamとなる。
次にwordをカウントする。
words DStreamを(word,1)ペアのDStreamにマップし、それからwordの頻度を得るためにreduce処理を行う。
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
これで単語毎の出現回数を保持するDStreamが定義できた。
次に毎秒生成されるカウントをpprintメソッドで出力する。
wordCounts.pprint()
ここまででセットアップは完了したので、以下のコードを追加してストリーム処理を開始する。
ssc.start() # 計算処理を開始
ssc.awaitTermination() # stopメソッドによるcontextの終了や例外が発生するまで待つ
動作確認
作成したSparkStreamingのアプリを実行
$ spark-submit --master local chap7-1.py
別ターミナルにて、データ書き込み用に、NetCatを使って9999番ポートで待ち受けるようにし、テキストデータを書き込む。
$ nc -lk 9999
hello world
プログラムを実行したターミナルにて以下の出力があることを確認。
・・・
------------------------------------------
Time: 2015-12-22 08:28:02
-------------------------------------------
('world', 1)
('hello', 1)
・・・
HDFS環境でSpark Streamingの動作確認
HDFS上のディレクトリを監視し、新しいファイルが生成されたら、入力データとして取り込み、ワードをカウントするアプリを実行する。
※既存のファイルを更新しても入力データとして認識されないので注意。
変更点
先ほどのコードとの違いは以下の点。
- localの代わりにyarn-clientをmasterとして動作するように指定
sc = SparkContext("yarn-client", "Chap7-2")
- socketTextStreamメソッドの代わりにtextFileStreamメソッドを使用し、監視対象のhdfsディレクトリを引数で指定。textFileStreamはディレクトリに含まれるファイルを監視し、新しいファイルが生成されたらストリームデータとして取得する。
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/")
動作確認
アプリの実行
$ spark-submit --master yarn-client ./chap7-2.py
HDFS上にファイルを生成
$ hdfs dfs -put /opt/spark/README.md /user/y_tadayasu/data/
アプリを実行しているターミナルに以下の出力があることを確認。
-------------------------------------------
Time: 2015-12-23 04:15:50
-------------------------------------------
('', 67)
('guide,', 1)
('APIs', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...
この結果だと空白もカウントされているので、filterを使用して空白は取り除くようにする。
words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
-------------------------------------------
Time: 2015-12-23 04:25:30
-------------------------------------------
('guide,', 1)
('APIs', 1)
('optimized', 1)
('name', 1)
('It', 2)
('package.', 1)
('particular', 3)
('tools', 1)
('must', 1)
('params', 1)
...
コード全体は以下のとおり。
# -*- coding:utf-8 -*-
from pyspark.context import SparkContext
from pyspark.streaming import StreamingContext
from pyspark import StorageLevel
sc = SparkContext("yarn-client", "Chap7-2")
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream("hdfs:///user/y_tadayasu/data/")
words = lines.flatMap(lambda line: line.split(" ")).filter(lambda x:x)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
Author And Source
この問題について(Spark Streamingの動作確認 ローカル及びクラスタ環境), 我々は、より多くの情報をここで見つけました https://qiita.com/t-yotsu/items/0544f5cc89b556c9b689著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .