カフカ、Go、PostgresとGraphicsを使用してページビューの解析システムを作成する


Apache Kafkaは、高性能データパイプラインに使用されるオープンソース分散イベントストリーミングプラットフォームです.これはリアルタイム/バッチデータ処理のために使用することができます.典型的なKafkaシステムは次のようになります

なぜ我々はカフカが必要ですか?


Note: Skip this section if you already know what kafka is and benefits of using kafka😅


データがどのように管理され、どのように抽出、変換、ロード(ETL)技術が使用されるかについて、別のことを考えてください.
以前に、我々は操作データベースを持っていて、定期的な間隔で我々はデータとロードをデータウェアハウスに変えなければなりませんでした.

しかし、現在、データベースは分散データシステムによって拡張/置換されます、そこで、我々はMongoDB、Casandradb、Hadoopなどのような複数のデータベース/データソースを持っています.
ETLツールは、分散システムの場合、データベースとデータウェアハウスを扱う必要があります.ETLのツールはバッチファッションでデータを処理するために構築されました.それらは資源集約的で時間のかかるプロセスである.
この新しい時代によって、アプリケーションは操作データを収集するだけでなく、ログのようなメタデータ、多くのシステムによって収集された分析があります.
また、ストリームデータの増加は、バッチで処理する代わりにgo上のデータを処理する必要がある場合に増加しています.
データのストリーミングのこの新しい世界では、我々は能力の高いボリュームと非常に多様なデータを処理する必要があります.データは通常イベントの形で流れます.様々なデータソースを持つさまざまなソースと株式からイベントを収集するイベントセンターを持っている

カフカは、イベントセンターのこの役割を果たします.そこで、データはキューに入れられて、消費者によって消費されるまで保存されます.

カフカを使用する利点


消費者の障害データの場合には
  • 再取得することができます
    消費者自身がこのデータを使用する方法を決定することができる今、
  • ETLのコストを削減する
  • はデータ
  • を非同期的に流す
  • は、それ自体をストリーミングしながら高ボリュームと多様なデータを処理することができます.
  • 詳細については、どのように大規模なアプリケーションを設計し、カフカを使用する方法については、データについて考える方法については、Naha Narkhedeでこの驚くべきtalkを確認することができます.
    大丈夫KAKKAを使用して解析システムの構築を開始します.例を単純化するために、我々はウェブサイトからページ・イベントを記録して、彼らをPostgres DBに保存します.当社のシステム設計は以下のようになります

    ステップ1:セットアップカフカサーバー


    このデモでは、我々はKafkaサーバーを実行するDockerを使用します.しかし、生産のためにあなたはConfluentまたは他のホストサービスを使用することができます.
  • の作成analytics-system/docker-compose.yaml
  • で以下の内容をペーストします
  • version: "3"
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        ports:
        - 2181:2181
    
      kafka:
        image: wurstmeister/kafka
        container_name: kafka
        ports:
        - 9092:9092
        environment:
          KAFKA_ADVERTISED_HOST_NAME: localhost
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    
    
    ZooKeeperは、構成情報、命名、分散した同期を提供して、グループサービスを提供することを維持するための集中したサービスです.
  • 起動Kafkaサーバー:docker-compose.yaml
  • ステップ2:ブートストラッププロジェクト

  • クリエイトレポdocker-compose up
  • 変更の作業ディレクトリmkdir analytics-system
  • 作成者ディレクトリcd analytics-system
  • initプロデューサープロジェクトmkdir producer && cd producer
  • は、消費者ディレクトリgo mod init producerをつくります
  • init消費者プロジェクトcd .. && mkdir consumer && cd consumer
  • ステップ3:作成プロデューサー


    GQLGenを使用したGraphSQLサーバーの作成


    プロデューサーのディレクトリ242479172に変更go mod init consumer
  • ダウンロード依存:cd analytics-system/producer
  • 初期化プロジェクト:go get github.com/99designs/gqlgen
  • Note: if you get validation failed error install the dependencies mentioned in the error: example go get github.com/vektah/gqlparser/[email protected]

  • Grapqlサーバgo run github.com/99designs/gqlgen initの起動とテスト
  • は、次の
  • で最初のBoilerPlatego build && ./producer
    scalar Int64
    
    type Event {
        id: ID!
        eventType: String
        path: String
        search: String
        title: String
        url: String
        userId: String
    }
    
    type PingResponse {
        message: String!
    }
    
    input RegisterKafkaEventInput {
        eventType: String!
        userId: String!
        path: String!
        search: String!
        title: String!
        url: String!
    }
    
    type Mutation {
        register_kafka_event(event: RegisterKafkaEventInput!): Event!
    }
    
    type Query {
        ping: PingResponse!
    }
    
    ここでは、ページビューイベントを生成するために必要な突然変異と型を定義します.analytics-system/producer/graph/schema.graphqlsの内容をクリアする
    echo "" > graph/schema.resolvers.go
    
  • 上記で定義されたGraphSQLファイルに従って新しいリゾルバとクエリを生成します.
  • go run github.com/99designs/gqlgen generate
    
  • クエリリゾルバはanalytics-system/producer/graph/schema.resolvers.goまたはいくつかの文字列を返すように置き換えます.
  • Note this step is just to test if our server starts correctly


    以下の内容によるpingHello worldレゾルバを更新してください
    func (r *queryResolver) Ping(ctx context.Context) (*model.PingResponse, error) {
        res := &model.PingResponse{
            Message: "Hello world",
        }
        return res, nil
    }
    
    
  • ビルドとテストサーバーPing
  • ブラウザーで
  • ヒットgraph/schema.resolvers.goとpingクエリをテストします
  • query {
      ping {
        message
      }
    }
    

    セットアップKafkaプロデューサーを使用してビルド&プロデューサー

  • 依存関係をインストールする
  • Note: Latest version of confluent-kafka-go doesn't require librdkafka, but in case if you face any errors check the following link and install the require dependencies https://github.com/confluentinc/confluent-kafka-go#installing-librdkafka

  • のセットアップカフカtopic :
  • 以下のUtilをhttp://localhost:8080に加えます.この関数は、常に作成されるトピックを確認します
    // function to create topic
    // sample usage CreateTopic("PAGE_VIEW")
    
    func CreateTopic(topicName string) {
        a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
        if err != nil {
            panic(err)
        }
    
        defer a.Close()
    
        maxDur, err := time.ParseDuration("60s")
        if err != nil {
            panic("ParseDuration(60s)")
        }
    
        ctx := context.Background()
        results, err := a.CreateTopics(
            ctx,
            // Multiple topics can be created simultaneously
            // by providing more TopicSpecification structs here.
            []kafka.TopicSpecification{{
                Topic:         topicName,
                NumPartitions: 1,
            }},
            // Admin options
            kafka.SetAdminOperationTimeout(maxDur))
        if err != nil {
            log.Printf("Failed to create topic: %v\n", err)
        }
    
        log.Println("results:", results)
    }
    
    

    A Topic is a category/feed name to which records are stored and published

  • は、カフカイベント
  • を生産しますconfluent-kakfka-gogo get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafkaレゾルバ関数を以下のように置き換えます
    func (r *mutationResolver) RegisterKafkaEvent(ctx context.Context, event model.RegisterKafkaEventInput) (*model.Event, error) {
        p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
        if err != nil {
            panic(err)
        }
    
        defer p.Close()
    
        // Delivery report handler for produced messages
        go func() {
            for e := range p.Events() {
                switch ev := e.(type) {
                case *kafka.Message:
                    if ev.TopicPartition.Error != nil {
                        fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                    } else {
                        fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                    }
                }
            }
        }()
    
        // Produce messages to topic (asynchronously)
        topic := event.EventType
        CreateTopic(topic)
        currentTimeStamp := fmt.Sprintf("%v", time.Now().Unix())
    
        e := model.Event{
            ID:        currentTimeStamp,
            EventType: &event.EventType,
            Path:      &event.Path,
            Search:    &event.Search,
            Title:     &event.Title,
            UserID:    &event.UserID,
            URL:       &event.URL,
        }
        value, err := json.Marshal(e)
        if err != nil {
            log.Println("=> error converting event object to bytes:", err)
        }
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(value),
        }, nil)
    
        // Wait for message deliveries before shutting down
        p.Flush(15 * 1000)
    
        return &e, nil
    }
    
  • は、生産されるイベントをテストします:graph/schema.resolvers.go
  • はlocalhostをヒットしました
  • mutation {
      register_kafka_event(event: {
        eventType: "PAGE_VIEW",
        userId: "some_session_id",
        path: "/test",
        search: "?q=foo"
        title: "Kafka Demo",
        url: "kafka.demo.com"
      }) {
        id
        eventType
      }
    }
    
    ハーレイ!🚀 我々の生産者は準備ができている🎉

    ステップ4:作成消費者


    我々はすでにRegisterKafkaEventで消費者プロジェクトを設定している.ここで消費者では、我々はステップ3でKafka Serverによって作り出されたイベントを聞いて、それをPostgres DBに保存します.

    Note you can process & transform this data based on system that you want to store into.


    プロセスを簡素化するために、我々はGolangのために graph/schema.resolver.go A SQL ORM(Object Relational Model)を使用しています.

    セットアップをビルド&プロデューサーとイベントスキーマ


    Note: make sure you are in consumer dir: cd analytics-system/consumer

  • 依存関係をインストールする
  • go get -u gorm.io/gorm
    go get -u gorm.io/driver/postgres 
    
    analytics-system/consumerファイルを作成します
    gormおよびセットアップ・イベント・スキーマのDBに接続する

  • Note: for this example we are using local postgres instance.

    gorm
    package main
    
    import (
        "log"
    
        "gorm.io/driver/postgres"
        "gorm.io/gorm"
        "gorm.io/gorm/clause"
        "gorm.io/gorm/schema"
    )
    
    type Event struct {
        ID        string `gorm:"primaryKey"`
        UserID    string
        EventType string
        Path      string
        Search    string
        Title     string
        URL       string
        CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
        UpdatedAt int64 `gorm:"autoUpdateTime"`
    }
    
    func SaveEvent(db *gorm.DB, event Event) (Event, error) {
        result := db.Clauses(
            clause.OnConflict{
                UpdateAll: true,
                Columns:   []clause.Column{},
            }).Create(&event)
    
        if result.Error != nil {
            log.Println(result.Error)
            return event, result.Error
        }
        return event, nil
    }
    
    func main() {
        dbURL :=
            `postgres://localhost:5432/postgres`
    
        ormConfig := &gorm.Config{
            NamingStrategy: schema.NamingStrategy{
                TablePrefix: "kafka_",
            },
        }
    
        db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
        if err != nil {
            panic(`Unable to connect to db`)
        }
        log.Println("=>Connected to successfully:", db)
    
            err = db.AutoMigrate(&Event{})
        if err != nil {
            log.Println("Error migrating schema:", err)
        }
    }
    
    

    セットアップKafka消費者コード

  • 依存関係をインストールする
    以下の内容による
  • アップデート

  • package main
    
    import (
        "encoding/json"
        "fmt"
        "log"
    
        "github.com/confluentinc/confluent-kafka-go/kafka"
        "gorm.io/driver/postgres"
        "gorm.io/gorm"
        "gorm.io/gorm/clause"
        "gorm.io/gorm/schema"
    )
    
    type Event struct {
        ID        string `gorm:"primaryKey"`
        UserID    string
        EventType string
        Path      string
        Search    string
        Title     string
        URL       string
        CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
        UpdatedAt int64 `gorm:"autoUpdateTime"`
    }
    
    func SaveEvent(db *gorm.DB, event Event) (Event, error) {
        result := db.Clauses(
            clause.OnConflict{
                UpdateAll: true,
                Columns:   []clause.Column{},
            }).Create(&event)
    
        if result.Error != nil {
            log.Println(result.Error)
            return event, result.Error
        }
        return event, nil
    }
    
    func main() {
        dbURL :=
            `postgres://localhost:5432/postgres`
    
        ormConfig := &gorm.Config{
            NamingStrategy: schema.NamingStrategy{
                TablePrefix: "kafka_",
            },
        }
    
        db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
        if err != nil {
            panic(`Unable to connect to db`)
        }
        log.Println("=> Connected to db successfully", db)
    
        err = db.AutoMigrate(&Event{})
        if err != nil {
            log.Println("Error migrating schema:", err)
        }
    
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost",
            "group.id":          "myGroup",
            "auto.offset.reset": "earliest",
        })
        if err != nil {
            panic(err)
        }
    
        c.SubscribeTopics([]string{"PAGE_VIEW"}, nil)
    
        for {
            msg, err := c.ReadMessage(-1)
            if err == nil {
                fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
                var event Event
                err := json.Unmarshal(msg.Value, &event)
                if err != nil {
                    log.Println("=> error converting event object:", err)
                }
    
                _, err = SaveEvent(db, event)
                if err != nil {
                    log.Println("=> error saving event to db...")
                }
            } else {
                // The client will automatically try to recover from all errors.
                fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            }
        }
    }
    
    
    消費者テスト

  • ステップ5:テストフロー

  • 起動していない場合はKafkaサーバを起動します.
  • スタートプロデューサーmain.go
  • は、消費者touch analytics-system/consumer/main.go
  • を開始します
    ブラウザの
  • ヒットhttp://locahost:8080
  • 突然変異を引き起こす
  • mutation {
      register_kafka_event(event: {
        eventType: "PAGE_VIEW",
        userId: "some_session_id",
        path: "/test",
        search: "?q=foo"
        title: "Kafka Demo",
        url: "kafka.demo.com"
      }) {
        id
        eventType
      }
    }
    
  • 消費者ログをチェックします.Postgresで保存されているデータのログを見ることができます.
  • は、Postgresデータmain.go
  • をチェックします
    ハーレイ!🚀 Thatsすべてのページビューの分析イベントシステムは準備ができています.👏
    あなたはgithubでフルコードベースをチェックすることができます