Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)


Problem Description

I have a Spark 2.0 application that reads messages from Kafka using Spark Streaming (with spark-streaming-kafka-0-10_2.11).

Structured Streaming looks really cool, so I wanted to try migrating the code, but I can't figure out how to use it.

In the regular streaming code I used KafkaUtils to create a DStream, and one of the parameters I passed in was the value deserializer.
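For reference, the old DStream wiring looked roughly like this (a minimal sketch: `ssc` is an existing StreamingContext, and `MyAvroType` / `MyAvroDeserializer` are hypothetical stand-ins for the actual record type and custom value deserializer):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  // in the old API the custom Avro deserializer was plugged in right here
  "value.deserializer" -> classOf[MyAvroDeserializer],
  "group.id" -> "example-group"
)

val stream = KafkaUtils.createDirectStream[String, MyAvroType](
  ssc,
  PreferConsistent,
  Subscribe[String, MyAvroType](Array("RED-test-tal4"), kafkaParams)
)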

With Structured Streaming, the documentation says I should deserialize using DataFrame functions, but I can't figure out exactly what that means.

I looked at examples such as this example, but my Avro object in Kafka is quite complex and cannot simply be cast the way the String is in that example.
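For contrast, the pattern from that example only works when the payload really is a UTF-8 string: the Kafka source exposes key and value as binary columns, and a plain cast is enough for that simple case (a sketch, assuming a local broker and the same topic):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "RED-test-tal4")
  .load()

// works only when the bytes are plain text, not binary Avro
val strings = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")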

So far I have tried this kind of code (which I saw here in a different question):

import spark.implicits._

val ds1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "RED-test-tal4")
  .load()

ds1.printSchema()
ds1.select("value").printSchema()

// attempt to cast the binary value column to the Avro-derived struct type
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema)))
val query = ds2.writeStream
  .outputMode("append")
  .format("console")
  .start()

and I get "data type mismatch: cannot cast BinaryType to StructType(StructField(...."

How can I deserialize the value?

Answer

As noted above, as of Spark 2.1.0 there is support for Avro with the batch reader, but not with SparkSession.readStream(). A cast won't work here: the Kafka value column is raw binary, so the Avro payload has to be decoded explicitly in a map. Here is how I got it to work in Scala, based on the other responses. I've simplified the schema for brevity.
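For context, the batch-side Avro support mentioned above came from the Databricks spark-avro package at the time; a minimal sketch of that batch path, assuming an existing SparkSession named spark, the com.databricks:spark-avro artifact on the classpath, and a placeholder input path:

// batch only: this works through spark.read, not spark.readStream, in 2.1.0
val batchDf = spark.read
  .format("com.databricks.spark.avro")
  .load("/path/to/avro/files")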

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"}
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Implicit Kryo encoder, needed if a GenericRecord itself is carried in a Dataset
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]) {

        val KafkaBroker = args(0)
        val InTopic = args(1)
        val OutTopic = args(2)    // parsed but not used in this simplified example

        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    // Decode the raw Avro bytes into a GenericRecord, then
                    // pull out the typed fields
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    KafkaMessage(deviceId, deviceName)
                })

        // Write the decoded records to the console so the streaming query runs
        val query = data.writeStream
                .outputMode("append")
                .format("console")
                .start()
        query.awaitTermination()
    }
}
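To run this against a real broker, the Structured Streaming Kafka source has to be on the classpath as a separate artifact; roughly these sbt dependencies, with versions as assumptions matching a Spark 2.1.0 / Scala 2.11 build:

// build.sbt (sketch): artifact names as published for Spark 2.1.x on Scala 2.11
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0",
  "org.apache.avro" % "avro" % "1.8.1"
)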
