How to use custom serializers with KTable in Kafka streams?


Question

I started getting errors during serialization, from KStream to KTable, when I introduced a groupBy on my KStream. As I understand it, once you have an aggregate or reduce on a KStream, Kafka Streams transforms it into a KTable, and because of the shuffle this requires, Kafka has to serialize the records again. My original KStream just mapped the records from JSON to Avro like this, and it works fine:

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .mapValues(v -> recordBuilder.getNotificationAvro(v));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));
        return notificationAvroKStream;
    }

Then I introduced groupByKey and reduce, and I realized that this transforms the stream into a KTable and hence needs Serdes in the application.yaml file. Unfortunately, I cannot configure the default Serdes because I have other types of serialization, so I decided to set the serdes on the KTable topology instead. I am trying to implement a solution based on this answer.
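As an aside, the binder does not force a single global default: a sketch of per-binding serde configuration in application.yaml, assuming the spring-cloud-stream Kafka Streams binder (exact property names can vary by binder version, and the channel names are taken from the code above):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            notification-output-avro-channel:
              producer:
                # key/value serdes for this one binding only,
                # leaving other bindings' serialization untouched
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
```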

The part of the code where I try to materialize with my custom serdes is not working properly (Materialized.with(CustomSerdes.String(), CustomSerdes.NotificationAvro())). First, I don't think I need KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");, but without it, it also does not work, and I cannot find a Materialized variant that is not KeyValueBytes... where I can define my serdes CustomSerdes.String(), CustomSerdes.NotificationAvro().

According to the answer I mentioned in the link, they also use a final StreamsBuilder builder = new StreamsBuilder();. But since I am building this with spring-kafka, I don't have this option, or if I do, I don't know how to use it.

@Service
@Slf4j
@EnableBinding(PosListenerJsonAvroBinding.class)
public class NotificationJsonAvroProcessorService {
    @Autowired
    RecordBuilder recordBuilder;

    @StreamListener("notification-input-channel")
    @SendTo("notification-output-avro-channel")
    public KStream<String, NotificationAvro> process(KStream<String, PosInvoice> input) {
        log.info("received PosInvoice JSON: {}", input);
        KStream<String, NotificationAvro> notificationAvroKStream = input
                .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
                .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)));
        notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro - key: %s, value: %s", k, v)));

        // *********************************************
        // IS THERE A KeyValueStoreSupplier THAT I CAN PASS ALSO MY SERDES INSTEAD OF Bytes?
        // KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
        KTable<String, NotificationAvro> convertedTable = notificationAvroKStream
                .toTable(
                        // *********************************************
                        // HOW TO MATERIALIZE KTABLE VALUES WITH SERDES ?
                        Materialized
                                // .as(storeSupplier) // this is not necessary
                                .with(CustomSerdes.String(), CustomSerdes.NotificationAvro())
                        // *********************************************
                )
                .groupBy((cardNo, notificationAvro) -> KeyValue.pair(cardNo, notificationAvro))
                .reduce(
                        (aggValue, newValue) -> {
                            newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
                            return newValue;
                        },
                        (aggValue, oldValue) -> oldValue
                );
        KStream<String, NotificationAvro> notificationAggAvroKStream = convertedTable.toStream();
        notificationAggAvroKStream.foreach((k, v) -> log.info(String.format("Notification agg avro - key: %s, value: %s", k, v)));

        return notificationAggAvroKStream;
    }
}

The custom Serdes:

@Service
public class CustomSerdes extends Serdes {
    private static final String schema_registry_url = "http://localhost:8081";
    private final static Map<String, String> serdeConfig = Collections
            .singletonMap("schema.registry.url", schema_registry_url);
    public static Serde<NotificationAvro> NotificationAvro() {
        final Serde<NotificationAvro> notificationAvroSerde = new SpecificAvroSerde<>();
        notificationAvroSerde.configure(serdeConfig, false);
        return notificationAvroSerde;
    }
}

And the error:

Exception in thread "NotificationJsonAvroProcessorService-process-applicationId-3e262d96-19ca-438d-a2b8-9d3c2e9bb4ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic NotificationJsonAvroProcessorService-process-applicationId-KTABLE-AGGREGATE-STATE-STORE-0000000010-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.streams.kstream.internals.ChangedSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: org.apache.kafka.streams.kstream.internals.Change). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))). ... ... Caused by: java.lang.ClassCastException: class com.github.felipegutierrez.explore.spring.model.NotificationAvro cannot be cast to class java.lang.String (com.github.felipegutierrez.explore.spring.model.NotificationAvro is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

Answer

So I solved it by reading this answer, which uses .groupByKey(...) and Serialized.with(...), both of which are deprecated. I am using Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()) instead:

KStream<String, NotificationAvro> notificationAvroKStream = input
     .filter((k, v) -> v.getCustomerType().equalsIgnoreCase(PRIME))
     .map((k, v) -> new KeyValue<>(v.getCustomerCardNo(), recordBuilder.getNotificationAvro(v)))
     .groupByKey(Grouped.with(CustomSerdes.String(), CustomSerdes.NotificationAvro()))
     .reduce((aggValue, newValue) -> {
          newValue.setTotalLoyaltyPoints(newValue.getEarnedLoyaltyPoints() + aggValue.getTotalLoyaltyPoints());
          return newValue;
     })
     .toStream();
notificationAvroKStream.foreach((k, v) -> log.info(String.format("Notification avro agg - key: %s, value: %s", k, v)));
return notificationAvroKStream;
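For clarity, here is what the reducer above actually accumulates, stripped of the Kafka Streams machinery: each incoming record's earnedLoyaltyPoints is added to the running totalLoyaltyPoints of the aggregate. A minimal plain-Java sketch, in which the Notification class and its two fields are stand-ins for the generated NotificationAvro record (assumed from the getters used above):

```java
import java.util.List;

public class LoyaltyReduceSketch {
    // Minimal stand-in for the generated NotificationAvro record.
    public static class Notification {
        public int earnedLoyaltyPoints;
        public int totalLoyaltyPoints;
        public Notification(int earned, int total) {
            this.earnedLoyaltyPoints = earned;
            this.totalLoyaltyPoints = total;
        }
    }

    // Same logic as the Kafka Streams reducer: the new record's earned
    // points are added to the aggregate's running total, and the new
    // record becomes the next aggregate.
    public static Notification reduce(Notification agg, Notification newValue) {
        newValue.totalLoyaltyPoints = newValue.earnedLoyaltyPoints + agg.totalLoyaltyPoints;
        return newValue;
    }

    // Folds a list of records per key, the way reduce() is applied
    // record by record for one grouping key.
    public static Notification reduceAll(List<Notification> records) {
        Notification agg = records.get(0);
        for (int i = 1; i < records.size(); i++) {
            agg = reduce(agg, records.get(i));
        }
        return agg;
    }

    public static void main(String[] args) {
        Notification result = reduceAll(List.of(
                new Notification(10, 10),
                new Notification(5, 0),
                new Notification(20, 0)));
        System.out.println(result.totalLoyaltyPoints); // prints 35
    }
}
```

Note that the order of the reducer arguments matters: the first parameter is the current aggregate and the second is the newly arrived record, which is why the earned points come from newValue and the running total from aggValue.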
