RxJava:将一个流(Observable)作为另一个流的输入 [英] RxJava: Feed one stream (Observable) as the input of another stream
问题描述
我仍在学习 RxJava.在另一个流中使用流的最佳方法是什么?还是违背了响应式编程的原则?
I'm still learning RxJava. What is the best way to use a stream within another stream? Or is it against the principles of reactive programming?
我正在尝试编写的一个玩具示例包括一个 TCP 客户端和一个发送回大写输入的服务器.我想从标准输入中获取输入,将其发送到服务器并打印出客户端和服务器接收到的所有内容.
A toy example that I'm trying to write includes a TCP client and a server that sends back capitalized input. I'd like to take input from standard input, send it to the server and print out everything received by both the client and the server.
以下是程序的预期输出:
The following is the expected output from the program:
(User input) apple
Server received: apple
Client received: APPLE
(User input) peach
Server received: peach
Client received: PEACH
我能够使用三个 observables 来实现这一点:
I was able to achieve this using three observables:
stdinStream
从标准输入中发出字符串,serverStream
发送服务器接收到的字符串clientStream
发出客户端接收到的字符串.
stdinStream
that emits strings from the standard input,serverStream
that emits strings the server receivesclientStream
that emits strings the client receives.
然后从clientStream
的创建中订阅inputStream
,如下所示:
and then subscribe the inputStream
from within the creation of clientStream
, like so:
private Observable<String> createClientStream(String host, int port, Observable<String> inputStream) {
return Observable.create(sub -> {
try (Socket socket = new Socket(host, port);
BufferedReader inFromServer = new BufferedReader(new InputStreamReader(socket.getInputStream()));
DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
PrintWriter outWriter = new PrintWriter(outputStream, true);
) {
inputStream.subscribe(line -> {
outWriter.println(line);
try {
sub.onNext(inFromServer.readLine());
} catch (IOException e) {
sub.onError(e);
}
});
} catch (UnknownHostException e) {
sub.onError(e);
} catch (IOException e) {
sub.onError(e);
}
});
}
注意:我不想创建多个客户端,而是希望保持单个客户端运行并指示它根据输入向服务器发送不同的值.因此,不需要将输入映射到新的 clientStream
的方法:
Note: I don't want to create multiple clients and would rather keep a single client running and instruct it to send different values to the server based on the input. So, the approach of mapping the input to a new clientStream
is NOT desired:
stdinStream.map(line -> createClientStream(line))
所以我的问题是:
- 这是使用 RxJava 的明智方式吗?有没有更好的选择?
- 我在创建
clientStream
的过程中创建了客户端套接字.我这样做是为了我可以轻松地使用调度程序异步运行它,clientStream.scheduleOn(Schedulers.newThread)
.鉴于我的单一客户要求,也许我应该采取不同的方式?
- Is this a sane way to use RxJava? Are there better alternatives?
- I created client socket as part of the creation of
clientStream
. I did this so that I can easily run it asynchronously using schedulers,clientStream.scheduleOn(Schedulers.newThread)
. Maybe I should do it differently given my single-client requirement?
完整代码如下:https://gist.github.com/lintonye/25af58abdfcc688ad3c3
推荐答案
您需要的是使用
.将所有与套接字相关的对象放入一个 Connection
类中,并给定输入序列,将其映射到一对 println/readLine,同时保持单个连接.这是一个可运行示例的要点.
What you need is using
. Put all the socket related objects into a Connection
class and given the input sequence, map it to a pair of println/readLine while maintaining a single connection. Here is a gist for a runnable example.
static class Connection {
Socket socket;
BufferedReader inFromServer;
DataOutputStream outputStream;
PrintWriter outWriter;
public Connection(String host, int port) {
try {
socket = new Socket(host, port);
inFromServer = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
outputStream = new DataOutputStream(socket.getOutputStream());
outWriter = new PrintWriter(outputStream, true);
} catch (IOException ex) {
Exceptions.propagate(ex);
}
}
public void close() {
try {
outWriter.close();
outputStream.close();
inFromServer.close();
socket.close();
} catch (IOException ex) {
Exceptions.propagate(ex);
}
}
}
public static void main(String[] args) {
runServer();
Observable<String> source = Observable.just("a", "b", "c");
String host = "localhost";
int port = 8080;
Observable.<String, Connection>using(() -> new Connection(host, port),
conn ->
source
.map(v -> {
conn.outWriter.println(v);
try {
return conn.inFromServer.readLine();
} catch (IOException ex) {
throw Exceptions.propagate(ex);
}
})
, Connection::close)
.subscribe(System.out::println);
}
这篇关于RxJava:将一个流(Observable)作为另一个流的输入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!