Apache Beam -> BigQuery - insertId for deduplication not working

Question

I am streaming data from Kafka to BigQuery using Apache Beam with the Google Dataflow runner. I wanted to make use of insertId for deduplication, as described in the Google docs. But even though the inserts happen within a few seconds of each other, I still see a lot of rows with the same insertId. Now I'm wondering whether I'm using the API incorrectly and failing to take advantage of the deduplication mechanism that BQ offers for streaming inserts.
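
For context, in BigQuery's standalone Java client the insertId rides on the insertAll request rather than being stored as a table column. Here is a minimal sketch of that usage (not from the question; the dataset, table, and field names are invented for illustration):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;

import java.util.Map;

public class StreamingInsertSketch {
    public static void main(String[] args) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        TableId table = TableId.of("my_dataset", "fx_payments");  // invented names

        Map<String, Object> content = Map.of("payment_id", "p-123", "amount", 42.0);

        // The first argument of RowToInsert.of is the insertId; it is part of
        // the request, not a column of the destination table.
        InsertAllRequest request = InsertAllRequest.newBuilder(table)
                .addRow(InsertAllRequest.RowToInsert.of("p-123", content))
                .build();

        InsertAllResponse response = bigquery.insertAll(request);
        if (response.hasErrors()) {
            System.err.println(response.getInsertErrors());
        }
    }
}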

My Beam code for the write looks as follows:

payments.apply("Write Fx Payments to BQ", BigQueryIO.<FxPayment>write()
            // convert each FxPayment into a TableRow for the streaming insert
            .withFormatFunction(ps -> FxTableRowConverter.convertFxPaymentToTableRow(ps))
            .to(bqTradePaymentTable)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)  // table must already exist
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));  // append rows, never truncate

Besides all the other fields, I set insertId directly on the TableRow in the FxTableRowConverter.convertFxPaymentToTableRow method that is passed to BigQueryIO as the format function:

row.set("insertId", insertId);
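
For completeness, here is a hypothetical sketch of what such a format function could look like; only the insertId line reflects the question, and the FxPayment getters are invented:

import com.google.api.services.bigquery.model.TableRow;

public class FxTableRowConverter {
    public static TableRow convertFxPaymentToTableRow(FxPayment payment) {
        TableRow row = new TableRow();
        row.set("paymentId", payment.getPaymentId());  // invented getter
        row.set("amount", payment.getAmount());        // invented getter
        // As described above, insertId ends up as an ordinary column:
        row.set("insertId", payment.getInsertId());    // invented getter
        return row;
    }
}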

I also added that field as a column in BQ. Without it, the inserts were failing (obviously). I couldn't find any other way to set insertId directly on BigQueryIO besides adding it to the TableRow object.

Is this the correct way to use it? It isn't working for me: I'm seeing many duplicates even though I shouldn't, since, as mentioned, the inserts happen within seconds of each other. The BigQuery docs state that the streaming buffer keeps the insertId for at least one minute.

Answer

You can't manually specify insertId for BigQuery streaming inserts in Dataflow; the sink generates its own insertId under the hood. See https://stackoverflow.com/a/54193825/1580227
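
If deduplication on a business key is still needed, one workaround (not from the linked answer, just a sketch assuming FxPayment exposes a stable id via a hypothetical getPaymentId()) is to deduplicate inside the pipeline before the write, for example with Beam's Distinct transform over a fixed window:

import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// Drop duplicate payments (same id) arriving within a one-minute window,
// then hand the de-duplicated stream to the BigQueryIO write shown above.
PCollection<FxPayment> deduped = payments
        .apply(Window.<FxPayment>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Distinct.withRepresentativeValueFn((FxPayment p) -> p.getPaymentId())
                .withRepresentativeType(TypeDescriptors.strings()));

Note that Distinct only removes duplicates within the same window, so the window size bounds how far apart two duplicate records can be and still be collapsed.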
