Pythonスクリプトを使用してHiveからデータを取り出し、計算してMySQLにロードする例

18574 ワード

サーバにPythonライブラリをインストールする権限がないため、この文書ではHive用・MySQL用のPythonライブラリは使わず、osモジュールからHiveおよびMySQLのコマンドラインクライアントを呼び出してデータの読み取りと書き込みを行う。
重点的に注目・学習する点:
Pythonからosモジュール経由でデータベースを操作し、結果を受け取って受け渡す方法。外部パラメータをPythonに渡して使用する方法。Pythonによるデータのグループ化集計。Pythonのsetオブジェクトを利用したデータの重複排除と、積集合・和集合・差集合の演算。Pythonのset、list、str間の相互変換。Pythonでの関数の定義と呼び出し、関数パラメータの使用。メインプログラムのエントリポイント「if __name__ == '__main__':」の使用。Pythonでの時刻の出力とプログラム実行時間の計測、など。
1、新規ユーザーの1ヶ月以内の日次リテンション(継続ユーザー数)の計算(外部パラメータの入力なし、スクリプト内部で定義)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os

# Silence every Python warning for the duration of the batch run.
warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# The cohort date is fixed inside the script: 52 days before the run date.
batch_date = today - datetime.timedelta(days=52)

# Start timestamp; together with the one printed at the end it shows the run time.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

# Both databases are driven through their command-line clients via the shell.
hive_cmd = '/usr/lib/hive-current/bin/hive -e "%s"'
mysql_cmd = ('/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser '
             '-pwebserverpasswd -e "%s"')

remain_static = {}

# New users of batch_date: rows of that day whose (appsource, appkey, identifier)
# has no row on any earlier pt_day (left join + "is null" anti-join).
newuser_sql = (
    "select a1.appsource,a1.appkey,a1.identifier from ( "
    "select appsource,appkey,identifier "
    "from bi_all_access_log "
    "where pt_day = '%s' ) a1 "
    "left join "
    "(select appsource,appkey,identifier "
    "from bi_all_access_log "
    "where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource "
    "where a2.identifier is null ;"
) % (batch_date, batch_date)
with os.popen(hive_cmd % newuser_sql) as pipe:
    newuser_data = pipe.readlines()

# The query returns one line per matching log row, so deduplicate before counting.
# The retention counts below are set sizes too, which keeps both figures in the
# same unit (distinct output lines). Building the set once also avoids redoing
# it on every loop iteration.
newuser_set = set(newuser_data)
remain_static['newuser_cnt'] = len(newuser_set)

for i in range(2, 31):
    sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
    print(sql_text)
    with os.popen(hive_cmd % sql_text) as pipe:
        day_user_data = pipe.readlines()
    # Users of the cohort that show up again on batch_date + i days.
    remain_static['day%sremain_cnt' % (i)] = len(newuser_set.intersection(day_user_data))

# Column list and value list are generated from the same key sequence so that
# they cannot drift apart.
count_columns = ['newuser_cnt'] + ['day%sremain_cnt' % (i) for i in range(2, 31)]
column_list = ','.join(['data_date'] + count_columns)
value_list = ','.join(
    "'%s'" % value
    for value in [batch_date] + [remain_static[column] for column in count_columns]
)

# Delete-then-insert in one client call keeps a re-run for the same date from
# leaving two rows behind.
mysql_sql = (
    "use funnyai_data;"
    "delete from bi_remain_user_static where data_date='%s'; "
    "insert into bi_remain_user_static(%s) select %s;"
) % (batch_date, column_list, value_list)
exit_status = os.system(mysql_cmd % mysql_sql)
if exit_status != 0:
    # os.system only reports failure through its return value; do not drop it silently.
    print("mysql load failed for %s (exit status %s)" % (batch_date, exit_status))

# End timestamp.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

2、新規ユーザーの1ヶ月以内の日次リテンション(継続ユーザー数)の計算(外部パラメータを入力)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import sys

# Silence every Python warning for the duration of the run.
warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# Disabled: in-script batch date (52 days before today).
# batch_date = today - datetime.timedelta(days=52)

# Start timestamp. The single-argument print() form behaves the same on
# Python 2 and Python 3.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

def user_remain_proc(batch_date):
    """Count the new users of ``batch_date`` and their later returns, and load the counts into MySQL.

    New users are the rows of ``bi_all_access_log`` with ``pt_day = batch_date``
    whose (appsource, appkey, identifier) has no row on an earlier ``pt_day``.
    For every offset ``i`` from 2 to 30, the users seen on
    ``date_add(batch_date, i)`` are intersected with that set and the size is
    stored as ``day<i>remain_cnt``. Any existing ``bi_remain_user_static`` row
    for the date is deleted before the new row is inserted.

    Args:
        batch_date: Cohort date as a ``YYYY-MM-DD`` string (a ``datetime.date``
            works as well, since it is formatted with ``str``).

    Raises:
        ValueError: If ``batch_date`` is not a valid ``YYYY-MM-DD`` date.
    """
    # batch_date arrives from the command line and is pasted into shell and SQL
    # strings below, so refuse anything that is not a plain date before running
    # any command.
    datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')

    hive_cmd = '/usr/lib/hive-current/bin/hive -e "%s"'
    mysql_cmd = ('/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser '
                 '-pwebserverpasswd -e "%s"')

    remain_static = {}

    # Anti-join: keep rows of batch_date with no match on any earlier day.
    newuser_sql = (
        "select a1.appsource,a1.appkey,a1.identifier from ( "
        "select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day = '%s' ) a1 "
        "left join "
        "(select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource "
        "where a2.identifier is null ;"
    ) % (batch_date, batch_date)
    with os.popen(hive_cmd % newuser_sql) as pipe:
        newuser_data = pipe.readlines()

    # The query yields one line per matching log row; deduplicate so that
    # newuser_cnt is in the same unit as the set-based retention counts, and
    # build the set only once instead of once per loop iteration.
    newuser_set = set(newuser_data)
    remain_static['newuser_cnt'] = len(newuser_set)

    for i in range(2, 31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
        print(sql_text)
        with os.popen(hive_cmd % sql_text) as pipe:
            day_user_data = pipe.readlines()
        remain_static['day%sremain_cnt' % (i)] = len(newuser_set.intersection(day_user_data))

    # Generate the column list and the value list from one key sequence so the
    # two cannot get out of step.
    count_columns = ['newuser_cnt'] + ['day%sremain_cnt' % (i) for i in range(2, 31)]
    column_list = ','.join(['data_date'] + count_columns)
    value_list = ','.join(
        "'%s'" % value
        for value in [batch_date] + [remain_static[column] for column in count_columns]
    )

    # Delete-then-insert in one client call makes a re-run for the same date
    # replace the row instead of duplicating it.
    mysql_sql = (
        "use funnyai_data;"
        "delete from bi_remain_user_static where data_date='%s'; "
        "insert into bi_remain_user_static(%s) select %s;"
    ) % (batch_date, column_list, value_list)
    exit_status = os.system(mysql_cmd % mysql_sql)
    if exit_status != 0:
        # os.system reports failure only through its return value.
        print("mysql load failed for %s (exit status %s)" % (batch_date, exit_status))

if __name__ == '__main__':
    # Every command-line argument is one batch date; process them in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_remain_proc(batch_date)

# End timestamp. It sits outside the __main__ guard, so it is also printed when
# the module is imported. The single-argument print() form behaves the same on
# Python 2 and Python 3.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

3、新規ユーザーの1ヶ月以内の日次リテンション(継続ユーザー数)の計算(プラットフォーム・チャネルの次元別に集計)
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter

# Silence every Python warning for the duration of the run.
warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# Disabled: in-script batch date (52 days before today).
# batch_date = today - datetime.timedelta(days=52)

# Start timestamp. The single-argument print() form behaves the same on
# Python 2 and Python 3.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

def user_remain_proc(batch_date):
    """Load per-(appsource, appkey) retention counts of the ``batch_date`` cohort into MySQL.

    Existing ``bi_user_remain_static`` rows for the date are deleted first.
    New users are the rows of ``bi_all_access_log`` with ``pt_day = batch_date``
    whose (appsource, appkey, identifier) has no row on an earlier ``pt_day``.
    For every offset ``i`` from 0 to 30, the users seen on
    ``date_add(batch_date, i)`` are intersected with that set, grouped by
    (appsource, appkey), and one row per group is inserted with
    ``data_type = "day(<i>)remain_cnt"``.

    Args:
        batch_date: Cohort date as a ``YYYY-MM-DD`` string (a ``datetime.date``
            works as well, since it is formatted with ``str``).

    Raises:
        ValueError: If ``batch_date`` is not a valid ``YYYY-MM-DD`` date.
    """
    # batch_date arrives from the command line and is pasted into shell and SQL
    # strings below, so refuse anything that is not a plain date before running
    # any command (including the delete).
    datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')

    hive_cmd = '/usr/lib/hive-current/bin/hive -e "%s"'
    mysql_cmd = ('/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser '
                 '-pwebserverpasswd -e "use funnyai_data; %s"')

    def run_mysql(sql):
        # os.system reports failure only through its return value; surface it.
        status = os.system(mysql_cmd % sql)
        if status != 0:
            print("mysql command failed (exit status %s): %s" % (status, sql))

    # Clear earlier results so a re-run for the same date does not duplicate rows.
    run_mysql("delete from bi_user_remain_static where data_date='%s';" % (batch_date))

    # Anti-join: keep rows of batch_date with no match on any earlier day.
    newuser_sql = (
        "select a1.appsource,a1.appkey,a1.identifier from ( "
        "select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day = '%s' ) a1 "
        "left join "
        "(select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource "
        "where a2.identifier is null ;"
    ) % (batch_date, batch_date)
    with os.popen(hive_cmd % newuser_sql) as pipe:
        # Built once; every daily intersection below reuses it.
        newuser_set = set(pipe.readlines())

    for i in range(31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s) ;" % (batch_date, i)
        print(sql_text)
        with os.popen(hive_cmd % sql_text) as pipe:
            retained_lines = newuser_set.intersection(pipe.readlines())

        # Count the distinct retained lines per (appsource, appkey).
        group_counts = {}
        for line in retained_lines:
            fields = line.replace('\n', '').split('\t')
            if len(fields) < 2:
                # Blank or malformed output line: it has no appsource/appkey pair.
                continue
            group_key = (fields[0], fields[1])
            group_counts[group_key] = group_counts.get(group_key, 0) + 1

        data_type = "day(%s)remain_cnt" % (i)
        # NOTE(review): appsource/appkey come straight from the log and are
        # embedded unescaped in a shell + SQL string; values containing quotes,
        # '$' or backticks would break or alter the command.
        for (appsource, appkey), data_cnt in sorted(group_counts.items()):
            print("%s %s %s" % (appsource, appkey, data_cnt))
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            run_mysql(
                "insert into bi_user_remain_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) "
                "select '%s','%s','%s','%s','%s','%s';"
                % (batch_date, appsource, appkey, data_type, data_cnt, etl_time)
            )


if __name__ == '__main__':
    # Every command-line argument is one batch date; process them in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_remain_proc(batch_date)

# End timestamp; it sits outside the __main__ guard, so it is also printed on import.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print(" : %s" % now_time)

4、DAUの計算(プラットフォーム・チャネルの次元別に集計)
/Users/nisj/PycharmProjects/EsDataProc/Hive_dau_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter

# Silence every Python warning for the duration of the run.
warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# Disabled: in-script batch date (52 days before today).
# batch_date = today - datetime.timedelta(days=52)

# Start timestamp. The single-argument print() form behaves the same on
# Python 2 and Python 3.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

def user_dau_proc(batch_date):
    """Load per-(appsource, appkey) distinct-user counts for ``batch_date`` into MySQL.

    Existing ``bi_user_dau_static`` rows for the date are deleted first. The
    (appsource, appkey, identifier) rows of ``bi_all_access_log`` with
    ``pt_day = batch_date`` are deduplicated, grouped by (appsource, appkey),
    and one row per group is inserted with ``data_type = 'dau'``.

    Args:
        batch_date: Day to process as a ``YYYY-MM-DD`` string (a
            ``datetime.date`` works as well, since it is formatted with ``str``).

    Raises:
        ValueError: If ``batch_date`` is not a valid ``YYYY-MM-DD`` date.
    """
    # batch_date arrives from the command line and is pasted into shell and SQL
    # strings below, so refuse anything that is not a plain date before running
    # any command (including the delete).
    datetime.datetime.strptime(str(batch_date), '%Y-%m-%d')

    hive_cmd = '/usr/lib/hive-current/bin/hive -e "%s"'
    mysql_cmd = ('/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser '
                 '-pwebserverpasswd -e "use funnyai_data; %s"')

    def run_mysql(sql):
        # os.system reports failure only through its return value; surface it.
        status = os.system(mysql_cmd % sql)
        if status != 0:
            print("mysql command failed (exit status %s): %s" % (status, sql))

    # Clear earlier results so a re-run for the same date does not duplicate rows.
    run_mysql("delete from bi_user_dau_static where data_date='%s';" % (batch_date))

    def dau_data_proc(dau_data, data_type):
        """Group the distinct lines of ``dau_data`` by (appsource, appkey) and insert the counts."""
        group_counts = {}
        # set() drops repeated log lines so each user is counted once per group.
        for line in set(dau_data):
            fields = line.replace('\n', '').split('\t')
            if len(fields) < 2:
                # Blank or malformed output line: it has no appsource/appkey pair.
                continue
            group_key = (fields[0], fields[1])
            group_counts[group_key] = group_counts.get(group_key, 0) + 1

        # NOTE(review): appsource/appkey come straight from the log and are
        # embedded unescaped in a shell + SQL string; values containing quotes,
        # '$' or backticks would break or alter the command.
        for (appsource, appkey), data_cnt in sorted(group_counts.items()):
            etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
            run_mysql(
                "insert into bi_user_dau_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) "
                "select '%s','%s','%s','%s','%s','%s';"
                % (batch_date, appsource, appkey, data_type, data_cnt, etl_time)
            )

    dau_sql = (
        "select appsource,appkey,identifier "
        "from bi_all_access_log "
        "where pt_day = '%s' "
    ) % (batch_date)
    with os.popen(hive_cmd % dau_sql) as pipe:
        ytd_dau_data = pipe.readlines()

    # The original also carried a disabled 'before-dau' variant that read all
    # rows with pt_day < batch_date; only the single-day 'dau' load is active.
    dau_data_proc(ytd_dau_data, 'dau')


if __name__ == '__main__':
    # Every command-line argument is one batch date; process them in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_dau_proc(batch_date)

# End timestamp; it sits outside the __main__ guard, so it is also printed on import.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print(" : %s" % now_time)

5、最終版のリテンション(継続ユーザー数)計算(プラットフォーム・チャネルの次元別に集計)
/Users/nisj/PycharmProjects/EsDataProc/Hive_final_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys

# Silence every Python warning for the duration of the run.
warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)

# Disabled: in-script batch date (52 days before today).
# batch_date = today - datetime.timedelta(days=52)

# Start timestamp. The single-argument print() form behaves the same on
# Python 2 and Python 3.
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("     : %s" % now_time)

def user_dau_proc(batch_date):
    os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
                delete from bi_final_remain_static where data_date='%s'; \
                 " """ % (batch_date))

    def dau_data_proc(dau_data):
        dau_data = set(dau_data)
        dau_data = list(dau_data)
        dau_list = []
        for dd_list in dau_data:
            ddl = re.split('\t',dd_list.replace('
','')) dau_list.append(ddl) for x in dau_list: print x[0], x[1], x[2], x[3] appsource = x[0] appkey = x[1] ytd_dau = x[2] before_ytd_dau = x[3] etl_time = time.strftime('%Y-%m-%d %X', time.localtime()) os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \ insert into bi_final_remain_static(data_date,appsource,appkey,ytd_dau,before_ytd_dau,etl_time) \ select '%s','%s','%s','%s','%s','%s'; \ " """ % (batch_date, appsource, appkey, ytd_dau, before_ytd_dau, etl_time)) final_remain_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \ with ytd_dau as(select appkey,appsource,count(distinct identifier) ytd_dau from bi_all_access_log where pt_day='%s' group by appkey,appsource), \ before_ytd_dau as (select appkey,appsource,count(distinct identifier) before_ytd_dau from bi_all_access_log where pt_day

6、複数日分のデータを一括でバッチ実行する方法
nohup python hive_remain_user.py 2016-10-01 2016-10-02 2016-10-03 2016-10-04 2016-10-05 2016-10-06 2016-10-07 2016-10-08 2016-10-09 2016-10-10 2016-10-11 2016-10-12 2016-10-13 2016-10-14 2016-10-15 2016-10-16 2016-10-17 2016-10-18 2016-10-19 2016-10-20 2016-10-21 2016-10-22 2016-10-23 2016-10-24 2016-10-25 2016-10-26 2016-10-27 2016-10-28 2016-10-29 2016-10-30 2016-10-31 &