Apache Beam: Refreshing a side input read from MongoDB using MongoDbIO.read() (Part 2)

Problem description

I am not sure how GenerateSequence would work for me, since I have to read values from MongoDB periodically, on an hourly or daily basis. I created a ParDo that reads from MongoDB and added a window into GlobalWindows with a trigger (I will update the trigger as per my requirements). However, the code snippet below gives a return type error. Could you please help me correct these lines? I have also attached a snapshot of the error. Also, how does GenerateSequence help in my case?

PCollectionView<List<String>> list_of_vins = pipeline
                  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
                  .apply(ParDo.of(new DoFn<Long, List<String>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      // Read entire DB, and output as a List<String>
                        final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
                        MongoClient mongoClient = MongoClients.create(uriString);
                        MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
                        MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
                        c.output((List<String>) ((View) mongoCollection).asList());
                    }
                  })
                  .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));

Recommended answer

@danielm and all,

I have updated my code and it seems to be working, but I have a few questions and need some clarification before going ahead with this:

PCollection<String> list_of_vins_1 = pipeline
        // Generate a tick every 2 minutes
        .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
        // On each tick, read the documents from MongoDB
        .apply("Read Data from Mongo DB", ParDo.of(new DoFn<Long, Document>() {
            @ProcessElement
            public void processElement(@Element Long tick, OutputReceiver<Document> out) {
                // Reading values from MongoDB; the query and the opening of the
                // per-document loop are elided in the original post.
                    out.output(mongoDocuments);
                }
            }
        }))
        .apply("Window", Window.<Document>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes())
        .apply(ParDo.of(new ConvertDocuemntToStringFn()));

// Convert the MongoDB data to a list of strings for use as a side input
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String>asList());

I am able to read values from MongoDB at the interval set on the ticker, but I am not sure whether this increases the size of my side input. When I pass list_of_data_1 as a side input, the pipeline shows the count of elements added going up.

Suppose MongoDB holds 20,000 documents and the ticker fires every 2 minutes. The number of elements added would then be 20,000 multiplied by the number of times the ticker has fired, i.e. 20,000 + 20,000 + 20,000 + ... and so on.

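So my question is: each time elements are added, is the side input refreshed so that it always holds 20,000 values (or however many MongoDB holds), or are the new elements appended to the old ones? In other words, does it append or overwrite?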
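
To make the two outcomes in the question concrete, here is a minimal plain-Java sketch. It is a toy model only: it does not use Beam and says nothing about what a particular runner actually does with the side input. The class `SideInputGrowth` and its methods `readAll`, `sizeIfAppended` and `sizeIfOverwritten` are hypothetical names introduced for illustration, and the 20,000 figure is the one from the question.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (plain Java, not Beam) of the two outcomes the question contrasts. */
final class SideInputGrowth {

    /** Hypothetical stand-in for one full read of the MongoDB collection. */
    static List<String> readAll(int docsPerRead, int tick) {
        List<String> docs = new ArrayList<>(docsPerRead);
        for (int i = 0; i < docsPerRead; i++) {
            docs.add("tick" + tick + "-doc" + i);
        }
        return docs;
    }

    /** Every read is appended: the list grows by one full read per tick. */
    static int sizeIfAppended(int docsPerRead, int ticks) {
        List<String> view = new ArrayList<>();
        for (int tick = 0; tick < ticks; tick++) {
            view.addAll(readAll(docsPerRead, tick));
        }
        return view.size();
    }

    /** Every read replaces the previous one: the list stays at one read's size. */
    static int sizeIfOverwritten(int docsPerRead, int ticks) {
        List<String> view = new ArrayList<>();
        for (int tick = 0; tick < ticks; tick++) {
            view = readAll(docsPerRead, tick);
        }
        return view.size();
    }

    public static void main(String[] args) {
        int docsPerRead = 20_000; // figure taken from the question
        for (int ticks = 1; ticks <= 3; ticks++) {
            System.out.println(ticks + " tick(s): appended="
                    + sizeIfAppended(docsPerRead, ticks)
                    + ", overwritten=" + sizeIfOverwritten(docsPerRead, ticks));
        }
    }
}
```

In the "append" case the size is the 20,000 + 20,000 + ... sum described above; in the "overwrite" case it stays at the size of a single read no matter how many ticks have fired. Which of the two the pipeline shows depends on how the windowing, trigger and view are set up, which is what the question is asking about.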
