Rust: Futureのざっくりとした説明(0.3)


自分の理解の確認だから間違ってるかも。

タスクとFutute

タスクは一つのオートマトンで、Futureはその部分。

Futureは計算の状態、つまり「どこまで計算を進めたか」という概念を取り扱うことを示すトレイトだ。
乱暴にいえばawaitする単位が計算の状態になる。
例えば、以下の例でいえば「(0)初期状態」「(1)foo待ち」「(2)bar待ち」「(3)計算終わり」など。

async {
// (0)
    let x = ... ;
    let y = ... ;
// (1)
    let f = await!(foo(x, y)); 
// (2)
    let b = await!(bar(f));

    Ok(b)
// (3)
}

図にするとこう。

Futureは再帰的な構造になっている。図中(1)や(2)の中身もまたFutureで、分岐とかを考えなければ、全て展開すると一本の長いチェーンになる。このチェーンがタスク。async/awaitがオートマトンを繋げていく。

いわゆるExecutorは、タスクに対してポーリングを行い、オートマトンを進められるところまで進める。
進められないところというのは、再帰を辿って行き着いた先にあるIOの部分などだ。IOは本質的に低速なため、そこで計算を止めざるをえない。
このとき、IOを待ってExecutorがブロックしないよう、Futureではブロックを別スレッドに押し付けるように書いておく。そして、計算の準備ができるまで、Executorには計算結果の代わりにPendingを返しておく。
ExecutorはPendingを受け取ると、一旦そのタスクの処理を中断し、別のタスクのポーリングを行う。次にこのタスクをポーリングするときは前回の続きから始めて、同様に計算を進められるところまで進め、また中断する。これをタスクの完了まで繰り返す。

そもそも計算機自体がオートマトンで、プログラムカウンタやメモリ上の変数などの形で計算状態が表されている。
Futureはそれをもっと粗い単位で表現するものだという理解。

ライフタイム

Spawnに(スレッドをブロックせず)ぶん投げることができるのは'staticFutureだけだ。このタスクはいつ実行されるか分からないから、当然'staticでないと困る。
ただし、そのタスクの中で、タスク自身が所有権を持つものの参照を作り、その参照をタスク内で取り回す分には全くの合法になる。
これがいわゆるnon-'staticFuture

Webフレームワークでは、リクエストにつき一つのタスクを生成し、それをSpawnに投げてまた次のリクエストを待つ処理が必ずどこかに出来るはず。
このとき、アプリケーション全体の状態(コネクションプールとか)はArcで包んでリクエスト毎にクローンされ、タスク全体としては'staticにする。
そして、リクエストの処理中にはクローンされたものや処理中に生成したオブジェクトの参照だけを取り回したいはずだ。このときnon-'staticFutureが役立つ。

Waker

無駄なポーリングを防ぐために、ポーリングしていいことをExecutorに通知するための仕組み。

jsのPromiseに当てはめて考えるとわかりやすい。
Promiseはコンストラクタでresolverejectを貰う。
計算が完了したらこのいずれかを呼ぶ。これらはコールバックの中で呼ばれるだろう。

new Promise((resolve, reject) => {
    setTimeout(() => resolve("OK!"), 1)
})

RustのWakerは、いくつか(重大な)違いがあるものの、雰囲気としてはこれに近い。
Futureトレイトを実装する時必須になるメソッドのpollは、引数に&Wakerを貰う。
waker.wake()を呼ぶことで、Executorにこのタスクをポーリングしていいことを通知する。

fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
    if (*self).is_progress {
        if self.flag.load(Ordering::Relaxed) {
            return Poll::Ready(String::from("OK!"));
        } else {
            return Poll::Pending;
        }
    }

    (*self).is_progress = true;
    let flag = Arc::clone(&self.flag);
    let waker = waker.clone();

    thread::spawn(move || {
        thread::sleep(std::time::Duration::from_millis(1));
        flag.store(true, Ordering::Relaxed);
        waker.wake();
    });

    Poll::Pending
}

ゴツいのは許して。
ネイティブなマルチスレッドを触り、スレッド間で何らかの通信を行う以上、Futureの奥の方はどうしてもゴツくなる。どのタイミングでポーリングされてもいいように実装しておく必要があるため、その分の分岐も要る。

そのかわり、これを使うときにはasync/awaitで簡単。

async {
    let ok = await!(Fuga::new());
    println!("{}", ok); // => OK!
}

また、内部にFutureを持つ別のFutureを実装するのもそんなに難しくない。下のは上のasyncブロックと大体同じ。

impl Future for Hoge {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
        Future::poll(Pin::new(&mut self.fuga), waker).map(|ok| println!("{}", ok));
    }
}

力尽きたからここまで!