线程队列在Python中挂起 [英] Threading queue hangs in Python

查看:0
本文介绍了线程队列在Python中挂起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过队列使解析器成为多线程的。它似乎起作用了,但我的队伍挂了起来。如果有人能告诉我如何修复这个问题,我将不胜感激,因为我很少编写多线程代码。

此代码读取Q:

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread

l = []
q = Queue.Queue()

def parse_record():
    d = {}
    while not q.empty():
        rec = q.get()
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        l.append(d) # l is global

这填满了问题:

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    numthreads = 2

    # fill queue
    for rec in flows:
        q.put(rec)
    # work on Queue    
    for i in range(numthreads):
        t = Thread(target = parse_record)
        t.daemon = True
        t.start()

    # blocking
    q.join()

    # never reached    
    data_df = pandas.DataFrame.from_records(l)
    return data_df
我只在我的Main中调用parse_records()。它永远不会结束。

推荐答案

Queue.empty doc表示:

...如果Empty()返回False,则不能保证后续对Get()的调用不会阻塞。

作为最低要求,您应该使用get_nowait,否则将面临数据丢失的风险。但更重要的是,只有当所有排队的项都被标记为完成了Queue.task_done调用:

时,联接才会释放

如果Join()当前被阻止,它将在处理完所有项目后恢复(这意味着已将()放入队列的每个项目都收到了TASK_Done()调用)。

作为附注,l.append(d)不是原子的,应该使用锁进行保护。

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock

l = []
l_lock = Lock()
q = Queue.Queue()

def parse_record():
    d = {}
    while 1:
        try:
            rec = q.getnowait()
            d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
            # ... many ops like this
            d['dport'] = rec.dport
            with l_lock():
                l.append(d) # l is global
            q.task_done()
        except Queue.Empty:
            return

通过使用标准库中的线程池,您可以大大缩短代码。

from silk import *
import json
import datetime
import pandas
import multiprocessing.pool

def parse_record(rec):
    d = {}
    d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
    # ... many ops like this
    d['dport'] = rec.dport
    return d

def parse_records():
    ffile = '/tmp/query.rwf'
    flows = SilkFile(ffile, READ)
    pool = multiprocessing.pool.Pool(2)
    data_df = pandas.DataFrame.from_records(pool.map(parse_record), flows)
    pool.close()
    return data_df

这篇关于线程队列在Python中挂起的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆