Spark Streaming: Reading data from Kafka that has multiple schemas


Problem description

I am struggling with an implementation in Spark Streaming.

The messages from Kafka look like this, but with more fields:

{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}

I am trying to read the messages from a Kafka topic (which has multiple schemas). I need to read each message, look at its event and source fields, and decide where to store it as a Dataset. The actual data is in the payload field as JSON, and it is only a single record.

Can someone help me implement this, or suggest any alternatives?

Is it a good practice to send messages with multiple schemas to the same topic and consume them?

Thanks in advance.

Recommended answer

You can create a DataFrame from the incoming JSON objects:

Create a Seq[String] of the JSON objects.

Build the DataFrame with val df = spark.read.json(jsonSeq.toDS()) (spark.read.json accepts a Dataset[String], so convert the Seq first; the toDS conversion needs import spark.implicits._).

Perform the operations of your choice on the DataFrame df, as sketched below.
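A minimal, self-contained sketch of those steps (the sample payloads and field values are made up for illustration, not taken from the original post):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("multi-schema-json")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Stand-in for a batch of messages consumed from Kafka; each element is one JSON record.
val messages: Seq[String] = Seq(
  """{"event":"sensordata", "source":"sensors", "payload": {"temp": 21.5}}""",
  """{"event":"eventapi", "source":"event2", "payload": {"id": 42}}"""
)

// Steps 1-2: Seq[String] -> Dataset[String] -> DataFrame (Spark infers and merges the schemas).
val df = spark.read.json(messages.toDS())

// Step 3: pick out one event type and keep only its payload columns.
val sensorData = df.filter(col("event") === "sensordata").select("payload.*")
sensorData.show()

Since the question is about a live stream, the same routing can also be done with Structured Streaming's Kafka source. A sketch under stated assumptions: the spark-sql-kafka-0-10 connector is on the classpath, and the broker address and topic name below are placeholders. Keeping payload as a raw JSON string lets each event type be parsed with its own schema downstream:

import org.apache.spark.sql.functions.{col, get_json_object}

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("subscribe", "mixed-topic")                  // assumed topic name
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

// Extract the envelope fields from each record; payload stays a JSON string.
val envelope = raw.select(
  get_json_object(col("json"), "$.event").as("event"),
  get_json_object(col("json"), "$.source").as("source"),
  get_json_object(col("json"), "$.payload").as("payload")
)

// Route each event type to its own sink; every branch becomes a separate streaming query.
val sensorQuery = envelope
  .filter(col("event") === "sensordata")
  .writeStream
  .format("console") // stand-in sink; replace with the real destination
  .start()

sensorQuery.awaitTermination()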

