如何使用 kafka-node 将 AWS Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来 [英] How to connect AWS Bitnami Certified Kafka AMI with Elastic Beanstalk nodejs environment using kafka-node
问题描述
我正在尝试将 Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk 连接起来nodejs 环境使用 kafka-node,怎么做?
I'm trying to connect Bitnami Certified Kafka AMI with Elastic Beanstalk nodejs environment using kafka-node, how to do that?
在本地安装 apache Kafka 并使用 Kafka-node 成功测试后,我想使用 AWS kafka 服务器测试我的应用程序.
After installing apache Kafka locally and testing it with Kafka-node successfully, I wanted to test my app with AWS kafka server.
我配置了我的 AWS Bitnami 认证的 Kafka AMI 侦听器以匹配我的 PublicDNS (IPv4) 并在入站规则中公开 9092 和 2181 端口,如下所示:
I configured my AWS Bitnami Certified Kafka AMI listeners to match my Public DNS (IPv4) and exposed the 9092 and 2181 ports in inbound rules like this:
Type protocol port source
Custom TCP Rule TCP 9092 0.0.0.0/0
Custom TCP Rule TCP 2181 0.0.0.0/0
<小时>
#server.properties
listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
# Hostname and port the broker will advertise to producers and consumers.
# If not set it uses the value for "listeners" if configured. Otherwise, it
# will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
# root directory for all kafka znodes.
zookeeper.connect=<Public DNS (IPv4) from AWS>:2181
我正在使用 kafka-node 设置我的生产者,如下所示:
I'm setting my producer using kafka-node like this:
var Producer = kafka.Producer,
client = new kafka.KafkaClient({ kafkaHost: <kafka-public-ip>:9092}),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})
kafka-node 抛出超时错误 Error: Unable to find available brokers to try
kafka-node is throwing a timeout error Error: Unable to find available brokers to try
我已经使用 telnet open <kafka-instance-public-ip> 测试了默认端口 2222
并且它工作,但端口 9092 不工作.
I have tested the default port 22 with telnet open <kafka-instance-public-ip> 22
and it worked, but port 9092 is not working.
Bitnami Kafka AMI 问题总结:
1- 如何使用 AWS 配置 Bitnami Kafka AMI 以进行远程访问
1- How to configure Bitnami Kafka AMI with AWS to be accessed remotely
推荐答案
所以我如何设置如下:这是两个可以运行的文件,只需要 express 和 kafka-node@3.0.1
so how i have this set up is the following: these are 2 files that can run and only require express and kafka-node@3.0.1
// consumer.js
const kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client('<IP of kafka server>:2181');
consumer = new Consumer(client,
[{ topic: '<>'}]
);
console.log('listening')
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('Error:',err);
})
consumer.on('offsetOutOfRange', function (err) {
console.log('offsetOutOfRange:',err);
})
这是连接到zookeeper,所以我认为你需要有3.0.1版的kafka-node,所以当你安装它时
This is connecting to the zookeeper so i think you would need to have version 3.0.1 of kafka-node so when you install it would be
npm install --save kafka-node@3.0.1
要直接连接到经纪人,您可能需要自己解决.
to connect straight to the broker you might have to figure it out on your own.
// producer.js
const express = require('express');
const kafka = require('kafka-node');
const app = express();
const bodyParser = require('body-parser');
app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));
const { Producer } = kafka;
const client = new kafka.Client('<IP of kafka server>:2181');
const producer = new Producer(client);
producer.on('ready', () => {
console.log('Producer is ready');
});
producer.on('error', err => {
console.log('Producer is in error state');
console.log(err);
});
app.post('/kafkaproducer', (req, res) => {
const sentMessage = JSON.stringify(req.body.message);
const payloads = [
{ topic: req.body.topic, messages: sentMessage, partition: 0 },
];
producer.send(payloads, (err, data) => {
if (data) {
res.json(data);
}
if (err) {
res.send(err);
}
});
});
app.get('/',function(req,res){
res.json({greeting:'Kafka Producer'})
});
app.listen(5001,function(){
console.log('Kafka producer running at 5001')
})
您可以使用 postman 向 http://localhost:5001/kafkaproducer 发送 post http 请求格式如下
you can use postman to send a post http request to http://localhost:5001/kafkaproducer in the following format
{
topic: '<TOPIC YOU WANT>',
messages: '<Can be any format you want even a json but i would advise just
testing with a basic string at first>'
}
然后消费者将接收消息,但要确保主题已在 kafka 服务器上创建,并且您的消费者拥有正确的主题.
then the consumer will pick up the message but make sure the topic has been created on the kafka server and that you have the correct topic on your consumer.
附带说明,如果您使用 EC2 实例,则可以将它们组合起来
on a side note, if you went with a EC2 instance you could combine them
const express = require('express');
const kafka = require('kafka-node');
const app = express();
const bodyParser = require('body-parser');
app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));
const { Producer, Consumer } = kafka;
const client = new kafka.Client('13.56.240.35:2181');
const producer = new Producer(client);
consumer = new Consumer(client,
[{ topic: 'memes-to-mturk'}]
);
producer.on('ready', () => {
console.log('Producer is ready');
});
producer.on('error', err => {
console.log('Producer is in error state');
console.log(err);
});
consumer.on('message', function (message) {
console.log(message);
});
consumer.on('error', function (err) {
console.log('Error:',err);
})
app.get('/',function(req,res){
res.json({greeting:'Kafka Producer'})
});
app.post('/kafkaproducer', (req, res) => {
const sentMessage = JSON.stringify(req.body.message);
console.log(sentMessage);
const payloads = [
{ topic: req.body.topic, messages: sentMessage, partition: 0 },
];
producer.send(payloads, (err, data) => {
if (data) {
res.json(data);
}
if (err) {
res.send(err);
}
});
});
app.listen(5002,function(){
console.log('Kafka producer running at 5001')
})
这篇关于如何使用 kafka-node 将 AWS Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!