apache-kafka相关内容

我们有没有办法暂停卡夫卡流一段时间,然后再恢复?

我们有一个要求,我们使用Kafka Streams读取Kafka主题,然后通过一个会话池通过网络发送数据。然而,有时网络调用有点慢,我们需要频繁地暂停流,以确保我们没有使网络超载。目前,我们将数据捕获到流中,并将其加载到Executor服务,然后通过会话池通过网络发送。 如果Executor服务中的数据太高,我们需要暂停流一段时间,然后在Executor服务上的积压清理完毕后恢复它。为了实现 ..
发布时间:2022-04-18 16:50:52 Java开发

将Kafka Streams与Serde结合使用,这些Serde依赖于标头中的架构引用

我正在尝试使用Kafka Streams对CDC数据执行KTable-KTable外键联接。我将读取的数据是Avro格式的,但是它被序列化的方式与其他行业序列化程序/反序列化程序(例如。合流架构注册表),因为架构标识符存储在标头中。 当我设置KTables的Serdes时,我的Kafka Streams应用程序最初运行,但最终失败,因为它在内部调用了带有byte[] serialize(Stri ..
发布时间:2022-04-18 16:44:47 Java开发

Kafka kstream-kstream加入了滑动窗口,内存使用量随着时间的推移不断增长,直到OOM

我在使用kstream联接时遇到问题。我所做的是从一个主题中将3种不同类型的消息分离到新的流中。 然后与创建另一个流的两个流进行一次内部联接,最后我与新流和最后一个剩余的流进行最后一次左联接。 联接的窗口时间为30秒。 这样做是为了筛选出某些被其他邮件覆盖的邮件。 我在Kubernetes上运行此应用程序,Pod的磁盘空间一直在无限增长,直到Pod崩溃。 我已经意识到这是因 ..

将数据从Kafka商店转移到Kafka主题

我想用卡夫卡做这样的事情: 继续在KStream/Ktable/Kafka-store中存储数据 当我的应用程序收到特定事件/数据时,仅将上述存储区中的特定数据集发送到主题。 我们可以用卡夫卡做这个吗?我不认为单独使用Kafka消费者会有帮助,因为当一组数据已被使用时,我们不能启动/暂停消费者。 推荐答案 创建流和加入流的配置如下:- CREATE STREAM C ..
发布时间:2022-04-18 16:34:53 其他开发

如何阅读和处理卡夫卡消费者的高优先级消息?

有没有办法先处理优先级高的邮件? 我尝试创建了三个主题‘High’、‘Medium’和‘Low’,并使用一个使用者订阅了所有这三个主题,如果‘High’主题中有未处理的消息,它将暂停其他两个主题。有没有更好的实现消息优先级的方法? 我尝试使用下面给出的逻辑。 topics = ['high', 'medium', 'low'] consumer.subscribe(topics) h ..

无法从Windows生成运行在WSL 2上的卡夫卡主题

我在Ubuntu WSL2上成功运行了最新的Kafka。我可以在我在WSL上运行的Ubuntu中启动ZooKeeper、Kafka服务器、创建主题、控制台生产和控制台消费。但是,当我进入Windows上的IntelliJ并创建一个简单的Java生成器时,它似乎无法连接到代理 版本和主机名 Java version: 1.8 Kafka Version: 2.6 ..

如何将OPC-UA数据与Kafka连接?

我想将OPC-UA数据映射到Kafka主题中。例如,没有来自Confluent的特定连接器。 我将使用eclipse/Milo(https://github.com/eclipse/milo)。 推荐答案 OPC是你的服务器,有了OPC的节点,你就有了数据。当你有了数据,你就可以选择对数据做任何事情。你可以编写一个处理程序类或服务来处理从OPC服务器获取的数据。你必须用你喜欢的方 ..
发布时间:2022-04-16 20:52:29 其他开发

卡夫卡消费者正在花时间识别新的分区

我正在运行一个测试,其中Kafka使用者正在从一个主题的多个分区读取数据。在进程运行时,我添加了更多分区。使用者线程从新分区读取数据大约需要5分钟。我找到了这个配置“topic.metadata.renh.interval.ms”,但这只适用于生产者。消费者也有类似的配置吗? 推荐答案 我更改了";metadata.Max.age.ms";参数值以刷新元数据https: ..
发布时间:2022-04-15 16:07:18 其他开发

如何使用NodeJS消费来自Kafka-Consumer的最新消息?

我创建了一个NodeJS应用程序来将数据插入到MongoDB集合中。此数据库插入是通过使用Kafka完成的。Kafka-node是我用来调用Kafka的插件。 我可以创建主题并在生产者级别向消费者发送消息。消息和主题取自POST请求。 This is how I call the Kafka. Parameters are topic and message. 每次我调用此接口时,生 ..
发布时间:2022-04-15 16:06:13 其他开发

Kaka Auto.offset.Reset查询

我的项目使用的是Kafka 0.10.2版本。IAM在消费者中设置enable.Auto.Commit=FALSE和Auto.offset.Reset=LATEST。如果维护后重新启动消费者,则消费者将从第一个偏移量开始再次读取,而不是等待最新的偏移量消息。有什么原因会发生这种情况吗?我是否错误地理解了配置? 我的要求是,使用者不应自动提交,而应仅在主题处于活动状态时读取放入该主题中的新消息 ..
发布时间:2022-04-15 16:02:12 其他开发

Kafka Java使用者SSL握手错误:java.security.cert.cerfiateException:不存在使用者替代名称

我正在运行Kafka 2.13-2.4.1,并在用Java编写的Kafka客户端(消费者)和Kafka集群(3个节点,每个节点有一个Broker)之间配置一个SSL连接。 我通过Confluent's Documentation使用官方文档,它有单向身份验证(客户端没有证书),它不起作用,所以我不得不使用两种方式进行身份验证,然后消费者和生产者控制台都通过SSL进行通信,但当我使用我的Java消费 ..

卡夫卡适合运行公有API吗?

我有一个要发布的事件流。它被划分为多个主题,不断更新,需要水平扩展(没有SPOF很好),在某些情况下可能需要重播旧事件。所有似乎与卡夫卡的能力相匹配的功能。 我想通过任何人都可以连接并获取事件的公共API将其发布到世界各地。Kafka适合作为公共API公开吗? 我已经阅读了文档页面,但还没有更深入。ACL似乎是合理的。 我的顾虑 消费者将在世界任何地方。我看不出卡夫卡的建筑 ..
发布时间:2022-04-15 15:56:35 其他开发

如何删除Kafka消费群(通过新的消费者API创建)?

我通过新的消费者接口创建了Kafka消费者 我使用的是卡夫卡2.10-0.9.0.1 我们有一个消费者组,每个组中有一个消费者实例 Kafka脚本‘kafka-Consumer-groups.sh’提供了删除用户的方法,但这仅适用于旧的消费群体 正在运行命令: bin/kafka-Consumer-groups.sh 给予 --删除:警告:组删除仅适用于基于ZK的旧消费组, ..
发布时间:2022-04-15 15:53:40 Java开发

通过Kafka Consumer Retries维护订购保证

我正在为基于Kafka的数据处理管道中的消费者重试提出一个体系结构。我们正在使用Kafka的生产商和消费者,并正在考虑重试主题,如果他们在消费上出错,将发送哪些消息。将有使用者以特定的节奏运行这些重试主题。 我阅读了很多参考体系结构,但没有一个谈到如何在消息使用失败期间维护排序保证。让我举一个例子: 我们的Kafka消息包含有效负载,它有一个对象和一个操作类型(可以是创建/更新/删除) ..
发布时间:2022-04-15 15:52:07 其他开发

卡夫卡和防火墙规则

我们有相当严格的网络分段策略。我正在使用云代工实例将应用程序部署到。已经设置了防火墙规则,以便从云代工实例内部到达Kafka集群。我相信防火墙规则也是为了访问ZooKeeper实例而设置的。我需要实际确认这一点。 我的问题似乎是我可以向卡夫卡发送消息,但我的消费者似乎没有收到它们。它似乎在“轮询”时挂起。 我的防火墙规则是否需要处理一些隐藏的主机或端口,而不只是Kafka和ZooKee ..
发布时间:2022-04-15 15:50:24 其他开发