Spark Streaming: Read JSON from Kafka and add event_time


Question

I am trying to write a Stateful Spark Structured Streaming job that reads from Kafka. As part of the requirement I need to add 'event_time' to my stream as an additional column. I am trying something like this:

import org.apache.spark.sql.functions.from_json
import spark.implicits._

val schema = spark.read.json("sample-data/test.json").schema
val myStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "myTopic")
      .load()
val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
val withEventTime = df.selectExpr("*", "cast (value.arrivalTime as timestamp) as event_time")

But I keep getting this message:

cannot resolve 'arrivalTime' given input columns: [value]

How do I refer to all the elements in my JSON?

Answer

I believe I was able to solve this with the following:

import org.apache.spark.sql.functions.{col, to_timestamp}

val withEventTime = df.withColumn("event_time", to_timestamp(col("value.arrivalTime")))

Not sure why this worked and the other one didn't.
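
For completeness, here is a minimal end-to-end sketch of the working approach. It assumes the same topic, sample schema file, and arrivalTime field as in the question; the windowed count at the end is a hypothetical stand-in for the stateful logic (the original job isn't shown), and it also demonstrates value.* as a way to refer to all of the JSON fields at once:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, to_timestamp, window}

val spark = SparkSession.builder.appName("KafkaEventTime").getOrCreate()
import spark.implicits._

// Infer the JSON schema from a sample file, as in the question.
val schema = spark.read.json("sample-data/test.json").schema

val myStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "myTopic")
  .load()

// Parse the Kafka value bytes into a struct column named "value".
val parsed = myStream.select(from_json($"value".cast("string"), schema).alias("value"))

// Dot notation reaches an individual struct field; "value.*" flattens all
// of the JSON fields into top-level columns.
val withEventTime = parsed
  .withColumn("event_time", to_timestamp(col("value.arrivalTime")))
  .select($"event_time", $"value.*")

// Hypothetical stateful step: a windowed count with a watermark on event_time.
val counts = withEventTime
  .withWatermark("event_time", "10 minutes")
  .groupBy(window($"event_time", "5 minutes"))
  .count()

counts.writeStream
  .outputMode("update")
  .format("console")
  .start()
  .awaitTermination()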
