I'm learning RxJava, so apologies in advance.
I have to sync the local database with an API, and I have to sync multiple tables. The database is Room and the API is Retrofit with RxJava.
I've been going back and forth, as there are some "Observables" whose results I'm paying little attention to.
The first way is:
Observable.zip(apiService.getAssetList(null, null), appDatabase.checkDao().getAllAssetAssets(), (networkAssets, localAssets) -> {
    try {
        for (AssetListResponseModel.Asset asset : networkAssets.getData().getAssets()) {
            localAssets.remove(asset);
            networkAssets.getData().getAssets().remove(asset);
        }
        appDatabase.checkDao().deleteAllAssetsIn(localAssets); // remove old
        appDatabase.checkDao().insertAssets(networkAssets.getData().getAssets()); // insert new
        return STATUS.SYNCHRONISED;
    } catch (Exception e) {
        return STATUS.ERROR;
    }
}).flatMap((Function<STATUS, ObservableSource<STATUS>>) status -> {
    if (status != STATUS.SYNCHRONISED) {
        // Sync failed on Assets
        return Observable.error(new Exception("Asset " + status));
    }
    return Observable.zip(apiService.getQuestionResponse(0), appDatabase.checkDao().getAllQuestions(), (networkQuestions, localQuestions) -> { // TODO get all questions from API - Will 0 work?
        try {
            for (QuestionLiseResponseModel.Question question : networkQuestions.getData().getQuestionList()) {
                localQuestions.remove(question);
                networkQuestions.getData().getQuestionList().remove(question);
            }
            appDatabase.checkDao().insertQuestions(networkQuestions.getData().getQuestionList()); // insert new
            appDatabase.checkDao().deleteQuestions(localQuestions); // purge old
            return STATUS.SYNCHRONISED;
        } catch (Exception e) {
            return STATUS.ERROR;
        }
    });
}).flatMap((Function<STATUS, ObservableSource<STATUS>>) status -> {
    if (status != STATUS.SYNCHRONISED) {
        // Sync failed on Question
        return Observable.error(new Exception("Question " + status));
    }
    return Observable.zip(apiService.getContacts(), appDatabase.checkDao().getContacts(), (networkContact, localContact) -> {
        try {
            for (Contact contact : networkContact.getData()) {
                localContact.remove(contact);
                networkContact.getData().remove(contact);
            }
            appDatabase.checkDao().insertContacts(networkContact.getData());
            appDatabase.checkDao().deleteContacts(localContact);
            return STATUS.SYNCHRONISED;
        } catch (Exception e) {
            AirbrakeNotifier.notify(e);
            return STATUS.ERROR;
        }
    });
}).map(status -> {
    CORE_STATUS = status;
    // If successful, update the time to now; otherwise keep the original value
    CORE_SYNC_SUCCESSFUL = status == STATUS.SYNCHRONISED ? new Date() : CORE_SYNC_SUCCESSFUL;
    return status;
}).doOnSubscribe(disposable -> Log.d(TAG, "handleActionCore - Started")
).doOnComplete(() -> {
    Log.d(TAG, "handleActionCore - Ended");
    set_isPerformingCoreSync(false);
}).doOnError(throwable -> {
    AirbrakeNotifier.notify(throwable);
    set_isPerformingCoreSync(false);
}).observeOn(Schedulers.io())
        .subscribeOn(Schedulers.io())
        .subscribe();
For example, near the top these two lines are called and their results are ignored:
appDatabase.checkDao().deleteAllAssetsIn(localAssets); // remove old
appDatabase.checkDao().insertAssets(networkAssets.getData().getAssets());
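One further thing worth noting about the first way: each loop removes elements from the list it is iterating over. On a standard java.util list this can throw ConcurrentModificationException or silently skip elements, and any exception is then swallowed into STATUS.ERROR. Below is a minimal sketch of the diff step on its own. It assumes the intent is to delete local rows that are absent from the network response and to insert network rows that are absent locally, and that the model classes implement equals() and hashCode(). The SyncDiff class and its method names are hypothetical and are not part of Room, Retrofit, or RxJava.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: computes the sync diff without mutating either input list. */
final class SyncDiff {
    private SyncDiff() {}

    /** Rows present locally but missing from the network response (candidates for deletion). */
    static <T> List<T> toDelete(List<T> local, List<T> network) {
        return difference(local, network);
    }

    /** Rows present in the network response but missing locally (candidates for insertion). */
    static <T> List<T> toInsert(List<T> local, List<T> network) {
        return difference(network, local);
    }

    // Returns the elements of 'from' that are not in 'exclude', preserving order.
    // Relies on equals()/hashCode() of T, which is an assumption about the model classes.
    private static <T> List<T> difference(List<T> from, List<T> exclude) {
        Set<T> excluded = new HashSet<>(exclude);
        List<T> result = new ArrayList<>();
        for (T item : from) {
            if (!excluded.contains(item)) {
                result.add(item);
            }
        }
        return result;
    }
}
```

Under those assumptions, the two results could be passed to deleteAllAssetsIn(...) and insertAssets(...) in place of the mutated lists. This only addresses the iteration problem, not the ignored results of the DAO calls.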
Second way:
Observable<List<Check>> checkCalls = appDatabase.checkDao().getAllCompletedChecksThatNotSent()
        .onErrorResumeNext((Function<Throwable, ObservableSource<? extends List<Check>>>) Observable::error)
        .map(checks -> {
            for (Check check : checks) {
                check.setSending(true);
                appDatabase.checkDao().setCheckAsSending(check).subscribe();
            }
            return checks;
        }).map(checks -> {
            if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
                checks.forEach(check -> {
                    apiService.createCheck(check)
                            .doOnError(throwable -> {
                                check.setSending(false);
                                check.setErrorMessage(throwable.getMessage());
                                appDatabase.checkDao().insertCheck(check);
                            }).doOnSuccess(checkResponseModelBaseResponseModel -> {
                                check.setSending(false);
                                check.setSent(checkResponseModelBaseResponseModel.getData().getId());
                                appDatabase.checkDao().setCheckAsSending(check);
                            }).subscribe();
                });
            } else {
                for (Check check : checks) {
                    apiService.createCheck(check)
                            .doOnError(throwable -> {
                                check.setSending(false);
                                check.setErrorMessage(throwable.getMessage());
                                appDatabase.checkDao().insertCheck(check);
                            }).doOnSuccess(checkResponseModelBaseResponseModel -> {
                                check.setSending(false);
                                check.setSent(checkResponseModelBaseResponseModel.getData().getId());
                                appDatabase.checkDao().setCheckAsSending(check);
                            }).subscribe();
                }
            }
            return checks;
        });
However, is it a better way to return an Observable<T> from each step and then call Observable.merge()?
Observable.merge(checkCalls, checkCalls) // Example
        .observeOn(Schedulers.io())
        .subscribeOn(Schedulers.io())
        .subscribe();
Both approaches ignore the results of multiple database update calls, so I'm not 100% sure I'm tackling this correctly.
So the question is: what is the best way to handle syncing information when both endpoints are reactively implemented and can have numerous calls on either side?
from Android/RxJava Sync local DB with API