認定Big Data勉強記 - 5: Amazon EMR


みなさん、こんにちは、えいりんぐーです。

今回はEMRについてまとめます。

Q: Amazon EMR とは何ですか?

Amazon EMR は、企業、研究者、データアナリスト、および開発者が、簡単に、そして費用対効果の高い方法で、莫大な量のデータを処理することができるようにするウェブサービスです。Amazon Elastic Compute Cloud (Amazon EC2)、および Amazon Simple Storage Service (Amazon S3) のウェブスケールインストラクチャで実行されるホストされた Hadoop フレームワークを使用します。

資料集

基本的に以下の資料を参考にしています。

仕組み

Hadoopフレームワーク、マスターが多数のコアを管理して分散処理する。

ノード セキュリティグループ 役割
Master Node Master Instance Group - コアノードやタスクノードの監視。ネームノードやリソースマネジャー (YARN) が動く。Failover非対応。sshのポートが開いている。
Core Node Core Instance Group HDFSをアタッチされたボリュームとして持つ。計算を実行する。データノードが動く。追加したり、HDFSやCPUやRAMを増設できる。基本的にマスターとしか通信しない。
Task Node Task Instance Group HDFSを持たないコアノード。増設がより自由。スポット価格やインスタンスタイプを調整できる。基本的にマスターとしか通信しない。
YARN: Yet Another Resource Negotiator
  • Resource Manager
    • マスターサーバーで動く
    • スレーブのリソースを管理
    • ジョブ管理はしない
  • Node Manager
    • スレーブサーバーで動く
    • サーバーのリソースをRMに報告
    • サーバー上のコンテナの管理
  • Container
    • スレーブのリソースが切り出されたもの
    • NMによって起動
  • Application Master
    • ジョブ全体を管理するコンテナ

EMRにおいては、MapReduceやHiveやSparkへ計算資源を割り振ってくれる。共通のリソースを分割して扱ってくれる。

特徴と機能

  • EMRFS
    • S3をHDFSのように扱える。
      • CPUとストレージの分離
      • S3の暗号化も対応
    • Consistent View
      • 新しいオブジェクトのプット → Read-after-write consistency
      • 上書きプット/デリート → 結果整合性
      • DynamoDBにメタデータを格納
  • Bootstrap Action
    • ノード起動時に実行されるスクリプト
      • Bash, Ruby, Python, etc
      • S3におく
      • 引数も自由
  • Step
    • 任意のタイミングで追加・設定できる処理
      • HQLとか
    • S3のURIやローカルファイルを指定して実行
      • jar, Streaming, Hive, Pig, Spark, bash
  • ジョブ実行
    • クラスター外
      • Step API, Lambda, Data Pipeline, Airflowなどから
    • クラスター内
      • マスターノードにssh
      • AWS System Managerから
      • Hive, Spark, Zeppelin, Hue, Livy, Oozie
  • Private Subnetの利用
    • S3/DynamoにはVPCエンドポイントが必要
    • それ以外にはNAT GW
    • Security Groupが必要
  • 暗号化
    • at-rest: サーバーサイドやクライアントサイドで暗号化
    • in-transit: TLS
    • セキュリティ設定やCloudFormationで
  • タグ
    • タグをIAMポリシーのコンディションに指定してアクセス管理できる
  • オートスケール
    • yarn.resourcemanager.decommissioning.timeout
      • defalut: 1 hour
  • HBaseのデータストアとしてのS3
    • 可用性・耐久性が高くなる
  • Cloud Watch Event
    • EMRのイベントをモニタリングできる
  • インスタンスフリート
    • スポットインスタンスをまとめて管理
  • カスタムAMIを利用したクラスター起動
    • 追加ソフトウェアを事前にロードできるので設定が楽
  • アプリケーション
    • サポート
      • クラスタ作成時にオプションをつけてインストール可能なもの
    • カスタム
      • 事前にApache Bigtopに追加してBootstrapでデプロイ
      • カスタムAMIを使う

ポイント

  • Hive Metastoreは重要
    • SparkやPrestoからも参照可能
    • MetastoreのMySQLをクラスタ外にも作成可能
    • Glue Data Catalogを利用可能
  • 列指向ファイルフォーマット
    • ORCとParquet
      • I/O効率と圧縮効率がいい
  • HiveでKinesisのデータ処理
    • Kinssisが保持するデータをHiveのテーブルとして扱える
      • アーカイブ用途など
    • StreamのShard毎にMapperがデータを読み出す
  • Spark
    • 分散インメモリ処理フレームワーク
    • アプリケーション群
      • SQL
        • HQL互換
      • Streaming
        • Discretized Streamと呼ばれる、高レベル抽象表現
        • マイクロバッチ処理
        • KinesisのストリームをEMRに流せる
          • KCLがバックエンド
          • 数秒〜数分、ニアリアルタイム
      • MLlib
      • GraphX
  • IAMロール: 2つ必要
    • EMRロール
      • EC2を起動するのに必要
    • EC2 instance profile
      • EC2がS3などにアクセスするのに必要
  • Kinesis統合
    • Kinesisコネクタというものがある
      • Kinesisストリームから直接データの読み取りとクエリを実行できる
      • いちいち独立したアプリケーションを開発する必要がなくなった
    • Cronなどでスケジュールされたジョブ実行ができる
    • MapReduce自体はバッチ処理なので、ストリームを複数のバッチに分割する
      • 各バッチを反復計算と呼ぶ
      • バッチの失敗に際して再試行するので、冪等性を保証する
    • 1つの反復計算で複数のクエリを実行できる
      • kinesis.checkpoint.iteration.no を設定する
      • 論理名を別にすることで、並列クエリ実行ができる
    • 連続ストリーム処理はできない
      • Twitter StormやSpark Streamingを使えばなんとか
    • 複数のKinesisストリームを結合することもできる
    • ストリームにデータがないとき
      • kinesis.nodata.timeout の時間分だけレコードの取得を試行する(ポーリング)
      • 時間が経ったらポーリングを停止して、すでにあるレコードだけ処理する
      • 新しくレコードが来たら kinesis.iteration.timeout 分だけ待ってポーリングする

ベストプラクティス

データ移行
  • s3distcp を使う
    • distcp はHadoopエコシステムでよく使われるコピーコマンド
    • s3distcp はS3に最適化されている
  • AWS Import/Exportを使う
    • データをストレージに入れて物理的に移行する
    • データが大きい時に有効
  • AWS Direct Connect
    • AWSとオンプレミスを専用回線でつなぐ
    • パブリック環境とプライベート環境とでネットワークを分離できる
データ集約
  • 多数の小さいデータではなく、少数の大きいデータにまとめる
  • S3のオブジェクトは、チャンクサイズに分割されてEMRに読み込まれる
    • だいたいチャンクサイズは64MBとか
    • データの圧縮形式が分割に対応していると良い
      • LZOやBZIP2
  • 集約されたデータのサイズは均等だと良い
  • マッパーとレデューサーの間の中間データも圧縮できる
    • データ量は REDUCE_SHUFFLE で確認できる
    • 圧縮は mapreduce.output.compress を true にする
  • マッパーが書き出すファイルも圧縮できる
    • mapred.compress.map.output を true にする
  • データのパーティション
    • 大きすぎるデータは逆に非効率なので、程よいサイズに分割する
    • 1時間単位で分析するようなデータなら、日時でデータを分割する
インスタンスタイプ

EBSをアタッチできるようになったので、M4やC4ファミリーのインスタンスを使えるようになった
ただし暗号化されたEBSをサポートしていない
要件として、計算が重いのか、メモリが必要なのか、I/Oが重要なのかに応じてインスタンスを選ぶ

EC2 A1, T3, T2, M5, M5a, M4 C5, C5n, C4, z1d R5, R5a, R4, X1e, X1, ハイメモリ P3, P2, G3, F1 H1, I3, D2
A1: ARMアーキテクチャ。Tx: バースト可能、処理のスパイクに対応。Mx: 汎用 コンピューティング最適化。CPUが強い メモリ最適化。メモリが強い。 GPU ストレージ最適化。ローカルストレージが強い。データのスループット良い
EBS gp2 io1 st1 sc1
汎用 プロビジョンドIOPS。低レイテンシー・高スループット スループット最適化HDD。HDFS向き。 Cold HDD。低アクセス。HDFS向き
  • 一時的なクラスター: Transient Cluster
    • 処理が終了したらシャットダウンするもの
      • HDFSがプライマリストレージでない
      • 試行錯誤する作業
  • 永続的なクラスター: Permanent Cluster
    • 処理が終了しても起動状態にしておくもの
      • ジョブが頻繁で定期的
      • 相互のジョブに依存関係がある
  • オンデマンドインスタンス
    • 使用する処理能力に応じて課金
    • 一時的なクラスターに向いている
  • リザーブドインスタンス
    • 予約するインスタンスに応じて予約金を払う
    • オンデマンドより単価は下がる
    • 永続的なクラスターに向いている
  • スポットインスタンス
    • 非稼動のEC2インスタンスに入札して利用する
    • オンデマンドより単価が下がる
    • 一時的なクラスターやタスクノードの追加に向いている
アーキテクチャ
  • S3をプライマリストレージにする
    • メリット
      • データの耐久性
      • コンピューティングとストレージの分離
      • データセキュリティやライフサイクルの管理
      • ストレージのスケーリング
      • Transient Clusterの利用
      • 複数クラスターでのデータ共有
    • デメリット
      • 試行錯誤する作業の場合は、S3とのネットワークがボトルネックになる
  • S3とHDFSの併用
    • S3に保存しつつ、実行時にHDFSにコピーする
    • メリット
      • S3の耐久性と可用性
      • Transient Clusterの利用
      • 反復的な作業の効率化
    • デメリット
      • データコピーによる処理時間増加
  • HDFSがプライマリ、S3をバックアップ
    • メリット
      • S3からデータをコピーする必要がない
      • 頻繁な処理の高速化
    • デメリット
      • データの耐久性
  • 伸縮自在なクラスター
    • マスターノードと必要最低限のコアノードを設定する
    • タスクノードをデータ処理の要求に応じて追加・削除する
    • インスタンスフリート

セキュリティ

  • IAMによるリソースのアクセスコントロール
  • セキュリティグループによるノードに対するネットワーク
  • S3-SSEやKMS、CSE-KMSを利用したat-rest暗号化
  • TLSを利用したin-transit暗号化
  • オープンソースアプリケーションやLUKSを利用したローカルディスクでの暗号化
  • Kerberos認証

細かいこと

  • TCPウィンドウスケーリング
    • TCP ウィンドウスケーリングを使用すると、64 KB を超えるウィンドウサイズをサポートすることにより、オペレーティングシステムおよびアプリケーションレイヤーと Amazon S3 との間でネットワークスループットのパフォーマンスを向上させることができます。TCP セッションの開始時に、クライアントは、そのサポートされているウィンドウ WSCALE 係数をアドバタイズし、Amazon S3 は、アップストリーム方向でサポートされている受信ウィンドウ WSCALE 係数を返します。
  • TCP選択的伝送確認
    • 一部は正しく受信しましたよ、と送信側に信号を送る。伝送効率が上がる。
  • 高度な最適化
    • データ構造の設計
      • 適切にパーティショニングする
    • Hadoopは基本的にバッチ処理フレームワークなので、時間制約がある場合はStormやSparkを使う
    • クラスターの細かいチューニングよりノードの追加が簡単で有効
    • マッパーの改善
      • マップ数の削減
        • 必然的に大きなデータを処理することになるので、マッパーあたりの処理量が向上する
      • マッパー出力の圧縮
      • マッパーのディスク書き出しの回避
    • レデューサーの改善
      • アイドル状態レデューサーの削減
      • レデューサーメモリの増加