External API call in Apache Beam Dataflow


Question

I have a use case where I read newline-delimited JSON elements stored in Google Cloud Storage and start processing each JSON. While processing each JSON, I have to call an external API for de-duplication, i.e. to check whether that JSON element was discovered previously. I'm doing a ParDo with a DoFn on each JSON.

I haven't seen any online tutorial showing how to call an external API endpoint from an Apache Beam DoFn on Dataflow.

I'm using the Java SDK of Beam. Some of the tutorials I studied mention using startBundle and finishBundle, but I'm not clear on how to use them.

Answer

If you need to check for duplicates in external storage for every JSON record, then you can still use a DoFn for that. There are several annotations, like @Setup, @StartBundle, @FinishBundle, etc., that can be used to annotate methods in your DoFn.

For example, if you need to instantiate a client object to send requests to your external database, then you might want to do this in the @Setup method (like a POJO constructor) and then use this client object in your @ProcessElement method.

Let's consider a simple example:

static class MyDoFn extends DoFn<Record, Record> {

    static transient MyClient client;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates
        if (!client.isRecordExist(r.id())) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
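
For completeness, here's a minimal sketch of how this DoFn could be wired into a pipeline that reads newline-delimited JSON from Cloud Storage. The bucket path and the ParseJsonFn that turns a JSON line into a Record are assumptions for illustration, not part of the original answer:

Pipeline p = Pipeline.create(options);
p.apply("ReadJsonLines", TextIO.read().from("gs://my-bucket/input/*.json"))
 .apply("ParseJson", ParDo.of(new ParseJsonFn())) // hypothetical String -> Record parser
 .apply("Deduplicate", ParDo.of(new MyDoFn()));
p.run().waitUntilFinish();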

Also, to avoid making a remote call for every record, you can batch a bundle's records into an internal buffer (Beam splits input data into bundles) and check for duplicates in batch mode, if your client supports this. For this purpose, you can use methods annotated with @StartBundle and @FinishBundle, which are called right before and right after a Beam bundle is processed, respectively; see the sketch below.
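
A minimal sketch of that batching pattern, reusing the hypothetical MyClient from above and assuming an invented batch method existingIds(Set<String>) that returns the IDs already stored externally (it also assumes Record.id() returns a String, and needs ValueInSingleWindow, PaneInfo, BoundedWindow and the usual java.util imports). Buffered elements keep their timestamp and window so they can be re-emitted correctly from finishBundle():

static class BatchedDedupFn extends DoFn<Record, Record> {

    static transient MyClient client;

    // Elements of the current bundle, buffered with their timestamp and window.
    private transient List<ValueInSingleWindow<Record>> buffer;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @StartBundle
    public void startBundle() {
        buffer = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        // No remote call here, just buffer the element.
        buffer.add(ValueInSingleWindow.of(
            c.element(), c.timestamp(), window, PaneInfo.NO_FIRING));
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext c) {
        // One remote call per bundle instead of one per record.
        Set<String> ids = new HashSet<>();
        for (ValueInSingleWindow<Record> v : buffer) {
            ids.add(v.getValue().id());
        }
        Set<String> alreadySeen = client.existingIds(ids); // hypothetical batch API
        for (ValueInSingleWindow<Record> v : buffer) {
            if (!alreadySeen.contains(v.getValue().id())) {
                c.output(v.getValue(), v.getTimestamp(), v.getWindow());
            }
        }
        buffer = null;
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}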

For more complicated examples, I'd recommend taking a look at the Sink implementations in different Beam IOs, like KinesisIO, for instance.
