在批处理管道中,如何为来自批处理源的数据分配时间戳,例如 Beam 管道中的 csv 文件 [英] In a batch pipeline how do I assign timestamps to data from the batch sources for example csv files in a Beam pipeline

查看:28
本文介绍了在批处理管道中,如何为来自批处理源的数据分配时间戳,例如 Beam 管道中的 csv 文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在批处理管道中从有界源(一个 csv 文件)读取数据,并希望根据存储为 csv 文件中的列的数据为元素分配时间戳.如何在 Apache Beam 管道中执行此操作?

I am reading data from a bounded source, a csv file, in a batch pipeline and would like to assign a timestamp to the elements based on data stored as a column in the csv file. How do I do this in a Apache Beam pipeline?

推荐答案

如果您的批处理数据源包含每个元素的基于事件的时间戳,例如您有一个包含元组 {'timestamp, ' 的点击事件userid','ClickedSomething'}.您可以将时间戳分配给管道中 DoFn 内的元素.

If your batched source of data contains an event based timestamp per element, for example you have a click event which has the tuple {'timestamp, 'userid','ClickedSomething'}. You can assign the timestamp to the element within a DoFn in your pipeline.

Java:

public void process(ProcessContext c){
     c.outputWithTimestamp(
         c.element(), 
         new Instant(c.element().getTimestamp()));
}

蟒蛇:

'AddEventTimestamps' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))

class AddTimestampDoFn(beam.DoFn):

  def process(self, element):
    # Extract the numeric Unix seconds-since-epoch timestamp to be
    # associated with the current log entry.
    unix_timestamp = extract_timestamp_from_log_entry(element)
    # Wrap and emit the current entry and new timestamp in a
    # TimestampedValue.
    yield beam.window.TimestampedValue(element, unix_timestamp)

timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())

[根据安东评论编辑]更多信息可以在@

More information can be found @

https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

这篇关于在批处理管道中,如何为来自批处理源的数据分配时间戳,例如 Beam 管道中的 csv 文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆