PySpark — Setup Delta Lake

Delta Lake is one of the hot topics surrounding the big data space right now. It brings in a lot of improvements in terms reliability on data lake. But why is it gaining so much attention?

Representation Image (Credits:

As per Delta IO documentation — “Delta Lake is an open source project that enables building a Lakehouse Architecture on top of data lakes. Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS”.

There are so many Key features associated with delta lake, checkout below image

Key Features (Credits:

Today, we are going to setup delta lake using PySpark and enable our metastore for the same.

In order to make PySpark support Delta Lake, we need to import the delta-core jar into SparkSession and set the configs right.

# Create Spark Session with Delta JARS and conf

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Delta with PySpark") \
.config('spark.jars.packages', '') \
.config("spark.sql.extensions", "") \
) \
.config("spark.sql.warehouse.dir", "spark-warehouse") \
.master("local[*]") \
.enableHiveSupport() \

Generate SparkSession
Make sure to enable persisting metastore. Checkout my previous blog on the same —

Now, we are ready to work with Delta Lake. To create our first Delta table, lets read the Sales Parquet data.

# Lets read our Sales dataset

df_sales ="dataset/sales.parquet/*parquet")
df_sales.printSchema(), False)
Sales dataset

Now, we transform and write this data as delta table.

# Lets create a sales managed delta table
from pyspark.sql.functions import to_timestamp, expr

df_formatted = (
.withColumn("transacted_at", to_timestamp("transacted_at"))
.withColumn("amount", expr("CAST(amount as decimal(14,2))"))

df_formatted.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", True) \
Sales Delta table

Since we didn’t specify any external location, by default it would be a in-house managed delta table.

Table definition
Table data

Since, delta tables supports versioning

# Lets check the current version of the table

from delta import DeltaTable

dt = DeltaTable.forName(spark, "sales_delta_managed")
dt.history().select("version", "timestamp").show(truncate=False)
Delta table versioning

Lets update a record to see the changes in versioning

ACID transaction support

Record got updated without any hassle (delta lake supports DML operations)

Record updated

Changes in versioning

Version updated after DML operation

Lets validate if a given table location is Delta Table

# Verify if a given table is Delta

print(DeltaTable.isDeltaTable(spark, "spark-warehouse/sales_managed/"))
print(DeltaTable.isDeltaTable(spark, "spark-warehouse/sales_delta_managed/"))
Validate Delta table

Bonus : Shortcut to convert Parquet data to delta

# Shortcut to create a Parquet location to delta table
# We will convert the sales_managed table to delta

DeltaTable.convertToDelta(spark, "parquet.`spark-warehouse/sales_managed`")

Verify if the location is converted into delta


But, if we check the metadata from Catalog, its still a hive table.

So, to convert the metadata as well


CONVERT TO DELTA default.sales_managed;
Table converted to delta

We are going to cover many more Delta Lake concepts in upcoming blogs.

Checkout the iPython Notebook on Github —

Checkout my Personal blog —

Checkout the PySpark Medium Series —

Buy me a Coffee

If you like my content and wish to buy me a COFFEE. Click the link below or Scan the QR.
Buy Subham a Coffee
*All Payments are secured through Stripe.

Scan the QR to Pay Securely

About the Author

Subham is working as Senior Data Engineer at a Data Analytics and Artificial Intelligence multinational organization.
Checkout portfolio: Subham Khandelwal