Apache Beam + Dataflow too slow for only 18k data


Problem description


We need to execute a heavy calculation on simple but numerous data.
Input data are rows in a BigQuery table, with two columns: ID (INTEGER) and DATA (STRING). The DATA values are of the form "1#2#3#4#..." with 36 values.
Output data have the same form, but DATA is transformed by an algorithm.
It's a "one for one" transformation.


We have tried Apache Beam with Google Cloud Dataflow, but it does not work: errors appear as soon as several workers are instantiated.
For our POC we use only 18k input rows; the target is about 1 million.

Here is a light version of the class (I've removed the write part; the behaviour remains the same):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class MyClass {

static MyService myService = new MyService();

static class ExtractDataFn extends DoFn<TableRow, KV<Long, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Long id = Long.parseLong((String) c.element().get("ID"));  
        String data = (String) c.element().get("DATA");         
        c.output(KV.of(id, data));
    }
}

public interface Options extends PipelineOptions {
    String getInput();
    void setInput(String value);

    @Default.Enum("EXPORT")
    TypedRead.Method getReadMethod();
    void setReadMethod(TypedRead.Method value);

    @Validation.Required
    String getOutput();
    void setOutput(String value);
}

static void run(Options options) {
    Pipeline p = Pipeline.create(options);

    // Schema used by the BigQuery write step (removed from this light version)
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("ID").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("DATA").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    PCollection<TableRow> rowsFromBigQuery = p.apply(
            BigQueryIO.readTableRows().from(options.getInput()).withMethod(options.getReadMethod())
    );              
    
    PCollection<KV<Long, String>> inputdata = rowsFromBigQuery.apply(ParDo.of(new ExtractDataFn()));
    PCollection<KV<Long, String>> outputData = applyTransform(inputdata);
    // Here goes the part where data are written in a BQ table
    p.run().waitUntilFinish();
}

static PCollection<KV<Long, String>> applyTransform(PCollection<KV<Long, String>> inputData) {      
    PCollection<KV<Long, String>> forecasts = inputData.apply(ParDo.of(new DoFn<KV<Long, String>, KV<Long, String>> () {
                    
        @ProcessElement
        public void processElement(@Element KV<Long, String> element, OutputReceiver<KV<Long, String>> receiver) {
            MyDto dto = new MyDto();
            List<Double> inputData = Arrays.asList(element.getValue().split("#")).stream().map(Double::valueOf).collect(Collectors.toList());
            dto.setInputData(inputData);                
            dto = myService.calculate(dto); // here is the time consuming operation
            String modifiedData = dto.getModifiedData().stream().map(Object::toString).collect(Collectors.joining(","));
            receiver.output(KV.of(element.getKey(), modifiedData));
        }
      }))
    ;
    return forecasts;
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
}

}
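For context, we launch a pipeline like this on Dataflow with a command along these lines (project, region, table, and bucket names below are placeholders, not the real values):

```shell
# Launch the pipeline on the Dataflow runner via the Maven exec plugin.
# All identifiers (project, tables, bucket) are placeholders.
mvn compile exec:java -Dexec.mainClass=MyClass \
  -Dexec.args="--runner=DataflowRunner \
    --project=my-project \
    --region=us-central1 \
    --input=my-project:my_dataset.input_table \
    --output=my-project:my_dataset.output_table \
    --tempLocation=gs://my-bucket/temp"
```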


In the GCP Logs console we can see the number of workers increase up to 10 over about 5 minutes, then drop to 3 or 4, and then we get this kind of message (several hundred of them) while CPU usage sits at about 0%:

Proposing dynamic split of work unit myproject;2020-10-06_06_18_27-12689839210406435299;1231063355075246317 at {"fractionConsumed":0.5,"position":{"shufflePosition":"f_8A_wD_AAAB"}}

Operation ongoing in step BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read for at least 05m00s without outputting or completing in state read-shuffle at app//org.apache.beam.runners.dataflow.worker.ApplianceShuffleReader.readIncludingPosition(Native Method)


If we let it run, it eventually fails with this kind of error:

Error message from worker: java.lang.RuntimeException: unexpected org.apache.beam.runners.dataflow.worker.util.common.worker.CachingShuffleBatchReader.read(CachingShuffleBatchReader.java:77)


If I modify the myService.calculate method to be faster, all the data are processed by a single worker and there is no problem. The problem seems to occur only when the processing is parallelized.

Thank you for your help.

Recommended answer


The solution was to configure the firewall by adding a rule allowing communication between workers.

https://cloud.google.com/dataflow/docs/guides/routes-firewall
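Per that guide, Dataflow worker VMs carry the network tag `dataflow` and exchange shuffle data over TCP ports 12345 and 12346, so the firewall must allow that traffic between workers. A rule along these lines worked for us (the network name is a placeholder for your own VPC):

```shell
# Allow Dataflow workers (network tag "dataflow") to reach each other
# on TCP ports 12345 and 12346, which Dataflow uses to shuffle data.
# "my-network" is a placeholder for the VPC network the workers run in.
gcloud compute firewall-rules create allow-dataflow-internal \
  --network=my-network \
  --direction=INGRESS \
  --action=ALLOW \
  --target-tags=dataflow \
  --source-tags=dataflow \
  --rules=tcp:12345-12346
```

Without this rule, steps that shuffle data between workers (such as the GroupByKey in the logs above) stall, which matches the symptom of near-0% CPU once several workers are running.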

