FileIOのDynamic DestinationのDestinationTはequals/hashCodeが必要だよ


細かすぎて伝わらないApache Beam/Dataflow選手権#1です。

FileIOって?

Apache Beamのファイル一般を扱うIOクラスです。
ドキュメントによるとS3/GCS/HDFS/ローカルのファイルを、扱う事が出来るようです。

Dynamic Destinationって?

PCollectionの中身によって出力先を変える機能です。
具体的な設定はFileIO.Writeで定義されており、

  • byで、入力(UserT)をグループ(DestinationT)に分類するクラスを指定
  • viaで、入力(UserT)を出力(OutputT)に変換するクラスを指定
  • withNamingで、グループ(DestinationT)からファイル名を決めるクラスを指定

の3つが、Dynamic Destinationの主な設定です。

DestinationTって

入力(UserT)分類し、グループ化するためのラベルとして使われるクラス(ジェネリクス)です。
境界(extends)が定義されてはいないので、特に必要なメソッドや親クラスは指定されていません。

って思うじゃん‥

詰まった

やろうとしていたこと

  • 数十万件のPub/Subメッセージを、Dynamic Destinationで仕分け(5種類くらい)してGCSに書き込みたい
  • Dynamic Destinationなしでは、楽勝に処理出来ていた

起きたこと

  • ファイルを書き出す段階で、WorkerでOut Of Memory(OOM)が起きた
    • (Dynamic Destination無しと、同じインスタンス種類の場合)
  • ごく一部しか結果がファイルに書き込まれなかった
  • ワーカーのメモリを巨大(64GB)にすると、ゆっくり書き出される
    • オートスケールにしても台数は増えない

原因

DestinationTの実装に

  • equals
  • hashCode

を(正しく)実装していなかったのが原因。

なぜ必要か

端的にはDestinationTがHashMapで使われているためです。

細かい流れが気になる人向けに:

Dynamic Destinationでは、以下の流れで書き込みを行います。

  1. 入力(UserT)をファイルに割り振るために、番号を付ける(ここらへん
  2. 番号でGroupByKey(ここらへん)
  3. 一時ファイルに書き出す(ここらへん
  4. 一時ファイルを改名(ここらへん

下のような理由で、DestinationTがHashMapに使われています:

  • 入力を連番に割り振る時(1)に、ハッシュが衝突する可能性がある
  • 衝突すると、異なるDestinationがGroupByKeyでまとめられる
  • そのため、一時ファイル書き出し(3)の部分でもDestinationTを計算。異なるDestinationTを異なるファイルに書き出すようにしている
    • この時、DestinationTと書き出し先を管理するために、DestinationTをHashMapに突っ込んでいる

このため、DestinationTがhashCode/equalsを(正しく)実装していないと、

  • 入力毎に違う書き出し先とみなされる
    • つまり1ファイル1行
  • 超絶細かい単位で書き出すので時間がかかる
  • 出力が終わるまで、入力データが保持するので、メモリ使用量が多くなる

のではないかと推測しています。

今回学んだこと

  • equals/hashCodeを実装しないでHashMapを使うと死ぬ
    • Effective Java大事
  • DataflowでもDumpが取れる
    • Googleの人がわかりやすい資料を書いてくれています
    • (がダンプしても分かるとは限らない)
  • Beamのファイル書き出しは、ちょっと複雑な処理をしている