How to Unnest the nested PCollection in Dataflow

Problem description

To join two PCollections that have a nested structure, we need to unnest them before doing the join, because joining them directly is causing problems (see my other Stack Overflow question, a link). So I want to know how to unnest a PCollection. Any idea on either how to join two nested tables or how to unnest PCollections would be welcome.

I just noticed that there is a PTransform "Unnest" (link) for unnesting a collection from a nested one, but I could not find any sample on the net. I tried to use it with the steps below to convert the nested collection, but I still cannot get the unnested collection at the end.

1) PCollection empCollection = ReadCollection();

2) Use a ParDo function to convert the values from PCollection (com.google.api.services.bigquery.model.TableRow) to PCollection (org.apache.beam.sdk.values.Row)

3) Define the schema as below

Schema projects = Schema.builder().addInt32Field("Id").addStringField("Name").build();
Schema Employees = Schema.builder().addStringField("empNo").addStringField("empName").addArrayField("Projects", FieldType.row(projects)).build();

4) Use the Unnest transform to unnest the nested collection

PCollection<Row> pcColl = targetRowCollection.apply(
    Unnest.<Row>create().withFieldNameFunction(
        new SerializableFunction<java.util.List<java.lang.String>, java.lang.String>() {
            @Override
            public java.lang.String apply(java.util.List<java.lang.String> input) {
                return String.join("+", input);
            }
        }));

5) Use a ParDo function to convert the values from PCollection (org.apache.beam.sdk.values.Row) back to PCollection (com.google.api.services.bigquery.model.TableRow)

Could someone help me use this Unnest transform to get the unnested collection from the nested one?

Recommended answer

Code for joining two PCollections with a nested structure in Python with Beam:

import apache_beam as beam


class addkeysnested(beam.DoFn):
    """Emit (key, element), where key is read from a dotted field path."""

    def process(self, element, fieldName):
        tmp_record = element
        names = fieldName.split(".")
        for i, name in enumerate(names):
            if i != len(names) - 1:
                # Intermediate level: a repeated field, follow its first entry
                tmp_record = tmp_record[name.strip()][0]
            else:
                # Last level: the join key itself
                tmp_record = tmp_record[name.strip()]
        return [(tmp_record, element)]


# option, input_file1/2 and join_fileld_names1/2 are defined elsewhere
with beam.Pipeline(options=option) as p:

    source_record1 = p | "get data1" >> beam.io.avroio.ReadFromAvro(input_file1)
    source_record2 = p | "get data2" >> beam.io.avroio.ReadFromAvro(input_file2)

    # Convert into <k,v> form (distinct labels, since the same DoFn is applied twice)
    keyed_record1 = source_record1 | "key data1" >> beam.ParDo(addkeysnested(), join_fileld_names1)
    keyed_record2 = source_record2 | "key data2" >> beam.ParDo(addkeysnested(), join_fileld_names2)

    # Apply join operation
    rjoin = ({'File1Info': keyed_record1, 'File2Info': keyed_record2}
             | beam.CoGroupByKey())
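
For each key, CoGroupByKey applied to a dictionary of PCollections produces a pair of the key and a dictionary whose 'File1Info' and 'File2Info' entries hold the grouped records from each side. The sketch below shows one way those grouped values could be expanded into joined rows. It is plain Python so it can be tried without a pipeline, and the name inner_join and the sample records are assumptions made for illustration, not part of the original answer.

```python
from itertools import product


def inner_join(grouped):
    """Expand one CoGroupByKey result into (key, left, right) rows.

    `grouped` is assumed to look like
    (key, {'File1Info': [...], 'File2Info': [...]}).
    """
    key, sides = grouped
    left = list(sides["File1Info"])
    right = list(sides["File2Info"])
    # A key missing on either side produces no rows (inner join semantics).
    return [(key, l, r) for l, r in product(left, right)]


# Hypothetical grouped value for the key "Pune".
grouped = (
    "Pune",
    {"File1Info": [{"id": 1}], "File2Info": [{"order": "A"}, {"order": "B"}]},
)
rows = inner_join(grouped)
```

In a pipeline this function could probably be applied as rjoin | beam.FlatMap(inner_join), so that each joined row becomes its own element.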

Note: with the above code we can take the key value from any level of the nested fields, e.g. personalInfo.Address.City, and then apply CoGroupByKey() to join the two PCollections.
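
The lookup in addkeysnested follows only the first entry ([0]) of each repeated field on the way to the key. If every entry of a repeated field should take part in the join, a record can instead be keyed once per value found on the path, which in effect unnests the repeated field for the purpose of the join. The following is a minimal sketch under that assumption. The names nested_values and key_by_all are hypothetical, and the sample record merely mirrors the employee/Projects schema from the question.

```python
def nested_values(record, dotted_path):
    """Return every value found at dotted_path, descending into lists."""
    current = [record]
    for name in dotted_path.split("."):
        name = name.strip()
        next_level = []
        for item in current:
            value = item[name]
            if isinstance(value, list):
                # Repeated field: follow every entry, not just the first.
                next_level.extend(value)
            else:
                next_level.append(value)
        current = next_level
    return current


def key_by_all(record, dotted_path):
    """Emit one (key, record) pair per value found at dotted_path."""
    return [(value, record) for value in nested_values(record, dotted_path)]


# Hypothetical record shaped like the schema in the question.
employee = {
    "empNo": "E1",
    "empName": "Asha",
    "Projects": [{"Id": 1, "Name": "alpha"}, {"Id": 2, "Name": "beta"}],
}
pairs = key_by_all(employee, "Projects.Id")
```

Because key_by_all returns a list, it could be used with beam.FlatMap(key_by_all, "Projects.Id") in place of the ParDo step, giving CoGroupByKey one keyed element per project.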
