IO<T>からIO<Exception>を抽出しようとしてみた


何がしたかったか

適当なストリームが吐く例外を集めたストリームを拾ってみたくなった。

例えば、偶数をそのまま通して、奇数を入れるとエラーを吐くIO<int>があって、

public class EvenReceiver : IObservable<int>
{
    private Subject<int> subject = new Subject<int>();
    public IDisposable Subscribe(IObserver<int> observer)
    {
        var normal = subject
            .Where(x => x % 2 == 0)
            .Subscribe(x => observer.OnNext(x));
        var error = subject
            .Where(x => x % 2 != 0)
            .Subscribe(x => observer.OnError(new ArgumentException("Argument must be even.")));
        return Disposable.Create(() => { normal.Dispose(); error.Dispose(); });
    }

    public void Receive(int x)
    {
        subject.OnNext(x);
    }
}

こう書いたら

EvenReceiver rcv = new EvenReceiver();
var success = rcv.Retry();
var failure = rcv.Failure();

success.Subscribe(Console.WriteLine);
failure.Subscribe(Console.WriteLine);

rcv.Receive(0);
rcv.Receive(1);
rcv.Receive(2);
rcv.Receive(3);
rcv.Receive(4);
rcv.Receive(5);

こうなってほしい

0
System.ArgumentException: Argument must be even.
2
System.ArgumentException: Argument must be even.
4
System.ArgumentException: Argument must be even.

大本のストリーム(EvenReceiver) を int ではなく
Eitherなどを作ってそいつのストリームにすれば簡単そうだが、
大本がいじれない場合どうするか?

やってみた

こんな感じだろうか

public static class FailureExtension
{
    public static IObservable<Exception> Failure<T>(this IObservable<T> source)
    {
        var error = new Subject<Exception>();
        Reconnect(source, error);
        return error;
    }

    private static void Reconnect<T>(IObservable<T> source, IObserver<Exception> error)
    {
        source.Subscribe(
            _ => { },
            ex =>
            {
                error.OnNext(ex);
                Reconnect(source, error);
            },
            () => { });
    }
}

まわりくどいことをしてしまっているような気がしてならない。