Sunday, 6 September 2020

PySpark UDF on multi-level aggregated data; how can I properly generalize this

I know of several ways to make calculations on an aggregation of a Spark dataframe column. Too many, in fact, and it is not clear which is "best" or how they can properly be generalized the way I would like. I see StackOverflow solutions that conflict with one recommendation or another from Spark / Databricks themselves, but none does precisely what I want.

Below is a baby snippet of code showing what I am trying to do (a multi-column aggregation using a somewhat arbitrary calculation), along with the approaches that work but that I cannot intelligently compare or assess:

Dataset

df = sc.parallelize([
    [100, 'A', 1],
    [100, 'A', 2],
    [101, 'A', 3],
    [102, 'B', 4],
    [103, 'B', 5],
    [102, 'B', 6],
]).toDF(('id', 'grp', 'val'))

[image: original dataset]

Using applyInPandas

# Each call receives one (grp, id) group as a pandas DataFrame
def calc_q1(pddf):
  newcol = pddf.val.quantile(q=0.25)
  return pddf.assign(val_Q1=newcol)

grp_df = df.groupBy('grp', 'id').applyInPandas(calc_q1, schema="grp string, id long, val long, val_Q1 double")

Which yields:

[image: applyInPandas result]

Pros:

  • All Pandas functions are now available
    • Arbitrary complexity / function
  • This seems to be most recommended by Spark / Databricks

Cons:

  • It seems hard-coding is required (i.e., I had to specify val in the calc_q1 function).
  • I cannot manage to return more than one value (in the later examples, you will see that a list of results is returned); a possible, untested workaround is sketched after this list.
  • It is a pain to have to provide the output schema and also forces some hard-coding.
  • Each aggregate (each dataframe passed to the function) must fit in memory since a Pandas function is being called
    • This is not a concern for my work, but I'm curious: I would think this is a limit of all of the options I list, not just the Pandas option.
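
On the multiple-values point: the following is only an untested sketch (the column names val_Q1 and val_Q3 are mine), but assigning several new columns in the returned frame appears to be allowed as long as they all show up in the output schema:

def calc_q1_q3(pddf):
  # quantile() with a list returns a Series indexed by the requested quantiles
  q = pddf.val.quantile(q=[0.25, 0.75])
  # assign() broadcasts the two scalars onto every row of this group
  return pddf.assign(val_Q1=q[0.25], val_Q3=q[0.75])

grp_df = df.groupBy('grp', 'id').applyInPandas(
  calc_q1_q3,
  schema="grp string, id long, val long, val_Q1 double, val_Q3 double")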

Using Windowing

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('id', 'grp')
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

# every row of a partition gets the same array of [Q1, median, Q3]
grp_df = df.withColumn('med_val', quantiles.over(grp_window))

which yields:

[image: windowed result]

Pros:

  • Using functions.expr seems almost as "open" and utilitarian as being able to rely upon all of Pandas.

Cons:

  • It is a bit more laborious: every row keeps the full result array, so there is still unpacking and de-duplicating to do (see the sketch after this list).
    • Possibly also slower because of this? I have not compared timings.
  • Using Window to perform aggregation somehow just feels "wrong".
  • I still cannot use any arbitrary expression I would like, only PySpark expressions.
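
For completeness, here is a sketch (untested, building on the grp_window and quantiles definitions above, with my own column names) of what getting back to one row per group from the windowed version seems to require:

grp_df = (df.withColumn('quants', quantiles.over(grp_window))
            .select('id', 'grp',
                    F.col('quants')[0].alias('val_Q1'),
                    F.col('quants')[1].alias('val_med'),
                    F.col('quants')[2].alias('val_Q3'))
            .dropDuplicates(['id', 'grp']))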

Using groupBy

import pyspark.sql.functions as F

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

# one output row per (grp, id), holding the array of [Q1, median, Q3]
grp_df = df.groupBy('grp', 'id').agg(quantiles.alias('med_val'))

which yields:

[image: groupBy result]

Pros:

  • This is what I already use for simpler calculations (ones that do not require UDFs or multi-level aggregations).
  • It "feels" right, as the verbs that I am using groupBy and agg align exactly with the concepts and semantics of what I am doing.

Cons:

  • I don't know of a concrete drawback, but since this is an "older" way of doing things, I feel there must be a reason the newer methods were introduced.
  • I still cannot use any arbitrary expression I would like, only PySpark expressions (although the grouped-aggregate pandas UDF sketched after this list might relax that).
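
On that last point, one middle ground I have not tried here (so treat it as an untested sketch with a made-up name val_q1, assuming Spark 3.x with PyArrow) is a grouped-aggregate pandas UDF, which lets a pandas computation sit directly inside agg, though it must return a single scalar per group:

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def val_q1(v: pd.Series) -> float:
  # receives the group's values as a pandas Series; must return one scalar
  return float(v.quantile(0.25))

grp_df = df.groupBy('grp', 'id').agg(val_q1('val').alias('val_Q1'))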

What I Want to Achieve

I would like to find a way to pass arguments to the aggregate function so that it can be generalized, but I cannot figure out how.

I would like to be able to do something like this:

def calc_quants(pddf, col, quantcol, quants):
  pddf[quantcol] = pddf[col].quantile(q=quants)  # where `quants` is a list of values
  return pddf

grp_df = (df.groupBy('grp', 'id')
            .applyInPandas(calc_quants, 'val', 'val_quants', [0.25, 0.75],
                           schema="grp string, id long, val long, val_quants array<double>"))

Being able to do something like what I have written above is my ultimate desire. I list all the methods I have discovered because it seems none of them are capable of doing what I want.
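
The closest I have been able to come up with (offered only as an untested sketch, with a made-up helper name make_quant_fn, and assuming Spark 3.x with PyArrow) is to parameterize through a closure, since applyInPandas itself only accepts the function and a schema:

def make_quant_fn(col, quantcol, quants):
  # factory: returns the one-argument function that applyInPandas expects
  def calc_quants(pddf):
    qvals = pddf[col].quantile(q=quants).tolist()
    # store the list of quantiles on every row of this group
    return pddf.assign(**{quantcol: [qvals] * len(pddf)})
  return calc_quants

grp_df = (df.groupBy('grp', 'id')
            .applyInPandas(make_quant_fn('val', 'val_quants', [0.25, 0.75]),
                           schema="grp string, id long, val long, val_quants array<double>"))

Whether that counts as "passing arguments" or merely side-stepping the question I am not sure, but at least the hard-coded column names stay out of the per-group function.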



from PySpark UDF on multi-level aggregated data; how can I properly generalize this
