Creating a Custom Windowing Function in Apache Beam


Question

I have a Beam pipeline that starts by reading multiple text files, where each line in a file represents a row that is inserted into Bigtable later in the pipeline. The scenario requires confirming that the number of rows extracted from each file matches the number of rows later inserted into Bigtable. To do this, I am planning to develop a custom windowing strategy so that lines from a single file are assigned to a single window, using the file name as the key passed to the windowing function.

Is there a code sample for creating custom windowing functions?

Recommended answer

Although I changed my strategy for confirming the number of inserted rows, for anyone interested in windowing elements read from a batch source (e.g. FileIO in a batch job), here is the code for creating a custom windowing strategy:

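import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Assigns each element to a 1 ms window that starts at the element's timestamp,
// so elements that share a timestamp end up in the same window.
public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow> {

    private static final long serialVersionUID = -476922142925927415L;
    private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        // The window covers [timestamp, timestamp + 1 ms).
        Instant end = new Instant(timestamp.getMillis() + 1);
        IntervalWindow interval = new IntervalWindow(timestamp, end);
        LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
        return interval;
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        // FileWindows is stateless, so any two instances are interchangeable.
        return other instanceof FileWindows;
    }

    @Override
    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
        if (!this.isCompatible(other)) {
            throw new IncompatibleWindowException(
                other,
                String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
        }
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }
}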

It can then be used in the pipeline as follows:

p
 .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
 .apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
  .withAllowedLateness(Duration.standardMinutes(1))
  .discardingFiredPanes());

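Keep in mind that you will need to write AssignTimestampFn yourself so that each message carries a timestamp. Because FileWindows places an element in the 1 ms window that starts at its timestamp, all lines from the same file must receive the same timestamp to land in the same window.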
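The answer does not show AssignTimestampFn, so the following is only a minimal sketch of one way its core logic might look, written in plain Java without Beam dependencies so it can run on its own. The class FileTimestamps, its method names, and the file names are hypothetical and not part of the original answer. It assumes a CRC32 checksum of the file name is an acceptable per-file timestamp; two different file names can collide on the same checksum, in which case their lines would share a window.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper: derives one stable timestamp per file name. */
public class FileTimestamps {

    /**
     * Maps a file name to epoch milliseconds in the range [0, 2^32).
     * The same name always yields the same value; distinct names may collide.
     */
    public static long timestampMillisFor(String fileName) {
        CRC32 crc = new CRC32();
        crc.update(fileName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Mirrors FileWindows.assignWindow: the window is [t, t + 1 ms). */
    public static long[] windowBoundsFor(long timestampMillis) {
        return new long[] {timestampMillis, timestampMillis + 1};
    }

    public static void main(String[] args) {
        // Hypothetical file names, used only for illustration.
        String[] files = {"input/a.txt", "input/a.txt", "input/b.txt"};
        for (String file : files) {
            long t = timestampMillisFor(file);
            long[] window = windowBoundsFor(t);
            System.out.println(file + " -> window [" + window[0] + ", " + window[1] + ")");
        }
    }
}
```

Inside an actual DoFn over KV<String, String> elements (file name, line), the same value could be wrapped in an Instant and emitted with outputWithTimestamp, so that every line of a file carries that file's timestamp before FileWindows is applied.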
