Spark SQLプログラミングガイド

7162 ワード

Spark SQLプログラミングガイド
概要
Spark SQLでは、SparkでSQLを実行するか、HiveQLの関係クエリー式をサポートします.そのコアコンポーネントは、新しいRDDタイプのJavaSchemaRDDです.JavaSchemaRDDは、Rowオブジェクトと、この行の各列を表すデータ型のschemaからなる.JavaSchemaRDDは、従来のリレーショナル・データベースのテーブルに似ています.JavaSchemaRDDは、既存のRDD、Parquetファイル、JSONデータセット、またはHiveSQLを実行することによってApache Hiveに格納されたデータを作成することができる.
Spark SQLは現在alphaコンポーネントです.APIの変化を最小限に抑えることができますが、いくつかのAPIはその後のリリースで変更されます.
はじめに
Sparkでは、すべてのリレーショナル関数機能のエントリポイントがJavaSQLContextクラスです.あるいは彼の子.基本的なJavaSQLContextを作成するには、JavaSparkContextが必要です.
JavaSparkContext sc = ...; // An existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

データソース
Spark SQLは、JavaSchemaRDDインタフェースを介してさまざまなデータソースを操作することをサポートします.1つのデータセットがロードされ、テーブルに登録されたり、他のソースからのデータ接続に登録されたりすることができます.
RDDs
Spark SQLがサポートするテーブルの1つのタイプはJavaBeansのRDDです.BeanInfoはこのテーブルのschemaを定義します.現在、Spark SQLでは、ListsやArraysなどのネストまたは複雑なタイプを含むJavaBeansはサポートされていません.Serializableを実装し、そのすべてのフィールドにgettersメソッドとsettersメソッドがあるクラスクラスを作成することでJavaBeansを作成できます.
public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

既存のRDDにschemaを適用し、applySchemaを呼び出し、このJavaBeanのクラスオブジェクトを提供することができる.
// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

Spark SQLは現在、非常に簡単なSQL解析器を使用しています.ユーザーがより完全なSQL言語を取得するには、HiveContextが提供するHiveQLサポートを参照してください.
Parquet Files
Parquetはcolumnar形式であり、多くの他のデータ処理システムによってサポートされている.Spark SQLは、Parquetファイルの読み書きをサポートし、元のデータのSchemaを自動的に保存します.次の例では、データを使用します.
// sqlContext from the previous example is used in this example.

JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

JSON Datasets
Spark SQLは、JSONデータセットのschemaを自動的に推定し、JavaSchemaRDDにロードすることができます.この変換は、JavaSQLContextの2つの方法のうちの1つで行うことができます.
  • jsonFile-ディレクトリの下のファイルからデータをロードします.このファイルの各行はJSONオブジェクトです.
  • jsonRdd-既存のRDDからデータがロードされ、このRDDの各要素はJSONオブジェクトを含むStringである.
    // sc is an existing JavaSparkContext.
    JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
    
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files.
    String path = "examples/src/main/resources/people.json";
    // Create a JavaSchemaRDD from the file(s) pointed to by path
    JavaSchemaRDD people = sqlContext.jsonFile(path);
    
    // The inferred schema can be visualized using the printSchema() method.
    people.printSchema();
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType
    
    // Register this JavaSchemaRDD as a table.
    people.registerAsTable("people");
    
    // SQL statements can be run by using the sql methods provided by sqlContext.
    JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    
    // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
    // an RDD[String] storing one JSON object per string.
    List<String> jsonData = Arrays.asList(
      "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
    JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
    JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
    

  • Hive Tables
    Spark SQLでは、apache Hiveに格納されているデータの読み取りと書き込みもサポートされています.しかし、Hiveは非常に大きな依存を持っているため、Sparkのデフォルトの宝には含まれていません.Hiveを使用するには、「SPARK_HIVE=true sbt/sbt assembly/assembly'(またはMavenに対して-Phiveを使用します).このコマンドはHiveを含むassemblyを構築します.このHive assemblyは、Hiveに格納されているデータにアクセスするためにHiveのシーケンス化パケットおよび正方シーケンス化パケット(SerDes)にアクセスする必要があるため、すべてのワークノード上に配置する必要があります.
    confディレクトリの下のhive-siteを通ることができます.xmlファイルはHive構成を完了します.
    Hiveと連携するには、JavaHiveContextを構築する必要があります.JavaSQLContextを継承し、MetaStoreのテーブルを発見し、HiveQLを使用してクエリーを作成する機能を追加します.さらに、sqlメソッドに加えて、JavaHiveContextメソッドは、クエリがHiveQL表現を使用することを可能にするhqlメソッドを提供します.
    Writing Language-Integrated Relational Queries
    Language-integratedクエリは現在Scalaでのみサポートされています.
    Spark SQLでは、レルム固有の言語を使用してクエリーを記述することもサポートされています.また、上記の例のデータを使用します.
    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
    import sqlContext._
    val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
    
    // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
    val teenagers = people.where('age >= 10).where('age <= 19).select('name)
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    

    DSLは、基本テーブルのテーブルを表すためにScalaで得られたタグを使用し、接頭辞'を使用して識別される.SQLがエンジン評価を実行するようにマークされた式を暗黙的に変換します.これらの機能をサポートする完了リストは、ScalaDocで見つけることができます.