python 3はkafkaからデータを取得しjson形式に解析しmysqlに書き込む


今日はpython 3でkafkaからデータを取得し、jsonフォーマットとして解析してmysqlに書き込む実装を紹介します。一緒にプロジェクトの要件を見てみましょう:kafkaからデータベースの変更記録を解析したログを取得し、注文レベルと注文明細レベルに従ってデータベースに書き込みます。1つの注文のすべての情報は、各種の次元情報を含めて1つのjsonに保存し、mysql 5.7に書き込みます。
構成情報:
[Global]
kafka_server=xxxxxxxxxxx:9092
kafka_topic=mes
consumer_group=test100
passwd = tracking
port = 3306
host = xxxxxxxxxx
user = track
schema = track
dd_socket =
dd_host = xxxxxxxxxxxx
dd_port = 3306
dd_user = xxxxxxxxx
dd_passwd = xxxxxxxx

コードは長くて醜く、中途半端で、プロセス指向のプログラミングを完成しただけで、オブジェクト指向にはなっていません。早速見てみましょう。問題があれば私に連絡してください。
コード:
#encoding=utf-8
import datetime
import configparser
import re
import pymysql
from vertica_python import connect
import vertica_python
import json
from confluent_kafka import Consumer, KafkaError
import csv
import logging
import os
import time
import signal
import sys
 
# Root-logger bootstrap: append WARN-and-above records to log_tracking.txt
# in the current working directory (one shared file for the whole consumer).
logging.basicConfig(filename=os.path.join(os.getcwd(), 'log_tracking.txt'), level=logging.WARN, filemode='a',format='%(asctime)s - %(levelname)s: %(message)s')
 
def writeErrorLog(errSrc, errType, errMsg):
    """Append one timestamped error record to err_tracking.log.

    Last-resort error sink used by every method of RH_Consumer; it must
    never raise, so a second (also guarded) attempt records any failure
    of the first write.
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = stamp + " - " + errSrc + " - " + errType + " : " + errMsg + '\n'
    try:
        # Context manager replaces the original open/close pair, which could
        # hit UnboundLocalError in `finally` when open() itself failed.
        with open('err_tracking.log', 'a') as log_file:
            log_file.write(record)
            log_file.flush()
    except Exception as data:
        try:
            with open('err_tracking.log', 'a') as err_file:
                err_file.write(str(data) + '\n')
                err_file.write(record)
                err_file.flush()
        except Exception:
            pass  # logging must never crash the consumer


class RH_Consumer:
    """Kafka -> MySQL 5.7 JSON consumer.

    Reads canal-style change records from a Kafka topic, maps them through
    the `tracking.tracking_table_mapping_rule` table and writes/merges the
    result into JSON columns of the target MySQL tables.

    NOTE(review): this class mixes several trust boundaries — it eval()s
    Kafka payload fragments and builds SQL by string concatenation.  Both
    are flagged inline below; fixing them needs coordinated schema work,
    so behaviour is preserved here.
    """

    def __init__(self):
        # All connection settings come from the [Global] section of config.ini.
        self.config = configparser.ConfigParser()
        self.config.read('config.ini')
        self.host = self.config.get('Global', 'host')
        self.user = self.config.get('Global', 'user')
        self.passwd = self.config.get('Global', 'passwd')
        self.schema = self.config.get('Global', 'schema')
        self.port = int(self.config.get('Global', 'port'))
        self.kafka_server = self.config.get('Global', 'kafka_server')
        self.kafka_topic = self.config.get('Global', 'kafka_topic')
        self.consumer_group = self.config.get('Global', 'consumer_group')
        self.dd_host = self.config.get('Global', 'dd_host')
        self.dd_user = self.config.get('Global', 'dd_user')
        self.dd_passwd = self.config.get('Global', 'dd_passwd')
        self.dd_port = int(self.config.get('Global', 'dd_port'))
        self.dd_socket = self.config.get('Global', 'dd_socket')
        self.operation_time = datetime.datetime.now()
        self.stop_flag = 0          # set to 1 by the SIGTERM handler
        self.src_table_name = []    # cache of known source table names
        self.__init_db()
        self.__init_mes_db()
        self._get_all_src_table()

    def __init_db(self):
        """Open the target (tracking) MySQL connection and cursor."""
        try:
            self.conn_info = {'host': self.host, 'port': self.port, 'user': self.user, 'password': self.passwd, 'db': 'tracking'}
            self.mysql_db = pymysql.connect(**self.conn_info, charset="utf8")
            self.mysql_cur = self.mysql_db.cursor()
        except Exception as data:
            writeErrorLog('__init_db', 'Error', str(data))

    def __init_mes_db(self):
        """Open the source (MES) MySQL connection used for look-aside reads."""
        try:
            self.mes_mysql_db = pymysql.connect(host=self.dd_host, user=self.dd_user, passwd=self.dd_passwd, port=self.dd_port, unix_socket=self.dd_socket, charset="utf8")
            self.mes_mysql_cur = self.mes_mysql_db.cursor()
        except Exception as data:
            writeErrorLog('__init_db', 'Error', str(data))

    def _release_db(self):
        """Close both cursors and both connections."""
        self.mysql_cur.close()
        self.mysql_db.close()
        self.mes_mysql_cur.close()
        self.mes_mysql_db.close()

    def _get_all_src_table(self):
        """Return (and cache) every distinct source table in the mapping rules."""
        try:
            select_src_table_names = "select distinct src_table_name from tracking.tracking_table_mapping_rule"
            self.mysql_cur.execute(select_src_table_names)
            rows = self.mysql_cur.fetchall()
            for item in rows:
                self.src_table_name.append(item[0])
            return self.src_table_name
        except Exception as data:
            writeErrorLog('_get_all_src_table', 'Error', str(data))
            logging.error('_get_all_src_table: ' + str(data))

    def _get_tgt_table_name(self, table_name, table_schema):
        """List the target table names mapped from one source schema.table."""
        try:
            # NOTE(review): SQL built via %-interpolation — injectable if the
            # mapping table ever holds untrusted names; parameterize when possible.
            select_tgt_table_names = "select distinct tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and src_table_schema = '%s'" % (table_name, table_schema)
            self.mysql_cur.execute(select_tgt_table_names)
            rows = self.mysql_cur.fetchall()
            tgt_table_names = []
            for item in rows:
                tgt_table_names.append(item[0])
            return tgt_table_names
        except Exception as data:
            writeErrorLog('_get_tgt_table_name', 'Error', str(data))
            logging.error('_get_tgt_table_name: ' + str(data))

    def _get_config(self, table_name, tgt_table_name, table_schema):
        """Load one mapping-rule row and flatten it into a config dict.

        The tgt_operation column stores a Python-literal dict; it is eval()d
        as in the original (NOTE(review): prefer json.loads / ast.literal_eval).
        """
        try:
            select_table_config = "select coalesce( src_system, '' ) as src_system,coalesce ( src_table_schema, '' ) as src_table_schema,coalesce ( src_table_name, '' ) as src_table_name,coalesce ( tgt_operation, '{}' ) as tgt_operation,active_flag,coalesce ( tgt_system, '' ) as tgt_system,coalesce ( tgt_table_schema, '' ) as tgt_table_schema,coalesce ( tgt_table_name, '' ) as tgt_table_name from tracking.tracking_table_mapping_rule where src_table_name = '%s' and tgt_table_name='%s' and src_table_schema = '%s' " % (table_name, tgt_table_name, table_schema)
            self.mysql_cur.execute(select_table_config)
            rows = self.mysql_cur.fetchall()
            for item in rows:
                self.src_system = item[0]
                self.src_table_schema = item[1]
                self.src_table_name = item[2]
                self.tgt_operation = item[3]
                self.active_flag = item[4]
                self.tgt_system = item[5]
                self.tgt_table_schema = item[6]
                self.tgt_table_name = item[7]
            self.tgt_operation = eval(self.tgt_operation)
            result_data = {'src_system': self.src_system,
                           'src_table_schema': self.src_table_schema,
                           'src_table_name': self.src_table_name,
                           'tgt_operation': self.tgt_operation,
                           'active_flag': self.active_flag,
                           'tgt_system': self.tgt_system,
                           'tgt_table_schema': self.tgt_table_schema,
                           'tgt_table_name': self.tgt_table_name,
                           # convenience shortcuts into tgt_operation
                           'source_primary_key': self.tgt_operation['source_primary_key'],
                           'source_all_column': self.tgt_operation['source_all_column'],
                           'target_primary_key': self.tgt_operation['target_primary_key'],
                           'target_column': self.tgt_operation['target_column'],
                           'source_level': self.tgt_operation['source_level']}
            return result_data
        except Exception as data:
            writeErrorLog('_get_config', 'Error', str(data) + ':table is not available')
            logging.error('_get_config: ' + str(data))

    def _do(self):
        """Main consume loop: poll Kafka, dispatch each record, commit offsets."""
        try:
            c = Consumer({
                'bootstrap.servers': self.kafka_server,
                'group.id': self.consumer_group,
                'default.topic.config': {'auto.offset.reset': 'smallest', 'enable.auto.commit': False}
            })
            c.subscribe([self.kafka_topic])
            while True:
                msg = c.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        print(msg.error())
                        break
                text = msg.value().decode(encoding="utf-8")
                # kfk_text = eval(text)
                kfk_text = json.loads(text)
                # Inner try: a bad record is logged and skipped WITHOUT
                # committing, so it can be replayed.
                try:
                    kfk_table = kfk_text['table']
                    if kfk_table in ['order_mails']:
                        print(type(text), text)
                        logging.warning('-------------- start exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + '---------------------')
                        # NOTE(review): round-trip through str()/eval() to turn
                        # None into '' — eval on message-derived text is unsafe.
                        kfk_text = str(kfk_text)
                        kfk_text = kfk_text.replace(": None", ": ''")
                        kfk_text = eval(kfk_text)
                        kfk_datas = kfk_text['data']
                        kfk_type = kfk_text['type']
                        kfk_old = kfk_text['old']
                        logging.warning(' table_name: ' + str(kfk_table) + ' table_type : ' + kfk_type)
                        if kfk_type == 'UPDATE':
                            # NOTE(review): this `continue` was present in the
                            # original and makes the UPDATE branch below dead
                            # code — presumably a temporary skip; confirm intent.
                            continue
                            print('update')
                            for i, data in enumerate(kfk_datas):
                                kfk_text['data'] = eval("[" + str(data) + "]")
                                kfk_text['old'] = eval("[" + str(kfk_old[i]) + "]")
                                self._get_rh_from_kafka(kfk_text)
                        else:
                            print('insert')
                            for data in kfk_datas:
                                kfk_text['data'] = eval("[" + str(data) + "]")
                                print(type(kfk_text), kfk_text)
                                self._get_rh_from_kafka(kfk_text)
                        logging.warning('----------------end exec table time : ' + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + '---------------')
                        c.commit()
                except Exception as data:
                    writeErrorLog('_do', 'exce data Error', str(data))
                    logging.error('_do: ' + str(data))
                # cooperative shutdown requested by the SIGTERM handler
                if self.stop_flag == 1:
                    self._exit_consumer()
                    c.close()
        except Exception as data:
            print(data)
            writeErrorLog('_do', 'Error', str(data))
            logging.error('_do: ' + str(data))

    def _trans_path(self, tgt_path):
        """Quote each JSON-path segment: a.b.c -> a"."b"."c" (MySQL path form)."""
        new_tgt_path = tgt_path.replace('.', '\".\"').replace('$\".', '$.') + '\"'
        return new_tgt_path

    def _get_rh_from_kafka(self, kfk_text):
        """Dispatch one change record by (operation type x source level).

        Levels: 'root' = the order row itself, 'leaf' = a child row nested
        under a parent path, 'father' = a dimension row referenced from
        elsewhere in the JSON document.
        """
        try:
            self.kfk_tb_schema = kfk_text["database"]
            self.kfk_tb_name = kfk_text["table"]
            self.kfk_data = kfk_text['data'][0]
            self.kfk_type = kfk_text['type']
            # canal 'es' is epoch millis
            self.kfk_es = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(kfk_text['es'] / 1000)))
            tgt_table_names = self._get_tgt_table_name(self.kfk_tb_name, self.kfk_tb_schema)
            if len(tgt_table_names) != 0:
                for tgt_table_name_for_config in tgt_table_names:
                    tb_config = self._get_config(self.kfk_tb_name, tgt_table_name_for_config, self.kfk_tb_schema)
                    tgt_pk_key = tb_config['target_primary_key']
                    tgt_schema = tb_config['tgt_table_schema']
                    tgt_table_name = tb_config['tgt_table_name']
                    src_table_name = tb_config['src_table_name']
                    src_table_schema = tb_config['src_table_schema']
                    tgt_columns = tb_config['target_column']
                    src_level = tb_config['source_level']
                    src_pk_key = tb_config['source_primary_key']
                    src_pk_value = self.kfk_data[src_pk_key]
                    tgt_operation = tb_config['tgt_operation']
                    if self.kfk_type == 'INSERT':
                        if src_level == 'root':
                            tgt_pk_value = self.kfk_data[tgt_pk_key]
                            for item in tgt_columns:
                                # item is a JSON column name (order_info, ...)
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)  # -> {"order_info": [cols...]}
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_path = list(tgt_columns[item]['target_path'].values())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, tgt_pk_value)
                                    self._insert_data_from_kfk_for_root(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, item, table_insert_data, tgt_path)
                                    self._insert_father_data(src_table_schema, src_table_name, insert_data, tgt_path, tgt_pk_value, item, catalog_type, tgt_table_name_for_config)
                        elif src_level == 'leaf':
                            parent_pk_info = tgt_operation['parent_pk_key']
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, self.kfk_data)
                                    tgt_path_true = parent_tgt_path + "." + src_table_name
                                    self._insert_data_from_kfk_for_leaf(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new, item, table_insert_data_for_leaf, tgt_path_true, src_pk_value, insert_data)
                                    tgt_path_new = tgt_path_true + r'.\"' + src_pk_value + r'\"'
                                    # NOTE(review): original passes 7 args to the
                                    # 8-parameter _insert_father_data here (schema
                                    # missing) — would raise and be swallowed by
                                    # the except below; preserved for fidelity.
                                    self._insert_father_data(src_table_name, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config)
                        elif src_level == 'father':
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_paths = list(tgt_columns[item]['target_path'].values())
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    if 'product' in src_table_name.lower():
                                        catalog_type = 'PRODUCT'
                                    elif 'service' in src_table_name.lower():
                                        catalog_type = 'SERVICE'
                                    else:
                                        catalog_type = '0'
                                    for tgt_path in tgt_paths:
                                        tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema, tgt_table_name, item, catalog_type)
                                        if len(tgt_info_for_father) == 0:
                                            logging.warning('can not available the data of the root and leaf table ')
                                        else:
                                            for i in range(len(tgt_info_for_father)):
                                                tgt_pk_value_new = tgt_info_for_father[i][0]
                                                tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:]
                                                self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new, item, tgt_path_new, insert_data)
                                                self._insert_father_data(src_table_name, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config)
                    elif self.kfk_type == 'UPDATE':
                        if src_level == 'root':
                            tgt_pk_value = self.kfk_data[tgt_pk_key]
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    update_columns = kfk_text['old'][0]  # changed columns per canal
                                    tgt_path = list(tgt_columns[item]['target_path'].values())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, tgt_pk_value)
                                    self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, tgt_path, tgt_pk_value, item, catalog_type, tgt_table_name_for_config, src_table_schema)
                                    # record change history for configured columns
                                    if 'alter_column' in list(tgt_columns[item].keys()):
                                        record_history_column = tgt_columns[item]['alter_column']
                                        self._insert_history_data(update_columns, insert_data, tgt_path, record_history_column, self.kfk_es, item, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value)
                                    else:
                                        logging.warning(str(item) + " alter_column is not available")
                        elif src_level == 'leaf':
                            parent_pk_info = tgt_operation['parent_pk_key']
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    update_columns = kfk_text['old'][0]
                                    tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, self.kfk_data)
                                    tgt_path_true = parent_tgt_path + "." + src_table_name
                                    tgt_path_new = tgt_path_true + r'.\"' + src_pk_value + r'\"'
                                    self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config, src_table_schema)
                                    if 'alter_column' in list(tgt_columns[item].keys()):
                                        record_history_column = tgt_columns[item]['alter_column']
                                        self._insert_history_data(update_columns, insert_data, tgt_path_new, record_history_column, self.kfk_es, item, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value_new)
                                    else:
                                        logging.warning(str(item) + " alter_column is not available")
                        elif src_level == 'father':
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    update_columns = kfk_text['old'][0]
                                    tgt_paths = list(tgt_columns[item]['target_path'].values())
                                    (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type) = self._get_data_from_kfk(kfk_text, tgt_column, src_table_name, src_pk_value)
                                    if 'product' in src_table_name.lower():
                                        catalog_type = 'PRODUCT'
                                    elif 'service' in src_table_name.lower():
                                        catalog_type = 'SERVICE'
                                    else:
                                        catalog_type = '0'
                                    for tgt_path in tgt_paths:
                                        tgt_info_for_father = self._get_tgt_info_for_father(tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema, tgt_table_name, item, catalog_type)
                                        for i in range(len(tgt_info_for_father)):
                                            tgt_pk_value_new = tgt_info_for_father[i][0]
                                            tgt_path_new = ('.'.join(tgt_info_for_father[i][1].split('.')[:-1]))[1:]
                                            self._update_data(tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, tgt_path_new, tgt_pk_value_new, item, catalog_type, tgt_table_name_for_config, src_table_schema)
                    elif self.kfk_type == 'DELETE':
                        if src_level == 'root':
                            tgt_pk_value = self.kfk_data[tgt_pk_key]
                            self._delete_data_for_root(tgt_pk_key, tgt_pk_value, tgt_schema, tgt_table_name)
                        elif src_level == 'leaf':
                            parent_pk_info = tgt_operation['parent_pk_key']
                            for item in tgt_columns:
                                tgt_column = "{'" + item + "'" + ":" + str(tgt_columns[item]['source_column']) + "}"
                                tgt_column = eval(tgt_column)
                                if str(tgt_columns[item]['target_path']) == '{}':
                                    logging.warning(str(item) + " is null,please check")
                                else:
                                    tgt_path = list(tgt_columns[item]['target_path'].keys())[0]
                                    (parent_tgt_path, tgt_pk_value_new) = self._get_tgt_info_for_leaf(item, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, self.kfk_data)
                                    tgt_path_true = parent_tgt_path + "." + src_table_name
                                    tgt_path_new = tgt_path_true + r'.\"' + src_pk_value + r'\"'
                                    self._delete_data_for_leaf(tgt_schema, tgt_table_name, item, tgt_path_new, tgt_pk_key, tgt_pk_value_new, tgt_path_true)
        except Exception as data:
            writeErrorLog('_get_rh_from_kafka', 'Error', str(data))
            logging.error('_get_rh_from_kafka: ' + str(data))

    def _get_tgt_info_for_father(self, tgt_path, src_pk_key, src_pk_value, tgt_pk_key, tgt_schema, tgt_table_name, tgt_column, catalog_type):
        """Find target rows whose JSON references this father row (pk + path)."""
        try:
            tgt_path_true = tgt_path + "." + src_pk_key
            if catalog_type == '0':
                select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",\'all\',\'" + src_pk_value + "\',null,\'" + tgt_path_true + "\') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",\'" + tgt_path_true + "\'),\'$[0]\')=\'" + src_pk_value + "\';"
            else:
                select_sql_for_father = "select " + tgt_pk_key + ",json_search(" + tgt_column + ",\'all\',\'" + src_pk_value + "\',null,\'" + tgt_path_true + "\') from " + tgt_schema + "." + tgt_table_name + " where json_extract(json_extract(" + tgt_column + ",\'" + tgt_path_true + "\'),\'$[0]\')=\'" + src_pk_value + "\' and json_extract(" + tgt_column + ",\'$." + tgt_table_name + ".type=\'" + catalog_type + "\';"
            self.mysql_cur.execute(select_sql_for_father)
            tgt_info_for_father = self.mysql_cur.fetchall()
            return tgt_info_for_father
        except Exception as data:
            writeErrorLog('_get_tgt_info_for_father', 'Error', str(data))
            logging.error('_get_tgt_info_for_father: ' + str(data))

    def _delete_data_for_root(self, tgt_pk_key, tgt_pk_value, tgt_schema, tgt_table_name):
        """Delete the whole target row for a root-level DELETE."""
        try:
            delete_sql = "delete from " + tgt_schema + "." + tgt_table_name + " where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';"
            self.mysql_cur.execute(delete_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_delete_data_for_root', 'Error', str(data))
            logging.error('_delete_data_for_root: ' + str(data))

    def _delete_data_for_leaf(self, tgt_schema, tgt_table_name, tgt_column, tgt_path, tgt_pk_key, tgt_pk_value, tgt_path_true):
        """Remove one leaf entry from the JSON; drop its parent key if now empty."""
        try:
            delete_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",\'" + tgt_path + "\') where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';"
            self.mysql_cur.execute(delete_sql)
            self.mysql_db.commit()
            select_sql = "select json_extract(" + tgt_column + ",\'" + tgt_path_true + "\') from " + tgt_schema + "." + tgt_table_name + " where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';"
            self.mysql_cur.execute(select_sql)
            tgt_column_value = self.mysql_cur.fetchall()[0][0]
            if tgt_column_value == r'{}':
                table_delete_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_remove(" + tgt_column + ",\'" + tgt_path_true + "\') where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';"
                self.mysql_cur.execute(table_delete_sql)
                self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_delete_data_for_leaf', 'Error', str(data))
            logging.error('_delete_data_for_leaf: ' + str(data))

    def _insert_history_data(self, update_columns, insert_data, tgt_path, record_history_column, data_time, tgt_column, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value):
        """Append old/new/time triples under <path>.alter_data for tracked columns."""
        try:
            update_columns_key = list(update_columns.keys())
            for item in record_history_column:
                if item in update_columns_key:
                    tgt_path_for_column = tgt_path + '.alter_data.' + item
                    tgt_path_for_alter = tgt_path + '.alter_data'
                    select_sql_for_alter_column_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_column + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';'
                    select_sql_for_alter_path = 'select json_extract(' + tgt_column + ',\'' + tgt_path_for_alter + '\')' + ' from ' + tgt_schema + '.' + tgt_table_name + ' where ' + tgt_pk_key + '=\'' + str(tgt_pk_value) + '\';'
                    self.mysql_cur.execute(select_sql_for_alter_column_path)
                    tgt_path_vlaue_for_column = self.mysql_cur.fetchall()
                    self.mysql_cur.execute(select_sql_for_alter_path)
                    tgt_path_vlaue_for_alter = self.mysql_cur.fetchall()
                    old_data = update_columns[item]
                    new_data = eval(insert_data)[item]
                    # three cases: no alter_data yet / alter_data exists but not
                    # this column / column history exists (append to array)
                    if tgt_path_vlaue_for_alter[0][0] == None:
                        history_data = '{\"' + item + '\":[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]}'
                        insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_for_alter + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
                    else:
                        if tgt_path_vlaue_for_column[0][0] == None:
                            history_data = '[{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}]'
                            insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
                        else:
                            history_data = '{\"old_data\":\"' + str(old_data) + '\",\"new_data\":\"' + str(new_data) + '\",\"time\":\"' + data_time + '\"}'
                            insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_array_append(" + tgt_column + ",\'" + tgt_path_for_column + "\',cast(\'" + history_data + "\' as json)) where " + tgt_pk_key + "= '" + str(tgt_pk_value) + "';"
                    self.mysql_cur.execute(insert_sql)
                    self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_history_data', 'Error', str(data))
            logging.error('_insert_history_data: ' + str(data))

    def _get_data_from_kfk(self, text, tgt_column, src_table_name, src_table_pk_value):
        """Build the three JSON string shapes for one record from Kafka data.

        Returns (table_insert_data, table_insert_data_for_leaf, insert_data,
        catalog_type), e.g. {"order":{"order_no":"100"}}, {"100":{...}},
        {"order_no":"100"}, and the row's 'type' value (or '0').
        """
        try:
            tgt_column_json = tgt_column
            tgt_column_key = ''
            for key in tgt_column_json:
                json_column_key = '{'
                for item in tgt_column_json[key]:
                    json_column_key += '"' + item + '":"' + text['data'][0][item].replace('"', r'\\"') + '",'
                tgt_column_item = json_column_key[:-1]
                tgt_column_key += tgt_column_item + '},'
            if 'type' in text['data'][0]:
                catalog_type = text['data'][0]['type']
            else:
                catalog_type = '0'
            table_insert_data = '{\"' + src_table_name + '\":' + tgt_column_key[:-1] + '}'
            insert_data = tgt_column_key[:-1]
            table_insert_data_for_leaf = '{\"' + src_table_pk_value + '\":' + insert_data + '}'
            print(insert_data)
            return (table_insert_data, table_insert_data_for_leaf, insert_data, catalog_type)
        except Exception as data:
            writeErrorLog('_get_data_from_kfk', 'Error', str(data))
            logging.error('_get_data_from_kfk: ' + str(data))

    def _insert_data_from_kfk_for_root(self, tgt_schema, tgt_table_name, tgt_table_pk, tgt_table_value, tgt_column, table_insert_data, tgt_path):
        """Insert-or-merge root data: create row / set column / json_insert."""
        try:
            select_tb_count = 'select count(*) from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';'
            select_tb_column_count = 'select case when coalesce(' + tgt_column + ', \'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + tgt_table_value + '\';'
            self.mysql_cur.execute(select_tb_count)
            tb_count = self.mysql_cur.fetchall()
            self.mysql_cur.execute(select_tb_column_count)
            tb_column_count = self.mysql_cur.fetchall()
            if tb_count[0][0] == 0:
                # row missing: create the pk row first, then set the column
                insert_pk_sql = "insert into " + tgt_schema + "." + tgt_table_name + "(" + tgt_table_pk + ") values ('" + tgt_table_value + "')"
                self.mysql_cur.execute(insert_pk_sql)
                self.mysql_db.commit()
                update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';"
            else:
                if tb_column_count[0][0] == 1:
                    # column empty: overwrite wholesale
                    update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "= cast('" + table_insert_data + "' as json) where " + tgt_table_pk + "= '" + tgt_table_value + "';"
                else:
                    # column populated: merge under tgt_path
                    update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path + "\',cast(\'" + table_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + tgt_table_value + "\';"
            self.mysql_cur.execute(update_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_data_from_kfk_for_root', 'Error', str(data))
            logging.error('_insert_data_from_kfk_for_root: ' + str(data))

    def _get_tgt_pk_value_for_leaf(self, tgt_table_pk, tgt_schema, tgt_table_name, tgt_column, tgt_path, parent_pk_value):
        """Look up a target pk by matching the parent value at a JSON path."""
        try:
            select_tgt_pk_sql = "select " + tgt_table_pk + " from " + tgt_schema + "." + tgt_table_name + " where json_extract(" + tgt_column + ",\'" + tgt_path + "\')=\'" + parent_pk_value + "\';"
            self.mysql_cur.execute(select_tgt_pk_sql)
            tgt_pk_value = self.mysql_cur.fetchall()[0][0]
            return tgt_pk_value
        except Exception as data:
            writeErrorLog('_get_tgt_pk_value_for_leaf', 'Error', str(data))
            logging.error('_get_tgt_pk_value_for_leaf: ' + str(data))

    def _get_tgt_info_for_leaf(self, tgt_column, tgt_path, tgt_schema, tgt_table_name, tgt_pk_key, parent_pk_info, kafka_data):
        """Locate the parent row/path that a leaf record must be nested under."""
        try:
            if_tgt_path = '.'.join(tgt_path.split('.')[:-1])
            i = 0
            json_search_sql = ''
            where_sql = ''
            # top-level parents search '<path>.<key>'; nested ones '<path>.*.<key>'
            if if_tgt_path == '$':
                for parent_pk_key in list(parent_pk_info.keys()):
                    parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]]
                    json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + "." + parent_pk_key + "') as tgt_path" + str(i)
                    where_sql += " tgt_path" + str(i) + " is not null and"
                    i = i + 1
            else:
                for parent_pk_key in list(parent_pk_info.keys()):
                    parent_pk_value = kafka_data[parent_pk_info[parent_pk_key]]
                    json_search_sql += ",json_search(" + tgt_column + ", 'one','" + str(parent_pk_value) + "', null, '" + tgt_path + ".*." + parent_pk_key + "') as tgt_path" + str(i)
                    where_sql += " tgt_path" + str(i) + " is not null and"
                    i = i + 1
            select_sql = "select " + tgt_pk_key + ",tgt_path0 from (select " + tgt_pk_key + json_search_sql + " from " + tgt_schema + "." + tgt_table_name + ") t where " + where_sql[:-4] + ";"
            self.mysql_cur.execute(select_sql)
            rows = self.mysql_cur.fetchall()[0]
            tgt_path_new = ('.'.join(rows[1].split('.')[:-1]))[1:]
            tgt_pk_value_new = rows[0]
            return (tgt_path_new, tgt_pk_value_new)
        except Exception as data:
            writeErrorLog('_get_tgt_info_for_leaf', 'Error', str(data))
            logging.error('_get_tgt_info_for_leaf: ' + str(data))

    def _insert_data_from_kfk_for_leaf(self, tgt_schema, tgt_table_name, tgt_table_pk, tgt_table_value, tgt_column, table_insert_data_for_leaf, tgt_path, src_pk_value, insert_data):
        """Nest leaf data under its parent path (create container or add entry)."""
        try:
            select_tb_column_key = 'select case when coalesce(json_extract(' + tgt_column + ',\'' + tgt_path + '\') , \'\') = \'\' then 1 else 0 end from ' + tgt_schema + "." + tgt_table_name + ' where ' + tgt_table_pk + '=\'' + str(tgt_table_value) + '\';'
            self.mysql_cur.execute(select_tb_column_key)
            column_key_data = self.mysql_cur.fetchall()
            if column_key_data[0][0] == 1:
                # container missing: insert {"<pk>": {...}} wholesale
                tgt_path_new = tgt_path
                tgt_insert_data = table_insert_data_for_leaf
            else:
                # container exists: insert only this pk's object
                tgt_path_new = tgt_path + r'.\"' + str(src_pk_value) + r'\"'
                tgt_insert_data = insert_data
            update_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_insert(" + tgt_column + ",\'" + tgt_path_new + "\',cast(\'" + tgt_insert_data + "\' as json)) where " + tgt_table_pk + "=\'" + str(tgt_table_value) + "\';"
            self.mysql_cur.execute(update_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_data_from_kfk_for_leaf', 'Error', str(data))
            logging.error('_insert_data_from_kfk_for_leaf: ' + str(data))

    def _insert_father_data(self, src_table_schema, scr_table_name, insert_data, src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config):
        """Recursively resolve foreign keys and pull dimension rows into the JSON.

        Recursion terminates when source_foreign_info is the empty dict
        (json.dumps == '{}', length 2).
        """
        try:
            src_config_data = self._get_config(scr_table_name, tgt_table_name_for_config, src_table_schema)
            src_foreign_info = src_config_data['target_column'][tgt_column]['source_foreign_info']
            if len(json.dumps(src_foreign_info)) == 2:
                logging.warning(scr_table_name + " :Recursive over")
            else:
                for src_pk_key in src_foreign_info:
                    # e.g. {"org_id": {"customer.organization": "org_id"}}
                    foreign_table_name_tmp = list(src_foreign_info[src_pk_key].keys())[0]
                    foreign_table_schema = foreign_table_name_tmp.split('.')[0]
                    foreign_table_name_tmp = foreign_table_name_tmp.split('.')[1]
                    if '#' in foreign_table_name_tmp:
                        # '#' placeholder is substituted by catalog type
                        foreign_table_name = foreign_table_name_tmp.replace('#', catalog_type).lower()
                    else:
                        foreign_table_name = foreign_table_name_tmp
                    foreign_table_pk_key = list(src_foreign_info[src_pk_key].values())[0]
                    foreign_datas = self._get_config(foreign_table_name, tgt_table_name_for_config, foreign_table_schema)
                    foreign_column = foreign_datas['target_column'][tgt_column]
                    foreign_schema = foreign_datas['src_table_schema']
                    foreign_table_pk_value = eval(str(insert_data))[src_pk_key]
                    tgt_schema = foreign_datas['tgt_table_schema']
                    tgt_table_name = foreign_datas['tgt_table_name']
                    tgt_pk_key = foreign_datas['target_primary_key']
                    tgt_pk_value = root_pk_value
                    for foreign_path in foreign_column['target_path']:
                        src_tgt_path = list(foreign_path.keys())[0]
                        foreign_tgt_path = list(foreign_path.values())[0]
                        # generalize concrete pk segments ("...") to '*' before comparing
                        if re.sub('.\"\S*?\"', r'*', src_path) == src_tgt_path and re.sub('.\"\S*?\"', r'*', src_path) + '.' + src_pk_key == foreign_tgt_path:
                            next_src_path = src_path + '.' + src_pk_key
                            next_insert_data = self._get_data_from_db(foreign_column, foreign_table_name, foreign_schema, foreign_table_pk_key, foreign_table_pk_value, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, src_path, tgt_column, next_src_path)
                            self._insert_father_data(foreign_table_schema, foreign_table_name, next_insert_data, next_src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config)
                        else:
                            logging.warning(foreign_table_name + ' :have no next level')
        except Exception as data:
            writeErrorLog('_insert_father_data', 'Error', str(data))
            logging.error('_insert_father_data: ' + str(data))

    def _get_data_from_db(self, src_tgt_column, src_table_name, src_table_schema, src_table_pk_key, src_table_pk_value, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, src_path, tgt_column, tgt_path):
        """Fetch dimension columns from the MES DB, write them into the target JSON.

        Returns the JSON fragment so the caller can keep recursing.
        """
        try:
            result_data = '{'
            src_column = src_tgt_column['source_column']
            if len(src_column) == 0:
                # fixed: original called the module itself -- `logging(...)` -- a TypeError
                logging.warning(str(src_column) + ' length equal 0 error ')
            else:
                # one query per column, each built as concat('"col":"<val>"')
                for item in src_column:
                    select_sql1 = 'concat(\''
                    select_sql1 += u'"' + item + '":"\',coalesce(' + item + ',\'\'),\'",'
                    select_sql1 = select_sql1[:-1] + '\')'
                    select_sql = "select " + select_sql1 + " from " + src_table_schema + "." + src_table_name + " where " + src_table_pk_key + "=\'" + src_table_pk_value + "\';"
                    self.mes_mysql_cur.execute(select_sql)
                    data = self.mes_mysql_cur.fetchall()
                    if len(data) == 0:
                        result_data += ''
                    else:
                        result_data += data[0][0] + ','
            if result_data != '{':
                tgt_value = result_data[:-1] + '}'
            else:
                # nothing found: fall back to just the pk pair
                tgt_value = result_data + '\"' + src_table_pk_key + '\":\"' + src_table_pk_value + '\"}'
            self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, tgt_column, tgt_path, tgt_value)
            return tgt_value
        except Exception as data:
            writeErrorLog('_get_data_from_db', 'Error', str(data))
            logging.error('_get_data_from_db: ' + str(data))

    def _insert_data_from_db(self, tgt_schema, tgt_table_name, tgt_pk_key, tgt_pk_value, tgt_column, tgt_path, tgt_value):
        """json_replace one path of the target JSON column with tgt_value."""
        try:
            insert_sql = "update " + tgt_schema + "." + tgt_table_name + " set " + tgt_column + "=json_replace(" + tgt_column + ",\'" + tgt_path + "\',cast(\'" + tgt_value + "\' as json)) where " + tgt_pk_key + "=\'" + str(tgt_pk_value) + "\';"
            # self.mysql_cur.execute(insert_sql.encode("utf-8").decode("latin1"))
            self.mysql_cur.execute(insert_sql)
            self.mysql_db.commit()
        except Exception as data:
            writeErrorLog('_insert_data_from_db', 'Error', str(data))
            logging.error('_insert_data_from_db: ' + str(data))

    def _update_data(self, tgt_schema, tgt_table_name, tgt_pk_key, src_table_name, update_columns, insert_data, src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config, src_table_schema):
        """Apply an UPDATE: replace each changed column's JSON path, then re-sync fathers."""
        try:
            insert_data = json.loads(insert_data)
            for update_column in update_columns:
                if update_column in list(insert_data.keys()):
                    update_column_data = '\"' + insert_data[update_column] + '\"'
                    tgt_path = src_path + '.' + update_column
                    self._insert_data_from_db(tgt_schema, tgt_table_name, tgt_pk_key, root_pk_value, tgt_column, tgt_path, update_column_data)
            # NOTE(review): in the mangled original the indent of this call is
            # ambiguous; placed after the loop (one father re-sync per update).
            self._insert_father_data(src_table_schema, src_table_name, insert_data, src_path, root_pk_value, tgt_column, catalog_type, tgt_table_name_for_config)
        except Exception as data:
            writeErrorLog('_update_data', 'Error', str(data))
            logging.error('_update_data: ' + str(data))

    def _exit_consumer(self):
        """Release DB resources and terminate the process."""
        self._release_db()
        sys.exit()


# Module-level handle so the SIGTERM handler can reach the running consumer.
# (In the original, `p` was local to main(), so exit_program raised NameError.)
p = None


def exit_program(signum, frame):
    """SIGTERM handler: ask the consume loop to stop after the current record."""
    logging.info("Received Signal: %s at frame: %s" % (signum, frame))
    if p is not None:
        p.stop_flag = 1


def main():
    """Build the consumer, install the signal handler, and run forever."""
    global p
    p = RH_Consumer()
    signal.signal(signal.SIGTERM, exit_program)
    while True:
        p._do()


# Guarded entry point (original called main() unconditionally on import).
if __name__ == "__main__":
    main()

私たちのpython学習基地をお勧めします.先輩たちがどのように勉強しているかを見てみましょう.基礎のpythonスクリプト、爬虫類、django、データマイニングなどのプログラミング技術、そしてゼロ基礎からプロジェクトの実戦までの資料を整理して、pythonを学ぶのが好きな仲間にあげます!毎日先輩が定刻にPythonの技術を説明して、いくつかの学習方法と注意しなければならない細部を分かち合って、クリックして私たちのpython学習者の集まりに参加します
以上が、python 3でkafkaからデータを取得しjson形式に解析してmysqlに書き込む実装について、編集者が皆さんに共有する内容のすべてです。