GO言語学習ノート-緩衝区Chanelsとスレッド池

10448 ワード

GO言語学習ノート-緩衝区Chanelsとスレッド池
6月04日、2018年Golang、菜鳥翻訳屋で発表されました.
http://www.hi-roy.com/2018/06/04/GO%E8%AF%AD%E8%A8%80%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0-%E 7%BC%93%E 5%86%B 2%E 5%8 C%BAChanels%E 5%92%8 C%E 7%BA%BF%E 7%E 7%E 8%E 6%B 1%A 0/?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
原文、ここでは分かりやすいようにworker poolsをスレッド池に翻訳します.
バッファーチャンネルとは何ですか?
以前に検討したチャンネルは全部バッファがないので、読み取りと書き込みはブロックされます.バッファがあるチャンネルを作成することも可能です.このチャンネルはバッファが満杯になってから、空いているチャンネルを書き込みまたは読み出した時だけ閉塞されます.
バッファがあるチャンネルを作成するには、バッファサイズを示すために追加のパラメータ容量が必要です.
1
ch := make(chan type, capacity)
 
上のコードのcapacityは0より大きい必要があります.0に等しいなら、前に勉強した無バッファチャンネルチャンネルです.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch 
上記の例では、2つの容量のチャンネルを作成しましたので、2つの文字列を書き込む前の書き込みは滞りません.その後、それぞれ12、13行で読み取り、プログラム出力は以下の通りです.
naveen  
paul
 
もう一つの例
私たちはもう一つの例を見てみます.同時に実行するgoroutineで書き込み操作をして、main goroutineで読みます.この例はバッファチャンネルチャンネルチャンネルをよりよく理解するように助けてくれます.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch 
上のコードは、容量が2のバッファチャネルを作成し、それをパラメータとしてwrite関数に伝達し、次にsleep 2秒を実行します.同時にwrite関数同時実行は、関数にforを使用して、chに0−4に書き込まれる.容量は2ですので、すぐにチャンネルに0と1を書き込み、ブロックして少なくとも1つの値が読み取られるのを待つことができます.プログラムはすぐに次の2行を出力します.
successfully wrote 0 to ch  
successfully wrote 1 to ch
 
メイン関数でsleep 2秒後、for rangeループに入ってデータの読み出しを開始し、その後sleep 2秒を継続する.プログラムは次に出力されます.
read value 0 from ch  
successfully wrote 2 to ch
 
このように循環してチャンネルが閉鎖されるまで、プログラムの最終出力は以下の通りです.
successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch
デッド・ロック
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 2)
    ch 
上のプログラムは、容量が2のチャンネルに3文字列を書きたいです.プログラムは11行まで実行するとブロックされます.もうチャンネルのバッファはいっぱいです.他のgoroutineからデータを読み込まないとプログラムがロックされます.エラーは以下の通りです
1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100
 
長さと容量
容量とは、バッファがあるチャンネルが最大で同時にどれぐらいのデータを記憶できるかを指します.この値はmakeキーワードを使って、チャンネルを作成する時に使います.長さとは、現在のチャンネルにどれぐらいのデータが格納されているかをいう.私たちは下のコードを見ます.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch 
 
上のコードの中で、容量が3のチャンネルを作って、中に2文字列を書きます.だから、今のチャンネルの長さは2です.次にチャンネルから文字列を1つ読みますので、現在の長さは1です.プログラム出力は以下の通りです.
capacity is 3  
length is 2  
read value naveen  
new length is 1
 
WaitGroup
次のセクションでは、スレッド池を紹介します.より良い理解のためには、WaitGroupを先に紹介してから、これに基づいてスレッド池を実現します.
WaitGroupはGoroutineのグループが全部実行されるのを待つために使用されます.その前にプログラムがブロックされます.もし私たちが3つのgoroutineを持っていたら、メインプログラムはこの3つのgoroutineが全部実行されてから終了します.コードを見て多く言わない:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (  
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended
", i) wg.Done() } func main() { no := 3 var wg sync.WaitGroup for i := 0; i < no; i++ { wg.Add(1) go process(i, &wg) } wg.Wait() fmt.Println("All go routines finished executing") }
 
WaitGroupはstructタイプで、18行でデフォルトのWaitGroupを作成しました.内部はカウンタに基づいて実現されます.Add方法を呼び出し、パラメータとしてその数値を伝え、カウンタは着信パラメータの値を増加させる.Doneメソッドを呼び出すと、カウンタは1を減算します.Wait方法はカウンタがゼロになるまでgoroutineをブロックします.
上記のコードは、ループ中にwg.Add(1)を呼び出してカウンタを3にし、同時に3つのgoroutineを起動し、カウンタがゼロになるまでwg.Wait()で主goroutineをブロックします.関数processでは、カウンタを低減するためにwg.Done()を呼び出すと、3つのgoroutineが実行されると、wg.Done()は3回実行され、カウンタはゼロになり、主goroutineはブロックを解除する.wgのアドレスをgoroutineに渡すことは非常に重要です.転送がアドレスではない場合、各goroutineにはコピーがあります.これで各goroutineが終了するとmain関数に通知できません.
プログラム出力は以下の通りです.
started Goroutine  2  
started Goroutine  0  
started Goroutine  1  
Goroutine 0 ended  
Goroutine 2 ended  
Goroutine 1 ended  
All go routines finished executing
 
あなたの出力結果は上とは少し違っているかもしれません.
スレッド池(worker pools)
バッファチャンネルの重要な使い方はスレッド池を実現することです.
通常、スレッド池とは、スレッドのセットが彼らに割り当てられるのを待っています.タスクが完了すると、次のタスクが待ち続けられます.
次に、入力デジタルの各ビットの和を計算するスレッドプールを実現する.たとえば入力123は9(1+2+3)を返し、スレッドに入力された数字は疑似ランダムアルゴリズムで生成される.
以下は私たちが必要なコアステップです.
  • グループgoroutineのセットを作成し、バッファチャンネルの待ち受けタスクを監視します.
  • は、バッファにジョブを追加します.
  • ジョブが終了したら、他のバッファチャンネルに結果を書き込みます.
  • は、格納結果のチャンネルからデータを読み出して出力する.
  • まずタスクと結果を格納する構造を作成します.
    1
    2
    3
    4
    5
    6
    7
    8
    
    type Job struct {  
        id       int
        randomno int
    }
    type Result struct {  
        job         Job
        sumofdigits int
    }
    
     
    Jobは、計算されるべき乱数を記憶するためにidrandomnoとを有する.Resultタイプは、結果を記憶するJob属性とsumofdigitsを含む.
    次にバッファチャンネルを作成してタスクと結果を受信します.
    1
    2
    
    var jobs = make(chan Job, 10)  
    var results = make(chan Result, 10)
    
     
    goroutineは、jobsからタスクを取得し、結果をresultに書き込みます.
    以下のdigits関数は、合計を計算し、結果を返すために用いられ、Sleepによって、消費時間の計算をシミュレーションする.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
    func digits(number int) int {  
        sum := 0
        no := number
        for no != 0 {
            digit := no % 10
            sum += digit
            no /= 10
        }
        time.Sleep(2 * time.Second)
        return sum
    }
    
    次の関数はgoroutineを作成します.
    1
    2
    3
    4
    5
    6
    7
    
    func worker(wg *sync.WaitGroup) {  
        for job := range jobs {
            output := Result{job, digits(job.randomno)}
            results 
      jobsのジョブを読み出してResult構造を作成し、関数digitsが計算した結果を記憶し、resultsのこのチャンネルに書き込む.この関数は、WaitGroupタイプのポインタパラメータwgを受信し、計算が完了したらwg.Done()を呼び出す.createWorkerPoolこの関数は、スレッドプールを作成するために使用されます.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    func createWorkerPool(noOfWorkers int) {  
        var wg sync.WaitGroup
        for i := 0; i < noOfWorkers; i++ {
            wg.Add(1)
            go worker(&wg)
        }
        wg.Wait()
        close(results)
    }
    
     
    この関数は、noOfWorkers個のgoroutineを含むスレッド池を作成し、goroutineを作成する前にwg.Add(1)を呼び出してカウンタを増加させ、wgのアドレスをworkerの関数に渡す.作成が完了したら、wg.Wait()を使用して、すべてのgoroutineが実行されるのを待って、close関数を呼び出してresultsのこのチャンネルをオフにします.これで、goroutineはデータに書き込むことができなくなります.
    次に、関数を作成してスレッド池にタスクを割り当てます.
    1
    2
    3
    4
    5
    6
    7
    8
    
    func allocate(noOfJobs int) {  
        for i := 0; i < noOfJobs; i++ {
            randomno := rand.Intn(999)
            job := Job{i, randomno}
            jobs 
     
    上記の関数は、着信パラメータによって書き込みのジョブ数を決定し、乱数の最大値は998であり、ループ中のカウンタiをIDとしてjob構造を作成し、jobsに書き込み、完了したらjobsをオフする.
    次に関数を作成してresultsのこのチャンネルを読み出し、出力を印刷する.
    1
    2
    3
    4
    5
    6
    
    func result(done chan bool) {  
        for result := range results {
            fmt.Printf("Job id %d, input random no %d , sum of digits %d
    ", result.job.id, result.job.randomno, result.sumofdigits) } done
     
    上記の関数はresultsを読み、ID、乱数、結果をプリントし、最後にdoneのこのchanelにデータを書き込み、全ての結果がプリントされたことを示している.
    すべてのものがそろっています.main関数を完成させてください.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    
    func main() {  
        startTime := time.Now()
        noOfJobs := 100
        go allocate(noOfJobs)
        done := make(chan bool)
        go result(done)
        noOfWorkers := 10
        createWorkerPool(noOfWorkers)
        
     
    まずプログラムの実行開始時間を記録して、最後に終了時間から開始時間を差し引いてプログラムを実行します.時々、この時々に異なる数のスレッド池の違いを比較する必要があります.doneという名前のチャンネルを作成し、result関数に渡すことで、出力をプリントアウトし、すべての出力を完了した後に通知することができます.
    最後に10個のgoroutineのスレッドプールが作成され、doneを読み取ることによって計算が完了するのを待つ.
    完全コードは以下の通りです.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    
    package main
    
    import (  
        "fmt"
        "math/rand"
        "sync"
        "time"
    )
    
    type Job struct {  
        id       int
        randomno int
    }
    type Result struct {  
        job         Job
        sumofdigits int
    }
    
    var jobs = make(chan Job, 10)  
    var results = make(chan Result, 10)
    
    func digits(number int) int {  
        sum := 0
        no := number
        for no != 0 {
            digit := no % 10
            sum += digit
            no /= 10
        }
        time.Sleep(2 * time.Second)
        return sum
    }
    func worker(wg *sync.WaitGroup) {  
        for job := range jobs {
            output := Result{job, digits(job.randomno)}
            results 
     
    運転結果は以下の通りです.
    Job id 1, input random no 636, sum of digits 15  
    Job id 0, input random no 878, sum of digits 23  
    Job id 9, input random no 150, sum of digits 6  
    ...
    total time taken  20.01081009 seconds
    
     
    プログラムは100行の出力があります.100個のjobを作成したので、あなたの出力順序は私とは違ってもいいです.ハードウェアの構成によっては時間が違います.これは全部で20秒かかります.
    次にnoOfWorkersから20を上げて、スレッド池の中のgoroutineの数を高めました.運行時間は絶対に減らすべきです.私のマシンでは、プログラム出力は以下の通りです.
    ...
    total time taken  10.004364685 seconds
    
     
    このようにして、スレッド池でgoroutineが増加すると、プログラムの実行時間が減少することが分かります.結果を解析するためにmainnoOfJobsおよびnoOfWorkersの値を任意に調整することができる.