Saturday 26 February 2022

Dag run not found when unit testing a custom operator in Airflow

I've written a custom operator (DataCleaningOperator), which corrects JSON data based on a provided schema.

The unit tests worked previously, when I didn't have to instantiate a TaskInstance and provide the operator with a context. However, I recently updated the operator to take a context (so that it can use xcom_push).

Here is an example of one of the tests:

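import unittest
from copy import deepcopy
from datetime import datetime
from typing import List

from airflow import DAG
from airflow.models import TaskInstance

# Import path assumed from the project layout shown further down.
from operators.data_cleaning_operator import DataCleaningOperator

DEFAULT_DATE = datetime.today()

class TestDataCleaningOperator(unittest.TestCase):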
    """
    Class to execute unit tests for the operator 'DataCleaningOperator'.
    """
    def setUp(self) -> None:
        super().setUp()
        self.dag = DAG(
            dag_id="test_dag_data_cleaning",
            schedule_interval=None,
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
                "output_to_xcom": True,
            },
        )
        self._initialise_test_data()

    def _initialise_test_data(self) -> None:
        # Test data set here as class variables such as self.test_data_correct
        ...

    def test_operator_cleans_dataset_which_matches_schema(self) -> None:
        """
        Test: Attempt to clean a dataset which matches the provided schema.
        Verification: Returns the original dataset, unchanged.
        """
        task = DataCleaningOperator(
            task_id="test_operator_cleans_dataset_which_matches_schema",
            schema_fields=self.test_schema_nest,
            data_file_object=deepcopy(self.test_data_correct),
            dag=self.dag,
        )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        result: List[dict] = task.execute(ti.get_template_context())
        self.assertEqual(result, self.test_data_correct)

However, when the tests are run, the following error is raised:

airflow.exceptions.DagRunNotFound: DagRun for 'test_dag_data_cleaning' with date 2022-02-22 12:09:51.538954+00:00 not found

This is related to the line in which a task instance is instantiated in test_operator_cleans_dataset_which_matches_schema.

Why can't Airflow locate the test_dag_data_cleaning DAG? Is there a specific configuration I've missed? Do I also need to create a DAG run instance, or add the DAG to the DagBag manually, if this test DAG is outside my standard DAG directory? All normal (non-test) DAGs in my DAG directory run correctly.

In case it helps, my current Airflow version is 2.2.3 and the structure of my project is:

airflow
├─ dags
├─ plugins
│  ├─ ...
│  └─ operators
│     ├─ ...
│     └─ data_cleaning_operator.py
│
└─ tests
   ├─ ...
   └─ operators
      └─ test_data_cleaning_operator.py
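
For illustration, here is a minimal sketch of one possible way to exercise an operator's execute method without a TaskInstance or DagRun: pass a hand-built context whose "ti" entry is a mock. It assumes the operator only uses the context to call xcom_push. FakeCleaningOperator, the "cleaned_data" key and run_with_mock_context are hypothetical names made up for this sketch; they stand in for the real DataCleaningOperator so the example runs without Airflow installed.

```python
from copy import deepcopy
from typing import Any, Dict, List, Tuple
from unittest import mock


class FakeCleaningOperator:
    """Hypothetical stand-in for DataCleaningOperator (no Airflow needed)."""

    def __init__(self, data_file_object: List[dict]) -> None:
        self.data_file_object = data_file_object

    def execute(self, context: Dict[str, Any]) -> List[dict]:
        # A real operator would clean the data against its schema here.
        cleaned = self.data_file_object
        # Push the result through the task instance found in the context.
        context["ti"].xcom_push(key="cleaned_data", value=cleaned)
        return cleaned


def run_with_mock_context(data: List[dict]) -> Tuple[List[dict], mock.MagicMock]:
    """Run the operator with a mocked task instance instead of a real one."""
    mock_ti = mock.MagicMock()
    operator = FakeCleaningOperator(deepcopy(data))
    result = operator.execute({"ti": mock_ti})
    return result, mock_ti


if __name__ == "__main__":
    data = [{"id": 1, "name": "a"}]
    result, mock_ti = run_with_mock_context(data)
    assert result == data
    # The mock records the call, so the push can be verified without a database.
    mock_ti.xcom_push.assert_called_once_with(key="cleaned_data", value=data)
```

This only checks the operator's own logic. It does not exercise templating or anything else Airflow derives from a real DAG run, so it is not a replacement for a test that uses a genuine TaskInstance.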


