BehaviorSubjectの初期値がない版


背景

BehaviorSubjectは、最後に発行された値をキャッシュしてくれるため、あとから購読(Subscribe)しても値が取得できるという特性を持つSubjectです。便利。

しかしながら、BehaviorSubjectはコンストラクタで初期値を設定してやる必要があるため、利用場面によっては流れてくる値が「初期値」なのか「発行された値(null値?)」なのかを区別できるように定義する必要があって面倒なときがあります(もちろん、初期状態を定義できるものであればBehaviorSubjectで十分なんですが)。

そこで、BehaviorSubjectの初期値がない版を作ってみました。

[2020-04-09 追記]
- OnNext後にOnCompletedやOnErrorを呼んだ後にSubscribeしたときの挙動がおかしかったので修正。
- コメントで教えていただきました( https://qiita.com/Azleep/items/70ef913ccff1592458f0#comment-7a8c8c31686f79910633 )が、Valueプロパティが必要ないならReplaySubject<T>(1)で十分です。

BehaviorSubjectの初期値がない版

期待する挙動としては、

事前にOnNext Subscribeしたときの挙動
されてる 事前にOnNextされた値が流れる
されてない 値が流れない

こんな感じです。

/// <summary>
/// 状態を持つSubject。Subscribeしたときに最後に通知された値を流す。一度も通知されていない場合は流れない。
/// </summary>
public class StatefulSubject<T> : ISubject<T>, IDisposable
{
    private readonly Subject<T> _Implement = new Subject<T>();

    /// <summary>
    /// 現在の値
    /// </summary>
    public T Value => this._Value;
    private T _Value = default;

    /// <summary>
    /// 値が設定済みならtrue
    /// </summary>
    public bool HasValue => this._HasValue;
    private bool _HasValue = false;

    private Exception _Exception;

    private NotificationKind _LatestNotificationKind = NotificationKind.None;

    /// <summary>
    /// Provides the observer with new data.
    /// </summary>
    /// <param name="value">The current notification information.</param>
    public void OnNext(T value)
    {
        lock (this)
        {
            if (this._LatestNotificationKind == NotificationKind.Completed ||
                this._LatestNotificationKind == NotificationKind.Error)
            {
                return;
            }

            this._Value = value;
            this._HasValue = true;
            this._LatestNotificationKind = NotificationKind.Next;
            this._Implement.OnNext(value);
        }
    }

    /// <summary>
    /// Notifies the observer that the provider has experienced an error condition.
    /// </summary>
    /// <param name="error">An object that provides additional information about the error.</param>
    public void OnError(Exception error)
    {
        lock (this)
        {
            if (this._LatestNotificationKind == NotificationKind.Completed ||
                this._LatestNotificationKind == NotificationKind.Error)
            {
                return;
            }

            this._Exception = error;
            this._LatestNotificationKind = NotificationKind.Error;
            this._Implement.OnError(error);
        }
    }

    /// <summary>
    /// Notifies the observer that the provider has finished sending push-based notifications.
    /// </summary>
    public void OnCompleted()
    {
        lock (this)
        {
            if (this._LatestNotificationKind == NotificationKind.Completed || 
                this._LatestNotificationKind == NotificationKind.Error)
            {
                return;
            }

            this._LatestNotificationKind = NotificationKind.Completed;
            this._Implement.OnCompleted();
        }
    }

    /// <summary>
    /// Notifies the provider that an observer is to receive notifications.
    /// </summary>
    /// <param name="observer">The object that is to receive notifications.</param>
    /// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (this)
        {
            if (!this._Implement.IsDisposed)
            {
                if (this._HasValue)
                {
                    observer.OnNext((T)this._Value);
                }

                switch (this._LatestNotificationKind)
                {
                    case NotificationKind.Next:
                    case NotificationKind.None:
                        break;
                    case NotificationKind.Error:
                        observer.OnError(this._Exception);
                        break;
                    case NotificationKind.Completed:
                        observer.OnCompleted();
                        break;
                }
            }
            return this._Implement.Subscribe(observer);
        }
    }

    /// <summary>
    /// Indicates whether the subject has observers subscribed to it.
    /// </summary>
    public bool HasObservers => this._Implement.HasObservers;

    /// <summary>
    /// Indicates whether the subject has been disposed.
    /// </summary>
    public bool IsDisposed => this._Implement.IsDisposed;

    /// <summary>
    /// 破棄
    /// </summary>
    public void Dispose()
    {
        this._Value = default;
        this._Exception = default;
        this._Implement?.Dispose();
    }

    /// <summary>
    /// 通知の種類
    /// </summary>
    private enum NotificationKind
    {
        /// <summary>
        /// 未通知
        /// </summary>
        None,

        /// <summary>
        /// OnNext
        /// </summary>
        Next,

        /// <summary>
        /// OnError
        /// </summary>
        Error,

        /// <summary>
        /// OnCompleted
        /// </summary>
        Completed,
    }
}

RxのSubjectのコードを見てるともっと賢い排他の方法を使ってるっぽくて、性能を追求する場合はそのあたりの改善が必要です。

いちおう、Rxの他のSubjectの動きを見ながらそれっぽく動くようにしたつもりですけど、挙動に変なところがあるかもしれません。お許しください。