How to parallelize HTTP requests within an Apache Beam step?


Question

I have an Apache Beam pipeline running on Google Dataflow whose job is rather simple:

  • It reads individual JSON objects from Pub/Sub
  • Parses them
  • And sends them to some API via HTTP

This API requires me to send the items in batches of 75. So I built a DoFn that accumulates events in a list and publishes them via this API once I have 75 of them. This turned out to be too slow, so I thought I would instead execute those HTTP requests on different threads using a thread pool.

The implementation of what I have right now looks like this:

private class WriteFn : DoFn<TheEvent, Void>() {
  // Transient, lateinit state is rebuilt on each worker via @Setup / @StartBundle.
  @Transient lateinit var api: TheApi

  @Transient lateinit var currentBatch: MutableList<TheEvent>

  @Transient lateinit var executor: ExecutorService

  @Setup
  fun setup() {
    api = buildApi()
    executor = Executors.newCachedThreadPool()
  }

  @StartBundle
  fun startBundle() {
    currentBatch = mutableListOf()
  }

  @ProcessElement
  fun processElement(processContext: ProcessContext) {
    val record = processContext.element()

    currentBatch.add(record)

    if (currentBatch.size >= 75) {
      flush()
    }
  }

  private fun flush() {
    // Hand the full batch to the thread pool and start accumulating a new one.
    val payloadTrack = currentBatch.toList()
    executor.submit {
      api.sendToApi(payloadTrack)
    }
    currentBatch.clear()
  }

  @FinishBundle
  fun finishBundle() {
    if (currentBatch.isNotEmpty()) {
      flush()
    }
  }

  @Teardown
  fun teardown() {
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
  }
}
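For context, a rough sketch of how this DoFn might be wired into the pipeline described above; the PubsubIO source, the hypothetical ParseEventFn, and the variable names are my assumptions, not the actual pipeline code:

val pipeline = Pipeline.create(options)
pipeline
  .apply("ReadEvents", PubsubIO.readStrings().fromSubscription(subscription))
  .apply("ParseJson", ParDo.of(ParseEventFn()))   // yields PCollection<TheEvent>
  .apply("SendToApi", ParDo.of(WriteFn()))
pipeline.run()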

This seems to work "fine" in the sense that data is making it to the API. But I don't know if this is the right approach, and I have the feeling that it is very slow.

The reason I think it's slow is that, when load testing (by sending a few million events to Pub/Sub), it takes the pipeline up to 8 times longer to forward those messages to the API (which has response times of under 8 ms) than it took my laptop to feed them into Pub/Sub.

Is there any problem with my implementation? Is this the way I should be doing this?

Also... am I required to wait for all the requests to finish in my @FinishBundle method (i.e. by getting the futures returned by the executor and waiting on them)?

Answer

You have two interrelated questions here:

  1. Are you doing this right / do you need to change anything?
  2. Do you need to wait in @FinishBundle?

The second answer: yes. But actually you need to flush more thoroughly, as will become clear.

Once your @FinishBundle method succeeds, a Beam runner will assume the bundle has completed successfully. But your @FinishBundle only sends the requests - it does not ensure they have succeeded. So you could lose data that way if the requests subsequently fail. Your @FinishBundle method should actually block and wait for confirmation of success from TheApi. Incidentally, all of the above should be idempotent, since after finishing the bundle, an earthquake could strike and cause a retry ;-)
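For illustration, here is a minimal sketch of that change applied to the DoFn from the question: keep the futures returned by the executor and block on them in @FinishBundle, so a failed request fails the bundle instead of being silently dropped (everything beyond the question's own names, including the pendingRequests field, is an assumption; it also needs java.util.concurrent.Future):

// Sketch: track outstanding requests so the bundle only commits after they succeed.
@Transient lateinit var pendingRequests: MutableList<Future<*>>

@StartBundle
fun startBundle() {
  currentBatch = mutableListOf()
  pendingRequests = mutableListOf()
}

private fun flush() {
  val payloadTrack = currentBatch.toList()
  // Keep the future instead of dropping it on the floor.
  pendingRequests.add(executor.submit { api.sendToApi(payloadTrack) })
  currentBatch.clear()
}

@FinishBundle
fun finishBundle() {
  if (currentBatch.isNotEmpty()) {
    flush()
  }
  // Block until every request has completed; get() rethrows any failure,
  // which makes the runner retry the bundle rather than lose data.
  pendingRequests.forEach { it.get() }
  pendingRequests.clear()
}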

So to answer the first question: should you change anything? Just the above. The practice of batching requests this way can work as long as you are sure the results are committed before the bundle is committed.

You may find that doing so will cause your pipeline to slow down, because @FinishBundle happens more frequently than @Setup. To batch up requests across bundles you need to use the lower-level features of state and timers. I wrote up a contrived version of your use case at https://beam.apache.org/blog/2017/08/28/timely-processing.html. I would be quite interested in how this works for you.
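To give a flavor of what that looks like, here is a contrived Kotlin sketch of batching across bundles with state and timers, loosely adapted from the pattern in that post. It assumes the events have first been keyed (for example by a shard id); apart from TheEvent, TheApi, buildApi, sendToApi, and the batch size of 75, everything is an assumption:

private class BatchAcrossBundlesFn : DoFn<KV<Int, TheEvent>, Void>() {

  @Transient lateinit var api: TheApi

  // Buffered events and their count, kept per key across bundles.
  @StateId("buffer")
  private val bufferSpec: StateSpec<BagState<TheEvent>> = StateSpecs.bag()

  @StateId("count")
  private val countSpec: StateSpec<ValueState<Int>> = StateSpecs.value()

  // Processing-time timer so a partially filled batch is still flushed eventually.
  @TimerId("stale")
  private val staleSpec: TimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)

  @Setup
  fun setup() {
    api = buildApi()
  }

  @ProcessElement
  fun process(
    context: ProcessContext,
    @StateId("buffer") buffer: BagState<TheEvent>,
    @StateId("count") count: ValueState<Int>,
    @TimerId("stale") staleTimer: Timer
  ) {
    val current = count.read() ?: 0
    if (current == 0) {
      // First element of a fresh batch: guarantee a flush within a minute.
      staleTimer.offset(Duration.standardMinutes(1)).setRelative()
    }
    buffer.add(context.element().value)
    count.write(current + 1)

    if (current + 1 >= 75) {
      flush(buffer, count)
    }
  }

  @OnTimer("stale")
  fun onStale(
    @StateId("buffer") buffer: BagState<TheEvent>,
    @StateId("count") count: ValueState<Int>
  ) {
    flush(buffer, count)
  }

  private fun flush(buffer: BagState<TheEvent>, count: ValueState<Int>) {
    val batch = buffer.read().toList()
    if (batch.isNotEmpty()) {
      // Blocking call: state is only cleared after the API confirms success.
      api.sendToApi(batch)
    }
    buffer.clear()
    count.clear()
  }
}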

It may simply be that the extremely low latency you are expecting, in the low millisecond range, is not available when there is a durable shuffle in your pipeline.

