Dataflow autoscale does not boost performance


Problem description

I'm building a Dataflow pipeline that reads from Pub/Sub and sends requests to a 3rd-party API. The pipeline uses THROUGHPUT_BASED autoscaling.

However, when I ran a load test against it, it autoscaled to 4 workers to catch up with the backlog in Pub/Sub, but the same workload seemed to be spread evenly between the workers, and overall throughput did not increase significantly.

(Chart: number of unacknowledged messages in Pub/Sub. The peak is when traffic stopped coming in.)

(Chart: bytes sent from each worker. The peak is the initial worker. As more workers were added to the pool, the workload was offloaded to them instead of each worker picking up more work. CPU utilization shows the same pattern, with peak utilization below 30% on the initial worker.)

(Chart: history of workers spawned.)

It feels like a limit is being hit somewhere, but I have a hard time seeing what that limit is. I was pulling fewer than 300 messages per second, and each message is about 1 KB.

Update: I did another round of comparison between a batch job using TextIO and a streaming job using PubsubIO, both with "n1-standard-8" machines and a fixed pool of 15 workers. The batch job went up to 450 elements/s, but the streaming job still peaked at 230 elements/s. The limitation seems to come from the source, although I'm not sure what it is.

Update 2: Here is a simple code snippet to reproduce the issue. You will need to manually set the number of workers to 1 and then to 5, and compare the number of elements processed by the pipeline. You will also need a load tester to publish messages to the topic efficiently.

package debug;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {
    // main must return void to be usable as a JVM entry point.
    @SuppressWarnings("serial")
    public static void main(String[] args) {

        /*******************************************
         * SETUP - Build options.
         ********************************************/

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        // Autoscaling will scale between n/15 and n workers, so from 1-15 here
        options.setMaxNumWorkers(15);
        // Default of 250GB is absurdly high and we don't need that much on every worker
        options.setDiskSizeGb(32);
        // Manually configure scaling (i.e. 1 vs 5 for comparison)
        options.setNumWorkers(5);

        // Debug Pipeline
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            // Replace with a full path: projects/<project>/subscriptions/<subscription>
            .apply(PubsubIO.readStrings()
                    .fromSubscription("your subscription"))
            // This is the transform that I actually care about. In production code, this will
            // send a REST request to some 3rd party endpoint.
            .apply("sleep", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws InterruptedException {
                    // Simulate a blocking call that takes 500 ms per element.
                    Thread.sleep(500);
                    c.output(c.element());
                }
            }));

        pipeline.run();
    }
}

Recommended answer

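Considering that:

  1. Switching from PubsubIO to TextIO made no improvement.
  2. Changing from 3 workers to 15 workers made no improvement.
  3. The batch job went up to 450 elements/s, but the streaming job peaked at 230 elements/s.
  4. There is a transform that sends REST requests to a 3rd-party API, which takes hours of time.
  5. In a test, removing the transform increased the throughput from 120 elements/s to 400 elements/s.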
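
One way to read these observations is as a simple capacity model: a blocking call caps each thread at 1/latency elements per second, and the pipeline as a whole can never exceed what the downstream API accepts. The sketch below is illustrative only. `threadsPerWorker` and `apiLimit` are assumed values chosen for the example (the 230 elements/s figure is borrowed from the streaming peak reported in the question), not measured Dataflow or API limits.

```java
final class ThroughputCeiling {
    /** Elements/s the pipeline could push if only the blocking call limited it. */
    static double pipelineCapacity(int workers, int threadsPerWorker, double latencySeconds) {
        return workers * threadsPerWorker / latencySeconds;
    }

    /** Observed throughput is bounded by the slowest stage. */
    static double effectiveThroughput(double pipelineCapacity, double apiLimit) {
        return Math.min(pipelineCapacity, apiLimit);
    }

    public static void main(String[] args) {
        double latencySeconds = 0.5;  // Thread.sleep(500) from the repro snippet
        int threadsPerWorker = 100;   // assumption: hypothetical thread count per worker
        double apiLimit = 230.0;      // assumption: hypothetical downstream cap (elements/s)

        for (int workers : new int[] {1, 4, 15}) {
            double capacity = pipelineCapacity(workers, threadsPerWorker, latencySeconds);
            double effective = effectiveThroughput(capacity, apiLimit);
            System.out.printf("%d workers: capacity=%.0f/s, effective=%.0f/s%n",
                    workers, capacity, effective);
        }
    }
}
```

With these assumed numbers, one worker is limited by its own threads (200 elements/s), while 4 and 15 workers both sit at the 230 elements/s cap, which mirrors the flat throughput seen after autoscaling.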

The issue doesn't seem to lie on the Pub/Sub side. According to this documentation, you might be overloading the 3rd-party API. The same effect is explained in the documentation for clients, rather than for 3rd-party APIs:

It's possible that one client could have a backlog of messages because it doesn't have the capacity to process the volume of incoming messages, but another client on the network does have that capacity. The second client could reduce the overall backlog, but it doesn't get the chance to because the first client cannot send its messages to the second client quickly enough. This reduces the overall rate of processing because messages get stuck on the first client.

The messages that create a backlog consume memory, CPU, and bandwidth resources because the client library continues to extend the messages' acknowledgment deadline.

[...]

More generally, the need for flow control indicates that messages are being published at a higher rate than they are being consumed. If this is a persistent state, rather than a spike in message volume, consider increasing the number of subscriber client instances and machines.
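
The flow control described in the quoted documentation amounts to bounding the number of messages a client holds at once. A minimal sketch of that idea using a plain `java.util.concurrent.Semaphore` follows. The class and method names are hypothetical, and this is not the Pub/Sub client library's API, which provides its own flow-control settings.

```java
import java.util.concurrent.Semaphore;

final class FlowControlSketch {
    private final Semaphore permits;

    FlowControlSketch(int maxOutstandingMessages) {
        this.permits = new Semaphore(maxOutstandingMessages);
    }

    /** Returns true if the client may take one more message, false if it is at its limit. */
    boolean tryAccept() {
        return permits.tryAcquire();
    }

    /** Call once a message has been fully processed and acknowledged. */
    void ack() {
        permits.release();
    }

    public static void main(String[] args) {
        FlowControlSketch client = new FlowControlSketch(2);
        System.out.println("first:  " + client.tryAccept());
        System.out.println("second: " + client.tryAccept());
        // At the limit: the message stays on the server for another client to pull.
        System.out.println("third:  " + client.tryAccept());
        client.ack();
        System.out.println("after ack: " + client.tryAccept());
    }
}
```

A client that refuses messages beyond its limit leaves them available to other subscribers instead of letting them pile up locally while their acknowledgment deadlines are extended.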

If you can only work on the Pub/Sub side to improve the results, and you think the way to achieve this is to extend the acknowledgement deadline for elements, you can test it by accessing here and manually editing the subscription. To do it programmatically in Java, have a look at this and this documentation, about managing subscriptions and changing ackDeadlineSeconds respectively.
