RxJava - 终止无限流 [英] RxJava -- Terminating Infinite Streams

查看:131
本文介绍了RxJava - 终止无限流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在探索反应式编程和RxJava。这很有趣,但我遇到了一个无法找到答案的问题。我的基本问题:什么是一种反应性的方法来终止一个无限运行的Observable?我也欢迎关于我的代码的批评和反应性最佳实践。

I am exploring reactive programming and RxJava. It is fun, but I am stuck on a problem for which I cannot find an answer. My basic question: what is a reactive-appropriate way to terminate an otherwise infinitely-running Observable? I also welcome critiques and reactive best practices regarding my code.

作为练习,我正在编写一个日志文件尾部实用程序。日志文件中的行流由 Observable< String> 表示。要获得 BufferedReader 继续阅读添加到文件中的文本,我忽略了通常的 reader.readLine()== null 终止检查,而不是将其解释为我的线程应该睡眠并等待更多记录器文本。

As an exercise, I am writing a log file tail utility. The stream of lines in the log file is represented by an Observable<String>. To get the BufferedReader to continue reading text that is added to the file, I ignore the usual reader.readLine() == null termination check and instead interpret it to mean that my thread should sleep and wait for more logger text.

但是我可以使用<$ c $终止Observer c> takeUntil ,我需要找到一种干净的方法来终止无限运行的文件观察器。我可以编写自己的 terminateWatcher 方法/字段,但这会破坏Observable / Observer封装 - 我希望尽可能严格遵守反应范式。

But while I can terminate the Observer using takeUntil, I need to find a clean way to terminate the otherwise infinitely-running file watcher. I can write my own terminateWatcher method/field, but that breaks the Observable/Observer encapsulation -- and I'd like to stay as strict to the reactive paradigm as possible.

这是 Observable< String> 代码:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

这是Observer代码在它们到来时打印新行:

Here is the Observer code that prints the new lines as they come:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

我的两个问题是:


  1. 什么是终止无限运行流的被动一致方法?

  2. 我的代码中有哪些其他错误让你哭? :)


推荐答案

由于您在请求订阅时正在启动文件,因此它会生成在订阅结束时,即订阅关闭时终止它的意义。这解决了代码注释中的一个松散结束:您返回订阅,它告诉 FileWatcher 停止观看文件。然后替换你的循环条件,以便检查订阅是否已被取消,而不是检查当前线程是否已被中断。

Since you are initiating the file watching when the subscription is requested, it makes sense to terminate it when the subscription ends, i.e., when the Subscription is closed. This addresses one of the loose ends in your code comments: you return a Subscription which tells the FileWatcher to stop watching the file. Then replace your loop condition such that it checks whether the subscription has been canceled instead of checking whether the current thread has been interrupted.

问题是,这不是很有帮助如果您的 onSubscribe()方法永远不会返回。也许您的 FileWatcher 应该要求指定一个调度程序,并在该调度程序上进行读取。

Problem is, that isn't very helpful if your onSubscribe() method never returns. Perhaps your FileWatcher should require a scheduler to be specified, with the reading occurring on that scheduler.

这篇关于RxJava - 终止无限流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆