Friday, 15 January 2021

Fetching requests in queue concurrently

I have written code that allows me to start fetching the next chunk of data from an API while the previous chunk of data is being processed.

I'd like this to be always fetching up to 5 chunks concurrently at any given moment, but the returned data should always be processed in the correct order even if a request that is last in the queue completes before any other.

How can my code be changed to make this happen?

class MyClient:
    async def fetch_entities(
        self,
        entity_ids:List[int],
        objects:Optional[List[str]],
        select_inbound:Optional[List[str]]=None,
        select_outbound:Optional[List[str]]=None,
        queue_size:int=5,
        chunk_size:int=500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        # TODO: I want to be able to fill the queue with requests that are already executing

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity


from Fetching requests in queue concurrently

No comments:

Post a Comment