Spark API Example理解

3961 ワード

学習の背景:Sparkを学ぶ過程で、必要なのは必ず公的文書を読むことです.ここですhttp://spark.apache.org/examp... の例をいくつかして性質の総括を理解します.
Spark API Examplesには以下の内容が含まれています.
  • RDD API:データ変換、操作の2つの部分
  • を完了する.
  • Data Frame API:RDDはDataFrameに変換され、データベーステーブルを読んでDataFrameに変換され、その後、関係動作
  • が行われる.
  • マシン学習API:Logisticで訓練と予測を行う
  • RDD処理:スペースで区切られた単語の数を統計してファイルに保存します.
    JavaRDD textFile = sc.textFile("hdfs://...");
    JavaPairRDD counts = textFile
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((a, b) -> a + b);
    counts.saveAsTextFile("hdfs://...");
    RDD処理:NUM_を投げるSAMPLESのサンプルポイント(x、yランダム)は、円内に落ちる確率を統計します.
    List l = new ArrayList<>(NUM_SAMPLES);
    for (int i = 0; i < NUM_SAMPLES; i++) {
      l.add(i);
    }
    
    //   parallelize        ,  :  、    
    long count = sc.parallelize(l).filter(i -> {
      double x = Math.random();
      double y = Math.random();
      return x*x + y*y < 1;
    }).count();
    System.out.println("Pi is roughly " + 4.0 * count / NUM_SAMPLES);
    DataFrame処理:Sparkでは、DataFrameは列名の分散式集合であり、この集合では様々な関係操作が可能である.以下の例:logファイルのerror情報を読み出す.一列に処理します.RDD、StructTypeでData Frameに変換します.ラインの列にERRORが含まれている行をフィルタリングし、カウントします.
    // Creates a DataFrame having a single column named "line"
    JavaRDD textFile = sc.textFile("hdfs://...");
    JavaRDD rowRDD = textFile.map(RowFactory::create);
    List fields = Arrays.asList(
      DataTypes.createStructField("line", DataTypes.StringType, true));
    StructType schema = DataTypes.createStructType(fields);
    DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
    
    DataFrame errors = df.filter(col("line").like("%ERROR%"));
    // Counts all the errors
    errors.count();
    // Counts errors mentioning MySQL
    errors.filter(col("line").like("%MySQL%")).count();
    // Fetches the MySQL errors as an array of strings
    errors.filter(col("line").like("%MySQL%")).collect();
    DataFrame処理:sql Contectを使ってmysqlデータベースのテーブルを読み取り、DataFrameを返します.グループによって、各年齢層の人数を計算します.結果をJson形式に保存します.
    // Creates a DataFrame based on a table named "people"
    // stored in a MySQL database.
    String url =
      "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword";
    DataFrame df = sqlContext
      .read()
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "people")
      .load();
    
    // Looks the schema of this DataFrame.
    df.printSchema();
    
    // Counts people by age
    DataFrame countsByAge = df.groupBy("age").count();
    countsByAge.show();
    
    // Saves countsByAge to S3 in the JSON format.
    countsByAge.write().format("json").save("s3a://...");
    マシン学習API:sparkのマシン学習ライブラリmllibが提供されました.
  • 多くの分布式mlアルゴリズムは、特徴抽出、分類、回帰、クラスター、推奨などのタスク
  • を含む.
  • は、ワークフローを構築するためのmlパイプ、パラメータを最適化するためのcrossvalidator、およびモデルを保存およびロードするためのモデルの耐久性などのツールを提供する.
  • 次の例:データRDDを読んで、ラベルとフィーチャーの列があるDataFrameに変換します.LRモデルに入力してトレーニングします.モデルトレーニングが終了したら、各点のlabelを予測します.
    // Every record of this DataFrame contains the label and
    // features represented by a vector.
    StructType schema = new StructType(new StructField[]{
      new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
      new StructField("features", new VectorUDT(), false, Metadata.empty()),
    });
    DataFrame df = jsql.createDataFrame(data, schema);
    
    // Set parameters for the algorithm.
    // Here, we limit the number of iterations to 10.
    LogisticRegression lr = new LogisticRegression().setMaxIter(10);
    
    // Fit the model to the data.
    LogisticRegressionModel model = lr.fit(df);
    
    // Inspect the model: get the feature weights.
    Vector weights = model.weights();
    
    // Given a dataset, predict each point's label, and show the results.
    model.transform(df).show();