如何在 Apache Beam 中提取 Google PubSub 发布时间 [英] How to extract Google PubSub publish time in Apache Beam
问题描述
我的目标是能够访问由 Google PubSub 在 Apache Beam (Dataflow) 中记录和设置的 PubSub 消息发布时间.
My goal is to be able to access PubSub message Publish Time as recorded and set by Google PubSub in Apache Beam (Dataflow).
PCollection<PubsubMessage> pubsubMsg
= pipeline.apply("Read Messages From PubSub",
PubsubIO.readMessagesWithAttributes()
.fromSubscription(pocOptions.getInputSubscription()));
似乎不包含一个作为属性.我试过了
Does not seem to contain one as an attribute. I have tried
.withTimestampAttribute("publish_time")
也不走运.我错过了什么?是否可以在数据流中提取 Google PubSub 发布时间?
No luck either. What am I missing? Is it possible to extract Google PubSub publish time in dataflow?
推荐答案
Java 版本:
PubsubIO 将从 Pub/Sub 读取消息并将消息发布时间分配给元素作为记录时间戳.因此,您可以使用 ProcessContext.timestamp()
.举个例子:
PubsubIO will read the message from Pub/Sub and assign the message publish time to the element as the record timestamp. Therefore, you can access it using ProcessContext.timestamp()
. As an example:
p
.apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
.apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("Message: " + c.element());
LOG.info("Publish time: " + c.timestamp().toString());
Date date= new Date();
Long time = date.getTime();
LOG.info("Processing time: " + new Instant(time).toString());
}
}));
我提前发布了一条消息(在事件和处理时间之间有显着差异),并且 DirectRunner 的输出是:
I published a message a little bit ahead (to have a significant difference between event and processing time) and output with DirectRunner was:
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z
最小代码这里
Python 版本:
现在可以通过 process
方法的 DoFn.TimestampParam
访问时间戳(文档):
Now the timestamp can be accessed through DoFn.TimestampParam
of the process
method (docs):
class GetTimestampFn(beam.DoFn):
"""Prints element timestamp"""
def process(self, element, timestamp=beam.DoFn.TimestampParam):
timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
yield element
注意:日期解析感谢这个答案.
输出:
INFO:root:>>> Element timestamp: 2019-08-12 20:16:53
完整代码
这篇关于如何在 Apache Beam 中提取 Google PubSub 发布时间的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!