External API call in Apache Beam Dataflow

Question

I have a use case where I read newline-delimited JSON elements stored in Google Cloud Storage and process each JSON element. While processing each element, I have to call an external API for de-duplication, i.e. to check whether that JSON element has been seen before. I'm doing a ParDo with a DoFn on each JSON element.

I haven't found any online tutorial that explains how to call an external API endpoint from an Apache Beam DoFn running on Dataflow.

I'm using the Beam Java SDK. Some of the tutorials I studied mentioned using startBundle and finishBundle, but I'm not clear on how to use them.

Recommended answer

If you need to check every JSON record for duplicates in external storage, you can still use a DoFn for that. Several annotations, such as @Setup, @StartBundle, @FinishBundle, and @Teardown, can be used to annotate lifecycle methods in your DoFn.

For example, if you need to instantiate a client object to send requests to your external database, you might want to do this in a @Setup method (which plays a role similar to a POJO constructor, but runs on the worker) and then use this client object in your @ProcessElement method.

Let's consider a simple example:

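```java
import org.apache.beam.sdk.transforms.DoFn;

// Record and MyClient are placeholders for your own record type and database client.
static class MyDoFn extends DoFn<Record, Record> {

    // Instance field rather than static, so each DoFn instance owns its client.
    // transient: the client is not serialized with the DoFn; @Setup creates it on the worker.
    private transient MyClient client;

    @Setup
    public void setup() {
        // Called once per DoFn instance, before it processes any bundle.
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates; emit only records not seen before
        if (!client.isRecordExist(r.id())) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        // Best-effort cleanup: Beam does not guarantee that @Teardown is always called.
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
```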
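
The example keeps the client in a `transient` field and creates it in `@Setup` instead of a constructor. The reason is that a Beam runner serializes the DoFn when the pipeline is built and deserializes it on each worker, and a transient field comes back as `null` after that trip. The Beam-free sketch below imitates this with plain Java serialization; `FakeClient`, `ClientHolder`, and `SerializationDemo` are hypothetical names, and the byte-array round trip is only an approximation of what a runner does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a network client; deliberately NOT Serializable. */
final class FakeClient {
    final String host;

    FakeClient(String host) {
        this.host = host;
    }
}

/** Models a DoFn: the object itself is Serializable, the client field is transient. */
final class ClientHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    transient FakeClient client;

    // Mirrors @Setup: runs on the worker, after the object has been deserialized.
    void setup() {
        client = new FakeClient("host");
    }
}

final class SerializationDemo {
    /** Serializes and deserializes an object, roughly like shipping a DoFn to a worker. */
    static ClientHolder roundTrip(ClientHolder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original); // transient fields are skipped here
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ClientHolder) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A client assigned before serialization is gone on the "worker" copy until `setup()` runs again. A `static` field would avoid the null, but it would be shared by every DoFn instance in the same JVM, so one instance's @Teardown could close the client while others are still using it.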

Also, to avoid making a remote call for every record, you can collect the records of a bundle in an internal buffer (Beam splits input data into bundles) and check for duplicates in batches, if your client supports that. For this purpose, you can use methods annotated with @StartBundle and @FinishBundle, which are called right before and right after each bundle is processed, respectively.
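
A minimal, Beam-free sketch of this buffering idea, assuming your client offers a batch lookup. `BatchDedupClient.findExisting`, `BufferingDeduper`, and `maxBatchSize` are hypothetical names, not Beam or client-library APIs. The three lifecycle methods only mirror where the @StartBundle, @ProcessElement, and @FinishBundle code would go, and `emitted()` stands in for the DoFn's output.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

/** Hypothetical batch API: returns the subset of the given IDs that already exist. */
interface BatchDedupClient {
    Set<String> findExisting(Collection<String> ids);
}

/**
 * Beam-free model of a DoFn that buffers record IDs per bundle and checks
 * them against the external store in batches instead of one call per record.
 */
final class BufferingDeduper {
    private final BatchDedupClient client;
    private final int maxBatchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    BufferingDeduper(BatchDedupClient client, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        this.client = client;
        this.maxBatchSize = maxBatchSize;
    }

    // Mirrors @StartBundle: every bundle starts with an empty buffer.
    void startBundle() {
        buffer.clear();
    }

    // Mirrors @ProcessElement: buffer the ID and flush once the batch is full.
    void processElement(String id) {
        buffer.add(id);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Mirrors @FinishBundle: flush what is left so no record is dropped.
    void finishBundle() {
        flush();
    }

    // One remote call per batch; only IDs unknown to the store are emitted.
    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        Set<String> existing = client.findExisting(new ArrayList<>(buffer));
        for (String id : buffer) {
            if (!existing.contains(id)) {
                emitted.add(id);
            }
        }
        buffer.clear();
    }

    /** Stands in for the DoFn's output. */
    List<String> emitted() {
        return emitted;
    }
}
```

When carrying this into a real DoFn, note that `ProcessContext.output` gives an element the timestamp and window of the current input element, while output from @FinishBundle must go through `FinishBundleContext.output(element, timestamp, window)`. That is harmless in the global window produced by a bounded read from Cloud Storage, but with windowed data you would store each buffered record's timestamp and window alongside it. Like the original example, the sketch only checks against the external store, so two copies of the same new record inside one batch would both be emitted.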

For more complicated examples, I'd recommend taking a look at the sink implementations in various Beam IOs, such as KinesisIO.
