Python 3: consuming data from Kafka, parsing it as JSON, and writing it to MySQL
Project requirements: consume the database change records that have been parsed into Kafka from the logs, and write them into the database grouped by order level and order-item level. All information for a single order, including its various dimension attributes, is kept in one JSON document and written to MySQL 5.7. Configuration file:
The code is long and ugly and only half done: it is still procedural, with no object-oriented refactor. Treat it as a quick read, and contact me if you spot problems.
[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx
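For reference, the consumer below assumes Canal-style binlog change messages, since the code reads the keys database, table, type, data, old and es (an epoch-millisecond timestamp). A hypothetical INSERT message, with made-up values, might look like this:

{"data": [{"order_no": "100", "cust_no": "C01"}], "database": "mes", "table": "order_mails", "type": "INSERT", "old": null, "es": 1555900000000}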
# encoding=utf-8
import datetime
import configparser
import re
import pymysql
import json
from confluent_kafka import Consumer, KafkaError
import logging
import os
import time
import signal
import sys

# Append warnings and errors to log_tracking.txt in the working directory.
logging.basicConfig(filename=os.path.join(os.getcwd(), 'log_tracking.txt'), level=logging.WARN, filemode='a',
                    format='%(asctime)s - %(levelname)s: %(message)s')
def writeErrorLog(errSrc, errType, errMsg):
    # Append a timestamped error record to err_tracking.log; if that fails,
    # write the failure itself to the same file.
    v_file = None
    try:
        v_file = open('err_tracking.log', 'a')
        v_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S")
                     + " - " + errSrc + " - " + errType + " : " + errMsg + '\n')
        v_file.flush()
    except Exception as data:
        with open('err_tracking.log', 'a') as v_err_file:
            v_err_file.write(str(data) + '\n')
            v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S")
                             + " - " + errSrc + " - " + errType + " : " + errMsg + '\n')
    finally:
        if v_file is not None:
            v_file.close()
class RH_Consumer:
    # Read Kafka and MySQL settings from config.ini, open both database
    # connections and load the list of monitored source tables.
    def __init__(self):
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.host = self.config.get('Global', 'host')
        self.user = self.config.get('Global', 'user')
        self.passwd = self.config.get('Global', 'passwd')
        self.schema = self.config.get('Global', 'schema')
        self.port = int(self.config.get('Global', 'port'))
        self.kafka_server = self.config.get('Global', 'kafka_server')
        self.kafka_topic = self.config.get('Global', 'kafka_topic')
        self.consumer_group = self.config.get('Global', 'consumer_group')
        self.dd_host = self.config.get('Global', 'dd_host')
        self.dd_user = self.config.get('Global', 'dd_user')
        self.dd_passwd = self.config.get('Global', 'dd_passwd')
        self.dd_port = int(self.config.get('Global', 'dd_port'))
        self.dd_socket = self.config.get('Global', 'dd_socket')
        self.operation_time = datetime.datetime.now()
        self.stop_flag = 0
        self.src_table_name = []
        self.__init_db()
        self.__init_mes_db()
        self._get_all_src_table()
    # Connect to the target (tracking) MySQL database.
    def __init_db(self):
        try:
            self.conn_info = {'host': self.host, 'port': self.port, 'user': self.user, 'password': self.passwd, 'db': 'tracking'}
            self.mysql_db = pymysql.connect(**self.conn_info, charset="utf8")
            self.mysql_cur = self.mysql_db.cursor()
        except Exception as data:
            writeErrorLog('__init_db', 'Error', str(data))

    # Connect to the source (MES) MySQL database that holds the dimension tables.
    def __init_mes_db(self):
        try:
            self.mes_mysql_db = pymysql.connect(host=self.dd_host, user=self.dd_user, passwd=self.dd_passwd,
                                                port=self.dd_port, unix_socket=self.dd_socket, charset="utf8")
            self.mes_mysql_cur = self.mes_mysql_db.cursor()
        except Exception as data:
            writeErrorLog('__init_mes_db', 'Error', str(data))

    # Close cursors and connections.
    def _release_db(self):
        self.mysql_cur.close()
        self.mysql_db.close()
        self.mes_mysql_cur.close()
        self.mes_mysql_db.close()
    # Load the distinct source table names that have a mapping rule.
    def _get_all_src_table(self):
        try:
            select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule"
            self.mysql_cur.execute(select_src_table_names)
            rows = self.mysql_cur.fetchall()
            for item in rows:
                self.src_table_name.append(item[0])
            return self.src_table_name
        except Exception as data:
            writeErrorLog('_get_all_src_table', 'Error', str(data))
            logging.error('_get_all_src_table: ' + str(data))

    # Look up the target table(s) mapped to a source table, keyed by schema and table name.
    def _get_tgt_table_name(self, table_name, table_schema):
        try:
            select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" % (table_name, table_schema)
            self.mysql_cur.execute(select_tgt_table_names)
            rows = self.mysql_cur.fetchall()
            tgt_table_names = []
            for item in rows:
                tgt_table_names.append(item[0])
            return tgt_table_names
        except Exception as data:
            writeErrorLog('_get_tgt_table_name', 'Error', str(data))
            logging.error('_get_tgt_table_name: ' + str(data))
    # Load the mapping rule for one (source table, target table) pair; the
    # tgt_operation column carries the JSON routing rule.
    def _get_config(self, table_name, tgt_table_name, table_schema):
        try:
            select_table_config = ("select coalesce(src_system, '') as src_system,"
                                   "coalesce(src_table_schema, '') as src_table_schema,"
                                   "coalesce(src_table_name, '') as src_table_name,"
                                   "coalesce(tgt_operation, '{}') as tgt_operation,"
                                   "active_flag,"
                                   "coalesce(tgt_system, '') as tgt_system,"
                                   "coalesce(tgt_table_schema, '') as tgt_table_schema,"
                                   "coalesce(tgt_table_name, '') as tgt_table_name "
                                   "from tracking.tracking_table_mapping_rule "
                                   "where src_table_name = '%s' and tgt_table_name = '%s' and src_table_schema = '%s'"
                                   % (table_name, tgt_table_name, table_schema))
            self.mysql_cur.execute(select_table_config)
            rows = self.mysql_cur.fetchall()
            for item in rows:
                self.src_system = item[0]
                self.src_table_schema = item[1]
                self.src_table_name = item[2]
                self.tgt_operation = item[3]
                self.active_flag = item[4]
                self.tgt_system = item[5]
                self.tgt_table_schema = item[6]
                self.tgt_table_name = item[7]
            # tgt_operation is stored as a dict literal; turn it into a dict.
            self.tgt_operation = eval(self.tgt_operation)
            result_data = {'src_system': self.src_system,
                           'src_table_schema': self.src_table_schema,
                           'src_table_name': self.src_table_name,
                           'tgt_operation': self.tgt_operation,
                           'active_flag': self.active_flag,
                           'tgt_system': self.tgt_system,
                           'tgt_table_schema': self.tgt_table_schema,
                           'tgt_table_name': self.tgt_table_name,
                           # Flatten the frequently used keys of tgt_operation.
                           'source_primary_key': self.tgt_operation['source_primary_key'],
                           'source_all_column': self.tgt_operation['source_all_column'],
                           'target_primary_key': self.tgt_operation['target_primary_key'],
                           'target_column': self.tgt_operation['target_column'],
                           'source_level': self.tgt_operation['source_level']}
            return result_data
        except Exception as data:
            writeErrorLog('_get_config', 'Error', str(data) + ': table is not available')
            logging.error('_get_config: ' + str(data))
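    # For reference, a hypothetical shape of a tgt_operation rule, reconstructed
    # from the keys the code reads below (the real rule rows are site-specific
    # and are not shown in the original post):
    # {
    #     "source_level": "root",
    #     "source_primary_key": "order_no",
    #     "source_all_column": ["order_no", "cust_no"],
    #     "target_primary_key": "order_no",
    #     "parent_pk_key": {"order_no": "order_no"},
    #     "target_column": {
    #         "order_info": {
    #             "source_column": ["order_no", "cust_no"],
    #             "target_path": {"$.order": "$.order"},
    #             "alter_column": ["cust_no"],
    #             "source_foreign_info": {"org_id": {"customer.organization": "org_id"}}
    #         }
    #     }
    # }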
    # Consume change messages from Kafka and dispatch them row by row.
    def _do(self):
        try:
            # Create the consumer; offsets are committed manually only after a
            # message has been fully processed.
            c = Consumer({
                'bootstrap.servers': self.kafka_server,
                'group.id': self.consumer_group,
                'enable.auto.commit': False,
                'default.topic.config': {
                    'auto.offset.reset': 'smallest'}
            })
            # Subscribe to the configured Kafka topic.
            c.subscribe([self.kafka_topic])
            while True:
                msg = c.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        print(msg.error())
                        break
                text = msg.value().decode(encoding="utf-8")
                kfk_text = json.loads(text)
                # Process the message; the offset is committed only when
                # processing succeeded, so a failed message can be re-consumed.
                try:
                    kfk_table = kfk_text['table']
                    if kfk_table in ['order_mails']:
                        print(type(text), text)
                        logging.warning('-------------- start exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + '---------------------')
                        # Replace SQL NULLs (Python None) with empty strings before re-parsing.
                        kfk_text = str(kfk_text)
                        kfk_text = kfk_text.replace(": None", ": ''")
                        kfk_text = eval(kfk_text)
                        kfk_datas = kfk_text['data']
                        kfk_type = kfk_text['type']
                        kfk_old = kfk_text['old']
                        logging.warning(' table_name: ' + str(kfk_table) + ' table_type : ' + kfk_type)
                        if kfk_type == 'UPDATE':
                            print('update')
                            # A message may carry several changed rows; handle them
                            # one at a time, pairing each row with its old values.
                            for i, data in enumerate(kfk_datas):
                                kfk_text['data'] = eval("[" + str(data) + "]")
                                kfk_text['old'] = eval("[" + str(kfk_old[i]) + "]")
                                self._get_rh_from_kafka(kfk_text)
                        else:
                            print('insert')
                            for data in kfk_datas:
                                kfk_text['data'] = eval("[" + str(data) + "]")
                                print(type(kfk_text), kfk_text)
                                self._get_rh_from_kafka(kfk_text)
                        logging.warning('----------------end exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + '---------------')
                    c.commit()
                except Exception as data:
                    writeErrorLog('_do', 'exce data Error', str(data))
                    logging.error('_do: ' + str(data))
                # Shut down cleanly when a SIGTERM has set the stop flag.
                if self.stop_flag == 1:
                    c.close()
                    self._exit_consumer()
        except Exception as data:
            print(data)
            writeErrorLog('_do', 'Error', str(data))
            logging.error('_do: ' + str(data))
    def _trans_path(self, tgt_path):
        new_tgt_path = tgt_path.replace('.', '\".\"').replace('$\".', '$.') + '\"'
        return new_tgt_path
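    # Hypothetical example of the quoting this helper performs:
    # _trans_path('$.order.items') -> '$."order"."items"'
    # i.e. every path leg after the root gets double-quoted for MySQL JSON path syntax.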
    # Dispatch one change row from Kafka according to its mapping rule(s).
    def _get_rh_from_kafka(self, kfk_text):
        try:
            # Unpack the change message.
            self.kfk_tb_schema = kfk_text["database"]  # source schema
            self.kfk_tb_name = kfk_text["table"]  # source table name
            self.kfk_data = kfk_text['data'][0]  # changed row
            self.kfk_type = kfk_text['type']  # INSERT / UPDATE / DELETE
            self.kfk_es = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(kfk_text['es'] / 1000)))  # event time (es is epoch millis)
            # One source table may feed several target tables.
            tgt_table_names = self._get_tgt_table_name(self.kfk_tb_name, self.kfk_tb_schema)
            if len(tgt_table_names) != 0:
                for tgt_table_name_for_config in tgt_table_names:
                    tb_config = self._get_config(self.kfk_tb_name, tgt_table_name_for_config, self.kfk_tb_schema)
                    tgt_pk_key = tb_config['target_primary_key']  # target primary key (order_no / order_item_id)
                    tgt_schema = tb_config['tgt_table_schema']  # target schema
                    tgt_table_name = tb_config['tgt_table_name']  # target table (tracking_order, tracking_order_item)
                    src_table_name = tb_config['src_table_name']  # source table (schema|table_name)
                    src_table_schema = tb_config['src_table_schema']
                    tgt_columns = tb_config['target_column']  # target JSON columns (e.g. order_info)
                    src_level = tb_config['source_level']  # table level: root, leaf or father
                    src_pk_key = tb_config['source_primary_key']  # source primary key
                    src_pk_value = self.kfk_data[src_pk_key]  # source primary key value from the Kafka row
                    tgt_operation = tb_config['tgt_operation']  # full routing rule
                    # Route by change type first, then by the table's level (root / leaf / father).
                    if self.kfk_type == 'INSERT':
                        if src_level == 'root':  # root table: one row per order document
                            tgt_pk_value = self.kfk_data[tgt_pk_key]  # for root rows the target pk comes straight from the row
                            for item in tgt_columns:  # item is a target JSON column such as order_info or order_progress
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"  # e.g. {"order_info": ["order_no", "cust_no"]}, consumed by _get_data_from_kfk
                                tgt_column = eval(tgt_column)  # to dict
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_path = list(tgt_columns[item]['target_path'].values())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, tgt_pk_value)  # build the JSON fragments from the Kafka row
                                    self._insert_data_from_kfk_for_root(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, item, table_insert_data, tgt_path)  # write the root fragment
                                    self._insert_father_data(src_table_schema, src_table_name, insert_data, tgt_path, tgt_pk_value, item, catalog_type, tgt_table_name_for_config)  # recurse into the dimension (father) tables
                        # leaf insert: locate the parent document via the parent keys
                        # (order_no / order_item_id), then hang the leaf fragment under it.
                        elif src_level == 'leaf':
                            parent_pk_info = tgt_operation['parent_pk_key']
                            for item in tgt_columns:  # order_info, order_progress, order_operation, order_adjudgement
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, self.kfk_data)  # find the parent row and path
                                    tgt_path_true = parent_tgt_path + "." + src_table_name  # path of the leaf table node
                                    self._insert_data_from_kfk_for_leaf(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new, item, table_insert_data_for_leaf, tgt_path_true, src_pk_value, insert_data)
                                    tgt_path_new = tgt_path_true + r'.\"' + src_pk_value + r'\"'  # path of this concrete leaf row
                                    self._insert_father_data(src_table_schema, src_table_name, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config)
                        elif src_level == 'father':  # dimension table referenced by root/leaf rows
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_paths = list(tgt_columns[item]['target_path'].values())
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    if 'product' in src_table_name.lower():
                                        catalog_type = 'PRODUCT'
                                    elif 'service' in src_table_name.lower():
                                        catalog_type = 'SERVICE'
                                    else:
                                        catalog_type = '0'
                                    for tgt_path in tgt_paths:
                                        tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema, tgt_table_name, item, catalog_type)
                                        if len(tgt_info_for_father) == 0:
                                            logging.warning('can not available the data of the root and leaf table ')
                                        else:
                                            for i in range(len(tgt_info_for_father)):
                                                tgt_pk_value_new = tgt_info_for_father[i][0]
                                                tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:]
                                                self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new, item, tgt_path_new, insert_data)
                                                self._insert_father_data(src_table_schema, src_table_name, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config)
                    elif self.kfk_type == 'UPDATE':
                        # update on the root table: patch the changed fields in place,
                        # then refresh the dimension data below them.
                        if src_level == 'root':
                            tgt_pk_value = self.kfk_data[tgt_pk_key]
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    update_columns = kfk_text['old'][0]  # old values of the changed columns
                                    tgt_path = list(tgt_columns[item]['target_path'].values())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, tgt_pk_value)
                                    self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, tgt_path, tgt_pk_value, item, catalog_type, tgt_table_name_for_config, src_table_schema)
                                    # Record the change history for columns configured in alter_column.
                                    if 'alter_column' in list(tgt_columns[item].keys()):
                                        record_history_column = tgt_columns[item]['alter_column']
                                        self._insert_history_data(update_columns, insert_data, tgt_path, record_history_column, self.kfk_es, item, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value)
                                    else:
                                        logging.warning(str(item) + " alter_column is not available")
                        # update on a leaf table: locate the parent document first
                        # (order_no / order_item_id), then patch the leaf row.
                        elif src_level == 'leaf':
                            parent_pk_info = tgt_operation['parent_pk_key']
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    update_columns = kfk_text['old'][0]
                                    tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, self.kfk_data)
                                    tgt_path_true = parent_tgt_path + "." + src_table_name
                                    tgt_path_new = tgt_path_true + r'.\"' + src_pk_value + r'\"'
                                    self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config, src_table_schema)
                                    if 'alter_column' in list(tgt_columns[item].keys()):
                                        record_history_column = tgt_columns[item]['alter_column']
                                        self._insert_history_data(update_columns, insert_data, tgt_path_new, record_history_column, self.kfk_es, item, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new)
                                    else:
                                        logging.warning(str(item) + " alter_column is not available")
                        # update on a father table: find every document that references
                        # this row and patch each occurrence.
                        elif src_level == 'father':
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    update_columns = kfk_text['old'][0]
                                    tgt_paths = list(tgt_columns[item]['target_path'].values())
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    if 'product' in src_table_name.lower():
                                        catalog_type = 'PRODUCT'
                                    elif 'service' in src_table_name.lower():
                                        catalog_type = 'SERVICE'
                                    else:
                                        catalog_type = '0'
                                    for tgt_path in tgt_paths:
                                        tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema, tgt_table_name, item, catalog_type)
                                        for i in range(len(tgt_info_for_father)):
                                            tgt_pk_value_new = tgt_info_for_father[i][0]
                                            tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:]
                                            self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config, src_table_schema)
                    # delete: a root delete removes the whole row, a leaf delete
                    # prunes its node from the parent document.
                    elif self.kfk_type == 'DELETE':
                        if src_level == 'root':
                            tgt_pk_value = self.kfk_data[tgt_pk_key]
                            self._delete_data_for_root(tgt_pk_key, tgt_pk_value, tgt_schema, tgt_table_name)
                        elif src_level == 'leaf':
                            parent_pk_info = tgt_operation['parent_pk_key']
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
                                    (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, self.kfk_data)
                                    tgt_path_true = parent_tgt_path + "." + src_table_name
                                    tgt_path_new = tgt_path_true + r'.\"' + src_pk_value + r'\"'
                                    self._delete_data_for_leaf(tgt_schema, tgt_table_name, item, tgt_path_new, tgt_pk_key, tgt_pk_value_new, tgt_path_true)
        except Exception as data:
            writeErrorLog('_get_rh_from_kafka', 'Error', str(data))
            logging.error('_get_rh_from_kafka: ' + str(data))
    # Find every (pk, json path) pair in the target table whose document references this father row.
    def _get_tgt_info_for_father(self, tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema, tgt_table_name, tgt_column, catalog_type):
        try:
            tgt_path_true = tgt_path + "." + src_pk_key
            if catalog_type == '0':
                select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",'all','" + src_pk_value + "',null,'" + tgt_path_true + "') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",'" + tgt_path_true + "'),'$[0]')='" + src_pk_value + "';"
            else:
                select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",'all','" + src_pk_value + "',null,'" + tgt_path_true + "') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",'" + tgt_path_true + "'),'$[0]')='" + src_pk_value + "' and json_extract(" + tgt_column + ",'$." + tgt_table_name + ".type')='" + catalog_type + "';"
            self.mysql_cur.execute(select_sql_for_father)
            tgt_info_for_father = self.mysql_cur.fetchall()
            return tgt_info_for_father
        except Exception as data:
            writeErrorLog('_get_tgt_info_for_father', 'Error', str(data))
            logging.error('_get_tgt_info_for_father: ' + str(data))
    def _delete_data_for_root(self, tgt_pk_key, tgt_pk_value, tgt_schema, tgt_table_name):
        try:
            delete_sql = "delete from " + tgt_schema + "." + tgt_table_name + " where " + tgt_pk_key + "='" + str(tgt_pk_value) + "';"
            self.mysql_cur.execute(delete_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_delete_data_for_root', 'Error', str(data))
            logging.error('_delete_data_for_root: ' + str(data))

    def _delete_data_for_leaf(self, tgt_schema, tgt_table_name, tgt_column, tgt_path, tgt_pk_key, tgt_pk_value, tgt_path_true):
        try:
            # Remove the leaf node, then drop the now-empty table node if this was its last row.
            delete_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",'" + tgt_path + "') where " + tgt_pk_key + "='" + str(tgt_pk_value) + "';"
            self.mysql_cur.execute(delete_sql)
            self.mysql_db.commit()
            select_sql = "select json_extract(" + tgt_column + ",'" + tgt_path_true + "') from " + tgt_schema + "." + tgt_table_name + " where " + tgt_pk_key + "='" + str(tgt_pk_value) + "';"
            self.mysql_cur.execute(select_sql)
            tgt_column_value = self.mysql_cur.fetchall()[0][0]
            if tgt_column_value == r'{}':
                table_delete_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",'" + tgt_path_true + "') where " + tgt_pk_key + "='" + str(tgt_pk_value) + "';"
                self.mysql_cur.execute(table_delete_sql)
                self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_delete_data_for_leaf', 'Error', str(data))
            logging.error('_delete_data_for_leaf: ' + str(data))
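    # A hypothetical rendering of the first statement above, removing order
    # item '200' from an order document (names made up for illustration):
    # update track.tracking_order set order_info=json_remove(order_info,
    #     '$.order.order_item."200"') where order_no='100';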
    # Append old/new values of watched columns to an alter_data history node.
    def _insert_history_data(self, update_columns, insert_data, tgt_path, record_history_column, data_time, tgt_column, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value):
        try:
            update_columns_key = list(update_columns.keys())
            for item in record_history_column:
                if item in update_columns_key:
                    tgt_path_for_column = tgt_path + '.alter_data.' + item
                    tgt_path_for_alter = tgt_path + '.alter_data'
                    select_sql_for_alter_column_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_column + '\') from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';'
                    select_sql_for_alter_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_alter + '\') from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';'
                    self.mysql_cur.execute(select_sql_for_alter_column_path)
                    tgt_path_value_for_column = self.mysql_cur.fetchall()
                    self.mysql_cur.execute(select_sql_for_alter_path)
                    tgt_path_value_for_alter = self.mysql_cur.fetchall()
                    old_data = update_columns[item]
                    new_data = eval(insert_data)[item]
                    if tgt_path_value_for_alter[0][0] is None:
                        # No alter_data node yet: create it with the first history entry.
                        history_data = '{\"' + item + '\":[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]}'
                        insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",'" + tgt_path_for_alter + "',cast('" + history_data + "' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
                    else:
                        if tgt_path_value_for_column[0][0] is None:
                            # alter_data exists but not for this column: start its history array.
                            history_data = '[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]'
                            insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",'" + tgt_path_for_column + "',cast('" + history_data + "' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
                        else:
                            # History already exists: append one more entry.
                            history_data = '{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}'
                            insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_array_append(" + tgt_column + ",'" + tgt_path_for_column + "',cast('" + history_data + "' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
                    self.mysql_cur.execute(insert_sql)
                    self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_history_data', 'Error', str(data))
            logging.error('_insert_history_data: ' + str(data))
    # Build the JSON fragments for one Kafka row: the fragment wrapped in its
    # table name, the fragment keyed by pk value (for leaf rows), and the bare fragment.
    def _get_data_from_kfk(self, text, tgt_column, src_table_name, src_table_pk_value):
        try:
            tgt_column_json = tgt_column
            tgt_column_key = ''
            for key in tgt_column_json:  # keys of tgt_column, e.g. order_info
                json_column_key = '{'
                for item in tgt_column_json[key]:
                    json_column_key += '"' + item + '":"' + text['data'][0][item].replace('"', r'\\"') + '",'
                tgt_column_item = json_column_key[:-1]
                tgt_column_key += tgt_column_item + '},'
            if 'type' in text['data'][0]:
                catalog_type = text['data'][0]['type']
            else:
                catalog_type = '0'
            table_insert_data = '{\"' + src_table_name + '\":' + tgt_column_key[:-1] + '}'  # e.g. {"order":{"order_no":"100"}}
            insert_data = tgt_column_key[:-1]  # e.g. {"order_no":"100"}
            table_insert_data_for_leaf = '{\"' + src_table_pk_value + '\":' + insert_data + '}'  # e.g. {"100":{"order_no":"100"}}
            print(insert_data)
            return (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type)
        except Exception as data:
            writeErrorLog('_get_data_from_kfk', 'Error', str(data))
            logging.error('_get_data_from_kfk: ' + str(data))
    def _insert_data_from_kfk_for_root(self, tgt_schema, tgt_table_name, tgt_table_pk, tgt_table_value, tgt_column, table_insert_data, tgt_path):
        try:
            # Does the target row exist yet?
            select_tb_count = 'select count(*) from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';'
            # Is the target JSON column still empty?
            select_tb_column_count = 'select case when coalesce(' + tgt_column + ', \'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';'
            self.mysql_cur.execute(select_tb_count)
            tb_count = self.mysql_cur.fetchall()
            self.mysql_cur.execute(select_tb_column_count)
            tb_column_count = self.mysql_cur.fetchall()
            if tb_count[0][0] == 0:
                # No row yet: insert the primary key (order_no / order_item_id)
                # first, then set the whole JSON column.
                insert_pk_sql = "insert into " + tgt_schema + "." + tgt_table_name + "(" + tgt_table_pk + ") values ('" + tgt_table_value + "')"
                self.mysql_cur.execute(insert_pk_sql)
                self.mysql_db.commit()
                update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';"
            else:
                if tb_column_count[0][0] == 1:
                    # Row exists but the column is empty: set the whole document.
                    update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';"
                else:
                    # Row and column both exist: graft the table fragment at tgt_path with json_insert.
                    update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",'" + tgt_path + "',cast('" + table_insert_data + "' as json)) where " + tgt_table_pk + "='" + tgt_table_value + "';"
            self.mysql_cur.execute(update_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_data_from_kfk_for_root', 'Error', str(data))
            logging.error('_insert_data_from_kfk_for_root: ' + str(data))
    def _get_tgt_pk_value_for_leaf(self, tgt_table_pk, tgt_schema, tgt_table_name, tgt_column, tgt_path, parent_pk_value):
        try:
            select_tgt_pk_sql = "select " + tgt_table_pk + " from " + tgt_schema + "." + tgt_table_name + " where json_extract(" + tgt_column + ",'" + tgt_path + "')='" + parent_pk_value + "';"
            self.mysql_cur.execute(select_tgt_pk_sql)
            tgt_pk_value = self.mysql_cur.fetchall()[0][0]
            return tgt_pk_value
        except Exception as data:
            writeErrorLog('_get_tgt_pk_value_for_leaf', 'Error', str(data))
            logging.error('_get_tgt_pk_value_for_leaf: ' + str(data))
    # Locate the parent document of a leaf row: returns the parent's json path
    # and primary key value, matched via json_search on the parent keys.
    def _get_tgt_info_for_leaf(self, tgt_column, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, kafka_data):
        try:
            if_tgt_path = '.'.join(tgt_path.split('.')[:-1])
            i = 0
            json_search_sql = ''
            where_sql = ''
            if if_tgt_path == '$':
                for parent_pk_key in list(parent_pk_info.keys()):
                    parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]]
                    json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + "." + parent_pk_key + "') as tgt_path" + str(i)
                    where_sql += " tgt_path" + str(i) + " is not null and"
                    i = i + 1
            else:
                for parent_pk_key in list(parent_pk_info.keys()):
                    parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]]
                    json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + ".*." + parent_pk_key + "') as tgt_path" + str(i)
                    where_sql += " tgt_path" + str(i) + " is not null and"
                    i = i + 1
            select_sql = "select " + tgt_pk_key + ",tgt_path0 from (select " + tgt_pk_key + json_search_sql + " from " + tgt_schema + "." + tgt_table_name + ") t where " + where_sql[:-4] + ";"
            self.mysql_cur.execute(select_sql)
            rows = self.mysql_cur.fetchall()[0]
            tgt_path_new = ('.'.join(rows[1].split('.')[:-1]))[1:]
            tgt_pk_value_new = rows[0]
            return (tgt_path_new, tgt_pk_value_new)
        except Exception as data:
            writeErrorLog('_get_tgt_info_for_leaf', 'Error', str(data))
            logging.error('_get_tgt_info_for_leaf: ' + str(data))
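    # With hypothetical inputs tgt_column='order_info', tgt_path='$.order',
    # parent_pk_info={'order_no': 'order_no'} and a Kafka row where order_no='100',
    # the generated lookup would read roughly (table and schema names made up):
    # select order_no,tgt_path0 from (select order_no,json_search(order_info,
    #     'one','100', null, '$.order.order_no') as tgt_path0
    #     from track.tracking_order) t where tgt_path0 is not null;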
    def _insert_data_from_kfk_for_leaf(self, tgt_schema, tgt_table_name, tgt_table_pk, tgt_table_value, tgt_column, table_insert_data_for_leaf, tgt_path, src_pk_value, insert_data):
        try:
            select_tb_column_key = 'select case when coalesce(json_extract(' + tgt_column + ',\'' + tgt_path + '\') , \'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + str(tgt_table_value) + '\';'
            self.mysql_cur.execute(select_tb_column_key)
            column_key_data = self.mysql_cur.fetchall()
            if column_key_data[0][0] == 1:
                # The leaf table node does not exist yet: insert the pk-keyed fragment.
                tgt_path_new = tgt_path
                tgt_insert_data = table_insert_data_for_leaf
            else:
                # The node exists: insert this row's fragment under its pk value.
                tgt_path_new = tgt_path + r'.\"' + str(src_pk_value) + r'\"'
                tgt_insert_data = insert_data
            update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",'" + tgt_path_new + "',cast('" + tgt_insert_data + "' as json)) where " + tgt_table_pk + "='" + str(tgt_table_value) + "';"
            self.mysql_cur.execute(update_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_data_from_kfk_for_leaf', 'Error', str(data))
            logging.error('_insert_data_from_kfk_for_leaf: ' + str(data))
    # Recursively pull in the dimension (father) tables referenced by a row's foreign keys.
    def _insert_father_data(self, src_table_schema, src_table_name, insert_data, src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config):
        try:
            src_config_data = self._get_config(src_table_name, tgt_table_name_for_config, src_table_schema)
            # source_foreign_info maps a foreign key column to the table it references,
            # e.g. {"org_id": {"customer.organization": "org_id"}}.
            src_foreign_info = src_config_data['target_column'][tgt_column]['source_foreign_info']
            if len(json.dumps(src_foreign_info)) == 2:
                # An empty '{}' or '[]' serializes to two characters: no foreign keys left, recursion ends.
                logging.warning(src_table_name + " :Recursive over")
            else:
                for src_pk_key in src_foreign_info:
                    foreign_table_name_tmp = list(src_foreign_info[src_pk_key].keys())[0]  # e.g. customer.organization
                    foreign_table_schema = foreign_table_name_tmp.split('.')[0]
                    foreign_table_name_tmp = foreign_table_name_tmp.split('.')[1]
                    # A '#' in a table name is a placeholder for the catalog type (product/service).
                    if '#' in foreign_table_name_tmp:
                        foreign_table_name = foreign_table_name_tmp.replace('#', catalog_type).lower()
                    else:
                        foreign_table_name = foreign_table_name_tmp
                    foreign_table_pk_key = list(src_foreign_info[src_pk_key].values())[0]  # pk column of the foreign table, e.g. org_id
                    foreign_datas = self._get_config(foreign_table_name, tgt_table_name_for_config, foreign_table_schema)
                    foreign_column = foreign_datas['target_column'][tgt_column]  # rule for this JSON column (order_info / order_progress)
                    foreign_schema = foreign_datas['src_table_schema']  # source schema of the foreign table
                    foreign_table_pk_value = eval(str(insert_data))[src_pk_key]  # fk value carried in the current fragment
                    # The target coordinates stay those of the root document.
                    tgt_schema = foreign_datas['tgt_table_schema']
                    tgt_table_name = foreign_datas['tgt_table_name']
                    tgt_pk_key = foreign_datas['target_primary_key']
                    tgt_pk_value = root_pk_value
                    # Only follow paths whose configured parent path matches where
                    # the current fragment was inserted.
                    for foreign_path in foreign_column['target_path']:
                        src_tgt_path = list(foreign_path.keys())[0]
                        foreign_tgt_path = list(foreign_path.values())[0]
                        if re.sub('.\"\S*?\"', r'*', src_path) == src_tgt_path and re.sub('.\"\S*?\"', r'*', src_path) + '.' + src_pk_key == foreign_tgt_path:
                            next_src_path = src_path + '.' + src_pk_key
                            next_insert_data = self._get_data_from_db(foreign_column, foreign_table_name, foreign_schema, foreign_table_pk_key, foreign_table_pk_value, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, src_path, tgt_column, next_src_path)
                            self._insert_father_data(foreign_table_schema, foreign_table_name, next_insert_data, next_src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config)
                        else:
                            logging.warning(foreign_table_name + ' :have no next level')
        except Exception as data:
            writeErrorLog('_insert_father_data', 'Error', str(data))
            logging.error('_insert_father_data: ' + str(data))
    # Fetch the father row from the source database, build its JSON fragment
    # and write it into the target document.
    def _get_data_from_db(self, src_tgt_column, src_table_name, src_table_schema, src_table_pk_key, src_table_pk_value, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, src_path, tgt_column, tgt_path):
        try:
            result_data = '{'
            src_column = src_tgt_column['source_column']  # columns to copy from the source row
            if len(src_column) == 0:
                logging.error(str(src_column) + ' length equal 0 error ')
            else:
                for item in src_column:
                    # Let MySQL render each column as a "name":"value" fragment.
                    select_sql1 = 'concat(\''
                    select_sql1 += u'"' + item + '":"\',coalesce(' + item + ',\'\'),\'",'
                    select_sql1 = select_sql1[:-1] + '\')'
                    select_sql = "select " + select_sql1 + " from " + src_table_schema + "." + src_table_name + " where " + src_table_pk_key + "='" + src_table_pk_value + "';"
                    self.mes_mysql_cur.execute(select_sql)
                    data = self.mes_mysql_cur.fetchall()
                    if len(data) == 0:
                        result_data += ''
                    else:
                        result_data += data[0][0] + ','
                if result_data != '{':
                    tgt_value = result_data[:-1] + '}'
                else:
                    # Nothing found: fall back to a fragment holding just the key.
                    tgt_value = result_data + '\"' + src_table_pk_key + '\":\"' + src_table_pk_value + '\"}'
                self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, tgt_column, tgt_path, tgt_value)
                return tgt_value
        except Exception as data:
            writeErrorLog('_get_data_from_db', 'Error', str(data))
            logging.error('_get_data_from_db: ' + str(data))
    # Write a JSON fragment into the target document at tgt_path.
    def _insert_data_from_db(self, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, tgt_column, tgt_path, tgt_value):
        try:
            insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_replace(" + tgt_column + ",'" + tgt_path + "',cast('" + tgt_value + "' as json)) where " + tgt_pk_key + "='" + str(tgt_pk_value) + "';"
            # self.mysql_cur.execute(insert_sql.encode("utf-8").decode("latin1"))
            self.mysql_cur.execute(insert_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_data_from_db', 'Error', str(data))
            logging.error('_insert_data_from_db: ' + str(data))
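    # A hypothetical rendering of the statement above, grafting an organization
    # fragment under an order's org_id key (names and values made up):
    # update track.tracking_order set order_info=json_replace(order_info,
    #     '$.order.org_id',cast('{"org_id":"1","org_name":"ACME"}' as json))
    #     where order_no='100';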
    # Patch only the columns that actually changed, then refresh the dimension data below them.
    def _update_data(self, tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config, src_table_schema):
        try:
            # insert_data arrives as a JSON string built by _get_data_from_kfk.
            insert_data = json.loads(insert_data)
            for update_column in update_columns:  # the keys of 'old' are the changed columns
                if update_column in list(insert_data.keys()):
                    update_column_data = '\"' + insert_data[update_column] + '\"'
                    tgt_path = src_path + '.' + update_column
                    self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, root_pk_value, tgt_column, tgt_path, update_column_data)
            self._insert_father_data(src_table_schema, src_table_name, insert_data, src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config)
        except Exception as data:
            writeErrorLog('_update_data', 'Error', str(data))
            logging.error('_update_data: ' + str(data))
    # Release connections and stop the process.
    def _exit_consumer(self):
        self._release_db()
        sys.exit()

def exit_program(signum, frame):
    # SIGTERM handler: ask the consumer loop to stop after the current message.
    logging.info("Received Signal: %s at frame: %s" % (signum, frame))
    p.stop_flag = 1

def main():
    global p
    p = RH_Consumer()
    signal.signal(signal.SIGTERM, exit_program)
    p._do()

if __name__ == '__main__':
    main()
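For orientation, the pipeline accumulates one JSON document per order. A hypothetical order_info value in tracking.tracking_order after an order row and its customer dimension have been processed might look like this (field names reuse the examples from the comments above; real documents depend entirely on the mapping rules):

{"order": {"order_no": "100", "cust_no": "C01", "org_id": {"org_id": "1", "org_name": "ACME"}}}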