Apache Spark 1.6.1ラーニングチュートリアル-Titanic Dataのレビュー
20600 ワード
このブログは主にTitanic datasetを利用してpyspark 1.6を簡単にプレゼンテーションしている.1の使い方.このグループのデータは比較的に小さくて、訓練データは891行しかなくて、訓練、テストデータはここでダウンロードすることができます(train.csv、test.csv).
内容データロードおよび変換 データクリーンアップ 特徴抽出 セットml/mllibアルゴリズム 1.データのロードと変換
a.データロード
pysparkを実行すると、SparkContect(sc)が同時に実行されます.sc.textFileを使用してcsvファイルを読み込み、生成されたデータフォーマットはRDDです.同時に、sqlContextを使用することもできます.read.textはcsvファイルを読み込みますが、生成データフォーマットはDataFrameです.
最初の3行のRDDデータを見てみましょう.
データの構造はpython listで、各行はstringです.
b.RDDをDataFrameに変換
Spark DataFrameは、R data frameとpython pandas DataFrameから得られたインスピレーションであり、Sparkの新しいデータフォーマットであり、以降のバージョンではRDDに取って代わる.その文法はRDDとは異なり、Rとpandasに近い.ここではRDDをDataFrameに変換して、後のデータ処理のためにします.
手順:データヘッダ(1行目) を削除各行のデータをカンマで分割しtuple に変換するデータヘッダによるデータ列 の命名
DataFrameのフォーマットを見てみましょう.
+———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+ |PassengerId|Survived|Pclass|FirstName| Name| Sex|Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked| +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+ | 1| 0| 3| Braund| Mr. Owen Harris| male| 22| 1| 0| A/5 21171| 7.25| | S| | 2| 1| 1| Cumings| Mrs. John Bradle…|female| 38| 1| 0| PC 17599|71.2833| C85| C| | 3| 1| 3|Heikkinen| Miss. Laina|female| 26| 0| 0|STON/O2. 3101282| 7.925| | S| +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+
c.トレーニングとテストデータの統合
トレーニングとテストデータを統合し、データの整理と特徴抽出を容易にします.
2.データクリーンアップ
a.Age,SibSp,Parch,Fareを数値データに変換
Age,SibSp,Parch,Fareの4つの変数が数値データに変換されていることが分かる.
b.失われたデータを平均数で埋める
Age,Fareは263,1個の欠落データがあり,ここでは単純に平均値で充填した.
各データの欠落:
3.フィーチャー抽出
a.NameからのTitleの抽出
ここでの主なアイデアは、user-defined-function(udf)をName列に適用して、Titleをキャプチャすることです.
データdf複数列Title:
b.索引カテゴリ変数
カテゴリ変数は、通常、いくつかの機械学習アルゴリズムを適用するために数値変数を変換する必要があります.ここでは単にインデックスを利用してこの機能を実現するだけです.このようなマッピングは、例えば、Sex−male=>0、Sex−female=>1である.しかし,この方法には,無形に導入された人為的な変数間の数値的関連性のため,その不足もある.One-hot-encodingメソッドはこの不足を回避できますが、データ次元(フィーチャー数)が大幅に増加します.
生成されたデータでは、EmbarkedはS=>0、C=>1、Q=>2にマッピングされる.
c.データフォーマットはlabel/features
ml/mllibアルゴリズムパッケージを使用するには、特徴をVectorに変換する必要がある.
c.再分割訓練、検証、テストデータ
4.ml/mllibアルゴリズムモデルの使用
ml対応のデータフォーマットはDataFrameであり、mllib対応のデータフォーマットはRDDである.次に,論理回帰,決定木,ランダム森林を用いてフィッティングを行い,それらのモデル表現を観察する.
ろんりかいふく
AUC ROC of Logistic Regression model is:0.836952368823論理回帰モデルのROC 0.837,次に決定木とランダム森林と比較する.
決定木とランダム森林
モデル変調がない場合,ランダム森林はより良い予測効果を示すように見える.
完全なpythonコードはここで見つけることができます
内容
a.データロード
pysparkを実行すると、SparkContect(sc)が同時に実行されます.sc.textFileを使用してcsvファイルを読み込み、生成されたデータフォーマットはRDDです.同時に、sqlContextを使用することもできます.read.textはcsvファイルを読み込みますが、生成データフォーマットはDataFrameです.
train_path='/Users/chaoranliu/Desktop/github/kaggle/titanic/train.csv'
test_path='/Users/chaoranliu/Desktop/github/kaggle/titanic/test.csv'
# Load csv file as RDD
train_rdd = sc.textFile(train_path)
test_rdd = sc.textFile(test_path)
最初の3行のRDDデータを見てみましょう.
train_rdd.take(3)
データの構造はpython listで、各行はstringです.
[u'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
u'1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
u'2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']
b.RDDをDataFrameに変換
Spark DataFrameは、R data frameとpython pandas DataFrameから得られたインスピレーションであり、Sparkの新しいデータフォーマットであり、以降のバージョンではRDDに取って代わる.その文法はRDDとは異なり、Rとpandasに近い.ここではRDDをDataFrameに変換して、後のデータ処理のためにします.
手順:
# Parse RDD to DF
def parseTrain(rdd):
# extract data header (first row)
header = rdd.first()
# remove header
body = rdd.filter(lambda r: r!=header)
def parseRow(row):
# a function to parse each text row into
# data format
# remove double quote, split the text row by comma
row_list = row.replace('"','').split(",")
# convert python list to tuple, which is
# compatible with pyspark data structure
row_tuple = tuple(row_list)
return row_tuple
rdd_parsed = body.map(parseRow)
colnames = header.split(",")
colnames.insert(3,'FirstName')
return rdd_parsed.toDF(colnames)
## Parse Test RDD to DF
def parseTest(rdd):
header = rdd.first()
body = rdd.filter(lambda r: r!=header)
def parseRow(row):
row_list = row.replace('"','').split(",")
row_tuple = tuple(row_list)
return row_tuple
rdd_parsed = body.map(parseRow)
colnames = header.split(",")
colnames.insert(2,'FirstName')
return rdd_parsed.toDF(colnames)
train_df = parseTrain(train_rdd)
test_df = parseTest(test_rdd)
DataFrameのフォーマットを見てみましょう.
train_df.show(3)
+———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+ |PassengerId|Survived|Pclass|FirstName| Name| Sex|Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked| +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+ | 1| 0| 3| Braund| Mr. Owen Harris| male| 22| 1| 0| A/5 21171| 7.25| | S| | 2| 1| 1| Cumings| Mrs. John Bradle…|female| 38| 1| 0| PC 17599|71.2833| C85| C| | 3| 1| 3|Heikkinen| Miss. Laina|female| 26| 0| 0|STON/O2. 3101282| 7.925| | S| +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+
c.トレーニングとテストデータの統合
トレーニングとテストデータを統合し、データの整理と特徴抽出を容易にします.
## Add Survived column to test
from pyspark.sql.functions import lit, col
train_df = train_df.withColumn('Mark',lit('train'))
test_df = (test_df.withColumn('Survived',lit(0))
.withColumn('Mark',lit('test')))
test_df = test_df[train_df.columns]
## Append Test data to Train data
df = train_df.unionAll(test_df)
2.データクリーンアップ
a.Age,SibSp,Parch,Fareを数値データに変換
df = (df.withColumn('Age',df['Age'].cast("double"))
.withColumn('SibSp',df['SibSp'].cast("double"))
.withColumn('Parch',df['Parch'].cast("double"))
.withColumn('Fare',df['Fare'].cast("double"))
.withColumn('Survived',df['Survived'].cast("double"))
)
df.printSchema()
Age,SibSp,Parch,Fareの4つの変数が数値データに変換されていることが分かる.
root
|-- PassengerId: string (nullable = true)
|-- Survived: double (nullable = true)
|-- Pclass: string (nullable = true)
|-- FirstName: string (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: double (nullable = true)
|-- Parch: double (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)
|-- Cabin: string (nullable = true)
|-- Embarked: string (nullable = true)
|-- Mark: string (nullable = false)
b.失われたデータを平均数で埋める
Age,Fareは263,1個の欠落データがあり,ここでは単純に平均値で充填した.
numVars = ['Survived','Age','SibSp','Parch','Fare']
def countNull(df,var):
return df.where(df[var].isNull()).count()
missing = {var: countNull(df,var) for var in numVars}
age_mean = df.groupBy().mean('Age').first()[0]
fare_mean = df.groupBy().mean('Fare').first()[0]
df = df.na.fill({'Age':age_mean,'Fare':fare_mean})
各データの欠落:
{'Age': 263, 'Fare': 1, 'Parch': 0, 'SibSp': 0, 'Survived': 0}
3.フィーチャー抽出
a.NameからのTitleの抽出
ここでの主なアイデアは、user-defined-function(udf)をName列に適用して、Titleをキャプチャすることです.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
## created user defined function to extract title
getTitle = udf(lambda name: name.split('.')[0].strip(),StringType())
df = df.withColumn('Title', getTitle(df['Name']))
df.select('Name','Title').show(3)
データdf複数列Title:
+--------------------+-----+
| Name|Title|
+--------------------+-----+
| Mr. Owen Harris| Mr|
| Mrs. John Bradle...| Mrs|
| Miss. Laina| Miss|
+--------------------+-----+
only showing top 3 rows
b.索引カテゴリ変数
カテゴリ変数は、通常、いくつかの機械学習アルゴリズムを適用するために数値変数を変換する必要があります.ここでは単にインデックスを利用してこの機能を実現するだけです.このようなマッピングは、例えば、Sex−male=>0、Sex−female=>1である.しかし,この方法には,無形に導入された人為的な変数間の数値的関連性のため,その不足もある.One-hot-encodingメソッドはこの不足を回避できますが、データ次元(フィーチャー数)が大幅に増加します.
catVars = ['Pclass','Sex','Embarked','Title']
## index Sex variable
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_indexed')
df_indexed = si.fit(df).transform(df).drop('Sex').withColumnRenamed('Sex_indexed','Sex')
## make use of pipeline to index all categorical variables
def indexer(df,col):
si = StringIndexer(inputCol = col, outputCol = col+'_indexed').fit(df)
return si
indexers = [indexer(df,col) for col in catVars]
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = indexers)
df_indexed = pipeline.fit(df).transform(df)
df_indexed.select('Embarked','Embarked_indexed').show(3)
生成されたデータでは、EmbarkedはS=>0、C=>1、Q=>2にマッピングされる.
+--------+----------------+
|Embarked|Embarked_indexed|
+--------+----------------+
| S| 0.0|
| C| 1.0|
| S| 0.0|
+--------+----------------+
only showing top 3 rows
c.データフォーマットはlabel/features
ml/mllibアルゴリズムパッケージを使用するには、特徴をVectorに変換する必要がある.
catVarsIndexed = [i+'_indexed' for i in catVars]
featuresCol = numVars+catVarsIndexed
featuresCol.remove('Survived')
labelCol = ['Mark','Survived']
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
row = Row('mark','label','features')
df_indexed = df_indexed[labelCol+featuresCol]
# 0-mark, 1-label, 2-features
# map features to DenseVector
lf = (df_indexed.map(lambda r: (row(r[0],r[1],DenseVector(r[2:]))))
.toDF())
# index label
# convert numeric label to categorical, which is required by
# decisionTree and randomForest
lf = (StringIndexer(inputCol = 'label',outputCol='index')
.fit(lf)
.transform(lf))
lf.show(3)
+-----+-----+--------------------+-----+
| mark|label| features|index|
+-----+-----+--------------------+-----+
|train| 0.0|[22.0,1.0,0.0,7.2...| 0.0|
|train| 1.0|[38.0,1.0,0.0,71....| 1.0|
|train| 1.0|[26.0,0.0,0.0,7.9...| 1.0|
+-----+-----+--------------------+-----+
only showing top 3 rows
c.再分割訓練、検証、テストデータ
train = lf.where(lf.mark =='train')
test = lf.where(lf.mark =='test')
# random split further to get train/validate
train,validate = train.randomSplit([0.7,0.3],seed =121)
print 'Train Data Number of Row: '+ str(train.count())
print 'Validate Data Number of Row: '+ str(validate.count())
print 'Test Data Number of Row: '+ str(test.count())
Train Data Number of Row: 636
Validate Data Number of Row: 255
Test Data Number of Row: 418
4.ml/mllibアルゴリズムモデルの使用
ml対応のデータフォーマットはDataFrameであり、mllib対応のデータフォーマットはRDDである.次に,論理回帰,決定木,ランダム森林を用いてフィッティングを行い,それらのモデル表現を観察する.
ろんりかいふく
from pyspark.ml.classification import LogisticRegression
# regPara: lasso regularisation parameter (L1)
lr = LogisticRegression(maxIter = 100, regParam = 0.05, labelCol='index').fit(train)
# Evaluate model based on auc ROC(default for binary classification)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
def testModel(model, validate = validate):
pred = model.transform(validate)
evaluator = BinaryClassificationEvaluator(labelCol = 'index')
return evaluator.evaluate(prod)
print 'AUC ROC of Logistic Regression model is: '+str(testModel(lr))
AUC ROC of Logistic Regression model is:0.836952368823論理回帰モデルのROC 0.837,次に決定木とランダム森林と比較する.
決定木とランダム森林
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
dt = DecisionTreeClassifier(maxDepth = 3, labelCol ='index').fit(train)
rf = RandomForestClassifier(numTrees = 100, labelCol = 'index').fit(train)
models = {'LogisticRegression':lr,
'DecistionTree':dt,
'RandomForest':rf}
modelPerf = {k:testModel(v) for k,v in models.iteritems()}
print modelPerf
{'DecistionTree': 0.7700267447784003,
'LogisticRegression': 0.8369523688232298,
'RandomForest': 0.8597809475292919}
モデル変調がない場合,ランダム森林はより良い予測効果を示すように見える.
完全なpythonコードはここで見つけることができます