RxJava - 终止无限流 [英] RxJava -- Terminating Infinite Streams
问题描述
我正在探索反应式编程和RxJava。这很有趣,但我遇到了一个无法找到答案的问题。我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎关于我的代码的批评和反应性最佳实践。
I am exploring reactive programming and RxJava. It is fun, but I am stuck on a problem for which I cannot find an answer. My basic question: what is a reactive-appropriate way to terminate an otherwise infinitely-running Observable? I also welcome critiques and reactive best practices regarding my code.
作为练习,我正在编写一个日志文件尾部实用程序。日志文件中的行流由 Observable< String>
表示。要获得 BufferedReader
继续阅读添加到文件中的文本,我忽略了通常的 reader.readLine()== null
终止检查,而不是将其解释为我的线程应该睡眠并等待更多记录器文本。
As an exercise, I am writing a log file tail utility. The stream of lines in the log file is represented by an Observable<String>
. To get the BufferedReader
to continue reading text that is added to the file, I ignore the usual reader.readLine() == null
termination check and instead interpret it to mean that my thread should sleep and wait for more logger text.
但是我可以使用<$ c $终止Observer c> takeUntil ,我需要找到一种干净的方法来终止无限运行的文件观察器。我可以编写自己的 terminateWatcher
方法/字段,但这会破坏Observable / Observer封装 - 我希望尽可能严格遵守反应范式。
But while I can terminate the Observer using takeUntil
, I need to find a clean way to terminate the otherwise infinitely-running file watcher. I can write my own terminateWatcher
method/field, but that breaks the Observable/Observer encapsulation -- and I'd like to stay as strict to the reactive paradigm as possible.
这是 Observable< String>
代码:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
这是Observer代码在它们到来时打印新行:
Here is the Observer code that prints the new lines as they come:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
我的两个问题是:
- 什么是终止无限运行流的被动一致方法?
- 我的代码中有哪些其他错误让你哭? :)
推荐答案
由于您在请求订阅时正在启动文件,因此它会生成在订阅结束时,即订阅
关闭时终止它的意义。这解决了代码注释中的一个松散结束:您返回订阅
,它告诉 FileWatcher
停止观看文件。然后替换你的循环条件,以便检查订阅是否已被取消,而不是检查当前线程是否已被中断。
Since you are initiating the file watching when the subscription is requested, it makes sense to terminate it when the subscription ends, i.e., when the Subscription
is closed. This addresses one of the loose ends in your code comments: you return a Subscription
which tells the FileWatcher
to stop watching the file. Then replace your loop condition such that it checks whether the subscription has been canceled instead of checking whether the current thread has been interrupted.
问题是,这不是很有帮助如果您的 onSubscribe()
方法永远不会返回。也许您的 FileWatcher
应该要求指定一个调度程序,并在该调度程序上进行读取。
Problem is, that isn't very helpful if your onSubscribe()
method never returns. Perhaps your FileWatcher
should require a scheduler to be specified, with the reading occurring on that scheduler.
这篇关于RxJava - 终止无限流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!