PythonスクリプトでHiveからデータを取得・集計し、MySQLにロードする例
18574 ワード
サーバーにPythonライブラリをインストールする権限がないため、本稿ではHive・MySQL用のPythonライブラリを使わず、osモジュールからHive CLI（hiveコマンド）とmysqlコマンドを呼び出してデータの読み書きを行う。
主なポイント:
osモジュール経由でデータベースを操作し、データを受け取り・書き込む方法；外部パラメータ（コマンドライン引数）をPythonに渡して使う方法；Pythonによるデータのグループ集計；Pythonのsetオブジェクトを使ったデータの重複排除と積集合・和集合・差集合の操作；Pythonのset、list、str間の相互変換；Pythonでの関数の定義と呼び出し、関数引数の使い方；メインプログラムのエントリポイント「if __name__ == '__main__':」の使い方；Pythonでの時刻の出力とプログラム実行時間の計算；など。1、新規ユーザーの1か月以内の日次残存（リテンション）の計算（外部パラメータなし、日付はスクリプト内で定義）
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain.py
2、新規ユーザーの1か月以内の日次残存の計算（日付を外部パラメータで指定）/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_par.py
3、新規ユーザーの1か月以内の日次残存の計算（プラットフォーム・チャネル別）
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_group_par.py
4、DAUの計算（プラットフォーム・チャネル別）/Users/nisj/PycharmProjects/EsDataProc/Hive_dau_group_par.py
5、最終残存の計算（プラットフォーム・チャネル別）/Users/nisj/PycharmProjects/EsDataProc/Hive_final_remain_par.py
6、データの一括（バッチ）実行方法
主なポイント:
osモジュール経由でデータベースを操作し、データを受け取り・書き込む方法；外部パラメータ（コマンドライン引数）をPythonに渡して使う方法；Pythonによるデータのグループ集計；Pythonのsetオブジェクトを使ったデータの重複排除と積集合・和集合・差集合の操作；Pythonのset、list、str間の相互変換；Pythonでの関数の定義と呼び出し、関数引数の使い方；メインプログラムのエントリポイント「if __name__ == '__main__':」の使い方；Pythonでの時刻の出力とプログラム実行時間の計算；など。1、新規ユーザーの1か月以内の日次残存（リテンション）の計算（外部パラメータなし、日付はスクリプト内で定義）
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
# Suppress all Python warnings for this run.
warnings.filterwarnings("ignore")
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
# Cohort (batch) date, fixed relative to the run date. 52 days back
# presumably leaves room for the 30-day window below to be fully loaded in
# Hive -- TODO confirm the intended offset.
batch_date = today - datetime.timedelta(days=52)
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print(" : " + now_time)

# Counters keyed by the MySQL column each one is written to.
remain_static = {}

# New users of batch_date: (appsource, appkey, identifier) rows present on
# that day and on no earlier day. readlines() yields one line per Hive row.
newuser_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
select a1.appsource,a1.appkey,a1.identifier from ( \
select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day = '%s' ) a1 \
left join \
(select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
where a2.identifier is null \
;" \
""" % (batch_date, batch_date)).readlines()

# The query has no DISTINCT, so a user with several access-log rows on
# batch_date appears several times. Count distinct lines, as the retention
# counts below do, so that the two numbers are comparable. The set is also
# built once here instead of once per loop iteration.
newuser_set = set(newuser_data)
remain_static['newuser_cnt'] = len(newuser_set)

# Day-N retention for N = 2..30: cohort users seen again on
# date_add(batch_date, N).
# NOTE(review): N is added to batch_date as-is, so "day2" is two days after
# the cohort date and no next-day (N=1) figure is produced. Confirm this
# matches the meaning of the bi_remain_user_static columns.
day_range = range(2, 31)
for i in day_range:
    sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
    print(sql_text)
    day2user_data = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text)).readlines()
    day2remain_data = newuser_set & set(day2user_data)
    remain_static['day%sremain_cnt' % (i)] = len(day2remain_data)

# Column names and quoted values come from the same key sequence, so the
# INSERT column list and the SELECT value list cannot drift apart.
columns = ['data_date', 'newuser_cnt'] + ['day%sremain_cnt' % (i) for i in day_range]
values = [batch_date] + [remain_static[c] for c in columns[1:]]

# Replace the stored row for batch_date: delete it first so a re-run does
# not duplicate it, then insert the freshly computed counts.
os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data;delete from bi_remain_user_static where data_date='%s'; \
insert into bi_remain_user_static(%s) \
select %s; \
" """ % (batch_date, ','.join(columns), ','.join("'%s'" % v for v in values)))
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print(" : " + now_time)
2、新規ユーザーの1か月以内の日次残存の計算（日付を外部パラメータで指定）/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import sys
# Suppress all Python warnings for this run.
warnings.filterwarnings("ignore")

# Date anchors relative to the run date. yesterday/tomorrow are not
# referenced elsewhere in this script: the batch date is passed in from the
# command line instead of being computed here.
today = datetime.date.today()
yesterday, tomorrow = (today - datetime.timedelta(days=1),
                       today + datetime.timedelta(days=1))

# Start-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
def user_remain_proc(batch_date):
    """Compute 30-day retention of one day's new users and store it in MySQL.

    New users are the (appsource, appkey, identifier) rows found in
    bi_all_access_log on ``batch_date`` but on no earlier day. For
    N = 2..30 the number of those users seen again on
    ``date_add(batch_date, N)`` is counted. The results replace the row for
    ``batch_date`` in funnyai_data.bi_remain_user_static (one column per
    counter).

    :param batch_date: cohort date as a 'YYYY-MM-DD' string (as passed on
        the command line) or a ``datetime.date``.
    :raises ValueError: if ``batch_date`` is not a valid calendar date. This
        is checked before any Hive or MySQL command runs.
    """
    # batch_date is pasted verbatim into shell commands and SQL below, so
    # only a real date may pass. strptime/strftime also zero-pads input such
    # as 2016-10-1.
    batch_date = datetime.datetime.strptime(
        str(batch_date), '%Y-%m-%d').strftime('%Y-%m-%d')

    # Counters keyed by the MySQL column each one is written to.
    remain_static = {}
    newuser_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
select a1.appsource,a1.appkey,a1.identifier from ( \
select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day = '%s' ) a1 \
left join \
(select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
where a2.identifier is null \
;" \
""" % (batch_date, batch_date)).readlines()

    # The query has no DISTINCT, so a user with several access-log rows on
    # batch_date appears several times. Count distinct lines, as the
    # retention counts do, and build the set only once.
    newuser_set = set(newuser_data)
    remain_static['newuser_cnt'] = len(newuser_set)

    # NOTE(review): N is added to batch_date as-is, so "day2" is two days
    # after the cohort date and no next-day (N=1) figure is produced.
    day_range = range(2, 31)
    for i in day_range:
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s);" % (batch_date, i)
        print(sql_text)
        day_user_data = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text)).readlines()
        remain_static['day%sremain_cnt' % (i)] = len(newuser_set & set(day_user_data))

    # Column names and quoted values come from the same key sequence, so the
    # INSERT column list and the SELECT value list stay aligned.
    columns = ['data_date', 'newuser_cnt'] + ['day%sremain_cnt' % (d) for d in day_range]
    values = [batch_date] + [remain_static[c] for c in columns[1:]]
    os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data;delete from bi_remain_user_static where data_date='%s'; \
insert into bi_remain_user_static(%s) \
select %s; \
" """ % (batch_date, ','.join(columns), ','.join("'%s'" % v for v in values)))
if __name__ == '__main__':
    # Every positional argument is one batch date; they are processed one
    # after another, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_remain_proc(batch_date)

# End-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
3、新規ユーザーの1か月以内の日次残存の計算（プラットフォーム・チャネル別）
/Users/nisj/PycharmProjects/EsDataProc/Hive_remain_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter
# Suppress all Python warnings for this run.
warnings.filterwarnings("ignore")

# Date anchors relative to the run date. yesterday/tomorrow are not
# referenced elsewhere in this script: the batch date is passed in from the
# command line instead of being computed here.
today = datetime.date.today()
yesterday, tomorrow = (today - datetime.timedelta(days=1),
                       today + datetime.timedelta(days=1))

# Start-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
def user_remain_proc(batch_date):
    """Compute per-(appsource, appkey) retention of one day's new users.

    New users are the (appsource, appkey, identifier) rows found in
    bi_all_access_log on ``batch_date`` but on no earlier day. For each
    offset i in 0..30, the new users seen again on ``date_add(batch_date, i)``
    are counted per (appsource, appkey) and inserted into
    funnyai_data.bi_user_remain_static with data_type 'day(i)remain_cnt'.
    Offset 0 is the cohort day itself. Rows already stored for
    ``batch_date`` are deleted first, so re-running a date replaces them.

    :param batch_date: cohort date as a 'YYYY-MM-DD' string (as passed on
        the command line) or a ``datetime.date``.
    :raises ValueError: if ``batch_date`` is not a valid calendar date. This
        is checked before any Hive or MySQL command runs.
    """
    import subprocess
    from collections import Counter

    # batch_date is pasted into shell commands and SQL below, so only a real
    # date may pass. strptime/strftime also zero-pads input such as 2016-10-1.
    batch_date = datetime.datetime.strptime(
        str(batch_date), '%Y-%m-%d').strftime('%Y-%m-%d')

    def run_mysql(sql):
        """Run ``sql`` against funnyai_data with the mysql command-line client."""
        # The statement travels as a single argv entry and no shell is
        # involved, so quotes, '$' or backticks inside Hive values cannot
        # break or extend the command.
        subprocess.call(['/usr/bin/mysql', '-hwebserverip', '-P6603',
                         '-uwebserveruser', '-pwebserverpasswd',
                         '-e', 'use funnyai_data; ' + sql])

    def sql_literal(value):
        """Return ``value`` as a single-quoted MySQL string literal."""
        escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return "'%s'" % escaped

    run_mysql("delete from bi_user_remain_static where data_date=%s;"
              % sql_literal(batch_date))

    newuser_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
select a1.appsource,a1.appkey,a1.identifier from ( \
select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day = '%s' ) a1 \
left join \
(select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
where a2.identifier is null \
;" \
""" % (batch_date, batch_date)).readlines()
    # Built once and reused for every offset.
    new_users = set(newuser_data)

    for i in range(31):
        sql_text = "select appsource,appkey,identifier from bi_all_access_log where pt_day = date_add('%s',%s) ;" % (batch_date, i)
        print(sql_text)
        day_user_data = os.popen("""/usr/lib/hive-current/bin/hive -e "%s" """ % (sql_text)).readlines()

        # Distinct retained users per (appsource, appkey). Hive lines are
        # tab-separated in SELECT order: appsource, appkey, identifier.
        group_counts = Counter()
        for line in new_users & set(day_user_data):
            fields = line.rstrip('\n').split('\t')
            if len(fields) < 2:
                # Skip lines without both grouping columns (e.g. a blank
                # line) instead of failing on them.
                continue
            group_counts[(fields[0], fields[1])] += 1

        data_type = "day(%s)remain_cnt" % (i)
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        rows = []
        for (appsource, appkey), data_cnt in sorted(group_counts.items()):
            print("%s %s %s" % (appsource, appkey, data_cnt))
            row_values = (batch_date, appsource, appkey, data_type, data_cnt, etl_time)
            rows.append("(%s)" % ",".join(sql_literal(v) for v in row_values))
        if rows:
            # One multi-row INSERT per offset instead of one mysql process
            # per group.
            run_mysql("insert into bi_user_remain_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) values %s;"
                      % ",".join(rows))
if __name__ == '__main__':
    # Every positional argument is one batch date; they are processed one
    # after another, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_remain_proc(batch_date)

# End-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
4、DAUの計算（プラットフォーム・チャネル別）/Users/nisj/PycharmProjects/EsDataProc/Hive_dau_group_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
from itertools import groupby
from operator import itemgetter
# Suppress all Python warnings for this run.
warnings.filterwarnings("ignore")

# Date anchors relative to the run date. yesterday/tomorrow are not
# referenced elsewhere in this script: the batch date is passed in from the
# command line instead of being computed here.
today = datetime.date.today()
yesterday, tomorrow = (today - datetime.timedelta(days=1),
                       today + datetime.timedelta(days=1))

# Start-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
def user_dau_proc(batch_date):
    """Compute DAU per (appsource, appkey) for one day and store it in MySQL.

    The distinct (appsource, appkey, identifier) rows of bi_all_access_log
    on ``batch_date`` are counted per (appsource, appkey) and inserted into
    funnyai_data.bi_user_dau_static with data_type 'dau'. Rows already
    stored for ``batch_date`` are deleted first, so re-running a date
    replaces its results.

    :param batch_date: day to process, as a 'YYYY-MM-DD' string (as passed
        on the command line) or a ``datetime.date``.
    :raises ValueError: if ``batch_date`` is not a valid calendar date. This
        is checked before any Hive or MySQL command runs.
    """
    import subprocess
    from collections import Counter

    # batch_date is pasted into shell commands and SQL below, so only a real
    # date may pass. strptime/strftime also zero-pads input such as 2016-10-1.
    batch_date = datetime.datetime.strptime(
        str(batch_date), '%Y-%m-%d').strftime('%Y-%m-%d')

    def run_mysql(sql):
        """Run ``sql`` against funnyai_data with the mysql command-line client."""
        # The statement travels as a single argv entry and no shell is
        # involved, so quotes, '$' or backticks inside Hive values cannot
        # break or extend the command.
        subprocess.call(['/usr/bin/mysql', '-hwebserverip', '-P6603',
                         '-uwebserveruser', '-pwebserverpasswd',
                         '-e', 'use funnyai_data; ' + sql])

    def sql_literal(value):
        """Return ``value`` as a single-quoted MySQL string literal."""
        escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return "'%s'" % escaped

    run_mysql("delete from bi_user_dau_static where data_date=%s;"
              % sql_literal(batch_date))

    def dau_data_proc(dau_data, data_type):
        """Count distinct Hive lines per (appsource, appkey) and store them.

        :param dau_data: lines read from Hive stdout, tab-separated with
            appsource and appkey as the first two fields.
        :param data_type: label written to the data_type column.
        """
        group_counts = Counter()
        # set() removes repeated rows, so each group counts distinct lines.
        for line in set(dau_data):
            fields = line.rstrip('\n').split('\t')
            if len(fields) < 2:
                # Skip lines without both grouping columns (e.g. a blank
                # line) instead of failing on them.
                continue
            group_counts[(fields[0], fields[1])] += 1

        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        rows = []
        for (appsource, appkey), data_cnt in sorted(group_counts.items()):
            row_values = (batch_date, appsource, appkey, data_type, data_cnt, etl_time)
            rows.append("(%s)" % ",".join(sql_literal(v) for v in row_values))
        if rows:
            # One multi-row INSERT instead of one mysql process per group.
            run_mysql("insert into bi_user_dau_static(data_date,appsource,appkey,data_type,data_cnt,etl_time) values %s;"
                      % ",".join(rows))

    ytd_dau_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
select appsource,appkey,identifier \
from bi_all_access_log \
where pt_day = '%s' " \
""" % (batch_date)).readlines()
    dau_data_proc(ytd_dau_data, 'dau')
if __name__ == '__main__':
    # Every positional argument is one batch date; they are processed one
    # after another, in the order given.
    for batch_date in sys.argv[1:]:
        print(batch_date)
        user_dau_proc(batch_date)

# End-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
5、最終残存の計算（プラットフォーム・チャネル別）/Users/nisj/PycharmProjects/EsDataProc/Hive_final_remain_par.py
# -*- coding=utf-8 -*-
import warnings
import datetime
import time
import os
import re
import sys
# Suppress all Python warnings for this run.
warnings.filterwarnings("ignore")

# Date anchors relative to the run date. The batch date is passed to the
# processing function rather than being fixed here.
today = datetime.date.today()
yesterday, tomorrow = (today - datetime.timedelta(days=1),
                       today + datetime.timedelta(days=1))

# Start-of-run timestamp (time.strftime defaults to the local time).
now_time = time.strftime('%Y-%m-%d %X')
print(" : " + now_time)
def user_dau_proc(batch_date):
os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
delete from bi_final_remain_static where data_date='%s'; \
" """ % (batch_date))
def dau_data_proc(dau_data):
dau_data = set(dau_data)
dau_data = list(dau_data)
dau_list = []
for dd_list in dau_data:
ddl = re.split('\t',dd_list.replace('
',''))
dau_list.append(ddl)
for x in dau_list:
print x[0], x[1], x[2], x[3]
appsource = x[0]
appkey = x[1]
ytd_dau = x[2]
before_ytd_dau = x[3]
etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
os.system("""/usr/bin/mysql -hwebserverip -P6603 -uwebserveruser -pwebserverpasswd -e "use funnyai_data; \
insert into bi_final_remain_static(data_date,appsource,appkey,ytd_dau,before_ytd_dau,etl_time) \
select '%s','%s','%s','%s','%s','%s'; \
" """ % (batch_date, appsource, appkey, ytd_dau, before_ytd_dau, etl_time))
final_remain_data = os.popen("""/usr/lib/hive-current/bin/hive -e " \
with ytd_dau as(select appkey,appsource,count(distinct identifier) ytd_dau from bi_all_access_log where pt_day='%s' group by appkey,appsource), \
before_ytd_dau as (select appkey,appsource,count(distinct identifier) before_ytd_dau from bi_all_access_log where pt_day
6、データの一括（バッチ）実行方法
nohup python hive_remain_user.py 2016-10-01 2016-10-02 2016-10-03 2016-10-04 2016-10-05 2016-10-06 2016-10-07 2016-10-08 2016-10-09 2016-10-10 2016-10-11 2016-10-12 2016-10-13 2016-10-14 2016-10-15 2016-10-16 2016-10-17 2016-10-18 2016-10-19 2016-10-20 2016-10-21 2016-10-22 2016-10-23 2016-10-24 2016-10-25 2016-10-26 2016-10-27 2016-10-28 2016-10-29 2016-10-30 2016-10-31 &