Throughput differences when using coroutines vs threading


Question

A few days ago I asked a question on SO about helping me design a paradigm for structuring multiple HTTP requests.

Here's the scenario. I would like to have a multi-producer, multi-consumer system. My producers crawl and scrape a few sites and add the links they find to a queue. Since I'll be crawling multiple sites, I would like to have multiple producers/crawlers.

The consumers/workers feed off this queue, making TCP/UDP requests to these links and saving the results to my Django DB. I would also like to have multiple workers, since each queue item is totally independent of the others.

People suggested using a coroutine library for this, i.e. Gevent or Eventlet. Having never worked with coroutines, I read that even though the programming paradigm is similar to the threaded paradigm, only one thread is actively executing; when a blocking call occurs, such as an I/O call, the stacks are switched in memory and another green thread takes over until it encounters a blocking I/O call of its own. Hopefully I got this right? Here's the code from one of my SO posts:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

This works well because the sleep calls are blocking calls, and when a sleep event occurs, another green thread takes over. This is a lot faster than sequential execution. As you can see, I don't have any code in my program that purposely yields execution from one thread to another. I fail to see how this fits into the scenario above, as I would like to have all the threads executing simultaneously.

All works fine, but I feel the throughput I've achieved using Gevent/Eventlet is higher than the original sequentially running program, yet drastically lower than what could be achieved using real threading.

If I were to re-implement my program using threading mechanisms, each of my producers and consumers could be working simultaneously, without the need to swap stacks in and out like coroutines.
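For comparison, a threaded version of this producer/consumer shape needs no explicit yield points at all: the OS preempts threads on its own (though in CPython the GIL still serializes pure-Python bytecode, so the real wins come from blocking I/O). Below is a minimal, hypothetical sketch, not the asker's actual program, using Python 3 stdlib names (queue rather than the Python 2 Queue), with a doubling step standing in for the TCP/UDP request:

```python
import queue
import threading

def producer(q, items):
    # Push items; no cooperative yield needed, the OS scheduler
    # preempts threads on its own.
    for item in items:
        q.put(item)

def worker(q, results):
    while True:
        item = q.get()
        if item is None:        # sentinel: no more work
            q.task_done()
            return
        results.append(item * 2)  # stand-in for the TCP/UDP request
        q.task_done()

q = queue.Queue()
results = []
producers = [threading.Thread(target=producer, args=(q, range(i, 10, 2)))
             for i in range(2)]
workers = [threading.Thread(target=worker, args=(q, results))
           for _ in range(4)]
for t in producers + workers:
    t.start()
for t in producers:
    t.join()       # all items are now queued
for _ in workers:
    q.put(None)    # one sentinel per worker
for t in workers:
    t.join()
print(sorted(results))  # [0, 2, 4, ..., 18]
```

Joining the producers before enqueuing the sentinels guarantees every real item precedes the shutdown markers in the FIFO queue.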

Should this be re-implemented using threading? Is my design wrong? I've failed to see the real benefits of using coroutines.

Maybe my concepts are a little muddy, but this is what I've assimilated. Any help or clarification of my paradigm and concepts would be great.

Thanks

Answer

As you can see, I don't have any code in my program that purposely yields the execution of one thread to another thread. I fail to see how this fits into the scenario above as I would like to have all the threads executing simultaneously.

There is a single OS thread but several greenlets. In your case gevent.sleep() allows the workers to execute concurrently. Blocking I/O calls such as urllib2.urlopen(url).read() do the same if you use a urllib2 patched to work with gevent (by calling gevent.monkey.patch_*()).

See also A Curious Course on Coroutines and Concurrency to understand how code can work concurrently in a single-threaded environment.
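The single-threaded switching described above can be sketched, purely for illustration, with plain Python generators; this is a toy round-robin scheduler, not gevent's actual mechanism (gevent switches real C stacks via greenlet). Each yield plays the role of a blocking call at which the scheduler hands control to another task:

```python
from collections import deque

def green_thread(name, steps):
    # Each yield stands in for a blocking I/O call: control returns
    # to the scheduler there, and another task gets to run.
    for i in range(steps):
        yield "%s:%d" % (name, i)

def run(tasks):
    # Round-robin scheduler: only one task executes at any instant,
    # just like greenlets on a single OS thread.
    ready = deque(tasks)
    trace = []
    while ready:
        task = ready.popleft()
        try:
            trace.append(next(task))
            ready.append(task)   # task "blocked"; requeue it
        except StopIteration:
            pass                 # task finished
    return trace

print(run([green_thread("a", 2), green_thread("b", 2)]))
# → ['a:0', 'b:0', 'a:1', 'b:1']
```

Note how the two tasks interleave even though nothing runs in parallel: concurrency comes entirely from the tasks volunteering control at their yield points.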

To compare throughput between gevent, threading, and multiprocessing, you could write code that is compatible with all approaches:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

The rest of the script is the same for all concurrency implementations:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

Don't execute code at module level; put it in main():

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__ == "__main__":
    main()
