勾配反復ツリー(GBDT)アルゴリズムの原理とSpark MLlib呼び出し例(Scala/Java/python)

8878 ワード

勾配反復ツリー
アルゴリズムの概要:
勾配リフトツリーは、決定ツリーの統合アルゴリズムです.決定ツリーを反復訓練することによって損失関数を最小化する.決定ツリーと同様に、勾配アップツリーはカテゴリフィーチャーを処理でき、多分類問題に拡張しやすく、フィーチャースケールを必要としないなどの性質を有する.Spark.mlは、既存のdecision treeツールを使用することによって実現される.
勾配アップツリーは、一連の決定ツリーを順次反復的に訓練する.1回の反復では、アルゴリズムは、既存の統合を使用して各トレーニングインスタンスのカテゴリを予測し、予測結果を実際のラベル値と比較する.再タグにより、予測結果が悪いインスタンスにより高い重みが与えられます.したがって、次の反復では、決定ツリーが以前のエラーを修正します.
インスタンスラベルを再タグするメカニズムは、損失関数によって指定されます.反復のたびに、勾配反復ツリーは、トレーニングデータ上で損失関数の値をさらに減少させる.spark.mlは分類問題に損失関数(Log Loss)を提供し,回帰問題に2種類の損失関数(二乗誤差と絶対誤差)を提供する.
       Spark.mlは,連続的特徴およびカテゴリ特徴に適した二分類および回帰ランダム森林アルゴリズムをサポートする.
*勾配リフトツリーは現在、多分類問題をサポートしていません.
パラメータ:
checkpointInterval:
タイプ:整数.
意味:チェックポイント間隔(>=1)を設定するか、チェックポイント(-1)を設定しない.
featuresCol:
タイプ:文字列タイプ.
意味:フィーチャー列名.
impurity:
タイプ:文字列タイプ.
意味:情報利得を計算する基準(大文字と小文字を区別しない).
labelCol:
タイプ:文字列タイプ.
意味:ラベル列名.
lossType:
タイプ:文字列タイプ.
意味:損失関数タイプ.
maxBins:
タイプ:整数.
意味:連続フィーチャーの離散化の最大数と、ノードごとにフィーチャーを分割する方法を選択します.
maxDepth:
タイプ:整数.
意味:ツリーの最大深度(>=0).
maxIter:
タイプ:整数.
意味:反復回数(>=0)
minInfoGain:
タイプ:二重精度タイプ.
意味:ノードを分割する際に必要な最小情報利得.
minInstancesPerNode:
タイプ:整数.
意味:分割後にノードから最小に含まれるインスタンスの数.
predictionCol:
タイプ:文字列タイプ.
意味:予測結果列名.
rawPredictionCol:
タイプ:文字列タイプ.
意味:元の予測.
seed:
タイプ:ロング整数.
意味:ランダムシード.
subsamplingRate:
タイプ:二重精度タイプ.
意味:1本の決定木で使用される訓練データの割合を学習し、範囲[0,1].
stepSize:
タイプ:二重精度タイプ.
意味:ステップ長を反復ごとに最適化します.
例:
次の例では、LibSVM形式のデータをインポートし、トレーニングデータとテストデータに分けます.第1部のデータを用いて訓練を行い,残りのデータをテストした.訓練の前に2つのデータ前処理法を用いて特徴を変換し,DataFrameにメタデータを追加した.
Scala:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println("Learned classification GBT model:
" + gbtModel.toDebugString)

Java:
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset trainingData = splits[0];
Dataset testData = splits[1];

// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels());

// Chain indexers and GBT in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:
" + gbtModel.toDebugString());

Python:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only