Monday, 3 July 2023

Parquet file does not map the column schema correctly

I would like to materialize a query from BigQuery with this function:

import datetime
from pathlib import Path

import pyarrow.parquet as pq

def events_ga4(self):
        EVENTS_TARGET = 'tests.events'
        # last_date = self.bq.get_last_push_date(EVENTS_TARGET, 'session_date')
        # if self.is_already_done_for_today(last_date):
        #     self.log(f'{EVENTS_TARGET} already done, nothing changed')
        #     return False
        self.log(f'start {EVENTS_TARGET}')
        # dates = get_data_ragne(last_date)
        dates = [(datetime.datetime.today() - datetime.timedelta(1)).date().strftime("%Y%m%d")]
        for date in dates:
            self.log(f'downloading {date}')
            query = f'''
                    select user_pseudo_id, parse_date('%Y%m%d', event_date) session_date,
                    event_timestamp, event_name, event_params, ecommerce.transaction_id
                    FROM `ga4.events_{date}`
                    where user_pseudo_id is not null
            '''
            self.log('query')
            data = self.bq.query(query)
            self.log('transforming to arrow')
            data_to_update = data.to_arrow()
            self.log('save to parquet file')
            Path('reports').mkdir(exist_ok=True)
            big_file = Path(f'reports/events_{date}.parquet')
            pq.write_table(data_to_update, big_file, compression=None)
            self.log('load')
            print(pq.read_schema(big_file))
            # self.get_table_schema('testy_ab_events.json')
            self.bq.load_from_parquet(EVENTS_TARGET, big_file, 'append')
            self.log('deleting big file')
            big_file.unlink()
            self.log(f'done {EVENTS_TARGET} for {date}')
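
The schema print in the loop shows event_params as a list of structs. For context, here is a tiny standalone sketch (the column names mirror the query, the values are made up) of how pyarrow lays that out in the Parquet file: the repeated RECORD gets an extra wrapper group, which is exactly the "list" name that shows up in the load error further down.

import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative only: a repeated RECORD such as event_params is a list of
# structs in Arrow, and the Parquet writer nests it as
# event_params (LIST) -> list -> item/element (the inner name depends on the
# pyarrow version and on use_compliant_nested_type).
value_type = pa.struct([('string_value', pa.string()), ('int_value', pa.int64())])
params_type = pa.list_(pa.struct([('key', pa.string()), ('value', value_type)]))
sample = pa.table({
    'user_pseudo_id': ['abc'],
    'event_params': pa.array(
        [[{'key': 'page_title', 'value': {'string_value': 'home', 'int_value': None}}]],
        type=params_type),
})
pq.write_table(sample, 'sample_events.parquet', compression=None)
print(pq.read_schema('sample_events.parquet'))         # Arrow view of the column types
print(pq.ParquetFile('sample_events.parquet').schema)  # Parquet view: note the nested "list" group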

Here is the function to load data to BQ:

from pathlib import Path

from google.cloud import bigquery

def load_from_parquet(self, table_id: str, path_to_csv: Path, mode: str, schema: dict = None) -> str:
        with open(path_to_csv, 'rb') as f:
            return f"{self._load_to_bq(table_id=table_id, file=f, mode=mode, file_type='parquet', schema=schema)}, many rows loaded."

def _load_to_bq(self, table_id, file, mode, file_type='nd json', schema=None) -> str:
        # job config
        # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
        source_formats = {
            'nd json': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            'csv': bigquery.SourceFormat.CSV,
            'parquet': bigquery.SourceFormat.PARQUET
        }
        job_config = bigquery.LoadJobConfig(
            source_format=source_formats[file_type]
        )
        if schema:
            job_config.schema = self.format_schema(schema)
        else:
            job_config.autodetect = True
        if mode == 'append':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        elif mode == 'turncate':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
        elif mode == 'empty':
            job_config.write_disposition = bigquery.WriteDisposition.WRITE_EMPTY
        else:
            raise ValueError(f'{mode} is not valid mode. Choose append, turncate or empty')
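
The method above is cut off after the write disposition handling; the part that actually submits the job is the usual load_table_from_file call and looks roughly like the standalone sketch below (the helper name and the client argument are illustrative, they stand in for whatever bigquery.Client the class holds):

from google.cloud import bigquery

def submit_parquet_load(client: bigquery.Client, table_id: str, file, job_config: bigquery.LoadJobConfig) -> str:
    # Stream the already opened Parquet file into the target table.
    load_job = client.load_table_from_file(file, table_id, job_config=job_config)
    load_job.result()  # waits for completion; schema errors are raised here as BadRequest
    return str(load_job.output_rows)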

Here is the JSON schema:

[{
        "name": "event_params",
        "type": "RECORD",
        "mode": "REPEATED",
        "fields": [{
                "name": "key",
                "type": "STRING",
                "mode": "NULLABLE"
            },
            {
                "name": "value",
                "type": "RECORD",
                "mode": "NULLABLE",
                "fields": [{
                        "name": "string_value",
                        "type": "STRING",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "int_value",
                        "type": "INTEGER",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "float_value",
                        "type": "FLOAT",
                        "mode": "NULLABLE"
                    },
                    {
                        "name": "double_value",
                        "type": "FLOAT",
                        "mode": "NULLABLE"
                    }
                ]
            }
        ]
    }
]
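
format_schema is not shown above; for this JSON layout it only needs to turn each dict into a SchemaField, and from_api_repr handles the nested fields. A sketch of what such a helper can look like (the file name matches the commented-out get_table_schema call, the rest is illustrative):

import json
from google.cloud import bigquery

def format_schema(schema: list) -> list:
    # The entries already use the BigQuery REST representation
    # (name / type / mode / fields), so from_api_repr handles the nesting.
    return [bigquery.SchemaField.from_api_repr(field) for field in schema]

with open('testy_ab_events.json') as f:
    event_params_schema = format_schema(json.load(f))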

and I'm still getting this error:

google.api_core.exceptions.BadRequest: 400 400 Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)
errors[]:
{'reason': 'invalid', 'message': 'Provided Schema does not match Table analityka-269913:testy_ab.events. Cannot add fields (field: event_params.list)'}

I tried reading the Arrow table back with the read_from_parquet function, but it took too much RAM. I would like to get this Parquet approach working rather than fall back to ND JSON :/
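
For reference, the "event_params.list" in the error message is the wrapper group from Parquet's three-level LIST encoding sketched earlier; when BigQuery does not infer lists, it treats that wrapper as a real nested field, which then no longer matches the REPEATED RECORD in the target table. The two settings that control this behaviour, sketched here as standard pyarrow / google-cloud-bigquery options rather than as a confirmed fix for my setup:

from google.cloud import bigquery

# Writer side: emit the "compliant" nested layout when saving the Arrow table,
# i.e. pq.write_table(data_to_update, big_file, compression=None,
#                     use_compliant_nested_type=True)

# Loader side: let BigQuery infer lists from the Parquet LIST logical type
# instead of materializing the list/element wrapper as nested fields.
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
parquet_options = bigquery.ParquetOptions()
parquet_options.enable_list_inference = True
job_config.parquet_options = parquet_options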


