pysparkはpysparkを勉強します.sql module
2254 ワード
pyspark.sql.SparkSession
役割:DFを作成し、DFを1枚のtableに登録し、table上でsqlを実行し、カラムストレージファイル(parquetフォーマット)を読み出す parquetファイルはカラム構造で格納されたデータであることに注意し、自分の理解に基づいて、このフォーマットはデータを(r,d,value)に変換してデータを圧縮することができ、読み取り記憶は速度を速めることができ、私たちの開発過程における中間データの格納再読み取りに適している.
作成方法実際、SparkSession、SparkContext、SQLContext、HiveContextは文書を見ていると、これらのクラスの方法の重なりが高く、感覚機能の多くが似ていることがわかります.https://blog.csdn.net/qq_21383435/article/details/77371142SparkSessionと他のいくつかの違いを紹介した. *要約の違いは、SparkSessionはspark 2.0がユーザーに提供する統一的な操作入口であり、SparkContext、SQLContext、HiveContextが実現できる機能であり、SparkSessionはすべて含まなければならない.したがって、プログラムを書くときにバージョンが2.0に達した場合は、SparkSession操作を使用することをお勧めします.
SparkSession method
createDataFrame (data,schema) dataはlist、rdd、またはpandasのDF であってもよい schemaは、空、またはカラム名、または表示される定義前のdataのタイプであってもよい. dataタイプの使用方法の定義に注意:pysparkを使用する.sql.typesのタイプ schemaはStructTypeを宣言する必要があり、StructFieldのリストからなり、StructFiledのデータ構造は:(string,dataType,nullable,metaData)、栗を挙げる: data=[("zhaoli",25),("zhaoxiaoli",18)]変換するのは(string,int)を要素とするlistであるため、schemaを宣言する際にstringのStructFieldとint型のStrucFieldを宣言する必要がある.
pyspark.sql.typesのすべてのタイプ NullTypeはクラス名であるため、StructFieldで宣言する場合、他のタイプ と同様に実例化する必要がある. StringType BooleanType TimestampType IntegerType ,LongType, FloatType ,ArrayType ,MapType
df、table変換はsparkを用いる.table 1をdfに変換
pyspark.sql.DataFrame SparkSession作成によりdataframe、またはrddを得ることができる.toDF()
method of DataFrame
Agg集約操作栗を挙げます:
aggの関数はpysparkに従う必要がある.sql.functionsのメソッド.(処理時columnタイプ)
役割:DFを作成し、DFを1枚のtableに登録し、table上でsqlを実行し、カラムストレージファイル(parquetフォーマット)を読み出す
作成方法
from pyspark.sql import SparkSession
#
spark=SparkSession.bulider.getOrCreate()
SparkSession method
createDataFrame (data,schema)
from pyspark.sql.types import *
data=[("zhaoli",25),("zhaoxiaoli",18)]
schema=StructType([StructField("name",StringType(),nullable=True),\
StructField("age",IntegerType(),nullable=True)]\
)
pyspark.sql.typesのすべてのタイプ
df、table変換はsparkを用いる.table 1をdfに変換
df.createOrReaplaceTempView("table1")
df=spark.table("table1")
pyspark.sql.DataFrame
method of DataFrame
Agg集約操作
from pyspark.sql import functions as F
df.agg(F.min(df.age),F.max(df.age),F.sum(),F.count(),F.avg(),F.countDistinct())
aggの関数はpysparkに従う必要がある.sql.functionsのメソッド.(処理時columnタイプ)