kafkaから情報を取得してredisクラスタに書き込む

4032 ワード

1.環境

  • kafka kafka-0.10.2.1-src.tgz (asc, md5)
  • redisクラスタバージョン:3.2.1
  • redisサーバ:6台、CPU情報:8 Intel(R)Xeon(R)CPU E [email protected] GHzメモリ:32 G
  • 2.問題の説明


    GWゲートウェイ装置から送信されたradiusメッセージを受信し、レベルは2000 TPS程度で、本来はflumeでudpメッセージを受信しようと思っていた.テスト時にsyslogudp方式しか使用できないことが判明しましたが、解析されたメッセージは多くのフィールドが失われ、flumeが入庫するとメッセージ情報が遮断されていることがわかりました.gitからいくつかのudpプラグインを調べたが、javaをやらないでjarパッケージにコンパイルしなければならないので、自分で書くことにした.現在,kafkaから単機でデータを取得し解析する効率を3 W/sとし,redis入庫を加えて1.5 w/sに低下させた.

    2.kafka接続


    本当はtravisjeffery/jockoを使いたかったのですがgroupがサポートされていないことに気づきましたのでconfluentinc/confluent-kafka-goは星があまり多くありませんが、このConfluent's Apache Kafka Golang clientを見てわかります.
    このモジュールを使用する前にlibrdkafkaをインストールする必要があります.インストール時に発生します.
    make[1]:  “/slview/librdkafka-master/src-cpp”
    make -C examples
    make[1]:  “/slview/librdkafka-master/examples”
    gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -I../src rdkafka_example.c -o rdkafka_example  \
            ../src/librdkafka.a -lpthread -lz -lssl -lrt
    /bin/ld: ../src/librdkafka.a(rdkafka_sasl_scram.o): undefined reference to symbol 'BIO_read'
    /bin/ld: note: 'BIO_read' is defined in DSO /usr/local/ssl/lib/libcrypto.so.1.0.0 so try adding it to the linker command line
    /usr/local/ssl/lib/libcrypto.so.1.0.0: could not read symbols:  
    collect2:  :ld   1
    make[1]: *** [rdkafka_example]   1
    make[1]:  “/slview/librdkafka-master/examples”
    

    Googleは長い間解決策を発見していないので、最後には避けましょう.sslをコンパイルしないと成功する
    ./configure --disable-ssl
    

    接続方法:
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers":    broker,
            "group.id":             group,
            "session.timeout.ms":   6000,
            "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})
    

    3.redis接続


    redis 3.2はクラスタの確立に使用するイントラネットアドレスであり、対応するイントラネットアドレスを使用して接続できず、構成問題であるかどうか不明である
    client := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs:        []string{"172.17.50.73:6379", "172.17.50.74:6379", "172.17.50.75:6379", "172.17.50.76:6379", "172.17.50.77:6379", "172.17.50.78:6379"},
    ReadOnly:    false,
    DialTimeout:  10 * time.Second,
    ReadTimeout:  30 * time.Second,
    WriteTimeout: 30 * time.Second,
    PoolSize:    6000,
    PoolTimeout:  30 * time.Second,
    })
    
    if client == nil {
    return client, ConnectError
    }
    

    4.radiusメッセージ解析


    bronze 1 man/radiusモジュールを使用します.コードは次のとおりです.
    radinfo := make(map[string]interface{})
    
    pac, err := radius.DecodePacket(secret, p)
    if err != nil {
        log.Println("[pac.Decode]", err)
        return
    }
    
    for i := range pac.AVPs {
        Type := pac.AVPs[i].Type
        if Type == radius.EventTimestamp {
            Value := pac.AVPs[i].Decode(pac).([]uint8)
            timestamp := binary.BigEndian.Uint32([]byte(Value))
            timestr := time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05")
            Typestr := fmt.Sprintf("%s", Type)
            radinfo[Typestr] = timestr
            //fmt.Printf("Type: %s    Value: %s
    ", Type, timestr) } else { Typestr := fmt.Sprintf("%s", Type) if _, ok := Item[Typestr]; ok { Value := pac.AVPs[i].GetValue() radinfo[Typestr] = Value } } }

    5.入庫後の結果

    172.17.50.76:6379> hgetall 86xxxxxxxxxx
     1) "SessionID"
     2) "73aa0e5b1fca2b97"
     3) "APN"
     4) "ctnet"
     5) "Status"
     6) "Stop"
     7) "StartTime"
     8) ""
     9) "MDN"
    10) "xxxxxxxxx"
    11) "IPAddr"
    12) "x.x.x.x"
    13) "StopTime"
    14) "2017-05-06 14:45:44"
    15) "TerminateCause"
    16) "UserRequest"
    17) "Duration"
    18) "2846"
    19) "OutputOctets"
    20) "0"
    21) "InputOctets"
    22) "0"