Can I use setWorkerCacheMb in Apache Beam 2.0+?


Problem description

My Dataflow job (using Java SDK 2.1.0) is quite slow: it is going to take more than a day to process just 50GB. I simply pull a whole table from BigQuery (50GB) and join it with one CSV file from GCS (100+MB).

https://cloud.google.com/dataflow/model/group-by-key
I use sideInputs to perform the join (the latter approach in the documentation above), while I think using CoGroupByKey would be more efficient; however, I'm not sure that is the only reason my job is so slow.
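
For reference, here is a minimal sketch of the CoGroupByKey flavor of this join. It is illustrative only, not code from the question: keyedEvents and keyedAccounts are assumed names for the two collections already keyed by account_uid as KV<String, TableRow>, and it needs the classes from org.apache.beam.sdk.transforms.join.

final TupleTag<TableRow> eventTag = new TupleTag<TableRow>();
final TupleTag<TableRow> accountTag = new TupleTag<TableRow>();

// Group event rows and account rows that share the same account_uid key.
PCollection<KV<String, CoGbkResult>> grouped = KeyedPCollectionTuple
        .of(eventTag, keyedEvents)        // assumed: PCollection<KV<String, TableRow>> of events
        .and(accountTag, keyedAccounts)   // assumed: PCollection<KV<String, TableRow>> of accounts
        .apply(CoGroupByKey.<String>create());

PCollection<TableRow> joined = grouped.apply("Join account_id",
        ParDo.of(new DoFn<KV<String, CoGbkResult>, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                CoGbkResult result = c.element().getValue();
                // At most one account row is expected per account_uid; null when unmatched.
                Iterator<TableRow> accounts = result.getAll(accountTag).iterator();
                TableRow account = accounts.hasNext() ? accounts.next() : null;
                for (TableRow event : result.getAll(eventTag)) {
                    // Copy before setting fields to avoid mutating the input element.
                    TableRow out = event.clone();
                    if (account != null) {
                        out.set("account_id", account.get("account_id"));
                    }
                    c.output(out);
                }
            }
        }));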

From what I found by googling, the cache for side inputs defaults to 100MB, and I assume mine is slightly over that limit, so each worker keeps re-reading the side input. To improve this, I thought I could use the setWorkerCacheMb method to increase the cache size.

However, it looks like DataflowPipelineOptions does not have this method, and DataflowWorkerHarnessOptions is hidden. Simply passing --workerCacheMb=200 in -Dexec.args results in

An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]

How can I use this option? Thanks.
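
For context, one way the flag can typically be made recognizable is sketched below. It assumes that DataflowWorkerHarnessOptions (shipped with the Dataflow runner) declares the workerCacheMb property in this SDK version; this is not code from the question.

// Option A: let the custom options interface extend the (normally hidden) harness options,
// so PipelineOptionsFactory knows about --workerCacheMb when parsing args.
public interface MyOptions extends DataflowPipelineOptions, DataflowWorkerHarnessOptions {
    // ...existing custom options...
}

// Option B: keep MyOptions unchanged, register the harness options before parsing,
// and set the cache size programmatically.
PipelineOptionsFactory.register(DataflowWorkerHarnessOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(200);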

My pipeline:

MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> rows = p.apply("Read from BigQuery",
        BigQueryIO.read().from("project:MYDATA.events"));

// Read account file
PCollection<String> accounts = p.apply("Read from account file",
        TextIO.read().from("gs://my-bucket/accounts.csv")
                .withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String line = c.element();
                CSVParser csvParser = new CSVParser();
                String[] fields = csvParser.parseLine(line);

                TableRow row = new TableRow();
                row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
                c.output(row);
            }
        }));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                c.output(KV.of(uid, row));
            }
        }));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());

// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();

                if (row.containsKey("account_uid") && row.get("account_uid") != null) {
                    String uid = (String) row.get("account_uid");
                    TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
                    if (accRow == null) {
                        LOG.warn("accRow null, {}", row.toPrettyString());
                    } else {
                        row = row.set("account_id", accRow.get("account_id"));
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidAccountView));

// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
        .to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
                StaticValueProvider.of("deadletter_bucket")))
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableRow apply(TableRow row) {
                return row;
            }
        }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

p.run();

Historically my system has had two user identifiers, a new one (account_id) and an old one (account_uid). Now I need to retroactively add the new account_id to our event data stored in BigQuery, because the old data only has the old account_uid. The accounts table (which maps account_uid to account_id) has already been exported as CSV and stored in GCS.

The last function, TableRefPartition, just stores data into the corresponding BigQuery partition based on each event's timestamp. The job is still running (2017-10-30_22_45_59-18169851018279768913), and the bottleneck looks like the "Join account_id" part. Its throughput (xxx elements/s) goes up and down according to the graph, and the estimated size of the side input is 106MB.
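
TableRefPartition itself is not shown in the question. Purely as an illustration of the partition-routing idea, a function of this kind could look roughly like the sketch below; the epoch-millis "timestamp" field and the target table name are assumptions, not taken from the original code.

// Illustrative sketch only; TableRefPartition is the poster's own class and is not shown.
// Routes each row to a day partition of a fixed table using BigQuery's "$yyyyMMdd"
// partition decorator. Assumes each event row carries an epoch-millis "timestamp" field.
class PartitionByEventDay
        implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
    private static final long serialVersionUID = 1L;
    private static final DateTimeFormatter DAY =
            DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC();

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        TableRow row = input.getValue();
        long millis = ((Number) row.get("timestamp")).longValue();
        String day = DAY.print(millis);
        return new TableDestination("MYDATA.events$" + day, null);
    }
}

// Used in place of .to(new TableRefPartition(...)):
// BigQueryIO.writeTableRows().to(new PartitionByEventDay())...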

If switching to CoGroupByKey improves performance dramatically, I will do so. I was just being lazy and thought using sideInputs made it easier to also handle event data that doesn't have account info.

Answer

There are a few things you can do to improve the performance of your code:

  • Your side input is a Map<String, TableRow>, but you're using only a single field of the TableRow: accRow.get("account_id"). How about making it a Map<String, String> instead, with the value being the account_id itself? That will likely be quite a bit more efficient than the bulky TableRow objects.
  • You could extract the value of the side input into a lazily initialized member variable in your DoFn, to avoid repeated invocations of .sideInput() (see the sketch after this list).
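
A sketch combining both suggestions (illustrative names; it assumes the batch, global-window case, where the side-input value does not change between elements):

// 1) Build the side input as Map<String, String> (account_uid -> account_id) instead of
//    Map<String, TableRow>, so the cached view stays much smaller.
PCollection<KV<String, String>> uidToId = accountRows.apply("Map account_uid to account_id",
        ParDo.of(new DoFn<TableRow, KV<String, String>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                c.output(KV.of((String) row.get("account_uid"), (String) row.get("account_id")));
            }
        }));
final PCollectionView<Map<String, String>> uidToIdView =
        uidToId.apply(View.<String, String>asMap());

// 2) Read the side-input map once per DoFn instance rather than once per element.
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            // Lazily initialized copy of the side-input map.
            private transient Map<String, String> uidToIdMap;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                if (uidToIdMap == null) {
                    uidToIdMap = c.sideInput(uidToIdView);
                }
                TableRow row = c.element();
                Object uid = row.get("account_uid");
                if (uid != null) {
                    String accountId = uidToIdMap.get(uid);
                    if (accountId != null) {
                        row = row.set("account_id", accountId);
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidToIdView));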

That said, this performance is unexpected, and we are investigating whether there's something else going on.

