I know of several ways to perform a calculation on an aggregation of a Spark dataframe column. Too many, in fact... and it is not clear which is "best" or how they can properly be generalized the way I would like. I have seen StackOverflow solutions that conflict with one or another recommendation from Spark / Databricks themselves, but none does precisely what I want.
Below is a baby snippet of code showing what I am trying to do (a multi-column aggregation using a somewhat arbitrary calculation), along with the approaches that work but which I cannot intelligently compare or assess:
Dataset
df = sc.parallelize([
    [100, 'A', 1],
    [100, 'A', 2],
    [101, 'A', 3],
    [102, 'B', 4],
    [103, 'B', 5],
    [102, 'B', 6],
]).toDF(('id', 'grp', 'val'))
Using applyInPandas
def calc_q1(pddf):
    newcol = pddf.val.quantile(q=0.25)
    return pddf.assign(val_Q1=newcol)
grp_df = df.groupBy('grp', 'id').applyInPandas(calc_q1, schema="grp string, id long, val long, val_Q1 double")
Which yields:
Pros:
- All Pandas functions are now available
- Arbitrary complexity / function
- This seems to be the approach most recommended by Spark / Databricks
Cons:
- It seems hard-coding is required (i.e., I had to specify `val` in the `calc_q1` function).
- I cannot manage to return more than one value (in the later examples, you will see that a list of results is returned); see the sketch after this list.
- It is a pain to have to provide the output schema, and that also forces some hard-coding.
- Each aggregate (each dataframe passed to the function) must fit in memory, since a Pandas function is being called.
  - This is not a concern for my work, but I'm curious: I would think this is a limit of all of the options I list, not just the Pandas option.
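On the "more than one value" point: as far as I can tell (I have not verified this end to end), applyInPandas can return several values per group if the output schema declares an array column. A minimal sketch of what I mean, with made-up names (`calc_quants_fixed`, `val_quants`):
def calc_quants_fixed(pddf):
    # Compute several quantiles for the group and attach them to every row
    # as a single array-valued column.
    qvals = pddf.val.quantile(q=[0.25, 0.5, 0.75]).tolist()
    return pddf.assign(val_quants=[qvals] * len(pddf))

grp_df = df.groupBy('grp', 'id').applyInPandas(
    calc_quants_fixed,
    schema="grp string, id long, val long, val_quants array<double>")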
Using Windowing
from pyspark.sql import Window
import pyspark.sql.functions as F
grp_window = Window.partitionBy('id', 'grp')
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
grp_df = df.withColumn('med_val', quantiles.over(grp_window))
which yields:
Pros:
- Using `functions.expr` seems almost as "open" and utilitarian as being able to rely upon all of Pandas.
Cons:
- It is a bit more laborious
- Possibly also slower because of this?? I have not compared times.
- Using `Window` to perform aggregation somehow just feels "wrong".
- I still cannot perform any expression I would like, just any PySpark expression.
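One note on this variant: since percentile_approx is called with an array of probabilities, the result is an array column, and the individual quantiles can be pulled out afterwards with getItem. A small sketch (the output column names are mine):
# Split the array produced by percentile_approx into separate columns.
grp_df = (grp_df
          .withColumn('val_Q1', F.col('med_val').getItem(0))
          .withColumn('val_med', F.col('med_val').getItem(1))
          .withColumn('val_Q3', F.col('med_val').getItem(2)))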
Using groupBy
import pyspark.sql.functions as F
quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')
grp_df = df.groupBy('grp', 'id').agg(quantiles.alias('med_val'))
which yields:
Pros:
- This is what I already use for simpler calculations (ones that do not require UDFs or multi-level aggregations).
- It "feels" right, as the verbs that I am using
groupBy
andagg
align exactly with the concepts and semantics of what I am doing.
Cons:
- I don't know, but since this is an "older" way of doing things, I feel there must be a reason why the newer methods were made.
- I still cannot perform any expression I would like, just any PySpark expression.
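That said, the expression string passed to `functions.expr` can at least be built from Python variables, which gets part of the way toward the generalization I am after. A small sketch (the `quantile_agg` helper is a name I made up):
def quantile_agg(col, quants, alias):
    # Build the percentile_approx expression from the given arguments.
    probs = ', '.join(str(q) for q in quants)
    return F.expr(f'percentile_approx({col}, array({probs}))').alias(alias)

grp_df = df.groupBy('grp', 'id').agg(quantile_agg('val', [0.25, 0.75], 'val_quants'))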
What I Want to Achieve
I would like to find a way to be able to pass arguments to the aggregate function so that it could be generalized, but I cannot figure out how.
I would like to be able to do something like this:
def calc_quants(pddf, col, quantcol, quants):
    pddf[quantcol] = pddf[col].quantile(q=quants)  # Where `quants` is a list of values
    return pddf

grp_df = (df.groupBy('grp', 'id')
            .applyInPandas(calc_quants, 'val', 'val_quants', [0.25, 0.75],
                           schema="grp string, id long, val long, val_quants array<double>"))
Being able to do something like what I have written above is my ultimate desire. I list all the methods I have discovered because it seems none of them are capable of doing what I want.
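The closest thing I can picture is binding the extra arguments before handing the function to applyInPandas, for example with a closure (functools.partial might work too, but I am not sure). A hedged, untested sketch of that idea, with made-up names (`make_calc_quants`, `val_quants`):
def make_calc_quants(col, quantcol, quants):
    # Return a single-argument function, which is what applyInPandas expects;
    # col, quantcol and quants are captured by the closure.
    def calc_quants(pddf):
        qvals = pddf[col].quantile(q=quants).tolist()
        return pddf.assign(**{quantcol: [qvals] * len(pddf)})
    return calc_quants

grp_df = (df.groupBy('grp', 'id')
            .applyInPandas(make_calc_quants('val', 'val_quants', [0.25, 0.75]),
                           schema="grp string, id long, val long, val_quants array<double>"))
Even if something like this works, I do not know whether it is the idiomatic way, which is part of why I am asking.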