RxJava:将一个流(Observable)作为另一个流的输入 [英] RxJava: Feed one stream (Observable) as the input of another stream

查看:46
本文介绍了RxJava:将一个流(Observable)作为另一个流的输入的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我仍在学习 RxJava.在另一个流中使用流的最佳方法是什么?还是违背了响应式编程的原则?

I'm still learning RxJava. What is the best way to use a stream within another stream? Or is it against the principles of reactive programming?

我正在尝试编写的一个玩具示例包括一个 TCP 客户端和一个发送回大写输入的服务器.我想从标准输入中获取输入,将其发送到服务器并打印出客户端和服务器接收到的所有内容.

A toy example that I'm trying to write includes a TCP client and a server that sends back capitalized input. I'd like to take input from standard input, send it to the server and print out everything received by both the client and the server.

以下是程序的预期输出:

The following is the expected output from the program:

(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH

我能够使用三个 observables 来实现这一点:

I was able to achieve this using three observables:

  • stdinStream 从标准输入中发出字符串,
  • serverStream 发送服务器接收到的字符串
  • clientStream 发出客户端接收到的字符串.
  • stdinStream that emits strings from the standard input,
  • serverStream that emits strings the server receives
  • clientStream that emits strings the client receives.

然后从clientStream的创建中订阅inputStream,如下所示:

and then subscribe the inputStream from within the creation of clientStream, like so:

    private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
    return Observable.create(sub -> {
        try (Socket socket = new Socket(host, port);
             BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
             PrintWriter outWriter = new PrintWriter(outputStream, true);
        ) {
            inputStream.subscribe(line -> {
                outWriter.println(line);
                try {
                    sub.onNext(inFromServer.readLine());
                } catch (IOException e) {
                    sub.onError(e);
                }
            });
        } catch (UnknownHostException e) {
            sub.onError(e);
        } catch (IOException e) {
            sub.onError(e);
        }
    });
}

注意:我不想创建多个客户端,而是希望保持单个客户端运行并指示它根据输入向服务器发送不同的值.因此,不需要将输入映射到新的 clientStream 的方法:

Note: I don't want to create multiple clients and would rather keep a single client running and instruct it to send different values to the server based on the input. So, the approach of mapping the input to a new clientStream is NOT desired:

stdinStream.map(line -> createClientStream(line))

所以我的问题是:

  1. 这是使用 RxJava 的明智方式吗?有没有更好的选择?
  2. 我在创建 clientStream 的过程中创建了客户端套接字.我这样做是为了我可以轻松地使用调度程序异步运行它,clientStream.scheduleOn(Schedulers.newThread).鉴于我的单一客户要求,也许我应该采取不同的方式?
  1. Is this a sane way to use RxJava? Are there better alternatives?
  2. I created client socket as part of the creation of clientStream. I did this so that I can easily run it asynchronously using schedulers, clientStream.scheduleOn(Schedulers.newThread). Maybe I should do it differently given my single-client requirement?

完整代码如下:https://gist.github.com/lintonye/25af58abdfcc688ad3c3

推荐答案

您需要的是使用.将所有与套接字相关的对象放入一个 Connection 类中,并给定输入序列,将其映射到一对 println/readLine,同时保持单个连接.这是一个可运行示例的要点.

What you need is using. Put all the socket related objects into a Connection class and given the input sequence, map it to a pair of println/readLine while maintaining a single connection. Here is a gist for a runnable example.

static class Connection {
    Socket socket;
    BufferedReader inFromServer;
    DataOutputStream outputStream;
    PrintWriter outWriter;

    public Connection(String host, int port) {
        try {
            socket = new Socket(host, port);
            inFromServer = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
            outputStream = new DataOutputStream(socket.getOutputStream());
            outWriter = new PrintWriter(outputStream, true);
        } catch (IOException ex) {
            Exceptions.propagate(ex);
        }
    }

    public void close() {
        try {
            outWriter.close();
            outputStream.close();
            inFromServer.close();
            socket.close();
        } catch (IOException ex) {
            Exceptions.propagate(ex);
        }
    }
}

public static void main(String[] args) {
    runServer();

    Observable<String> source = Observable.just("a", "b", "c");

    String host = "localhost";
    int port = 8080;

    Observable.<String, Connection>using(() -> new Connection(host, port), 
    conn -> 
        source
        .map(v -> {
            conn.outWriter.println(v);
            try {
                return conn.inFromServer.readLine();
            } catch (IOException ex) {
                throw Exceptions.propagate(ex);
            }
        })
    , Connection::close)
    .subscribe(System.out::println);
}

这篇关于RxJava:将一个流(Observable)作为另一个流的输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆