Go並行プログラミング
2745 ワード
マルチスレッドでのカウント
マルチスレッド制御
生産者・消費者モデル
発行・購読(Pub/Sub)モデル
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// Number is a counter shared between goroutines. In this file its count is
// updated either while holding mutex (Worker) or through sync/atomic
// (Worker2).
type Number struct {
// count is the running total incremented by the workers.
count int64
// mutex is locked around each increment in Worker; Worker2 does not use it.
mutex sync.Mutex
}
// number is the package-level counter incremented by Worker and Worker2 and
// printed by main.
var number Number
// Worker increments the shared number.count 10000 times, locking
// number.mutex around every single increment, and marks wg as done when it
// returns.
func Worker(wg *sync.WaitGroup) {
	defer wg.Done()

	for remaining := 10000; remaining > 0; remaining-- {
		number.mutex.Lock()
		number.count++
		number.mutex.Unlock()
	}
}
// Worker2 increments the shared number.count 10000 times with
// atomic.AddInt64 instead of taking number.mutex, and marks wg as done when
// it returns.
func Worker2(wg *sync.WaitGroup) {
	defer wg.Done()

	const iterations = 10000
	for done := 0; done < iterations; done++ {
		atomic.AddInt64(&number.count, 1)
	}
}
// main launches two Worker2 goroutines, waits for both of them to finish and
// then prints the final value of number.count.
func main() {
	const workers = 2

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go Worker2(&wg)
	}
	wg.Wait()

	fmt.Println(number.count)
}
マルチスレッド制御
package main
func main() {
ch := make(chan bool)
go func() {
println("hello , world")
ch
生産者・消費者モデル
package main
import (
"fmt"
"time"
)
// : factor
func Producer(factor int, out chan
発行・購読(Pub/Sub)モデル
// Package main implements a simple multi-topic pub-sub library.
package main
import (
"fmt"
"strings"
"sync"
"time"
)
// Channel and filter types used by Publisher.
type (
subscriber chan interface{} // channel created by SubscribeTopic and handed to one subscriber
topicFunc func(v interface{}) bool // per-subscriber filter; sendTopic skips the subscriber when it returns false
)
// Publisher keeps the set of registered subscribers together with the
// settings applied to them.
type Publisher struct {
m sync.RWMutex // guards subscribers: write-locked to add/remove, read-locked while publishing
buffer int // capacity of every subscriber channel created by SubscribeTopic
timeout time.Duration // set from NewPublisher's publishTimeout; presumably bounds a single send in sendTopic — TODO confirm
subscribers map[subscriber]topicFunc // registered channels mapped to their topic filter; a nil filter means no filtering
}
// NewPublisher returns a Publisher that has no subscribers yet.
// publishTimeout is stored as the publisher's timeout and buffer as the
// capacity used for each subscriber channel.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	p := new(Publisher)
	p.subscribers = make(map[subscriber]topicFunc)
	p.timeout = publishTimeout
	p.buffer = buffer
	return p
}
// Subscribe registers a new subscriber without a topic filter and returns
// its channel. It is SubscribeTopic called with a nil filter.
func (p *Publisher) Subscribe() chan interface{} {
	var noFilter topicFunc // nil: sendTopic only consults non-nil filters
	return p.SubscribeTopic(noFilter)
}
// SubscribeTopic creates a channel with capacity p.buffer, registers it with
// the given topic filter under the write lock, and returns it. A nil topic
// registers the subscriber without a filter.
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
	sub := make(chan interface{}, p.buffer)

	p.m.Lock()
	defer p.m.Unlock()
	p.subscribers[sub] = topic
	return sub
}
// Evict removes sub from the publisher and closes it.
//
// Evict is a no-op when sub is not currently registered: a channel that was
// never returned by Subscribe/SubscribeTopic, one that has already been
// evicted, one that was closed by Close, or a nil channel. Closing such a
// channel would panic (close of a closed or nil channel), so only channels
// still present in the subscriber set are closed.
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()

	// The map is the source of truth for which channels are still open and
	// owned by this publisher; anything else must not be closed here.
	if _, registered := p.subscribers[sub]; !registered {
		return
	}
	delete(p.subscribers, sub)
	close(sub)
}
// Publish hands v to every registered subscriber by starting one sendTopic
// goroutine per subscriber, and returns only after all of them have
// finished. The read lock is held for the whole call, so the subscriber set
// cannot change while a publish is in flight.
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	defer p.m.RUnlock()

	var pending sync.WaitGroup
	pending.Add(len(p.subscribers))
	for ch, filter := range p.subscribers {
		go p.sendTopic(ch, filter, v, &pending)
	}
	pending.Wait()
}
// Close shuts the publisher down: under the write lock it closes every
// subscriber channel and removes it from the subscriber set, leaving the set
// empty.
func (p *Publisher) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	for ch := range p.subscribers {
		close(ch)
		delete(p.subscribers, ch)
	}
}
// ,
func (p *Publisher) sendTopic(
sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup,
) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
select {
case sub