apache-kafka-streams相关内容

Kafka kstream-kstream加入了滑动窗口,内存使用量随着时间的推移不断增长,直到OOM

我在使用kstream联接时遇到问题。我所做的是从一个主题中将3种不同类型的消息分离到新的流中。 然后与创建另一个流的两个流进行一次内部联接,最后我与新流和最后一个剩余的流进行最后一次左联接。 联接的窗口时间为30秒。 这样做是为了筛选出某些被其他邮件覆盖的邮件。 我在Kubernetes上运行此应用程序,Pod的磁盘空间一直在无限增长,直到Pod崩溃。 我已经意识到这是因 ..

如何更改记录的时间戳?

我正在使用FluentD(v.12最后一个稳定版本)向Kafka发送消息。但FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1。 因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达Kafka时的时间点。 是否有特定于Kafka Streams的解决方案? 我真正感兴趣的时间戳是由fluentd在消息中发送的 ..
发布时间:2022-04-18 16:38:49 Java开发

将数据从Kafka商店转移到Kafka主题

我想用卡夫卡做这样的事情: 继续在KStream/Ktable/Kafka-store中存储数据 当我的应用程序收到特定事件/数据时,仅将上述存储区中的特定数据集发送到主题。 我们可以用卡夫卡做这个吗?我不认为单独使用Kafka消费者会有帮助,因为当一组数据已被使用时,我们不能启动/暂停消费者。 推荐答案 创建流和加入流的配置如下:- CREATE STREAM C ..
发布时间:2022-04-18 16:34:53 其他开发

如何阅读和处理卡夫卡消费者的高优先级消息?

有没有办法先处理优先级高的邮件? 我尝试创建了三个主题‘High’、‘Medium’和‘Low’,并使用一个使用者订阅了所有这三个主题,如果‘High’主题中有未处理的消息,它将暂停其他两个主题。有没有更好的实现消息优先级的方法? 我尝试使用下面给出的逻辑。 topics = ['high', 'medium', 'low'] consumer.subscribe(topics) h ..

如何管理Kafka KStream到Kstream窗口连接?

基于 apache Kafka 文档 KStream-to-KStream Joins 总是窗口连接,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者,例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流? 有没有什么好的例子来展示一个窗口化的 KStream-to-kStream 窗口化连接? 就我而言,假设我有 2 个 KStream,kstream1 和 k ..
发布时间:2021-12-24 17:19:42 其他开发

Kafka Stream StateStore 在所有实例上是全局的还是本地的?

在 Kafka Stream WordCount 示例中,它使用 StateStore 来存储字数.如果同一个消费者组中有多个实例,StateStore 对组来说是全局的,还是只是一个消费者实例的局部? 感谢 解决方案 这取决于您对 state store 的看法. 在 Kafka Streams 中,状态是共享的,因此每个实例都包含整个应用程序状态的一部分.例如,使用 DSL ..
发布时间:2021-11-28 21:39:27 其他开发

如何批量处理最大大小的 KStream 或回退到时间窗口?

我想创建一个基于 Kafka 流的应用程序,该应用程序处理一个主题并分批接收大小为 X(即 50)的消息,但如果流的流量较低,则在 Y 秒内为我提供流中的任何内容(即5). 因此,我不是一个一个地处理消息,而是处理一个 List[Record],其中列表的大小为 50(或可能更少). 这是为了让一些 I/O 绑定处理更高效. 我知道这可以用经典的 Kafka API 来实现,但我 ..
发布时间:2021-11-28 21:39:16 其他开发

Kafka 如何使用所需的偏移量提交消费者偏移量

我有 Kafka Stream 应用程序.我的应用程序正在成功处理事件. 如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理/跳过事件.我尝试了如何更改主题的起始偏移量?.但我收到“节点不存在:"错误.请帮帮我. 解决方案 您所指的问题/答案基于较旧的 Kafka 版本.从 Kafka 0.9 开始,偏移量不再提交给 ZooKeeper,而是存储在一个名为 offse ..

流数据处理和纳秒时间分辨率

我刚刚开始讨论实时流数据处理框架的话题,我有一个问题,目前我还没有找到任何决定性的答案: 通常的怀疑对象(Apache 的 Spark、Kafka、Storm、Flink 等)是否支持以 纳秒(甚至皮秒)的事件时间分辨率处理数据? 大多数人和文档都谈论毫秒或微秒分辨率,但我无法找到明确的答案,如果可能有更多分辨率或问题.我推断唯一具有此功能的框架是 influxData 的 Kapac ..

在运行时部署流处理拓扑?

大家好, 我有一个要求,我需要重新提取一些旧数据.我们有一个多阶段管道,其来源是一个 Kafka 主题.一旦将记录输入其中,它就会运行一系列步骤(大约 10 个).每一步都会对推送到源主题的原始 JSON 对象进行按摩,然后推送到目标主题. 现在,有时,我们需要重新摄取旧数据并应用我上面描述的步骤的一个子集.我们打算将这些重新摄取记录推送到不同的主题,以免阻止通过的“实时"数据,这可能 ..

如何在 Kafka 流中使用带有 KTable 的自定义序列化程序?

当我将 groupBy 引入我的 KStream 时,我在将 KStream 序列化为 KTable 的过程中遇到了一些错误.据我了解,一旦您在 KStream 上拥有 aggregate 或 reduce,Kafka 会尝试将其转换为 KTable> 由于必要的 shuffle,因此 Kafka 必须再次序列化记录.所以,我原来的 KStream 只是像这样将记录从 JSON 映射到 AVRO, ..
发布时间:2021-11-12 03:45:47 其他开发

如果每个主题都有单个分区,可扩展性是否适用于 Kafka 流

我根据 Kafka 流文档的理解,最大可能的并行任务数等于集群中所有主题中某个主题的最大分区数. 我在 Kafka 集群中有大约 60 个主题.每个主题只有一个分区.是否可以通过 Kafka 流为我的 Kafka 集群实现可扩展性/并行性? 解决方案 您想对所有主题进行相同的计算吗?为此,我建议引入一个额外的主题,其中包含许多用于横向扩展的分区: //使用新的 1.0 APIStr ..
发布时间:2021-11-12 03:45:42 其他开发

具有依赖对象的Kafka Streams等待功能

我创建了一个 Kafka Streams 应用程序,它接收来自不同主题的不同 JSON 对象,我想实现某种等待功能,但我不确定如何最好地实现它. 为了简化问题,我将在下一节中使用简化的实体,我希望可以很好地描述问题.所以在我的一个流中,我收到了汽车对象,每辆车都有一个 id.在第二个流中,我收到 person 对象,每个人也有一个汽车 ID,并被分配给具有此 ID 的汽车. 我想使用我 ..
发布时间:2021-11-12 03:45:38 其他开发

Kafka Streams 用于计数总数?

一个名为“addcash"的topic,有3个分区(kafka集群的机器也是3个),里面流着很多用户充值的消息.我想每天数一数总钱数.我从一些关于 Kafka Streams 的文章中了解到:Kafka Streams 将拓扑作为任务运行,任务的数量取决于主题的分区数量,每个任务都有单独的状态存储.那么当我按状态计算总金额时,是否有三个值,而不是一个总值会返回?正确的做法是什么?谢谢! 解决 ..
发布时间:2021-11-12 03:45:36 其他开发

更改 Kafka-streams 拓扑(添加重新分区步骤)是否对消息处理保证有任何影响

假设我想进行一些可配置的转换“A".此转换使用 state-store 管理一些状态,并且还需要重新分区,这意味着只有在配置后才会进行重新分区.现在,如果我以下列方式(或任何其他组合)运行该应用程序 3 次(也可能是滚动升级):- 转换“A"被禁用 转换“A"已启用 转换“A"被禁用 鉴于所有 3 次运行都使用相同的 Kafka 代理集群:- 如果启用了 EOS,那么 ..
发布时间:2021-11-12 03:45:33 Java开发

聚合中使用了错误的序列化程序

我正在处理一个 kafka-streams 应用程序,我在其中处理日志事件.在这种情况下,我想将 WorkflowInput 类型聚合为 Workflow 类型.我在使聚合正常工作时遇到问题. final KStream过滤流 = someStream;最终 KTable聚合工作流 = 过滤流.peek((k, v) -> {if (!(v instanceof WorkflowInput)) ..
发布时间:2021-11-12 03:45:27 其他开发