如何使用camel-kafka手动控制偏移量提交? [英] How to manually control the offset commit with camel-kafka?

查看:114
本文介绍了如何使用camel-kafka手动控制偏移量提交?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用骆驼kafka组件,不清楚在提交偏移量的情况下到底发生了什么.如下所示,我正在汇总记录,并且我认为对于我的用例,只有在将记录保存到SFTP之后提交偏移量才有意义.

I'm using the camel kafka component and I'm unclear what is happening under the hood with committing the offsets. As can be seen below, I'm aggregating records and I think for my use case that it only makes sense to commit the offsets after the records have been saved to SFTP.

是否可以手动控制何时可以执行提交?

Is it possible to manually control when I can perform the commit?

private static class MyRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("kafka:{{mh.topic}}?" + getKafkaConfigString())
        .unmarshal().string()
        .aggregate(constant(true), new MyAggregationStrategy())
            .completionSize(1000)
            .completionTimeout(1000)
        .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime())
        .to("sftp://" + getSftpConfigString())

        // how to commit offset only after saving messages to SFTP?

        ;
    }

    private final class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class);
            String body = oldBody + newBody;
            oldExchange.getIn().setBody(body);
            return oldExchange;
        }
    }
}

private static String getSftpConfigString() {
        return "{{sftp.hostname}}/{{sftp.dir}}?"
                + "username={{sftp.username}}"
                + "&password={{sftp.password}}"
                + "&tempPrefix=.temp."
                + "&fileExist=Append"
                ;
}

private static String getKafkaConfigString() {
        return "brokers={{mh.brokers}}" 
            + "&saslMechanism={{mh.saslMechanism}}"  
            + "&securityProtocol={{mh.securityProtocol}}"
            + "&sslProtocol={{mh.sslProtocol}}"
            + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
            + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}"
            + "&saslJaasConfig={{mh.saslJaasConfig}}" 
            + "&groupId={{mh.groupId}}"
            ;
}

推荐答案

不,您不能. Kafka每隔X秒在后台执行一次自动提交(您可以配置它).

No you cannot. Kafka performs an auto commit in the background every X seconds (you can configure this).

camel-kafka中没有手动提交支持.另外,由于聚合器与kafka使用者及其执行提交的使用者分开,因此这是不可能的.

There is no manual commit support in camel-kafka. Also this would not be possible as the aggregator is separated from the kafka consumer, and its the consumer that performs the commit.

这篇关于如何使用camel-kafka手动控制偏移量提交?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆