No.4 新卒未経験エンジニアがFlumeを使ってTweetを取得してみた


今回はメッセージングのミドルウェアとして有名なFlumeの「Twitter 1% firehose Source」を新卒未経験エンジニアが試してみたお話です。

Flumeって何?

最初にFlumeって何、てところから始めようと思います。
Flumeはズバリ!
「大量のログデータを、多量のソースから中央のデータストアへ、効率的に集約して移動させるための分散型メッセージングミドルウェア」
です!
よく分からないという方のために僕のイメージをお伝えします。

一軒の大人気の居酒屋さんの金曜夜の20時頃をイメージしてください。
大勢のお客さんが四方八方から店員を呼びつけて注文をします。
店員さんは必死に注文をポスに打ち込み、注文をとります。
その店に一つしかない厨房にはバイトリーダー的な人がいて、その人が各料理人に指示を出して絶えず入ってくる注文を捌いていきます。
そして出来あがった料理をお客さんの元へ届けます。
この時、料理のオーダーという情報が店内にいる複数の店員から厨房に集まり、最終的にはお客さんの元に戻っていきます。
厳密には違いますが、複数の情報源からの情報(ログデータ)が一つの場所に集約されて、そこから必要な箇所へと渡されていくという構造は似ているんじゃないかな、と思います。
(あくまで僕のイメージなので間違っている可能性は大いにあります。。)

Flumeの仕組み

Flumeの仕組みを図で説明します。

Webサーバーなどから送られたログデータなどのデータは一度Sourceに入り、そこから一旦Channelに書き込まれ、最終的にはSinkへと出力されていきます。
先ほどの例で言うと、
Webサーバー:お客さん
Source、Sink:店員さん
Channel:厨房
といったイメージです。
このSource、Channel、Sinkという要素がまとまって、Agentという単位になります。

Twitter 1% firehose Sourceって何?

Twitterの、流れてくる最新のつぶやきを連続的にダウンロードして、Avro形式に変換し、それをSinkに送信する実験的なSourceです。
こちらはFlumeの公式ドキュメントにも載っています。

事前準備

まずはTwitterのAPIを利用するためのAPIキーとアクセストークンを取得する必要があります。
TwitterのAPIキーとアクセストークンの取得の仕方はこちらを参考にして頂ければと思います。
必要なのは以下の4つの情報です。
・consumer key
・consumer secret
・access token
・access token secret

いざ実践!

では実際にどんな風に使っていくのか試してみます。
こちらの投稿を参考にさせて頂きました。
ありがとうございます!!
Flumeを使いMac上でTwitterのツイートを取得する

まず、Flumeをインストールしましょう!
こちらのサイトから【flume-ng-latest.tar.gz】をダウンロードしてください。
インストールしたらそれを解凍して適当な場所においてください。
僕はホームディレクトリ配下にdevelop/flumeというディレクトリを作ってそこに置いています。
FlumeはSinkから出力する時にAvroというデータ形式で出力します。
イメージはヘッダー付きCSVだ、と偉い人が言っていました。
Avroをローカルで閲覧するためのツールのavro-toolsをインストールします。

brew install avro-tools

無事インストールが終わり、flumeディレクトリに入ると

apache-flume-1.6.0-cdh5.12.1-binディレクトリに入り、そこでtwitter.confというファイルを作成します。
また、取得したTweetを書き込んだファイルの保存先として/tmp/flume-logというディレクトリも作成しておきます。
作成したtwitter.conf内に以下を記述します。

twitter.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_CONSUMER_KEY_SECRET
a1.sources.r1.accessToken = YOUR_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 50000
a1.sources.r1.maxBatchDurationMillis = 100000

# Describe the sink
# a1.sinks.k1.type = logger
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /tmp/flume-log
a1.sinks.k1.sink.rollInterval = 120

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

ここで上記をコピペしてそのまま実行しようとするとエラーがおきます。
なぜでしょう!?

そうです。

twitter.conf
a1.sources.r1.consumerKey = YOUR_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_CONSUMER_KEY_SECRET
a1.sources.r1.accessToken = YOUR_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_ACCESS_TOKEN_SECRET

ここの箇所には先ほど取得して頂いたAPIキーとアクセストークンを記入しなければいけませんので気をつけてください!

さぁ!
実際にFlumeを起動していきましょう!!
先ほどのapache-flume-1.6.0-cdh5.12.1-binディレクトリにいる状態で、以下のコマンドを叩いてください。

bin/flume-ng agent -n a1 -c conf -f twitter.conf -Dflume.root.logger=INFO,console -Xmx1g

Flumeは起動する際に、Agentとconfigファイルを指定してあげる必要があります。
twitter.confを見ると、

twitter.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k
a1.channels = c1

この部分がAgentを表しています。
a1というのは、そのAgentの名前です。
a1、r1、k、c1はただの名前なので任意のもので構いません。

ほっとくといつまでTweetを収集するか分からないのでテキトーなところで⌃c(control+c)を押して処理を止めます。
作成した/tmp/flume-logディレクトリ配下に入り、以下のコマンドを叩くと、取得したTweetをjson形式で見ることが出来ます。

avro-tools tojson --pretty 生成されたファイル | less

すると。。。
まぁ大量のTweetを取得していますこと!
そんなにAPI叩いて大丈夫だったのかな、と不安になりながらも無事Tweetを取得出来たことにほっとしました。
ちなみに、

avro-tools getschema 生成されたファイル

を叩くと、Avroのスキーマ(何かの構造を表したもの)情報を見ることが出来ます。
ちなみに(2回目)、
Flumeの移動時に叩いたコマンドで

-Dflume.root.logger=INFO,console

この部分はTweetを取得する処理のログをconsole上に表示させる、というコマンドです。

まとめ

最初に思っていたよりも難しいことはなく、導入から実践までは躓くこともなくさらっといけた感じでした。
まだまだFlumeの実力を知らないので、ぜひ実践で使っていきたいです。

終わり。