Thursday, 10 October 2019

RxJava 2.x - An Observable getting flatMap triggered by a PublishSubject and merged with other Observables does not get subscribed/executed

I have a pagination solution using a PublishSubject that looks like this:

    private val pages: PublishSubject<Int> = PublishSubject.create()
    val observable: Observable<List<Data> = pages.hide()
        .filter { !inFlight }
        .doOnNext { inFlight = true }
        .flatMap{
            getPage(it) // Returns an Observable
        }
        .doOnNext(::onNextPage) // inFlight gets reset here

This Observable is merged and scanned with other Observable´s like this:

    fun stateObservable(): Observable<SavedState> {
        return Observable.merge(listOf(firstPage(),
            nextPage(),// The observable listed above
            refresh()))
            .scan(MyState.initialState(), StateReducer::reduce)
    }

Basically I have a unidirectional setup where every observable updates MyState with its relevant changes with the help of the accumulator function reduce.

In the ViewModel this is consumed in a straight forward way:

        interactor.stateObservable()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeBy(onNext = ::render, onError = Timber::e)
            .addTo(subscriptions)

This setup works well for firstPage as well as refresh (Also triggered with the help of PublishSubject) but for some reason the paging solution get's as for as returning the getPage Observable in the flatMap but then this page Observable never gets triggered/subscribed to and the doOnNext after the flatMap obviously doesn't get called either. It seems like it basically doesn't wanna subscribe to it and I simply don't know why.

The getPage function looks like this:

    private fun getPage(page: Long): Observable<PartialState<SavedState>> {
        return repo.getPage(page).firstOrError().toObservable()
            .subscribeOn(Schedulers.io())
            .map<PartialState<MyState>> { NextPageLoaded(it) }
            .onErrorReturn { NextPageError(it) }
            .startWith { NextPageLoading() }
    }

The getPage in the repo is converting an RxJava 1 Observable to an RxJava2 Observable with the help of RxJavaInterop in the following way:

    public io.reactivex.Observable<List<Data>> getPage(long page) {
        Observable<List<Data>> observable = getPage(page)
                .map(dataList -> {
                    if(dataList == null){
                        dataList = new ArrayList<>();
                    }
                    return dataList;
                });

        return RxJavaInterop.toV2Observable(observable);
    }

Im not getting any errors so you can rule that out.

I already have this very same setup with RxJava 1 where it is working very well and now when I'm migrating to 2.x I was expecting the same solution to work but I'm completely stuck on this pagination issue and in all other scenarios the setup is working as expected.

To be able to test the issue I have uploaded a sample project on GitHub demonstrating the issue.

Any RxJava expert out there that have a clue on what it could be? :)

Thanks



from RxJava 2.x - An Observable getting flatMap triggered by a PublishSubject and merged with other Observables does not get subscribed/executed

No comments:

Post a Comment