Introduction to Hadoop/Spark/Spark SQL/Spark Streaming/Spark MLlib
A quick tour of Hadoop, Spark, Spark SQL, Spark Streaming, and Spark MLlib.
The environment is the Cloudera Quickstart VM (VirtualBox).
Hadoop
Run Word Count with Hadoop.
Download the text file to be analyzed:
[cloudera@quickstart ~]$ wget http://stewartonbibleschool.org/bible/text/genesis.txt
Copy it to HDFS:
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal genesis.txt
[cloudera@quickstart ~]$ hadoop fs -ls
Found 1 items
-rw-r--r-- 1 cloudera cloudera 207327 2018-01-28 08:32 genesis.txt
Check the list of example MapReduce applications and the usage of wordcount:
[cloudera@quickstart ~]$ hadoop jar /usr/jars/hadoop-examples.jar
An example program must be given as the first argument.
Valid program names are:
aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.
dbcount: An example job that count the pageview counts from a database.
distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.
grep: A map/reduce program that counts the matches of a regex in the input.
join: A job that effects a join over sorted, equally partitioned datasets
multifilewc: A job that counts words from several files.
pentomino: A map/reduce tile laying program to find solutions to pentomino problems.
pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.
randomwriter: A map/reduce program that writes 10GB of random data per node.
secondarysort: An example defining a secondary sort to the reduce.
sort: A map/reduce program that sorts the data written by the random writer.
sudoku: A sudoku solver.
teragen: Generate data for the terasort
terasort: Run the terasort
teravalidate: Checking results of terasort
wordcount: A map/reduce program that counts the words in the input files.
wordmean: A map/reduce program that counts the average length of the words in the input files.
wordmedian: A map/reduce program that counts the median length of the words in the input files.
wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.
[cloudera@quickstart ~]$ hadoop jar /usr/jars/hadoop-examples.jar wordcount
Usage: wordcount <in> [<in>...] <out>
Run Word Count:
[cloudera@quickstart ~]$ hadoop jar /usr/jars/hadoop-examples.jar wordcount genesis.txt out
…(snip)…
18/01/28 08:35:52 INFO mapreduce.Job: map 0% reduce 0%
18/01/28 08:35:58 INFO mapreduce.Job: map 100% reduce 0%
18/01/28 08:36:05 INFO mapreduce.Job: map 100% reduce 100%
…(snip)…
Check the output files:
[cloudera@quickstart ~]$ hadoop fs -ls
Found 2 items
-rw-r--r-- 1 cloudera cloudera 207327 2018-01-28 08:32 genesis.txt
drwxr-xr-x - cloudera cloudera 0 2018-01-28 08:36 out
[cloudera@quickstart ~]$ hadoop fs -ls out/
Found 2 items
-rw-r--r-- 1 cloudera cloudera 0 2018-01-28 08:36 out/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 54576 2018-01-28 08:36 out/part-r-00000
Copy the result to the local filesystem:
[cloudera@quickstart ~]$ hadoop fs -copyToLocal out/part-r-00000 local.txt
[cloudera@quickstart ~]$ head local.txt
(for 2
(from 1
(is 1
(out 1
(the 1
10:10: 1
10:11: 1
10:12: 1
10:13: 1
10:14: 1
Spark
Run Word Count with Spark.
Check the file to run Word Count on; it is the same one used earlier:
[cloudera@quickstart ~]$ hadoop fs -ls
Found 1 items
-rw-r--r-- 1 cloudera cloudera 5458199 2018-01-28 02:10 genesis.txt
Launch PySpark in a Jupyter notebook and read the file from HDFS into an RDD. Each line of the file becomes one element of the RDD.
lines = sc.textFile("hdfs:/user/cloudera/genesis.txt")
Check that it was read correctly:
lines.count()
# >>> 1538
Split each line into words and map each word to a (word, 1) tuple.
Note that flatMap is the map function that turns [line, line, ...] into [word, word, word, ...] rather than [[word, word, ...], [word, ...], ...]; map is the ordinary map function.
words = lines.flatMap(lambda line: line.split(" "))
tuples = words.map(lambda word: (word, 1))
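To see the difference concretely, here is a tiny sketch with made-up input (the demo variable is hypothetical, not part of the run above):
# map keeps one output element per input element, giving a list of word lists;
# flatMap flattens those per-line lists into a single list of words.
demo = sc.parallelize(["a b", "c d e"])
demo.map(lambda s: s.split(" ")).collect()      # [['a', 'b'], ['c', 'd', 'e']]
demo.flatMap(lambda s: s.split(" ")).collect()  # ['a', 'b', 'c', 'd', 'e']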
Reduce by key:
counts = tuples.reduceByKey(lambda a, b: (a+b))
Save the RDD to HDFS. coalesce(1) is used to merge the RDD's partitions so that a single output file is produced.
counts.coalesce(1).saveAsTextFile("hdfs:/user/cloudera/wordcount/outputDir")
Copy to a local file:
[cloudera@quickstart ~]$ hadoop fs -copyToLocal wordcount/outputDir/part-00000 count.txt
[cloudera@quickstart ~]$ head count.txt
('', 393)
('womb.', 3)
('21:23:', 1)
('27:15:', 1)
('subtilty,', 1)
('sea.', 1)
('23:15:', 1)
('bulls,', 1)
('backward.', 1)
('doest:', 1)
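The counts come out in no particular order. As an aside, the most frequent words could be pulled out with takeOrdered (a sketch, not part of the run above):
# The ten most frequent words by count, brought back to the driver
for word, n in counts.takeOrdered(10, key=lambda kv: -kv[1]):
    print("{}\t{}".format(word, n))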
Spark SQL
Try basic operations such as GROUP BY and JOIN with Spark SQL.
Read a PostgreSQL table from PySpark:
from pyspark.sql import SQLContext
sqlsc = SQLContext(sc)
df = sqlsc.read.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera")\
    .option("dbtable", "gameclicks")\
    .load()
df.printSchema()
# >>> root
# >>> |-- timestamp: timestamp (nullable = false)
# >>> |-- clickid: integer (nullable = false)
# >>> |-- userid: integer (nullable = false)
# >>> |-- usersessionid: integer (nullable = false)
# >>> |-- ishit: integer (nullable = false)
# >>> |-- teamid: integer (nullable = false)
# >>> |-- teamlevel: integer (nullable = false)
df.count()
# >>> 755806
df.show(6)
# >>> +--------------------+-------+------+-------------+-----+------+---------+
# >>> | timestamp|clickid|userid|usersessionid|ishit|teamid|teamlevel|
# >>> +--------------------+-------+------+-------------+-----+------+---------+
# >>> |2016-05-26 15:06:...| 105| 1038| 5916| 0| 25| 1|
# >>> |2016-05-26 15:07:...| 154| 1099| 5898| 0| 44| 1|
# >>> |2016-05-26 15:07:...| 229| 899| 5757| 0| 71| 1|
# >>> |2016-05-26 15:07:...| 322| 2197| 5854| 0| 99| 1|
# >>> |2016-05-26 15:07:...| 22| 1362| 5739| 0| 13| 1|
# >>> |2016-05-26 15:07:...| 107| 1071| 5939| 0| 27| 1|
# >>> +--------------------+-------+------+-------------+-----+------+---------+
# >>> only showing top 6 rows
Try select, filter, groupBy, and so on:
df.select("userid", "teamid").show(6)
# >>> +------+------+
# >>> |userid|teamid|
# >>> +------+------+
# >>> | 1038| 25|
# >>> | 1099| 44|
# >>> | 899| 71|
# >>> | 2197| 99|
# >>> | 1362| 13|
# >>> | 1071| 27|
# >>> +------+------+
# >>> only showing top 6 rows
df.filter(df["teamid"] < 20).select("userid", "teamid").show(6)
# >>> +------+------+
# >>> |userid|teamid|
# >>> +------+------+
# >>> | 1362| 13|
# >>> | 1072| 13|
# >>> | 624| 2|
# >>> | 217| 18|
# >>> | 1072| 13|
# >>> | 937| 11|
# >>> +------+------+
# >>> only showing top 6 rows
df.groupBy("teamid").count().show(6)
# >>> +------+-----+
# >>> |teamid|count|
# >>> +------+-----+
# >>> | 32| 8734|
# >>> | 35| 8817|
# >>> | 36| 8755|
# >>> | 39| 9398|
# >>> | 44| 9045|
# >>> | 51| 8911|
# >>> +------+-----+
# >>> only showing top 6 rows
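The same operations can also be written as plain SQL by registering the DataFrame as a temporary table (a sketch using the Spark 1.x SQLContext API on the Quickstart VM; the table name is our choice):
# Equivalent GROUP BY expressed as raw SQL against a temporary table
df.registerTempTable("gameclicks_tmp")
sqlsc.sql("SELECT teamid, COUNT(*) AS count FROM gameclicks_tmp GROUP BY teamid").show(6)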
Compute the mean and the sum:
from pyspark.sql.functions import *
df.select(mean('ishit'), sum('ishit')).show()
# >>> +------------------+----------+
# >>> | avg(ishit)|sum(ishit)|
# >>> +------------------+----------+
# >>> |0.1103232840173272| 83383|
# >>> +------------------+----------+
Try a JOIN:
df2 = sqlsc.read.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera")\
    .option("dbtable", "adclicks")\
    .load()
df2.printSchema()
# >>> root
# >>> |-- timestamp: timestamp (nullable = false)
# >>> |-- txid: integer (nullable = false)
# >>> |-- usersessionid: integer (nullable = false)
# >>> |-- teamid: integer (nullable = false)
# >>> |-- userid: integer (nullable = false)
# >>> |-- adid: integer (nullable = false)
# >>> |-- adcategory: string (nullable = false)
merge = df.join(df2, "userid")
merge.printSchema()
# >>> root
# >>> |-- userid: integer (nullable = false)
# >>> |-- timestamp: timestamp (nullable = false)
# >>> |-- clickid: integer (nullable = false)
# >>> |-- usersessionid: integer (nullable = false)
# >>> |-- ishit: integer (nullable = false)
# >>> |-- teamid: integer (nullable = false)
# >>> |-- teamlevel: integer (nullable = false)
# >>> |-- timestamp: timestamp (nullable = false)
# >>> |-- txid: integer (nullable = false)
# >>> |-- usersessionid: integer (nullable = false)
# >>> |-- teamid: integer (nullable = false)
# >>> |-- adid: integer (nullable = false)
# >>> |-- adcategory: string (nullable = false)
merge.show(6)
# >>> +------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
# >>> |userid| timestamp|clickid|usersessionid|ishit|teamid|teamlevel| timestamp| txid|usersessionid|teamid|adid|adcategory|
# >>> +------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
# >>> | 231|2016-06-08 00:45:...| 376796| 23626| 0| 142| 4|2016-06-08 01:40:...|23669| 23626| 142| 27| games|
# >>> | 231|2016-06-08 00:45:...| 376796| 23626| 0| 142| 4|2016-06-08 09:24:...|24122| 23626| 142| 4| games|
# >>> | 231|2016-06-08 00:45:...| 376796| 23626| 0| 142| 4|2016-06-08 17:21:...|24659| 23626| 142| 22| computers|
# >>> | 231|2016-06-08 00:45:...| 376796| 23626| 0| 142| 4|2016-06-08 23:34:...|25076| 23626| 142| 21| movies|
# >>> | 231|2016-06-08 00:45:...| 376796| 23626| 0| 142| 4|2016-06-09 16:32:...|26220| 23626| 142| 16| clothing|
# >>> | 231|2016-06-08 00:45:...| 376796| 23626| 0| 142| 4|2016-06-10 10:43:...|28180| 27925| 142| 13| computers|
# >>> +------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
# >>> only showing top 6 rows
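Note that df.join(df2, "userid") is an inner join by default, so gameclicks rows whose userid never appears in adclicks are dropped. Other join types can be requested with a third argument (a sketch, not part of the run above):
# Keep every gameclicks row, with NULLs where there is no matching ad click
merge_outer = df.join(df2, "userid", "left_outer")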
Spark Streaming
Configure the batch interval, the window width, and so on in Spark Streaming.
import re

# Extract the integer value of "Sx=<digits>" from a raw line of the feed
def parse(line):
    match = re.search(r"Sx=(\d+)", line)
    if match:
        val = match.group(1)
        return [int(val)]
    return []
from pyspark.streaming import StreamingContext

# Batch interval of 1 second
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12020)
vals = lines.flatMap(parse)
# Window width of 10 seconds, sliding every 5 seconds
window = vals.window(10, 5)
Compute the max, min, mean, and standard deviation of the data in each window:
def stats(rdd):
    print(rdd.collect())
    if rdd.count() > 0:
        print("max = {}, min = {}, mean = {}, stdev = {}".format(rdd.max(), rdd.min(), rdd.mean(), rdd.stdev()))
window.foreachRDD(lambda rdd: stats(rdd))
ssc.start()
# >>> [10, 10, 10]
# >>> max = 10, min = 10, mean = 10.0, stdev = 0.0
# >>> [10, 10, 10, 10, 10, 10, 10, 10]
# >>> max = 10, min = 10, mean = 10.0, stdev = 0.0
# >>> [10, 10, 10, 10, 10, 9, 8, 8, 8, 8]
# >>> max = 10, min = 8, mean = 9.1, stdev = 0.9433981132056604
# >>> [9, 8, 8, 8, 8, 8, 8, 8, 8, 8]
# >>> max = 9, min = 8, mean = 8.1, stdev = 0.29999999999999993
# >>> [8, 8, 8, 8, 8, 8, 8, 8, 8, 8]
# >>> max = 8, min = 8, mean = 8.0, stdev = 0.0
Stop the streaming:
ssc.stop()
# >>> [8, 8, 8, 8, 8, 8, 8, 8, 8, 8]
# >>> max = 8, min = 8, mean = 8.0, stdev = 0.0
Spark MLlib
Try the decision tree algorithm on JEPX spot market trading data.
Load the data:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer
sqlContext = SQLContext(sc)
df = sqlContext.read.load("file:///home/cloudera/tmp4/spot_2017.csv",
                          format="com.databricks.spark.csv",
                          header='true', inferSchema='true')
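As an aside, this relies on the external spark-csv package bundled with the Quickstart VM; on Spark 2.x the CSV reader is built in, so the equivalent would be (a sketch):
# Spark 2.x equivalent: CSV support is built into DataFrameReader
df = sqlContext.read.csv("file:///home/cloudera/tmp4/spot_2017.csv",
                         header=True, inferSchema=True)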
df.count()
# >>> 15312
df.columns
# >>> ['id',
# >>> 'date',
# >>> 'time',
# >>> 'sell_kWh',
# >>> 'buy_kWh',
# >>> 'contract_kWh',
# >>> 'sys_price',
# >>> 'price_hokkaido',
# >>> 'price_tohoku',
# >>> 'price_tokyo',
# >>> 'price_chubu',
# >>> 'price_hokuriku',
# >>> 'price_kansai',
# >>> 'price_chugoku',
# >>> 'price_shikoku',
# >>> 'price_kyushu']
The prediction target is whether the system price exceeds 9 yen/kWh, with each region's area price as a feature.
binarizer = Binarizer(threshold=9.0, inputCol="sys_price", outputCol="label")
binarizedDF = binarizer.transform(df)
binarizedDF.select("sys_price","label").show(6)
# >>> +---------+-----+
# >>> |sys_price|label|
# >>> +---------+-----+
# >>> | 10.23| 1.0|
# >>> | 9.62| 1.0|
# >>> | 9.17| 1.0|
# >>> | 8.8| 0.0|
# >>> | 8.89| 0.0|
# >>> | 8.69| 0.0|
# >>> +---------+-----+
# >>> only showing top 6 rows
featureColumns = ['price_hokkaido',
                  'price_tohoku',
                  'price_tokyo',
                  'price_chubu',
                  'price_hokuriku',
                  'price_kansai',
                  'price_chugoku',
                  'price_shikoku',
                  'price_kyushu']
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
assembled = assembler.transform(binarizedDF)
Split the data into training and test sets:
(trainingData, testData) = assembled.randomSplit([0.8, 0.2], seed=12345)
(trainingData.count(), testData.count())
# >>> (12194, 3118)
Training and prediction:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=5,
                            minInstancesPerNode=20, impurity="gini")
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)
predictions.select("prediction", "label").show(10)
# >>> +----------+-----+
# >>> |prediction|label|
# >>> +----------+-----+
# >>> | 0.0| 0.0|
# >>> | 0.0| 0.0|
# >>> | 0.0| 0.0|
# >>> | 1.0| 1.0|
# >>> | 0.0| 1.0|
# >>> | 1.0| 1.0|
# >>> | 1.0| 1.0|
# >>> | 1.0| 1.0|
# >>> | 1.0| 1.0|
# >>> | 0.0| 0.0|
# >>> +----------+-----+
# >>> only showing top 10 rows
# predictions.select("prediction", "label").write.save(path="file:///home/cloudera/tmp4/predictions.csv",
#                                                      format="com.databricks.spark.csv",
#                                                      header="true")
Evaluation by prediction accuracy:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="precision")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
# >>> Accuracy = 0.949968
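Note that metricName="precision" is the Spark 1.x name for overall accuracy (total correct over total); Spark 2.x removed it in favor of "accuracy", so there the evaluator would be configured as (a sketch):
# Spark 2.x equivalent: the "precision" metric name was replaced by "accuracy"
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")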
Evaluation with a confusion matrix:
predictions = predictions.select("prediction", "label")
# predictions.rdd.take(2)
# >>> [Row(prediction=0.0, label=0.0), Row(prediction=0.0, label=0.0)]
# predictions.rdd.map(tuple).take(2)
# >>> [(0.0, 0.0), (0.0, 0.0)]
metrics = MulticlassMetrics(predictions.rdd.map(tuple))
metrics.confusionMatrix().toArray().transpose()
# >>> array([[ 1755., 84.],
# >>> [ 72., 1207.]])
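In the transposed matrix, rows are predicted classes and columns are actual classes, so for label 1.0 there are 1207 true positives, 72 false positives, and 84 false negatives. The same metrics object can report this per class (a sketch; the comments just redo the arithmetic from the matrix above):
# Per-class precision and recall for the positive class (label 1.0)
print(metrics.precision(1.0))  # 1207 / (1207 + 72) ~= 0.944
print(metrics.recall(1.0))     # 1207 / (1207 + 84) ~= 0.935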
References
Big Data Specialization, UC San Diego, Coursera