Wednesday, 3 February 2021

Passing variables through Cloud Functions to a container using KubernetesPodOperator on Cloud Composer

I am trying to get the event and context variables data from the background functions run on Google Cloud Functions and pass the values through to a container running the KubernetesPodOperator on Cloud Composer / Airflow.

The first section of code is my cloud function which triggers a dag called gcs_to_pubsub_topic_dag, what I would like to pass over and access is the data in json, specifically the "conf": event data.

#!/usr/bin/env python
# coding: utf-8

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

def trigger_dag(event, context=None):
    client_id = '###############.apps.googleusercontent.com'
    webserver_id = '###############'
    # The name of the DAG you wish to trigger
    dag_name = 'gcs_to_pubsub_topic_dag'
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    print(f' This is my webserver url: {webserver_url}')
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json={"conf": event, "replace_microseconds": 'false'})

def make_iap_request(url, client_id, method='GET', **kwargs):

    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text

def main(event, context=None):
    """
    Call the main function, sets the order in which to run functions.
    """
    
    trigger_dag(event, context=None)

    return 'Script has run without errors !!'

if (__name__ == "__main__"):
    main()

The dag that is triggered runs this KubernetesPodOperator code:

kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id=TASK_ID,
    # Name of task you want to run, used to generate Pod ID.
    name=TASK_ID,
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=[f'python3', 'execution_file.py'],
    # The namespace to run within Kubernetes, default namespace is `default`.
    namespace=KUBERNETES_NAMESPACE,
    # location of the docker image on google container repository
    image=f'eu.gcr.io/{GCP_PROJECT_ID}/{CONTAINER_ID}:{IMAGE_VERSION}',
    #Always pulls the image before running it.
    image_pull_policy='Always',
    # The env_var template variable allows you to access variables defined in Airflow UI.
    env_vars = {'GCP_PROJECT_ID':GCP_PROJECT_ID,'DAG_CONF':},
    dag=dag)

And then finally I want to get DAG_CONF to print within the called container image execution_file.py script :

#!/usr/bin/env python
# coding: utf-8

from gcs_unzip_function import main as gcs_unzip_function
from gcs_to_pubsub_topic import main as gcs_to_pubsub_topic
from os import listdir, getenv

GCP_PROJECT_ID = getenv('GCP_PROJECT_ID')
DAG_CONF = getenv('DAG_CONF')

print('Test run')
    
print(GCP_PROJECT_ID)

print (f'This is my dag conf {DAG_CONF}')
 
print(type(DAG_CONF))    

At the moment the code triggers the dag and returns:

Test run

GCP_PROJECT_ID (this is set in the airflow environment variables)

This is my dag conf None

class 'NoneType

where as I would like DAG_CONF to come through



from Passing variables through Cloud Functions to a container using KubernetesPodOperator on Cloud Composer

No comments:

Post a Comment