Wednesday 30 December 2020

How to complete a Kotlin Flow in Android Worker

I'm investigating the use of Kotlin Flow within my current Android application.

My application retrieves its data from a remote server via Retrofit API calls.

Some of these APIs return 50,000 data items in pages of 500 items.

Each API response contains an HTTP Link header holding the complete URL of the next page.
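For illustration, extracting the next-page URL from a Link header value could look roughly like the sketch below. The helper name `nextPageUrl` and the example URLs are hypothetical, and the sketch assumes the usual `<url>; rel="next"` layout with entries separated by commas (so it would break on URLs that themselves contain commas).

```kotlin
// Hypothetical helper: returns the URL tagged rel="next" in a Link header value,
// or null when there is no next page.
fun nextPageUrl(linkHeader: String): String? =
    linkHeader.split(",")                        // one entry per linked page
        .map { it.trim() }
        .firstOrNull { it.contains("rel=\"next\"") }
        ?.substringAfter("<", "")                // drop everything up to the opening bracket
        ?.substringBefore(">", "")               // keep only the URL itself
        ?.takeIf { it.isNotEmpty() }

fun main() {
    val header = "<https://example.com/items?page=2>; rel=\"next\", " +
        "<https://example.com/items?page=9>; rel=\"last\""
    println(nextPageUrl(header))
}
```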

These calls can take up to 2 seconds to complete.

In an attempt to reduce the elapsed time, I have employed a Kotlin Flow so that each page of data is processed while the API call for the next page is already in flight.
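The overlap described here can be sketched with a cold flow and the `buffer()` operator. This is only a minimal illustration, not the code from my application: `Page`, `fetchPage`, `pages` and `persistAll` are hypothetical names, the network call and the database insert are simulated with `delay`, and a recent kotlinx.coroutines version is assumed.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical page type: the items plus the URL of the next page, if any.
data class Page(val items: List<Int>, val nextUrl: String?)

// Stand-in for the Retrofit call; serves three pages of two items each.
suspend fun fetchPage(url: String): Page {
    delay(20) // simulated network latency
    val n = url.substringAfter("page=").toInt()
    return Page(listOf(n * 2 - 1, n * 2), if (n < 3) "page=${n + 1}" else null)
}

// Cold flow that follows the next-page links until there are none left.
fun pages(firstUrl: String): Flow<Page> = flow {
    var url: String? = firstUrl
    while (url != null) {
        val page = fetchPage(url)
        emit(page)
        url = page.nextUrl
    }
}

// Collects every page; returns only after the last page has been handled.
suspend fun persistAll(firstUrl: String): List<Int> {
    val stored = mutableListOf<Int>()
    pages(firstUrl)
        .buffer() // lets the producer fetch ahead while the collector is busy
        .collect { page ->
            delay(10) // simulated database insert
            stored += page.items
        }
    return stored
}

fun main() = runBlocking {
    println(persistAll("page=1"))
}
```

Because the flow is cold and finite, `collect` returns once the last page has been processed, so no separate completion signal is needed in this shape.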

My flow is defined as follows:

private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()

private val myJob: Job

init {
    myJob = GlobalScope.launch(persistenceThreadPool) {
        workWorkState.collect { page ->
            // Skip the initial null placeholder; persist every real page.
            if (page != null) managePage(page)
        }
    }
}

My recursive function, which fetches all pages, is defined as follows:

    private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
        when {
            result != null -> return
            response.isSuccessful -> internalWorkWorkState.emit(response)
            else -> {
                manageError(response.errorBody())
                result = Result.failure()
                return
            }
        }

        response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
            val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
            if (parts.size >= 2) {
                managePages(accessToken, service.myApiCall(accessToken, parts[1]))
            }
        }
    }

    private suspend fun managePage(response: Response<List<MyPage>>) {
        val pages = response.body()

        pages?.let {
            persistResponse(it)
        }
    }

    private suspend fun persistResponse(myPage: List<MyPage>) {
        // Map each API model to its database object, then insert the whole page.
        val myPageDOs = myPage.map { it.mapDO() }

        database.myPageDAO().insertAsync(myPageDOs)
    }
    

My issues are:

  1. This code does not insert all of the data items that I retrieve.

  2. How do I complete the flow once all data items have been retrieved?

  3. How do I complete the GlobalScope job once all data items have been retrieved and persisted?

UPDATE

By making the following changes, I have managed to insert all of the data:

    private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    private val completed = CompletableDeferred<Int>()

    private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
    private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)

    private val frank: Job

    init {
        frank = GlobalScope.launch(persistenceThreadPool) {
            channelFlow.collect { page ->
                if (page == null) {
                    // A null page is the sentinel that marks the end of the data.
                    completed.complete(totalItems)
                } else managePage(page)
            }
        }
    }


...
...
...

   channel.send(null)
   completed.await()

   return result ?: Result.success(outputData)

I do not like having to rely on a CompletableDeferred. Is there a better approach to knowing when the Flow has completed everything?
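One direction that might avoid both the null sentinel and the CompletableDeferred is to close the channel when the last page has been sent and then join the consumer job. The sketch below is only an assumption about how that could look: `fetchAndPersist` is a hypothetical stand-in for the worker body, pages are plain `List<Int>` values, and the database insert is replaced by a counter.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the worker body; returns the number of items persisted.
suspend fun fetchAndPersist(pages: List<List<Int>>): Int = coroutineScope {
    val channel = Channel<List<Int>>(Channel.UNLIMITED)
    var persisted = 0

    // Consumer: the for loop ends on its own once the channel is closed and drained.
    val consumer = launch {
        for (page in channel) {
            persisted += page.size // stand-in for the database insert
        }
    }

    // Producer: send every page, then close instead of sending a null sentinel.
    for (page in pages) channel.send(page)
    channel.close()

    consumer.join() // every page sent before close() has been handled by now
    persisted
}

fun main() = runBlocking {
    println(fetchAndPersist(listOf(listOf(1, 2), listOf(3), listOf(4, 5, 6))))
}
```

Launching the consumer inside `coroutineScope` rather than `GlobalScope` also ties its lifetime to the calling suspend function, so nothing is left running after the function returns.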


