Apache Beam: Refreshing a side input that I am reading from MongoDB using MongoDbIO.read()

Problem description

I am reading a PCollection, mongodata, from MongoDB and using it (through a PCollectionView) as a side input to my ParDo.of(DoFn).withSideInputs(...).

On the backend, my MongoDB collection is updated daily, monthly, or perhaps yearly, and I need the newly added values in my pipeline.

You can think of this as refreshing the Mongo collection values inside a running pipeline. For example, if the collection has 20,000 documents in total and three more records are added a day later, I need those three extra values in my pipeline, for 20,003 in total.

Currently my pipeline looks like this:

PCollection<String> mongodata =  pipeline.apply(MongoDbIO.read()
                .withUri(options.getMongoDBHostName())
                .withDatabase(options.getMongoDBDatabaseName())
                .withCollection(options.getMongoVinCollectionName()))
                .apply(ParDo.of(new ConvertDocuemntToStringFn()));

PCollectionView<List<String>> list_of_data = mongodata.apply(View.<String> asList());

PCollection<PubsubMessage>  pubsubMessagePCollection = controller.flattenPubSubPCollection(
                controller.fetchDataFromBucket(options),pipeline);

pubsubMessagePCollection.apply("Convert pubsub to kv,k=vin",ParDo.of(new ConvertPubsubToKVFn()))
                .apply("group by vin key",GroupByKey.<String,String>create())
                .apply("converting message to document type",ParDo.of(
                        new ConvertMessageToDocumentTypeFn(list_of_data)).withSideInputs(list_of_data))
                .apply(MongoDbIO.write()
                .withUri(options.getMongoDBHostName())
                .withDatabase(options.getMongoDBDatabaseName())
                .withCollection(CollectionA));
pipeline.run();

I want this mongodata side input (list_of_data) to refresh whenever the backend updates the collection, without stopping the pipeline.

I looked into approaches based on GenerateSequence and on triggering, but I couldn't find concrete code to test this. Please help, and if you can, provide updated code that resolves my question.

Please let me know if you need more info.

Thanks

Recommended answer

You'll want to use GenerateSequence to periodically create elements, have a ParDo that reads the MongoDB collection, and then window into GlobalWindows with an appropriate trigger. I don't think you'll be able to use MongoDbIO directly, since it doesn't support running in the middle of a pipeline like this. The code will be something like:

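import java.util.List;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

PCollectionView<List<String>> list_of_data = pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardHours(24))) // adjust polling rate
  .apply(ParDo.of(new DoFn<Long, List<String>>() {
    @ProcessElement
    public void process(@Element Long unused, OutputReceiver<List<String>> out) {
      // Read the entire collection and emit it as ONE List<String> via out.output(...)
    }
  }))
  .apply(Window.<List<String>>into(new GlobalWindows())  // fire on every tick; keep only the newest snapshot
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
  .apply(View.asSingleton());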
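The answer leaves the actual MongoDB read as a comment. Below is a minimal sketch of that DoFn, assuming the synchronous MongoDB Java driver (mongodb-driver-sync, with com.mongodb.client.MongoClients) is on the classpath. The class name ReadWholeCollectionFn is hypothetical, and serializing each document with Document.toJson() is an assumption standing in for whatever ConvertDocuemntToStringFn does in the question.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.bson.Document;

/** Hypothetical DoFn: on every GenerateSequence tick, re-reads the whole collection as one List<String>. */
public class ReadWholeCollectionFn extends DoFn<Long, List<String>> {
  private final String uri;
  private final String database;
  private final String collection;
  // Not serializable, so it is created per DoFn instance in @Setup instead of being shipped with the fn.
  private transient MongoClient client;

  public ReadWholeCollectionFn(String uri, String database, String collection) {
    this.uri = uri;
    this.database = database;
    this.collection = collection;
  }

  @Setup
  public void setup() {
    // One client per DoFn instance, reused across ticks instead of reconnecting each time.
    client = MongoClients.create(uri);
  }

  @ProcessElement
  public void process(@Element Long tick, OutputReceiver<List<String>> out) {
    List<String> docs = new ArrayList<>();
    // find() with no filter iterates over every document currently in the collection.
    for (Document doc : client.getDatabase(database).getCollection(collection).find()) {
      docs.add(doc.toJson()); // assumed string form; adapt to match ConvertDocuemntToStringFn
    }
    // Emit the full snapshot as a single element, as View.asSingleton() expects.
    out.output(docs);
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
    }
  }
}
```

It would replace the anonymous DoFn in the answer, for example ParDo.of(new ReadWholeCollectionFn(options.getMongoDBHostName(), options.getMongoDBDatabaseName(), options.getMongoVinCollectionName())). Re-reading the full collection on each tick keeps the logic simple and should be manageable at around 20,000 documents. Since the consuming DoFn looks up the view through c.sideInput(list_of_data) for each element it processes, it should pick up the newest snapshot after each trigger firing.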
