kafkaを触ったメモ


kafkaとは?特徴は?

  • 分散型イベント駆動型プラットフォーム
  • PUB/SUB(Producer/Consumer) でイベント駆動開発ができる
  • message順番を守ることができる1
  • message保持方法を必要に応じて設定できる2
    • 時間が経過すると、自動削除
    • logデータが大きくなると、自動削除
    • データの上書きで最新のデータのみを保持
    • 永遠に保持し、time travelもできる
  • Consumer側でどこまでデータをconsumeしたのかを管理できる3
    • 自動
    • 手動
  • データのlossをコントロールできる4
  • transactionサポート5
    • atomicity, consistencyを実現するため、データを大きなjsonに入れて、PUBする方法もある

kafkaでシステムレベルでのイベント駆動型開発イメージ

kafkaのlogoが表すように、様々なサービスがkafka経由でリアルタイムで情報を連携できる

システムを設定するときの留意点

  • messageの順番を守るべきか
  • ある程度のデータlossは許すべきか
  • 重複データをconsumeするときに、処理を如何するか
  • schema lessなので、topicやデータ構造のドキュメントを書くべき
  • データのバージョン管理
    • データ自体にバージョンを入れるか、topicでバージョン管理するかを決める必要がある

dockerでKafka serverを作る

wurstmeister/kafka-dockerを使用

docker-compose -f docker-compose-single-broker.yml up

rails + karafka でmessageの交換を試す

  • rails appを作成
rails new karafka_example
  • Gemfileに以下のgemを追加し、bundle installする
gem 'karafka'
  • Karafkaの設定を作成
bundle exec karafka install

以下のファイルが作成される

app/consumers/application_consumer.rb
app/responders/application_responder.rb
karafka.rb
  • karafka.rbを編集
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
    config.client_id = 'example_app'
    config.backend = :inline
    config.batch_fetching = true
    config.logger = Rails.logger
  end

  Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)

  consumer_groups.draw do
    consumer_group :bigger_group do
      batch_fetching false

      topic :users do
        consumer UsersConsumer
      end
    end
  end
end

KarafkaApp.boot!
  • consumerを作成する

app/consumers/users_consumers.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end
  • responderを作成する

app/responders/users_responder.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end
  • karafka server起動
bundle exec karafka server
  • rails consoleでデータ作成を試す
UsersResponder.call({ event_name: "user_created", payload: { id: 1 } }

関連資料