Can I limit consumption of a kafka-node consumer?


Question

It seems like my kafka-node consumer:

var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var consumer = new Consumer(client, [], {
     ...
    });

is fetching way more messages than I can handle in certain cases. Is there a way to limit it (for example, accept no more than 1000 messages per second, possibly using the pause API)?

  • I'm using kafka-node, which seems to have a limited API compared to the Java version.

Answer

I had a similar situation where I was consuming messages from Kafka and had to throttle the consumption because my consumer service depended on a third-party API that had its own constraints.

I used async/queue along with a wrapper of async/cargo called asyncTimedCargo for batching purposes. The cargo collects all the messages from the kafka consumer and sends them to the queue upon reaching a size limit batch_config.batch_size or a timeout batch_config.batch_timeout. async/queue provides saturated and unsaturated callbacks, which you can use to stop consumption when your queue task workers are busy. This stops the cargo from filling up, so your app does not run out of memory. Consumption resumes upon unsaturation.

//cargo-service.js
// asyncTimedCargo is the wrapper around async/cargo mentioned above; it
// flushes a batch when either the size limit or the timeout is reached.
module.exports = function (key) {
    return new asyncTimedCargo(function (tasks, callback) {
        // Parse each Kafka message value and collect the batch into one payload.
        var postBody = [];
        for (var i = 0; i < tasks.length; i++) {
            postBody.push(JSON.parse(tasks[i].value));
        }
        sms_queue.push({
            "json": { "request": postBody }
        });
        callback();
    }, batch_config.batch_size, batch_config.batch_timeout);
};
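The answer names asyncTimedCargo but never shows its source. A minimal sketch of what such a helper could look like, inferred from the description (flush on size limit or timeout); this implementation is illustrative, not the answerer's actual code:

```javascript
// Illustrative stand-in for asyncTimedCargo: collects tasks and hands a
// batch to the worker when either the size limit or the timeout is hit.
function asyncTimedCargo(worker, size, timeoutMs) {
    var tasks = [];
    var timer = null;

    function flush() {
        if (timer) { clearTimeout(timer); timer = null; }
        if (tasks.length === 0) return;
        var batch = tasks;
        tasks = [];
        worker(batch, function () { /* batch finished */ });
    }

    return {
        push: function (task) {
            tasks.push(task);
            if (tasks.length >= size) {
                flush();                              // size limit reached
            } else if (!timer) {
                timer = setTimeout(flush, timeoutMs); // first task arms the timer
            }
        }
    };
}

module.exports = asyncTimedCargo;
```

Because the factory returns a plain object, it also works when called with `new`, matching the answer's `new asyncTimedCargo(...)` usage.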

//kafka-consumer.js
var cargo = require('./cargo-service')();
consumer.on('message', function (message) {
    if (message && message.value && utils.isValidJsonString(message.value)) {
        cargo.push(message);
    }
    else {
        logger.error('Invalid JSON Message');
    }
});

// sms-queue.js
var queue = require('async/queue');
var retryable = require('async/retryable');

var sms_queue = queue(
    retryable({
        times: queue_config.num_retries,
        errorFilter: function (err) {
            logger.info("inside retry");
            console.log(err);
            // Retry whenever the worker reported an error.
            return !!err;
        }
    }, function (task, callback) {
        // your worker task for the queue
        callback();
    }), queue_config.queue_worker_threads);

sms_queue.saturated = function() {
    consumer.pause();
    logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
    consumer.resume();
    logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};
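Since the question specifically asks about capping throughput at N messages per second with the pause API, here is a minimal alternative sketch (not part of the answer above) that does only that. The window counter is a plain function so the throttling logic can be exercised without a broker; the broker address, topic name, and limit are illustrative placeholders:

```javascript
// Fixed-window counter: decides when to pause (window full) and when a
// paused consumer may resume (window reset).
function makeThrottle(maxPerSecond) {
    return {
        seen: 0,
        record: function () {   // call once per message; true => pause
            this.seen++;
            return this.seen >= maxPerSecond;
        },
        reset: function () {    // call once per second; true => resume
            var wasSaturated = this.seen >= maxPerSecond;
            this.seen = 0;
            return wasSaturated;
        }
    };
}

// Wiring it to kafka-node (broker and topic are placeholders):
// var kafka = require('kafka-node');
// var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
// var consumer = new kafka.Consumer(client, [{ topic: 'my-topic' }], {});
// var throttle = makeThrottle(1000);
// consumer.on('message', function (message) {
//     if (throttle.record()) consumer.pause();
// });
// setInterval(function () {
//     if (throttle.reset()) consumer.resume();
// }, 1000);
```

Note that consumer.pause() stops further fetch requests; messages already buffered may still be delivered, so the cap is approximate rather than exact.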

