Spring Integration RSocket and Spring RSocket interaction issues
Question
I created a new sample and split the code into client and server sides.
The complete code can be found here.
There are 3 versions of the server side.
- server: a non-Spring Boot app, using the Spring Integration RSocket InboundGateway.
- server-boot: reuses the Spring RSocket autoconfiguration and creates the ServerRSocketConnector through ServerRSocketMessageHandler.
- server-boot-messagemapping: does not use Spring Integration, just the Spring Boot RSocket autoconfiguration with @Controller and @MessageMapping.
There are 2 versions of the client.
- client: sends messages using the Spring Integration RSocket OutboundGateway.
- client-requester: sends messages using RSocketRequester and does not use Spring Integration at all.
The client and server interaction mode is REQUEST_CHANNEL, and the client connects to the server via TCP at localhost:7000.
server
Dependencies in pom.xml:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>
Application class:
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }
}
server-boot
Dependencies in pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>
application.properties:
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
Application class:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }
}
server-boot-messagemapping
Dependencies in pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
application.properties:
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
Application class:
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}
client
In the client, the dependencies in pom.xml are as follows:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>
Application class:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}
With the client and a server application running, access http://localhost:8080/hello with curl.
When using server or server-boot, which use InboundGateway to handle messages, the output looks like this:
curl http://localhost:8080/hello
data:ABCD
When using server-boot-messagemapping, the output is what I expected:
data:A
data:B
data:C
data:D
client-requester
Dependencies in pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Application class:
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

@RestController
class HelloController {

    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}
With this client running against each of the 3 servers, access http://localhost:8080/hello with curl.
When using server or server-boot, which use InboundGateway to handle messages, it throws a class cast exception.
When using server-boot-messagemapping, the output is what I expected:
data:A
data:B
data:C
data:D
I do not know where the problem is in the configuration of InboundGateway and OutboundGateway.
Recommended answer
Thank you for such a detailed sample!
So, here is what I see. Both clients (plain RSocketRequester and Spring Integration) work well with the plain RSocket server.
To make them work with the Spring Integration server, you have to make these changes:
- The server side: add .requestElementType(ResolvableType.forClass(String.class)) to the RSockets.inboundGateway() definition, so that it knows what type to convert incoming payloads to.
- The client side: .data(Flux.just("a\n", "b\n", "c\n", "d\n")).
Currently, the server side of Spring Integration doesn't treat an incoming Flux as a stream of independent payloads. So, we try to join all of them into a single value. The newline delimiter is an indicator that we expect independent values. Spring Messaging, on its side, does exactly the opposite: it checks for a multi-value expected type and decodes every element of the incoming Flux in its map(), instead of attempting to decode the whole Publisher.
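The difference between the two decoding strategies can be modelled in plain Java, without any Spring classes. This is only a simplified sketch: the names DecodingSketch, frames, decodeAsWholeStream, decodePerElement and upper are hypothetical, and the newline-splitting behaviour is an assumption based on the explanation above, not a copy of the actual Spring code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical model; none of these names exist in Spring.
class DecodingSketch {

    // Encode each string as one "frame", mimicking one payload per Flux element.
    static List<byte[]> frames(String... values) {
        return Arrays.stream(values)
                .map(v -> v.getBytes(StandardCharsets.UTF_8))
                .collect(Collectors.toList());
    }

    // Whole-stream decoding: frames are treated as chunks of one text stream,
    // and only a newline marks the boundary between values.
    static List<String> decodeAsWholeStream(List<byte[]> frames) {
        StringBuilder joined = new StringBuilder();
        for (byte[] frame : frames) {
            joined.append(new String(frame, StandardCharsets.UTF_8));
        }
        List<String> values = new ArrayList<>();
        for (String part : joined.toString().split("\n")) {
            if (!part.isEmpty()) {
                values.add(part);
            }
        }
        return values;
    }

    // Per-element decoding: every frame is decoded as its own value.
    static List<String> decodePerElement(List<byte[]> frames) {
        return frames.stream()
                .map(f -> new String(f, StandardCharsets.UTF_8))
                .collect(Collectors.toList());
    }

    // The "/uppercase" handler applied to the decoded values.
    static List<String> upper(List<String> values) {
        return values.stream()
                .map(v -> v.toUpperCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // No delimiters: everything collapses into one value.
        System.out.println(upper(decodeAsWholeStream(frames("a", "b", "c", "d"))));         // [ABCD]
        // Newline delimiters restore the element boundaries.
        System.out.println(upper(decodeAsWholeStream(frames("a\n", "b\n", "c\n", "d\n")))); // [A, B, C, D]
        // Per-element decoding needs no delimiters at all.
        System.out.println(upper(decodePerElement(frames("a", "b", "c", "d"))));            // [A, B, C, D]
    }
}
```

In this model, the first call corresponds to the single data:ABCD event seen with the Spring Integration servers, and the other two correspond to the four separate events.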
It's going to be a somewhat breaking change, but we possibly need to consider fixing the RSocketInboundGateway logic to be consistent with the regular @MessageMapping for RSocket support. Feel free to raise a GH issue!