How to deserialise Kafka AVRO messages using Apache Beam

Problem description

The main goal is to aggregate two Kafka topics: one holding compacted, slow-moving data and the other holding fast-moving data that arrives every second.

I have been able to consume messages in simple scenarios, such as a KV<Long, String>, using something like:

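PCollection<KV<Long, String>> input = p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")  // placeholder broker address
        .withTopic("my-topic")                   // placeholder topic name
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());                     // yields KV<K, V> instead of KafkaRecord<K, V>

PCollection<String> output = input.apply(Values.<String>create());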

But this doesn't seem to be the right approach when the values have to be deserialised from AVRO. I need to consume a KV<String, AVRO>.

I tried generating the Java classes from the AVRO schema and then using them in the "apply" call, for example:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

But this didn’t seem to be the correct approach.

Could anyone point me to documentation or examples that explain how to work with Kafka AVRO messages in Beam? Any help would be much appreciated.
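Some background helps explain why a plain StringDeserializer or a generated class on its own cannot read these values. This is a hedged sketch that assumes the values were produced by Confluent's KafkaAvroSerializer; the updated code below does use Confluent's KafkaAvroDeserializer. Under that assumption, each value is not bare Avro. It starts with a 5-byte header: a magic byte 0, followed by the 4-byte big-endian Schema Registry ID of the writer schema, and only then the Avro binary body. The stdlib-only Java below uses a hypothetical class name, ConfluentWireFormat, and only splits off that header. It does not decode the Avro body, because that step needs the writer schema fetched from the registry.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical helper: splits a Confluent-framed Avro message into its
 * Schema Registry ID and the raw Avro payload. It does NOT decode Avro.
 */
public final class ConfluentWireFormat {

    /** Magic byte that prefixes every Confluent-framed message. */
    static final byte MAGIC_BYTE = 0x0;

    /** Header size: 1 magic byte + 4-byte big-endian schema ID. */
    static final int HEADER_SIZE = 5;

    private ConfluentWireFormat() {}

    /** Returns the schema ID stored in bytes 1..4 of the message. */
    public static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Returns the Avro binary body that follows the 5-byte header. */
    public static byte[] avroPayload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        // Made-up message: magic byte, schema ID 42, then three arbitrary payload bytes.
        byte[] message = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x06, 0x66, 0x6F};
        System.out.println(schemaId(message));                     // 42
        System.out.println(Arrays.toString(avroPayload(message))); // [6, 102, 111]
    }
}
```

Confluent's deserializer uses the schema ID to look up the writer schema in the Schema Registry, and that lookup is why it needs a registry URL at runtime.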

I have updated my code:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create(options);

        PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
        );

        p.run();
    }
}
#######################################################
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass {
    String name;
    String age;

    Myclass() {}

    Myclass(String n, String a) {
        this.name = n;
        this.age = a;
    }
}

But I now get the following error:

incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<java.lang.String>>

Am I importing the wrong deserializer?

Recommended answer

I faced the same issue and found the solution in this mailing-list archive thread: http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

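The error occurs because Confluent's KafkaAvroDeserializer implements Deserializer<Object>. Its class therefore cannot be passed where KafkaIO expects a Class<? extends Deserializer<V>>; in your code V is String. AbstractKafkaAvroDeserializer does not implement Deserializer itself, so you can subclass it and choose the type parameter. In your case, define your own typed deserializer as follows: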

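import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Typed wrapper around Confluent's Avro deserializer, so that KafkaIO
// sees a Deserializer<MyClass> instead of a Deserializer<Object>.
public class MyClassKafkaAvroDeserializer extends
    AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // Reads schema.registry.url and related settings from the consumer config.
    configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  public MyClass deserialize(String topic, byte[] bytes) {
    // Fetches the writer schema by ID and decodes the Avro payload.
    return (MyClass) this.deserialize(bytes);
  }

  @Override
  public void close() {}
}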
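The sketch below shows why subclassing fixes the compile error, using only plain Java. Every name in it is a hypothetical stand-in that only mimics the shape of the real type: Deserializer plays Kafka's interface, AbstractAvroLikeDeserializer plays AbstractKafkaAvroDeserializer, ObjectDeserializer plays KafkaAvroDeserializer, TypedDeserializer plays MyClassKafkaAvroDeserializer, and withValueDeserializer plays KafkaIO's method. String stands in for MyClass. Java generics are invariant, so a class implementing Deserializer<Object> is not a Deserializer<String>. A subclass of the untyped base that implements Deserializer<String>, on the other hand, is accepted.

```java
import java.nio.charset.StandardCharsets;

/** Plain-Java stand-ins (NOT the real Kafka/Confluent/Beam classes). */
public class DeserializerGenericsDemo {

    /** Stand-in for Kafka's Deserializer<T>. */
    public interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Stand-in for the untyped base class: it only returns Object. */
    public abstract static class AbstractAvroLikeDeserializer {
        protected Object deserializeRaw(byte[] data) {
            return new String(data, StandardCharsets.UTF_8); // pretend "decoding"
        }
    }

    /** Stand-in for a deserializer that fixes T = Object. */
    public static class ObjectDeserializer extends AbstractAvroLikeDeserializer
            implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] data) {
            return deserializeRaw(data);
        }
    }

    /** Stand-in for the answer's typed subclass (String plays MyClass). */
    public static class TypedDeserializer extends AbstractAvroLikeDeserializer
            implements Deserializer<String> {
        @Override
        public String deserialize(String topic, byte[] data) {
            return (String) deserializeRaw(data); // same kind of cast as the answer
        }
    }

    /** Stand-in with the same bound as a value-deserializer parameter. */
    public static <V> Class<? extends Deserializer<V>> withValueDeserializer(
            Class<? extends Deserializer<V>> deserializer) {
        return deserializer;
    }

    public static void main(String[] args) throws Exception {
        // Compiles: TypedDeserializer is a Deserializer<String>.
        Class<? extends Deserializer<String>> ok =
                DeserializerGenericsDemo.<String>withValueDeserializer(TypedDeserializer.class);

        // Does NOT compile ("incompatible types"), just like the question:
        // DeserializerGenericsDemo.<String>withValueDeserializer(ObjectDeserializer.class);

        // The caller receives only a Class and creates the instance itself.
        Deserializer<String> d = ok.getDeclaredConstructor().newInstance();
        System.out.println(d.deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8))); // hello
    }
}
```

Uncommenting the ObjectDeserializer line produces an "incompatible types" error that matches the one in the question.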

Then specify MyClassKafkaAvroDeserializer as the value deserializer:

p.apply(KafkaIO.<Long, MyClass>read()
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(MyClassKafkaAvroDeserializer.class));
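The snippet above leaves one runtime requirement implicit. The Confluent deserializer reads its settings in configure(), and KafkaIO passes its consumer configuration to that call, so schema.registry.url must be part of that configuration. In addition, the (MyClass) cast only succeeds if MyClass is the class generated from the Avro schema (a SpecificRecord) and specific.avro.reader is set to true. Otherwise the deserializer returns a GenericData.Record. The helper below is a hypothetical sketch that just builds that map with the standard library. The names AvroConsumerConfig and forRegistry are illustrative, and the registry URL is a placeholder.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: extra consumer properties for Confluent's Avro deserializer. */
public final class AvroConsumerConfig {

    // Property names read by Confluent's KafkaAvroDeserializerConfig.
    public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    public static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

    private AvroConsumerConfig() {}

    /** Builds the properties; registryUrl is a placeholder such as "http://localhost:8081". */
    public static Map<String, Object> forRegistry(String registryUrl) {
        Map<String, Object> props = new HashMap<>();
        props.put(SCHEMA_REGISTRY_URL, registryUrl);
        // Without this flag the deserializer returns GenericData.Record,
        // and the (MyClass) cast in the answer fails at runtime.
        props.put(SPECIFIC_AVRO_READER, true);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(forRegistry("http://localhost:8081").get(SCHEMA_REGISTRY_URL)); // http://localhost:8081
    }
}
```

In recent Beam releases, this map can be passed with .withConsumerConfigUpdates(AvroConsumerConfig.forRegistry("http://localhost:8081")) on the KafkaIO.<Long, MyClass>read() chain. Older releases offered the now-deprecated updateConsumerProperties(...) instead.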
