领域:使用Clean-Architecture和RxJava2 [英] Realm: working with Clean-Architecture and RxJava2

查看:71
本文介绍了领域:使用Clean-Architecture和RxJava2的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有点背景,我试图将一些干净的架构应用于我的一个项目,但是我在存储库的(Realm)磁盘实现方面遇到了麻烦.我有一个存储库,根据某些条件(缓存)从不同的数据存储中提取一些数据.这是理论,将所有这些与UseCases和RxJava2混合使用时会出现问题.

A bit of context, I’ve tried to apply some clean-architecture to one of my projects and I’m having trouble with the (Realm) disk implementation of my repository. I have a Repository which pulls some data from different DataStores depending on some conditions (cache). This is the theory, the problem comes when mixing all of this with UseCases and RxJava2.

首先,我从Realm获取对象列表,然后手动为其创建Observable. 但是subscribe(如预期的那样)在不同的线程上执行,因此领域最终崩溃了……(第二个代码块)

First I get the list of objects from Realm and then I manually create an Observable of it. But the subscribe (as expected) is executed on a different thread so realm ends up crashing… (second block of code)

这是我用来创建Observable的代码(来自抽象类DiskStoreBase):

This is the code I use to create the Observables (from an abstract class DiskStoreBase):

Observable<List<T>> createListFrom(final List<T> list) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(ObservableEmitter<List<T>> emitter) throws Exception {
            if (list != null) {
                emitter.onNext(list);
                emitter.onComplete();
            } else {
                emitter.onError(new ExceptionCacheNotFound());
            }
        }
    });
}

如何处理这种情况?

DiskStoreForZone的更多代码:

@Override
public Observable<List<ResponseZone>> entityList() {
    Realm realm = Realm.getDefaultInstance();
    List<ResponseZone> result = realm.where(ResponseZone.class).findAll();
    return createListFrom(result);
}

确切的崩溃:

E/REALM_JNI: jni: ThrowingException 8, Realm accessed from incorrect thread.
E/REALM_JNI: Exception has been thrown: Realm accessed from incorrect thread.

推荐答案

它不起作用,因为尽管使用Rx,您的数据层也不会反应.

It doesn't work because despite using Rx, your data layer is not reactive.

Realm本质上是 reactive 数据源,其管理对象本质上也是 mutable (由Realm在适当位置更新)并且线程受限的(只能在打开Realm的同一线程上访问).

Realm by its nature is a reactive datasource, and its managed objects by nature are also mutable (updated in place by Realm), and thread-confined (can only be accessed on the same thread where the Realm was opened).

要使代码正常工作,您需要从Realm中复制数据.

For your code to work, you'd need to copy out the data from the Realm.

@Override
public Single<List<ResponseZone>> entityList() {
    return Single.fromCallable(() -> {
       try(Realm realm = Realm.getDefaultInstance()) {
           return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
       }
    });
}

我冒昧地将您的Single表示为Single,因为它不是Observable,它不监听更改,只有 1 事件,这就是列表本身.因此,通过ObservableEmitter发送它实际上没有任何意义,因为它不会发出事件.

I took the liberty and represented your Single as a Single, considering it's not an Observable, it does not listen for changes, there is only 1 event and that is the list itself. So sending it through an ObservableEmitter doesn't really make sense as it does not emit events.

因此,这就是为什么我说:您的数据层没有反应性.您不是在听变化.您只是在直接获取数据,并且永远不会收到任何更改的通知.尽管使用了Rx.

Therefore, this is why I said: your data layer is not reactive. You are not listening for changes. You are just obtaining data directly, and you are never notified of any change; despite using Rx.

我用油漆画了些图画来说明我的观点. (蓝色表示副作用)

I drew some pictures in paint to illustrate my point. (blue means side-effects)

在您的情况下,您调用一次性操作以从多个数据源(缓存,本地,远程)中检索数据.一旦获得它,您就不会听更改.从技术上讲,如果您在一个地方和另一个地方编辑数据,则唯一的更新方法是强制缓存手动检索新数据";为此,您必须知道您在其他地方修改了数据.为此,您需要一种方法来直接调用回调或发送消息/事件-更改通知.

in your case, you call a one-off operation to retrieve the data from multiple data-sources (cache, local, remote). Once you obtain it, you don't listen for changes; technically if you edit the data in one place and another place, the only way to update is by "forcing the cache to retrieve the new data manually"; for which you must know that you modified the data somewhere else. For which you need a way to either directly call a callback, or send a message/event - a notification for change.

因此,在某种程度上,您必须创建一个缓存失效通知事件.而且,如果您听这些话,该解决方案可能会再次变得被动.除非您手动执行此操作.

So in a way, you must create a cache invalidation notification event. And if you listen to that, the solution could be reactive again. Except you're doing this manually.

考虑到Realm已经是一个反应性数据源(类似于SQLite的SQLBrite),它能够提供更改通知,通过这些通知,您可以使缓存无效".

Considering Realm is already a reactive data source (similarly to SQLBrite for SQLite), it is able to provide change notifications by which you can "invalidate your cache".

实际上,如果本地数据源是唯一的数据源,并且来自网络的任何写操作都是您所听的更改,那么您的缓存"可以记为replay(1).publish().refCount()(为新的重放最新数据)订户,如果​​评估了新数据,则用新数据替换)( RxReplayingShare .

In fact, if your local data source is the only source of data, and any write from network is a change that you listen to, then your "cache" can be written down as replay(1).publish().refCount() (replay latest data for new subscribers, replace data with new if new data is evaluated) which is RxReplayingShare.

Using a Scheduler created from the looper of a handler thread, you can listen to changes in the Realm on a background thread, creating a reactive data source that returns up-to-date unmanaged copies that you can pass between threads (although mapping directly to immutable domain models is preferred to copyFromRealm() if you choose this route - the route being clean architecture).

return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() {
    @Override
    public void subscribe(ObservableEmitter<List<ResponseZone>> emitter)
            throws Exception {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync();
        final RealmChangeListener<RealmResults<ResponseZone>> listener = results -> {
            if(!emitter.isDisposed()) {
                if(results.isValid() && results.isLoaded()) {
                    emitter.onNext(observableRealm.copyFromRealm(results));
                }
            }
        };

        emitter.setDisposable(Disposables.fromRunnable(() -> {
            if(results.isValid()) {
                results.removeChangeListener(listener);
            }
            observableRealm.close();
        }));
        results.addChangeListener(listener);
        // initial value will be handled by async query
    }
}).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler);

其中looper调度程序的获取方式为

Where looper scheduler is obtained as

    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler  = AndroidSchedulers.from(handlerThread.getLooper());
    }

这就是您使用Realm创建反应式干净架构的方法.

And that is how you create reactive clean architecture using Realm.

已添加:

仅当您打算在领域上实际实施Clean Architecture时才需要LooperScheduler.这是因为默认情况下,Realm鼓励您使用数据对象作为域模型,并且这样做的好处是可以提供延迟加载的线程局部视图,这些视图在更新时会发生变化.但Clean Architecture表示您应该改用不可变的领域模型(与数据层无关).因此,如果您要创建反应式的干净架构,以便在Realm更改时随时在后台线程上从Realm复制,那么您将需要一个Looper调度程序(或在后台线程上进行观察,但是要从刷新的Realm上进行复制,Schedulers.io()).

The LooperScheduler is only needed if you intend to actually enforce Clean Architecture on Realm. This is because Realm by default encourages you to use your data objects as domain models and as a benefit provides lazy-loaded thread-local views that mutate in place when updated; but Clean Architecture says you should use immutable domain models instead (independent from your data layer). So if you want to create reactive clean architecture where you copy from Realm on a background thread any time when Realm changes, then you'll need a looper scheduler (or observe on a background thread, but do the copying from a refreshed Realm on Schedulers.io()).

对于Realm,通常您希望将RealmObjects用作域模型,并依赖于惰性求值.在这种情况下,您不会使用copyFromRealm(),也不会将RealmResults映射到其他内容.但是您可以将其公开为FlowableLiveData.

With Realm, generally you'd want to use RealmObjects as your domain models, and rely on lazy-evaluation. In that case, you do not use copyFromRealm() and you don't map the RealmResults to something else; but you can expose it as a Flowable or a LiveData.

您可以阅读有关 查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆