How to Deserialize Kafka AVRO Messages Using Apache Beam

Problem Description

The main goal is to aggregate two Kafka topics: one with compacted, slow-moving data and the other with fast-moving data that is received every second.

I have been able to consume messages in simple scenarios, such as a KV(Long, String), using something like:

PCollection<KV<Long, String>> input = p.apply(KafkaIO.<Long, String>read()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());

PCollection<String> output = input.apply(Values.<String>create());

But this doesn't seem to be the right approach when you need to deserialise from AVRO. I have a KV(STRING, AVRO) which I need to consume.

I attempted generating the Java classes from the AVRO schema and then including them in the "apply", for example:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

But this didn’t seem to be the correct approach.

Is there any documentation/examples anyone could point me to, so I could get an understanding as to how you would work with Kafka AVRO and Beam. Any help would be much appreciated.

I have updated the code:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create(options);

        PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
        );

        p.run();
    }
}
#######################################################
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass {
    String name;
    String age;

    Myclass() {}

    Myclass(String n, String a) {
        this.name = n;
        this.age = a;
    }
}

But I now get the following error: incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<java.lang.String>>

I must be importing the incorrect serializers?

Recommended Answer

I have faced the same issue and found the solution in this mail archive: http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

In your case, you need to define your own KafkaAvroDeserializer, as follows.

import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

public class MyClassKafkaAvroDeserializer extends
        AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Pass the consumer config (including schema.registry.url) to the Confluent base class
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public MyClass deserialize(String s, byte[] bytes) {
        // Decode the Avro payload via the Confluent deserializer and cast it to the target class
        return (MyClass) this.deserialize(bytes);
    }

    @Override
    public void close() {
    }
}

Then specify your KafkaAvroDeserializer as ValueDeserializer.

p.apply(KafkaIO.<Long, MyClass>read()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(MyClassKafkaAvroDeserializer.class));
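
For completeness, here is a minimal end-to-end sketch of how the custom deserializer might be wired into a pipeline. The class name AvroReadPipeline, the broker address, the topic name and the Schema Registry URL are placeholders, and MyClass stands for the Avro value class from the question; depending on your Beam version the consumer properties are passed with updateConsumerProperties or withConsumerConfigUpdates.

import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class AvroReadPipeline {

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // The Confluent deserializer looks up writer schemas in the Schema Registry,
        // so its URL has to be passed through the consumer config.
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("schema.registry.url", "http://localhost:8081"); // placeholder URL

        PCollection<MyClass> values = p
                .apply(KafkaIO.<Long, MyClass>read()
                        .withBootstrapServers("localhost:9092")   // placeholder broker
                        .withTopic("my-avro-topic")               // placeholder topic
                        .withKeyDeserializer(LongDeserializer.class)
                        // Custom deserializer plus an explicit coder so Beam knows how to encode MyClass
                        .withValueDeserializerAndCoder(
                                MyClassKafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
                        .updateConsumerProperties(consumerConfig) // withConsumerConfigUpdates on newer Beam
                        .withoutMetadata())
                .apply(Values.<MyClass>create());

        p.run().waitUntilFinish();
    }
}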
