How to deduplicate messages from GCP PubSub in DataFlow using Apache Beam's PubSubIO withIdAttribute

Problem Description

I'm currently attempting to use withIdAttribute with PubSubIO to deduplicate messages that come from PubSub (since PubSub only guarantees at-least-once delivery).

My messages have four fields: label1, label2, timestamp, and value. A value is unique to the two labels at a given timestamp. Therefore, before writing to PubSub I additionally set a uniqueID attribute equal to these three values joined as a string.
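
Concretely, the publishing side builds each message along these lines (a minimal hypothetical sketch: the UniqueIdAttributes class and toPubsubMessage helper are illustrative names, not the real producer code; messages carrying custom attributes are published with PubsubIO.writeMessages()):

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

public class UniqueIdAttributes {
  // Hypothetical helper: attaches the uniqueID and eventTime attributes shown
  // in the subscription dump below, so that the reading pipeline's
  // withIdAttribute("uniqueID") has a stable id to deduplicate on.
  public static PubsubMessage toPubsubMessage(
      String json, String label1, String label2, long timestampSeconds) {
    Map<String, String> attributes = new HashMap<>();
    // The three identifying values joined as a string, e.g.
    // "5c381a51-...:foobarbaz:1513199383".
    attributes.put("uniqueID", label1 + ":" + label2 + ":" + timestampSeconds);
    // RFC 3339 event time, e.g. "2017-12-13T21:09:43Z", for withTimestampAttribute.
    attributes.put("eventTime", Instant.ofEpochSecond(timestampSeconds).toString());
    return new PubsubMessage(json.getBytes(StandardCharsets.UTF_8), attributes);
  }
}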

For example, this is what I get from reading from a subscription using the gcp console tool.

┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                    DATA                                                   │   MESSAGE_ID   │                                               ATTRIBUTES                                          │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┼────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"label1":"5c381a51-2873-49b8-acf5-60a0fa59fc65","label2":"foobarbaz","timestamp":1513199383,"value":4.2} │ 11185357338249 │ eventTime=2017-12-13T21:09:43Z uniqueID=5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘

In my Beam job, running on GCP Dataflow, I decode these messages as JSON, window them, group them by their two labels, and then attempt to aggregate them. However, in my aggregation class CreateMyAggregationsFn I'm seeing duplicate messages that have the same label1, label2, and timestamp.

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.extensions.jackson.ParseJsons;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class MyBeam {
  public interface MyBeamOptions extends PipelineOptions {
    // Pipeline options referenced below, e.g. the subscription to read from.
    String getPubsubSubscription();

    void setPubsubSubscription(String value);
    // ...
  }

  private static class MyMessage implements Serializable {
    public long timestamp;
    public double value;
    public String label1;
    public String label2;
  }

  private static class CreateMyAggregationsFn extends DoFn<KV<String, Iterable<MyMessage>>, MyAggregate> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      ArrayList<MyMessage> messages = new ArrayList<>();
      c.element().getValue().forEach(messages::add);
      Collections.sort(messages, (msg1, msg2) -> Long.compare(msg1.timestamp, msg2.timestamp));

      MyMessage prev = null;
      for (MyMessage msg : messages) {
        if (prev != null &&
            msg.timestamp == prev.timestamp && 
            msg.label1.equals(prev.label1) && 
            msg.label2.equals(prev.label2)) {
            // ... identifying duplicates here
        }
        prev = msg;
      }
      // ...
    }
  }

  public static void main(String[] args) throws IOException {
    MyBeamOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyBeamOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    PubsubIO.Read<String> pubsubReadSubscription =
        PubsubIO.readStrings()
            .withTimestampAttribute("eventTime")
            .withIdAttribute("uniqueID")
            .fromSubscription(options.getPubsubSubscription());
    pipeline
        .apply("PubsubReadSubscription", pubsubReadSubscription)
        .apply("ParseJsons", ParseJsons.of(MyMessage.class))
        .setCoder(SerializableCoder.of(MyMessage.class))
        .apply(
            "Window",
            Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(3600)))
        .apply(
            "PairMessagesWithLabels",
            MapElements.into(
                    TypeDescriptors.kvs(
                        TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
                .via(msg -> KV.of(msg.label1 + ":" + msg.label2, msg)))
        .apply("GroupMessagesByLabels", GroupByKey.<String, MyMessage>create())
        .apply("CreateAggregations", ParDo.of(new CreateMyAggregationsFn()));
    // ...
    PipelineResult result = pipeline.run();
  }
}

Is there an additional step for deduplicating messages from PubSubIO via withIdAttribute that I'm missing?

Recommended Answer

You are specifying accumulatingFiredPanes(), which means that in case of multiple firings for a window (e.g. if late data arrives) you are asking successive firings to include all the elements from previous firings, not just new elements. This by definition produces duplication. What are you trying to achieve by specifying accumulatingFiredPanes()?
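
If duplicates across pane firings are indeed what you are seeing, the usual fix is to switch the accumulation mode to discarding, so that each late firing emits only the elements that arrived since the previous firing. A minimal sketch of the same windowing step with only the accumulation mode changed (downstream aggregation then has to combine successive panes incrementally rather than recomputing from scratch):

import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Same 60-second fixed windows and late-firing trigger as in the question,
// but in discarding mode: a late firing re-emits only newly arrived elements,
// so earlier elements are not replayed downstream as apparent duplicates.
Window<MyMessage> window =
    Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(3600));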
