Spark 结构化流 kafka 转换 JSON 没有模式(推断模式) [英] Spark structured streaming kafka convert JSON without schema (infer schema)
问题描述
我读到 Spark Structured Streaming 不支持将 Kafka 消息读取为 JSON 的模式推断.有没有办法像 Spark Streaming 一样检索架构:
I read Spark Structured Streaming doesn't support schema inference for reading Kafka messages as JSON. Is there a way to retrieve schema the same as Spark Streaming does:
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
推荐答案
这是一种可行的方法:
在开始流式传输之前,从 Kafka 获取一小批数据
Before you start streaming, get a small batch of the data from Kafka
从小批量推断模式
开始使用提取的架构流式传输数据.
Start streaming the data using the extracted schema.
下面的伪代码说明了这种方法.
The pseudo-code below illustrates this approach.
第 1 步:
从 Kafka 中提取一小部分(两条记录),
Extract a small (two records) batch from Kafka,
val smallBatch = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "node:9092")
.option("subscribe", "topicName")
.option("startingOffsets", "earliest")
.option("endingOffsets", """{"topicName":{"0":2}}""")
.load()
.selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()
第 2 步:将小批量写入文件:
Step 2: Write the small batch to a file:
smallBatch.write.mode("overwrite").format("text").save("/batch")
此命令将小批量写入 hdfs 目录/batch.它创建的文件的名称是 part-xyz*.所以你首先需要使用 hadoop FileSystem 命令重命名文件(参见 org.apache.hadoop.fs._ 和 org.apache.hadoop.conf.Configuration,这是一个例子 https://stackoverflow.com/a/41990859) 然后将文件读取为 json:
This command writes the small batch into hdfs directory /batch. The name of the file that it creates is part-xyz*. So you first need to rename the file using hadoop FileSystem commands (see org.apache.hadoop.fs._ and org.apache.hadoop.conf.Configuration, here's an example https://stackoverflow.com/a/41990859) and then read the file as json:
val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema
这里,batchName.txt 是文件的新名称,smallBatchSchema 包含从小批量推断的架构.
Here, batchName.txt is the new name of the file and smallBatchSchema contains the schema inferred from the small batch.
最后,您可以按如下方式流式传输数据(第 3 步):
Finally, you can stream the data as follows (Step 3):
val inputDf = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "node:9092")
.option("subscribe", "topicName")
.option("startingOffsets", "earliest")
.load()
val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=smallBatchSchema).as("data"))
.select("data.*")
希望这会有所帮助!
这篇关于Spark 结构化流 kafka 转换 JSON 没有模式(推断模式)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!