Thread Synchronization for DoFn in Apache Beam

Problem Description

I am writing a DoFn whose instance variable elements (i.e., a shared resource) can be mutated in the @ProcessElement method:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

public class DemoDoFn extends DoFn<String, Void> {
  private final int batchSize;

  private transient List<String> elements;

  public DemoDoFn(int batchSize) {
    this.batchSize = batchSize;
  }

  @StartBundle
  public void startBundle() {
    elements = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String element, ProcessContext context) {
    elements.add(element); // <-------- mutated

    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  @FinishBundle
  public void finishBundle() {
    flushBatch();
  }

  private void flushBatch() {
    // Flush all elements, e.g., send all elements in a single API call to a server

    // Initialize a new array list for next batch
    elements = new ArrayList<>(); // <-------- mutated
  }
}
```
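To see why a plain ArrayList is enough here, the bundle lifecycle can be modeled without Beam. This is a plain-Java stand-in, not Beam code: the class BatchingModel and its flushed list (which records each batch instead of calling a server) are hypothetical, and main mimics the order in which a runner drives one instance from one thread: startBundle, then processElement per element, then finishBundle. One deliberate difference from the code above: flushBatch skips empty batches, which the original would otherwise send from finishBundle whenever a bundle ends exactly on a batch boundary.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of DemoDoFn's batching logic (no Beam dependency).
// Hypothetical: "flushed" records batches instead of sending them to a server.
public class BatchingModel {
  private final int batchSize;
  private List<String> elements;
  final List<List<String>> flushed = new ArrayList<>();

  public BatchingModel(int batchSize) {
    this.batchSize = batchSize;
  }

  public void startBundle() {
    elements = new ArrayList<>();
  }

  public void processElement(String element) {
    elements.add(element);
    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  public void finishBundle() {
    flushBatch();
  }

  private void flushBatch() {
    // Skip empty batches so finishBundle does not emit an empty call.
    if (!elements.isEmpty()) {
      flushed.add(elements);
    }
    // Start a fresh list; the flushed one is never mutated again.
    elements = new ArrayList<>();
  }

  public static void main(String[] args) {
    BatchingModel fn = new BatchingModel(2);
    // One thread drives the whole bundle on this instance.
    fn.startBundle();
    for (String s : new String[] {"a", "b", "c", "d", "e"}) {
      fn.processElement(s);
    }
    fn.finishBundle();
    System.out.println(fn.flushed); // [[a, b], [c, d], [e]]
  }
}
```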

Question 1: Do I need to add the synchronized keyword to the @ProcessElement method in order to avoid a race condition?

According to Apache Beam Thread-compatibility: "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization."
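The last sentence of that quote describes the case where you do need synchronization: if processElement hands work to threads you create yourself, those threads can touch shared state concurrently, and the one-thread-per-instance guarantee only covers the thread that calls the DoFn's methods. A minimal plain-Java sketch, assuming a hypothetical AsyncFn (not a Beam class) that submits each element to its own ExecutorService; the results list is wrapped with Collections.synchronizedList, and finishBundle waits for all tasks before reading it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical DoFn-like class that spawns its own threads.
// Only one caller thread uses this instance, but the pool threads below
// run concurrently, so the shared list must be synchronized by us.
public class AsyncFn {
  private ExecutorService pool;
  private List<String> results;

  public void startBundle() {
    pool = Executors.newFixedThreadPool(4);
    // A plain ArrayList here could lose writes or throw under concurrent adds.
    results = Collections.synchronizedList(new ArrayList<>());
  }

  public void processElement(String element) {
    // Block lambda => Runnable; the task runs on a pool thread.
    pool.submit(() -> {
      results.add(element.toUpperCase());
    });
  }

  // Waits for all submitted tasks so no work outlives the bundle.
  public List<String> finishBundle() throws InterruptedException {
    pool.shutdown();
    if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new IllegalStateException("tasks did not finish in time");
    }
    return new ArrayList<>(results);
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncFn fn = new AsyncFn();
    fn.startBundle();
    for (int i = 0; i < 1000; i++) {
      fn.processElement("e" + i);
    }
    System.out.println(fn.finishBundle().size()); // 1000
  }
}
```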

Question 2: Does "Each instance of your function object is accessed by a single thread at a time on a worker instance" indicate that Beam will synchronize @ProcessElement or the entire DoFn behind the scenes?

This IBM paper points out the following (quoting):

  1. "Third, the Beam programming guide guarantee that each user-defined function instance will only be executed by a single thread at a time. This means that the runner has to synchronize the entire function invocation, which could lead to significant performance bottlenecks."
  2. "Beam promises applications that there will only be a single thread executing their user-defined functions at a time. Therefore, if the underline engine spawns multiple threads, the runner has to synchronize the entire DoFn or GroupByKey invocation."
  3. "As Beam forbids multiple threads from entering the same PTransform instance, engines lose the opportunity to use operator parallelism."

The paper seems to indicate that the entire DoFn invocation is synchronized.

Recommended Answer

I know this is an old question, but since I was researching the same thing: no, you don't need synchronized on your processElement because, as you quoted, "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance".

Here is an example of one of Beam's official classes that mutates an instance variable: https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1369
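On Question 2: the guarantee does not by itself require a lock around one shared instance. A runner can also satisfy it by giving each worker thread its own copy of the DoFn (the DoFn is serialized, so copies can be deserialized per thread), which gets parallelism from multiple instances rather than from multiple threads inside one instance. The plain-Java sketch below models that idea only; it is not any runner's actual code. The Supplier stands in for deserializing a copy, and CountingFn is a hypothetical stand-in whose unsynchronized field is safe because only its owning thread touches it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Model of "one thread per instance at a time" via per-thread copies.
public class PerThreadInstances {
  // Hypothetical DoFn stand-in with plain, unsynchronized mutable state.
  static class CountingFn {
    private int count;

    void processElement(String element) {
      count++; // safe: only the owning thread calls this instance
    }

    int finishBundle() {
      return count;
    }
  }

  // Each task builds its own instance (like deserializing a copy) and
  // processes one bundle; no instance or lock is shared between threads.
  static int run(Supplier<CountingFn> factory, List<List<String>> bundles)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, bundles.size()));
    List<Future<Integer>> futures = new ArrayList<>();
    for (List<String> bundle : bundles) {
      futures.add(pool.submit(() -> {
        CountingFn fn = factory.get();
        for (String e : bundle) {
          fn.processElement(e);
        }
        return fn.finishBundle();
      }));
    }
    int total = 0;
    for (Future<Integer> f : futures) {
      total += f.get(); // waits for each bundle's result
    }
    pool.shutdown();
    return total;
  }

  public static void main(String[] args) throws Exception {
    List<List<String>> bundles = new ArrayList<>();
    for (int b = 0; b < 4; b++) {
      List<String> bundle = new ArrayList<>();
      for (int i = 0; i < 10_000; i++) {
        bundle.add("b" + b + "-" + i);
      }
      bundles.add(bundle);
    }
    System.out.println(run(CountingFn::new, bundles)); // 40000
  }
}
```

The alternative the IBM paper describes, sharing one instance and wrapping every call in a lock, would also meet the guarantee but serializes all work on that instance; per-thread copies avoid that bottleneck at the cost of more instances and more per-instance setup (for example, one client connection per copy).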
