kafka-python相关内容

如何使用Kafka与JDBC接收器连接,并使用Python进行源连接

我要从一个系统实时传输到另一个系统。 我正在使用kafka-python,可以在本地直播。 计算出连接器将处理多个设备。有没有人能给我提个建议,让我用连接符来实现它? 推荐答案 Kafka Connect是Java框架,不是Python. Kafka Connect运行REST API,您可以使用urllib3或requests与其交互,而不是kafka-python ..
发布时间:2022-05-26 09:07:16 其他开发

如何阅读和处理卡夫卡消费者的高优先级消息?

有没有办法先处理优先级高的邮件? 我尝试创建了三个主题‘High’、‘Medium’和‘Low’,并使用一个使用者订阅了所有这三个主题,如果‘High’主题中有未处理的消息,它将暂停其他两个主题。有没有更好的实现消息优先级的方法? 我尝试使用下面给出的逻辑。 topics = ['high', 'medium', 'low'] consumer.subscribe(topics) h ..

Kafka最优保留和删除策略

我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为 ..

2019 年 8 月 - Kafka Consumer Lag 以编程方式

有什么方法可以通过编程方式找到 Kafka Consumer 中的延迟.我不希望在仪表板上安装和检查外部 Kafka Manager 工具. 我们可以列出所有消费者组并检查每个组的延迟. 目前我们确实有命令来检查延迟,它需要Kafka所在的相对路径. Spring-Kafka、kafka-python、Kafka Admin 客户端或使用 JMX - 有什么方法可以编码并找出延迟 ..
发布时间:2021-11-12 03:17:17 其他开发

Python KafkaConsumer 从时间戳开始消费消息

我打算跳过主题的开头,只读取从某个时间戳到结尾的消息.关于如何实现这一目标的任何提示? 解决方案 我猜你在使用 kafka-python (https://github.com/dpkp/kafka-python) 正如你提到的“KafkaConsumer". 您可以使用 offsets_for_times() 方法来检索与时间戳匹配的偏移量.https://kafka-python ..

消费者重新启动后,kafka-python 从最后生成的消息中读取

我正在使用 kafka-python 来使用来自 kafka 队列的消息(kafka 0.10 版).2.0).特别是我使用 KafkaConsumer 类型.如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息.我怎样才能做到这一点? 谢谢 解决方案 你不会seekToEnd() 到日志的末尾. 请记住,您首先需要订阅主题,然 ..
发布时间:2021-11-12 02:51:39 Python

如何强制消费者读取kafka中的特定分区

我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容.我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者.但是网页下载的超时时间是 60 秒.当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者. 我已经尝试了 中提到的所有内容 Kafka 消费者配置/性能问题 和 https://git ..
发布时间:2021-11-12 02:48:42 其他开发

Python:如何模拟单元测试的 kafka 主题?

我们有一个消息调度程序,它会根据消息属性生成一个哈希键,然后将其放置在带有该键的 Kafka 主题队列中. 这样做是为了重复数据删除.但是,我不确定如何在不实际设置本地集群并检查它是否按预期执行的情况下测试此重复数据删除. 在网上搜索用于模拟 Kafka 主题队列的工具没有帮助,我担心我可能会以错误的方式思考这个问题. 最终,无论用于模拟 Kafka 队列的任何内容,都应该以与本 ..
发布时间:2021-11-12 02:44:31 Python

如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?

我正在使用带通配符的模式订阅 Kafka,如下所示.通配符代表动态客户 ID. consumer.subscribe(pattern='customer.*.validations') 这很有效,因为我可以从主题字符串中提取客户 ID.但是现在我需要扩展功能,以便为稍微不同的目的收听类似的主题.我们称之为customer.*.additional-validations.代码需要存在于同一个项 ..
发布时间:2021-11-12 02:23:59 Python

如何在龙卷风上使用 kafka?

我正在尝试使用基于 this 但我也想使用 kafka 来存储消息.我怎样才能做到这一点? 现在,我使用 this 来制作消费者,不知何故它正在工作,但它只是在控制台上打印我需要在网页上显示消息,就像龙卷风应用程序一样,只有它保存在 kafka 中. 这是我目前的 app.py 代码 #!/usr/bin/env python## 版权所有 2009 Facebook## 根据 ..
发布时间:2021-11-12 02:20:16 Python

如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行

我正在尝试使用来自 Kafka 主题的消息.我在 confluent_kafka 消费者周围使用了一个包装器.在开始消费消息之前,我需要检查是否建立了连接. 我读到消费者很懒惰,所以我需要执行一些操作才能建立连接.但是我想在不执行consume 或poll 操作的情况下检查连接建立. 此外,我尝试给出一些错误的配置,以查看民意调查的响应.我得到的回应是: b'Broker: 没有更多 ..
发布时间:2021-11-12 02:14:26 Python

kafka-python 中的多处理

我一直在使用 python-kaka 模块从 kafka 代理消费.我想从同一主题中并行使用 'x' 个分区.文档有这个: # 使用 0.9 kafka brokers 并行使用多个消费者# 通常你会在不同的服务器/进程/CPU 上运行每个消费者1 = KafkaConsumer('我的主题',group_id='我的组',bootstrap_servers='my.server.com')消费 ..
发布时间:2021-11-12 02:14:18 其他开发

Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

如果我将 Producer 的 Kafka 配置参数设置为: 1.重试 = 32. max.in.flight.requests.per.connection = 5 那么很可能一个分区内的消息可能不在 send_order 中. Kafka 是否采取任何额外步骤来确保分区内的消息仅保持发送顺序或者通过以上配置,是否有可能在一个分区内有乱序的消息? 解决方案 很遗憾,没有. ..
发布时间:2021-11-12 02:13:57 其他开发