How to process Avro messages while reading a stream of messages from Kafka?


Problem Description

The code below reads messages from Kafka. The messages are in Avro, so how do I parse them and put them into a dataframe in Spark 2.2.0?

Dataset<Row> df = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic1")
            .load();

This https://github.com/databricks/spark-avro library has no example for the streaming case.

Solution


how do I parse the message and put it into a dataframe in Spark 2.2.0?

That's your home exercise that is going to require some coding.

This https://github.com/databricks/spark-avro library had no example for the streaming case.

I've been told (and seen a couple of questions here) that spark-avro does not support Spark Structured Streaming (aka Spark Streams). It works fine with non-streaming Datasets, but can't handle streaming ones.
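For contrast, the non-streaming path does work with spark-avro. A sketch, assuming spark is an active SparkSession and the paths are purely illustrative:

import com.databricks.spark.avro._

// Batch reads work:
val batchDf = spark.read.avro("/path/to/records.avro")

// The streaming equivalent is rejected, because the source does not
// support streamed reading:
// spark.readStream.format("com.databricks.spark.avro").load("/path/to/dir")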

That's why I wrote that this is something you have to code yourself.

That could look as follows (I use Scala for simplicity):

// Step 1. Convert messages to strings
val avroMessages = df.select($"value" cast "string")

// Step 2. Strip the avro layer off
val from_avro = udf { (s: String) => ...processing here... }
val cleanDataset = avroMessages.withColumn("no_avro_anymore", from_avro($"value"))

That would require developing a from_avro custom UDF that would do what you want (and would be similar to how Spark handles JSON format using from_json standard function!)
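As a minimal sketch of such a UDF, the following decodes the raw Kafka bytes with Avro's GenericDatumReader. It assumes the writer schema is known up front and that messages use plain Avro binary encoding (no schema-registry framing); the Msg record and its text field are made up for this example, and it works on the binary value directly, skipping the string cast from Step 1:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.functions.udf

// Hypothetical writer schema; in practice load it from a file or a registry.
val schemaJson =
  """{"type":"record","name":"Msg","fields":[{"name":"text","type":"string"}]}"""

val from_avro = udf { (bytes: Array[Byte]) =>
  // Parse inside the UDF so the closure only captures the JSON string
  // (sidesteps any Schema serialization concerns).
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder).get("text").toString
}

// As in the snippet above, $-syntax assumes import spark.implicits._
val cleanDataset = df.withColumn("no_avro_anymore", from_avro($"value"))

Caching the parsed Schema per executor (e.g. in a lazy val) would be the obvious optimization over re-parsing it per row.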


Alternatively (a slightly more advanced, and perhaps convoluted, approach) write your own custom streaming Source for datasets in Avro format in Kafka and use it instead.

Dataset<Row> df = sparkSession.readStream()
            .format("avro-kafka") // <-- HERE YOUR CUSTOM Source
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topic1")
            .load();
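For what it's worth, here is a bare-bones outline of what such a source would have to implement in Spark 2.2. This is a sketch only: AvroKafkaSource and its wiring are hypothetical, the method bodies are left as ???, and Source lives in an internal Spark package:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.StructType

// Hypothetical provider behind .format("avro-kafka"); exposing the short
// name additionally needs the DataSourceRegister service-loader hookup.
class AvroKafkaSourceProvider extends StreamSourceProvider {

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    ("avro-kafka", ???) // derive the result schema from the Avro writer schema

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    new AvroKafkaSource(sqlContext, parameters)
}

// The source itself: track Kafka offsets and strip Avro in getBatch.
class AvroKafkaSource(sqlContext: SQLContext, parameters: Map[String, String])
    extends Source {
  override def schema: StructType = ???
  override def getOffset: Option[Offset] = ??? // latest available Kafka offsets
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = ???
  override def stop(): Unit = ()
}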

I have yet to find out how doable an avro-kafka format is. It is surely doable, but it would do two things at once, i.e. reading from Kafka and doing the Avro conversion, and I am not convinced that's the way to do things in Spark Structured Streaming or in software engineering in general. I wish there were a way to apply one format after another, but that's not possible in Spark 2.2.1 (and is not planned for 2.3 either).

I think then that a UDF is the best solution for the time being.


Just a thought, you could also write a custom Kafka Deserializer that would do the deserialization while Spark loads messages.
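That contract is small. A sketch of what an Avro-decoding Deserializer could look like (the avro.schema config key is made up here; also note that Spark's built-in kafka source always delivers keys and values as raw bytes, so a deserializer like this is most natural with the plain Kafka consumer API):

import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.Deserializer

// Hypothetical Avro value deserializer, e.g. registered with
// props.put("value.deserializer", classOf[AvroDeserializer].getName)
class AvroDeserializer extends Deserializer[GenericRecord] {

  private var schema: Schema = _

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {
    // "avro.schema" is a made-up config key carrying the writer schema as JSON.
    schema = new Schema.Parser().parse(configs.get("avro.schema").toString)
  }

  override def deserialize(topic: String, data: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    reader.read(null, DecoderFactory.get().binaryDecoder(data, null))
  }

  override def close(): Unit = ()
}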
