我如何执行“差异"?在使用 Apache Beam Python SDK 给定密钥的两个源上? [英] How do I perform a "diff" on two Sources given a key using Apache Beam Python SDK?

查看:19
本文介绍了我如何执行“差异"?在使用 Apache Beam Python SDK 给定密钥的两个源上?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我笼统地提出了这个问题,因为也许这是一个笼统的答案.但一个具体的例子是比较 2 个具有相同架构但数据可能不同的 BigQuery 表.我想要一个差异,即相对于复合键添加、删除、修改的内容,例如前 2 列.

I posed the question generically, because maybe it is a generic answer. But a specific example is comparing 2 BigQuery tables with the same schema, but potentially different data. I want a diff, i.e. what was added, deleted, modified, with respect to a composite key, e.g. the first 2 columns.

Table A
C1  C2  C3
-----------
a   a   1
a   b   1
a   c   1

Table B     
C1  C2  C3  # Notes if comparing B to A
-------------------------------------
a   a   1   # No Change to the key a + a
a   b   2   # Key a + b Changed from 1 to 2
            # Deleted key a + c with value 1
a   d   1   # Added key a + d

我基本上希望能够制作/报告对比笔记.或者从 Beam 的角度来看,我可能只想输出最多 4 个标记的 PCollections:未更改、已更改、已添加、已删除.我该怎么做,PCollections 会是什么样子?

I basically want to be able to make/report the comparison notes. Or from a Beam perspective I may want to Just output up to 4 labeled PCollections: Unchanged, Changed, Added, Deleted. How do I do this and what would the PCollections look like?

推荐答案

你在这里要做的,基本上是连接两个表并比较结果,对吗?你可以看看我对这个问题的回答,查看连接两个表的两种方式(侧输入,或 CoGroupByKey).

What you want to do here, basically, is join two tables and compare the result of that, right? You can look at my answer to this question, to see the two ways in which you can join two tables (Side inputs, or CoGroupByKey).

我还将使用 CoGroupByKey 为您的问题编写解决方案.我正在用 Python 编写代码,因为我更熟悉 Python SDK,但您会用 Java 实现类似的逻辑:

I'll also code a solution for your problem using CoGroupByKey. I'm writing the code in Python because I'm more familiar with the Python SDK, but you'd implement similar logic in Java:

def make_kv_pair(x):
  """ Output the record with the x[0]+x[1] key added."""
  return ((x[0], x[1]), x)

table_a = (p | 'ReadTableA' >> beam.Read(beam.io.BigQuerySource(....))
            | 'SetKeysA' >> beam.Map(make_kv_pair)
table_b = (p | 'ReadTableB' >> beam.Read(beam.io.BigQuerySource(....))
            | 'SetKeysB' >> beam.Map(make_kv_pair))

joined_tables = ({'table_a': table_a, 'table_b': table_b}
                 | beam.CoGroupByKey())


output_types = ['changed', 'added', 'deleted', 'unchanged']
class FilterDoFn(beam.DoFn):
  def process((key, values)):
    table_a_value = list(values['table_a'])
    table_b_value = list(values['table_b'])
    if table_a_value == table_b_value:
      yield pvalue.TaggedOutput('unchanged', key)
    elif len(table_a_value) < len(table_b_value):
      yield pvalue.TaggedOutput('added', key)
    elif len(table_a_value) > len(table_b_value):
      yield pvalue.TaggedOutput('removed', key)
    elif table_a_value != table_b_value:
      yield pvalue.TaggedOutput('changed', key)

key_collections = (joined_tables 
                   | beam.ParDo(FilterDoFn()).with_outputs(*output_types))

# Now you can handle each output
key_collections.unchanged | WriteToText(...)
key_collections.changed | WriteToText(...)
key_collections.added | WriteToText(...)
key_collections.removed | WriteToText(...)

这篇关于我如何执行“差异"?在使用 Apache Beam Python SDK 给定密钥的两个源上?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆