GCP Dataflow Apache Beam code logic not working as expected

Question

I am trying to implement CDC (change data capture) in Apache Beam, deployed on Google Cloud Dataflow.

I have unloaded the master data and the new data, which is expected to arrive daily. The join is not working as expected; something is missing.

master_data = (
    p | 'Read base from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=master_data, use_standard_sql=True))
      | 'Map id in master' >> beam.Map(lambda master: (
          master['id'], master)))
new_data = (
    p | 'Read Delta from BigQuery ' >> beam.io.Read(beam.io.BigQuerySource(query=new_data, use_standard_sql=True))
      | 'Map id in new' >> beam.Map(lambda new: (new['id'], new)))

joined_dicts = (
    {'master_data' :master_data, 'new_data' : new_data }
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergeddicts' >> beam.Map(lambda masterdict, newdict: newdict.update(masterdict))
) 

def join_lists(k,v):
    itertools.product(v['master_data'], v['new_data'])

Observations (on sample data):

Master data:

1, 'A', 3232

2, 'B', 234

New data:

1, 'A', 44

4, 'D', 45

Expected result in the master table after the code runs:

1, 'A', 44

2, 'B', 234

4, 'D', 45

However, what I get in the master table is:

1, 'A', 44

4, 'D', 45

Am I missing a step? Can anyone help me fix my mistake?

Recommended answer

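You don't need a FlatMap after the group-by. CoGroupByKey already emits one (key, {'master_data': [...], 'new_data': [...]}) element per key, and flattening splits those grouped records apart again. Apply a Map to each grouped element instead and decide there which record to keep.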
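
Two plain-Python behaviours explain how rows get lost in the question's join, and they can be checked without Beam. The sketch below hand-builds hypothetical (key, {tag: [rows]}) tuples shaped like CoGroupByKey output for the sample data; the column names name and value are assumptions, since the question only shows id. It shows that itertools.product yields no pairs when either side is empty (so it acts as an inner join and drops keys present in only one input), and that dict.update() returns None, so a Map built around it cannot emit the merged dict.

```python
import itertools

# Hypothetical stand-in for CoGroupByKey output on the question's sample data.
# The column names 'name' and 'value' are assumptions; the question only uses 'id'.
grouped = [
    (1, {'master_data': [{'id': 1, 'name': 'A', 'value': 3232}],
         'new_data': [{'id': 1, 'name': 'A', 'value': 44}]}),
    (2, {'master_data': [{'id': 2, 'name': 'B', 'value': 234}],
         'new_data': []}),
    (4, {'master_data': [],
         'new_data': [{'id': 4, 'name': 'D', 'value': 45}]}),
]


def join_lists(element):
    # FlatMap hands the function one (key, value) tuple, so unpack it here.
    _key, tagged = element
    # The Cartesian product pairs every master row with every new row;
    # if either list is empty it yields no pairs, like an inner join.
    return list(itertools.product(tagged['master_data'], tagged['new_data']))


pairs = [pair for element in grouped for pair in join_lists(element)]
print(len(pairs))             # 1: keys 2 and 4 produced no pairs
print(pairs[0][1]['value'])   # 44: the only pair is (master row 1, new row 1)

# dict.update() merges in place and returns None, so a Map built around it
# emits None rather than the merged dict.
print({'id': 1}.update({'value': 44}))  # None
```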

Here is sample code:

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

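def join_lists(e):
    # e is (key, {'master_data': [...], 'new_data': [...]}) as emitted by CoGroupByKey
    (k, v) = e
    new_values = list(v['new_data'])
    # Prefer the new (delta) value when the key has one; otherwise keep the master value
    return (k, new_values[0]) if new_values else (k, list(v['master_data'])[0])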

with beam.Pipeline(options=PipelineOptions()) as p:
    # beam.Create stands in for the two BigQuery reads in this example
    master_data = (
        p | 'Read base from BigQuery ' >> beam.Create([('A', [3232]), ('B', [234])])
    )
    new_data = (
        p | 'Read Delta from BigQuery ' >> beam.Create([('A', [44]), ('D', [45])])
    )

    joined_dicts = (
        {'master_data': master_data, 'new_data': new_data}
        | beam.CoGroupByKey()
        | 'mergeddicts' >> beam.Map(join_lists)
        | 'print' >> beam.Map(print)
    )
    # Leaving the with-block runs the pipeline and waits for it to finish,
    # so no explicit p.run() / wait_until_finish() call is needed.
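
Plugged into the pipeline above, the same "new row wins, otherwise keep the master row" rule can be applied to full rows. This is a minimal plain-Python sketch, not the answer's exact code: it assumes each id has at most one row on each side, reuses the assumed name/value columns, and merge_cogrouped is a hypothetical helper name. In a pipeline it would take the place of join_lists, as beam.CoGroupByKey() | beam.Map(merge_cogrouped), before the rows are written back to the master table.

```python
# Hypothetical stand-in for CoGroupByKey output on the question's sample data.
# The column names 'name' and 'value' are assumptions; the question only uses 'id'.
grouped = [
    (1, {'master_data': [{'id': 1, 'name': 'A', 'value': 3232}],
         'new_data': [{'id': 1, 'name': 'A', 'value': 44}]}),
    (2, {'master_data': [{'id': 2, 'name': 'B', 'value': 234}],
         'new_data': []}),
    (4, {'master_data': [],
         'new_data': [{'id': 4, 'name': 'D', 'value': 45}]}),
]


def merge_cogrouped(element):
    """Upsert one grouped key: the new row wins, otherwise keep the master row.

    Assumes at most one row per id on each side; CoGroupByKey only emits keys
    that appear in at least one input, so one of the two lists is non-empty.
    """
    _key, tagged = element
    new_rows = list(tagged['new_data'])
    if new_rows:
        return new_rows[0]                  # inserted or updated record
    return list(tagged['master_data'])[0]   # unchanged master record


merged = sorted((merge_cogrouped(e) for e in grouped), key=lambda row: row['id'])
for row in merged:
    print(row['id'], row['name'], row['value'])
# 1 A 44
# 2 B 234
# 4 D 45
```

If the daily delta can carry several rows for the same id, taking new_rows[0] is arbitrary; picking one deterministically (for example by a change timestamp column, if the source has one) would be safer.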
