PythonでKafkaのログデータ監視レポート(DingTalk通知)を実現する

13544 ワード

# !/usr/bin/env python
# coding:utf-8
#        

from datetime import datetime, timedelta
import os
import re
import subprocess
import time

from dingtalkchatbot.chatbot import DingtalkChatbot
from kafka import KafkaConsumer

# Error counters shared by the consumer loop and the reporting code.
# Key 500 collects responses with status >= 500, key 400 those in 400-499;
# each inner dict maps a one-line request summary to how often it was seen.
messages = {500: dict(), 400: dict()}


# import logging as log
# log.basicConfig(level=log.DEBUG)

def sort_dict(counters=None):
    """Return the error counters ranked for the report, without modifying them.

    counters: mapping of status class (500 for 5XX, 400 for 4XX) to a dict of
        summary line -> occurrence count. Defaults to the module-level
        ``messages`` dict.

    Returns a list of ``(status_class, [(summary, count), ...])`` tuples:
    status classes in descending order (5XX before 4XX), and within a class
    entries by descending count, ties broken alphabetically by summary so the
    report is deterministic.
    """
    if counters is None:
        counters = messages
    ranked = []
    for status_class in sorted(counters, reverse=True):
        # Build new lists instead of writing back into `counters`: replacing
        # the inner dicts with lists would break the `summary -> count`
        # updates made by the consumer loop if the counters are kept.
        entries = sorted(counters[status_class].items(),
                         key=lambda item: (-item[1], item[0]))
        ranked.append((status_class, entries))
    return ranked


# Endpoint that receives the report.
# NOTE(review): 'url' is the placeholder from the original script; set the
# real notification endpoint here.
_REPORT_URL = 'url'
# Upper bound in seconds (curl --max-time) on one delivery attempt, so a hung
# request cannot stall the consumer loop indefinitely.
_CURL_MAX_TIME = '30'
# Pause in seconds before reconnecting to Kafka after a consumer error.
_RETRY_SECONDS = 5


def _now():
    """Return the current local time formatted for log lines."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def _format_report(ranked):
    """Render the output of sort_dict() as the plain-text report body.

    Each status class gets a '*******5XX:' or '*******4XX:' header line,
    followed by one '<summary> <count>' line per entry; every line ends with
    a newline.
    """
    lines = []
    for code, entries in ranked:
        lines.append('*******%s:' % ('5XX' if code == 500 else '4XX'))
        for summary, count in entries:
            lines.append('%s %s' % (summary, count))
    return ''.join(line + '\n' for line in lines)


def send_mail():
    """Post the ranked error report; return 0 on success, -1 on failure.

    The report is form-posted by curl to _REPORT_URL with the fields
    business=gateway and content=<report>. Success means curl exited with
    status 0 (without --fail, curl does not treat HTTP error statuses as
    failures). On return, ``messages`` holds the same counts it had on entry,
    so the caller can retry after a failure.
    """
    global messages
    # sort_dict() is handed the shared dict; keep an untouched copy and put it
    # back afterwards so the counts survive intact for a retry, whatever
    # happens to the shared dict while the report is built.
    pending = dict((code, dict(counts)) for code, counts in messages.items())
    try:
        report = _format_report(sort_dict())
        # No shell and a URL-encoded form body: request paths or app ids
        # containing quotes, `$(...)` or '&' can neither run commands nor
        # split the form fields.
        # Alternative delivery (unused): DingtalkChatbot(webhook_url).send_text(msg=report)
        rc = subprocess.call(['curl', '--max-time', _CURL_MAX_TIME, _REPORT_URL,
                              '--data-urlencode', 'business=gateway',
                              '--data-urlencode', 'content=' + report])
    except Exception as e:
        print('%s send_mail failed: %s' % (_now(), e))
        return -1
    finally:
        messages = pending
    print(rc)
    if rc != 0:
        return -1
    print('successfully sent!')
    return 0


def tactics(time1, send_time, delta):
    """Send the report once more than `delta` has passed since `send_time`.

    Returns the send time to carry forward: `time1` after a successful send
    (the counters are then reset), otherwise `send_time` unchanged, so the
    next counted error retries the send.
    """
    global messages
    if time1 - send_time <= delta:
        return send_time
    print('time1 - send_time > delta')
    result = send_mail()
    print(result)
    if result != 0:
        return send_time
    messages = {500: {}, 400: {}}
    print('clean messages')
    return time1


def _field(text, name):
    """Return the value of the first `"name": "value",` pair in text, or None.

    An empty value yields ''; the lazy `.*?` stops at the first `",` instead
    of running past an empty value into the following fields.
    """
    match = re.search(r'"%s": "(.*?)",' % re.escape(name), text)
    return match.group(1) if match else None


# NOTE: this name shadows the builtin filter() inside this module; it is kept
# for compatibility with existing callers.
def filter(msg):
    """Build the one-line summary used as the report key for an error record.

    Format: '<status> <app id> <server_name><path>', where <path> is the second
    space-separated token of the request line (the whole line if it has no
    space). Records without an http_x_app_id value use ' -- ' in its place
    (giving '<status>  --  ...'); other missing fields are shown as '-'.

    Returns a one-element list (callers index [0]).
    """
    status = _field(msg, 'upstream_status') or '-'
    app_id = _field(msg, 'http_x_app_id') or ' -- '
    server_name = _field(msg, 'server_name') or '-'
    request = _field(msg, 'request') or '-'
    parts = request.split(' ')
    path = parts[1] if len(parts) > 1 else parts[0]
    return [status + ' ' + app_id + ' ' + server_name + path]


def _handle_record(raw, send_time, delta):
    """Count one Kafka record value if it logs a 4XX/5XX upstream status.

    Returns the send time to carry forward (see tactics()). Records that
    cannot be parsed are logged and skipped rather than raising, so one bad
    record does not tear down the consumer connection.
    """
    if raw is None:
        return send_time
    # kafka-python yields raw bytes on Python 3; decode so the str regexes apply.
    text = raw if isinstance(raw, str) else raw.decode('utf-8', 'replace')
    time1 = datetime.now()
    # Presumably the log line is embedded as an escaped string; dropping the
    # backslashes lets the field regexes see plain quotes.
    text = text.replace('\\', '')
    status = _field(text, 'upstream_status')
    if status is None:
        print('no upstream_status field, skipped')
        return send_time
    if status == '-':
        print(status)
        return send_time
    try:
        code = int(status)
    except ValueError:
        # Not a single integer code (e.g. a list of statuses); skip it.
        print('unparseable upstream_status: %s' % status)
        return send_time
    if code < 400:
        print('ok %s' % code)
        return send_time
    summary = filter(text)[0]
    print(summary)
    counters = messages[500] if code > 499 else messages[400]
    counters[summary] = counters.get(summary, 0) + 1
    return tactics(time1, send_time, delta)


def kafka_cli(bootstrap_servers, source_topic, delta=timedelta(hours=1)):
    """Consume `source_topic` forever, counting 4XX/5XX responses.

    A report is sent via tactics() once more than `delta` (default one hour)
    has passed since the last successful send. On any consumer error the
    error is logged, the consumer closed, and a new one created after
    _RETRY_SECONDS.
    """
    send_time = datetime.now()
    while True:
        consumer = None
        try:
            consumer = KafkaConsumer(source_topic,
                                     bootstrap_servers=bootstrap_servers)
            for msg in consumer:
                send_time = _handle_record(msg.value, send_time, delta)
        except Exception as e:
            print('%s %s' % (_now(), e))
        finally:
            # `consumer` stays None when the constructor itself failed.
            if consumer is not None:
                consumer.close()
        time.sleep(_RETRY_SECONDS)


def main():
    """Entry point: monitor the 'kibana' topic on the configured broker."""
    # NOTE(review): 'kafkaip:9092' is a placeholder broker address.
    bootstrap_servers = 'kafkaip:9092'
    source_topic = 'kibana'
    kafka_cli(bootstrap_servers, source_topic)


if __name__ == '__main__':
    main()