如何使用 vert.x-rx 创建反应式客户端 - 服务器 TCP 通信 [英] How to use vert.x-rx to create reactive client-server TCP communication

查看:64
本文介绍了如何使用 vert.x-rx 创建反应式客户端 - 服务器 TCP 通信的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在从事一个项目,该项目需要在外部系统和我将编写的应用程序之间进行 TCP 通信(用 Java).众所周知,这可以使用常规 NIO 轻松实现.但是,作为我正在进行的这个新项目的一部分,我必须使用 Vert.x 来提供 TCP 通信.请参考下图:

I'm currently working on a project which requires TCP communication between an external system and an application which I will write (in Java). As we all know, this can easily be achieved using regular NIO. However, as part of this new project I'm working on, I have to use Vert.x to provide the TCP communication. Please refer to the image below:

在右侧,我的应用程序作为 TCP 服务器运行,在左侧等待来自外部系统的连接.我已经阅读了创建 TCP 并侦听连接的内容,您只需执行以下操作:

On the right, I have my application which runs as a TCP server waiting for a connection from the external system, on the left. I've read that to create a TCP and listen for connections you simple do something like this:

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
  }
});

然而,当外部系统连接到我的应用程序并通过 TCP 发送 EchoRequestMessages 时,我无法弄清楚如何处理.我的应用程序必须获取接收到的字节缓冲区,将其解码为 EchoRequestMessage POJO,然后将 EchoResponseMessage 编码为字节缓冲区以发送回外部系统.

However, the bit I can't figure out is how to handle when the external system connects to my application and sends EchoRequestMessages via TCP. My application has to take the received Buffer of bytes, decode it into a EchoRequestMessage POJO and then encode the EchoResponseMessage into a Buffer of bytes to send back to the external system.

我如何使用 vert.x-rx 对 EchoRequestMessage 的接收、其解码、EchoResponseMessage 的编码执行反应式编程,然后将其发送回外部系统,所有这些都在一个构建器模式类型设置中.我读过 Observables 和订阅,但我不知道要观察什么或订阅什么.任何帮助将不胜感激.

How do i use vert.x-rx to perform reactive programming of the receipt of the EchoRequestMessage, its decoding, the encoding of the EchoResponseMessage, and then sending that back to the external system, all in one builder pattern type setup. I've read about Observables and subscribing, but i can't figure out what to observe or what to subscribe to. Any help would be greatly appreciated.

推荐答案

要从套接字读取数据,您可以使用 RecordParser.在套接字连接上,数据通常由换行符分隔:

To read data from the socket you can use a RecordParser. On a socket connection, data is often separated by newline characters:

RecordParser parser = RecordParser.newDelimited("\n", sock);

一个 RecordParser 是一个 Vert.x ReadStream 所以它可以被转换成一个 Flowable:

A RecordParser is a Vert.x ReadStream so it can be transformed into a Flowable:

FlowableHelper.toFlowable(parser)

现在,如果可以从 Buffer 创建 EchoRequestMessage:

Now if an EchoRequestMessage can be created from a Buffer:

public class EchoRequestMessage {
  private String message;

  public static EchoRequestMessage fromBuffer(Buffer buffer) {
    // Deserialize
  }

  public String getMessage() {
    return message;
   }
 }

并且将 EchoResponseMessage 转换为 Buffer:

public class EchoResponseMessage {
  private final String message;

  public EchoResponseMessage(String message) {
    this.message = message;
  }

  public Buffer toBuffer() {
    // Serialize;
  }
}

您可以使用 RxJava 运算符来实现回显服务器流程:

You can use RxJava operators to implement an echo server flow:

vertx.createNetServer().connectHandler(sock -> {

  RecordParser parser = RecordParser.newDelimited("\n", sock);

  FlowableHelper.toFlowable(parser)
    .map(EchoRequestMessage::fromBuffer)
    .map(echoRequestMessage -> {
      return new EchoResponseMessage(echoRequestMessage.getMessage());
    })
    .subscribe(echoResponseMessage -> sock.write(echoResponseMessage.toBuffer()).write("\n"), throwable -> {
      throwable.printStackTrace();
      sock.close();
    }, sock::close);

}).listen(1234);

如果在您的协议中消息不是行分隔而是长度前缀,那么您可以创建自定义 ReadStream:

If in your protocol messages are not line separated but length-prefixed, then you can create a custom ReadStream:

class LengthPrefixedStream implements ReadStream<Buffer> {
  final RecordParser recordParser;
  boolean prefix = false;

  private LengthPrefixedStream(ReadStream<Buffer> stream) {
    recordParser = RecordParser.newFixed(4, stream);
  }

  @Override
  public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
    recordParser.exceptionHandler(handler);
    return this;
  }

  @Override
  public ReadStream<Buffer> handler(Handler<Buffer> handler) {
    if (handler == null) {
      recordParser.handler(null);
      return this;
    }
    recordParser.handler(buffer -> {
      if (prefix) {
        prefix = false;
        recordParser.fixedSizeMode(buffer.getInt(0));
      } else {
        prefix = true;
        recordParser.fixedSizeMode(4);
        handler.handle(buffer);
      }
    });
    return this;
  }

  @Override
  public ReadStream<Buffer> pause() {
    recordParser.pause();
    return this;
  }

  @Override
  public ReadStream<Buffer> resume() {
    recordParser.resume();
    return this;
  }

  @Override
  public ReadStream<Buffer> endHandler(Handler<Void> endHandler) {
    recordParser.endHandler(endHandler);
    return this;
  }
}

并将其转换为Flowable:

FlowableHelper.toFlowable(new LengthPrefixedStream(sock))

这篇关于如何使用 vert.x-rx 创建反应式客户端 - 服务器 TCP 通信的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆