Creating a Custom Windowing Function in Apache Beam
Question
I have a Beam pipeline that starts by reading multiple text files, where each line in a file represents a row that is inserted into Bigtable later in the pipeline. The scenario requires confirming that the number of rows extracted from each file matches the number of rows later inserted into Bigtable. For this, I am planning to develop a custom windowing strategy so that all lines from a single file are assigned to a single window, using the file name as the key passed to the windowing function.
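Independent of how the windows are built, the end goal is a per-file comparison: count the lines extracted from each file and compare that number with the rows inserted for the same file. The sketch below is a plain-Java model of that check, not Beam code. The class and method names are hypothetical, and it assumes each line is paired with the name of the file it came from.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java model (not Beam code) of the per-file row-count check. */
final class PerFileCountCheck {

    /** Counts lines per file name; each entry is (fileName, line) and each line is one row. */
    static Map<String, Long> countPerFile(List<Map.Entry<String, String>> linesByFile) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, String> line : linesByFile) {
            counts.merge(line.getKey(), 1L, Long::sum);
        }
        return counts;
    }

    /** Returns, in sorted order, the files whose extracted and inserted counts differ. */
    static List<String> mismatchedFiles(Map<String, Long> extracted, Map<String, Long> inserted) {
        TreeSet<String> files = new TreeSet<>(extracted.keySet());
        files.addAll(inserted.keySet());
        List<String> mismatched = new ArrayList<>();
        for (String file : files) {
            // A file missing from one side counts as zero rows on that side.
            long e = extracted.getOrDefault(file, 0L);
            long i = inserted.getOrDefault(file, 0L);
            if (e != i) {
                mismatched.add(file);
            }
        }
        return mismatched;
    }
}
```

In an actual pipeline, the extracted side could be computed with Beam's `Count.perKey()` over `KV<fileName, line>` elements. How to obtain the inserted side depends on the Bigtable write path, which the question does not describe.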
Is there a code sample for creating a custom windowing function?
Recommended Answer
Although I ended up changing my strategy for confirming the number of inserted rows, here is the code for a custom windowing strategy, for anyone interested in windowing elements read from a batch source (e.g. FileIO) in a batch job:
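import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Assigns each element to the 1 ms window [timestamp, timestamp + 1 ms). */
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow> {
    private static final long serialVersionUID = -476922142925927415L;
    private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        // The window depends only on the timestamp: equal timestamps share a window.
        Instant end = new Instant(timestamp.getMillis() + 1);
        IntervalWindow interval = new IntervalWindow(timestamp, end);
        // Logs once per element, which can be noisy for large inputs.
        LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
        return interval;
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        // FileWindows has no configuration, so any instance is compatible with any other.
        return other instanceof FileWindows;
    }

    @Override
    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
        if (!isCompatible(other)) {
            throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
        }
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }
}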
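Note that `FileWindows` never sees the file name. `assignWindow` only receives the element's timestamp and returns the half-open interval [t, t + 1 ms), so two lines end up in the same window exactly when they carry the same timestamp. The plain-Java model below (not Beam code; names are hypothetical) mirrors that rule, representing a window as a `{start, end}` pair of epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of FileWindows.assignWindow; not Beam code. */
final class FileWindowsModel {

    /** Returns {start, end} of the half-open window [t, t + 1 ms) that FileWindows assigns. */
    static long[] assignWindow(long timestampMillis) {
        return new long[] {timestampMillis, timestampMillis + 1};
    }

    /** True if t falls inside the half-open window [start, end). */
    static boolean contains(long[] window, long t) {
        return t >= window[0] && t < window[1];
    }

    /** Groups (timestamp, line) pairs by window; key k stands for the window [k, k + 1 ms). */
    static Map<Long, List<String>> groupByWindow(List<Map.Entry<Long, String>> timestampedLines) {
        Map<Long, List<String>> byWindow = new TreeMap<>();
        for (Map.Entry<Long, String> e : timestampedLines) {
            long start = assignWindow(e.getKey())[0];
            byWindow.computeIfAbsent(start, k -> new ArrayList<>()).add(e.getValue());
        }
        return byWindow;
    }
}
```

Because the windows are only 1 ms wide, the per-file grouping succeeds only if the timestamps are assigned per file beforehand.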
It can then be used in the pipeline as follows:
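// `lines` is a placeholder name for the PCollection<KV<String, String>> read earlier
// (e.g. via FileIO); a ParDo cannot be applied directly to the Pipeline object itself.
lines
    .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
    .apply("Assign_Window_to_Each_Message", Window.<KV<String, String>>into(new FileWindows())
        .withAllowedLateness(Duration.standardMinutes(1)) // org.joda.time.Duration
        .discardingFiredPanes());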
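Keep in mind that you will need to write AssignTimestampFn() yourself so that each message carries a timestamp. Because FileWindows computes the window from the element's timestamp alone, all lines from one file must receive the same timestamp, and different files must receive different timestamps, for each file to end up in its own window.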
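The answer leaves AssignTimestampFn unwritten. One hypothetical way to meet the requirement is to derive the timestamp deterministically from the file name. This assumes the key of each `KV<String, String>` is the file name, which the answer does not state. Inside a real DoFn, the element would then be emitted with `outputWithTimestamp(element, new Instant(timestampForFile(key)))`. The plain-Java helper below uses `String.hashCode()`, which is an assumption of this sketch and can collide. It also includes a check for such collisions.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper (not part of the answer): one deterministic timestamp per file name. */
final class FileTimestamps {

    /**
     * Maps a file name to epoch millis in [0, 2^32). The same name always yields the
     * same value; distinct names can collide because String.hashCode() can collide.
     */
    static long timestampForFile(String fileName) {
        return Integer.toUnsignedLong(fileName.hashCode());
    }

    /** Groups file names by the timestamp they would receive, ordered by timestamp. */
    static Map<Long, List<String>> filesByTimestamp(Collection<String> fileNames) {
        Map<Long, List<String>> byTimestamp = new TreeMap<>();
        for (String name : new TreeSet<>(fileNames)) {
            byTimestamp.computeIfAbsent(timestampForFile(name), k -> new ArrayList<>()).add(name);
        }
        return byTimestamp;
    }

    /** True if two distinct files would share a timestamp, and therefore a FileWindows window. */
    static boolean hasCollision(Collection<String> fileNames) {
        return filesByTimestamp(fileNames).values().stream().anyMatch(names -> names.size() > 1);
    }
}
```

For example, `"Aa"` and `"BB"` have the same `hashCode()`, so those two files would be merged into one window. If the full list of files is known up front, assigning each file its index in the sorted list avoids collisions entirely.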