RDD Operation(1) -Transformations

11705 ワード

(1)、(2)、および(3)を含むRDD動作を練習する.今、私たちは最初の伝送を学びます.

リファレンス


(1) Transformations
(2) Actions
(3) MapReduce

目次

  • Transformations
  • Map
  • Filter
  • Flatmap
  • 深化
  • Numpyの練習のように、簡単なサンプルコードを使ってよく使われる関数を勉強したほうがいいです.
    基本的な動作を見てみましょう.この操作でRDDの計算プロセスを検証しましょう.
    MapReduceについてまだ理解していない方は下記のリンクを参考にしてください!
    Split-Apply-Combine Strategy

    1.Transformations

  • map()
  • filter()
  • flatmap()
  • 2. map



    [ソース:トレーニング.databricks]
    map関数をxのすべての要素に適用した結果はy値であった.x,yの要素数は同じである.
    x = sc.parallelize(["b", "a", "c", "d"])
    y = x.map(lambda z: (z, 1))
    print(x.collect()) # collect()는 actions입니다.
    print(y.collect())
    
    ['b', 'a', 'c', 'd']
    [('b', 1), ('a', 1), ('c', 1), ('d', 1)]
    (z,1)形式で「b」、「a」、「c」および「d」のtupleリンゴ値を表示します.
    別の例です.
    nums = sc.parallelize([1,2,3])
    squares = nums.map(lambda x: x*x)
    print(squares.collect())
    
    [1, 4, 9]
    [1,2,3]各要素の二乗はリンゴ欠乏値として表される.

    3. filter()



    [ソース:トレーニング.databricks]
    filter演算は、いくつかの条件を満たす値のみを返します.したがって、条件文には、xとyの数が異なる場合があります.
    x = sc.parallelize([1,2,3,4,5])
    y = x.filter(lambda x: x%2 == 0)
    print(x.collect())
    print(y.collect())
    
    [1, 2, 3, 4, 5]
    [2, 4]
    集計は偶数の場合にのみyに適用されるため、2つの値は異なります.
    text = sc.parallelize(['a','b','c','d'])
    capital = text.map(lambda x : x.upper())
    A = capital.filter(lambda x : 'A' in x)
    print(text.collect())
    print(A.collect())
    
    ['a', 'b', 'c', 'd']
    ['A']
    textを大文字に変換し、「A」があれば「A」のコードを返します.

    4. flatmap


    この図はflatmapの演算過程を示している.




    [ソース:トレーニング.databricks]
    FlatMapはRDDの要素に対してmap演算を行い、要素の数を増やす.要素の個数は同じ数を増やす必要はありません.

    例を見てみましょう.
    x = sc.parallelize([1,2,3])
    y = x.flatmap(lambda x: (x, x*10, 30))
    print(x.collect())
    print(y.collect())
    
    [1, 2, 3]
    [1, 10, 30, 2, 20, 30, 3, 30, 30]
    [xの値,xに10を乗じた値,30]この3つの場合を繰り返す.
    次の例では、いくつかのRDD変換を1つのローにネストします.複雑に見えますが、collect()を使用して変換関数を適用する効果を単独で適用することで、結果を簡単に理解できます.
    wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
    result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
    # 공백을 제거합니다.
    # 단어를 공백 기준으로 split합니다.
    result.collect()
    
    ['spark',
     'is',
     'funny',
     'it',
     'is',
     'beautiful',
     'and',
     'also',
     'it',
     'is',
     'implemented',
     'by',
     'python']

    5.深化:CSVファイルの検索


    上記のRDD変換関数に基づいて,より現実的な例を論じる.
    多くのデータはcsvファイルです.有名なTitanicデータセットファイルをSparkに読み込む方法を練習しましょう.
    import os
    csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
    csv_data_0 = sc.textFile(csv_path)
    csv_data_0.take(5)
    
    ['survived,sex,age,n_siblings_spouses,parch,fare,class,deck,embark_town,alone',
     '0,male,22.0,1,0,7.25,Third,unknown,Southampton,n',
     '1,female,38.0,1,0,71.2833,First,C,Cherbourg,n',
     '1,female,26.0,0,0,7.925,Third,unknown,Southampton,y',
     '1,female,35.0,1,0,53.1,First,C,Southampton,n']
    最初の5つの例のみをロードしたファイルをロードしました.これをデータセットに設定するには、最初の行の列部分を分離し、各データ行(col 1、data 1)、(col 2、data 2)、...交換したいリスト形式です.
    csv_data_1 = csv.data_0.filter(lambda line: len(line) > 1).map(lambda line: line.split(","))
    csv_data_1.take(5)
    
    [['survived',
      'sex',
      'age',
      'n_siblings_spouses',
      'parch',
      'fare',
      'class',
      'deck',
      'embark_town',
      'alone'],
     ['0',
      'male',
      '22.0',
      '1',
      '0',
      '7.25',
      'Third',
      'unknown',
      'Southampton',
      'n'],
     ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
     ['1',
      'female',
      '26.0',
      '0',
      '0',
      '7.925',
      'Third',
      'unknown',
      'Southampton',
      'y'],
     ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n']]
    柱状部分だけを分離することもできます!
    columns = csv_data_1.take(1)
    columns
    
    [['survived',
      'sex',
      'age',
      'n_siblings_spouses',
      'parch',
      'fare',
      'class',
      'deck',
      'embark_town',
      'alone']]
    カラム以外のデータのみを分離する方法が必要です.データの最初の列は0または1の数値のみで構成されます.この条件をフィルタとして使用して、列の部分を除外します.
    csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
    # 첫 번째 컬럼이 숫자인 것만 필터링
    csv_data_2.take(5)
    
    [['0',
      'male',
      '22.0',
      '1',
      '0',
      '7.25',
      'Third',
      'unknown',
      'Southampton',
      'n'],
     ['1', 'female', '38.0', '1', '0', '71.2833', 'First', 'C', 'Cherbourg', 'n'],
     ['1',
      'female',
      '26.0',
      '0',
      '0',
      '7.925',
      'Third',
      'unknown',
      'Southampton',
      'y'],
     ['1', 'female', '35.0', '1', '0', '53.1', 'First', 'C', 'Southampton', 'n'],
     ['0',
      'male',
      '28.0',
      '0',
      '0',
      '8.4583',
      'Third',
      'unknown',
      'Queenstown',
      'y']]
    今、列を基準にcsv data 2を整理します!
    csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)]
    csv_data_3.take(5)
    
    [[('survived', '0'),
      ('sex', 'male'),
      ('age', '22.0'),
      ('n_siblings_spouses', '1'),
      ('parch', '0'),
      ('fare', '7.25'),
      ('class', 'Third'),
      ('deck', 'unknown'),
      ('embark_town', 'Southampton'),
      ('alone', 'n')],
     [('survived', '1'),
      ('sex', 'female'),
      ('age', '38.0'),
      ('n_siblings_spouses', '1'),
      ('parch', '0'),
      ('fare', '71.2833'),
      ('class', 'First'),
      ('deck', 'C'),
      ('embark_town', 'Cherbourg'),
      ('alone', 'n')],
     [('survived', '1'),
      ('sex', 'female'),
      ('age', '26.0'),
      ('n_siblings_spouses', '0'),
      ('parch', '0'),
      ('fare', '7.925'),
      ('class', 'Third'),
      ('deck', 'unknown'),
      ('embark_town', 'Southampton'),
      ('alone', 'y')],
     [('survived', '1'),
      ('sex', 'female'),
      ('age', '35.0'),
      ('n_siblings_spouses', '1'),
      ('parch', '0'),
      ('fare', '53.1'),
      ('class', 'First'),
      ('deck', 'C'),
      ('embark_town', 'Southampton'),
      ('alone', 'n')],
     [('survived', '0'),
      ('sex', 'male'),
      ('age', '28.0'),
      ('n_siblings_spouses', '0'),
      ('parch', '0'),
      ('fare', '8.4583'),
      ('class', 'Third'),
      ('deck', 'unknown'),
      ('embark_town', 'Queenstown'),
      ('alone', 'y')]]
    必要に応じてcsvファイルを加工しました.次に,Actions関数を適用して種々の解析を行う.

    csvファイルをDataFrameに読み込む方法

    from pyspark import SparkConf, SparkContext, SQLContext
    from pyspark import SparkFiles
    
    url = 'https://storage.googleapis.com/tf-datasets/titanic/train.csv'
    sc.addFile(url)
    sqlContext = SQLContext(sc)
    
    df = sqlContext.read.csv(SparkFile.get("train.csv"), header=True, inferSchema = True)
    df.show(5, truncate = False)
    
    
    +--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
    |survived|sex   |age |n_siblings_spouses|parch|fare   |class|deck   |embark_town|alone|
    +--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
    |0       |male  |22.0|1                 |0    |7.25   |Third|unknown|Southampton|n    |
    |1       |female|38.0|1                 |0    |71.2833|First|C      |Cherbourg  |n    |
    |1       |female|26.0|0                 |0    |7.925  |Third|unknown|Southampton|y    |
    |1       |female|35.0|1                 |0    |53.1   |First|C      |Southampton|n    |
    |0       |male  |28.0|0                 |0    |8.4583 |Third|unknown|Queenstown |y    |
    +--------+------+----+------------------+-----+-------+-----+-------+-----------+-----+
    only showing top 5 rows
    
    我々が使用したSparkContextが再加工されたSQLCContextが提供するreadcsv()関数を使用してスパークのDataFrameを得ることができます.
    見慣れているのはPandasのDataFrameに似ているからです.とても似ています.実際、SQL Contextは、RDDを使用してデータを分析するよりも便利で強力な機能を提供しています.
    Spark SQL, DataFrames and Datasets Guidel
    # 위에서 얻은 데이터에서 40세 이상인 사람들의 데이터만 필터링해 봅시다.
    df2 = df[df['age']>40]
    df2.show(5, truncate = False)
    
    +--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
    |survived|sex   |age |n_siblings_spouses|parch|fare   |class |deck   |embark_town|alone|
    +--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
    |0       |male  |66.0|0                 |0    |10.5   |Second|unknown|Southampton|y    |
    |0       |male  |42.0|1                 |0    |52.0   |First |unknown|Southampton|n    |
    |1       |female|49.0|1                 |0    |76.7292|First |D      |Cherbourg  |n    |
    |0       |male  |65.0|0                 |1    |61.9792|First |B      |Cherbourg  |n    |
    |0       |male  |45.0|1                 |0    |83.475 |First |C      |Southampton|n    |
    +--------+------+----+------------------+-----+-------+------+-------+-----------+-----+
    only showing top 5 rows

    RDD-その他の転送機能


    RDD Programming Guide