Apache Beam - Stream Join by Key on two unbounded PCollections


Question


I have two unbounded (KafkaIO) PCollections to which I'm applying a tag-based CoGroupByKey with a fixed window of 1 minute; however, at join time the collection usually seems to be missing one of the tagged datasets, even for test data that has the same keys. Please find the snippet below.

    KafkaIO.Read<Integer, String> event1 = ... ;


    KafkaIO.Read<Integer, String> event2 = ...;

    PCollection<KV<String,String>> event1Data = p.apply(event1.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String,String>>into(
                    FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<KV<String,String>> event2Data = p.apply(event2.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String,String>>into(
                    FixedWindows.of(Duration.standardMinutes(1))));

   final TupleTag<String> event1Tag = new TupleTag<>();
   final TupleTag<String> event2Tag = new TupleTag<>();

   PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
            .of(event1Tag, event1Data)
            .and(event2Tag, event2Data)
            .apply(CoGroupByKey.<String>create());

   PCollection<String> finalResultCollection =
            kvpCollection.apply("Join", ParDo.of(
                    new DoFn<KV<String, CoGbkResult>, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) throws IOException {
                            KV<String, CoGbkResult> e = c.element();
                            Iterable<String> event1Values = e.getValue().getAll(event1Tag);
                            Iterable<String> event2Values = e.getValue().getAll(event2Tag);
                            if (event1Values.iterator().hasNext() && event2Values.iterator().hasNext()) {
                               // Process event1 and event2 data and write to c.output
                            }else {
                                System.out.println("Unable to join event1 and event2");
                            }
                        }
                    }));


With the above code, when I start pumping data with a common key into the two Kafka topics, it never gets joined, i.e. "Unable to join event1 and event2". Kindly let me know if I'm doing anything wrong, or whether there is a better way to join two unbounded PCollections on a common key.

Answer


I had a similar issue recently. Per the Beam documentation, to use the CoGroupByKey transform on unbounded PCollections (key-value PCollections, specifically), all of the PCollections must have the same windowing and trigger strategy. Since you are working with streaming/unbounded collections, you have to use a Trigger to fire and emit window output at intervals determined by your triggering strategy. This trigger should fire continuously, because you are dealing with streaming data here, i.e. repeat your trigger forever. You also need to apply an accumulating/discarding option to your windowed PCollections, to tell Beam what to do after a trigger fires, i.e. whether to accumulate or discard the results of the window pane. After applying this windowing, triggering and accumulation strategy, you can use the CoGroupByKey transform to group multiple unbounded PCollections on a common key.

Like this:

PCollection<KV<String, Employee>> windowedCollection1
        = collection1.apply(Window.<KV<String, Employee>>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());

PCollection<KV<String, Department>> windowedCollection2
        = collection2.apply(Window.<KV<String, Department>>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());


Then use CoGroupByKey :

final TupleTag<Employee> t1 = new TupleTag<>();
final TupleTag<Department> t2 = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> groupByKeyResult =
        KeyedPCollectionTuple.of(t1, windowedCollection1)
                .and(t2, windowedCollection2)
                .apply("Join Streams", CoGroupByKey.<String>create());


Now you can process your grouped PCollection in a ParDo transform.
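The per-key step inside that ParDo boils down to the guard from the question: only emit a joined record when `CoGbkResult.getAll(tag)` returned at least one value for each side. A minimal plain-Java sketch of that logic, outside Beam (the `joinForKey` helper and its output format are illustrative assumptions, not part of the Beam API):

```java
import java.util.List;

public class JoinSketch {
    // Emulates what the DoFn does per key with the two iterables from
    // CoGbkResult.getAll(tag): join only when both sides have data,
    // otherwise emit nothing for this pane (here modeled as null).
    static String joinForKey(String key, List<String> side1, List<String> side2) {
        if (side1.isEmpty() || side2.isEmpty()) {
            return null; // one side missing in this pane: no join output yet
        }
        return key + ":" + String.join(",", side1) + "|" + String.join(",", side2);
    }

    public static void main(String[] args) {
        System.out.println(joinForKey("k1", List.of("e1"), List.of("e2"))); // k1:e1|e2
        System.out.println(joinForKey("k2", List.of("e1"), List.of()));     // null
    }
}
```

With `accumulatingFiredPanes()`, a key whose second side arrives late will eventually fire a pane where both iterables are non-empty, which is why the trigger/accumulation setup above matters for the join to succeed.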

Hope this helps!
