Throttling a step in a Beam application


Problem description

I'm using python beam on google dataflow, my pipeline looks like this:

Read image urls from file >> Download images >> Process images

The problem is that I can't let the Download images step scale as much as it needs to, because my application could get blocked by the image server.

Is there a way to throttle this step, either on input or output per minute?

Thanks.

Answer

One possibility, maybe naïve, is to introduce a sleep in the step. For this you need to know the maximum number of instances of the ParDo that can be running at the same time. If autoscalingAlgorithm is set to NONE you can obtain that from numWorkers and workerMachineType (DataflowPipelineOptions). Precisely, the effective rate will be divided by the total number of threads: desired_rate/(num_workers*num_threads(per worker)). The sleep time will be the inverse of that effective rate:

Integer desired_rate = 1; // desired aggregate QPS limit across all workers

int num_workers;
if (options.getNumWorkers() == 0) { num_workers = 1; } // 0 means the default (1 worker)
else { num_workers = options.getNumWorkers(); }

int num_threads;
if (options.getWorkerMachineType() != null) {
    String machine_type = options.getWorkerMachineType();
    // e.g. "n1-standard-4" -> 4 vCPUs, one worker thread per vCPU
    num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }

// Per-thread sleep (seconds) so that all threads together stay at desired_rate
Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);

Then you can use TimeUnit.SECONDS.sleep(sleep_time.intValue()); or equivalent inside the throttled Fn. In my example, as a use case, I wanted to read from a public file, parse out the empty lines and call the Natural Language Processing API with a maximum rate of 1 QPS (I initialized desired_rate to 1 previously):

p
    .apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
    .apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
    .apply("NLP requests", ParDo.of(new ThrottledFn()))
    .apply("Write Lines", TextIO.write().to(options.getOutput()));

The rate-limited Fn is ThrottledFn, notice the sleep function:

static class ThrottledFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        // Instantiates a client
        try (LanguageServiceClient language = LanguageServiceClient.create()) {

          // The text to analyze
          String text = c.element();
          Document doc = Document.newBuilder()
              .setContent(text).setType(Type.PLAIN_TEXT).build();

          // Detects the sentiment of the text
          Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();                 
          String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());

          TimeUnit.SECONDS.sleep(sleep_time.intValue());

          Log.info(nlp_results);
          c.output(nlp_results);
        }
    }
}

With this I get a rate of 1 element/s, as seen in the image below, and avoid hitting the quota when using multiple workers, even though the requests are not really spread out (you might get 8 simultaneous requests followed by an 8 s sleep, and so on). This was just a test; a better implementation would probably use Guava's RateLimiter.
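To make the RateLimiter idea concrete, here is a minimal hand-rolled sketch of the same mechanism. This is a hypothetical helper, not Guava's actual class and not Beam code: each call to reserve() claims the next free time slot, so callers end up spaced 1/permitsPerSecond apart. The clock is injected only so the behavior is deterministic.

```java
import java.util.function.LongSupplier;

// Hypothetical minimal limiter illustrating the mechanism behind Guava's
// RateLimiter: each reserve() claims the next free time slot, spacing
// callers 1/permitsPerSecond apart regardless of how bursty they are.
class SimpleRateLimiter {
    private final long intervalNanos;   // spacing between permits
    private final LongSupplier clock;   // injected for deterministic tests
    private long nextFreeNanos;

    SimpleRateLimiter(double permitsPerSecond, LongSupplier nanoClock) {
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
        this.clock = nanoClock;
        this.nextFreeNanos = nanoClock.getAsLong();
    }

    // Returns how many nanoseconds the caller should sleep before proceeding.
    synchronized long reserve() {
        long now = clock.getAsLong();
        long wait = Math.max(0L, nextFreeNanos - now);
        nextFreeNanos = Math.max(now, nextFreeNanos) + intervalNanos;
        return wait;
    }

    public static void main(String[] args) {
        // Frozen clock: three back-to-back calls at 1 QPS
        SimpleRateLimiter rl = new SimpleRateLimiter(1.0, () -> 0L);
        System.out.println(rl.reserve()); // 0          (first call is free)
        System.out.println(rl.reserve()); // 1000000000 (wait one second)
        System.out.println(rl.reserve()); // 2000000000
    }
}
```

Inside the DoFn this would replace the fixed sleep with `TimeUnit.NANOSECONDS.sleep(limiter.reserve())`; Guava's real `RateLimiter.create(rate)` / `acquire()` does the blocking for you. Note that each worker gets its own limiter instance, so the per-thread rate division described above still applies.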

If the pipeline is using autoscaling (THROUGHPUT_BASED) then it would be more complicated and the number of workers should be updated (for example, Stackdriver Monitoring has a job/current_num_vcpus metric). Other general considerations would be controlling the number of parallel ParDos by using a dummy GroupByKey or splitting the source with splitIntoBundles, etc. I'd like to see if there are other nicer solutions.
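The dummy-GroupByKey idea caps parallelism by assigning every element to one of N artificial keys, so downstream processing fans out to at most N groups. A sketch of the key-assignment step follows; the helper name, element type, and shard count are illustrative, not from the original pipeline:

```java
// Hypothetical shard-key helper: maps each element onto one of numShards
// keys. A GroupByKey on these keys then limits downstream parallelism to
// at most numShards concurrently processed groups.
class ShardKeys {
    static int shardKey(String element, int numShards) {
        // floorMod keeps the result in [0, numShards) even for negative hashes
        return Math.floorMod(element.hashCode(), numShards);
    }

    public static void main(String[] args) {
        // The same element always lands on the same shard
        System.out.println(shardKey("http://example.com/a.jpg", 4));
    }
}
```

In a Beam pipeline this key assignment would typically sit in a `WithKeys`/`ParDo` step emitting `KV<Integer, String>` pairs, followed by the `GroupByKey`, with the downloads then iterating each group's values sequentially.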
