Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)


Question

I have a Spark 2.0 application that reads messages from Kafka using Spark Streaming (with spark-streaming-kafka-0-10_2.11).

Structured Streaming looks really cool, so I wanted to try migrating the code, but I can't figure out how to use it.

In regular streaming I used KafkaUtils to create the DStream, and the value deserializer was among the parameters I passed.

In Structured Streaming, the documentation says that I should deserialize using DataFrame functions, but I can't figure out exactly what that means.

I looked at examples such as this example, but my Avro object in Kafka is quite complex and cannot simply be cast like the String in the example.

So far I have tried this kind of code (which I saw here in a different question):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

and I get "data type mismatch: cannot cast BinaryType to StructType(StructField(....".

How can I deserialize the value?

Recommended answer

As noted above, as of Spark 2.1.0 there is support for Avro with the batch reader but not with SparkSession.readStream(). Here is how I got it to work in Scala, based on the other responses. I've simplified the schema for brevity.
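The key step in the job below is decoding each Kafka value with Avro's GenericDatumReader and a binary decoder. As a minimal sketch of that step in isolation, with no Spark or Kafka involved: the object name AvroRoundTrip, the helper names encode and decode, and the sample values are illustrative and not part of the original answer. It also assumes the messages are plain Avro binary with no extra framing bytes in front of the payload.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTrip {
  // Same two-field record as the simplified schema in the answer
  val schema: Schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "name": "kafkamsg",
      |  "fields": [
      |    { "name": "deviceId",   "type": "int" },
      |    { "name": "deviceName", "type": "string" }
      |  ]
      |}""".stripMargin)

  // Serialize a record to raw Avro binary (what a producer would put in the Kafka value)
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decode raw Avro binary back into a GenericRecord; the schema must match the writer's
  def decode(bytes: Array[Byte]): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val original = new GenericData.Record(schema)
    original.put("deviceId", Int.box(42))
    original.put("deviceName", "router-1")

    val decoded = decode(encode(original))
    // Avro returns strings as Utf8, hence the explicit toString
    println(s"${decoded.get("deviceId")} ${decoded.get("deviceName").toString}")
  }
}
```

The bytes carry no type information of their own, which is why a DataFrame cast from BinaryType to a StructType cannot work: the schema has to be supplied to a decoder, as the map step in the streaming job does.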

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"}
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]): Unit = {

        val KafkaBroker = args(0)
        val InTopic = args(1)
        val OutTopic = args(2)

        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    // Decode the raw Avro bytes of the Kafka value into a GenericRecord
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    // Avro returns strings as Utf8, so convert explicitly
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    KafkaMessage(deviceId, deviceName)
                })
    }
}
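The snippet above only defines the streaming Dataset `data`; as far as I understand, nothing is read from Kafka until a query is started on it. A minimal sketch of starting one with the console sink follows. The helper object ConsoleSink is hypothetical and not part of the original answer, and the console sink is assumed here purely for inspection (the answer's OutTopic argument suggests the real job wrote elsewhere).

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: prints each new micro-batch of a streaming Dataset to stdout
object ConsoleSink {
  def start[T](ds: Dataset[T]): StreamingQuery =
    ds.writeStream
      .outputMode("append") // emit only rows added since the last trigger
      .format("console")
      .start()
}
```

Inside main this would be called as `ConsoleSink.start(data).awaitTermination()`, which blocks the driver so the query keeps running.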
