I've written a custom operator (DataCleaningOperator) that corrects JSON data based on a provided schema.
The unit tests previously worked when I didn't have to instantiate a TaskInstance and provide the operator with a context. However, I've recently updated the operator to take a context (so that it can use xcom_push).
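For reference, the relevant part of the operator now looks roughly like this (a simplified sketch: _clean is a placeholder for the actual schema-based correction logic, and the XCom key name is just illustrative):

from typing import List

from airflow.models.baseoperator import BaseOperator


class DataCleaningOperator(BaseOperator):
    def __init__(self, schema_fields, data_file_object, output_to_xcom=False, **kwargs):
        super().__init__(**kwargs)
        self.schema_fields = schema_fields
        self.data_file_object = data_file_object
        self.output_to_xcom = output_to_xcom

    def _clean(self, data: List[dict], schema_fields) -> List[dict]:
        # Placeholder: the real operator corrects the data against the schema here.
        return data

    def execute(self, context) -> List[dict]:
        cleaned = self._clean(self.data_file_object, self.schema_fields)
        if self.output_to_xcom:
            # The new dependency on the context: push the cleaned data to XCom.
            context["ti"].xcom_push(key="cleaned_data", value=cleaned)
        return cleaned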
Here is an example of one of the tests:
import unittest
from copy import deepcopy
from datetime import datetime
from typing import List

from airflow.models import DAG, TaskInstance

# Import path is illustrative; it depends on how the plugins package is exposed on PYTHONPATH.
from operators.data_cleaning_operator import DataCleaningOperator

DEFAULT_DATE = datetime.today()


class TestDataCleaningOperator(unittest.TestCase):
    """
    Class to execute unit tests for the operator 'DataCleaningOperator'.
    """

    def setUp(self) -> None:
        super().setUp()
        self.dag = DAG(
            dag_id="test_dag_data_cleaning",
            schedule_interval=None,
            default_args={
                "owner": "airflow",
                "start_date": DEFAULT_DATE,
                "output_to_xcom": True,
            },
        )
        self._initialise_test_data()

    def _initialise_test_data(self) -> None:
        # Test data set here as instance variables such as self.test_data_correct
        ...

    def test_operator_cleans_dataset_which_matches_schema(self) -> None:
        """
        Test: Attempt to clean a dataset which matches the provided schema.
        Verification: Returns the original dataset, unchanged.
        """
        task = DataCleaningOperator(
            task_id="test_operator_cleans_dataset_which_matches_schema",
            schema_fields=self.test_schema_nest,
            data_file_object=deepcopy(self.test_data_correct),
            dag=self.dag,
        )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        result: List[dict] = task.execute(ti.get_template_context())

        self.assertEqual(result, self.test_data_correct)
However, when the tests are run, the following error is raised:
airflow.exceptions.DagRunNotFound: DagRun for 'test_dag_data_cleaning' with date 2022-02-22 12:09:51.538954+00:00 not found
This is related to the line in which a task instance is instantiated in test_operator_cleans_dataset_which_matches_schema.
Why can't Airflow locate the test_dag_data_cleaning DAG? Is there a specific configuration I've missed? Do I also need to create a DagRun instance, or add the DAG to the DagBag manually, given that this test DAG lives outside my standard DAG directory? (A rough sketch of what I assume the DagRun creation would look like is below.) All my normal (non-test) DAGs in the DAG directory run correctly.
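If a DagRun does need to exist, I assume the test would need something along these lines before calling execute (an untested sketch; I'm not sure whether it also requires an initialised metadata database for the test run):

from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

# Create a DagRun for the test DAG so the TaskInstance has a run to attach to.
dag_run = self.dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DEFAULT_DATE,
    start_date=DEFAULT_DATE,
    run_type=DagRunType.MANUAL,
)
# Fetch the TaskInstance belonging to this run instead of constructing one by hand.
ti = dag_run.get_task_instance(task_id=task.task_id)
ti.task = task
result: List[dict] = task.execute(ti.get_template_context())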
In case it helps, my current Airflow version is 2.2.3 and the structure of my project is:
airflow
├─ dags
├─ plugins
│  ├─ ...
│  └─ operators
│     ├─ ...
│     └─ data_cleaning_operator.py
│
└─ tests
   ├─ ...
   └─ operators
      └─ test_data_cleaning_operator.py