Monday 25 October 2021

What is the Exact Apache-Spark NA Treatment Difference Pandas vs MLLib for Covariance Computation?

I recently observed significant differences between covariance results computed by Pandas and by the MLLib equivalent. The results are reasonably close for fully specified inputs (i.e. without any NAs) but deviate significantly when values are missing. The Pandas source explains how NAs are treated, but I could not reproduce those results using Spark. I could not find documentation on what exactly RowMatrix().computeCovariance() does with regard to NAs in the source - but my Scala is fair at best and I am unfamiliar with BLAS, so perhaps I missed something. There is also this BLAS warning, whose cause I could not track down since I am using a pre-built macOS Spark setup:

WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS

Given the importance of covariance for many applications, I wonder if someone could shed some light on the exact treatment of missing values for covariance calculation in Apache Spark MLLib?

EDIT: Additionally, this is not resolved in the current Spark 3.2 release, where the pandas API on Spark still reports "The method `pd.DataFrame.cov()` is not implemented yet."
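
For reference, this is the kind of call I mean (a minimal sketch; the exact exception type and wording may vary by Spark version, but on 3.2 the message quoted above is what comes back):

import pyspark.pandas as ps

# pandas API on Spark (bundled since 3.2); cov() is still only a stub there
# and raises a "not implemented yet" error.
psdf = ps.DataFrame({"a": [1.0, 2.0, 3.0], "b": [2.0, 4.0, 7.0]})
try:
    psdf.cov()
except Exception as e:
    print(type(e).__name__, e)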

Assuming the following setup:

import pyspark
from pyspark.sql import SparkSession
from pyspark.mllib.linalg.distributed import RowMatrix

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
sc = spark.sparkContext
good_rows = sc.parallelize([[11, 12, 13, 14, 16, 17, 18], 
                            [21, 22, 23, 42, 26, 27, 28],
                            [31, 32, 33, 34, 36, 37, 38],
                            [41, 42, 43, 44, 46, 47, 48],
                            [51, 52, 53, 54, 56, 57, 58],
                            [ 1,  2,  3,  4,  6,  7,  8]])
bad_rows = sc.parallelize([[11, 12, None, 14, 16, None, 18], 
                           [21, 22, None, 42, 26, None, 28],
                           [31, 32, None, 34, 36, None, 38],
                           [41, 42, 43, 44, 46, 47, 48],
                           [51, 52, 53, 54, 56, 57, 58],
                           [ 1,  2,  3,  4,  6,  7,  8]])
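
As far as I can tell (an illustration, not taken from the MLLib source), the None entries simply become NaN once the Python lists are coerced into dense float vectors, so computeCovariance() presumably sees NaNs rather than any notion of "missing":

import numpy as np
from pyspark.mllib.linalg import Vectors

# Coercing None to float yields NaN; this appears to be what the rows above
# turn into when MLlib builds dense vectors from them.
print(np.array([11, 12, None, 14], dtype=float))   # [11. 12. nan 14.]
print(Vectors.dense([11.0, 12.0, None, 14.0]))     # DenseVector([11.0, 12.0, nan, 14.0])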

The covariance matrices computed from good_rows are identical for Pandas and Spark:

good_rows.toDF().toPandas().cov()
# Results in:
       _1     _2     _3     _4     _5     _6     _7
_1  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_2  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_3  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_4  332.0  332.0  332.0  368.0  332.0  332.0  332.0
_5  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_6  350.0  350.0  350.0  332.0  350.0  350.0  350.0
_7  350.0  350.0  350.0  332.0  350.0  350.0  350.0

spark.createDataFrame(RowMatrix(good_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
      _1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  350.0  332.0  350.0  350.0  350.0
1  350.0  350.0  350.0  332.0  350.0  350.0  350.0
2  350.0  350.0  350.0  332.0  350.0  350.0  350.0
3  332.0  332.0  332.0  368.0  332.0  332.0  332.0
4  350.0  350.0  350.0  332.0  350.0  350.0  350.0
5  350.0  350.0  350.0  332.0  350.0  350.0  350.0
6  350.0  350.0  350.0  332.0  350.0  350.0  350.0
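
As a sanity check (my own arithmetic, not from either library), the 350.0 entries are just the plain sample covariance with an n-1 denominator, e.g. for column _1:

import numpy as np

# Sample variance of column _1 with the (n - 1) denominator.
col_1 = np.array([11, 21, 31, 41, 51, 1], dtype=float)
print(((col_1 - col_1.mean()) ** 2).sum() / (len(col_1) - 1))   # 350.0
print(np.cov(col_1, ddof=1))                                    # 350.0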

Running the same with bad_rows results in very different covariance matrices, unless Pandas' cov() is run with min_periods=(bad_rows.count()/2)+1:

bad_rows.toDF().toPandas().cov()
# Results in:
       _1     _2     _3     _4     _5     _6     _7
_1  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_2  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_3  700.0  700.0  700.0  700.0  700.0  700.0  700.0
_4  332.0  332.0  700.0  368.0  332.0  700.0  332.0
_5  350.0  350.0  700.0  332.0  350.0  700.0  350.0
_6  700.0  700.0  700.0  700.0  700.0  700.0  700.0
_7  350.0  350.0  700.0  332.0  350.0  700.0  350.0

spark.createDataFrame(RowMatrix(bad_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
      _1     _2  _3     _4     _5  _6     _7
0  350.0  350.0 NaN  332.0  350.0 NaN  350.0
1  350.0  350.0 NaN  332.0  350.0 NaN  350.0
2    NaN    NaN NaN    NaN    NaN NaN    NaN
3  332.0  332.0 NaN  368.0  332.0 NaN  332.0
4  350.0  350.0 NaN  332.0  350.0 NaN  350.0
5    NaN    NaN NaN    NaN    NaN NaN    NaN
6  350.0  350.0 NaN  332.0  350.0 NaN  350.0

bad_rows.toDF().toPandas().cov(min_periods=(bad_rows.count()/2)+1)
# With min_periods set to 50% of the rows + 1, Pandas matches the Spark result:
       _1     _2  _3     _4     _5  _6     _7
_1  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_2  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_3    NaN    NaN NaN    NaN    NaN NaN    NaN
_4  332.0  332.0 NaN  368.0  332.0 NaN  332.0
_5  350.0  350.0 NaN  332.0  350.0 NaN  350.0
_6    NaN    NaN NaN    NaN    NaN NaN    NaN
_7  350.0  350.0 NaN  332.0  350.0 NaN  350.0
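
My reading of the Pandas source is that cov() is computed pairwise over the rows where both columns are non-null, and that min_periods only turns an entry into NaN when fewer complete pairs than that are available. A minimal sketch of that idea (my own arithmetic, not the Pandas implementation) reproduces the 700.0 entries above:

import numpy as np

# cov(_1, _3) pairwise-complete: only the three rows where _3 is not null.
col_1 = np.array([41, 51, 1], dtype=float)   # _1 restricted to those rows
col_3 = np.array([43, 53, 3], dtype=float)   # non-null values of _3
cov_13 = ((col_1 - col_1.mean()) * (col_3 - col_3.mean())).sum() / (len(col_1) - 1)
print(cov_13)   # 700.0, matching the Pandas table above
# Only 3 complete pairs exist, so min_periods=4 turns these entries into NaN.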

I did try to set None to 0 and to the mean but could not reproduce the MLLib covariance results with these standard imputations, see below.

# Zero NA fill:
zeroed_na_rows = sc.parallelize([[11, 12, 0, 14, 16, 0, 18], 
                       [21, 22, 0, 42, 26, 0, 28],
                       [31, 32, 0, 34, 36, 0, 38],
                       [41, 42, 43, 44, 46, 47, 48],
                       [51, 52, 53, 54, 56, 57, 58],
                       [1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(zeroed_na_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
      _1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  379.0  332.0  350.0  391.0  350.0
1  350.0  350.0  379.0  332.0  350.0  391.0  350.0
2  379.0  379.0  606.7  319.6  379.0  646.3  379.0
3  332.0  332.0  319.6  368.0  332.0  324.4  332.0
4  350.0  350.0  379.0  332.0  350.0  391.0  350.0
5  391.0  391.0  646.3  324.4  391.0  690.7  391.0
6  350.0  350.0  379.0  332.0  350.0  391.0  350.0
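
As a cross-check (again my own arithmetic), the 606.7 on the diagonal for _3 is exactly the ordinary sample variance of the zero-filled column, so computeCovariance() behaves as plain sample covariance once the NaNs are gone; it just does not reproduce the Pandas numbers:

import numpy as np

# Sample variance of the zero-filled column _3.
col_3_filled = np.array([0, 0, 0, 43, 53, 3], dtype=float)
var_3 = ((col_3_filled - col_3_filled.mean()) ** 2).sum() / (len(col_3_filled) - 1)
print(var_3)   # 606.7, matching the _3 diagonal entry above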

# Mean NA fill:
mean_rows = sc.parallelize([[11, 12, 27, 14, 16, 37, 18], 
                           [21, 22, 27, 42, 26, 37, 28],
                           [31, 32, 27, 34, 36, 37, 38],
                           [41, 42, 43, 44, 46, 47, 48],
                           [51, 52, 53, 54, 56, 57, 58],
                           [ 1,  2,  3,  4,  6,  7,  8]])
spark.createDataFrame(RowMatrix(mean_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in (still different from Pandas cov()):
      _1     _2     _3     _4     _5     _6     _7
0  350.0  350.0  298.0  332.0  350.0  280.0  350.0
1  350.0  350.0  298.0  332.0  350.0  280.0  350.0
2  298.0  298.0  290.8  287.2  298.0  280.0  298.0
3  332.0  332.0  287.2  368.0  332.0  280.0  332.0
4  350.0  350.0  298.0  332.0  350.0  280.0  350.0
5  280.0  280.0  280.0  280.0  280.0  280.0  280.0
6  350.0  350.0  298.0  332.0  350.0  280.0  350.0

If it's not one of these standard imputations, what is going on here, and how do I get Spark MLLib to produce results reasonably similar to Pandas?



