Use schema to convert ConsumerRecord value to Dataframe in spark-kafka


Problem description

I am using Spark 2.0.2 with Kafka 0.11.0, and I am trying to consume messages from Kafka in Spark Streaming. Following is the code:

val topics = "notes"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:7092",
  "schema.registry.url" -> "http://localhost:7070",
  "group.id" -> "connect-cluster1",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer"
)
val topicSet: Set[String] = Set(topics)
val stream = KafkaUtils.createDirectStream[String, String](
  SparkStream.ssc,
  PreferConsistent,
  Subscribe[String, String](topicSet, kafkaParams)
)
stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

If the Kafka message contains records, the output would be:

{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312886984, "createdby": "karthik", "notes": "testing20"}
{"id": "4164a489-a0bb-4ea1-a259-b4e2a4519eee", "createdat": 1505312890472, "createdby": "karthik", "notes": "testing21"}

Thus, the received message is Avro-decoded, as seen from the consumerRecord's value. Now I need those records in DataFrame format, but I do not know how to proceed from here, even with the schema at hand as follows:

val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val m = sr.getLatestSchemaMetadata(topics + "-value")
val schemaId = m.getId
val schemaString = m.getSchema

val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:7070", 1000)
val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
val parser = new Schema.Parser()
val avroSchema = parser.parse(schemaString)
println(avroSchema)

The schema looks like this:

{"type":"record","name":"notes","namespace":"db","fields":[{"name":"id","type":["null","string"],"default":null},{"name":"createdat","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"createdby","type":["null","string"],"default":null},{"name":"notes","type":["null","string"],"default":null}],"connect.name":"db.notes"}

Can anyone help me understand how to get a DataFrame from the consumer record's value? I have looked at other questions such as Use schema to convert AVRO messages with Spark to DataFrame and Handling schema changes in running Spark Streaming application, but they do not deal with the consumerRecord in the first place.

Recommended answer

You can use the snippet below; stream is the DStream of consumer records returned from the KafkaUtils API of kafka010:

stream.foreachRDD(rdd =>
    if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        val topicValueStrings = rdd.map(record => (record.value()).toString)
        val df = sqlContext.read.json(topicValueStrings)
        df.show()
    })
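
If you want to avoid the schema-inference pass that read.json runs on every micro-batch, one option is to hand-map the Avro schema you already fetched from the registry into a Spark StructType and pass it to the reader explicitly. Below is a minimal sketch using only core Spark APIs; notesSchema is an assumed name, and the field mapping simply mirrors the db.notes schema shown above:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

// Hypothetical StructType hand-mapped from the db.notes Avro schema; every
// field is a nullable union in the Avro definition, hence nullable = true.
val notesSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("createdat", LongType, nullable = true),  // timestamp-millis arrives as epoch millis
  StructField("createdby", StringType, nullable = true),
  StructField("notes", StringType, nullable = true)
))

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    val jsonStrings = rdd.map(record => record.value().toString)
    // Passing the schema explicitly skips inference over the batch
    val df = sqlContext.read.schema(notesSchema).json(jsonStrings)
    df.show()
  }
}

If you would rather derive the StructType from the registry schema instead of hard-coding it, the SchemaConverters.toSqlType helper from the spark-avro package can perform that conversion, assuming that dependency is on the classpath.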
