Spring Integration RSocket and Spring RSocket interaction issues

Question

I created a new sample and split the code into a client side and a server side.

The complete code can be found here.

There are 3 versions of the server side.

  • server: a non-Spring-Boot application, using the Spring Integration RSocket InboundGateway.
  • server-boot: reuses the Spring Boot RSocket autoconfiguration, and creates the ServerRSocketConnector through a ServerRSocketMessageHandler.
  • server-boot-messagemapping: does not use Spring Integration, just the Spring Boot RSocket autoconfiguration with @Controller and @MessageMapping.

There are 2 versions of the client.

  • client: sends messages using the Spring Integration RSocket OutboundGateway.
  • client-requester: sends messages using RSocketRequester, without using Spring Integration at all.

The interaction model between client and server is REQUEST_CHANNEL, and the client connects to the server via TCP at localhost:7000.

server

Dependencies in pom.xml.

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

The application class:

@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
            System.out.println("Press any key to exit.");
            System.in.read();
        } finally {
            System.out.println("Exited.");
        }

    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector() {
        return new ServerRSocketConnector("localhost", 7000);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

server-boot

Dependencies in pom.xml.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-rsocket</artifactId>
        </dependency>

The application.properties.

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

The application class.

@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) throws IOException {
        SpringApplication.run(DemoApplication.class, args);
    }

    // see PR: https://github.com/spring-projects/spring-boot/pull/18834
    @Bean
    ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
        var handler = new ServerRSocketMessageHandler(true);
        handler.setRSocketStrategies(rSocketStrategies);
        return handler;
    }

    @Bean
    public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
        return new ServerRSocketConnector(serverRSocketMessageHandler);
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        .rsocketConnector(serverRSocketConnector)
                )
                .<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
                .get();
    }

}

server-boot-messagemapping

Dependencies in pom.xml.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

The application.properties.

spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp

The application class.

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@Controller
class UpperCaseHandler {

    @MessageMapping("/uppercase")
    public Flux<String> uppercase(Flux<String> input) {
        return input.map(String::toUpperCase);
    }
}

client

In the client, the dependencies in pom.xml look like this.


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
</dependency>

The application class:


@SpringBootApplication
@EnableIntegration
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ClientRSocketConnector clientRSocketConnector() {
        ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
        clientRSocketConnector.setAutoStartup(false);
        return clientRSocketConnector;
    }

    @Bean
    public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
        return IntegrationFlows
                .from(Function.class)
                .handle(RSockets.outboundGateway("/uppercase")
                        .interactionModel((message) -> RSocketInteractionModel.requestChannel)
                        .expectedResponseType("T(java.lang.String)")
                        .clientRSocketConnector(clientRSocketConnector))
                .get();
    }
}

@RestController
class HelloController {

    @Autowired()
    @Lazy
    @Qualifier("rsocketUpperCaseRequestFlow.gateway")
    private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
    }
}

With the client and server applications running, try to access http://localhost:8080/hello with curl.

When using server or server-boot, both of which use the InboundGateway to handle messages, the output looks like this.

curl http://localhost:8080/hello

data:ABCD

When using server-boot-messagemapping, the output is what I expected:

data:A
data:B
data:C
data:D

client-requester

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>

The application class:

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

@RestController
class HelloController {
    Mono<RSocketRequester> requesterMono;

    public HelloController(RSocketRequester.Builder builder) {
        this.requesterMono = builder.connectTcp("localhost", 7000);
    }

    @GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> uppercase() {
        return requesterMono.flatMapMany(
                rSocketRequester -> rSocketRequester.route("/uppercase")
                        .data(Flux.just("a", "b", "c", "d"))
                        .retrieveFlux(String.class)
        );
    }
}

With this client running against each of the 3 servers, try to access http://localhost:8080/hello with curl.

When using server or server-boot, both of which use the InboundGateway to handle messages, a class cast exception is thrown.

When using server-boot-messagemapping, the output is what I expected:

data:A
data:B
data:C
data:D

I do not know where the problem lies in the configuration of the InboundGateway and OutboundGateway.

Recommended answer

Thank you for such a detailed sample!

So, here is what I see: both clients (plain RSocketRequester and Spring Integration) work well with the plain RSocket server.

To make them work with the Spring Integration server, you have to make these changes:

  1. The server side:

Add .requestElementType(ResolvableType.forClass(String.class)) to the RSockets.inboundGateway() definition, so that it knows what type to convert the incoming payloads to.

  2. The client side:

.data(Flux.just("a\n", "b\n", "c\n", "d\n")).
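
The server-side change described above could be applied to the question's flow roughly as follows. This is only a sketch: the class name UpperCaseFlowConfig is made up for illustration, the ServerRSocketConnector bean is assumed to be defined as in the question, and it assumes a Spring Integration version whose RSockets.inboundGateway() spec offers requestElementType().

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.dsl.RSockets;
import reactor.core.publisher.Flux;

// Hypothetical configuration class; only the requestElementType(...) line
// differs from the flow shown in the question.
@Configuration
class UpperCaseFlowConfig {

    @Bean
    public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
        return IntegrationFlows
                .from(RSockets.inboundGateway("/uppercase")
                        .interactionModels(RSocketInteractionModel.requestChannel)
                        // Tell the gateway to decode incoming payloads as String
                        .requestElementType(ResolvableType.forClass(String.class))
                        .rsocketConnector(serverRSocketConnector))
                .<Flux<String>, Flux<String>>transform(flux -> flux.map(String::toUpperCase))
                .get();
    }
}
```

The same flow definition should apply to both server and server-boot, since they differ only in how the ServerRSocketConnector is created.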

Currently, the server side of Spring Integration doesn't treat an incoming Flux as a stream of independent payloads, so it tries to concatenate all of them into a single value. The newline delimiter is an indicator that we expect independent values. Spring Messaging does exactly the opposite: it checks for a multi-value expected type and decodes every element of the incoming Flux in its map(), instead of attempting to decode the whole Publisher.
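
The difference between the two decoding strategies can be modelled in plain Java, without any Spring classes. The sketch below is a simplified illustration and not the actual Spring Integration or Spring Messaging code: the class DecodingSketch and its methods are invented for this example, and the whole-stream model simply joins the payload bytes and splits on "\n".

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model only; all names here are hypothetical.
class DecodingSketch {

    // Encodes each string as one UTF-8 payload, like one element of the Flux.
    static List<byte[]> payloads(String... values) {
        List<byte[]> result = new ArrayList<>();
        for (String value : values) {
            result.add(value.getBytes(StandardCharsets.UTF_8));
        }
        return result;
    }

    // Per-element model: every payload is decoded on its own.
    static List<String> decodeEachPayload(List<byte[]> payloads) {
        List<String> values = new ArrayList<>();
        for (byte[] payload : payloads) {
            values.add(new String(payload, StandardCharsets.UTF_8));
        }
        return values;
    }

    // Whole-stream model: payload bytes are joined into one text,
    // and only a newline marks the boundary between values.
    static List<String> decodeWholeStream(List<byte[]> payloads) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] payload : payloads) {
            joined.write(payload, 0, payload.length);
        }
        String text = new String(joined.toByteArray(), StandardCharsets.UTF_8);
        return Arrays.asList(text.split("\n"));
    }

    public static void main(String[] args) {
        // No delimiters: the four payloads collapse into one value.
        System.out.println(decodeWholeStream(payloads("a", "b", "c", "d")));         // [abcd]
        // Newline-terminated payloads: four separate values again.
        System.out.println(decodeWholeStream(payloads("a\n", "b\n", "c\n", "d\n"))); // [a, b, c, d]
        // Per-element decoding needs no delimiter at all.
        System.out.println(decodeEachPayload(payloads("a", "b", "c", "d")));         // [a, b, c, d]
    }
}
```

In this model the first call mirrors the single data:ABCD event seen with the InboundGateway servers, and the second shows why appending "\n" on the client side restores one value per element.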

It would be something of a breaking change, but we should probably consider fixing the RSocketInboundGateway logic to be consistent with the regular @MessageMapping support for RSocket. Feel free to raise a GH issue!
