apache_beam.transforms.util.Reshuffle() not available for GCP Dataflow


Problem description

I have upgraded to the latest apache_beam[gcp] package via pip install --upgrade apache_beam[gcp]. However, I noticed that Reshuffle() does not appear in the [gcp] distribution. Does this mean that I will not be able to use Reshuffle() in any Dataflow pipelines? Is there any way around this? Or is it possible that the pip package is just not up to date, and that once Reshuffle() is in master on GitHub it will become available on Dataflow?
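Before relying on Reshuffle(), it can help to check at runtime whether the installed SDK actually exposes it. The sketch below is a generic helper, not part of Beam: the name `has_transform` is hypothetical, and it only assumes that Reshuffle would live in `apache_beam.transforms.util`, as the question's title suggests. It reports what is installed locally and nothing more.

```python
import importlib


def has_transform(module_name: str, attr_name: str) -> bool:
    """Return True if `module_name` can be imported and defines `attr_name`.

    Hypothetical helper: it only probes the locally installed packages.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # Package or submodule is not installed in this environment.
        return False
    return hasattr(module, attr_name)


if __name__ == "__main__":
    if has_transform("apache_beam.transforms.util", "Reshuffle"):
        print("Reshuffle() is available in this SDK")
    else:
        print("Reshuffle() is missing; fall back to a hand-written shuffle")
```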

Based on the response to this question, I am trying to read data from BigQuery and then randomize it before writing it to CSVs in a GCP storage bucket. I have noticed that the sharded .csv files I am using to train my GCMLE model are not truly random. Within TensorFlow I can randomize the batches, but that only randomizes the rows within each file that is built up in the queue, and my issue is that the files currently being generated are biased in some way. Any suggestions for other ways to shuffle right before writing to CSV in Dataflow would be much appreciated.
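The limitation described above, that shuffling batches in TensorFlow only reorders rows inside the files already in the queue, can be seen with a small plain-Python simulation. It assumes, hypothetically, that rows arrive in some fixed order and are split into contiguous shards; `make_shards` and `shuffle_within_shards` are illustrative helpers, not Beam or TensorFlow API.

```python
import random


def make_shards(rows, num_shards):
    """Split rows into contiguous shards, mimicking files written in input order."""
    size = -(-len(rows) // num_shards)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def shuffle_within_shards(shards, rng):
    """Shuffle each shard independently, like shuffling one file at a time."""
    out = []
    for shard in shards:
        shard = list(shard)
        rng.shuffle(shard)
        out.append(shard)
    return out


rng = random.Random(0)
rows = list(range(100))  # e.g. rows that arrive ordered by some attribute
shards = make_shards(rows, 4)
local = shuffle_within_shards(shards, rng)

# Each file still holds exactly the same rows as before: the bias survives.
for before, after in zip(shards, local):
    assert sorted(after) == before
```

Only the order inside each file changes; which rows end up in which file is decided before any TensorFlow-side shuffling happens.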

Solution

One approach is to recreate the shuffle myself:

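import random

from apache_beam import FlatMap, GroupByKey, Map

# Give each element a random 32-bit key, group by that key so the runner
# redistributes the data, then drop the keys and flatten the groups again.
# unshuffled_pcoll is the PCollection read from BigQuery.
shuffled_data = (unshuffled_pcoll
        | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
        | 'GroupByKey' >> GroupByKey()
        | 'RemoveRandomKeys' >> FlatMap(lambda kv: kv[1]))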
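To see why random keys followed by a GroupByKey mix rows together, the same three steps can be simulated in plain Python. This is only a sketch: sorting the groups by key stands in for however the runner actually orders and partitions shuffled data (an assumption), and `random_key_shuffle` is a hypothetical name.

```python
import random
from collections import defaultdict


def random_key_shuffle(elements, rng=random):
    """Simulate AddRandomKeys -> GroupByKey -> RemoveRandomKeys in memory."""
    # 'AddRandomKeys': pair every element with a random 32-bit key.
    keyed = [(rng.getrandbits(32), e) for e in elements]

    # 'GroupByKey': collect values per key (key collisions share a group).
    groups = defaultdict(list)
    for key, value in keyed:
        groups[key].append(value)

    # Stand-in for the runner's shuffle: visit groups in random-key order.
    # 'RemoveRandomKeys': drop the keys and flatten the grouped values.
    return [v for key in sorted(groups) for v in groups[key]]


rows = list(range(100))
shuffled = random_key_shuffle(rows, random.Random(42))
```

Every input row appears exactly once in the output, but a contiguous slice of `shuffled` now draws rows from across the whole input rather than from one original block.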

My remaining question is whether I need to worry about the windowing or ExpandIterable sections of the Reshuffle() code.
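One reason windowing matters here is that GroupByKey groups elements per key and per window. The plain-Python sketch below mimics that rule. The `(window, key, value)` tuples and the helper name `group_by_key_and_window` are hypothetical. For a bounded BigQuery read, where every element sits in the single global window, the rule reduces to grouping by key alone.

```python
from collections import defaultdict


def group_by_key_and_window(elements):
    """Group (window, key, value) triples per key *and* per window."""
    groups = defaultdict(list)
    for window, key, value in elements:
        groups[(window, key)].append(value)
    return dict(groups)


# Same random key, but two different windows: the values are NOT merged.
windowed = [("w1", 7, "a"), ("w2", 7, "b"), ("w1", 7, "c")]
print(group_by_key_and_window(windowed))
# {('w1', 7): ['a', 'c'], ('w2', 7): ['b']}

# Everything in one global window: grouping reduces to grouping by key.
global_only = [("global", 7, "a"), ("global", 7, "b")]
print(group_by_key_and_window(global_only))
# {('global', 7): ['a', 'b']}
```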
