How to extract Google PubSub publish time in Apache Beam
Problem description
My goal is to access the PubSub message publish time, as recorded and set by Google PubSub, in Apache Beam (Dataflow).
PCollection<PubsubMessage> pubsubMsg =
    pipeline.apply("Read Messages From PubSub",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(pocOptions.getInputSubscription()));
The resulting PubsubMessage does not seem to contain the publish time as an attribute. I have also tried
.withTimestampAttribute("publish_time")
No luck either. What am I missing? Is it possible to extract the Google PubSub publish time in Dataflow?
Recommended answer
Java version:
PubsubIO reads the message from Pub/Sub and assigns the message publish time to the element as the record timestamp. You can therefore access it using ProcessContext.timestamp(). As an example:
p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            LOG.info("Message: " + c.element());
            // The element timestamp is the Pub/Sub publish time.
            LOG.info("Publish time: " + c.timestamp().toString());
            // Wall-clock time at which this element is processed.
            Date date = new Date();
            Long time = date.getTime();
            LOG.info("Processing time: " + new Instant(time).toString());
        }
    }));
I published a message a few minutes before starting the pipeline (to get a significant difference between event time and processing time), and the output with DirectRunner was:
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z
Minimal code here.
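To make the gap between the two logged timestamps concrete, here is a minimal Python sketch that computes the lag between publish time and processing time from the values in the log above. It uses only the standard library and does not touch Beam; `parse_utc` and `publish_lag` are hypothetical helper names introduced for illustration, not part of the original answer.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(ts: str) -> datetime:
    """Parse an ISO-8601 UTC timestamp such as 2019-03-27T09:57:07.005Z."""
    return datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)


def publish_lag(publish_time: str, processing_time: str) -> timedelta:
    """Return how long after publishing the element was processed."""
    return parse_utc(processing_time) - parse_utc(publish_time)


# Timestamps copied from the DirectRunner log above.
lag = publish_lag("2019-03-27T09:57:07.005Z", "2019-03-27T10:03:08.229Z")
print(lag.total_seconds())  # 361.224, i.e. about six minutes
```

The same subtraction could be done inside a DoFn, with the element timestamp as the publish time and the current wall-clock time as the processing time.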
Python version:
The timestamp can now be accessed through the DoFn.TimestampParam parameter of the process method (docs):
import datetime
import logging

import apache_beam as beam


class GetTimestampFn(beam.DoFn):
    """Prints element timestamp"""
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        # The element timestamp converts to seconds since the Unix epoch.
        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
        yield element
Note: credit to this answer for the date parsing.
Output:
INFO:root:>>> Element timestamp: 2019-08-12 20:16:53
Full code.
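One caveat with the snippet above: datetime.utcfromtimestamp returns a naive datetime and is deprecated as of Python 3.12. A possible timezone-aware alternative is sketched below, using only the standard library. `format_publish_time` is a hypothetical helper name, and the epoch value in the usage line is an assumption derived from the sample output above, not a value taken from the original answer.

```python
from datetime import datetime, timezone


def format_publish_time(epoch_seconds: float) -> str:
    """Format seconds since the Unix epoch as a timezone-aware UTC string."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S %Z")


# Assumed epoch value corresponding to the sample output above.
print(format_publish_time(1565641013.0))  # 2019-08-12 20:16:53 UTC
```

Inside the DoFn, this would be called as format_publish_time(float(timestamp)) in place of the utcfromtimestamp call.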