What I have in mind is a very generic BackgroundTask class that can be used within webservers or standalone scripts, to offload tasks that don't need to block the caller.
I don't want to use any task queues (celery, rabbitmq, etc.) here because the tasks I'm thinking of are too small and quick to justify that kind of infrastructure. I just want to get them done, as far out of the way as possible. Would that be an async approach? Or throwing them onto another process?
First solution I came up with that works:
import asyncio
import threading
import time
import typing
from typing import ParamSpec

# ThreadSafeSingleton (a singleton metaclass) is not shown here.

# Need ParamSpec to get correct type hints in BackgroundTask init
P = ParamSpec("P")


class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
    """

    background_tasks = set()
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                task = asyncio.create_task(self.func(*self.args, **self.kwargs))
                # Keep a strong reference so the task is not garbage collected mid-run
                self.background_tasks.add(task)
                print(len(self.background_tasks))
                task.add_done_callback(self.background_tasks.discard)
        # TODO: Create sync task (this will follow a similar pattern)


async def create_background_task(func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
    b = BackgroundTask(func, *args, **kwargs)
    await b()


# Usage (from inside a running event loop):
async def sleep(t):
    time.sleep(t)

await create_background_task(sleep, 5)
I think I missed the point by doing this though. If I ran this code along with some other async code, then yes, I would get a performance benefit since blocking operations aren't blocking the main thread anymore.
I'm thinking I maybe need something more like a separate process to handle such background tasks without blocking the main thread at all (the above async code will still be run on the main thread).
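To check that suspicion, here is a minimal sketch (assuming Python 3.9+ for `asyncio.to_thread`; the helper names `blocking_sleep`, `offloaded_sleep`, `ticks_during` and `compare` are made up for illustration). It counts how often an unrelated "ticker" coroutine gets to run while the background task is active. With `time.sleep` inside a coroutine the ticker should barely run at all, because the call never yields to the event loop; handing the same call to a worker thread should let the ticker keep going.

```python
import asyncio
import time


async def blocking_sleep(t: float) -> None:
    # time.sleep never yields to the event loop, so no other coroutine can run meanwhile.
    time.sleep(t)


async def offloaded_sleep(t: float) -> None:
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker thread.
    await asyncio.to_thread(time.sleep, t)


async def ticks_during(task_factory, t: float) -> int:
    """Count how many times a ticker coroutine runs while the background task is active."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.01)

    ticker_task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # give the ticker a chance to start
    background = asyncio.create_task(task_factory(t))
    await background
    ticker_task.cancel()
    return ticks


def compare(t: float = 0.2) -> tuple[int, int]:
    """Return (ticks with the blocking coroutine, ticks with the offloaded call)."""
    blocked = asyncio.run(ticks_during(blocking_sleep, t))
    offloaded = asyncio.run(ticks_during(offloaded_sleep, t))
    return blocked, offloaded
```

If the first number comes out much smaller than the second, then `create_task` alone is not what makes a blocking function non-blocking; the work has to either await something or leave the event loop's thread.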
Does it make sense to have a separate thread that handles background jobs? Something like a simple job queue that is very lightweight and does not require additional infrastructure?
Or does it make sense to create a solution like the one above?
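For comparison, the "separate thread with a lightweight job queue" idea could look roughly like the sketch below. It is only an illustration under stated assumptions: `BackgroundWorker` is a hypothetical name, there is a single daemon thread, jobs are plain (non-async) callables, and failures are just logged.

```python
import logging
import queue
import threading
from typing import Any, Callable


class BackgroundWorker:
    """One daemon thread draining an in-memory queue of callables."""

    def __init__(self) -> None:
        self._jobs: queue.Queue = queue.Queue()
        # daemon=True: the thread will not keep the interpreter alive on exit
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def submit(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        """Enqueue a job and return immediately."""
        self._jobs.put((func, args, kwargs))

    def join(self) -> None:
        """Block until every job submitted so far has finished."""
        self._jobs.join()

    def _run(self) -> None:
        while True:
            func, args, kwargs = self._jobs.get()
            try:
                func(*args, **kwargs)
            except Exception:
                # A failing job should not kill the worker thread
                logging.exception("background job failed")
            finally:
                self._jobs.task_done()
```

Usage would be `worker = BackgroundWorker()` once, then `worker.submit(time.sleep, 9)` wherever needed. The standard library's `concurrent.futures.ThreadPoolExecutor` offers much the same thing through `executor.submit(func, *args)`, so a hand-rolled queue like this is mostly useful for seeing what is going on underneath.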
I've seen that Starlette does something like this (https://github.com/encode/starlette/blob/decc5279335f105837987505e3e477463a996f3e/starlette/background.py#L15) but they await the background tasks AFTER a response is returned.
This makes their solution dependent on a web server design (i.e. doing things after response is sent is OK). I'm wondering if we can build something more generic where you can run background tasks in scripts or webservers alike, without sacrificing performance.
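One way to get something that does not care whether it is called from a script or from a request handler is to push everything onto a small thread pool and, for coroutine functions, run them on a private event loop inside the worker thread. The following is a sketch under those assumptions, not a drop-in replacement: `fire_and_forget` and `_call` are hypothetical names, and coroutines that rely on objects bound to the caller's event loop would not work this way.

```python
import asyncio
import concurrent.futures
import inspect
from typing import Any, Callable

# Shared pool for all background work; the size of 2 is an arbitrary choice.
_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=2, thread_name_prefix="background"
)


def _call(func: Callable[..., Any], args: tuple, kwargs: dict) -> Any:
    """Runs inside a worker thread."""
    if inspect.iscoroutinefunction(func):
        # Drive the coroutine to completion on its own event loop in this thread
        return asyncio.run(func(*args, **kwargs))
    return func(*args, **kwargs)


def fire_and_forget(
    func: Callable[..., Any], *args: Any, **kwargs: Any
) -> concurrent.futures.Future:
    """Schedule func(*args, **kwargs) in the background and return immediately.

    The returned Future can be ignored, or inspected later for a result or exception.
    """
    return _executor.submit(_call, func, args, kwargs)
```

Because `fire_and_forget` is a plain function, it can be called the same way from synchronous code and from inside a coroutine, without any `await`.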
Not that familiar with async/concurrency features, so don't really know how to compare these solutions. Seems like an interesting problem!
Here is what I came up with trying to perform the tasks on another process:
class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
        BackgroundTask(es.transport.close) <- Probably most common use in our codebase
    """

    background_tasks = set()
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                loop = asyncio.get_running_loop()
                with self.executor as pool:
                    result = await loop.run_in_executor(
                        pool, functools.partial(self.func, *self.args, **self.kwargs))
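Two things in the attempt above look likely to cause trouble. First, `with self.executor as pool:` shuts the executor down when the block exits, so a shared pool would only survive its first use. Second, a coroutine function sent to a process pool only produces a coroutine object in the worker, which cannot be pickled back. A hedged sketch of the same idea that avoids both, assuming the jobs are plain picklable functions (`run_in_pool` and `main` are illustrative names):

```python
import asyncio
import concurrent.futures
import functools
import inspect
import math
from typing import Any, Callable


async def run_in_pool(
    pool: concurrent.futures.Executor,
    func: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Run a plain (non-async) callable in the given pool without blocking the event loop."""
    if inspect.iscoroutinefunction(func):
        # A coroutine function would only return an un-run coroutine object in the worker
        raise TypeError("coroutine functions cannot be sent to an executor; await them instead")
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional args, hence functools.partial
    return await loop.run_in_executor(pool, functools.partial(func, *args, **kwargs))


async def main() -> None:
    # A single long-lived pool: the `with` block closes it only when the program is done.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        print(await run_in_pool(pool, math.factorial, 10))
```

`main()` would be started with `asyncio.run(main())` under an `if __name__ == "__main__":` guard, which process pools need on platforms that spawn rather than fork. For jobs as small as the ones described here, the cost of pickling arguments and crossing a process boundary may well outweigh the benefit, so a thread pool passed as `pool` is probably the more natural fit unless the work is CPU-bound.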
from How can I "fire and forget" a task without blocking main thread?