How to Unnest the nested PCollection in Dataflow


Problem Description

To join two PCollections with a nested structure, I need to unnest them before doing the join, as I am running into challenges (refer to my other Stack Overflow case, link). So I want to know how to unnest a PCollection. It would be good if someone could give an idea of either how to join two nested tables or how to unnest PCollections.

I just noted that we have the PTransform "Unnest" (link) for unnesting a collection from a nested one, but I could not find any sample on the net. I tried to implement it with the steps below to convert the nested collection, but I am still unable to get the unnested collection at the end.

1) PCollection empCollection = ReadCollection();
2) Using a ParDo function, convert the values from PCollection<com.google.api.services.bigquery.model.TableRow> to PCollection<org.apache.beam.sdk.values.Row> (a sketch of this conversion follows after the list).
3) Define the schema like below:
   Schema projects = Schema.builder().addInt32Field("Id").addStringField("Name").build();
   Schema Employees = Schema.builder().addStringField("empNo").addStringField("empName").addArrayField("Projects", FieldType.row(projects)).build();
4) Use the Unnest transform to unnest the nested collection; the call I tried is shown below the sketch.
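Roughly, steps 2 and 3 look like the sketch below. This is illustrative only: the conversion DoFn, the handling of the repeated "Projects" record as a list of maps, and the value parsing are assumptions that depend on the actual BigQuery schema (uses org.apache.beam.sdk.schemas.Schema/Row, ParDo/DoFn and java.util collections).

// Step 3: schemas describing the nested structure (same as in the question)
Schema projects = Schema.builder().addInt32Field("Id").addStringField("Name").build();
Schema Employees = Schema.builder()
    .addStringField("empNo")
    .addStringField("empName")
    .addArrayField("Projects", FieldType.row(projects))
    .build();

// Step 2: convert TableRow -> Row and attach the schema to the output PCollection
PCollection<Row> targetRowCollection = empCollection
    .apply("TableRowToRow", ParDo.of(new DoFn<TableRow, Row>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        TableRow tr = c.element();
        List<Row> projectRows = new ArrayList<>();
        // a repeated RECORD field typically arrives as a list of maps
        for (Object o : (List<?>) tr.get("Projects")) {
          Map<String, Object> p = (Map<String, Object>) o;
          projectRows.add(Row.withSchema(projects)
              .addValues(Integer.valueOf(String.valueOf(p.get("Id"))),
                         String.valueOf(p.get("Name")))
              .build());
        }
        c.output(Row.withSchema(Employees)
            .addValues(String.valueOf(tr.get("empNo")),
                       String.valueOf(tr.get("empName")),
                       projectRows)
            .build());
      }
    }))
    .setRowSchema(Employees);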

// Step 4: flatten the nested rows; withFieldNameFunction controls how the
// flattened field names are built (here, path components joined with "+")
PCollection<Row> pcColl = targetRowCollection.apply(
    Unnest.<Row>create().withFieldNameFunction(
        new SerializableFunction<java.util.List<java.lang.String>, java.lang.String>() {
          @Override
          public java.lang.String apply(java.util.List<java.lang.String> input) {
            return String.join("+", input);
          }
        }));

5) Using a ParDo function, convert the values from PCollection<org.apache.beam.sdk.values.Row> back to PCollection<com.google.api.services.bigquery.model.TableRow>.
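Step 5 can be sketched in a similar generic way, again as an assumption rather than working code from my pipeline: the flattened field names depend on the function passed to withFieldNameFunction (with the "+" join above, a nested field would appear as e.g. "Projects+Id" and may need renaming before writing to BigQuery).

// Step 5: convert the flattened Rows back to TableRows (e.g. before writing to BigQuery)
PCollection<TableRow> flattenedRows = pcColl
    .apply("RowToTableRow", ParDo.of(new DoFn<Row, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Row row = c.element();
        TableRow out = new TableRow();
        // copy every flattened field by name; nested fields carry the
        // "+"-joined names produced by withFieldNameFunction (e.g. "Projects+Id"),
        // so rename them here if the sink needs different column names
        for (Schema.Field field : row.getSchema().getFields()) {
          out.set(field.getName(), row.getValue(field.getName()));
        }
        c.output(out);
      }
    }));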

Could someone help me use this Unnest transform to get an unnested collection from the nested collection?

Recommended Answer

Code for joining two PCollections with a nested structure in Python with Beam:

import apache_beam as beam

# DoFn that emits (key, element) pairs, where the key is read from a
# (possibly nested) field path such as "personalInfo.Address.City"
class addkeysnested(beam.DoFn):
    def process(self, element, fieldName):
        tmp_record = element
        fieldName = fieldName.split(".")
        for i in range(len(fieldName)):
            if i != len(fieldName) - 1:
                # intermediate levels are repeated records; take the first entry
                tmp_record = tmp_record[fieldName[i].strip()][0]
            else:
                tmp_record = tmp_record[fieldName[i].strip()]
        return [(tmp_record, element)]


# option, input_file1/2 and join_fileld_names1/2 (the dotted join-field paths)
# are supplied by the caller
with beam.Pipeline(options=option) as p:

    source_record1 = p | "get data1" >> beam.io.avroio.ReadFromAvro(input_file1)
    source_record2 = p | "get data2" >> beam.io.avroio.ReadFromAvro(input_file2)

    # convert into (key, value) form
    keyed_record1 = source_record1 | "key data1" >> beam.ParDo(addkeysnested(), join_fileld_names1)
    keyed_record2 = source_record2 | "key data2" >> beam.ParDo(addkeysnested(), join_fileld_names2)

    # apply the join operation
    rjoin = ({'File1Info': keyed_record1, 'File2Info': keyed_record2}
             | beam.CoGroupByKey())

Note: In the above code the key value can be taken from any level of the nested fields, e.g. personalInfo.Address.City. After that, CoGroupByKey() is applied to join the two PCollections.
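For completeness, the same keyed-join idea translated to the Java SDK might look like the rough sketch below. This is not part of the original answer: the method name joinKeyedRecords is made up, and it assumes the two inputs were already keyed by the nested join field (e.g. personalInfo.Address.City) in an earlier ParDo that mirrors addkeysnested (uses org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple, CoGroupByKey and CoGbkResult).

// Rough Java-SDK analogue of the Python answer above
static PCollection<TableRow> joinKeyedRecords(
    PCollection<KV<String, TableRow>> keyedRecords1,
    PCollection<KV<String, TableRow>> keyedRecords2) {

  final TupleTag<TableRow> file1Tag = new TupleTag<>();
  final TupleTag<TableRow> file2Tag = new TupleTag<>();

  // group the two keyed collections on the common key
  PCollection<KV<String, CoGbkResult>> joined =
      KeyedPCollectionTuple.of(file1Tag, keyedRecords1)
          .and(file2Tag, keyedRecords2)
          .apply(CoGroupByKey.create());

  // emit one merged record per matching pair of left/right rows
  return joined.apply("MergeJoinedRecords", ParDo.of(
      new DoFn<KV<String, CoGbkResult>, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (TableRow left : c.element().getValue().getAll(file1Tag)) {
            for (TableRow right : c.element().getValue().getAll(file2Tag)) {
              TableRow merged = new TableRow();
              merged.putAll(left);
              merged.putAll(right);
              c.output(merged);
            }
          }
        }
      }));
}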
