kafka-consumer-api相关内容

设计 Kafka 消费者和生产者以实现可扩展性

我想设计一个解决方案,用于向多个提供商发送不同类型的电子邮件.总体概述. 我有几个上游提供商 Sendgrid、Zoho、Mailgun 等.它们将用于发送电子邮件等.例如: 注册新用户的电子邮件 删除用户的电子邮件 空间配额限制的电子邮件 (一般来说大约有 6 种类型的电子邮件) 每种类型的电子邮件都应该生成到生产者中,转换成序列化的 Java 对象并发送到与上游提供 ..
发布时间:2021-11-17 02:44:31 其他开发

Zookeeper 如何从 __consumer_offsets 主题中检索消费者偏移量?

这是“Zookeeper 在哪里做的后续问题存储Kafka集群和相关信息?"基于 Armando Ballaci 提供的答案. 现在很明显,消费者偏移量存储在 Kafka 集群中一个名为 __consumer_offsets 的特殊主题中.没关系,我只是想知道这些偏移量的检索是如何工作的. Topic 不像 RDBS,我们可以基于某个谓词查询任意数据.例如 - 如果数据存储在 RDBM ..
发布时间:2021-11-15 00:06:09 其他开发

zookeeper 在哪里存放 kafka 集群和相关信息?

通过说集群信息,我指的是像 这样的信息 订阅的消费者/消费者群体 读取和提交的偏移量 分区的领导者和追随者 服务器上的主题等 zookeeper 是将这些信息保存在它自己的数据库中(尽管我从来没有听说过 Zookeeper 有任何自己的数据库)还是将这些信息存储在 Kafka 集群中的某些主题等? 编辑:以及后续问题:Zookeeper 如何从 __consumer_offs ..
发布时间:2021-11-15 00:05:53 其他开发

Kafka 如何使用所需的偏移量提交消费者偏移量

我有 Kafka Stream 应用程序.我的应用程序正在成功处理事件. 如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理/跳过事件.我尝试了如何更改主题的起始偏移量?.但我收到“节点不存在:"错误.请帮帮我. 解决方案 您所指的问题/答案基于较旧的 Kafka 版本.从 Kafka 0.9 开始,偏移量不再提交给 ZooKeeper,而是存储在一个名为 offse ..
发布时间:2021-11-15 00:05:08 其他开发

消费者和生产者因错误而失败:“在读取响应之前断开了与 0 的连接"

我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天几次)消费者和生产者在得到响应时卡住了,即使 Kafka 在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由于重新 ..
发布时间:2021-11-15 00:03:59 其他开发

使用 __consumer_offsets 杀死节点导致消费者没有消息消费

我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets ..
发布时间:2021-11-15 00:00:18 其他开发

kafka消费机器需要运行zookeeper吗?

所以我的问题是:如果我有一台运行 Kafka(和 zookeeper)的服务器,而另一台机器只消费消息,那么消费者机器是否也需要运行 zookeeper?还是服务器负责所有? 解决方案 没有 Zookeeper 在 Kafka 中的作用是: 代理注册:(集群成员资格)具有心跳机制以保持列表最新 存储主题配置:存在哪些主题,每个主题有多少个分区有,副本在哪里,谁是首选领导者,I ..
发布时间:2021-11-14 23:58:24 其他开发

Kafka:如何连接 kafka-console-consumer 以获取远程代理主题内容?

我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,我正在尝试使用另一台机器上的主题内容.端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放.我什至可以做一个 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remo ..
发布时间:2021-11-14 23:55:35 其他开发

如何从默认设置中设置 spoutconfig?

我正在尝试使用图形 API 获取 fb 页面数据.每个帖子的大小超过 1MB,其中 kafka 默认 fetch.message 为 1MB.通过在 kafa consumer.properties 和 server.properties 文件中添加以下行,我已将 kafka 属性从 1MB 更改为 3MB. fetch.message.max.bytes=3048576 (consumer.p ..
发布时间:2021-11-14 23:39:00 其他开发

如何在Nifi中查看Kafka的消费消息?

我已经启动了一个 Nifi 进程(消费 Kafka)并将其连接到一个主题.它正在运行,但我无法(不知道)在哪里可以查看消息? 解决方案 ConsumeKafka 处理器运行并为每条消息生成流文件.只有当您将处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在传输的数据. 对于初学者,你可以试试这个: 将 ConsumeKafka 与 LogAttribute 或 ..
发布时间:2021-11-12 03:50:48 其他开发

具有解码器问题的 Kafka Avro 消费者

当我尝试使用我各自的架构对数据运行 Kafka Consumer with Avro 时,它返回一个错误 "AvroRuntimeException: Malformed data. Length is negative: -40" .我看到其他人也有类似的问题 将字节数组转换为 json,Avro 写入和读取,以及 Kafka Avro Binary *coder.我还参考了这个 Consume ..
发布时间:2021-11-12 03:50:27 其他开发

Kafka Streams API:会话窗口异常

我正在尝试创建一个 Kafka 拓扑并将其分解为更具可读性的内容.我有一个按键分组的流,然后我试图像这样打开它: SessionWindowedKStream窗口表 =groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds)).grace(Duration.ZERO));KT ..
发布时间:2021-11-12 03:45:01 其他开发

如何转换/分叉 Kafka 流并将其发送到特定主题?

我正在尝试使用函数“mapValues"将在我的原始流“textlines"中获得的字符串值转换为 newStream.然后将我在 newStream 中获得的任何内容流式传输到名为“testoutput"的主题上.但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException,错误仅指向 kafka 流库.不知道发生了什么:(( 附言当我从原始流分叉/创建新的 k ..
发布时间:2021-11-12 03:44:05 其他开发

Apache kafka 选项中的高、中、低重要性级别有什么区别?

我想知道关于 kafka 选项 Importnace 级别的区别.Apache kafka 有 3 个级别. org.apache.kafka.commong.config.ConfigDef.Importance 公共枚举重要性{高、中、低} 这三个有什么区别? 解决方案 此枚举用作 Kafka 社区的指示器,为用户在许多配置上提供一些指导.您将在配置说明中看到它们. 没 ..
发布时间:2021-11-12 03:43:01 其他开发

事件计数的窗口聚合

我已经对我的 kafka 事件进行了分组: private static void createImportStream(final StreamsBuilder builder, final Collection 主题) {最终 KStream流 = builder.stream(topics, Consumed.with(Serdes.ByteArray(), new UserEventT ..
发布时间:2021-11-12 03:42:40 其他开发

无法描述 Kafka Streams Consumer Group

我想要实现的是确保我的 Kafka 流消费者没有延迟. 我有一个简单的 Kafka 流应用程序,它以 GlobalKTable 的形式将一个主题具体化为存储. 当我尝试通过命令在 Kafka 上描述消费者时: kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-applicatio ..
发布时间:2021-11-12 03:42:34 其他开发

Kafkastreams.allMetadata() 方法返回空列表

所以我正在尝试使用 Kafka 流进行交互式查询.我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上).我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 - ..
发布时间:2021-11-12 03:40:47 其他开发

Kafka Stream 在重新平衡时重新处理旧消息

我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在 ..
发布时间:2021-11-12 03:40:13 其他开发

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..
发布时间:2021-11-12 03:39:30 其他开发