How to subscribe to a list of multiple Kafka wildcard patterns using kafka-python?


Problem description

I'm subscribing to Kafka using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer id.

consumer.subscribe(pattern='customer.*.validations')

This works well, because I can pluck the customer ID from the topic string. But now I need to expand the functionality to listen to a similar topic for a slightly different purpose. Let's call it customer.*.additional-validations. The code needs to live in the same project because so much functionality is shared, but I need to be able to take a different path based on the type of queue.

In the Kafka documentation I can see that it is possible to subscribe to an array of topics. However, these are hard-coded strings, not patterns that allow for flexibility.

>>> import msgpack
>>> from kafka import KafkaConsumer
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

So I'm wondering whether it is possible to somehow combine the two, something like this (non-working):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

Solution

The KafkaConsumer code supports either a list of topics or a pattern:

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

    def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

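So you can create a single regex with an OR condition using |, which should work as a subscription to multiple dynamic topic patterns, since the matching is done internally with the re module. Note that an unescaped . in a regex matches any character, so write \. wherever a literal dot is intended.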

(customer.*.validations)|(customer.*.additional-validations)
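
The following is a minimal sketch of how the combined pattern could be used to route messages, not a definitive implementation. It assumes customer IDs contain no dots, and it tightens the regex accordingly: with unescaped dots, customer.*.validations on its own already matches customer.42.additional-validations, because .* can absorb ".42.additional" and the following . can match the hyphen. The names parse_topic, run_consumer, handle_validation and handle_additional_validation, as well as the broker address and group ID, are hypothetical and only here for illustration.

```python
import re

# Escaped dots match a literal "."; [^.]+ limits the customer ID to a single
# dot-free segment (an assumption about the ID format).
TOPIC_PATTERN = r"^customer\.([^.]+)\.(validations|additional-validations)$"
TOPIC_RE = re.compile(TOPIC_PATTERN)


def parse_topic(topic):
    """Return (customer_id, kind) for a matching topic name, else None."""
    match = TOPIC_RE.match(topic)
    if match is None:
        return None
    return match.group(1), match.group(2)


def handle_validation(customer_id, value):
    """Hypothetical placeholder for the existing validation path."""


def handle_additional_validation(customer_id, value):
    """Hypothetical placeholder for the new additional-validation path."""


def run_consumer(bootstrap_servers="localhost:9092", group_id="validators"):
    # Imported here so the parsing helper above can be used without a broker.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             group_id=group_id)
    # One regex covers both topic families.
    consumer.subscribe(pattern=TOPIC_PATTERN)
    for msg in consumer:
        parsed = parse_topic(msg.topic)
        if parsed is None:
            continue
        customer_id, kind = parsed
        if kind == "validations":
            handle_validation(customer_id, msg.value)
        else:
            handle_additional_validation(customer_id, msg.value)
```

Capturing the topic type in a group lets the same regex serve both for the subscription and for choosing the code path per message, so the shared logic stays in one consumer loop.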
