Adding Strictly Increasing ID to Spark Dataframes

3 minute read

Published:

Recently I was exploring ways of adding a unique row ID column to a dataframe. The requirement is simple: “the row ID should strictly increase with difference of one and the data order is not modified”.

One way to do this is by simply leveraging monotonically_increasing_id function. In accordance with its name, this function creates a sequence of number that strictly increases (delta f(x) > 0). The code would be look like the following.

from pyspark.sql import functions as F

df = df.withColumn("monotonic_row_id", F.monotonically_increasing_id())

However, this function might return a sequence of ID which a pretty large gap between two different row groups. For instance, the first row group consisting of row 0 to 4 has row IDs that start from 1 and finish at 5 (strictly increases and differs by 1). Meanwhile, the second row group consisting of row 5 to 9 might have row IDs that start from 8000000 and finish at 8000005 (strictly increases and differs by 1). The same rule applies to the proceeding row groups.

I think this result might somehow be an obstacle when the data size is extremely large. The extreme gap difference between row groups might introduce a problem related to overflow error.

The question is how to make the row ID increases strictly with difference of one between each row?

We might want to leverage windowing approach to accomplish the answer. The following is the code.

from pyspark.sql import functions as F

df = df.withColumn("row_id", 
		   F.row_number().over(Window.orderBy(<some_columns>))
     )

The result is much better than using monotonically_increasing_id function. It yields the expected output.

However, I think the main drawback lies on the use of orderBy as part of windowing. This indeed makes the row ID sorted in ascending way, yet the order of the original data has been modified.

Take a look at the following example.

ORIGINAL DATA
++++++++++++

column
======
row_p
row_c
row_b
row_a
row_d


DATA WITH ROW ID
+++++++++++++++

column	|  row_id
=================
row_a	|    1
row_b	|    2
row_c	|    3
row_d	|    4
row_p	|    5

The row ID strictly increases yet the data’s order has been changed. Generally, we don’t want this to happen since row_p should have row_id of 1 instead of 5.

Now, we’ve come to the fact that we don’t want our data’s order changed. However, applying such an approach requires us to select a column on which the sorting will be based.

What if there’s a column that has been sorted (ascending or descending) in the dataframe? In other words, we’re trying to use such a sorted column as the column on which the orderBy operation will be applied.

Turns out we can use the column generated by the monotonically_increasing_id function since it’s already in sorted manner.

To extend the windowing approach, let’s use the following code.

from pyspark.sql import functions as F
from pyspark.sql import Window

# Approach A
df = df.withColumn("row_id", 
		   F.row_number().over(Window.orderBy('monotonic_row_id'))
     )


# Approach B - we don't need to create the monotonic row ID beforehand
df = df.withColumn("row_id", 
		   F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
     )

This time, the resulting output fulfils our initial requirements—the row ID should strictly increase with difference of one and the data order is not modified.