【強化学習】Ape-X の高速な実装を簡単に!


内部実装の技術的な話は Zenn に記事を投稿しました。
興味のある方はそちらをお読みください。

1. はじめに

以前紹介記事も書いていますが、強化学習の経験再生 (Experience Replay) 用のReplay Bufferのライブラリを開発して公開しています。

Experience Replay は強化学習で広く利用されているにも関わらず、多くの人がネット上等に公開されているコードをコピペしたりスクラッチで書き直したりと車輪の再発明に時間を費やしている状況は良くないと思い開発と公開を続けています。しかも意外とハマりどころがあり、非効率的な実装や同じ轍を踏むバグに遭遇したりと、深層学習に興味がある研究者・開発者たちにとっては頭の痛い問題であるので、サクッと利用可能なライブラリがあることは重要だと思っています。

(もちろんRLlibなど強化学習全体を内包した素晴らしいライブラリはありますが、実装されていないアルゴリズムを自作して乗せようとなるとなかなか骨が折れるため、強化学習のアルゴリズムの研究者には使い勝手が良くはないと思っています。また、先日公開されたDeepMind製のReverbは直接の競合相手ですが、あちらはもっと大規模なスケールが前提で、こちらは現状単一コンピュータでの利用が想定されています。)

主題からそれるので、ここでは詳しくは記載しませんが、深層学習のフレームワーク非依存で、かつ高い自由度と効率性を主眼に開発をしています。興味が湧いた方はぜひ利用してみて、レポジトリにスターをつけたり issue を立てたりとフィードバックしてもらえるととても嬉しいです。

2. Ape-X

強化学習を短い時間で学習させる手法のひとつとして分散学習の Ape-X が提案されています。ざっくり言うと、環境の探索とネットワークの学習を分離して、かつ複数の探索を同時に実行する方法です。

細かい解説や実装は Qiita 上にもしっかり書かれた素晴らしい記事がありました。

これらの実装でも、Replay Buffer やプロセス間データ通信をフルスクラッチで書かれているように思います。再利用する際に、「自分はTensorFlow派ではなくてPyTorch派なんだけど・・・」となるとコピペして関係する部分を全部書き直すのは大変だと思っています。

3. cpprb のインストール

今回の Ape-X 向けの機能は、 v9.4.2 以降が必要です。

3.1 Linux/Windows

PyPI からそのままバイナリをインストールできます。

pip install cpprb

3.2 macOS

残念ながらデフォルトで利用される clang ではコンパイルができないため、Homebrew なり MacPorts なりで、gcc を用意してもらいインストール時に手元でコンパイルしてもらうことが必要です。

/path/to/g++ はインストールした g++ のパスに置き換えてください。

CC=/path/to/g++ CXX=/path/to/g++ pip install cpprb

参考: 公式サイトのインストール手順

4. cpprb を利用した Ape-X サンプル実装コード

cpprb を利用した際の Ape-X のサンプル実装を以下に掲載します。深層学習のモデル(ネットワーク)部分は含んでいない Ape-X の骨格の部分のコードです。モックの MyModel の部分を変えてもらって、必要に応じて TensorBoard などの可視化を入れたりモデルの保存を加えたりすることで、実際に利用できるようになると思います。(その辺りの実装は、普段から強化学習を研究開発している方たちにとって難しくはないと思います。)

apex.py
from multiprocessing import Process, Event, SimpleQueue
import time

import gym
import numpy as np
from tqdm import tqdm

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer


class MyModel:
    def __init__(self):
        self._weights = 0

    def get_action(self,obs):
        # Implement action selection
        return 0

    def abs_TD_error(self,sample):
        # Implement absolute TD error
        return np.zeros(sample["obs"].shape[0])

    @property
    def weights(self):
        return self._weights

    @weights.setter
    def weights(self,w):
        self._weights = w

    def train(self,sample):
        # Implement model update
        pass


def explorer(global_rb,env_dict,is_training_done,queue):
    local_buffer_size = int(1e+2)
    local_rb = ReplayBuffer(local_buffer_size,env_dict)

    model = MyModel()
    env = gym.make("CartPole-v1")

    obs = env.reset()
    while not is_training_done.is_set():
        if not queue.empty():
            w = queue.get()
            model.weights = w

        action = model.get_action(obs)
        next_obs, reward, done, _ = env.step(action)
        local_rb.add(obs=obs,act=action,rew=reward,next_obs=next_obs,done=done)

        if done:
            local_rb.on_episode_end()
            obs = env.reset()
        else:
            obs = next_obs

        if local_rb.get_stored_size() == local_buffer_size:
            local_sample = local_rb.get_all_transitions()
            local_rb.clear()

            absTD = model.abs_TD_error(local_sample)
            global_rb.add(**local_sample,priorities=absTD)


def learner(global_rb,queues):
    batch_size = 64
    n_warmup = 100
    n_training_step = int(1e+4)
    explorer_update_freq = 100

    model = MyModel()

    while global_rb.get_stored_size() < n_warmup:
        time.sleep(1)

    for step in tqdm(range(n_training_step)):
        sample = global_rb.sample(batch_size)

        model.train(sample)
        absTD = model.abs_TD_error(sample)
        global_rb.update_priorities(sample["indexes"],absTD)

        if step % explorer_update_freq == 0:
            w = model.weights
            for q in queues:
                q.put(w)


if __name__ == "__main__":
    buffer_size = int(1e+6)
    env_dict = {"obs": {"shape": 4},
                "act": {},
                "rew": {},
                "next_obs": {"shape": 4},
                "done": {}}
    n_explorer = 4

    global_rb = MPPrioritizedReplayBuffer(buffer_size,env_dict)

    is_training_done = Event()
    is_training_done.clear()

    qs = [SimpleQueue() for _ in range(n_explorer)]
    ps = [Process(target=explorer,
                  args=[global_rb,env_dict,is_training_done,q])
          for q in qs]

    for p in ps:
        p.start()

    learner(global_rb,qs)
    is_training_done.set()

    for p in ps:
        p.join()

    print(global_rb.get_stored_size())

見ていただければわかるように、グローバル・バッファとして利用している MPPrioritizedReplayBuffer (Multi-Process サポートな Prioritized Replay Buffer) は複数プロセスから、特に意識することなくアクセスすることができます。内部データを共有メモリに乗せているので プロキシ (multiprocessing.managers.SyncManager 等) や、キュー (multiprocessing.Queue 等) よりも高速なプロセス間データ共有を実現しています。

また、データの不整合が起きないようにするロックも各メソッドの内部で実行しているので、複数 explorer + 単一 learner という基本構成をユーザーが守っている限り、手動でロックをかける必要はありません。しかもバッファ全体をロックするわけではなく、データの整合性を保つために必要な最小限のクリティカル・セクションだけをロックしているので下手にグローバル・バッファ全体をロックするよりも非常に効率的です。(特に深層学習のネットワークが小さいとか、シミュレーターなどの環境の計算負荷が軽いなどの状況下ではその差は大きくなり、厳密な試験ではないですが手元で開発している際には単純なグローバル・バッファ全体のロックと比較して、explorerの速度が3-4倍、learnerの速度が1.2-2倍程度になりました。)

5. おまけ

友人が cpprb を利用して、TensorFlow 2.x 向けの強化学習ライブラリ tf2rl を開発しています。
そちらも興味があればよろしくおねがいします。

作者本人の紹介記事 → tf2rl: TensorFlow2 Reinforcement Learning

追記

この記事の基礎となる Experience Replay をcpprbで実装するケースの記事を書きました。