How to use custom serializers with KTable in Kafka streams?
Question
I am facing some errors during serialization of a KStream to a KTable after I introduced a groupBy on my KStream. As I understand it, once you have an aggregate or reduce on a KStream, Kafka Streams transforms it into a KTable; because of the necessary shuffle, Kafka has to serialize the records again. My original KStream just mapped the records from JSON to Avro like this, and it works fine:
@StreamListener("notification-input-channel")
@SendTo("notification-output-avro-channel")
public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
log.info("received PosInvoice JSON: {}", input);
KStream<String, NotificationAvro> notificationAvroKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.mapValues(v -> recordBuilder.getNotificationAvro(v));
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
return notificationAvroKStream;
}
Then I introduced groupByKey and reduce, and I realized that this transforms the stream into a KTable and hence needs Serdes configured in the application.yaml file. Unfortunately, I cannot configure the default Serdes there because I have other types of serialization in use, so I decided to set the serdes on the KTable topology itself. I am trying to implement a solution based on this answer.
The part of the code where I try to materialize with my custom serdes is not working properly (Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())). First, I don't think I need KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore"), but without it it does not work either, and I cannot find a Materialized variant that is not KeyValueBytes... where I can define my serdes CustomSerdes.String(), CustomSerdes.NotificationAvro().
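As far as I can tell from the Kafka Streams API, there is no non-Bytes store supplier to find: the inner state store is always byte-based (a KeyValueStore&lt;Bytes, byte[]&gt;), and the typed serdes are layered on top of it through Materialized. A sketch of that pattern, assuming the same CustomSerdes and NotificationAvro types as in my code:

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// The Bytes/byte[] type parameters describe only the inner byte store,
// not the application data types; the custom serdes are attached on top
// via withKeySerde/withValueSerde, and the store gets a queryable name.
Materialized<String, NotificationAvro, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("mystore")
                .withKeySerde(CustomSerdes.String())
                .withValueSerde(CustomSerdes.NotificationAvro());
```

This is only a sketch of how the generics are meant to be read, not a drop-in fix for the topology below.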
According to the answer that I mentioned in the link, they also use a final StreamsBuilder builder = new StreamsBuilder(). But since I am building the topology with spring-kafka, I don't have this option, or if I do, I don't know how to use it.
@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {
@Autowired
RecordBuilder recordBuilder;
@StreamListener("notification-input-channel")
@SendTo("notification-output-avro-channel")
public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
log.info("received PosInvoice JSON: {}", input);
KStream<String, NotificationAvro> notificationAvroKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
// *********************************************
// IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
// KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
.toTable(
// *********************************************
// HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
Materialized
// .as(storeSupplier) // this is not necessary
.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
// *********************************************
)
.groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
.reduce(
(aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
},
(aggValue, oldValue) -> oldValue
);
KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));
return notificationAggAvroKStream;
}
}
The custom Serdes:
@Service
public class CustomSerdes extends Serdes {
private static final String schema_registry_url = "http://localhost:8081";
private final static Map<String, String> serdeConfig = Collections
.singletonMap("schema.registry.url", schema_registry_url);
public static Serde<NotificationAvro> NotificationAvro() {
final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
notificationAvroSerde.configure(serdeConfig, false);
return notificationAvroSerde;
}
}
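As a side note, the boolean passed to configure is the isKey flag (false above, i.e. a value serde). If a key serde for the Avro type were ever needed, a variant under the same assumptions (the method name NotificationAvroKey is hypothetical) might look like:

```java
// Hypothetical key-serde variant: identical setup, but configured with
// isKey = true so the serde is treated as a key serde by the registry.
public static Serde<NotificationAvro> NotificationAvroKey() {
    final Serde<NotificationAvro> notificationAvroKeySerde = new SpecificAvroSerde<>();
    notificationAvroKeySerde.configure(serdeConfig, true);
    return notificationAvroKeySerde;
}
```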
And the error:
Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced&lt;K, V&gt; produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
... Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro cannot be cast to class java.lang.String (com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
Answer
So, I solved it after reading this answer, which uses .groupByKey(...) with Serialized.with(...); since Serialized is deprecated, I am using Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()) instead.
KStream<String, NotificationAvro> notificationAvroKStream = input
.filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
.map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
.groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
.reduce((aggValue, newValue) -> {
newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
return newValue;
})
.toStream();
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
return notificationAvroKStream;
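To answer the question I left in the comments of the original code: the reduce step can also take a Materialized argument, so the custom serdes can additionally be attached to the state store itself rather than only to the repartitioning. A sketch, assuming the store name "notification-store" is free to choose:

```java
// Same reduce as above, but with a named, queryable state store that
// carries the custom serdes; Bytes/byte[] refers to the inner byte store.
.reduce(
        (aggValue, newValue) -> {
            newValue.setTotalLoyaltyPoints(
                    newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
            return newValue;
        },
        Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("notification-store")
                .withKeySerde(CustomSerdes.String())
                .withValueSerde(CustomSerdes.NotificationAvro()))
```

I have not needed this for the fix above, so treat it as an untested variant of the same idea.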