Thursday, 14 January 2021

What is the best method to run several computations on a large dataset with a single read?

I am processing a very large dataset in Spark: 200 compressed JSON files (each ~8 GB uncompressed). I have created a main dataframe, largeDF, and several additional dataframes to compute aggregates on nested attributes (which are arrays of structs). I want to compute general statistics (fill rates and group counts).

Each pass over the whole dataset takes ~20 minutes (to load the files, decompress them, and perform the aggregations). For 50 fields this takes ages, because I keep changing my criteria and rerunning the queries with additional filters.

I want to rely on the lazy evaluation of PySpark and avoid loading the data several times, so that I can build one complex aggregation, apply it once to the whole dataset, and then convert all results to Pandas. Better still would be to pre-define the jobs and ask Spark to process them in parallel (load once, compute all), then return the result of each job separately.

This is not my main ETL; I am trying to understand the semantics of the dataset so that I can write the actual ETL pipeline.

Compute 1: Calculate statistics and find fill rate for all fields:

stats = largeDF.describe().toPandas()
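
One way to get the fill rate of every field in a single job is to put all the per-column counts into one select, so that Spark scans the input once. The sketch below is only an illustration under some assumptions: the helper names fill_rate_exprs and fill_rates are hypothetical, and the columns are assumed to be top-level (the backticks quote each name literally, so a nested path such as a.b would need different handling).

```python
def fill_rate_exprs(columns):
    """Build one SQL expression per column: non-null count / total row count."""
    # count(col) skips nulls, while count(1) counts every row.
    return [f"count(`{c}`) / count(1) AS `{c}`" for c in columns]


def fill_rates(df, columns=None):
    """Compute the fill rate of the given columns in one pass over `df`."""
    columns = list(columns or df.columns)
    # All aggregates sit in a single select, so they are evaluated together.
    return df.selectExpr(*fill_rate_exprs(columns)).toPandas()
```

Calling fill_rates(largeDF) should return a one-row Pandas dataframe with one fill rate per column.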

Compute 2: Process simple fields with categorical data:

def group_count(df, col, limit, sort, skip_null):
    """Group the dataset by the given column and count the rows in each group."""
    if skip_null:
        # Drop rows where the grouping column is null.
        df = df.where(df[col].isNotNull())
    if limit:
        # Only look at the first `limit` rows (limit=0 means no limit).
        df = df.limit(limit)
    df = df.groupBy(col).count()
    if sort:
        # Sort by the grouping column, descending.
        df = df.sort(col, ascending=False)
    return df.toPandas()

aggregations = {}
for col in group_count_list_of_columns:
    aggregations[col] = group_count(largeDF, col, limit=0, skip_null=True, sort=False)
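
The loop above launches one Spark job per column. A possible alternative is to unpivot the columns into (column_name, value) pairs with the SQL stack generator and count all groups in a single job. This is a rough sketch, not the only way to do it: unpivot_expr and group_counts_single_pass are hypothetical names, values are cast to strings so that columns of different types can share one output column, and the limit and sort options of group_count are left out.

```python
def unpivot_expr(columns):
    """Build a stack(...) expression that turns N columns into (name, value) rows."""
    pairs = ", ".join(f"'{c}', CAST(`{c}` AS STRING)" for c in columns)
    return f"stack({len(columns)}, {pairs}) AS (column_name, value)"


def group_counts_single_pass(df, columns, skip_null=True):
    """Count the groups of several columns with one Spark job."""
    long_df = df.selectExpr(unpivot_expr(columns))
    if skip_null:
        long_df = long_df.where("value IS NOT NULL")
    counts = long_df.groupBy("column_name", "value").count().toPandas()
    # Split the combined result back into one Pandas dataframe per column.
    return {
        name: part.drop(columns="column_name").reset_index(drop=True)
        for name, part in counts.groupby("column_name")
    }
```

With this sketch, aggregations = group_counts_single_pass(largeDF, group_count_list_of_columns) would produce a dictionary keyed by column name, similar to the one built by the loop.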

Compute 3: Count and calculate fill rate for nested fields:

from pyspark.sql import functions as F

def get_nested_fields(spDf, col: str, limit, othercols: tuple, stats=True):
    """Explode a nested array column and return either all exploded rows or summary statistics of them."""
    # Rows with a null array contribute nothing to the explode.
    spDf = spDf.where(spDf[col].isNotNull())
    # One output row per array element, alongside the other selected columns.
    df = spDf.select(F.explode(col), *othercols)
    if limit:
        df = df.limit(limit)
    if stats:
        res = df.describe().toPandas()
    else:
        res = df.toPandas()
    return res

nested_fields_aggregate = {}
for col in nested_fields_lists:
    nested_fields_aggregate[col] = get_nested_fields(largeDF, col, limit=10**4, othercols=('name', 'id', 'timestamp'), stats=True)

This requires the whole dataset to be read multiple times. The results have different shapes, so I cannot join them. In theory there should be a way to reduce the total time, because none of the computations depend on each other.
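
Because the computations are independent, one option to explore is submitting them from separate driver threads, since Spark can schedule jobs from several threads of the same application concurrently. The sketch below uses only the Python standard library; run_jobs is a hypothetical helper, and each job is assumed to be a callable that takes the dataframe and returns its own result. Concurrency alone does not remove the repeated reads; that would additionally need something like largeDF.persist() on a dataframe small enough to fit in the cluster's memory or disk, which is an assumption for roughly 1.6 TB of uncompressed input.

```python
from concurrent.futures import ThreadPoolExecutor


def run_jobs(df, jobs, max_workers=4):
    """Run independent computations on the same dataframe from parallel threads.

    `jobs` maps a result name to a callable that accepts `df`.
    Returns a dict with the same keys and each callable's result.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {name: pool.submit(job, df) for name, job in jobs.items()}
        # result() blocks until the job finishes and re-raises its exception, if any.
        return {name: future.result() for name, future in futures.items()}


# Hypothetical usage with the dataframe from the question:
# results = run_jobs(largeDF, {
#     "stats": lambda df: df.describe().toPandas(),
#     "row_count": lambda df: df.count(),
# })
```

Each entry of the returned dictionary holds the result of one job, so the results can keep their different shapes.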


