是否有一些优雅的方式来暂停和恢复其他goroutine? [英] Is there some elegant way to pause and resume any other goroutine?
问题描述
就我而言,我有成千上万个goroutine作为 work()
同时工作.我也有一个 sync()
goroutine.当 sync
开始时,我需要任何其他goroutine在同步作业完成后暂停一会儿.这是我的代码:
In my case, I have thousands of goroutines working simultaneously as work()
. I also had a sync()
goroutine. When sync
starts, I need any other goroutine to pause for a while after sync job is done. Here is my code:
var channels []chan int
var channels_mutex sync.Mutex
func work() {
channel := make(chan int, 1)
channels_mutex.Lock()
channels = append(channels, channel)
channels_mutex.Unlock()
for {
for {
sync_stat := <- channel // blocked here
if sync_stat == 0 { // if sync complete
break
}
}
// Do some jobs
if (some condition) {
return
}
}
}
func sync() {
channels_mutex.Lock()
// do some sync
for int i := 0; i != len(channels); i++ {
channels[i] <- 0
}
channels_mutex.Unlock()
}
现在的问题是,由于<-
始终在读取时处于阻塞状态,因此每次进入 sync_stat:=<-channel
都处于阻塞状态.我知道该频道是否已关闭它不会被阻止,但是由于我必须使用此频道,直到 work()
退出,而且我找不到任何方法来重新打开已关闭的频道.
Now the problem is, since <-
is always blocking on read, every time goes to sync_stat := <- channel
is blocking. I know if the channel was closed it won't be blocked, but since I have to use this channel until work()
exits, and I didn't find any way to reopen a closed channel.
我怀疑自己走错了路,对您的帮助将不胜感激.是否有一些优雅"的形象?暂停和恢复其他goroutine的方法吗?
I suspect myself on a wrong way, so any help is appreciated. Is there some "elegant" way to pause and resume any other goroutine?
推荐答案
如果我对您的理解正确,那么您需要N个工作人员和一个控制器,该控制器可以随意暂停,恢复和停止工作人员.下面的代码就可以做到这一点.
If I understand you correctly, you want N number of workers and one controller, which can pause, resume and stop the workers at will. The following code will do just that.
package main
import (
"fmt"
"runtime"
"sync"
)
// Possible worker states.
const (
Stopped = 0
Paused = 1
Running = 2
)
// Maximum number of workers.
const WorkerCount = 1000
func main() {
// Launch workers.
var wg sync.WaitGroup
wg.Add(WorkerCount + 1)
workers := make([]chan int, WorkerCount)
for i := range workers {
workers[i] = make(chan int, 1)
go func(i int) {
worker(i, workers[i])
wg.Done()
}(i)
}
// Launch controller routine.
go func() {
controller(workers)
wg.Done()
}()
// Wait for all goroutines to finish.
wg.Wait()
}
func worker(id int, ws <-chan int) {
state := Paused // Begin in the paused state.
for {
select {
case state = <-ws:
switch state {
case Stopped:
fmt.Printf("Worker %d: Stopped\n", id)
return
case Running:
fmt.Printf("Worker %d: Running\n", id)
case Paused:
fmt.Printf("Worker %d: Paused\n", id)
}
default:
// We use runtime.Gosched() to prevent a deadlock in this case.
// It will not be needed of work is performed here which yields
// to the scheduler.
runtime.Gosched()
if state == Paused {
break
}
// Do actual work here.
}
}
}
// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
// Start workers
setState(workers, Running)
// Pause workers.
setState(workers, Paused)
// Unpause workers.
setState(workers, Running)
// Shutdown workers.
setState(workers, Stopped)
}
// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
for _, w := range workers {
w <- state
}
}
这篇关于是否有一些优雅的方式来暂停和恢复其他goroutine?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!