import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import io.delta.tables._
object DeltaSchemaEvolutionBasics {
// Initialize Spark with Delta Lake support
val spark = SparkSession.builder()
.appName("DeltaSchemaEvolutionBasics")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
import spark.implicits._
// ============================================
// EXAMPLE 1: Basic Table Creation
// ============================================
def example1_CreateInitialTable(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 1: Creating Your First Delta Table")
println("="*70)
val tablePath = "/tmp/delta/beginner/users"
// Create simple data
val users = Seq(
(1, "Alice", "alice@email.com"),
(2, "Bob", "bob@email.com"),
(3, "Charlie", "charlie@email.com")
).toDF("id", "name", "email")
println("\nOriginal data:")
users.show()
users.printSchema()
// Write as Delta table
users.write
.format("delta")
.mode("overwrite")
.save(tablePath)
println(s"\n✓ Delta table created at: $tablePath")
// Read it back
val savedTable = spark.read.format("delta").load(tablePath)
println("\nReading the saved table:")
savedTable.show()
}
// ============================================
// EXAMPLE 2: Adding New Columns (Schema Evolution)
// ============================================
def example2_AddNewColumns(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 2: Adding New Columns to Existing Table")
println("="*70)
val tablePath = "/tmp/delta/beginner/users"
println("\nCurrent schema:")
spark.read.format("delta").load(tablePath).printSchema()
// Create new data with additional columns
val newUsers = Seq(
(4, "David", "david@email.com", 30, "New York"),
(5, "Emma", "emma@email.com", 25, "London")
).toDF("id", "name", "email", "age", "city")
println("\nNew data with extra columns (age, city):")
newUsers.show()
newUsers.printSchema()
// Trying to append WITHOUT schema evolution is rejected by Delta's schema enforcement
println("\n❌ Without the mergeSchema option, this append would fail:")
println("""newUsers.write.format("delta").mode("append").save(tablePath)""")
// Append WITH schema evolution
println("\n✓ With mergeSchema option - this works:")
newUsers.write
.format("delta")
.mode("append")
.option("mergeSchema", "true") // KEY OPTION for schema evolution
.save(tablePath)
println("\nUpdated table with new columns:")
val updatedTable = spark.read.format("delta").load(tablePath)
updatedTable.printSchema()
updatedTable.show()
println("\nNotice: Old records have NULL for new columns (age, city)")
}
// ============================================
// EXAMPLE 3: Overwriting Schema Completely
// ============================================
def example3_OverwriteSchema(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 3: Completely Replacing Table Schema")
println("="*70)
val tablePath = "/tmp/delta/beginner/products"
// Create initial table
val initialProducts = Seq(
(1, "Laptop", 999.99),
(2, "Mouse", 29.99)
).toDF("product_id", "product_name", "price")
println("\nInitial products table:")
initialProducts.show()
initialProducts.printSchema()
initialProducts.write
.format("delta")
.mode("overwrite")
.save(tablePath)
// Create completely new schema
val newProducts = Seq(
(101, "Monitor", "Electronics", 299.99, 10),
(102, "Keyboard", "Electronics", 79.99, 25)
).toDF("id", "name", "category", "price", "stock")
println("\nNew products with different schema:")
newProducts.show()
newProducts.printSchema()
// Overwrite with new schema
newProducts.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true") // KEY OPTION to replace schema
.save(tablePath)
println("\n✓ Schema completely replaced:")
val updatedProducts = spark.read.format("delta").load(tablePath)
updatedProducts.printSchema()
updatedProducts.show()
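// Note: even after overwriteSchema, earlier table versions keep their original
// schema. A minimal check via time travel (assumes version 0 is the initial
// write of this walkthrough on a fresh path):
println("\nOriginal schema is still readable with time travel (versionAsOf 0):")
spark.read.format("delta").option("versionAsOf", 0).load(tablePath).printSchema()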
}
// ============================================
// EXAMPLE 4: Data Type Widening (Int → Long)
// ============================================
def example4_DataTypeWidening(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 4: Automatic Data Type Widening")
println("="*70)
val tablePath = "/tmp/delta/beginner/sales"
// Create table with Int
println("Step 1: Creating table with Integer salary")
val initialData = Seq(
(1, "John", 50000),
(2, "Mary", 60000),
(3, "Bob", 55000)
).toDF("id", "name", "salary")
initialData.write.format("delta").mode("overwrite").save(tablePath)
println("\nInitial Schema:")
spark.read.format("delta").load(tablePath).printSchema()
spark.read.format("delta").load(tablePath).show()
// Step 2: Read, cast, and overwrite with new schema
println("\nStep 2: Widening salary from INT to BIGINT")
val df = spark.read.format("delta").load(tablePath)
val widenedDF = df.select(
$"id",
$"name",
$"salary".cast(LongType).as("salary")
)
widenedDF.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.save(tablePath)
println("\nSchema after widening:")
spark.read.format("delta").load(tablePath).printSchema()
// Step 3: Insert new data with larger salary values
println("\nStep 3: Inserting data with large salary values")
val newData = Seq(
(4, "Alice", 5000000000L) // Large value that needs BIGINT
).toDF("id", "name", "salary")
newData.write.format("delta").mode("append").save(tablePath)
println("\nFinal data:")
spark.read.format("delta").load(tablePath).show()
println("\n✓ Type widening successful!")
println("Salary column changed from INT to BIGINT without losing any data")
}
// ============================================
// EXAMPLE 5: Using ALTER TABLE to Add Columns
// ============================================
def example5_AlterTableAddColumn(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 5: Using SQL ALTER TABLE to Add Columns")
println("="*70)
val tablePath = "/tmp/delta/beginner/employees"
// Create initial table
val employees = Seq(
(1, "John", "Engineering"),
(2, "Sarah", "Marketing")
).toDF("emp_id", "emp_name", "department")
employees.write.format("delta").mode("overwrite").save(tablePath)
// Create a temporary view for SQL operations
spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
println("\nInitial employee table:")
spark.sql("SELECT * FROM employees").show()
spark.sql("DESCRIBE employees").show()
// Add column using SQL
println("\nAdding 'salary' column using ALTER TABLE:")
spark.sql(s"""
ALTER TABLE delta.`$tablePath`
ADD COLUMNS (salary DOUBLE)
""")
spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
println("\n✓ Column added:")
spark.sql("DESCRIBE employees").show()
spark.sql("SELECT * FROM employees").show()
// You can also add column with comment
spark.sql(s"""
ALTER TABLE delta.`$tablePath`
ADD COLUMNS (hire_date STRING COMMENT 'Employee hire date')
""")
spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
println("\n✓ Another column added with comment:")
spark.sql("DESCRIBE employees").show()
}
// ============================================
// EXAMPLE 6: Handling Schema Mismatches
// ============================================
def example6_SchemaMismatchHandling(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 6: Understanding Schema Mismatch Errors")
println("="*70)
val tablePath = "/tmp/delta/beginner/orders"
// Create initial table
val orders = Seq(
(1, "Product A", 50.0),
(2, "Product B", 75.0)
).toDF("order_id", "product", "amount")
orders.write.format("delta").mode("overwrite").save(tablePath)
println("\nOriginal orders table:")
spark.read.format("delta").load(tablePath).show()
// Try to append data with different column order
val newOrders = Seq(
(100.0, 3, "Product C"), // Different order!
(125.0, 4, "Product D")
).toDF("amount", "order_id", "product")
println("\nNew data (columns in different order):")
newOrders.show()
println("\n✓ Delta Lake matches by column NAME, not position:")
newOrders.write
.format("delta")
.mode("append")
.save(tablePath)
println("\nAll orders (correctly matched despite different column order):")
spark.read.format("delta").load(tablePath).show()
// Example of incompatible schema (will fail)
println("\n❌ Incompatible schema example:")
println("If you try to change data types incompatibly (e.g., String to Int),")
println("Delta Lake will raise an error to protect data integrity.")
}
// ============================================
// EXAMPLE 7: Viewing Schema History
// ============================================
def example7_SchemaHistory(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 7: Tracking Schema Changes Over Time")
println("="*70)
val tablePath = "/tmp/delta/beginner/users"
val deltaTable = DeltaTable.forPath(spark, tablePath)
println("\nTable history (shows all operations including schema changes):")
deltaTable.history().select("version", "timestamp", "operation", "operationMetrics")
.show(false)
// Read table at different versions to see schema evolution
println("\nSchema at version 0 (initial creation):")
spark.read.format("delta").option("versionAsOf", 0).load(tablePath).printSchema()
println("\nSchema at latest version (after adding columns):")
spark.read.format("delta").load(tablePath).printSchema()
println("\nYou can always time-travel back to previous schemas!")
}
// ============================================
// EXAMPLE 8: Merge with Schema Evolution
// ============================================
def example8_MergeWithSchemaEvolution(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 8: MERGE Operations with Schema Evolution")
println("="*70)
val tablePath = "/tmp/delta/beginner/customers"
// Create target table
val customers = Seq(
(1, "Alice", "alice@email.com"),
(2, "Bob", "bob@email.com")
).toDF("customer_id", "name", "email")
customers.write.format("delta").mode("overwrite").save(tablePath)
println("\nExisting customers table:")
spark.read.format("delta").load(tablePath).show()
// New data with additional column
val updates = Seq(
(2, "Bob Smith", "bob.smith@email.com", "555-1234"), // Update existing
(3, "Charlie", "charlie@email.com", "555-5678") // Insert new
).toDF("customer_id", "name", "email", "phone")
println("\nUpdates with new 'phone' column:")
updates.show()
// Schema evolution must be enabled BEFORE the merge; otherwise updateAll/insertAll
// silently ignore source columns (like 'phone') that are missing from the target
println("\n❌ Without schema evolution, MERGE would not pick up the new 'phone' column!")
println("You need to enable schema evolution for MERGE operations:")
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
println("\n✓ With schema evolution enabled:")
val deltaTable = DeltaTable.forPath(spark, tablePath)
deltaTable.as("target")
.merge(
updates.as("source"),
"target.customer_id = source.customer_id"
)
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
println("\nFinal customers table with new column:")
spark.read.format("delta").load(tablePath).show()
}
// ============================================
// EXAMPLE 9: Practical Use Case - Adding Audit Columns
// ============================================
def example9_AddAuditColumns(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 9: Practical Example - Adding Audit Columns")
println("="*70)
val tablePath = "/tmp/delta/beginner/transactions"
// Initial table without audit columns
val transactions = Seq(
(1, 1001, 250.0),
(2, 1002, 175.5)
).toDF("transaction_id", "account_id", "amount")
println("\nOriginal transactions (no audit info):")
transactions.show()
transactions.write.format("delta").mode("overwrite").save(tablePath)
// Add audit columns to new records
val newTransactions = Seq(
(3, 1003, 320.0, "2024-01-15", "system_user"),
(4, 1004, 450.75, "2024-01-15", "admin_user")
).toDF("transaction_id", "account_id", "amount", "created_date", "created_by")
println("\nNew transactions with audit columns:")
newTransactions.show()
// Append with schema evolution
newTransactions.write
.format("delta")
.mode("append")
.option("mergeSchema", "true")
.save(tablePath)
println("\n✓ Table now has audit columns:")
val allTransactions = spark.read.format("delta").load(tablePath)
allTransactions.show()
println("\nOld records have NULL for audit columns - you can update them:")
val deltaTable = DeltaTable.forPath(spark, tablePath)
deltaTable.update(
condition = col("created_date").isNull,
set = Map(
"created_date" -> lit("2024-01-01"),
"created_by" -> lit("legacy_system")
)
)
println("\nAfter backfilling audit columns:")
spark.read.format("delta").load(tablePath).show()
}
// ============================================
// EXAMPLE 10: Best Practices Summary
// ============================================
def example10_BestPractices(): Unit = {
println("\n" + "="*70)
println("EXAMPLE 10: Schema Evolution Best Practices")
println("="*70)
println("""
|
|📋 SCHEMA EVOLUTION BEST PRACTICES:
|
|1. ENABLE SCHEMA EVOLUTION EXPLICITLY
| ✓ Use .option("mergeSchema", "true") for append operations
| ✓ Use .option("overwriteSchema", "true") for full schema replacement
| ✓ Set spark.databricks.delta.schema.autoMerge.enabled for MERGE
|
|2. UNDERSTAND WHAT'S ALLOWED
| ✓ Adding new columns: ALLOWED
| ✓ Data type widening (Int → Long): ALLOWED
| ✓ Changing column order: NO PROBLEM (Delta uses names)
| ❌ Dropping columns: NOT SUPPORTED without column mapping (otherwise requires rewriting the table)
| ❌ Data type narrowing (Long → Int): NOT ALLOWED
| ❌ Changing column data types incompatibly: NOT ALLOWED
|
|3. USE TIME TRAVEL FOR SAFETY
| ✓ Always check table history before major changes
| ✓ You can read previous schemas using versionAsOf
| ✓ Test schema changes on a copy first
|
|4. PLAN FOR NULLS
| ✓ New columns will have NULL for existing records
| ✓ Consider adding default values or backfilling data
| ✓ Document what NULL means in your schema
|
|5. DOCUMENT SCHEMA CHANGES
| ✓ Use column comments in ALTER TABLE statements
| ✓ Keep track of why columns were added
| ✓ Review table history regularly
|
|6. TEST IN DEVELOPMENT FIRST
| ✓ Never test schema evolution on production data
| ✓ Validate that downstream applications handle new schemas
| ✓ Check query performance after schema changes
|
""".stripMargin)
}
// ============================================
// MAIN - RUN ALL BEGINNER EXAMPLES
// ============================================
def main(args: Array[String]): Unit = {
println("="*70)
println("DELTA LAKE SCHEMA EVOLUTION - BEGINNER'S GUIDE")
println("="*70)
try {
// Run all examples in sequence
example1_CreateInitialTable()
example2_AddNewColumns()
example3_OverwriteSchema()
example4_DataTypeWidening()
example5_AlterTableAddColumn()
example6_SchemaMismatchHandling()
example7_SchemaHistory()
example8_MergeWithSchemaEvolution()
example9_AddAuditColumns()
example10_BestPractices()
println("\n" + "="*70)
println("✓ ALL EXAMPLES COMPLETED SUCCESSFULLY!")
println("="*70)
} catch {
case e: Exception =>
println(s"\n❌ Error occurred: ${e.getMessage}")
e.printStackTrace()
} finally {
spark.stop()
}
}
}