如何从Java等非python语言调用芹菜任务延迟函数? [英] How to call a celery task delay function from non-python languages such as Java?

查看:187
本文介绍了如何从Java等非python语言调用芹菜任务延迟函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在3台机器上安装了celery + rabbitmq。我还创建了一个任务,它根据文件中的数据生成正则表达式,并使用该信息来解析文本。

I have setup celery + rabbitmq for on a 3 cluster machine. I have also created a task which generates a regular expression based on data from the file and uses the information to parse text.

from celery import Celery

celery = Celery('tasks', broker='amqp://localhost//')
import re

@celery.task
def add(x, y):
     return x + y


def get_regular_expression():
    with open("text") as fp:
        data = fp.readlines()
    str_re = "|".join([x.split()[2] for x in data ])
    return str_re    



@celery.task
def analyse_json(tw):
    str_re = get_regular_expression()
    re.match(str_re,tw.text) 

我可以使用以下python代码轻松调用此任务: -

I can make the call to this task very easily using the following python code :-

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x) 

但是,现在我想从Java而不是python进行相同的调用。我不确定做同样事情的最简单方法是什么。

However, now I want to make the same call from Java and not python. I am not sure what's the easiest way of doing the same.

我写了这段代码,用于向AMQP经纪人发送消息。代码运行正常,但任务没有执行。我不知道如何指定应该执行的任务的名称。

I've written this code for sending a message to the AMQP broker. The code runs fine, but the task is not carried out. I am not sure how to specify the name of the task which should be carried out.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

class try1 {
public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "celery", "celery");
    String messageBody = "{\"text\":\"i am good\"}" ;
    byte[] msgBytes = messageBody.getBytes("ASCII") ;
    channel.basicPublish(queueName, queueName,
            new AMQP.BasicProperties
            ("application/json", null, null, null,
                    null, null, null, null,
                    null, null, null, "guest",
                    null, null),messageBody.getBytes("ASCII")) ;
    connection.close();    

}
}

这是rabbitMq的错误日志中的输出: -

this is the output in the errorlog of rabbitMq :-

connection <0.14627.0>, channel 1 - error:
{amqp_error,not_found,
"no exchange 'amq.gen-gEV47GX9pF_oZ-0bEnOazE' in vhost '/'",
'basic.publish'}

任何帮助将不胜感激。

谢谢,
Amit

thanks, Amit

推荐答案

有几个问题。

1)String queueName = channel.queueDeclare()。getQueue()命令返回错误的队列名称。我将queuename更改为芹菜,它运行良好。
2)json的格式必须是这种类型: -
{id:4cc7438e-afd4-4f8f-a2f3-f46567e7ca77,
任务:芹菜。 task.PingTask,
args:[],
kwargs:{},
retries:0,
eta:2009-11- 17T12:30:56.527191}

1) String queueName = channel.queueDeclare().getQueue() command was returning wrong queue name. I changed the queuename to "celery" and it worked fine. 2) The format of json has to be of this type:- {"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", "task": "celery.task.PingTask", "args": [], "kwargs": {}, "retries": 0, "eta": "2009-11-17T12:30:56.527191"}

http://docs.celeryproject.org/en/latest/internals/protocol.html

这两个之后工作正常更改。

It worked fine after these two changes.

-Amit

这篇关于如何从Java等非python语言调用芹菜任务延迟函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆