在批处理管道中,如何将时间戳分配给批处理源中的数据,例如Beam管道中的csv文件 [英] In a batch pipeline how do I assign timestamps to data from the batch sources for example csv files in a Beam pipeline
问题描述
我正在批处理管道中从有界源(csv文件)中读取数据,并想根据在csv文件中存储为列的数据为元素分配时间戳.如何在Apache Beam管道中执行此操作?
I am reading data from a bounded source, a csv file, in a batch pipeline and would like to assign a timestamp to the elements based on data stored as a column in the csv file. How do I do this in a Apache Beam pipeline?
推荐答案
如果批量数据源中每个元素都包含基于事件的时间戳,例如,您有一个具有元组{'timestamp, 'userid','ClickedSomething'}
的click事件.您可以将时间戳记分配给管道中DoFn
中的元素.
If your batched source of data contains an event based timestamp per element, for example you have a click event which has the tuple {'timestamp, 'userid','ClickedSomething'}
. You can assign the timestamp to the element within a DoFn
in your pipeline.
Java:
public void process(ProcessContext c){
c.outputWithTimestamp(
c.element(),
new Instant(c.element().getTimestamp()));
}
Python:
'AddEventTimestamps' >> beam.Map(
lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
[从Beam指南编辑非lambda Python示例:]
class AddTimestampDoFn(beam.DoFn):
def process(self, element):
# Extract the numeric Unix seconds-since-epoch timestamp to be
# associated with the current log entry.
unix_timestamp = extract_timestamp_from_log_entry(element)
# Wrap and emit the current entry and new timestamp in a
# TimestampedValue.
yield beam.window.TimestampedValue(element, unix_timestamp)
timestamped_items = items | 'timestamp' >> beam.ParDo(AddTimestampDoFn())
[根据安东评论进行编辑] 有关更多信息,请参见@
More information can be found @
这篇关于在批处理管道中,如何将时间戳分配给批处理源中的数据,例如Beam管道中的csv文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!