Data Engineering Meetup #1 勉強会メモ


概要

Data Engineering Meetup #1

Data Engineering Meetup は、データエンジニアリング(データの収集・管理・処理・可視化など)に関する技術の情報交換・共有を行うためのミートアップイベントです。

発表

ところどころ抜けや間違いがあると思うので参考程度に

Spark 2.4 & 3.0 - What's next -

発表資料

演者

株式会社エヌ・ティ・ティ・データ 猿田 浩輔 様

内容

  • Sparkの変遷
    • 最新ver 2.4
    • 今年リリース予定ver 3.0
    • 直近では、RDDからSparkSQLへの処理系へ移行し、パフォーマンスと利便性が向上
      • 宣言的なAPIやSQLで処理が書ける
      • オプティマイザによる最適化の恩恵を受けられる
      • 開発言語による性能差が出にくい
      • MLLibなどのライブラリもDataFrameを利用したものが整備されてきた
  • kubernetesサポート
    • 2.3からサポートされた
    • 2.4対応
      • PySparkとSparkR対応
      • クライアントモードによるインタラクティブな利用
    • 3.0対応
      • Kerberos認証サポート
      • ダイナミックリソースアロケーション
  • Project Hydorogen

    • Sparkにおけるディープラーニングを活用したユースケースをカバーするサブプロジェクト
      • Barrier Execution Mode
      • Accelerator Aware Scheduling
      • Optimized Data Exchange
    • BarrierExecution Mode (2.4で基本対応)
      • ディープラーニングは、複数タスクの協調動作が必要とされる
      • 従来のSparkのタスクスケジューリングでは難しいため、新しいスケジューリング機能
    • Accelerator Aware Scheduling (3.0対応)
      • GPUのサポート
  • Spark Graph(3.0対応)

    • 既存のグラフ処理ライブラリであるGraphXやGraphFrameの課題を解決するライブラリ
      • GraphX
        • あまりメンテナンスされていない
        • RDDベースでScalaAPIしか提供されていない
      • GraphFrames
        • SparkPackagesで提供されるサードパーティパッケージ
        • DataFrameベース
        • 単純なグラフマッチングしか行えない
        • エッジやノードのセマンティックが弱く、単純なグラフマッチングしか行えない
    • SparkGraph
      • GraphFramesでは、エッジやノードの型を指定できず、グラフ形状によるマッチングのみ
      • 型が指定できるProperty Graphというデータモデルを採用
      • Cypherによるクエリで、型や属性を用いたグラフマッチングが可能

持続可能なデータ基盤のためのデータの多様性に対する取り組み

発表資料

演者

株式会社サイバーエージェント 善明 晃由 様

内容

  • データエンジニアリングは技術的負債が生まれやすい

    • 専門性が高く、採用が難しい(人がいない)
    • 規模が大きく、アップデートにかかるコストが大きい
    • 技術の進歩が速い
  • 問題意識

    • 各サービスが独立して案件を進めるため、サイロ化
    • 各案件に十分なリソースを避けず、担当者にフルスタックなスキルが求められる
    • 開発から運用まで一連をやる必要があり、専門性が深められない
    • 変化が速いために、技術的負債につながりやすい
  • 方針

    • 各システムの共通部分をプロダクト化を促進
    • 役割を分離して、専門性を高める
  • HBaseのスキーマ設計

    • Hiveの集計結果をHBaseに書き込み、結果を可視化
    • HBaseは、キーによってパフォーマンスが大きく変わるので、スキーマ設計が重要
    • 用途によって有効なスキーマが異なる
      • PVやUUの集計
      • ページごとのPVやレスポンスタイム
      • 広告の効果測定
    • HiveからHBaseのKeyValueに変換する汎用的な設計モデルツールを作成
    • やることが、対応スキーマの拡張と問い合わせ処理の最適化のみに
  • WebUIでのレポート

    • 元となるデータは表形式
    • 列は2種類
      • ディメンジョンとメトリック
    • Grafanaで出力できる統一的なフォーマット化
    • やることが、フロントエンドでの可視化やUI改善のみに

Deep Dive into Spark SQL with Advanced Performance Tuning

発表資料

演者

Databricks Inc. 上新 卓也 様

内容

  • SparkSQLの内部詳細

    • クエリからRDDを生成する
    • df.explainでプランを生成、解釈し、パフォーマンスチューニングを実施する
  • Metadata Catalog

    • パーティションのメタデータ取得のコストを下げる
      • Hive metastoreのアップグレード
      • cardinalityの高いパーティションカラムを避ける
      • 取得時にパーティションの絞り込みを入れる
  • Cache Manager

    • 始めて利用するときにキャッシュデータ作成
    • 以降、プランが一致した場合に、キャッシュデータと置き換える
    • ディスクに書き出される場合があり、必要以上にキャッシュしないようにする
  • Optimizer

    • 生成されたロジカルプランを最適化
    • ヒューリスティックとコストベースでプランの書き換え
    • 独自にOptimizerやPlannnerRuleを組み込むことができる
  • Plannner

    • ロジカルプランをフィジカルプランへ
    • 各プランのコストに基づいて最適なフィジカルプラン選択
    • 結合時のbroadcast joinとshuffle joinの選択(前者が早い)
      • spark.sql.autoBroadCastJoinThresholdでbroadcast joinをする閾値を設定
      • 統計情報を細心に保つ
      • クエリにbroadcastJoinヒントを組み込み、明示的に選択
    • Join条件に少なくとも=条件を1つ入れることで、計算量削減
  • Query Excecution

    • Memory Managerのメモリ管理

      • spark.executor.memoryとspark.memory.frationに多少余裕を持たせて設定する
      • Spkarの監視外のメモリのため
      • spark.memory.offHeap.enabledとspark.memory.offHeap.sizeを設定して、オフヒープを有効化
      • そのぶんspark.executor.memoryを減らしてよい
    • Code GeneratorのフィジカルプランからJavaコードの生成

      • spark.sql.codegen.hugeMethodLimitを設定
      • バイトコードのサイズが8kバイトを超える大きなメソッドはJITコンパイルできず、インタプリタで実行するため遅くなる
      • 設定を8kにするとよい?

Improving Spark SQL Performance

発表資料

(Hadoop / Spark Conferenceから参照したので違う資料かも)

演者

LINE株式会社 吉田 啓二 様

内容

  • OASIS

    • 内製のBIツール - 1日200人以上、700以上のSparkジョブ
    • 課題:遅いクエリが存在する
      • エンドユーザのクエリの改善によって、パフォーマンスを解決することが難しい
      • プラットフォーム側で性能改善を頑張る
  • 結合方式の最適化

    • sort merge join
      • 結合するテーブルそれぞれでシャッフルされる
    • broadcast hash join
      • 1つのテーブルのみをシャッフルするため、早い
      • シャッフルするテーブルがメモリ上に乗せる
      • spark.sql.broadcashhashjoinThresholdで指定される(デフォルト10MB)
    • できる限りbroadcast hash joinが選択されるようにする
  • ケース1

    • コスト見積もりが正しく行えていない
      • spark.sql(query).queryExecution.otimizedPlan.stats.sizeInBytes
    • データ追加時に、統計情報を取得して、正しく見積もりできるようにする
      • sql('ANALYZE TABLE [db_name.]table_name COMPUTE STATISTICS NOSCAN')
  • ケース2

    • ファイルをLOAD DATAで参照しているhiveテーブルの統計情報がとれない
      • hive.stats.autogetherの仕様
    • broadcastヒント
      • 結合時にbroadcastを選択することができる
      • クエリに明示的に入れる必要があるため、ここでは却下
    • spark.experimental.extraOptimizationsで独自の最適化ルールを追加
  • cost-based optimizationについて

    • Spark 2.2.0~
    • デフォルトはルールベース
    • 設定値でコストベースに変えることが出来る
      • spark.sql.cbo.enabled = true
      • spark.sql.cbo.joinReorder.enabled = true
    • broadcast hash joinをより精緻に選択される
      • ルールベースでは、テーブル単位の統計情報を参照している
      • where句などの絞り込み後のカラム単位の統計情報を参照する
    • 結合順を最適化される
      • 小さいテーブルから結合して、シャッフルのコストが少なくなる