PythonでKafkaのログデータを監視し、DingTalk(釘釘)へレポートを送信する
13544 ワード
# !/usr/bin/env python
# coding:utf-8
#
from datetime import datetime, timedelta
import os
from dingtalkchatbot.chatbot import DingtalkChatbot
from kafka import KafkaConsumer
import re
# Error counters accumulated between two reports. The outer key is the status
# class (500 for 5XX responses, 400 for 4XX responses); each inner dict maps a
# summary string to the number of times it has been seen.
messages = {500: {}, 400: {}}
# import logging as log
# log.basicConfig(level=log.DEBUG)
# Sort the collected 5XX and 4XX counters so the most frequent entries come first.
def sort_dict():
    """Return the collected error counters as nested, sorted lists.

    Reads the module-level ``messages`` mapping (status class ->
    {summary: count}) and leaves it untouched.

    Returns:
        A list of ``(status_class, entries)`` tuples ordered by status class,
        highest first (so 500 comes before 400). ``entries`` is a list of
        ``(summary, count)`` tuples ordered by count, highest first.
    """
    # Build fresh lists rather than writing the sorted result back into
    # ``messages``: the consumer loop keeps using the inner values as dicts,
    # so replacing them with lists would break counting (and the next sort)
    # whenever a send fails and the counters are not reset.
    ranked = [
        (status_class,
         sorted(counts.items(), key=lambda item: item[1], reverse=True))
        for status_class, counts in messages.items()
    ]
    ranked.sort(key=lambda pair: pair[0], reverse=True)
    return ranked
def send_mail():
    """Format the collected error counters and post them with ``curl``.

    The report contains one ``*******5XX:`` or ``*******4XX:`` header per
    status class, each followed by one ``<summary> <count>`` line per entry,
    in the order returned by ``sort_dict()``. Every line ends with a newline.

    Returns:
        0 once the report has been built and ``curl`` has been run (its exit
        status is printed but not checked), or -1 if any exception was raised
        along the way (the error is printed).
    """
    # Imported here so the function does not rely on a new file-level import.
    import subprocess

    try:
        lines = []
        for status_class, entries in sort_dict():
            label = '5XX' if status_class == 500 else '4XX'
            lines.append('*******' + label + ':')
            for summary, count in entries:
                lines.append(summary + ' ' + str(count))
        message = ''.join(line + '\n' for line in lines)

        # The summaries are derived from request logs, so the message must not
        # be interpolated into a shell command line. An argument list hands it
        # to curl verbatim, with no shell parsing. 'url' is the placeholder
        # endpoint from the original command.
        exit_code = subprocess.call(
            ['curl', 'url', '-d', 'business=gateway&content=%s' % message])
        print(exit_code)
        # Alternative transport kept for reference:
        #   DingtalkChatbot(posturl).send_text(msg=message)
        print('successfully sent!')
        return 0
    except Exception as e:
        # Report the failure and signal it to the caller, which then keeps
        # the counters for the next attempt.
        now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print('nnnnnnnn %s %s' % (now, e))
        return -1
def tactics(time1, send_time, delta):
    """Send the report once more than ``delta`` has elapsed since ``send_time``.

    Args:
        time1: Time at which the current record was received.
        send_time: Reference time of the previous successful send.
        delta: Minimum interval that must be exceeded before sending again.

    Returns:
        ``time1`` if a report was sent successfully; in that case the global
        ``messages`` counters are reset. Otherwise ``send_time`` unchanged,
        so a failed send is retried on the next call.
    """
    global messages
    # Not due yet: nothing to send, keep the reference time.
    if time1 - send_time <= delta:
        return send_time

    print('time1 - send_time > delta')
    result = send_mail()
    print(result)
    if result != 0:
        # Sending failed: keep both the counters and the old reference time.
        return send_time

    # Sent successfully: start a new reporting window with empty counters.
    messages = {500: {}, 400: {}}
    print('clean messages')
    return time1
# messages
#
def _first_field(name, msg, default='--'):
    """Return the first value of the quoted field ``name`` in ``msg``, or ``default``."""
    found = re.findall(r'\"%s\": \"(.+?)\",' % name, msg)
    return found[0] if found else default


def filter(msg):
    """Build the one-line summary used as the counter key for a log record.

    The summary is ``<upstream_status> <app id> <server_name><request path>``.
    When the record carries no ``http_x_app_id`` field the app id is rendered
    as `` -- `` (with its surrounding spaces), as before.

    Note: this function shadows the builtin ``filter``; the name is kept
    because other code in this file calls it.

    Args:
        msg: Log record text containing ``"key": "value",`` pairs.

    Returns:
        A single-element list holding the summary string. A missing
        ``upstream_status`` or ``server_name`` is rendered as ``--``. A missing
        ``request`` contributes an empty path, and a request without a
        space-separated second token is used as-is instead of raising.
    """
    status = _first_field('upstream_status', msg)
    app_id = _first_field('http_x_app_id', msg, default=None)
    server_name = _first_field('server_name', msg)
    request = _first_field('request', msg, default='')

    # A request line looks like "<method> <path> <protocol>"; keep the path.
    tokens = request.split(' ')
    path = tokens[1] if len(tokens) > 1 else request

    if app_id is not None:
        summary = status + ' ' + app_id + ' ' + server_name + path
    else:
        summary = status + ' ' + ' -- ' + ' ' + server_name + path
    return [summary]
def kafka_cli(bootstrap_servers, source_topic):
    """Consume log records from Kafka and count 4XX/5XX upstream responses.

    Each record's ``upstream_status`` is extracted. Statuses of 400 and above
    are summarised with ``filter()`` and counted in the global ``messages``
    mapping (5XX under key 500, everything else under key 400). After each
    counted record ``tactics()`` decides whether the hourly report is due.

    The function never returns: when consuming fails, the error is printed,
    the consumer is closed and a new one is created.

    Args:
        bootstrap_servers: Kafka bootstrap server address(es), passed to
            ``KafkaConsumer``.
        source_topic: Name of the topic to consume.
    """
    send_time = datetime.now()
    print('send_time %s' % send_time)
    # Minimum interval between two reports.
    delta = timedelta(hours=1)
    status_pattern = re.compile(r'\"upstream_status\": \"(.+?)\",')

    while True:
        # Reset per attempt so the cleanup below never touches an unbound or
        # stale consumer when construction itself fails.
        consumer = None
        try:
            consumer = KafkaConsumer(source_topic, bootstrap_servers=bootstrap_servers)
            for msg in consumer:
                time1 = datetime.now()
                text = msg.value
                if text is None:
                    continue
                # On Python 3 the payload is bytes unless a deserializer is
                # configured; on Python 2 ``bytes`` is ``str`` and is left as is.
                if isinstance(text, bytes) and not isinstance(text, str):
                    text = text.decode('utf-8', 'replace')
                # Drop backslashes so escaped quotes match the field patterns.
                text = text.replace('\\', '')

                status = status_pattern.findall(text)
                if not status:
                    # No upstream_status in this record: skip it instead of
                    # tearing down the consumer.
                    continue
                if status[0] == '-':
                    print(status)
                    continue
                try:
                    code = int(status[0])
                except ValueError:
                    # Not a single integer status; report and skip.
                    print(status)
                    continue
                if code < 400:
                    print('ok %s' % code)
                    continue

                summary = filter(text)[0]
                print(summary)
                counters = messages[500] if code > 499 else messages[400]
                counters[summary] = counters.get(summary, 0) + 1
                # tactics() returns the new reference time after a successful
                # send, or the unchanged one otherwise.
                send_time = tactics(time1, send_time, delta)
        except Exception as e:
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print('%s %s' % (now, e))
        finally:
            # Close on every exit path (error, end of iteration, interrupt)
            # so reconnecting does not leak the previous consumer.
            if consumer is not None:
                try:
                    consumer.close()
                except Exception as e:
                    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    print('%s %s' % (now, e))
def main():
    """Run the monitor against broker 'kafkaip:9092' and topic 'kibana'."""
    kafka_cli('kafkaip:9092', 'kibana')


if __name__ == '__main__':
    main()