Apache Beam: Refreshing a side input which I am reading from MongoDB using MongoDbIO.read() (Part 2)


Problem description

I'm not sure how GenerateSequence would work for me, since I have to read values from MongoDB periodically, on an hourly or daily basis. I created a ParDo that reads from MongoDB and added a Window into GlobalWindows with a trigger (I will update the trigger as per requirements). However, the code snippet below gives a return type error. Could you please help me correct these lines of code? A snapshot of the error is also attached. Also, how does GenerateSequence help in my case?

PCollectionView<List<String>> list_of_vins = pipeline
                  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
                  .apply(ParDo.of(new DoFn<Long, List<String>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      // Read entire DB, and output as a List<String>
                        final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
                        MongoClient mongoClient = MongoClients.create(uriString);
                        MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
                        MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
                        c.output((List<String>) ((View) mongoCollection).asList());
                    }
                  })
                  .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));

Recommended answer

@danielm and all,

I have updated my code and it seems to be working, but I have a few questions that need clarification before I go ahead with this:

PCollection<String> list_of_vins_1 = pipeline
            // Generate a tick every 2 minutes
            .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
            // On each tick, read the documents from MongoDB and emit each one as an element
            .apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
                    @ProcessElement
                    public void processElement(@Element Long tick, OutputReceiver<Document> out) {
                            // reading values from Mongo DB (query and loop elided in the original post)
                            out.output(mongoDocuments);
                    }
                }
            )).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
            .apply(ParDo.of(new ConvertDocuemntToStringFn()));

// Convert the Mongo data into a List<String> side input view
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());

I am able to read values from MongoDB at the configured ticker interval, but I am not sure whether this increases the size of my side input. When I pass list_of_data_1 as a side input, the pipeline shows the count of elements added increasing.

Suppose the MongoDB collection has 20,000 documents and the ticker fires every 2 minutes. Then the number of elements added would be 20,000 multiplied by the number of times the ticker has fired, i.e. 20,000 + 20,000 + 20,000 + ... and so on.
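To make that arithmetic concrete, here is a small worked sketch in plain Java. It assumes every tick re-reads all 20,000 documents and emits each as one element; the class and method names are made up for illustration. Note that it only models the running total of elements emitted by the read step (the kind of number a cumulative "elements added" counter would show), not the size of the side input at any given moment.

```java
public class SideInputGrowthEstimate {

  /** Number of ticks in 24 hours for a ticker firing every tickMinutes minutes. */
  static long ticksPerDay(long tickMinutes) {
    return 24L * 60L / tickMinutes;
  }

  /** Running total of elements emitted if every tick re-reads docsPerRead documents. */
  static long elementsEmitted(long docsPerRead, long ticks) {
    return docsPerRead * ticks;
  }

  public static void main(String[] args) {
    long ticks = ticksPerDay(2);                 // 720 ticks per day at one tick every 2 minutes
    long total = elementsEmitted(20_000L, ticks); // 20,000 * 720 = 14,400,000
    System.out.println("Ticks per day: " + ticks);
    System.out.println("Elements emitted per day: " + total);
  }
}
```

So after one day the emitted-element total would be about 14.4 million under these assumptions, even though each individual read returns only 20,000 documents.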

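So my question is: are elements appended to the side input on every tick, or is the side input refreshed (overwritten) so that it always holds 20,000 values, or however many values MongoDB currently has?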
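One way to sidestep the append-versus-overwrite question is the "slowly updating global window side input" pattern from the Apache Beam documentation: emit the whole snapshot as a single element per tick, keep only the most recent one with Latest.globally(), and expose it with View.asSingleton(). The sketch below is a possible adaptation of that pattern to this case, not a verified fix for the pipeline above. The class name and the readAllVins() helper are hypothetical placeholders; the actual MongoDB query is not shown in the post and would have to replace the stub.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class RefreshingSideInputSketch {

  /** Hypothetical stub: replace with a real MongoDB query that returns all VINs. */
  static List<String> readAllVins() {
    List<String> vins = new ArrayList<>();
    vins.add("VIN-PLACEHOLDER");
    return vins;
  }

  /** Builds a side input view that holds only the most recent snapshot. */
  static PCollectionView<List<String>> buildVinView(Pipeline pipeline) {
    return pipeline
        // One tick every 2 minutes, as in the post; adjust the polling rate as needed.
        .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
        // Emit the whole snapshot as ONE element per tick, not one element per document.
        .apply("ReadSnapshot", ParDo.of(new DoFn<Long, List<String>>() {
          @ProcessElement
          public void processElement(@Element Long tick, OutputReceiver<List<String>> out) {
            out.output(readAllVins());
          }
        }))
        // Fire a pane for each new snapshot and discard what was already fired.
        .apply("Window", Window.<List<String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        // Keep only the latest snapshot and expose it as a singleton side input.
        .apply(Latest.globally())
        .apply(View.asSingleton());
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollectionView<List<String>> vinView = buildVinView(pipeline);
    // A main-input ParDo would consume the view via .withSideInputs(vinView)
    // and read it with c.sideInput(vinView); that part is omitted here.
    System.out.println("Pipeline graph built; add a main input before calling pipeline.run().");
  }
}
```

With this shape, each refresh replaces the previous List<String> as a whole, so the side input should hold one snapshot at a time while the upstream element counters keep growing by one per tick. Since the entire list travels as a single element, this is only reasonable while the snapshot fits comfortably in worker memory.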
