Monday, 18 January 2021

Android/RxJava Sync local DB with API

I'm still learning RxJava, so apologies in advance.

I have to sync the local database with an API, and I have to sync multiple tables. The database is Room, and the API is accessed through Retrofit with RxJava.

I've been going back and forth, as there are some "Observables" whose results I'm paying little attention to.

First way is:

Observable.zip(apiService.getAssetList(null, null), appDatabase.checkDao().getAllAssetAssets(), (networkAssets, localAssets) -> {
            try {
                for (AssetListResponseModel.Asset asset : networkAssets.getData().getAssets()) {
                    localAssets.remove(asset);
                    networkAssets.getData().getAssets().remove(asset);
                }
                appDatabase.checkDao().deleteAllAssetsIn(localAssets); // remove old
                appDatabase.checkDao().insertAssets(networkAssets.getData().getAssets()); //insert new
                return STATUS.SYNCHRONISED;
            } catch (Exception e) {
                return STATUS.ERROR;
            }
        }).flatMap((Function<STATUS, ObservableSource<STATUS>>) status -> {
            if (status != STATUS.SYNCHRONISED) {
                //Sync Failed on Assets
                return Observable.error(new Exception("Asset " + status));
            }

            return Observable.zip(apiService.getQuestionResponse(0), appDatabase.checkDao().getAllQuestions(), (networkQuestions, localQuestions) -> { //TODO get all questions from API - Will 0 work ?
                try {
                    for (QuestionLiseResponseModel.Question question : networkQuestions.getData().getQuestionList()) {
                        localQuestions.remove(question);
                        networkQuestions.getData().getQuestionList().remove(question);
                    }
                    appDatabase.checkDao().insertQuestions(networkQuestions.getData().getQuestionList()); //Insert new
                    appDatabase.checkDao().deleteQuestions(localQuestions); //Purge old

                    return STATUS.SYNCHRONISED;
                } catch (Exception e) {
                    return STATUS.ERROR;
                }
            });

        }).flatMap((Function<STATUS, ObservableSource<STATUS>>) status -> {
            if (status != STATUS.SYNCHRONISED) {
                //Sync Failed on Question
                return Observable.error(new Exception("Question " + status));
            }

            return Observable.zip(apiService.getContacts(), appDatabase.checkDao().getContacts(), (networkContact, localContact) -> {
                try {
                    for (Contact contact : networkContact.getData()) {
                        localContact.remove(contact);
                        networkContact.getData().remove(contact);
                    }
                    appDatabase.checkDao().insertContacts(networkContact.getData());
                    appDatabase.checkDao().deleteContacts(localContact);
                    return STATUS.SYNCHRONISED;
                } catch (Exception e) {
                    AirbrakeNotifier.notify(e);
                    return STATUS.ERROR;
                }
            });
        }).map(status -> {
            CORE_STATUS = status;
            CORE_SYNC_SUCCESSFUL = status == STATUS.SYNCHRONISED ? new Date() : CORE_SYNC_SUCCESSFUL; //if Successful, update time to now, otherwise reset it back to original
            return status;
        }).doOnSubscribe(disposable -> Log.d(TAG, "handleActionCore - Started")
        ).doOnComplete(() -> {
            Log.d(TAG, "handleActionCore - Ended");
            set_isPerformingCoreSync(false);
        }).doOnError(throwable -> {
            AirbrakeNotifier.notify(throwable);
            set_isPerformingCoreSync(false);
        }).observeOn(Schedulers.io())
                .subscribeOn(Schedulers.io())
                .subscribe();

For example, at the top, these two lines are called and their results are ignored:

                appDatabase.checkDao().deleteAllAssetsIn(localAssets); // remove old
                appDatabase.checkDao().insertAssets(networkAssets.getData().getAssets()); 
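Separately from the ignored results, the loop in the first approach removes items from `networkAssets.getData().getAssets()` while iterating over that same list. If the list is an `ArrayList` (an assumption; the post does not say), this will usually throw a `ConcurrentModificationException`, which the `catch` block then turns into `STATUS.ERROR`. A minimal sketch of one way to work out what to delete and what to insert without mutating either list is shown below. `SyncDiff` and `between` are hypothetical names, not part of the original code. The sketch assumes the model classes implement `equals()` and `hashCode()`, and that "insert new" means rows the API returned that are not yet stored locally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the original code.
final class SyncDiff<T> {
    final List<T> toDelete; // present locally, absent from the API response
    final List<T> toInsert; // present in the API response, absent locally

    private SyncDiff(List<T> toDelete, List<T> toInsert) {
        this.toDelete = toDelete;
        this.toInsert = toInsert;
    }

    // Assumes T implements equals() and hashCode() consistently.
    // Neither input list is modified.
    static <T> SyncDiff<T> between(List<T> local, List<T> network) {
        Set<T> localSet = new LinkedHashSet<>(local);
        Set<T> networkSet = new LinkedHashSet<>(network);

        List<T> toDelete = new ArrayList<>();
        for (T row : localSet) {
            if (!networkSet.contains(row)) {
                toDelete.add(row);
            }
        }

        List<T> toInsert = new ArrayList<>();
        for (T row : networkSet) {
            if (!localSet.contains(row)) {
                toInsert.add(row);
            }
        }
        return new SyncDiff<>(toDelete, toInsert);
    }

    public static void main(String[] args) {
        SyncDiff<String> diff = SyncDiff.between(
                Arrays.asList("a", "b", "c"),   // rows currently in the local table
                Arrays.asList("b", "c", "d"));  // rows returned by the API
        // prints: delete [a], insert [d]
        System.out.println("delete " + diff.toDelete + ", insert " + diff.toInsert);
    }
}
```

In the zip lambda, `diff.toDelete` and `diff.toInsert` would then be the arguments to the delete and insert DAO calls.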

Second way:

    Observable<List<Check>> checkCalls = appDatabase.checkDao().getAllCompletedChecksThatNotSent()
            .onErrorResumeNext((Function<Throwable, ObservableSource<? extends List<Check>>>) Observable::error)
            .map(checks -> {
                for (Check check : checks) {
                    check.setSending(true);
                    appDatabase.checkDao().setCheckAsSending(check).subscribe();
                }
                return checks;
            }).map(checks -> {
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
                    checks.forEach(check -> {
                        apiService.createCheck(check)
                                .doOnError(throwable -> {
                                    check.setSending(false);
                                    check.setErrorMessage(throwable.getMessage());
                                    appDatabase.checkDao().insertCheck(check);
                                }).doOnSuccess(checkResponseModelBaseResponseModel -> {
                            check.setSending(false);
                            check.setSent(checkResponseModelBaseResponseModel.getData().getId());
                            appDatabase.checkDao().setCheckAsSending(check);
                        }).subscribe();
                    });
                } else {
                    for (Check check : checks) {
                        apiService.createCheck(check)
                                .doOnError(throwable -> {
                                    check.setSending(false);
                                    check.setErrorMessage(throwable.getMessage());
                                    appDatabase.checkDao().insertCheck(check);
                                }).doOnSuccess(checkResponseModelBaseResponseModel -> {
                            check.setSending(false);
                            check.setSent(checkResponseModelBaseResponseModel.getData().getId());
                            appDatabase.checkDao().setCheckAsSending(check);
                        }).subscribe();
                    }
                }
                return checks;
            });

However, is it a better way if I return an Observable<T> and then call Observable.merge()?

    Observable.merge(checkCalls, checkCalls) //Example
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.io())
            .subscribe();

Both approaches ignore the results of multiple database update calls, so I'm not 100% sure I'm tackling this correctly.

So the question is: what is the best way to handle syncing information where both endpoints are reactively implemented and can have numerous calls on either side?
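To make the "ignored result" concern concrete, here is a minimal sketch of the general idea of chaining an asynchronous write into the pipeline, so that its outcome (or its failure) reaches the final subscriber. It deliberately does not use RxJava or Room, so that it runs on a plain JDK: `CompletableFuture.thenCompose` stands in for the role `flatMap` would play in an Rx chain. `ChainedSync`, `fetchFromApi`, `replaceAll` and the in-memory `table` are all hypothetical stand-ins, not the original API or DAO.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for illustration; not the original API or DAO.
final class ChainedSync {
    // In-memory substitute for a database table.
    static final List<String> table = new ArrayList<>();

    // Substitute for an asynchronous API call.
    static CompletableFuture<List<String>> fetchFromApi() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("b", "c", "d"));
    }

    // Substitute for an asynchronous DAO write; completes with the row count.
    static CompletableFuture<Integer> replaceAll(List<String> rows) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (table) {
                table.clear();
                table.addAll(rows);
                return table.size();
            }
        });
    }

    // The write is composed into the chain instead of being started and
    // forgotten, so the caller sees its result or its exception.
    static CompletableFuture<Integer> sync() {
        return fetchFromApi().thenCompose(ChainedSync::replaceAll);
    }

    public static void main(String[] args) {
        int written = sync().join();
        // prints: rows written: 3
        System.out.println("rows written: " + written);
    }
}
```

The design point is only that the write is returned from the composing step rather than invoked as a side effect inside a mapping step. How that translates to the Room DAO methods in the post depends on their return types, which the post does not show.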


