Repartitioning Input Data Stream
Recently I played with a simple Spark Streaming application. Specifically, I investigated how repartitioning behaves when it is applied at different stages of an input data stream. For instance, suppose we have two DStreams, linesDStream and wordsDStream, where the second is derived from the first. The question is: does the result differ if I repartition after linesDStream rather than after wordsDStream?
Here’s the code I used.
from pyspark import SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming import StreamingContext

def getSparkSessionInstance():
    # this code was taken from https://github.com/apache/spark/blob/v2.4.3/examples/src/main/python/streaming/sql_network_wordcount.py
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession.builder.getOrCreate()
    return globals()['sparkSessionSingletonInstance']

sc = SparkContext("local[*]", "RepartitioningInputDataStream")
ssc = StreamingContext(sc, 20)  # Use a 20-second batch interval

def process(time, rdd):
    # Print the batch content, the number of partitions, and each partition's elements
    print("========= %s =========" % str(time))
    try:
        print(rdd.collect())
        print('Num of partitions: {}'.format(rdd.getNumPartitions()))
        for index, partition in enumerate(rdd.glom().collect()):
            print('Partition {}'.format(index))
            print(partition)
    except Exception as e:
        # Report the failure instead of silently swallowing it
        print('Failed to inspect the RDD: {}'.format(e))

# Enable exactly one of the three repartition calls below per run
lines = ssc.socketTextStream("localhost", 9999)
lines = lines.repartition(8)
words = lines.flatMap(lambda line: line.split(" "))
#words = words.repartition(8)
pairs = words.map(lambda word: (word, 1))
#pairs = pairs.repartition(8)
pairs.foreachRDD(process)

ssc.start()  # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
The code above does a very simple thing. It receives an input stream at localhost:9999. Then each line is split into words, using a single space as the delimiter. Finally, each word is mapped to a (word, 1) pair. As the last step, we print the number of partitions and the content of each partition for every RDD. A repartitioning operation is applied after one of the three steps (receiving the lines, splitting into words, or mapping to pairs); the other two calls are commented out, and I enabled each position in turn.
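To make the two transformations concrete, here is a minimal sketch in plain Python (no Spark involved) of what flatMap and map do to the lines of one batch. The helper name to_pairs and the sample batch are my own, chosen only for illustration; in the real application these steps run on a DStream.

```python
from itertools import chain

def to_pairs(lines):
    """Mimic lines.flatMap(split).map(word -> (word, 1)) on a plain list."""
    # flatMap: split every line and flatten the results into one sequence of words
    words = chain.from_iterable(line.split(" ") for line in lines)
    # map: turn each word into a (word, 1) pair
    return [(word, 1) for word in words]

# Hypothetical sample batch, for illustration only
batch = ["ab cd", "x"]
print(to_pairs(batch))
```

Note that flatMap turns one line into several independent elements, which is why one might expect those elements to be scattered across partitions afterwards.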
To execute the code, open 2 Terminals and type the following commands:
Terminal 1: nc -lk 9999
Terminal 2: (path_to_the_spark_submit) (path_to_the_python_code) localhost 9999
Here’s the input data stream that I used (type these on Terminal 1).
ab cd ef gh ij kl mn op qr st a b c x y z
Here’s the result I got.
Repartitioning after ssc.socketTextStream
Num of partitions: 8
Partition 0: ('y', 1)
Partition 1: ('z', 1)
Partition 2: ('mn', 1), ('op', 1), ('qr', 1), ('st', 1), ('x', 1)
Partition 3: ('c', 1)
Partition 4: empty
Partition 5: empty
Partition 6: ('ab', 1), ('cd', 1), ('ef', 1), ('gh', 1), ('ij', 1), ('kl', 1)
Partition 7: ('a', 1), ('b', 1)
============
Repartitioning after lines.flatMap(lambda line: line.split(" "))
Num of partitions: 8
Partition 0: ('y', 1)
Partition 1: ('z', 1)
Partition 2: ('mn', 1), ('op', 1), ('qr', 1), ('st', 1), ('x', 1)
Partition 3: ('c', 1)
Partition 4: empty
Partition 5: empty
Partition 6: ('ab', 1), ('cd', 1), ('ef', 1), ('gh', 1), ('ij', 1), ('kl', 1)
Partition 7: ('a', 1), ('b', 1)
============
Repartitioning after words.map(lambda word: (word, 1))
Num of partitions: 8
Partition 0: ('y', 1)
Partition 1: ('z', 1)
Partition 2: ('mn', 1), ('op', 1), ('qr', 1), ('st', 1), ('x', 1)
Partition 3: ('c', 1)
Partition 4: empty
Partition 5: empty
Partition 6: ('ab', 1), ('cd', 1), ('ef', 1), ('gh', 1), ('ij', 1), ('kl', 1)
Partition 7: ('a', 1), ('b', 1)
Based on the results above, the outcome is the same regardless of where the repartitioning is applied. In all three positions, the same elements end up in the same partitions.
Furthermore, all the elements that were originally on the same line before being split into words end up in the same partition. For instance, ab cd ef gh ij kl is split into ab, cd, ef, gh, ij, kl. At first I thought these elements would be placed in different partitions, since they had already been split and had become separate rows. But the result shows that Spark keeps them together in the same partition (Partition 6). However, this doesn't mean that a single partition holds only the elements split from one line. We can see that Partition 2 also contains another element, namely x.
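This observation can be double-checked with a small plain-Python helper that reports which partitions hold the words of a given line. The dictionary below is copied by hand from the output above (words only, counts dropped); partitions_of is a hypothetical helper of my own, not part of Spark.

```python
# Partition contents copied from the reported output (words only)
partitions = {
    0: ["y"],
    1: ["z"],
    2: ["mn", "op", "qr", "st", "x"],
    3: ["c"],
    4: [],
    5: [],
    6: ["ab", "cd", "ef", "gh", "ij", "kl"],
    7: ["a", "b"],
}

def partitions_of(line, partitions):
    """Return the set of partition indexes that contain at least one word of the line."""
    words = set(line.split(" "))
    return {index for index, content in partitions.items() if words & set(content)}

print(partitions_of("ab cd ef gh ij kl", partitions))  # {6}
print(partitions_of("x", partitions))                  # {2}
```

A result with a single index means every word of that line landed in one partition. The second call shows that x shares Partition 2 with other elements.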
Thanks for reading.