如何使用camel-kafka手动控制偏移提交? [英] How to manually control the offset commit with camel-kafka?

查看:23
本文介绍了如何使用camel-kafka手动控制偏移提交?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用骆驼 kafka 组件,但我不清楚提交偏移量的幕后发生了什么.如下所示,我正在聚合记录,我认为对于我的用例,只有在将记录保存到 SFTP 后提交偏移量才有意义.

I'm using the camel kafka component and I'm unclear what is happening under the hood with committing the offsets. As can be seen below, I'm aggregating records and I think for my use case that it only makes sense to commit the offsets after the records have been saved to SFTP.

是否可以手动控制何时可以执行提交?

Is it possible to manually control when I can perform the commit?

private static class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("kafka:{{mh.topic}}?" + getKafkaConfigString())
        .unmarshal().string()
        .aggregate(constant(true), new MyAggregationStrategy())
            .completionSize(1000)
            .completionTimeout(1000)
        .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime())
        .to("sftp://" + getSftpConfigString())

        // how to commit offset only after saving messages to SFTP?

        ;
    }

    private final class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class);
            String body = oldBody + newBody;
            oldExchange.getIn().setBody(body);
            return oldExchange;
        }
    }
}

private static String getSftpConfigString() {
        return "{{sftp.hostname}}/{{sftp.dir}}?"
                + "username={{sftp.username}}"
                + "&password={{sftp.password}}"
                + "&tempPrefix=.temp."
                + "&fileExist=Append"
                ;
}

private static String getKafkaConfigString() {
        return "brokers={{mh.brokers}}" 
            + "&saslMechanism={{mh.saslMechanism}}"  
            + "&securityProtocol={{mh.securityProtocol}}"
            + "&sslProtocol={{mh.sslProtocol}}"
            + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
            + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}"
            + "&saslJaasConfig={{mh.saslJaasConfig}}" 
            + "&groupId={{mh.groupId}}"
            ;
}

推荐答案

不,你不能.Kafka 每 X 秒在后台执行一次自动提交(您可以配置).

No you cannot. Kafka performs an auto commit in the background every X seconds (you can configure this).

camel-kafka 没有手动提交支持.此外,这也是不可能的,因为聚合器与 kafka 使用者是分离的,并且是执行提交的使用者.

There is no manual commit support in camel-kafka. Also this would not be possible as the aggregator is separated from the kafka consumer, and its the consumer that performs the commit.

这篇关于如何使用camel-kafka手动控制偏移提交?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆