RayをDatabricksで活用する


How to Use Ray, a Distributed Python Framework, on Databricks - The Databricks Blogの翻訳です。

Rayは膨大な計算資源を必要とするあらゆるPythonワークロードをシンプルにスケールオープンソースプロジェクトであり、当初はRISELabによって開発されていました。柔軟性のある分散実行フレームワーク上に構築された豊富なライブラリとインテグレーションによって、Rayは新たなユースケースをもたらし、通常であれば開発するには複雑なカスタム分散処理Python関数の開発をシンプルなものにします。

RayをApache Spark™のクラスターで実行することで、PySparkのUDF(ユーザー定義関数)の内部コードを分散処理できる能力を得ることに加え、ドライバーノードでの実行にのみ用いられるPythonコードも分散することが可能となります。また、Rayのスケーラブルな強化学習ライブラリRLibをすぐに利用することもできます。これらの能力によって、幅広い新たなアプリケーションを活用できるようになります。

どうしてSparkの上に別の分散フレームワークが必要なのでしょうか?

クラスターに関数をどのように分散させるかに関しては二通りの考え方があります。一つ目の考え方においては、データセットのパーツが分割され、関数はそれぞれのパーツに対して動作し、結果を収集します。これは、ビッグデータの領域においては最も一般的な形態であり、Apache Sparkのベストな適用例でもあり、これをデータの並列化と呼びます。データ並列フレームワークのモダンな形態においては、通常データフレーム関数が存在しており、UDFの外の手作りの関数のような分散処理オペレーションの内部の低レベルの構成要素になることを意図したものではありません。

図1: データの並列化

分散関数の別形態は、データセットは小規模ですがオペレーションが複雑で、異なるパーティションに同じ関数をシンプルに適用するだけでは問題を解決できないケースで必要となります。これは、タスクの並列化、論理の並列化として知られるものであり、多くの関数を同時に実行することができ、依存関係を取り扱うためのスケジューラーとパラメーターサーバーを用いて複雑なパイプラインにおいてセットアップされます。このタイプの並列化は、多くの場合HPCや(ハイパフォーマンスコンピューティング)や、データフレームのオペレーションでは実現できないカスタム分散ジョブで見かけられます。多くの場合、これらのフレームワークはスクラッチから分散関数を設計するためのものとなっています。例としては、物理シミュレーション、金融取引アルゴリズム、先進的な数理計算が含まれます。

図2: タスクの並列化

しかし、多くのタスク並列化ライブラリ、HPCライブラリは(多くのデータサイエンスパイプラインで求められる)Pythonワークロード向けではなくC++で記述されており、先進的なデザインパターンのようなカスタムジョブに対応できるほど汎化されていません。また、これらはクラスターにおける分散関数ではなく、シングルマシンにおける線形代数オペレーションのパフォーマンスを改善するために、マルチコアCPUアーキテクチャのハードウェアに対する最適化が行われている場合があります。このようなハードウェア依存のライブラリは、コモディティなクラウドハードウェアではなく専用のハードウェア向けに開発されています。大部分のタスク並列ライブラリにおける主な課題は、タスク間の依存関係を作成するのに求められる複雑性のレベルと膨大な開発期間です。これらの課題を解決するために、多くのオープンソースPythonライブラリは、Pythonのシンプルさとカスタムタスクをスケールさせる能力を組み合わせるための開発を行ってきました。

Pythonにおけるタスク、論理並列化の最新かつベストな例がRayです。シンプルさ、低レーテンシーの分散スケジューリング、分散関数間の高度に複雑な依存関係を迅速に作成できる能力によって、汎化性能、スケーラビリティ、複雑性に関する問題を解決します。詳細はGentle Introduction to Rayをご覧ください。

Rayアーキテクチャの簡単なご紹介


図3: Rayのアーキテクチャ

Rayのアーキテクチャで特筆すべき点は、どのようにジョブをスケジュールするのかに関して2レベルの抽象化が存在するということです。Rayはローカルシステムを、異なるプロセスや、典型的なビッグデータ用語でいうノードのような関数であるRayletが動作するクラスターとして取り扱います。別々のマシンをノードとして取り扱うことができるグローバルスケジューラーも存在します。これによって、シングルノードやラップトップレベルでの開発から、大規模スケールのクラウドコンピューティングまで効率的にスケールすることができます。それぞれのノードはグローバルスケジューラーとコミュニケーションすることができる自身のローカルスケジューラーを持っているので、あらゆるノードからクラスターの他のノードにタスクを送信することができます。この機能によって、開発者は他のリモートタスクを起動することができるリモートタスクを作成することができ、オブジェクト指向プログラミングの多くのデザインパターンを分散システムに持ち込むことができ、これはスクラッチで分散アプリケーションを作成することを目的としたライブラリにおいては不可欠なものになります。また、タスク、関数、イベント、その他のシステムレベルのメタデータの追跡を行うグローバルコントロールストア(GCS)を管理するノードも存在します。

図4: ワーカーノードとGCS間のデータフロー図

Rayにおけるオブジェクトストアは、クラスターで使用される共有の関数、オブジェクト、タスクを管理するApache Arrow上に構築される分散オブジェクトストアとなっています。Rayの最も重要な側面の一つとして、このオブジェクトストアはオブジェクトの削除、あるいはメモリ溢れを引き起こす永続化(Ray v1.2以降)のために階層型のメモリー管理を伴うインメモリストアであるということがあります。この高速インメモリシステムによって、大規模での高性能コミュニケーションを実現しますが、メモリ溢れを回避するために大容量のメモリーを搭載したインスタンスが必要になります。

関数内で他のリモートタスクを呼び出すリモートタスクのシンプルな例を見てみましょう。プログラムの依存関係はタスクのグラフ(Task graph)で表現され、物理的実行(Physical execution)においては、関数は別のワーカーノードで実行されながらも、どのようにオブジェクトストアが共通の変数と結果を保持するのかを示しています。

図5: アプリケーションにおけるドライバー、ワーカーノードとオブジェクトストアの関係を示したサンプル

リモートクラスオブジェクト(Rayではリモートアクターと呼びます)によって、ネストされたアクター、関数のツリーのようにさらに洗練されたデザインパターンやパラメーターサーバーを実現できるようになります。このシンプルなAPIとアーキテクチャを用いることで、背後のインフラストラクチャを構築することなしに、複雑な分散タスクを迅速に設計できます。こちらから数多くのデザインパターンのサンプルにアクセスできます。

Python
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value
counter_actor = Counter.remote()

背後のアーキテクチャの詳細に関しては、Ray 1.0 Architecture whitepaperをご覧ください。

RayをDatabricksクラスターで動かしてみる

注意
公式のRayのドキュメントでは、RayDPプロジェクトの文脈でSparkとのインテグレーションを説明しています。しかし、Databricksでは、Rayクラスターとして初期化するのではなく、マネージドのSparkクラスターを起動するので、これは「SparkにおけるRay」に関するものです。また、Databricksでは公式にはRayをサポートしていません。

Databricks上のRayでスクリプトを実行する前に、いくつかのカスタムセットアップが必要となります。initスクリプトは、Apache Sparkのドライバー、ワーカーのJVMがスタートする前位に、それぞれのクラスターノードの起動時に実行されるシェルスクリプトです。initスクリプトの設定手順はこちらから参照することができます。

initスクリプトを作成するために、Databricksノートブックで以下のセルを実行します。

Python
%python

kernel_gateway_init = """
#!/bin/bash

#RAY PORT
RAY_PORT=9339
REDIS_PASS="d4t4bricks"

# install ray
/databricks/python/bin/pip install ray

# Install additional ray libraries
/databricks/python/bin/pip install ray[debug,dashboard,tune,rllib,serve]

# If starting on the Spark driver node, initialize the Ray head node
# If starting on the Spark worker node, connect to the head Ray node
if [ ! -z $DB_IS_DRIVER ] && [ $DB_IS_DRIVER = TRUE ] ; then
  echo "Starting the head node"
  ray start  --min-worker-port=20000 --max-worker-port=25000 --temp-dir="/tmp/ray" --head --port=$RAY_PORT --redis-password="$REDIS_PASS"  --include-dashboard=false
else
  sleep 40
  echo "Starting the non-head node - connecting to $DB_DRIVER_IP:$RAY_PORT"
  ray start  --min-worker-port=20000 --max-worker-port=25000 --temp-dir="/tmp/ray" --address="$DB_DRIVER_IP:$RAY_PORT" --redis-password="$REDIS_PASS"
fi
""" 
# Change ‘username’ to your Databricks username in DBFS
# Example: username = “[email protected]username = ""
dbutils.fs.put("dbfs:/Users/{0}/init/ray.sh".format(username), kernel_gateway_init, True)

ノートブックで作成したinitスクリプトを、クラスター起動時に実行するようにクラスターを設定します。クラスターのUIを使っている場合には、Advanced optionsは以下のようになります。

図6: クラスター設定例

Python UDFの分散処理

ユーザー定義関数(UDF)は、関数内部が直列に実行されるため、最適化が難しい場合があります。データを転送するためにApache Arrow、データを操作するためにPandasを使用するPandas UDFを用いることでUDFの性能を改善するなど、Spark UDFを最適化する選択肢が存在ます。これらの選択肢はハードウェアによる最適化を可能としますが、通常分散処理できない複雑なPythonタスクの実行時間を劇的に削減するために、論理的最適化にRayを使用することができます。添付ノートブックにあるUDF内の分散MLモデルのサンプルは、2倍の性能改善を達成しています。

強化学習


図7: 強化学習

重要かつ成長している機械学習のアプリケーションに、MLエージェントがリワード関数を最大化するように、環境におけるアクションを学習する強化学習があります。適用事例には、自律運転、電力消費量の最適化から最先端のゲームプレーなどがあります。強化学習は、教師なし学習、教師あり学習に続く機械学習の3つ目のカテゴリーとなっています。

強化学習アプリケーションの開発における課題には、学習環境やエージェントがトレーニングを行うシミュレーションの作成の必要性、スケーリングの複雑性、オープンソース標準の欠如が含まれます。それぞれのアプリケーションには、多くの場合カスタムメイドの環境が必要となり、エージェントが実行するアクションの結果を提供する履歴データや物理シミュレーションなどを通じて作成されます。このようなシミュレーション環境の例として、OpenAI Gym(古典的なAtariゲームやロボティクス向けの環境)、CARLA(オープンソースのドライビングシミュレーター)、Tensor Trade(証券取引のトレーディングアルゴリズム向け)などがあります。

これらのシミュレーションをスケールさせるために、シンプルにデータセットのパーティションに対して処理を実行することはできません。最もシンプルな分散モデルトレーニングの形態においても、いくつかのシミュレーションは他のシミュレーションの前に完了するでしょうし、これらは自身のコピーとコミュニケーションしなくてはなりませんし、モデルを統合するために中央サーバーに機械学習モデルの重みを返却する必要があります。このため、これはビッグデータではなく、非常に複雑な計算処理を同時かつ大量に処理するタスク並列化の問題となります。最後に述べる問題は強化学習ライブラリにおけるオープンスタンダードの欠如です。ディープラーニングや従来の機械学習は、標準やフレームワークの差異を吸収するライブラリ(例えばMLflow)の確立に長い時間を費やしてきましたが、強化学習は開発の初期段階であり、モデルライブラリの確立された標準はまだ存在しておらず、様々なバリエーションが存在しています。このため、アルゴリズムやフレームワークを切り替える際の開発期間が長期化します。

これらの問題を解決するために、RayはRLibと呼ばれるスケーラビリティの高い強化学習ライブラリと統合されたAPIを提供しています。OpenAI Gymやユーザー定義環境で動作し、様々なアルゴリズムでトレーニングを行うことができ、背後のニューラルネットワークとしてTensorFlowやPyTorchをサポートしています。RLibとDatabricksを組み合わせることで、スケーラビリティの高いストリーミングとDelta Lakeによるデータ統合と最先端の強化学習モデルの高いパフォーマンスのメリットを享受することができます。

RLibは、モデルのバリエーションを実行しベストなモデルを探し出すスケーラブルなハイパーパラメーターチューニングのためのRayのライブラリであるTuneを使用しています。このコードサンプルでは、PPO(Proximal Policy Optimization)エージェントをOpenAI GymのCartPole環境で実行し、学習率の3つのオプションに対するグリッドサーチを実行しています。内部で行われているのは、Sparkノード上のRayプロセスが環境のシミュレーションを実行し、バッチに対してモデルのトレーニングを実行する中央のトレーニングRayプロセスにバッチを返却しています。そして、より多くのトレーニングデータを収集するために、モデルをロールアウトワーカーに送信します。トレーナープロセスを高速化するためにGPUを使用することができますが、設定num_gpusを0にすることで、安価なCPUノードで処理を行うことも可能です。

図8: PPOアーキテクチャ

Python
from ray import tune

tune.run(
    "PPO",
    stop={"episode_reward_mean": 200},
    config={
        "env": "CartPole-v0",
        "num_gpus": 0,
        "num_workers": 3,
        "lr": tune.grid_search([0.01, 0.001, 0.0001]),
    },
)

強化学習の適用範囲は広く、シミュレーションが実行可能で、コスト関数を定義でき、固定の論理的ルールやシンプルなヒューリスティックなモデルを適用できないほど複雑なケースで強化学習を活用することができます。強化学習の最も有名な適用事例は、研究目的のものであれば、AlphaGo、人間を超えるレベルのAtariエージェント、シミュレートされた自律運転などがありますが、実世界におけるビジネスユースケースも多く存在します。最近の適用事例には、工場のロボット操作制御、電力消費の最適化、マーケティング、広告レコメンデーションなどがあります。

使ってみる

Sparkのパワーと連携したRayを活用することで、Databrciksのレイクハウスプラットフォームをの適用範囲を拡大することができ、スケーラブルなタスク並列化や強化学習が可能となります。このインテグレーションは、新たなストリーミング、ML、ビッグデータワークロードに対応するために、信頼性、セキュリティに優れた分散計算資源、Delta lakeによるパートナーインテグレーション、Rayの汎用的な分散処理フレームワークのメリットを組み合わせたものとなります。

ノートブックを試してみる

Databricks 無料トライアル

Databricks 無料トライアル