Structured Streaming Checkpointing with Parquet Stream Source

I was curious about what checkpoint files in Spark Structured Streaming look like. To introduce the basic concept: a checkpoint simply records the progress information of a streaming process. These checkpoint files are typically used for failure recovery. A more detailed explanation can be found here.

The structure of the checkpoint directory may differ across streaming sources. For instance, the socket and parquet streaming sources yield different checkpoint directory structures: the socket stream source does not generate a directory called sources/0/, while the parquet stream source does.

In this article I’ll use parquet as the streaming source.

Here’s the scenario. We’re going to read streaming data from a directory, let’s call it streamedParquets, that serves as the sink of another streaming process, i.e. it stores a collection of parquet files written by that process. Suppose it contains the following parquet files.

\ streamedParquets
    -- a.parquet
    -- b.parquet
    -- c.parquet
    -- d.parquet
    -- e.parquet
    -- f.parquet
    -- g.parquet
    -- h.parquet

In addition, let’s presume that each parquet file contains an extremely simple dataframe, such as the following.

+-------+
|   word|
+-------+
|hello_a|
|world_a|
+-------+
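
Before setting up the stream, a quick batch (non-streaming) read is a handy sanity check. The snippet below is only a sketch: it assumes a SparkSession named spark (created in the next code snippet) and reuses the file name a.parquet from the listing above.

# Batch read of a single parquet file, just to inspect its content
spark.read.parquet("streamedParquets/a.parquet").show()
# +-------+
# |   word|
# +-------+
# |hello_a|
# |world_a|
# +-------+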

Our streaming process then reads the parquet files using the code below.

import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession \
    .builder \
    .appName("structuredStreamingParquet") \
    .getOrCreate()

# Define a schema for the parquet files
parquet_schema = StructType([
    StructField('word', StringType())
])

# Create a streaming dataframe over the parquet files;
# maxFilesPerTrigger limits each micro-batch to at most 4 files
df = spark \
    .readStream \
    .schema(parquet_schema) \
    .option('maxFilesPerTrigger', 4) \
    .parquet("streamedParquets/")

The above code works as follows:

  • Define a schema for the parquet files. In this case, the dataframe in each parquet file should have a column called word with string as the data type. Any parquet file that does not follow this rule will not be processed.
  • Create a stream reader for reading the stream data (this does not start the streaming process yet). This reader carries several pieces of information, such as:
    • the expected schema
    • the number of parquet files to be processed within a single batch (4 parquet files in this case)
    • the parquet stream source (streamedParquets/)
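
At this point nothing is actually running; df is just a streaming dataframe definition. A minimal way to double-check this (a sketch, assuming the code above has been executed):

# df is a streaming dataframe; no data is read until a query is started
print(df.isStreaming)  # True
df.printSchema()
# root
#  |-- word: string (nullable = true)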

After defining the stream reader, we can then proceed to start the streaming process. Here’s the code snippet.

def process_batch(df, epoch_id):
    # Each micro-batch arrives here as a regular (non-streaming) dataframe
    print("Batch {}: {}".format(epoch_id, datetime.fromtimestamp(time.time())))
    df = df.withColumn('TEST', F.lit('abc'))
    df.show()


query = df \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .foreachBatch(process_batch) \
    .option('checkpointLocation', "checkpoints/") \
    .start()

query.awaitTermination()

The above code does the following:

  • Every 5 seconds, create a new micro-batch by:
    • retrieving up to maxFilesPerTrigger new parquet files
    • appending the data to an unbounded (input) table
    • processing the data using the defined batch processor (in this case, a function called process_batch)
  • Write the checkpoint files to a dedicated location, which is checkpoints/ in this case
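
While the query is running, you can also inspect its progress programmatically instead of (or before) blocking on awaitTermination(). The snippet below is only a sketch; it assumes query is the StreamingQuery returned by start() above.

# Poll the running query for its most recent micro-batch progress
while query.isActive:
    progress = query.lastProgress  # dict describing the last completed batch, or None
    if progress is not None:
        print(progress['batchId'], progress['numInputRows'])
    time.sleep(5)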

Once the streaming query is running, Spark creates several directories and files inside the checkpoints directory, such as offsets/, commits/, a metadata file, and sources/0/. We’ll focus on sources/0/ here.

Inside sources/0/ is simply a collection of files named incrementally (starting from 0). Each file name denotes a batch ID.
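
These log files are plain text, so we can inspect them outside of Spark as well. The snippet below is a small sketch that reads them from the driver's local filesystem, assuming the checkpointLocation checkpoints/ used above.

import glob
import json

# Each file under checkpoints/sources/0/ is the file-source log for one batch:
# a version header (e.g. "v1") followed by one JSON entry per processed parquet file
for log_path in sorted(glob.glob("checkpoints/sources/0/*")):
    print("--- batch file:", log_path)
    with open(log_path) as f:
        for line in f:
            line = line.strip()
            if line.startswith("{"):
                entry = json.loads(line)
                print(entry["path"], entry["batchId"])
            else:
                print(line)  # the version header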

Here’s the example for batch 0.

v1
{"path":"streamedParquets/a.parquet","timestamp":<timestamp_p>,"batchId":0}
{"path":"streamedParquets/b.parquet","timestamp":<timestamp_q>,"batchId":0}
{"path":"streamedParquets/c.parquet","timestamp":<timestamp_r>,"batchId":0}
{"path":"streamedParquets/d.parquet","timestamp":<timestamp_s>,"batchId":0}

And here’s the example for batch 1.

v1
{"path":"streamedParquets/e.parquet","timestamp":<timestamp_k>,"batchId":1}
{"path":"streamedParquets/f.parquet","timestamp":<timestamp_l>,"batchId":1}
{"path":"streamedParquets/g.parquet","timestamp":<timestamp_m>,"batchId":1}
{"path":"streamedParquets/h.parquet","timestamp":<timestamp_n>,"batchId":1}

Please note that the timestamp for each parquet file within the same batch might be different.
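
This is also where the failure recovery mentioned at the beginning comes in: since these logs record which files have already been processed, restarting the query with the same checkpointLocation will not re-process a.parquet through h.parquet; only parquet files added afterwards show up in new batches. A minimal restart sketch, reusing the df and process_batch defined above:

# Restart the query with the same checkpoint location; Spark reads
# checkpoints/sources/0/ and skips the files it has already processed
query = df \
    .writeStream \
    .trigger(processingTime='5 seconds') \
    .foreachBatch(process_batch) \
    .option('checkpointLocation', "checkpoints/") \
    .start()

query.awaitTermination()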

Now that we know the content of the checkpoint files, let’s take a look at the output of the batch processor in the terminal.

Batch 0: datetime_batch_0
+-------+----+
|   word|TEST|
+-------+----+
|hello_a| abc|
|world_a| abc|
|hello_b| abc|
|world_b| abc|
|hello_c| abc|
|world_c| abc|
|hello_d| abc|
|world_d| abc|
+-------+----+

Batch 1: datetime_batch_0 + 5 seconds (processing time)
+-------+----+
|   word|TEST|
+-------+----+
|hello_e| abc|
|world_e| abc|
|hello_f| abc|
|world_f| abc|
|hello_g| abc|
|world_g| abc|
|hello_h| abc|
|world_h| abc|
+-------+----+

Thank you for reading.