GO言語学習ノート-緩衝区Chanelsとスレッド池
10448 ワード
GO言語学習ノート-緩衝区Chanelsとスレッド池
6月04日、2018年Golang、菜鳥翻訳屋で発表されました.
http://www.hi-roy.com/2018/06/04/GO%E8%AF%AD%E8%A8%80%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0-%E 7%BC%93%E 5%86%B 2%E 5%8 C%BAChanels%E 5%92%8 C%E 7%BA%BF%E 7%E 7%E 8%E 6%B 1%A 0/?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
原文、ここでは分かりやすいようにworker poolsをスレッド池に翻訳します.
バッファーチャンネルとは何ですか?
以前に検討したチャンネルは全部バッファがないので、読み取りと書き込みはブロックされます.バッファがあるチャンネルを作成することも可能です.このチャンネルはバッファが満杯になってから、空いているチャンネルを書き込みまたは読み出した時だけ閉塞されます.
バッファがあるチャンネルを作成するには、バッファサイズを示すために追加のパラメータ容量が必要です.
上のコードのcapacityは0より大きい必要があります.0に等しいなら、前に勉強した無バッファチャンネルチャンネルです.
例
もう一つの例
私たちはもう一つの例を見てみます.同時に実行するgoroutineで書き込み操作をして、main goroutineで読みます.この例はバッファチャンネルチャンネルチャンネルをよりよく理解するように助けてくれます.
メイン関数でsleep 2秒後、
このように循環してチャンネルが閉鎖されるまで、プログラムの最終出力は以下の通りです.
長さと容量
容量とは、バッファがあるチャンネルが最大で同時にどれぐらいのデータを記憶できるかを指します.この値は
上のコードの中で、容量が3のチャンネルを作って、中に2文字列を書きます.だから、今のチャンネルの長さは2です.次にチャンネルから文字列を1つ読みますので、現在の長さは1です.プログラム出力は以下の通りです.
WaitGroup
次のセクションでは、スレッド池を紹介します.より良い理解のためには、
WaitGroupはGoroutineのグループが全部実行されるのを待つために使用されます.その前にプログラムがブロックされます.もし私たちが3つのgoroutineを持っていたら、メインプログラムはこの3つのgoroutineが全部実行されてから終了します.コードを見て多く言わない:
WaitGroupはstructタイプで、18行でデフォルトのWaitGroupを作成しました.内部はカウンタに基づいて実現されます.
上記のコードは、ループ中に
プログラム出力は以下の通りです.
あなたの出力結果は上とは少し違っているかもしれません.
スレッド池(worker pools)
バッファチャンネルの重要な使い方はスレッド池を実現することです.
通常、スレッド池とは、スレッドのセットが彼らに割り当てられるのを待っています.タスクが完了すると、次のタスクが待ち続けられます.
次に、入力デジタルの各ビットの和を計算するスレッドプールを実現する.たとえば入力123は9(1+2+3)を返し、スレッドに入力された数字は疑似ランダムアルゴリズムで生成される.
以下は私たちが必要なコアステップです.グループgoroutineのセットを作成し、バッファチャンネルの待ち受けタスクを監視します. は、バッファにジョブを追加します. ジョブが終了したら、他のバッファチャンネルに結果を書き込みます. は、格納結果のチャンネルからデータを読み出して出力する. まずタスクと結果を格納する構造を作成します.
各
次にバッファチャンネルを作成してタスクと結果を受信します.
goroutineは、jobsからタスクを取得し、結果をresultに書き込みます.
以下の
この関数は、
次に、関数を作成してスレッド池にタスクを割り当てます.
上記の関数は、着信パラメータによって書き込みのジョブ数を決定し、乱数の最大値は
次に関数を作成して
上記の関数は
すべてのものがそろっています.
まずプログラムの実行開始時間を記録して、最後に終了時間から開始時間を差し引いてプログラムを実行します.時々、この時々に異なる数のスレッド池の違いを比較する必要があります.
最後に10個のgoroutineのスレッドプールが作成され、
完全コードは以下の通りです.
運転結果は以下の通りです.
プログラムは100行の出力があります.100個のjobを作成したので、あなたの出力順序は私とは違ってもいいです.ハードウェアの構成によっては時間が違います.これは全部で20秒かかります.
次に
このようにして、スレッド池でgoroutineが増加すると、プログラムの実行時間が減少することが分かります.結果を解析するために
6月04日、2018年Golang、菜鳥翻訳屋で発表されました.
http://www.hi-roy.com/2018/06/04/GO%E8%AF%AD%E8%A8%80%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0-%E 7%BC%93%E 5%86%B 2%E 5%8 C%BAChanels%E 5%92%8 C%E 7%BA%BF%E 7%E 7%E 8%E 6%B 1%A 0/?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io
原文、ここでは分かりやすいようにworker poolsをスレッド池に翻訳します.
バッファーチャンネルとは何ですか?
以前に検討したチャンネルは全部バッファがないので、読み取りと書き込みはブロックされます.バッファがあるチャンネルを作成することも可能です.このチャンネルはバッファが満杯になってから、空いているチャンネルを書き込みまたは読み出した時だけ閉塞されます.
バッファがあるチャンネルを作成するには、バッファサイズを示すために追加のパラメータ容量が必要です.
1
ch := make(chan type, capacity)
上のコードのcapacityは0より大きい必要があります.0に等しいなら、前に勉強した無バッファチャンネルチャンネルです.
例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch
上記の例では、2つの容量のチャンネルを作成しましたので、2つの文字列を書き込む前の書き込みは滞りません.その後、それぞれ12、13行で読み取り、プログラム出力は以下の通りです.naveen
paul
もう一つの例
私たちはもう一つの例を見てみます.同時に実行するgoroutineで書き込み操作をして、main goroutineで読みます.この例はバッファチャンネルチャンネルチャンネルをよりよく理解するように助けてくれます.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main
import (
"fmt"
"time"
)
func write(ch chan int) {
for i := 0; i < 5; i++ {
ch
上のコードは、容量が2のバッファチャネルを作成し、それをパラメータとしてwrite
関数に伝達し、次にsleep 2秒を実行します.同時にwrite
関数同時実行は、関数にfor
を使用して、ch
に0−4に書き込まれる.容量は2ですので、すぐにチャンネルに0と1を書き込み、ブロックして少なくとも1つの値が読み取られるのを待つことができます.プログラムはすぐに次の2行を出力します.successfully wrote 0 to ch
successfully wrote 1 to ch
メイン関数でsleep 2秒後、
for range
ループに入ってデータの読み出しを開始し、その後sleep 2秒を継続する.プログラムは次に出力されます.read value 0 from ch
successfully wrote 2 to ch
このように循環してチャンネルが閉鎖されるまで、プログラムの最終出力は以下の通りです.
successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch
デッド・ロック1
2
3
4
5
6
7
8
9
10
11
12
13
14
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 2)
ch
上のプログラムは、容量が2のチャンネルに3文字列を書きたいです.プログラムは11行まで実行するとブロックされます.もうチャンネルのバッファはいっぱいです.他のgoroutineからデータを読み込まないとプログラムがロックされます.エラーは以下の通りです1
2
3
4
5
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox274756028/main.go:11 +0x100
長さと容量
容量とは、バッファがあるチャンネルが最大で同時にどれぐらいのデータを記憶できるかを指します.この値は
make
キーワードを使って、チャンネルを作成する時に使います.長さとは、現在のチャンネルにどれぐらいのデータが格納されているかをいう.私たちは下のコードを見ます.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package main
import (
"fmt"
)
func main() {
ch := make(chan string, 3)
ch
上のコードの中で、容量が3のチャンネルを作って、中に2文字列を書きます.だから、今のチャンネルの長さは2です.次にチャンネルから文字列を1つ読みますので、現在の長さは1です.プログラム出力は以下の通りです.
capacity is 3
length is 2
read value naveen
new length is 1
WaitGroup
次のセクションでは、スレッド池を紹介します.より良い理解のためには、
WaitGroup
を先に紹介してから、これに基づいてスレッド池を実現します.WaitGroupはGoroutineのグループが全部実行されるのを待つために使用されます.その前にプログラムがブロックされます.もし私たちが3つのgoroutineを持っていたら、メインプログラムはこの3つのgoroutineが全部実行されてから終了します.コードを見て多く言わない:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended
", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
WaitGroupはstructタイプで、18行でデフォルトのWaitGroupを作成しました.内部はカウンタに基づいて実現されます.
Add
方法を呼び出し、パラメータとしてその数値を伝え、カウンタは着信パラメータの値を増加させる.Done
メソッドを呼び出すと、カウンタは1を減算します.Wait
方法はカウンタがゼロになるまでgoroutineをブロックします.上記のコードは、ループ中に
wg.Add(1)
を呼び出してカウンタを3にし、同時に3つのgoroutineを起動し、カウンタがゼロになるまでwg.Wait()
で主goroutineをブロックします.関数process
では、カウンタを低減するためにwg.Done()
を呼び出すと、3つのgoroutineが実行されると、wg.Done()
は3回実行され、カウンタはゼロになり、主goroutineはブロックを解除する.wg
のアドレスをgoroutineに渡すことは非常に重要です.転送がアドレスではない場合、各goroutineにはコピーがあります.これで各goroutineが終了するとmain
関数に通知できません.プログラム出力は以下の通りです.
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing
あなたの出力結果は上とは少し違っているかもしれません.
スレッド池(worker pools)
バッファチャンネルの重要な使い方はスレッド池を実現することです.
通常、スレッド池とは、スレッドのセットが彼らに割り当てられるのを待っています.タスクが完了すると、次のタスクが待ち続けられます.
次に、入力デジタルの各ビットの和を計算するスレッドプールを実現する.たとえば入力123は9(1+2+3)を返し、スレッドに入力された数字は疑似ランダムアルゴリズムで生成される.
以下は私たちが必要なコアステップです.
1
2
3
4
5
6
7
8
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
各
Job
は、計算されるべき乱数を記憶するためにid
とrandomno
とを有する.Result
タイプは、結果を記憶するJob
属性とsumofdigits
を含む.次にバッファチャンネルを作成してタスクと結果を受信します.
1
2
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
goroutineは、jobsからタスクを取得し、結果をresultに書き込みます.
以下の
digits
関数は、合計を計算し、結果を返すために用いられ、Sleep
によって、消費時間の計算をシミュレーションする.1
2
3
4
5
6
7
8
9
10
11
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
次の関数はgoroutineを作成します.1
2
3
4
5
6
7
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results
jobs
のジョブを読み出してResult
構造を作成し、関数digits
が計算した結果を記憶し、results
のこのチャンネルに書き込む.この関数は、WaitGroupタイプのポインタパラメータwg
を受信し、計算が完了したらwg.Done()
を呼び出す.createWorkerPool
この関数は、スレッドプールを作成するために使用されます.1
2
3
4
5
6
7
8
9
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
この関数は、
noOfWorkers
個のgoroutineを含むスレッド池を作成し、goroutineを作成する前にwg.Add(1)
を呼び出してカウンタを増加させ、wg
のアドレスをworker
の関数に渡す.作成が完了したら、wg.Wait()
を使用して、すべてのgoroutineが実行されるのを待って、close
関数を呼び出してresults
のこのチャンネルをオフにします.これで、goroutineはデータに書き込むことができなくなります.次に、関数を作成してスレッド池にタスクを割り当てます.
1
2
3
4
5
6
7
8
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs
上記の関数は、着信パラメータによって書き込みのジョブ数を決定し、乱数の最大値は
998
であり、ループ中のカウンタi
をIDとしてjob
構造を作成し、jobs
に書き込み、完了したらjobs
をオフする.次に関数を作成して
results
のこのチャンネルを読み出し、出力を印刷する.1
2
3
4
5
6
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d
", result.job.id, result.job.randomno, result.sumofdigits)
}
done
上記の関数は
results
を読み、ID、乱数、結果をプリントし、最後にdone
のこのchanelにデータを書き込み、全ての結果がプリントされたことを示している.すべてのものがそろっています.
main
関数を完成させてください.1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
まずプログラムの実行開始時間を記録して、最後に終了時間から開始時間を差し引いてプログラムを実行します.時々、この時々に異なる数のスレッド池の違いを比較する必要があります.
done
という名前のチャンネルを作成し、result
関数に渡すことで、出力をプリントアウトし、すべての出力を完了した後に通知することができます.最後に10個のgoroutineのスレッドプールが作成され、
done
を読み取ることによって計算が完了するのを待つ.完全コードは以下の通りです.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results
運転結果は以下の通りです.
Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken 20.01081009 seconds
プログラムは100行の出力があります.100個のjobを作成したので、あなたの出力順序は私とは違ってもいいです.ハードウェアの構成によっては時間が違います.これは全部で20秒かかります.
次に
noOfWorkers
から20を上げて、スレッド池の中のgoroutineの数を高めました.運行時間は絶対に減らすべきです.私のマシンでは、プログラム出力は以下の通りです....
total time taken 10.004364685 seconds
このようにして、スレッド池でgoroutineが増加すると、プログラムの実行時間が減少することが分かります.結果を解析するために
main
のnoOfJobs
およびnoOfWorkers
の値を任意に調整することができる.