I want to publish a SINGLE Kafka message when PARALLEL Airflow tasks fail. My Airflow DAG is similar to the one below.
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import PythonOperator


def task_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id} failed in dag {ti.dag_id}, error: {ti.xcom_pull(key='error')}")
    # call function to publish kafka message


def task_success_callback(context):
    ti = context['task_instance']
    print(f"Task {ti.task_id} has succeeded in dag {ti.dag_id}.")
    # call function to publish kafka message


def dag_success_callback(context):
    dag_status = f"DAG has succeeded, run_id: {context['run_id']}"
    print(dag_status)
    Variable.set("TEST_CALLBACK_DAG_STATUS", dag_status)
    # call function to publish kafka message


def dag_failure_callback(context):
    ti = context['task_instance']
    dag_status = f"DAG has failed, run_id: {context['run_id']}, task id: {ti.task_id}"
    print(dag_status)
    Variable.set("TEST_CALLBACK_DAG_STATUS", dag_status)
    # call function to publish kafka message


def user_func1(ti):
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 10 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        raise e


def user_func2(ti):
    try:
        input_val = int(Variable.get("TEST_CALLBACK_INPUT", 0))
        if input_val % 2 == 0:
            raise ValueError("Invalid Input")
    except Exception as e:
        ti.xcom_push(key="error", value=str(e))
        raise e


default_args = {
    "on_success_callback": None,
    "on_failure_callback": dag_failure_callback,
}

with DAG(
    dag_id="test_callbacks_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
) as dag:
    task1 = PythonOperator(task_id="task1", python_callable=user_func1)
    task2 = PythonOperator(task_id="task2", python_callable=user_func2)
    task3 = DummyOperator(task_id="task3", on_success_callback=task_success_callback)

    [task1, task2] >> task3
Airflow logs when the parallel tasks fail:
[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} INFO - DAG has failed, run_id: manual__2022-10-07T18:40:50.355282+00:00, task id: task1
[2022-10-08, 00:10:51 IST] {logging_mixin.py:115} INFO - DAG has failed, run_id: manual__2022-10-07T18:40:50.355282+00:00, task id: task2
As shown above, task1 and task2 run in parallel, and I use callback functions to publish the corresponding Kafka messages. For the success scenario this works: a single success message is published from the final task. The problem is the failure scenario, mainly when the tasks run in parallel. If task1 and task2 both fail during the parallel run, Airflow triggers TWO on_failure_callback invocations, one for task1 and one for task2. I agree this is Airflow's expected behavior, but for my requirement I don't want multiple on_failure_callback invocations: once the first callback has fired, the next one should not fire, because the receiver side is designed to handle a single error, not multiple/batch errors.
I call the Kafka publish function from the on_failure_callback (dag_failure_callback). If task1 fails, it publishes one message to the Kafka topic; if task2 fails at the same time, a second message is published to the same topic. I could not prevent this, since both tasks run in parallel and independently. I want to stop after the first Kafka publish to the topic and not publish messages for any further failures in the same run.
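One workaround I have been sketching (untested) is to keep the task-level callback but guard it with an Airflow Variable keyed by the run_id, so that only the first failing task in a run publishes to Kafka. The variable name FAILURE_NOTIFIED_<run_id> below is just a placeholder I made up, and Variable get/set is not an atomic check-and-set, so two callbacks firing at exactly the same moment could still both publish:

from airflow.models import Variable

def dag_failure_callback(context):
    run_id = context['run_id']
    guard_key = f"FAILURE_NOTIFIED_{run_id}"  # placeholder guard-variable name
    # Skip publishing if another failed task in this run already went through.
    if Variable.get(guard_key, default_var=None) is not None:
        return
    Variable.set(guard_key, "sent")
    ti = context['task_instance']
    print(f"DAG has failed, run_id: {run_id}, task id: {ti.task_id}")
    # call function to publish the single kafka message here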
Please suggest how I can restrict the on_failure_callback during parallel task failures.
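I also wondered whether moving the failure callback out of default_args (where it is applied to every task) and onto the DAG object itself would help, since, as far as I understand, a DAG-level on_failure_callback fires once when the DAG run is marked failed rather than once per failed task. A rough sketch of what I mean (the context passed to a DAG-level callback may not contain the same keys as a task-level one, so I only rely on run_id here):

def dag_run_failure_callback(context):
    # Should run once per failed DAG run (if my understanding is right), not per failed task.
    print(f"DAG run failed, run_id: {context['run_id']}")
    # call function to publish the single kafka message here

with DAG(
    dag_id="test_callbacks_dag",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    catchup=False,
    on_failure_callback=dag_run_failure_callback,  # DAG-level, not in default_args
) as dag:
    ...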