ELK---PythonはMysqlデータをESにインポートし、Geoipデータ型書き込みを実現


使用環境:Python 3.6
Pyパッケージの準備:  elasticsearch、geoip2、pymysql
次のコード
1.必要なモジュールのインポート
# encoding:utf-8

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import geoip2.database
import pymysql

2.Mysqlデータを取得する関数をカプセル化し、指定したパラメータで入力し、結果セットを返す
    def get_data(host, port, user, password, database, sql):
        """
        :param host:      
        :param port:   
        :param user:   
        :param password:  
        :param database:   
        :param sql:    
        :return:   
        """
        db = pymysql.connect(host=host, port=port, user=user, password=password, database=database)
        cursor = db.cursor()
        cursor.execute(sql)
        dt = cursor.fetchall()
        db.close()
        return dt

3.IPアドレスに対応する地理情報を取得する関数をカプセル化する
def select_ip(ip):
    """
    :param ip:Ip  
    :return:     Body  
    """
    reader = geoip2.database.Reader(
        'F:\logstash-7.3.1\config\GeoLite2-City.mmdb')  #          logstash GeoIP    ,           
    data = reader.city(ip)
    body_son = {}
    body_son["country_code2"] = data.country.iso_code  #   
    body_son["longitude"] = data.location.longitude  #   
    body_son["timezone"] = data.location.time_zone  #   
    body_son["ip"] = data.traits.ip_address  # Ip  
    body_son["country_code3"] = data.registered_country.iso_code  #     
    body_son[
        "region_code"] = data.subdivisions.most_specific.iso_code  #      subdivisions        most_specific   ,      
    body_son["latitude"] = data.location.latitude  #   
    body_son["country_name"] = data.country.names['en']  #     
    body_son["location"] = {"lon": data.location.longitude, "lat": data.location.latitude}  #     
    body_son["continent_code"] = data.continent.code  #   
    return body_son

4.一括挿入ESデータをカプセル化する関数
    def insert_batch_index(server, port, actions):
        """
        :param server:      
        :param port:    
        :param actions:    
        """
        es = Elasticsearch([server], prot=port)
        res = helpers.bulk(es, actions)

5.最終実行主体関数(以下私のサンプルコード、具体的なパラメータ値は、中国語の注釈によって自分のパラメータに変更すればよい)
#        ES
def main():
    #   Mysql    
    login_data = GetDataMethod.get_data(host='192.168.40.9', port=3306, user='root', password='123456',
                                        database='wechat',
                                        sql='select LoginID,LanIP,OpenTime,TotalAmount from  sys_re_agentlogin where LoginID<=1500')
    es_body = {
        "mappings": {
            # es 7             ,       _doc,            
            "properties": {
                "LoginID": {"type": "long"},
                "LanIP": {"type": "text"},
                "OpenTime": {"type": "date"},
                "TotalAmount": {"type": "text"},
                "geoip.ip": {"type": "ip"},
                "geoip.location": {"type": "geo_point"}
            }
        }
    }
    #     
    EsMethod.create_index('py_wechat_login', '192.168.99.100', 9200, es_body)
    #     
    actions = []
    for dt_son in login_data:
        action = {
            "_index": 'py_wechat_login', #    
            "_type": '_doc',  #    ,  _doc,      
            "_id": dt_son[0], #  ID
            "_source": {
                "LoginID": dt_son[0],
                "LanIP": dt_son[1],
                "OpenTime": dt_son[2],
                "TotalAmount": dt_son[3],
                "geoip": select_ip(dt_son[1])
            }
        }
        actions.append(action)
    insert_batch_index('192.168.99.100', 9200, actions)

结尾:过程の中で実はやはり多くの问题に出会って、これは最终的に完全に処理した后に运行することができる结果で、疑问の童靴は交流を评论することができます!
---------オリジナル、素手で打って、あなたに役に立つと思ったら、いいね!