python 3はkafkaからデータを取得しjson形式に解析しmysqlに書き込む

54666 ワード

プロジェクト要件:kafkaが解析したログをデータベースの変更レコードに取得し、注文のレベルと注文の詳細レベルに従ってデータベースに書き込み、1つの注文のすべての情報は各種の次元情報を含む1つのjsonに保存し、mysql 5に書き込む.7にあります.構成情報:
[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx

コードは長くて醜くて、中途半端で、プロセス向けのプログラミングしか完成していません.対象ができません.すぐに見て、問題があれば私のコードに連絡してください.
#encoding=utf-8
import datetime
import configparser
import re
import pymysql
from vertica_python import connect
import vertica_python
import json
from confluent_kafka import Consumer, KafkaError
import csv
import logging
import os
import time
import signal
import sys

#   
logging.basicConfig(filename=os.path.join(os.getcwd(), 'log_tracking.txt'), level=logging.WARN, filemode='a',format='%(asctime)s - %(levelname)s: %(message)s')

def writeErrorLog(errSrc, errType, errMsg):
    try:
        v_log_file = 'err_tracking.log';
        v_file = open(v_log_file, 'a')
        v_file.write(datetime.datetime.strftime(datetime.datetime.now(),"%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType +" : " + errMsg + '
') v_file.flush() except Exception as data: v_err_file = open('err_tracking.log', 'a') v_err_file.write(str(data) + '
') v_err_file.write(datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S") + " - " + errSrc + " - " + errType + " : " + errMsg + '
') v_err_file.flush() v_err_file.close() finally: v_file.close() class RH_Consumer: # , def __init__(self): self.config = configparser.ConfigParser() self.config.read('config.ini') self.host = self.config.get('Global', 'host') self.user = self.config.get('Global', 'user') self.passwd = self.config.get('Global', 'passwd') self.schema = self.config.get('Global', 'schema') self.port = int(self.config.get('Global', 'port')) self.kafka_server = self.config.get('Global', 'kafka_server') self.kafka_topic = self.config.get('Global', 'kafka_topic') self.consumer_group = self.config.get('Global', 'consumer_group') self.dd_host = self.config.get('Global', 'dd_host') self.dd_user = self.config.get('Global', 'dd_user') self.dd_passwd = self.config.get('Global', 'dd_passwd') self.dd_port = int(self.config.get('Global', 'dd_port')) self.dd_socket = self.config.get('Global', 'dd_socket') self.operation_time = datetime.datetime.now() self.stop_flag = 0 self.src_table_name = [] self.__init_db() self.__init_mes_db() self._get_all_src_table() # def __init_db(self): try: self.conn_info = {'host': self.host,'port': self.port,'user': self.user,'password': self.passwd,'db': 'tracking'} self.mysql_db = pymysql.connect(**self.conn_info, charset="utf8" ) self.mysql_cur = self.mysql_db.cursor() except Exception as data: writeErrorLog('__init_db', 'Error', str(data)) # , def __init_mes_db(self): try: self.mes_mysql_db = pymysql.connect(host=self.dd_host, user=self.dd_user, passwd=self.dd_passwd,port=self.dd_port, unix_socket=self.dd_socket, charset="utf8") self.mes_mysql_cur = self.mes_mysql_db.cursor() except Exception as data: writeErrorLog('__init_db', 'Error', str(data)) # def _release_db(self): self.mysql_cur.close() self.mysql_db.close() self.mes_mysql_cur.close() self.mes_mysql_db.close() # ( ) def _get_all_src_table(self): try: # table select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule" self.mysql_cur.execute(select_src_table_names) rows = self.mysql_cur.fetchall() for item in rows: self.src_table_name.append(item[0]) return self.src_table_name except Exception as data: writeErrorLog('_get_all_src_table', 'Error', str(data)) logging.error('_get_all_src_table: ' + str(data)) # src def _get_tgt_table_name(self,table_name,table_schema): try: # table (table_name schema|tablename) select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" %(table_name,table_schema) self.mysql_cur.execute(select_tgt_table_names) rows = self.mysql_cur.fetchall() tgt_table_names=[] for item in rows: tgt_table_names.append(item[0]) return tgt_table_names except Exception as data: writeErrorLog('_get_tgt_table_name', 'Error', str(data)) logging.error('_get_tgt_table_name: ' + str(data)) # table_name, json def _get_config(self,table_name,tgt_table_name,table_schema): try: # table (table_name schema|tablename) select_table_config = "select coalesce( src_system, '' ) as src_system,coalesce ( src_table_schema, '' ) as src_table_schema,coalesce ( src_table_name, '' ) as src_table_name,coalesce ( tgt_operation, '{}' ) as tgt_operation,active_flag,coalesce ( tgt_system, '' ) as tgt_system,coalesce ( tgt_table_schema, '' ) as tgt_table_schema,coalesce ( tgt_table_name, '' ) as tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and tgt_table_name='%s' and src_table_schema = '%s' " %(table_name,tgt_table_name,table_schema) self.mysql_cur.execute(select_table_config) rows = self.mysql_cur.fetchall() for item in rows: self.src_system = item[0] self.src_table_schema = item[1] self.src_table_name = item[2] self.tgt_operation = item[3] self.active_flag = item[4] self.tgt_system = item[5] self.tgt_table_schema = item[6] self.tgt_table_name = item[7] # self.tgt_operation self.tgt_operation = eval(self.tgt_operation) result_data = {'src_system':self.src_system, 'src_table_schema':self.src_table_schema, 'src_table_name':self.src_table_name, 'tgt_operation':self.tgt_operation, 'active_flag':self.active_flag, 'tgt_system': self.tgt_system, 'tgt_table_schema': self.tgt_table_schema, 'tgt_table_name': self.tgt_table_name, # self.tgt_operation 'source_primary_key': self.tgt_operation['source_primary_key'], 'source_all_column': self.tgt_operation['source_all_column'], 'target_primary_key': self.tgt_operation['target_primary_key'], 'target_column': self.tgt_operation['target_column'], 'source_level': self.tgt_operation['source_level'] } return result_data except Exception as data: writeErrorLog('_get_config', 'Error', str(data)+':table is not available') logging.error('_get_config: ' + str(data)) # def _do(self): try: # consumer , c = Consumer({ 'bootstrap.servers': self.kafka_server, 'group.id': self.consumer_group, 'default.topic.config': { 'auto.offset.reset': 'smallest', 'enable.auto.commit': False} }) # kafka c.subscribe([self.kafka_topic]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print(msg.error()) break text = msg.value().decode(encoding="utf-8") # kfk_text = eval(text) kfk_text = json.loads(text) # kfk , , # , commit, try: kfk_table = kfk_text['table'] if kfk_table in ['order_mails'] : print(type(text),text) logging.warning('-------------- start exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))+'---------------------') kfk_text = str(kfk_text) kfk_text = kfk_text.replace(": None",": ''") kfk_text = eval(kfk_text) kfk_datas = kfk_text['data'] kfk_type = kfk_text['type'] kfk_old = kfk_text['old'] logging.warning(' table_name: '+ str(kfk_table)+ ' table_type : ' + kfk_type) if kfk_type == 'UPDATE': continue print('update') for i,data in enumerate(kfk_datas): kfk_text['data'] = eval("["+str(data)+"]") kfk_text['old'] = eval("[" + str(kfk_old[i]) + "]") self._get_rh_from_kafka(kfk_text) else: print('insert') for data in kfk_datas: kfk_text['data'] = eval("["+str(data)+"]") print(type(kfk_text), kfk_text) self._get_rh_from_kafka(kfk_text) logging.warning('----------------end exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))+'---------------') c.commit() except Exception as data: writeErrorLog('_do', 'exce data Error', str(data)) logging.error('_do: ' + str(data)) # if self.stop_flag == 1: self._exit_consumer() c.close() except Exception as data: print(data) writeErrorLog('_do', 'Error', str(data)) logging.error('_do: ' + str(data)) def _trans_path(self,tgt_path): new_tgt_path=tgt_path.replace('.','\".\"').replace('$\".','$.')+'\"' return new_tgt_path # kafka , def _get_rh_from_kafka(self,kfk_text): try: # kfk self.kfk_tb_schema = kfk_text["database"]#schema self.kfk_tb_name = kfk_text["table"]#table_name self.kfk_data = kfk_text['data'][0]#data self.kfk_type = kfk_text['type']# type self.kfk_es = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(kfk_text['es'] / 1000)))# # kfk src , ----- tgt_table_names=self._get_tgt_table_name(self.kfk_tb_name,self.kfk_tb_schema) if len(tgt_table_names) != 0: for tgt_table_name_for_config in tgt_table_names: tb_config = self._get_config(self.kfk_tb_name,tgt_table_name_for_config,self.kfk_tb_schema) tgt_pk_key = tb_config['target_primary_key']# (order_no/order_item_id) tgt_schema = tb_config['tgt_table_schema']# schema tgt_table_name = tb_config['tgt_table_name']# ( tracking_order,tracking_order_item) src_table_name = tb_config['src_table_name']# (schema|table_name) src_table_schema = tb_config['src_table_schema'] tgt_columns = tb_config['target_column']# ( order_info ) src_level = tb_config['source_level']# level, root,leaf,father src_pk_key = tb_config['source_primary_key']# src_pk_value = self.kfk_data[src_pk_key]# ( kfk ) tgt_operation=tb_config['tgt_operation']# , # , ,root,leaf,father , insert,update delete if self.kfk_type == 'INSERT': # kfk INSERT,UPDATE,DELETE if src_level == 'root': # root tgt_pk_value = self.kfk_data[tgt_pk_key]# root , ( src_pk_value ) for item in tgt_columns: # item :order_info、order_progress , , tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # , _get_data_from_kfk , {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # dict if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item)+" is null,please check") else: tgt_path = list(tgt_columns[item]['target_path'].values())[0]# , (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column,src_table_name,tgt_pk_value) # , json, # kfk self._insert_data_from_kfk_for_root(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, item, table_insert_data,tgt_path)# kfk self._insert_father_data(src_table_schema,src_table_name, insert_data, tgt_path, tgt_pk_value, item,catalog_type,tgt_table_name_for_config)# # insert : , (order_no/order_item_id) , ( ), , elif src_level == 'leaf': # kfk INSERT,UPDATE,DELETE parent_pk_info=tgt_operation['parent_pk_key'] for item in tgt_columns: # item :order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # # , _get_data_from_kfk , {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # dict if str(tgt_columns[item]['target_path'])=='{}':# ( , , ,root ?) logging.warning(str(item) + " is null,please check") else: tgt_path = list(tgt_columns[item]['target_path'].keys())[0]# (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column,src_table_name,src_pk_value) # # , json, (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)# tgt_path_true=parent_tgt_path+"."+src_table_name# ( ) self._insert_data_from_kfk_for_leaf(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new, item, table_insert_data_for_leaf,tgt_path_true,src_pk_value,insert_data) # kafka tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'# ( ) self._insert_father_data(src_table_name, insert_data, tgt_path_new, tgt_pk_value_new, item,catalog_type,tgt_table_name_for_config)# , elif src_level == 'father':# for item in tgt_columns: # item :order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # if str(tgt_columns[item]['target_path']) == '{}': logging.warning(str(item) + " is null,please check") else: tgt_paths = list(tgt_columns[item]['target_path'].values()) (table_insert_data, table_insert_data_for_leaf,insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value) # kafka json if 'product' in src_table_name.lower(): catalog_type='PRODUCT' elif 'service' in src_table_name.lower(): catalog_type='SERVICE' else: catalog_type='0' for tgt_path in tgt_paths: tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema,tgt_table_name, item,catalog_type) if len(tgt_info_for_father)==0: logging.warning('can not available the data of the root and leaf table ') else: for i in range(len(tgt_info_for_father)): tgt_pk_value_new = tgt_info_for_father[i][0] tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:] self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new,item, tgt_path_new, insert_data) self._insert_father_data(src_table_name, insert_data, tgt_path_new,tgt_pk_value_new, item, catalog_type,tgt_table_name_for_config) elif self.kfk_type == 'UPDATE':#update # update : , ( ), , if src_level == 'root': # root tgt_pk_value = self.kfk_data[tgt_pk_key]## root , ( src_pk_value ) for item in tgt_columns: # item :order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item) + " is null,please check") else: update_columns = kfk_text['old'][0]# kfk tgt_path = list(tgt_columns[item]['target_path'].values())[0] (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name,tgt_pk_value) self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name,update_columns, insert_data, tgt_path, tgt_pk_value, item,catalog_type,tgt_table_name_for_config,src_table_schema)# # if 'alter_column' in list(tgt_columns[item].keys()): record_history_column = tgt_columns[item]['alter_column'] self._insert_history_data(update_columns,insert_data,tgt_path,record_history_column,self.kfk_es,item,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value) else: logging.warning(str(item) + " alter_column is not available") # update : , (order_no/order_item_id) , ( ), , elif src_level == 'leaf': ## root parent_pk_info=tgt_operation['parent_pk_key'] for item in tgt_columns: # item :order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item) + " is null,please check") else: update_columns = kfk_text['old'][0] # tgt_path = list(tgt_columns[item]['target_path'].keys())[0] (table_insert_data,table_insert_data_for_leaf, insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column,src_table_name,src_pk_value) # kafka json (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data) # tgt_path_true=parent_tgt_path+"."+src_table_name## ( ) tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"'# ( ) self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name,update_columns, insert_data, tgt_path_new, tgt_pk_value_new, item,catalog_type,tgt_table_name_for_config,src_table_schema) if 'alter_column' in list(tgt_columns[item].keys()): record_history_column = tgt_columns[item]['alter_column'] self._insert_history_data(update_columns, insert_data, tgt_path_new,record_history_column, self.kfk_es, item, tgt_schema,tgt_table_name, tgt_pk_key, tgt_pk_value_new) else: logging.warning(str(item) + " alter_column is not available") # : , , , , elif src_level == 'father': # kfk pass for item in tgt_columns: # item :order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # if str(tgt_columns[item]['target_path']) == '{}': logging.warning(str(item) + " is null,please check") else: update_columns = kfk_text['old'][0] # tgt_paths = list(tgt_columns[item]['target_path'].values()) (table_insert_data, table_insert_data_for_leaf,insert_data,catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value) # kafka json if 'product' in src_table_name.lower(): catalog_type='PRODUCT' elif 'service' in src_table_name.lower(): catalog_type='SERVICE' else: catalog_type='0' for tgt_path in tgt_paths: tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema,tgt_table_name, item,catalog_type) for i in range(len(tgt_info_for_father)): tgt_pk_value_new = tgt_info_for_father[i][0] tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:] self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name,update_columns, insert_data, tgt_path_new,tgt_pk_value_new, item,catalog_type,tgt_table_name_for_config,src_table_schema) # :root ,leaf , , elif self.kfk_type == 'DELETE': if src_level == 'root': tgt_pk_value = self.kfk_data[tgt_pk_key] self._delete_data_for_root(tgt_pk_key,tgt_pk_value,tgt_schema,tgt_table_name) elif src_level == 'leaf': # parent_pk_info = tgt_operation['parent_pk_key'] for item in tgt_columns: # item :order_info、order_progress、order_operation、order_adjudgement tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}" # tgt_column {"order_info": ["order_no", "cust_no"]} tgt_column = eval(tgt_column) # if str(tgt_columns[item]['target_path'])=='{}': logging.warning(str(item) + " is null,please check") else: tgt_path = list(tgt_columns[item]['target_path'].keys())[0] (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,self.kfk_data)# tgt_path_true=parent_tgt_path+"."+src_table_name# tgt_path_new=tgt_path_true+r'.\"'+src_pk_value+r'\"' self._delete_data_for_leaf(tgt_schema, tgt_table_name, item, tgt_path_new, tgt_pk_key,tgt_pk_value_new,tgt_path_true) except Exception as data: writeErrorLog('_get_rh_from_kafka', 'Error', str(data)) logging.error('_get_rh_from_kafka: ' + str(data)) def _get_tgt_info_for_father(self,tgt_path,src_pk_key,src_pk_value,tgt_pk_key,tgt_schema,tgt_table_name,tgt_column,catalog_type): try: tgt_path_true = tgt_path + "." + src_pk_key if catalog_type=='0': select_sql_for_father="select "+tgt_pk_key+",json_search("+tgt_column+",\'all\',\'"+src_pk_value+"\',null,\'"+tgt_path_true+"\') from "+tgt_schema+"."+tgt_table_name+" where json_extract(json_extract("+tgt_column+",\'"+tgt_path_true+"\'),\'$[0]\')=\'"+src_pk_value+"\';" else: select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",\'all\',\'" + src_pk_value + "\',null,\'" + tgt_path_true + "\') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",\'" + tgt_path_true + "\'),\'$[0]\')=\'" + src_pk_value + "\' and json_extract(" + tgt_column + ",\'$." +tgt_table_name+".type=\'"+catalog_type+"\';" self.mysql_cur.execute(select_sql_for_father) tgt_info_for_father=self.mysql_cur.fetchall() return tgt_info_for_father except Exception as data: writeErrorLog('_get_tgt_info_for_father', 'Error', str(data)) logging.error('_get_tgt_info_for_father: ' + str(data)) def _delete_data_for_root(self,tgt_pk_key,tgt_pk_value,tgt_schema,tgt_table_name): try: delete_sql="delete from "+tgt_schema+"."+tgt_table_name+" where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(delete_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_delete_data_for_root', 'Error', str(data)) logging.error('_delete_data_for_root: ' + str(data)) def _delete_data_for_leaf(self,tgt_schema,tgt_table_name,tgt_column,tgt_path,tgt_pk_key,tgt_pk_value,tgt_path_true): try: delete_sql="update "+tgt_schema+"."+tgt_table_name+" set "+tgt_column+"=json_remove("+tgt_column+",\'"+tgt_path+"\') where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(delete_sql) self.mysql_db.commit() select_sql="select json_extract("+tgt_column+",\'"+tgt_path_true+"\') from "+tgt_schema+"."+tgt_table_name+" where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" self.mysql_cur.execute(select_sql) tgt_column_value=self.mysql_cur.fetchall()[0][0] if tgt_column_value==r'{}': table_delete_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",\'" + tgt_path_true + "\') where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';" self.mysql_cur.execute(table_delete_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_delete_data_for_leaf', 'Error', str(data)) logging.error('_delete_data_for_leaf: ' + str(data)) def _insert_history_data(self,update_columns,insert_data,tgt_path,record_history_column,data_time,tgt_column,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value): try: update_columns_key=list(update_columns.keys()) for item in record_history_column: if item in update_columns_key: tgt_path_for_column = tgt_path + '.alter_data.' + item tgt_path_for_alter = tgt_path + '.alter_data' select_sql_for_alter_column_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_column + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';' select_sql_for_alter_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_alter + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';' self.mysql_cur.execute(select_sql_for_alter_column_path) tgt_path_vlaue_for_column = self.mysql_cur.fetchall() self.mysql_cur.execute(select_sql_for_alter_path) tgt_path_vlaue_for_alter = self.mysql_cur.fetchall() old_data = update_columns[item] new_data = eval(insert_data)[item] if tgt_path_vlaue_for_alter[0][0]==None: history_data = '{\"' + item + '\":[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]}' insert_sql = "update "+tgt_schema + "." + tgt_table_name + " set " + tgt_column +"=json_insert("+tgt_column+",\'"+tgt_path_for_alter+"\',cast(\'"+history_data+"\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';" else: if tgt_path_vlaue_for_column[0][0]==None: history_data='[{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}]' insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';" else: history_data='{\"old_data\":\"'+str(old_data)+'\",\"new_data\":\"'+str(new_data)+'\",\"time\":\"'+data_time+'\"}' insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_array_append(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';" self.mysql_cur.execute(insert_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_history_data', 'Error', str(data)) logging.error('_insert_history_data: ' + str(data)) # kfk , , json def _get_data_from_kfk(self, text, tgt_column,src_table_name,src_table_pk_value): try: tgt_column_json = tgt_column # tgt_column_key = '' for key in tgt_column_json:# tgt_column key json_column_key = '{' for item in tgt_column_json[key]: json_column_key += '"' + item + '":"' + text['data'][0][item].replace('"',r'\\"') + '",' tgt_column_item = json_column_key[:-1] tgt_column_key += tgt_column_item + '},' if 'type' in text['data'][0]: catalog_type=text['data'][0]['type'] else: catalog_type='0' table_insert_data = '{\"' + src_table_name + '\":' + tgt_column_key[:-1] + '}'# {"order":{"order_no":"100"}} insert_data = tgt_column_key[:-1]# {"order_no":"100"} table_insert_data_for_leaf = '{\"' + src_table_pk_value + '\":'+insert_data+'}'# {"100":{"order_no":"100"}} print(insert_data) return (table_insert_data, table_insert_data_for_leaf,insert_data,catalog_type)# except Exception as data: writeErrorLog('_get_data_from_kfk', 'Error', str(data)) logging.error('_get_data_from_kfk: ' + str(data)) def _insert_data_from_kfk_for_root(self,tgt_schema,tgt_table_name,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data,tgt_path): try: # , , , select_tb_count = 'select count(*) from ' + tgt_schema +"."+tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';' # select_tb_column_count ='select case when coalesce(' + tgt_column + ', \'\') = \'\' then 1 else 0 end from ' + tgt_schema +"."+tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';' self.mysql_cur.execute(select_tb_count) tb_count = self.mysql_cur.fetchall() self.mysql_cur.execute(select_tb_column_count) tb_column_count = self.mysql_cur.fetchall() # , , (order_no/order_item_id) if tb_count[0][0] == 0: insert_pk_sql = "insert into " + tgt_schema+"."+tgt_table_name + "(" + tgt_table_pk + ") values ('" + tgt_table_value + "')" self.mysql_cur.execute(insert_pk_sql) self.mysql_db.commit() update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data +"' as json) where " + tgt_table_pk + "= '"+ tgt_table_value + "';" else: # , , table_name json if tb_column_count[0][0]==1:# update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';" else: # , , json_insert table_name json update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path + "\',cast(\'" + table_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + tgt_table_value + "\';" self.mysql_cur.execute(update_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_data_from_kfk_for_root', 'Error', str(data)) logging.error('_insert_data_from_kfk_for_root: ' + str(data)) def _get_tgt_pk_value_for_leaf(self,tgt_table_pk,tgt_schema,tgt_table_name,tgt_column,tgt_path,parent_pk_value): try: select_tgt_pk_sql = "select " + tgt_table_pk + " from " + tgt_schema + "." + tgt_table_name + " where json_extract(" + tgt_column + ",\'" + tgt_path + "\')=\'" + parent_pk_value + "\';" self.mysql_cur.execute(select_tgt_pk_sql) tgt_pk_value = self.mysql_cur.fetchall()[0][0] return tgt_pk_value except Exception as data: writeErrorLog('_get_tgt_pk_value_for_leaf', 'Error', str(data)) logging.error('_get_tgt_pk_value_for_leaf: ' + str(data)) # def _get_tgt_info_for_leaf(self,tgt_column,tgt_path,tgt_schema,tgt_table_name,tgt_pk_key,parent_pk_info,kafka_data): try: if_tgt_path='.'.join(tgt_path.split('.')[:-1]) i=0 json_search_sql='' where_sql='' if if_tgt_path=='$': for parent_pk_key in list(parent_pk_info.keys()): parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]] json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + "." + parent_pk_key + "') as tgt_path" + str(i) where_sql += " tgt_path" + str(i) + " is not null and" i = i + 1 else: for parent_pk_key in list(parent_pk_info.keys()): parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]] json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + ".*." + parent_pk_key + "') as tgt_path" + str(i) where_sql += " tgt_path" + str(i) + " is not null and" i = i + 1 select_sql = "select "+tgt_pk_key+",tgt_path0 from (select "+tgt_pk_key+json_search_sql+" from " + tgt_schema + "." + tgt_table_name +") t where "+where_sql[:-4]+";" self.mysql_cur.execute(select_sql) rows=self.mysql_cur.fetchall()[0] tgt_path_new = ('.'.join(rows[1].split('.')[:-1]))[1:] tgt_pk_value_new=rows[0] return (tgt_path_new,tgt_pk_value_new) except Exception as data: writeErrorLog('_get_tgt_info_for_leaf', 'Error', str(data)) logging.error('_get_tgt_info_for_leaf: ' + str(data)) def _insert_data_from_kfk_for_leaf(self,tgt_schema,tgt_table_name,tgt_table_pk,tgt_table_value,tgt_column,table_insert_data_for_leaf,tgt_path,src_pk_value,insert_data): try: select_tb_column_key = 'select case when coalesce(json_extract(' + tgt_column + ',\'' + tgt_path + '\') , \'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + str(tgt_table_value) + '\';' self.mysql_cur.execute(select_tb_column_key) column_key_data = self.mysql_cur.fetchall() if column_key_data[0][0] == 1:# , tgt_path_new = tgt_path tgt_insert_data=table_insert_data_for_leaf else: tgt_path_new=tgt_path+r'.\"'+str(src_pk_value)+r'\"' tgt_insert_data=insert_data update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_new + "\',cast(\'" + tgt_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + str(tgt_table_value) + "\';" self.mysql_cur.execute(update_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_data_from_kfk_for_leaf', 'Error', str(data)) logging.error('_insert_data_from_kfk_for_leaf: ' + str(data)) # ( , ) def _insert_father_data(self,src_table_schema,scr_table_name,insert_data,src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config): try: src_config_data=self._get_config(scr_table_name,tgt_table_name_for_config,src_table_schema)# ( , ) src_foreign_info=src_config_data['target_column'][tgt_column]['source_foreign_info']# source_foreign_info , , , , if len(json.dumps(src_foreign_info))==2:# , ‘{}’ 2, , source_foreign_info=[], 2 logging.warning(scr_table_name+" :Recursive over") else: for src_pk_key in src_foreign_info:# ( customer org_id,"source_foreign_info": {"org_id": {"customer.organization": "org_id"}}) foreign_table_name_tmp=list(src_foreign_info[src_pk_key].keys())[0] # foreign_table_name(organization),( key , ,order_info, ) foreign_table_schema=foreign_table_name_tmp.split('.')[0] foreign_table_name_tmp=foreign_table_name_tmp.split('.')[1] if '#' in foreign_table_name_tmp: foreign_table_name = foreign_table_name_tmp.replace('#', catalog_type).lower() else: foreign_table_name = foreign_table_name_tmp foreign_table_pk_key=list(src_foreign_info[src_pk_key].values())[0]# foreign_table_key, org_id foreign_datas = self._get_config(foreign_table_name,tgt_table_name_for_config,foreign_table_schema)# , foreign_column = foreign_datas['target_column'][tgt_column]# order_info/order_progress)(organization ) foreign_schema = foreign_datas['src_table_schema']# schema(organization src schema) foreign_table_pk_value = eval(str(insert_data))[src_pk_key] # value( organization kfk ) # ( ) tgt_schema=foreign_datas['tgt_table_schema'] tgt_table_name=foreign_datas['tgt_table_name'] tgt_pk_key=foreign_datas['target_primary_key'] tgt_pk_value=root_pk_value# # , , ( insert ) for foreign_path in foreign_column['target_path']: src_tgt_path=list(foreign_path.keys())[0] foreign_tgt_path = list(foreign_path.values())[0] if re.sub('.\"\S*?\"',r'*',src_path) ==src_tgt_path and re.sub('.\"\S*?\"',r'*',src_path)+'.'+src_pk_key==foreign_tgt_path: next_src_path=src_path+'.'+src_pk_key next_insert_data=self._get_data_from_db(foreign_column,foreign_table_name,foreign_schema,foreign_table_pk_key,foreign_table_pk_value,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,src_path,tgt_column,next_src_path) self._insert_father_data(foreign_table_schema,foreign_table_name,next_insert_data,next_src_path,root_pk_value,tgt_column,catalog_type,tgt_table_name_for_config) else: logging.warning(foreign_table_name + ' :have no next level') except Exception as data: writeErrorLog('_insert_father_data', 'Error', str(data)) logging.error('_insert_father_data: ' + str(data)) # , , , def _get_data_from_db(self,src_tgt_column,src_table_name,src_table_schema,src_table_pk_key,src_table_pk_value,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,src_path,tgt_column,tgt_path): try: result_data = '{' src_column=src_tgt_column['source_column']# if len(src_column)==0: logging(str(src_column)+ ' length equal 0 error ') else: for item in src_column:# sql , select_sql1 = 'concat(\'' select_sql1 += u'"' + item + '":"\',coalesce(' + item + ',\'\'),\'",' select_sql1 = select_sql1[:-1] + '\')' select_sql = "select " + select_sql1 + " from " + src_table_schema + "." + src_table_name + " where " + src_table_pk_key + "=\'" + src_table_pk_value + "\';" # execute SQL self.mes_mysql_cur.execute(select_sql) # fetchone() data = self.mes_mysql_cur.fetchall() if len(data) == 0: result_data += '' else: result_data+=data[0][0]+',' if result_data != '{': tgt_value=result_data[:-1] + '}' else: tgt_value = result_data+'\"'+src_table_pk_key+'\":\"'+src_table_pk_value+'\"}' self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, tgt_column,tgt_path, tgt_value) # return tgt_value# , ( * , , ) except Exception as data: writeErrorLog('_get_data_from_db', 'Error', str(data)) logging.error('_get_data_from_db: ' + str(data)) # def _insert_data_from_db(self,tgt_schema,tgt_table_name,tgt_pk_key,tgt_pk_value,tgt_column,tgt_path,tgt_value): try: insert_sql="update "+ tgt_schema+"."+tgt_table_name +" set "+ tgt_column+"=json_replace("+tgt_column+",\'"+tgt_path+"\',cast(\'"+tgt_value+"\' as json)) where "+tgt_pk_key+"=\'"+str(tgt_pk_value)+"\';" # self.mysql_cur.execute(insert_sql.encode("utf-8").decode("latin1")) self.mysql_cur.execute(insert_sql) self.mysql_db.commit() except Exception as data: writeErrorLog('_insert_data_from_db', 'Error', str(data)) logging.error('_insert_data_from_db: ' + str(data)) # , def _update_data(self, tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns,insert_data, src_path, root_pk_value, tgt_column,catalog_type,tgt_table_name_for_config,src_table_schema): try: # , , json, _get_data_from_db insert_data = json.loads(insert_data) for update_column in update_columns:# if update_column in list(insert_data.keys()): update_column_data = '\"' + insert_data[update_column] + '\"' tgt_path = src_path + '.' + update_column self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, root_pk_value,tgt_column, tgt_path, update_column_data) self._insert_father_data(src_table_schema,src_table_name, insert_data, src_path, root_pk_value, tgt_column,catalog_type,tgt_table_name_for_config) except Exception as data: writeErrorLog('_update_data', 'Error', str(data)) logging.error('_update_data: ' + str(data)) # def _exit_consumer(self): self._release_db() sys.exit() def exit_program(signum, frame): logging.info("Received Signal: %s at frame: %s" % (signum, frame)) p.stop_flag = 1 def main(): # p = RH_Consumer() signal.signal(signal.SIGTERM, exit_program) # while True: p._do() main()