Accessing celery worker instance inside the task


Problem description

I want to use Jupyter kernels inside the Celery workers: there will be one Jupyter kernel for each Celery worker.

To achieve this, I am overriding Celery's default Worker class: at worker initialisation I start a Jupyter kernel, and in the stop method I shut it down.

The problem I am currently facing: how can I access that kernel instance inside the task while the task is running?

And is there any better way to override the Worker class for the Celery application than app.Worker = CustomWorker?
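(A possible alternative, sketched here untested: Celery's bootsteps API lets a step hook into worker startup and shutdown without replacing the Worker class entirely. KernelStep is an illustrative name, not something from the original code.)

from celery import bootsteps
from jupyter_client import MultiKernelManager

class KernelStep(bootsteps.StartStopStep):
    """Start one Jupyter kernel per worker and expose its client."""

    def start(self, worker):
        # called when the worker boots
        worker.km = MultiKernelManager()
        worker.kernel_id = worker.km.start_kernel()
        worker.kernel_client = worker.km.get_kernel(worker.kernel_id).client()

    def stop(self, worker):
        # called on worker shutdown
        worker.km.shutdown_kernel(worker.kernel_id)

app.steps['worker'].add(KernelStep)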

Here is the Celery config with the custom Worker:

from __future__ import absolute_import, unicode_literals
from celery import Celery
from jupyter_client import MultiKernelManager

app = Celery('proj',
    broker='redis://',
    backend='redis://',
    include=['tasks'])

app.conf.update(
    result_expires=3600
)

class CustomWorker(app.Worker):
    def __init__(self, *args, **kwargs):
        self.km = MultiKernelManager()
        self.kernel_id = self.km.start_kernel()
        print("Custom initializing")
        self.kernel_client = self.km.get_kernel(self.kernel_id).client()
        super(CustomWorker, self).__init__(*args, **kwargs)

    def on_close(self):
        self.km.shutdown_kernel(self.kernel_id)
        super(CustomWorker, self).on_close()

app.Worker = CustomWorker

if __name__ == '__main__':
    app.start()
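For reference, since app.start() hands sys.argv to Celery, the worker above can be launched by running the module directly; a sketch, assuming this file is saved as proj.py:

python proj.py worker --loglevel=info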

Here is tasks.py:

from __future__ import absolute_import, unicode_literals
from functools import partial

from tornado import gen
from tornado.concurrent import Future
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

# the Celery app defined in the config module above
# (module name assumed; adjust to wherever the app actually lives)
from proj import app

ioloop.install()

reply_futures = {}

# This is my celery task where I pass arbitrary python code to execute on
# some celery worker (actually on the corresponding kernel)
@app.task
def pythontask(code):
    # I don't know how to get the kernel_client for the current celery
    # worker; the line below is pseudocode for what I want to achieve
    kernel_client = self.get_current_worker().kernel_client

    # defining the callback which will be executed when message arrives on
    # zmq stream
    def reply_callback(session, stream, msg_list):
        idents, msg_parts = session.feed_identities(msg_list)
        reply = session.deserialize(msg_parts)
        parent_id = reply['parent_header'].get('msg_id')
        reply_future = reply_futures.get(parent_id)
        if reply_future:
            reply_future.set_result(reply)

    @gen.coroutine
    def execute(kernel_client, code):
        msg_id = kernel_client.execute(code)
        f = reply_futures[msg_id] = Future()
        yield f
        raise gen.Return(msg_id)

    # initializing the zmq streams and attaching the callback to receive message
    # from the kernel
    shell_stream = ZMQStream(kernel_client.shell_channel.socket)
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
    shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session))
    iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session))

    # run the coroutine on the current IOLoop until the reply arrives,
    # then pop the completed future and print the kernel's reply
    loop = ioloop.IOLoop.current()
    msg_id = loop.run_sync(lambda: execute(kernel_client, code))
    print(reply_futures.pop(msg_id).result())

    # Disable callback and automatic receiving.
    shell_stream.on_recv_stream(None)
    iopub_stream.on_recv_stream(None)
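For completeness, a minimal sketch of how this task might be invoked from a client process, assuming the broker and a worker are running:

from tasks import pythontask

# ship arbitrary Python source to whichever worker (and its kernel) picks it up
result = pythontask.delay("print('hello from the kernel')")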

Recommended answer

Adding the worker instance information to the request object solved my problem. To do that, I overrode the _process_task method of the worker class:

# added to CustomWorker; TaskRevokedError (celery.exceptions), logger and
# traceback are assumed to be imported at module level
def _process_task(self, req):
    try:
        # inject this worker's kernel client into the task's keyword arguments
        req.kwargs['kernel_client'] = self.kernel_client
        print("printing from _process_task {}".format(req.kwargs))
        req.execute_using_pool(self.pool)
    except TaskRevokedError:
        try:
            self._quick_release()   # Issue 877
        except AttributeError:
            pass
    except Exception as exc:
        logger.critical('Internal error: %r\n%s',
                        exc, traceback.format_exc(), exc_info=True)

And here is how I access the kernel_client inside the task:

@app.task(bind=True)
def pythontask(self, code, kernel_client=None):
    # kernel_client is injected by the overridden _process_task above
    mid = kernel_client.execute(code)

    print("{}".format(kernel_client))
    print("{}".format(mid))

This only works when I start the workers in solo mode; otherwise it throws a pickling error. Using solo workers is a requirement for me anyway, so this solution works for me.
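A sketch of the corresponding startup command, assuming the config module above is saved as proj.py; with --pool=solo the task runs in the worker's main process, which is presumably why the unpicklable kernel_client never has to cross a process boundary:

python proj.py worker --pool=solo --loglevel=info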

