How to read records in JSON format from Kafka using Structured Streaming?


Question

I am trying to use the Structured Streaming approach, based on the DataFrame/Dataset API of Spark Streaming, to load a stream of data from Kafka.

I am using:

  • Spark 2.10
  • Kafka 0.10
  • spark-sql-kafka-0-10

The Spark Kafka DataSource has a predefined underlying schema:

|key|value|topic|partition|offset|timestamp|timestampType|

My data come in JSON format and are stored in the value column. I am looking for a way to extract the underlying schema from the value column and turn the received DataFrame into the columns stored in value. I tried the approach below, but it does not work:

import org.apache.spark.sql.Column

val columns = Array("column1", "column2") // column names

val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .load()

// attempt to select the JSON fields nested inside the value column
val columnsToSelect = columns.map(x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect: _*)

// some analytics using stream dataframe kafkaDF

val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()

Here I am getting the exception org.apache.spark.sql.AnalysisException: Can't extract value from value#337; because at the time the stream is created, the values inside are not known...

Do you have any suggestions?

Answer

From Spark's perspective value is just a sequence of bytes. It has no knowledge of the serialization format or content. To be able to extract a field you have to parse the value first.

If the data is serialized as a JSON string you have two options. You can cast value to StringType and use from_json, providing a schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
import sparkSession.implicits._ // for the $"..." column syntax

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))
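As a minimal sketch of that approach (the concrete field types are assumptions, since the question does not give them: column1 as a string and column2 as an integer), the parsed struct can be aliased and then expanded into top-level columns:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.functions.from_json

// assumed field types; replace with the real schema of your JSON payload
val schema = StructType(Seq(
  StructField("column1", StringType),
  StructField("column2", IntegerType)
))

val kafkaDF = rawKafkaDF
  .select(from_json($"value".cast(StringType), schema).alias("data")) // parse the JSON payload
  .select($"data.*")                                                  // expand into column1, column2

kafkaDF then has column1 and column2 as regular columns and can be fed into the same writeStream query as in the question.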

or cast to StringType and extract fields by path using get_json_object:

import org.apache.spark.sql.functions.get_json_object
import org.apache.spark.sql.types.StringType

val columns: Seq[String] = ???

// get_json_object works on strings, so cast the binary value column first
val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c"))

rawKafkaDF.select(exprs: _*)
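A short usage note, reusing the two hypothetical column names from the question: aliasing each expression keeps the resulting column names readable (every value extracted by get_json_object is a string at this point):

val columns = Seq("column1", "column2") // placeholder names from the question

val exprs = columns.map(c => get_json_object($"value".cast(StringType), s"$$.$c").alias(c))

val kafkaDF = rawKafkaDF.select(exprs: _*) // columns: column1, column2 (both StringType)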

and cast later to the desired types.
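A minimal sketch of that last step, assuming kafkaDF is the DataFrame with the extracted string columns and that column2 should end up as an integer (the real target types are not given in the question):

import org.apache.spark.sql.types.IntegerType

// cast the extracted string field to its intended type
val typedDF = kafkaDF.withColumn("column2", $"column2".cast(IntegerType))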
