如何强制消费者读取kafka中的特定分区 [英] How to force a consumer to read a specific partition in kafka

查看:50
本文介绍了如何强制消费者读取kafka中的特定分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容.我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者.但是网页下载的超时时间是 60 秒.当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者.

我已经尝试了

中提到的所有内容

Kafka 消费者配置/性能问题

https://github.com/spring-projects/spring-kafka/issues/202

但我每次都会收到不同的错误.

是否可以将特定消费者与 kafka 中的分区绑定在一起?我正在为我的应用程序使用 kafka-python

解决方案

我错过了 Kafka-python 的文档.我们可以使用 TopicPartition 类为特定的消费者分配一个分区.

http://kafka-python.readthedocs.io/en/master/

<预><代码>>>># 手动为消费者分配分区列表>>>从 kafka 导入 TopicPartition>>>消费者 = KafkaConsumer(bootstrap_servers='localhost:1234')>>>consumer.assign([TopicPartition('foobar', 2)])>>>msg = 下一个(消费者)

I have an application for downloading specific web-content, from a stream of URL's generated from 1 Kafka-producer. I've created a topic with 5 partitions and there are 5 kafka-consumers. However the timeout for the webpage download is 60 seconds. While one of the url is getting downloaded, the server assumes that the message is lost and resends the data to different consumers.

I've tried everything mentioned in

Kafka consumer configuration / performance issues

and

https://github.com/spring-projects/spring-kafka/issues/202

But I keep getting different errors everytime.

Is it possible to tie a specific consumer with a partition in kafka? I am using kafka-python for my application

解决方案

I missed on the documentation of Kafka-python. We can use TopicPartition class to assign a specific consumer with one partition.

http://kafka-python.readthedocs.io/en/master/

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)

这篇关于如何强制消费者读取kafka中的特定分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆