Android Realm + RxJava-来自错误线程的领域访问.只能在创建对象的线程上访问领域对象 [英] Android Realm + RxJava - Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created

查看:70
本文介绍了Android Realm + RxJava-来自错误线程的领域访问.只能在创建对象的线程上访问领域对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现RxJava + Realm + Retrofit + Repository Pattern

这是我的本地实现:

@Override
public Observable<Page> search(@NonNull final String query) {

        return Realm.getDefaultInstance().where(Page.class)
                .equalTo("query", query)
                .findAll()
                .asObservable()
                .cast(Page.class);
    }

这是我的远程实现:

 @Override
 public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).map(new Func1<Result, Page>() {
            @Override
            public Page call(Result result) {
                final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                return pages.get(0);
            }
        });
    }

这是我的回购实现:

 final Observable<Page> localResult = mSearchLocalDataSource.search(query);
 final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        //mSearchLocalDataSource.save(query, page);
                        //mResultCache.put(query, page);
                    }
                });

        return Observable.concat(localResult, remoteResult)
                .first()
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });

最后是我在主持人中的订阅.

final Subscription subscription = mSearchRepository.search(this.mQuery)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Page>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }

                    @Override
                    public void onError(Throwable e) {
                        mView.onDefaultMessage(e.getMessage());
                    }

                    @Override
                    public void onNext(Page page) {
                        mView.onDefaultMessage(page.getContent());
                    }
                });

        mCompositeSubscription.add(subscription);

当我运行代码时,出现以下异常:来自错误线程的领域访问.只能在创建对象的线程上访问领域对象.

我在Realm Github存储库中尝试了官方解决方案,但没有一个起作用.仍然会遇到此异常.

我认为我收到此异常是因为我正在订阅io线程.在主线程中创建了Realm实例.所以我得到了这个例外.

有实施优惠吗?

谢谢.

解决方案

经过长期研究,我找到了解决方案.

首先让我们记住问题所在:当我订阅Schedulars.io线程并尝试从领域或更新中获取数据时,我得到来自错误线程的领域访问.领域对象只能在它们创建的线程上访问"

在这种情况下的主要问题是我在主线程中创建了Realm实例,但是尝试从辅助线程访问它.

我们可以在主线程上订阅领域,但这不是一个好习惯.当我使用

realm.where("query",query).findFirstAsync().asObservable();

以Github存储库为例,我陷入

Observable.concat(localResult, remoteResult).first();

我的解决方案是什么?

在我们的存储库实现中,仍然有两个远程和本地可观察对象,如下所示:

final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

请注意,当我们从远程获取数据时,我们将其保存到数据并缓存在内存中.

如果我们无法从缓存中获取数据,我们仍然合并两个可观察对象.

return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
                });

使用concat时,我们尝试从领域中获取数据,如果不能,则尝试从远程中获取数据.

这是远程可观察的实现:

@Override
public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).flatMap(new Func1<Result, Observable<Page>>() {
            @Override
            public Observable<Page> call(Result result) {
                final ArrayList<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                Log.i("data from", "remote");
                return Observable.from(pages).first();
            }
        });
    }

这里是本地源实现:

@Override
public Observable<Page> search(@NonNull final String query) {
        return Observable.create(new Observable.OnSubscribe<Page>() {
            @Override
            public void call(Subscriber<? super Page> subscriber) {
                final Realm realm = Realm.getInstance(mRealmConfiguration);
                final Page page = realm.where(Page.class)
                        .equalTo("query", query)
                        .findFirst();
                if (page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    subscriber.onNext(realm.copyFromRealm(page));
                } else {
                    Observable.empty();
                }
                subscriber.onCompleted();
                realm.close();
            }
        });
    }

关键是我创建了一个新的Observable并从那里的领域获取数据.因此,我们创建领域实例并在同一线程中使用它. (io线程).我们创建对象的副本以摆脱非法状态异常.

如果我们从领域获取数据(如果为null),则返回一个可观察的空值,以免卡在concat操作中.

如果获得页面,则该页面有效并已加载,我们将其发送给订户并完成操作.

在这里,我们可以将从远程获取的数据保存到领域:

@Override
public void save(@NonNull String query, @NonNull Page page) {
        final Realm realm = Realm.getInstance(mRealmConfiguration);
        realm.beginTransaction();
        final Page p = realm.createObject(Page.class);
        p.setQuery(query);
        p.setId(page.getId());
        p.setTitle(page.getTitle());
        p.setContent(page.getContent());
        realm.copyToRealmOrUpdate(p);
        realm.commitTransaction();
        realm.close();
    }

这里是示例源代码. https://github.com/savepopulation/wikilight

祝你好运.

I'm trying to implement RxJava + Realm + Retrofit + Repository Pattern

Here's my local implementation:

@Override
public Observable<Page> search(@NonNull final String query) {

        return Realm.getDefaultInstance().where(Page.class)
                .equalTo("query", query)
                .findAll()
                .asObservable()
                .cast(Page.class);
    }

Here's my remote implementation:

 @Override
 public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).map(new Func1<Result, Page>() {
            @Override
            public Page call(Result result) {
                final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                return pages.get(0);
            }
        });
    }

Here's my repo implementation:

 final Observable<Page> localResult = mSearchLocalDataSource.search(query);
 final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        //mSearchLocalDataSource.save(query, page);
                        //mResultCache.put(query, page);
                    }
                });

        return Observable.concat(localResult, remoteResult)
                .first()
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });

And finally here's my subscription in presenter.

final Subscription subscription = mSearchRepository.search(this.mQuery)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Page>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }

                    @Override
                    public void onError(Throwable e) {
                        mView.onDefaultMessage(e.getMessage());
                    }

                    @Override
                    public void onNext(Page page) {
                        mView.onDefaultMessage(page.getContent());
                    }
                });

        mCompositeSubscription.add(subscription);

When i run code i get this exception: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

I tried official solutions in Realm Github repo but none of them worked. Still get this exception.

I think i get this exception because i'm subscribing on an io thread. Realm instance gets created in Main thread. So i get this exception.

Are there any implementation offers?

Thanks.

解决方案

After a long research i found the solution.

First let's remember the problem: When i subscribe a Schedulars.io thread and try to get data from realm or retrofit i get "Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created"

Main problem in this case is i create Realm instance in Main thread but try to access it from worker thread.

We can subscribe realm on main thread but this's not a good practice. When i use

realm.where("query",query).findFirstAsync().asObservable();

as an example in Github repo i get stuck at

Observable.concat(localResult, remoteResult).first();

What's my solution?

In our Repository implementation still we have two observables for remote and local like below:

final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

Take attention on we save to data and cache in memory when we get data from remote.

Still we concat two observables if we cannot get data from cache.

return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
                });

With concat we try to get data from realm and if we can't we try to get from remote.

Here's remote observable implementation:

@Override
public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).flatMap(new Func1<Result, Observable<Page>>() {
            @Override
            public Observable<Page> call(Result result) {
                final ArrayList<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                Log.i("data from", "remote");
                return Observable.from(pages).first();
            }
        });
    }

Here's local source implementation:

@Override
public Observable<Page> search(@NonNull final String query) {
        return Observable.create(new Observable.OnSubscribe<Page>() {
            @Override
            public void call(Subscriber<? super Page> subscriber) {
                final Realm realm = Realm.getInstance(mRealmConfiguration);
                final Page page = realm.where(Page.class)
                        .equalTo("query", query)
                        .findFirst();
                if (page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    subscriber.onNext(realm.copyFromRealm(page));
                } else {
                    Observable.empty();
                }
                subscriber.onCompleted();
                realm.close();
            }
        });
    }

The point is i create a new Observable and get data from realm in there. So we create realm instance and use it in same thread. (io thread). We create copy of object to get rid of illegal state exception.

When we get data from realm if null we return an empty observable to do not get stuck in concat operation.

if we get page, it's valid and loaded we send to subscriber and complete operation.

Here how we can save the data we get from remote to realm:

@Override
public void save(@NonNull String query, @NonNull Page page) {
        final Realm realm = Realm.getInstance(mRealmConfiguration);
        realm.beginTransaction();
        final Page p = realm.createObject(Page.class);
        p.setQuery(query);
        p.setId(page.getId());
        p.setTitle(page.getTitle());
        p.setContent(page.getContent());
        realm.copyToRealmOrUpdate(p);
        realm.commitTransaction();
        realm.close();
    }

Here's example source code. https://github.com/savepopulation/wikilight

Good luck.

这篇关于Android Realm + RxJava-来自错误线程的领域访问.只能在创建对象的线程上访问领域对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆