Thread-safe Enumerator in Ruby


Question

I have a method in a Ruby on Rails application that I want to run concurrently. The method is supposed to create a zip file containing reports from the site, where each file in the zip is a PDF. The conversion from HTML to PDF is somewhat slow, hence the desire to multi-thread.

I wanted to use 5 threads, so I figured I would share one Enumerator between them. Each thread would pop a value from the Enumerator and process it. Here's how I was thinking it would work:

t = Zip::OutputStream::write_buffer do |z|
  mutex = Mutex.new
  gen = Enumerator.new{ |g|
    Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).find_each do |report|
      g.yield report
    end
  }
  5.times.map {
    Thread.new do
      begin
        loop do
          mutex.synchronize  do
            @report = gen.next
          end
          title = @report.title + "_" + @report.id.to_s
          title += ".pdf" unless title.end_with?(".pdf")
          pdf = PDFKit.new(render_to_string(:template => partial_url, locals: {array: [@report]},
                                            :layout => false)).to_pdf
          mutex.synchronize  do
            z.put_next_entry(title)
            z.write(pdf)
          end
        end
      rescue StopIteration
        # do nothing
      end
    end
  }.each {|thread| thread.join }
end

What happened when I tried it:

When I ran the above code, I got the following error:

FiberError at /generate_report
fiber called across threads
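
The error can be reproduced outside Rails. The following is a minimal sketch, assuming CRuby, where an external enumerator (one driven with `next`) runs its block inside a Fiber that belongs to the thread that first advanced it. The names `gen`, `first`, and `error` are illustrative only. Note that wrapping `gen.next` in a Mutex does not help, because the restriction concerns which thread resumes the Fiber, not whether two threads do so at the same time.

```ruby
# Enumerator#next runs the enumerator's block inside a Fiber, and that Fiber
# can only be resumed by the thread that started it.
gen = Enumerator.new do |g|
  3.times { |i| g.yield i }
end

first = gen.next # the main thread starts the enumerator's internal Fiber

# A second thread calling #next tries to resume the same Fiber.
# The thread's return value is either nil or the rescued exception.
error = Thread.new do
  begin
    gen.next
    nil
  rescue FiberError => e
    e
  end
end.value

puts first
puts error.class
```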

After some searching, I came across this post, which recommended using a Queue instead of an Enumerator, because Queues are thread safe while Enumerators are not. While this might be reasonable for non-Rails applications, it is impractical for me.

The nice thing about Rails 4 ActiveRecord is that it doesn't load a query until it is iterated over. And if you iterate with a method like find_each, it works in batches of 1000, so you never have to hold an entire table in RAM at once. The result of the query I'm using, Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}), is large. Very large. I need to be able to load it on the fly, rather than doing something like:

gen = Report.all.includes("employee" => ["boss", "client"], "projects" => {"project_owner" => ["project_team"]}).map(&queue.method(:push))

which would load the entire query result into RAM.

Is there a thread-safe way of doing this:

gen = Enumerator.new{ |g|
        Report.all.includes(...).find_each do |report|
          g.yield report
        end
}

so that I can pop data from gen across multiple threads without having to load my entire Report table (and all of the includes) into RAM?

Recommended answer

If you start the worker threads before filling the queue, they will consume it as you fill it. Because, as a rule of thumb, the network is slower than the CPU, each batch should be (mostly) consumed by the time the next batch arrives:

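queue = Queue.new

# Start the workers first. A blocking pop waits for items instead of
# exiting (or raising ThreadError) when the queue is momentarily empty.
workers = 2.times.map do
  Thread.new do
    # pop returns nil once the queue is closed and drained
    while (item = queue.pop)
      p item
      sleep(0.1)
    end
  end
end

(0..1000).each { |i| queue.push(i) }
queue.close # Queue#close requires Ruby 2.3 or later

workers.each(&:join)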

If the workers still prove too slow to keep up, you can opt to use SizedQueue, which blocks push once the queue reaches its maximum size:

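queue = SizedQueue.new(100)

workers = 2.times.map do
  Thread.new do
    # pop blocks while the queue is empty and returns nil once it is closed and drained
    while (item = queue.pop)
      p "#{item} - #{queue.size}"
      sleep(0.1)
    end
  end
end

# push blocks while 100 items are already waiting
(0..300).each { |i| queue.push(i) }
queue.close # SizedQueue#close requires Ruby 2.3 or later

workers.each(&:join)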
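
Applied to the question, the same pattern might look like the sketch below. It is not the asker's code: `each_record` is a hypothetical stand-in for `Report...find_each`, the string built in the worker is a placeholder for the PDF conversion, and the queue size of 10 is an arbitrary choice. The point it illustrates is that the producer iterates lazily in the calling thread (so no Fiber is shared) while the SizedQueue bounds how many records are in memory waiting for a worker.

```ruby
# Hypothetical stand-in for Report...find_each: yields records one at a
# time, producing them batch by batch instead of building one big array.
def each_record(total:, batch_size:)
  (0...total).each_slice(batch_size) do |batch|
    batch.each { |record| yield record }
  end
end

queue   = SizedQueue.new(10) # at most 10 records wait in the queue at once
results = []
lock    = Mutex.new

workers = 5.times.map do
  Thread.new do
    # pop blocks until a record arrives; it returns nil once the queue
    # is closed and drained, which ends the loop.
    while (record = queue.pop)
      converted = "report_#{record}.pdf" # placeholder for the slow PDF conversion
      lock.synchronize { results << converted } # serialize writes to shared output
    end
  end
end

# The producer runs in the calling thread; push blocks when the queue is full.
each_record(total: 50, batch_size: 10) { |record| queue.push(record) }
queue.close # requires Ruby 2.3 or later
workers.each(&:join)

puts results.size
```

In the question's setting, the `lock.synchronize` block is where the `z.put_next_entry` and `z.write` calls would go, since the zip stream is shared between the workers.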
