Tuesday 26 April 2022

How to stream a response from a Twisted server?

Issue

My problem is that I can't write a server that streams the response my application sends back.
The response is not received chunk by chunk, but as a single block once the iterator has finished iterating.

Approach

When I write the response with the write method of Request, Twisted correctly treats the data as a chunk to send.
I checked whether Twisted uses a buffer size threshold, but the message size check seems to happen in doWrite.

After some time debugging, it seems that the reactor only reads and writes at the end, once the application has finished.
If I understand correctly how the Twisted reactor works, it reads and writes when the file descriptor is available.
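
To illustrate that suspicion, here is a minimal asyncio-only sketch (no Twisted involved). It assumes that a buffered write behaves roughly like a callback scheduled with loop.call_soon: the data is queued immediately, but it is only flushed once the event loop gets control back. The names run_producer and producer, and the "queued"/"flushed" labels, are hypothetical and exist only for this illustration.

```python
import asyncio
import time


def run_producer(blocking, chunks=3, delay=0.01):
    """Return the order of 'queued' and 'flushed' events for one run."""
    events = []

    async def producer():
        loop = asyncio.get_running_loop()
        for i in range(chunks):
            if blocking:
                time.sleep(delay)           # blocks the whole event loop
            else:
                await asyncio.sleep(delay)  # hands control back to the loop
            events.append(("queued", i))
            # Stand-in for a buffered transport write (an assumption):
            # the flush runs only when the loop is free to run callbacks.
            loop.call_soon(events.append, ("flushed", i))
        # Let the loop run any flushes that are still pending.
        await asyncio.sleep(0)

    asyncio.run(producer())
    return events


if __name__ == "__main__":
    print("blocking:    ", run_producer(blocking=True))
    print("non-blocking:", run_producer(blocking=False))
```

With blocking=True, every "flushed" event comes after the last "queued" event, which matches the behaviour described above. With blocking=False, each chunk is flushed before the next one is queued.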

What is a file descriptor in Twisted?
Why is it not available after writing the response?

Example

I have written a minimal script of what I would like my server to look like.
It's an "ASGI-like" server that runs an application, which iterates over a generator yielding large byte strings:

# async_stream_server.py
import asyncio
from twisted.internet import asyncioreactor

twisted_loop = asyncio.new_event_loop()
asyncioreactor.install(twisted_loop)

import time
from sys import stdout

from twisted.web import http
from twisted.python.log import startLogging
from twisted.internet import reactor, endpoints

CHUNK_SIZE = 2**16


def async_partial(async_fn, *partial_args):
    async def wrapped(*args):
        return await async_fn(*partial_args, *args)
    return wrapped


def iterable_content():
    for _ in range(5):
        time.sleep(1)  # simulate slow content; this blocks the calling thread
        yield b"a" * CHUNK_SIZE


async def application(send):
    for part in iterable_content():
        await send(
            {
                "body": part,
                "more_body": True,
            }
        )
    await send({"more_body": False})


class Dummy(http.Request):
    def process(self):
        asyncio.ensure_future(
            application(send=async_partial(self.handle_reply)),
            loop=asyncio.get_event_loop()
        )

    async def handle_reply(self, message):
        http.Request.write(self, message.get("body", b""))
        if not message.get("more_body", False):
            http.Request.finish(self)
        print('HTTP response chunk')


class DummyFactory(http.HTTPFactory):
    def buildProtocol(self, addr):
        protocol = http.HTTPFactory.buildProtocol(self, addr)
        protocol.requestFactory = Dummy
        return protocol


startLogging(stdout)
endpoints.serverFromString(reactor, "tcp:1234").listen(DummyFactory())
asyncio.set_event_loop(reactor._asyncioEventloop)
reactor.run()

To execute this example:

  • in a terminal, run:
python async_stream_server.py
  • in another terminal, run:
curl http://localhost:1234/

curl prints nothing for about five seconds, then the whole message arrives at once.
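
One variant that may be worth trying, offered as an assumption rather than a confirmed fix: replace the blocking time.sleep in iterable_content with a non-blocking wait, so the event loop can run between chunks. The sketch below is asyncio-only. iterable_content_async, demo and fake_send are hypothetical names, and fake_send merely stands in for Dummy.handle_reply by recording messages instead of writing them to a request.

```python
import asyncio

CHUNK_SIZE = 2**16


async def iterable_content_async(delay=1.0):
    # Hypothetical non-blocking counterpart of iterable_content().
    for _ in range(5):
        await asyncio.sleep(delay)  # suspends only this coroutine
        yield b"a" * CHUNK_SIZE


async def application(send, delay=1.0):
    # Same message shape as the original application().
    async for part in iterable_content_async(delay):
        await send({"body": part, "more_body": True})
    await send({"more_body": False})


async def demo(delay=0.01):
    received = []

    async def fake_send(message):
        # Stand-in for Dummy.handle_reply: record instead of writing.
        received.append(message)

    await application(fake_send, delay)
    return received


if __name__ == "__main__":
    messages = asyncio.run(demo())
    print(len(messages), "messages sent")
```

If the content really has to come from blocking code, running that code in a thread (for example with loop.run_in_executor) would be another option to explore.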

Details

$ python --version
Python 3.10.4
$ pip list
Package           Version Editable project location
----------------- ------- --------------------------------------------------
asgiref           3.5.0
Twisted           22.4.0


from How stream a response from a Twisted server?
