PySpark — Estimate Partition Count for File Read

Understand how Spark estimates the number of Partitions required to read a file


Spark reads files in partitions, and each partition is processed to reach the desired result. But more often than not, we are confused about how the number of partitions is estimated and how the data gets divided.

Today we will deep dive into the logic that Spark uses for this calculation. But before we start, we need to understand the following configurations. As per the Spark documentation:

  1. spark.sql.files.maxPartitionBytes — The maximum number of bytes to pack into a single partition when reading files. The default value is 128 MB (134217728 bytes).
  2. spark.sql.files.openCostInBytes — The estimated cost, in bytes, to open a file. The default value is 4 MB (4194304 bytes).
  3. spark.sparkContext.defaultParallelism — The number of cores available for parallel operations (a snippet after this list shows how these values can be overridden).
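These values can also be overridden while building the SparkSession. Here is a minimal sketch (the values shown are just the defaults written out in bytes, kept in the same "<bytes>b" form that the snippets below parse; the app name is arbitrary):

# Optional: override the read-related configurations while creating the session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
        .appName("partition-estimate")
        .config("spark.sql.files.maxPartitionBytes", "134217728b")  # 128 MB, the default
        .config("spark.sql.files.openCostInBytes", "4194304b")      # 4 MB, the default
        .getOrCreate()
)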

To understand the calculation better, let's consider an example and break it down into multiple steps.

Example 1

We have a single huge CSV file of 2647733632 bytes (approx. 2.5 GB). Let's estimate the partition count with the default Spark configuration.

Now if you are new to Spark, PySpark or want to learn more — I teach Big Data, Spark, Data Engineering & Data Warehousing on my YouTube Channel — Ease With Data

1️⃣ Step 1: Validate the configuration for partition size, open cost and parallelism

# Check the default partition size
partition_size = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))
print(f"Partition Size: {partition_size} in bytes and {int(partition_size) / 1024 / 1024} in MB")

# Check the default open Cost in Bytes
open_cost_size = int(spark.conf.get("spark.sql.files.openCostInBytes").replace("b",""))
print(f"Open Cost Size: {partition_size} in bytes and {int(partition_size) / 1024 / 1024} in MB")

# Default parallelism
parallelism = int(spark.sparkContext.defaultParallelism)
print(f"Default Parallelism: {parallelism}")
Spark Configuration

2️⃣ Step 2: Calculate the total file size on disk (for the current example we have only one file of 2647733632 bytes)

Example file
# File size in Bytes
average_file_size = 2647733632
total_files = 1

# Total Actual File Size in Bytes
total_file_size = average_file_size * total_files
print(f"Total File size on disk: {total_file_size} in bytes and {total_file_size / 1024 /1024} in MB")
File Size

3️⃣ Step 3: Calculate the total padded file size by adding the total file opening cost to the total file size.

# Padded file size for Spark read
padded_file_size = total_file_size + (total_files * open_cost_size)
print(f"Total padded file size: {padded_file_size} in bytes and {padded_file_size / 1024 /1024} in MB")
Padded file size

4️⃣ Step 4: Calculate the bytes per core by dividing the padded file size by the total number of cores available (i.e. the parallelism)

# Number of Bytes per Core
bytes_per_core = padded_file_size / parallelism
print(f"Bytes per Core: {bytes_per_core} in bytes and {bytes_per_core / 1024 /1024} in MB")
Bytes per core

5️⃣ Step 5: Calculate the max bytes per partition by taking the minimum of the configured partition size and the maximum of the open cost and the bytes per core.

# Max Split Bytes
max_bytes_per_split = min(partition_size, max(open_cost_size, bytes_per_core))
print(f"Max bytes per Partition: {max_bytes_per_split} in bytes and {max_bytes_per_split / 1024 /1024} in MB")
Max bytes per partition

6️⃣ Step 6: Calculate the number of partitions by dividing the padded file size by the max bytes per partition

# Total number of Partitions
num_of_partitions = padded_file_size / max_bytes_per_split
print(f"Approx number of partitions: {num_of_partitions}")
Number of Partitions
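Putting Steps 2 through 6 together, the whole estimate collapses into a single expression (a recap using the same variable names as in the snippets above):

# Steps 2-6 combined into one expression
padded_file_size = total_file_size + (total_files * open_cost_size)
estimated_partitions = padded_file_size / min(partition_size, max(open_cost_size, padded_file_size / parallelism))
print(f"Approx number of partitions: {estimated_partitions}")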

So for the current example we estimate 19.75 partitions for our data read. Let's read the file in Spark and see the actual number of partitions.

# Read the file to see the number of partitions
df_1 = spark.read.format("csv").option("header", True).load("dataset/sales_combined_2.csv")
print(f"Number of Partition -> {df_1.rdd.getNumPartitions()}")

Awesome, we were able to predict, almost exactly, the number of partitions Spark would require to process the dataset 🤞
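Since Spark cannot create a fractional partition, the actual count lands at or very near the estimate rounded up (an illustrative check, not output from the notebook):

# Rounding the fractional estimate up gives the whole number of partitions Spark reports
import math
print(math.ceil(19.75))  # 20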

Now, let's pack the whole code into a Python function for reusability.

# Let's pack all the code into a single function
def num_partitions(file_size, num_of_files, spark):
    # Check the default partition size
    partition_size = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))
    # Check the default open Cost in Bytes
    open_cost_size = int(spark.conf.get("spark.sql.files.openCostInBytes").replace("b",""))
    # Default parallelism
    parallelism = int(spark.sparkContext.defaultParallelism)
    # Total Actual File Size in Bytes
    total_file_size = file_size * num_of_files
    # Padded file size for Spark read
    padded_file_size = total_file_size + (num_of_files * open_cost_size)
    # Number of Bytes per Core
    bytes_per_core = padded_file_size / parallelism
    # Max Split Bytes
    max_bytes_per_split = min(partition_size, max(open_cost_size, bytes_per_core))
    # Total number of Partitions
    num_of_partitions = padded_file_size / max_bytes_per_split

    return num_of_partitions
Reusable function
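As a quick sanity check, plugging Example 1's numbers back into the function should reproduce the ~19.75 estimate (assuming the same default configuration):

# Re-check Example 1 using the helper function
print(f"Estimated partitions for Example 1: {num_partitions(2647733632, 1, spark)}")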

Great, now we can use this function to estimate the number of partitions. Let's take one more example.

Example 2

Consider a folder with 41300 tiny Parquet files, with an average size of 7777 bytes (approx. 7.7 KB) per file. We can now use our function to predict the number of partitions.

# Calculate the number of partitions as per our logic for tiny files
estimated_num_partition = num_partitions(7777, 41300, spark)
print(f"Estimated number of partitions = {estimated_num_partition}")

# Let's read the multiple parquet files, each of approx. 7777 bytes or 7.7 KB
df_2 = spark.read.format("parquet").load("dataset/sales_trx_id.parquet/")
print(f"Number of Partition -> {df_2.rdd.getNumPartitions()}")
Example 2

Mind blowing 🤯, we estimated approximately the correct number.
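To see why so many tiny files inflate the partition count, here is the same arithmetic spelled out with the default configuration (an illustrative walk-through, not notebook output; it assumes bytes per core exceeds 128 MB, which holds for any reasonable core count given the ~173 GB padded size):

# Example 2 arithmetic with the default configuration
total_file_size = 7777 * 41300                            # ~321 MB of actual data
padded_file_size = total_file_size + (41300 * 4194304)    # ~173.5 GB once the 4 MB open cost is added per file
max_bytes_per_split = 134217728                           # min(128 MB, max(4 MB, bytes per core)) resolves to 128 MB here
print(padded_file_size / max_bytes_per_split)             # ~1293 partitions, dominated by the open cost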

Note: Since we use average file sizes in the calculation, there can be a negligible difference between the predicted and the actual number of partitions.

Upcoming articles will discuss a few more techniques that help further optimize file reads. Till then, Keep Learning, Keep Growing and Keep Sharing ❤️

Make sure to Like and Subscribe.

Checkout Ease With Data YouTube Channel: https://www.youtube.com/@easewithdata

Wish to connect with me: https://topmate.io/subham_khandelwal

Checkout the iPython Notebook on Github — https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/39_estimate_partition_count_file_read.ipynb

Checkout my Personal Blog — https://urlit.me/blog/

Checkout the PySpark Medium Series — https://subhamkharwal.medium.com/learnbigdata101-spark-series-940160ff4d30



Buy me a Coffee

If you like my content and wish to buy me a COFFEE, click the link below or scan the QR.
Buy Subham a Coffee
*All Payments are secured through Stripe.


About the Author

Subham works as a Senior Data Engineer at a multinational Data Analytics and Artificial Intelligence organization.
Checkout portfolio: Subham Khandelwal