reactor相关内容

如何重试反应堆中出现故障的耗材记录-卡夫卡

我正在尝试Reactive-Kafka消费消息。其他一切都很好,但我想为失败的消息添加一个重试(2)。默认情况下,Spring-Kafka已经重试失败记录3次,我想使用Reactive-Kafka实现相同的记录。 我正在使用Spring-Kafka作为Active-Kafka的包装器。以下是我的消费者模板: reactiveKafkaConsumerTemplate ..
发布时间:2022-05-06 15:27:38 其他开发

为什么对 Redis 使用异步客户端有意义?

在这个 页面列出 redis 客户端中,我统计了 8 个异步库.我的理解是,像 node.js 或 tornado 这样的框架只有在异步回调函数没有为 I/O 相互争斗时才有意义,否则你最好同步. 但是Redis是单线程的.所以他们实际上是在为 I/O 而战.Redis 的单线程特性不是取消了异步回调的所有潜在好处吗?为什么在 Redis 中使用异步客户端有意义? 解决方案 Redi ..
发布时间:2021-12-28 09:40:36 其他开发

nodejs 是代表 Reactor 还是 Proactor 设计模式?

网上很多文章都展示了nodejs作为反应器模式的一个例子.不就是前摄吗? 据我了解,两者的区别在于: reactor 在单线程(同步)中处理事件、 proactor 处理事件是具有完成回调的多线程(异步). 例如在这篇文章中: Reactor Pattern 是 Node.js 中非阻塞 I/O 操作的一种想法.该模式提供了一个与每个 I/O 操作相关联的处理程序(在 N ..
发布时间:2021-11-17 01:57:02 其他开发

反应程序在将所有消息发送到 Kafka 之前提前退出

这是之前反应式 kafka 问题的后续问题(将数据流发送到反应式 kafka 时出现的问题). 我正在尝试使用反应式方法向 kafka 发送一些日志记录.这是使用反应式 kafka 发送消息的反应式代码. 公共类 LogProducer {私人最终 KafkaSender发件人;公共日志生产者(字符串引导服务器){映射props = new HashMap();props.put(Prod ..

反应程序在将所有消息发送到 Kafka 之前提前退出

这是之前反应式 kafka 问题的后续问题(将数据流发送到反应式 kafka 时出现的问题). 我正在尝试使用反应式方法向 kafka 发送一些日志记录.这是使用反应式 kafka 发送消息的反应式代码. 公共类 LogProducer {私人最终 KafkaSender发件人;公共日志生产者(字符串引导服务器){映射props = new HashMap();props.put(Prod ..

在单个程序中多次启动扭曲反应堆?

是否可以在同一个程序中多次启动反应器?假设您想将扭曲的功能封装在一个方法中,用于 API 目的. 例如,mymodule.py 看起来像这样: 1 fromtwisted.web.client import getPage2 来自twisted.internet 进口反应堆34 def _result(r):5 打印6 reactor.stop()78 def_error(e):9 打印 ..
发布时间:2021-09-10 20:48:26 Python

Python 的 Twisted Reactor 是如何工作的?

最近,我一直在研究 Twisted 文档.从我收集到的信息来看,Twisted 功能的基础是其称为“反应器"的事件循环的结果.反应器侦听某些事件并将它们分派到已注册的回调函数,这些回调函数旨在处理这些事件.在书中,有一些伪代码描述了 Reactor 的作用,但我无法理解它,这对我来说没有任何意义. 为真:超时 = time_until_next_timed_event()事件 = wait_f ..
发布时间:2021-09-10 20:46:51 Python

在条件下停止扭曲反应器

有没有办法在达到一定条件时停止扭曲的反应器.例如,如果一个变量被设置为某个值,那么反应器应该停止吗? 解决方案 理想情况下,您不会将变量设置为值并停止反应器,而是调用 reactor.stop().有时您不在主线程中,这是不允许的,因此您可能需要调用 reactor.callFromThread.以下是三个工作示例: # 在主线程中:反应器停止()# 在非主线程中:reactor.cal ..
发布时间:2021-09-10 20:46:46 其他开发

将 Mono 与 Flux 结合

我想创建一个服务,将来自两个反应源的结果结合起来.一个是生产 Mono,另一个是生产 Flux.对于合并,对于每个发射的通量,我需要相同的单声道值. 现在我有这样的东西 Flux.zip(service1.getConfig(),//产生通量service2.getContext()//产生单声道.cache().repeat()) 这给了我我需要的东西, service2 只调用 ..
发布时间:2021-06-22 18:35:48 Java开发

Project Reactor,在创建 lambda 之外使用 Flux sink

当我的服务启动时,我想构建一个简单的管道. 我想隔离 Flux 接收器或处理器来发出事件. 事件将来自多个线程,应根据管道的 subscribeOn() 规范进行处理,但一切似乎都在 main 线程上运行.最好的方法是什么?我在下面附上了我的尝试. (我使用的是 reactor-core v3.2.8.RELEASE.) import org.junit.jupiter.api.Tes ..
发布时间:2021-06-22 18:34:46 Java开发

Reactor 中`groupBy` 组的并行调度

我正在学习 Reactor,我想知道如何实现某种行为.假设我有一个传入消息流.每条消息都与某个实体相关联并包含一些数据. interface Message {字符串 getEntityId();数据 getData();} 与不同实体相关的消息可以并行处理.但是,与任何单个实体有关的消息必须一次处理一个,即实体 "abc" 的消息 2 的处理不能开始,直到实体 "abc" 已完成.在处理消息 ..
发布时间:2021-06-22 18:34:29 Java开发

Project Reactor 中的背压是如何工作的?

我一直在使用 Spring Reactor 并且之前进行了一些测试,这让我想知道 Fluxes 默认如何处理背压.我知道 onBackpressureBuffer 等存在,我也读过 RxJava 默认为无界,直到你定义是否缓冲、丢弃等 那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么? 我尝试寻找答案,但没有找到任何明确的答案,只有背压的定义或上面链接的 ..
发布时间:2021-06-22 18:33:24 其他开发