Rxjava 3 + Retrofit2-多次插入数据库问题 [英] Rxjava 3 + Retrofit2 - multiple inserts to DB problem

查看:120
本文介绍了Rxjava 3 + Retrofit2-多次插入数据库问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试执行以下操作;使用Retrofit将云数据库同步到设备上的本地SqLite DB(房间). DB可能变大,大约有100,000个寄存器或更多,因此同步过程可能需要一些时间.因此,它发送第一个Retrofit请求以获取寄存器数,因此它可以计算总页数,然后它将发送多个Retrofit Request,以从API获取所有数据,在每个请求之后,它将数据保存到房间.

I am trying to do the following; sync a cloud DB using Retrofit to a local SqLite DB (Room) on a device. The DB could get large, around 100,000 registers or more, so the sync process can take some time. So it send a first Retrofit request to get the number of register, so it can calculate the total number of pages, after that it will send multiple Retrofit Request, to get all the data from API, after each request, it saves the data to Room.

现在,我在合并两个RxJava调用或进程时遇到麻烦,在第二个RxJava进程中,在Retrofit调用之后,有一个房间列表的对象列表,但是在空洞过程结束后,我请注意,并非所有记录都插入了100%,每次运行该流程时,插入的记录数都会发生变化,大约是80%-98%,但是即使发送了所有Retrofit调用也绝不会100%.

Right now, I am having trouble combining two RxJava calls or process, also on the second RxJava process, after a Retrofit call, there is a Room Insert of a List-of-Objets, but after the hole process ends, I notice that not 100% of all the records are inserted, every time that I run the process, the number of records inserted change, it is around 80% - 98%, but never 100%, even though all the Retrofit calls are sent.

请帮助我

  1. 如何在一个RxJava调用中完成所有过程,而不是像我这样的2 现在吗?
  2. 如何将100%的记录插入到会议室?
  1. How to make all the process in one RxJava call, not 2 like I have it now?
  2. How to insert 100% of records to Room?

以下代码:

Gradle

def room_version = "2.2.5"
//RxJava 2
implementation "io.reactivex.rxjava2:rxjava:2.2.19"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//Retrofit
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
//Retrofit2 Adapter for RxJava 2
implementation "com.squareup.retrofit2:adapter-rxjava2:2.8.1"
//okhttp3 Logging Interceptor
implementation "com.squareup.okhttp3:logging-interceptor:4.5.0"
//Room
implementation "androidx.room:room-runtime:$room_version"
annotationProcessor "androidx.room:room-compiler:$room_version"
//RxJava support for Room
implementation "androidx.room:room-rxjava2:$room_version" 

ItemSyncDetails

ItemSyncDetails

...
public class ItemSyncDetails {
    @SerializedName("CurrentPage")
    int currentPage;
    @SerializedName("PageCount")
    int pageCount;
    @SerializedName("PageSize")
    int pageSize;
    @SerializedName("RecordCount")
    int recordCount;
    @SerializedName("Results")
    List<Item> mItemList;
...
}

ItemDao

注意:我没有使用Observer/Flowable/Maybe/Single,因为我有 使其能够与RxJava一起使用

Note: I haven't used Observer/Flowable/Maybe/Single, because I having been able to make it work with RxJava

import io.reactivex.Flowable;

@Dao
public interface ItemDao {

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    long insert(Item item);

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    List<Long> insertAll(List<Item> items);
...

DataApi

import io.reactivex.rxjava3.core.Observable;
...

public interface DataApi {

    @GET("item")
    Observable<ItemSyncDetails> getItemsByPage(
            @Query("pageSize") Integer pageSize,
            @Query("currentPage") Integer currentPage,
            @Query("sortBy") Integer sortBy
    );

ItemRepository

ItemRepository

import io.reactivex.Observable;
    ...

    public class ItemRepository {
    ...

        public ItemRepository(Application application) {
            mDataApi = RetrofitClient.getRetrofitInstance("http://192.168.1.100").create(DataApi.class);
            RfidDatabase db = RfidDatabase.getAppDatabase(application);
            itemDao = db.itemDao();
            itemList = itemDao.getAllItems();
            inserts = 0;
        }

        public List<Long> insertAllLocal (List<Item> itemList) {
            List<Long> items = itemDao.insertAll(itemList);
            inserts += items.size();
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + inserts + "*************");
            Log.i(TAG, "************insertAllLocal - ItemRepository: " + items);
            return items;
        }

        public Observable<ItemSyncDetails> getRecordsCount(){
            return mDataApi.getItemsByPage(1,1,1);
        }

        public Observable<ItemSyncDetails> getItemsPerPage(int pageSize,int currentPage){
            return mDataApi.getItemsByPage(pageSize,currentPage,1);
        }
    ...

SyncConfigFragment 

    import io.reactivex.Observable;
    import io.reactivex.android.schedulers.AndroidSchedulers;
    import io.reactivex.disposables.CompositeDisposable;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedule
    ...

    public class SyncConfigFragment extends Fragment {


        private ItemViewModel itemViewModel;
        private ImageView imageSyncItems;
        private ProgressDialog progressDialog;
        private TextView tvSyncDescriptionItems;
        private DataApi service;
        private ItemSyncDetails mItemSyncDetails;
        private List<Item> mItemlist;
        private CompositeDisposable mCompositeDisposable;
        private int mNumPages;
        private int syncProgress;
        ...

        @Override
        public View onCreateView(LayoutInflater inflater, ViewGroup container, Bundle savedInstanceState) {
            View view =  inflater.inflate(R.layout.fragment_config_sync,container, false);
            progressDialog = new ProgressDialog(getActivity());
            sharedPref = getActivity().getSharedPreferences(
                    getString(R.string.sharepref_filename), Context.MODE_PRIVATE);
            mItemlist = new ArrayList<Item>();
            mCompositeDisposable = new CompositeDisposable();
            itemViewModel = ViewModelProviders.of(this).get(ItemViewModel.class);
            tvSyncDescriptionItems = view.findViewById(R.id.tvDescriptionSyncItems);
            if(sharedPref.contains("last_sync_item")) {
                tvSyncDescriptionItems.setText("Última actualización " + sharedPref.getString("last_sync_item",""));
            } else{
                tvSyncDescriptionItems.setText("No se ha Sincronizado");
            }
            imageSyncItems = view.findViewById(R.id.imageViewSyncItems);
            imageSyncItems.setOnClickListener(clickListener);
            return view;
        }

        private View.OnClickListener clickListener = new View.OnClickListener() {
            public void onClick(View v) {
                    if (v.equals(imageSyncItems)) {
                //If I uncomment the next line it does not work
                        //mCompositeDisposable.add(
                        mNumPages = 0;
                        syncProgress = 0;
                        showProgressDialog("Items");
                        getRecordsCount();
                       //); Closing round bracket for mCompositeDisposable
                }
            }
        };//End View.OnClickListener 

        private void getRecordsCount(){
            itemViewModel.getRecordsCount()
                    .subscribeOn(Schedulers.io())
                    .retry(3)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::HandleResults, this::handleError,this::getNumPagesHandlerComplete );
        }

        private void HandleResults(ItemSyncDetails itemSyncDetails) {
            this.mItemSyncDetails = itemSyncDetails;
            int pageSize = 100;
            int numPages = itemSyncDetails.getRecordCount()/pageSize;
            if (itemSyncDetails.getRecordCount() < pageSize || itemSyncDetails.getRecordCount()%pageSize != 0){
                numPages++;
            }
            this.mNumPages = numPages;
        }

        private void getNumPagesHandlerComplete() {
            getAllRecords(mNumPages);
        }

        private void handleError(Throwable throwable) {
            tvSyncDescriptionItems.setText("**********Error de conexión...");
            closeProgressDialog();
        }

        private void getAllRecords(int numPages){
            //numPages: total of pages are the number of times to send the request to API
            Observable.range(1, numPages)
                    .flatMap(i -> itemViewModel.getItemsPerPage(100,i))
                    .map(new Function<ItemSyncDetails, Integer>() {
                        @Override
                        public Integer apply(ItemSyncDetails itemSyncDetails) throws Throwable {
                            return itemViewModel.insertAllLocal(itemSyncDetails.getItemList()).size();
                        }
                    })
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(this::getAllHandleResults, this::handleError,this::handleComplete);
        }

        private void getAllHandleResults(Integer i) {
            progressDialog.setProgress(getProgress(i));
        }

        private void handleComplete() {
            //last request finished
            closeProgressDialog();
        }

        private int getProgress(int newItems){
            syncProgress += newItems;
            int progress = 0;
            if (syncProgress == mItemSyncDetails.getRecordCount()){
                progress = 100;
            } else {
                progress = (100 * syncProgress)/mItemSyncDetails.getRecordCount();
            }
            return progress;
        }
    ...
    }

http://192.168 .1.10:82/api/v1.0/item?pageSize = 1& currentPage = 1& sortBy = 1

注意:页面大小可能会改变,我使用的是100的固定大小 每页项.

Note: The page size could change, I am using a fixed size of a 100 items per page.

{
  Results: [
  {
    epc: "202020202020202030303031",
    barcode: "0001",
    name: "Televisor Samnsung",
    description: "0001",
    creation_date: "2020-02-26T10:55:06",
    last_update: "2020-02-26T10:55:06",
    last_seen: "2020-02-26T10:55:06",
    brand: "Samnsung",
    serial_number: "0001",
    parent: "",
    fk_category: 1,
    responsable: "",
    purchase_date: "2020-02-26T10:55:06",
    cost: 0,
    fk_location: 1008,
    fk_item_state: 1,
    inventory_date: "2020-02-26T10:55:06"
  }
 ],
 CurrentPage: 1,
 PageCount: 65565,
 PageSize: 1,
 RecordCount: 65565
}

推荐答案

您在编辑之前在此处发布了json响应.

You posted a json response here before the edit.

    CurrentPage: 1,
    PageCount: 65566,
    PageSize: 1,
    RecordCount: 65566

如果我的理解正确,那么每页中有65k项和1项.表示65k页面,表示65k网络调用.好多啊.您可以先改进此设计.

If I understand correctly, then you have 65k items and 1 item in each page. Meaning 65k pages which means 65k network calls. That's a lot. You could improve this design first.

  1. 将整个记录分成几页(甚至10或20页).如果整个记录有成千上万的项目,那么一页中仍然会有成千上万的项目.
  2. 然后使用gzip压缩来压缩每个页面的json响应,并从服务器提供该响应.或不要将记录分成几页,并在一个用gzip压缩的响应中将它们全部传递(如果不是那么大的话).
  3. 在android上将响应解压缩,解析它,然后执行您想做的任何事情.

这样,您可以减少大量的网络通话,并可能减少同步的等待时间.

This way you reduce a lot of network calls and possibly reduce the wait time for sync.

关于您实际的接收问题:

As to your actual rx question:

val pageSize = 100
viewModel.getRecordsCount()
    .map {
        // logic from `HandleResults` function
        // do some calculation
        var numPages: Int = it.records / pageSize
        if (it.records < pageSize || it.records % pageSize != 0) {
            numPages++
        }
        return@map numPages
    }
    .flatMap { pages -> Observable.range(1, pages) }
    .flatMap { page -> viewModel.getItemsPerPage(pageSize, page) }
    .flatMap { itemSyncDetails ->
        val items = viewModel.insertAllLocal(itemSyncDetails.getItemList())
        return@flatMap Observable.just(items.size)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(....)

我注意到并不是所有记录都被100%插入,每次运行该流程时,插入的记录数都会发生变化,大约是80%-98%,但是即使所有的翻新都不会100%呼叫已发送.

I notice that not 100% of all the records are inserted, every time that I run the process, the number of records inserted change, it is around 80% - 98%, but never 100%, even though all the Retrofit calls are sent.

handleError函数中记录错误,并查看实际问题是什么.

Log the error in handleError function and see what the actual problem is.

这篇关于Rxjava 3 + Retrofit2-多次插入数据库问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆