Throttling a step in a Beam application


Question


I'm using python beam on google dataflow, my pipeline looks like this:

Read image urls from file >> Download images >> Process images

The problem is that I can't let the Download images step scale as much as it needs to, because my application could get blocked by the image server.

Is there a way to throttle this step, either on input or output per minute?

Thank you.

Solution

One possibility, maybe a naïve one, is to introduce a sleep in the step. For this you need to know the maximum number of instances of the ParDo that can run at the same time. If autoscalingAlgorithm is set to NONE you can obtain that from numWorkers and workerMachineType (DataflowPipelineOptions). Precisely, the effective rate gets divided by the total number of threads: desired_rate / (num_workers * num_threads_per_worker). The sleep time will be the inverse of that effective rate:

Integer desired_rate = 1; // QPS limit

int num_workers;
if (options.getNumWorkers() == 0) {
    num_workers = 1;
} else {
    num_workers = options.getNumWorkers();
}

int num_threads;
if (options.getWorkerMachineType() != null) {
    String machine_type = options.getWorkerMachineType();
    // The vCPU count is the suffix of the machine type, e.g. "n1-standard-4" -> 4
    num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
} else {
    num_threads = 1;
}

Double sleep_time = (double) (num_workers * num_threads) / (double) desired_rate;
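To make the arithmetic concrete, here is a standalone sketch of the same calculation (the class and helper names are mine, not part of the Beam SDK); it assumes the machine type string ends in the vCPU count, as with n1-standard-4:

```java
class SleepTimeCalc {

    // Parse the thread (vCPU) count from a machine type such as "n1-standard-4".
    static int parseThreads(String machineType) {
        return Integer.parseInt(machineType.substring(machineType.lastIndexOf("-") + 1));
    }

    // sleep_time = (workers * threads) / desired QPS
    static double sleepTime(int numWorkers, String machineType, int desiredRate) {
        int numThreads = (machineType != null) ? parseThreads(machineType) : 1;
        return (double) (numWorkers * numThreads) / (double) desiredRate;
    }

    public static void main(String[] args) {
        // 2 workers on n1-standard-4 (4 threads each) at 1 QPS: (2 * 4) / 1 = 8 s per element
        System.out.println(sleepTime(2, "n1-standard-4", 1));
    }
}
```

With 2 workers of 4 threads each and a 1 QPS target, each thread must sleep 8 seconds per element so that the aggregate rate stays at 1 element/s.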

Then you can use TimeUnit.SECONDS.sleep(sleep_time.intValue()); or equivalent inside the throttled Fn. In my example, as a use case, I wanted to read from a public file, parse out the empty lines and call the Natural Language Processing API with a maximum rate of 1 QPS (I initialized desired_rate to 1 previously):

p
    .apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
    .apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
    .apply("NLP requests", ParDo.of(new ThrottledFn()))
    .apply("Write Lines", TextIO.write().to(options.getOutput()));

The rate-limited Fn is ThrottledFn; notice the sleep call:

static class ThrottledFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        // Instantiates a client
        try (LanguageServiceClient language = LanguageServiceClient.create()) {

            // The text to analyze
            String text = c.element();
            Document doc = Document.newBuilder()
                .setContent(text).setType(Type.PLAIN_TEXT).build();

            // Detects the sentiment of the text
            Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();
            String nlp_results = String.format("Sentiment: score %s, magnitude %s",
                sentiment.getScore(), sentiment.getMagnitude());

            // sleep_time is the value computed above, kept where the DoFn can
            // read it after serialization (e.g. a static field)
            TimeUnit.SECONDS.sleep(sleep_time.intValue());

            Log.info(nlp_results);
            c.output(nlp_results);
        }
    }
}

With this I get a 1 element/s rate and avoid hitting quota when using multiple workers, even though requests are not really spread out (you might get 8 simultaneous requests followed by an 8 s sleep, and so on). This was just a test; a better implementation would probably use Guava's RateLimiter.
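A limiter in that spirit can also be sketched without Guava. The following minimal blocking limiter (class and method names are mine, not from any library) spaces permits evenly instead of sleeping a fixed amount after each burst:

```java
import java.util.concurrent.TimeUnit;

// Minimal blocking rate limiter in the spirit of Guava's RateLimiter (sketch only).
class SimpleRateLimiter {
    private final long intervalNanos; // minimum spacing between permits
    private long nextFreeNanos;       // earliest time the next permit may be granted

    SimpleRateLimiter(double permitsPerSecond) {
        this.intervalNanos = (long) (1e9 / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    // Block until a permit is available, spreading calls evenly over time.
    synchronized void acquire() {
        long now = System.nanoTime();
        long wait = nextFreeNanos - now;
        if (wait > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(wait);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        nextFreeNanos = Math.max(nextFreeNanos, now) + intervalNanos;
    }
}
```

Inside a DoFn such a limiter would typically live in a static field so that all threads on a worker share it; giving each worker a rate of desired_rate / num_workers then approximates the global target.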

If the pipeline uses autoscaling (THROUGHPUT_BASED), it gets more complicated: the number of workers changes over time, so the sleep would need to be updated accordingly (for example, Stackdriver Monitoring exposes a job/current_num_vcpus metric). Other general considerations would be controlling the number of parallel ParDos by using a dummy GroupByKey, splitting the source with splitIntoBundles, etc. I'd like to see if there are other, nicer solutions.
