PySpark — Dynamic Partition Overwrite

INSERT OVERWRITE is a very useful feature for overwriting only a few partitions instead of the whole partitioned output. We have seen it implemented in Hive, Impala, etc. But can we implement the same in Apache Spark?

Yes, we can implement the same functionality in Spark 2.3.0 and later with a small configuration change, while keeping the write mode as “overwrite” itself.

As usual, we will follow along with an example to test this behaviour. To implement this functionality, we will change the following Spark parameter:

spark.sql.sources.partitionOverwriteMode — check out the Spark documentation: https://spark.apache.org/docs/3.0.0/configuration.html
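
Here is a minimal sketch of how this parameter can be set, either while building the SparkSession or at runtime on an existing session (the application name below is just a placeholder, and "dynamic" is used purely as an example value; we toggle it later in Test 2):

# Sketch: set the partition overwrite mode while building the session
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("partition-overwrite-demo") \
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
.getOrCreate()
# ...or toggle it at runtime on an existing session
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")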

Test 1: We overwrite the partitions with the default value for partitionOverwriteMode, i.e. STATIC

Let's create our initial example dataset.

# Example dataset
from pyspark.sql.functions import to_date
_data = [
["ORD1001", "P003", 70, "01-21-2022"],
["ORD1004", "P033", 12, "01-24-2022"],
["ORD1005", "P036", 10, "01-20-2022"],
["ORD1002", "P016", 2, "01-10-2022"],
["ORD1003", "P012", 6, "01-10-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]
# Create the dataframe
df = spark.createDataFrame(data=_data, schema=_cols)
# Cast the Order date from String to Date
df = df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
df.printSchema()
df.show()
Initial dataset

Current/Default configuration for spark.sql.sources.partitionOverwriteMode

# Check the mode for Partition Overwrite
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
Default conf set to STATIC

Repartition and save the initial dataset; we get 4 partitions on order_date.

# Let's repartition the data on order_date and write
df.repartition("order_date") \
.write \
.format("parquet") \
.partitionBy("order_date") \
.mode("overwrite") \
.save("dataset/orders_partitioned")
Initial dataset partitioned
Validate Initial dataset
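
The written output can also be validated with a quick read-back (a minimal sketch, reusing the save path from above):

# Sketch: read the partitioned output back to validate the initial load
check_df = spark.read.parquet("dataset/orders_partitioned")
check_df.orderBy("order_date").show()
# One distinct order_date per partition directory (4 in total)
check_df.select("order_date").distinct().show()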

Generate the new delta dataset for overwrite

# Let's create our delta dataset for the overwrite
_data = [
["ORD1010", "P053", 78, "01-24-2022"],
["ORD1011", "P076", 21, "01-20-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]
# Create the delta dataframe
delta_df = spark.createDataFrame(data=_data, schema=_cols)
# Cast the Order date from String to Date
delta_df = delta_df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
delta_df.printSchema()
delta_df.show()
Delta dataset

Now, we write the delta data with the default configuration.

# Let's write to the same location for the partitioned orders
delta_df.repartition("order_date") \
.write \
.format("parquet") \
.partitionBy("order_date") \
.mode("overwrite") \
.save("dataset/orders_partitioned")
Final Data Partitioned
Final data validation
From the above test we conclude that, with STATIC mode, the overwrite removes all existing partitions of the initial data and the output is left with only the delta dataset.
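
A quick read-back makes this easy to confirm (a minimal sketch; with STATIC mode only the two delta partitions should remain on disk):

# Sketch: after the STATIC overwrite, only the delta rows/partitions survive
result_df = spark.read.parquet("dataset/orders_partitioned")
result_df.show()  # expected: just ORD1010 (2022-01-24) and ORD1011 (2022-01-20)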

Test 2: We overwrite the partitions with partitionOverwriteMode set to DYNAMIC

We follow the same steps as above, but this time we set the configuration value to DYNAMIC.

# Setting the partitionOverwriteMode as DYNAMIC
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
Setting conf value to DYNAMIC

Initial dataset

# Example dataset
from pyspark.sql.functions import to_date
_data = [
["ORD1001", "P003", 70, "01-21-2022"],
["ORD1004", "P033", 12, "01-24-2022"],
["ORD1005", "P036", 10, "01-20-2022"],
["ORD1002", "P016", 2, "01-10-2022"],
["ORD1003", "P012", 6, "01-10-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]
# Create the dataframe
df = spark.createDataFrame(data=_data, schema=_cols)
# Cast the Order date from String to Date
df = df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
df.printSchema()
df.show()
Initial Dataset

Write the partitioned Initial dataset

# Let's repartition the data on order_date and write
df.repartition("order_date") \
.write \
.format("parquet") \
.partitionBy("order_date") \
.mode("overwrite") \
.save("dataset/orders_partitioned")
Initial Partitioned dataset
Validate Initial data

We create a similar delta dataset (note that ORD1011 now falls in the 01-10-2022 partition).

# Let's create our delta dataset for the overwrite
_data = [
["ORD1010", "P053", 78, "01-24-2022"],
["ORD1011", "P076", 21, "01-10-2022"],
]
_cols = ["order_id", "prod_id", "qty", "order_date"]
# Create the delta dataframe
delta_df = spark.createDataFrame(data=_data, schema=_cols)
# Cast the Order date from String to Date
delta_df = delta_df.withColumn("order_date", to_date("order_date", "MM-dd-yyyy"))
delta_df.printSchema()
delta_df.show()
Delta Dataset

Now, we write the dataset with the same mode “overwrite”.

# Let's write to the same location for the partitioned orders
delta_df.repartition("order_date") \
.write \
.format("parquet") \
.partitionBy("order_date") \
.mode("overwrite") \
.save("dataset/orders_partitioned")
Final partitioned dataset
Final data validation
As is evident from the above test, only the partitions present in the delta dataset are overwritten, rather than the whole dataset; the read-back sketched below confirms this.
Note: This works only with partitioned datasets.
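
A final read-back illustrates the difference (a minimal sketch; the untouched partitions from the initial load remain alongside the replaced ones):

# Sketch: with DYNAMIC mode only the 01-24 and 01-10 partitions are replaced;
# the 01-21 and 01-20 partitions from the initial load stay intact
final_df = spark.read.parquet("dataset/orders_partitioned")
final_df.orderBy("order_date").show()
# expected: ORD1001 and ORD1005 retained, ORD1010 and ORD1011 replacing
# the rows in the 01-24 and 01-10 partitions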

Conclusion: We can achieve INSERT OVERWRITE functionality in Spark 2.3.0 and later by changing the partition overwrite configuration parameter (spark.sql.sources.partitionOverwriteMode) to dynamic.
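
The Spark configuration documentation also notes that the same mode can be supplied per write as a DataFrameWriter option, which takes precedence over the session-level setting (a minimal sketch):

# Sketch: pass partitionOverwriteMode as a writer option for a single write
delta_df.write \
.format("parquet") \
.partitionBy("order_date") \
.option("partitionOverwriteMode", "dynamic") \
.mode("overwrite") \
.save("dataset/orders_partitioned")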

Check out the iPython Notebook on GitHub — https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/24_Partition_Overwrite.ipynb

Check out my personal blog — https://urlit.me/blog/

Check out the PySpark Medium Series — https://subhamkharwal.medium.com/learnbigdata101-spark-series-940160ff4d30

About the Author

Subham works as a Senior Data Engineer at a multinational Data Analytics and Artificial Intelligence organization.
Check out his portfolio: Subham Khandelwal