Can I limit consumption of a kafka-node consumer?


Problem description

It seems like my kafka-node consumer:

var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var consumer = new Consumer(client, [], {
     ...
    });

is in certain cases fetching way more messages than I can handle. Is there a way to limit it (for example, to accept no more than 1000 messages per second, possibly using the pause API)?

  • I'm using kafka-node, whose API seems quite limited compared to the Java version.
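Before the batching approach in the answer below, the pause API the question mentions can be used directly: count deliveries per one-second window, pause when the budget is spent, and resume when the window resets. This is a minimal sketch (not from the accepted answer), assuming a kafka-node consumer that exposes pause()/resume() (which kafka-node's Consumer does); the limit and the helper names are illustrative:

```javascript
// Hypothetical per-second budget; tune to what your downstream can handle.
var MAX_PER_SECOND = 1000;
var received = 0;
var paused = false;

// Count each delivery; pause the consumer once the per-second budget is spent.
function handleMessage(consumer, message, process) {
    received++;
    if (received >= MAX_PER_SECOND && !paused) {
        consumer.pause(); // stop fetching until the window resets
        paused = true;
    }
    process(message);
}

// Reset the counter every second and resume fetching if we had paused.
function startWindowTimer(consumer) {
    return setInterval(function () {
        received = 0;
        if (paused) {
            consumer.resume();
            paused = false;
        }
    }, 1000);
}
```

Wired up, this would look like `startWindowTimer(consumer); consumer.on('message', function (m) { handleMessage(consumer, m, doWork); });`. Note that pause() only stops further fetches; messages already buffered client-side will still be delivered, which is why the answer below adds a queue in front of the real work.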

Answer

I had a similar situation where I was consuming messages from Kafka and had to throttle the consumption because my consumer service depended on a third-party API that had its own constraints.

I used async/queue along with a wrapper around async/cargo called asyncTimedCargo for batching. The cargo collects the messages from the kafka-node consumer and sends them to the queue upon reaching a size limit batch_config.batch_size or a timeout batch_config.batch_timeout. async/queue provides saturated and unsaturated callbacks, which you can use to stop consumption while your queue's task workers are busy. This keeps the cargo from filling up, so your app does not run out of memory; consumption resumes once the queue is unsaturated.

//cargo-service.js
// asyncTimedCargo is the author's wrapper around async/cargo;
// sms_queue and batch_config come from sms-queue.js and the app config.
module.exports = function(key){
    return new asyncTimedCargo(function(tasks, callback) {
        // Collect the batched Kafka message payloads into one POST body.
        var postBody = [];
        for (var i = 0; i < tasks.length; i++) {
            postBody.push(JSON.parse(tasks[i].value));
        }
        var postJson = {
            "json": {"request": postBody}
        };
        sms_queue.push(postJson);
        callback();
    }, batch_config.batch_size, batch_config.batch_timeout);
};

//kafka-consumer.js
// Build the cargo via the factory exported by cargo-service.js above.
var cargo = require('./cargo-service')(key);
consumer.on('message', function (message) {
    if (message && message.value && utils.isValidJsonString(message.value)) {
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});

// sms-queue.js
var async = require('async');
var sms_queue = async.queue(
    async.retryable({
        times: queue_config.num_retries,
        errorFilter: function (err) {
            logger.info("inside retry");
            console.log(err);
            return !!err; // retry on any error
        }
    }, function (task, callback) {
        // your worker task for queue
        callback();
    }), queue_config.queue_worker_threads);

sms_queue.saturated = function() {
    consumer.pause();
    logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
    consumer.resume();
    logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};

