Python threading Queue producer-consumer with thread safety


Problem description


I am using threading and Queue to fetch URLs and store them in a database.
I want just one thread to do the storing job,
so I wrote the code below:

import threading
import time

import Queue

site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array = []


class FetchThread(threading.Thread):
    def __init__(self,site_queue,proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array
    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()
    def get_proxy_one_website(self,index):
        print '{0} fetched site :{1}\n'.format(self.name,index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0}  to database\n'.format(proxy_array.pop())

            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())

        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue

def start_crawl():
    global site_count,fetch_thread_count,site_queue,proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue,proxy_array)
        ft.setDaemon(True)
        ft.start()

    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)

    save()

    print 'start site_queue join'
    site_queue.join()
    print 'finish'

start_crawl()

Output:

init
put site_queue
Thread-1 fetched site :0

Thread-2 fetched site :1

Thread-1 fetched site :2

Thread-2 fetched site :3

Thread-1 fetched site :4

Thread-2 fetched site :5

Thread-1 fetched site :6

Thread-2 fetched site :7

Thread-1 fetched site :8

Thread-2 fetched site :9

save :9 to database

save :8 to database

save :7 to database

save :6 to database

save :5 to database

save :4 to database

save :3 to database

save :2 to database

save :1 to database

save :0 to database

break
start site_queue join
finish
[Finished in 1.2s]


Why does the save() function run before site_queue.join(), even though join() is written right after save()?
I also substituted save() with a thread function, but that doesn't work either.
Does that mean I must change proxy_array=[] to proxy_queue=Queue.Queue() before I can use threading to store the data?
I want only one thread to do this, and no other thread gets data from proxy_array, so why should I need a queue for it? Using Queue seems very weird.
Is there any better solution?


UPDATE:
I don't want to wait until all the FetchThreads complete their work. I want to save data while fetching; that would be much faster. I want the result to be something like below (because I use array.pop(), "save :0" may appear much later; this is just an example for easy understanding):

Thread-2 fetched site :1

Thread-1 fetched site :2

save :0 to database

Thread-2 fetched site :3

Thread-1 fetched site :4

save :2 to database

save :3 to database


Thread-2 fetched site :5

Thread-1 fetched site :6

save :4 to database
.......

UPDATE2: someone asked questions the same as below:

question:
As I said in the context above, no other threads would get data from proxy_array.
I just cannot see why it would break thread safety.

answer:
It is the producer-consumer problem described in misha's answer; I understood it after reading it carefully.




question:
One more question: can the program's main thread play the consumer alongside the FetchThreads (in other words, without creating a StoreThread)?

This is what I cannot figure out; I will update after I find the answer.

Answer

I recommend you read about the producer-consumer problem. Your producers are the fetch threads. Your consumer is the save function. If I understand correctly, you want the consumer to save each fetched result as soon as it's available. For this to work, the producer and consumer must be able to communicate in some thread-safe way (e.g. a queue).

Basically, you need another queue. It would replace proxy_array. Your save function will look something like this:

while True:
    try:
        data = fetch_data_from_output_queue()
        save_to_database(data)
    except EmptyQueue:
        if stop_flag.is_set():
            # All done
            break
        time.sleep(1)
        continue

This save function will need to run in its own thread. stop_flag is an Event that gets set after you join your fetch threads.
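In Python 3 the module is named queue instead of Queue, and the pseudocode above maps onto the standard library roughly like this (a sketch: save_to_database and the saved list are stand-ins for the real database write):

```python
import queue
import threading
import time

output_queue = queue.Queue()
stop_flag = threading.Event()
saved = []

def save_to_database(data):
    # stand-in for the real database write
    saved.append(data)

def save_worker():
    while True:
        try:
            # non-blocking get raises queue.Empty when the queue is empty
            data = output_queue.get_nowait()
        except queue.Empty:
            if stop_flag.is_set():
                break        # producers finished and queue drained: all done
            time.sleep(0.1)  # queue momentarily empty; producers still running
            continue
        save_to_database(data)
        output_queue.task_done()
```

A blocking get with a timeout (output_queue.get(timeout=1)) would work just as well and avoids the explicit sleep.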

From a high level, your application will look like this:

input_queue = initialize_input_queue()
output_queue = initialize_output_queue()

stop_flag = Event()
create_and_start_save_thread(output_queue, stop_flag) # read from output queue, save to DB
create_and_start_fetch_threads(input_queue, output_queue) # get sites to crawl from input queue, push crawled results to output_queue
join_fetch_threads() # this will block until the fetch threads have gone through everything in the input_queue
stop_flag.set() # this will inform the save thread that we are done
join_save_thread() # wait for all the saving to complete
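Putting the whole design together as a runnable sketch (Python 3, so queue.Queue replaces Python 2's Queue.Queue; fetch_site and the saved list are stand-ins for the real crawling and database code, and the other names follow the question's originals):

```python
import queue
import threading
import time

site_count = 10
fetch_thread_count = 2

input_queue = queue.Queue()    # sites waiting to be crawled
output_queue = queue.Queue()   # crawled results waiting to be saved
stop_flag = threading.Event()  # set once all fetching is done
saved = []

def fetch_site(index):
    # stand-in for the real crawl of one site
    return index

def fetch_worker():
    # producer: take a site from input_queue, push the result to output_queue
    while True:
        index = input_queue.get()
        output_queue.put(fetch_site(index))
        input_queue.task_done()

def save_worker():
    # consumer: drain output_queue; stop once fetching is done and it is empty
    while True:
        try:
            data = output_queue.get_nowait()
        except queue.Empty:
            if stop_flag.is_set():
                break
            time.sleep(0.1)
            continue
        saved.append(data)  # stand-in for the real database write
        output_queue.task_done()

def start_crawl():
    saver = threading.Thread(target=save_worker)
    saver.start()
    for _ in range(fetch_thread_count):
        threading.Thread(target=fetch_worker, daemon=True).start()
    for i in range(site_count):
        input_queue.put(i)
    input_queue.join()  # block until every site has been fetched
    stop_flag.set()     # tell the save thread no more results are coming
    saver.join()        # wait for all the saving to complete
```

As for the UPDATE2 question: the main thread could run save_worker itself instead of starting a saver thread, but then input_queue.join() and stop_flag.set() would have to happen somewhere else (for example in a small watcher thread), because the main thread cannot both block in join() and consume results at the same time.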

