Sequential Execution in Apache Beam - Java SDK 2.18.0


Problem description

Hi, I have a couple of queries that I want to run and whose results I want to save in sequence, one after another, using Apache Beam. I've seen some similar questions but couldn't find an answer. I'm used to designing pipelines with Airflow, and I'm fairly new to Apache Beam. I'm using the Dataflow runner. Here's my code right now; I would like query2 to run only after the query1 results are saved to the corresponding table. How do I chain them?

    PCollection<TableRow> resultsStep1 = getData("Run Query 1",
            "Select * FROM basetable");

    resultsStep1.apply("Save Query1 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                    .to("resultsStep1")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

    PCollection<TableRow> resultsStep2 = getData("Run Query 2",
            "Select * FROM resultsStep1");

    resultsStep2.apply("Save Query2 data",
            BigQueryIO.writeTableRows()
                    .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                    .to("resultsStep2")
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    );

And here's my getData function definition:

private PCollection<TableRow> getData(final String taskName, final String query) {
    return pipeline.apply(taskName,
            BigQueryIO.readTableRowsWithSchema()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withCoder(TableRowJsonCoder.of()));
}

Edit (update): It turns out that "You can not sequence the completion of a BigQuery write with any other step of your pipeline."

Which I think is a big limitation for designing pipelines. Source: https://beam.apache.org/documentation/io/built-in/google-bigquery/#limitations

Recommended answer

You can use the Wait transform to do this. A contrived example is below:

 PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...));
 data.apply(Wait.on(firstWriteResults))
     // Windows of this intermediate PCollection will be processed no earlier than when
     // the respective window of firstWriteResults closes.
     .apply(ParDo.of(...write to second database...));
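Adapted to the question's pipeline, the pattern would look roughly like the sketch below. This is a hedged sketch, not a confirmed solution: it assumes the first write uses streaming inserts (`withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)`), because `WriteResult.getFailedInserts()` is only populated in that mode and batch loads in SDK 2.18 expose no completion signal at all, which is exactly the limitation noted in the edit above. The table names, schemas, and step names are taken from the question.

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.Wait;

// Capture the WriteResult of the first write instead of discarding it.
WriteResult firstWrite = resultsStep1.apply("Save Query1 data",
        BigQueryIO.writeTableRows()
                .withSchema(BigQueryUtils.toTableSchema(resultsStep1.getSchema()))
                .to("resultsStep1")
                // Assumption: streaming inserts, so getFailedInserts() carries a signal.
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

resultsStep2
        // Hold back query2's rows until the corresponding window of the
        // signal PCollection from the first write has closed.
        .apply("Wait for Query1 write", Wait.on(firstWrite.getFailedInserts()))
        .apply("Save Query2 data",
                BigQueryIO.writeTableRows()
                        .withSchema(BigQueryUtils.toTableSchema(resultsStep2.getSchema()))
                        .to("resultsStep2")
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
```

Note that Wait.on only delays the processing of resultsStep2's elements; the BigQuery read for query2 is still issued by the runner on its own schedule, so this does not make the second *query* observe the first write's output.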

You can find more details in the API documentation here: https://beam.apache.org/releases/javadoc/2.17.0/index.html?org/apache/beam/sdk/transforms/Wait.html

