How to extract Google PubSub publish time in Apache Beam

Question

My goal is to access the publish time of a PubSub message, as recorded and set by Google PubSub, in Apache Beam (Dataflow).

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

The messages do not seem to contain it as an attribute. I have also tried

 .withTimestampAttribute("publish_time")

No luck with that either. What am I missing? Is it possible to extract the Google PubSub publish time in Dataflow?

Recommended answer

Java version:

PubsubIO reads the message from Pub/Sub and, when no timestamp attribute is configured, assigns the message publish time to the element as its record timestamp. Therefore, you can access it with ProcessContext.timestamp(). As an example:

// Assumes p is the Pipeline, subscription is the subscription path,
// LOG is the class's logger, and Instant is org.joda.time.Instant.
p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            LOG.info("Message: " + c.element());
            // No timestamp attribute was set, so the element timestamp is the publish time.
            LOG.info("Publish time: " + c.timestamp().toString());
            // Wall-clock time at which the element is processed.
            LOG.info("Processing time: " + Instant.now().toString());
        }
    }));

I published a message a while in advance (to get a noticeable difference between event time and processing time), and the output with the DirectRunner was:

Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z
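As a quick check on the output above, the gap between event time and processing time can be computed from the two logged strings. This is a minimal sketch using only the Python standard library; parse_rfc3339 is a hypothetical helper name, and it assumes the millisecond-precision "...Z" format that the log lines show.

```python
from datetime import datetime


def parse_rfc3339(value: str) -> datetime:
    """Parse a UTC timestamp such as '2019-03-27T09:57:07.005Z' (hypothetical helper)."""
    # Since Python 3.7, %z accepts a literal 'Z' as UTC; %f accepts 1-6 fractional digits.
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")


publish_time = parse_rfc3339("2019-03-27T09:57:07.005Z")     # "Publish time" log line
processing_time = parse_rfc3339("2019-03-27T10:03:08.229Z")  # "Processing time" log line
lag = processing_time - publish_time
print(lag)  # 0:06:01.224000
```

Both values are timezone-aware, so the subtraction is well defined and yields a timedelta of about six minutes, which is the delay deliberately introduced by publishing ahead of time.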

Minimal code here
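To see why .withTimestampAttribute("publish_time") in the question did not help, it can be useful to model the timestamp rule described above: with no timestamp attribute, the publish time is used; with one, the value is looked up in the message attributes, where publish_time does not appear because it is a field of the message rather than an attribute. The sketch below is a simplified model, not Beam's implementation; element_timestamp is a hypothetical name, and both the error on a missing attribute and the milliseconds-since-epoch attribute format are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Mapping, Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def element_timestamp(publish_time: datetime,
                      attributes: Mapping[str, str],
                      timestamp_attribute: Optional[str] = None) -> datetime:
    """Simplified model (not Beam's code) of how a Pub/Sub read picks an element timestamp."""
    if timestamp_attribute is None:
        # Default: the publish time recorded by Pub/Sub becomes the element timestamp.
        return publish_time
    if timestamp_attribute not in attributes:
        # publish_time is a field of the message, not an attribute, so a lookup
        # by that name finds nothing; this sketch treats that as an error.
        raise KeyError(f"message has no attribute {timestamp_attribute!r}")
    # Assumption for this sketch: the attribute holds milliseconds since the Unix epoch.
    return EPOCH + timedelta(milliseconds=int(attributes[timestamp_attribute]))


published = datetime(2019, 3, 27, 9, 57, 7, 5000, tzinfo=timezone.utc)
print(element_timestamp(published, {}))  # 2019-03-27 09:57:07.005000+00:00
try:
    element_timestamp(published, {}, timestamp_attribute="publish_time")
except KeyError as err:
    print("lookup failed:", err)
```

The practical takeaway matches the answer: leave the timestamp attribute unset and read the element timestamp instead.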

Python version:

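When reading with ReadFromPubSub without a timestamp_attribute, the element timestamp is likewise the message publish time. It can now be accessed through the DoFn.TimestampParam default argument of the process method (docs):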

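import datetime
import logging

import apache_beam as beam

class GetTimestampFn(beam.DoFn):
  """Logs the element timestamp and passes the element through unchanged."""
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # float(timestamp) gives seconds since the Unix epoch
    timestamp_utc = datetime.datetime.fromtimestamp(float(timestamp), tz=datetime.timezone.utc)
    logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
    yield element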

Note: thanks to this answer for the date parsing.

Output:

INFO:root:>>> Element timestamp: 2019-08-12 20:16:53
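The conversion inside GetTimestampFn can be exercised without building a pipeline. In this minimal sketch, format_element_timestamp is a hypothetical helper, and a plain float stands in for float(timestamp) from beam.DoFn.TimestampParam; the input value is chosen for illustration so that it formats like the output above.

```python
from datetime import datetime, timezone


def format_element_timestamp(seconds_since_epoch: float) -> str:
    """Format epoch seconds the way GetTimestampFn logs them (UTC, whole seconds)."""
    timestamp_utc = datetime.fromtimestamp(seconds_since_epoch, tz=timezone.utc)
    # %S drops the fractional part, so sub-second precision is not shown.
    return timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")


# A plain float stands in for float(timestamp) from beam.DoFn.TimestampParam.
print(format_element_timestamp(1565641013.0))  # 2019-08-12 20:16:53
```

Passing tz=timezone.utc keeps the result independent of the worker's local time zone, which matters when comparing logs from different machines.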

Full code
