【UniRx】AsyncSubjectの紹介


AsyncSubjectとは

AsyncSubjectとはSubjectシリーズの1つであり、非同期処理を扱うのに特化したSubjectという位置づけです。
ふるまいはJavaScriptのPromiseやScalaのFutureとほぼ同じです。将来、結果が1つだけ確定する場合において用いることができる非同期処理向けのSubjectです。

UniRxにおけるObservableはストリームの長さが 1~無限 の長さを扱うことができる仕組みになっています。
このとき、ストリームの長さをあえて1に固定してしまうことで、Future/Promiseと全く同じことができるようになるという発想のもと作られたのがこのAsyncSubjectです。
Observableの方がFuture/Promiseよりも上位概念であるとも言えます)

AsyncSubjectのふるまい


(画像引用: ReactiveX)

AsyncSubjectのふるまいは次の通りです。

  • OnNextしても値をすぐに発行しない
  • OnCompletedが実行されたときにはじめてOnNextを1つだけ発行する
  • 発行するOnNextは一番最後の値となる
  • Complete済みのAsyncSubjectをSubscribeした場合、結果のOnNextとOnCompletedを直ちに発行する

Promiseresolve(value)OnNext(value) + OnCompletedに分解されたと、と考えてもらうとわかりやすいかもしれません。

つかいみち

ふつうのSubjectの場合はSubscribeするより以前に発行されたOnNextは購読できないという性質でした。
ですがこのAsyncSubjectの場合はSubscribeのタイミングによらずにAsyncSubjectが完了したら必ずOnNextとOnCompletedを受け取ることができるようになっています。

この性質から、非同期処理のコールバックの実装に非常に向いています(というかもともとそのための用途のSubjectですし)。

例:オブジェクトの初期化順序を制御する

いきなり変則的な例ですが、便利なので紹介します。
オブジェクトAとオブジェクトBの2つがあり、必ず「A → B」の順番で初期化して欲しいという例を考えてみます。

Unityの場合ですとScript Execution Orderを使いスクリプトの実行順序そのものを調整することで実現することができますが、「実行順序に依存する」という情報がソースコードからUnity側に漏れてしまうため非常に気持ち悪くできれば使いたくない機能です。

そこでAsyncSubjectを使い、初期化順序をうまく実装してみましょう。

実装例

ObjectA(先に初期化したいほう)
using UniRx;
using UnityEngine;

namespace AsyncSubjectSample
{
    /// <summary>
    /// 先に初期化されて欲しい方
    /// </summary>
    public class ObjectA : MonoBehaviour
    {
        private AsyncSubject<Unit> _initializedAsyncSubject = new AsyncSubject<Unit>();

        public IObservable<Unit> OnInitializedAsync
        {
            get { return _initializedAsyncSubject; }
        } 

        void Start()
        {
            Debug.Log("ObjectAのStartが実行されました");

            //ここでAの初期化処理

            Debug.Log("ObjectAの初期化が終わりました");

            //初期化完了通知
            _initializedAsyncSubject.OnNext(Unit.Default);
            _initializedAsyncSubject.OnCompleted();
        }

    }
}
ObjectB(Aの後に初期化したいほう)
using UniRx;
using UnityEngine;

namespace AsyncSubjectSample
{
    /// <summary>
    /// Aの後に初期化されて欲しい方
    /// </summary>
    public class ObjectB : MonoBehaviour
    {

        [SerializeField]
        private ObjectA objectA;

        void Start()
        {
            Debug.Log("ObjectBのStartが実行されました");

            // Aの初期化完了通知が来たらBを初期化する
            // OnInitializedAsyncが既にCompleted済みなら直ちに実行される
            objectA.OnInitializedAsync.Subscribe(_ => Initialize());

        }

        void Initialize()
        {
            //ここでBの初期化処理

            Debug.Log("ObjectBの初期化が終わりました");
        }
    }
}

結果

  • BのStart()の方が先に実行された場合
ObjectBのStartが実行されました
ObjectAのStartが実行されました
ObjectAの初期化が終わりました
ObjectBの初期化が終わりました
  • AのStart()の方が先に実行された場合
ObjectAのStartが実行されました
ObjectAの初期化が終わりました
ObjectBのStartが実行されました
ObjectBの初期化が終わりました

このように、Start()の実行順序とは関係なく必ずA → Bの順序で初期化が実行されるようになりました。

まとめ

AsyncSubjectは他のSubjectと比較して挙動が違うので注意が必要です。

とくに、RxはObservableのストリーム長を1に固定することで、Promise/Futureと全く同じ非同期処理の考え方に載せることができると言うことができ、その実装の1つがAsyncSubjectというわけです。

おまけ

AsyncSubject<Unit>をラップするオブジェクトを作るとちょっと便利だったので紹介します。

Single
using System;
using UniRx;

namespace UniRxExt
{
    /// <summary>
    /// 名前がRxJavaのSingleとかぶってるけど動作は違います
    /// </summary>
    public class Single : IObservable<Unit>, IDisposable
    {
        private readonly AsyncSubject<Unit> _asyncSubject = new AsyncSubject<Unit>();
        private readonly object lockObject = new object();

        public void Done()
        {
            lock (lockObject)
            {
                if (_asyncSubject.IsCompleted) return;
                _asyncSubject.OnNext(Unit.Default);
                _asyncSubject.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<Unit> observer)
        {
            lock (lockObject)
            {
                return _asyncSubject.Subscribe(observer);
            }
        }

        public void Dispose()
        {
            lock (lockObject)
            {
                _asyncSubject.Dispose();
            }
        }
    }
}

使い方

ObjectA
using UniRx;
using UniRxExt;
using UnityEngine;

namespace AsyncSubjectSample
{
    /// <summary>
    /// 先に初期化されて欲しい方
    /// </summary>
    public class ObjectA : MonoBehaviour
    {
        private Single _single = new Single();

        public IObservable<Unit> OnInitializedAsync
        {
            get { return _single; }
        } 

        void Start()
        {
            Debug.Log("ObjectAのStartが実行されました");

            //ここでAの初期化処理

            Debug.Log("ObjectAの初期化が終わりました");

            //初期化完了通知
            // OnNext + OnCompletedをくっつけて呼べるだけ
            _single.Done();
        }
    }
}