How to read records in JSON format from Kafka using Structured Streaming?
Question
I am trying to use the Structured Streaming approach, based on the DataFrame/Dataset API of Spark Streaming, to load a stream of data from Kafka.
I am using:

- Spark 2.10
- Kafka 0.10
- spark-sql-kafka-0-10
The Spark Kafka DataSource has a predefined underlying schema:
|key|value|topic|partition|offset|timestamp|timestampType|
My data comes in JSON format and is stored in the value column. I am looking for a way to extract the underlying schema from the value column and expand the received DataFrame into the columns stored in value. I tried the approach below, but it does not work:
val columns = Array("column1", "column2") // column names

val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

val columnsToSelect = columns.map(x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

// some analytics using stream dataframe kafkaDF

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
Here I get the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the values inside are not known...
Do you have any suggestions?
Answer
From Spark's perspective, value is just a sequence of bytes. Spark has no knowledge of the serialization format or content. To be able to extract a field, you have to parse it first.
If the data is serialized as a JSON string, you have two options. You can cast value to StringType and use from_json, providing a schema:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
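To put the parsed fields to use you typically expand the resulting struct into top-level columns. A minimal sketch, assuming hypothetical field types and an alias name data that are not in the original:

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

// Illustrative schema; replace the field names and types with your own.
val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", IntegerType)
))

val parsed = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).alias("data"))
  .select($"data.*") // expand the struct into top-level columns column1, column2
```

Note that from_json returns null for rows whose value does not match the schema, so malformed records silently become null structs rather than failing the query.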
or cast to StringType and extract fields by path using get_json_object:
import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???
val exprs = columns.map(c => get_json_object($"value".cast("string"), s"$$.$c"))

rawKafkaDF.select(exprs: _*)
and cast the results later to the desired types.
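For example, since get_json_object always returns string columns, the casts can be applied right after extraction. A sketch, where the field names and target types are illustrative assumptions:

```scala
import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.IntegerType

// Hypothetical fields: column1 stays a string, column2 is cast to an integer.
val typed = rawKafkaDF.select(
  get_json_object($"value".cast("string"), "$.column1").alias("column1"),
  get_json_object($"value".cast("string"), "$.column2").cast(IntegerType).alias("column2")
)
```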