[追記]Go言語でチャンネル書込がブロックされるのを無理矢理解決した話


概要

Go言語のチャンネルは非常に便利なんですが、正直複雑すぎてあまり使いこなせていません。

この記事は、チャンネルの受信側のgoroutineが先に死んだ場合に、送信側のチャンネル書き込みが永久にブロックされるのを無理矢理解決した事例です。

正直こんな書き方がGo言語的に良いのかどうかわからないので、ご意見等いただけると嬉しいです。

バッドノウハウかもしれないのでご注意を!!
[追記]もしかしてBind呼ぶたびに終わらないgoroutineが増えるかも?

参考

Go言語のチャンネル書き込みの時に…

条件 結果
受信ルーチンが生きていてチャンネルが満杯でない ブロックしない
受信ルーチンが生きていてチャンネルが満杯 チャンネルが空くまでブロック
受信ルーチンが死んでいる 永久にブロック
チャンネルにnilが代入されている 永久にブロック
チャンネルがcloseされている panicする

panicするコード

Playgroundへのリンク

panic.go
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    fmt.Println("Hello, playground")
    ch := make(chan string)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer func() {
            wg.Done()
            fmt.Println("Writer Finished")
        }()
        ch <- "AAA"
        time.Sleep(time.Second * 6)
        ch <- "BBB"
    }()
    wg.Add(1)
    go func() {
        defer func() {
            wg.Done()
            fmt.Println("Reader Finished")
        }()
        for i := 0; i < 5; i++ {
            select {
            case v := <-ch:
                fmt.Printf("OK:%v\n", v)
            default:
                fmt.Println("NG")
            }
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
}

受信側のルーチンが5秒で死んで、送信側が6秒後に新しい値を書こうとしてpanicします。

無理矢理解決したコード

Playgroundへのリンク

muriyari.go
package main

import (
    "fmt"
    "sync"
    "time"
)

var m sync.Map

func main() {
    fmt.Println("Hello, playground")
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer func() {
            wg.Done()
            fmt.Println("Writer Finished")
        }()
        m.Store("AAA", "BBB")
        m.Store("CCC", "DDD")
        time.Sleep(time.Second * 6)
        m.Store("EEE", "FFF")
    }()
    wg.Add(1)
    go func() {
        defer func() {
            wg.Done()
            fmt.Println("Reader Finished")
        }()
        ch := make(chan interface{})
        Bind(ch, "CCC")
        for i := 0; i < 5; i++ {
            select {
            case v := <-ch:
                fmt.Printf("OK:%v\n", v)
            default:
                fmt.Println("NG")
            }
            time.Sleep(time.Second)
        }
    }()
    wg.Wait()
    fmt.Println("Main Finished")
}

func Bind(ch chan interface{}, key string) {
    go func() {
        for {
            if v, ok := m.LoadAndDelete(key); ok {
                ch <- v
            }
            time.Sleep(time.Millisecond * 200)
        }
    }()
}

チャンネルに直接書き込むのではなく、sync.Mapに書き込んでるのでブロックしません。sync.Mapに書き込むところをきちんと型を書いた新たなfuncを定義してやれば、型チェックは働きます。上で定義しているBind関数も同様です。

ちなみに最初はBind関数内でチャンネルを作ろうとしてたんですが、関数を抜けた時点で変数が解放されるのでpanic起こしてダメでした。

パフォーマンスは計測してませんが、生のチャンネルを扱うよりは確実に悪いでしょう。sleepの時間調整である程度パフォーマンス調整が出来るかもしれません。

余談

チャンネルの容量を増やすことで満杯になるまではブロックしない書き込みになるんですが、無限に増やすわけにもいかないのでこんな方法を考えてみました。

設計が悪いのは明らかなんですが、一応なぜこんな設計を考えているのかの事情は説明しておきます。

それはCtrl+CやSIGTERMなどOSのシステムコール時に全てのGoroutineが後始末をしながら終了をして欲しいからで、その際にcontextを使ってルーチンを終了させているのですがルーチンの終了の順番が不定だからです。なので送信側ルーチンが先に死んで正常終了するケースと、受信ルーチンが先に死んで永久にプロセスが動き続ける場合がありました。

これはあくまで本当に無理矢理やった例ですので、もっと洗練されたやり方があったり、ライブラリがあればご紹介いただければ嬉しいです。