ELK---PythonはMysqlデータをESにインポートし、Geoipデータ型書き込みを実現
使用環境:Python 3.6
Pyパッケージの準備: elasticsearch、geoip2、pymysql
次のコード
1.必要なモジュールのインポート
2.Mysqlデータを取得する関数をカプセル化し、指定したパラメータで入力し、結果セットを返す
3.IPアドレスに対応する地理情報を取得する関数をカプセル化する
4.一括挿入ESデータをカプセル化する関数
5.最終実行主体関数(以下私のサンプルコード、具体的なパラメータ値は、中国語の注釈によって自分のパラメータに変更すればよい)
结尾:过程の中で実はやはり多くの问题に出会って、これは最终的に完全に処理した后に运行することができる结果で、疑问の童靴は交流を评论することができます!
---------オリジナル、素手で打って、あなたに役に立つと思ったら、いいね!
Pyパッケージの準備: elasticsearch、geoip2、pymysql
次のコード
1.必要なモジュールのインポート
# encoding:utf-8
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import geoip2.database
import pymysql
2.Mysqlデータを取得する関数をカプセル化し、指定したパラメータで入力し、結果セットを返す
def get_data(host, port, user, password, database, sql):
"""
:param host:
:param port:
:param user:
:param password:
:param database:
:param sql:
:return:
"""
db = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
cursor = db.cursor()
cursor.execute(sql)
dt = cursor.fetchall()
db.close()
return dt
3.IPアドレスに対応する地理情報を取得する関数をカプセル化する
def select_ip(ip):
"""
:param ip:Ip
:return: Body
"""
reader = geoip2.database.Reader(
'F:\logstash-7.3.1\config\GeoLite2-City.mmdb') # logstash GeoIP ,
data = reader.city(ip)
body_son = {}
body_son["country_code2"] = data.country.iso_code #
body_son["longitude"] = data.location.longitude #
body_son["timezone"] = data.location.time_zone #
body_son["ip"] = data.traits.ip_address # Ip
body_son["country_code3"] = data.registered_country.iso_code #
body_son[
"region_code"] = data.subdivisions.most_specific.iso_code # subdivisions most_specific ,
body_son["latitude"] = data.location.latitude #
body_son["country_name"] = data.country.names['en'] #
body_son["location"] = {"lon": data.location.longitude, "lat": data.location.latitude} #
body_son["continent_code"] = data.continent.code #
return body_son
4.一括挿入ESデータをカプセル化する関数
def insert_batch_index(server, port, actions):
"""
:param server:
:param port:
:param actions:
"""
es = Elasticsearch([server], prot=port)
res = helpers.bulk(es, actions)
5.最終実行主体関数(以下私のサンプルコード、具体的なパラメータ値は、中国語の注釈によって自分のパラメータに変更すればよい)
# ES
def main():
# Mysql
login_data = GetDataMethod.get_data(host='192.168.40.9', port=3306, user='root', password='123456',
database='wechat',
sql='select LoginID,LanIP,OpenTime,TotalAmount from sys_re_agentlogin where LoginID<=1500')
es_body = {
"mappings": {
# es 7 , _doc,
"properties": {
"LoginID": {"type": "long"},
"LanIP": {"type": "text"},
"OpenTime": {"type": "date"},
"TotalAmount": {"type": "text"},
"geoip.ip": {"type": "ip"},
"geoip.location": {"type": "geo_point"}
}
}
}
#
EsMethod.create_index('py_wechat_login', '192.168.99.100', 9200, es_body)
#
actions = []
for dt_son in login_data:
action = {
"_index": 'py_wechat_login', #
"_type": '_doc', # , _doc,
"_id": dt_son[0], # ID
"_source": {
"LoginID": dt_son[0],
"LanIP": dt_son[1],
"OpenTime": dt_son[2],
"TotalAmount": dt_son[3],
"geoip": select_ip(dt_son[1])
}
}
actions.append(action)
insert_batch_index('192.168.99.100', 9200, actions)
结尾:过程の中で実はやはり多くの问题に出会って、これは最终的に完全に処理した后に运行することができる结果で、疑问の童靴は交流を评论することができます!
---------オリジナル、素手で打って、あなたに役に立つと思ったら、いいね!