python各種ファイルの読み込み
1501 ワード
mapPartitions()を使用して解析器を再利用
非構造化pythonの読み込み
ストレージcsv:
Spark Sql Hiveデータの読み出し
1、Hive-site.xmlファイルをSparkの./にコピーconf/ディレクトリの下
2、HiveContextオブジェクト、Spark SQLの入口、spark 2を作成する.0以降はsparkSessionAPIを使用してオブジェクトの作成を実現できます.
コードは次のとおりです.
非構造化pythonの読み込み
import json
data = input.map(lambda x : json.loads(x))
CSV python
import csv
import StringIO
...
def loadRecord(line):
""" csv """
input = StringIO.StringIO(line)
reader = csv.DictReader(input,fieldnames = ["name","favoriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
ストレージcsv:
def writeRecord(records):
""" csv """
output = StringIO.StringIO()
writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
for record in records:
writer.writerow(record)
return [output.getvalue()]
pandaLoves.mapPartitions(writeRecords).saveAsTextFile(outputfile)
Spark Sql Hiveデータの読み出し
1、Hive-site.xmlファイルをSparkの./にコピーconf/ディレクトリの下
2、HiveContextオブジェクト、Spark SQLの入口、spark 2を作成する.0以降はsparkSessionAPIを使用してオブジェクトの作成を実現できます.
コードは次のとおりです.
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name,age From Users")
firstRow = rows.first()
print(firstRow.name)
構造が一致するjsonファイルを取得# jsonFile ROW RDD
tweets = hiveCtx.jsonFile("tweets.json)
# RDD ,
tweets.registerTempTable('tweets')
results = hiveCtx.sql("SELECT user.name,text FROM tweets")