I have created a custom template which reads from BigQuery using the ReadFromBigQuery
I/O connector. I use it like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        parser.add_argument(
            '--bucket',
            default='mybucketname',
            help='Bucket name for staging, temp and schema files.')


options = PipelineOptions()
args = options.view_as(CustomOptions)

google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'myproject'
google_cloud_options.region = 'europe-west1'
google_cloud_options.staging_location = 'gs://{}/staging/'.format(args.bucket)
google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)

options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = './setup.py'


def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=args.query,
                use_standard_sql=True,
                flatten_results=False)
            ...
        )
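For completeness, the script above is staged as a classic template by running it once with a --template_location flag. A minimal sketch of the entry point, assuming the file is called my_template.py (the name is a placeholder):

# Staging the classic template, e.g.:
#   python my_template.py --template_location gs://mybucket/templates/my_template
if __name__ == '__main__':
    run()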
setup.py
import setuptools

REQUIRED_PACKAGES = [
    'apache-beam',
    'apache-beam[gcp]',
    'google-cloud-storage'
]

setuptools.setup(
    name='ProcessEmailMetrics',
    version='0.0.1',
    description='Workflow to process email metrics.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True
)
Finally, this is how I make the Dataflow API call in my Cloud Function:
import google.auth
from googleapiclient.discovery import build

credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

query = """
SELECT ...
"""

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
        'query': query
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)
req.execute()
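For context, that call runs inside a Pub/Sub-triggered function along these lines. The entry-point name, the way the queries are parsed out of the message, and the per-job name suffix are placeholders here, not my exact code:

import base64

import google.auth
from googleapiclient.discovery import build


def process_pubsub_message(event, context):
    """Placeholder entry point for the Pub/Sub-triggered Cloud Function."""
    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

    # Placeholder parsing: in reality the queries are derived from the message payload.
    message = base64.b64decode(event['data']).decode('utf-8')
    queries = [q for q in message.split(';') if q.strip()]

    # One templates.create call per query, so several Dataflow jobs are
    # started from the same Cloud Function execution.
    for i, query in enumerate(queries):
        body = {
            'jobName': 'process-data-{}'.format(i),  # placeholder naming
            'gcsPath': 'gs://mybucket/templates/my_template',
            'parameters': {'query': query},
        }
        service.projects().locations().templates().create(
            projectId='myproject',
            location='europe-west1',
            body=body).execute()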
The Cloud Function listens to a Pub/Sub topic and launches the template as shown above. If I publish only one message on the topic, the pipeline finishes without any error. However, if I start multiple jobs from the same Cloud Function execution, I get two different errors.
The first one is about a missing file. The first two errors were of this type:
HttpError accessing https://www.googleapis.com/storage/v1/b/my-bucket/o/tmp%2F6b2d2ba6-1%2Fbigquery-table-dump-000000000003.json?alt=media&generation=1628848711723613: response: <{'x-guploader-uploadid': 'ADPycdvNyinmSGSiYZPZw3GAJ4scmNLnGGsv5DUhowTZUYn_L6z9kMZ5b8oFWzPR2utFmTogffLijzmyfcJN_amILlmWQZa7aQ', 'content-type': 'text/html; charset=UTF-8', 'date': 'Fri, 13 Aug 2021 09:58:37 GMT', 'vary': 'Origin, X-Origin', 'expires': 'Fri, 13 Aug 2021 09:58:37 GMT', 'cache-control': 'private, max-age=0', 'content-length': '94', 'server': 'UploadServer', 'status': '404'}>, content <No such object: my-bucket/tmp/6b2d2ba6-1/bigquery-table-dump-000000000003.json>
The second one is an index-out-of-range error, again when reading the Avro files generated by ReadFromBigQuery. The next three errors were of this type:
2021-08-13 12:03:48.656 CESTError message from worker: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start File "/usr/local/lib/python3.7/site-packages/apache_beam/io/concat_source.py", line 84, in read for record in self._source_bundles[source_ix].source.read( IndexError: list index out of range
After these five errors happened, my pipeline failed and stopped.
It seems like the ReadFromBigQuery connector is looking for a temporary file containing some BigQuery rows that does not actually exist, or has somehow been messed up.
As I have said, if I start only one Dataflow job it finishes without any error, so I have two hypotheses.
- It may be related to my Cloud Function. When two messages are published too close in time, the function does not have time to go to sleep, and maybe the file paths get mixed up that way.
  - Can this problem be caused by the cache_discovery=False option used when building the Dataflow service?
- Maybe it is due to how my template is coded:
  - May the options.view_as(SetupOptions).save_main_session = True option be the key to the problem?
  - May I need to provide a specific temporary dataset for each job execution when reading from / writing to BigQuery? Or a different temporary location in google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket) for each job execution? (See the sketch after this list.)
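To make that last point concrete, what I have in mind is passing the BigQuery export location in as its own runtime parameter, so every job gets its own path. A minimal sketch, assuming a new value-provider argument that I am calling export_location here (the name is just an example), fed to ReadFromBigQuery through its gcs_location argument:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        # Example extra runtime parameter: a unique gs:// path per job for the
        # temporary BigQuery export files.
        parser.add_value_provider_argument(
            '--export_location',
            help='Per-job GCS path for the BigQuery export.')


def run(options):
    args = options.view_as(CustomOptions)
    with beam.Pipeline(options=options) as p:
        rows = (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=args.query,
                use_standard_sql=True,
                flatten_results=False,
                # Write the export to the per-job path instead of the shared
                # temp_location, so concurrent jobs do not touch each other's
                # dump files.
                gcs_location=args.export_location))
        # ... rest of the pipeline unchanged ...

The Cloud Function would then add a unique export_location value (for example gs://mybucket/tmp/ plus a per-job suffix) to the 'parameters' dict of each templates.create call.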
I need to be able to start several Dataflow jobs from the same Cloud Function execution, so the current behaviour does not fit my project needs.
Here is one of my failed job IDs: 2021-08-13_02_54_10-11165491620802897150.
Any idea how to fix this?