Apache Beam: Refreshing a side input that I read from MongoDB using MongoDbIO.read()

Problem description

I am reading a PCollection, mongodata, from MongoDB and using it (as a PCollectionView) as a side input to my ParDo(DoFn).withSideInputs(...).

On the backend, my MongoDB collection is updated daily, monthly, or perhaps yearly, and I need the newly added values in my pipeline.

You can think of this as refreshing the Mongo collection's values inside a running pipeline. For example, if the collection has 20,000 documents in total and three more records are added a day later, I need those three new values in my pipeline, for 20,003 in total.

Currently my pipeline looks like this.

// Bounded read of the whole collection; it runs once, when the pipeline starts
PCollection<String> mongodata =  pipeline.apply(MongoDbIO.read()
                .withUri(options.getMongoDBHostName())
                .withDatabase(options.getMongoDBDatabaseName())
                .withCollection(options.getMongoVinCollectionName()))
                .apply(ParDo.of(new ConvertDocuemntToStringFn()));

// Materialize that one-time snapshot as a List<String> side input
PCollectionView<List<String>> list_of_data = mongodata.apply(View.<String> asList());

PCollection<PubsubMessage>  pubsubMessagePCollection = controller.flattenPubSubPCollection(
                controller.fetchDataFromBucket(options),pipeline);

pubsubMessagePCollection.apply("Convert pubsub to kv,k=vin",ParDo.of(new ConvertPubsubToKVFn()))
                .apply("group by vin key",GroupByKey.<String,String>create())
                .apply("converting message to document type",ParDo.of(
                        new ConvertMessageToDocumentTypeFn(list_of_data)).withSideInputs(list_of_data))
                .apply(MongoDbIO.write()
                .withUri(options.getMongoDBHostName())
                .withDatabase(options.getMongoDBDatabaseName())
                .withCollection(CollectionA));
pipeline.run();

I want this Mongo data (list_of_data) to refresh whenever the backend collection is updated, without stopping the pipeline.

I looked at GenerateSequence and at triggering, but I couldn't find concrete code to test this. Please help, and if you can, provide updated code that resolves this.

Please let me know if you need more info.

Thanks

Recommended answer

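You'll want to use GenerateSequence to create elements periodically, have a ParDo that reads MongoDB on each element, then window into GlobalWindows with a trigger that fires repeatedly, discarding fired panes so that each new read replaces the previous one, and turn the result into a side input with View.asSingleton(). I don't think you'll be able to use MongoDbIO directly, since its read transform can only be applied at the start of a pipeline, not in the middle like this. The code will be something like: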

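import java.util.List;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
PCollectionView<List<String>> list_of_data = pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardHours(24))) // adjust polling rate
  .apply(ParDo.of(new DoFn<Long, List<String>>() {
    @ProcessElement
    public void process(@Element Long tick, OutputReceiver<List<String>> out) {
      // Read the entire collection here and emit it as one List<String> via out.output(...)
    }
  }))
  .apply(Window.<List<String>>into(new GlobalWindows())
      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) // fire on every new read
      .discardingFiredPanes())                                          // new read replaces the old one
  .apply(View.asSingleton()); // latest pane becomes the side-input value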
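The pattern described above boils down to one idea: on every tick, re-read the whole collection and let the newest result replace the old side-input value. As a rough illustration only, here is a plain-Java model of that behavior using the question's 20,000 to 20,003 example. It involves no Beam or MongoDB code; `RefreshingSideInputModel`, `onTick`, and the `Supplier` that stands in for the MongoDB read are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Plain-Java model (hypothetical, not Beam code) of a slowly updating side input:
 * each tick re-reads the whole source and replaces the previous snapshot.
 */
public class RefreshingSideInputModel {
    // Stands in for "read the entire MongoDB collection" (assumption for this sketch)
    private final Supplier<List<String>> readWholeCollection;
    // The value a main-input DoFn would currently see as its side input
    private final AtomicReference<List<String>> snapshot =
            new AtomicReference<>(Collections.<String>emptyList());

    public RefreshingSideInputModel(Supplier<List<String>> readWholeCollection) {
        this.readWholeCollection = readWholeCollection;
    }

    /** One GenerateSequence-style tick: re-read and swap in a fresh, immutable copy. */
    public void onTick() {
        snapshot.set(Collections.unmodifiableList(new ArrayList<>(readWholeCollection.get())));
    }

    /** What a reader of the side input sees right now. */
    public List<String> current() {
        return snapshot.get();
    }

    public static void main(String[] args) {
        List<String> collection = new ArrayList<>();
        for (int i = 0; i < 20_000; i++) collection.add("doc-" + i);

        RefreshingSideInputModel side = new RefreshingSideInputModel(() -> collection);
        side.onTick();                                  // initial load
        System.out.println(side.current().size());      // prints 20000

        for (int i = 0; i < 3; i++) collection.add("new-doc-" + i); // backend adds 3 documents
        System.out.println(side.current().size());      // prints 20000: not visible until next tick
        side.onTick();
        System.out.println(side.current().size());      // prints 20003
    }
}
```

The model also makes the trade-off visible: documents added between ticks only show up after the next re-read, so the GenerateSequence rate roughly bounds how stale the side input can get, while a faster rate means more full reads of the collection.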
