kafka-python相关内容
我要从一个系统实时传输到另一个系统。 我正在使用kafka-python,可以在本地直播。 计算出连接器将处理多个设备。有没有人能给我提个建议,让我用连接符来实现它? 推荐答案 Kafka Connect是Java框架,不是Python. Kafka Connect运行REST API,您可以使用urllib3或requests与其交互,而不是kafka-python
..
有没有办法先处理优先级高的邮件? 我尝试创建了三个主题‘High’、‘Medium’和‘Low’,并使用一个使用者订阅了所有这三个主题,如果‘High’主题中有未处理的消息,它将暂停其他两个主题。有没有更好的实现消息优先级的方法? 我尝试使用下面给出的逻辑。 topics = ['high', 'medium', 'low'] consumer.subscribe(topics) h
..
我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为
..
我在 Java 中使用过 Kafka Streams.我在 python 中找不到类似的 API.Apache Kafka 是否支持 Python 中的流处理? 解决方案 Kafka Streams 仅作为 JVM 库提供,但至少有两个 Python 实现它 robinhood/faust(截至 2020 年未维护,但 被分叉) wintincode/winton-kafka-st
..
有什么方法可以通过编程方式找到 Kafka Consumer 中的延迟.我不希望在仪表板上安装和检查外部 Kafka Manager 工具. 我们可以列出所有消费者组并检查每个组的延迟. 目前我们确实有命令来检查延迟,它需要Kafka所在的相对路径. Spring-Kafka、kafka-python、Kafka Admin 客户端或使用 JMX - 有什么方法可以编码并找出延迟
..
我正在尝试在 ubuntu EC2 机器上设置 3 个 Kafka 代理.但是我在启动 zookeeper 时收到 ConnectException.我的 ec2 实例的 security group 中的所有端口都已打开. 下面是堆栈跟踪: [2016-03-03 07:37:12,040] 侦听时出现错误异常 (org.apache.zookeeper.server.quorum.Qu
..
我打算跳过主题的开头,只读取从某个时间戳到结尾的消息.关于如何实现这一目标的任何提示? 解决方案 我猜你在使用 kafka-python (https://github.com/dpkp/kafka-python) 正如你提到的“KafkaConsumer". 您可以使用 offsets_for_times() 方法来检索与时间戳匹配的偏移量.https://kafka-python
..
我正在使用 kafka-python 来使用来自 kafka 队列的消息(kafka 0.10 版).2.0).特别是我使用 KafkaConsumer 类型.如果消费者停止并在一段时间后重新启动,我想从最新生成的消息重新启动,即删除消费者关闭期间生成的所有消息.我怎样才能做到这一点? 谢谢 解决方案 你不会seekToEnd() 到日志的末尾. 请记住,您首先需要订阅主题,然
..
我的接收器属性: {"name": "jdbc-oracle",“配置":{"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "订单","connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac","connec
..
我有一个应用程序,用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容.我创建了一个有 5 个分区的主题,并且有 5 个 kafka 消费者.但是网页下载的超时时间是 60 秒.当其中一个 url 被下载时,服务器假定消息丢失并将数据重新发送给不同的消费者. 我已经尝试了 中提到的所有内容 Kafka 消费者配置/性能问题 和 https://git
..
我正在做 Python Kafka 消费者(尝试在 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html).当我运行以下代码时,它会一直运行,即使所有消息都被消耗了.我希望消费者在消费完所有消息后停止.怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Cons
..
我们有一个消息调度程序,它会根据消息属性生成一个哈希键,然后将其放置在带有该键的 Kafka 主题队列中. 这样做是为了重复数据删除.但是,我不确定如何在不实际设置本地集群并检查它是否按预期执行的情况下测试此重复数据删除. 在网上搜索用于模拟 Kafka 主题队列的工具没有帮助,我担心我可能会以错误的方式思考这个问题. 最终,无论用于模拟 Kafka 队列的任何内容,都应该以与本
..
场景: 我正在将数据 JSON 对象数据写入 kafka 主题,同时阅读我想根据消息中存在的值仅读取一组特定的消息.我正在使用 kafka-python 库. 示例消息: {flow_status: "completed", value: 1, active: yes}{flow_status:"failure",value 2, active:yes} 这里我只想读取 flow_
..
我在 Java 中使用过 Kafka Streams.我在 python 中找不到类似的 API.Apache Kafka 是否支持 Python 中的流处理? 解决方案 Kafka Streams 仅作为 JVM 库提供,但至少有两个 Python 实现它 robinhood/faust(截至 2020 年未维护,但 被分叉) wintincode/winton-kafka-st
..
我正在使用带通配符的模式订阅 Kafka,如下所示.通配符代表动态客户 ID. consumer.subscribe(pattern='customer.*.validations') 这很有效,因为我可以从主题字符串中提取客户 ID.但是现在我需要扩展功能,以便为稍微不同的目的收听类似的主题.我们称之为customer.*.additional-validations.代码需要存在于同一个项
..
我有一个简单的 JSON 对象,如下所示 d = { 'tag ': 'blah','name' : 'sam','分数':{'row1':100,“第 2 行":200}} 以下是我的python代码,它向Kafka发送消息 from kafka import SimpleProducer, KafkaClient导入json# 同步发送消息kafka = KafkaClient('10.
..
我正在尝试使用基于 this 但我也想使用 kafka 来存储消息.我怎样才能做到这一点? 现在,我使用 this 来制作消费者,不知何故它正在工作,但它只是在控制台上打印我需要在网页上显示消息,就像龙卷风应用程序一样,只有它保存在 kafka 中. 这是我目前的 app.py 代码 #!/usr/bin/env python## 版权所有 2009 Facebook## 根据
..
我正在尝试使用来自 Kafka 主题的消息.我在 confluent_kafka 消费者周围使用了一个包装器.在开始消费消息之前,我需要检查是否建立了连接. 我读到消费者很懒惰,所以我需要执行一些操作才能建立连接.但是我想在不执行consume 或poll 操作的情况下检查连接建立. 此外,我尝试给出一些错误的配置,以查看民意调查的响应.我得到的回应是: b'Broker: 没有更多
..
我一直在使用 python-kaka 模块从 kafka 代理消费.我想从同一主题中并行使用 'x' 个分区.文档有这个: # 使用 0.9 kafka brokers 并行使用多个消费者# 通常你会在不同的服务器/进程/CPU 上运行每个消费者1 = KafkaConsumer('我的主题',group_id='我的组',bootstrap_servers='my.server.com')消费
..
如果我将 Producer 的 Kafka 配置参数设置为: 1.重试 = 32. max.in.flight.requests.per.connection = 5 那么很可能一个分区内的消息可能不在 send_order 中. Kafka 是否采取任何额外步骤来确保分区内的消息仅保持发送顺序或者通过以上配置,是否有可能在一个分区内有乱序的消息? 解决方案 很遗憾,没有.
..