kafkaから情報を取得してredisクラスタに書き込む
4032 ワード
1.環境
2.問題の説明
GWゲートウェイ装置から送信されたradiusメッセージを受信し、レベルは2000 TPS程度で、本来はflumeでudpメッセージを受信しようと思っていた.テスト時にsyslogudp方式しか使用できないことが判明しましたが、解析されたメッセージは多くのフィールドが失われ、flumeが入庫するとメッセージ情報が遮断されていることがわかりました.gitからいくつかのudpプラグインを調べたが、javaをやらないでjarパッケージにコンパイルしなければならないので、自分で書くことにした.現在,kafkaから単機でデータを取得し解析する効率を3 W/sとし,redis入庫を加えて1.5 w/sに低下させた.
2.kafka接続
本当はtravisjeffery/jockoを使いたかったのですがgroupがサポートされていないことに気づきましたのでconfluentinc/confluent-kafka-goは星があまり多くありませんが、このConfluent's Apache Kafka Golang clientを見てわかります.
このモジュールを使用する前にlibrdkafkaをインストールする必要があります.インストール時に発生します.
make[1]: “/slview/librdkafka-master/src-cpp”
make -C examples
make[1]: “/slview/librdkafka-master/examples”
gcc -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align -I../src rdkafka_example.c -o rdkafka_example \
../src/librdkafka.a -lpthread -lz -lssl -lrt
/bin/ld: ../src/librdkafka.a(rdkafka_sasl_scram.o): undefined reference to symbol 'BIO_read'
/bin/ld: note: 'BIO_read' is defined in DSO /usr/local/ssl/lib/libcrypto.so.1.0.0 so try adding it to the linker command line
/usr/local/ssl/lib/libcrypto.so.1.0.0: could not read symbols:
collect2: :ld 1
make[1]: *** [rdkafka_example] 1
make[1]: “/slview/librdkafka-master/examples”
Googleは長い間解決策を発見していないので、最後には避けましょう.sslをコンパイルしないと成功する
./configure --disable-ssl
接続方法:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": group,
"session.timeout.ms": 6000,
"default.topic.config": kafka.ConfigMap{"auto.offset.reset": "smallest"}})
3.redis接続
redis 3.2はクラスタの確立に使用するイントラネットアドレスであり、対応するイントラネットアドレスを使用して接続できず、構成問題であるかどうか不明である
client := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{"172.17.50.73:6379", "172.17.50.74:6379", "172.17.50.75:6379", "172.17.50.76:6379", "172.17.50.77:6379", "172.17.50.78:6379"},
ReadOnly: false,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
PoolSize: 6000,
PoolTimeout: 30 * time.Second,
})
if client == nil {
return client, ConnectError
}
4.radiusメッセージ解析
bronze 1 man/radiusモジュールを使用します.コードは次のとおりです.
radinfo := make(map[string]interface{})
pac, err := radius.DecodePacket(secret, p)
if err != nil {
log.Println("[pac.Decode]", err)
return
}
for i := range pac.AVPs {
Type := pac.AVPs[i].Type
if Type == radius.EventTimestamp {
Value := pac.AVPs[i].Decode(pac).([]uint8)
timestamp := binary.BigEndian.Uint32([]byte(Value))
timestr := time.Unix(int64(timestamp), 0).Format("2006-01-02 15:04:05")
Typestr := fmt.Sprintf("%s", Type)
radinfo[Typestr] = timestr
//fmt.Printf("Type: %s Value: %s
", Type, timestr)
} else {
Typestr := fmt.Sprintf("%s", Type)
if _, ok := Item[Typestr]; ok {
Value := pac.AVPs[i].GetValue()
radinfo[Typestr] = Value
}
}
}
5.入庫後の結果
172.17.50.76:6379> hgetall 86xxxxxxxxxx
1) "SessionID"
2) "73aa0e5b1fca2b97"
3) "APN"
4) "ctnet"
5) "Status"
6) "Stop"
7) "StartTime"
8) ""
9) "MDN"
10) "xxxxxxxxx"
11) "IPAddr"
12) "x.x.x.x"
13) "StopTime"
14) "2017-05-06 14:45:44"
15) "TerminateCause"
16) "UserRequest"
17) "Duration"
18) "2846"
19) "OutputOctets"
20) "0"
21) "InputOctets"
22) "0"