I have a graph of tasks, where each task may have on_success and on_error follow-up tasks. The success path is guaranteed to be acyclic (i.e. if all tasks run fine, no on_success handler links to a task that has already run) and guaranteed to end in a final task (i.e. one without an on_success follow-up). But on_error handlers might link to any task, creating new flows (which are themselves acyclic and finite along their success paths). Example graph:
        + final_1
    task_1
  +     - $task_2
entrypoint
  -     + final_2
    task_2
        - $task_1
where $task_1 means that the execution flow jumps to another branch and runs task_1 directly. A + marks an on_success link and a - marks an on_error link.
Technically it can get into an infinite loop if both task_1 and task_2 always fail, but that's an ok case by design.
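To make the structure concrete, the example graph can be written down as a plain mapping. This is only a sketch: the `Node` class, the `GRAPH` dict and the `success_flow` helper are hypothetical names for illustration, not my actual code. The helper follows only on_success links and checks the two guarantees stated above (no task is visited twice, and the path ends in a task without an on_success follow-up).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Node:
    """One task in the graph; both follow-ups are optional task names."""
    on_success: Optional[str] = None
    on_error: Optional[str] = None


# The example graph from above ("+" is on_success, "-" is on_error).
GRAPH: Dict[str, Node] = {
    "entrypoint": Node(on_success="task_1", on_error="task_2"),
    "task_1": Node(on_success="final_1", on_error="task_2"),
    "task_2": Node(on_success="final_2", on_error="task_1"),
    "final_1": Node(),
    "final_2": Node(),
}


def success_flow(graph: Dict[str, Node], start: str) -> List[str]:
    """Follow only on_success links from `start` until a final task."""
    flow: List[str] = []
    name: Optional[str] = start
    while name is not None:
        if name in flow:
            # Would violate the "success path is acyclic" guarantee.
            raise ValueError(f"success path revisits {name!r}")
        flow.append(name)
        name = graph[name].on_success
    return flow


print(success_flow(GRAPH, "entrypoint"))  # ['entrypoint', 'task_1', 'final_1']
print(success_flow(GRAPH, "task_2"))      # ['task_2', 'final_2']
```

Note that the error links task_1 -> task_2 -> task_1 form a cycle, which is exactly the infinite-loop case mentioned above; the helper ignores them on purpose.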
My current solution is to run a single Celery task and handle errors in my "tasks" (which are then just run synchronously, blocking the Celery task).
@celery.task
def task_runner():
    task = graph.tasks[graph.entrypoint]
    task_input = some_input
    task_output = None
    while task:
        try:
            # call the current task
            task_output = task.callback(task_input)
            # pick the successor; its input is the output of the current task
            # (.get returns None for a final task, assuming graph.tasks is a dict)
            task = graph.tasks.get(task.on_success)
            task_input = task_output
        except Exception:
            # pick the error successor; the failed task produced no output,
            # so its input is passed on unchanged
            task = graph.tasks.get(task.on_error)
            task_output = task_input
The code is a somewhat simplified version of mine, so it might contain obvious errors, but it's here to convey the basic idea.
Basically, at each point in time during execution I have a straightforward success flow: entrypoint -> task_1 -> final_1 (no forks and no loops). Once I encounter an error, I just switch to a different success flow if one is defined: entrypoint -> task_1 (error!) -> task_2 -> final_2.
What I want now is to run those tasks as Celery tasks, not just as synchronous functions within a single Celery task. But I cannot think of a way to define new chained flows if any task fails (the equivalent of that try/except block).
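The flow switching can be illustrated without Celery at all. The following is a minimal sketch under assumptions: `GRAPH`, `trace_flow`, the `failing` set and the `max_steps` guard are all hypothetical and exist only to show which tasks get visited; real callbacks are replaced by "this task raises" or "this task succeeds".

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical encoding of the example graph: name -> (on_success, on_error).
GRAPH: Dict[str, Tuple[Optional[str], Optional[str]]] = {
    "entrypoint": ("task_1", "task_2"),
    "task_1": ("final_1", "task_2"),
    "task_2": ("final_2", "task_1"),
    "final_1": (None, None),
    "final_2": (None, None),
}


def trace_flow(start: str, failing: Set[str], max_steps: int = 10) -> List[str]:
    """Return the tasks visited when every task named in `failing` raises."""
    visited: List[str] = []
    name: Optional[str] = start
    # max_steps is only a guard for this sketch, since error links may cycle.
    while name is not None and len(visited) < max_steps:
        visited.append(name)
        on_success, on_error = GRAPH[name]
        # A failure switches to the success flow that starts at on_error.
        name = on_error if name in failing else on_success
    return visited


print(trace_flow("entrypoint", failing=set()))
# ['entrypoint', 'task_1', 'final_1']
print(trace_flow("entrypoint", failing={"task_1"}))
# ['entrypoint', 'task_1', 'task_2', 'final_2']
print(trace_flow("entrypoint", failing={"task_1", "task_2"}, max_steps=6))
# ['entrypoint', 'task_1', 'task_2', 'task_1', 'task_2', 'task_1']
```

The last call shows the accepted infinite-loop case: without the step limit it would bounce between task_1 and task_2 forever.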
I could build a chain of valid tasks for entrypoint -> task_1 -> final_1 and do an apply_async():
from celery import chain

@celery.task
def task_chainer():
    task = graph.tasks[graph.entrypoint]
    task_input = some_input
    success_chain = []
    while task:
        try:
            # only the first signature gets the input; in a chain, later
            # tasks receive the result of the previous one
            args = (task_input,) if not success_chain else ()
            success_chain.append(task.callback.s(*args))
            # define the successor
            task = graph.tasks.get(task.on_success)
        except Exception:
            # this is the part that cannot work: nothing is executed while
            # the signatures are being built, so nothing can fail here
            task = graph.tasks.get(task.on_error)
    chain(*success_chain).apply_async()
but that try/except wouldn't work, since everything is now async and I would have to use error callbacks. Because error handlers can link back into flows that have already run, potentially forming cycles, I cannot define all chains beforehand and just apply them once.
So, my question is: how to redefine a chain of tasks once some task fails? Or should this be done in some other way? Maybe chain.unchain_tasks() might be of some use here?
from Dinamically change task success-callback and error-callback within a chain in Celery