Beam / Dataflow :: ReadFromPubSub(id_label) :: Unexpected behavior


Question

Can someone clarify the purpose of the id_label argument in the ReadFromPubSub transform?

I'm using a BigQuery sink; my understanding is that id_label acts like an insertId for the BQ Streaming API (Tabledata: insertAll):

A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency.
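
For reference, insertId can be supplied explicitly when calling the streaming API directly. A minimal sketch with the google-cloud-bigquery client (the table name and row values are hypothetical); rows sharing a row_id within BigQuery's short best-effort dedupe window are dropped:

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = [{"cid": 141, "message_id": "2", "evt_time": "2019-03-17T09:31:15Z"}]
    # row_ids maps to insertId in the underlying insertAll request
    errors = client.insert_rows_json(
        "my-project.my_dataset.my_table",  # hypothetical table
        rows,
        row_ids=["2"],  # same row_id within the dedupe window => best-effort drop
    )
    print(errors)  # empty list if all rows were accepted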

However, I don't see this expected behaviour.

  • I'm publishing messages to Pub/Sub, each message with the same message_id attribute value (this is intentional, to test the pipeline / BQ dedupe behaviour); see the publisher sketch below.
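
A minimal sketch of such a publisher with the google-cloud-pubsub client (project and topic names are hypothetical); note that keyword arguments to publish() become message attributes, which are separate from the server-assigned messageId:

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path("my-project", "my-topic")  # hypothetical

    # Every publish carries the same user attribute message_id="2"
    future = publisher.publish(
        topic_path, b"cid=141&message_id=2&evt_time=...", message_id="2")
    # result() returns the server-assigned messageId, unique per publish
    print(future.result())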

My pipeline reads from Pub/Sub as follows:

    beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id')
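
In context, a minimal streaming pipeline of this shape might look like the sketch below (topic, table, and schema are hypothetical; in practice it would be launched with --runner=DataflowRunner, given the DirectRunner limitation noted further down):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Read' >> beam.io.gcp.pubsub.ReadFromPubSub(
               topic='projects/my-project/topics/my-topic',  # hypothetical
               id_label='message_id')
         | 'Decode' >> beam.Map(lambda payload: {'raw': payload.decode('utf-8')})
         | 'Write' >> beam.io.gcp.bigquery.WriteToBigQuery(
               'my-project:my_dataset.my_table',  # hypothetical
               schema='raw:STRING'))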

But when querying BQ, all messages still get inserted. I expected that, because every message was published with the same message_id value, those would have been deduplicated...

Can someone clarify, please? Thanks in advance!

Also, I notice the DirectRunner keeps throwing an error when using this attribute:

    NotImplementedError: DirectRunner: id_label is not supported for PubSub reads

I have to use the DataflowRunner... is that expected as well?

Cheers!

UPDATE 1: moved to the DataflowRunner, and the pipeline seems to respect the id_label argument during ReadFromPubSub(). However, duplicate messages DO continue to get read into the pipeline sporadically.

    cid=141&message_id=2&evt_time={{DATE_TIME_AT_RUNTIME}}

Note: I'm passing the same message_id value (='2') in the message's attributes as well (this is intentional, to test the dedupe behaviour).

  • My pipeline (running on the Dataflow Runner, Beam Python SDK v2.11, pipeline code is here) dumps the following messages to BQ. As you can see, multiple messages with the same message_id get read into the pipeline and emitted to the sink. This usually happens when I stop/restart my publisher application; a query to confirm the duplicates follows the dump below.
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z
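
One way to confirm the duplicates in the sink, a sketch assuming the rows land in a table with a message_id column (all names hypothetical):

    from google.cloud import bigquery

    client = bigquery.Client()
    query = """
        SELECT message_id, COUNT(*) AS n
        FROM `my-project.my_dataset.my_table`  -- hypothetical
        GROUP BY message_id
        HAVING COUNT(*) > 1
    """
    for row in client.query(query).result():
        print(row.message_id, row.n)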

Answer

Those are different IDs. As explained here, every message published to a topic has a field named messageId that is guaranteed to be unique within the topic. Pub/Sub guarantees at-least-once delivery, so a subscription can have duplicates (i.e. messages with the same messageId). Dataflow has exactly-once processing semantics because it uses that field to de-duplicate messages when reading from a subscription. This is independent of the sink, which does not need to be BigQuery.
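
The distinction is easy to observe with a plain subscriber client (the subscription name is hypothetical): the server-assigned messageId changes on every publish, while the user-set message_id attribute stays the same:

    from google.cloud import pubsub_v1

    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path("my-project", "my-sub")  # hypothetical

    def callback(message):
        # message.message_id: server-assigned Pub/Sub messageId (unique per publish)
        # message.attributes["message_id"]: the user attribute set at publish time
        print(message.message_id, dict(message.attributes))
        message.ack()

    streaming_pull = subscriber.subscribe(sub_path, callback=callback)
    # block while messages arrive (raises TimeoutError when the timeout elapses)
    streaming_pull.result(timeout=30)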

Using id_label (or .withIdAttribute() in the Java SDK) we can force messages to be considered duplicates according to a different field that should be unique (such as an order ID, customer ID, etc.). The input source will read the repeated messages only once; you won't see them increase the count of input elements in the pipeline. Keep in mind that the Direct Runner is intended for testing purposes only and does not offer the same guarantees in terms of checkpointing, de-duplication, etc. As an example, refer to this comment. That's the most likely reason you were seeing duplicates in the pipeline, also taking into account the NotImplementedError message, so I'd suggest moving to the Dataflow Runner.

On the other side, insertId is used on a best-effort basis to avoid duplicate rows when retrying streaming inserts into BigQuery. With BigQueryIO it is created under the hood and can't be specified manually. In your case, if N messages enter the pipeline and N rows are written to BigQuery, it is working as expected. If any insert had to be retried, the row carried the same insertId and was therefore discarded.
