通过IPython内核在Jupyter中的Javascript与服务器之间直接通信 [英] Direct communication between Javascript in Jupyter and server via IPython kernel

查看:136
本文介绍了通过IPython内核在Jupyter中的Javascript与服务器之间直接通信的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在Jupyter单元中显示一个基于Three.js的交互式网格可视化工具.工作流程如下:

I'm trying to display an interactive mesh visualizer based on Three.js inside a Jupyter cell. The workflow is the following:

  1. 用户启动Jupyter笔记本,然后在单元格中打开查看器
  2. 使用Python命令,用户可以手动添加网格并对其进行交互式动画

实际上,主线程正在通过ZMQ套接字向服务器发送请求(每个请求都需要一个答复),然后服务器使用其他套接字对将所需数据发送回主线程(许多请求",期望得到的答复很少),最终它通过ipython内核使用通信将数据发送到Javascript前端.到目前为止,一切都很好,并且可以正常工作,因为消息都朝着相同的方向流动:

In practice, the main thread is sending requests to a server via ZMQ sockets (every request needs a single reply), then the server sends back the desired data to the main thread using other socket pairs (many "request", very few replies expected), which finally uses communication through ipython kernel to send the data to the Javascript frontend. So far so good, and it works properly because the messages are all flowing in the same direction:

Main thread (Python command) [ZMQ REQ] -> [ZMQ REP] Server (Data) [ZMQ XREQ] -> [ZMQ XREQ] Main thread (Data) [IPykernel Comm] -> [Ipykernel Comm] Javascript (Display)

但是,当我要获取前端的状态以等待网格完成加载时,模式会有所不同:

However, the pattern is different when I'm want to fetch the status of the frontend to wait for the meshes to finish loading:

Main thread (Status request) --> Server (Status request) --> Main thread (Waiting for reply)
|                                                         |
<--------------------------------Javascript (Processing) <--

这一次,服务器将请求发送到前端,前端不会将回复直接发送回服务器,而是发送到主线程,该主线程会将回复转发给服务器,最后转发给主线程

This time, the server sends a request to the frontend, which in return does not send the reply directly back to the server, but to the main thread, that will forward the reply to the server, and finally to the main thread.

有一个明显的问题:主线程应该联合转发前端的回复并从服务器接收回复,这是不可能的.理想的解决方案是使服务器直接与前端通信,但是我不知道该怎么做,因为我无法在服务器端使用 get_ipython().kernel.comm_manager.register_target .我试图使用 jupyter_client.BlockingKernelClient 在服务器端实例化一个ipython内核客户端,但是我没有使用它进行通信或注册目标.

There is a clear issue: the main thread is supposed to jointly forward the reply of the frontend and receive the reply from the server, which is impossible. The ideal solution would be to enable the server to communicate directly with the frontend but I don't know how to do that, since I cannot use get_ipython().kernel.comm_manager.register_target on the server side. I tried to instantiate an ipython kernel client on the server side using jupyter_client.BlockingKernelClient, but I didn't manged to use it to communicate nor to register targets.

推荐答案

好,所以我现在找到了一个解决方案,但这不是很好.实际上,只是等待答复并保持主循环繁忙,我添加了一个超时并将其与内核的 do_one_iteration 进行交织,以强制处理消息:

OK so I found a solution for now but it is not great. Indeed of just waiting for a reply and keep busy the main loop, I added a timeout and interleave it with do_one_iteration of the kernel to force to handle to messages:

while True:
    try:
        rep = zmq_socket.recv(flags=zmq.NOBLOCK).decode("utf-8")
    except zmq.error.ZMQError:
        kernel.do_one_iteration()

它可以工作,但不幸的是,它不是真正可移植的,并且与Jupyter评估堆栈混为一谈(所有排队的评估将在此处而不是按顺序进行处理)...

It works but unfortunately it is not really portable and it messes up with the Jupyter evaluation stack (all queued evaluations will be processed here instead of in order)...

或者,还有另一种更具吸引力的方式:

Alternatively, there is another way that is more appealing:

import zmq
import asyncio
import nest_asyncio

nest_asyncio.apply()

zmq_socket.send(b"ready")
async def enforce_receive():
    await kernel.process_one(True)
    return zmq_socket.recv().decode("utf-8")
loop = asyncio.get_event_loop()
rep = loop.run_until_complete(enforce_receive())

但是在这种情况下,您需要提前知道您希望内核仅接收一条消息,并且依赖 nest_asyncio 也不理想.

but in this case you need to know in advance that you expect the kernel to receive exactly one message, and relying on nest_asyncio is not ideal either.

以下是与此主题相关的问题的链接,该主题为 Github ,以及示例笔记本.

Here is a link to an issue on this topic of Github, along with an example notebook.

我终于设法完全解决了我的问题,没有缺点.诀窍是分析每个传入的消息.不相关的消息按顺序放回队列中,而与通信相关的消息则当场处理:

I finally manage to solve completely my issue, without shortcomings. The trick is to analyze every incoming messages. The irrelevant messages are put back in the queue in order, while the comm-related ones are processed on-the-spot:

class CommProcessor:
    """
    @brief     Re-implementation of ipykernel.kernelbase.do_one_iteration
                to only handle comm messages on the spot, and put back in
                the stack the other ones.

    @details   Calling 'do_one_iteration' messes up with kernel
                'msg_queue'. Some messages will be processed too soon,
                which is likely to corrupt the kernel state. This method
                only processes comm messages to avoid such side effects.
    """

    def __init__(self):
        self.__kernel = get_ipython().kernel
        self.qsize_old = 0

    def __call__(self, unsafe=False):
        """
        @brief      Check once if there is pending comm related event in
                    the shell stream message priority queue.

        @param[in]  unsafe     Whether or not to assume check if the number
                                of pending message has changed is enough. It
                                makes the evaluation much faster but flawed.
        """
        # Flush every IN messages on shell_stream only
        # Note that it is a faster implementation of ZMQStream.flush
        # to only handle incoming messages. It reduces the computation
        # time from about 10us to 20ns.
        # https://github.com/zeromq/pyzmq/blob/e424f83ceb0856204c96b1abac93a1cfe205df4a/zmq/eventloop/zmqstream.py#L313
        shell_stream = self.__kernel.shell_streams[0]
        shell_stream.poller.register(shell_stream.socket, zmq.POLLIN)
        events = shell_stream.poller.poll(0)
        while events:
            _, event = events[0]
            if event:
                shell_stream._handle_recv()
                shell_stream.poller.register(
                    shell_stream.socket, zmq.POLLIN)
                events = shell_stream.poller.poll(0)

        qsize = self.__kernel.msg_queue.qsize()
        if unsafe and qsize == self.qsize_old:
            # The number of queued messages in the queue has not changed
            # since it last time it has been checked. Assuming those
            # messages are the same has before and returning earlier.
            return

        # One must go through all the messages to keep them in order
        for _ in range(qsize):
            priority, t, dispatch, args = \
                self.__kernel.msg_queue.get_nowait()
            if priority <= SHELL_PRIORITY:
                _, msg = self.__kernel.session.feed_identities(
                    args[1], copy=False)
                msg = self.__kernel.session.deserialize(
                    msg, content=False, copy=False)
            else:
                # Do not spend time analyzing already rejected message
                msg = None
            if msg is None or not 'comm_' in msg['header']['msg_type']:
                # The message is not related to comm, so putting it back in
                # the queue after lowering its priority so that it is send
                # at the "end of the queue", ie just at the right place:
                # after the next unchecked messages, after the other
                # messages already put back in the queue, but before the
                # next one to go the same way. Note that every shell
                # messages have SHELL_PRIORITY by default.
                self.__kernel.msg_queue.put_nowait(
                    (SHELL_PRIORITY + 1, t, dispatch, args))
            else:
                # Comm message. Processing it right now.
                tornado.gen.maybe_future(dispatch(*args))
        self.qsize_old = self.__kernel.msg_queue.qsize()

process_kernel_comm = CommProcessor()

这篇关于通过IPython内核在Jupyter中的Javascript与服务器之间直接通信的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆