Spark API Example理解
3961 ワード
学習の背景:Sparkを学ぶ過程で、必要なのは必ず公的文書を読むことです.ここですhttp://spark.apache.org/examp... の例をいくつかして性質の総括を理解します.
Spark API Examplesには以下の内容が含まれています. RDD API:データ変換、操作の2つの部分 を完了する. Data Frame API:RDDはDataFrameに変換され、データベーステーブルを読んでDataFrameに変換され、その後、関係動作 が行われる.マシン学習API:Logisticで訓練と予測を行う RDD処理:スペースで区切られた単語の数を統計してファイルに保存します.多くの分布式mlアルゴリズムは、特徴抽出、分類、回帰、クラスター、推奨などのタスク を含む.は、ワークフローを構築するためのmlパイプ、パラメータを最適化するためのcrossvalidator、およびモデルを保存およびロードするためのモデルの耐久性などのツールを提供する. 次の例:データRDDを読んで、ラベルとフィーチャーの列があるDataFrameに変換します.LRモデルに入力してトレーニングします.モデルトレーニングが終了したら、各点のlabelを予測します.
Spark API Examplesには以下の内容が含まれています.
JavaRDD textFile = sc.textFile("hdfs://...");
JavaPairRDD counts = textFile
.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("hdfs://...");
RDD処理:NUM_を投げるSAMPLESのサンプルポイント(x、yランダム)は、円内に落ちる確率を統計します.List l = new ArrayList<>(NUM_SAMPLES);
for (int i = 0; i < NUM_SAMPLES; i++) {
l.add(i);
}
// parallelize , : 、
long count = sc.parallelize(l).filter(i -> {
double x = Math.random();
double y = Math.random();
return x*x + y*y < 1;
}).count();
System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
DataFrame処理:Sparkでは、DataFrameは列名の分散式集合であり、この集合では様々な関係操作が可能である.以下の例:logファイルのerror情報を読み出す.一列に処理します.RDD、StructTypeでData Frameに変換します.ラインの列にERRORが含まれている行をフィルタリングし、カウントします.// Creates a DataFrame having a single column named "line"
JavaRDD textFile = sc.textFile("hdfs://...");
JavaRDD rowRDD = textFile.map(RowFactory::create);
List fields = Arrays.asList(
DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
DataFrame errors = df.filter(col("line").like("%ERROR%"));
// Counts all the errors
errors.count();
// Counts errors mentioning MySQL
errors.filter(col("line").like("%MySQL%")).count();
// Fetches the MySQL errors as an array of strings
errors.filter(col("line").like("%MySQL%")).collect();
DataFrame処理:sql Contectを使ってmysqlデータベースのテーブルを読み取り、DataFrameを返します.グループによって、各年齢層の人数を計算します.結果をJson形式に保存します.// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
String url =
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword";
DataFrame df = sqlContext
.read()
.format("jdbc")
.option("url", url)
.option("dbtable", "people")
.load();
// Looks the schema of this DataFrame.
df.printSchema();
// Counts people by age
DataFrame countsByAge = df.groupBy("age").count();
countsByAge.show();
// Saves countsByAge to S3 in the JSON format.
countsByAge.write().format("json").save("s3a://...");
マシン学習API:sparkのマシン学習ライブラリmllibが提供されました.// Every record of this DataFrame contains the label and
// features represented by a vector.
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
DataFrame df = jsql.createDataFrame(data, schema);
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
LogisticRegression lr = new LogisticRegression().setMaxIter(10);
// Fit the model to the data.
LogisticRegressionModel model = lr.fit(df);
// Inspect the model: get the feature weights.
Vector weights = model.weights();
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show();