Creating a Kafka aggregator and joining it with an event


Problem description

I am trying to create an aggregator in which I listen for multiple records and consolidate them into one. After consolidation, I wait for a process event by joining the stream with the aggregated application in the listen() method. On arrival of the process event, some business logic is triggered. I have defined both the aggregator and the process listener in a single Spring Boot project.

import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;

// FormUUID, FormData, Application and ProcessEvent are the project's domain types.

// Aggregator: deduplicates per FormUUID, then re-keys by reference number
// and folds all form records into a single Application.
@Bean
public Function<KStream<FormUUID, FormData>, KStream<UUID, Application>> process()
{
    return formEvent -> formEvent.groupByKey()
            .reduce((oldValue, newValue) -> newValue)     // keep the latest record per key
            .toStream()
            .selectKey((k, v) -> k.getReferenceNo())      // re-key by the shared reference number
            .groupByKey()
            .aggregate(Application::new, (key, value, aggr) -> aggr.performAggregate(value),
                    Materialized.<UUID, Application, KeyValueStore<Bytes, byte[]>> as("appStore")
                            .withKeySerde(new JsonSerde<>(UUID.class))
                            .withValueSerde(new JsonSerde<>(Application.class)))
            .toStream();
}

// Joiner: when a ProcessEvent arrives, look up the aggregated Application
// by key and trigger the business logic.
@Bean
public BiConsumer<KStream<String, ProcessEvent>, KTable<String, Application>> listen()
{
    return (eventStream, appTable) ->
            eventStream.join(appTable, (event, app) -> app)
                    .foreach((k, app) -> app.createQuote());
}
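
For context, the domain classes are not shown in the question. A minimal sketch of what they might look like, purely as a reconstruction (every field name here is an assumption; only getReferenceNo(), performAggregate() and createQuote() actually appear in the topology above):

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical reconstruction of the key type. JSON-friendly: public
// no-arg constructor plus getters/setters so JsonSerde can handle it.
public class FormUUID {
    private UUID formId;        // assumed field
    private UUID referenceNo;   // shared by all forms of one application

    public FormUUID() { }
    public UUID getFormId() { return formId; }
    public void setFormId(UUID formId) { this.formId = formId; }
    public UUID getReferenceNo() { return referenceNo; }
    public void setReferenceNo(UUID referenceNo) { this.referenceNo = referenceNo; }
}

// Hypothetical reconstruction of the aggregate type.
public class Application {
    private List<FormData> forms = new ArrayList<>();  // assumed field

    public Application() { }
    public List<FormData> getForms() { return forms; }
    public void setForms(List<FormData> forms) { this.forms = forms; }

    // Folds one more form into the aggregate and returns the aggregate,
    // matching the signature expected by aggregate(...) above.
    public Application performAggregate(FormData form) {
        forms.add(form);
        return this;
    }

    // Business logic triggered when the matching ProcessEvent arrives.
    public void createQuote() { /* ... */ }
}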

However, I am now facing a SerializationException. The first part (the aggregation) works fine, but the join fails with the following exceptions:

java.lang.ClassCastException: com.xxxxx.datamapper.domain.FormData cannot be cast to com.xxxxx.datamapper.domain.Application
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.3.1.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.3.1.jar:?]

and

org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store APPLICATION_TOPIC-STATE-STORE-0000000001
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:280) ~[kafka-streams-2.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) ~[kafka-streams-2.3.1.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:519) ~[kafka-streams-2.3.1.jar:?]

I think the problem is in my application.yml. Since the "spring.json.key.default.type" property is set to FormUUID, the same type is also applied to the Application object consumed in the listen method. I want to configure the remaining types (UUID, Application and ProcessEvent) in my application.yml, but I am not sure how to configure the mapping type for each consumer and producer defined (see the sketch after the configuration below).

spring.cloud:
 function.definition: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      default.key.serde: org.springframework.kafka.support.serializer.JsonSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.key.default.type: com.xxxx.datamapper.domain.FormUUID
      spring.json.value.default.type: com.xxxx.datamapper.domain.FormData
      commit.interval.ms: 1000
  bindings:
    process-in-0.destination: FORM_DATA_TOPIC
    process-out-0.destination: APPLICATION_TOPIC
    listen-in-0.destination: PROCESS_TOPIC
    listen-in-1: 
      destination: APPLICATION_TOPIC
      consumer:
       useNativeDecoding: true
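
(The sketch referred to above: the Kafka Streams binder does expose per-binding Serde properties, so a per-binding override would look roughly like the following. The property names are taken from the binder documentation, but treat this as an untested sketch; note also that Spring's JsonSerde still resolves the target type from type headers or the global spring.json.*.default.type, so this alone would not remove the conflict.)

spring.cloud.stream.kafka.streams.bindings:
  listen-in-1.consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.springframework.kafka.support.serializer.JsonSerde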

Answer

If you are using the latest Horsham releases of the Spring Cloud Stream Kafka Streams binder, you do not need to set any explicit Serdes for inbound and outbound. However, you still need to provide them wherever the Kafka Streams API itself requires them, as in your aggregate call above (see also the sketch after the configuration below). Since you are seeing this serialization error on the inbound of the second processor, I suggest removing all Serdes from the configuration. You can simplify it as below (assuming you are on the latest Horsham release). The binder will infer the correct Serdes to use on the inbound/outbound. One benefit of delegating this to the binder is that you don't need to provide any explicit key/value types through configuration, because the binder introspects the types. Make sure the POJO types you are using are JSON-friendly. See if that works; if you still run into problems, please create a small sample application where we can reproduce the issue and we will take a look.

spring.cloud:
 function.definition: process;listen
 stream:
  kafka.streams:
    bindings:
      process-in-0.consumer.application-id: form-aggregator
      listen-in-0.consumer.application-id: event-processor
      listen-in-1.consumer.application-id: event-processor
    binder.configuration:
      commit.interval.ms: 1000
  bindings:
    process-in-0.destination: FORM_DATA_TOPIC
    process-out-0.destination: APPLICATION_TOPIC
    listen-in-0.destination: PROCESS_TOPIC
    listen-in-1.destination: APPLICATION_TOPIC
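
As a concrete illustration of "provide them wherever the Kafka Streams API requires them": if inference were still ambiguous at the join, the Serdes could also be pinned at the API level with Joined, analogous to the Materialized used in the aggregate. A hedged sketch against the kafka-streams 2.3 API seen in the stack trace:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;
import org.springframework.kafka.support.serializer.JsonSerde;

// Inside listen(): pin the key Serde and both value Serdes explicitly.
eventStream.join(appTable, (event, app) -> app,
        Joined.with(Serdes.String(),
                new JsonSerde<>(ProcessEvent.class),    // stream-side value
                new JsonSerde<>(Application.class)))    // table-side value
        .foreach((k, app) -> app.createQuote());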

