How to read records in JSON format from Kafka using Structured Streaming?


Question

I am trying to use the Structured Streaming approach (Spark streaming based on the DataFrame/Dataset API) to load a stream of data from Kafka.

I use:

  • Spark 2.10
  • Kafka 0.10
  • spark-sql-kafka-0-10

The Spark Kafka DataSource has a predefined underlying schema:

|key|value|topic|partition|offset|timestamp|timestampType|
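
For reference, a minimal sketch (assuming a local broker on localhost:9092 and a placeholder topic name) that prints this source schema before any parsing:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().appName("kafka-json-example").getOrCreate()

val rawKafkaDF = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")  // placeholder topic name
  .load()

// The columns below come from the Kafka connector itself, not from the JSON payload:
rawKafkaDF.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)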

My data come in JSON format and are stored in the value column. I am looking for a way to extract the underlying schema from the value column and turn the received DataFrame into the columns stored in value. I tried the approach below, but it does not work:

import org.apache.spark.sql.Column

val columns = Array("column1", "column2") // column names

val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

val columnsToSelect = columns.map(x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

// some analytics using the streaming DataFrame kafkaDF

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()

Here I get the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the values inside value are not known...

Do you have any suggestions?

Answer

From Spark's perspective, value is just a sequence of bytes. It has no knowledge of the serialization format or content. To be able to extract the fields, you have to parse it first.

If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json with a provided schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._  // for the $"..." syntax

// Replace ??? with the actual types of your columns, e.g. StringType, IntegerType
val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
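
A minimal sketch, assuming example types for the two columns (the actual types depend on your data) and an arbitrary alias name, of how the parsed struct can then be flattened into top-level columns:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._  // for the $"..." syntax

// Assumed example types; replace with the real ones for your data
val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", IntegerType)
))

val kafkaDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).alias("data"))
  .select("data.*")  // flatten the struct into column1, column2

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()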

or cast to StringType and extract fields by path using get_json_object:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.StringType
import sparkSession.implicits._  // for the $"..." syntax

val columns: Seq[String] = ???  // your column names

// One JSONPath expression per column, applied to value cast to a string
val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c"))

rawKafkaDF.select(exprs: _*)

and later cast to the desired types.
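
For example, a minimal sketch of that later cast (the column names and target types here are assumptions, not part of the original answer):

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.{IntegerType, StringType}
import sparkSession.implicits._  // for the $"..." syntax

// Hypothetical fields: column1 kept as a string, column2 cast to an integer
val typedDF = rawKafkaDF.select(
  get_json_object($"value".cast(StringType), "$.column1").alias("column1"),
  get_json_object($"value".cast(StringType), "$.column2").cast(IntegerType).alias("column2")
)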

