ReactorNettyWebSocketClient 使用示例 [英] Examples of use ReactorNettyWebSocketClient

查看:241
本文介绍了ReactorNettyWebSocketClient 使用示例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

New Spring 在 Spring 文档.

New Spring has some WebSocketClient example on Spring documentation.

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);

但是很短而且不清楚:

  1. 如何向服务器发送消息(订阅频道)?
  2. 然后处理传入的流并发出 Flux 消息?
  3. 连接中断时重新连接到服务器?

有人可以提供更复杂的例子吗?

Could some one provide more complex example?

更新.我尝试做类似的事情:

UPD. I tried to do something like:

public Flux<String> getStreaming() {

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();
    Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }");

    Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"),
            session -> session
                    .send(input.map(session::textMessage))
                    .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
                    .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

但这只会返回一条消息.就像我没有订阅一样.

But that returns only one message. Like I didnt get subscription.

推荐答案

我假设您正在使用echo"服务.为了从服务中获取一些消息,您必须将它们推送到 websocket 中,服务将回显"它们给您.

I assume you are using an "echo" service. In order to get some messages from the service, you have to push them into the websocket and the service will "echo" them back to you.

在您的示例代码中,您只向 websocket 写入了一个元素.一旦您将更多消息推送到套接字中,您将获得更多回报.

In your example code you are writing only a single element to the websocket. As soon as you push more messages into the socket you will get more back.

我修改了代码以连接到 ws://echo.websocket.org 而不是本地服务.当您浏览到 /stream 时,您会看到每秒都会出现一条新消息.

I adapted the code to connect to ws://echo.websocket.org instead of a local service. When you browse to /stream you see every second a new message appear.

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {

    Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date())))
        .delayElements(Duration.ofSeconds(1));

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono = client.execute(URI.create("ws://echo.websocket.org"), session -> session.send(input.map(session::textMessage))
        .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()).then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

希望这有帮助...

这篇关于ReactorNettyWebSocketClient 使用示例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆