apache-kafka-streams相关内容
我计划设置MySQL到Kafka的流程,最终目标是计划一个流程,根据更改的数据重新计算MongoDB文档。 这可能涉及直接修补MongoDB文档,或运行将重新创建整个文档的进程。 我的问题是,如果MySQL数据库的一组更改都与一个MongoDB文档相关,那么我不想为每个更改实时重新运行重新计算过程,我想等待更改‘结算’,以便只在需要时运行重新计算过程。 有没有办法“揭穿”卡夫卡之
..
我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。 配置Bean: @Autowired UnCaughtE
..
默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60))为每个传入记录返回一条记录。 结合使用.count()和.filter()可以轻松检索第一条记录。 使用 .suppress(Suppressed.untilWindowCloses(unbounded()))还可以轻松检索最后一条记录。 所以…我做了两次处理
..
我正在尝试Kafka Streams的字数统计问题。我正在使用带有Scala版本2.11.12和SBT版本1.1.4的Kafka 1.1.0。我收到以下错误: Exception in thread "wordcount-application-d81ee069-9307-46f1-8e71-c9f777d2db64-StreamThread-1" java.lang.Unsatisfied
..
我有一个Kafka流--比如博客和Kafka表--比如那些博客相关的评论。Kafka流中的key可以映射到Kafka表中的多个值,即一个博客可以有多条评论。我想连接这两个对象,并创建一个带有注释ID数组的新对象。但是当我连接时,流只包含最后一个注释id。有没有任何文档或示例代码可以为我指明如何实现这一点的正确方向?基本上,有没有文档详细说明如何使用Kafka流和Kafka表进行一对多关系连接?
..
我有一个Kafka Streams应用程序,它从几个主题中获取数据,并连接数据并将其放入另一个主题中。 卡夫卡配置: 5 kafka brokers Kafka Topics - 15 partitions and 3 replication factor. 注意:我正在运行Kafka Broker的同一台计算机上运行Kafka Streams应用程序。 每小时消耗/产生数
..
如@https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results所述,我有一个要求,即等待窗口关闭,以便通过在窗口持续时间内对其进行缓冲来处理后期无序事件。 根据我对此功能的理解,一旦创建了窗口,窗口的工作方式就像墙上的时钟处理一样,例如创建1小时的窗口,一旦
..
我正在为我的Spring Kafka Streams应用程序的定制而苦苦挣扎。 我一直在尝试在我的KStreams中配置未捕获(运行时异常)处理。 参照文档https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html
..
我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了
..
我们有一个Streams应用程序,它使用源主题中的消息,执行一些处理并将结果转发到目标主题。 消息的结构由某些Avro架构控制。 当开始使用消息时,如果架构尚未缓存,应用程序将尝试从架构注册表中检索它。如果由于任何原因架构注册表不可用(例如网络故障),则当前正在处理的消息将丢失,因为默认处理程序是名为LogAndContinueExceptionHandler的处理程序。 o.a.k
..
我们可以在不聚合的情况下应用Kafka窗口操作吗?我需要过去10分钟内的所有数据(而不是计数)?我所看到的是,大多数示例使用了窗口滑动的聚合。 推荐答案 您可以使用KafkaConsumer#offsetsForTimes方法获取分区到偏移量映射中10分钟前的偏移量。 使用该信息,循环遍历映射并seek您的使用者到这些分区偏移量。 然后开始轮询,直到记录时间戳超出您请求的时
..
我将Spring Cloud Streams与Kafka Streams绑定器一起使用,功能 样式处理器API和多处理器。 以这种方式配置一个具有多个处理器和多个Kafka主题的处理应用程序,并使用/Actuator、WebClient等方式保持在Spring Boot领域,这真的很酷。事实上,我更喜欢它,而不是使用纯阿帕奇Kafka Streams。 但是:我希望为处理器内发生的异常
..
这与Is there a "Circuit Breaker" for Spring Boot Kafka client?有关,但我仍然认为这是一个不同的问题:) 我们需要配置Spring Boot Kafka客户端,以便它根本不会尝试连接。 用例是,在测试环境中,我们没有运行Kafka,但我们仍然需要构建完整的Spring Boot上下文,因此使该Bean以配置文件为条件是不起作用的。
..
我正在寻找使用模式注册中心的kafka-stream。我有谷歌,但找不到合适的教程。 推荐答案 文档在此处 https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro 这是依赖项 io.confluent
..
我正在尝试了解Kafka Streams API的体系结构,在documentation: 中遇到了这一点 应用程序的处理器拓扑通过将其分解为多个任务来进行扩展 将处理器拓扑分解为任务的所有标准是什么?只是流/主题中的分区数还是更多。 然后任务可以根据分配的分区实例化它们自己的处理器拓扑 有没有人能举个例子解释一下上面的意思?如果创建任务的目的只是为了扩展,它们不应该都具有相
..
我们有一个要求,我们使用Kafka Streams读取Kafka主题,然后通过一个会话池通过网络发送数据。然而,有时网络调用有点慢,我们需要频繁地暂停流,以确保我们没有使网络超载。目前,我们将数据捕获到流中,并将其加载到Executor服务,然后通过会话池通过网络发送。 如果Executor服务中的数据太高,我们需要暂停流一段时间,然后在Executor服务上的积压清理完毕后恢复它。为了实现
..
如果Kafka服务器(暂时)关闭,我的Spring Boot应用程序ReactiveKafkaConsumerTemplate一直尝试连接不成功,从而造成不必要的流量和日志文件混乱: 2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Con
..
在只有一个Kafka Broker的测试设置中启动我们的Kafka Streams应用程序时,我们看到以下错误,大约每15次运行中就有一次出现以下错误: org.apache.kafka.streams.errors.StreamsException: Existing internal topic alarm-message-streams-by-organization-repartit
..
我正在尝试使用Kafka Streams对CDC数据执行KTable-KTable外键联接。我将读取的数据是Avro格式的,但是它被序列化的方式与其他行业序列化程序/反序列化程序(例如。合流架构注册表),因为架构标识符存储在标头中。 当我设置KTables的Serdes时,我的Kafka Streams应用程序最初运行,但最终失败,因为它在内部调用了带有byte[] serialize(Stri
..
我正在尝试使用Kafka Streams,我已经创建了以下拓扑: KStream eventStream = builder.stream(applicationTopicName, Consumed.with(Serdes.String(), historyEventSerde)); eventStrea
..