Reading Kafka Connect JSONConverter messages with schema using Spark Structured Streaming


Problem description

I am trying to read messages from a Kafka topic. The messages are in the following format (sample):

{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}

Also, please note that the topic contains messages from several different tables, not just one.

What I am trying to achieve is to read the above messages from the Kafka topic using Spark Structured Streaming and create a dataframe whose column names and values both come from the JSON message itself.

I don't want to explicitly define the schema using a case class or a StructType.

What I have tried:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

val y = df.select(get_json_object($"value", "$.payload").alias("payload"))

When I view y (which is a dataframe), it comes out as a single column, with the value under payload appearing as JSON in that column.

How do I get the individual columns in a dataframe? I have not been able to achieve this.

(Reiterating: I cannot use a generic case class or StructType for the schema, because the messages coming through Kafka are from different tables, so I want a more dynamic schema created from the JSON itself at runtime.)

Answer

Option 1: Change the Kafka Connect source to set value.converter.schemas.enable=false. That will give you only the unwrapped payload to begin with, and you can then skip straight to applying from_json as shown below.
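For reference, a minimal sketch of where that property lives in a connector's .properties configuration; the connector name and class here are hypothetical placeholders, and only the two converter lines matter:

# Hypothetical connector config; name and connector.class are placeholders.
name=emp-table-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# Emit the bare JSON payload without the schema/payload envelope:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false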

Otherwise, after you strip off the Connect schema envelope, you need to use from_json() to apply a schema:

import org.apache.spark.sql.functions.{from_json, get_json_object}

val y = df.select(get_json_object($"value", "$.payload").alias("payload"))
// note: from_json is applied to y (the stripped payload), not df;
// select("data.*") then flattens the struct into individual columns
val z = y.select(from_json($"payload", schema).alias("data")).select("data.*")

All of your fields are strings, so the schema would look like:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema: StructType = StructType(Seq(
  StructField("emp_id", StringType),
  StructField("emp_name", StringType),
  StructField("city", StringType),
  StructField("emp_sal", StringType),
  StructField("manager_name", StringType)
))
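As a quick sanity check, here is a sketch (assuming the df, y, z, and schema definitions above) that streams z to the console sink so you can confirm the individual columns come through:

// Print each micro-batch to stdout with full column values
val query = z.writeStream
  .format("console")
  .option("truncate", "false")
  .outputMode("append")
  .start()

query.awaitTermination()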

