カフカ、Go、PostgresとGraphicsを使用してページビューの解析システムを作成する
なぜ我々はカフカが必要ですか?
Note: Skip this section if you already know what kafka is and benefits of using kafka😅
データがどのように管理され、どのように抽出、変換、ロード(ETL)技術が使用されるかについて、別のことを考えてください.
以前に、我々は操作データベースを持っていて、定期的な間隔で我々はデータとロードをデータウェアハウスに変えなければなりませんでした.
しかし、現在、データベースは分散データシステムによって拡張/置換されます、そこで、我々はMongoDB、Casandradb、Hadoopなどのような複数のデータベース/データソースを持っています.
ETLツールは、分散システムの場合、データベースとデータウェアハウスを扱う必要があります.ETLのツールはバッチファッションでデータを処理するために構築されました.それらは資源集約的で時間のかかるプロセスである.
この新しい時代によって、アプリケーションは操作データを収集するだけでなく、ログのようなメタデータ、多くのシステムによって収集された分析があります.
また、ストリームデータの増加は、バッチで処理する代わりにgo上のデータを処理する必要がある場合に増加しています.
データのストリーミングのこの新しい世界では、我々は能力の高いボリュームと非常に多様なデータを処理する必要があります.データは通常イベントの形で流れます.様々なデータソースを持つさまざまなソースと株式からイベントを収集するイベントセンターを持っている
カフカは、イベントセンターのこの役割を果たします.そこで、データはキューに入れられて、消費者によって消費されるまで保存されます.
カフカを使用する利点
消費者の障害データの場合には
消費者自身がこのデータを使用する方法を決定することができる今、
大丈夫KAKKAを使用して解析システムの構築を開始します.例を単純化するために、我々はウェブサイトからページ・イベントを記録して、彼らをPostgres DBに保存します.当社のシステム設計は以下のようになります
ステップ1:セットアップカフカサーバー
このデモでは、我々はKafkaサーバーを実行するDockerを使用します.しかし、生産のためにあなたはConfluentまたは他のホストサービスを使用することができます.
analytics-system/docker-compose.yaml
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ZooKeeperは、構成情報、命名、分散した同期を提供して、グループサービスを提供することを維持するための集中したサービスです.docker-compose.yaml
ステップ2:ブートストラッププロジェクト
docker-compose up
mkdir analytics-system
cd analytics-system
mkdir producer && cd producer
go mod init producer
をつくりますcd .. && mkdir consumer && cd consumer
ステップ3:作成プロデューサー
GQLGenを使用したGraphSQLサーバーの作成
プロデューサーのディレクトリ242479172に変更
go mod init consumer
cd analytics-system/producer
go get github.com/99designs/gqlgen
Note: if you get validation failed error install the dependencies mentioned in the error: example
go get github.com/vektah/gqlparser/[email protected]
go run github.com/99designs/gqlgen init
の起動とテストgo build && ./producer
scalar Int64
type Event {
id: ID!
eventType: String
path: String
search: String
title: String
url: String
userId: String
}
type PingResponse {
message: String!
}
input RegisterKafkaEventInput {
eventType: String!
userId: String!
path: String!
search: String!
title: String!
url: String!
}
type Mutation {
register_kafka_event(event: RegisterKafkaEventInput!): Event!
}
type Query {
ping: PingResponse!
}
ここでは、ページビューイベントを生成するために必要な突然変異と型を定義します.analytics-system/producer/graph/schema.graphqls
の内容をクリアするecho "" > graph/schema.resolvers.go
go run github.com/99designs/gqlgen generate
analytics-system/producer/graph/schema.resolvers.go
またはいくつかの文字列を返すように置き換えます.Note this step is just to test if our server starts correctly
以下の内容による
ping
のHello world
レゾルバを更新してくださいfunc (r *queryResolver) Ping(ctx context.Context) (*model.PingResponse, error) {
res := &model.PingResponse{
Message: "Hello world",
}
return res, nil
}
Ping
graph/schema.resolvers.go
とpingクエリをテストしますquery {
ping {
message
}
}
セットアップKafkaプロデューサーを使用してビルド&プロデューサー
Note: Latest version of confluent-kafka-go doesn't require
librdkafka
, but in case if you face any errors check the following link and install the require dependencies https://github.com/confluentinc/confluent-kafka-go#installing-librdkafka
http://localhost:8080
に加えます.この関数は、常に作成されるトピックを確認します// function to create topic
// sample usage CreateTopic("PAGE_VIEW")
func CreateTopic(topicName string) {
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer a.Close()
maxDur, err := time.ParseDuration("60s")
if err != nil {
panic("ParseDuration(60s)")
}
ctx := context.Background()
results, err := a.CreateTopics(
ctx,
// Multiple topics can be created simultaneously
// by providing more TopicSpecification structs here.
[]kafka.TopicSpecification{{
Topic: topicName,
NumPartitions: 1,
}},
// Admin options
kafka.SetAdminOperationTimeout(maxDur))
if err != nil {
log.Printf("Failed to create topic: %v\n", err)
}
log.Println("results:", results)
}
A Topic is a category/feed name to which records are stored and published
confluent-kakfka-go
のgo get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
レゾルバ関数を以下のように置き換えますfunc (r *mutationResolver) RegisterKafkaEvent(ctx context.Context, event model.RegisterKafkaEventInput) (*model.Event, error) {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := event.EventType
CreateTopic(topic)
currentTimeStamp := fmt.Sprintf("%v", time.Now().Unix())
e := model.Event{
ID: currentTimeStamp,
EventType: &event.EventType,
Path: &event.Path,
Search: &event.Search,
Title: &event.Title,
UserID: &event.UserID,
URL: &event.URL,
}
value, err := json.Marshal(e)
if err != nil {
log.Println("=> error converting event object to bytes:", err)
}
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
}, nil)
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
return &e, nil
}
graph/schema.resolvers.go
mutation {
register_kafka_event(event: {
eventType: "PAGE_VIEW",
userId: "some_session_id",
path: "/test",
search: "?q=foo"
title: "Kafka Demo",
url: "kafka.demo.com"
}) {
id
eventType
}
}
ハーレイ!🚀 我々の生産者は準備ができている🎉ステップ4:作成消費者
我々はすでに
RegisterKafkaEvent
で消費者プロジェクトを設定している.ここで消費者では、我々はステップ3でKafka Serverによって作り出されたイベントを聞いて、それをPostgres DBに保存します.Note you can process & transform this data based on system that you want to store into.
プロセスを簡素化するために、我々はGolangのために
graph/schema.resolver.go
A SQL ORM(Object Relational Model)を使用しています.セットアップをビルド&プロデューサーとイベントスキーマ
Note: make sure you are in consumer dir:
cd analytics-system/consumer
go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres
analytics-system/consumer
ファイルを作成しますgorm
およびセットアップ・イベント・スキーマのDBに接続するNote: for this example we are using local postgres instance.
gorm
package main
import (
"log"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
)
type Event struct {
ID string `gorm:"primaryKey"`
UserID string
EventType string
Path string
Search string
Title string
URL string
CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
UpdatedAt int64 `gorm:"autoUpdateTime"`
}
func SaveEvent(db *gorm.DB, event Event) (Event, error) {
result := db.Clauses(
clause.OnConflict{
UpdateAll: true,
Columns: []clause.Column{},
}).Create(&event)
if result.Error != nil {
log.Println(result.Error)
return event, result.Error
}
return event, nil
}
func main() {
dbURL :=
`postgres://localhost:5432/postgres`
ormConfig := &gorm.Config{
NamingStrategy: schema.NamingStrategy{
TablePrefix: "kafka_",
},
}
db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
if err != nil {
panic(`Unable to connect to db`)
}
log.Println("=>Connected to successfully:", db)
err = db.AutoMigrate(&Event{})
if err != nil {
log.Println("Error migrating schema:", err)
}
}
セットアップKafka消費者コード
以下の内容による
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
)
type Event struct {
ID string `gorm:"primaryKey"`
UserID string
EventType string
Path string
Search string
Title string
URL string
CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
UpdatedAt int64 `gorm:"autoUpdateTime"`
}
func SaveEvent(db *gorm.DB, event Event) (Event, error) {
result := db.Clauses(
clause.OnConflict{
UpdateAll: true,
Columns: []clause.Column{},
}).Create(&event)
if result.Error != nil {
log.Println(result.Error)
return event, result.Error
}
return event, nil
}
func main() {
dbURL :=
`postgres://localhost:5432/postgres`
ormConfig := &gorm.Config{
NamingStrategy: schema.NamingStrategy{
TablePrefix: "kafka_",
},
}
db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
if err != nil {
panic(`Unable to connect to db`)
}
log.Println("=> Connected to db successfully", db)
err = db.AutoMigrate(&Event{})
if err != nil {
log.Println("Error migrating schema:", err)
}
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"PAGE_VIEW"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
var event Event
err := json.Unmarshal(msg.Value, &event)
if err != nil {
log.Println("=> error converting event object:", err)
}
_, err = SaveEvent(db, event)
if err != nil {
log.Println("=> error saving event to db...")
}
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
消費者テストステップ5:テストフロー
main.go
touch analytics-system/consumer/main.go
ブラウザの
mutation {
register_kafka_event(event: {
eventType: "PAGE_VIEW",
userId: "some_session_id",
path: "/test",
search: "?q=foo"
title: "Kafka Demo",
url: "kafka.demo.com"
}) {
id
eventType
}
}
main.go
ハーレイ!🚀 Thatsすべてのページビューの分析イベントシステムは準備ができています.👏
あなたはgithubでフルコードベースをチェックすることができます
Reference
この問題について(カフカ、Go、PostgresとGraphicsを使用してページビューの解析システムを作成する), 我々は、より多くの情報をここで見つけました https://dev.to/lakhansamani/create-page-view-analytics-system-using-kafka-go-postgres-graphql-in-5-steps-1hc1テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol