Using Spark to find the top 3 most-visited URLs under each domain name (simulation)


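I. Requirements:
Assume an IT education website with several sections, such as Java, PHP, and net. Below is a simulated access log for the site.
Each line has two fields separated by two spaces: the first is the access time (e.g. 20160321101954) and the second is the URL that was visited. Each section has its own domain name:
java.aaaaaaa.cn net.aaaaaaa.cn php.aaaaaaa.cn
For each domain name, find the 3 URLs with the most visits.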
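To check the Spark output on a small sample, the requirement can be restated in plain Python without a cluster. This is a minimal local sketch, not part of the original job; the function name `top_urls_per_domain` is hypothetical, and it assumes the same two-space field separator that the Spark code below splits on.

```python
import heapq
from collections import Counter, defaultdict
from urllib.parse import urlparse


def top_urls_per_domain(lines, k=3):
    """Return {domain: [(url, count), ...]} holding the k most-visited URLs of each domain."""
    counts = Counter()
    for line in lines:
        fields = line.rstrip('\r\n').split('  ')  # two spaces, as in the Spark code
        if len(fields) != 2:
            continue  # skip malformed lines, like the filter() step
        url = fields[1]
        # netloc is the host part of the URL, e.g. java.aaaaaaa.cn
        counts[(urlparse(url).netloc, url)] += 1
    by_domain = defaultdict(list)
    for (domain, url), n in counts.items():
        by_domain[domain].append((url, n))
    # nlargest keeps the k pairs with the highest counts, in descending order
    return {d: heapq.nlargest(k, pairs, key=lambda p: p[1])
            for d, pairs in by_domain.items()}


sample = [
    '20160321101954  http://java.aaaaaaa.cn/java/video.shtml',
    '20160321101954  http://java.aaaaaaa.cn/java/video.shtml',
    '20160321101955  http://java.aaaaaaa.cn/java/teacher.shtml',
    '20160321101955  http://php.aaaaaaa.cn/php/course.shtml',
    'not a log line',
]
print(top_urls_per_domain(sample, k=1))
```

The malformed last line is silently dropped, which mirrors what `filter(lambda line: len(line.split('  ')) == 2)` does in Spark.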
II. Pure RDD approach:
1. The simplest working code:
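# Python 2 provides urlparse; Python 3 moved it to urllib.parse
try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

top_k = 3
# Pipeline (sc is the SparkContext of the pyspark shell):
#   keep well-formed lines -> count each (domain, url) -> group the URLs by domain
#   -> sort each domain's URLs by count and keep top_k -> write "domain\turl\tcount" lines
sc.textFile('/home/hdit/workdir/yhe/test/data.txt')\
    .filter(lambda line: len(line.split('  ')) == 2)\
    .map(lambda line: ((urlparse(line.split('  ')[1]).netloc, line.split('  ')[1]), 1))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .map(lambda line: (line[0][0], [(line[0][1], line[1])]))\
    .reduceByKey(lambda v1, v2: v1 + v2)\
    .map(lambda line: (line[0], sorted(line[1], key=lambda b: b[1], reverse=True)[:top_k]))\
    .flatMap(lambda line: [(line[0], str(p[0]), str(p[1])) for p in line[1]])\
    .map(lambda line: "\t".join(line))\
    .saveAsTextFile('/home/hdit/workdir/yhe/test/result1.txt')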
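The second `reduceByKey(lambda v1, v2: v1 + v2)` concatenates every URL of a domain into one list before sorting. Since only the top 3 are needed, one possible variation (not the author's code) is to truncate after each merge. The hypothetical `merge_top_k` below keeps the k largest counts; because the top k of A + B has the same counts as the top k of top-k(A) + top-k(B), it can be applied in any grouping order, which is what `reduceByKey` expects of its function. The local `functools.reduce` call only simulates that merging on toy values.

```python
import heapq
from functools import reduce

TOP_K = 3


def merge_top_k(v1, v2, k=TOP_K):
    """Merge two partial (url, count) lists, keeping only the k largest counts.

    top-k(A + B) has the same counts as top-k(top-k(A) + top-k(B)), so the
    result does not depend on the order in which partial lists are merged.
    """
    return heapq.nlargest(k, v1 + v2, key=lambda p: p[1])


# Toy single-element lists, shaped like the values produced by
# .map(lambda line: (line[0][0], [(line[0][1], line[1])]))
partials = [[('url_a', 7)], [('url_b', 5)], [('url_c', 3)], [('url_d', 10)]]
print(reduce(merge_top_k, partials))  # [('url_d', 10), ('url_a', 7), ('url_b', 5)]
```

In the job above it could be tried as `.reduceByKey(merge_top_k)` in place of the second `reduceByKey`; the following `sorted(...)[:top_k]` step then becomes redundant but harmless. When several URLs tie at the k-th position, which one survives may still differ between runs.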

2. The step-by-step test process:
def split_data(line):
    # (domain, [(url, count), ...]) -> [(domain, (url, count)), ...]
    k, v_list = line
    return_list = []
    for v in v_list:
        return_list.append((k, v))
    return return_list

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2
top_k = 3
rdd=sc.textFile('/home/hdit/workdir/yhe/test/data.txt')\
    .filter(lambda line :len(line.split('  ')) == 2)\
    .map(lambda line:((urlparse(line.split('  ')[1]).netloc, line.split('  ')[1]), 1))\
    .reduceByKey(lambda v1, v2: v1+v2)\
    .map(lambda line: (line[0][0], [(line[0][1], line[1])]))\
    .reduceByKey(lambda v1, v2: v1+v2)\
    .map(lambda line: (line[0], sorted(line[1], key=lambda b: b[1], reverse=True)[:top_k]))

'''
>>> rdd.map(split_data).collect()
[[(u'php.aaaaaaa.cn', (u'http://php.aaaaaaa.cn/php/teacher.shtml', 7)), (u'php.aaaaaaa.cn', (u'http://php.aaaaaaa.cn/php/video.shtml', 5)), (u'php.aaaaaaa.cn', (u'http://php.aaaaaaa.cn/php/course.shtml', 3))], [(u'java.aaaaaaa.cn', (u'http://java.aaaaaaa.cn/java/course/javaee.shtml', 7)), (u'java.aaaaaaa.cn', (u'http://java.aaaaaaa.cn/java/course/cloud.shtml', 7)), (u'java.aaaaaaa.cn', (u'http://java.aaaaaaa.cn/java/course/android.shtml', 6))], [(u'net.aaaaaaa.cn', (u'http://net.aaaaaaa.cn/net/teacher.shtml', 10)), (u'net.aaaaaaa.cn', (u'http://net.aaaaaaa.cn/net/video.shtml', 8)), (u'net.aaaaaaa.cn', (u'http://net.aaaaaaa.cn/net/course.shtml', 7))]]
>>> rdd.flatMap(split_data).collect()
[(u'php.aaaaaaa.cn', (u'http://php.aaaaaaa.cn/php/teacher.shtml', 7)), (u'php.aaaaaaa.cn', (u'http://php.aaaaaaa.cn/php/video.shtml', 5)), (u'php.aaaaaaa.cn', (u'http://php.aaaaaaa.cn/php/course.shtml', 3)), (u'java.aaaaaaa.cn', (u'http://java.aaaaaaa.cn/java/course/javaee.shtml', 7)), (u'java.aaaaaaa.cn', (u'http://java.aaaaaaa.cn/java/course/cloud.shtml', 7)), (u'java.aaaaaaa.cn', (u'http://java.aaaaaaa.cn/java/course/android.shtml', 6)), (u'net.aaaaaaa.cn', (u'http://net.aaaaaaa.cn/net/teacher.shtml', 10)), (u'net.aaaaaaa.cn', (u'http://net.aaaaaaa.cn/net/video.shtml', 8)), (u'net.aaaaaaa.cn', (u'http://net.aaaaaaa.cn/net/course.shtml', 7))]
'''
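The output above shows the difference between `map` and `flatMap`: `map` yields one list per domain, while `flatMap` concatenates those lists into one flat sequence of (domain, (url, count)) pairs. A plain-Python analogy (not Spark itself), using `itertools.chain.from_iterable` in the role of `flatMap` and a few records copied from the output above:

```python
from itertools import chain


def split_data(line):
    """(domain, [(url, count), ...]) -> [(domain, (url, count)), ...]"""
    k, v_list = line
    return [(k, v) for v in v_list]


# Two (domain, top-URL list) records taken from the REPL output above
records = [
    ('php.aaaaaaa.cn', [('http://php.aaaaaaa.cn/php/teacher.shtml', 7),
                        ('http://php.aaaaaaa.cn/php/video.shtml', 5)]),
    ('net.aaaaaaa.cn', [('http://net.aaaaaaa.cn/net/teacher.shtml', 10)]),
]

# Like rdd.map(split_data): one output element (a list) per input record
mapped = [split_data(r) for r in records]
# Like rdd.flatMap(split_data): the per-record lists are concatenated
flat_mapped = list(chain.from_iterable(split_data(r) for r in records))

print(len(mapped), len(flat_mapped))  # 2 3
```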

def split_data(line):
    k, v_list = line
    return_list = []
    for v in v_list:
        return_list.append('\t'.join([k, v[0], str(v[1])]))
    return return_list
# split_data is equivalent to: lambda line: ['\t'.join([line[0], p[0], str(p[1])]) for p in line[1]]
rdd.flatMap(split_data).saveAsTextFile('/home/hdit/workdir/yhe/test/result.txt')
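Note that `saveAsTextFile` writes a directory (here named `result.txt`) containing one `part-NNNNN` file per partition plus a `_SUCCESS` marker, not a single file. A small hypothetical helper for reading such a directory back, assuming it is on the local file system and that each line has the `domain\turl\tcount` layout produced by `split_data`:

```python
import glob
import os


def read_top_urls(result_dir):
    """Collect (domain, url, count) rows from every part-* file under result_dir."""
    rows = []
    # One part file per partition; the 'part-*' pattern skips the _SUCCESS marker
    for part in sorted(glob.glob(os.path.join(result_dir, 'part-*'))):
        with open(part) as f:
            for line in f:
                line = line.rstrip('\n')
                if not line:
                    continue
                domain, url, count = line.split('\t')
                rows.append((domain, url, int(count)))
    return rows


# Hypothetical usage:
# rows = read_top_urls('/home/hdit/workdir/yhe/test/result.txt')
```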

III. RDD + DataFrame approach:
1. The simplest working code:
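# Run in the pyspark shell, where sc and sqlContext already exist
try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse
rdd = sc.textFile('/home/hdit/workdir/yhe/test/data.txt')\
     .filter(lambda line: len(line.split('  ')) == 2)\
     .map(lambda line: ((urlparse(line.split('  ')[1]).netloc, line.split('  ')[1]), 1))\
     .reduceByKey(lambda v1, v2: v1+v2)\
     .map(lambda line: (line[0][0], line[0][1], line[1]))
# registerTempTable() returns None, so keep the DataFrame and register it separately
pp = sqlContext.createDataFrame(rdd, ["domain_name", "url", "visit_num"])
pp.registerTempTable("visit_tb")
# Number the URLs of each domain by descending visit_num and keep rows 1 to 3
ee = sqlContext.sql("select * from (select domain_name,url,visit_num,row_number() over(partition by domain_name order by visit_num desc) rk from visit_tb)aa where rk<4")
# saveAsTextFile() also returns None, so build res_rdd first and save it afterwards
res_rdd = ee.rdd\
            .map(lambda line: (line.domain_name, line.url, line.visit_num, line.rk))\
            .map(lambda line: "\t".join([line[0], line[1], str(line[2]), str(line[3])]))
res_rdd.saveAsTextFile('/home/hdit/workdir/yhe/test/202001.res')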
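The SQL query numbers the rows inside each `domain_name` partition by descending `visit_num` and keeps `rk<4`. As an illustration only, the same window logic can be emulated in plain Python with `sorted` and `itertools.groupby`; `top_k_with_row_number` is a hypothetical name, and the rows are (domain_name, url, visit_num) tuples like those in `visit_tb`, with counts taken from the output below.

```python
from itertools import groupby
from operator import itemgetter


def top_k_with_row_number(rows, k=3):
    """Emulate ROW_NUMBER() OVER (PARTITION BY domain_name ORDER BY visit_num DESC) ... WHERE rk <= k."""
    result = []
    # PARTITION BY domain_name: sort by domain so groupby sees each domain as one run
    for domain, group in groupby(sorted(rows, key=itemgetter(0)), key=itemgetter(0)):
        # ORDER BY visit_num DESC within the partition
        ordered = sorted(group, key=itemgetter(2), reverse=True)
        # ROW_NUMBER(): 1, 2, 3, ... with no gaps; keep only the first k
        for rk, (_, url, visit_num) in enumerate(ordered[:k], start=1):
            result.append((domain, url, visit_num, rk))
    return result


visit_rows = [
    ('php.aaaaaaa.cn', 'http://php.aaaaaaa.cn/php/video.shtml', 5),
    ('net.aaaaaaa.cn', 'http://net.aaaaaaa.cn/net/course.shtml', 7),
    ('net.aaaaaaa.cn', 'http://net.aaaaaaa.cn/net/teacher.shtml', 10),
    ('php.aaaaaaa.cn', 'http://php.aaaaaaa.cn/php/teacher.shtml', 7),
    ('net.aaaaaaa.cn', 'http://net.aaaaaaa.cn/net/video.shtml', 8),
    ('php.aaaaaaa.cn', 'http://php.aaaaaaa.cn/php/course.shtml', 3),
]
for row in top_k_with_row_number(visit_rows, k=2):
    print(row)
```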

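2. Test process:
#!/usr/bin/python2.6
# -*- coding:utf-8 -*-
from urlparse import urlparse  # Python 2; on Python 3 use: from urllib.parse import urlparse
from pyspark import SparkConf, SparkContext, HiveContext
conf = SparkConf().setAppName('spark_rdd_dataframe_get_top_visit_page')
sc = SparkContext(conf=conf)
# In Spark 1.x, window functions such as row_number() over (...) require a HiveContext
sqlContext = HiveContext(sc)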

rdd=sc.textFile('/home/hdit/workdir/yhe/test/data.txt')\
     .filter(lambda line :len(line.split('  ')) == 2)\
     .map(lambda line:((urlparse(line.split('  ')[1]).netloc, line.split('  ')[1]), 1))\
     .reduceByKey(lambda v1,v2:v1+v2)\
     .map(lambda line:(line[0][0],line[0][1],line[1]))
pp=sqlContext.createDataFrame(rdd, ["domain_name", "url","visit_num"])
pp.registerTempTable("visit_tb")
'''
>>> pp
DataFrame[domain_name: string, url: string, visit_num: bigint]
>>> pp=sqlContext.createDataFrame(rdd, ["domain_name", "url","visit_num"])
>>> pp
DataFrame[domain_name: string, url: string, visit_num: bigint]
>>> pp.show()
+---------------+--------------------+---------+
|    domain_name|                 url|visit_num|
+---------------+--------------------+---------+
|java.aaaaaaa.cn|http://java.aaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        4|
| net.aaaaaaa.cn|http://net.aaaaaa...|        7|
| net.aaaaaaa.cn|http://net.aaaaaa...|       10|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|
| php.aaaaaaa.cn|http://php.aaaaaa...|        5|
|java.aaaaaaa.cn|http://java.aaaaa...|        3|
| php.aaaaaaa.cn|http://php.aaaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        5|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|
| net.aaaaaaa.cn|http://net.aaaaaa...|        8|
| php.aaaaaaa.cn|http://php.aaaaaa...|        3|
|java.aaaaaaa.cn|http://java.aaaaa...|        5|
+---------------+--------------------+---------+

>>> pp.registerTempTable("visit_tb")
>>> ee=sqlContext.sql("select * from visit_tb")
>>> ee.show()
+---------------+--------------------+---------+
|    domain_name|                 url|visit_num|
+---------------+--------------------+---------+
|java.aaaaaaa.cn|http://java.aaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        4|
| net.aaaaaaa.cn|http://net.aaaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        3|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|
| php.aaaaaaa.cn|http://php.aaaaaa...|        5|
| net.aaaaaaa.cn|http://net.aaaaaa...|       10|
| php.aaaaaaa.cn|http://php.aaaaaa...|        7|
|java.aaaaaaa.cn|http://java.aaaaa...|        5|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|
| net.aaaaaaa.cn|http://net.aaaaaa...|        8|
| php.aaaaaaa.cn|http://php.aaaaaa...|        3|
|java.aaaaaaa.cn|http://java.aaaaa...|        5|
+---------------+--------------------+---------+
>>> ee=sqlContext.sql("select domain_name,url,visit_num,row_number() over(partition by domain_name order by visit_num desc) rk from visit_tb ")
>>> ee.show()
+---------------+--------------------+---------+---+
|    domain_name|                 url|visit_num| rk|
+---------------+--------------------+---------+---+
| net.aaaaaaa.cn|http://net.aaaaaa...|       10|  1|
| net.aaaaaaa.cn|http://net.aaaaaa...|        8|  2|
| net.aaaaaaa.cn|http://net.aaaaaa...|        7|  3|
| php.aaaaaaa.cn|http://php.aaaaaa...|        7|  1|
| php.aaaaaaa.cn|http://php.aaaaaa...|        5|  2|
| php.aaaaaaa.cn|http://php.aaaaaa...|        3|  3|
|java.aaaaaaa.cn|http://java.aaaaa...|        7|  1|
|java.aaaaaaa.cn|http://java.aaaaa...|        7|  2|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|  3|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|  4|
|java.aaaaaaa.cn|http://java.aaaaa...|        5|  5|
|java.aaaaaaa.cn|http://java.aaaaa...|        5|  6|
|java.aaaaaaa.cn|http://java.aaaaa...|        4|  7|
|java.aaaaaaa.cn|http://java.aaaaa...|        3|  8|
+---------------+--------------------+---------+---+
'''
ee=sqlContext.sql("select * from (select domain_name,url,visit_num,row_number() over(partition by domain_name order by visit_num desc) rk from visit_tb)aa where rk<4")
'''
>>> ee.show()
+---------------+--------------------+---------+---+
|    domain_name|                 url|visit_num| rk|
+---------------+--------------------+---------+---+
| net.aaaaaaa.cn|http://net.aaaaaa...|       10|  1|
| net.aaaaaaa.cn|http://net.aaaaaa...|        8|  2|
| net.aaaaaaa.cn|http://net.aaaaaa...|        7|  3|
| php.aaaaaaa.cn|http://php.aaaaaa...|        7|  1|
| php.aaaaaaa.cn|http://php.aaaaaa...|        5|  2|
| php.aaaaaaa.cn|http://php.aaaaaa...|        3|  3|
|java.aaaaaaa.cn|http://java.aaaaa...|        7|  1|
|java.aaaaaaa.cn|http://java.aaaaa...|        7|  2|
|java.aaaaaaa.cn|http://java.aaaaa...|        6|  3|
+---------------+--------------------+---------+---+

>>> ee.rdd.collect()
[Row(domain_name=u'net.aaaaaaa.cn', url=u'http://net.aaaaaaa.cn/net/teacher.shtml', visit_num=10, rk=1), Row(domain_name=u'net.aaaaaaa.cn', url=u'http://net.aaaaaaa.cn/net/video.shtml', visit_num=8, rk=2), Row(domain_name=u'net.aaaaaaa.cn', url=u'http://net.aaaaaaa.cn/net/course.shtml', visit_num=7, rk=3), Row(domain_name=u'php.aaaaaaa.cn', url=u'http://php.aaaaaaa.cn/php/teacher.shtml', visit_num=7, rk=1), Row(domain_name=u'php.aaaaaaa.cn', url=u'http://php.aaaaaaa.cn/php/video.shtml', visit_num=5, rk=2), Row(domain_name=u'php.aaaaaaa.cn', url=u'http://php.aaaaaaa.cn/php/course.shtml', visit_num=3, rk=3), Row(domain_name=u'java.aaaaaaa.cn', url=u'http://java.aaaaaaa.cn/java/course/javaee.shtml', visit_num=7, rk=1), Row(domain_name=u'java.aaaaaaa.cn', url=u'http://java.aaaaaaa.cn/java/course/cloud.shtml', visit_num=7, rk=2), Row(domain_name=u'java.aaaaaaa.cn', url=u'http://java.aaaaaaa.cn/java/course/base.shtml', visit_num=6, rk=3)]
>>> ee.rdd.map(lambda line:(line.domain_name,line.url,line.visit_num,line.rk)).collect()
[(u'net.aaaaaaa.cn', u'http://net.aaaaaaa.cn/net/teacher.shtml', 10, 1), (u'net.aaaaaaa.cn', u'http://net.aaaaaaa.cn/net/video.shtml', 8, 2), (u'net.aaaaaaa.cn', u'http://net.aaaaaaa.cn/net/course.shtml', 7, 3), (u'php.aaaaaaa.cn', u'http://php.aaaaaaa.cn/php/teacher.shtml', 7, 1), (u'php.aaaaaaa.cn', u'http://php.aaaaaaa.cn/php/video.shtml', 5, 2), (u'php.aaaaaaa.cn', u'http://php.aaaaaaa.cn/php/course.shtml', 3, 3), (u'java.aaaaaaa.cn', u'http://java.aaaaaaa.cn/java/course/javaee.shtml', 7, 1), (u'java.aaaaaaa.cn', u'http://java.aaaaaaa.cn/java/course/cloud.shtml', 7, 2), (u'java.aaaaaaa.cn', u'http://java.aaaaaaa.cn/java/course/base.shtml', 6, 3)]
>>>
'''
res_rdd= ee.rdd.map(lambda line:(line.domain_name, line.url, line.visit_num, line.rk)).map(lambda line: "\t".join([line[0],line[1],str(line[2]),str(line[3])]))
# >>> res_rdd.collect()
# [u'net.aaaaaaa.cn\thttp://net.aaaaaaa.cn/net/teacher.shtml\t10\t1', u'net.aaaaaaa.cn\thttp://net.aaaaaaa.cn/net/video.shtml\t8\t2', u'net.aaaaaaa.cn\thttp://net.aaaaaaa.cn/net/course.shtml\t7\t3', u'php.aaaaaaa.cn\thttp://php.aaaaaaa.cn/php/teacher.shtml\t7\t1', u'php.aaaaaaa.cn\thttp://php.aaaaaaa.cn/php/video.shtml\t5\t2', u'php.aaaaaaa.cn\thttp://php.aaaaaaa.cn/php/course.shtml\t3\t3', u'java.aaaaaaa.cn\thttp://java.aaaaaaa.cn/java/course/javaee.shtml\t7\t1', u'java.aaaaaaa.cn\thttp://java.aaaaaaa.cn/java/course/cloud.shtml\t7\t2', u'java.aaaaaaa.cn\thttp://java.aaaaaaa.cn/java/course/base.shtml\t6\t3']

res_rdd.saveAsTextFile('/home/hdit/workdir/yhe/test/202001.res')
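In the data, the java domain has two URLs with 6 visits (android.shtml and base.shtml). That is why the pure RDD result lists android.shtml while the SQL result lists base.shtml: `row_number()` gives tied rows distinct numbers in no guaranteed order, so either answer is a valid top 3. If ties matter, a secondary sort key (for example `order by visit_num desc, url`) makes the choice deterministic, while `rank()` or `dense_rank()` keep tied rows together instead. The plain-Python sketch below, with the hypothetical helper `rank_variants`, shows how the three window functions would number the java counts from the `rk` output above.

```python
def rank_variants(counts):
    """For counts sorted in descending order, return (row_number, rank, dense_rank) lists."""
    row_number, rank, dense_rank = [], [], []
    for i, c in enumerate(counts):
        row_number.append(i + 1)  # always increases, so ties get different numbers
        if i > 0 and c == counts[i - 1]:
            rank.append(rank[-1])  # ties share the same rank...
            dense_rank.append(dense_rank[-1])
        else:
            rank.append(i + 1)  # ...and rank() then skips ahead
            dense_rank.append(dense_rank[-1] + 1 if dense_rank else 1)  # dense_rank() does not skip
    return row_number, rank, dense_rank


# visit_num values of the java domain, in the order of the rk column above
java_counts = [7, 7, 6, 6, 5, 5, 4, 3]
rn, rk, drk = rank_variants(java_counts)
print(rn)   # [1, 2, 3, 4, 5, 6, 7, 8]
print(rk)   # [1, 1, 3, 3, 5, 5, 7, 8]
print(drk)  # [1, 1, 2, 2, 3, 3, 4, 5]
```

With `rank()` and `rk<4`, the java domain would return 4 URLs (7, 7, 6, 6); with `dense_rank()`, it would return 6.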

IV. Contents of data.txt (one record per line):
20160321101954  http://java.aaaaaaa.cn/java/course/javaeeadvanced.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/android.shtml
20160321101954  http://java.aaaaaaa.cn/java/video.shtml
20160321101954  http://java.aaaaaaa.cn/java/teacher.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/android.shtml
20160321101954  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101954  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/hadoop.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/base.shtml
20160321101954  http://net.aaaaaaa.cn/net/course.shtml
20160321101954  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101954  http://net.aaaaaaa.cn/net/video.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/base.shtml
20160321101954  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101954  http://java.aaaaaaa.cn/java/video.shtml
20160321101954  http://java.aaaaaaa.cn/java/video.shtml
20160321101954  http://net.aaaaaaa.cn/net/video.shtml
20160321101954  http://net.aaaaaaa.cn/net/course.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101954  http://java.aaaaaaa.cn/java/course/android.shtml
20160321101955  http://php.aaaaaaa.cn/php/course.shtml
20160321101955  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101955  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/base.shtml
20160321101955  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101955  http://php.aaaaaaa.cn/php/video.shtml
20160321101955  http://net.aaaaaaa.cn/net/course.shtml
20160321101955  http://php.aaaaaaa.cn/php/video.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/android.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101955  http://net.aaaaaaa.cn/net/video.shtml
20160321101955  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101955  http://java.aaaaaaa.cn/java/teacher.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/android.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101955  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101955  http://net.aaaaaaa.cn/net/video.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/javaeeadvanced.shtml
20160321101956  http://net.aaaaaaa.cn/net/video.shtml
20160321101956  http://net.aaaaaaa.cn/net/video.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/javaeeadvanced.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/android.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/hadoop.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/javaee.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/javaeeadvanced.shtml
20160321101956  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101956  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/base.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101956  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101956  http://net.aaaaaaa.cn/net/course.shtml
20160321101956  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101956  http://php.aaaaaaa.cn/php/video.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101956  http://java.aaaaaaa.cn/java/course/hadoop.shtml
20160321101957  http://java.aaaaaaa.cn/java/teacher.shtml
20160321101957  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101957  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101957  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101957  http://php.aaaaaaa.cn/php/teacher.shtml
20160321101957  http://php.aaaaaaa.cn/php/course.shtml
20160321101957  http://java.aaaaaaa.cn/java/course/base.shtml
20160321101957  http://net.aaaaaaa.cn/net/course.shtml
20160321101957  http://java.aaaaaaa.cn/java/video.shtml
20160321101957  http://php.aaaaaaa.cn/php/video.shtml
20160321101957  http://net.aaaaaaa.cn/net/teacher.shtml
20160321101957  http://java.aaaaaaa.cn/java/video.shtml
20160321101957  http://net.aaaaaaa.cn/net/video.shtml
20160321101957  http://java.aaaaaaa.cn/java/course/hadoop.shtml
20160321101957  http://net.aaaaaaa.cn/net/course.shtml
20160321101957  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101957  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101958  http://net.aaaaaaa.cn/net/course.shtml
20160321101958  http://java.aaaaaaa.cn/java/course/hadoop.shtml
20160321101958  http://php.aaaaaaa.cn/php/video.shtml
20160321101958  http://php.aaaaaaa.cn/php/course.shtml
20160321101958  http://java.aaaaaaa.cn/java/course/cloud.shtml
20160321101958  http://net.aaaaaaa.cn/net/video.shtml
20160321101958  http://java.aaaaaaa.cn/java/course/base.shtml
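The `filter(lambda line: len(line.split('  ')) == 2)` step only checks that a line splits into two fields. If stricter validation is wanted, each line could be parsed explicitly. This is a hypothetical helper, assuming the format shown above: a 14-digit timestamp read as yyyyMMddHHmmss (an interpretation, not stated by the source), two spaces, then a URL without spaces.

```python
import re
from datetime import datetime
from urllib.parse import urlparse

# Assumed line format: 14-digit timestamp, exactly two spaces, then a URL
LINE_RE = re.compile(r'^(\d{14})  (\S+)$')


def parse_log_line(line):
    """Return (timestamp, domain, url), or None if the line does not match the assumed format."""
    m = LINE_RE.match(line.rstrip('\r\n'))
    if m is None:
        return None
    ts = datetime.strptime(m.group(1), '%Y%m%d%H%M%S')
    url = m.group(2)
    return ts, urlparse(url).netloc, url


print(parse_log_line('20160321101954  http://java.aaaaaaa.cn/java/course/javaee.shtml'))
```

In the Spark job, such a function could be mapped over the lines and the `None` results filtered out before counting.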
Requirement source: https://blog.csdn.net/jiaotongqu6470/article/details/78462203
Technical references: https://blog.csdn.net/helloxiaozhe/article/details/103443028, https://blog.csdn.net/helloxiaozhe/article/details/103809088