如何从 ruby 中实时创建的新线程异步收集结果 [英] How to asynchronously collect results from new threads created in real time in ruby
问题描述
我想不断检查数据库中的表以运行命令.有些命令可能需要 4 分钟才能完成,有些则需要 10 秒.
I would like to continously check the table in the DB for the commands to run. Some commands might take 4minutes to complete, some 10 seconds.
因此我想在线程中运行它们.所以每条记录都会创建一个新线程,创建线程后,记录被删除.
Hence I would like to run them in threads. So every record creates new thread, and after thread is created, record gets removed.
因为数据库查找 + 线程创建将无限循环运行,我如何从线程获取响应"(线程将发出 shell 命令并获取我想要读取的响应代码)?
Because the DB lookup + Thread creation will run in an endless loop, how do I get the 'response' from the Thread (thread will issue shell command and get response code which I would like to read) ?
我想过创建两个无限循环的线程:- 首先用于数据库查找 + 创建新线程- 第二个......以某种方式读取线程结果并对每个响应采取行动
I thought about creating two Threads with endless loop each: - first for DB lookups + creating new threads - second for ...somehow reading the threads results and acting upon each response
或者也许我应该使用 fork,或者 os spawn 一个新进程?
Or maybe I should use fork, or os spawn a new process?
推荐答案
您可以让每个线程将其结果推送到队列中,然后您的主线程可以从队列中读取.默认情况下,从队列中读取是一个阻塞操作,因此如果没有结果,您的代码将阻塞并等待读取.
You can have each thread push its results onto a Queue, then your main thread can read from the Queue. Reading from a Queue is a blocking operation by default, so if there are no results, your code will block and wait on the read.
http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html
这是一个例子:
require 'thread'
jobs = Queue.new
results = Queue.new
thread_pool = []
pool_size = 5
(1..pool_size).each do |i|
thread_pool << Thread.new do
loop do
job = jobs.shift #blocks waiting for a task
break if job == "!NO-MORE-JOBS!"
#Otherwise, do job...
puts "#{i}...."
sleep rand(1..5) #Simulate the time it takes to do a job
results << "thread#{i} finished #{job}" #Push some result from the job onto the Queue
#Go back and get another task from the Queue
end
end
end
#All threads are now blocking waiting for a job...
puts 'db_stuff'
db_stuff = [
'job1',
'job2',
'job3',
'job4',
'job5',
'job6',
'job7',
]
db_stuff.each do |job|
jobs << job
end
#Threads are now attacking the Queue like hungry dogs.
pool_size.times do
jobs << "!NO-MORE-JOBS!"
end
result_count = 0
loop do
result = results.shift
puts "result: #{result}"
result_count +=1
break if result_count == 7
end
这篇关于如何从 ruby 中实时创建的新线程异步收集结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!