Spring Cloud Stream Topic Partitions KStream Read/Write


Question

I have multiple microservices fronted with an API. I would like to use the same topic for all events, with each domain event on a separate partition. I was able to configure the Spring Kafka binder to send to different partitions using

spring.cloud.stream.bindings.<channel>.producer.partition-key-extractor-name=

and implementing PartitionKeyExtractorStrategy.
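For reference, a key extractor along those lines might look like the sketch below. To keep the snippet self-contained, the `Message` and strategy interfaces are reduced to local stand-ins; in a real application they come from `org.springframework.messaging` and `org.springframework.cloud.stream.binder`, and the `"eventType"` header is a hypothetical name for the domain-event type.

```java
import java.util.Map;

// Local stand-ins for illustration only: in a real application these
// would be org.springframework.messaging.Message and
// org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy.
interface Message<T> {
    T getPayload();
    Map<String, Object> getHeaders();
}

interface PartitionKeyExtractorStrategy {
    Object extractKey(Message<?> message);
}

// Hypothetical extractor: uses a domain-event "eventType" header as the
// partition key, so all events of one domain share a partition key.
class EventTypeKeyExtractor implements PartitionKeyExtractorStrategy {
    @Override
    public Object extractKey(Message<?> message) {
        Object type = message.getHeaders().get("eventType");
        // Fall back to the payload if the header is absent
        return type != null ? type : message.getPayload();
    }
}
```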

My question here is: can I configure the KStream binder to use a specific partition for both @Input and @Output?

My understanding so far is

spring.cloud.stream.kafka.streams.bindings.<channel>.producer.configuration.partitioner.class=

but it never gets configured. If there is another way, or I am making a mistake, please suggest.

Answer

Are you deterministically sending the records to a certain partition? In other words, do you know the actual partition for each key? If you only provide the PartitionKeyExtractorStrategy, the binder will arbitrarily pick a partition to send each record to. If you want to make it deterministic, you can provide a partitionSelectorClass as a property on your producer side (implement the interface PartitionSelectorStrategy). This interface allows you to select a partition based on the key. Let's say you want to send all records with key UUID-1 to partition 1, and you code that in your PartitionSelectorStrategy implementation. This means your Kafka Streams processor knows that records with key UUID-1 come from partition 1. With these assumptions, you can do the following in your Kafka Streams processor. This is basically a variant of the answer provided for one of your other questions.

@StreamListener("requesti")
@SendTo("responseo")
public KStream<UUID, Account> process(KStream<UUID, Account> events) {

    return events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
        ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<UUID, Account> transform(UUID key, Account value) {
            // Only handle records that arrived on partition 1;
            // returning null drops the record.
            if (this.context.partition() == 1) {
                //your processing logic
                return KeyValue.pair(key, value);
            }
            return null;
        }

        @Override
        public void close() {
        }
    });
}
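The producer-side PartitionSelectorStrategy that the answer refers to could be sketched as follows. The interface is reproduced locally (modeled on `org.springframework.cloud.stream.binder.PartitionSelectorStrategy`) so the snippet compiles on its own, and the hash-based mapping is just one possible deterministic scheme, not the only one.

```java
// Local stand-in modeled on
// org.springframework.cloud.stream.binder.PartitionSelectorStrategy.
interface PartitionSelectorStrategy {
    int selectPartition(Object key, int partitionCount);
}

// Deterministic selector: the same key always lands on the same partition.
class HashPartitionSelector implements PartitionSelectorStrategy {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        // floorMod keeps the result in [0, partitionCount) even when
        // hashCode() is negative.
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

Because the mapping is a pure function of the key, the streams processor can rely on a given key always arriving on the same partition.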

With the transformer above, you can basically filter out all the irrelevant partitions in the transform method. There is still the problem of sending outbound data to a particular partition. If you use that code as is, the binder will send the data to an arbitrary partition (though this might be a good feature to add to the binder). However, if you want the outbound records to land on deterministic partitions, you can use Kafka Streams directly in this case. See below.

@StreamListener("requesti")
public void process(KStream<UUID, Account> events) {

    final KStream<UUID, Account> transformed = events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {
        ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<UUID, Account> transform(UUID key, Account value) {
            if (this.context.partition() == 1) {
                //your processing logic
                return KeyValue.pair(key, value);
            }
            return null;
        }

        @Override
        public void close() {
        }
    });

    // Write directly to the output topic, choosing the partition explicitly.
    transformed.to("outputTopic", Produced.with(new JsonSerde<>(), new JsonSerde<>(), new CustomStreamPartitioner()));
}

class CustomStreamPartitioner implements StreamPartitioner<UUID, Account> {

    @Override
    public Integer partition(String topic, UUID key, Account value, int numPartitions) {
        return 1; //change to the right partition based on the key.
    }
}
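If you want the partitioner above to route by key instead of pinning everything to partition 1, one option is to hash the UUID, mirroring whatever scheme the producer-side selector uses so both sides agree. A self-contained sketch (the real interface is `org.apache.kafka.streams.processor.StreamPartitioner`; it is reproduced locally here so the snippet compiles without Kafka on the classpath):

```java
import java.util.UUID;

// Local stand-in for org.apache.kafka.streams.processor.StreamPartitioner.
interface StreamPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

class UuidStreamPartitioner<V> implements StreamPartitioner<UUID, V> {
    @Override
    public Integer partition(String topic, UUID key, V value, int numPartitions) {
        // The same UUID always routes to the same partition; floorMod
        // avoids negative results from hashCode().
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Whatever mapping you pick here must match the producer-side PartitionSelectorStrategy, otherwise the processor's assumption about which partition a given key arrives on no longer holds.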
