Spark Streaming:从Kafka读取JSON并添加event_time [英] Spark Streaming: Read JSON from Kafka and add event_time
本文介绍了Spark Streaming:从Kafka读取JSON并添加event_time的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试编写一个从Kafka读取的有状态Spark结构化流作业.作为要求的一部分,我需要在流中添加"event_time"作为附加列.我正在尝试这样的事情:
I am trying to write a Stateful Spark Structured Streaming job that reads from Kafka. As part of the requirement I need to add 'event_time' to my stream as an additional column. I am trying something like this:
val schema = spark.read.json("sample-data/test.json").schema
val myStream = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "myTopic")
.load()
val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))
val withEventTime = df.selectExpr("*", "cast (value.arrivalTime as timestamp) as event_time")
但我不断收到消息:
无法解析给定的输入列"arrivalTime":[值]
如何引用JSON中的所有元素?
How do I refer to all the elements in my JSON?
推荐答案
我相信我可以使用以下方法解决此问题:
I believe I was able to solve this using this:
val withEventTime = df.withColumn("event_time",to_timestamp(col("value. arrivalTime")))
不确定为什么会起作用&不是另一个.
Not sure why this worked & not the other one.
这篇关于Spark Streaming:从Kafka读取JSON并添加event_time的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文