Spark Structured Streaming Kafka: convert JSON without schema (infer schema)
Question
I've read that Spark Structured Streaming doesn't support schema inference when reading Kafka messages as JSON. Is there a way to retrieve the schema the same way Spark Streaming does?
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printSchema
Answer
Here is one possible way to do this:
1. Before you start streaming, get a small batch of the data from Kafka.
2. Infer the schema from that small batch.
3. Start streaming the data using the extracted schema.
The pseudo-code below illustrates this approach.
Step 1: Extract a small (two-record) batch from Kafka:
val smallBatch = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "node:9092")
  .option("subscribe", "topicName")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", """{"topicName":{"0":2}}""")
  .load()
  .selectExpr("CAST(value AS STRING) as value").as[String].toDF()
Step 2: Write the small batch to a file:
smallBatch.write.mode("overwrite").format("text").save("/batch")
This command writes the small batch into the HDFS directory /batch. The file it creates is named part-xyz*, so you first need to rename it using the Hadoop FileSystem API (see org.apache.hadoop.fs._ and org.apache.hadoop.conf.Configuration; here's an example: https://stackoverflow.com/a/41990859), and then read the file as JSON:
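The rename step can be sketched with the Hadoop FileSystem API. The path /batch and the target name batchName.txt follow the answer above, while the glob pattern is an assumption (it presumes Spark wrote exactly one part file):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Locate the single part-* file Spark wrote under /batch and give it a
// predictable name. Assumes exactly one part file exists in the directory.
val fs = FileSystem.get(new Configuration())
val partFile = fs.globStatus(new Path("/batch/part-*"))(0).getPath
fs.rename(partFile, new Path("/batch/batchName.txt"))
```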
val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema
Here, batchName.txt is the new name of the file, and smallBatchSchema contains the schema inferred from the small batch.
Finally, you can stream the data as follows (Step 3):
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val inputDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "node:9092")
  .option("subscribe", "topicName")
  .option("startingOffsets", "earliest")
  .load()

val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
  .select(from_json($"json", smallBatchSchema).as("data"))
  .select("data.*")
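Note that dataDf only defines the stream; nothing runs until a query is started. A minimal sketch, assuming a console sink purely for inspection (not part of the original answer):

```scala
// Start the streaming query; dataDf's columns follow smallBatchSchema.
val query = dataDf.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode("append")
  .start()

query.awaitTermination()
```

In production you would swap the console sink for a file, Kafka, or foreachBatch sink and set a checkpoint location.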
Hope this helps!