Subscribewith与RxJava2(Android)中的订阅吗? [英] Subscribewith Vs subscribe in RxJava2(Android)?

查看:55
本文介绍了Subscribewith与RxJava2(Android)中的订阅吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

什么时候调用subscribeWith方法而不是普通订阅?用例是什么?

  compositeDisposable.add(get().observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io()).subscribe(this :: handleResponse,this :: handleError)); 

VS

  CompositeDisposable.add(get().observeOn(AndroidSchedulers.mainThread()).subscribeOn(Schedulers.io())//.subscribe(this :: handleResponse,this :: handleError);.subscribeWith(new DisposableObserver< News>(){@Override public void onNext(News value){handleResponse(value);}@Override public void onError(Throwable e){handleError(e);}@Override public void onComplete(){//放置在这里?为什么?什么时候整个东西会被处理掉//通过compositeDisposable.dispose();在onDestroy();中}})); 

谢谢


稍后添加

根据文档,两个都返回一次性的SingleObserver实例:

  @CheckReturnValue@SchedulerSupport(SchedulerSupport.NONE)公共最终< E扩展SingleObserver< ;?超级TE subscriptionWith(E观察者){订阅(观察者);返回观察员;}@SchedulerSupport(SchedulerSupport.NONE)public final Disposable subscription(final Consumer<?super T> onSuccess,final Consumer<?super Throwable" onError){ObjectHelper.requireNonNull(onSuccess,"onSuccess为null");ObjectHelper.requireNonNull(onError,"onError为null");ConsumerSingleObserver Ts =新的ConsumerSingleObserver T(onSuccess,onError);订阅;返回s;} 

ConsumerSingleObserver类在哪里实现SingleObserver和Disposable.

解决方案

Observable#subscribe解释:

在您的第一个代码段中:

.subscribe(this :: handleResponse,this :: handleError));

您实际上正在使用以下几种重载的 Observable#subscribe 方法之一:

  public final一次性订阅(Consumer<?super T> onNext,Consumer<?super Throwable> onError) 

还有另一种方法也需要执行 Action 来执行onComplete:

  public final Disposable subscription(Consumer<?super T> onNext,Consumer<?super Throwable> onError,Action onComplete){ 

另一个选项允许您简单地传入 Observer (注意:void方法) (编辑2-此方法在 ObservableSource中定义,这是 Observable 扩展的接口.)

  public最终的void subscription(Observer<?super T>观察者) 

在问题的第二个代码段中,您使用了 subscribeWith 方法,该方法仅返回您传入的 Observer (为了方便/缓存等):

 公共最终< E扩展了Observer< ;?超级TE subscriptionWith(E观察者) 

Observer#onComplete说明:

在Observable发出流中的所有项目之后,调用

Observer#onComplete.来自Java文档:

 /***通知观察者{@link Observable}已完成基于推送的通知的发送.*< p>*如果{@link Observable}调用{@link #onError},则不会调用此方法.*/void onComplete(); 

例如,如果您的代码段中的 get()返回了发出多个 News 对象的 Observable ,则每个对象都会是在 Observer#onNext 中处理.在这里您可以处理每个项目.

在处理完所有内容(并假设未发生错误)之后,将调用 onComplete .在这里,您知道已经处理了所有 News 对象,就可以执行所需的任何其他操作(例如,更新UI).

这不要与 Disposable#dispose 混淆,后者在可观察流结束(完成/错误)时被调用,或者由您手动终止观察(在出现CompositeDisposable ,是因为它可以帮助您一次性处置其中包含的所有 Disposable .)

如果在您的方案中, get()将返回仅发出单个项目的 Observable ,则不使用 Observable ,考虑使用 io.reactivex.Single ,您只需处理一项(在 onSuccess 中),而无需指定 Action for onComplete:)

编辑:回复您的评论:

但是我仍然不使用subscribeWith,您说它通过了缓存等的观察者,它将传递到哪里?完成吗?和据我了解,subscribeWith实际上并没有消耗可以观察到(或单个)吧?

为了进一步阐明 subscribeWith 的解释,我的意思是它消耗掉传递给 Observer 对象subscriptionWith (与 subscribe 方法完全一样),但是它还会另外将相同的Observer返回给您.在撰写本文时,subscribeWith的实现为:

 公共最终< E扩展了Observer< ;?超级TE subscriptionWith(E观察者){订阅(观察者);返回观察员;} 

因此, subscribeWith 可以与 subscribe 互换使用.

您可以举一个用例的SubscribeWith的用例吗?我猜可能是将完全回答问题

subscribeWith javadoc提供以下用法示例:

  Observable< Integer>来源= Observable.range(1,10);CompositeDisposable Composite = new CompositeDisposable();ResourceObserver< Integer>rs =新的ResourceObserver<>(){//...};Composite.add(source.subscribeWith(rs)); 

请参见此处 subscribeWith 的用法将返回与实例化相同的 ResourceObserver 对象.这提供了执行订阅和共享的便利.在一行中将 ResourceObserver 添加到 CompositeDisposable 中(请注意, ResourceObservable 实现了 Disposable .)

编辑2 .回复第二条评论.

source.subscribeWith(rs);source.subscribe(rs);都回来了SingleObserver实例,

ObservableSource#subscribe(观察者< ;?超级T>观察者) 返回观察者.这是一个无效方法(请参见上面 Observable#subscribe 解释下的NOTE.)而 Observable#subscribeWith DOES 返回 Observer.如果要改为使用 ObservableSource#subscribe 重写示例用法代码,则必须像这样在两行中完成它:

  source.subscribe(rs);//ObservableSource#subscribe是一个void方法,因此不会返回任何内容Composite.add(rs); 

Observable#subscribeWith 方法使我们只需一行即可方便地完成上述操作 composite.add(source.subscribeWith(rs));

它可能会与所有看起来有些相似的重载订阅方法混淆,但是有一些区别(其中有些是微妙的).查看代码和文档有助于区分它们.


编辑3 subscriptionWith的另一个示例用例

当您具有可能要重用的 Observer 的特定实现时, subscribeWith 方法非常有用.例如,在上面的示例代码中,它在订阅中提供了 ResourceObserver 的特定实现,从而继承了其功能,同时仍然允许您处理onNext onError和onComplete.

另一个示例使用:对于您问题中的示例代码,如果要在多个位置为 get()响应执行相同的订阅怎么办?

与其在不同的类之间复制onNext和onError的 Consumer 实现,还可以为例如定义一个新类.

 //示例代码公共类GetNewsObserver扩展了DisposableObserver< News>{//实现onNext,onError,onComplete.....} 

现在,无论何时执行该 get()请求,您都可以通过以下方式简单地进行订阅:

  compositeDisposable.add(get()....subscribeWith(new GetNewsObserver())); 

现在看到代码很简单,您可以保持处理响应的责任分离,并且现在可以在任意位置重复使用该 GetNewsObserver .

When to call the subscribeWith method rather than plain subscribe? And what is the use case?

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));

VS

   compositeDisposable.add(get()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
              //  .subscribe(this::handleResponse, this::handleError);
                .subscribeWith(new DisposableObserver<News>() {
                    @Override public void onNext(News value) {
                        handleResponse(value);
                    }

                    @Override public void onError(Throwable e) {
                        handleError(e);
                    }

                    @Override public void onComplete() {
                       // dispose here ? why? when the whole thing will get disposed later
                       //via  compositeDisposable.dispose();  in onDestroy();
                    }
                }));

Thank you


Added Later

According to the documentation, both return disposable SingleObserver instances:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;
}

Where ConsumerSingleObserver class implements SingleObserver and Disposable.

解决方案

Observable#subscribe explanation:

In your first code snippet:

.subscribe(this::handleResponse, this::handleError));

You are actually using one of the several overloaded Observable#subscribe methods:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

There is another one that also takes in an Action to perform onComplete:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete) {

And another option allows you to simply pass in an Observer (NOTE: void method) (Edit 2 - this method is defined in ObservableSource, which is the interface that Observable extends.)

public final void subscribe(Observer<? super T> observer)

In the second code snippet in your question, you used the subscribeWith method which simply returns the Observer you passed in (for convenience/caching etc):

public final <E extends Observer<? super T>> E subscribeWith(E observer)

Observer#onComplete explanation:

Observer#onComplete gets called after the Observable has emitted all the items in the stream. From the java doc:

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();

So for example, if the get() in your code snippets returned an Observable that emitted multiple News objects, each one will be handled in the Observer#onNext. Here you can process each item.

After they have all been processed (and assuming no error occured), then the onComplete will get called. Here you can perform any extra actions that you need to do (for eg. update UI) knowing that you've processed all the News objects.

This is not to be confused with Disposable#dispose which gets invoked when the observable stream ends (complete/error), or manually by you to terminate the observation (this is where the CompositeDisposable comes in as it helps you dispose of all your Disposables that it contains at once).

If in your scenario the get() will return an Observable that only emits a single item, then instead of using an Observable, consider using an io.reactivex.Single where you only process the one item (in onSuccess), and won't need to specify an Action for onComplete :)

Edit: response to your comment:

However I still do not get use of subscribeWith, you said it passes the observer for caching etc , where does it pass to? on complete? and from what I understood subscribeWith is not actually consuming the observable( or Single) right?

To further clarify the subscribeWith explanation, what I meant was that it will consume the Observer object that you passed into the subscribeWith (exactly like the subscribe method) however it will additionally return that same Observer right back to you. At time of writing, the implementation of subscribeWith is:

public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

Therefore, subscribeWith can be used interchangeably with subscribe.

Can you give a use case of subscribeWith with example? I guess that will answer the question completely

The subscribeWith javadoc gives the following usage example:

Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();

ResourceObserver<Integer> rs = new ResourceObserver<>() {
     // ...
};

composite.add(source.subscribeWith(rs));

See here the usage of subscribeWith will return that same ResourceObserver object that was instantiated. This gives the convenience of performing the subscription & adding the ResourceObserver to the CompositeDisposable in one line (note that ResourceObservable implements Disposable.)

Edit 2 Replying to second comment.

source.subscribeWith(rs); source.subscribe(rs); both return SingleObserver instance,

ObservableSource#subscribe(Observer <? super T> observer) does NOT return an Observer. It is a void method (See NOTE under the Observable#subscribe explanation above.) Whereas the Observable#subscribeWith DOES return the Observer. If we were to rewrite the example usage code using ObservableSource#subscribe instead, we'd have to do it in two lines like so:

source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);

Whereas the Observable#subscribeWith method made it convenient for us to do the above in just one line composite.add(source.subscribeWith(rs));

It can get confusing with all the overloaded subscribe methods that look somewhat similar, however there are differences (some of which are subtle). Looking at the code and documentation helps to provide the distinction between them.


Edit 3 Another sample use case for subscribeWith

The subscribeWith method is useful for when you have a specific implementation of an Observer that you may want to reuse. For example, in the sample code above, it provided a specific implementation of ResourceObserver in the subscription, thereby inheriting it's functionality while still allowing you to handle onNext onError and onComplete.

Another example use: for to the sample code in your question, what if you wanted to perform the same subscription for the get() response in multiple places?

Instead of copying the Consumer implementations for onNext and onError across different classes, what you can do instead is define a new class for eg.

//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
    //implement your onNext, onError, onComplete.
    ....
}

Now, whenever you do that get() request, you can simply subscribe by doing:

compositeDisposable.add(get()
    ...
    .subscribeWith(new GetNewsObserver()));

See the code is simple now, you maintain separation of responsibility for handling the response, and can now reuse that GetNewsObserver wherever you want.

这篇关于Subscribewith与RxJava2(Android)中的订阅吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆