Spark structured streaming kafka convert JSON without schema (infer schema)


Problem description

I read that Spark Structured Streaming doesn't support schema inference for reading Kafka messages as JSON. Is there a way to retrieve the schema the same way Spark Streaming does:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printSchema

Recommended answer

Here is one possible way to do this:

  1. Before you start streaming, get a small batch of the data from Kafka.

  2. Infer the schema from the small batch.

  3. Start streaming the data using the extracted schema.

The pseudo-code below illustrates this approach.

Step 1: Extract a small (two-record) batch from Kafka:

import spark.implicits._

// endingOffsets limits partition 0 of topicName to offset 2, i.e. the
// first two records, so this is a bounded batch read rather than a stream.
val smallBatch = spark.read.format("kafka")
                           .option("kafka.bootstrap.servers", "node:9092")
                           .option("subscribe", "topicName")
                           .option("startingOffsets", "earliest")
                           .option("endingOffsets", """{"topicName":{"0":2}}""")
                           .load()
                           .selectExpr("CAST(value AS STRING) as value")
                           .as[String].toDF()

Step 2: Write the small batch to a file:

smallBatch.write.mode("overwrite").format("text").save("/batch")

This command writes the small batch to the HDFS directory /batch. The file it creates is named part-xyz*, so you first need to rename it using the Hadoop FileSystem API (see org.apache.hadoop.fs._ and org.apache.hadoop.conf.Configuration; https://stackoverflow.com/a/41990859 shows an example).
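
The original answer leaves the rename step as a sketch; here is a minimal version, assuming the default Hadoop configuration, a single part file under /batch, and the hypothetical target name batchName.txt used below:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Locate the single part-xyz* file produced by the batch write.
val fs = FileSystem.get(new Configuration())
val partFile = fs.globStatus(new Path("/batch/part-*"))(0).getPath

// Rename it to a predictable name so it can be read back as JSON.
fs.rename(partFile, new Path("/batch/batchName.txt"))

Then read the renamed file as JSON to infer the schema: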

val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema

Here, batchName.txt is the new name of the file and smallBatchSchema contains the schema inferred from the small batch.
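
As a quick sanity check (not part of the original answer), you can print the inferred schema as a tree:

smallBatchSchema.printTreeString()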

Finally, you can stream the data as follows (Step 3):

import org.apache.spark.sql.functions.from_json
import spark.implicits._

val inputDf = spark.readStream.format("kafka")
                              .option("kafka.bootstrap.servers", "node:9092")
                              .option("subscribe", "topicName")
                              .option("startingOffsets", "earliest")
                              .load()

// Parse each JSON payload with the schema inferred in Step 2.
val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
                    .select(from_json($"json", smallBatchSchema).as("data"))
                    .select("data.*")

Hope this helps!
