Reading Kafka Connect JSONConverter messages with schema using Spark Structured Streaming


Problem Description

I am trying to read messages from a Kafka topic. The messages are in the following format (sample):

{
  "schema": {
    "type": "struct",
    "name": "emp_table",
    "fields": [
      {"field": "emp_id", "type": "string"},
      {"field": "emp_name", "type": "String"},
      {"field": "city", "type": "string"},
      {"field": "emp_sal", "type": "string"},
      {"field": "manager_name", "type": "string"}
    ]
  },
  "payload": {
    "emp_id": "1",
    "emp_name": "abc",
    "city": "NYK",
    "emp_sal": "100000",
    "manager_name": "xyz"
  }
}

Also, please note the topic carries messages from several different tables, not just one.

What I am trying to achieve is to read the above messages from the Kafka topic using Spark Structured Streaming and create a dataframe whose column names and values both come from the JSON message itself.

I don't want to explicitly define a schema using a case class or StructType.

What I have tried:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

// Kafka's value column is binary, so cast it to string before JSON extraction
val y = df.select(get_json_object($"value".cast("string"), "$.payload").alias("payload"))

When I view y (which is a dataframe), it comes out as a single column, with the JSON under payload as the value in that column.

How do I get individual columns in the dataframe? I have not been able to achieve this.

(Reiterating: I cannot use a generic case class or StructType for the schema part, because the messages coming through Kafka are from different tables, so I want a more dynamic schema created from the JSON itself at runtime.)
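To make the goal concrete: both the column names and the values are already present inside each message's payload object, as a quick stdlib-only Python illustration (outside Spark) shows:

```python
import json

# One Connect-envelope message as it arrives from the topic
msg = ('{"schema":{"type":"struct","name":"emp_table","fields":['
       '{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},'
       '{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},'
       '{"field":"manager_name","type":"string"}]},'
       '"payload":{"emp_id":"1","emp_name":"abc","city":"NYK",'
       '"emp_sal":"100000","manager_name":"xyz"}}')

# The payload object alone carries everything the dataframe needs:
# its keys are the column names, its values are the row values.
payload = json.loads(msg)["payload"]
print(list(payload.keys()))  # ['emp_id', 'emp_name', 'city', 'emp_sal', 'manager_name']
print(payload["emp_name"])   # abc
```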

Answer

Option 1: Change the Kafka Connect source to set value.converter.schemas.enable=false. This will give you only the unwrapped payload to begin with, and you can then apply from_json() to it directly.
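For reference, this is controlled by the converter properties on the Connect worker (or overridden per connector); a minimal fragment:

```properties
# Keep JSON output, but drop the {"schema": ..., "payload": ...} envelope
# so each record's value is just the bare payload object.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

Note that with schemas.enable=false the type information from the envelope is lost, so Spark still needs a schema (or schema inference) on its side.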

Otherwise, after you strip off the Connect schema wrapper, you would need to use from_json() to apply a schema:

val y = df.select(get_json_object($"value".cast("string"), "$.payload").alias("payload"))
val z = y.select(from_json($"payload", schema).alias("data")).select("data.*")

All your fields are strings, so the schema would look like:

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType),
  StructField("emp_name", StringType),
  StructField("city", StringType),
  StructField("emp_sal", StringType),
  StructField("manager_name", StringType)
))
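Since the Connect envelope already lists its fields, the question's wish for a runtime schema is partly achievable: the schema part of a sample message can be turned into a DDL string, which Spark's from_json also accepts in place of a StructType. A minimal sketch in stdlib-only Python (the type mapping covers only the common Connect primitive types and is an assumption beyond the string case in the sample):

```python
import json

# Mapping from Kafka Connect primitive type names to Spark SQL DDL types;
# only common primitives are covered here (assumption for anything else).
CONNECT_TO_SPARK = {"string": "STRING", "int32": "INT", "int64": "BIGINT",
                    "boolean": "BOOLEAN", "double": "DOUBLE"}

def envelope_to_ddl(message: str) -> str:
    """Build a Spark DDL schema string from a Connect JSON envelope."""
    fields = json.loads(message)["schema"]["fields"]
    cols = []
    for f in fields:
        # .lower() tolerates the inconsistent "String" seen in the sample
        spark_type = CONNECT_TO_SPARK[f["type"].lower()]
        cols.append(f'{f["field"]} {spark_type}')
    return ", ".join(cols)

msg = ('{"schema":{"type":"struct","name":"emp_table","fields":['
       '{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},'
       '{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},'
       '{"field":"manager_name","type":"string"}]},'
       '"payload":{"emp_id":"1","emp_name":"abc","city":"NYK",'
       '"emp_sal":"100000","manager_name":"xyz"}}')

print(envelope_to_ddl(msg))
# emp_id STRING, emp_name STRING, city STRING, emp_sal STRING, manager_name STRING
```

Such a string could then be passed as the schema argument of from_json on the driver. The caveat: in Structured Streaming the schema must be fixed before the query starts, so this works per table (one query per schema), not per record within a mixed-table topic.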

Related

How to read records in JSON format from Kafka using Structured Streaming?

