Spark(Python版)ゼロ基礎学習ノート(二)-Spark Transformationsのまとめと例
1.map(func)はfunc関数をデータセットの各要素に作用させ、新しい分布式のデータセットを生成し、返す
2.filter(func)は、すべてのfunc戻り値がtrueの要素を選択し、新しいデータセットとして返す
3.flatMap(func)はmapと似ているが、各入力のitemはmapから0個以上のitemsに出力することができ、すなわちfuncの戻り値は個別のitemではなくSequenceであるべきである
4.mapPartitions(func)はmapと似ているが、mapPartitionsの入力関数はRDDの各パーティション(block)に単独で作用するため、funcの入力と戻り値は反復器iteratorでなければならない.例えば、RDDに10個の要素0~9があり、3つの領域に分かれていると仮定し、mapPartitionsを使用して各要素の平方を返す.mapメソッドを使用すると、mapの入力関数は10回呼び出されますが、mapPartitionsメソッドを使用すると、入力関数は3回しか呼び出されず、パーティションごとに1回呼び出されます.
5.mapPartitionsWithIndex(func)はmapPartitionsと似ていますが、入力関数funcはパーティションの番号を表す正式なパラメータを提供します.
6.sample(withReplacement,fraction,seed)はデータからサンプリングし、withReplacementは戻しの有無を表し、withReplacement=trueは戻しサンプリングの有無を表し、fractionはサンプリングの確率(0<=fraction<=1)、seedはランダムシードである.例えば、1-100の間からサンプルを抽出し、サンプルに抽出される確率は0.2である
!!!なお、Sparkのsampleサンプリングは、withReplacement=Trueの場合、ポアソンサンプリングを採用することに相当する.withReplacement=Falseの場合、バーヌーリーサンプリングを採用することに相当し、fractionはサンプリングしたサンプルが元のデータ総量に占める割合ではなく、1つの要素がサンプルに抽出される確率を表す.fraction=0.2は、100個の数字のうち20%のデータをサンプルとして抽出することを示すものではなく、各数字がサンプルとして抽出される確率は0.2であり、これらの数字は同じ全体から来ていると考えられ、サンプルの大きさは固定的ではなく、二項分布に従う.
7.union(otherDatate)パラレル・オペレーションでは、ソース・データセットをunionの入力データセットとパラレル・セットし、デフォルトでは重複要素を保持します(重複要素を保持しない場合は、distinct操作で除去できます.下にdistinctを説明するときに説明します).
8.intersection(otherData set)の交差操作で、ソースデータセットとunionの入力データセットを交差させ、新しいデータセットを返します.
9.distinct([numTasks])データセットの重複要素を除去します.
下の一連のtransactionsが使用するキー(Key)という概念は、Key操作に関する次のデータセットをロンドンの各エリア(英語ではward)における学校と学生の人数に関する情報を記録するテーブルとして使用し、アドレスをダウンロードする.https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685#ダウンロード後にWardtoSecSchoolと名前を付けます.LDS_2015のsheetの中のデータはcsv形式に保存され、最初の行のヘッダーを削除し、schoolと名前を変更した.csvデータフォーマットは、(Ward_CODE,Ward_NAME,TotalWardPupils,Ward 2 Sec_Flow_No.,Secondary_School_URN,Secondary_School_Name,Pupil_count)まず、データの前処理を行います.
注意:1.ローカルからデータを読み込む場合は、コードの「file://」接頭辞でローカルファイルを読み込むように指定します.Spark shellのデフォルトはHDFSのファイルを読み込むため、まずHDFSにアップロードする必要があります.そうしないと、「org.apache.hadoop.mapred.InvalidInputException:Input path does not exist:hdfs://localhost:9000/user/hadoop/school.csv」というエラーが発生しました.2.データセットについて前処理を行い、正則マッチングで文字列を置換した.一部の学校の名前の文字列には「The City Academy,Hackney」などのカンマが含まれているため、この場合csvの区切り文字を利用すれば「The City Academy」と「Hackney」に分割することはできない.csvの区切り文字カンマの後ろにスペースがないことに気づきました.名前の中のカンマの後ろにはスペース(英語で書く習慣)があるので、まずre.subn文を用いてカンマの後ろに少なくとも1つのスペース(正規表現は',[s]+')を含むサブ文字列を置き換え、':',に置き換えてから、後続の操作を行います.以上が、このデータセットに対する前処理手順です.
10.groupByKey([numTasks])は、キー値ペア(K,V)からなるデータセットに作用し、キーと同じデータを一緒に配置し、キー値ペア(K,Iterable)からなるデータセットを返す.注意:1.この操作が、sumやaverageのような各キー上の集約(aggregation)を後続するために行われる場合、reduceByKeyまたはaggregateByKeyを使用すると効率が向上します.2.デフォルトでは、出力の並列度はRDDパーティションの数に依存しますが、オプションのパラメータnumTasksに値を割り当てることで、同時タスクの数を調整することもできます.
11.reduceByKey(func,[numTasks])はキー値ペア(K,V)に作用し、Keyグループを押して、キーと同じキー値ペアのValueをfunc操作し、funcのタイプが満たされなければならないことに注意する値を得る
12.aggregateByKey(zeroValue,seqOp,comOp,[numTasks])は、キー値ペア(K,V)のRDDにおいて、キーでvalueをグループ化し、マージする際に、各valueと初期値をseqOp関数のパラメータとして計算し、返された結果を新しいキー値ペア(K,V)として、その後キーでマージし、最後に各パケットのvalueをcomOp関数に渡して計算し(最初の2つのvalueを計算し、戻り結果と次のvalueをcomOp関数に渡し、これを類推)、keyと計算結果を新しいキー値対(K,V)として出力する.例:上記統計ward内の学生数の操作もaggregateByKeyで実現できるが、この場合seqOpとcomOpはいずれも加算操作を行い、コードは以下の通りである.
13.sortByKey([ascending=True],[numTasks])はKey順に並べ替えられ、ascendingの値はデフォルトでTrueとなり、True/Falseは昇順か降順かを示す.例えば、上記wardをward名降順に並べて上位10個を印刷する
14.join(otherDataset,[numTasks])はSQLの接続操作に似ています.すなわち、キー値ペア(K,V)および(K,W)に作用し、メタグループ(K,(V,W))に戻り、sparkもleftOuterJoin、rightOuterJoin、fullOuterJoinなどの外部接続をサポートします.例:
15.cogroup(otherDataset,[numTasks])は、キー値ペア(K,V)および(K,W)に作用し、メタグループ(K,(Iterable,Iterable))を返す.この操作はgroupWithと呼ばれます
16.cartesian(otherDataset)デカルト積は、データセットTとUに作用し、(T,U)、すなわちデータセットの各要素の2つの組合せを返す
17.pipe(command,[envVars])は、PerlまたはbashスクリプトなどのドライバのRDDをshell処理(外部プロセス)に渡します.RDD要素は標準入力としてスクリプトに渡され、スクリプト処理後の標準出力は新しいRDDとしてドライバに返されます.
18.coalesce(numPartitions)はRDDのパーティション数をnumPartitions個に減少させる.この操作を使用すると、データセットがフィルタリングによってスケールが減少すると、パフォーマンスが向上します.
19.repartition(numPartitions)はデータを再編成し、データはnumPartitions個に再ランダムにパーティション化され、numPartitionsは元より大きくても、元より小さくても、各パーティションをバランスさせることができる.この操作により、データセット全体がネットワークで再シャッフルされます.
20.repartitionAndSortWithinPartitions(partioner)は、与えられたpartioner関数に従ってRDDを再パーティション化し、パーティション内でソートする.これは、ソート操作をshuffleフェーズに移行させるため、先にrepartitionしてからパーティション内でsortよりも効率的である.
>>> a = sc.parallelize(('a', 'b', 'c'))
>>> a.map(lambda x: x+'1').collect()
['a1', 'b1', 'c1']
2.filter(func)は、すべてのfunc戻り値がtrueの要素を選択し、新しいデータセットとして返す
>>> a = sc.parallelize(range(10))
>>> a.filter(lambda x: x%2==0).collect() # 0-9
[0, 2, 4, 6, 8]
3.flatMap(func)はmapと似ているが、各入力のitemはmapから0個以上のitemsに出力することができ、すなわちfuncの戻り値は個別のitemではなくSequenceであるべきである
>>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
>>> a = sc.parallelize(l,3)
>>> a.flatMap(lambda line: line.split()).collect() #
['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']
4.mapPartitions(func)はmapと似ているが、mapPartitionsの入力関数はRDDの各パーティション(block)に単独で作用するため、funcの入力と戻り値は反復器iteratorでなければならない.例えば、RDDに10個の要素0~9があり、3つの領域に分かれていると仮定し、mapPartitionsを使用して各要素の平方を返す.mapメソッドを使用すると、mapの入力関数は10回呼び出されますが、mapPartitionsメソッドを使用すると、入力関数は3回しか呼び出されず、パーティションごとに1回呼び出されます.
>>> def squareFunc(a):
. . . for i in a:
. . . yield i*i
. . .
>>> a = sc.parallelize(range(10), 3)
PythonRDD[1] at RDD at PythonRDD.scala:48
>>> a.mapPartitions(squareFunc).collect()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
5.mapPartitionsWithIndex(func)はmapPartitionsと似ていますが、入力関数funcはパーティションの番号を表す正式なパラメータを提供します.
>>> def func(index, iterator): #
. . . yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator)))
. . .
>>> a = sc.parallelize(range(10),3)
>>> a.mapPartitionsWithIndex(func).collect()
['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
>>> def squareIndex(index, iterator): #
... for i in iterator:
... yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
...
>>> a.mapPartitionsWithIndex(squareIndex).collect()
['The index is: 0, and the square is: 0', 'The index is: 0, and the square is: 1', 'The index is: 1, and the square is: 4', 'The index is: 1, and the square is: 9', 'The index is: 1, and the square is: 16', 'The index is: 2, and the square is: 25', 'The index is: 2, and the square is: 36', 'The index is: 3, and the square is: 49', 'The index is: 3, and the square is: 64', 'The index is: 3, and the square is: 81']
6.sample(withReplacement,fraction,seed)はデータからサンプリングし、withReplacementは戻しの有無を表し、withReplacement=trueは戻しサンプリングの有無を表し、fractionはサンプリングの確率(0<=fraction<=1)、seedはランダムシードである.例えば、1-100の間からサンプルを抽出し、サンプルに抽出される確率は0.2である
>>> data = sc.parallelize(range(1,101),2)
>>> sample = data.sample(True, 0.2)
>>> sampleData.count()
19
>>> sampleData.collect()
[16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]
!!!なお、Sparkのsampleサンプリングは、withReplacement=Trueの場合、ポアソンサンプリングを採用することに相当する.withReplacement=Falseの場合、バーヌーリーサンプリングを採用することに相当し、fractionはサンプリングしたサンプルが元のデータ総量に占める割合ではなく、1つの要素がサンプルに抽出される確率を表す.fraction=0.2は、100個の数字のうち20%のデータをサンプルとして抽出することを示すものではなく、各数字がサンプルとして抽出される確率は0.2であり、これらの数字は同じ全体から来ていると考えられ、サンプルの大きさは固定的ではなく、二項分布に従う.
7.union(otherDatate)パラレル・オペレーションでは、ソース・データセットをunionの入力データセットとパラレル・セットし、デフォルトでは重複要素を保持します(重複要素を保持しない場合は、distinct操作で除去できます.下にdistinctを説明するときに説明します).
>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]
8.intersection(otherData set)の交差操作で、ソースデータセットとunionの入力データセットを交差させ、新しいデータセットを返します.
>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.intersection(data2).collect()
[8, 9, 6, 7]
9.distinct([numTasks])データセットの重複要素を除去します.
>>> data1 = sc.parallelize(range(10))
>>> data2 = sc.parallelize(range(6,15))
>>> data1.union(data2).distinct().collect()
[0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
下の一連のtransactionsが使用するキー(Key)という概念は、Key操作に関する次のデータセットをロンドンの各エリア(英語ではward)における学校と学生の人数に関する情報を記録するテーブルとして使用し、アドレスをダウンロードする.https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685#ダウンロード後にWardtoSecSchoolと名前を付けます.LDS_2015のsheetの中のデータはcsv形式に保存され、最初の行のヘッダーを削除し、schoolと名前を変更した.csvデータフォーマットは、(Ward_CODE,Ward_NAME,TotalWardPupils,Ward 2 Sec_Flow_No.,Secondary_School_URN,Secondary_School_Name,Pupil_count)まず、データの前処理を行います.
>>> school = sc.textFile("file:///home/yang/ /school.csv")
Data = sc.textFile("file:///home/yang/ /school.csv")
>>> school.count() # 16796
16796
>>> import re # python
>>> rows = school.map(lambda line: re.subn(',[\s]+',': ', line))
注意:1.ローカルからデータを読み込む場合は、コードの「file://」接頭辞でローカルファイルを読み込むように指定します.Spark shellのデフォルトはHDFSのファイルを読み込むため、まずHDFSにアップロードする必要があります.そうしないと、「org.apache.hadoop.mapred.InvalidInputException:Input path does not exist:hdfs://localhost:9000/user/hadoop/school.csv」というエラーが発生しました.2.データセットについて前処理を行い、正則マッチングで文字列を置換した.一部の学校の名前の文字列には「The City Academy,Hackney」などのカンマが含まれているため、この場合csvの区切り文字を利用すれば「The City Academy」と「Hackney」に分割することはできない.csvの区切り文字カンマの後ろにスペースがないことに気づきました.名前の中のカンマの後ろにはスペース(英語で書く習慣)があるので、まずre.subn文を用いてカンマの後ろに少なくとも1つのスペース(正規表現は',[s]+')を含むサブ文字列を置き換え、':',に置き換えてから、後続の操作を行います.以上が、このデータセットに対する前処理手順です.
10.groupByKey([numTasks])は、キー値ペア(K,V)からなるデータセットに作用し、キーと同じデータを一緒に配置し、キー値ペア(K,Iterable)からなるデータセットを返す.注意:1.この操作が、sumやaverageのような各キー上の集約(aggregation)を後続するために行われる場合、reduceByKeyまたはaggregateByKeyを使用すると効率が向上します.2.デフォルトでは、出力の並列度はRDDパーティションの数に依存しますが、オプションのパラメータnumTasksに値を割り当てることで、同時タスクの数を調整することもできます.
>>> newRows = rows.map(lambda r: r[0].split(','))
>>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey() # r[1] ward ,r[5]
>>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect() # ward
[{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...] # Stifford Clays ward William Edwards School,Brentwood County High School,The Coopers' Company and Coborn School ...
11.reduceByKey(func,[numTasks])はキー値ペア(K,V)に作用し、Keyグループを押して、キーと同じキー値ペアのValueをfunc操作し、funcのタイプが満たされなければならないことに注意する値を得る
>>> pupils = newRows.map(lambda r: (r[1], int(r[6]))) # r[1] ward ,r[6]
>>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y) # ward
>>> ward_pupils.collect() # ward
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), ('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835), ('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585), ('Camberwell Green', 1374), ('Glyndon', 4633),...]
12.aggregateByKey(zeroValue,seqOp,comOp,[numTasks])は、キー値ペア(K,V)のRDDにおいて、キーでvalueをグループ化し、マージする際に、各valueと初期値をseqOp関数のパラメータとして計算し、返された結果を新しいキー値ペア(K,V)として、その後キーでマージし、最後に各パケットのvalueをcomOp関数に渡して計算し(最初の2つのvalueを計算し、戻り結果と次のvalueをcomOp関数に渡し、これを類推)、keyと計算結果を新しいキー値対(K,V)として出力する.例:上記統計ward内の学生数の操作もaggregateByKeyで実現できるが、この場合seqOpとcomOpはいずれも加算操作を行い、コードは以下の通りである.
>>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
>>> ward_pupils.collect()
[('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), ('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835), ('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585), ('Camberwell Green', 1374), ('Glyndon', 4633),...]
13.sortByKey([ascending=True],[numTasks])はKey順に並べ替えられ、ascendingの値はデフォルトでTrueとなり、True/Falseは昇順か降順かを示す.例えば、上記wardをward名降順に並べて上位10個を印刷する
>>> ward_pupils.sortByKey(False, 4).take(10)
[('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204), ('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243), ('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]
14.join(otherDataset,[numTasks])はSQLの接続操作に似ています.すなわち、キー値ペア(K,V)および(K,W)に作用し、メタグループ(K,(V,W))に戻り、sparkもleftOuterJoin、rightOuterJoin、fullOuterJoinなどの外部接続をサポートします.例:
>>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> class1.join(class2).collect()
[('Tom', ('attended', 'attended'))]
>>> class1.leftOuterJoin(class2).collect()
[('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
>>> class1.rightOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
>>> class1.fullOuterJoin(class2).collect()
[('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
15.cogroup(otherDataset,[numTasks])は、キー値ペア(K,V)および(K,W)に作用し、メタグループ(K,(Iterable,Iterable))を返す.この操作はgroupWithと呼ばれます
>>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
>>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
>>> group = class1.cogroup(class2)
>>> group.collect()
[('John', (0x7fb7e808afd0>, 0x7fb7e808a1d0>)), ('Tom', (0x7fb7e808a7f0>, 0x7fb7e808a048>)), ('Jenny', (0x7fb7e808a9b0>, 0x7fb7e808a208>)), ('Bob', (0x7fb7e808ae80>, 0x7fb7e8b448d0>)), ('Amy', (0x7fb7e8b44c88>, 0x7fb7e8b44588>)), ('Alice', (0x7fb7e8b44748>, 0x7fb7e8b44f98>))]
>>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect()
[{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]
16.cartesian(otherDataset)デカルト積は、データセットTとUに作用し、(T,U)、すなわちデータセットの各要素の2つの組合せを返す
>>> a = sc.parallelize(('a', 'b', 'c'))
>>> b = sc.parallelize(('d', 'e', 'f'))
>>> a.cartesian(b).collect()
[('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]
17.pipe(command,[envVars])は、PerlまたはbashスクリプトなどのドライバのRDDをshell処理(外部プロセス)に渡します.RDD要素は標準入力としてスクリプトに渡され、スクリプト処理後の標準出力は新しいRDDとしてドライバに返されます.
18.coalesce(numPartitions)はRDDのパーティション数をnumPartitions個に減少させる.この操作を使用すると、データセットがフィルタリングによってスケールが減少すると、パフォーマンスが向上します.
19.repartition(numPartitions)はデータを再編成し、データはnumPartitions個に再ランダムにパーティション化され、numPartitionsは元より大きくても、元より小さくても、各パーティションをバランスさせることができる.この操作により、データセット全体がネットワークで再シャッフルされます.
20.repartitionAndSortWithinPartitions(partioner)は、与えられたpartioner関数に従ってRDDを再パーティション化し、パーティション内でソートする.これは、ソート操作をshuffleフェーズに移行させるため、先にrepartitionしてからパーティション内でsortよりも効率的である.