The Investigation of Skewness & Kurtosis in Spark (Scala)

Applying central moment functions in Spark might be tricky, especially for skewness and kurtosis.

A few weeks ago I was experimenting with these functions. Fortunately, the IDE suggests all the relevant functions that might fit my needs. For variance, for instance, it suggests both var_pop and var_samp, which makes it obvious that there are separate population and sample versions. The same goes for standard deviation, which has stddev_pop and stddev_samp.

However, it seems that no such distinction is available for skewness & kurtosis.

The only available functions are skewness for skewness and kurtosis for kurtosis. The problem is that their names don't tell us whether they compute the population value (a parameter) or the sample value (a statistic).
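For illustration, here is roughly how the two functions are invoked; df and the column name "value" are placeholder names, not from the original experiment.

import org.apache.spark.sql.functions.{kurtosis, skewness}

// df and "value" are hypothetical; any numeric column works the same way.
val skew = df.agg(skewness("value")).head.getDouble(0)
val kurt = df.agg(kurtosis("value")).head.getDouble(0)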

I decided, once again, to take a dive into Spark's source code itself.

Locating the Appropriate Modules

So I started from the most obvious location. Since all these central moment functions are exposed as Spark SQL functions, we typically call them like the following.

import org.apache.spark.sql.functions._

// The result values are named so they don't shadow the imported aggregate functions.
val meanValue  = df.agg(avg(columnName)).head.get(0).asInstanceOf[Double]
val stddevPop  = df.agg(stddev_pop(columnName)).head.get(0).asInstanceOf[Double]
val stddevSamp = df.agg(stddev_samp(columnName)).head.get(0).asInstanceOf[Double]
val varPop     = df.agg(var_pop(columnName)).head.get(0).asInstanceOf[Double]
val varSamp    = df.agg(var_samp(columnName)).head.get(0).asInstanceOf[Double]

Therefore, we can start from the functions.scala module, which is located here.

However, there is no actual implementation of these functions there.

Looking further at the imported modules, it seems likely that the implementations are included there. The following two look like the best candidates.

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._

Long story short, after searching for the correct location of sql.catalyst.expressions, I managed to find it here.

There is no implementation of the central moment functions there either. However, recall that the module also imports sql.catalyst.expressions.aggregate._. And yes, there is indeed a package called aggregate which stores all the statistical modules.

Since we are interested specifically in central moments, just open the CentralMomentAgg.scala module.

From there, the investigation began.

The Investigation

As you might have seen, this module provides the central moment functions: standard deviation, variance, skewness, and kurtosis. As expected, standard deviation and variance each have two separate classes, one for the population parameter and one for the sample statistic.

Back to our original question, how about skewness and kurtosis?

Well, as you can see, the class names don't inform us whether each of them acts as a parameter or a statistic. So let's take a deeper look at the implemented formulas.

Starting from skewness, recall that the population skewness formula is given by the following.

skewness_pop = E [((X - mu_pop) / stddev_pop) ^ 3]

X: the random variable
mu_pop: population mean
stddev_pop: population standard deviation

The implemented formula in CentralMomentAgg.scala is given as the following.

sqrt(n) * m3 / sqrt(m2 * m2 * m2)

where mu is the mean of the n observations, m2 = SUM(i=1 to n) [(xi - mu)^2] is the accumulated second central moment, and m3 = SUM(i=1 to n) [(xi - mu)^3] is the accumulated third central moment (both are sums, not yet divided by n).

If you take a closer look, you might find that after replacing stddev_pop in skewness_pop with its definition and simplifying the equation, the implemented formula is exactly the simplified version of skewness_pop.
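Spelling out that simplification with the same notation (this is just the algebra, not code from Spark):

skewness_pop = E [(X - mu_pop) ^ 3] / stddev_pop ^ 3
             = (SUM(i=1 to n) [(xi - mu) ^ 3] / n) / (SUM(i=1 to n) [(xi - mu) ^ 2] / n) ^ (3/2)
             = (m3 / n) / (m2 / n) ^ (3/2)
             = sqrt(n) * m3 / sqrt(m2 * m2 * m2)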

To support the argument, recall that the sample skewness formula is given by the following.

skewness_samp = SUM(i=1 to n) [(xi - mu_samp) ^ 3] / n
                --------------------------------------
                           stddev_samp ^ 3

After replacing stddev_samp with its definition (recall it has an n - 1 factor in the denominator) and simplifying the above equation, we don't end up with the implemented formula.

Therefore, we may conclude that the implemented skewness in CentralMomentAgg.scala refers to the population skewness.

I performed the same step for kurtosis.

Recall that the population excess kurtosis is given as the following.

kurtosis_pop = E [((X - mu_pop) / stddev_pop) ^ 4]

excess_kurtosis_pop = kurtosis_pop - 3.0

X: the random variable
mu_pop: population mean
stddev_pop: population standard deviation

The implemented formula in CentralMomentAgg.scala is given as the following.

n * m4 / (m2 * m2) - 3.0

where, as before, m2 = SUM(i=1 to n) [(xi - mu)^2] is the accumulated second central moment and m4 = SUM(i=1 to n) [(xi - mu)^4] is the accumulated fourth central moment (again, sums that are not yet divided by n).

If you take a closer look, you might find that after replacing stddev_pop in kurtosis_pop with its definition and simplifying the equation, the implemented formula is exactly the simplified version of excess_kurtosis_pop.
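The same kind of simplification works here (again, just the algebra):

kurtosis_pop = E [(X - mu_pop) ^ 4] / stddev_pop ^ 4
             = (SUM(i=1 to n) [(xi - mu) ^ 4] / n) / (SUM(i=1 to n) [(xi - mu) ^ 2] / n) ^ 2
             = (m4 / n) / (m2 / n) ^ 2
             = n * m4 / (m2 * m2)

excess_kurtosis_pop = n * m4 / (m2 * m2) - 3.0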

To support the argument, recall that the sample kurtosis formula is given by the following.

kurtosis_samp =   SUM(i=1 to n) [(xi - mu_samp) ^ 4] / n
                  --------------------------------------
                             stddev_samp ^ 4

excess_kurtosis_samp = kurtosis_samp - 3.0

After replacing stddev_samp with its definition (recall it has an n - 1 factor in the denominator) and simplifying the above equation, we don't end up with the implemented formula.

Therefore, we may conclude that the implemented kurtosis in CentralMomentAgg.scala refers to the population kurtosis, specifically the population excess kurtosis.
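As a quick sanity check, the built-in aggregates can be compared against the population definitions computed by hand. The snippet below is a hypothetical example; the data, the SparkSession setup, and the column name "x" are made up for illustration.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(1.0, 2.0, 2.0, 3.0, 7.0).toDF("x")

val mu    = df.agg(avg("x")).head.getDouble(0)
val sigma = df.agg(stddev_pop("x")).head.getDouble(0)

// Population skewness and excess kurtosis straight from their definitions.
val manualSkew = df.agg(avg(pow(col("x") - mu, 3))).head.getDouble(0) / math.pow(sigma, 3)
val manualKurt = df.agg(avg(pow(col("x") - mu, 4))).head.getDouble(0) / math.pow(sigma, 4) - 3.0

// These should match the built-in aggregates up to floating-point error.
val builtInSkew = df.agg(skewness("x")).head.getDouble(0)
val builtInKurt = df.agg(kurtosis("x")).head.getDouble(0)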

The Implementation in Spark (Scala)

Knowing that both skewness and kurtosis are computed for the population, I decided to implement the sample versions of both in Spark (Scala).

In the snippets below, I presume that calculateMean and calculateStdDevSamp are already implemented, and that F is an alias for org.apache.spark.sql.functions.
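For completeness, here is a minimal sketch of what those two helpers might look like; these are my assumptions, not the original implementations.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{functions => F}

// Hypothetical helpers: the mean and the sample standard deviation of a numeric column.
def calculateMean(df: DataFrame, column: String): Double =
  df.agg(F.avg(column)).head.get(0).asInstanceOf[Double]

def calculateStdDevSamp(df: DataFrame, column: String): Double =
  df.agg(F.stddev_samp(column)).head.get(0).asInstanceOf[Double]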

Sample Skewness

def calculateSkewnessSamp(df: DataFrame, column: String): Double = {
  // Ignore null and NaN values so they don't distort the moments.
  val observedDf = df.filter(!F.isnull(F.col(column)) && !F.isnan(F.col(column)))

  val observationsMean = calculateMean(observedDf, column)
  val observationsStdDev = calculateStdDevSamp(observedDf, column)

  val totalObservations = observedDf.count()

  // Third central sample moment: SUM[(xi - mean)^3] / n
  val thirdCentralSampleMoment =
    observedDf
      .agg(F.sum(F.pow(F.col(column) - observationsMean, 3) / totalObservations))
      .head
      .get(0)
      .asInstanceOf[Double]
  val thirdPowerOfSampleStdDev = scala.math.pow(observationsStdDev, 3)

  // Sample skewness = third central moment / (sample stddev)^3
  thirdCentralSampleMoment / thirdPowerOfSampleStdDev
}

Sample Excess Kurtosis

def calculateExcessKurtosisSamp(df: DataFrame, column: String): Double = {
  // Ignore null and NaN values so they don't distort the moments.
  val observedDf = df.filter(!F.isnull(F.col(column)) && !F.isnan(F.col(column)))

  val observationsMean = calculateMean(observedDf, column)
  val observationsStdDev = calculateStdDevSamp(observedDf, column)

  val totalObservations = observedDf.count()

  // Fourth central sample moment: SUM[(xi - mean)^4] / n
  val fourthCentralSampleMoment =
    observedDf
      .agg(F.sum(F.pow(F.col(column) - observationsMean, 4) / totalObservations))
      .head
      .get(0)
      .asInstanceOf[Double]
  val fourthPowerOfSampleStdDev = scala.math.pow(observationsStdDev, 4)

  // Sample excess kurtosis = fourth central moment / (sample stddev)^4, minus 3
  (fourthCentralSampleMoment / fourthPowerOfSampleStdDev) - 3.0
}