Accessing the Celery worker instance inside the task
Question
I want to use Jupyter kernels inside Celery workers: there will be one Jupyter kernel for each Celery worker.
To achieve this, I am overriding Celery's default Worker class: when the worker is initialized I start a Jupyter kernel, and when the worker closes (in on_close) I shut that kernel down.
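The lifecycle described above (start one kernel when the worker is constructed, release it when the worker closes) can be sketched without Celery or Jupyter. BaseWorker and FakeKernelManager are hypothetical stand-ins for Celery's worker class and jupyter_client's MultiKernelManager; the sketch only illustrates the order of the overrides and the super() calls, not Celery's actual internals.

```python
import itertools


class FakeKernelManager:
    """Hypothetical stand-in for jupyter_client.MultiKernelManager."""

    _ids = itertools.count(1)  # shared counter so kernel ids are unique

    def __init__(self):
        self.running = set()

    def start_kernel(self):
        kernel_id = "kernel-{}".format(next(self._ids))
        self.running.add(kernel_id)
        return kernel_id

    def shutdown_kernel(self, kernel_id):
        self.running.discard(kernel_id)


class BaseWorker:
    """Hypothetical stand-in for the worker class being subclassed."""

    def __init__(self, name):
        self.name = name

    def on_close(self):
        pass


class KernelWorker(BaseWorker):
    def __init__(self, *args, **kwargs):
        # Acquire the per-worker resource before the base initializer runs,
        # so anything the base class triggers can already use it.
        self.km = FakeKernelManager()
        self.kernel_id = self.km.start_kernel()
        super().__init__(*args, **kwargs)

    def on_close(self):
        # Release the resource first, then let the base class clean up.
        self.km.shutdown_kernel(self.kernel_id)
        super().on_close()


worker = KernelWorker("w1")
print(worker.kernel_id in worker.km.running)  # True
worker.on_close()
print(worker.kernel_id in worker.km.running)  # False
```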
The problem I am facing now is: how can I access that kernel instance inside a task while the task is running?
Is there a better way to override the Worker class definition of a Celery application than app.Worker = CustomWorker?
Here is the Celery configuration with the custom worker:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from jupyter_client import MultiKernelManager

app = Celery('proj',
             broker='redis://',
             backend='redis://',
             include=['tasks'])
app.conf.update(
    result_expires=3600,
)

class CustomWorker(app.Worker):
    def __init__(self, *args, **kwargs):
        # Start one Jupyter kernel for this worker before Celery initializes it
        self.km = MultiKernelManager()
        self.kernel_id = self.km.start_kernel()
        print("Custom initializing")
        self.kernel_client = self.km.get_kernel(self.kernel_id).client()
        super(CustomWorker, self).__init__(*args, **kwargs)

    def on_close(self):
        # Shut the kernel down when the worker closes
        self.km.shutdown_kernel(self.kernel_id)
        super(CustomWorker, self).on_close()

app.Worker = CustomWorker

if __name__ == '__main__':
    app.start()
Here is tasks.py:
from __future__ import absolute_import, unicode_literals
from functools import partial
# "from celery import app" imports the celery.app package, not the application
# instance; shared_task registers the task with the app the worker loads.
from celery import shared_task
from tornado import gen
from tornado.concurrent import Future
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

ioloop.install()
reply_futures = {}

# This is my Celery task: I pass arbitrary Python code to be executed on
# some Celery worker (actually, on that worker's Jupyter kernel).
@shared_task
def pythontask(code):
    # I don't know how to get the kernel_client of the current Celery worker!
    kernel_client = self.get_current_worker().kernel_client

    # Callback executed when a message arrives on a ZMQ stream: it resolves
    # the future registered for the request that this message replies to.
    def reply_callback(session, stream, msg_list):
        idents, msg_parts = session.feed_identities(msg_list)
        reply = session.deserialize(msg_parts)
        parent_id = reply['parent_header'].get('msg_id')
        reply_future = reply_futures.get(parent_id)
        if reply_future and not reply_future.done():
            reply_future.set_result(reply)

    @gen.coroutine
    def execute(kernel_client, code):
        # Send the code once and wait until a reply for its msg_id arrives
        msg_id = kernel_client.execute(code)
        f = reply_futures[msg_id] = Future()
        reply = yield f
        raise gen.Return(reply)

    # Wrap the kernel's sockets in ZMQ streams and attach the callback
    shell_stream = ZMQStream(kernel_client.shell_channel.socket)
    iopub_stream = ZMQStream(kernel_client.iopub_channel.socket)
    shell_stream.on_recv_stream(partial(reply_callback, kernel_client.session))
    iopub_stream.on_recv_stream(partial(reply_callback, kernel_client.session))
    # Run the IOLoop until the reply arrives
    loop = ioloop.IOLoop.current()
    reply = loop.run_sync(lambda: execute(kernel_client, code))
    print(reply)
    reply_futures.pop(reply['parent_header'].get('msg_id'), None)
    # Disable the callbacks and automatic receiving
    shell_stream.on_recv_stream(None)
    iopub_stream.on_recv_stream(None)
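The reply_futures dictionary in tasks.py correlates replies with requests: each execute call registers a future under its msg_id, and the stream callback resolves the future whose id matches the reply's parent_header msg_id. The asyncio sketch below shows the same idea with no ZMQ or kernel involved; FakeKernel and its message dicts are hypothetical, shaped loosely after Jupyter messages (parent_header, msg_type). As a variation on the code above, it resolves a future only on the execute_reply message, so an earlier IOPub status message for the same request does not complete it.

```python
import asyncio
import itertools

reply_futures = {}


def reply_callback(msg):
    # Resolve the pending future of the request this message answers.
    parent_id = msg["parent_header"].get("msg_id")
    future = reply_futures.get(parent_id)
    if future is not None and not future.done() and msg["msg_type"] == "execute_reply":
        future.set_result(msg)


class FakeKernel:
    """Hypothetical kernel that replies asynchronously through a callback."""

    _ids = itertools.count(1)

    def __init__(self, on_message):
        self.on_message = on_message

    def execute(self, code):
        msg_id = "msg-{}".format(next(self._ids))
        loop = asyncio.get_running_loop()
        parent = {"msg_id": msg_id}
        # A status message is delivered first, then the actual reply.
        loop.call_soon(self.on_message,
                       {"parent_header": parent, "msg_type": "status"})
        loop.call_soon(self.on_message,
                       {"parent_header": parent, "msg_type": "execute_reply",
                        "content": {"status": "ok", "code": code}})
        return msg_id


async def execute(kernel, code):
    msg_id = kernel.execute(code)
    future = reply_futures[msg_id] = asyncio.get_running_loop().create_future()
    try:
        return await future
    finally:
        # Drop the entry so the dictionary does not grow without bound.
        reply_futures.pop(msg_id, None)


async def main():
    kernel = FakeKernel(reply_callback)
    return await asyncio.gather(execute(kernel, "a = 1"), execute(kernel, "b = 2"))


replies = asyncio.run(main())
print([r["content"]["code"] for r in replies])  # ['a = 1', 'b = 2']
```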
Answer
Adding the worker instance's information to the request object solved my problem. To do that, I overrode the _process_task method of the worker class:
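import logging
import traceback
from celery.exceptions import TaskRevokedError

logger = logging.getLogger(__name__)

class CustomWorker(app.Worker):
    # ... __init__ and on_close as in the question ...

    def _process_task(self, req):
        try:
            # Inject this worker's kernel client into the task's keyword arguments
            req.kwargs['kernel_client'] = self.kernel_client
            print("printing from _process_task {}".format(req.kwargs))
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            try:
                self._quick_release()  # Issue 877
            except AttributeError:
                pass
        except Exception as exc:
            logger.critical('Internal error: %r\n%s', exc, traceback.format_exc(), exc_info=True)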
This is how I access kernel_client inside the task:
@app.task(bind=True)
def pythontask(self, code, kernel_client=None):
    # kernel_client is injected by CustomWorker._process_task
    mid = kernel_client.execute(code)
    print("{}".format(kernel_client))
    print("{}".format(mid))
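In this answer the worker, which owns the kernel client, puts it into the request's keyword arguments just before dispatching, and the task declares a matching kernel_client=None parameter. The stdlib-only sketch below imitates that flow; Request, Worker and FakeKernelClient are simplified hypothetical stand-ins rather than Celery's or jupyter_client's classes, and the explicit None check is an addition so the task fails clearly when it runs somewhere that does not inject the client.

```python
class FakeKernelClient:
    """Hypothetical stand-in for the worker's Jupyter kernel client."""

    def execute(self, code):
        return "ran: " + code


class Request:
    """Simplified request: a task function plus its arguments."""

    def __init__(self, task, args=(), kwargs=None):
        self.task = task
        self.args = args
        self.kwargs = dict(kwargs or {})

    def execute(self):
        # In-process call, comparable to a solo pool: nothing is serialized.
        return self.task(*self.args, **self.kwargs)


class Worker:
    def __init__(self):
        self.kernel_client = FakeKernelClient()

    def process_task(self, req):
        # Inject the worker-owned resource right before execution.
        req.kwargs["kernel_client"] = self.kernel_client
        return req.execute()


def pythontask(code, kernel_client=None):
    if kernel_client is None:
        raise RuntimeError("pythontask must run on a worker that injects kernel_client")
    return kernel_client.execute(code)


worker = Worker()
print(worker.process_task(Request(pythontask, args=("x = 1",))))  # ran: x = 1
```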
This only works when I start the workers in solo mode (--pool=solo); with other pools it raises a pickling error. Using solo workers is a requirement for me anyway, so this solution works for me.
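The solo-only limitation is consistent with how pools hand work over: the solo pool runs the task inside the worker's own process, while a pool such as prefork has to serialize what it sends to a child process, and a kernel client holding live sockets cannot be pickled. The sketch below reproduces that difference with the standard library; FakeKernelClient holds a threading.Lock as a hypothetical stand-in for the client's ZMQ sockets, and run_in_process and run_via_pickle only imitate the two dispatch styles, they are not Celery code.

```python
import pickle
import threading


class FakeKernelClient:
    """Hypothetical stand-in: holds an OS-level resource, like a socket."""

    def __init__(self):
        self._lock = threading.Lock()

    def execute(self, code):
        with self._lock:
            return "ran: " + code


def pythontask(code, kernel_client=None):
    return kernel_client.execute(code)


def run_in_process(task, *args, **kwargs):
    # Solo-style dispatch: a direct function call, nothing is serialized.
    return task(*args, **kwargs)


def run_via_pickle(task, *args, **kwargs):
    # Prefork-style dispatch: the arguments must survive a pickle round trip.
    payload = pickle.dumps((args, kwargs))
    args, kwargs = pickle.loads(payload)
    return task(*args, **kwargs)


client = FakeKernelClient()
print(run_in_process(pythontask, "x = 1", kernel_client=client))  # ran: x = 1
try:
    run_via_pickle(pythontask, "x = 1", kernel_client=client)
except TypeError as exc:
    print("pickling failed:", exc)
```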