Google Cloud Dataflow fails in combine function due to worker losing contact


Question

My Dataflow pipeline consistently fails in my combine function, with no errors reported in the logs beyond a single entry of:

 A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service.

I am using the Apache Beam Python SDK 2.4.0. I have tried performing this step with both CombinePerKey and CombineGlobally; the pipeline failed in the combine function in both cases. The pipeline completes when running with a smaller amount of data.


Am I exhausting worker resources and not being told about it? What can cause a worker to lose contact with the service?

Update:

Using n1-highmem-4 workers gives me the same failure. When I check Stackdriver I see no errors, but three kinds of warnings: No session file found, Refusing to split, and Processing lull. The input collection is about 17,000 elements spread across ~60 MB, but Stackdriver reports ~25 GB in use on a single worker, which is approaching the maximum. For this input, each accumulator created in my CombineFn should take roughly 150 MB of memory. Is my pipeline creating too many accumulators and exhausting its memory? If so, how can I tell it to merge accumulators more often or limit the number created?
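For illustration, here is a minimal sketch of the CombineFn lifecycle being described (plain Python, no Beam dependency; the tiny list "matrix" and the input rows are invented stand-ins for the real ~150 MB accumulators). Each worker bundle builds its own accumulator via add_input, and the runner later collapses them in merge_accumulators, which is the point where many large accumulators can coexist in memory; compact() is the hook a runner may call to shrink an accumulator early:

```python
class MatrixCombinerSketch:
    """Sketch of a Beam-style CombineFn that sums per-key count vectors.

    In the real pipeline each accumulator was ~150 MB; here it is a
    3-cell list so the lifecycle can be walked through by hand.
    """

    def create_accumulator(self):
        return [0, 0, 0]  # hypothetical tiny "matrix"

    def add_input(self, acc, row):
        # Assume each row carries an index to increment.
        acc[row % 3] += 1
        return acc

    def merge_accumulators(self, accumulators):
        # The runner may buffer many accumulators before calling this;
        # with 150 MB each, a handful already exhausts a worker.
        merged = self.create_accumulator()
        for acc in accumulators:
            for i, v in enumerate(acc):
                merged[i] += v
        return merged

    def compact(self, acc):
        # Optional hook a runner can call to shrink an accumulator
        # between add_input calls; a no-op in this sketch.
        return acc

    def extract_output(self, acc):
        return acc


# Simulate what a worker does for one key, across two bundles:
fn = MatrixCombinerSketch()
shard1 = fn.create_accumulator()
for row in [0, 1, 1]:
    shard1 = fn.add_input(shard1, row)
shard2 = fn.create_accumulator()
for row in [2, 2]:
    shard2 = fn.add_input(shard2, row)
result = fn.extract_output(fn.merge_accumulators([shard1, shard2]))
print(result)  # [1, 2, 2]
```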

I do have an error log entry verifying my worker was killed due to OOM. It just isn't tagged as a worker error, which is the default filtering in the Dataflow monitor.

The pipeline definition looks like:

table1 = (p | "Read Table1" >> beam.io.Read(beam.io.BigQuerySource(query=query))
     | "Key table1 rows" >> beam.Map(lambda row: (row['key'], row)))
table2 = (p | "Read Table2" >> beam.io.Read(beam.io.BigQuerySource(query=query))
     | "Key table2 rows" >> beam.Map(lambda row: (row['key'], row)))

merged = ({"table1": table1, "table2": table2}
     | "Join" >> beam.CoGroupByKey()
     | "Reshape" >> beam.ParDo(ReshapeData())
     | "Key merged rows" >> beam.Map(lambda row: (row['key'], row))
     | "Build matrix" >> beam.CombinePerKey(MatrixCombiner())  # Dies here
     | "Write matrix" >> beam.io.avroio.WriteToAvro())

Answer

Running with fewer workers leads to fewer accumulators and lets the pipeline complete successfully.
