python各種ファイルの読み込み

1501 ワード

mapPartitions()を使用して解析器を再利用
非構造化pythonの読み込み
import json
data = input.map(lambda x : json.loads(x))

  CSV     python
import csv
import StringIO
...
def loadRecord(line):
	"""    csv  """
	input = StringIO.StringIO(line)
	reader = csv.DictReader(input,fieldnames = ["name","favoriteAnimal"])
	return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

ストレージcsv:
def writeRecord(records):
	"""    csv  """
	output =  StringIO.StringIO()
	writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
	for record in records:
		writer.writerow(record)
	return [output.getvalue()]

pandaLoves.mapPartitions(writeRecords).saveAsTextFile(outputfile)

Spark Sql Hiveデータの読み出し
1、Hive-site.xmlファイルをSparkの./にコピーconf/ディレクトリの下
2、HiveContextオブジェクト、Spark SQLの入口、spark 2を作成する.0以降はsparkSessionAPIを使用してオブジェクトの作成を実現できます.
コードは次のとおりです.
from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name,age From Users")
firstRow = rows.first()
print(firstRow.name)
構造が一致するjsonファイルを取得
#  jsonFile           ROW     RDD
tweets = hiveCtx.jsonFile("tweets.json)
#    RDD       ,         
tweets.registerTempTable('tweets')
results = hiveCtx.sql("SELECT user.name,text FROM tweets")