Multiprocessing Queue.get() hangs

Question

I'm trying to implement basic multiprocessing and I've run into an issue. The Python script is attached below.

import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

AnalyzeFrequency analyzes the frequencies of words in a file, and get_results() returns a sorted list of those words and their frequencies. The list is very large, perhaps 10000 items.

This list is then passed to the add_to_append_queue method, which adds it to a queue. The process_append_queue function takes the items one by one and adds the frequencies to a "database". This operation takes a bit longer than the actual analysis in main(), so I am trying to use a separate process for this method. When I do this with the threading module, everything works perfectly fine, with no errors. When I try to use Process, the script hangs at item = append_queue.get().
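
A minimal sketch of why the first version behaves differently under threading and multiprocessing (Python 3, standard library only; the names thread_queue, worker, process_demo and thread_demo are made up for illustration). The first version imports the thread-only Queue module, whose queues live in a single process's memory. A thread shares that object, but a child process works on its own copy (or, where the module is re-imported, on a brand-new queue), so its get() never sees what the parent puts. A timeout replaces the indefinite block so the demo terminates.

```python
import multiprocessing as mp
import queue      # Python 3 name of the Python 2 "Queue" module
import sys
import threading

thread_queue = queue.Queue()   # in-process queue at module level, as in the first version

def worker():
    # Runs in the child process, which has its own thread_queue, not the parent's.
    try:
        thread_queue.get(timeout=1)    # timeout instead of blocking forever
    except queue.Empty:
        sys.exit(1)                    # nothing arrived: this is the "hang"
    sys.exit(0)

def process_demo():
    p = mp.Process(target=worker)
    p.start()
    thread_queue.put("word counts")    # lands in the parent's queue only
    p.join()
    leftover = thread_queue.get_nowait()   # the item never left the parent
    return p.exitcode, leftover

def thread_demo():
    q = queue.Queue()
    received = []
    t = threading.Thread(target=lambda: received.append(q.get(timeout=1)))
    t.start()
    q.put("word counts")               # the thread reads the very same queue object
    t.join()
    return received

if __name__ == "__main__":
    print(process_demo())   # (1, 'word counts')
    print(thread_demo())    # ['word counts']
```

The parent drains its own queue after join() so the demo can be run repeatedly without a leftover item being copied into the next child.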

Could someone please explain what is happening here, and perhaps direct me toward a fix?

Thanks for all answers!

Update

The pickle error was my fault; it was just a typo. Now I am using the Queue class from multiprocessing, but append_queue.get() still hangs. New code:

import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()
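
The first version relied on task_done() and append_queue.join(), which multiprocessing.Queue does not provide (presumably why append_queue.join() is commented out in the new code). multiprocessing.JoinableQueue adds both methods. A minimal sketch, with hypothetical names (append_worker, append_all) and toy batches in place of the file analysis, that also creates the queue inside a function and passes it to the worker as an argument:

```python
import multiprocessing as mp

def append_worker(jobs, processed):
    # Hypothetical stand-in for process_append_queue.
    while True:
        item = jobs.get()
        if item is None:            # sentinel: stop cleanly instead of relying on daemon=True
            jobs.task_done()
            return
        # ... merge `item` into the worker's data here ...
        with processed.get_lock():
            processed.value += 1
        jobs.task_done()            # tells jobs.join() this item is finished

def append_all(batches):
    jobs = mp.JoinableQueue()       # created here and passed to the child as an argument
    processed = mp.Value("i", 0)    # shared counter, only to show what join() waited for
    worker = mp.Process(target=append_worker, args=(jobs, processed))
    worker.start()
    for batch in batches:
        jobs.put(batch)
    jobs.join()                     # returns once task_done() was called for every put()
    done = processed.value          # every batch has been handled at this point
    jobs.put(None)
    worker.join()
    return done

if __name__ == "__main__":
    print(append_all([[("the", 3)], [("queue", 1)], [("the", 2)]]))  # 3
```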

Update 2

Here is the database code:

class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()

        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError

        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()

        _list = self.sorter.mergesort(_list)
        _list.reverse()

        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()

        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]

        self.append_time = time.time() - start_time

Answer

Comments suggest you're trying to run this on Windows. As I said in a comment,

If you're running this on Windows, it can't work - Windows doesn't have fork(), so each process gets its own Queue and they have nothing to do with each other. The entire module is imported "from scratch" by each process on Windows. You'll need to create the Queue in main(), and pass it as an argument to the worker function.
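
The Windows behavior can be reproduced on Linux or macOS by requesting the "spawn" start method explicitly, which is the only start method available on Windows. A minimal sketch with made-up names: a module-level queue is created again when the child re-imports the module, so the child never sees the parent's item, whereas a queue created by the parent and passed as an argument is shared. Timeouts keep the first case from hanging.

```python
import multiprocessing as mp
import queue      # only for the queue.Empty exception
import sys

ctx = mp.get_context("spawn")   # the start method Windows always uses
shared = ctx.Queue()            # module level, like append_queue in the question

def child():
    try:
        shared.get(timeout=1)   # this `shared` was created by the child's own re-import
    except queue.Empty:
        sys.exit(1)             # nothing arrived: the two queues are unrelated
    sys.exit(0)

def demo_module_level():
    p = ctx.Process(target=child)
    p.start()
    shared.put("word counts")   # goes into the parent's queue only
    p.join()
    shared.get(timeout=5)       # drain it so the demo can be run again
    return p.exitcode

def child_with_arg(q):
    q.get(timeout=10)           # receives the parent's item

def demo_passed_as_arg():
    q = ctx.Queue()             # created in the parent...
    p = ctx.Process(target=child_with_arg, args=(q,))  # ...and handed to the child
    p.start()
    q.put("word counts")
    p.join()
    return p.exitcode

if __name__ == "__main__":
    print(demo_module_level(), demo_passed_as_arg())   # 1 0
```

The `if __name__ == "__main__":` guard matters here: with spawn, the child imports the main module again, and starting processes at import time would fail.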

Here's a fleshed-out version of what you need to do to make it portable, although I removed all the database stuff because it's irrelevant to the problems you've described so far. I also removed the daemon fiddling, because that's usually just a lazy way to avoid shutting things down cleanly, and as often as not it will come back to bite you later:

def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp

    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()
    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__=="__main__":
    main()

The output is the "obvious" stuff:

processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done

Note: because Windows doesn't (can't) fork(), it's impossible for worker processes to inherit any Python object on Windows. Each process runs the entire program from its start. That's why your original program couldn't work: each process created its own Queue, wholly unrelated to the Queue in the other process. In the approach shown above, only the main process creates a Queue, and the main process passes it (as an argument) to the worker process.
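
The same isolation applies in the other direction, which matters for the question's database object: appends made by the worker land in the worker's copy of database, so a database.save_db() call in the main process saves data the worker never touched. One way around it, sketched here with a plain dict standing in for FrequencyStore and hypothetical names (merge_worker, merge_in_worker), is to let the worker own the merged counts and send them back over a second queue after it receives the sentinel:

```python
import multiprocessing as mp

def merge_worker(jobs, results):
    """Merge every (word, count) list received on `jobs`; send the totals back on `results`."""
    totals = {}                       # lives only in the worker process
    while True:
        pairs = jobs.get()
        if pairs is None:             # sentinel from the parent: no more input
            break
        for word, count in pairs:
            totals[word] = totals.get(word, 0) + count
    results.put(totals)               # the only way the parent gets to see the merged data

def merge_in_worker(batches):
    jobs, results = mp.Queue(10), mp.Queue()
    worker = mp.Process(target=merge_worker, args=(jobs, results))
    worker.start()
    for pairs in batches:
        jobs.put(pairs)
    jobs.put(None)
    # get() before join(): the worker cannot exit until its queued data is flushed to the pipe.
    totals = results.get()
    worker.join()
    return totals

if __name__ == "__main__":
    print(merge_in_worker([[("the", 3), ("queue", 1)], [("the", 2), ("process", 4)]]))
    # {'the': 5, 'queue': 1, 'process': 4}
```

The main process would then write the returned totals to disk itself, instead of calling save_db() on its own, never-updated object.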
