Kafka-Python: producer sends records but consumer doesn't receive them
Problem description
I'm having trouble using Kafka from my Python code. I use Python 2.7.5 and the kafka-python package.
I want to send CSV files (300000 rows, 20 fields per row) through a Kafka topic. Before sending, I serialize each row to JSON, and up to that point everything works. My producer sends each row of the file and then closes. But on the other side, my consumer doesn't consume anything...
On the Kafka side, I have a single topic with a single partition. My Kafka and ZooKeeper instances run in Docker containers; my consumer and producer do not.
Here is my code for the producer:
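import csv
import json

from kafka import KafkaProducer

def producer(path):
    # TOPIC, dataElements and process_row are not shown here.
    producer = KafkaProducer(bootstrap_servers="localhost:9092", retries=5)
    with open(path, newline='', encoding='utf-8-sig') as csvFile:
        reader = csv.DictReader(csvFile, fieldnames=dataElements)
        for row in reader:
            log = process_row(row)
            # Serialize the processed row to JSON bytes before sending.
            producer.send(topic=TOPIC, value=json.dumps(log).encode())
    # Block until all buffered records have been sent, then shut down.
    producer.flush()
    producer.close()
    print('processing done')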
Here is my code for the consumer:
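import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe([TOPIC])
# Iterating over the consumer blocks and yields records as they arrive.
for message in consumer:
    log = json.loads(message.value.decode())
    print(log)
consumer.close()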
After running my producer, I get 'processing done'. When I run my consumer, I don't get anything. (I start my consumer first.)
I read the documentation, and the problem may come from the producer configuration. Still, I'm not sure which parameters I should modify (linger_ms, batch_size...?). It seems to me that the default values should work in my case.
Any ideas?
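The settings mentioned above (linger_ms, batch_size) only control how records are batched. One way to check whether records are acknowledged by the broker at all is to block on the future that send() returns. The following is a minimal sketch, assuming an object that behaves like kafka-python's KafkaProducer; send_and_confirm is a hypothetical helper name, not part of the library.

```python
def send_and_confirm(producer, topic, payload, timeout=10.0):
    """Send one record and wait until the broker acknowledges it.

    Assumes `producer` behaves like kafka-python's KafkaProducer:
    send() returns a future, and future.get(timeout=...) either returns
    the record metadata or raises the delivery error.
    """
    future = producer.send(topic, value=payload)
    # Blocks here; a failed or timed-out delivery raises instead of
    # being silently dropped.
    metadata = future.get(timeout=timeout)
    return metadata.partition, metadata.offset
```

Calling this for the first row only (and the plain asynchronous send() for the rest) keeps the throughput cost low while still surfacing a broken connection early.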
Recommended answer
I figured it out using the following resources: https://www.kaaproject.org/blog/kafka-docker https://github.com/wurstmeister/kafka-docker/wiki/Connectivity
The fix is to add some environment variables, such as KAFKA_ADVERTISED_HOST, to the docker-compose.yml so that clients running outside the Docker containers can connect to the Kafka broker.
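The reason this matters is that a Kafka client uses bootstrap_servers only for its first connection. The broker then replies with the address it advertises, and the client uses that address for producing and consuming. If the advertised address is a container-internal name, a client outside Docker cannot reach it. In the wurstmeister image the relevant variables are, as far as I can tell, spelled KAFKA_ADVERTISED_HOST_NAME or KAFKA_ADVERTISED_LISTENERS; check the Connectivity wiki linked above for the exact form. The sketch below is one way to check, from the machine running the producer and consumer, whether a given host and port accept TCP connections. It assumes Python 3, and can_reach is a hypothetical helper name.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host name and connects; both
        # resolution failures and refused/timed-out connections raise
        # OSError subclasses in Python 3.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, can_reach("localhost", 9092) checks the bootstrap address used in the question, and the same call with the host name the broker advertises checks the address the client is redirected to. A successful connection only shows that the port is open, not that the listener configuration is correct.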