[Big Data Systems] DataFrame-2



Temporary Views

  • After creating a DataFrame, you can register it as a temporary view and query it with SQL syntax.
  • The temporary view disappears when the Spark session ends.
  • peopleDF.createOrReplaceTempView("People10M")
    # To view the contents of the temporary view, use a SELECT query
    spark.sql("select * from People10M where firstName = 'Donna'")
    DataFrame[id: int, firstName: string, middleName: string, lastName: string, gender: string, birthDate: timestamp, ssn: string, salary: int]
    display(spark.sql("select * from People10M where firstName = 'Donna'"))
    DataFrame[id: int, firstName: string, middleName: string, lastName: string, gender: string, birthDate: timestamp, ssn: string, salary: int]
    from pyspark.sql.functions import year

    # Filter on gender first, since the select below drops the gender column
    womenBornAfter1990DF = peopleDF.\
                    filter("gender = 'F'").\
                    select("firstName", "middleName", "lastName", year('birthDate').alias("birthYear")).\
                    filter("birthYear > 1990")
    display(womenBornAfter1990DF) # can be inspected as a DataFrame
    DataFrame[firstName: string, middleName: string, lastName: string, birthYear: int]
    # Create a temporary view from the womenBornAfter1990DF DataFrame
    
    womenBornAfter1990DF.createOrReplaceTempView('womenBornAfter1990')
    spark.sql("select count(*) from womenBornAfter1990 where firstName = 'Mary'").show()
    +--------+
    |count(1)|
    +--------+
    |     268|
    +--------+
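
    Temporary views live in the session catalog, so they can also be listed and dropped explicitly. A minimal sketch using the standard catalog API (view names as registered above):

    spark.catalog.listTables()                       # list tables/views visible in this session
    spark.catalog.dropTempView("womenBornAfter1990") # drop a temporary view by name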

    Exercise 1



    Step 1


    Create a DataFrame called top10FemaleFirstNamesDF and display the results.
    from pyspark.sql.functions import col, count, desc
    
    top10FemaleFirstNamesDF = (peopleDF
                                    .select("firstName")
                                    .filter("gender == 'F'")
                                    .groupBy("firstName")
                                    .agg(count(col('firstName')).alias('total'))
                                    .orderBy(desc('total'))
                                    .limit(10)
                              )
    
    top10FemaleFirstNamesDF.show()
    +---------+-----+
    |firstName|total|
    +---------+-----+
    |   Sharyn| 1394|
    |  Lashell| 1387|
    |    Alice| 1384|
    |  Lucille| 1384|
    |    Louie| 1382|
    |Jacquelyn| 1381|
    |  Cristen| 1375|
    | Katherin| 1373|
    |Bridgette| 1373|
    |   Alesha| 1368|
    +---------+-----+
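
    The same top-10 aggregation can be written in SQL against the People10M temporary view registered earlier; a sketch of the equivalent query:

    spark.sql("""
        SELECT firstName, count(*) AS total
        FROM People10M
        WHERE gender = 'F'
        GROUP BY firstName
        ORDER BY total DESC
        LIMIT 10
    """).show()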

    Step 2

    top10FemaleNamesDF = top10FemaleFirstNamesDF.orderBy("firstName")
    
    display(top10FemaleNamesDF)
    DataFrame[firstName: string, total: bigint]
    # The following does the same as above, this time via a temporary view and SQL
    
    top10FemaleFirstNamesDF.createOrReplaceTempView("Top10FemaleFirstNames")
    resultsDF = spark.sql("SELECT * FROM Top10FemaleFirstNames ORDER BY firstName")
    display(resultsDF)
    
    
    resultsDF.show()
    DataFrame[firstName: string, total: bigint]
    
    
    +---------+-----+
    |firstName|total|
    +---------+-----+
    |   Alesha| 1368|
    |    Alice| 1384|
    |Bridgette| 1373|
    |  Cristen| 1375|
    |Jacquelyn| 1381|
    | Katherin| 1373|
    |  Lashell| 1387|
    |    Louie| 1382|
    |  Lucille| 1384|
    |   Sharyn| 1394|
    +---------+-----+

    PySpark Join

    valuesA = [('Pirate',1), ('Monkey', 2), ('Ninja', 3), ('Spaghetti', 4)]
    TableA = spark.createDataFrame(valuesA, ['name', 'id'])
    
    valuesB = [('Rutabaga', 1), ('Pirate', 2), ('Ninja',3), ('Darth Vader', 4)]
    TableB = spark.createDataFrame(valuesB, ['name','id'])
    
    TableA.show()
    TableB.show()
    +---------+---+
    |     name| id|
    +---------+---+
    |   Pirate|  1|
    |   Monkey|  2|
    |    Ninja|  3|
    |Spaghetti|  4|
    +---------+---+
    
    +-----------+---+
    |       name| id|
    +-----------+---+
    |   Rutabaga|  1|
    |     Pirate|  2|
    |      Ninja|  3|
    |Darth Vader|  4|
    +-----------+---+

    An inner join keeps only the rows that the two tables have in common.
    A left join keeps every row of the left table and attaches matching rows from the right;
    a right join keeps every row of the right table and attaches matching rows from the left.
    Where there is no match, the columns are filled with null.
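
    All of these variants use the same DataFrame.join API; only the how argument changes. A minimal sketch with the tables above:

    # One API, different join types via the `how` argument:
    # 'inner' (default), 'left'/'left_outer', 'right'/'right_outer', 'full'/'full_outer'
    TableA.join(TableB, TableA.name == TableB.name, how='inner')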

    InnerJoin

    ta = TableA.alias('ta') # alias TableA as ta
    tb = TableB.alias('tb')
    inner_join = ta.join(tb, ta.name == tb.name) # join ta and tb on the name columns
    inner_join.show()
    +------+---+------+---+
    |  name| id|  name| id|
    +------+---+------+---+
    | Ninja|  3| Ninja|  3|
    |Pirate|  1|Pirate|  2|
    +------+---+------+---+

    LeftJoin

    left_join = ta.join(tb, ta.name == tb.name, how='left') # Could also use 'left_outer'
                                                            # how= selects the join type
    left_join.show()
    +---------+---+------+----+
    |     name| id|  name|  id|
    +---------+---+------+----+
    |Spaghetti|  4|  null|null|
    |    Ninja|  3| Ninja|   3|
    |   Pirate|  1|Pirate|   2|
    |   Monkey|  2|  null|null|
    +---------+---+------+----+
    from pyspark.sql.functions import col
    left_join = ta.join(tb, ta.name == tb.name, how='left')
    left_join.filter(col('tb.name').isNull()).show() ## show left rows with no match (null on the right)
    +---------+---+----+----+
    |     name| id|name|  id|
    +---------+---+----+----+
    |Spaghetti|  4|null|null|
    |   Monkey|  2|null|null|
    +---------+---+----+----+
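
    The same "left rows with no match on the right" result can also be obtained directly with a left anti join; note that only the left table's columns are returned:

    ta.join(tb, ta.name == tb.name, how='left_anti').show() # anti join: unmatched left rows only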

    RightJoin

    right_join = ta.join(tb, ta.name == tb.name, how='right') # Could also use 'right_outer'
    right_join.show()
    +------+----+-----------+---+
    |  name|  id|       name| id|
    +------+----+-----------+---+
    |  null|null|   Rutabaga|  1|
    | Ninja|   3|      Ninja|  3|
    |Pirate|   1|     Pirate|  2|
    |  null|null|Darth Vader|  4|
    +------+----+-----------+---+

    OuterJoin


    A full outer join returns both tables at once:
    the result combines what the left join and the right join would produce.
    full_outer_join = ta.join(tb, ta.name == tb.name, how='full') # Could also use 'full_outer'
    full_outer_join.show()
    +---------+----+-----------+----+
    |     name|  id|       name|  id|
    +---------+----+-----------+----+
    |     null|null|   Rutabaga|   1|
    |Spaghetti|   4|       null|null|
    |    Ninja|   3|      Ninja|   3|
    |   Pirate|   1|     Pirate|   2|
    |   Monkey|   2|       null|null|
    |     null|null|Darth Vader|   4|
    +---------+----+-----------+----+
    When fields share the same name, the output can be confusing,
    so it is a good idea to rename them to distinguish ta.name from tb.name.
    inner_join = ta.join(tb, ta.name == tb.name)
    inner_join.show()
    +------+---+------+---+
    |  name| id|  name| id|
    +------+---+------+---+
    | Ninja|  3| Ninja|  3|
    |Pirate|  1|Pirate|  2|
    +------+---+------+---+
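
    A minimal sketch of the renaming suggestion above, using withColumnRenamed before the join (the nameB/idB names are only illustrative):

    tbRenamed = TableB.withColumnRenamed("name", "nameB").withColumnRenamed("id", "idB")
    TableA.join(tbRenamed, TableA.name == tbRenamed.nameB).show() # output columns are now unambiguous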
    llist = [('bob', '2015-01-13', 4), ('alice', '2015-04-23', 10)]
    left = spark.createDataFrame(llist, ['name', 'date', 'duration'])
    right = spark.createDataFrame([('alice', 100), ('bob', 23)], ['name','upload'])
    
    df = left.join(right, left.name == right.name)
    display(df)
    
    df.show()
    
    ## The schemas do not have to match to join; only a shared field is required.
    DataFrame[name: string, date: string, duration: bigint, name: string, upload: bigint]
    
    
    +-----+----------+--------+-----+------+
    | name|      date|duration| name|upload|
    +-----+----------+--------+-----+------+
    |alice|2015-04-23|      10|alice|   100|
    |  bob|2015-01-13|       4|  bob|    23|
    +-----+----------+--------+-----+------+
    # With on="name", the duplicate name column is dropped.
    df = left.join(right, on = "name")
    df.show()
    +-----+----------+--------+------+
    | name|      date|duration|upload|
    +-----+----------+--------+------+
    |alice|2015-04-23|      10|   100|
    |  bob|2015-01-13|       4|    23|
    +-----+----------+--------+------+
    # Passing a list of column names likewise drops the duplicate name column.
    df = left.join(right, ["name"])
    df.show()
    +-----+----------+--------+------+
    | name|      date|duration|upload|
    +-----+----------+--------+------+
    |alice|2015-04-23|      10|   100|
    |  bob|2015-01-13|       4|    23|
    +-----+----------+--------+------+
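
    If you keep the expression-style join, another option is to drop one of the duplicate columns afterwards; DataFrame.drop also accepts a Column reference. A sketch:

    # Keep the condition join but drop the right-hand duplicate name column
    left.join(right, left.name == right.name).drop(right.name).show()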

    Preprocessing peopleDF: avg, round, min, max, col


    avg

    # avg
    from pyspark.sql.functions import avg
    avgSalaryDF = peopleDF.select(avg("salary").alias("averageSalary"))
    avgSalaryDF.show()
    +-------------+
    |averageSalary|
    +-------------+
    |72633.0076033|
    +-------------+

    round

    # round
    from pyspark.sql.functions import round
    roundedAvgSalaryDF = avgSalaryDF.select(round("averageSalary").alias("roundedAverageSalary"))
    roundedAvgSalaryDF.show()
    +--------------------+
    |roundedAverageSalary|
    +--------------------+
    |             72633.0|
    +--------------------+
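
    round also takes an optional second argument giving the number of decimal places to keep; a quick sketch keeping two decimals:

    avgSalaryDF.select(round("averageSalary", 2).alias("roundedAverageSalary")).show()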

    min, max

    # max, min
    from pyspark.sql.functions import min, max
    salaryDF = peopleDF.select(max("salary").alias("max"), min("salary").alias("min"),\
                              round(avg("salary")).alias("averageSalary"))
    salaryDF.show()
    +------+------+-------------+
    |   max|   min|averageSalary|
    +------+------+-------------+
    |180841|-26884|      72633.0|
    +------+------+-------------+
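
    The negative minimum shows that some salaries are below zero (they are cleaned up with abs in the salary sorting section below). A quick sketch counting the affected rows:

    from pyspark.sql.functions import col
    peopleDF.filter(col("salary") < 0).count() # number of rows with a negative salary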
    # distinct
    print(peopleDF.count())
    peopleDFDistinctNamesDF = peopleDF.select("firstName").distinct()
    print(peopleDFDistinctNamesDF.count())
    peopleDFDistinctNamesDF.show(1)
    10000000
    5113
    +---------+
    |firstName|
    +---------+
    |   Alayna|
    +---------+
    only showing top 1 row
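
    The distinct count can also be computed in a single aggregation with countDistinct; a minimal sketch:

    from pyspark.sql.functions import countDistinct
    peopleDF.select(countDistinct("firstName").alias("distinctFirstNames")).show()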

    ColumnRenamed

    peopleDFDistinctNamesDF = peopleDF.select("firstName")\
                        .withColumnRenamed("firstName", "peopleFirstName").distinct() # firstName -> peopleFirstName
    print(peopleDFDistinctNamesDF.count())
    peopleDFDistinctNamesDF.show(3)
    5113
    +---------------+
    |peopleFirstName|
    +---------------+
    |         Alayna|
    |        Melaine|
    |           Faye|
    +---------------+
    only showing top 3 rows
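
    The same renaming can be done inline with Column.alias instead of withColumnRenamed; a sketch:

    peopleDF.select(col("firstName").alias("peopleFirstName")).distinct().show(3)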

    col

    # Passing a column-name string and a col() Column are equivalent here
    peopleDF.select("salary").show(3)
    peopleDF.select(col("salary")).show(3)
    +------+
    |salary|
    +------+
    | 56172|
    | 40203|
    | 53417|
    +------+
    only showing top 3 rows
    
    +------+
    |salary|
    +------+
    | 56172|
    | 40203|
    | 53417|
    +------+
    only showing top 3 rows
    peopleDF.select(avg("salary")).show(3)
    peopleDF.select(avg(col("salary"))).show(3)
    +-------------+
    |  avg(salary)|
    +-------------+
    |72633.0076033|
    +-------------+
    
    +-------------+
    |  avg(salary)|
    +-------------+
    |72633.0076033|
    +-------------+
    peopleDF.select(round("salary")).show(3)
    peopleDF.select(round(col("salary"))).show(3)
    +----------------+
    |round(salary, 0)|
    +----------------+
    |           56172|
    |           40203|
    |           53417|
    +----------------+
    only showing top 3 rows
    
    +----------------+
    |round(salary, 0)|
    +----------------+
    |           56172|
    |           40203|
    |           53417|
    +----------------+
    only showing top 3 rows
    peopleDF.select(min("salary")).show(3)
    peopleDF.select(min(col("salary"))).show(3)
    +-----------+
    |min(salary)|
    +-----------+
    |     -26884|
    +-----------+
    
    +-----------+
    |min(salary)|
    +-----------+
    |     -26884|
    +-----------+
  • abs, upper, and similar functions here do not accept a plain column-name string as a parameter -> a Column created with col() must be passed instead.
    from pyspark.sql.functions import abs
    #peopleDF.select(abs("salary")).show(3) # error
    
    peopleDF.select(abs(col("salary"))).show(3)
    +-----------+
    |abs(salary)|
    +-----------+
    |      56172|
    |      40203|
    |      53417|
    +-----------+
    only showing top 3 rows
    from pyspark.sql.functions import upper
    
    peopleDF.select(upper(col("firstName"))).show(3)
    +----------------+
    |upper(firstName)|
    +----------------+
    |          PENNIE|
    |              AN|
    |           QUYEN|
    +----------------+
    only showing top 3 rows

    Salary sorting

    from pyspark.sql.functions import abs
    peopleWithFixedSalariesDF = peopleDF.select("firstName", "middleName",\
                                               "lastName", "gender", "birthDate", "ssn", abs(col("salary")).alias("salary"))
    peopleWithFixedSalariesDF.show(3)
    +---------+----------+----------+------+-------------------+-----------+------+
    |firstName|middleName|  lastName|gender|          birthDate|        ssn|salary|
    +---------+----------+----------+------+-------------------+-----------+------+
    |   Pennie|     Carry|Hirschmann|     F|1955-07-02 13:30:00|981-43-9345| 56172|
    |       An|     Amira|    Cowper|     F|1992-02-08 14:00:00|978-97-8086| 40203|
    |    Quyen|    Marlen|      Dome|     F|1970-10-11 13:00:00|957-57-8246| 53417|
    +---------+----------+----------+------+-------------------+-----------+------+
    only showing top 3 rows
    peopleWithFixedSalariesSortedDF = peopleWithFixedSalariesDF.select("*").\
                        orderBy("salary").limit(20)
    peopleWithFixedSalariesSortedDF.show(4)
    +---------+----------+--------+------+-------------------+-----------+------+
    |firstName|middleName|lastName|gender|          birthDate|        ssn|salary|
    +---------+----------+--------+------+-------------------+-----------+------+
    |   Janene|      Lili|   Prinn|     F|1986-04-06 14:00:00|923-50-6804|     2|
    |    Brook|  Winifred| Durnell|     F|1999-09-07 13:00:00|989-18-7019|     3|
    |   Garret|   Garrett| Ashling|     M|1959-10-19 12:30:00|918-39-6461|     4|
    |  Doloris|  Domenica|   Matic|     F|1984-08-03 13:00:00|928-53-3688|     5|
    +---------+----------+--------+------+-------------------+-----------+------+
    only showing top 4 rows
    peopleWithFixedSalariesSortedDF = peopleWithFixedSalariesDF.select("*").\
                        orderBy("salary", ascending=False).limit(20)
    peopleWithFixedSalariesSortedDF.show(4)
    +---------+----------+---------+------+-------------------+-----------+------+
    |firstName|middleName| lastName|gender|          birthDate|        ssn|salary|
    +---------+----------+---------+------+-------------------+-----------+------+
    |  Belinda|     Talia|  Jessard|     F|1955-05-15 13:30:00|966-31-6469|180841|
    |    Shena|     Patty| Grinston|     F|1989-09-02 13:00:00|980-60-8702|173969|
    | Courtney|       Kip|    Liell|     M|1986-12-06 14:00:00|981-68-2592|170562|
    | Clarence|     Allan|MacDuffie|     M|1967-07-02 13:00:00|924-59-6873|170371|
    +---------+----------+---------+------+-------------------+-----------+------+
    only showing top 4 rows
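
    Instead of ascending=False, the descending sort can also be written with the Column.desc method; a sketch:

    from pyspark.sql.functions import col
    peopleWithFixedSalariesDF.orderBy(col("salary").desc()).limit(20).show(4)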