Can I use setWorkerCacheMb in Apache Beam 2.0+?
Problem description
My Dataflow job (using Java SDK 2.1.0) is quite slow: it is going to take more than a day to process just 50GB. I just pull a whole table from BigQuery (50GB) and join it with one CSV file from GCS (100+MB).
https://cloud.google.com/dataflow/model/group-by-key
I use sideInputs to perform the join (the latter approach in the documentation above), although I think using CoGroupByKey would be more efficient; however, I'm not sure that is the only reason my job is so slow.
I googled around, and it looks like by default the cache for side inputs is set to 100MB. I assume mine is slightly over that limit, so each worker continuously re-reads the side input. To improve this, I thought I could use the setWorkerCacheMb method to increase the cache size.
However, it looks like DataflowPipelineOptions does not have this method, and DataflowWorkerHarnessOptions is hidden. Just passing --workerCacheMb=200 in -Dexec.args results in
```
An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]
```
How can I use this option? Thanks.
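One commonly suggested workaround (a sketch only; verify against your SDK version): DataflowWorkerHarnessOptions is annotated @Hidden but is still a public interface in the Dataflow runner, so you can register it with PipelineOptionsFactory before parsing so the --workerCacheMb flag is recognized, and/or cast your parsed options to it and set the value programmatically. The MyOptions interface is the one from the question.

```java
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch; assumes beam-runners-google-cloud-dataflow-java is on the classpath.
public class Launcher {
    public static void main(String[] args) {
        // Option 1: register the hidden interface so --workerCacheMb=200 parses.
        PipelineOptionsFactory.register(DataflowWorkerHarnessOptions.class);
        MyOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(MyOptions.class);

        // Option 2: cross-cast the parsed options and set the cache size directly.
        options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(200);
    }
}
```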
My pipeline:
```java
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);

PCollection<TableRow> rows = p.apply("Read from BigQuery",
        BigQueryIO.read().from("project:MYDATA.events"));

// Read account file
PCollection<String> accounts = p.apply("Read from account file",
        TextIO.read().from("gs://my-bucket/accounts.csv")
                .withCompressionType(CompressionType.GZIP));

PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String line = c.element();
                CSVParser csvParser = new CSVParser();
                String[] fields = csvParser.parseLine(line);
                TableRow row = new TableRow();
                row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
                c.output(row);
            }
        }));

PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                c.output(KV.of(uid, row));
            }
        }));

final PCollectionView<Map<String, TableRow>> uidAccountView =
        kvAccounts.apply(View.<String, TableRow>asMap());

// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                if (row.containsKey("account_uid") && row.get("account_uid") != null) {
                    String uid = (String) row.get("account_uid");
                    TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
                    if (accRow == null) {
                        LOG.warn("accRow null, {}", row.toPrettyString());
                    } else {
                        row = row.set("account_id", accRow.get("account_id"));
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidAccountView));

// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
        .to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
                StaticValueProvider.of("deadletter_bucket")))
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableRow apply(TableRow row) {
                return row;
            }
        })
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

p.run();
```
Historically my system has had two user identifiers: a new one (account_id) and an old one (account_uid). Now I need to retroactively add the new account_id to our event data stored in BigQuery, because the old data only has the old account_uid. The accounts table (which maps account_uid to account_id) has already been converted to CSV and stored in GCS.
The last function, TableRefPartition, just stores the data into the corresponding BQ partition depending on each event's timestamp. The job is still running (2017-10-30_22_45_59-18169851018279768913), and the bottleneck looks like the "Join account_id" part. The throughput of that part (xxx elements/s) goes up and down according to the graph, which also shows an estimated side-input size of 106MB.
If switching to CoGroupByKey improves performance dramatically, I will do so. I was just being lazy, and thought using sideInputs would also make it easier to handle event data that doesn't have account info.
Recommended answer
There are a few things you can do to improve the performance of your code:
- Your side input is a Map<String, TableRow>, but you're using only a single field of the TableRow: accRow.get("account_id"). How about making it a Map<String, String> instead, having the value be the account_id itself? That'll likely be quite a bit more efficient than the bulky TableRow object.
- You could extract the value of the side input into a lazily initialized member variable in your DoFn, to avoid repeated invocations of .sideInput().
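The second suggestion can be sketched without any Beam dependency (class and field names here are hypothetical; in a real DoFn the cached field would be transient and the loader would call c.sideInput(uidAccountView)):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Beam-free sketch of lazily caching an expensive lookup: the loader runs
// once, on the first lookup, and every later call reuses the cached Map.
public class LazySideInput {
    private Map<String, String> cache;  // initialized on first use
    int loads = 0;                      // counts how often the loader ran

    private final Supplier<Map<String, String>> loader;

    public LazySideInput(Supplier<Map<String, String>> loader) {
        this.loader = loader;
    }

    public String lookup(String uid) {
        if (cache == null) {            // lazy initialization, once per instance
            loads++;
            cache = loader.get();
        }
        return cache.get(uid);
    }

    public static void main(String[] args) {
        LazySideInput s = new LazySideInput(() -> {
            Map<String, String> m = new HashMap<>();
            m.put("uid-1", "acct-1");
            return m;
        });
        s.lookup("uid-1");
        s.lookup("uid-1");
        System.out.println(s.lookup("uid-1") + " loads=" + s.loads); // prints: acct-1 loads=1
    }
}
```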
That said, this performance is unexpected and we are investigating whether there's something else going on.