异步骆驼组件-doStop()立即调用 [英] Asynchronous Camel Component - doStop() called immediately

查看:49
本文介绍了异步骆驼组件-doStop()立即调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个使用来自外部服务的API的骆驼组件.

I am trying to create a camel component which consumes an API from an external service.

我的路线如下

from("myComponent:entity?from=&to=")
.to("seda:one")

from("seda:one")
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
.process( new Processor1() )
to("seda:two")

.
.
.


from("seda:five")
.to("myComponent2:entity")

我实现了我的组件使用者,如下所示

I implemented my component consumer as follows

public class MyComponentConsumer extends DefaultConsumer {

    public MyComponentConsumer(MyComponentEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        flag = true;
        while ( flag ) {
            //external API call
            Resource resource = getNextResource();
            if ( resource.next() == null ) {
                flag = false;
            }
            Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
            ex.getIn().setBody(resource.toString());
            getAsyncProcessor().process(
                            ex
                            doneSync -> {
                                LOG.info("Message processed");
                            }
                    );
        }
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        System.out.println("stop ---- ");
    }
}

一切正常,数据在路由中传播.我唯一的问题是,在整个过程完成之前,数据不会传播到下一部分.接下来的部分异步运行.

Everything worked fine and the data was propogating through the route. My only problem was that data did not propogate to the next part until the whole of this process was completed. And the next parts were running asynchronously.

我看了StreamConsumer的示例,并尝试使用runnable和executorService将其实现到我的代码中.但是,如果我这样做,消费者就会一开始就停止.

I looked at the example of StreamConsumer and tried to implement it to my code using a runnable and an executorService. But if I do that consumer stops as soon as it starts.

我将代码更改为

public class MyComponentConsumer extends DefaultConsumer implements Runnable 

并添加

private ExecutorService executor;
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "myComponent");
executor.execute(this);

,然后将我的逻辑移到run()方法中.但是,使用者线程一开始就结束.并且异步处理器无法正确传输数据.

and moved my logic inside the run() method. But, the consumer thread ends as soon as it starts. and the async processor does not transfer the data properly.

还有其他方法可以实现我需要的功能吗?还是我在这里的某个地方弄错了.任何帮助将不胜感激.

Is there any other way to implement the functionality I need or am I mistaken somewhere here. Any help would be appreciated.

推荐答案

您正在使用哪个版本的骆驼?

What version of camel are you using?

在骆驼2.x中管理骆驼2.x的消费者状态存在问题. CAMEL-12765 可能会导致您在此处描述的问题.

There was an issue with managing the state of consumer in camel 2.x which was fixed in camel 3.x CAMEL-12765 which can lead to the issue you are describing here.

如果您使用的是骆驼2.x,请尝试使用newScheduledThreadPool而不是newSingleThreadExecutor.也是executor.schedule(this,5L,TimeUnit.SECONDS),而不是executor.execute(this).

If you are on camel 2.x try using newScheduledThreadPool instead of newSingleThreadExecutor. Also executor.schedule(this, 5L, TimeUnit.SECONDS) instead of executor.execute(this).

执行器的启动延迟可能有助于避免您遇到的问题.

Delayed start of executor might help avoid the problem you are facing.

这篇关于异步骆驼组件-doStop()立即调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆