Monday, 6 January 2020

Airflow, calling dags from a dag causes duplicate dagruns

I have the "Master" DAG below. I want it to trigger the associated DAGs in the order given by the downstream section at the bottom.

However, what actually happens is that the first DAG gets called four times, while the other three tasks run for a microsecond (not long enough to actually do anything), and everything comes back green.

How do I get it to behave the way the downstream section intends?

from airflow import models
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

args = {
    'owner': 'Airflow',
    'start_date': datetime(2019, 12, 29),
    'email': ['email@gmail.com'],
    'email_on_failure': True,
    'email_on_success': True,
}

dag = models.DAG(
    dag_id='Offrs_People_Master_V1',
    default_args=args,
    schedule_interval='0 0 * * *',
)

Ingest_People = TriggerDagRunOperator(
    task_id='Ingest_People_PE_and_PP_Into_GBQ',
    trigger_dag_id='Offrs_People_Ingestion',
    dag=dag
)

Ingest_DataFinder = TriggerDagRunOperator(
    task_id='API_DataFinder_Lookup_Ingestion_V1',
    trigger_dag_id='Offrs_People_Ingestion',
    dag=dag
)

Ingest_EmailOversight = TriggerDagRunOperator(
    task_id='API_Emailoversight_Lookup_Ingestion_V1',
    trigger_dag_id='Offrs_People_Ingestion',
    dag=dag
)

Publish_People = TriggerDagRunOperator(
    task_id='Publise_PE_and_PP_to_MSSQL_DB3',
    trigger_dag_id='Offrs_People_Ingestion',
    dag=dag
)

Ingest_People >> [Ingest_DataFinder, Ingest_EmailOversight] >> Publish_People
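One thing that can be checked directly from the snippet above: all four operators pass the same trigger_dag_id ('Offrs_People_Ingestion'), which would be consistent with that one DAG being triggered four times. Below is a minimal sketch of that check in plain Python, without Airflow. The dictionary is copied by hand from the operators above, and the helper name shared_targets is hypothetical, not part of any Airflow API.

```python
from collections import Counter

# task_id -> trigger_dag_id, copied by hand from the four operators above.
triggers = {
    'Ingest_People_PE_and_PP_Into_GBQ': 'Offrs_People_Ingestion',
    'API_DataFinder_Lookup_Ingestion_V1': 'Offrs_People_Ingestion',
    'API_Emailoversight_Lookup_Ingestion_V1': 'Offrs_People_Ingestion',
    'Publise_PE_and_PP_to_MSSQL_DB3': 'Offrs_People_Ingestion',
}


def shared_targets(mapping):
    """Return {trigger_dag_id: [task_ids]} for every target used by more than one task.

    Hypothetical helper for illustration only.
    """
    counts = Counter(mapping.values())
    return {
        dag_id: sorted(task for task, target in mapping.items() if target == dag_id)
        for dag_id, n in counts.items()
        if n > 1
    }


duplicates = shared_targets(triggers)
for dag_id, tasks in duplicates.items():
    print(f"{dag_id} is triggered by {len(tasks)} tasks: {tasks}")
```

If each task is meant to start a different DAG, each trigger_dag_id would presumably need to name that DAG's own dag_id; the real ids of the other three DAGs are not shown in this post, so they are left out here.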

Thanks.

Log for first task (Ingest_People):

*** Reading remote log from gs://airflowlogs/Offrs_People_Master_V1/Ingest_People_PE_and_PP_Into_GBQ/2020-01-04T00:00:00+00:00/1.log.
*** Previous log discarded: 404 GET https://www.googleapis.com/download/storage/v1/b/airflowlogs/o/Offrs_People_Master_V1%2FIngest_People_PE_and_PP_Into_GBQ%2F2020-01-04T00%3A00%3A00%2B00%3A00%2F1.log?alt=media: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

[2020-01-05 00:00:08,693] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: Offrs_People_Master_V1.Ingest_People_PE_and_PP_Into_GBQ 2020-01-04T00:00:00+00:00 [queued]>
[2020-01-05 00:00:08,726] {taskinstance.py:620} INFO - Dependencies all met for <TaskInstance: Offrs_People_Master_V1.Ingest_People_PE_and_PP_Into_GBQ 2020-01-04T00:00:00+00:00 [queued]>
[2020-01-05 00:00:08,727] {taskinstance.py:838} INFO - 
--------------------------------------------------------------------------------
[2020-01-05 00:00:08,727] {taskinstance.py:839} INFO - Starting attempt 1 of 1
[2020-01-05 00:00:08,727] {taskinstance.py:840} INFO - 
--------------------------------------------------------------------------------
[2020-01-05 00:00:08,796] {taskinstance.py:859} INFO - Executing <Task(TriggerDagRunOperator): Ingest_People_PE_and_PP_Into_GBQ> on 2020-01-04T00:00:00+00:00
[2020-01-05 00:00:08,796] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'Offrs_People_Master_V1', 'Ingest_People_PE_and_PP_Into_GBQ', '2020-01-04T00:00:00+00:00', '--job_id', '43858', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/Offrs_People_MASTER.py', '--cfg_path', '/tmp/tmp7of6dc4i']
[2020-01-05 00:00:12,421] {base_task_runner.py:115} INFO - Job 43858: Subtask Ingest_People_PE_and_PP_Into_GBQ [2020-01-05 00:00:12,411] {settings.py:213} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=14172
[2020-01-05 00:00:13,193] {base_task_runner.py:115} INFO - Job 43858: Subtask Ingest_People_PE_and_PP_Into_GBQ [2020-01-05 00:00:13,192] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-01-05 00:00:15,011] {base_task_runner.py:115} INFO - Job 43858: Subtask Ingest_People_PE_and_PP_Into_GBQ [2020-01-05 00:00:15,006] {dagbag.py:90} INFO - Filling up the DagBag from /home/airflow/airflow/dags/Offrs_People_MASTER.py
[2020-01-05 00:00:15,221] {base_task_runner.py:115} INFO - Job 43858: Subtask Ingest_People_PE_and_PP_Into_GBQ [2020-01-05 00:00:15,221] {cli.py:516} INFO - Running <TaskInstance: Offrs_People_Master_V1.Ingest_People_PE_and_PP_Into_GBQ 2020-01-04T00:00:00+00:00 [running]> on host airflow-pipeline.c.mother-216719.internal
[2020-01-05 00:00:15,386] {logging_mixin.py:95} INFO - [2020-01-05 00:00:15,386] {dagbag.py:90} INFO - Filling up the DagBag from /home/airflow/airflow/dags/Offrs_People_Ingestion.py
[2020-01-05 00:00:17,353] {logging_mixin.py:95} WARNING - /usr/local/lib/python3.7/site-packages/airflow/operators/gcs_to_bq.py:185: PendingDeprecationWarning: Invalid arguments were passed to GoogleCloudStorageToBigQueryOperator (task_id: OffrsPE_GCS_to_GBQ_Raw). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'soft_fail': True}
  super().__init__(*args, **kwargs)
[2020-01-05 00:00:17,354] {logging_mixin.py:95} WARNING - /usr/local/lib/python3.7/site-packages/airflow/operators/gcs_to_bq.py:185: PendingDeprecationWarning: Invalid arguments were passed to GoogleCloudStorageToBigQueryOperator (task_id: OffrsPP_GCS_to_GBQ_Raw). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'soft_fail': True}
  super().__init__(*args, **kwargs)
[2020-01-05 00:00:17,354] {logging_mixin.py:95} WARNING - /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py:210: DeprecationWarning: Deprecated parameter `bql` used in Task id: Set_PeopleEmail_Staging. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
  category=DeprecationWarning)
[2020-01-05 00:00:17,355] {logging_mixin.py:95} WARNING - /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py:210: DeprecationWarning: Deprecated parameter `bql` used in Task id: Set_PeoplePhone_Staging. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
  category=DeprecationWarning)
[2020-01-05 00:00:17,355] {logging_mixin.py:95} WARNING - /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py:210: DeprecationWarning: Deprecated parameter `bql` used in Task id: Set_PeopleEmail. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
  category=DeprecationWarning)
[2020-01-05 00:00:17,355] {logging_mixin.py:95} WARNING - /usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py:210: DeprecationWarning: Deprecated parameter `bql` used in Task id: Set_PeoplePhone. Use `sql` parameter instead to pass the sql to be executed. `bql` parameter is deprecated and will be removed in a future version of Airflow.
  category=DeprecationWarning)
[2020-01-05 00:00:18,623] {logging_mixin.py:95} INFO - [2020-01-05 00:00:18,622] {local_task_job.py:172} WARNING - State of this instance has been externally set to success. Taking the poison pill.
[2020-01-05 00:00:18,631] {helpers.py:307} INFO - Sending Signals.SIGTERM to GPID 14172
[2020-01-05 00:00:18,673] {helpers.py:285} INFO - Process psutil.Process(pid=14172, status='terminated') (14172) terminated with exit code -15
[2020-01-05 00:00:18,674] {logging_mixin.py:95} INFO - [2020-01-05 00:00:18,674] {local_task_job.py:105} INFO - Task exited with return code 0

