How to connect AWS Bitnami Certified Kafka AMI with an Elastic Beanstalk Node.js environment using kafka-node


Question

I'm trying to connect a Bitnami Certified Kafka AMI to an Elastic Beanstalk Node.js environment using kafka-node. How can I do that?

After installing Apache Kafka locally and testing it successfully with kafka-node, I wanted to test my app against a Kafka server on AWS.

I configured the listeners on my AWS Bitnami Certified Kafka AMI to match my Public DNS (IPv4) and opened ports 9092 and 2181 in the inbound rules like this:

Type               Protocol   Port   Source
Custom TCP Rule    TCP        9092   0.0.0.0/0
Custom TCP Rule    TCP        2181   0.0.0.0/0


#server.properties    
listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# Hostname and port the broker will advertise to producers and consumers. 
# If not set it uses the value for "listeners" if configured. Otherwise, it  
# will use the value returned from 
# java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_PLAINTEXT://<Public DNS (IPv4) from AWS>:9092

# root directory for all kafka znodes.
zookeeper.connect=<Public DNS (IPv4) from AWS>:2181

I'm setting up my producer with kafka-node like this:

const kafka = require('kafka-node');

const Producer = kafka.Producer;
// kafkaHost must be a quoted 'host:port' string
const client = new kafka.KafkaClient({ kafkaHost: '<kafka-public-ip>:9092' });
const producer = new Producer(client);

producer.on('ready', function () {
  console.log('Producer is ready');
});
producer.on('error', function (err) {
  console.log('Producer is in error state');
  console.log(err);
});

kafka-node throws a timeout error: Error: Unable to find available brokers to try

I tested the default port 22 with telnet (open <kafka-instance-public-ip> 22) and the connection succeeded, but the same test against port 9092 fails.
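The telnet test above can also be scripted from Node.js, which may be handy on an instance where telnet is not installed. The following is a minimal sketch using only the built-in net module. The function name checkPort, the 3-second timeout, and the KAFKA_HOST environment variable are assumptions made for illustration and are not part of the original setup.

```javascript
// check-port.js
const net = require('net');

// Resolve to true if a TCP connection to host:port opens within timeoutMs,
// and to false on a timeout or a connection error.
function checkPort(host, port, timeoutMs = 3000) {
  return new Promise((resolve) => {
    const socket = net.createConnection({ host, port });
    const finish = (ok) => {
      socket.destroy();
      resolve(ok);
    };
    socket.setTimeout(timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('timeout', () => finish(false));
    socket.once('error', () => finish(false));
  });
}

// Only run the check when a host is supplied (hypothetical variable name), e.g.
//   KAFKA_HOST=<kafka-instance-public-ip> node check-port.js
if (process.env.KAFKA_HOST) {
  const host = process.env.KAFKA_HOST;
  [22, 9092, 2181].forEach((port) => {
    checkPort(host, port).then((open) => {
      console.log(`${host}:${port} ${open ? 'reachable' : 'unreachable'}`);
    });
  });
}
```

If port 22 is reported reachable but 9092 is not, the cause is more likely the security group or the broker's listener configuration than kafka-node itself.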

Summary of the Bitnami Kafka AMI question:

1- How do I configure the Bitnami Kafka AMI on AWS so that it can be accessed remotely?

Recommended answer

Here is how I have this set up. These are two files that can run as they are and only require express and kafka-node@3.0.1.

// consumer.js
const kafka = require('kafka-node');

const { Consumer } = kafka;
// kafka.Client connects through ZooKeeper (port 2181), not directly to the broker
const client = new kafka.Client('<IP of kafka server>:2181');
// '<>' is a placeholder: put the name of an existing topic here
const consumer = new Consumer(client, [{ topic: '<>' }]);

console.log('listening');

consumer.on('message', function (message) {
  console.log(message);
});

consumer.on('error', function (err) {
  console.log('Error:', err);
});

consumer.on('offsetOutOfRange', function (err) {
  console.log('offsetOutOfRange:', err);
});

This connects through ZooKeeper, so I think you need version 3.0.1 of kafka-node. Install it with:

npm install --save kafka-node@3.0.1

To connect straight to the broker instead, you may have to work it out on your own.
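For the direct-broker route, one possible starting point is the KafkaClient class that the question already uses. The sketch below is not a tested setup for the Bitnami AMI. The environment variable names (KAFKA_HOST, KAFKA_PORT, KAFKA_USER, KAFKA_PASSWORD) and the helper buildClientOptions are hypothetical. The sasl option is documented for later kafka-node releases (4.x, as far as I know), so it would probably not work with the 3.0.1 version pinned above. Since the broker in the question advertises a SASL_PLAINTEXT listener, a client without credentials would likely be rejected.

```javascript
// direct-broker.js
// Build KafkaClient options for a direct broker connection (no ZooKeeper).
// All env variable names here are assumptions for illustration.
function buildClientOptions(env) {
  const options = {
    kafkaHost: `${env.KAFKA_HOST}:${env.KAFKA_PORT || 9092}`,
    connectTimeout: 10000, // ms to wait for the initial connection
    requestTimeout: 30000  // ms to wait for each Kafka request
  };
  if (env.KAFKA_USER) {
    // SASL/PLAIN credentials, needed when the listener is SASL_PLAINTEXT
    options.sasl = {
      mechanism: 'plain',
      username: env.KAFKA_USER,
      password: env.KAFKA_PASSWORD
    };
  }
  return options;
}

// Only connect when a host is supplied, e.g.
//   KAFKA_HOST=<Public DNS (IPv4) from AWS> node direct-broker.js
if (process.env.KAFKA_HOST) {
  const kafka = require('kafka-node');
  const client = new kafka.KafkaClient(buildClientOptions(process.env));
  const producer = new kafka.Producer(client);
  producer.on('ready', () => console.log('Producer is ready'));
  producer.on('error', (err) => console.error('Producer error:', err));
}
```

Keeping the option building in a plain function makes it easy to log or inspect the exact host string before any connection attempt.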

// producer.js
const express = require('express');
// body-parser is installed as a dependency of express 4
const bodyParser = require('body-parser');
const kafka = require('kafka-node');

const app = express();

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true })); // to support URL-encoded bodies

const { Producer } = kafka;
const client = new kafka.Client('<IP of kafka server>:2181');
const producer = new Producer(client);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', err => {
  console.log('Producer is in error state');
  console.log(err);
});

app.post('/kafkaproducer', (req, res) => {
  // The request body carries the text under "messages"
  const sentMessage = JSON.stringify(req.body.messages);
  const payloads = [
    { topic: req.body.topic, messages: sentMessage, partition: 0 },
  ];
  producer.send(payloads, (err, data) => {
    if (err) {
      // Report the failure and stop so that only one response is sent
      res.status(500).json({ error: String(err) });
      return;
    }
    res.json(data);
  });
});

app.get('/', function (req, res) {
  res.json({ greeting: 'Kafka Producer' });
});

app.listen(5001, function () {
  console.log('Kafka producer running at 5001');
});

You can use Postman to send an HTTP POST request to http://localhost:5001/kafkaproducer with a JSON body in the following format:

{
  "topic": "<TOPIC YOU WANT>",
  "messages": "<Can be any format you want, even JSON, but I would advise testing with a basic string at first>"
}

The consumer will then pick up the message. Make sure the topic has already been created on the Kafka server and that the consumer subscribes to the same topic.
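Because a missing or misspelled field in the request body only shows up later as an empty or failed message, it can help to validate the body before handing it to producer.send. The following is a minimal sketch of such a check. The function name buildPayloads is hypothetical, and unlike the handler above it leaves plain strings as they are and only stringifies non-string values, which is a design choice rather than something the original answer does.

```javascript
// Turn a request body of the form { topic, messages } into the
// payload array that kafka-node's producer.send expects.
function buildPayloads(body) {
  if (!body || typeof body.topic !== 'string' || body.topic.length === 0) {
    throw new Error('body.topic must be a non-empty string');
  }
  if (body.messages === undefined || body.messages === null) {
    throw new Error('body.messages is required');
  }
  // Keep strings untouched; serialize objects and other values as JSON
  const messages = typeof body.messages === 'string'
    ? body.messages
    : JSON.stringify(body.messages);
  return [{ topic: body.topic, messages, partition: 0 }];
}
```

Inside the route handler, a try/catch around buildPayloads(req.body) could return a 400 response with the error message instead of sending an undefined message to Kafka.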

As a side note, if you went with an EC2 instance, you could combine the two into one file:

const express = require('express');
// body-parser is installed as a dependency of express 4
const bodyParser = require('body-parser');
const kafka = require('kafka-node');

const app = express();

app.use(bodyParser.json()); // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({ extended: true })); // to support URL-encoded bodies

const { Producer, Consumer } = kafka;
const client = new kafka.Client('13.56.240.35:2181');
const producer = new Producer(client);
const consumer = new Consumer(client, [{ topic: 'memes-to-mturk' }]);

producer.on('ready', () => {
  console.log('Producer is ready');
});

producer.on('error', err => {
  console.log('Producer is in error state');
  console.log(err);
});

consumer.on('message', function (message) {
  console.log(message);
});

consumer.on('error', function (err) {
  console.log('Error:', err);
});

app.get('/', function (req, res) {
  res.json({ greeting: 'Kafka Producer' });
});

app.post('/kafkaproducer', (req, res) => {
  // The request body carries the text under "messages"
  const sentMessage = JSON.stringify(req.body.messages);
  console.log(sentMessage);
  const payloads = [
    { topic: req.body.topic, messages: sentMessage, partition: 0 },
  ];
  producer.send(payloads, (err, data) => {
    if (err) {
      // Report the failure and stop so that only one response is sent
      res.status(500).json({ error: String(err) });
      return;
    }
    res.json(data);
  });
});

app.listen(5002, function () {
  console.log('Kafka producer running at 5002');
});
