golangで同時作業キューを作成する
4694 ワード
実はgolangは1つの関数で1つの同時キューを構築することができて、私の前のblogを見て、しかしその機能はまだ強くなくて、今1つの柔軟で制御可能なキュープログラムを編纂して先に1つの仕事を定義します
ワークIDとタスクを実行するidを含んで、上のSMはタスクの具体的な内容だけで、これは具体的な業務と関連して、みんなは自分で自分のSM業務のロジックを編纂します
次にワークプールを定義します
ここでは、ワークキューのスライスを定義し、ワークキューの個数をカスタマイズしたり、後でワークを追加したり、キュータイプのパイプを定義したりすることができます.定義が完了すると、ワークプールを初期化できます.
この中で私がworkerを死んだ個数は3で、もちろんこれはプロファイルやパラメータを読み取ることで伝えることができます.この中でワークを1つずつ起動しますスタート()、これがポイント
これはgoが1つの協程を起動して、まず自分をworkerChanの中に置いて、それから絶えずw.RepJobsパイプの中から任務を取得して実行して、もし実行が完成したらまた自分をキューに戻します.タスクを実行する必要がある場合は、このパイプに入れればいいです.
パイプからworkerを取り出し、タスクidをworkerに配置して実行します.もちろんworkerを止めたりjobを止めたりすることができます
補足ですが、int 64と文字列変換です.stringからintまで、err:=strconv.Atoi(string)stringからint 64 int 64,err:=strconv.ParseInt(string,10,64)intからstring string:=strconv.Itoa(int)int 64からstring string:=strconv.FormatInt(int64,10)
type Worker struct {
ID int
RepJobs chan int64
SM *SM
quit chan bool
}
ワークIDとタスクを実行するidを含んで、上のSMはタスクの具体的な内容だけで、これは具体的な業務と関連して、みんなは自分で自分のSM業務のロジックを編纂します
次にワークプールを定義します
type workerPool struct {
workerChan chan *Worker
workerList []*Worker
}
ここでは、ワークキューのスライスを定義し、ワークキューの個数をカスタマイズしたり、後でワークを追加したり、キュータイプのパイプを定義したりすることができます.定義が完了すると、ワークプールを初期化できます.
func InitWorkerPool() error {
n := 3
WorkerPool = &workerPool{
workerChan: make(chan *Worker, n),
workerList: make([]*Worker, 0, n),
}
for i := 0; i < n; i++ {
worker := NewWorker(i)
WorkerPool.workerList = append(WorkerPool.workerList, worker)
worker.Start()
log.Debugf("worker %d started", worker.ID)
}
return nil
}
この中で私がworkerを死んだ個数は3で、もちろんこれはプロファイルやパラメータを読み取ることで伝えることができます.この中でワークを1つずつ起動しますスタート()、これがポイント
func (w *Worker) Start() {
go func() {
for {
WorkerPool.workerChan select {
case jobID := log.Debugf("worker: %d, will handle job: %d", w.ID, jobID)
w.handleRepJob(jobID)
case q := if q {
log.Debugf("worker: %d, will stop.", w.ID)
return
}
}
}
}()
}
これはgoが1つの協程を起動して、まず自分をworkerChanの中に置いて、それから絶えずw.RepJobsパイプの中から任務を取得して実行して、もし実行が完成したらまた自分をキューに戻します.タスクを実行する必要がある場合は、このパイプに入れればいいです.
func Dispatch() {
for {
select {
case job := go func(jobID int64) {
println("Trying to dispatch job: %d", jobID)
worker :=
パイプからworkerを取り出し、タスクidをworkerに配置して実行します.もちろんworkerを止めたりjobを止めたりすることができます
func (w *Worker) Stop() {
go func() {
w.quit true
}()
}
func (wp *workerPool) StopJobs(jobs []int64) {
log.Debugf("Works working on jobs: %v will be stopped", jobs)
for _, id := range jobs {
for _, w := range wp.workerList {
if w.SM.JobID == id {
log.Debugf("found a worker whose job ID is %d, will try to stop it", id)
w.SM.Stop(id)
}
}
}
}
補足ですが、int 64と文字列変換です.stringからintまで、err:=strconv.Atoi(string)stringからint 64 int 64,err:=strconv.ParseInt(string,10,64)intからstring string:=strconv.Itoa(int)int 64からstring string:=strconv.FormatInt(int64,10)