Beam pipeline does not produce any output after GroupByKey with windowing, and I got a memory error


Problem description

I want to load streaming data, then add a key, and then count the elements by key.

The Apache Beam Dataflow pipeline gets a memory error when I try to load and group-by-key a big data set using the streaming approach (unbounded data). It seems that the data is accumulated in the group-by step and is not emitted earlier when each window's trigger fires.

If I decrease the element size (the element count does not change), it works! Apparently the group-by step waits for all of the data to be grouped and only then fires all of the new windowed data.

I tested both of the following:

beam version 2.11.0 and scio version 0.7.4

beam version 2.6.0 and scio version 0.6.1

  1. Read a Pubsub message that contains the file name
  2. Read and load the related file from GCS as a row-by-row iterator
  3. Flatten the rows into elements (so it produces about 10,000 elements)
  4. Add timestamps (the current instant) to the elements
  5. Create key-values of my data (with random integer keys from 1 to 10)
  6. Apply a window with triggering (it triggers about 50 times when the rows are small and there is no memory problem)
  7. Count by key (group by key, then combine them)
  8. Finally we should have about 50 * 10 elements representing counts by window and key (tested successfully when the row sizes were small enough); a sketch of these steps follows this list
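
For reference, a minimal sketch of steps 1 to 7 in scio. The subscription path and the readLinesFromGcs helper are illustrative stand-ins, since the actual reading code is not shown here:

import com.spotify.scio._
import org.joda.time.Instant
import scala.util.Random

// Hypothetical helper standing in for step 2: open the GCS file named in
// the Pubsub message and yield its contents line by line.
def readLinesFromGcs(fileName: String): Iterator[String] = ???

val sc = ScioContext()

val data = sc
  .pubsubSubscription[String]("projects/my-project/subscriptions/my-sub") // 1. file names from Pubsub
  .flatMap(readLinesFromGcs)                   // 2-3. flatten each file into ~10,000 line elements
  .timestampBy(_ => Instant.now())             // 4. stamp each element with the current instant
  .map(line => (Random.nextInt(10) + 1, line)) // 5. random integer key in 1..10
// 6-7. the windowing/triggering transform shown below is then applied to `data`,
// followed by the count by key (group by key, then combine)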

A visualization of the pipeline (steps 4 to 7):

As you can see, the data is accumulated in the group-by step and does not get emitted.

// Session windows with a 1 ms gap. Each window fires repeatedly whenever a
// pane has at least 10 elements or 1 ms of processing time has passed since
// its first element, until the watermark passes the end of the window; fired
// panes are discarded and up to 100 s of late data is allowed.
val windowedData = data.applyKvTransform(
  Window.into[myt](Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(
        AfterFirst.of(
          AfterPane.elementCountAtLeast(10),
          AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1))
        )
      ).orFinally(AfterWatermark.pastEndOfWindow())
    )
    .withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()
)

The error:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and key 2 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException.causedBy(StreamingDataflowWorker.java:230)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1287)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

Is there any solution to the memory problem, maybe by forcing group-by to emit the early results of each window?

Recommended answer

The KeyCommitTooLargeException is not a memory problem but a protobuf serialization problem. Protobuf has a 2GB limit for a single object (google protobuf maximum size). Dataflow found that the value of a single key in the pipeline was larger than 2GB, so it couldn't shuffle the data. The error message indicates that "This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element." Based on your pipeline setup (i.e., assigned random keys), it is more likely the latter.
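
To illustrate the "without using Combine" part of that message, here is a sketch, assuming windowedData is the keyed, windowed SCollection[(Int, String)] from the question: a combiner-based count pre-combines partial results per key before the shuffle, while a raw GroupByKey materializes all of a key's values together.

import com.spotify.scio.values.SCollection

// Combiner-based count: only a running Long per key and window crosses the
// shuffle, so the per-key commit stays small.
val counts: SCollection[(Int, Long)] = windowedData.countByKey

// Raw GroupByKey: every String value of a key is materialized together,
// which is what can exceed the 2GB per-key commit limit.
val grouped: SCollection[(Int, Iterable[String])] = windowedData.groupByKey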

The pipeline may have read a large file (>2GB) from GCS and assigned it to a single random key. GroupByKey requires a key shuffle operation, which Dataflow failed to perform due to the protobuf limitation; it therefore got stuck on that key and held back the watermark.

If a single key has a large value, you may want to reduce the value size: for example, compress the string, split the string across multiple keys, or generate smaller GCS files in the first place.

If the large value comes from grouping many elements under few keys, you may want to increase the key space so that every group-by-key operation ends up grouping fewer elements together.
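
A sketch of that idea via key salting, again assuming the windowedData collection from the question: counts are computed per salted key and then recombined on the original key.

import scala.util.Random

// Widen the key space with a random salt so each group-by-key bucket holds
// fewer elements, then collapse the salts back to the original keys.
val saltedCounts = windowedData
  .map { case (k, v) => ((k, Random.nextInt(100)), v) } // 10 keys -> up to 1,000 salted keys
  .countByKey                                           // small per-salted-key commits
  .map { case ((k, _), n) => (k, n) }                   // drop the salt
  .reduceByKey(_ + _)                                   // recombine counts per original key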
