Beam pipeline does not produce any output after GroupByKey with windowing, and I got a memory error


Problem description

I want to load streaming data, add a key to each element, and then count the elements by key.

My Apache Beam Dataflow pipeline hits a memory error when I try to load and group-by-key a large amount of data using the streaming approach (unbounded data). It seems that the data accumulates in the group-by step and is not fired early by the triggering of each window.

If I decrease the element size (the element count does not change), it works! Apparently the group-by step waits for all the data to be grouped and only then fires all the newly windowed data.

I tested with both:

beam version 2.11.0 and scio version 0.7.4

beam version 2.6.0 and scio version 0.6.1

  1. Read a Pub/Sub message that contains a file name
  2. Read the corresponding file from GCS and load it as a line-by-line iterator
  3. Flatten it line by line (so it generates roughly 10,000 elements)
  4. Add a timestamp (the current instant) to each element
  5. Create a key-value pair from my data (with some random integer keys from 1 to 10)
  6. Apply a window with triggering (it fires about 50 times when the lines are small and there is no memory problem)
  7. Count by key (group by key, then combine them)
  8. Finally, we should end up with around 50 * 10 elements representing the counts per window and key (tested successfully when the line size is small enough); see the sketch after this list
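
Roughly, steps 1 to 5 look like the sketch below. This is a minimal outline in scio, not the original code: the subscription path and the readLines helper (which would stream a GCS file line by line) are placeholders I made up.

// A minimal sketch of steps 1-5, not the original code; the subscription path and
// readLines are placeholders.
import org.joda.time.Instant
import scala.util.Random

val data = sc
  .pubsubSubscription[String]("projects/<project>/subscriptions/<sub>")  // 1. Pub/Sub message carrying a file name
  .flatMap(fileName => readLines(fileName))                              // 2-3. load the file and flatten it line by line (~10,000 elements)
  .timestampBy(_ => Instant.now())                                       // 4. timestamp = the current instant
  .keyBy(_ => Random.nextInt(10) + 1)                                    // 5. random integer key from 1 to 10
// steps 6-8: the window/trigger snippet below is applied to data, then counted per key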

Visualization of the pipeline (steps 4 to 7):

As you can see, the data accumulates in the group-by step and does not get emitted.

val windowedData = data.applyKvTransform(
  Window
    // session windows with a 1 ms gap
    .into[myt](Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly
        .forever(
          AfterFirst.of(
            // fire once the pane holds at least 10 elements...
            AfterPane.elementCountAtLeast(10),
            // ...or 1 ms of processing time after the first element in the pane
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1))))
        .orFinally(AfterWatermark.pastEndOfWindow()))
    .withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()
)

The error:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

Is there any solution to the memory problem, maybe by forcing the group-by to emit the early results of each window?

Answer

The KeyCommitTooLargeException is not a memory problem but a protobuf serialization problem. Protobuf has a 2GB limit for a single object (google protobuf maximum size). Dataflow found that the value of a single key in the pipeline was larger than 2GB, so it couldn't shuffle the data. The error message indicates that "This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element." Based on your pipeline setup (i.e., assigned random keys), it is more likely the latter.
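
Regarding the first cause quoted in the error message (grouping a lot of data in one window without Combine): counting can be expressed as a combiner-based transform so that only running counts per key are shuffled, not every raw element of the pane. This is a hedged sketch rather than the asker's code; as far as I know, scio's countByKey sits on top of Beam's Count transform, which is a Combine:

// Combiner-based counting: workers pre-aggregate, so the commit per key stays small.
val countsPerWindowAndKey = windowedData.countByKey  // (key, count) per fired pane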

The pipeline may have read a large file (>2GB) from GCS and assigned it to a single random key. GroupByKey requires a key shuffle operation, which Dataflow could not perform because of the protobuf limitation, so it got stuck on that key and held back the watermark.

If a single key has a large value, you may want to reduce the value size, for example by compressing the string, splitting the string across multiple keys, or generating smaller GCS files in the first place. A sketch of the splitting idea follows.
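
As an illustration of the splitting idea (mine, not from the original answer), a large string value can be cut into fixed-size chunks carried under a composite (key, chunkIndex) key, so that no single key-value pair approaches the 2GB limit. keyedData and chunkSize are assumptions:

// Split one (key, bigString) pair into many smaller pairs keyed by (key, chunkIndex).
val chunkSize = 1 << 20  // ~1M characters per chunk, an assumed value
val chunked = keyedData.flatMap { case (key, bigString) =>
  bigString.grouped(chunkSize).zipWithIndex.map { case (chunk, i) => ((key, i), chunk) }
}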

If the large value comes from grouping many elements together, you may want to increase the key space so that every group-by-key operation ends up grouping fewer elements per key, for example by salting the keys as sketched below.
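
One common way to widen the key space (my illustration, not part of the original answer) is to salt each key with a small random shard number and, if needed, merge the partial results back per original key afterwards. numShards and the use of countByKey/sumByKey are assumptions:

// Salt keys with a random shard so each group-by-key handles fewer elements,
// then merge the per-shard partial counts back per original key.
val numShards = 100  // assumed fan-out factor
val saltedCounts = keyedData
  .map { case (key, value) => ((key, scala.util.Random.nextInt(numShards)), value) }
  .countByKey                                        // partial count per (key, shard)
  .map { case ((key, _), count) => (key, count) }
  .sumByKey                                          // total count per original key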

