Docker Kafka with Python consumer
Problem description
I am using dockerized Kafka and have written a Kafka consumer program. It works perfectly when I run Kafka in Docker and the application on my local machine, but when I move the application into Docker as well, I run into problems. The issue may be that the topic has not been created yet by the time the application starts.
docker-compose.yml
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
      - "5000:5000"
parse-engine.py
from kafka import KafkaConsumer
import json

try:
    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
    for message in consumer:
        print(message)
except Exception as e:
    print(e)
    # Logs the error appropriately.
    pass
Error log
kafka_1 | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1 | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1 | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
parse-engine_1 | Welcome to parse engine
parse-engine_1 | NoBrokersAvailable
parseengine_parse-engine_1 exited with code 0
kafka_1 | creating topics: test:1:1
I had already added the depends_on property in docker-compose, but the application still connects before the topic is created, so the error occurs.
I read that I could add a script to the docker-compose file, but I am looking for an easier way.
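For context on the depends_on point: Compose's depends_on only controls the order in which containers are started; it does not wait until the broker inside the kafka container is actually accepting connections. A minimal sketch of a client-side retry loop is below, assuming kafka-python (which raises kafka.errors.NoBrokersAvailable when it cannot reach any bootstrap server). The helper name retry and its parameters are hypothetical. Retrying only helps with start-up ordering; it cannot fix the localhost addressing problem explained in the answer.

```python
import time


def retry(factory, retry_on, attempts=10, delay=1.0, sleep=time.sleep):
    """Call factory() until it returns, retrying on the given exception types.

    retry_on: tuple of exception classes that mean "not ready yet".
    Any other exception propagates immediately. After the final failed
    attempt, the last "not ready" exception is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on:
            if attempt == attempts:
                raise  # give up and surface the last error
            sleep(delay)  # wait before the next attempt


# Hypothetical usage with kafka-python (not executed here):
# from kafka import KafkaConsumer
# from kafka.errors import NoBrokersAvailable
# consumer = retry(
#     lambda: KafkaConsumer('test', bootstrap_servers='kafka:9092'),
#     (NoBrokersAvailable,),
# )
```

The sleep parameter is injectable only so the helper can be exercised without real delays; in normal use the default time.sleep is fine.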
Thanks for the help
Recommended answer
Your problem is the networking. In your Kafka config you're setting
KAFKA_ADVERTISED_HOST_NAME: localhost
but this means that any client (including your Python app) will connect to the broker and then be told by the broker to use localhost for any further connections. Since localhost as seen from your client machine (e.g. your Python container) is not where the broker is, those requests will fail.
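One way to see the localhost problem for yourself is to test plain TCP reachability from inside the Python container. A minimal sketch using only the standard library (socket.create_connection); the helper name can_connect is hypothetical:

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout.

    Name-resolution failures, refused connections and timeouts are all
    OSError subclasses, so any of them yields False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Run inside the parse-engine container with the original compose file, can_connect("localhost", 9092) would be expected to return False (nothing listens on the container's own loopback), while can_connect("kafka", 9092) should return True once the broker is up. This only checks the TCP port; it says nothing about which address the broker advertises afterwards.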
You can read more about fixing problems with your Kafka listeners in detail here
So to fix your issue, you can do one of two things:
Simply change your Compose file to use the internal hostname for Kafka (KAFKA_ADVERTISED_HOST_NAME: kafka). This means any clients within the Docker network will be able to access it, but no external clients (e.g. from your host machine) will be able to:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
      - "5000:5000"
Your clients would then access the broker at kafka:9092, so your python app would change to
consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
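If you want the same script to work both inside Docker and from your host, one option is to read the bootstrap address from an environment variable instead of hard-coding it. A minimal sketch, assuming kafka-python; the variable name KAFKA_BOOTSTRAP_SERVERS and the helper names are hypothetical (neither Kafka nor kafka-python reads this variable on its own), and kafka:9092 is the default because that is the in-network address above:

```python
import os

# Default for clients running inside the Docker network.
DEFAULT_BOOTSTRAP = "kafka:9092"


def bootstrap_servers(env=None):
    """Return a list of 'host:port' strings to pass to KafkaConsumer.

    Reads the hypothetical KAFKA_BOOTSTRAP_SERVERS variable (comma-separated)
    and falls back to DEFAULT_BOOTSTRAP when it is unset or empty.
    """
    env = os.environ if env is None else env
    raw = env.get("KAFKA_BOOTSTRAP_SERVERS", "") or DEFAULT_BOOTSTRAP
    return [s.strip() for s in raw.split(",") if s.strip()]


def main():
    # Imported here so bootstrap_servers() can be used without kafka-python.
    from kafka import KafkaConsumer

    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers=bootstrap_servers())
    for message in consumer:
        print(message)
```

The container's entry point would then call main(); when running the script from the host against the port-29092 listener described below, you could set KAFKA_BOOTSTRAP_SERVERS=localhost:29092.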
Add a new listener to Kafka. This makes the broker reachable both from inside and from outside the Docker network: port 29092 is for access from outside the Docker network (e.g. from your host), and port 9092 is for access from inside it.
You would still need to change your python program to access Kafka at the correct address. In this case since it's internal to the Docker network, you'd use:
consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
Since I'm not familiar with the wurstmeister images, this docker-compose is based on the Confluent images, which I do know:
(editor has mangled my yaml, you can find it here)
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 29092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:9092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
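To make the two-listener configuration above concrete, here is a small sketch that parses the KAFKA_ADVERTISED_LISTENERS and KAFKA_LISTENER_SECURITY_PROTOCOL_MAP values from this compose file and picks the address a client should bootstrap from. The function names are hypothetical, the parsing only handles the simple NAME://host:port form used here, and the listener names PLAINTEXT and PLAINTEXT_HOST come from this particular file rather than being fixed by Kafka:

```python
# Values copied from the compose file above.
ADVERTISED = "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
PROTOCOL_MAP = "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"


def parse_listeners(value):
    """Parse 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def parse_protocol_map(value):
    """Parse 'NAME:PROTOCOL,...' into {NAME: PROTOCOL}."""
    return dict(item.strip().split(":", 1) for item in value.split(","))


def bootstrap_for(listeners, inside_docker_network):
    """Pick the advertised address a client in the given location can reach.

    PLAINTEXT is the listener for containers on the Docker network,
    PLAINTEXT_HOST the one for clients on the Docker host (names from this file).
    """
    name = "PLAINTEXT" if inside_docker_network else "PLAINTEXT_HOST"
    host, port = listeners[name]
    return f"{host}:{port}"


if __name__ == "__main__":
    listeners = parse_listeners(ADVERTISED)
    protocols = parse_protocol_map(PROTOCOL_MAP)
    for name, (host, port) in listeners.items():
        print(f"{name}: {host}:{port} ({protocols[name]})")
    print("from a container on the Docker network:", bootstrap_for(listeners, True))
    print("from the Docker host:", bootstrap_for(listeners, False))
```

This mirrors the comment in the compose file: a container on the Docker network would bootstrap from kafka:9092, and a client on the host from localhost:29092.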
Disclaimer: I work for Confluent