使用Spark结构化流技术读取带有模式的Kafka Connect JSONConverter消息 [英] Reading Kafka Connect JSONConverter messages with schema using Spark Structured Streaming
问题描述
我正在尝试从Kafka主题中读取消息.消息采用以下格式(示例格式):
I am trying to read message from Kafka Topic. Message are in below format (sample format):
{"schema":{"type":"struct","name":"emp_table","fields":[{"field":"emp_id","type":"string"},{"field":"emp_name","type":"String"},{"field":"city","type":"string"},{"field":"emp_sal","type":"string"},{"field":"manager_name","type":"string"}]},"payload":{"emp_id":"1","emp_name":"abc","city":"NYK","emp_sal":"100000","manager_name":"xyz"}}
另外,请注意主题包含来自不同表的消息,而不仅仅是1个表.
Also, please note topic has message from different tables and not just 1 table.
我要实现的目标是使用Spark结构化流技术从Kafka Topic阅读以上消息,并创建一个数据列,其列名和值均来自JSON消息本身.
What I am trying to achieve is to read above message from Kafka Topic using Spark Structured Streaming and create a dataframe with column names ad its value both coming from JSON message itself.
我不想使用案例类或StructType显式定义架构.
I don't want to explicitly define a schema using case class or StructType.
我尝试过:
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", brokers).option("subscribe", "topic1").option("startingOffsets", "earliest").load()
val y=df.select(get_json_object(($"value"), "$.payload").alias("payload")
当我查看Y(它是一个数据框)时,它以1列的形式出现,有效载荷下的值为该列中的JSON.
When I view Y (which is a dataframe), it comes as 1 column with value under payload as JSON in that column.
如何获取数据框中的单个列?我没有实现.
How to get individual column in a dataframe? I am not achieve this.
(再次重申,由于通过Kafka消息传递的消息来自不同的表,因此我不能对模式部分使用通用的案例类或StructType,因此我希望在运行时从JSON本身创建更多的动态Schema.)
(Again reiterating I cannot use a generic case class or StructType for schema part as messages coming through Kafka message are from different tables so I want a more of dynamic Schema created from JSON itself on the run.)
推荐答案
选项1:更改Kafka Connect源以设置value.converter.schemas.enable=false
.这只会给您(开始时未包装的有效载荷),然后您可以跳到下面的文章.
Option 1: Change the Kafka Connect source to set value.converter.schemas.enable=false
. This will only give you the (unwrapped payload to begin with), then you can skip to below post.
否则,剥离连接架构后,您将需要使用from_json()
来应用架构
Otherwise, after you strip the Connect schema, you would need to use from_json()
to apply a schema
val y = df.select(get_json_object($"value", "$.payload").alias("payload"))
val z = df.select(from_json($"payload", schema))
您所有的字段都是字符串,因此看起来像
All your fields are strings, so would look like
val schema: StructType = StructType(Seq(
StructField("emp_id", StringType()),
StructField("emp_name", StringType()),
StructField("city", StringType()),
StructField("emp_sal", StringType()),
StructField("manager_name", StringType())
))
相关
- 如何在Kafka connect 0.10和Spark结构化流中使用from_json?
-
这篇关于使用Spark结构化流技术读取带有模式的Kafka Connect JSONConverter消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!