IO<T>からIO<Exception>を抽出しようとしてみた
6964 ワード
何がしたかったか
適当なストリームが吐く例外を集めたストリームを拾ってみたくなった。
例えば、偶数をそのまま通して、奇数を入れるとエラーを吐くIO<int>
があって、
public class EvenReceiver : IObservable<int>
{
private Subject<int> subject = new Subject<int>();
public IDisposable Subscribe(IObserver<int> observer)
{
var normal = subject
.Where(x => x % 2 == 0)
.Subscribe(x => observer.OnNext(x));
var error = subject
.Where(x => x % 2 != 0)
.Subscribe(x => observer.OnError(new ArgumentException("Argument must be even.")));
return Disposable.Create(() => { normal.Dispose(); error.Dispose(); });
}
public void Receive(int x)
{
subject.OnNext(x);
}
}
こう書いたら
EvenReceiver rcv = new EvenReceiver();
var success = rcv.Retry();
var failure = rcv.Failure();
success.Subscribe(Console.WriteLine);
failure.Subscribe(Console.WriteLine);
rcv.Receive(0);
rcv.Receive(1);
rcv.Receive(2);
rcv.Receive(3);
rcv.Receive(4);
rcv.Receive(5);
こうなってほしい
0
System.ArgumentException: Argument must be even.
2
System.ArgumentException: Argument must be even.
4
System.ArgumentException: Argument must be even.
大本のストリーム(EvenReceiver
) を int
ではなく
Either
などを作ってそいつのストリームにすれば簡単そうだが、
大本がいじれない場合どうするか?
やってみた
こんな感じだろうか
public static class FailureExtension
{
public static IObservable<Exception> Failure<T>(this IObservable<T> source)
{
var error = new Subject<Exception>();
Reconnect(source, error);
return error;
}
private static void Reconnect<T>(IObservable<T> source, IObserver<Exception> error)
{
source.Subscribe(
_ => { },
ex =>
{
error.OnNext(ex);
Reconnect(source, error);
},
() => { });
}
}
まわりくどいことをしてしまっているような気がしてならない。
Author And Source
この問題について(IO<T>からIO<Exception>を抽出しようとしてみた), 我々は、より多くの情報をここで見つけました https://qiita.com/ozwk/items/2e0d7cf8a226f4daf5a4著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .