Spark(AWS/EMR)を使ってウィンドウ分析+比較 spark-shell(scala) vs pyspark(python)


はじめに

Sparkを触り始めた時のメモを始めに紹介し、そのあとspark-shell(scala) vs pyspark(python)の比較を行った検証について紹介します。

引用元

まず手始めにSparkとはなんぞや?のところはNTTデータ様の情報が分かりやすいかと。
http://www.intellilink.co.jp/article/column/bigdata-kk01.html

やること

ここから書く内容は引用元に記載されている内容とほぼ変わらないのですが、ローカルインストールは非常にめんどいです。なので、インストールする代わりにEMRを使うと楽にSparkが試せますよというところの紹介です。

実際何をやるかは"iPhone"という文字が入っているツイートを分析対象として、そのツイートの中にどのような言葉が含まれているか?というのを上位20位だけ表示するというプログラムです。これは1分間に1回実行されます。

環境準備

では、さっそく準備としてAWS / EMRを使って、5分程度で準備をしたいと思います。

手順
EMRに接続する。
※手順は簡単なので書きたかったのですが、時間が無かったため断念。申し訳ない。
こちらを参照してください。http://aws.typepad.com/aws_japan/2015/06/apache-spark-on-amazon-emr.html

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRRR      RRRRRR

[hadoop@ip-172-31-3-2 ~]$

# 形態素解析のkuromojiをダウンロードして解凍
wget https://github.com/downloads/atilika/kuromoji/kuromoji-0.7.7.zip
unzip kuromoji-0.7.7.zip

# twitter-streaming用のjarファイルを取得する。
wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/spark-streaming-twitter_2.10-1.1.0.jar

# twitter4j関連のjarファイルを取得する。
wget http://twitter4j.org/archive/twitter4j-3.0.3.zip
unzip -j ./twitter4j-3.0.3.zip "lib/*.jar" -d twitter4j/
TWITTER4J_JARS=./twitter4j/twitter4j-async-3.0.3.jar,./twitter4j/twitter4j-core-3.0.3.jar,./twitter4j/twitter4j-examples-3.0.3.jar,./twitter4j/twitter4j-media-support-3.0.3.jar,./twitter4j/twitter4j-stream-3.0.3.jar

#sparkを実行!!
spark-shell --jars ./kuromoji-0.7.7/lib/kuromoji-0.7.7.jar,./spark-streaming-twitter_2.10-1.1.0.jar,$TWITTER4J_JARS

これで準備ができました。

scala >という文字が出てくるまで30秒ほど待ちます。
表示されたら、次のスクリプトを実行します。

scala>
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._
import java.util.regex._

System.setProperty("twitter4j.oauth.consumerKey", "J7dXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.consumerSecret", "8PtXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.accessToken", "144549526-LLXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
System.setProperty("twitter4j.oauth.accessTokenSecret", "gAiXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")


val ssc = new StreamingContext(sc, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, Array("iPhone"))

val tweetStream = stream.flatMap(status => {
   val tokenizer : Tokenizer = Tokenizer.builder().build()
   val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
   var tweetText : String = status.getText()
   val japanese_pattern : Pattern = Pattern.compile("[¥¥u3040-¥¥u309F]+")
   if(japanese_pattern.matcher(tweetText).find()) {
     tweetText = tweetText.replaceAll("http(s*)://(.*)/", "")

     val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText)
     val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
     for(index <- 0 to tokens.size()-1) { // <-の間にスペースがあったので削除
       val token = tokens.get(index)
       val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
       if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
         features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
       }
     }
   }
   (features)
})

val topCounts60 = tweetStream.map((_, 1)
                  ).reduceByKeyAndWindow(_+_, Seconds(60*60)
                  ).map{case (topic, count) => (count, topic)
                  }.transform(_.sortByKey(false))

topCounts60.foreachRDD(rdd => {
   val topList = rdd.take(20)
   println("¥ nPopular topics in last 60*60 seconds (%s words):".format(rdd.count()))
   topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
ssc.awaitTermination()

コメントを取り除いて、変えた所だけコメントをしておきました。

consumerKeyとかについては、こちらを参照:https://syncer.jp/twitter-api-matome

実行結果

結果は次のようになりました。iPhoneという単語を呟いている人は、"キャス","ケース","アプリ"という単語を一緒につぶやいているというのが分かってきます。

15/10/18_11-35-01
 INFO DAGScheduler: Job 11 finished: foreachRDD at <console>:51, took 0.057814 s
¥ nPopular topics in last 60*60 seconds (879 words):
キャス-名詞,一般,*,*,*,*,* (96 tweets)
цены-名詞,固有名詞,組織,*,*,*,* (83 tweets)
Украине-名詞,固有名詞,組織,*,*,*,* (82 tweets)
известны-名詞,固有名詞,組織,*,*,*,* (82 tweets)
стали-名詞,固有名詞,組織,*,*,*,* (82 tweets)
ケース-名詞,一般,*,*,*,*,ケース,ケース,ケース (50 tweets)
アプリ-名詞,一般,*,*,*,*,* (38 tweets)
...-名詞,サ変接続,*,*,*,*,* (30 tweets)
アクセサリ-名詞,一般,*,*,*,*,アクセサリ,アクセサリ,アクセサリ (27 tweets)
スマホ-名詞,一般,*,*,*,*,* (26 tweets)
チェック-名詞,サ変接続,*,*,*,*,チェック,チェック,チェック (21 tweets)
いろいろ-副詞,助詞類接続,*,*,*,*,いろいろ,イロイロ,イロイロ (20 tweets)
вас-名詞,固有名詞,組織,*,*,*,* (19 tweets)
золотых-名詞,固有名詞,組織,*,*,*,* (18 tweets)
Кто-名詞,固有名詞,組織,*,*,*,* (18 tweets)
сможет-名詞,固有名詞,組織,*,*,*,* (18 tweets)
монет-名詞,固有名詞,組織,*,*,*,* (18 tweets)
больше-名詞,固有名詞,組織,*,*,*,* (18 tweets)
собрал-名詞,固有名詞,組織,*,*,*,* (18 tweets)
....-名詞,サ変接続,*,*,*,*,* (17 tweets)
15/10/18_11-36-01
 INFO DAGScheduler: Job 14 finished: foreachRDD at <console>:51, took 0.060231 s
¥ nPopular topics in last 60*60 seconds (1220 words):
キャス-名詞,一般,*,*,*,*,* (147 tweets)
цены-名詞,固有名詞,組織,*,*,*,* (83 tweets)
Украине-名詞,固有名詞,組織,*,*,*,* (82 tweets)
известны-名詞,固有名詞,組織,*,*,*,* (82 tweets)
стали-名詞,固有名詞,組織,*,*,*,* (82 tweets)
アプリ-名詞,一般,*,*,*,*,* (69 tweets)
ケース-名詞,一般,*,*,*,*,ケース,ケース,ケース (60 tweets)
...-名詞,サ変接続,*,*,*,*,* (36 tweets)
スマホ-名詞,一般,*,*,*,*,* (28 tweets)
вас-名詞,固有名詞,組織,*,*,*,* (28 tweets)
золотых-名詞,固有名詞,組織,*,*,*,* (27 tweets)
Кто-名詞,固有名詞,組織,*,*,*,* (27 tweets)
アクセサリ-名詞,一般,*,*,*,*,アクセサリ,アクセサリ,アクセサリ (27 tweets)
сможет-名詞,固有名詞,組織,*,*,*,* (27 tweets)
монет-名詞,固有名詞,組織,*,*,*,* (27 tweets)
собрал-名詞,固有名詞,組織,*,*,*,* (27 tweets)
больше-名詞,固有名詞,組織,*,*,*,* (27 tweets)
チェック-名詞,サ変接続,*,*,*,*,チェック,チェック,チェック (25 tweets)
....-名詞,サ変接続,*,*,*,*,* (22 tweets)

以上。サクッと試したいときにEMRを使っていますという話です。
こんな感じで、自社製品をキーワードとして、それに対するポジティブワードやネガティブワードを拾ったり集計したりするときに使えると思います。

私が所属している会社ですとプロビジョニングツールを使って物理マシンにインストールするってことをやっておりますが、EMRに比べると地道でインストールだけで1日終わったりします。

比較 spark-shell(scala) vs pyspark(python) あと java と MapReduce

結局どれを使えばいいのでしょう?
次の3つの観点から考えてみたいと思います。

1:利用意図

例えば、上記twitter4j / kuromojiを利用したいと思った時にpysparkだと、一旦JVMを挟むので効率が悪いです。さらにpythonからJavaを呼ぶにはwrapperを作る必要も出てくるので生産性は微妙であると思います。Javaのライブラリを使う時にはpythonは選択肢から外れてしまいます。

2:生産性

Javaの場合クラスを作って、クラスをコンパイルして・・・という使い方をします。
一方、python, scalaは対話式に、スクリプトを記述できます。間違って書いてもすぐに修正ができます。
pythonとscalaの違いは、場合によってはvalを付けるか付けないか程度で、記述する行数も変わりません。
Javaはこれらの2倍以上行数が膨らむので、オススメしません。

3:パフォーマンス

まだデータが取り切れていないので記述しづらいのですが、ワードカウントを行うプログラムを組んだ際、
おおよそ、Scala > Java >> Python > MapReduceで
Scala のパフォーマンスが一番良いという結果が出ています。
MapReduceより2倍~3倍ほど早くなるという体感はもっております。CPU効率の安定性はscalaが一番です。

(追記)spark-shell vs pysparkの速度実験を行ってみた。

今回はm3.xlarge を3台動かすという元で実験。

適当にファイルを拾ってきます。
wget http://samplecsvs.s3.amazonaws.com/SacramentocrimeJanuary2006.csv

ファイルを10000倍にします。

shell
for i in `seq 1 10000`
do
  cat SacramentocrimeJanuary2006.csv >> testdata.csv
done

途中でディスクがないって言われる。Ctrl+Cで中断。

ls -lah
2.3G Oct 18 16:35 testdata.csv
2.3GBのファイルができたみたい。

hadoop fs -put testdata.csv
hadoop fs -ls /user/hadoop/ #testdata.csvが入ったことを確認する。

ローカルのtestdata.csvは削除しておく
rm testdata.csv

これで分析対象のファイルについて確認ができました。
pysparkを起動します。

pysparkでワードカウント

pyspark
text_file  = sc.textFile("/user/hadoop/testdata.csv")
power = text_file.filter(lambda line: "POWER" in line)
power.count()

初回実行:36.418306 s
結果:130158

2回目実行:32.066442 s
3回目実行:36.211069 s

spark-shellでワードカウント

spark-shell
val text_file  = sc.textFile("/user/hadoop/testdata.csv")
val power = text_file.filter(line => line.contains("POWER"))
power.count()

初回実行:6.206395s
結果変わらず

2回目実行:3.943935 s
3回目実行:3.948451 s

考察

今回はすごく単純な処理だったので5-10倍くらい差が開いたが、複雑な処理をやったときでも2-3倍差が開くことを確認済み
spark-shellの場合、1回目の実行にメモリにのると、2回目以降は早い。でも、pysparkはあまり速度が出ない気がする。

今のところ、scalaで記述したほうが良さそうというのが個人的所感。