Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner

Question

We're generating a sequential index in a ParDo using Beam's Java SDK 2.0.0. Just like the simple stateful index example in Beam's introduction to stateful processing, we use a ValueState<Integer> cell, and our only operation on it is to read the value and increment it when we need the next index:

// The cell is empty for the first element, so default to 0; then store the next index.
Integer statefulIndex = firstNonNull(index.read(), 0);
index.write(statefulIndex + 1);
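To make the semantics of the snippet concrete without a Beam runner, here is a minimal plain-Java sketch. `ValueCell` is a hypothetical stand-in for `ValueState<Integer>` (it is not Beam's class), and `Objects.requireNonNullElse` plays the role of `firstNonNull`. Processed one element at a time, the cell hands out consecutive indices starting at 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class SequentialIndexSketch {
    // Hypothetical stand-in for ValueState<Integer>: read() returns null until written.
    static final class ValueCell<T> {
        private T value;

        T read() { return value; }

        void write(T newValue) { value = newValue; }
    }

    // Mirrors the snippet: read (defaulting to 0), use the value, then write value + 1.
    static List<String> assignIndices(List<String> elements) {
        ValueCell<Integer> index = new ValueCell<>();
        List<String> out = new ArrayList<>();
        for (String element : elements) {
            int statefulIndex = Objects.requireNonNullElse(index.read(), 0);
            index.write(statefulIndex + 1);
            out.add(statefulIndex + ":" + element);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(assignIndices(List.of("x", "y", "z")));
    }
}
```

Each iteration reads the value written by the previous one, which is exactly the dependency the answer below is about.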

When running with Google's Dataflow runner, we noticed on the Dataflow monitoring interface that the wall time for that ParDo was accumulating in sync with elapsed time. We confirmed that the ParDo executes single-threaded by SSHing into the worker node, running top, and pressing 1 to view CPU usage per core. With the stateful processing cell commented out and the code otherwise unchanged, the same ParDo uses all cores of our n1-standard-32 worker node.

Even if the Dataflow runner is able to parallelize stateful indexing across key-and-window pairs (we currently have one window and one key), the lack of parallelism causes such a significant drop in performance that we are unable to use it. Is this the expected behavior of the Dataflow runner?
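Beam scopes state to a key and window, so roughly speaking the only parallelism available to a stateful ParDo is across those pairs. The following plain-Java sketch models that idea; it is an emulation, not Beam's API, and `indexPerKey`, `indexSequentially`, and the key names are hypothetical. Each key's elements are indexed sequentially inside one task, while different keys may run on different threads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerKeyIndexSketch {
    // One task per key: keys may run in parallel, but a key's elements never do.
    static Map<String, List<String>> indexPerKey(Map<String, List<String>> elementsByKey, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Map<String, Future<List<String>>> pending = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> entry : elementsByKey.entrySet()) {
                List<String> elements = entry.getValue();
                pending.put(entry.getKey(), pool.submit(() -> indexSequentially(elements)));
            }
            Map<String, List<String>> result = new LinkedHashMap<>();
            for (Map.Entry<String, Future<List<String>>> p : pending.entrySet()) {
                result.put(p.getKey(), p.getValue().get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Runs on a single thread: this key's private counter is read and incremented in order.
    static List<String> indexSequentially(List<String> elements) {
        int counter = 0;
        List<String> out = new ArrayList<>();
        for (String element : elements) {
            out.add(counter + ":" + element);
            counter++;
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> input = new LinkedHashMap<>();
        input.put("key-a", List.of("x", "y", "z"));
        input.put("key-b", List.of("p", "q"));
        System.out.println(indexPerKey(input, 4));
    }
}
```

With a single key there is a single task, so the extra threads in the pool sit idle no matter how many cores are available, which matches what the question observes.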

Naively, I expected that behind the scenes, Beam's stateful indexing would operate similarly to Java's AtomicInteger. Are there constraints that prevent parallel processing with a ValueState<Integer> cell or is this functionality just not yet built into the runner?

Answer

This is not only the expected behavior of the Dataflow runner, but a logical necessity in any context. It doesn't matter whether you are using state in Beam or an AtomicInteger in a single-process Java program: if operation "A" writes a value and operation "B" reads that value, then "B" must be executed after "A". The common term for this relationship is "happens-before".
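The same ordering applies to `AtomicInteger`. In this sketch (the class and method names are illustrative), several threads draw indices from one shared counter. The result is always the gap-free, duplicate-free range 0 to N-1, but only because each `getAndIncrement` observes the increment before it: the updates are applied one at a time in some total order, so no two threads ever advance the counter simultaneously.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIndexSketch {
    // Several threads draw indices from one shared counter; returns all indices, sorted.
    static List<Integer> drawIndices(int threads, int perThread) {
        AtomicInteger next = new AtomicInteger();
        List<Integer> drawn = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Each increment reads the value left by the previous one:
                    // the increments happen one after another, never at the same time.
                    drawn.add(next.getAndIncrement());
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Integer> sorted = new ArrayList<>(drawn);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        List<Integer> indices = drawIndices(4, 1000);
        System.out.println(indices.size() + " indices, last = " + indices.get(indices.size() - 1));
    }
}
```

Using more threads does not make the counter itself faster; it only spreads the serialized increments across cores.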

This form of stateful computation is the opposite of parallel computation. By definition, a read that observes a write has a causal relationship. By definition, two operations that are in parallel do not have a causal relationship.

Now, perhaps you are expecting parallel threads to access the state cell concurrently, as in the standard multi-threaded programming pattern of shared state guarded by some concurrency control. In this example, if those threads actually ran in parallel, you would get duplicate indices. Taking a step back, Beam targets massive "embarrassingly parallel" computations that are transparently distributed across a large cluster of machines. Fine-grained concurrency controls, aside from being extremely difficult to get right, do not readily translate to massive distributed computations.
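To see where the duplicates come from, this sketch replays one possible interleaving of two workers by hand, with no real threads, so the outcome is deterministic. `Cell`, the worker labels, and `interleavedReadThenWrite` are hypothetical. Both workers read the cell before either writes, so both use index 0, and the cell ends at 1 even though two elements were processed.

```java
import java.util.List;

public class LostUpdateSketch {
    // Hypothetical unsynchronized cell, empty (null) until first written.
    static final class Cell {
        Integer value;
    }

    // Returns [index used by A, index used by B, final cell value].
    static List<Integer> interleavedReadThenWrite() {
        Cell index = new Cell();
        int readByA = index.value == null ? 0 : index.value; // worker A reads 0
        int readByB = index.value == null ? 0 : index.value; // worker B also reads 0
        index.value = readByA + 1;                           // A writes 1
        index.value = readByB + 1;                           // B overwrites it with 1
        return List.of(readByA, readByB, index.value);
    }

    public static void main(String[] args) {
        System.out.println(interleavedReadThenWrite()); // prints [0, 0, 1]
    }
}
```

Preventing this requires forcing one read-increment-write to finish before the next begins, which is the serialization the runner already applies.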
