KafkaからTreasure DataにブリッジするDocker Compose
td-agentコンテナとKafka Consumerコンテナを使いKafkaからTreasure DataへブリッジするDocker Composeサービスを起動します。別のポストではPySpark Streamingのウィンドウ集計した結果をKafkaのトピックに出力するコードを書きました。このストリーム処理はデータパイプラインの前処理やエンリッチメントに相当します。後続にビッグデータのバッチ処理を想定してTreasure Dataに保存します。
Docker Compose
最初に今回作成するプロジェクトのディレクトリ構成です。
$ tree -a
.
├── docker-compose.yml
├── .env
├── .gitignore
├── kafka-bridge
│ ├── Dockerfile
│ ├── fluentd-consumer.properties
│ └── log4j.properties
└── td-agent2
├── Dockerfile
└── td-agent.conf
2 directories, 9 files
docker-compose.yml
td-agentとKafka ConsumerサービスはそれぞれDockefileを書いてビルドします。Kafkaはlandoop/fast-data-devを利用します。Confluent Open Sourceを同梱しているためKafkaとZooKeeperも起動します。
version: '2'
services:
kafka-stack:
image: landoop/fast-data-dev
environment:
- FORWARDLOGS=0
- RUNTESTS=0
- ADV_HOST=<仮想マシンのパブリックIPアドレス>
ports:
- 3030:3030
- 9092:9092
- 2181:2181
- 8081:8081
td-agent2:
build: ./td-agent2
env_file:
- ./.env
ports:
- 24224:24224
kafka-bridge:
build: ./kafka-bridge
depends_on:
- td-agent
.env
Treasure Dataの接続情報は環境変数ファイルの.env
に記述しDocker Composeから読み込みます。
TD_API_KEY=<YOUR API KEY>
TD_ENDPOINT=<TD ENDPOINT>
td-agent2
td-agentのDockerイメージを作成します。
Dockerfile
Overview of Server-Side Agent (td-agent)のインストール手順に従います。install-ubuntu-xenial-td-agent2.shの中ではsudoも必要です。
FROM ubuntu:xenial
RUN apt-get update && apt-get install sudo curl -y
RUN curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent2.sh | sh
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
ADD td-agent.conf /etc/td-agent/
EXPOSE 24224
CMD ["/usr/sbin/td-agent"]
td-agent.conf
td-agent.confは環境変数を参照することができます。Treasure Dataへの接続情報を.env
ファイルから取得します。
<match td.*.*>
@type tdlog
endpoint "#{ENV['TD_ENDPOINT']}"
apikey "#{ENV['TD_API_KEY']}"
auto_create_table
buffer_type file
buffer_path /var/log/td-agent/buffer/td
use_ssl true
num_threads 8
</match>
<source>
@type forward
</source>
kafka-fluentd-consumer
KafkaからTreasure Dataへのブリッジにはkafka-fluentd-consumerのJarを利用します。
Dockerfile
コンパイル済のkafka-fluentd-consumer-0.3.1-all.jarをダウンロードします。
FROM java:8-jre
ARG KAFKA_FLUENTD_CONSUMER_VERSION=0.3.1
WORKDIR /app
RUN wget -q -O kafka-fluentd-consumer-all.jar https://github.com/treasure-data/kafka-fluentd-consumer/releases/download/v$KAFKA_FLUENTD_CONSUMER_VERSION/kafka-fluentd-consumer-$KAFKA_FLUENTD_CONSUMER_VERSION-all.jar
ADD log4j.properties .
ADD fluentd-consumer.properties .
CMD ["java", "-Dlog4j.configuration=file:///app/log4j.properties", "-jar", "kafka-fluentd-consumer-all.jar", "fluentd-consumer.properties"]
fluentd-consumer.properties
デフォルトの設定から以下を変更します。fluentd.connect
とzookeeper.connect
はdocker-compose.ymlを使う場合はそれぞれサービス名を指定します。
- fluentd.connect=:24224
- fluentd.tag.prefix=td.<データベース名>.
- fluentd.consumer.topics=<トピック名>
- zookeeper.connect=:2181
- group.id=<コンシューマグループ名>
# Fluentd instance destinations.
fluentd.connect=td-agent2:24224
# Dynamic event tag with topic name.
fluentd.tag.prefix=td.sensortag_dev.
# Consumed topics.
fluentd.consumer.topics=sensortag-sink
# The number of threads per consumer streams
fluentd.consumer.threads=1
# The path for backup un-flushed events during shutdown.
fluentd.consumer.backup.dir=/tmp/fluentd-consumer-backup/
# Kafka Consumer related parameters
zookeeper.connect=kafka-stack:2181
group.id=my-sensortag-sink-group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
log4j.properties
log4j.propertiesはデフォルトのまま使います。
# log4j logging configuration.
# This is based on Pinterest's secor
# root logger.
log4j.rootLogger=DEBUG, ROLLINGFILE
log4j.appender.ROLLINGFILE = org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=INFO
log4j.appender.ROLLINGFILE.File=/tmp/fluentd-consumer.log
# keep log files up to 1G
log4j.appender.ROLLINGFILE.MaxFileSize=20MB
log4j.appender.ROLLINGFILE.MaxBackupIndex=50
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n
動作確認
Docker Composeのサービスを起動します。
$ docker-compose up -d
td-agentのバージョンを確認します。
$ docker-compose exec td-agent2 td-agent --version
td-agent 0.12.35
Spark Streamingを使ったウィンドウ集計のサンプルのようにfluentd.consumer.topics
に指定したtopicへJSONフォーマットでデータを送信します。
テストとしてkafka-console-producer
から直接JSONを送信してみます。
$ docker-compose exec kafka-stack kafka-console-producer \
--broker-list localhost:9092 \
--topic sensortag-sink
コマンドを実行後の待機状態でJSON文字列を入力します。
{"bid": "B0:B4:48:BD:DA:03", "time": 1501654353, "humidity": 27.152099609375, "objecttemp": 21.6875, "ambient": 27.09375, "rh": 78.4423828125}
td-agentはファイルバッファを作成してデフォルトでは5分間隔でTreasure Dataへデータがアップロードします。
$ docker-compose exec td-agent2 ls /var/log/td-agent/buffer
td.sensortag_dev.sensortag_sink.b555bf24951c65554.log
Author And Source
この問題について(KafkaからTreasure DataにブリッジするDocker Compose), 我々は、より多くの情報をここで見つけました https://qiita.com/masato/items/50400d480ec91c27977e著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .