如何使用 kafka-node 将 AWS Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来 [英] How to connect AWS Bitnami Certified Kafka AMI with Elastic Beanstalk nodejs environment using kafka-node

查看:24
本文介绍了如何使用 kafka-node 将 AWS Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将 Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk 连接起来nodejs 环境使用 kafka-node,怎么做?

I'm trying to connect Bitnami Certified Kafka AMI with Elastic Beanstalk nodejs environment using kafka-node, how to do that?

在本地安装 apache Kafka 并使用 Kafka-node 成功测试后,我想使用 AWS kafka 服务器测试我的应用程序.

After installing apache Kafka locally and testing it with Kafka-node successfully, I wanted to test my app with AWS kafka server.

我配置了我的 AWS Bitnami 认证的 Kafka AMI 侦听器以匹配我的 PublicDNS (IPv4) 并在入站规则中公开 9092 和 2181 端口,如下所示:

I configured my AWS Bitnami Certified Kafka AMI listeners to match my Public DNS (IPv4) and exposed the 9092 and 2181 ports in inbound rules like this:

Type            protocol     port    source

Custom TCP Rule    TCP       9092    0.0.0.0/0
Custom TCP Rule    TCP       2181    0.0.0.0/0

<小时>

#server.properties    
listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# Hostname and port the broker will advertise to producers and consumers. 
# If not set it uses the value for "listeners" if configured. Otherwise, it  
# will use the value returned from 
# java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# root directory for all kafka znodes.
zookeeper.connect=<Public DNS (IPv4) from AWS>:2181

我正在使用 kafka-node 设置我的生产者,如下所示:

I'm setting my producer using kafka-node like this:

var Producer = kafka.Producer,
client = new kafka.KafkaClient({ kafkaHost: <kafka-public-ip>:9092}),
producer = new Producer(client);
producer.on('ready', function () {
console.log('Producer is ready');
});
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
})

kafka-node 抛出超时错误 Error: Unable to find available brokers to try

kafka-node is throwing a timeout error Error: Unable to find available brokers to try

我已经使用 telnet open <kafka-instance-public-ip> 测试了默认端口 2222 并且它工作,但端口 9092 不工作.

I have tested the default port 22 with telnet open <kafka-instance-public-ip> 22 and it worked, but port 9092 is not working.

Bitnami Kafka AMI 问题总结:

1- 如何使用 AWS 配置 Bitnami Kafka AMI 以进行远程访问

1- How to configure Bitnami Kafka AMI with AWS to be accessed remotely

推荐答案

所以我如何设置如下:这是两个可以运行的文件,只需要 express 和 kafka-node@3.0.1

so how i have this set up is the following: these are 2 files that can run and only require express and kafka-node@3.0.1

// consumer.js
const kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client('<IP of kafka server>:2181');
    consumer = new Consumer(client,
        [{ topic: '<>'}]
    );
console.log('listening')
consumer.on('message', function (message) {
    console.log(message);
});

consumer.on('error', function (err) {
    console.log('Error:',err);
})

consumer.on('offsetOutOfRange', function (err) {
    console.log('offsetOutOfRange:',err);
})

这是连接到zookeeper,所以我认为你需要有3.0.1版的kafka-node,所以当你安装它时

This is connecting to the zookeeper so i think you would need to have version 3.0.1 of kafka-node so when you install it would be

npm install --save kafka-node@3.0.1

要直接连接到经纪人,您可能需要自己解决.

to connect straight to the broker you might have to figure it out on your own.

// producer.js
const express = require('express');
const kafka = require('kafka-node');

const app = express();
const bodyParser = require('body-parser');

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));

const { Producer } = kafka;
const client = new kafka.Client('<IP of kafka server>:2181');
const producer = new Producer(client);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', err => {
  console.log('Producer is in error state');
  console.log(err);
});

app.post('/kafkaproducer', (req, res) => {
  const sentMessage = JSON.stringify(req.body.message);
  const payloads = [
    { topic: req.body.topic, messages: sentMessage, partition: 0 },
  ];
  producer.send(payloads, (err, data) => {
    if (data) {
      res.json(data);
    }
    if (err) {
      res.send(err);
    }
  });
});

app.get('/',function(req,res){
    res.json({greeting:'Kafka Producer'})
});

app.listen(5001,function(){
    console.log('Kafka producer running at 5001')
})

您可以使用 postman 向 http://localhost:5001/kafkaproducer 发送 post http 请求格式如下

you can use postman to send a post http request to http://localhost:5001/kafkaproducer in the following format

{
  topic: '<TOPIC YOU WANT>',
  messages: '<Can be any format you want even a json but i would advise just 
    testing with a basic string at first>'
}

然后消费者将接收消息,但要确保主题已在 kafka 服务器上创建,并且您的消费者拥有正确的主题.

then the consumer will pick up the message but make sure the topic has been created on the kafka server and that you have the correct topic on your consumer.

附带说明,如果您使用 EC2 实例,则可以将它们组合起来

on a side note, if you went with a EC2 instance you could combine them

const express = require('express');
const kafka = require('kafka-node');

const app = express();
const bodyParser = require('body-parser');

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true }));

const { Producer, Consumer } = kafka;
const client = new kafka.Client('13.56.240.35:2181');
const producer = new Producer(client);
consumer = new Consumer(client,
    [{ topic: 'memes-to-mturk'}]
);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', err => {
  console.log('Producer is in error state');
  console.log(err);
});

consumer.on('message', function (message) {
    console.log(message);
});

consumer.on('error', function (err) {
    console.log('Error:',err);
})


app.get('/',function(req,res){
    res.json({greeting:'Kafka Producer'})
});

app.post('/kafkaproducer', (req, res) => {
  const sentMessage = JSON.stringify(req.body.message);
  console.log(sentMessage);
  const payloads = [
    { topic: req.body.topic, messages: sentMessage, partition: 0 },
  ];
  producer.send(payloads, (err, data) => {
    if (data) {
      res.json(data);
    }
    if (err) {
      res.send(err);
    }
  });
});

app.listen(5002,function(){
    console.log('Kafka producer running at 5001')
})

这篇关于如何使用 kafka-node 将 AWS Bitnami 认证的 Kafka AMI 与 Elastic Beanstalk nodejs 环境连接起来的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆