Spring Cloud Stream Topic Partitions KStream Read/Write
Question
I have multiple microservices fronted by an API, and I'd like to use the same topic for all events, with each domain event on a separate partition. I was able to configure the Spring Cloud Stream Kafka binder to send to different partitions using
spring.cloud.stream.bindings.<channel>.producer.partition-key-extractor-name=
and implementing PartitionKeyExtractorStrategy.
My question is: can I configure the KStream binder so that @Input and @Output also work with specific partitions?
So far, my understanding is that I should set
spring.cloud.stream.kafka.streams.bindings.<channel>.producer.configuration.partitioner.class=
but it never gets applied. If there is another way, or if I am making a mistake, please let me know.
Recommended Answer
Are you deterministically sending the records to a certain partition? In other words, do you know the actual partition for each key? If you only provide the PartitionKeyExtractorStrategy, the binder picks the partition for each record itself, and you do not choose which partition a given key lands on. If you want control over that, provide a partitionSelectorClass property (an implementation of the PartitionSelectorStrategy interface) on your producer side. This interface lets you select a partition based on the key. Say you want to send all records with key UUID-1 to partition 1, and you coded that in your PartitionSelectorStrategy implementation. Your Kafka Streams processor then knows that records with key UUID-1 come from partition 1. With these assumptions, you can do the following in your Kafka Streams processor. This is essentially a variant of an answer given to one of your other questions.
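import java.util.UUID;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

// Account is the application's own domain class.
@StreamListener("requesti")
@SendTo("responseo")
public KStream<UUID, Account> process(KStream<UUID, Account> events) {
    return events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            // Keep the context so transform() can see which partition each record came from.
            this.context = context;
        }

        @Override
        public KeyValue<UUID, Account> transform(UUID key, Account value) {
            if (this.context.partition() == 1) {
                // your processing logic
                return KeyValue.pair(key, value);
            }
            // Returning null emits nothing, so records from other partitions are dropped.
            return null;
        }

        @Override
        public void close() {
        }
    });
}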
With the above code, you can filter out records from all irrelevant partitions in the transform method. That still leaves the problem of sending outbound data to a particular partition. If you use the above code as is, the binder will send the data to arbitrary partitions (this might be a good feature to add to the binder, though). However, if you want the outbound records to land on deterministic partitions, you can use Kafka Streams directly in this case. See below.
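import java.util.UUID;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.kafka.support.serializer.JsonSerde;

// No @SendTo here: the method writes to the output topic itself.
@StreamListener("requesti")
public void process(KStream<UUID, Account> events) {
    final KStream<UUID, Account> transformed = events.transform(() -> new Transformer<UUID, Account, KeyValue<UUID, Account>>() {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<UUID, Account> transform(UUID key, Account value) {
            if (this.context.partition() == 1) {
                // your processing logic
                return KeyValue.pair(key, value);
            }
            // Drop records that came from any other partition.
            return null;
        }

        @Override
        public void close() {
        }
    });
    // Write with Kafka Streams directly so CustomStreamPartitioner chooses the outbound partition.
    transformed.to("outputTopic", Produced.with(new JsonSerde<>(), new JsonSerde<>(), new CustomStreamPartitioner()));
}

class CustomStreamPartitioner implements StreamPartitioner<UUID, Account> {
    @Override
    public Integer partition(String topic, UUID key, Account value, int numPartitions) {
        return 1; // change to the right partition based on the key
    }
}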
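The producer-side PartitionSelectorStrategy and the CustomStreamPartitioner above both need a key-to-partition rule, and the transform filter only works if the producer's rule agrees with the partition it checks. If you also want outbound records to use the same partition numbers as inbound ones, one option is to keep that rule in a single plain-Java class that both delegate to. This is a minimal sketch under assumptions: the class name PartitionRouting, the placeholder UUID standing in for "UUID-1", and the hash-based fallback are hypothetical and not part of Spring Cloud Stream or Kafka Streams. Its method takes the same arguments as PartitionSelectorStrategy.selectPartition(Object key, int partitionCount).

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical shared routing rule. Nothing here is a Spring Cloud Stream or Kafka API;
// it is plain Java so the producer-side selector and the Kafka Streams partitioner
// can both call it and always agree.
final class PartitionRouting {

    // Placeholder for the answer's "UUID-1" example key (illustrative value only).
    static final UUID UUID_1 = UUID.fromString("00000000-0000-0000-0000-000000000001");

    // Explicit assignments: UUID-1 goes to partition 1.
    private static final Map<UUID, Integer> ASSIGNED = Map.of(UUID_1, 1);

    private PartitionRouting() {
    }

    // Same arguments as PartitionSelectorStrategy.selectPartition(Object, int).
    static int partitionFor(Object key, int partitionCount) {
        if (key == null) {
            throw new IllegalArgumentException("This sketch requires a non-null key");
        }
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        Integer assigned = ASSIGNED.get(key);
        if (assigned != null) {
            // Fail fast instead of wrapping around: a wrapped partition would no
            // longer match the partition the consumer-side filter checks.
            if (assigned >= partitionCount) {
                throw new IllegalStateException("Key " + key + " is assigned to partition "
                        + assigned + ", but only " + partitionCount + " partitions exist");
            }
            return assigned;
        }
        // Keys without an explicit assignment: deterministic, non-negative fallback.
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

With this in place, CustomStreamPartitioner.partition could return PartitionRouting.partitionFor(key, numPartitions) instead of the hard-coded 1, and a PartitionSelectorStrategy implementation could return PartitionRouting.partitionFor(key, partitionCount) from selectPartition. Failing fast on a missing partition is a deliberate choice here, since silently remapping a key would break the assumption the transform filter relies on.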