kafka-consumer-api相关内容

打造消费者动态春天卡夫卡

我正在创建一个与另一个服务对话的服务,以确定要收听的卡夫卡主题。卡夫卡主题可能有不同的键和值类型。因此,我希望为每个配置(主题、键类型、值类型)动态创建不同的Kafka消费者,其中配置仅在运行时才知道。然而,在春季的Kafka中,我看不到动态传递所有这些参数的方法(至少我不知道有)。我应该如何进行这项工作。 推荐答案 只需在运行时创建新的侦听器容器。 https://docs.s ..
发布时间:2022-08-13 15:40:48 其他开发

负载测试卡夫卡消费者

(我正在编辑问题,因为我认为它不够清楚) 如何对我的Kafka消费者进行负载测试? 我看过很多关于负载测试的文章,但是没有一篇是关于负载测试消费者的。 为了前任。我已经在jsr223中编写了jeter Kafka制作人测试计划,将事件发送到Kafka主题。 我的卡夫卡弹簧靴消费者听这个话题,消费事件,开始处理。 Jeter只给我看了关于制片人的报告,仅此而已。 有没有更好的方法来加载测试Sp ..
发布时间:2022-07-16 20:19:59 其他开发

卡夫卡消费者群体ID与消费者再平衡问题

我在我的生产服务器中使用Kafka0.10.0和ZooKeeper3.4.6。我有20个主题,每个主题大约有50个分区。我总共有100个用户,每个用户都订阅了不同的主题和分区。所有用户都有相同的groupID。那么,如果为特定主题添加或删除消费者,那么附加到不同主题的消费者也将经历重新平衡吗? 我的消费者代码是: public static void main(String[] arg ..

阿帕奇-卡夫卡有1亿个主题

我正在尝试用ApacheKafka替换兔子MQ,在规划时,我遇到了几个概念性规划问题。 首先,我们对每用户队列策略使用Rabb MQ,这意味着每个用户使用一个队列。这符合我们的需要,因为每个用户代表要与该特定用户一起完成的一些工作,并且如果该用户导致问题,则队列对于其他用户永远不会有问题,因为队列是分开的(问题意味着队列中的消息将使用http请求被分派给用户。如果用户拒绝接收消息(服务器可能会关 ..
发布时间:2022-07-16 19:53:48 其他开发

卡夫卡消费者正在花时间识别新的分区

我正在运行一个测试,其中Kafka使用者正在从一个主题的多个分区读取数据。在进程运行时,我添加了更多分区。使用者线程从新分区读取数据大约需要5分钟。我找到了这个配置“topic.metadata.renh.interval.ms”,但这只适用于生产者。消费者也有类似的配置吗? 推荐答案 我更改了";metadata.Max.age.ms";参数值以刷新元数据https: ..
发布时间:2022-04-15 16:07:18 其他开发

如何使用NodeJS消费来自Kafka-Consumer的最新消息?

我创建了一个NodeJS应用程序来将数据插入到MongoDB集合中。此数据库插入是通过使用Kafka完成的。Kafka-node是我用来调用Kafka的插件。 我可以创建主题并在生产者级别向消费者发送消息。消息和主题取自POST请求。 This is how I call the Kafka. Parameters are topic and message. 每次我调用此接口时,生 ..
发布时间:2022-04-15 16:06:13 其他开发

Kaka Auto.offset.Reset查询

我的项目使用的是Kafka 0.10.2版本。IAM在消费者中设置enable.Auto.Commit=FALSE和Auto.offset.Reset=LATEST。如果维护后重新启动消费者,则消费者将从第一个偏移量开始再次读取,而不是等待最新的偏移量消息。有什么原因会发生这种情况吗?我是否错误地理解了配置? 我的要求是,使用者不应自动提交,而应仅在主题处于活动状态时读取放入该主题中的新消息 ..
发布时间:2022-04-15 16:02:12 其他开发

Kafka Java使用者SSL握手错误:java.security.cert.cerfiateException:不存在使用者替代名称

我正在运行Kafka 2.13-2.4.1,并在用Java编写的Kafka客户端(消费者)和Kafka集群(3个节点,每个节点有一个Broker)之间配置一个SSL连接。 我通过Confluent's Documentation使用官方文档,它有单向身份验证(客户端没有证书),它不起作用,所以我不得不使用两种方式进行身份验证,然后消费者和生产者控制台都通过SSL进行通信,但当我使用我的Java消费 ..

卡夫卡适合运行公有API吗?

我有一个要发布的事件流。它被划分为多个主题,不断更新,需要水平扩展(没有SPOF很好),在某些情况下可能需要重播旧事件。所有似乎与卡夫卡的能力相匹配的功能。 我想通过任何人都可以连接并获取事件的公共API将其发布到世界各地。Kafka适合作为公共API公开吗? 我已经阅读了文档页面,但还没有更深入。ACL似乎是合理的。 我的顾虑 消费者将在世界任何地方。我看不出卡夫卡的建筑 ..
发布时间:2022-04-15 15:56:35 其他开发

如何删除Kafka消费群(通过新的消费者API创建)?

我通过新的消费者接口创建了Kafka消费者 我使用的是卡夫卡2.10-0.9.0.1 我们有一个消费者组,每个组中有一个消费者实例 Kafka脚本‘kafka-Consumer-groups.sh’提供了删除用户的方法,但这仅适用于旧的消费群体 正在运行命令: bin/kafka-Consumer-groups.sh 给予 --删除:警告:组删除仅适用于基于ZK的旧消费组, ..
发布时间:2022-04-15 15:53:40 Java开发

通过Kafka Consumer Retries维护订购保证

我正在为基于Kafka的数据处理管道中的消费者重试提出一个体系结构。我们正在使用Kafka的生产商和消费者,并正在考虑重试主题,如果他们在消费上出错,将发送哪些消息。将有使用者以特定的节奏运行这些重试主题。 我阅读了很多参考体系结构,但没有一个谈到如何在消息使用失败期间维护排序保证。让我举一个例子: 我们的Kafka消息包含有效负载,它有一个对象和一个操作类型(可以是创建/更新/删除) ..
发布时间:2022-04-15 15:52:07 其他开发

卡夫卡和防火墙规则

我们有相当严格的网络分段策略。我正在使用云代工实例将应用程序部署到。已经设置了防火墙规则,以便从云代工实例内部到达Kafka集群。我相信防火墙规则也是为了访问ZooKeeper实例而设置的。我需要实际确认这一点。 我的问题似乎是我可以向卡夫卡发送消息,但我的消费者似乎没有收到它们。它似乎在“轮询”时挂起。 我的防火墙规则是否需要处理一些隐藏的主机或端口,而不只是Kafka和ZooKee ..
发布时间:2022-04-15 15:50:24 其他开发

如何使用Quarkus在Kafka中设置同一主题中的多个消费者

我正在使用Quarkus框架构建一个Kafka消费者,它将读取带有3个分区的主题。下面的代码片段正在工作,但根据日志,我只是启动了具有3个分区的1个使用者。我现在的问题是,一旦我运行我的应用程序,我如何才能产生3个消费者。 @Incoming("topic-1") public CompletionStage onMessage(KafkaRecord ..

多个主题的 Kafka 消费者

我有一个主题列表(现在是 10 个),它们的大小将来会增加.我知道我们可以产生多个线程(每个主题)来从每个主题消费,但在我的情况下,如果主题数量增加,那么从主题消费的线程数量会增加,这是我不想要的,因为主题不是将过于频繁地获取数据,因此线程将处于理想状态. 有没有办法让一个消费者从所有主题中消费?如果是,那么我们如何才能实现它?Kafka 将如何维护偏移量?请提出答案. 解决方案 我 ..
发布时间:2021-12-24 17:28:02 Java开发