无法反序列化实例Kafka流 [英] Cannot deserialize instance Kafka Streams

查看:103
本文介绍了无法反序列化实例Kafka流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在做什么错,我的以下kafka流程序在传输数据时出现问题,无法从START_ARRAY令牌中反序列化com.kafka.productiontest.models.TimeOff实例".

What am I doing wrong, My below kafka stream program giving issue while streaming the data, "Cannot deserialize instance of com.kafka.productiontest.models.TimeOff out of START_ARRAY token ".

我有一个主题timeOffs2,其中包含具有键timeOffID的超时信息,并且值是包含employeeId的object类型.我只想将所有休假时间归为员工钥匙并写到商店.

I have a topic timeOffs2 which contain time offs information with key timeOffID and value is of type object which contain employeeId. I just want to group all time offs for employee key and write to the store.

对于商店,密钥将为employeeId,值将为超时列表.

For store key will be employeeId and value will be list of timeoffs.

程序属性和流逻辑:

public Properties getKafkaProperties() throws UnknownHostException {

    InetAddress myHost = InetAddress.getLocalHost();

    Properties kafkaStreamProperties = new Properties();
    kafkaStreamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    kafkaStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    kafkaStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TimeOffSerde.class);
    kafkaStreamProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    kafkaStreamProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.kafka.productiontest.models.TimeOffSerializer");
    kafkaStreamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, application_id );
    kafkaStreamProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myHost.getHostName() + ":" + port);
    return kafkaStreamProperties;
}



  String topic = "timeOffs2";
StreamsBuilder builder = new StreamsBuilder();

KStream<String, TimeOff> source = builder.stream(topic);

KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
    .aggregate(ArrayList::new,
        (key, value, aggregate) -> {
          aggregate.add(value);
          return aggregate;
        }, Materialized.as("NewStore").withValueSerde(TimeOffListSerde(TimeOffSerde)));

final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, getKafkaProperties());

TimeOffSerializer.java

TimeOffSerializer.java

ackage com.kafka.productiontest.models;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class TimeOffSerializer implements Serializer  {

  @Override
  public void configure(Map configs, boolean isKey) {

  }

  @Override
  public byte[] serialize(String topic, Object data) {
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      retVal = objectMapper.writeValueAsString(data).getBytes();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }

  @Override
  public void close() {
  }
}

TimeOffDeserializer.java

TimeOffDeserializer.java

package com.kafka.productiontest.models;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.serialization.Deserializer ;

import java.util.Map;

public class TimeOffDeserializer implements Deserializer {

  @Override
  public void configure(Map configs, boolean isKey) {

  }
  @Override
  public TimeOff deserialize(String arg0, byte[] arg1) {
    ObjectMapper mapper = new ObjectMapper();
    TimeOff timeOff = null;
    try {
      timeOff = mapper.readValue(arg1, TimeOff.class);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return timeOff;
  }

  @Override
  public void close() {

  }

}

TimeOffSerde.java

TimeOffSerde.java

package com.kafka.productiontest.models;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class TimeOffSerde implements Serde<Object> {

  private final Serde inner;

  public TimeOffSerde(){
    inner = Serdes.serdeFrom(new TimeOffSerializer(), new TimeOffDeserializer());
  }
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.serializer().configure(configs, isKey);
    inner.deserializer().configure(configs, isKey);
  }

  @Override
  public void close() {
    inner.serializer().close();
    inner.deserializer().close();
  }

  @Override
  public Serializer<Object> serializer() {
    return inner.serializer();
  }

  @Override
  public Deserializer<Object> deserializer() {
    return inner.deserializer();
  }
}

TimeOffListSerializer.java

TimeOffListSerializer.java

package com.kafka.productiontest.models;
import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;

public class TimeOffListSerializer implements Serializer<ArrayList<TimeOff>> {

  private Serializer<TimeOff> inner;

  public TimeOffListSerializer(Serializer<TimeOff> inner) {
    this.inner = inner;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public byte[] serialize(String topic, ArrayList<TimeOff> data) {
    final int size = data.size();
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final DataOutputStream dos = new DataOutputStream(baos);
    final Iterator<TimeOff> iterator = data.iterator();
    try {
      dos.writeInt(size);
      while (iterator.hasNext()) {
        final byte[] bytes = inner.serialize(topic, iterator.next());
        dos.writeInt(bytes.length);
        dos.write(bytes);
      }

    }catch (Exception ex) {

    }
    return baos.toByteArray();
  }

  @Override
  public void close() {
      inner.close();
  }
}

TimeOffListDeserializer.java

TimeOffListDeserializer.java

package com.kafka.productiontest.models;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;

public class TimeOffListDeserializer  implements Deserializer<ArrayList<TimeOff>> {

  private final Deserializer<TimeOff> valueDeserializer;

  public TimeOffListDeserializer(final Deserializer<TimeOff> valueDeserializer) {
    this.valueDeserializer = valueDeserializer;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public ArrayList<TimeOff> deserialize(String topic, byte[] data)  {
    if (data == null || data.length == 0) {
      return null;
    }

    final ArrayList<TimeOff> arrayList = new ArrayList<>();
    final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(data));

    try {
      final int records = dataInputStream.readInt();
      for (int i = 0; i < records; i++) {
        final byte[] valueBytes = new byte[dataInputStream.readInt()];
        dataInputStream.read(valueBytes);
        arrayList.add(valueDeserializer.deserialize(topic, valueBytes));
      }
    } catch (IOException e) {
      throw new RuntimeException("Unable to deserialize ArrayList", e);
    }
    return arrayList;
  }

  @Override
  public void close() {

  }
}

TimeOffListSerde.java

TimeOffListSerde.java

package com.kafka.productiontest.models;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.ArrayList;
import java.util.Map;

public class TimeOffListSerde implements Serde<ArrayList<TimeOff>> {
  private Serde<ArrayList<TimeOff>> inner;

  public TimeOffListSerde() {
  }

  public TimeOffListSerde(Serde<TimeOff> serde){
    inner = Serdes.serdeFrom(new TimeOffListSerializer(serde.serializer()), new TimeOffListDeserializer(serde.deserializer()));
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.serializer().configure(configs, isKey);
    inner.deserializer().configure(configs, isKey);
  }

  @Override
  public void close() {
    inner.serializer().close();
    inner.deserializer().close();
  }

  @Override
  public Serializer<ArrayList<TimeOff>> serializer() {
    return inner.serializer();
  }

  @Override
  public Deserializer<ArrayList<TimeOff>> deserializer() {
    return inner.deserializer();
  }
}

我认为withValueSerde是这部分的问题.我无法使用此代码进行编译.但是,如果我删除withValueSerde,它会给我这个问题无法反序列化TimeOff对象".您能帮我指导我做错了什么吗?

I think issue is in this part with withValueSerde. I can not compile with this code. But if I remove withValueSerde, it is giving me this issue "Can not deserialize TimeOff object". Can you please help and guide what I am doing wrong.

KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
    .aggregate(ArrayList::new,
        (key, value, aggregate) -> {
          aggregate.add(value);
          return aggregate;
        }, Materialized.as("NewStore").withValueSerde(TimeOffListSerde(TimeOffSerde)));

推荐答案

查看您的代码,我会看到几个问题:

Looking at your code I can see several issues:

  1. TimeOffSerde-应该实现Serde<TimeOff>而不是Serde<Object>
  2. 您没有在Materialized中传递键和值的类型,因此假定它是Object
  1. TimeOffSerde - It should implement Serde<TimeOff> not Serde<Object>
  2. You don't pass types for Key and Value in Materialized, so it assume it is Object

因此,您的流式传输部分应类似于:

So your streaming part should be something like:

KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
        .aggregate(ArrayList::new,
                (key, value, aggregate) -> {
                    aggregate.add(value);
                    return aggregate;
                }, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));

注意:在修改后退回以清除状态存储目录.

NOTICE: Rember to clear state store directory after modification.

这篇关于无法反序列化实例Kafka流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆