pydispatcher 是否在后台线程中运行处理程序函数? [英] Does pydispatcher run the handler function in a background thread?

查看:50
本文介绍了pydispatcher 是否在后台线程中运行处理程序函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在查找事件处理程序模块时,我遇到了 pydispatcher,它似乎对初学者很友好.我对该库的用例是,如果我的队列大小超过阈值,我想发送一个信号.然后,处理程序函数可以开始处理并从队列中删除项目(然后在数据库中进行批量插入).

Upon looking up event handler modules, I came across pydispatcher, which seemed beginner friendly. My use case for the library is that I want to send a signal if my queue size is over a threshold. The handler function can then start processing and removing items from the queue (and subsequently do a bulk insert into the database).

我希望处理程序函数在后台运行.我知道我可以简单地覆盖 queue.append() 方法检查队列大小并异步调用处理程序函数,但我想实现侦听器-调度程序模型以保持逻辑干净和分离.

I would like the handler function to run in the background. I am aware that I can simply overwrite the queue.append() method checking for the queue size and calling the handler function asynchronously, but I would like to implement the listener-dispatcher model to keep the logic clean and separated.

pydispatcher 是否开箱即用?如果没有,是否有另一个模块可以帮助我做到这一点?我是否需要管理对队列的访问,因为可能有多个线程同时处理和附加到队列?

Does pydispatcher do this out of the box? If not, is there another module that can help me do this? Would I need to manage the access to the queue, since there might be multiple threads processing and appending to the queue at the same time?

请注意,在我的用例中,只有一个调度程序和事件处理程序.

Note that in my use case there is only a single dispatcher and event handler.

推荐答案

我最近发布了 Akuanduba 模块,可以帮助您完成此任务.存储库中有一个示例可以帮助您了解它的工作原理,并且看起来与您想要的很相似.

I've recently released the Akuanduba module, that may help you with this task. There's a single example on the repository that may help you understand how it works and it seems similar to what you want.

无论如何,我将尝试在这里解释一种使用 Akuanduba 实现您的代码的方法:

Anyway, I'll try to explain here a way of implementing your code with Akuanduba:

  • 首先,您可以制作一个数据框来容纳您的队列:
  • First you could make a data frame that would hold your queue:
# Mandatory imports
from Akuanduba.core.messenger.macros import *
from Akuanduba.core.constants import *
from Akuanduba.core import NotSet, AkuandubaDataframe
# Your imports go here:
from queue import Queue

class MyQueue (AkuandubaDataframe):

  def __init__(self, name):

    # Mandatory stuff
    AkuandubaDataframe.__init__(self, name)

    self.__queue = Queue ()

  def getQueue (self):
    return self.__queue

  def putQueue (self, val):
    self.__queue.put(val)

  def getQueueSize (self):
    return self.__queue.qsize()

  #
  # "toRawObj" method is a mandatory method that delivers a dict with the desired data
  # for file saving
  #
  def toRawObj(self):
    d = {
          "Queue" : self.getQueue(),
          }
    return d

  • 然后你可以创建一个 TriggerCondition 来检查队列大小:
    • Then you could make a TriggerCondition that would check the queue size:
    • from Akuanduba.core import StatusCode, NotSet, StatusTrigger
      from Akuanduba.core.messenger.macros import *
      from Akuanduba.core import TriggerCondition
      import time
      
      class CheckQueueSize (TriggerCondition):
      
        def __init__(self, name, maxSize):
      
          TriggerCondition.__init__(self, name)
          self._name = name
          self._maxSize = maxSize
      
        def initialize(self):
      
          return StatusCode.SUCCESS
      
        def execute (self):
      
          size = self.getContext().getHandler("MyQueue").getQueueSize()
          if (size > SIZE_THRESHOLD):
            return StatusTrigger.TRIGGERED
          else:
            return StatusTrigger.NOT_TRIGGERED
      
        def finalize(self):
      
          return StatusCode.SUCCESS
      

      • 制作一个工具作为你的处理函数:
        • Make a tool that would be your handler function:
        • # Mandatory imports
          from Akuanduba.core import AkuandubaTool, StatusCode, NotSet, retrieve_kw
          # Your imports go here:
          
          class SampleTool(AkuandubaTool):
          
            def __init__(self, name, **kw):
          
              # Mandatory stuff
              AkuandubaTool.__init__(self, name)
          
          
            def initialize(self):
          
              # Lock the initialization. After that, this tool can not be initialized once again
              self.init_lock()
              return StatusCode.SUCCESS
          
          
            def execute(self,context):
          
              #
              # DO SOMETHING HERE
              #
          
              # Always return SUCCESS
              return StatusCode.SUCCESS
          
            def finalize(self):
              self.fina_lock()
              return StatusCode.SUCCESS
          

          • 最后,制作一个主要脚本,以便让所有内容协同工作:
          • # Akuanduba imports
            from Akuanduba.core import Akuanduba, LoggingLevel, AkuandubaTrigger
            from Akuanduba import ServiceManager, ToolManager, DataframeManager
            
            # This sample's imports
            import MyQueue, CheckQueueSize, SampleTool
            
            # Creating your handler
            your_handler = SampleTool ("Your Handler's name")
            
            # Creating dataframes
            queue = MyQueue ("MyQueue")
            
            # Creating trigger
            trigger  = AkuandubaTrigger("Sample Trigger Name", triggerType = 'or')
            
            # Append conditions and tools to trigger just adding them
            # Tools appended to the trigger will only run when trigger is StatusTrigger.TRIGGERED,
            # and will run in the order they've been appended
            trigger += CheckQueueSize( "CheckQueueSize condition", MAX_QUEUE_SIZE )
            trigger += your_handler
            
            # Creating Akuanduba
            manager = Akuanduba("Akuanduba", level=LoggingLevel.INFO)
            
            # Appending tools
            #
            # ToolManager += TOOL_1
            # ToolManager += TOOL_2
            #
            ToolManager += trigger
            
            # Apprending dataframes
            DataframeManager += sampleDataframe
            
            # Initializing 
            manager.initialize()
            manager.execute()
            manager.finalize()
            

            那样,您将拥有干净且分离的代码.

            That way, you'd have clean and separated code.

            这篇关于pydispatcher 是否在后台线程中运行处理程序函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆