【Spark SQL】-データを読み、簡単な照会を行う


1. テストデータ: people.json
{"name":"Michael", "age":12}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"kafak", "age":19}
ここはIDEAのローカル環境で実行しているため、マスターはlocalに設定しています。
2.コード
  • SparkSession を作成します。
     // Spark core configuration. Master is "local" because this demo runs
     // inside the IDE on a single machine.
     SparkConf conf = new SparkConf()
                    .setMaster("local")
                    .setAppName("SqlDemo02");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Reduce console noise: log only WARN and above.
            sc.setLogLevel("WARN");
    
            // Entry point for Spark SQL. getOrCreate() reuses the SparkContext
            // created above instead of starting a second one.
            SparkSession spark = SparkSession
                    .builder()
                    .appName("Java Spark SQL basic example")
                    .config("spark.some.config.option", "some-value")
                    .getOrCreate();
    
            /* NOTE(review): the original (garbled) comment here referred to
               appName — presumably noting that this appName duplicates the one
               already set on SparkConf; confirm the intended wording. */
    
            // Run the DataFrame (DSL) demo, then the SQL demo on the same session.
            DSL(spark);
            System.out.println("*************************************");
            SQL(spark);
    
            // Always release resources when done.
            spark.stop();
            sc.stop();
    
  • DSL
  •  private static void DSL(SparkSession spark) {
    
            /**
             * Reads the JSON test data into an untyped Dataset (a DataFrame).
             * 1. Each line of the file must be one complete JSON object.
             * 2. The reader also supports other formats: csv, text, ...
             * 3. The file lives under the module's resources directory.
             * 4. NOTE(review): per the original (garbled) comment, a bare
             *    relative path like json("people.json") did not resolve here,
             *    hence the full module-relative path — confirm.
             */
            Dataset df = spark.read().json("SparkSql/src/main/resources/people.json");
    
            // 1. Show the whole DataFrame.
            df.show();
            /**
             * +---+-------+
             * |age|   name|
             * +---+-------+
             * | 12|Michael|
             * | 30|   Andy|
             * | 19| Justin|
             * | 19|  kafak|
             * +---+-------+
             */
    
    
            // 2. Print the inferred schema.
            df.printSchema();
            /**
             *  root
             *  |-- age: long (nullable = true)
             *  |-- name: string (nullable = true)
             */
    
            // 3. Projection: equivalent to `SELECT age FROM people`.
            /**
             * select("col1", "col2") — select columns by name.
             */
            df.select("age").show();
            /**
             * |age|
             * +---+
             * | 12|
             * | 30|
             * | 19|
             * | 19|
             * +---+
             */
    
            /**
             * select(Column c1, Column c2) — select by Column objects.
             * col(...) is a static helper that turns a column name into a Column:
             *     import static org.apache.spark.sql.functions.col;
             * (without the static import, write functions.col(...) instead).
             *
             * col(name) -> returns a Column object.
             * Column provides arithmetic/comparison methods, e.g.:
             *   plus(int n) — add n to every value of the column
             *   gt(int n)   — true where the value is greater than n
             * ... and more.
             */
            df.select(col("name"), col("age").plus(1)).show();
            /**
             * +-------+---------+
             * |   name|(age + 1)|
             * +-------+---------+
             * |Michael|       13|
             * |   Andy|       31|
             * | Justin|       20|
             * |  kafak|       20|
             * +-------+---------+
             */
    
            // 4. Filtering rows.
            /**
             * -> keep rows whose age is greater than 21
             * -> the complement (less than 21) would be col("age").lt(21)
             * Alternatively, a string predicate also works: filter("age > 21").
             * Column types here: age is a long, name is a string, so
             * "age > 21" is a sensible predicate while "name > 21" is not —
             * NOTE(review): the original garbled comment warned about the
             * string-column case; verify the intended wording.
             */
            df.filter(col("age").gt(21)).show();
            /**
             * +---+----+
             * |age|name|
             * +---+----+
             * | 30|Andy|
             * +---+----+
             */
    
            // 5. Grouping and aggregation.
            /**
             * Note: groupBy("age") alone returns a grouped dataset that has no
             * show() — an aggregation such as count() must follow, unlike
             * select("age").show().
             * -> count() counts the rows in each group.
             * -> other aggregates exist as well, e.g. max():
             *    df.groupBy(col("age")).max().show();
             */
            df.groupBy(col("age")).count().show();
            /**
             * +---+-----+
             * |age|count|
             * +---+-----+
             * | 19|    2|
             * | 12|    1|
             * | 30|    1|
             * +---+-----+
             */
    
        }
    
  • SQL
  •  // Demonstrates querying the DataFrame with SQL via temp views.
        private static void SQL(SparkSession spark) throws AnalysisException {
    
            // Load the test data.
            Dataset df = spark.read().json("SparkSql/src/main/resources/people.json");
    
            /**
             * Important: a DataFrame must be registered as a view before it can
             * be queried with SQL.
             * createOrReplaceTempView:
             * -> session-scoped view; it disappears when the SparkSession that
             *    created it ends (SparkSession.stop) and is not visible from
             *    other sessions. For a view shared across sessions use
             *    createGlobalTempView.
             * createGlobalTempView:
             * -> application-scoped view; it lives until the Spark application
             *    ends and is shared by every SparkSession in the application.
             */
            // 1. Register a session-local temp view.
            df.createOrReplaceTempView("people");
    
            // 2. Query it with plain SQL.
            Dataset sqlDF_01 = spark.sql("SELECT * FROM people");
            sqlDF_01.show();
    
            // 3. Register a global temp view.
            df.createGlobalTempView("people");
            // Global temp views live in the system database global_temp, so the
            // table name must be qualified: global_temp.people. An unqualified
            // "people" would resolve to the session-local view instead.
            Dataset sqlDF_02 = spark.sql("SELECT * FROM global_temp.people");
            sqlDF_02.show();
    
            // 4. Cross-session visibility.
            /**
             * newSession() returns a new SparkSession sharing the same
             * SparkContext (and therefore the same global temp views) but with
             * its own state, including its own session-local temp views.
             */
            SparkSession sparkSession = spark.newSession();
            // FIX: query through the NEW session to show the global view is
            // visible there. The original code called spark.sql(...) for both
            // queries, so sparkSession was never used and the cross-session
            // demonstration proved nothing.
            Dataset sqlDF_03 = sparkSession.sql("SELECT * FROM global_temp.people");
            // The session-local view "people" is NOT visible from the new
            // session — sparkSession.sql("SELECT * FROM people") would throw an
            // AnalysisException — so it is queried from the original session.
            Dataset sqlDF_04 = spark.sql("SELECT * FROM people");
            sqlDF_03.show();
            sqlDF_04.show();
        }