How to use custom serializers with KTable in Kafka Streams?

Problem Description

I started getting serialization errors when I introduced a groupBy into my KStream. As I understand it, once you apply an aggregate or reduce to a KStream, Kafka Streams turns it into a KTable, and because of the repartitioning (shuffle) this requires, it has to serialize the records again. My original KStream simply mapped the records from JSON to Avro like this, and it works fine:

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .mapValues(v -> recordBuilder.getNotificationAvro(v));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
        return notificationAvroKStream;
    }

Then I introduced groupByKey and reduce, and I realized that this turns the stream into a KTable, which in turn needs Serdes configured in the application.yaml file. Unfortunately I cannot configure the default Serdes, because I use other serialization types elsewhere in the application, so I decided to declare the serdes on the KTable topology itself. I am trying to implement this solution based on this answer.

The part of the code where I try to materialize the table with my custom serdes does not work properly (Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())). First, I don't think I need KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but without it it does not work either, and I cannot find a Materialized variant that is not KeyValueBytes... where I can define my serdes CustomSerdes.String(), CustomSerdes.NotificationAvro().
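For what it's worth, the Bytes in KeyValueBytesStoreSupplier is expected: the DSL's state stores hold raw bytes, and the serdes sit in a layer on top of them. A minimal sketch of spelling out the Materialized type parameters so custom serdes can be attached to such a byte-based store (the store name "mystore" is illustrative):

    // A sketch: the explicit type witness <K, V, KeyValueStore<Bytes, byte[]>>
    // is what lets withKeySerde/withValueSerde carry the custom serdes,
    // even though the underlying store is byte-based.
    Materialized<String, NotificationAvro, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("mystore")
                    .withKeySerde(CustomSerdes.String())
                    .withValueSerde(CustomSerdes.NotificationAvro());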

According to the answer I linked, they also use a final StreamsBuilder builder = new StreamsBuilder();. But since I am building the topology with spring-kafka, I don't have this option, or if I do, I don't know how to use it.
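For comparison, a minimal sketch of the non-Spring setup the linked answer assumes (topic name is illustrative): in plain Kafka Streams you create the StreamsBuilder yourself, whereas the Spring Cloud Stream binder manages it and hands the KStream straight to the @StreamListener method.

    // Plain Kafka Streams (no Spring): the builder and serdes are wired manually.
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, NotificationAvro> notifications = builder.stream(
            "notifications-avro", // illustrative topic name
            Consumed.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()));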

@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {
    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );
        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));

        return notificationAggAvroKStream;
    }
}

The custom serdes:

@Service
public class CustomSerdes extends Serdes {
    private static final String schema_registry_url = "http://localhost:8081";
    private final static Map<String, String> serdeConfig = Collections
            .singletonMap("schema.registry.url", schema_registry_url);

    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        // second argument isKey = false: configure the serde for record values
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}
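As an aside, should the Avro serde ever be needed on the key side, the isKey flag would be true; a hypothetical addition to the class above (not in the original question):

    // Hypothetical key-side factory (not in the original question):
    // the second argument to configure() is isKey, so pass true here.
    public static Serde<NotificationAvro> NotificationAvroKey() {
        final Serde<NotificationAvro> serde = new SpecificAvroSerde<>();
        serde.configure(serdeConfig, true);
        return serde;
    }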

And the error:

    Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1"
    org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic
    NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition.
    A serializer (key: org.apache.kafka.common.serialization.StringSerializer /
    value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to
    the actual key or value type (key type: java.lang.String /
    value type: org.apache.kafka.streams.kstream.internals.Change).
    Change the default Serdes in StreamConfig or provide correct Serdes via method parameters
    (for example if using the DSL, #to(String topic, Produced<K, V> produced) with
    Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
    ...
    Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro
    cannot be cast to class java.lang.String (com.github.felipegutierrez.explore.spring.model.NotificationAvro
    is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

Answer

So, I solved it by reading this answer, which uses .groupByKey(...) with Serialized.with(...); since the Serialized overload is deprecated, I am using Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()) instead. That also explains the ClassCastException above: without serdes on the grouping step, the repartition topic apparently fell back to the default value serde, which cannot handle NotificationAvro.

KStream<String, NotificationAvro> notificationAvroKStream = input
     .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
     .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
     .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
     .reduce((aggValue, newValue) -> {
          newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
          return newValue;
     })
     .toStream();
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
return notificationAvroKStream;
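As a variation, a sketch reusing the pipeline above (the store name "notification-store" is illustrative, assuming the same CustomSerdes): KGroupedStream.reduce() also accepts a Materialized, so the backing state store can get the custom serdes and a stable name as well.

    // A sketch: passing a Materialized to reduce() names the state store
    // and attaches the custom serdes to it, in addition to the Grouped
    // serdes used for the repartition topic.
    KTable<String, NotificationAvro> aggregatedTable = input
            .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
            .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
            .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
            .reduce(
                    (aggValue, newValue) -> {
                        newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                        return newValue;
                    },
                    Materialized.<String, NotificationAvro, KeyValueStore<Bytes, byte[]>>as("notification-store")
                            .withKeySerde(CustomSerdes.String())
                            .withValueSerde(CustomSerdes.NotificationAvro()));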
