Thursday, 2 September 2021

Start multiple batch Dataflow jobs from the same Cloud Function execution

I have created a custom template which reads from BigQuery using the ReadFromBigQuery I/O connector. I use it like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        parser.add_argument(
            '--bucket',
            default='mybucketname',
            help='Bucket name for staging, temp and schema files.')

options = PipelineOptions()
args = options.view_as(CustomOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'myproject'
google_cloud_options.region = 'europe-west1'
google_cloud_options.staging_location = 'gs://{}/staging/'.format(args.bucket)
google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = './setup.py'


def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                                          query=args.query,
                                          use_standard_sql=True,
                                          flatten_results=False)
            # ... (remaining transforms elided)
        )

And this is my setup.py:

import setuptools

REQUIRED_PACKAGES = [
    'apache-beam',
    'apache-beam[gcp]',
    'google-cloud-storage'
]

setuptools.setup(
    name='ProcessEmailMetrics',
    version='0.0.1',
    description='Workflow to process email metrics.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True
)

Finally, this is how I call the Dataflow API in my Cloud Function:

import google.auth
from googleapiclient.discovery import build

credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

query = """
SELECT ...
"""

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
        'query': query
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)

req.execute()
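Because every call builds the same BODY, all jobs launched from one execution share the job name 'process-data' and the template's temp location, and Dataflow expects job names to be unique within a project and region. Below is a minimal, stdlib-only sketch of building one request body per query with a unique job name and a per-job `environment.tempLocation` (a field of the Dataflow `RuntimeEnvironment`). The helper name `build_template_request` and the `tmp/<job name>/` layout are hypothetical, and whether ReadFromBigQuery's export files actually follow a runtime tempLocation in a classic template is an assumption to verify for your Beam version.

```python
import re
import uuid

# Pattern Dataflow job names must match: lowercase letters, digits and hyphens,
# starting with a letter and not ending with a hyphen.
JOB_NAME_PATTERN = re.compile(r"[a-z]([-a-z0-9]{0,1022}[a-z0-9])?")


def build_template_request(query, gcs_path, bucket, job_prefix="process-data"):
    """Return a templates().create() body for one job (hypothetical helper).

    Each call gets a unique job name and its own temp folder, so several
    jobs launched from one Cloud Function execution share neither.
    """
    suffix = uuid.uuid4().hex[:12]  # lowercase hex keeps the name valid
    job_name = f"{job_prefix}-{suffix}"
    if not JOB_NAME_PATTERN.fullmatch(job_name):
        raise ValueError(f"invalid Dataflow job name: {job_name!r}")
    return {
        "jobName": job_name,
        "gcsPath": gcs_path,
        "parameters": {"query": query},
        # Assumption: the template honours a per-job tempLocation at runtime.
        "environment": {"tempLocation": f"gs://{bucket}/tmp/{job_name}/"},
    }
```

Each returned body would then be passed as `body=` to `service.projects().locations().templates().create(...)` exactly as in the code above, once per query.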

I start each job with an API call that launches the template from a Cloud Function listening to a Pub/Sub topic. If I publish only one message on the topic, the pipeline finishes without any error. However, if I start multiple jobs from the same Cloud Function execution, I get two different errors.

The first one is about a missing file. The first two errors were of this type:

HttpError accessing https://www.googleapis.com/storage/v1/b/my-bucket/o/tmp%2F6b2d2ba6-1%2Fbigquery-table-dump-000000000003.json?alt=media&generation=1628848711723613: response: <{'x-guploader-uploadid': 'ADPycdvNyinmSGSiYZPZw3GAJ4scmNLnGGsv5DUhowTZUYn_L6z9kMZ5b8oFWzPR2utFmTogffLijzmyfcJN_amILlmWQZa7aQ', 'content-type': 'text/html; charset=UTF-8', 'date': 'Fri, 13 Aug 2021 09:58:37 GMT', 'vary': 'Origin, X-Origin', 'expires': 'Fri, 13 Aug 2021 09:58:37 GMT', 'cache-control': 'private, max-age=0', 'content-length': '94', 'server': 'UploadServer', 'status': '404'}>, content <No such object: my-bucket/tmp/6b2d2ba6-1/bigquery-table-dump-000000000003.json>

The second one is an IndexError (list index out of range), again raised while reading the Avro files generated by ReadFromBigQuery. The next three errors were of this type:

2021-08-13 12:03:48.656 CEST Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/concat_source.py", line 84, in read
    for record in self._source_bundles[source_ix].source.read(
IndexError: list index out of range

After these five errors, my pipeline failed and stopped.

It seems like the ReadFromBigQuery connector is looking for a temporary file containing some BigQuery rows that does not actually exist, or that has been corrupted.
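To check whether failed jobs were reading from the same export directory, the object name can be recovered from the HttpError URL, where GCS percent-encodes "/" as "%2F". A small stdlib-only sketch follows; the helper names are hypothetical and it assumes URLs of the JSON API form shown in the log above.

```python
from urllib.parse import unquote, urlparse


def gcs_object_from_error_url(url):
    """Split a GCS JSON API object URL into (bucket, object_name)."""
    path = urlparse(url).path  # drops the ?alt=media&generation=... query
    prefix = "/storage/v1/b/"
    if not path.startswith(prefix) or "/o/" not in path:
        raise ValueError(f"not a GCS object URL: {url!r}")
    bucket, encoded_name = path[len(prefix):].split("/o/", 1)
    # Object names are percent-encoded in the path ("/" appears as "%2F").
    return bucket, unquote(encoded_name)


def export_prefix(object_name):
    """Return the directory part of an object name, e.g. 'tmp/6b2d2ba6-1/'."""
    head, _, _ = object_name.rpartition("/")
    return head + "/" if head else ""
```

For the 404 above this yields the prefix tmp/6b2d2ba6-1/. If the 404s logged by different jobs report the same prefix, that would support the idea that concurrent jobs share one export directory.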

As I said, if I start only one Dataflow job it finishes without any error, so I have two hypotheses:

  1. It may be related to my Cloud Function. When two messages are published close together in time, the function instance does not have time to go idle, and maybe the file paths get mixed up that way.

    • Could this problem be caused by the cache_discovery=False option when building the Dataflow service with build()?
  2. Maybe it is due to how my template is coded:

    • Could the options.view_as(SetupOptions).save_main_session = True option be the cause of the problem?
    • Do I need to provide a specific temporary dataset for each job execution when reading from / writing to BigQuery?
    • Or a different temporary location in google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket) for each job execution?
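The last question (a separate temporary location per job) can be illustrated with a toy, local-filesystem model: two jobs export files into scratch space, job B finishes and cleans up, then job A reads what it listed earlier. This is only an analogy using `tempfile` and `os`, with a hypothetical function name; it does not reproduce what ReadFromBigQuery actually does, it just shows why a shared scratch directory can produce "No such object" errors while per-job directories do not.

```python
import os
import tempfile


def run_two_jobs(isolate):
    """Toy model of two jobs sharing (or not sharing) a scratch directory.

    Returns the files job A managed to read. With isolate=False, job B's
    cleanup removes job A's file and job A fails with FileNotFoundError.
    """
    with tempfile.TemporaryDirectory() as root:
        def scratch(job):
            path = os.path.join(root, job if isolate else "shared")
            os.makedirs(path, exist_ok=True)
            return path

        # Both jobs "export" one dump file into their scratch directory.
        for job in ("job-a", "job-b"):
            with open(os.path.join(scratch(job), f"{job}-dump-0.json"), "w") as f:
                f.write("{}")

        # Job A plans its reads from the directory listing; in shared mode
        # this listing also picks up job B's file.
        dir_a = scratch("job-a")
        planned = sorted(os.listdir(dir_a))

        # Job B finishes first and deletes everything in its scratch directory.
        dir_b = scratch("job-b")
        for name in os.listdir(dir_b):
            os.remove(os.path.join(dir_b, name))

        # Job A now reads the files it planned to read.
        read = []
        for name in planned:
            with open(os.path.join(dir_a, name)) as f:
                f.read()
            read.append(name)
        return read
```

With isolate=True job A reads only its own file; with isolate=False the read fails, which mirrors the symptom in the first error rather than proving its cause.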

I need to be able to start several Dataflow jobs from the same Cloud Function execution, so the current behaviour does not fit my project's needs.
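A minimal sketch of the Cloud Function side, assuming a 1st-gen Pub/Sub-triggered background function (whose `event["data"]` holds the base64-encoded message) and a hypothetical JSON payload of the form `{"queries": [...]}`. `launch_all` and `launch_one` are hypothetical names: `launch_one` stands in for a call such as `templates().create(...).execute()` and is injected so the loop can be exercised without GCP. It launches one job per query in a single execution and records failures instead of stopping at the first one.

```python
import base64
import json


def launch_all(event, context, launch_one):
    """Launch one templated job per query found in a Pub/Sub message.

    `launch_one(query)` is a hypothetical callable that submits one job and
    returns its ID. Returns ({index: job_id}, {index: error_repr}).
    """
    payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    launched, failed = {}, {}
    for i, query in enumerate(payload.get("queries", [])):
        try:
            launched[i] = launch_one(query)
        except Exception as exc:  # keep launching the remaining jobs
            failed[i] = repr(exc)
    return launched, failed
```

In the deployed function the entry point could be a thin wrapper such as `def main(event, context): return launch_all(event, context, my_launcher)`, where the hypothetical `my_launcher` wraps the API call and builds a distinct body for each query.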

Here is the ID of one of my failed jobs: 2021-08-13_02_54_10-11165491620802897150.

Any idea how to fix this?
