Google Dataflow/Apache Beam Python-PCollection的侧面输入会降低性能 [英] Google Dataflow / Apache Beam Python - Side-Input from PCollection kills performance

查看:84
本文介绍了Google Dataflow/Apache Beam Python-PCollection的侧面输入会降低性能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们正在使用Python SDK在Google数据流中运行日志文件解析作业.数据分布在数百条每日日志中,我们通过文件模式从Cloud Storage中读取这些日志.所有文件的数据量约为5-8 GB(gz文件),总共有50-8000万行.

We are running logfile parsing jobs in google dataflow using the Python SDK. Data is spread over several 100s of daily logs, which we read via file-pattern from Cloud Storage. Data volume for all files is about 5-8 GB (gz files) with 50-80 million lines in total.

loglines = p | ReadFromText('gs://logfile-location/logs*-20180101')

此外,我们有一个简单的(小的)映射csv,它将日志文件条目映射到人类可读的文本.大约有400行,大小为5 kb.

In addition, we have a simple (small) mapping csv, that maps logfile-entries to human readable text. Has about 400 lines, 5 kb size.

例如,在最终输出中,带有[param = testing2]的日志文件条目应映射到客户请求的14天免费产品试用版".

For Example a logfile entry with [param=testing2] should be mapped to "Customer requested 14day free product trial" in the final output.

我们在一个简单的Beam.Map中使用sideinput进行此操作,如下所示:

We do this in a simple beam.Map with sideinput, like so:

customerActions = loglines | beam.Map(map_logentries,mappingTable)

其中map_logentries是映射函数,而mappingTable是映射表.

where map_logentries is the mapping function and mappingTable is said mapping table.

但是,仅当我们通过open()/read()读取本机python中的映射表时,此方法才有效.如果我们通过ReadFromText()利用光束管道进行相同的操作,然后将生成的PCollection作为侧面输入传递给Map,就像这样:

However, this only works if we read the mapping table in native python via open() / read(). If we do the same utilising the beam pipeline via ReadFromText() and pass the resulting PCollection as side-input to the Map, like so:

mappingTable = p | ReadFromText('gs://side-inputs/category-mapping.csv')    
customerActions = loglines | beam.Map(map_logentries,beam.pvalue.AsIter(mappingTable))

性能完全分解为每秒大约2-3个项目.

performance breaks down completely to about 2-3 items per Second.

现在,我的问题:

  1. 为什么性能这么差,通过 PCollection作为侧面输入?
  2. 如果不建议使用 PCollections作为侧面输入,应该如何构建,例如 需要可以/不应被硬编码到的映射的管道 映射功能?
  1. Why would performance break so badly, what is wrong with passing a PCollection as side-input?
  2. If it is maybe not recommended to use PCollections as side-input, how is one supposed to build such as pipeline that needs mappings that can/should not be hard coded into the mapping function?

对于我们来说,映射确实会经常更改,因此我需要找到一种让普通"用户提供映射的方法.想法是在Cloud Storage中提供可用的csv映射,并通过ReadFromText()将其简单地合并到管道中.在本地读取它涉及将映射提供给工作人员,因此只有技术团队才能做到这一点.

For us, the mapping does change frequently and I need to find a way to have "normal" users provide it. The idea was to have the mapping csv available in Cloud Storage, and simply incorporate it into the Pipeline via ReadFromText(). Reading it locally involves providing the mapping to the workers, so only the tech-team can do this.

我知道侧面输入存在缓存问题,但是可以肯定的是,这不适用于5kb输入.

I am aware that there are caching issues with side-input, but surely this should not apply to a 5kb input.

上面的所有代码都是伪代码来解释问题.任何对此的想法和想法将不胜感激!

All code above is pseudo code to explain the problem. Any ideas and thoughts on this would be highly appreciated!

推荐答案

对于更有效的侧面输入(中小尺寸),您可以利用 beam.pvalue.AsList(mappingTable) 因为AsList导致Beam实现数据,所以您确定将获得该pcollection的内存列表.

For more efficient side inputs (with small to medium size) you can utilize beam.pvalue.AsList(mappingTable) since AsList causes Beam to materialize the data, so you're sure that you will get in-memory list for that pcollection.

旨在用于副参数规范-相同的地方 使用AsSingleton和AsIter的地方,但强制实现 此PCollection作为列表.

Intended for use in side-argument specification---the same places where AsSingleton and AsIter are used, but forces materialization of this PCollection as a list.

来源: 查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆