Wednesday, 10 December 2025

Schema Evolution - Basics
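The examples below are written in Scala against the Spark DataFrame API and the Delta Lake library. A minimal sbt build for running them as a standalone project might look like the following sketch; the Scala, Spark, and Delta versions shown are assumptions, so align the Delta artifact with the Spark version you actually run (the artifact was named delta-core before Delta 3.0 and delta-spark from 3.0 onward):

// build.sbt - minimal sketch; version numbers are assumptions, match them to your cluster
ThisBuild / scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  // Spark SQL: provides SparkSession and the DataFrame API used below
  "org.apache.spark" %% "spark-sql" % "3.5.1",
  // Delta Lake: provides the delta format, DeltaTable, and the SQL extensions
  "io.delta" %% "delta-spark" % "3.2.0"
)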

import org.apache.spark.sql.{SparkSession, DataFrame}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import io.delta.tables._

object DeltaSchemaEvolutionBasics {

  // Initialize Spark with Delta Lake support
  val spark = SparkSession.builder()
    .appName("DeltaSchemaEvolutionBasics")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()

  import spark.implicits._

  // ============================================
  // EXAMPLE 1: Basic Table Creation
  // ============================================
  def example1_CreateInitialTable(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 1: Creating Your First Delta Table")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/users"

    // Create simple data
    val users = Seq(
      (1, "Alice", "alice@email.com"),
      (2, "Bob", "bob@email.com"),
      (3, "Charlie", "charlie@email.com")
    ).toDF("id", "name", "email")

    println("\nOriginal data:")
    users.show()
    users.printSchema()

    // Write as a Delta table
    users.write
      .format("delta")
      .mode("overwrite")
      .save(tablePath)

    println(s"\n✓ Delta table created at: $tablePath")

    // Read it back
    val savedTable = spark.read.format("delta").load(tablePath)
    println("\nReading the saved table:")
    savedTable.show()
  }

  // ============================================
  // EXAMPLE 2: Adding New Columns (Schema Evolution)
  // ============================================
  def example2_AddNewColumns(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 2: Adding New Columns to Existing Table")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/users"

    println("\nCurrent schema:")
    spark.read.format("delta").load(tablePath).printSchema()

    // Create new data with additional columns
    val newUsers = Seq(
      (4, "David", "david@email.com", 30, "New York"),
      (5, "Emma", "emma@email.com", 25, "London")
    ).toDF("id", "name", "email", "age", "city")

    println("\nNew data with extra columns (age, city):")
    newUsers.show()
    newUsers.printSchema()

    // Trying to append WITHOUT schema evolution would fail with a schema mismatch
    println("\n❌ Without the mergeSchema option, this append would fail:")
    println("newUsers.write.format('delta').mode('append').save(tablePath)")

    // Append WITH schema evolution
    println("\n✓ With the mergeSchema option, the append works:")
    newUsers.write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true") // KEY OPTION for schema evolution
      .save(tablePath)

    println("\nUpdated table with new columns:")
    val updatedTable = spark.read.format("delta").load(tablePath)
    updatedTable.printSchema()
    updatedTable.show()

    println("\nNotice: Old records have NULL for the new columns (age, city)")
  }

  // ============================================
  // EXAMPLE 3: Overwriting Schema Completely
  // ============================================
  def example3_OverwriteSchema(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 3: Completely Replacing Table Schema")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/products"

    // Create initial table
    val initialProducts = Seq(
      (1, "Laptop", 999.99),
      (2, "Mouse", 29.99)
    ).toDF("product_id", "product_name", "price")

    println("\nInitial products table:")
    initialProducts.show()
    initialProducts.printSchema()

    initialProducts.write
      .format("delta")
      .mode("overwrite")
      .save(tablePath)

    // Create a completely new schema
    val newProducts = Seq(
      (101, "Monitor", "Electronics", 299.99, 10),
      (102, "Keyboard", "Electronics", 79.99, 25)
    ).toDF("id", "name", "category", "price", "stock")

    println("\nNew products with different schema:")
    newProducts.show()
    newProducts.printSchema()
    // Overwrite with the new schema
    newProducts.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true") // KEY OPTION to replace the schema
      .save(tablePath)

    println("\n✓ Schema completely replaced:")
    val updatedProducts = spark.read.format("delta").load(tablePath)
    updatedProducts.printSchema()
    updatedProducts.show()
  }

  // ============================================
  // EXAMPLE 4: Data Type Widening (Automatic)
  // ============================================
  def example4_DataTypeWidening(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 4: Automatic Data Type Widening")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/sales"

    // Step 1: Create a table with an Integer salary column
    println("Step 1: Creating table with Integer salary")
    val initialData = Seq(
      (1, "John", 50000),
      (2, "Mary", 60000),
      (3, "Bob", 55000)
    ).toDF("id", "name", "salary")

    initialData.write.format("delta").mode("overwrite").save(tablePath)

    println("\nInitial Schema:")
    spark.read.format("delta").load(tablePath).printSchema()
    spark.read.format("delta").load(tablePath).show()

    // Step 2: Read, cast, and overwrite with the widened schema
    println("\nStep 2: Widening salary from INT to BIGINT")
    val df = spark.read.format("delta").load(tablePath)
    val widenedDF = df.select(
      $"id",
      $"name",
      $"salary".cast(LongType).as("salary")
    )

    widenedDF.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .save(tablePath)

    println("\nSchema after widening:")
    spark.read.format("delta").load(tablePath).printSchema()

    // Step 3: Insert new data with larger salary values
    println("\nStep 3: Inserting data with large salary values")
    val newData = Seq(
      (4, "Alice", 5000000000L) // Large value that needs BIGINT
    ).toDF("id", "name", "salary")

    newData.write.format("delta").mode("append").save(tablePath)

    println("\nFinal data:")
    spark.read.format("delta").load(tablePath).show()

    println("\n✓ Type widening successful!")
    println("Salary column changed from INT to BIGINT without losing any data")
  }

  // ============================================
  // EXAMPLE 5: Using ALTER TABLE to Add Columns
  // ============================================
  def example5_AlterTableAddColumn(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 5: Using SQL ALTER TABLE to Add Columns")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/employees"

    // Create initial table
    val employees = Seq(
      (1, "John", "Engineering"),
      (2, "Sarah", "Marketing")
    ).toDF("emp_id", "emp_name", "department")

    employees.write.format("delta").mode("overwrite").save(tablePath)

    // Create a temporary view for SQL operations
    spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")

    println("\nInitial employee table:")
    spark.sql("SELECT * FROM employees").show()
    spark.sql("DESCRIBE employees").show()

    // Add a column using SQL
    println("\nAdding 'salary' column using ALTER TABLE:")
    spark.sql(s"""
      ALTER TABLE delta.`$tablePath`
      ADD COLUMNS (salary DOUBLE)
    """)

    spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
    println("\n✓ Column added:")
    spark.sql("DESCRIBE employees").show()
    spark.sql("SELECT * FROM employees").show()

    // You can also add a column with a comment
    spark.sql(s"""
      ALTER TABLE delta.`$tablePath`
      ADD COLUMNS (hire_date STRING COMMENT 'Employee hire date')
    """)

    spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
    println("\n✓ Another column added with comment:")
    spark.sql("DESCRIBE employees").show()
  }

  // ============================================
  // EXAMPLE 6: Handling Schema Mismatches
  // ============================================
  def example6_SchemaMismatchHandling(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 6: Understanding Schema Mismatch Errors")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/orders"

    // Create initial table
    val orders = Seq(
      (1, "Product A", 50.0),
      (2, "Product B", 75.0)
    ).toDF("order_id", "product", "amount")

    orders.write.format("delta").mode("overwrite").save(tablePath)

    println("\nOriginal orders table:")
    spark.read.format("delta").load(tablePath).show()

    // Append data with the columns in a different order
    val newOrders = Seq(
      (100.0, 3, "Product C"), // Different column order!
      (125.0, 4, "Product D")
    ).toDF("amount", "order_id", "product")

    println("\nNew data (columns in different order):")
    newOrders.show()

    println("\n✓ Delta Lake matches columns by NAME, not position:")
    newOrders.write
      .format("delta")
      .mode("append")
      .save(tablePath)

    println("\nAll orders (correctly matched despite different column order):")
    spark.read.format("delta").load(tablePath).show()

    // Example of an incompatible schema change (this would fail)
    println("\n❌ Incompatible schema example:")
    println("If you try to change data types incompatibly (e.g., String to Int),")
    println("Delta Lake will raise an error to protect data integrity.")
  }

  // ============================================
  // EXAMPLE 7: Viewing Schema History
  // ============================================
  def example7_SchemaHistory(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 7: Tracking Schema Changes Over Time")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/users"
    val deltaTable = DeltaTable.forPath(spark, tablePath)

    println("\nTable history (shows all operations, including schema changes):")
    deltaTable.history()
      .select("version", "timestamp", "operation", "operationMetrics")
      .show(false)

    // Read the table at different versions to see the schema evolve
    println("\nSchema at version 0 (initial creation):")
    spark.read.format("delta").option("versionAsOf", 0).load(tablePath).printSchema()

    println("\nSchema at latest version (after adding columns):")
    spark.read.format("delta").load(tablePath).printSchema()

    println("\nYou can always time-travel back to previous schemas!")
  }

  // ============================================
  // EXAMPLE 8: Merge with Schema Evolution
  // ============================================
  def example8_MergeWithSchemaEvolution(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 8: MERGE Operations with Schema Evolution")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/customers"

    // Create target table
    val customers = Seq(
      (1, "Alice", "alice@email.com"),
      (2, "Bob", "bob@email.com")
    ).toDF("customer_id", "name", "email")

    customers.write.format("delta").mode("overwrite").save(tablePath)

    println("\nExisting customers table:")
    spark.read.format("delta").load(tablePath).show()

    // New data with an additional column
    val updates = Seq(
      (2, "Bob Smith", "bob.smith@email.com", "555-1234"), // Update existing
      (3, "Charlie", "charlie@email.com", "555-5678")      // Insert new
    ).toDF("customer_id", "name", "email", "phone")

    println("\nUpdates with new 'phone' column:")
    updates.show()

    val deltaTable = DeltaTable.forPath(spark, tablePath)

    // A MERGE whose source has extra columns fails unless schema evolution is enabled,
    // so enable it BEFORE running the merge
    println("\n❌ Without schema evolution, this MERGE would fail!")
    println("You need to enable schema evolution for MERGE operations:")
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
    println("\n✓ With schema evolution enabled:")
    deltaTable.as("target")
      .merge(
        updates.as("source"),
        "target.customer_id = source.customer_id"
      )
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()

    println("\nFinal customers table with new column:")
    spark.read.format("delta").load(tablePath).show()
  }

  // ============================================
  // EXAMPLE 9: Practical Use Case - Adding Audit Columns
  // ============================================
  def example9_AddAuditColumns(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 9: Practical Example - Adding Audit Columns")
    println("=" * 70)

    val tablePath = "/tmp/delta/beginner/transactions"

    // Initial table without audit columns
    val transactions = Seq(
      (1, 1001, 250.0),
      (2, 1002, 175.5)
    ).toDF("transaction_id", "account_id", "amount")

    println("\nOriginal transactions (no audit info):")
    transactions.show()

    transactions.write.format("delta").mode("overwrite").save(tablePath)

    // Add audit columns to new records
    val newTransactions = Seq(
      (3, 1003, 320.0, "2024-01-15", "system_user"),
      (4, 1004, 450.75, "2024-01-15", "admin_user")
    ).toDF("transaction_id", "account_id", "amount", "created_date", "created_by")

    println("\nNew transactions with audit columns:")
    newTransactions.show()

    // Append with schema evolution
    newTransactions.write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true")
      .save(tablePath)

    println("\n✓ Table now has audit columns:")
    val allTransactions = spark.read.format("delta").load(tablePath)
    allTransactions.show()

    println("\nOld records have NULL for audit columns - you can update them:")
    val deltaTable = DeltaTable.forPath(spark, tablePath)
    deltaTable.update(
      condition = col("created_date").isNull,
      set = Map(
        "created_date" -> lit("2024-01-01"),
        "created_by" -> lit("legacy_system")
      )
    )

    println("\nAfter backfilling audit columns:")
    spark.read.format("delta").load(tablePath).show()
  }

  // ============================================
  // EXAMPLE 10: Best Practices Summary
  // ============================================
  def example10_BestPractices(): Unit = {
    println("\n" + "=" * 70)
    println("EXAMPLE 10: Schema Evolution Best Practices")
    println("=" * 70)

    println("""
      |
      |📋 SCHEMA EVOLUTION BEST PRACTICES:
      |
      |1. ENABLE SCHEMA EVOLUTION EXPLICITLY
      |   ✓ Use .option("mergeSchema", "true") for append operations
      |   ✓ Use .option("overwriteSchema", "true") for full schema replacement
      |   ✓ Set spark.databricks.delta.schema.autoMerge.enabled for MERGE
      |
      |2. UNDERSTAND WHAT'S ALLOWED
      |   ✓ Adding new columns: ALLOWED
      |   ✓ Data type widening (Int → Long): ALLOWED
      |   ✓ Changing column order: NO PROBLEM (Delta uses names)
      |   ❌ Dropping columns: NOT DIRECTLY SUPPORTED
      |   ❌ Data type narrowing (Long → Int): NOT ALLOWED
      |   ❌ Changing column data types incompatibly: NOT ALLOWED
      |
      |3. USE TIME TRAVEL FOR SAFETY
      |   ✓ Always check table history before major changes
      |   ✓ You can read previous schemas using versionAsOf
      |   ✓ Test schema changes on a copy first
      |
      |4. PLAN FOR NULLS
      |   ✓ New columns will have NULL for existing records
      |   ✓ Consider adding default values or backfilling data
      |   ✓ Document what NULL means in your schema
      |
      |5. DOCUMENT SCHEMA CHANGES
      |   ✓ Use column comments in ALTER TABLE statements
      |   ✓ Keep track of why columns were added
      |   ✓ Review table history regularly
      |
      |6. TEST IN DEVELOPMENT FIRST
      |   ✓ Never test schema evolution on production data
      |   ✓ Validate that downstream applications handle new schemas
      |   ✓ Check query performance after schema changes
      |
      """.stripMargin)
  }

  // ============================================
  // MAIN - RUN ALL BEGINNER EXAMPLES
  // ============================================
  def main(args: Array[String]): Unit = {
    println("=" * 70)
    println("DELTA LAKE SCHEMA EVOLUTION - BEGINNER'S GUIDE")
    println("=" * 70)

    try {
      // Run all examples in sequence
      example1_CreateInitialTable()
      example2_AddNewColumns()
      example3_OverwriteSchema()
      example4_DataTypeWidening()
      example5_AlterTableAddColumn()
      example6_SchemaMismatchHandling()
      example7_SchemaHistory()
      example8_MergeWithSchemaEvolution()
      example9_AddAuditColumns()
      example10_BestPractices()

      println("\n" + "=" * 70)
      println("✓ ALL EXAMPLES COMPLETED SUCCESSFULLY!")
      println("=" * 70)
    } catch {
      case e: Exception =>
        println(s"\n❌ Error occurred: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}
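Examples 2 and 8 point out that a write whose source has extra columns is rejected when schema evolution is not enabled. A minimal sketch of what that rejection looks like, assuming the same Spark session and imports as the object above (for example in a spark-shell with the same configuration) and that the users table from Example 1 already exists at the same path; Delta Lake reports the mismatch as an AnalysisException:

import org.apache.spark.sql.AnalysisException

// Appending a DataFrame with extra columns but WITHOUT mergeSchema is rejected;
// Delta Lake raises an AnalysisException describing the schema mismatch.
val extra = Seq((6, "Frank", "frank@email.com", 41, "Paris"))
  .toDF("id", "name", "email", "age", "city")

try {
  extra.write.format("delta").mode("append").save("/tmp/delta/beginner/users")
} catch {
  case e: AnalysisException =>
    println(s"Append rejected as expected: ${e.getMessage}")
}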

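The best-practices list above notes that dropping columns is not directly supported. In newer Delta Lake releases, a commonly used path is to enable column mapping on the table and then issue ALTER TABLE ... DROP COLUMN. The sketch below is an illustration to verify against the Delta version you run, since enabling column mapping upgrades the table protocol and older readers may no longer be able to read the table; it reuses the users table and the spark session from the examples above:

// Sketch: dropping a column via column mapping (requires a recent Delta Lake release).
// Enabling column mapping upgrades the table protocol - check compatibility first.
val tablePath = "/tmp/delta/beginner/users"

spark.sql(s"""
  ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5'
  )
""")

spark.sql(s"ALTER TABLE delta.`$tablePath` DROP COLUMN city")

spark.read.format("delta").load(tablePath).printSchema()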