An example comparing the two ways Spark SQL works with RDDs


Preface
GitHub address: https://github.com/guofei1219
Background
Count the number of applications coming in through each new channel.
Comparing the two ways Spark SQL works with RDDs
1. Infer the schema using reflection. For details, see the following excerpt from the official documentation; a minimal sketch follows the case class definition below.
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.
case class blb_intpc_info(chnl_code:String,id_num:String)
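
For orientation, here is a minimal sketch of the reflection-based conversion, assuming an existing sc (SparkContext) and sqlContext (Spark 1.6 API), the blb_intpc_info case class above, and a hypothetical tab-separated input file. The complete program appears in the "Reflection approach" section below.

// Importing the implicits brings toDF() into scope for RDDs of case classes
import sqlContext.implicits._

val df = sc.textFile("/path/to/blb_intpc_info.txt")
  .map(_.split("\\t"))
  .map(d => blb_intpc_info(d(0), d(1)))  // RDD of case class instances
  .toDF()                                // column names are taken from the case class fields

df.registerTempTable("blb_intpc_info")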

2. Build the schema programmatically. For details, see the following excerpt from the official documentation; a minimal sketch of all three steps follows the StructType snippet below.
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.
>1.Create an RDD of Rows from the original RDD;
2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
3.Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
val structTypes = StructType(Array(
  StructField("chnl_code", StringType, true),
  StructField("id_num", StringType, true)
))
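
Putting the three steps together, here is a minimal sketch, assuming an existing sc (SparkContext) and sqlContext (Spark 1.6 API) and a hypothetical tab-separated input file; the full program appears in the "Programmatic approach" section below.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Step 1: turn the raw text RDD into an RDD of Rows (the path is illustrative)
val rowRDD = sc.textFile("/path/to/blb_intpc_info.txt")
  .map(_.split("\\t"))
  .map(d => Row(d(0), d(1)))

// Step 2: a StructType that matches the structure of the Rows above
val structTypes = StructType(Array(
  StructField("chnl_code", StringType, true),
  StructField("id_num", StringType, true)
))

// Step 3: apply the schema to the Row RDD
val df = sqlContext.createDataFrame(rowRDD, structTypes)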

Comparison summary
1. The case class approach is easier to read: the schema is visible directly in the class definition.
2. A case class is limited to 22 constructor parameters (Scala 2.10), so it cannot describe a table with more fields (see the sketch below).
3. The programmatic approach is better suited to day-to-day development.
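
To illustrate point 2: when a table is too wide for a case class, the schema can be generated programmatically, one StructField per column. A minimal sketch (the column names here are invented for illustration):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// 30 hypothetical columns -- more than a Scala 2.10 case class allows
val fieldNames = (1 to 30).map(i => "col_" + i)
val wideSchema = StructType(fieldNames.map(name => StructField(name, StringType, true)))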
Code implementation
Source data format
Column 1 is the channel code; column 2 is the applicant's ID number.
306DC4246 411324199209142831
306DC423A 360124199011241838
306DC423D 440802198010290019
306DC4226 612328197403120016
306DC4201 452629199104050312
306DC4201 350212198505025514
Reflection approach
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Reflection approach: infer the DataFrame schema from a case class
  * Created by Michael on 2016/11/29.
  */
object Custmer_Statistics_CaseClass {

  /**
    * Applicant record
    * @param chnl_code channel code
    * @param id_num applicant ID number
    */
  case class blb_intpc_info(chnl_code:String,id_num:String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Custmer_Statistics_CaseClass").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Implicit conversions for converting an RDD to a DataFrame
    import sqlContext.implicits._
    // Load the source data and map each line to a case class instance
    val blb_intpc_infoDF = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
      .map(_.split("\\t"))
      .map(d => blb_intpc_info(d(0), d(1))).toDF()

    // Register the DataFrame as a temporary table
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")

    /**
      * Count applications per channel, sorted in descending order
      */
    sqlContext.sql("" +
      "select chnl_code,count(*) as intpc_sum " +
      "from blb_intpc_info " +
      "group by chnl_code").toDF().sort($"intpc_sum".desc).show()
  }

}

Execution result
+---------+---------+
|chnl_code|intpc_sum|
+---------+---------+
|306DC421E|      631|
|306DC4201|      603|
|306DC422B|      472|
|306DC4221|      326|
|306DC425E|      280|
|306DC4237|      277|
|306DC4210|      238|
|306DC4246|      236|
|306DC4229|      223|
|306DC4257|      202|
|306DC420E|      197|
|306DC4215|      183|
|306DC421F|      176|
|306DC425A|      156|
|306DC4251|      140|
|306DC4202|      131|
|306DC424D|      125|
|306DC4226|      122|
|306DC422A|      112|
|306DC422D|      108|
+---------+---------+

Programmatic approach
Query the Hive metastore to obtain the field information of a given Hive table. Note: if you are not familiar with how the Hive metastore tables are structured, search for a few posts online or see the reference article at the end of this post; I will not explain it here and will go straight to the code.
public static String getHiveMetaData(String hiveTableName) {
    Connection conn = getConn();
    // Join the Hive metastore tables (TBLS -> SDS -> CDS -> COLUMNS_V2)
    // to fetch the column names of the given table.
    // Select COLUMNS_V2.`TYPE_NAME` as well if the column types are needed.
    String sql = "SELECT "
            + "  COLUMNS_V2.`COLUMN_NAME` "
            + "FROM TBLS "
            + "  LEFT JOIN SDS ON TBLS.SD_ID = SDS.SD_ID "
            + "  LEFT JOIN CDS ON SDS.CD_ID = CDS.CD_ID "
            + "  LEFT JOIN COLUMNS_V2 ON CDS.CD_ID = COLUMNS_V2.CD_ID "
            + "WHERE TBLS.`TBL_NAME` = \"" + hiveTableName + "\"";
    PreparedStatement pstmt;
    String result = "";
    try {
        pstmt = (PreparedStatement) conn.prepareStatement(sql);
        ResultSet rs = pstmt.executeQuery();
        int col = rs.getMetaData().getColumnCount();
        while (rs.next()) {
            for (int i = 1; i <= col; i++) {
                result = result + rs.getString(i) + "\t";
            }
        }
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return result;
}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import utils.DataUtils

/**
  * Programmatic approach: build the DataFrame schema with StructType
  * Created by Michael on 2016/11/29.
  */
object Custmer_Statistics_StructType {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Custmer_Statistics_StructType").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Implicit conversions for converting an RDD to a DataFrame
    import sqlContext.implicits._
    // Load the source data and map each line to a Row
    val blb_intpc_infoRow = sc.textFile("C:/work/ideabench/SparkSQL/data/channel/blb_intpc_info_10000_2.txt")
      .map(_.split("\\t"))
      .map(d => {
        Row(d(0),d(1))
      })

    // Fetch the table's field names from the Hive metastore and build the schema
    val schemaString = DataUtils.getHiveMetaData("blb_intpc_info")
    val schema =StructType(schemaString.split("\\t")
      .map(fieldName => StructField(fieldName, StringType, true)))

    val blb_intpc_infoDF = sqlContext.createDataFrame(blb_intpc_infoRow,schema)
    // Register the DataFrame as a temporary table
    blb_intpc_infoDF.registerTempTable("blb_intpc_info")

    /**
      * Count applications per channel, sorted in descending order
      */
    sqlContext.sql("" +
      "select chnl_code,count(*) as intpc_sum " +
      "from blb_intpc_info " +
      "group by chnl_code").toDF().sort($"intpc_sum".desc).show()
  }

}

References
1. Hive metastore: http://blog.csdn.net/wf1982/article/details/6644258
2. Spark SQL official documentation: http://spark.apache.org/docs/1.6.0/sql-programming-guide.html