Spark structured streaming kafka convert JSON without schema (infer schema)


Question

I read that Spark Structured Streaming doesn't support schema inference for reading Kafka messages as JSON. Is there a way to retrieve the schema the same way Spark Streaming does:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printSchema()

Answer

Here is one possible way to do this:

  1. Before you start streaming, get a small batch of the data from Kafka.

  2. Infer the schema from this small batch.

  3. Start streaming the data using the extracted schema.

The pseudo-code below illustrates this approach.

Step 1: Extract a small (two records) batch from Kafka:

// Requires import spark.implicits._ for .as[String]
val smallBatch = spark.read.format("kafka")
                           .option("kafka.bootstrap.servers", "node:9092")
                           .option("subscribe", "topicName")
                           .option("startingOffsets", "earliest")
                           .option("endingOffsets", """{"topicName":{"0":2}}""") // stop before offset 2 of partition 0
                           .load()
                           .selectExpr("CAST(value AS STRING) as value")
                           .as[String].toDF()

Step 2: Write the small batch to a file:

smallBatch.write.mode("overwrite").format("text").save("/batch")

This command writes the small batch into the HDFS directory /batch. The name of the file it creates is part-xyz*. So you first need to rename the file using Hadoop FileSystem commands (see org.apache.hadoop.fs._ and org.apache.hadoop.conf.Configuration; here's an example: https://stackoverflow.com/a/41990859) and then read the file as JSON:
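A minimal sketch of that rename step, assuming the Hadoop configuration is available on the classpath (the part-* file name is discovered by listing the directory rather than hard-coded, and batchName.txt matches the name used below):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Locate the part-* file that Spark wrote and give it a stable name.
val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)
val partFile = fs.globStatus(new Path("/batch/part-*"))(0).getPath
fs.rename(partFile, new Path("/batch/batchName.txt"))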

val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema

Here, batchName.txt is the new name of the file, and smallBatchSchema contains the schema inferred from the small batch.
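As a side note (not part of the original answer): on Spark 2.2 or later you can skip the file round-trip entirely, because spark.read.json also accepts a Dataset[String] directly:

// Requires Spark 2.2+ and import spark.implicits._
val smallBatchSchema = spark.read.json(smallBatch.as[String]).schema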

Finally, you can stream the data as follows (Step 3):

val inputDf = spark.readStream.format("kafka")
                             .option("kafka.bootstrap.servers", "node:9092")
                             .option("subscribe", "topicName")
                             .option("startingOffsets", "earliest")
                             .load()

// from_json comes from org.apache.spark.sql.functions._;
// the $ column syntax needs import spark.implicits._
val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
                    .select(from_json($"json", smallBatchSchema).as("data"))
                    .select("data.*")

Hope this helps!
