是否有一些优雅的方式来暂停和恢复其他goroutine? [英] Is there some elegant way to pause and resume any other goroutine?

查看:42
本文介绍了是否有一些优雅的方式来暂停和恢复其他goroutine?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

就我而言,我有成千上万个goroutine作为 work()同时工作.我也有一个 sync() goroutine.当 sync 开始时,我需要任何其他goroutine在同步作业完成后暂停一会儿.这是我的代码:

In my case, I have thousands of goroutines working simultaneously as work(). I also had a sync() goroutine. When sync starts, I need any other goroutine to pause for a while after sync job is done. Here is my code:

var channels []chan int
var channels_mutex sync.Mutex

func work() {
  channel := make(chan int, 1)
  channels_mutex.Lock()  
  channels = append(channels, channel)
  channels_mutex.Unlock()
  for {
    for {
      sync_stat := <- channel // blocked here
      if sync_stat == 0 { // if sync complete
        break  
      }
    }
    // Do some jobs
    if (some condition) {
      return
    }
  }
}

func sync() {
  channels_mutex.Lock()
  // do some sync

  for int i := 0; i != len(channels); i++ {
    channels[i] <- 0
  }
  channels_mutex.Unlock()
}

现在的问题是,由于<-始终在读取时处于阻塞状态,因此每次进入 sync_stat:=<-channel 都处于阻塞状态.我知道该频道是否已关闭它不会被阻止,但是由于我必须使用此频道,直到 work()退出,而且我找不到任何方法来重新打开已关闭的频道.

Now the problem is, since <- is always blocking on read, every time goes to sync_stat := <- channel is blocking. I know if the channel was closed it won't be blocked, but since I have to use this channel until work() exits, and I didn't find any way to reopen a closed channel.

我怀疑自己走错了路,对您的帮助将不胜感激.是否有一些优雅"的形象?暂停和恢复其他goroutine的方法吗?

I suspect myself on a wrong way, so any help is appreciated. Is there some "elegant" way to pause and resume any other goroutine?

推荐答案

如果我对您的理解正确,那么您需要N个工作人员和一个控制器,该控制器可以随意暂停,恢复和停止工作人员.下面的代码就可以做到这一点.

If I understand you correctly, you want N number of workers and one controller, which can pause, resume and stop the workers at will. The following code will do just that.

package main

import (
    "fmt"
    "runtime"
    "sync"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int, 1)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    setState(workers, Running)

    // Pause workers.
    setState(workers, Paused)

    // Unpause workers.
    setState(workers, Running)

    // Shutdown workers.
    setState(workers, Stopped)
}

// setState changes the state of all given workers.
func setState(workers []chan int, state int) {
    for _, w := range workers {
        w <- state
    }
}

这篇关于是否有一些优雅的方式来暂停和恢复其他goroutine?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆