golangのchannelの実装を読んでみた


以前、きたるCPU10000コア時代に向けて近年で一番流行ったconcurrencyのおもちゃであるchannelをC++に実装した
( C++20 で goroutine/Channel みたいなことをやるライブラリ作った )
その際、golangのchannelを実装を読んだので実装について解説する

Channelとは?

機能的にはthreadでblocking_queueとか言われてるものと大体同じである

以下のような特徴を持つ
- (thread+mutexベースではなく)goroutine上で動く
- 有限のキューを用いる
- キューの要素数は0を許容する
- close機能がある

goroutineとは?

どうしてもgoroutineという言葉の定義が見つけられなかったが、goroutineという言葉はおおよそ下の2つの意味で使われていると思う

  1. なんかいい感じに実行・中断・再開してくれるgolangの軽量スレッドのこと
  2. golangのスケジューラのうちasyncのTask相当の、「実行できるもの」

なお、goroutineはソースコード上はg と省略される

参考

読む

早速channelの実装を見ていく
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go

channelの構造体定義

読み進めると分かるが、大体下のような構造になっている

struct chan{
  std::deque<value_type> value_queue; // 本当はRingBufferだけどまぁ大体同じでしょ
  std::list<RecvSudog> recv_queue;
  std::list<SendSudog> send_queue;
  bool closed = false;
  std::mutex mutex;
};

channelは上のように主に3つのキューで出来ている
value_queueは値を保存しておくqueueである
send/recv_queueは送受信まちのgoroutineを保存しておくためのキューである

chanの生成

ここではchannelの生成を行っている
キューの要素or要素の型のサイズが0であるときvalue_queueを小さくする最適化をやってるが、それ以外面白いとこはないので読み飛ばす

送信処理の実装

送信処理の挙動は以下である

  1. channelがnilチャネルだったりclose済みの場合、何もしなかったり死んだりする
  2. recv_queueに受信待ちのgoroutineが存在する場合、そのgoroutineに要素を送信する
  3. value_queueに空きがある場合、value_queueに要素をpushする
  4. 自身のgoroutineをsend_queueにenqueueし、goroutineを中断する

これを頭に入れてからソースを読むと読みやすくなる。
入れたと思うのでさっそく読んでいく

上の方:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L158-L200
mutex(OSのlock)を用いず、atomic命令だけで実装できる処理や最適化を行っている。

recv_queueに要素がある場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L293
recv_queueからgoroutineをひとつ取り出し、値をgoroutineに渡し、channelのlockを解除し、取り出したgoroutineを再開可能(≒thread_poolにpost)にする

ここで注目すべきはlock解除後にgoroutineを再開可能を再開可能にしている点である。これは
「goroutineのスタックサイズの変更処理時にchannelのlock取ろうとしてdeadlockになる」という問題に起因している。
詳しくはこの辺のissueを見てほしい https://github.com/golang/go/issues/12967

value_queueに空きがある場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L214
解説するほど面白いとこはないので各自実装読んでください

送信待ちqueueにenqueueする場合:https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L258

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

ここのgopark()(https://github.com/golang/go/blob/fd6ba1c8a23d8a3fffb6c475b21f78510152ef5c/src/runtime/proc.go#L319) による中断とlockの解放について説明する

goparkを呼び出すと
1. 現在のgoroutineを中断する
2. goroutineの外(machine:goroutineとは?の項を参照)でlockの開放処理を行う

という流れになる。

なぜ「oroutineの外(machine)でlockの開放処理を行う」というようなややこしい処理をしているかというと、
先にchannelのlockを開放してした場合、他の処理がchannelを使用することによりgoroutineが中断される前にgoroutineが再開させられてしまう可能性があるためである

これを防ぐにはgoroutineが同時実行されないようにgoroutineにmutexをつければよいのだが、その場合追加mutexコストが必要になる
それを防ぐために現在のような実装になってると思われる

受信処理・closeの実装

送信処理と大体おなじなので各自読んでください

selectの実装

selectは(なぜか)別ファイルに分かれている

selectの挙動は以下である
1. (nil channelやdefaultを含めて)即時実行可能なものがあればその操作を実行する
2. selectの対象になるすべてのChannelに行いたい操作を実行待ちqueue(send|recv_queue)にenqueueし、中断する
3. いずれかのchannelが実行され、goroutineが再開される
4. enqueueしたもののうち、実行しなかったものをキャンセルする

これを頭に入れてからソースを読むと読みやすくなる。入れたと思うのでさっそく読んでいく

pollorderの決定や各channelのlock獲得処理:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L121-L230

pollorderは「即時実行可能なものがあればその操作を実行する」さい、どの順序で探索するか?を決定するlistである
実装を見ると分かるが、その順序はランダムになっている
これは非同期処理のエミュレートである

lockの獲得についてはchannelの構造体のメモリアドレスの順にlockを獲得するようにしている
これは複数のchannnelのlockを獲得する処理が複数存在する場合、全員が同じ順序でlockを獲得しないとdeadlockになる恐れがあるためである

実行可能なものの探索:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L243

// pass 1 - look for something already waiting

のコメントの通り実行可能な操作を探して見つけ次第実行する

すべてのchannelへのenqueue:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L288-L327

// pass 2 - enqueue on all chans

すべてのchannelにenqueueして中断する

実行・再開:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L327
いずれかのchannelによって再開されるとpass3が実行される
https://github.com/golang/go/blob/7307e86afda3c5c7f6158d2469c39606fd1dba65/src/runtime/chan.go#L819
実行時の処理として、dequeue時にg.selectDoneに対しCAS操作を行うことで複数のchannelがselectの処理を実行しないようにしている

再開後のあと処理:https://github.com/golang/go/blob/c06a354bccf60ea32ed74238be409a00aac292c5/src/runtime/select.go#L336

// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.

pass3では pass2でenqueueしたもののうち、使用しなかったものをキューから削除する

おわりに

golangのchannelはよくよく見ていくと割とややこしい処理をしている