How to output all values from a <key,value> pair, grouped by key, using Google Dataflow

Question

I'm trying to do something that seems relatively straightforward but am running into some difficulty.

I have a bunch of text, and each line is a value. I analyze each line of text, create the appropriate key, then emit KV pairs. I then use the GroupByKey transform. Finally, I want to output all the text now grouped by key (bonus points if I can get one text file for each key, but I'm not sure that's possible).

Here's what the pipeline's apply looks like:

    public PCollection<String> apply(PCollection<String> generator) {

        // Returns individual lines of text as <String,String> KV pairs
        PCollection<KV<String,String>> generatedTextKV = generator.apply(
                ParDo.of(new GeneratorByLineFn()));

        // Groups the <String,String> KV pairs by key
        PCollection<KV<String, Iterable<String>>> groupedText = generatedTextKV.apply(
            GroupByKey.<String, String>create());

        // Hopefully returns output where all of each key's values are together
        PCollection<String> results = groupedText.apply(ParDo.of(new FormatOutputFn()));

        return results;
    }
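
For context, GeneratorByLineFn itself is straightforward; a rough sketch (the key-extraction logic here is only a placeholder) looks like this:

    // Emits each line keyed by whatever key the analysis derives from it.
    static class GeneratorByLineFn extends DoFn<String, KV<String, String>> {
        @Override
        public void processElement(ProcessContext c) {
            String line = c.element();
            String key = line.split(",", 2)[0];  // placeholder: derive the real key here
            c.output(KV.of(key, line));
        }
    }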

Unfortunately, I cannot get the FormatOutputFn() to work as desired.

Iterating over the Iterable<String> and outputting each value doesn't guarantee the key,value grouping (please correct me if I'm wrong about this, then my problem is solved). I then tried using StringBuilder(), which works with small datasets but unsurprisingly generates java.lang.OutOfMemoryError: Java heap space errors in the log on larger data. I also tried the Flatten.FlattenIterables transform, but that doesn't work either since the value in the K,V pair is not a PCollection, but just a regular Iterable.

I've seen this question on analysis by common key, but from the answer it is not entirely clear to me exactly what I should do with my situation. I think I have to use Combine.PerKey, but I'm not exactly sure how to use it. I'm also assuming there has to be a pre-baked way to do this, but I can't find that pre-baked way in the docs. I'm sure I'm just not looking in the right place.

And, as mentioned above, if there is a way to get text file output where the name of the text file is the key and the values are all in the file, that would be amazing. But I don't think Dataflow can do this (yet?).

Thanks for reading.

Answer

Dataflow doesn't currently support any notion of ordering on PCollections. You are correct that there is no guarantee that 'results' has an ordering, including key grouping. We would like to add ordering properties for PCollections at some point, but the timeline for that is not yet known.

Certain runners may appear to have ordering in certain situations, due to underlying implementation details. For example, if FormatOutputFn is fusing with a Write step, then it's likely you will see grouping because each KV<K, Iterable<V>> is processed into multiple <K,V>s which are written to the file before the next KV<K, Iterable<V>> is processed. But again this is just an artifact of how Dataflow chooses to optimize this particular case and should not be relied on generally.

As you already figured out, if a single element could fit in memory, you could have FormatOutputFn convert each KV<K, Iterable<V>> into a single String which contains multiple newlines.
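
For the in-memory case, a minimal sketch of such a FormatOutputFn (assuming the Dataflow SDK 1.x DoFn API; prepending the key as a header line is just one way to label each group) might look like:

    // Joins each grouped Iterable into one newline-separated String.
    // Only safe when all of a single key's values fit in memory.
    static class FormatOutputFn extends DoFn<KV<String, Iterable<String>>, String> {
        @Override
        public void processElement(ProcessContext c) {
            StringBuilder sb = new StringBuilder();
            sb.append(c.element().getKey()).append(":\n");  // optional header naming the group
            for (String value : c.element().getValue()) {
                sb.append(value).append("\n");
            }
            c.output(sb.toString());
        }
    }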

Given that is not the case here, the best solution I can think of is to write the files by hand -- so FormatOutputFn takes each KV<K, Iterable<V>> and uses standard GCS libraries to open a file named K and write the Iterable<V> to it. The bad news is this gets a little tricky because you need to be aware of how our fault tolerance semantics might retry elements. But the good news is that we're currently working on libraries to help make these types of custom sinks easier.
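
A rough sketch of that hand-written sink is below. It assumes the google-cloud-storage client library and a hypothetical bucket name, neither of which comes from the original answer. It streams the values so the whole Iterable never has to be held in memory, and it derives the object name only from the key so that a retried element overwrites the same object instead of creating a duplicate.

    import com.google.cloud.storage.BlobInfo;
    import com.google.cloud.storage.Storage;
    import com.google.cloud.storage.StorageOptions;
    import java.io.Writer;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;

    // Writes each key's values to gs://<bucket>/<key>.txt by hand.
    static class WritePerKeyFn extends DoFn<KV<String, Iterable<String>>, String> {
        private static final String BUCKET = "my-output-bucket";  // hypothetical bucket name

        @Override
        public void processElement(ProcessContext c) throws Exception {
            Storage storage = StorageOptions.getDefaultInstance().getService();
            String objectName = c.element().getKey() + ".txt";
            BlobInfo blob = BlobInfo.newBuilder(BUCKET, objectName).build();
            // Stream the values into the object so they never all sit in memory at once.
            try (Writer writer = Channels.newWriter(storage.writer(blob), StandardCharsets.UTF_8.name())) {
                for (String value : c.element().getValue()) {
                    writer.write(value);
                    writer.write("\n");
                }
            }
            c.output("gs://" + BUCKET + "/" + objectName);
        }
    }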

As for the zero-length files, there's an awesome answer here: Why are zero byte files written to GCS when running a pipeline?
