【Spark SQL】-データを読み、簡単な照会を行う
5791 ワード
1.試験データ
2.コード SparkSession を作成します。 DSL SQL
: people.json
:
{"name":"Michael", "age":12}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"kafak", "age":19}
ここはIDEAのローカル環境で実行しているため、マスターには local を設定しています。2.コード
// NOTE(review): this fragment is the body of an enclosing method (likely main) whose
// signature is outside this chunk; SQL(spark) below throws AnalysisException, so the
// enclosing method presumably declares `throws AnalysisException` — confirm.
SparkConf conf = new SparkConf()
// Runs locally inside the IDE, hence master = "local".
.setMaster("local")
.setAppName("SqlDemo02");
JavaSparkContext sc = new JavaSparkContext(conf);
// Reduce log noise so the demo tables below are readable.
sc.setLogLevel("WARN");
// NOTE(review): getOrCreate() reuses the SparkContext created above, so the
// appName/config set here may not take effect — confirm against Spark docs.
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
/* Run the DSL-style demo, then the SQL-style demo. */
DSL(spark);
System.out.println("*************************************");
SQL(spark);
// Shut down. NOTE(review): spark.stop() should already stop the underlying
// SparkContext, making sc.stop() redundant — confirm.
spark.stop();
sc.stop();
/**
 * Demonstrates Spark SQL's typed DSL (DataFrame) API: load people.json, then
 * run show / printSchema / select / filter / groupBy on it.
 *
 * @param spark active SparkSession used to read the test data
 */
private static void DSL(SparkSession spark) {
    /*
     * 1. Read the source file into a Dataset.
     * 2. Supported formats include json, csv, text, ...
     * 3. The file lives under the module's resources directory.
     * 4. json(path) parses one JSON object per line.
     *
     * Use Dataset<Row> instead of the raw type Dataset so the compiler can
     * type-check the downstream operations (raw types defeat generics).
     */
    Dataset<org.apache.spark.sql.Row> df =
            spark.read().json("SparkSql/src/main/resources/people.json");

    // 1. Print the contents as a table.
    df.show();
    /*
     * +---+-------+
     * |age|   name|
     * +---+-------+
     * | 12|Michael|
     * | 30|   Andy|
     * | 19| Justin|
     * | 19|  kafak|
     * +---+-------+
     */

    // 2. Print the inferred schema.
    df.printSchema();
    /*
     * root
     *  |-- age: long (nullable = true)
     *  |-- name: string (nullable = true)
     */

    // 3. Projection — equivalent to: SELECT age FROM people.
    //    select("col1", "col2") takes plain column names.
    df.select("age").show();
    /*
     * |age|
     * +---+
     * | 12|
     * | 30|
     * | 19|
     * | 19|
     * +---+
     */

    /*
     * select(Column...) takes Column objects instead of names.
     * col(name) comes from the static import
     *   import static org.apache.spark.sql.functions.col;
     * (without the static import, use functions.col(name)).
     * Column supports arithmetic/comparison helpers, e.g.:
     *   plus(n) — add n to the column value
     *   gt(n)   — greater-than comparison against n
     */
    df.select(col("name"), col("age").plus(1)).show();
    /*
     * +-------+---------+
     * |   name|(age + 1)|
     * +-------+---------+
     * |Michael|       13|
     * |   Andy|       31|
     * | Justin|       20|
     * |  kafak|       20|
     * +-------+---------+
     */

    // 4. Filtering — keep rows with age > 21.
    /*
     * Column form: col("age").gt(21).
     * String form also works: filter("age > 21") — but the expression must
     * type-check against the schema (age is long, name is string), so
     * "name > 21" would be an error caught at analysis time.
     */
    df.filter(col("age").gt(21)).show();
    /*
     * +---+----+
     * |age|name|
     * +---+----+
     * | 30|Andy|
     * +---+----+
     */

    // 5. Aggregation — group by age and count rows per group.
    /*
     * groupBy(...) alone has no show(); it must be followed by an aggregate
     * such as count() or max(), e.g. df.groupBy(col("age")).max().show().
     */
    df.groupBy(col("age")).count().show();
    /*
     * +---+-----+
     * |age|count|
     * +---+-----+
     * | 19|    2|
     * | 12|    1|
     * | 30|    1|
     * +---+-----+
     */
}
/**
 * Demonstrates the SQL-string API and the difference between session-local
 * temporary views (createOrReplaceTempView) and application-wide global
 * temporary views (createGlobalTempView, queried via the reserved
 * "global_temp" database).
 *
 * @param spark active SparkSession used to read the test data
 * @throws AnalysisException if a SQL statement fails analysis (e.g. a view
 *                           is not visible in the querying session)
 */
private static void SQL(SparkSession spark) throws AnalysisException {
    // Load the same test data as the DSL demo. Dataset<Row> avoids raw types.
    Dataset<org.apache.spark.sql.Row> df =
            spark.read().json("SparkSql/src/main/resources/people.json");

    /*
     * View scoping:
     *  - createOrReplaceTempView: tied to this SparkSession; it disappears
     *    when the session stops and is invisible to other sessions.
     *  - createGlobalTempView: shared by every SparkSession in this Spark
     *    application; must be qualified with the "global_temp" database.
     */

    // 1. Register a session-local temporary view.
    df.createOrReplaceTempView("people");
    // 2. Query it through the owning session.
    Dataset<org.apache.spark.sql.Row> sqlDF_01 = spark.sql("SELECT * FROM people");
    sqlDF_01.show();

    // 3. Register a global temporary view and query it with the
    //    mandatory global_temp. qualifier.
    df.createGlobalTempView("people");
    Dataset<org.apache.spark.sql.Row> sqlDF_02 =
            spark.sql("SELECT * FROM global_temp.people");
    sqlDF_02.show();

    /*
     * newSession() shares the SparkContext (and cached data) with the
     * original session but has its own catalog of session-local views.
     * BUGFIX: the original code queried through `spark` here, so the new
     * session was created but never used and the scoping difference was
     * never actually demonstrated.
     */
    SparkSession sparkSession = spark.newSession();

    // The GLOBAL view is visible from the new session...
    Dataset<org.apache.spark.sql.Row> sqlDF_03 =
            sparkSession.sql("SELECT * FROM global_temp.people");
    sqlDF_03.show();

    // ...but the session-local view is not: this lookup fails analysis.
    try {
        Dataset<org.apache.spark.sql.Row> sqlDF_04 =
                sparkSession.sql("SELECT * FROM people");
        sqlDF_04.show();
    } catch (AnalysisException e) {
        System.out.println(
                "Session-local view 'people' is not visible from the new session: "
                        + e.getMessage());
    }
}