C#8.0までに非同期ストリームのことを知っておく


やりたいこと

C#8.0で追加される予定の 非同期ストリーム でどんなことができるのか、どんな書き方をするのかを知っておこう、というのがこの記事の趣旨です。

気になる点があれば、遠慮なくコメントしてください。

ずばり、できるようになったことは?

非同期ストリーム は「複数の値を順に扱いたい」かつ「非同期な処理をシンプルに書きたい」というのを実現させるための機能だといえそうです。
「複数の値を順に扱う」というのは、イテレータの機能で、従来よりyieldforeachなどで知られている機能のことです。
「非同期な処理をシンプルに書きたい」というのは、C#5.0の頃に追加された非同期機能で、async,awaitおよびTaskなどとして知られている機能のことです。
つまり、既存の2つの機能をうまく融合させた機能が「非同期ストリーム」なのです。

できなかったこと

これまでは1つのメソッドで、async/awaitキーワードと、yieldキーワードの共存ができませんでした。

// [C#7.3まで]async/await と yield は共存できない
// async IEnumerable<int> DoAsync(){

C#8.0からはこれが書ける

asyncメソッド、かつ、IAsyncEnumerable<T>またはIAsyncEnumerator<T>型をreturnするメソッドを書いた場合、メソッドの内部でyieldキーワードを利用できるようになりました。 つまり async/awaitキーワードと、yieldキーワードが共存できます。

async_awaitとyieldの夢の競演
async IAsyncEnumerable<int> DoAsync(){
   await Task.Delay(1000);
   yield return 1;
}

同期イテレータとの対応

同期 非同期
IEnumerable<T> IAsyncEnumerable<T>
.GetEnumerator() .GetAsyncEnumerator()
IEnumerator<T> IAsyncEnumerator<T>
.Current .Current
.MoveNext() .MoveNextAsync()
IDisposable IAsyncDisposable<T>
.Dispose() .DisposeAsync()

気づき:MoveNextAsync() では ValueTask が使用されている。

非同期ストリームを消費する

IAsyncEnumerable<T>returnされても、これを使う側がないと片手落ちです。IEnumerable<T>を使うためにforeachを使っていたのと同様、IAsyncEnumerable<T>でも、await foreachが使えるようになりました。
これも違和感なく使える構文になっていると思います。

await foreach(var item in asyncStream) {
    Console.WriteLine(item);
}

どんな動きになるのかな?

パターン1

このメソッドを
async IAsyncEnumerable<int> DoAsync() {
    foreach(var item in Enumerable.Range(1, 100).Select(x => x * 5)) {
        await Task.Delay(item);
        yield return item;
    }
}
こう呼び出す
async ValueTask DoSomething() {
    await foreach(var item in DoAsync()) {
        Console.WriteLine(item);
    }
}

これは動きが予想しやすいと思います。
5,10,15,...の数字が初めは勢いよく、徐々にスローダウンしながら表示されます。

パターン2

拡張メソッドをこう定義して
public static class Ex{
    // 受けとったTaskの配列をIAsyncEnumerableとしてreturnする拡張メソッド
    public static IAsyncEnumerable<T> AsAsyncEnumerable<T>(
            this IEnumerable<Task<T>> tasks
            ) => tasks switch {
        null => throw new ArgumentNullException(nameof(tasks)),
        IEnumerable<Task<T>> ts => ts.AsAsyncEnumerableImpl(),
    };
    static async IAsyncEnumerable<T> AsAsyncEnumerableImpl<T>(
            this IEnumerable<Task<T>> tasks
            ) {
        foreach(var task in tasks) {
            yield return await task;
        }
    }
}
こう呼び出す
async ValueTask DoSomething() {
    var tasks = new List<Task<int>>();
    tasks.Add(Task.Delay(1000).ContinueWith(_ => 1));
    tasks.Add(Task.Delay(3000).ContinueWith(_ => 2));
    tasks.Add(Task.Delay(5000).ContinueWith(_ => 3));
    tasks.Add(Task.Delay(2000).ContinueWith(_ => 4));
    tasks.Add(Task.Delay(4000).ContinueWith(_ => 5));
    await foreach(int item in 
            AsyncEnumerableEx.AsAsyncEnumerable(tasks)) {
        Console.WriteLine(item);
    }
}

Taskを使い慣れている人ならば、間違えないと思いますが、開始からおよそ1秒後に1が、そのおよそ2秒後に2が、そのおよそ2秒後に3,4,5が表示されます。
ListAddした時点でTaskが動き始めていることが重要ですね。

今回はTask.Delay()のような無駄な処理を呼び出していますが、これがダウンロード処理と考えれば使い道は多そうな気がします。非同期処理とは言っていますが、結果は最初に指定した順に取り出されるので、処理も追いかけやすいはずです。

まとめ

  • 非同期ストリームは、既存の2機能「イテレータ」と「非同期処理」の力強い融合
  • IAsyncEnumerable<T>またはIAsyncEnumerator<T>
    • 同期版のインタフェースやメソッドにAsyncが付いただけなので、違和感なく扱えそう。
  • 列挙するときは、await foreachを使う。
  • 非同期ストリーム機能の追加自体に難しいことはなさそう。難しいとすれば、それは非同期処理そのもの。

※C#8.0は現在Preview版です。Preview版で動作確認の上、当記事を書いていますが、記事の内容と異なる構文になる可能性もありますのでご了承ください。

今回書けなかったこと
  • 非同期ストリームの例外処理やCancellationToken
  • IAsyncEnumerable<T>IAsyncEnumerator<T>の自力実装
  • await foreachのダックタイピング的な挙動
  • 非同期LINQ的なこと
過去記事