Thread Synchronization for DoFn in Apache Beam

Problem Description

I am writing a DoFn whose instance variable elements (i.e., a shared resource) can be mutated in the @ProcessElement method:

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

public class DemoDoFn extends DoFn<String, Void> {
  private final int batchSize;

  private transient List<String> elements;

  public DemoDoFn(int batchSize) {
    this.batchSize = batchSize;
  }

  @StartBundle
  public void startBundle() {
    elements = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String element, ProcessContext context) {
    elements.add(element); // <-------- mutated

    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  @FinishBundle
  public void finishBundle() {
    flushBatch();
  }

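  private void flushBatch() {
    if (elements.isEmpty()) {
      return; // nothing to send, e.g. an empty bundle or a batch that was just flushed
    }
    // Flush all elements, e.g., send all elements in a single API call to a server

    // Initialize a new array list for next batch
    elements = new ArrayList<>(); // <-------- mutated
  }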
}

Question 1: Do I need to add the synchronized keyword to the @ProcessElement method in order to avoid a race condition?

According to the thread-compatibility section of the Apache Beam programming guide: "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance, unless you explicitly create your own threads. Note, however, that the Beam SDKs are not thread-safe. If you create your own threads in your user code, you must provide your own synchronization."
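
The last sentence of that quote covers the case where Beam's guarantee no longer applies. The following is a minimal sketch in plain Java with no Beam dependency. In it, a batcher's flush may also run on a background thread created in user code. AsyncBatcher and AsyncBatcherDemo are hypothetical names, not Beam APIs. The point is only that once a second thread touches the buffer, the code has to lock it itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the batching state of a DoFn (plain Java, no Beam dependency).
// add() runs on the thread Beam invokes @ProcessElement on; flush() may ALSO run on a
// background thread the user created, so every access to `buffer` takes the same lock.
class AsyncBatcher {
  private final Object lock = new Object();
  private final AtomicInteger sent = new AtomicInteger(); // elements "sent" so far
  private List<String> buffer = new ArrayList<>();

  void add(String element) {
    synchronized (lock) {
      buffer.add(element);
    }
  }

  // Swap the buffer under the lock, then do the (slow) send outside the lock.
  void flush() {
    List<String> batch;
    synchronized (lock) {
      if (buffer.isEmpty()) {
        return;
      }
      batch = buffer;
      buffer = new ArrayList<>();
    }
    sent.addAndGet(batch.size()); // placeholder for a real API call
  }

  int sentCount() {
    return sent.get();
  }
}

class AsyncBatcherDemo {
  public static void main(String[] args) throws InterruptedException {
    AsyncBatcher batcher = new AsyncBatcher();
    // A thread created in user code: Beam's one-thread-per-instance guarantee does not cover it.
    ExecutorService flusher = Executors.newSingleThreadExecutor();
    flusher.execute(() -> {
      for (int i = 0; i < 1_000; i++) {
        batcher.flush();
      }
    });
    for (int i = 0; i < 10_000; i++) {
      batcher.add("e" + i); // the Beam-managed thread
    }
    flusher.shutdown();
    flusher.awaitTermination(10, TimeUnit.SECONDS);
    batcher.flush(); // like @FinishBundle: send whatever is left
    System.out.println(batcher.sentCount()); // 10000
  }
}
```

Each flush swaps in a fresh buffer while holding the lock, so every element ends up in exactly one batch. The send itself happens outside the lock, which keeps the Beam-managed thread from blocking on slow I/O.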

Question 2: Does "Each instance of your function object is accessed by a single thread at a time on a worker instance" indicate that Beam will synchronize @ProcessElement or the entire DoFn behind the scenes?

This IBM paper points this out, and I quote:

  1. "Third, the Beam programming guide guarantee that each user-defined function instance will only be executed by a single thread at a time. This means that the runner has to synchronize the entire function invocation, which could lead to significant performance bottlenecks."
  2. "Beam promises applications that there will only be a single thread executing their user-defined functions at a time. Therefore, if the underline engine spawns multiple threads, the runner has to synchronize the entire DoFn or GroupByKey invocation."
  3. "As Beam forbids multiple threads from entering the same PTransform instance, engines lose the opportunity to use operator parallelism."

The paper seems to indicate that the entire DoFn invocation is synchronized.

Recommended Answer

I know this is an old question, but since I was researching the same thing: no, you don't need synchronized on your processElement method, because, as you quoted, "Each instance of your function (DoFn) object is accessed by a single thread at a time on a worker instance".
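
Here is a minimal sketch of why the unsynchronized DemoDoFn is safe. It assumes the runner gives each worker thread its own copy of the DoFn and reuses that copy for the bundles it runs on that thread. BatchingFn and PerThreadRunner are hypothetical plain-Java stand-ins, not Beam classes. The ThreadLocal models "one instance per thread"; a real runner would typically obtain each copy by deserializing the DoFn.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical plain-Java stand-in for DemoDoFn: same batching logic, no locks anywhere.
class BatchingFn {
  private final int batchSize;
  private List<String> elements;
  private int sent; // elements "sent" by this instance

  BatchingFn(int batchSize) {
    this.batchSize = batchSize;
  }

  void startBundle() {
    elements = new ArrayList<>();
  }

  void processElement(String element) {
    elements.add(element); // unsynchronized: only one thread ever touches this instance
    if (elements.size() >= batchSize) {
      flushBatch();
    }
  }

  void finishBundle() {
    flushBatch();
  }

  private void flushBatch() {
    sent += elements.size(); // placeholder for the real API call
    elements = new ArrayList<>();
  }

  int sent() {
    return sent;
  }
}

// Hypothetical runner: each worker thread lazily creates its own BatchingFn and reuses it
// for every bundle it runs, so no instance is ever shared between threads.
class PerThreadRunner {
  static int run(int batchSize, List<List<String>> bundles, int threads) throws Exception {
    List<BatchingFn> created = Collections.synchronizedList(new ArrayList<>());
    ThreadLocal<BatchingFn> perThread = ThreadLocal.withInitial(() -> {
      BatchingFn fn = new BatchingFn(batchSize);
      created.add(fn);
      return fn;
    });
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<?>> done = new ArrayList<>();
    for (List<String> bundle : bundles) {
      done.add(pool.submit(() -> {
        BatchingFn fn = perThread.get(); // this thread's private instance
        fn.startBundle();
        for (String element : bundle) {
          fn.processElement(element);
        }
        fn.finishBundle();
      }));
    }
    for (Future<?> f : done) {
      f.get(); // waits for the bundle and makes its writes visible to this thread
    }
    pool.shutdown();
    int total = 0;
    for (BatchingFn fn : created) {
      total += fn.sent();
    }
    return total;
  }
}

class PerThreadDemo {
  public static void main(String[] args) throws Exception {
    List<List<String>> bundles = new ArrayList<>();
    for (int b = 0; b < 100; b++) {
      List<String> bundle = new ArrayList<>();
      for (int i = 0; i < 250; i++) {
        bundle.add("b" + b + "-e" + i);
      }
      bundles.add(bundle);
    }
    System.out.println(PerThreadRunner.run(10, bundles, 4)); // 25000
  }
}
```

With four threads and 100 bundles of 250 elements, the total is always 25000, and no lock is involved. Each BatchingFn is only ever touched by the thread that created it. Under this assumption, a runner can keep the one-thread-at-a-time promise without serializing the whole invocation, which is what the IBM paper assumes it must do.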

Here is an example of an official Beam class (ElasticsearchIO) that mutates an instance variable: https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1369