sparkを使って郵便番号情報を抽出


            sparkを使うと処理が速い
と小耳に挟んだので一度動かしてみた。

初期設定

Homebrew を使って下記の通り導入しました。

brew install apache-spark

データ取得

下記のサイトからCSV形式の郵便番号データを取得した。
zipcloud

起動方法

Homebrewでインストールしたので、下記のapache-sparkのフォルダ配下へ移動する。

cd /usr/local/Cellar/apache-spark/1.5.2/bin/

どうやらsparkはscala,java,python,Rに対応しているようですが、自分はpythonを使いたかったため

pyspark

で起動する。
"spark"のマークが見えたらOK。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

Using Python version 2.7.11 (default, Dec 26 2015 17:47:53)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

実装内容

全国の郵便番号のデータは全部で約12万件ある。
実装する前に、大量にデータを処理することを想定してデータを88回コピペし、約1,200万件に倍加してCSVファイルを新たに作成した。
そして、
- 郵便番号に"7"が含んでいるもの
- 市町村名に動物の名前が含んでいるもの
をAND条件で検索するように実装してみた。

sparkSample.py
# -*- coding: utf-8 -*-
import time
from pyspark import SparkContext

def main():
    #検索したい漢字
    queryList = ["鹿","鳥","熊","猿","犬"]

    #時間計測スタート
    start = time.time()

    #データセット
    sc = SparkContext('local', 'Simple App')
    logData = sc.textFile('KEN_ALL_OVER_TEN_MILLION.CSV')

    #各クエリ毎に情報を抽出
    for item in queryList:
        #splitでリスト化
        lines = logData.map(lambda x: x.split(','))
        #郵便番号が7を含むものを抽出
        numberPicks = lines.filter(lambda s: unicode('7', 'utf-8') in s[2])
        #市町村名に対象の漢字が含んでいるものを抽出
        namePicks = lines.filter(lambda s: unicode(item, 'utf-8') in s[7])
        #リストに格納
        desList = namePicks.collect()

        #ログ出力
        for line in desList:
            s = u""
            for i, unit in enumerate(line):
                if i != 0:
                    s = s + u', '
                s = s + unit
            print s.encode('utf-8')

        #ヒット数出力
        outlog = "query:" + item.decode('utf-8') + u", count:" + \
            unicode(str(len(desList)), 'utf-8') + ", Time:{0}".format(time.time() - start) + u"[sec]"
        print outlog.encode('utf-8')    

    #時間計測ストップ
    finish_time = time.time() - start
    print u"Time[total]:{0}".format(finish_time) + u"[sec]"

    #終了処理
    sc.stop()

if __name__ == '__main__':
    main()

評価をするにあたり、sparkを使わない場合のコードも書いてみた。

plain.py
# -*- coding: utf-8 -*-
import time

def pickAnimal(recordList, qList, start):
    #各クエリ毎に情報を抽出
    for q in qList:
        count = 0
        for record in recordList:
            sepRecord = record.split(",")
            if len(sepRecord) == 15:
                #郵便番号が7を含むものを抽出
                #市町村名に対象の漢字が含んでいるものを抽出
                if -1 < sepRecord[2].find("7") and -1 < sepRecord[7].find(q):
                    count = count + 1
                    #ログ出力
                    print record
                    #ヒット数出力
        print "query:" + q + ", count:" + str(count) + ", Time:{0}".format(time.time() - start) + "[sec]"

def main():

    sepRecordList = []
    #検索したい漢字
    queryList = ["鹿","鳥","熊","猿","犬"]

    #データセット
    srcpath = "KEN_ALL_OVER_TEN_MILLION.CSV"
    srcIN = open(srcpath, 'r')

    #時間計測スタート
    start = time.time()
    for line in srcIN:
        sepRecordList.append(line)

    pickAnimal(sepRecordList, queryList, start)

    #時間計測ストップ
    finish_time = time.time() - start
    print "Time:{0}".format(finish_time) + "[sec]"

    #終了処理
    srcIN.close()

if __name__ == '__main__':
    main()

計測結果

$pyspark sparkSample.py
~(中略)~
Time[total]:645.52906394[sec]
$python plain.py
~(中略)~
Time:112.966698885[sec]

ふ、普通に実装した方が約6倍速い。。。

編集後記

速さを実感するためには、分散処理の環境を整えるか機械学習で大量のデータを試行を繰り返さないとどうやらダメみたいです。
sparkの威力を肌で体験できるまでが次の目標な気がしました。

参考