spark sqlとcatalystの例をソースコード分析と組み合わせる


今週はspark sqlとcatalystに注目し、関連資料と各種コミュニティ内の動態を見て、ここでまとめました.
spark sqlとcatalystはなぜ現れたのですか?これはsharkから言えば,sharkはspark生態においてhadoop生態におけるhiveに相当するが,sharkはhiveとの互換性を実現するためにhql解析,論理クエリ計画の翻訳と最適化を用いて物理クエリ計画を生成する.hiveは物理実行計画をMRジョブに変換するがsharkは物理実行計画をRDD操作に変換する.したがってsparkチームはspark sqlとcatalystを導入し,spark上のsql機能をhiveからできるだけ独立させた.
spark sqlとcatalystは何をしましたか?現在のspark sqlバージョンはhive解析後に抽象構文ツリーを生成してからhiveの仕事を引き継いだ.物理クエリーの生成計画を最適化するにはcatalystが完了します.spark sqlはまた、簡素なsql parserとscala dslのセットを提供するため、ユーザーはhiveから完全に離れることができる.
次にspark sqlドキュメントに示されている例を見てみましょう.この例の各文をソースコードと組み合わせて分析します.例は次のとおりです.
      val sqlc=new org.apache.spark.sql.SQLContext(sc)
      import sqlc._
      case class Person(name:String,age:Int)
      val people=sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
      people.registerAsTable("people")
      val teenagers=sql("select name from people where age>=13 and age<=19")
      teenagers.map(t=>"name: "+t(0)).collect().foreach(println)

val sqlc=new org.apache.spark.sql.SQLContext(sc)

まずscによってsqlのコンテキストを構築する.
case class Person(name:String,age:Int)

caseクラスを新規作成し、personのschema情報を定義します.personのschema情報はstringの名前とintの年齢であることがわかります.
val people=sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))

次に、すぐに行うsqlクエリで使用されるデータの各レコードにpersonインスタンスを構築します(ここで各レコードにperson情報を構築するのはスペースがもったいないのではないでしょうか).
people.registerAsTable("people")

上記の文は、peopleがMappeddDであるためregisterAsTableという方法はありませんが、sqlコンテキストクラスでは次の方法が定義されています.
  /**
   * Creates a SchemaRDD from an RDD of case classes.
   *       ,   RDD  SchemaRDD       ,  RDD       ,         
   *   :
   * val people=sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
   * people.registerAsTable("people")
   *          ,people MappedRdd,       SchemaRDD    ,
   *            people   SchemaRDD,ExistingRdd.fromProductRdd(rdd)      
   * @group userf
   */
  implicit def createSchemaRDD[A <: product:="" typetag="" rdd="" new="" schemardd="" sparklogicalplan=""/>
この方法はpeopleを暗黙的にSchemaRDDに変換し、registerAsTableを呼び出してtable情報をcatalogに格納し、SchemaRDDにはsparkで自分で定義したdslもある.「schemaRDD.select('a,'b+'c,'d as'aliasedName)」など、sql文のキーワードのような関数をSchemaRDDインスタンスで直接呼び出すことができます.
val teenagers=sql("select name from people where age>=13 and age<=19")

ここでは、sqlコンテキストクラスのsqlメソッドを用いて、sqlが返すレコードセットを実行するRDDを得、非常に重要なsqlメソッドを注意深く分析します.
  /**
   * Executes a SQL query using Spark, returning the result as a SchemaRDD.
   *     sql  ,      sql         
   * @group userf
   */
  def sql(sqlText: String): SchemaRDD = {
    /*
     *     sqlText   string,        RDD,  :
     * val teenagers=sql("select name from people where age>=13 and age<=19")
     * parseSql(sqlText)          
     * */
    val result = new SchemaRDD(this, parseSql(sqlText))
    
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but do not perform any execution.
    /*
     * toRdd         ,   result       ,    RDD,    RDD row   
     * */
    result.queryExecution.toRdd
    result
  }

ここで実行をトリガするのは.まずresultを見てqueryExecutionは、sqlコンテキストからexecutePlanを呼び出し、論理実行計画として入力します.
  /*     :
   *           ,              ,      toRDD  (       ,
   * QueryExecution   lazy  ,         toRDD         ),
   * toRDD            ,
   * */
  protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    //QueryExecution      
    new this.QueryExecution { val logical = plan }

この関数ではQueryExecutionはQueryExecutionクラスの多くの文を実行しますが、lazyで明記されているのでnew this.QueryExecutionは実際には実行されていません.最後にtoRddメソッドが実行されると、これらの文の実際の実行がトリガーされます.lazyと明記された文に関連するクラスの多くはcatalystモジュールにあり、分析論理実行計画、論理実行計画の最適化、最後のポリシーの選択と物理実行計画の生成が含まれています.toRddは物理実行計画のexecute関数を呼び出し、sql文を「実行」させるだけです.
    lazy val analyzed = analyzer(logical)
    lazy val optimizedPlan = optimizer(analyzed)
    // TODO: Don't just pick the first one...
    lazy val sparkPlan = planner(optimizedPlan).next()
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
       
    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()

以上の文は後でcatalystを分析し、ここでいくつかの不明な点は、次に分析する必要がある:1、データはここでExistingRddによって入力され、登録tableの暗黙的な変換では、直接ExistingRddをクエリー計画のリーフノードとして使用したが、table scan操作はどこにあるのか、この例はscan操作がないため、データはpersonのインスタンスセットにある.
2,sql文を論理実行計画にどのように解析するか,ここでは主にsqlparserファイルを見る.
3,catalystモジュールに関連するanalyzer,optimizer,plannerの詳細はどうであるか.(本文完)