I recently observed significant differences between covariance computation in Pandas and its MLLib equivalent. The results are reasonably close for fully specified inputs (i.e. without any NAs) but deviate significantly when values are missing. The Pandas source explains how NAs are treated, but I could not reproduce those results using Spark. I could not find documentation on what exactly RowMatrix().computeCovariance()
does with NAs in the source, but my Scala is shaky at best and I am unfamiliar with BLAS, so perhaps I missed something. There is also this BLAS warning, whose cause I could not track down since I am using a pre-built macOS Spark setup:
WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
Given the importance of covariance for many applications, I wonder if someone could shed some light on the exact treatment of missing values for covariance calculation in Apache Spark MLLib?
EDIT: Additionally, this is not resolved in the current Spark 3.2 release, since the pandas-on-Spark API still raises "The method `pd.DataFrame.cov()` is not implemented yet".
Assuming the following setup:
import pyspark
from pyspark.sql import SparkSession  # import was missing from the original snippet
from pyspark.mllib.linalg.distributed import RowMatrix

spark = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
sc = spark.sparkContext
good_rows = sc.parallelize([[11, 12, 13, 14, 16, 17, 18],
[21, 22, 23, 42, 26, 27, 28],
[31, 32, 33, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
bad_rows = sc.parallelize([[11, 12, None, 14, 16, None, 18],
[21, 22, None, 42, 26, None, 28],
[31, 32, None, 34, 36, None, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
The covariance matrices computed from good_rows are equal for Pandas and Spark:
good_rows.toDF().toPandas().cov()
# Results in:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_2 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_3 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_4 332.0 332.0 332.0 368.0 332.0 332.0 332.0
_5 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_6 350.0 350.0 350.0 332.0 350.0 350.0 350.0
_7 350.0 350.0 350.0 332.0 350.0 350.0 350.0
spark.createDataFrame(RowMatrix(good_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 350.0 332.0 350.0 350.0 350.0
1 350.0 350.0 350.0 332.0 350.0 350.0 350.0
2 350.0 350.0 350.0 332.0 350.0 350.0 350.0
3 332.0 332.0 332.0 368.0 332.0 332.0 332.0
4 350.0 350.0 350.0 332.0 350.0 350.0 350.0
5 350.0 350.0 350.0 332.0 350.0 350.0 350.0
6 350.0 350.0 350.0 332.0 350.0 350.0 350.0
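As a sanity check (my own verification, not from either library's documentation), NumPy's np.cov on the same raw data produces the same numbers, so all three implementations agree on the complete case:

```python
import numpy as np

# Same rows as good_rows above
good = np.array([[11, 12, 13, 14, 16, 17, 18],
                 [21, 22, 23, 42, 26, 27, 28],
                 [31, 32, 33, 34, 36, 37, 38],
                 [41, 42, 43, 44, 46, 47, 48],
                 [51, 52, 53, 54, 56, 57, 58],
                 [ 1,  2,  3,  4,  6,  7,  8]], dtype=float)

# Columns are variables, rows are observations; np.cov uses the same
# unbiased (n-1) normalization as both Pandas and MLLib here.
C = np.cov(good, rowvar=False)
print(C[0, 0], C[0, 3], C[3, 3])  # 350.0 332.0 368.0
```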
Running the same with bad_rows results in very different covariance matrices, unless Pandas' cov() is run with min_periods=(bad_rows.count()/2)+1:
bad_rows.toDF().toPandas().cov()
#Results in:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_2 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_3 700.0 700.0 700.0 700.0 700.0 700.0 700.0
_4 332.0 332.0 700.0 368.0 332.0 700.0 332.0
_5 350.0 350.0 700.0 332.0 350.0 700.0 350.0
_6 700.0 700.0 700.0 700.0 700.0 700.0 700.0
_7 350.0 350.0 700.0 332.0 350.0 700.0 350.0
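If I read the Pandas source right (my interpretation, not official documentation), cov() works pairwise on complete cases: for each pair of columns it drops the rows where either value is missing and computes the means over the surviving rows only. A small NumPy sketch of that scheme reproduces the 700s above:

```python
import numpy as np

# Same rows as bad_rows above, with None mapped to np.nan
bad = np.array([[11, 12, np.nan, 14, 16, np.nan, 18],
                [21, 22, np.nan, 42, 26, np.nan, 28],
                [31, 32, np.nan, 34, 36, np.nan, 38],
                [41, 42, 43,     44, 46, 47,     48],
                [51, 52, 53,     54, 56, 57,     58],
                [ 1,  2,  3,      4,  6,  7,      8]])

def pairwise_cov(x, y):
    # Keep only rows where both columns are observed (pairwise deletion),
    # then compute the usual unbiased sample covariance on that subset,
    # with means taken over the subset as well.
    ok = ~np.isnan(x) & ~np.isnan(y)
    xs, ys = x[ok], y[ok]
    return ((xs - xs.mean()) * (ys - ys.mean())).sum() / (len(xs) - 1)

print(pairwise_cov(bad[:, 0], bad[:, 2]))  # 700.0, as in the Pandas matrix
print(pairwise_cov(bad[:, 0], bad[:, 1]))  # 350.0, fully observed pair
```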
spark.createDataFrame(RowMatrix(bad_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 NaN 332.0 350.0 NaN 350.0
1 350.0 350.0 NaN 332.0 350.0 NaN 350.0
2 NaN NaN NaN NaN NaN NaN NaN
3 332.0 332.0 NaN 368.0 332.0 NaN 332.0
4 350.0 350.0 NaN 332.0 350.0 NaN 350.0
5 NaN NaN NaN NaN NaN NaN NaN
6 350.0 350.0 NaN 332.0 350.0 NaN 350.0
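This pattern looks like plain NaN propagation: as far as I can tell (an assumption on my part), computeCovariance() is built on a Gram-matrix product, so a single NaN poisons every dot product that touches the affected column. Naive np.cov on the same data, with no NA handling at all, gives an identical matrix:

```python
import numpy as np

bad = np.array([[11, 12, np.nan, 14, 16, np.nan, 18],
                [21, 22, np.nan, 42, 26, np.nan, 28],
                [31, 32, np.nan, 34, 36, np.nan, 38],
                [41, 42, 43,     44, 46, 47,     48],
                [51, 52, 53,     54, 56, 57,     58],
                [ 1,  2,  3,      4,  6,  7,      8]])

# No NA handling: a NaN anywhere in a column turns every covariance
# entry involving that column into NaN, exactly like the MLLib output.
C = np.cov(bad, rowvar=False)
print(np.isnan(C[2, :]).all())  # True: row/column _3 is all NaN
print(C[0, 0], C[0, 3])        # 350.0 332.0, untouched entries agree
```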
bad_rows.toDF().toPandas().cov(min_periods=(bad_rows.count()/2)+1)
# With 50% of dataframe rows +1 Pandas equals the Spark result:
_1 _2 _3 _4 _5 _6 _7
_1 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_2 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_3 NaN NaN NaN NaN NaN NaN NaN
_4 332.0 332.0 NaN 368.0 332.0 NaN 332.0
_5 350.0 350.0 NaN 332.0 350.0 NaN 350.0
_6 NaN NaN NaN NaN NaN NaN NaN
_7 350.0 350.0 NaN 332.0 350.0 NaN 350.0
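The min_periods equivalence seems to come down to counting: with six rows, (6/2)+1 = 4, and the two gappy columns have only three complete observations each, so every pair involving them is masked to NaN while the fully observed pairs (six shared rows) survive. Counting the shared observations per column pair with NumPy (my sketch, not Pandas internals):

```python
import numpy as np

bad = np.array([[11, 12, np.nan, 14, 16, np.nan, 18],
                [21, 22, np.nan, 42, 26, np.nan, 28],
                [31, 32, np.nan, 34, 36, np.nan, 38],
                [41, 42, 43,     44, 46, 47,     48],
                [51, 52, 53,     54, 56, 57,     58],
                [ 1,  2,  3,      4,  6,  7,      8]])

mask = ~np.isnan(bad)                                # True where observed
pair_counts = mask.T.astype(int) @ mask.astype(int)  # shared rows per column pair
print(pair_counts[0, 2])  # 3: pairs with column _3 fall below min_periods=4
print(pair_counts[0, 1])  # 6: fully observed pairs are kept
```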
I did try to set None to 0 and to the column mean, but could not reproduce the MLLib covariance results with these standard imputations; see below.
# Zero NA fill:
zeroed_na_rows = sc.parallelize([[11, 12, 0, 14, 16, 0, 18],
[21, 22, 0, 42, 26, 0, 28],
[31, 32, 0, 34, 36, 0, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(zeroed_na_rows).computeCovariance().toArray().tolist()).toPandas()
# Results in:
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 379.0 332.0 350.0 391.0 350.0
1 350.0 350.0 379.0 332.0 350.0 391.0 350.0
2 379.0 379.0 606.7 319.6 379.0 646.3 379.0
3 332.0 332.0 319.6 368.0 332.0 324.4 332.0
4 350.0 350.0 379.0 332.0 350.0 391.0 350.0
5 391.0 391.0 646.3 324.4 391.0 690.7 391.0
6 350.0 350.0 379.0 332.0 350.0 391.0 350.0
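For what it's worth, np.cov on the zero-filled matrix reproduces the MLLib numbers above, which suggests (my inference) that computeCovariance() behaves conventionally once the input is fully specified, and the divergence from Pandas is entirely in the treatment of the gaps:

```python
import numpy as np

# Same rows as zeroed_na_rows above
zeroed = np.array([[11, 12,  0, 14, 16,  0, 18],
                   [21, 22,  0, 42, 26,  0, 28],
                   [31, 32,  0, 34, 36,  0, 38],
                   [41, 42, 43, 44, 46, 47, 48],
                   [51, 52, 53, 54, 56, 57, 58],
                   [ 1,  2,  3,  4,  6,  7,  8]], dtype=float)

C = np.cov(zeroed, rowvar=False)
print(round(C[2, 2], 1))  # 606.7, matching the MLLib zero-fill result
print(round(C[2, 5], 1))  # 646.3
```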
# Mean NA fill:
mean_rows = sc.parallelize([[11, 12, 27, 14, 16, 37, 18],
[21, 22, 27, 42, 26, 37, 28],
[31, 32, 27, 34, 36, 37, 38],
[41, 42, 43, 44, 46, 47, 48],
[51, 52, 53, 54, 56, 57, 58],
[ 1, 2, 3, 4, 6, 7, 8]])
spark.createDataFrame(RowMatrix(mean_rows).computeCovariance().toArray().tolist()).toPandas()
#Results in (still different from Pandas.cov()):
_1 _2 _3 _4 _5 _6 _7
0 350.0 350.0 298.0 332.0 350.0 280.0 350.0
1 350.0 350.0 298.0 332.0 350.0 280.0 350.0
2 298.0 298.0 290.8 287.2 298.0 280.0 298.0
3 332.0 332.0 287.2 368.0 332.0 280.0 332.0
4 350.0 350.0 298.0 332.0 350.0 280.0 350.0
5 280.0 280.0 280.0 280.0 280.0 280.0 280.0
6 350.0 350.0 298.0 332.0 350.0 280.0 350.0
If it's not that, what's going on here and how do I get Spark MLLib to produce reasonably similar results to Pandas?
from What is the Exact Apache-Spark NA Treatment Difference Pandas vs MLLib for Covariance Computation?