Dataflow autoscale does not boost performance

Question

I'm building a Dataflow pipeline that reads from pubsub and sends requests to a 3rd party API. The pipeline uses THROUGHPUT_BASED autoscaling.

However, when I load tested it, after it autoscaled to 4 workers to catch up with the backlog in pubsub, the same workload was just spread evenly across the workers; overall throughput did not increase significantly.

^ Number of unacknowledged messages in pubsub. The peak is when traffic stopped going in.

^ Bytes sent from each worker. The peak is the initial worker. As more workers were added to the pool, the workload was offloaded to them instead of each worker picking up more. CPU utilization looks the same, with peak utilization below 30% on the initial worker.

^ History of workers spawned.

It feels like a limit is being hit somewhere, but I have a hard time seeing what it is. I was pulling fewer than 300 messages per second, and each message is about 1 KB.

Update: I did another round of comparison between a batch job using TextIO and a streaming job using PubSubIO, both with "n1-standard-8" machines and a fixed number of 15 workers. The batch job went up to 450 elements/s, but the streaming job still peaked at 230 elements/s. The limit seems to come from the source, although I'm not sure what it is.

Update 2: Here is a simple code snippet to reproduce the issue. You will need to manually set the number of workers to 1 and then to 5, and compare the number of elements processed by the pipeline. You will need a load tester to publish messages to the topic efficiently.

package debug;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {
    @SuppressWarnings("serial")
    public static void main(String[] args) {

        /*******************************************
         * SETUP - Build options.
         ********************************************/

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        // Autoscaling will scale between n/15 and n workers, so from 1-15 here
        options.setMaxNumWorkers(15);
        // Default of 250GB is absurdly high and we don't need that much on every worker
        options.setDiskSizeGb(32);
        // Manually configure scaling (i.e. 1 vs 5 for comparison)
        options.setNumWorkers(5);

        // Debug Pipeline
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings()
                    .fromSubscription("your subscription"))
            // this is the transform that I actually care about. In production code, this will
            // send a REST request to some 3rd party endpoint.
            .apply("sleep", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws InterruptedException {
                    Thread.sleep(500);
                    c.output(c.element());
                }
            }));

        pipeline.run();
    }
}
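
Side note: with Thread.sleep(500) standing in for the REST call, each processing thread can emit at most 2 elements/s, so overall throughput is bounded by the total number of processing threads rather than by CPU, which is consistent with the low CPU utilization described above. One common workaround, sketched below on the assumption that the 3rd party calls can be issued concurrently (this is not from the original post, and the batch size, key fan-out, and pool size of 16 are arbitrary), is to batch elements and fan the blocking calls out to a thread pool:

// Drop-in replacement for the "sleep" ParDo above. Additional imports needed:
// java.util.ArrayList, java.util.List, java.util.concurrent.ExecutorService,
// java.util.concurrent.Executors, java.util.concurrent.Future,
// org.apache.beam.sdk.transforms.GroupIntoBatches, org.apache.beam.sdk.transforms.WithKeys,
// org.apache.beam.sdk.values.KV, org.apache.beam.sdk.values.TypeDescriptors.
.apply(WithKeys.of((String s) -> s.hashCode() % 16)
        .withKeyType(TypeDescriptors.integers()))
// Stateful transform; collects up to 16 elements per key before emitting.
.apply(GroupIntoBatches.ofSize(16))
.apply("parallel sleep", ParDo.of(new DoFn<KV<Integer, Iterable<String>>, String>() {
    private transient ExecutorService pool;

    @Setup
    public void setup() {
        pool = Executors.newFixedThreadPool(16);
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        List<Future<String>> futures = new ArrayList<>();
        for (String element : c.element().getValue()) {
            futures.add(pool.submit(() -> {
                Thread.sleep(500); // stand-in for the blocking REST call
                return element;
            }));
        }
        // The processing thread still blocks ~500 ms, but now per batch
        // of up to 16 elements instead of per element.
        for (Future<String> f : futures) {
            c.output(f.get());
        }
    }

    @Teardown
    public void teardown() {
        pool.shutdown();
    }
}));

Note that GroupIntoBatches only fires once a batch fills for a given key, so under a trickle of traffic elements may sit buffered for a while; the sketch illustrates where the thread time goes, it is not production-ready code.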

Answer

Considering that:

  1. Switching from PubSubIO to TextIO didn't show any improvement.
  2. Changing from 3 workers to 15 workers didn't show any improvement either.
  3. The batch job went up to 450 elements/s, but the streaming job peaked at 230 elements/s.
  4. There is a transform that sends REST requests to a 3rd party API, which is time-consuming.
  5. In a test, removing that transform increased throughput from 120 elements/s to 400 elements/s.

The issue doesn't seem to lie on the PubSub side. According to this documentation, you might be overloading the 3rd party API. The same effect is explained in the documentation for client libraries, rather than 3rd party APIs:

It's possible that one client could have a backlog of messages because it doesn't have the capacity to process the volume of incoming messages, but another client on the network does have that capacity. The second client could reduce the overall backlog, but it doesn't get the chance to because the first client cannot send its messages to the second client quickly enough. This reduces the overall rate of processing because messages get stuck on the first client.

The messages that create a backlog consume memory, CPU, and bandwidth resources because the client library continues to extend the messages' acknowledgment deadline.

[...]

More generally, the need for flow control indicates that messages are being published at a higher rate than they are being consumed. If this is a persistent state, rather than a spike in message volume, consider increasing the number of subscriber client instances and machines.
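
For context, the flow control the quoted passage refers to is configured on the subscriber client itself. Below is a minimal sketch with the google-cloud-pubsub Java client, assuming a standalone subscriber outside Dataflow; the project and subscription IDs are placeholders, and the outstanding-message limits are arbitrary examples:

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;

public class FlowControlledSubscriber {
    public static void main(String[] args) {
        ProjectSubscriptionName subscription =
                ProjectSubscriptionName.of("your-project-id", "your-subscription-id");

        // Cap how many messages may be outstanding (pulled but not yet acked)
        // so one slow client does not hoard the backlog.
        FlowControlSettings flowControl = FlowControlSettings.newBuilder()
                .setMaxOutstandingElementCount(100L)
                .setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // 10 MB
                .build();

        MessageReceiver receiver = (message, consumer) -> {
            // ... call the 3rd party API here ...
            consumer.ack();
        };

        Subscriber subscriber = Subscriber.newBuilder(subscription, receiver)
                .setFlowControlSettings(flowControl)
                .build();
        subscriber.startAsync().awaitRunning();
    }
}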

If you can only work on PubSub to improve the results, and you think the way to achieve this is extending the acknowledgement deadline for elements, you can test it by accessing here and manually editing the subscription. To do it programmatically in Java, have a look at this and this documentation, about managing subscriptions and changing ackDeadlineSeconds respectively.
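
As a hedged sketch of the programmatic route, here is how ackDeadlineSeconds can be extended with the Pub/Sub admin client; the IDs are placeholders and the 60 s deadline is an arbitrary example (the default is 10 s):

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.UpdateSubscriptionRequest;

public class UpdateAckDeadline {
    public static void main(String[] args) throws Exception {
        try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
            Subscription subscription = Subscription.newBuilder()
                    .setName(ProjectSubscriptionName.format(
                            "your-project-id", "your-subscription-id"))
                    .setAckDeadlineSeconds(60) // extend from the 10 s default
                    .build();
            // Only touch ack_deadline_seconds; leave the rest of the
            // subscription configuration as-is.
            UpdateSubscriptionRequest request = UpdateSubscriptionRequest.newBuilder()
                    .setSubscription(subscription)
                    .setUpdateMask(FieldMask.newBuilder()
                            .addPaths("ack_deadline_seconds")
                            .build())
                    .build();
            client.updateSubscription(request);
        }
    }
}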
