Spark SQL関数操作

9235 ワード

Spark内蔵関数
Spark SQLにおける内蔵関数を使ってデータを分析すると、Spark SQL APIが異なるのは、DataFrameにおける内蔵関数動作の結果、Columnオブジェクトに戻ることであり、DataFrameは生まれつき「A distributed collection of data organized inmed columns」であり、これはデータの複雑な分析の基礎を提供しています。私たちはDataFrameを操作する方法で、いつでも内蔵関数を呼び出して業務に必要な処理を行うことができます。これは私たちが添付ファイルを構築する業務ロジックにおいて、不必要な時間消費を大幅に減らすことができます。これはエンジニアの生産力を高めるために非常に価値のあるSpark 1.5.xから大量の内蔵関数を提供しています。また、max、mean、min、sum、avg、explode、size、sort_があります。array、day、to_date、abs、acros、asin、apanは全体として5つの基本タイプを含んでいます。
  • 集計関数、例えば、count Distinct、sumDiscinctなど。
  • セットの関数、例えばsort_array、explodeなどの
  • 日、時間関数、例えばhour、quarter、next_day
  • 数学関数、例えばasin、aton、sqrt、tan、roundなど。
  • 開窓関数、例えばrowNumberなどの
  • 文字列関数、concat、format_number、rexexp_extract
  • 他の関数、isNaN、shar、randn、calUDF
  • Hiveの下の単行の単語の統計
    select t.wd ,count(t.wd) as count from (select explode(split(line," ")) as wd from word) t group by t.wd;
    
    プログラムコードを作成するときに関数を呼び出すと、functionsを導入します。
    import org.apache.spark.sql.types.{DataTypes, StructType}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{Row, SQLContext, functions}
    
    object SparkSQLFunctionOps {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkSQLFunctionOps").setMaster("local");
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val linesRDD = sc.textFile("E:/test/scala/sql-rdd-source.txt")
        val rowRDD = linesRDD.map(line => {
          val splits = line.split(",")
          Row(splits(0).trim.toInt,splits(1).trim,splits(2).trim.toInt,splits(3).trim.toInt)
        })
        val structType = StructType(Array(
          DataTypes.createStructField("id",DataTypes.IntegerType,true),
          DataTypes.createStructField("name",DataTypes.StringType,true),
          DataTypes.createStructField("age",DataTypes.IntegerType,true),
          DataTypes.createStructField("height",DataTypes.IntegerType,true)
        ))
    
        val df = sqlContext.createDataFrame(rowRDD,structType)
        df.registerTempTable("person")
        df.show()
        /**
          *     df        
          *            ,   
          * height    
          * */
        sqlContext.sql("select avg(age) from person").show()
        sqlContext.sql("select max(age) from person").show()
        sqlContext.sql("select sum(height) from person").show()
        sc.stop()
      }
    }
    
    Javaバージョン
    public class SparkSQLFunctionJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf( ).setAppName(SparkSQLFunctionJava.class.getSimpleName()).setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
       JavaRDD linesRDD = sc.textFile("E:/test/scala/sql-rdd-source.txt");
    
        JavaRDD rowRDD = linesRDD.map(new Function() {
            @Override
            public Row call(String s) throws Exception {
                String splits[] = s.split(",");
                return  RowFactory.create(Integer.valueOf(splits[0].trim()),splits[1].trim(),Integer.valueOf(splits[2].trim()),Integer.valueOf(splits[3].trim()));
            }
        });
        StructField structFields[] = new StructField[4];
        structFields[0] = DataTypes.createStructField("id",DataTypes.IntegerType,true);
        structFields[1] = DataTypes.createStructField("name",DataTypes.StringType,true);
        structFields[2] = DataTypes.createStructField("age",DataTypes.IntegerType,true);
        structFields[3] = DataTypes.createStructField("height",DataTypes.IntegerType,true);
        StructType structType = new StructType(structFields);
        DataFrame dataFrame = sqlContext.createDataFrame(rowRDD, structType);
        dataFrame.show();
        dataFrame.registerTempTable("person");
        /**
         *     df        
         *            ,   
         * height    
         * */
        sqlContext.sql("select avg(age) from person").show();
        sqlContext.sql("select max(age) from person").show();
        sqlContext.sql("select sum(height) from person").show();
        sc.close();
    }
    }
    
    Spark運転ログのレベルを変更します。
    cp logs 4 j.properties.template logs 4 j.properties vim conf/logl 4 j.properties INFOをERRORレベルに変更するにはSparkクラスタを再起動して、それを有効にする必要があります。
    HIVE設定カラム構造表示
    set hive.ci.print.header=true;
    Spark SQL開窓関数
    1、Spark 1.5.xバージョン以降、Spark SQLとDataFrameに窓開け関数を導入しました。例えば一番経典的なのは私達のrow_です。number()は、パケットのtopnを取るロジックを実現させてくれます。
    2、一つのケースを作ってtopnの値を取る(Sparkの窓開け関数を利用して)、まだ印象があるかどうかは分かりませんが、私たちは以前最初にtopnの計算をしたことがあります。しかし、今はSpark SQLを使って、とても便利です。
    /**
      * Spark SQL       row_number
      * */
    object SparkSQLOpenWindowFunction {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkSQLOpenWindowFunction").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new HiveContext(sc)
        val linesRDD = sc.textFile("E:/test/scala/topn.txt")
        val rowRDD = linesRDD.map(line =>{
          val splits = line.split(" ")
          Row(splits(0).trim,splits(1).trim.toInt)
        })
        val structType = DataTypes.createStructType(Array(
          DataTypes.createStructField("class",DataTypes.StringType,true),
          DataTypes.createStructField("score",DataTypes.IntegerType,true)
        ))
        val df = sqlContext.createDataFrame(rowRDD,structType)
        df.registerTempTable("stu_score")
        /**
          *     
          *    class     ,     class        TOP3
          * */
         val topNDF = sqlContext.sql("select temp.* from (select *, row_number() over(partition by class order by score desc) rank from stu_score ) temp where temp.rank < 4")
         topNDF.show()
        sc.stop()
      }
    }
    
    Javaバージョン
    public class SparkSQLOpenWindowFunctionJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(SparkSQLOpenWindowFunctionJava.class.getSimpleName()).setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext sqlContext = new HiveContext(sc);
        JavaRDD linesRDD = sc.textFile("E:/test/scala/topn.txt");
    
        JavaRDD rowRDD = linesRDD.map(new Function() {
            @Override
            public Row call(String s) throws Exception {
                return RowFactory.create(s.split(" ")[0].trim(),Integer.valueOf(s.split(" ")[1].trim()));
            }
        });
        StructField structFields[] = new StructField[2];
        structFields[0] = DataTypes.createStructField("class",DataTypes.StringType,true);
        structFields[1] = DataTypes.createStructField("score",DataTypes.IntegerType,true);
        StructType structType = new StructType(structFields);
        DataFrame df = sqlContext.createDataFrame(rowRDD, structType);
        df.registerTempTable("stu_score");
        /**
         *     
         *    class     ,     class        TOP3
         * */
        DataFrame dataFrame = null;
        dataFrame = sqlContext.sql("select temp.* from (select *, row_number() over(partition by class order by score desc) rank from stu_score ) temp where temp.rank < 4");
        dataFrame.show();
        sc.close();
    }
    }
    
    spark-sqlで運行します。
  • まずhiveの中で、表を作って、create table topn(class string、score int)row format delimited fields terminated by';
  • データをテーブルに導入します。ロードdata local inpath'/opt/data/spark/topn.txt'
  • はその後、窓開け関数を利用してグループ化順序付けができます。select temp.x from(select*、rowunumber()over(partition class order by score desc)rank from topn)temp.rank<4;
  • UDFカスタム関数
    1、UDF:User Defined Function。ユーザー定義関数。私達が通常言っているUDFカスタム関数は、一対一の関係です。一つの入力パラメータと一つの出力パラメータはUDFを作成するステップです。1、まずカスタムの関数func 2を作成して、sql Context.df().register(名前をつけてください。func_)3、私達のsqlで直接使えばいいです。
    object SparkSQLUDFOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkSQLUDFOps").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    
        val linesRDD = sc.textFile("E:/test/scala/sql-rdd-source.txt")
        val rowRDD = linesRDD.map(line => {
            val splits = line.split(", ")
            Row(splits(0).trim.toInt, splits(1).trim, splits(2).trim.toInt, splits(3).trim.toInt)
        })
        val structType = StructType(Array(
            DataTypes.createStructField("id", DataTypes.IntegerType, true),
            DataTypes.createStructField("name", DataTypes.StringType, true),
            DataTypes.createStructField("age", DataTypes.IntegerType, true),
            DataTypes.createStructField("height", DataTypes.IntegerType, true)
        ))
        val df = sqlContext.createDataFrame(rowRDD, structType)
        df.registerTempTable("person")
        //2、      UDF
        /**
          *          
          */
        sqlContext.udf.register("myLen", myLen _)
        sqlContext.udf.register("len", (str:String, len:Int) => str.length > len)
        //3、    
        sqlContext.sql("select id, name, myLen(name) as len from person where len(name, 5)").show()
        sc.stop()
    }
    
    //1、          ,          
    def myLen(str: String) = str.length
    
    )