Promise closure within loop

Question

I am receiving rows of data from Kafka every second. For each batch of data, I insert the rows into my database.

My app keeps reading the last message and id of each batch. The issue is that the promises are not running in series; they run concurrently after one batch is finished, and they all read the same message and id. I want each promise to have its own message and id, as defined by the order in which they came in from the for loop in the first function.

I think I need to use closures; however, I am not sure how to apply them here. I don't want to use timers!

Thanks!

// This is live data, coming in concurrently, forever. Promises from previous batch must be resolved before the next batch is received.
batchOfRows.on('message', function (data) {
    for (var i = 0; i < batchOfRows.rows.length; i++) {
        validate(batchOfRows.rows[i])
            .then(result => console.log(result))
            .catch(error => console.log(error));
    }
});

// For each row received, give it an ID and then insert into the DB
function validate(data) {
    return new Promise((resolve, reject) => {
        message = data;
        id = message.date + message.location
        DB.execute('select * from table1 where id = ?', id) // This is a promise function provided by the database driver (Cassandra)
            .then(result => {
                // Insert into the table at this ID
                insertIntoDB(message, id)
                    .then(result => resolve(result))
                    .catch(error => reject(error));
            })
            .catch(error => {
                reject(error);
            });
    });
}

// Inserting into DB
function insertIntoDB(message, id) {
    return new Promise((resolve, reject) => {
        const query = "insert into table2 where id = ? and messageBody = ?";

        DB.execute(query, [id, JSON.stringify(message)])
            .then(result => resolve("Successfully inserted message ID " + id))
            .catch(error => reject("Error inserting!"));
    });
}
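
A likely cause of every promise reading the same message and id is that validate() assigns message and id without let, const, or var. Outside strict mode that creates implicit globals, so every call shares one message and one id: the for loop calls validate() for every row synchronously, and the .then() callbacks that read those variables only run later, after the last row has overwritten them. A block-scoped const inside the function gives each call its own binding, which is the closure the question is after. This is only an illustration: fakeSelect() is a hypothetical stand-in for DB.execute(), and the rows are made up.

```javascript
// Hypothetical stand-in for DB.execute(): resolves on a later timer tick.
function fakeSelect(id) {
    return new Promise(resolve => setTimeout(() => resolve(id), 0));
}

// One variable shared by every call, like the implicit global `id` in validate().
let sharedId;

function validateShared(row) {
    sharedId = row.date + row.location;               // overwritten by each call in the loop
    return fakeSelect(sharedId).then(() => sharedId); // read later, after the loop has finished
}

function validateScoped(row) {
    const id = row.date + row.location;               // a fresh binding for every call
    return fakeSelect(id).then(() => id);             // each callback closes over its own id
}

// Made-up rows for illustration.
const rows = [
    { date: '2020-01-01', location: 'A' },
    { date: '2020-01-01', location: 'B' },
    { date: '2020-01-01', location: 'C' },
];

function runAll(validate) {
    const pending = [];
    for (let i = 0; i < rows.length; i++) {
        pending.push(validate(rows[i]));
    }
    return Promise.all(pending);
}

const demo = Promise.all([runAll(validateShared), runAll(validateScoped)]);

demo.then(([shared, scoped]) => {
    console.log(shared); // every entry is the last row's id, '2020-01-01C'
    console.log(scoped); // one id per row: '2020-01-01A', '2020-01-01B', '2020-01-01C'
});
```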

EDIT (danh's solution):

var kafka = require('kafka-node');
var client = new kafka.Client("localhost:2181"), Consumer = kafka.Consumer;
// This is like an event listener.
var batchOfRows = new Consumer(
    client, [{
        topic: 'my_topic',
        partition: 0,
        offset: 0
    }], {
        fromOffset: false
    }
);

let queue = [];
let results = [];
let promises = Promise.resolve();

function processQueue() {
    queue.forEach(element => {
        promises = promises.then(element.map(processElement)).then(elementResult => {
            // results.push(elementResult); // Don't want result to increase in size! I have put this inside insertDB then I clear it below
            console.log(results.length); // First received batch prints: 0. Second received batch prints 72. Third received batch prints 75
            results = [];  
            queue.shift();
        });
    });
}

batchOfRows.on('message', function (data) {
    console.log(batchOfRows.value.length); // First received batch prints: 72. Second received batch prints 75. Third received batch prints 76
    queue.push(batchOfRows.rows);
    processQueue();
});

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.stringify(message)])
        .then(result => {
            // Pushing the result here
            results.push(result); // Seems like it does not push the results from the first batch from batchOfRows until it receives the second batch
            console.log("Test") // On the first batch prints "Test" 72 times right away
        });
}

EDIT: I have modified the processQueue function slightly by adding element.map(processElement), because the batches received from batchOfRows are actually arrays, and I need to perform that DB query for each item inside that array.

I have also removed results.push(elementResult), because elementResult is actually undefined for some reason. Instead, I moved the push into insertIntoDB as results.push(result). This may be where the error originates (I don't know how to return the result from insertIntoDB back to the calling promise function, processQueue).

Take a look at insertIntoDB: if I console.log("Test") there, it prints "Test" as many times as there are elements in the batchOfRows array, which suggests that all promises in that batch have resolved. So on the first batch/message, if there are 72 rows, it prints "Test" 72 times. But if I change that console.log("Test") to results.push(result), or even results.push("test"), and then print results.length, it still gives me 0 until the second batch completes, even though I expect the length to be 72.
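
One plausible explanation for both symptoms (elementResult being undefined, and results.length reading 0 for the first batch) is the call promises.then(element.map(processElement)). The map runs immediately and returns an array of promises, and .then() ignores any argument that is not a function. So the chain never waits for the inserts: the next callback runs right away, receives the previous link's value (undefined), and logs results.length before any insert has finished. The sketch compares that call with one that passes a function returning Promise.all(); makeInsert() is a hypothetical stand-in for insertIntoDB().

```javascript
// Hypothetical stand-in for insertIntoDB(): after a short delay, records the row in `log`
// and resolves with it.
function makeInsert(log) {
    return row => new Promise(resolve => setTimeout(() => {
        log.push(row);
        resolve(row);
    }, 10));
}

const batch = ['r1', 'r2', 'r3'];

// Broken: batch.map(...) runs immediately and yields an array, not a function, so .then()
// ignores it. The next callback runs at once with the previous value (undefined),
// before any insert has finished.
const brokenLog = [];
const broken = Promise.resolve()
    .then(batch.map(makeInsert(brokenLog)))
    .then(result => ({ result, finished: brokenLog.length }));

// Fixed: pass a function and return Promise.all(), so the chain waits for every row.
const fixedLog = [];
const fixed = Promise.resolve()
    .then(() => Promise.all(batch.map(makeInsert(fixedLog))))
    .then(result => ({ result, finished: fixedLog.length }));

Promise.all([broken, fixed]).then(([b, f]) => {
    console.log(b); // result is undefined and finished is 0
    console.log(f); // result holds all three rows and finished is 3
});
```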

Answer

It might be helpful to abstract the ideas a little and represent them explicitly in data (rather than as data retained implicitly in the promises). Start with a queue:

let queue = [];

Add things to it with queue.push(element), and get and remove them in order of arrival with element = queue.shift().

Our goal is to process whatever is on the queue, in order, saving the results in order. The processing itself is async, and we want to finish one queue item before starting the next, so we need a chain of promises (called promises) to process the queue:

let results = [];
let promises = Promise.resolve();

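function processQueue() {
    // Chain only the element that was just pushed: earlier elements are already on the chain,
    // so looping over the whole queue here would process them more than once.
    const element = queue[queue.length - 1];
    promises = promises
        .then(() => processElement(element))  // pass a function, or the work starts immediately
        .then(elementResult => results.push(elementResult))
        .catch(error => console.log(error))   // keep the chain alive for later batches
        .then(() => queue.shift());           // the oldest element is the one that just finished
}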
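
A runnable sketch of this chaining pattern can check that elements are processed one at a time and in arrival order. It assumes a hypothetical processElement() that takes a made-up delay from each element, with later elements finishing faster, so any overlap would show up as out-of-order results or more than one call in flight. It chains only the newly pushed element and passes a function to .then(), so each call starts only after the previous link settles.

```javascript
const queue = [];
const results = [];
let promises = Promise.resolve();

let inFlight = 0;    // processElement() calls currently running
let maxInFlight = 0; // highest value inFlight ever reached

// Hypothetical async work: resolves with element.name after element.ms milliseconds.
function processElement(element) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return new Promise(resolve => setTimeout(() => {
        inFlight--;
        resolve(element.name);
    }, element.ms));
}

function processQueue() {
    const element = queue[queue.length - 1]; // the element that was just pushed
    promises = promises
        .then(() => processElement(element))  // a function, so work starts after the previous link
        .then(elementResult => results.push(elementResult))
        .catch(error => console.log(error))   // keep the chain alive if one element fails
        .then(() => queue.shift());           // the oldest element is the one that just finished
}

// Three elements arrive back to back; the later ones are faster.
for (const element of [{ name: 'a', ms: 30 }, { name: 'b', ms: 10 }, { name: 'c', ms: 0 }]) {
    queue.push(element);
    processQueue();
}

const summary = promises.then(() => ({ results, maxInFlight, remaining: queue.length }));
summary.then(s => console.log(s));
```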

We can convince ourselves that this is right without even thinking about what processElement() does, as long as it returns a promise. (In the OP's case, that promise is a promise to deal with an array of "rows".) processElement() will do its thing, and the result (an array of results in the OP's case) will get pushed to results.

With the ordering of operations settled, when a new batch arrives, add it to the queue, and then process whatever is on the queue:

batchOfRows.on('message', function (data) {
    queue.push(batchOfRows.rows);
    processQueue();
});
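
To try the event wiring without Kafka, a Node EventEmitter can stand in for the consumer. This sketch makes a few assumptions that are not in the answer: the rows arrive as the event's argument rather than as batchOfRows.rows, processElement() is a hypothetical 20 ms task that returns the batch size, and all four batches arrive in one burst, i.e. faster than they can be processed. Sampling queue.length as each batch arrives shows the backlog building up while the batches are still handled strictly in order.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the Kafka consumer.
const batchOfRows = new EventEmitter();

const queue = [];
const results = [];
const backlog = [];               // queue.length sampled as each batch arrives
let promises = Promise.resolve();

// Hypothetical processing: takes 20 ms per batch and resolves with the number of rows.
function processElement(rows) {
    return new Promise(resolve => setTimeout(() => resolve(rows.length), 20));
}

function processQueue() {
    const element = queue[queue.length - 1]; // the batch that was just pushed
    promises = promises
        .then(() => processElement(element))
        .then(elementResult => results.push(elementResult))
        .catch(error => console.log(error))
        .then(() => queue.shift());
}

batchOfRows.on('message', rows => {
    queue.push(rows);
    backlog.push(queue.length);
    processQueue();
});

// Four batches of 1, 2, 3 and 4 made-up rows arrive in a single burst
// (emit() calls the listener synchronously).
for (let size = 1; size <= 4; size++) {
    batchOfRows.emit('message', Array.from({ length: size }, (_, i) => ({ row: i })));
}

const finished = promises.then(() => ({ backlog, results, remaining: queue.length }));
finished.then(s => console.log(s));
```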

Now we just need to define processElement(). Here I use @YuryTarabanko's helpful suggestions (and, IMO, his answer should stay marked as correct):

function processElement(data) {
    const id = data.date + data.location
    return  DB.execute('select * from table1 where id = ?', id)
              .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.stringify(message)]);
}
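
processElement() above handles a single row, while each message in the question carries an array of rows. One way to bridge the two is a per-batch wrapper that maps processElement() over the rows with Promise.all(), which is what the question's element.map(processElement) was aiming at. In this sketch the DB object is a hypothetical in-memory stub with random latency, not the Cassandra driver, and processBatch() and the rows are made up for illustration. Because id is a const inside processElement(), each row's insert uses its own id even though the rows of one batch run concurrently.

```javascript
// Hypothetical in-memory stand-in for the database driver: every call resolves after a
// random delay, so calls finish out of order. Inserts record the id they were given.
const inserted = [];
const DB = {
    execute(query, params) {
        return new Promise(resolve => setTimeout(() => {
            if (query.startsWith('insert')) inserted.push(params[0]);
            resolve({ query, params });
        }, Math.random() * 10));
    },
};

function processElement(data) {
    const id = data.date + data.location; // a new binding for every row
    return DB.execute('select * from table1 where id = ?', id)
        .then(result => insertIntoDB(data, id).then(() => result));
}

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.stringify(message)]);
}

// Hypothetical per-batch wrapper: the rows of one batch run concurrently, and the returned
// promise resolves (with results in row order) only after every row has been inserted.
function processBatch(rows) {
    return Promise.all(rows.map(processElement));
}

// Made-up batch for illustration.
const batch = [
    { date: '2020-01-01', location: 'A' },
    { date: '2020-01-01', location: 'B' },
    { date: '2020-01-02', location: 'A' },
];

const batchDone = processBatch(batch).then(selectResults => ({
    selectedIds: selectResults.map(r => r.params), // id passed to each select, in row order
    insertedIds: [...inserted].sort(),             // ids actually inserted, sorted for comparison
}));
batchDone.then(s => console.log(s));
```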

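One nice side effect of this is that you can measure progress. Each element is shifted off the queue only after it has been processed, so queue.length is the number of batches that have arrived but are not finished yet. If the inputs are arriving too fast, then the expression:

queue.length

... will grow over time (while results.length counts the batches completed so far).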

EDIT: Looking at the newer code, I am puzzled about why a select query is done for each row (each element in batchOfRows.rows). Since the result of that query is ignored, don't do it:

function processElement(data) {
    const id = data.date + data.location
    // we know everything we need to know to call insert (data and id)
    // just call it and return what it returns :-)
    return insertIntoDB(data, id);
}

I understand now that this will be a long-running task, and it shouldn't accumulate results (even linearly). The cleaner fix for that is to remove every reference to the results array that I suggested. The minimal version of insert just inserts and returns the result of the insertion:

function insertIntoDB(message, id) {
    const query = "insert into table2 where id = ? and messageBody = ?";
    return DB.execute(query, [id, JSON.stringify(message)]);
}

I think you added some code to log results. (A better test that it worked would be to check the database via some outside process.) If you do want to log, just remember to pass the result value through after logging:

anyPromise.then(result => {
    console.log(result);
    return result;  // IMPORTANT
})
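
A small illustration of why that return matters: a .then() callback that only logs resolves to undefined, so every later link loses the value. Here anyPromise is a hypothetical promise that resolves to a string.

```javascript
// Hypothetical promise standing in for the result of an insert.
const anyPromise = Promise.resolve('Successfully inserted message ID 2020-01-01A');

// Logs but does not return: the next link receives undefined.
const dropped = anyPromise.then(result => {
    console.log(result);
});

// Logs and passes the value through, so later links still see it.
const passedThrough = anyPromise.then(result => {
    console.log(result);
    return result;
});

Promise.all([dropped, passedThrough]).then(([a, b]) => {
    console.log(a); // undefined
    console.log(b); // the original string
});
```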
