I would like to materialize a query from BigQuery with this function:
import datetime
from pathlib import Path
import pyarrow.parquet as pq

def events_ga4(self):
    EVENTS_TARGET = 'tests.events'
    # last_date = self.bq.get_last_push_date(EVENTS_TARGET, 'session_date')
    # if self.is_already_done_for_today(last_date):
    #     self.log(f'{EVENTS_TARGET} already done, nothing changed')
    #     return False
    self.log(f'start {EVENTS_TARGET}')
    # dates = get_data_ragne(last_date)
    dates = [(datetime.datetime.today() - datetime.timedelta(1)).date().strftime("%Y%m%d")]
    for date in dates:
        self.log(f'downloading {date}')
        query = f'''
            select user_pseudo_id, parse_date('%Y%m%d', event_date) session_date,
                event_timestamp, event_name, event_params, ecommerce.transaction_id
            FROM `ga4.events_{date}`
            where user_pseudo_id is not null
        '''
        self.log('query')
        data = self.bq.query(query)
        self.log('transforming to arrow')
        data_to_update = data.to_arrow()
        self.log('save to parquet file')
        Path('reports').mkdir(exist_ok=True)
        big_file = Path(f'reports/events_{date}.parquet')
        pq.write_table(data_to_update, big_file, compression=None)
        self.log('load')
        print(pq.read_schema(big_file))
        # self.get_table_schema('testy_ab_events.json')
        self.bq.load_from_parquet(EVENTS_TARGET, big_file, 'append')
        self.log('deleting big file')
        big_file.unlink()
        self.log(f'done {EVENTS_TARGET} for {date}')
Here is the function that loads the data into BQ:
def load_from_parquet(self, table_id: str, path_to_file: Path, mode: str, schema: dict = None) -> str:
    with open(path_to_file, 'rb') as f:
        return f"{self._load_to_bq(table_id=table_id, file=f, mode=mode, file_type='parquet', schema=schema)}, many rows loaded."

def _load_to_bq(self, table_id, file, mode, file_type='nd json', schema=None) -> str:
    # job config
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
    source_formats = {
        'nd json': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        'csv': bigquery.SourceFormat.CSV,
        'parquet': bigquery.SourceFormat.PARQUET
    }
    job_config = bigquery.LoadJobConfig(
        source_format=source_formats[file_type]
    )
    if schema:
        job_config.schema = self.format_schema(schema)
    else:
        job_config.autodetect = True
    if mode == 'append':
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    elif mode == 'truncate':
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    elif mode == 'empty':
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
    else:
        raise ValueError(f'{mode} is not a valid mode. Choose append, truncate or empty')
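(The function body above is cut off before the job is actually submitted; a rough sketch of what that tail typically looks like with the google-cloud-bigquery client, assuming a self.client attribute that is not shown in the original code, would be:)

    # assumed tail of _load_to_bq: submit the load job and wait for it
    job = self.client.load_table_from_file(file, table_id, job_config=job_config)
    job.result()  # block until the load finishes (raises on error)
    return str(job.output_rows)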
Here is the JSON schema:
[
    {
        "name": "event_params",
        "type": "RECORD",
        "mode": "REPEATED",
        "fields": [
            {
                "name": "key",
                "type": "STRING",
                "mode": "NULLABLE"
            },
            {
                "name": "value",
                "type": "RECORD",
                "mode": "NULLABLE",
                "fields": [
                    {
                        "name": "string_value",
                        "type": "STRING",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "int_value",
                        "type": "INTEGER",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "float_value",
                        "type": "FLOAT",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "double_value",
                        "type": "FLOAT",
                        "mode": "NULLABLE"
                    }
                ]
            }
        ]
    }
]
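(format_schema is not shown above; assuming it only maps this JSON onto SchemaField objects, a minimal sketch of such a helper, with from_api_repr as my guess for the conversion, could be:)

    def format_schema(self, schema: list) -> list:
        # convert each JSON field definition into a bigquery.SchemaField;
        # from_api_repr handles the nested RECORD fields recursively
        return [bigquery.SchemaField.from_api_repr(field) for field in schema]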
Even with that schema I'm still getting this error:
google.api_core.exceptions.BadRequest: 400 400 Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)
errors[]:
{'reason': 'invalid', 'message': 'Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)'}
I tried reading the data back into Arrow with the Python function read_from_parquet, but it took too much RAM. I would like to get this Parquet approach working rather than falling back to ND JSON. :/
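The "event_params.list" in the error makes me suspect that pyarrow encodes the REPEATED RECORD column as a Parquet LIST type that BigQuery does not map back to a repeated field on its own. A sketch of what I am considering trying, assuming my pyarrow and google-cloud-bigquery versions support use_compliant_nested_type and ParquetOptions.enable_list_inference:

    # writing the Parquet file (pyarrow): use the spec-compliant nested list layout
    pq.write_table(
        data_to_update,
        big_file,
        compression=None,
        use_compliant_nested_type=True,
    )

    # loading into BigQuery (google-cloud-bigquery): infer LIST columns as REPEATED fields
    parquet_options = bigquery.ParquetOptions()
    parquet_options.enable_list_inference = True
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        parquet_options=parquet_options,
    )

Does that sound like the right direction, or is the problem elsewhere?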