从Java与Django / Celery进行互操作 [英] Interoperating with Django/Celery From Java

查看:256
本文介绍了从Java与Django / Celery进行互操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们公司有一个基于Python的网站和一些基于Python的工作者节点,通过Django / Celery和RabbitMQ进行通信。我有一个基于Java的应用程序,它需要向Celery的工作人员提交任务。我可以从Java发送工作到RabbitMQ,但是Celery的工作人员从来没有收到工作。从查看两种类型的作业提交的数据包捕获,有差异,但我不能理解如何解释它们,因为很多是二进制的,我找不到有关解码的文档。有没有人有Java / RabbitMQ和Celery一起工作的任何参考或经验?

Our company has a Python based web site and some Python based worker nodes which communicate via Django/Celery and RabbitMQ. I have a Java based application which needs to submit tasks to the Celery based workers. I can send jobs to RabbitMQ from Java just fine, but the Celery based workers are never picking up the jobs. From looking at the packet captures of both types of job submissions, there are differences, but I cannot fathom how to account for them because a lot of it is binary that I cannot find documentation about decoding. Does anyone here have any reference or experience with having Java/RabbitMQ and Celery working together?

推荐答案

我找到了解决方案。 RabbitMQ的Java库指的是交换/队列/路由密钥。在Celery中,队列名实际上映射到Java库中引用的交换。默认情况下,Celery的队列只是芹菜。如果您的Django设置使用以下语法定义名为myqueue的队列:

I found the solution. The Java library for RabbitMQ refers to exchanges/queues/routekeys. In Celery, the queue name is actually mapping to the exchange referred to in the Java library. By default, the queue for Celery is simply "celery". If your Django settings define a queue called "myqueue" using the following syntax:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

然后基于Java的代码需要执行以下操作:

Then the Java based code needs to do something like the following:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

事实证明,这仅仅是术语的区别。

It turns out that it is just a difference in terminology.

这篇关于从Java与Django / Celery进行互操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆