Apache Beam + Dataflow too slow for only 18k data


Problem description


We need to execute heavy calculations on simple but numerous data.
Input data are rows in a BigQuery table with two columns: ID (INTEGER) and DATA (STRING). The DATA values are of the form "1#2#3#4#..." with 36 values.
Output data have the same form, but DATA is transformed by an algorithm.
It's a "one for one" transformation.
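Outside of Beam, the per-element string handling can be sketched as follows. The real `myService.calculate` is replaced here by a hypothetical doubling step, purely to show the parse/transform/join shape; the class and method names are illustrative, not from the original code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DataTransformSketch {

    // Parse a "#"-separated DATA string into doubles, apply a transform,
    // and join the result with commas, mirroring the DoFn in the pipeline.
    static String transform(String data) {
        List<Double> values = Arrays.stream(data.split("#"))
                .map(Double::valueOf)
                .collect(Collectors.toList());
        // Placeholder for myService.calculate: double each value.
        List<Double> modified = values.stream()
                .map(v -> v * 2)
                .collect(Collectors.toList());
        return modified.stream()
                .map(Object::toString)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(transform("1#2#3")); // prints "2.0,4.0,6.0"
    }
}
```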


We have tried Apache Beam with Google Cloud Dataflow, but it does not work: errors appear as soon as several workers are instantiated.
For our POC we use only 18k input rows; the target is about 1 million.

Here is a light version of the class (I've removed the write part; the behaviour remains the same):

public class MyClass {

static MyService myService = new MyService();

static class ExtractDataFn extends DoFn<TableRow, KV<Long, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Long id = Long.parseLong((String) c.element().get("ID"));  
        String data = (String) c.element().get("DATA");         
        c.output(KV.of(id, data));
    }
}

public interface Options extends PipelineOptions {
    String getInput();
    void setInput(String value);

    @Default.Enum("EXPORT")
    TypedRead.Method getReadMethod();
    void setReadMethod(TypedRead.Method value);

    @Validation.Required
    String getOutput();
    void setOutput(String value);
}

static void run(Options options) {
    Pipeline p = Pipeline.create(options);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("ID").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("DATA").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    PCollection<TableRow> rowsFromBigQuery = p.apply(
            BigQueryIO.readTableRows().from(options.getInput()).withMethod(options.getReadMethod())
    );              
    
    PCollection<KV<Long, String>> inputdata = rowsFromBigQuery.apply(ParDo.of(new ExtractDataFn()));
    PCollection<KV<Long, String>> outputData = applyTransform(inputdata);
    // Here goes the part where data are written in a BQ table
    p.run().waitUntilFinish();
}

static PCollection<KV<Long, String>> applyTransform(PCollection<KV<Long, String>> inputData) {      
    PCollection<KV<Long, String>> forecasts = inputData.apply(ParDo.of(new DoFn<KV<Long, String>, KV<Long, String>> () {
                    
        @ProcessElement
        public void processElement(@Element KV<Long, String> element, OutputReceiver<KV<Long, String>> receiver) {
            MyDto dto = new MyDto();
            List<Double> inputData = Arrays.asList(element.getValue().split("#")).stream().map(Double::valueOf).collect(Collectors.toList());
            dto.setInputData(inputData);                
            dto = myService.calculate(dto); // here is the time consuming operation
            String modifiedData = dto.getModifiedData().stream().map(Object::toString).collect(Collectors.joining(","));
            receiver.output(KV.of(element.getKey(), modifiedData));
        }
      }));
    return forecasts;
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
}

}


In the GCP Logs console we can see the number of workers increase up to 10 for about 5 minutes, then decrease to 3 or 4; after that we get this kind of message (several hundred of them) and the CPU is at about 0%:

Proposing dynamic split of work unit myproject;2020-10-06_06_18_27-12689839210406435299;1231063355075246317 at {"fractionConsumed":0.5,"position":{"shufflePosition":"f_8A_wD_AAAB"}}

Operation ongoing in step BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read for at least 05m00s without outputting or completing in state read-shuffle at app//org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)


If we let it run, it finishes with an error of this kind:

Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:77)


If I modify the myService.calculate method to be faster, all the data are processed by a single worker and there is no problem. The problem seems to occur only when the processing is parallelized.

Thanks for your help.

Answer


The solution was to configure the firewall by adding a rule allowing communication between workers.

https://cloud.google.com/dataflow/docs/guides/routes-firewall
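As a sketch, on a custom VPC network such a rule can be created with gcloud. Per the Dataflow firewall documentation, workers carry the `dataflow` network tag and communicate with each other on TCP ports 12345-12346; the rule name and `NETWORK_NAME` below are placeholders:

```shell
# Allow Dataflow workers to talk to each other on the shuffle ports.
# NETWORK_NAME is a placeholder for your VPC network name.
gcloud compute firewall-rules create allow-dataflow-workers \
    --network=NETWORK_NAME \
    --direction=INGRESS \
    --action=ALLOW \
    --rules=tcp:12345-12346 \
    --source-tags=dataflow \
    --target-tags=dataflow
```

Without this rule, worker-to-worker shuffle reads stall, which matches the "Operation ongoing ... without outputting" messages and the near-0% CPU seen in the logs.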
