Streaming GroupBy for Large Datasets with Pandas

I came across an article about how to perform a groupBy operation on a large dataset. Long story short, the author proposes an approach called streaming groupBy, in which the dataset is divided into chunks and the groupBy operation is applied to each chunk. The approach is implemented with pandas.

For those who are curious about the article, you can find it here (Streaming Groupbys In Pandas For Big Datasets). It’s authored by Max Halford.

However, the operation is not merely applied to the whole chunk. Several assumptions are needed for this approach to work. One of them is that the dataset must contain a column that acts as the grouping key, such as a user ID. Another is that the dataset must be sorted by this grouping key before the groupBy operation is performed.

Here’s some sample data taken from the author’s blog.

object_id   passband  flux    mjd
===================================
615         uu        52.91   59750
615         gg        381.95  59750
615         gg        384.18  59751
615         uu        153.49  59751
615         yy        -111.06 59750
713         yy        -180.23 59751
713         uu        61.06   59753
713         uu        107.64  59754
713         yy        -133.42 59752
713         uu        118.74  59755

In the above sample, object_id is the grouping key, and the data is already sorted by it.

Below is the algorithm used to perform the streaming groupBy:

  • Load the next k rows from the dataset
  • Identify the last group from the k rows
  • Put the m rows corresponding to the last group aside (called orphans)
  • Perform the groupBy on the remaining k - m rows
  • Repeat from step 1, adding the orphan rows to the top of the next chunk
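
Here’s a minimal sketch of what this might look like in pandas, assuming the data lives in a CSV file that is already sorted by object_id. The file name, chunk size, and column names are placeholders based on the sample above, and this is my own reading of the algorithm rather than the author’s exact code.

import pandas as pd

CSV_PATH = 'data.csv'   # placeholder path; the file must already be sorted by object_id
CHUNK_SIZE = 10_000     # the "k" from the algorithm above

results = []    # one partial groupBy result per chunk
orphans = None  # rows carried over from the previous chunk

for chunk in pd.read_csv(CSV_PATH, chunksize=CHUNK_SIZE):
    # Step v (from the previous iteration): add the orphans on top of this chunk.
    if orphans is not None:
        chunk = pd.concat([orphans, chunk])

    # Step ii: identify the last group in the chunk.
    last_id = chunk['object_id'].iloc[-1]

    # Step iii: put the rows of the last group aside as orphans.
    is_orphan = chunk['object_id'] == last_id
    orphans = chunk[is_orphan]

    # Step iv: perform the groupBy on the remaining rows.
    remaining = chunk[~is_orphan]
    if not remaining.empty:
        results.append(
            remaining.groupby(['object_id', 'passband'])['flux'].mean()
        )

# The very last group never stops being an orphan, so aggregate it separately.
if orphans is not None:
    results.append(
        orphans.groupby(['object_id', 'passband'])['flux'].mean()
    )

streamed_means = pd.concat(results)

Because the orphan handling guarantees that each object_id is aggregated in exactly one chunk, the partial results can simply be concatenated at the end.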

Let’s simulate the above algorithm with the previous sample data.

Suppose that we’d like to compute the mean of the flux for each pair of object_id and passband. In pandas, we can achieve this simply with df.groupby(['object_id', 'passband'])['flux'].mean(). However, since this requires the whole dataset to be loaded into memory first, it becomes an issue when the data is large.
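
To make this concrete, here is the in-memory version run on the sample data above (a small sketch; the DataFrame is built by hand from the sample table).

import pandas as pd

# The sample data from the author's blog, already sorted by object_id.
df = pd.DataFrame({
    'object_id': [615, 615, 615, 615, 615, 713, 713, 713, 713, 713],
    'passband':  ['uu', 'gg', 'gg', 'uu', 'yy', 'yy', 'uu', 'uu', 'yy', 'uu'],
    'flux':      [52.91, 381.95, 384.18, 153.49, -111.06,
                  -180.23, 61.06, 107.64, -133.42, 118.74],
    'mjd':       [59750, 59750, 59751, 59751, 59750,
                  59751, 59753, 59754, 59752, 59755],
})

# Everything is loaded into memory at once -- fine here, problematic at scale.
print(df.groupby(['object_id', 'passband'])['flux'].mean())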

How about applying the streaming groupBy approach?

Let’s use k = 7 to simulate the algorithm.

i) Load k rows from the dataset.

object_id   passband  flux    mjd
===================================
615         uu        52.91   59750
615         gg        381.95  59750
615         gg        384.18  59751
615         uu        153.49  59751
615         yy        -111.06 59750
713         yy        -180.23 59751
713         uu        61.06   59753

ii) Identify the last group from the k rows. Basically, we just take the group ID of the last row. In this case, the last group ID is 713.

iii) Put the m rows corresponding to the last group aside (the orphans). In this case, m is 2 since there are two rows whose group ID is 713.

iv) Perform the groupBy on the remaining k - m rows. In this case, we perform the groupBy operation on the first 5 rows; the last 2 rows are set aside for the next chunk. For the sake of completeness, here’s the result of the groupBy operation.

object_id   passband    flux_mean
==================================
615         uu          103.2
615         gg          383.065
615         yy          -111.06

v) Repeat from step (i), adding the orphan rows to the top of the next chunk. The last two rows from the previous chunk (group ID 713) are placed at the top of the remaining rows.

object_id   passband    flux      mjd
=======================================
713         yy          -180.23   59751	--> from the previous chunk
713         uu          61.06     59753	--> from the previous chunk
713         uu          107.64    59754
713         yy          -133.42   59752
713         uu          118.74    59755
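
To connect the walkthrough back to code, here is roughly how one iteration of steps (ii) to (iv) might look in pandas on that first chunk. This is a sketch built by hand from the rows above, not the author’s code, and the mjd column is omitted since it isn’t used in the aggregation.

import pandas as pd

# The first chunk (k = 7 rows) from the walkthrough above.
chunk = pd.DataFrame({
    'object_id': [615, 615, 615, 615, 615, 713, 713],
    'passband':  ['uu', 'gg', 'gg', 'uu', 'yy', 'yy', 'uu'],
    'flux':      [52.91, 381.95, 384.18, 153.49, -111.06, -180.23, 61.06],
})

# Step ii: the group ID of the last row is 713.
last_id = chunk['object_id'].iloc[-1]

# Step iii: the m = 2 rows of group 713 become the orphans.
is_orphan = chunk['object_id'] == last_id
orphans = chunk[is_orphan]

# Step iv: groupBy on the remaining k - m = 5 rows, which yields
# 103.2 (uu), 383.065 (gg) and -111.06 (yy) for object_id 615.
partial = chunk[~is_orphan].groupby(['object_id', 'passband'])['flux'].mean()

# Step v: the orphans get prepended to the next chunk, e.g. pd.concat([orphans, next_chunk]).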

The algorithm itself is quite simple. However, in my humble opinion, its assumptions limit the approach to particular cases. Additionally, requiring the dataset to be sorted by the grouping key before applying the algorithm might be a critical issue when the data is extremely large.

Apart from the assumptions, I was also thinking of an extreme case that might pose a critical risk to this approach.

Consider the following data.

row_id    object_id   passband    flux
======================================
1         100         uu          50
2         100         uu          50
3         100         vv          30
4         100         vv          30
...
50        100         ww          90
51        100         ww          90
52        100         ww          90
53        500         aa          10
54        500         bb          10
...
100       500         cc          30
101       500         cc          30
102       900         pp          10
103       900         qq          30
...
150       900         pp          10
151       900         qq          30

Based on the above sample data, we know that object_id = 100 occupies 52 rows, object_id = 500 occupies 49 rows, and object_id = 900 occupies 50 rows.

Suppose that we use k = 10 for this case.

Let’s execute the algorithm.

i) Take k rows from the dataset. This should result in the first 10 rows of the data.

ii) Identify the last group from the k rows. Basically, we just take the group ID of the last row. In this case, the last group ID is 100.

iii) Put the m rows corresponding to the last group aside (the orphans). In this case, m is 10 since all ten rows have group ID 100.

iv) Perform the groupBy on the remaining k - m rows. In this case, we perform the groupBy operation on 0 rows; all 10 rows are carried over to the next chunk.

The groupBy operation is not performed at all for the first chunk, since all the rows are carried over to the second chunk.

Here’s what the second chunk looks like.

row_id    object_id   passband    flux
======================================
1         100         uu          50
...
10        100         vv          50

--> The first 10 rows are from the first chunk

--> The next 10 rows are the actual data for the second chunk

11        100         ww          90
...
20        100         ww          90

When the algorithm is applied, we can see that no groupBy operation would be performed on the second chunk either, since the third step sets all the rows aside again.

Consequently, we have to add these 20 rows to the top of the third chunk. Long story short, the third chunk now consists of 30 rows with the same group ID, and again the groupBy operation won’t be performed on it.

To cut it short, the groupBy operation can only be executed once we reach the sixth chunk. At that point, the working chunk consists of the 50 carried-over rows from the first five chunks plus the next 10 rows (rows 51 through 60). Since the last 8 of those new rows belong to a different group (500), the last group is no longer 100, and the groupBy operation can finally be applied to the 52 rows of group 100.

However, we need to remember that the groupBy operation is only applied to the rows with group ID 100; the 8 rows with group ID 500 are put aside as orphans for the next chunk.
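
To see how quickly the carried-over rows can pile up, here is a small simulation of this extreme case with k = 10. It uses synthetic data matching the row counts above (52, 49, and 50 rows for object_id 100, 500, and 900) and only counts how many rows the groupBy would be applied to in each chunk, rather than running the actual aggregation.

import pandas as pd

# Synthetic data mirroring the extreme case above:
# 52 rows for object_id 100, 49 for 500 and 50 for 900.
df = pd.DataFrame({
    'object_id': [100] * 52 + [500] * 49 + [900] * 50,
    'flux': range(151),
})

k = 10
orphans = None

for i in range(0, len(df), k):
    chunk = df.iloc[i:i + k]
    if orphans is not None:
        chunk = pd.concat([orphans, chunk])

    last_id = chunk['object_id'].iloc[-1]
    is_orphan = chunk['object_id'] == last_id
    orphans = chunk[is_orphan]

    print(f'chunk {i // k + 1}: groupBy applied to {len(chunk) - len(orphans)} rows, '
          f'carrying over {len(orphans)} rows')

# Chunks 1-5 aggregate nothing and carry 10, 20, 30, 40, 50 rows forward;
# only at chunk 6 are the 52 rows of group 100 finally aggregated.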

The problem might occur when the carried-over rows grow so large that they can no longer fit into memory. Moreover, applying the split-and-carry logic over and over (which requires quite a lot of checks) might introduce unnecessary performance overhead.