Creating Custom Windowing Function in Apache Beam

Question

I have a Beam pipeline that starts by reading multiple text files, where each line in a file represents a row that is inserted into Bigtable later in the pipeline. The scenario requires confirming that the number of rows extracted from each file matches the number of rows later inserted into Bigtable. For this, I am planning to develop a custom windowing strategy that assigns all lines from a single file to a single window, using the file name as the key passed to the windowing function.

Is there a code sample for creating a custom windowing function?

Answer

Although I changed my strategy for confirming the number of inserted rows, here is the code for creating a custom windowing strategy, for anyone interested in windowing elements read from a batch source (e.g. FileIO in a batch job):

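import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Assigns each element to a 1 ms window [timestamp, timestamp + 1 ms), so two elements
 * share a window exactly when they carry the same timestamp.
 */
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow> {

    private static final long serialVersionUID = -476922142925927415L;
    private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        Instant end = new Instant(timestamp.getMillis() + 1);
        IntervalWindow interval = new IntervalWindow(timestamp, end);
        LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
        return interval;
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        // FileWindows is stateless and does not override equals(), so compare by type;
        // this.equals(other) would reject a second, distinct FileWindows instance.
        return other instanceof FileWindows;
    }

    @Override
    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
        if (!this.isCompatible(other)) {
            throw new IncompatibleWindowException(other,
                String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
        }
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }
}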
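To make the effect of assignWindow concrete, here is a small stdlib-only Java model (no Beam dependency) of the same rule: a timestamp t maps to the half-open window [t, t + 1 ms), and elements are grouped and counted per window. The class and method names (OneMillisWindowModel, countPerWindow) and the sample timestamps are hypothetical, chosen for illustration only; the nested Window class merely stands in for Beam's IntervalWindow.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stdlib-only model of FileWindows.assignWindow (no Beam dependency). */
public final class OneMillisWindowModel {

    /** Half-open interval [startMillis, endMillis), standing in for Beam's IntervalWindow. */
    public static final class Window {
        public final long startMillis;
        public final long endMillis;

        public Window(long startMillis, long endMillis) {
            this.startMillis = startMillis;
            this.endMillis = endMillis;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) {
                return false;
            }
            Window w = (Window) o;
            return startMillis == w.startMillis && endMillis == w.endMillis;
        }

        @Override
        public int hashCode() {
            return Objects.hash(startMillis, endMillis);
        }

        @Override
        public String toString() {
            return "[" + startMillis + ", " + endMillis + ")";
        }
    }

    /** Same rule as FileWindows.assignWindow: the window is [t, t + 1 ms). */
    public static Window assignWindow(long timestampMillis) {
        return new Window(timestampMillis, timestampMillis + 1);
    }

    /** Counts how many element timestamps fall into each assigned window. */
    public static Map<Window, Integer> countPerWindow(long[] timestampsMillis) {
        Map<Window, Integer> counts = new LinkedHashMap<>();
        for (long t : timestampsMillis) {
            counts.merge(assignWindow(t), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three lines stamped 1000 (one file) and two lines stamped 2000 (another file).
        long[] timestamps = {1000L, 1000L, 1000L, 2000L, 2000L};
        System.out.println(countPerWindow(timestamps)); // prints {[1000, 1001)=3, [2000, 2001)=2}
    }
}
```

The model shows that the window carries no notion of "file" at all: grouping depends only on the timestamp each element was given upstream.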

It can then be used in the pipeline as follows:

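// lines: a PCollection<KV<String, String>> of (file name, line) pairs from an earlier read step
// (name assumed, step not shown); a ParDo cannot be applied to the Pipeline object directly.
// Duration here is org.joda.time.Duration.
lines
    .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
    .apply("Assign_Window_to_Each_Message", Window.<KV<String, String>>into(new FileWindows())
        .withAllowedLateness(Duration.standardMinutes(1))
        .discardingFiredPanes());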

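Keep in mind that you need to write AssignTimestampFn yourself so that each message carries a timestamp. Because FileWindows gives every distinct timestamp its own 1 ms window, each window corresponds to exactly one file only if all lines from the same file receive the same timestamp and lines from different files receive different timestamps.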
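The answer leaves open how AssignTimestampFn should choose the timestamp. One option, sketched below as an assumption rather than the author's method, is to derive it deterministically from the file name: hash the name with SHA-256 and keep the first 40 bits, which yields a value in [0, 2^40) ms (about 34.8 years after the epoch), well inside Beam's valid timestamp range. The class FileTimestamps, the method timestampMillisFor, and the sample file names are hypothetical. The sketch is stdlib-only so the mapping can be tested on its own.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Hypothetical helper for an AssignTimestampFn: maps a file name to a stable
 * timestamp in epoch milliseconds, so every line of one file gets the same value.
 */
public final class FileTimestamps {

    /** Exclusive upper bound: 2^40 ms, roughly 34.8 years after the epoch. */
    public static final long RANGE_MILLIS = 1L << 40;

    private FileTimestamps() {}

    /** Deterministic per file name; distinct names collide only with negligible probability. */
    public static long timestampMillisFor(String fileName) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(fileName.getBytes(StandardCharsets.UTF_8));
            long value = 0L;
            // Pack the first 5 digest bytes (40 bits) into a non-negative long.
            for (int i = 0; i < 5; i++) {
                value = (value << 8) | (digest[i] & 0xFFL);
            }
            return value; // in [0, RANGE_MILLIS)
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String a = "input-a.txt"; // hypothetical file names
        String b = "input-b.txt";
        System.out.println(timestampMillisFor(a) == timestampMillisFor(a)); // prints true
        System.out.println(timestampMillisFor(a) + " vs " + timestampMillisFor(b));
    }
}
```

Inside a DoFn<KV<String, String>, KV<String, String>>, the @ProcessElement method could then emit each element with `out.outputWithTimestamp(element, new Instant(FileTimestamps.timestampMillisFor(element.getKey())))`, assuming the key is the file name. Beam rejects output timestamps earlier than the input element's timestamp unless the DoFn allows skew via getAllowedTimestampSkew(); elements from a bounded file source typically carry the minimum timestamp, so moving them forward is fine. Hashing avoids needing the full list of files up front, at the cost of a tiny collision risk: if two names did collide, their lines would share one window.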
