无法访问来自 EC2 上融合的 kafka 的消息 [英] Not able to access messages from confluent kafka on EC2

查看:29
本文介绍了无法访问来自 EC2 上融合的 kafka 的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Confluent Kafka 5.0.0 已安装在具有公共 IP 的 AWS EC2 上,例如 54.XX.XX.XX使用 0.0.0.0 在 EC2 机器上打开端口 9092

Confluent Kafka 5.0.0 has been installed on AWS EC2 which has Public IP say 54.XX.XX.XX Opened port 9092 on the EC2 machine with 0.0.0.0

在/etc/kafka/server.properties 我有

In /etc/kafka/server.properties I have

advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092  
listeners=PLAINTEXT://0.0.0.0:9092

/etc/kafka/producer.properties 我有 bootstrap.servers=0.0.0.0:9092

在本地机器上在 /etc/kafka/consumer.properties 我有 bootstrap.servers=54.XX.XX.XX:9092

on local machine In /etc/kafka/consumer.properties I have bootstrap.servers=54.XX.XX.XX:9092

在EC2中,启动kafka 'confluent start' 并创建'mytopic'

In the EC2, started kafka 'confluent start' and created 'mytopic'

我在本地机器上运行的 producer.py 代码看起来像(相关部分):

My producer.py code running from local machine looks like (relavant portion):

from confluent_kafka import Producer
broker = '54.XX.XX.XX'
topic = 'mytopic'
    p = Producer({'bootstrap.servers': broker})

    for data in dictList:
        p.poll(0)
        sendme = json.dumps(data)
        p.produce(topic, sendme.encode('utf-8'), callback=delivery_report)

    p.flush()

这似乎将消息写入 EC2 中 kafka 流中的mytopic".我可以在 EC2 上的kafkacat -b 54.XX.XX.XX -t mytopic"中看到这些消息.

This seems to write messages to 'mytopic' in the kafka stream in the EC2. I can see those messages in 'kafkacat -b 54.XX.XX.XX -t mytopic' on the EC2.

但是我无法作为简单的消息打印使用者从本地机器访问这些消息,代码如下:

But I am not able to access those message from local machine as a simple message printing consumer, with code as below:

from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import sys

broker = '54.XX.XX.XX'
topic = 'mytopic'
group = 'mygroup'
     c = Consumer({
         'bootstrap.servers': broker,
         'group.id': group,
         'session.timeout.ms': 6000,
         'default.topic.config': {
             'auto.offset.reset': 'smallest'
         }
     })
     basic_consume_loop(c,[topic])

def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        while running:
            msg = consumer.poll(timeout=1.0)
            if msg is None: continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('{} [{}] reached end at offset {}\n'.format(msg.topic(), msg.partition(), msg.offset()))
                    data_process()
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                msg_process(msg)
    finally:
        # Close down consumer to commit final offsets.
        print("Shutting down the consumer")
        consumer.close()

它只是挂起,我错过了任何设置吗?

It just hangs, did I miss any settings?

推荐答案

以下步骤似乎有效.

在本地和 EC2 机器上,在/etc/kakfa/server.properties 设置中

On both, local and EC2 machine, in /etc/kakfa/server.properties set

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://54.XX.XX.XX:9092

在本地机器上,在/etc/kakfa/producer.properties 设置中

On local machine, in /etc/kakfa/producer.properties set

bootstrap.servers=0.0.0.0:9092

在 EC2 机器上,在/etc/kakfa/producer.properties 设置中

On EC2 machine, in /etc/kakfa/producer.properties set

bootstrap.servers=localhost:9092

在本地和 EC2 机器上,在/etc/kakfa/consumer.properties 设置中

On both local and EC2 machine, in /etc/kakfa/consumer.properties set

bootstrap.servers=0.0.0.0:9092
group.id=mygroup

使用confluent-start"在远程 EC2 机器上启动所有必要的守护进程.在本地机器上,Confluent 不会运行.

Use 'confluent-start' to start all necessary daemons on remote EC2 machine. On local machine, Confluent is NOT made running.

在本地机器上(用于 ip 隐藏,可选):

On local machine (for ip hiding, optional):

export KAFKA_PRODUCER_IP=54.XX.XX.XX

有了这个,本地机器的生产者可以通过以下方式将消息放在远程EC2 Kafka上:

With this, producer from local machine,can put messages on remote EC2 Kafka by following:

broker = os.environ['KAFKA_PRODUCER_IP'] + ':9092'
topic = 'mytopic'
p = Producer({'bootstrap.servers': broker})

从本地机器,可以通过以下方式从远程 EC2 kafka 获取消息:

From local machine, messages could be fetched from remote EC2 kafka by following:

broker = os.environ['KAFKA_PRODUCER_IP'] + ':9092'
topic = 'mytopic'
group = 'mygroup'
     c = Consumer({
         'bootstrap.servers': broker,
         'group.id': group,
         'session.timeout.ms': 6000,
         'default.topic.config': {
             'auto.offset.reset': 'smallest'
         }
     })

这些步骤似乎有效.可能会有一些冗余,如果有,请指出.

These steps seems to work. There could be some redundancies, if so, do point out.

这篇关于无法访问来自 EC2 上融合的 kafka 的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆