kafka-producer-api相关内容

如何为托管在 Kubernetes 中的多个应用程序选择 Kafka 事务 ID?

我有一个经典的微服务架构.所以,有不同的应用.每个应用程序可能有 1..N 个实例.系统部署到Kubernetes. 所以,我们有很多不同的PODs,可以随时启动和停止. 我想实现 read-process-write 模式,所以我需要卡夫卡交易. 要配置事务,我需要为每个 Kafka 生产者设置一些 transaction id.(实际上,我需要 transaction-id-pre ..

无法使用 kafka-avro-console-consumer 读取 avro 消息.SerializationException:未知的魔法字节

我正在编写一个 REST 代理,比如融合的休息代理.它接受一个 JSON 负载、模式主题和 id,然后将 JSON 负载作为 Avro 对象写入流中.当我使用 kafka-avro-console-consumer 读取消息时,出现“未知幻字节"错误. 这是我的 kafka 生产者配置: properties.put("client.id", LocalHostUtils.getLoca ..

Kafka Producer:使用回调处理异步发送中的异常

我需要在异步发送到 Kafka 的情况下捕获异常.Kafka 生产者 Api 带有一个函数 send(ProducerRecord record, Callback callback).但是当我针对以下两种情况对此进行测试时: Kafka Broker 关闭 主题未预先创建回调没有被调用.相反,我在代码中收到了发送失败的警告(如下所示). 问题: 那么回调是否只针对特定异常调用 ..
发布时间:2021-11-12 02:49:56 其他开发

spring kafka 模板生产者表现

我正在使用 Spring Kafka 模板来生成消息.而且它产生消息的速度太慢了.生成 15000 条消息大约需要 8 分钟. 以下是我如何创建 Kafka 模板: @Bean公共 ProducerFactory高速AvroProducerFactory(@Qualifier("highSpeedProducerProperties") KafkaProperties 属性) {最终映射 ..
发布时间:2021-11-12 02:48:01 其他开发

消费者和生产者失败并出现错误:“在读取响应之前断开了与 0 的连接"

我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天中几次)消费者和生产者在获得响应时卡住了,即使 Kafka 正在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接)在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由 ..

如何在消费者组kafka中动态添加消费者

我应该如何知道何时必须扩展消费者组中的消费者.有快速生产者时,消费者扩大规模的触发因素是什么? 解决方案 一个直接的方法是获取消费者滞后(这可以计算为提交偏移量和开始偏移量之间的差异),如果滞后在最后一个计算n 倍增加,您可以扩大规模,反之亦然.您可能需要考虑一些边缘情况,例如,如果消费者数量下降,延迟会增加,并且自动缩放功能可能会产生更多线程/机器). ..
发布时间:2021-11-12 02:46:52 其他开发

Kafka 分区中的消息分布不均

我的主题有 10 个分区,1 个消费者组有 4 个消费者,工作线程大小为 3. 我可以看到分区中的消息分布不均匀,一个分区有很多数据,另一个是空闲的. 如何让我的生产者将负载平均分配到所有分区,从而使所有分区都得到正确利用? 解决方案 根据 DefaultPartitioner 类本身的 JavaDoc 注释,默认分区策略为: 如果记录中指定了分区,则使用它. 如果未指 ..
发布时间:2021-11-12 02:45:16 其他开发

kafka 是否有任何默认的 Web UI

我有几个关于 Kafka 的问题. 1) Kafka 是否有默认的 Web UI? 2) 我们如何优雅地关闭独立的 kafka 服务器,kafka 控制台-消费者/控制台生产者. 任何解决方案将不胜感激. 谢谢. 解决方案 1) 没有 Kafka 没有默认 UI. 然而,有许多第三方工具可以以图形方式显示 Kafka 资源.只需在 Google 上搜索 kaf ..
发布时间:2021-11-12 02:44:40 其他开发

保留期后的卡夫卡偏移

我有一个带有 1 个分区的 kafka 主题.如果其中有 100 条消息,则偏移量将从 0.99 开始. 根据 kafka 保留政策,所有消息将在指定时间段后被清除. 一旦所有消息都被清除(保留期后),我将向该主题发送 100 条新消息.现在,消息的新偏移量从哪里开始?是从 100 开始还是从 0 开始?? 我想了解新的偏移量是 100-199 还是 0-99? 解决方案 ..
发布时间:2021-11-12 02:44:34 其他开发

Kafka - 代理:消息大小太大

当我尝试发送超过 1 Mb 的消息时,我收到 Message size too large 异常.当我尝试生成消息时,错误出现在我的客户端应用程序中.经过一番谷歌搜索后,我发现应该更改设置以增加最大消息大小.好吧,我在 /kafka/config/server.properties 文件中做到了.我添加了接下来的 2 个设置: message.max.bytes=15728640replica. ..
发布时间:2021-11-12 02:44:28 其他开发

如果我的生产者生产,那为什么消费者不能消费?它卡住了@ poll()

我正在发布到远程 kafka 服务器并尝试使用来自该远程服务器的消息.(卡夫卡 v 0.90.1)发布工作正常,但消费也不行. 出版商 package org.test;导入 java.io.IOException;导入 java.util.Properties;导入 org.apache.kafka.clients.producer.KafkaProducer;导入 org.apache ..

我可以根据Kafka的特定条件消费吗?

我正在向 Kafka 写入一个 msg 并在另一端使用它.在其中做一些处理并将其写回另一个 Kafka 主题. 我想知道哪个消息响应针对哪个请求.. 当前决定从消费者端捕获偏移 id,然后写入响应并读取响应负载并决定相同. 对于这种方法,我们需要读取每条消息,还有其他方法可以根据消费者配置条件消费吗? 解决方案 消费者只能阅读整个主题.您只能通过 seek() 跳过消息, ..
发布时间:2021-11-12 02:41:27 其他开发

为 kafka 消息实现归档

我最近开始使用 Kafka,并针对少数用例评估了 Kafka. 如果我们想根据消息内容为消费者(订阅者)提供过滤消息的功能,那么最好的方法是什么? 假设生产商公开了一个名为“交易"的主题,该主题具有不同的交易详细信息,例如市场名称、创建日期、价格等. 一些消费者对特定市场的交易感兴趣,而另一些消费者则对特定日期后的交易感兴趣(基于内容的过滤) 由于代理端无法进行过滤,因此实 ..
发布时间:2021-11-12 02:41:18 其他开发

当我重新启动我的 kafka 代理时,为什么或如何丢失很少的消息?

我在单节点中运行 kafka,我想在关闭我的 kafka 代理时看到 kafka Producer 的行为,然后我在几秒钟内重新启动我的代理,所以我创建了 Spring Boot 项目,我可能会在其中发送 1000 个客户 JSON 对象和打印每次发送的 JSON 对象的偏移量.我的应用程序工作正常,但是当我关闭我的 kafka 代理时,几秒钟后我重新启动我的代理,我的生产者返回以正常发送来自最新 ..