Can I use setWorkerCacheMb in Apache Beam 2.0+?


Problem Description

My Dataflow job (using Java SDK 2.1.0) is quite slow: it is going to take more than a day to process just 50GB. I pull a whole table from BigQuery (50GB) and join it with one CSV file from GCS (100+MB).

https://cloud.google.com/dataflow/model/group-by-key
I use side inputs to perform the join (the latter approach in the documentation above), although I think using CoGroupByKey would be more efficient; however, I'm not sure that is the only reason my job is so slow.

I googled and it looks like, by default, the cache for side inputs is set to 100MB, and I assume mine is slightly over that limit, so each worker keeps re-reading the side input. To improve this, I thought I could use the setWorkerCacheMb method to increase the cache size.

However, it looks like DataflowPipelineOptions does not have this method, and DataflowWorkerHarnessOptions is hidden. Just passing --workerCacheMb=200 in -Dexec.args results in

An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]

How can I use this option? Thanks.
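
PipelineOptionsFactory with withValidation() only accepts flags that are declared as properties on the options interface in use, which is why MyOptions is reported as missing workerCacheMb. A minimal sketch of one way to expose the flag, assuming the Dataflow runner's hidden DataflowWorkerHarnessOptions interface (which declares workerCacheMb) is available on the classpath:

// Sketch only: extending the hidden harness options makes workerCacheMb a known
// property of MyOptions, so --workerCacheMb=200 parses.
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;

public interface MyOptions extends DataflowWorkerHarnessOptions {
    // existing custom options stay as they are
}

// Alternatively (also a sketch), the value could be set programmatically after parsing:
// options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(200);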

My pipeline:

MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> rows = p.apply("Read from BigQuery",
        BigQueryIO.read().from("project:MYDATA.events"));

// Read account file
PCollection<String> accounts = p.apply("Read from account file",
        TextIO.read().from("gs://my-bucket/accounts.csv")
                .withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String line = c.element();
                CSVParser csvParser = new CSVParser();
                String[] fields = csvParser.parseLine(line);

                TableRow row = new TableRow();
                row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
                c.output(row);
            }
        }));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                c.output(KV.of(uid, row));
            }
        }));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());

// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();

                if (row.containsKey("account_uid") && row.get("account_uid") != null) {
                    String uid = (String) row.get("account_uid");
                    TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
                    if (accRow == null) {
                        LOG.warn("accRow null, {}", row.toPrettyString());
                    } else {
                        row = row.set("account_id", accRow.get("account_id"));
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidAccountView));

// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
        .to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
                StaticValueProvider.of("deadletter_bucket")))
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableRow apply(TableRow row) {
                return row;
            }
        }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

p.run();

Historically my system has had two user identifiers, a new one (account_id) and an old one (account_uid). Now I need to retroactively add the new account_id to our event data stored in BigQuery, because the old data only has the old account_uid. The accounts table (which maps account_uid to account_id) has already been exported to CSV and stored in GCS.

The last function, TableRefPartition, just stores the data into BQ's corresponding partition depending on each event's timestamp. The job is still running (2017-10-30_22_45_59-18169851018279768913) and the bottleneck looks like the 'Join account_id' part. The throughput of that part (xxx elements/s) goes up and down according to the graph, and the graph shows the estimated size of the side input as 106MB.
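
TableRefPartition itself is not shown in the question. Purely as a hypothetical illustration of such a destination function (the class name, constructor arguments, and the use of the element timestamp are assumptions, not the author's code), it could be built on BigQueryIO's to(SerializableFunction) overload with a "$yyyyMMdd" partition decorator:

// Hypothetical sketch only; the real TableRefPartition is not shown in the question.
// Assumes the element timestamp carries the event time and the target table is
// date-partitioned, addressed with a "table$yyyyMMdd" partition decorator.
static class PartitionByEventDay
        implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
    private static final long serialVersionUID = 1L;
    private final String dataset;
    private final String table;

    PartitionByEventDay(String dataset, String table) {
        this.dataset = dataset;
        this.table = table;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> value) {
        // org.joda.time.format.DateTimeFormat
        String day = DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC()
                .print(value.getTimestamp());
        return new TableDestination(dataset + "." + table + "$" + day, null);
    }
}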

If switching to CoGroupByKey improves performance dramatically, I will do so. I was just lazy and thought using side inputs also makes it easier to handle event data that doesn't have account info.
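
For comparison, a rough sketch of what the CoGroupByKey-based join could look like, reusing rows and kvAccounts from the pipeline above (the tag names, the empty-string fallback key, and the "first account row wins" handling are illustrative assumptions; the join classes come from org.apache.beam.sdk.transforms.join):

// Sketch of a CoGroupByKey join on account_uid; not the author's code.
final TupleTag<TableRow> eventTag = new TupleTag<TableRow>() {};
final TupleTag<TableRow> accountTag = new TupleTag<TableRow>() {};

PCollection<KV<String, TableRow>> keyedEvents = rows.apply("Key events by account_uid",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                // Events without an account_uid are kept under an empty key in this sketch.
                c.output(KV.of(uid == null ? "" : uid, row));
            }
        }));

PCollection<TableRow> joined = KeyedPCollectionTuple
        .of(eventTag, keyedEvents)
        .and(accountTag, kvAccounts)
        .apply(CoGroupByKey.<String>create())
        .apply("Join account_id via CoGBK", ParDo.of(new DoFn<KV<String, CoGbkResult>, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                CoGbkResult grouped = c.element().getValue();
                TableRow accRow = null;
                for (TableRow a : grouped.getAll(accountTag)) {
                    accRow = a; // assume at most one account row per uid
                    break;
                }
                for (TableRow event : grouped.getAll(eventTag)) {
                    if (accRow != null) {
                        event = event.set("account_id", accRow.get("account_id"));
                    }
                    c.output(event);
                }
            }
        }));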

Answer

There are a few things you can do to improve the performance of your code:

  • Your side input is a Map<String, TableRow>, but you're using only a single field in the TableRow - accRow.get("account_id"). How about making it a Map<String, String> instead, with the value being the account_id itself? That will likely be quite a bit more efficient than the bulky TableRow object (see the sketch after this list).
  • You could extract the value of the side input into a lazily initialized member variable in your DoFn, to avoid repeated invocations of .sideInput().
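
A minimal sketch of how those two suggestions could be applied to the pipeline above, replacing the original "Join account_id" transform, assuming a batch job in the global window so the side input value is the same for every element (the variable names are illustrative):

// Sketch: account_uid -> account_id as a Map<String, String> side input, plus a
// lazily cached copy of the map in the DoFn to avoid calling .sideInput() per element.
final PCollectionView<Map<String, String>> uidToIdView = kvAccounts
        .apply("Extract account_id", ParDo.of(new DoFn<KV<String, TableRow>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                c.output(KV.of(c.element().getKey(),
                        (String) c.element().getValue().get("account_id")));
            }
        }))
        .apply(View.<String, String>asMap());

PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            // Cached per DoFn instance; safe in this sketch because the job is batch in a
            // single global window, so every element sees the same side input value.
            private transient Map<String, String> uidToId;

            @ProcessElement
            public void processElement(ProcessContext c) {
                if (uidToId == null) {
                    uidToId = c.sideInput(uidToIdView);
                }
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                String accountId = uid == null ? null : uidToId.get(uid);
                if (accountId != null) {
                    row = row.set("account_id", accountId);
                }
                c.output(row);
            }
        }).withSideInputs(uidToIdView));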

That said, this performance is unexpected and we are investigating whether there's something else going on.
