Kafka-Python: producer sends records, but the consumer doesn't receive them

Problem description

I'm having trouble using Kafka in my Python code. I use Python 2.7.5 and the kafka-python package.

I want to send CSV files (300,000 rows, 20 fields per row) through a Kafka topic. Before sending, I serialize each row to JSON, and up to this point everything works. My producer sends each row of the file and then closes. But on the other side, my consumer doesn't consume anything...

As far as Kafka is concerned, I have a single topic with a single partition. My Kafka and ZooKeeper instances run in Docker containers, but my consumer and producer do not.

Here is my code for the producer: ...

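import csv
import json

from kafka import KafkaProducer

# TOPIC, dataElements and process_row are defined elsewhere in my code.
def producer(path):
    producer = KafkaProducer(bootstrap_servers="localhost:9092", retries=5)

    with open(path, newline='', encoding='utf-8-sig') as csvFile:
        reader = csv.DictReader(csvFile, fieldnames=dataElements)
        for row in reader:
            log = process_row(row)
            # send() is asynchronous: it only queues the record for delivery
            producer.send(topic=TOPIC, value=json.dumps(log).encode())
    # Block until all queued records have been sent or have failed
    producer.flush()
    producer.close()
    print('processing done')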
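One reason the script can print 'processing done' even when nothing reached the broker: in kafka-python, send() is asynchronous and returns a future, and per-record delivery errors are recorded on that future rather than raised by send() or flush(). The sketch below is one way to surface those errors. `send_and_report` is a hypothetical helper, not part of kafka-python; it only assumes an object that behaves like KafkaProducer (send() returns a future supporting `add_errback`, and flush() blocks until pending sends finish).

```python
import json
from typing import Any, Iterable, List


def send_and_report(producer: Any, topic: str, records: Iterable[dict]) -> List[Exception]:
    """Send each record as JSON and return every delivery error (hypothetical helper).

    `producer` is assumed to behave like kafka-python's KafkaProducer: send()
    returns a future immediately and flush() blocks until pending sends finish.
    """
    errors: List[Exception] = []
    for record in records:
        future = producer.send(topic, value=json.dumps(record).encode("utf-8"))
        # Failures are delivered to the future's errbacks, not raised by send()
        future.add_errback(errors.append)
    # Wait for all buffered records so every future is resolved before returning
    producer.flush()
    return errors
```

With the real client this could be called as `send_and_report(KafkaProducer(bootstrap_servers="localhost:9092"), TOPIC, rows)`. A non-empty result, such as a list of timeout errors, would point at broker connectivity rather than at settings like `linger_ms` or `batch_size`.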

Here is my code for the consumer:

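import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe([TOPIC])  # subscribe() expects a list of topic names
try:
    for message in consumer:  # blocks, yielding records as they arrive
        log = json.loads(message.value.decode())
        print(log)
finally:
    consumer.close()  # otherwise unreachable: the loop never ends on its own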
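A kafka-python consumer with no committed offset starts at the latest offset by default (`auto_offset_reset='latest'`), so it only sees records produced after it joins; starting the consumer before the producer, as the question does, covers that case. To check whether the records ever reached the topic regardless of start order, a debugging consumer can read from the earliest offset and stop after a quiet period. This is a sketch under assumptions: `CONSUMER_CONFIG` and `read_topic` are hypothetical names, and the 10-second timeout is an arbitrary choice.

```python
import json

# Hypothetical debugging configuration; every key is a standard KafkaConsumer option.
CONSUMER_CONFIG = {
    "bootstrap_servers": "localhost:9092",
    "auto_offset_reset": "earliest",  # with no committed offset, start from the beginning
    "consumer_timeout_ms": 10000,  # stop iterating after 10 s without new records
    "value_deserializer": lambda raw: json.loads(raw.decode("utf-8")),
}


def read_topic(topic: str) -> list:
    """Read JSON records from `topic` until none arrive for 10 s (hypothetical helper)."""
    # Imported here so the config above can be inspected without kafka-python installed
    from kafka import KafkaConsumer

    # No group_id: offsets are not committed, so each run re-reads from the start
    consumer = KafkaConsumer(topic, **CONSUMER_CONFIG)
    try:
        return [message.value for message in consumer]
    finally:
        consumer.close()
```

If `len(read_topic(TOPIC))` stays at 0 after the producer has run, the records never reached the broker, which shifts the investigation from the consumer to the producer's connection.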

I get 'processing done' after running my producer, but I don't get anything when I run my consumer. (I start my consumer first.)

I read the documentation, and the problem may come from the producer configuration. Still, I'm not sure which parameters I should modify (linger_ms, batch_size...?). It seems to me that the default values should work in my case.

Any ideas?

Recommended answer

I figured it out using the following resources: https://www.kaaproject.org/blog/kafka-docker and https://github.com/wurstmeister/kafka-docker/wiki/Connectivity

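It requires adding some environment variables to docker-compose.yml, such as KAFKA_ADVERTISED_HOST_NAME (or KAFKA_ADVERTISED_LISTENERS), so that clients running outside the containers can connect to the Kafka broker. The broker returns this advertised address in its metadata, and the producer and consumer connect to it after the initial bootstrap connection, so it must be reachable from wherever the clients run (here, the Docker host).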
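The advertised address matters because the bootstrap connection only returns cluster metadata; the client then opens its produce and fetch connections to the host:port the broker advertises. If that is, for instance, a container hostname the Docker host cannot resolve, the producer's records are never delivered and the consumer never receives any. The sketch below parses a value in the `advertised.listeners` format (`NAME://host:port,...`, which images such as wurstmeister/kafka fill from `KAFKA_ADVERTISED_LISTENERS`) and checks from the client machine whether each address accepts TCP connections. `parse_listeners` and `is_reachable` are hypothetical helpers, not part of kafka-python, and the listener names in the example below are assumptions.

```python
import socket
from typing import Dict, Tuple


def parse_listeners(value: str) -> Dict[str, Tuple[str, int]]:
    """Split 'NAME://host:port,NAME2://host2:port2' into {name: (host, port)}."""
    listeners: Dict[str, Tuple[str, int]] = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")  # rpartition keeps IPv6 colons in host
        listeners[name] = (host.strip("[]"), int(port))
    return listeners


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if this machine can open a TCP connection to host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers DNS failures, refused connections and timeouts
        return False
```

For example, `parse_listeners("INSIDE://kafka:9093,OUTSIDE://localhost:9092")` returns `{'INSIDE': ('kafka', 9093), 'OUTSIDE': ('localhost', 9092)}`. Running `is_reachable` on each entry from the Docker host would typically show that only the listener advertised as `localhost` is usable by a producer and consumer running outside the containers.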