Wednesday 9 August 2023

BigQuery Storage API: what happened to AppendRowsStream

Seeing no Errors but no Data Submitted

I am trying to use the new Python BigQuery Storage API (google.cloud.bigquery_storage instead of google.cloud.bigquery_storage_v1 or google.cloud.bigquery_storage_v1beta2), as can be seen here.

But there is no end-to-end documentation or example showing how to write to BigQuery using the new API (which appears to be version 2, although it is not explicitly referred to as version 2 anywhere other than in that migration document).

If I look at version 1 examples like this one or this one, they use the AppendRowsStream helper. It does not exist anywhere (that I can find) in version 2.

I am trying to use version 2, since it seems the right one to use now that it is fully released. But with little documentation and breaking changes that I cannot understand, I am unable to get a publish to work. I have code that runs without errors, in both an async and a sync version, but neither publishes anything. They simply run to completion -- without errors -- and no data is pushed to the table.

Code

The code is quite long, and I can post it, but for now I will just describe the high-level flow (I am publishing a batch using a PENDING type stream); a condensed sketch of the flow follows the list:

  1. Create a BigQuery write client: client = google.cloud.bigquery_storage.BigQueryWriteClient()
  2. Create a write stream request: request = google.cloud.bigquery_storage.CreateWriteStreamRequest(parent="<string_with_full_table_path>", write_stream=google.cloud.bigquery_storage.WriteStream(type_=google.cloud.bigquery_storage.WriteStream.Type.PENDING))
  3. Create a write stream: stream = client.create_write_stream(request=request)
  4. Create an iterator of append rows requests. This is just a generator (a def / for / yield wrapper) around the append rows request described immediately below.
  5. Create an append rows request: request = google.cloud.bigquery_storage.AppendRowsRequest(write_stream=stream.name, proto_rows=proto_data)
    • I've skipped describing proto_data, but it was written correctly because I could inspect it. It is a google.cloud.bigquery_storage.AppendRowsRequest.ProtoData object with the proper writer_schema and rows data attached to it.
  6. Iterate through the iterator. From the examples I linked above, it seems that all that is necessary is to iterate through the requests.
  7. Finalize the stream: client.finalize_write_stream(name=stream.name)
  8. Create a batch commit request: batch_commit_request = google.cloud.bigquery_storage.BatchCommitWriteStreamsRequest(parent=table_path, write_streams=[stream.name])
  9. Batch commit the stream: commit_response = client.batch_commit_write_streams(request=batch_commit_request)
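
To make that flow concrete, here is a condensed sketch of how those steps fit together. This is not my actual code: the publish_batch wrapper and the table path are placeholders I am using for illustration, proto_data is assumed to already be built as described in step 5, and step 6 is reproduced exactly as described above since that is the part I am least sure about.

from google.cloud import bigquery_storage

def publish_batch(table_path, proto_data):
    """Condensed version of steps 1-9 above.

    proto_data is assumed to be a prepared
    bigquery_storage.AppendRowsRequest.ProtoData (writer_schema plus
    serialized rows), as described in step 5.
    """
    # Steps 1-3: create a PENDING write stream on the target table.
    client = bigquery_storage.BigQueryWriteClient()
    request = bigquery_storage.CreateWriteStreamRequest(
        parent=table_path,
        write_stream=bigquery_storage.WriteStream(
            type_=bigquery_storage.WriteStream.Type.PENDING
        ),
    )
    stream = client.create_write_stream(request=request)

    # Steps 4-5: a generator yielding AppendRowsRequest objects.
    def requests():
        yield bigquery_storage.AppendRowsRequest(
            write_stream=stream.name, proto_rows=proto_data
        )

    # Step 6: iterate through the iterator, as in the examples. Whether this
    # alone is enough to submit the data is exactly what is in question.
    for _ in requests():
        pass

    # Steps 7-9: finalize the stream, then batch-commit it to the table.
    client.finalize_write_stream(name=stream.name)
    batch_commit_request = bigquery_storage.BatchCommitWriteStreamsRequest(
        parent=table_path, write_streams=[stream.name]
    )
    return client.batch_commit_write_streams(request=batch_commit_request)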

Analysis

I have sprinkled in logging, and I can see in the logs that the data is being generated and is present in the Protobufs and the Rows. I am also getting proper responses from the client object. But no data is being pushed to the table.

The only thing I can see that I am doing differently from the examples I pointed to earlier is that I am not using the AppendRowsStream step. For instance, one of the examples I linked to has these sections:

from google.cloud.bigquery_storage_v1 import writer
...
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
...
response_future_1 = append_rows_stream.send(request)
...
append_rows_stream.close()
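
For context, here is roughly how I read those fragments fitting together in the version 1 sample. The append_with_writer wrapper, the write_stream_name argument, and the row_messages argument are placeholder names of mine, not part of the sample itself:

from google.cloud.bigquery_storage_v1 import types, writer
from google.protobuf import descriptor_pb2

def append_with_writer(write_client, write_stream_name, row_messages):
    """Rough reconstruction of the v1 sample flow around AppendRowsStream.

    row_messages is assumed to be a non-empty list of generated protobuf
    message instances (a stand-in for the sample's own message class).
    """
    # The first request on the connection carries the stream name and the
    # writer schema; AppendRowsStream reuses this template for every send().
    request_template = types.AppendRowsRequest()
    request_template.write_stream = write_stream_name

    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    row_messages[0].DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Serialize the rows and send them; send() returns a future, and waiting
    # on it confirms the append was accepted before the stream is closed.
    proto_rows = types.ProtoRows()
    for message in row_messages:
        proto_rows.serialized_rows.append(message.SerializeToString())
    request = types.AppendRowsRequest()
    request.proto_rows = types.AppendRowsRequest.ProtoData(rows=proto_rows)
    response_future = append_rows_stream.send(request)
    response_future.result()

    # Shut down the connection once all sends have completed.
    append_rows_stream.close()

After closing the stream, the sample then finalizes and batch-commits the write stream, the same as steps 7-9 in my flow.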

This is the only reason I can see why my data might not be arriving in the table: because the stream is never closed, I am "committing" something that was never finished. Maybe there are two steps needed to make the commit stick, so that when I commit an unclosed stream I am committing something that is effectively empty, because the API is trying to ensure ACID compliance.

But I cannot find a way to create -- or therefore submit data to or close -- such an AppendRowsStream in the new package. This is the only wrinkle I can see that might explain why my version is not working.

Summary

Two questions:

  1. Is there an AppendRowsStream in the new google.cloud.bigquery_storage API? If so, where is it hidden?
  2. Is there anything else you can see that I might be doing wrong?


