在不使用Observable.create的情况下创建Observable [英] Creating Observable without using Observable.create

查看:381
本文介绍了在不使用Observable.create的情况下创建Observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在Android应用中使用RxJava,我想从数据库中加载数据.

I am using RxJava in my Android app and I want to load data from the database.

通过这种方式,我使用Observable.create()创建了一个新的Observable,它返回了EventLog

In this way, I am creating a new Observable using Observable.create() which returns a list of EventLog

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe<List<EventLog>>() {
        @Override
        public void call(Subscriber<? super List<EventLog>> subscriber) {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

尽管它可以正常工作,但我读到使用Observable.create()实际上不是Rx Java的最佳实践(请参见

Though it works correctly, I read that using Observable.create() is not actually a best practice for Rx Java (see here).

所以我以这种方式改变了这种方法.

So I changed this method in this way.

public Observable<List<EventLog>> loadEventLogs() {
    return Observable.fromCallable(new Func0<List<EventLog>>() {
        @Override
        public List<EventLog> call() {
            List<DBEventLog> logs = new Select().from(DBEventLog.class).execute();
            List<EventLog> eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

这是使用Rx Java更好的方法吗?为什么?这两种方法实际上有什么区别?

Is this a better approach using Rx Java? Why? What is actually the difference among the two methods?

此外,由于数据库加载了元素列表,因此一次发出整个列表是否有意义?还是应该一次发射一件物品?

Moreover, since the database load a list of elements, makes sense to emit the entire list at once? Or should I emit one item at a time?

推荐答案

这两种方法可能看起来相似并且行为相似,但是fromCallable为您解决了背压的难题,而create版本却没有.在OnSubscribe实现中处理背压的范围从简单到彻底的融化.但是,如果省略,您可能会沿异步边界(例如observeOn)甚至在连续边界(例如concat)上得到MissingBackpressureException.

The two methods may look like similar and behave similar but fromCallable deals with the difficulties of backpressure for you whereas the create version does not. Dealing with backpressure inside an OnSubscribe implementation ranges from simple to outright mind-melting; however, if omitted, you may get MissingBackpressureExceptions along asynchronous boundaries (such as observeOn) or even on continuation boundaries (such as concat).

RxJava试图为尽可能多的工厂和操作员提供适当的背压支持,但是,有很多工厂和操作员不支持它.

RxJava tries to offer proper backpressure support for as much factories and operators as possible, however, there are quite a few factories and operators that can't support it.

手动执行OnSubscribe的第二个问题是缺少取消支持,尤其是在生成大量onNext调用时.其中许多可以用标准的工厂方法(例如from)或帮助程序类(例如SyncOnSubscribe)代替,这些方法可以为您解决所有的复杂性.

The second problem with manual OnSubscribe implementation is the lack of cancellation support, especially if you generate a lot of onNext calls. Many of these can be replaced by standard factory methods (such as from) or helper classes (such as SyncOnSubscribe) that deal with all the complexity for you.

您可能会发现很多介绍和示例(仍然)使用create,这有两个原因.

You may find a lot of introductions and examples that (still) use create for two reasons.

  1. 通过显示事件推送如何以命令方式工作,引入基于推送的数据流要容易得多.我认为,这些资源按比例花在create上的时间过多,而不是谈论标准的工厂方法并显示如何安全地完成某些常见任务(例如您的任务).
  2. 其中许多示例是在RxJava不需要背压支持甚至不需要适当的同步取消支持时创建的,或者只是从Rx.NET示例移植而来(到目前为止,该示例不支持背压,并且同步取消在某种程度上可以正常工作)我猜想是C#.)那时,通过调用onNext生成值是无忧的.但是,这样的使用确实会导致缓冲区膨胀和过多的内存使用,因此,Netflix团队提出了一种通过要求观察者声明他们愿意进行多少项操作来限制内存使用的方法.这就是所谓的背压.
  1. It is much easier to introduce push-based datastreams by showing how the push of events work in an imperative fashion. In my opinion, such sources spend too much time with create proportionally instead of talking about the standard factory methods and showing how certain common tasks (such as yours) can be achieved safely.
  2. Many of these examples were created the time RxJava didn't require backpressure support or even proper synchronous cancellation support or were just ported from the Rx.NET examples (which to date doesn't support backpressure and synchronous cancellation works somehow, courtesy of C# I guess.) Generating values by calling onNext was worry-free back then. However, such use does lead to buffer bloat and excessive memory usage, therefore, the Netflix team came up with a way of limiting the memory usage by requiring observers to state how many items are they willing to proceed. This became known as backpressure.

对于第二个问题,即是否应该创建一个List或一系列值,这取决于您的来源.如果您的源支持某种形式的单个数据元素的迭代或流式传输(例如JDBC),则可以将其挂钩并逐个发出(请参见SyncOnSubscribe).如果它不支持它,或者无论如何您都需要以列表形式使用它,那么请保持它原样.如有必要,您始终可以通过toListflatMapIterable在两种形式之间进行转换.

For the second question, namely if one should create a List or a sequence of values, it depends on your source. If your source supports some kind of iteration or streaming of individual data elements (such as JDBC), you can just hook onto it and emit one by one (see SyncOnSubscribe). If it doesn't support it or you need it in List form anyway, then keep it as it is. You can always convert between the two forms via toList and flatMapIterable if necessary.

这篇关于在不使用Observable.create的情况下创建Observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆