Apache Spark 1.6.1ラーニングチュートリアル-Titanic Dataのレビュー


このブログは主にTitanic datasetを利用してpyspark 1.6を簡単にプレゼンテーションしている.1の使い方.このグループのデータは比較的に小さくて、訓練データは891行しかなくて、訓練、テストデータはここでダウンロードすることができます(train.csv、test.csv).
内容
  • データロードおよび変換
  • データクリーンアップ
  • 特徴抽出
  • セットml/mllibアルゴリズム
  • 1.データのロードと変換
    a.データロード
    pysparkを実行すると、SparkContect(sc)が同時に実行されます.sc.textFileを使用してcsvファイルを読み込み、生成されたデータフォーマットはRDDです.同時に、sqlContextを使用することもできます.read.textはcsvファイルを読み込みますが、生成データフォーマットはDataFrameです.
    train_path='/Users/chaoranliu/Desktop/github/kaggle/titanic/train.csv'
    test_path='/Users/chaoranliu/Desktop/github/kaggle/titanic/test.csv'
    # Load csv file as RDD
    train_rdd = sc.textFile(train_path)
    test_rdd = sc.textFile(test_path)

    最初の3行のRDDデータを見てみましょう.
    train_rdd.take(3)

    データの構造はpython listで、各行はstringです.
      [u'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
      u'1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
      u'2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']

    b.RDDをDataFrameに変換
    Spark DataFrameは、R data frameとpython pandas DataFrameから得られたインスピレーションであり、Sparkの新しいデータフォーマットであり、以降のバージョンではRDDに取って代わる.その文法はRDDとは異なり、Rとpandasに近い.ここではRDDをDataFrameに変換して、後のデータ処理のためにします.
    手順:
  • データヘッダ(1行目)
  • を削除
  • 各行のデータをカンマで分割しtuple
  • に変換する
  • データヘッダによるデータ列
  • の命名
    # Parse RDD to DF
    def parseTrain(rdd):
    
        # extract data header (first row)
        header = rdd.first()
        # remove header
        body = rdd.filter(lambda r: r!=header)
    def parseRow(row):
            # a function to parse each text row into
            # data format
            # remove double quote, split the text row by comma
            row_list = row.replace('"','').split(",")
            # convert python list to tuple, which is
            # compatible with pyspark data structure
            row_tuple = tuple(row_list)
            return row_tuple
    
        rdd_parsed = body.map(parseRow)
    
        colnames = header.split(",")
        colnames.insert(3,'FirstName')
    
        return rdd_parsed.toDF(colnames)
    
    ## Parse Test RDD to DF 
    def parseTest(rdd):
        header = rdd.first()
        body = rdd.filter(lambda r: r!=header)
    
        def parseRow(row):
            row_list = row.replace('"','').split(",")
            row_tuple = tuple(row_list)
            return row_tuple
    
        rdd_parsed = body.map(parseRow)
    
        colnames = header.split(",")
        colnames.insert(2,'FirstName')
    
        return rdd_parsed.toDF(colnames)
    
    train_df = parseTrain(train_rdd)
    test_df = parseTest(test_rdd)

    DataFrameのフォーマットを見てみましょう.
    train_df.show(3)

    +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+ |PassengerId|Survived|Pclass|FirstName| Name| Sex|Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked| +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+ | 1| 0| 3| Braund| Mr. Owen Harris| male| 22| 1| 0| A/5 21171| 7.25| | S| | 2| 1| 1| Cumings| Mrs. John Bradle…|female| 38| 1| 0| PC 17599|71.2833| C85| C| | 3| 1| 3|Heikkinen| Miss. Laina|female| 26| 0| 0|STON/O2. 3101282| 7.925| | S| +———–+——–+——+———+——————–+——+—+—–+—–+—————-+——-+—–+——–+
    c.トレーニングとテストデータの統合
    トレーニングとテストデータを統合し、データの整理と特徴抽出を容易にします.
    ## Add Survived column to test
    from pyspark.sql.functions import lit, col
    train_df = train_df.withColumn('Mark',lit('train'))
    test_df = (test_df.withColumn('Survived',lit(0))
                      .withColumn('Mark',lit('test')))
    test_df = test_df[train_df.columns]
    ## Append Test data to Train data
    df = train_df.unionAll(test_df)

    2.データクリーンアップ
    a.Age,SibSp,Parch,Fareを数値データに変換
    df = (df.withColumn('Age',df['Age'].cast("double"))
                .withColumn('SibSp',df['SibSp'].cast("double"))
                .withColumn('Parch',df['Parch'].cast("double"))
                .withColumn('Fare',df['Fare'].cast("double"))
                .withColumn('Survived',df['Survived'].cast("double"))
                )
    df.printSchema()

    Age,SibSp,Parch,Fareの4つの変数が数値データに変換されていることが分かる.
    root
     |-- PassengerId: string (nullable = true)
     |-- Survived: double (nullable = true)
     |-- Pclass: string (nullable = true)
     |-- FirstName: string (nullable = true)
     |-- Name: string (nullable = true)
     |-- Sex: string (nullable = true)
     |-- Age: double (nullable = true)
     |-- SibSp: double (nullable = true)
     |-- Parch: double (nullable = true)
     |-- Ticket: string (nullable = true)
     |-- Fare: double (nullable = true)
     |-- Cabin: string (nullable = true)
     |-- Embarked: string (nullable = true)
     |-- Mark: string (nullable = false)

    b.失われたデータを平均数で埋める
    Age,Fareは263,1個の欠落データがあり,ここでは単純に平均値で充填した.
    numVars = ['Survived','Age','SibSp','Parch','Fare']
    def countNull(df,var):
        return df.where(df[var].isNull()).count()
    
    missing = {var: countNull(df,var) for var in numVars}
    age_mean = df.groupBy().mean('Age').first()[0]
    fare_mean = df.groupBy().mean('Fare').first()[0]
    df = df.na.fill({'Age':age_mean,'Fare':fare_mean})

    各データの欠落:
    {'Age': 263, 'Fare': 1, 'Parch': 0, 'SibSp': 0, 'Survived': 0}

    3.フィーチャー抽出
    a.NameからのTitleの抽出
    ここでの主なアイデアは、user-defined-function(udf)をName列に適用して、Titleをキャプチャすることです.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    ## created user defined function to extract title
    getTitle = udf(lambda name: name.split('.')[0].strip(),StringType())
    df = df.withColumn('Title', getTitle(df['Name']))
    
    df.select('Name','Title').show(3)

    データdf複数列Title:
    +--------------------+-----+
    |                Name|Title|
    +--------------------+-----+
    |     Mr. Owen Harris|   Mr|
    | Mrs. John Bradle...|  Mrs|
    |         Miss. Laina| Miss|
    +--------------------+-----+
    only showing top 3 rows

    b.索引カテゴリ変数
    カテゴリ変数は、通常、いくつかの機械学習アルゴリズムを適用するために数値変数を変換する必要があります.ここでは単にインデックスを利用してこの機能を実現するだけです.このようなマッピングは、例えば、Sex−male=>0、Sex−female=>1である.しかし,この方法には,無形に導入された人為的な変数間の数値的関連性のため,その不足もある.One-hot-encodingメソッドはこの不足を回避できますが、データ次元(フィーチャー数)が大幅に増加します.
    catVars = ['Pclass','Sex','Embarked','Title']
    
    ## index Sex variable
    from pyspark.ml.feature import StringIndexer
    si = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_indexed')
    df_indexed = si.fit(df).transform(df).drop('Sex').withColumnRenamed('Sex_indexed','Sex')
    
    ## make use of pipeline to index all categorical variables
    def indexer(df,col):
        si = StringIndexer(inputCol = col, outputCol = col+'_indexed').fit(df)
        return si
    
    indexers = [indexer(df,col) for col in catVars]
    
    from pyspark.ml import Pipeline
    pipeline = Pipeline(stages = indexers)
    df_indexed = pipeline.fit(df).transform(df)
    
    df_indexed.select('Embarked','Embarked_indexed').show(3)

    生成されたデータでは、EmbarkedはS=>0、C=>1、Q=>2にマッピングされる.
    +--------+----------------+
    |Embarked|Embarked_indexed|
    +--------+----------------+
    |       S|             0.0|
    |       C|             1.0|
    |       S|             0.0|
    +--------+----------------+
    only showing top 3 rows

    c.データフォーマットはlabel/features
    ml/mllibアルゴリズムパッケージを使用するには、特徴をVectorに変換する必要がある.
    catVarsIndexed = [i+'_indexed' for i in catVars]
    featuresCol = numVars+catVarsIndexed
    featuresCol.remove('Survived')
    labelCol = ['Mark','Survived']
    
    from pyspark.sql import Row
    from pyspark.mllib.linalg import DenseVector
    row = Row('mark','label','features')
    
    df_indexed = df_indexed[labelCol+featuresCol]
    # 0-mark, 1-label, 2-features
    # map features to DenseVector
    lf = (df_indexed.map(lambda r: (row(r[0],r[1],DenseVector(r[2:]))))
                    .toDF())
    # index label
    # convert numeric label to categorical, which is required by
    # decisionTree and randomForest
    lf = (StringIndexer(inputCol = 'label',outputCol='index')
                    .fit(lf)
                    .transform(lf))
    
    lf.show(3)
    +-----+-----+--------------------+-----+
    | mark|label|            features|index|
    +-----+-----+--------------------+-----+
    |train|  0.0|[22.0,1.0,0.0,7.2...|  0.0|
    |train|  1.0|[38.0,1.0,0.0,71....|  1.0|
    |train|  1.0|[26.0,0.0,0.0,7.9...|  1.0|
    +-----+-----+--------------------+-----+
    only showing top 3 rows

    c.再分割訓練、検証、テストデータ
    train = lf.where(lf.mark =='train')
    test = lf.where(lf.mark =='test')
    
    # random split further to get train/validate
    train,validate = train.randomSplit([0.7,0.3],seed =121)
    
    print 'Train Data Number of Row: '+ str(train.count())
    print 'Validate Data Number of Row: '+ str(validate.count())
    print 'Test Data Number of Row: '+ str(test.count())
    Train Data Number of Row: 636
    Validate Data Number of Row: 255
    Test Data Number of Row: 418

    4.ml/mllibアルゴリズムモデルの使用
    ml対応のデータフォーマットはDataFrameであり、mllib対応のデータフォーマットはRDDである.次に,論理回帰,決定木,ランダム森林を用いてフィッティングを行い,それらのモデル表現を観察する.
    ろんりかいふく
    from pyspark.ml.classification import LogisticRegression
    
    # regPara: lasso regularisation parameter (L1)
    lr = LogisticRegression(maxIter = 100, regParam = 0.05, labelCol='index').fit(train)
    
    # Evaluate model based on auc ROC(default for binary classification)
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
    def testModel(model, validate = validate):
        pred = model.transform(validate)
        evaluator = BinaryClassificationEvaluator(labelCol = 'index')
        return evaluator.evaluate(prod)
    
    print 'AUC ROC of Logistic Regression model is: '+str(testModel(lr))

    AUC ROC of Logistic Regression model is:0.836952368823論理回帰モデルのROC 0.837,次に決定木とランダム森林と比較する.
    決定木とランダム森林
    from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
    
    dt = DecisionTreeClassifier(maxDepth = 3, labelCol ='index').fit(train)
    rf = RandomForestClassifier(numTrees = 100, labelCol = 'index').fit(train)
    
    models = {'LogisticRegression':lr,
              'DecistionTree':dt,
              'RandomForest':rf}
    
    modelPerf = {k:testModel(v) for k,v in models.iteritems()}
    
    print modelPerf
    {'DecistionTree': 0.7700267447784003,
     'LogisticRegression': 0.8369523688232298,
     'RandomForest': 0.8597809475292919}

    モデル変調がない場合,ランダム森林はより良い予測効果を示すように見える.
    完全なpythonコードはここで見つけることができます