我可以限制kafka-node消费者的消费吗? [英] can I limit consumption of kafka-node consumer?
问题描述
好像我的kafka节点使用者:
It seems like my kafka node consumer:
var kafka = require('kafka-node');
var consumer = new Consumer(client, [], {
...
});
在某些情况下获取太多消息,超出了我的处理能力.有没有一种方法可以限制它(例如,每秒可能接收到不超过1000条消息,可能使用暂停API?)
is fetching way too many messages than I can handle in certain cases. Is there a way to limit it (for example accept no more than 1000 messages per second, possibly using the pause api?)
- 我使用的是kafka-node,与Java版本相比,它的api似乎很有限
推荐答案
我遇到了类似的情况,我在使用来自Kafka的消息,并且必须限制使用量,因为我的消费者服务依赖于第三方API,而第三方API拥有自己的API约束.
I had a similar situation where I was consuming messages from Kafka and had to throttle the consumption because my consumer service was dependent on a third party API which had its own constraints.
我使用了 async/queue
以及称为 asyncTimedCargo
的 async/cargo
包装器进行批处理.货物从kafka消费者那里获取所有消息,并在达到大小限制 batch_config.batch_size
或超时 batch_config.batch_timeout
时将其发送到队列. async/queue
提供了 saturated
和 unsaturated
回调,如果队列任务工作人员很忙,您可以使用它们来停止消耗.这将阻止货物装满,并且您的应用程序不会耗尽内存.消耗会在不饱和状态下恢复.
I used async/queue
along with a wrapper of async/cargo
called asyncTimedCargo
for batching purpose.
The cargo gets all the messages from the kafka-consumer and sends it to queue upon reaching a size limit batch_config.batch_size
or timeout batch_config.batch_timeout
.
async/queue
provides saturated
and unsaturated
callbacks which you can use to stop the consumption if your queue task workers are busy. This would stop the cargo from filling up and your app would not run out of memory. The consumption would resume upon unsaturation.
//cargo-service.js
module.exports = function(key){
return new asyncTimedCargo(function(tasks, callback) {
var length = tasks.length;
var postBody = [];
for(var i=0;i<length;i++){
var message ={};
var task = JSON.parse(tasks[i].value);
message = task;
postBody.push(message);
}
var postJson = {
"json": {"request":postBody}
};
sms_queue.push(postJson);
callback();
}, batch_config.batch_size, batch_config.batch_timeout)
};
//kafka-consumer.js
cargo = cargo-service()
consumer.on('message', function (message) {
if(message && message.value && utils.isValidJsonString(message.value)) {
var msgObject = JSON.parse(message.value);
cargo.push(message);
}
else {
logger.error('Invalid JSON Message');
}
});
// sms-queue.js
var sms_queue = queue(
retryable({
times: queue_config.num_retries,
errorFilter: function (err) {
logger.info("inside retry");
console.log(err);
if (err) {
return true;
}
else {
return false;
}
}
}, function (task, callback) {
// your worker task for queue
callback()
}), queue_config.queue_worker_threads);
sms_queue.saturated = function() {
consumer.pause();
logger.warn('Queue saturated Consumption paused: ' + sms_queue.running());
};
sms_queue.unsaturated = function() {
consumer.resume();
logger.info('Queue unsaturated Consumption resumed: ' + sms_queue.running());
};
这篇关于我可以限制kafka-node消费者的消费吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!