Docker Kafka with Python consumer


Problem description


I am using dockerized Kafka and have written a Kafka consumer program. It works perfectly when I run Kafka in Docker and the application on my local machine. But when I also run the application in Docker, I run into problems. The issue may be that the topic has not been created yet when the application starts.

docker-compose.yml


version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  parse-engine:
    build: .
    depends_on:
      - "kafka"
    command: python parse-engine.py
    ports:
     - "5000:5000"

parse-engine.py

from kafka import KafkaConsumer
import json

try:
    print('Welcome to parse engine')
    consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092')
    for message in consumer:
        print(message)
except Exception as e:
    print(e)
    # Logs the error appropriately. 
    pass

Error log

kafka_1         | [2018-09-21 06:27:17,400] INFO [SocketServer brokerId=1001] Started processors for 1 acceptors (kafka.network.SocketServer)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka version : 2.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,404] INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1         | [2018-09-21 06:27:17,431] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
parse-engine_1  | Welcome to parse engine
parse-engine_1  | NoBrokersAvailable
parseengine_parse-engine_1 exited with code 0
kafka_1         | creating topics: test:1:1


Although I already added the depends_on property in docker-compose, the application connects before the topic has been created, so the error occurs.


I read that it is possible to add a script to the docker-compose file, but I am looking for an easier way.
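For the startup-ordering part of the question: in a compose file like this, depends_on only makes Compose start the kafka container before parse-engine; it does not wait until the broker is accepting connections. A common workaround is to retry the connection in the application. Below is a minimal sketch of that idea. The helper names `connect_with_retry` and `run_parse_engine` are hypothetical, the retry count and delay are arbitrary, and the bootstrap address `kafka:9092` assumes the networking fix described in the answer below. Retrying only papers over the race at startup; it does not fix an unreachable advertised address.

```python
import time


def connect_with_retry(factory, retry_on, attempts=10, delay=2.0):
    """Call factory() until it succeeds, retrying on the given exception type(s).

    factory  -- zero-argument callable that builds the client
    retry_on -- exception class (or tuple of classes) meaning "broker not ready yet"
    """
    for attempt in range(1, attempts + 1):
        try:
            return factory()
        except retry_on as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the last error instead of hiding it
            print(f"not ready ({exc!r}); retry {attempt}/{attempts} in {delay}s")
            time.sleep(delay)
    raise ValueError("attempts must be at least 1")


def run_parse_engine():
    """Hypothetical entry point for parse-engine.py (requires kafka-python)."""
    # Imported here so the helper above can be used and tested without kafka-python.
    from kafka import KafkaConsumer
    from kafka.errors import NoBrokersAvailable

    # Assumes the broker is reachable as kafka:9092 (see the answer below).
    consumer = connect_with_retry(
        lambda: KafkaConsumer("test", bootstrap_servers="kafka:9092"),
        retry_on=NoBrokersAvailable,
    )
    for message in consumer:
        print(message)
```

The helper takes a factory instead of building the consumer itself, so the same retry logic can wrap a producer or an admin client too.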

Thanks for the help.

Recommended answer


Your problem is the networking. In your Kafka config you're setting

KAFKA_ADVERTISED_HOST_NAME: localhost


but this means that any client (including your python app) will connect to the broker, and then be told by the broker to use localhost for any connections. Since localhost from your client machine (e.g. your python container) is not where the broker is, requests will fail.
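One way to observe this from inside the parse-engine container is a raw TCP check. The sketch below uses only the standard library; the helper names `can_connect` and `check_addresses` are made up for this example. With the original compose file, `localhost:9092` is expected to be unreachable from the container (nothing listens on that port inside it), while `kafka:9092` should accept the connection. Note that a successful TCP connection to `kafka:9092` only proves the bootstrap step works: the Kafka client then switches to whatever address the broker advertises, so that address must be reachable too.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a plain TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and tries each address it gets back.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts and DNS failures
        return False


def check_addresses(addresses, timeout=2.0):
    """Map each (host, port) pair to whether it accepted a TCP connection."""
    return {f"{host}:{port}": can_connect(host, port, timeout) for host, port in addresses}
```

Calling `check_addresses([("localhost", 9092), ("kafka", 9092)])` from inside the container (for example via `docker-compose run`) returns a dict of address to reachability.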


You can read more about fixing problems with your Kafka listeners in detail here


So to fix your issue, you can do one of two things:


  1. Simply change your compose to use the internal hostname for Kafka (KAFKA_ADVERTISED_HOST_NAME: kafka). This means any clients within the docker network will be able to access it fine, but no external clients will be able to (e.g. from your host machine):

    version: '3'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: kafka
          KAFKA_CREATE_TOPICS: "test:1:1"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
      parse-engine:
        build: .
        depends_on:
          - "kafka"
        command: python parse-engine.py
        ports:
          - "5000:5000"


Your clients would then access the broker at kafka:9092, so your python app would change to

 consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
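Rather than hardcoding the address, the script could read it from an environment variable so the same code runs unchanged wherever it is deployed. A minimal sketch, assuming a variable named `KAFKA_BOOTSTRAP_SERVERS` (a name chosen for this example; neither Kafka nor kafka-python reads it automatically) and the hypothetical helpers `bootstrap_servers` and `make_consumer`:

```python
import os

DEFAULT_BOOTSTRAP = "kafka:9092"  # compose service name + internal port


def bootstrap_servers(env=None):
    """Return the bootstrap list from KAFKA_BOOTSTRAP_SERVERS, else the default.

    The variable name is a convention of this sketch, not a Kafka setting.
    """
    env = os.environ if env is None else env
    raw = env.get("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP)
    # Accept a comma-separated list; kafka-python takes a list of "host:port" strings.
    return [s.strip() for s in raw.split(",") if s.strip()]


def make_consumer(topic="test"):
    """Build a consumer for the configured brokers (requires kafka-python)."""
    from kafka import KafkaConsumer  # imported lazily so the helper above has no deps

    return KafkaConsumer(topic, bootstrap_servers=bootstrap_servers())
```

In the compose file the variable would go under the parse-engine service's `environment:` key, e.g. `KAFKA_BOOTSTRAP_SERVERS: kafka:9092`. With the two-listener setup of option 2, a copy of the script running on the host could instead set it to `localhost:29092`.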


  2. Add a new listener to Kafka. This makes the broker accessible both inside and outside the Docker network. Port 29092 is for access from outside the Docker network (e.g. from your host), and port 9092 is for internal access.


    You would still need to change your python program to access Kafka at the correct address. In this case since it's internal to the Docker network, you'd use:

     consumer = KafkaConsumer('test', bootstrap_servers='kafka:9092')
    


    Since I'm not familiar with the wurstmeister images, this docker-compose is based on the Confluent images which I do know:



     ---
     version: '2'
     services:
       zookeeper:
         image: confluentinc/cp-zookeeper:latest
         environment:
           ZOOKEEPER_CLIENT_PORT: 2181
           ZOOKEEPER_TICK_TIME: 2000
    
       kafka:
         # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
         # An important note about accessing Kafka from clients on other machines: 
         # -----------------------------------------------------------------------
         #
         # The config used here exposes port 29092 for _external_ connections to the broker
         # i.e. those from _outside_ the docker network. This could be from the host machine
         # running docker, or maybe further afield if you've got a more complicated setup. 
         # If the latter is true, you will need to change the value 'localhost' in 
         # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those 
         # remote clients
         #
         # For connections _internal_ to the docker network, such as from other services
         # and components, use kafka:9092.
         #
         # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
         # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
         #
         image: confluentinc/cp-kafka:latest
         depends_on:
           - zookeeper
         ports:
           - 29092:29092
         environment:
           KAFKA_BROKER_ID: 1
           KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
           KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
           KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
           KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
           KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
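
To make the two advertised listeners concrete, here is a small Python sketch that splits the two environment strings from this compose file into listener name, host and port. It is only an illustration with a hypothetical helper `parse_listeners`; the broker does its own parsing of these settings. The check that every listener name appears in `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP` reflects the fact that a custom name such as PLAINTEXT_HOST needs a protocol mapping.

```python
def parse_listeners(advertised, protocol_map):
    """Split KAFKA_ADVERTISED_LISTENERS into {listener_name: (host, port)}.

    Raises ValueError if a listener name has no entry in the protocol map.
    """
    # "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" -> {"PLAINTEXT": "PLAINTEXT", ...}
    protocols = dict(item.split(":", 1) for item in protocol_map.split(","))
    listeners = {}
    for entry in advertised.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        if name not in protocols:
            raise ValueError(f"listener {name!r} missing from protocol map")
        listeners[name] = (host, int(port))
    return listeners


ADVERTISED = "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
PROTOCOL_MAP = "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"

for name, (host, port) in parse_listeners(ADVERTISED, PROTOCOL_MAP).items():
    print(f"{name}: advertised as {host}:{port}")
```

A client that connects through a given listener is handed back that listener's advertised address, so containers bootstrapping on kafka:9092 keep using kafka:9092, while host clients bootstrapping on localhost:29092 keep using localhost:29092.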
    


Disclaimer: I work for Confluent.
