【原】Learning Spark(Python版)学習ノート(二)――キー値ペア、データ読み込みと保存、共有特性

41135 ワード

先周更新するはずだったのに、メーデーにぶつかって、だるい癌が発作して、延期になりました==.これからも時間通りに任務を完成しなければならない.くだらないことを言わないで、第4章-第6章は主に3つの内容を話しました:キー値の対、データの読み取りと保存とSparkの2つの共有特性(アキュムレータと放送変数).
キー値ペア(PaiRDD)
1.作成
1 # Python               pairRDD,  map()  
2 pairs = lines.map(lambda x:(x.split(" ")[0],x))

 
2.変換(Transformation)
変換操作はreduceByKey,foldByKey(),combineByKey()などが多く,通常のRDDにおけるreduce(),fold(),aggregate()などと類似しており,キーによる操作にすぎない.
 
reduceByKey():recude()と同様、キーによる集約にすぎません
foldByKey():fold()と類似
combineByKey():aggregate()と類似
 
 1 # Python          
 2 result = pairs.filter(lambda keyValue:len(keyValue[1]) < 20)
 3 
 4 # Python   reduceByKey() mapValues()           
 5 rdd.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
 6 
 7 # Python      
 8 rdd.sc.textFile("    ")
 9 words = rdd.flatMap(lambda x:x.split(" "))
10 result = words.map(lambda x:(x,1)).reduceByKey((x,y)=>x+y)
11 
12 # Python   combineByKey()          
13 sumCount = nums.combineByKey((lambda x:(x,1)),
14                              (lambda x,y:(x[0]+y,x[1]+1)),
15                              (lambda x,y:(x[0]+y[0],x[1]+y[1])))
16 sumCount.map(lambda key,xy:(key.xy[0]/xy[1])).collectAsMap()
17 
18 # Python    reduceByKey()    
19 data = [("a",3),("b",4),("a",1)]
20 sc.parallelize(data).reduceByKey(lambda x,y:x+y)#     
21 sc.parallelize(data).reduceByKey(lambda x,y:x+y,10)#      
22 
23 # Python                 
24 rdd.sortByKey(ascending = True,numPartitions = None,keyFunc = lambda x: str(x))

 
3.行動操作(Action)
≪データ・パーティション|Data Partition|oem_src≫:データが比較的大きい場合、partitionBy()を使用してハッシュ・パーティションに変換できます.すなわち、partitionByにspark.HashPartitionerオブジェクトを渡すことによって、この動作が実現される.PythonではHashPartitionrオブジェクトをpartionByに渡すことはできません.必要なパーティション数(rdd.partitionBy(100)など)を渡すだけです.
sparkでは、生成された結果RDDにパーティション方式を設定する操作として、cogroup()、groupWith()、join()、leftOuterJoin()、rightOutJoin、groupByKey()、reduceByKey()、combineByKey()、partitionByy()、sort()、mapValues()、flatMapValues()、filter().最後の3つは、親RDDにパーティション方式がある場合にのみ、結果としてRDDにパーティションRDDがある.他の操作で生成された結果には、特定のパーティション方式は存在しません.
カスタムパーティション方式:
#Python       
import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

rdd.partitionBy(20,hash_domain)   #  20   

 
 
データの読み込みと保存
ファイル形式
書式名
構造化
コメント
テキストファイル
いいえ
通常のテキストファイル、行ごとに1つのレコード
JSON
はんこうぞう
一般的なテキストベースのフォーマット、半構造化;ほとんどのライブラリでは、ローごとにレコードが必要です.
CSV
はい
一般的なテキスト構造
SequenceFile
はい
キー値対データの一般的なHadoopファイルフォーマット
Protocol buffers
はい
高速で読み取り、スペースを節約する言語間フォーマット
オブジェクトファイル
はい
Sparkジョブのデータを格納して共有コードを読み込む.クラスを変更するときに失効します.Javaシーケンス化に依存するため
テキストファイル
1 #      
2 input=sc.textFile("    ")
3 #      
4 result.saveAsTextFile(outputFile)

 
JSON
1 #  Jason
2 import json
3 data = input.map(lambda x: json.loads(x))
4 #  
5 (data.filter(lambda x : x["lovaPandas"]).map(lambda x:json.dumps(x))).saveAsTextFile(outputF

 
CSVファイル
 1 # textFile  csv
 2 import csv
 3 import StringIO
 4 def loadRecord(line):
 5     """    csv  """
 6     input = StringIO.StringIO(line)
 7     reader = csv.DictReader(input,filenames =["name","favouriteAnimal"])
 8     return reader.next()
 9 input = sc.textFile(inputFile).map(loadRecord)
10 
11 #    csv
12 def loadRecords(filenameContents):
13     """            """
14     input  = StringIO.StringIO(filenameContents[1])
15     reader = csv.DictReader(input,fieldnames = ["name","favouriteAnimal"])
16     return reader
17 fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
18 
19 #  csv
20 def writeRecords(records):
21     """    csv  """
22     output = StringIO.StringIO()
23     writer = csv.DictReader(output,filenames = ["name","favouriteAnimal"])
24     for record in records:
25         writer.writerow(record)
26     return [output.getvalue()]
27 pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

 
SequenceFile
1 #  SequenceFile
2 val data = sc.sequenceFile(inFile,"ord.apache.hadoop.io.Text","org.apache.hadoop.io.InWitable")

 
オブジェクトファイル
 1#オブジェクトファイル、Javaでシーケンス化書き、速度が遅い、保存用saveAsObjectFile()、読み出し用SparkContextのobjectFile()関数でパスを受信し、対応するRDDを返します.Pythonでは使用できません 
 
Spark SQLの構造化データ
Apache Hive
1 #Apache Hive
2 # Python  HiveContext     
3 from pyspark.sql import HiveContext
4 
5 hiveCtx =HiveContext(sc)
6 rows = hiveCtx.sql("SELECT name, age FROM users ")
7 firstRow = rows.first()
8 print firstRow.name

 
JSONデータ
 1 #JSON    
 2 {"user":{"name":"Holden","location":"San Francisco"},"text":"Nice day out today"}
 3 {"user":{"name":"Matei","location":"Berkeley"},"text":"Even nicer here :)"}
 4 
 5 # Python   SparkSQL  JSON  
 6 tweets = hiveCtx.jsonFile("tweets.json")
 7 # UserWarning: jsonFile is deprecated. Use read.json() instead.
 8 # warnings.warn("jsonFile is deprecated. Use read.json() instead.")
 9 
10 tweets.registerTempTable("tweets")
11 results = hiveCtx.sql("SELECT user.name,text FROM tweets")

  
この章ではsqlに関するコマンドは少ないですが、SQLに関する他のコマンドはSparkの公式ドキュメント(PySpark 1.6.1 documentation)を見て、詳しく説明しています.注意してください.これはspark 1.6バージョンです.1.2バージョンをインストールしている場合は、1.6のコマンドが使えない場合は、アップグレードしてから使用することができます.
 
最後にSparkにおける2種類の共有変数について述べる:アキュムレータ(accumulator)とブロードキャスト変数(broadcast variable)
≪アキュムレータ|アキュムレータ|oem_src≫:情報を集約します.デバッグ時にジョブをカウントするのが一般的です.例を挙げると、コール・リストに対応するログをファイルから読み込み、入力ファイルに空白行がどれだけあるかを知りたい場合にアキュムレータを使用できます.例:
 
1 #  JSON         
2 #    :             。       ,           ,                。                  ,        
3 {"address":"address here","band":"40m","callSigns":"KK6JLK","city":"SUNNYVALE",
4  "contactlat":"37.384733","contactlong":"-122.032164",
5  "county":"Santa Clara","dxcc":"291","fullname":"MATTHEW McPherrin",
6  "id":57779,"mode":"FM","mylat":"37.751952821","mylong":"-122.4208688735",...}

 
 1 # Python     
 2 file = sc.textFile(inputFile)
 3 #  Accumulator[int]      0
 4 blankLines = sc.accumulator(0)
 5 
 6 def extractCallSigns(line):
 7     global blankLines  #      
 8     if (line == ""):
 9         blankLines += 1
10     return line.split(" ")
11 
12 callSigns = file.flatMap(extractCallSigns)
13 callSigns.saveAsTextFile(outputDir + "/callSigns")
14 print "Blank Lines:%d " % blankLines.value

 
このプログラムを見てみましょう.まずblankLinesというAccumulator[Int]オブジェクトを作成し、入力中に空の行を見ると+1になります.変換操作を実行するとアキュムレータの値が印刷されます.注意:saveAsTextFile()というaction操作を実行してから正しいカウントが表示されます.flatMap()transformation操作であり、惰性であることは前のブログで述べた.
 
しかし、前の記事ではreduce()にも言及しています.などのような操作も集約操作なのですが、なぜアキュムレータというものが存在するのでしょうか.RDD自体が提供する同期メカニズムの粒度が太すぎるため、特にtransformation操作では変数状態が同期できず、アキュムレータはRDD自体の範囲や粒度とは異なる値を集約することができますが、write-onlyの変数であり、この値を読み取ることはできませんを選択すると、ドライバでvalueメソッドを使用してアキュムレータの値を読み込むことができます.
  
アキュムレータの使用方法:
  • ドライブでSparkContext.accumulator(initialValue)メソッドを呼び出すことで、初期値が格納されたアキュムレータを作成します.戻り値はorg.apache.spark.Accumulator[T]オブジェクトで、Tは初期値initialValueのタイプです.
  • Spark閉パッケージ内のエフェクタコードは、アキュムレータの+=メソッド(Javaではadd)を使用してアキュムレータの値を増加させることができる.
  • ドライバプログラムは、アキュムレータの値(Javaでvalue()またはsetValue()を使用)
  • にアクセスするためにアキュムレータのValue属性を呼び出すことができる.
     
    以前のデータについては、さらに計算することができます.
     1 # Python            
     2 #            
     3 validSignCount = sc.accumulator(0)
     4 invalidSignCount = sc.accumulator(0)
     5 
     6 def validataSign(sign):
     7     global validSignCount,invalidSignCount
     8     if re.match(r"\A\d?[a-zA-Z]{1,2}\d{1,4}[a-zA-Z]{1,3}\Z",sign):
     9         validSignCount += 1
    10         return True
    11     else:
    12         invalidSignCount += 1
    13         return False
    14 
    15 #               
    16 validSigns = callings.filter(validataSign)
    17 contactCount = validSigns.map(lambda sign:(sign,1)).reduceByKey(lambda (x,y):x+y)
    18 
    19 #        
    20 contactCount.count()
    21 if validSignCount.value < 0.1 * validSignCount.value:
    22     contactCount.saveAsTextFile(outputDir + "/contactCount")
    23 else:
    24     print "Too many errors: %d in %d" %(invalidSignCount.value,validSignCount.value)

     
    アキュムレータとフォールトトレランス:
    Sparkは分散型計算であることを知っています.一部のマシンで実行が遅い場合やエラーが発生した場合、Sparkはこれらの失敗したタスクや遅いタスクを自動的に再実行します.これにより、同じ関数が同じデータに対して複数回実行される可能性があります.簡単に言えば、メモリが消費され、計算速度が低下します.この場合、アキュムレータはどのように処理しますか.
    Action操作で使用するアキュムレータの場合、Sparkは各タスクのアキュムレータへの変更を一度だけ適用し、foreach()操作に一般的に配置します.Transformation操作のアキュムレータの場合、一度だけ更新しない場合があります.したがって、Transformationのアキュムレータはデバッグでのみ使用することが望ましいです.
     
    ブロードキャスト変数
    ブロードキャスト変数を使用すると、プログラマは、タスクごとにコピーを保存するのではなく、読み取り専用の変数を各マシンにキャッシュできます.ブロードキャスト変数を使用すると、より効率的な方法で大きなデータ量入力セットのコピーを各ノードに割り当てることができます.ブロードキャスト変数は、2つの側面でデータ共有効率を向上させます.1、クラスタ内の各ノード(物理機器)コピーは1つのみであり、デフォルトの閉パケットはタスクごとに1つのコピーである.2、ブロードキャスト伝送はBTダウンロードモード、すなわちP 2 Pダウンロードにより実現され、クラスタが多い場合、データ伝送速度を極めて向上させることができる.ブロードキャスト変数が変更された後、他のノードにフィードバックしない.
     Sparkでは、参照されたすべての変数が自動的にワークノードに送信されます.これは便利ですが、非常に効果的ではありません.1つは、デフォルトのタスク送信メカニズムが小さなタスクのために最適化されていることです.2つは、実際の過程で複数の並列操作で同じ変数を使用する可能性がありますが、Sparkはそれぞれの操作にこの変数を送信します.例を挙げます.,呼び出し記号の接頭辞で国を検索し,Sparkで直接次のように実現すると仮定する.
     
     1 # Python     
     2 #  RDD contactCounts         ,                 
     3 signPrefixes = loadCallSignTable()
     4 
     5 def processSignCount(sign_count,signPrefixes):
     6     country = lookupCountry(sign_count[0],signPrefixes)
     7     count = sign_count[1]
     8     return (country,count)
     9 
    10 countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))

     
    データ量が小さいときは実行できますが、このテーブルが大きいとsignPrefixesのMBレベルに達しやすく、プライマリノードからタスクごとにこのような配列を送信するとメモリが非常に消費され、その後signPrefixesという変数を使用する必要がある場合は、ノードごとにもう一度送信する必要があります.
    signPrefixesをブロードキャスト変数に変更すれば、この問題を解決できます.
     
     1 # Python            
     2 #  RDD contactCounts         ,                 
     3 signPrefixes = sc.broadcast(loadCallSignTable())
     4 
     5 
     6 def processSignCount(sign_count,signPrefixes):
     7     country = lookupCountry(sign_count[0],signPrefixes.value)
     8     count = sign_count[1]
     9     return (country,count)
    10 
    11 countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))
    12 
    13 countryContactCounts.saveAsTextFile(outputDir +"/contries.txt")

     
    ブロードキャスト変数のプロセスをまとめます.
  • は、タイプTのオブジェクトに対してSparkContext.broadcastを呼び出すことによってBroadcast[T]オブジェクトを作成します.シーケンス化可能な任意のオブジェクトは、このように実装できます.
  • valueプロパティを介してオブジェクトにアクセスする値
  • 変数は各ノードに1回しか送信されず、読み取り専用値として処理する必要があります(この値を変更しても他のノードには影響しません).
  •  
    ブロードキャストの最適化
    ブロードキャストの値が大きい場合は、高速で良好なシーケンス化フォーマットを選択できます.ScalaおよびJava APIでは、デフォルトでJavaシーケンス化ライブラリが使用されます.基本的なタイプの配列を除く任意のオブジェクトに対して非効率です.spark.serializerプロパティを使用して別のシーケンス化ライブラリを選択してシーケンス化プロセスを最適化できます.(reduce()も使用できます).メソッドはPythonのpickleライブラリカスタムシーケンス化)
     
    パーティションベースの操作
    2つの関数:map()とforeach()
    関数名
    提供された
    リターン
    RDD[T]の関数署名について
    mapPartitions()
    パーティション内の要素の反復
    戻る要素の反復
    f:(Iterator[T])->Iterator[U]
    mapPartitionsWithIndex()
    パーティション番号、および各パーティション内の要素の反復
    戻る要素の反復
    f:(Int,Iterator[T])->Iterator[U]
    foreachPartitions()
    元素反復器
    なし
    f:(Iterator[T])->Unit
    例:このデータベースを使用して、ログに記録された連絡先のコールリストをクエリーできるオンラインのラジオ局のコールデータがあります.
     1 # Python        
     2 def processCallSigns(signs):
     3     """         """
     4     #       
     5     http = urllib3.PoolManager()
     6     #          URL
     7     urls = map(lambda x: "http://73s.com/qsos/%s.json" % x,signs)
     8     #    (   )
     9     requests = map(lambda x:(x,http.request('GET',x)),urls)
    10     #    
    11     result = map(lambda x:(x[0],json.loads(x[1].data)),requests)
    12     #         
    13     return filter(lambda x:x[1] is not None,result)
    14 
    15 def fetchCallSigns(input):
    16     """    """
    17     return input.mapPartitions(lambda callsigns:processCallSigns(callsigns))
    18 
    19 contactsCountList = fetchCallSigns(validSigns)

     
    mapPartitions()の機能を例に挙げて説明します.
     
     1 # Python    mapPartitions()    
     2 def combineCtrs(c1,c2):
     3     return (c1[0]+c2[0],c1[1]+c2[1])
     4 
     5 def basicAvg(nums):
     6     """     """
     7     nums.map(lambda num:(num,1)).reduce(combineCtrs)
     8 
     9 
    10 
    11 # Python   mapPartitions()    
    12 def partitionCtr(nums):
    13     """     sumCounter"""
    14     sumCount = [0,0]
    15     for num in nums:
    16         sumCount[0] +=num
    17         sumCount[1] +=1
    18     return [sumCount]
    19 
    20 def fastAvg(nums):
    21     """     """
    22     sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
    23     return sumCount[0]/float(sumCount[1])

     
    数値RDDの動作
    方法
    意味
    count()
    RDDの要素数
    mean()
    要素の平均
    sum()
    合計
    max()
    最大値
    min()
    最小値
    variance()
    元素の分散
    sampleVariance()
    サンプリングの分散
    stdev()
    ひょうじゅんさ
    sampleStdev()
    サンプリングの標準偏差
     
    例:コールログから遠く離れた連絡先を削除する
    1 # Python     
    2 #  String   RDD       ,                
    3 distanceNumerics = distances.map(lambda string :float(string))
    4 stats = distanceNumerics.stats()
    5 stddev = stdts.stdev()
    6 mean  =stats.mean()
    7 reasonableDistances = distanceNumerics.filter(lambda x:math.fabs(x-mean) < 3 * stddev)
    8 print reasonableDistances.collect()

     
    この3章の内容は比較的に実用的で、生産の中でも実際の応用があります.来週7-9章を更新して、主にSparkのクラスタ上の運行、Sparkの調整とデバッグとSpark SQLについて話します.