How do I perform a "diff" on two Sources given a key using Apache Beam Python SDK?

Problem description

I posed the question generically because the answer may well be generic. A specific example, though, is comparing 2 BigQuery tables that have the same schema but potentially different data. I want a diff, i.e. what was added, deleted, or modified with respect to a composite key, e.g. the first 2 columns.

Table A
C1  C2  C3
-----------
a   a   1
a   b   1
a   c   1

Table B     
C1  C2  C3  # Notes if comparing B to A
-------------------------------------
a   a   1   # No Change to the key a + a
a   b   2   # Key a + b Changed from 1 to 2
            # Deleted key a + c with value 1
a   d   1   # Added key a + d

I basically want to be able to make/report the comparison notes. Or, from a Beam perspective, I may just want to output up to 4 labeled PCollections: Unchanged, Changed, Added, Deleted. How do I do this, and what would the PCollections look like?

Recommended answer

What you want to do here, basically, is join two tables and compare the result of that, right? You can look at my answer to this question, to see the two ways in which you can join two tables (Side inputs, or CoGroupByKey).
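
For contrast with the CoGroupByKey version, the side-input style of join boils down to a dictionary lookup: one table is materialized as a key-to-row mapping (in Beam that role would be played by something like `beam.pvalue.AsDict`) and each row of the other table is checked against it. Below is a minimal plain-Python sketch of that idea, with no Beam involved. It assumes rows are tuples, keys are unique within each table, and the lookup table fits in memory; the function name `diff_with_lookup` is hypothetical.

```python
# Example tables from the question, modeled as tuples (C1, C2, C3).
TABLE_A = [("a", "a", 1), ("a", "b", 1), ("a", "c", 1)]
TABLE_B = [("a", "a", 1), ("a", "b", 2), ("a", "d", 1)]


def diff_with_lookup(old_rows, new_rows):
    """Classify composite keys by looking each new row up in a dict of old rows."""
    # This dict plays the role of the side input.
    old_by_key = {(row[0], row[1]): row for row in old_rows}
    seen_keys = set()
    result = {"unchanged": [], "changed": [], "added": [], "removed": []}

    for row in new_rows:
        key = (row[0], row[1])
        seen_keys.add(key)
        if key not in old_by_key:
            result["added"].append(key)
        elif old_by_key[key] == row:
            result["unchanged"].append(key)
        else:
            result["changed"].append(key)

    # Keys that never showed up in new_rows were deleted.
    result["removed"] = [key for key in old_by_key if key not in seen_keys]
    return result


report = diff_with_lookup(TABLE_A, TABLE_B)
for label, keys in report.items():
    print(label, keys)
```

For the example tables this reports key ('a', 'a') as unchanged, ('a', 'b') as changed, ('a', 'd') as added and ('a', 'c') as removed. Note that deleted keys need an extra pass over the lookup table, which is one reason the CoGroupByKey approach tends to be the more natural fit for a full diff.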

I'll also code a solution for your problem using CoGroupByKey. I'm writing the code in Python because I'm more familiar with the Python SDK, but you'd implement similar logic in Java:

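import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import WriteToText

def make_kv_pair(x):
  """Output the record keyed by its composite key (the first two columns)."""
  # BigQuery rows arrive as dicts keyed by column name; C1 and C2 are the key columns.
  return ((x['C1'], x['C2']), x)

# p is your beam.Pipeline. Newer SDK releases offer beam.io.ReadFromBigQuery instead.
table_a = (p | 'ReadTableA' >> beam.io.Read(beam.io.BigQuerySource(....))
             | 'SetKeysA' >> beam.Map(make_kv_pair))
table_b = (p | 'ReadTableB' >> beam.io.Read(beam.io.BigQuerySource(....))
             | 'SetKeysB' >> beam.Map(make_kv_pair))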

joined_tables = ({'table_a': table_a, 'table_b': table_b}
                 | beam.CoGroupByKey())


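# Tag names must match the TaggedOutput tags yielded below.
output_types = ['changed', 'added', 'removed', 'unchanged']
class FilterDoFn(beam.DoFn):
  def process(self, element):
    # element is (key, {'table_a': [...], 'table_b': [...]}) from CoGroupByKey.
    key, values = element
    table_a_value = list(values['table_a'])
    table_b_value = list(values['table_b'])
    if table_a_value == table_b_value:
      yield pvalue.TaggedOutput('unchanged', key)
    elif len(table_a_value) < len(table_b_value):
      # Key exists only in table B (assuming keys are unique per table).
      yield pvalue.TaggedOutput('added', key)
    elif len(table_a_value) > len(table_b_value):
      # Key exists only in table A.
      yield pvalue.TaggedOutput('removed', key)
    else:
      # Same number of rows on both sides, but different contents.
      yield pvalue.TaggedOutput('changed', key)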

key_collections = (joined_tables 
                   | beam.ParDo(FilterDoFn()).with_outputs(*output_types))

# Now you can handle each output
key_collections.unchanged | WriteToText(...)
key_collections.changed | WriteToText(...)
key_collections.added | WriteToText(...)
key_collections.removed | WriteToText(...)
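
As for what the PCollections would look like: after CoGroupByKey, each element is a key paired with the rows found under that key in each table, and each of the four outputs then holds the composite keys that fall under that label. The sketch below mimics that shape in plain Python (it does not run Beam) using the example tables from the question. It assumes rows are tuples and keys are unique per table; the helper names `co_group` and `classify` are hypothetical, not Beam APIs.

```python
from collections import defaultdict

# Example tables from the question, modeled as tuples (C1, C2, C3).
TABLE_A = [("a", "a", 1), ("a", "b", 1), ("a", "c", 1)]
TABLE_B = [("a", "a", 1), ("a", "b", 2), ("a", "d", 1)]


def co_group(tables):
    """Mimic the shape of CoGroupByKey output: key -> {tag: [rows]}."""
    grouped = defaultdict(lambda: {tag: [] for tag in tables})
    for tag, rows in tables.items():
        for row in rows:
            grouped[(row[0], row[1])][tag].append(row)
    return dict(grouped)


def classify(values):
    """Label one grouped element as added, removed, unchanged or changed."""
    a_rows, b_rows = values["table_a"], values["table_b"]
    if not a_rows:
        return "added"
    if not b_rows:
        return "removed"
    return "unchanged" if a_rows == b_rows else "changed"


grouped = co_group({"table_a": TABLE_A, "table_b": TABLE_B})
labels = {key: classify(values) for key, values in grouped.items()}

for key in sorted(labels):
    print(key, labels[key], grouped[key])
```

Keeping the classification in a small pure function like this also makes it easy to unit test on its own before wiring it into a DoFn. If you need the old and new values in the report rather than only the key, you could emit the grouped rows alongside the key.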
