如何从 ruby​​ 中实时创建的新线程异步收集结果 [英] How to asynchronously collect results from new threads created in real time in ruby

查看:62
本文介绍了如何从 ruby​​ 中实时创建的新线程异步收集结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想不断检查数据库中的表以运行命令.有些命令可能需要 4 分钟才能完成,有些则需要 10 秒.

I would like to continously check the table in the DB for the commands to run. Some commands might take 4minutes to complete, some 10 seconds.

因此我想在线程中运行它们.所以每条记录都会创建一个新线程,创建线程后,记录被删除.

Hence I would like to run them in threads. So every record creates new thread, and after thread is created, record gets removed.

因为数据库查找 + 线程创建将无限循环运行,我如何从线程获取响应"(线程将发出 shell 命令并获取我想要读取的响应代码)?

Because the DB lookup + Thread creation will run in an endless loop, how do I get the 'response' from the Thread (thread will issue shell command and get response code which I would like to read) ?

我想过创建两个无限循环的线程:- 首先用于数据库查找 + 创建新线程- 第二个......以某种方式读取线程结果并对每个响应采取行动

I thought about creating two Threads with endless loop each: - first for DB lookups + creating new threads - second for ...somehow reading the threads results and acting upon each response

或者也许我应该使用 fork,或者 os spawn 一个新进程?

Or maybe I should use fork, or os spawn a new process?

推荐答案

您可以让每个线程将其结果推送到队列中,然后您的主线程可以从队列中读取.默认情况下,从队列中读取是一个阻塞操作,因此如果没有结果,您的代码将阻塞并等待读取.

You can have each thread push its results onto a Queue, then your main thread can read from the Queue. Reading from a Queue is a blocking operation by default, so if there are no results, your code will block and wait on the read.

http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html

这是一个例子:

require 'thread'

jobs = Queue.new
results = Queue.new

thread_pool = []
pool_size = 5

(1..pool_size).each do |i|
  thread_pool << Thread.new do 
    loop do 
      job = jobs.shift #blocks waiting for a task
      break if job == "!NO-MORE-JOBS!"

      #Otherwise, do job...
      puts "#{i}...."
      sleep rand(1..5) #Simulate the time it takes to do a job
      results << "thread#{i} finished #{job}"  #Push some result from the job onto the Queue
      #Go back and get another task from the Queue
    end
  end
end


#All threads are now blocking waiting for a job...
puts 'db_stuff'
db_stuff = [
  'job1', 
  'job2', 
  'job3', 
  'job4', 
  'job5',
  'job6',
  'job7',
]

db_stuff.each do |job|
  jobs << job
end

#Threads are now attacking the Queue like hungry dogs.

pool_size.times do
  jobs << "!NO-MORE-JOBS!"
end

result_count = 0

loop do
  result = results.shift
  puts "result: #{result}"
  result_count +=1
  break if result_count == 7
end

这篇关于如何从 ruby​​ 中实时创建的新线程异步收集结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆