Repartitioning Input Data Stream

Recently I played with a simple Spark Streaming application. Specifically, I investigated how repartitioning behaves when it is applied at different stages of an input data stream. For instance, suppose we have two DStreams: a lines DStream and a words DStream derived from it. The question is: does the partitioning result differ if I repartition right after the lines DStream rather than after the words DStream?

Here’s the code I used.

from pyspark import SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming import StreamingContext


def getSparkSessionInstance():
    # This code was taken from https://github.com/apache/spark/blob/v2.4.3/examples/src/main/python/streaming/sql_network_wordcount.py
    # (it is not called below; kept from the original example)
    if 'sparkSessionSingletonInstance' not in globals():
        globals()['sparkSessionSingletonInstance'] = SparkSession.builder.getOrCreate()
    return globals()['sparkSessionSingletonInstance']


sc = SparkContext("local[*]", "RepartitioningInputDataStream")
ssc = StreamingContext(sc, 20)    # Use a 20-second batch interval


def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        print(rdd.collect())
        print('Num of partitions: {}'.format(rdd.getNumPartitions()))
        # glom() turns each partition into a list, so we can see which elements share a partition
        for index, partition in enumerate(rdd.glom().collect()):
            print('Partition {}'.format(index))
            print(partition)
    except Exception as e:
        # Report failures instead of silently swallowing them
        print('Error: {}'.format(e))


lines = ssc.socketTextStream("localhost", 9999)
lines = lines.repartition(8)         # Position 1: repartition the raw lines

words = lines.flatMap(lambda line: line.split(" "))
#words = words.repartition(8)        # Position 2: repartition after splitting into words

pairs = words.map(lambda word: (word, 1))
#pairs = pairs.repartition(8)        # Position 3: repartition the (word, 1) pairs

pairs.foreachRDD(process)

ssc.start()                       # Start the computation
ssc.awaitTermination()            # Wait for the computation to terminate

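The code above does a very simple thing. It receives an input stream from localhost:9999. Each line is then split into words, using a single space as the delimiter, and each word is mapped to a (word, 1) pair. Finally, for every RDD in the stream, process prints the number of partitions and the content of each partition. The repartitioning (repartition(8)) is applied at one of three positions: after socketTextStream, after flatMap, or after map. Only one of the three repartition lines is active at a time; to test another position, uncomment that line and comment out the others.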

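To execute the code, open two terminals and run the following commands. Note that the host and port are hardcoded in the script, so the trailing localhost 9999 arguments are not actually read.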

Terminal 1
nc -lk 9999

Terminal 2
(path_to_the_spark_submit) (path_to_the_python_code) localhost 9999
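If nc isn't available, a small Python script can stand in for Terminal 1. This is a hypothetical helper, not part of the original setup: send_lines writes UTF-8, newline-terminated lines to a connected socket (socketTextStream reads newline-delimited text), and serve_once accepts a single connection on localhost:9999. It assumes Python 3.8+ for socket.create_server.

```python
import socket
import time
from typing import Iterable


def send_lines(conn: socket.socket, lines: Iterable[str], delay: float = 0.0) -> None:
    """Write each line UTF-8 encoded and newline-terminated, like typing into nc."""
    for line in lines:
        conn.sendall((line + "\n").encode("utf-8"))
        if delay:
            time.sleep(delay)  # pause between lines, roughly like typing them by hand


def serve_once(lines: Iterable[str], host: str = "localhost", port: int = 9999,
               delay: float = 1.0) -> None:
    """Hypothetical nc replacement: accept one client (the Spark receiver) and stream `lines`."""
    with socket.create_server((host, port)) as server:
        conn, _ = server.accept()
        with conn:
            send_lines(conn, lines, delay)
            # Block until the client closes the connection (recv() returns b"" on close)
            # or sends something. Unlike `nc -lk`, no further connections are accepted.
            conn.recv(1)


# Hypothetical usage, in place of `nc -lk 9999` in Terminal 1:
# serve_once(["ab cd ef gh ij kl", "mn op qr st", "a", "b", "c", "x", "y", "z"])
```

Unlike typing by hand, the delay parameter controls how far apart the lines arrive, which in turn may affect how they are grouped into receiver blocks.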

Here’s the input I used (type these lines in Terminal 1):

ab cd ef gh ij kl
mn op qr st
a
b
c
x
y
z
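
For reference, flatMap and map are plain per-element functions, so the pairs they produce from this input can be reproduced locally in Python, independent of any partitioning. This is only a local model of the two transformations (the list-based form is my own, not Spark's API):

```python
from itertools import chain

# The input lines typed into Terminal 1
input_lines = ["ab cd ef gh ij kl", "mn op qr st", "a", "b", "c", "x", "y", "z"]

# Local equivalent of lines.flatMap(lambda line: line.split(" "))
words = list(chain.from_iterable(line.split(" ") for line in input_lines))

# Local equivalent of words.map(lambda word: (word, 1))
pairs = [(word, 1) for word in words]

print(len(pairs))   # 16
print(pairs[:3])    # [('ab', 1), ('cd', 1), ('ef', 1)]
```

These are the same 16 pairs that appear, spread across the partitions, in each of the results.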

Here are the results I got for each repartitioning position.

Repartitioning after ssc.socketTextStream

Num of partitions: 8
Partition 0
('y', 1)
Partition 1
('z', 1)
Partition 2
('mn', 1), ('op', 1), ('qr', 1), ('st', 1), ('x', 1)
Partition 3
('c', 1)
Partition 4
empty
Partition 5
empty
Partition 6
('ab', 1), ('cd', 1), ('ef', 1), ('gh', 1), ('ij', 1), ('kl', 1)
Partition 7
('a', 1), ('b', 1)

============

Repartitioning after lines.flatMap(lambda line: line.split(" "))

Num of partitions: 8
Partition 0
('y', 1)
Partition 1
('z', 1)
Partition 2
('mn', 1), ('op', 1), ('qr', 1), ('st', 1), ('x', 1)
Partition 3
('c', 1)
Partition 4
empty
Partition 5
empty
Partition 6
('ab', 1), ('cd', 1), ('ef', 1), ('gh', 1), ('ij', 1), ('kl', 1)
Partition 7
('a', 1), ('b', 1)

============

Repartitioning after words.map(lambda word: (word, 1))

Num of partitions: 8
Partition 0
('y', 1)
Partition 1
('z', 1)
Partition 2
('mn', 1), ('op', 1), ('qr', 1), ('st', 1), ('x', 1)
Partition 3
('c', 1)
Partition 4
empty
Partition 5
empty
Partition 6
('ab', 1), ('cd', 1), ('ef', 1), ('gh', 1), ('ij', 1), ('kl', 1)
Partition 7
('a', 1), ('b', 1)

Based on the results above, the partitioning is identical in all three cases: wherever the repartitioning happens, the same data ends up in the same partition.

Furthermore, all the elements that were in the same line before being split into words are located in the same partition. For instance, ab cd ef gh ij kl is split into ab, cd, ef, gh, ij, and kl. At first I thought these elements would be placed in different partitions, since after the split they are separate records. But the result shows that Spark keeps them in the same partition (Partition 6). However, this doesn't mean that a partition only ever holds elements split from a single line: Partition 2 also contains x, which came from a different line.
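
One plausible explanation, based on my reading of the Spark 2.x source rather than on anything measured here: the receiver behind socketTextStream groups incoming lines into blocks (every 200 ms by default, per spark.streaming.blockInterval), and each block becomes one input partition. PySpark's RDD.repartition(n) calls coalesce(n, shuffle=True), which pickles Python elements in batches of at most 10, and the JVM then deals each batch (not each element) round-robin to the output partitions, starting at a pseudo-random offset seeded by the input partition's index. So a small block travels as one batch, and its destination depends only on its index, whichever stage repartitions it. The sketch below is a simplified pure-Python model of that idea; simulate_repartition, to_pairs, the one-line-per-block layout, and the use of Python's random.Random in place of Spark's seeded java.util.Random are all assumptions, so its partition numbers will not match Spark's.

```python
import random
from typing import List


def simulate_repartition(input_partitions: List[list], num_partitions: int,
                         batch_size: int = 10) -> List[list]:
    """Simplified model of PySpark's repartition(), i.e. coalesce(shuffle=True).

    ASSUMPTIONS (a model, not Spark's code):
    - each input partition is cut into batches of at most `batch_size` elements;
    - whole batches are dealt round-robin to the output partitions;
    - the round-robin starts at a pseudo-random offset seeded only by the input
      partition index (random.Random stands in for Spark's java.util.Random).
    """
    output: List[list] = [[] for _ in range(num_partitions)]
    for index, items in enumerate(input_partitions):
        # Same index -> same starting offset, on every run.
        position = random.Random(index).randrange(num_partitions)
        for start in range(0, len(items), batch_size):
            position += 1  # advance before placing, then wrap around
            output[position % num_partitions].extend(items[start:start + batch_size])
    return output


def to_pairs(partition: List[str]) -> List[tuple]:
    # flatMap(split) followed by map((word, 1)), applied within one partition
    return [(word, 1) for line in partition for word in line.split(" ")]


input_lines = ["ab cd ef gh ij kl", "mn op qr st", "a", "b", "c", "x", "y", "z"]
# Hypothetical block layout: every typed line arrived in its own receiver block.
blocks = [[line] for line in input_lines]

# Position 1: repartition the raw lines, then split and map.
after_lines = [to_pairs(p) for p in simulate_repartition(blocks, 8)]
# Position 3: split and map first, then repartition the pairs.
after_pairs = simulate_repartition([to_pairs(b) for b in blocks], 8)

print(after_lines == after_pairs)  # True: same layout at both positions
for index, partition in enumerate(after_pairs):
    print("Partition {}".format(index))
    print(partition or "empty")
```

In this model, the six words of the first line form a single batch, so they land together; with batch_size=1 they would instead be spread over six partitions, which is what I originally expected. Two blocks whose offsets happen to coincide share a partition, similar to x joining Partition 2.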

Thanks for reading.