我可以在 Apache Beam 2.0+ 中使用 setWorkerCacheMb 吗? [英] Can I use setWorkerCacheMb in Apache Beam 2.0+?
问题描述
我的 Dataflow 作业(使用 Java SDK 2.1.0)速度很慢,仅处理 50GB 就需要一天多的时间.我只是从 BigQuery (50GB) 中提取一整张表,加入来自 GCS (100+MB) 的一个 csv 文件.
My Dataflow job (using Java SDK 2.1.0) is quite slow and it is going to take more than a day to process just 50GB. I just pull a whole table from BigQuery (50GB), join with one csv file from GCS (100+MB).
https://cloud.google.com/dataflow/model/group逐键
我使用 sideInputs 来执行 join(上面文档中的后一种方式),而我认为使用 CoGroupByKey 效率更高,但是我不确定这是我的工作超慢的唯一原因.
https://cloud.google.com/dataflow/model/group-by-key
I use sideInputs to perform join (the latter way in the documentation above) while I think using CoGroupByKey is more efficient, however I'm not sure that is the only reason my job is super slow.
我用谷歌搜索了一下,默认情况下,侧输入缓存设置为 100MB,我假设我的缓存略高于该限制,然后每个工作人员不断重新读取侧输入.为了改进它,我想我可以使用 setWorkerCacheMb
方法来增加缓存大小.
I googled and it looks by default, a cache of sideinputs set as 100MB and I assume my one is slightly over that limit then each worker continuously re-reads sideinputs. To improve it, I thought I can use setWorkerCacheMb
method to increase the cache size.
但是看起来 DataflowPipelineOptions
没有这个方法并且 DataflowWorkerHarnessOptions
是隐藏的.只需在 -Dexec.args
中传递 --workerCacheMb=200
结果
However it looks DataflowPipelineOptions
does not have this method and DataflowWorkerHarnessOptions
is hidden. Just passing --workerCacheMb=200
in -Dexec.args
results in
An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]
我怎样才能使用这个选项?谢谢.
How can I use this option? Thanks.
我的管道:
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> rows = p.apply("Read from BigQuery",
BigQueryIO.read().from("project:MYDATA.events"));
// Read account file
PCollection<String> accounts = p.apply("Read from account file",
TextIO.read().from("gs://my-bucket/accounts.csv")
.withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
ParDo.of(new DoFn<String, TableRow>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String line = c.element();
CSVParser csvParser = new CSVParser();
String[] fields = csvParser.parseLine(line);
TableRow row = new TableRow();
row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
c.output(row);
}
}));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
String uid = (String) row.get("account_uid");
c.output(KV.of(uid, row));
}
}));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());
// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
ParDo.of(new DoFn<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
if (row.containsKey("account_uid") && row.get("account_uid") != null) {
String uid = (String) row.get("account_uid");
TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
if (accRow == null) {
LOG.warn("accRow null, {}", row.toPrettyString());
} else {
row = row.set("account_id", accRow.get("account_id"));
}
}
c.output(row);
}
}).withSideInputs(uidAccountView));
// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
.to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
StaticValueProvider.of("deadletter_bucket")))
.withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public TableRow apply(TableRow row) {
return row;
}
}).withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
p.run();
过去我的系统有两个用户标识符,新的(account_id)和旧的(account_uid).现在我需要向存储在 BigQuery 中的事件数据追溯添加新的 account_id,因为旧数据只有旧的 account_uid.帐户表(在 account_uid 和 account_id 之间有关系)已经转换为 csv 并存储在 GCS 中.
Historically my system have two identifiers of users, new one (account_id) and old one(account_uid). Now I need to add new account_id to our event data stored in BigQuery retroactively, because old data only has old account_uid. Accounts table (which has relation between account_uid and account_id) is already converted as csv and stored in GCS.
最后一个func TableRefPartition
只是根据每个事件时间戳将数据存储到BQ 对应的分区中.作业仍在运行 (2017-10-30_22_45_59-18169851018279768913) 和瓶颈看起来加入 account_id 部分.根据图表,那部分吞吐量(xxx 个元素/秒)上下波动.根据图表,sideInputs 的估计大小为 106MB.
The last func TableRefPartition
just store data into BQ's corresponding partition depending on each event timestamp. The job is still running (2017-10-30_22_45_59-18169851018279768913) and bottleneck looks Join account_id part.
That part of throughput (xxx elements/s) goes up and down according to the graph. According to the graph, estimated size of sideInputs is 106MB.
如果切换到 CoGroupByKey 可以显着提高性能,我会这样做.我只是懒惰,认为使用 sideInputs 更容易处理没有帐户信息的事件数据.
If switching to CoGroupByKey improves performance dramatically, I will do so. I was just lazy and thought using sideInputs is easier to handle event data which doesn't have account info as well.
推荐答案
您可以采取一些措施来提高代码的性能:
There's a few things you can do to improve the performance of your code:
- 您的辅助输入是
Map
,但您仅使用TableRow
中的一个字段 -accRow.get("account_id")
.把它变成一个Map
怎么样,值是account_id
本身?这可能比庞大的TableRow
对象效率更高. - 您可以将侧输入的值提取到
DoFn
中延迟初始化的成员变量中,以避免重复调用.sideInput()
.
- Your side input is a
Map<String, TableRow>
, but you're using only a single field in theTableRow
-accRow.get("account_id")
. How about making it aMap<String, String>
instead, having the value be theaccount_id
itself? That'll likely be quite a bit more efficient than the bulkyTableRow
object. - You could extract the value of the side input into a lazily initialized member variable in your
DoFn
, to avoid repeated invocations of.sideInput()
.
也就是说,这种表现出乎意料,我们正在调查是否还有其他事情发生.
That said, this performance is unexpected and we are investigating whether there's something else going on.
这篇关于我可以在 Apache Beam 2.0+ 中使用 setWorkerCacheMb 吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!