pydispatcher 是否在后台线程中运行处理程序函数? [英] Does pydispatcher run the handler function in a background thread?
问题描述
在查找事件处理程序模块时,我遇到了 pydispatcher,它似乎对初学者很友好.我对该库的用例是,如果我的队列大小超过阈值,我想发送一个信号.然后,处理程序函数可以开始处理并从队列中删除项目(然后在数据库中进行批量插入).
Upon looking up event handler modules, I came across pydispatcher, which seemed beginner friendly. My use case for the library is that I want to send a signal if my queue size is over a threshold. The handler function can then start processing and removing items from the queue (and subsequently do a bulk insert into the database).
我希望处理程序函数在后台运行.我知道我可以简单地覆盖 queue.append() 方法检查队列大小并异步调用处理程序函数,但我想实现侦听器-调度程序模型以保持逻辑干净和分离.
I would like the handler function to run in the background. I am aware that I can simply overwrite the queue.append() method checking for the queue size and calling the handler function asynchronously, but I would like to implement the listener-dispatcher model to keep the logic clean and separated.
pydispatcher 是否开箱即用?如果没有,是否有另一个模块可以帮助我做到这一点?我是否需要管理对队列的访问,因为可能有多个线程同时处理和附加到队列?
Does pydispatcher do this out of the box? If not, is there another module that can help me do this? Would I need to manage the access to the queue, since there might be multiple threads processing and appending to the queue at the same time?
请注意,在我的用例中,只有一个调度程序和事件处理程序.
Note that in my use case there is only a single dispatcher and event handler.
推荐答案
我最近发布了 Akuanduba 模块,可以帮助您完成此任务.存储库中有一个示例可以帮助您了解它的工作原理,并且看起来与您想要的很相似.
I've recently released the Akuanduba module, that may help you with this task. There's a single example on the repository that may help you understand how it works and it seems similar to what you want.
无论如何,我将尝试在这里解释一种使用 Akuanduba 实现您的代码的方法:
Anyway, I'll try to explain here a way of implementing your code with Akuanduba:
- 首先,您可以制作一个数据框来容纳您的队列:
- First you could make a data frame that would hold your queue:
# Mandatory imports
from Akuanduba.core.messenger.macros import *
from Akuanduba.core.constants import *
from Akuanduba.core import NotSet, AkuandubaDataframe
# Your imports go here:
from queue import Queue
class MyQueue (AkuandubaDataframe):
def __init__(self, name):
# Mandatory stuff
AkuandubaDataframe.__init__(self, name)
self.__queue = Queue ()
def getQueue (self):
return self.__queue
def putQueue (self, val):
self.__queue.put(val)
def getQueueSize (self):
return self.__queue.qsize()
#
# "toRawObj" method is a mandatory method that delivers a dict with the desired data
# for file saving
#
def toRawObj(self):
d = {
"Queue" : self.getQueue(),
}
return d
- 然后你可以创建一个 TriggerCondition 来检查队列大小:
- Then you could make a TriggerCondition that would check the queue size:
from Akuanduba.core import StatusCode, NotSet, StatusTrigger
from Akuanduba.core.messenger.macros import *
from Akuanduba.core import TriggerCondition
import time
class CheckQueueSize (TriggerCondition):
def __init__(self, name, maxSize):
TriggerCondition.__init__(self, name)
self._name = name
self._maxSize = maxSize
def initialize(self):
return StatusCode.SUCCESS
def execute (self):
size = self.getContext().getHandler("MyQueue").getQueueSize()
if (size > SIZE_THRESHOLD):
return StatusTrigger.TRIGGERED
else:
return StatusTrigger.NOT_TRIGGERED
def finalize(self):
return StatusCode.SUCCESS
- 制作一个工具作为你的处理函数:
- Make a tool that would be your handler function:
# Mandatory imports
from Akuanduba.core import AkuandubaTool, StatusCode, NotSet, retrieve_kw
# Your imports go here:
class SampleTool(AkuandubaTool):
def __init__(self, name, **kw):
# Mandatory stuff
AkuandubaTool.__init__(self, name)
def initialize(self):
# Lock the initialization. After that, this tool can not be initialized once again
self.init_lock()
return StatusCode.SUCCESS
def execute(self,context):
#
# DO SOMETHING HERE
#
# Always return SUCCESS
return StatusCode.SUCCESS
def finalize(self):
self.fina_lock()
return StatusCode.SUCCESS
- 最后,制作一个主要脚本,以便让所有内容协同工作:
# Akuanduba imports
from Akuanduba.core import Akuanduba, LoggingLevel, AkuandubaTrigger
from Akuanduba import ServiceManager, ToolManager, DataframeManager
# This sample's imports
import MyQueue, CheckQueueSize, SampleTool
# Creating your handler
your_handler = SampleTool ("Your Handler's name")
# Creating dataframes
queue = MyQueue ("MyQueue")
# Creating trigger
trigger = AkuandubaTrigger("Sample Trigger Name", triggerType = 'or')
# Append conditions and tools to trigger just adding them
# Tools appended to the trigger will only run when trigger is StatusTrigger.TRIGGERED,
# and will run in the order they've been appended
trigger += CheckQueueSize( "CheckQueueSize condition", MAX_QUEUE_SIZE )
trigger += your_handler
# Creating Akuanduba
manager = Akuanduba("Akuanduba", level=LoggingLevel.INFO)
# Appending tools
#
# ToolManager += TOOL_1
# ToolManager += TOOL_2
#
ToolManager += trigger
# Apprending dataframes
DataframeManager += sampleDataframe
# Initializing
manager.initialize()
manager.execute()
manager.finalize()
那样,您将拥有干净且分离的代码.
That way, you'd have clean and separated code.
这篇关于pydispatcher 是否在后台线程中运行处理程序函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!