异步 Camel 组件 - 立即调用 doStop() [英] Asynchronous Camel Component - doStop() called immediately

查看:51
本文介绍了异步 Camel 组件 - 立即调用 doStop()的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个使用来自外部服务的 API 的骆驼组件.

I am trying to create a camel component which consumes an API from an external service.

我的路线如下

from("myComponent:entity?from=&to=")
.to("seda:one")

from("seda:one")
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(5)
.completionTimeout(5000)
.process( new Processor1() )
to("seda:two")

.
.
.


from("seda:five")
.to("myComponent2:entity")

我按如下方式实现了我的组件消费者

I implemented my component consumer as follows

public class MyComponentConsumer extends DefaultConsumer {

    public MyComponentConsumer(MyComponentEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
        flag = true;
        while ( flag ) {
            //external API call
            Resource resource = getNextResource();
            if ( resource.next() == null ) {
                flag = false;
            }
            Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
            ex.getIn().setBody(resource.toString());
            getAsyncProcessor().process(
                            ex
                            doneSync -> {
                                LOG.info("Message processed");
                            }
                    );
        }
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        System.out.println("stop ---- ");
    }
}

一切正常,数据通过路由传播.我唯一的问题是,直到整个过程完成后,数据才会传播到下一部分.接下来的部分是异步运行的.

Everything worked fine and the data was propogating through the route. My only problem was that data did not propogate to the next part until the whole of this process was completed. And the next parts were running asynchronously.

我查看了 StreamConsumer 的示例,并尝试使用 runnable 和 executorService 将其实现到我的代码中.但如果我这样做,消费者一开始就停止.

I looked at the example of StreamConsumer and tried to implement it to my code using a runnable and an executorService. But if I do that consumer stops as soon as it starts.

我把代码改成

public class MyComponentConsumer extends DefaultConsumer implements Runnable 

并添加

private ExecutorService executor;
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "myComponent");
executor.execute(this);

并将我的逻辑移到 run() 方法中.但是,消费者线程一开始就结束.并且异步处理器没有正确传输数据.

and moved my logic inside the run() method. But, the consumer thread ends as soon as it starts. and the async processor does not transfer the data properly.

有没有其他方法可以实现我需要的功能,或者我在这里的某个地方弄错了.任何帮助将不胜感激.

Is there any other way to implement the functionality I need or am I mistaken somewhere here. Any help would be appreciated.

推荐答案

您使用的是什么版本的骆驼?

What version of camel are you using?

在camel 2.x 中管理消费者状态存在问题,在camel 3.x 中已修复CAMEL-12765 这可能会导致您在此处描述的问题.

There was an issue with managing the state of consumer in camel 2.x which was fixed in camel 3.x CAMEL-12765 which can lead to the issue you are describing here.

如果您使用的是骆驼 2.x,请尝试使用 newScheduledThreadPool 而不是 newSingleThreadExecutor.还有 executor.schedule(this, 5L, TimeUnit.SECONDS) 而不是 executor.execute(this).

If you are on camel 2.x try using newScheduledThreadPool instead of newSingleThreadExecutor. Also executor.schedule(this, 5L, TimeUnit.SECONDS) instead of executor.execute(this).

延迟启动执行程序可能有助于避免您面临的问题.

Delayed start of executor might help avoid the problem you are facing.

这篇关于异步 Camel 组件 - 立即调用 doStop()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆