Tuesday, 16 December 2025

Spark Projects

 1. E-Commerce Product Catalog with SCD Type 2

Develop a product inventory system that tracks price changes over time using the Slowly Changing Dimension Type 2 approach.


-Ingest product data from a REST API (like Fake Store API)

-Parse JSON responses and transform data using Spark

-Handle missing values, null values, and duplicate records

-Calculate the percentage of bad records and good records

-Process JSON data with different optimization techniques (caching, partitioning, bucketing, broadcast joins)

-Implement SCD2 in Delta tables to maintain price history (a minimal merge sketch follows this list)

-Store results in both Delta and PostgreSQL

-Query historical and current prices

-Structure the project into packages: create separate modules for extraction, transformation, and loading

-Generate different test-case scenarios to check the validity of methods

-Use Jenkins for CI/CD

-Use Linux commands to create a shell script to execute the Spark jar file.
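
Below is a minimal sketch of the SCD2 merge step, assuming a Delta dimension with product_id, price, effective_date, end_date, and is_current columns, and an updates DataFrame carrying product_id, price, and effective_date; all names and paths here are illustrative, not part of the project spec.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object ScdType2Sketch {
  // Expire the current version of changed products and insert the new version.
  def applyScd2(spark: SparkSession, updates: DataFrame, dimPath: String): Unit = {
    val dim = DeltaTable.forPath(spark, dimPath)

    // Products whose price changed produce BOTH an expire-update and an insert of the new version.
    val changed = updates.as("u")
      .join(dim.toDF.filter(col("is_current")).as("d"), col("u.product_id") === col("d.product_id"))
      .where(col("u.price") =!= col("d.price"))
      .select(col("u.*"))

    // A null merge_key forces the changed rows into the "not matched" (insert) branch.
    val staged = changed.withColumn("merge_key", lit(null).cast("int"))
      .unionByName(updates.withColumn("merge_key", col("product_id")))

    dim.as("d")
      .merge(staged.as("s"), "d.product_id = s.merge_key AND d.is_current = true")
      .whenMatched("d.price <> s.price")
      .updateExpr(Map("is_current" -> "false", "end_date" -> "s.effective_date"))
      .whenNotMatched()
      .insertExpr(Map(
        "product_id"     -> "s.product_id",
        "price"          -> "s.price",
        "effective_date" -> "s.effective_date",
        "end_date"       -> "null",
        "is_current"     -> "true"))
      .execute()
  }
}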

-----------------------------------------------------------------------------------------------------------------------

2. Weather Data Pipeline with Performance Optimization

Create a batch pipeline that fetches weather data and optimizes Spark processing.


-Pull historical weather data from OpenWeather API

-Handle missing values, null values, and duplicate records

-Load the cleaned data

-Process JSON data with different optimization techniques (caching, partitioning, bucketing, broadcast joins) (see the sketch after this list)

-Store results in PostgreSQL for reporting

-Package the project: create separate modules for extraction, transformation, and loading

-Generate different test-case scenarios to check the validity of methods

-Prepare a JAR file for execution

-Use Jenkins for CI/CD

-Use Linux commands to create a shell script to execute the Spark JAR file
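
A small sketch of two of the listed optimizations (caching a reused DataFrame and broadcasting the small side of a join), assuming the readings carry city_id, temp, and observed_at columns and that a small city lookup CSV exists; every path and column name here is illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WeatherOptimizationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WeatherOptimizationSketch").master("local[*]").getOrCreate()

    // Hypothetical inputs: raw weather readings (JSON) and a small city dimension (CSV).
    val weather = spark.read.option("multiLine", "true").json("/tmp/weather/raw")
    val cities  = spark.read.option("header", "true").csv("/tmp/weather/cities.csv")

    // Cache the cleaned readings because several downstream steps reuse them.
    val cleaned = weather
      .na.drop(Seq("city_id", "temp"))
      .dropDuplicates("city_id", "observed_at")
      .withColumn("observed_date", to_date(col("observed_at")))
      .cache()

    // Broadcast the small dimension so the join avoids shuffling the large side.
    val enriched = cleaned.join(broadcast(cities), Seq("city_id"))

    // Partition the output by date so later scans can prune files.
    enriched.write.mode("overwrite").partitionBy("observed_date").parquet("/tmp/weather/enriched")

    spark.stop()
  }
}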

---------------------------------------------------------------------------------------------------------------------------

3. User Activity Tracker with SCD Type 1

Build a batch system tracking user profile updates where only the current state matters.


1) Generate mock user activity data or pull from a user API

2) Implement SCD1 logic to overwrite old records (see the sketch after this list)

3) Use Spark to process updates in daily batches

4) Store the final state in both Delta and PostgreSQL
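
A minimal SCD1 sketch using a Delta MERGE, assuming the user profiles live in a Delta table keyed on user_id and the incoming batch has the same columns; only the latest values are kept, no history.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object ScdType1Sketch {
  // SCD1: overwrite matched rows in place, insert new ones; no history is kept.
  def applyScd1(spark: SparkSession, updates: DataFrame, profilePath: String): Unit = {
    DeltaTable.forPath(spark, profilePath).as("t")
      .merge(updates.as("s"), "t.user_id = s.user_id")
      .whenMatched().updateAll()     // overwrite every column with the incoming value
      .whenNotMatched().insertAll()  // brand-new users are inserted as-is
      .execute()
  }
}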


Data Modeling Concepts:


1)Slowly Changing Dimension Type 1: Overwrite strategy, no history

2)Natural Keys vs Surrogate Keys: Use email as a natural key, generate a surrogate for joins

3) Denormalization: flatten nested JSON (address, location) for analytics (see the sketch after this list)

4) Data Quality: check for invalid, null, or missing records

5) Data Types: proper type selection (VARCHAR, TIMESTAMP, BOOLEAN)

6) Optimization techniques: repartition and coalesce

7) Constraints and Indexing: primary keys, unique and NOT NULL constraints; index frequently queried columns (user_id, email, last_updated)
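
The denormalization and key points above could look roughly like this, assuming the raw JSON has a nested address struct; monotonically_increasing_id stands in for whatever surrogate-key strategy is actually chosen, and the path is illustrative.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object UserFlattenSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UserFlattenSketch").master("local[*]").getOrCreate()

    // Illustrative input; assumes each record has a nested "address" struct.
    val users = spark.read.option("multiLine", "true").json("/tmp/users/raw")

    val flattened = users
      // Denormalize: pull nested address fields up to top-level columns for analytics.
      .withColumn("city", col("address.city"))
      .withColumn("zipcode", col("address.zipcode"))
      .drop("address")
      // email remains the natural key; add a surrogate key for joins.
      .withColumn("user_sk", monotonically_increasing_id())

    flattened.show(truncate = false)
    spark.stop()
  }
}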


1) Generate different test-case scenarios to check the validity of methods

2) Scala package structure: model, service, scd_type, main

3) Prepare a JAR file for execution

4) Use Jenkins for CI/CD

5) Use Linux commands to create a shell script to execute the Spark JAR file

---------------------------------------------------------------------------------------------------------------------------

4) Stock Market Historical Analyzer with SCD Type 3

Track limited stock history (current price, previous price, last change date) using batch loads.


1) Fetch historical stock data from the Alpha Vantage API

2) Apply a schema to the data

3) Check for invalid, null, or missing records (data quality)

4) Implement SCD3 with current and previous value columns (see the sketch after this list)

5) Use Spark SQL for transformations

6) Optimize using caching and persist

7) Create a Delta table with optimized partitioning by date

8) Scala package structure: api, delta/postgres, main logic

9) Generate different test-case scenarios to check the validity of methods

10) Prepare a JAR file for execution

11) Use Jenkins for CI/CD

12) Use Linux commands to create a shell script to execute the Spark JAR file
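
A minimal SCD3 sketch, assuming a Delta table with symbol, current_price, previous_price, and last_change_date columns and an incoming quotes DataFrame with symbol, price, and trade_date; the names are illustrative.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object ScdType3Sketch {
  // SCD3 keeps limited history in-row: on change, demote current_price to previous_price.
  def applyScd3(spark: SparkSession, quotes: DataFrame, dimPath: String): Unit = {
    DeltaTable.forPath(spark, dimPath).as("d")
      .merge(quotes.as("q"), "d.symbol = q.symbol")
      .whenMatched("d.current_price <> q.price")
      .updateExpr(Map(
        "previous_price"   -> "d.current_price",  // shift the old value into the history column
        "current_price"    -> "q.price",
        "last_change_date" -> "q.trade_date"))
      .whenNotMatched()
      .insertExpr(Map(
        "symbol"           -> "q.symbol",
        "current_price"    -> "q.price",
        "previous_price"   -> "null",
        "last_change_date" -> "q.trade_date"))
      .execute()
  }
}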

----------------------------------------------------------------------------------------------------------------------------

5) Customer Orders Data Warehouse

Build a mini data warehouse combining all three SCD types with batch processing.


1)Customer data (SCD2 - track address changes over time)

2)Product prices (SCD1 - keep only current price)

3)Payment methods (SCD3 - current and previous method)

4)Ingest from multiple JSON files via API

5) Handle missing and null values as data quality checks

6) Implement daily incremental loads with Delta merge operations (see the sketch after this section)

7) Scala package structure:

com.datawarehouse.orders

    ├── domain

    │   ├── customer (Customer models + SCD2)

    │   ├── product (Product models + SCD1)

    │   ├── payment (Payment models + SCD3)

    │   ├── dimension (all dimension tables)

    │   └── fact (OrderFact, InventorySnapshot)

    ├── etl

    │   ├── extract

    │   ├── transform

    │   └── load

    ├── shared (common utilities)

    └── app (main application)

8) Generate different test-case scenarios to check the validity of methods

9) Prepare a JAR file for execution

10) Use Jenkins for CI/CD

11) Use Linux commands to create a shell script to execute the Spark JAR file
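
A sketch of the daily incremental load step, assuming the orders fact is stored as a Delta table keyed on order_id; the per-dimension SCD handling would follow the merge patterns sketched under projects 1, 3, and 4.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

object DailyIncrementalLoadSketch {
  // Upsert one day's batch of orders into the fact table.
  def loadDailyBatch(spark: SparkSession, dailyOrders: DataFrame, factPath: String): Unit = {
    DeltaTable.forPath(spark, factPath).as("f")
      .merge(dailyOrders.as("b"), "f.order_id = b.order_id")
      .whenMatched().updateAll()     // late-arriving corrections overwrite the existing row
      .whenNotMatched().insertAll()  // new orders are appended
      .execute()
  }
}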

----------------------------------------------------------------------------------------------------------------------------

6) Process application logs in batches, with a focus on Spark optimization techniques.


1)Read JSON logs from files or a REST endpoint

2) Handle the data quality issues of missing, null, and bad records

3) Apply various optimization techniques: predicate pushdown, column pruning, and salting to handle any data skew (see the sketch after this list)

4) Include various Spark transformations and actions

5) Implement CDC (change data capture)

6)Store records in PostgreSQL

7) Execute PostgreSQL scripts, temp views, stored procedures, and user-defined functions

8) Generate different test-case scenarios to check the validity of methods

9) Scala package structure: api, schema, metrics, json_parser

10) Prepare a JAR file for execution

11) Create a shell script to execute the Spark JAR file using Linux commands

12) Use Jenkins for CI/CD
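
A small sketch of the salting technique for a skewed join key, assuming the logs are heavily skewed on user_id and are joined to a users lookup table; the salt factor of 8 is arbitrary and illustrative.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object SaltingSketch {
  // Spread a hot join key across saltBuckets synthetic keys so no single partition explodes.
  def saltedJoin(logs: DataFrame, users: DataFrame, saltBuckets: Int = 8): DataFrame = {
    // Big, skewed side: each row gets one random salt value.
    val saltedLogs = logs.withColumn("salt", (rand() * saltBuckets).cast("int"))

    // Small side: replicate every row once per salt value so every salted key still matches.
    val saltedUsers = users.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

    saltedLogs.join(saltedUsers, Seq("user_id", "salt")).drop("salt")
  }
}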


Monday, 15 December 2025

SCD TYPE 2 IMPLEMENTATION WITH DIM AND FACT TABLES

 -- ===============================================

-- COMPLETE SCD TYPE 2 ETL EXAMPLE

-- ===============================================

-- This example shows the complete flow from source to fact tables

-- with SCD Type 2 dimension handling


-- ===============================================

-- STEP 1: CREATE SOURCE TABLES (Simulating OLTP)

-- ===============================================


-- Source Customer Table (Business System)

CREATE TABLE source_customer (

    customer_id INT PRIMARY KEY,

    customer_name VARCHAR(100),

    email VARCHAR(100),

    address VARCHAR(200),

    phone VARCHAR(20),

    last_modified_date DATE

);


-- Source Product Table

CREATE TABLE source_product (

    product_id INT PRIMARY KEY,

    product_name VARCHAR(100),

    category VARCHAR(50),

    price DECIMAL(10,2),

    last_modified_date DATE

);


-- Source Sales Table

CREATE TABLE source_sales (

    sale_id INT PRIMARY KEY,

    customer_id INT,

    product_id INT,

    sale_date DATE,

    quantity INT,

    amount DECIMAL(10,2)

);


-- ===============================================

-- STEP 2: CREATE STAGING TABLES

-- ===============================================


CREATE TABLE staging_customer (

    customer_id INT,

    customer_name VARCHAR(100),

    email VARCHAR(100),

    address VARCHAR(200),

    phone VARCHAR(20),

    last_modified_date DATE

);


CREATE TABLE staging_sales (

    sale_id INT,

    customer_id INT,

    product_id INT,

    sale_date DATE,

    quantity INT,

    amount DECIMAL(10,2)

);


-- ===============================================

-- STEP 3: CREATE DIMENSION TABLES (SCD TYPE 2)

-- ===============================================


CREATE TABLE dim_customer (

    customer_sk INT PRIMARY KEY AUTO_INCREMENT,  -- Surrogate Key

    customer_id INT NOT NULL,                     -- Business Key

    customer_name VARCHAR(100),

    email VARCHAR(100),

    address VARCHAR(200),

    phone VARCHAR(20),

    effective_date DATE NOT NULL,

    end_date DATE,

    is_current BOOLEAN NOT NULL DEFAULT 1,

    created_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_business_key (customer_id, is_current),

    INDEX idx_date_range (customer_id, effective_date, end_date)

);


CREATE TABLE dim_product (

    product_sk INT PRIMARY KEY AUTO_INCREMENT,

    product_id INT NOT NULL,

    product_name VARCHAR(100),

    category VARCHAR(50),

    price DECIMAL(10,2),

    effective_date DATE NOT NULL,

    end_date DATE,

    is_current BOOLEAN NOT NULL DEFAULT 1,

    created_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_business_key (product_id, is_current)

);


-- Date Dimension (Type 1 - no history needed)

CREATE TABLE dim_date (

    date_sk INT PRIMARY KEY,

    full_date DATE,

    year INT,

    quarter INT,

    month INT,

    day INT,

    day_of_week VARCHAR(20)

);


-- ===============================================

-- STEP 4: CREATE FACT TABLE

-- ===============================================


CREATE TABLE fact_sales (

    sale_id INT PRIMARY KEY,

    customer_sk INT NOT NULL,           -- Links to dimension surrogate key

    product_sk INT NOT NULL,             -- Links to dimension surrogate key

    date_sk INT NOT NULL,

    quantity INT,

    amount DECIMAL(10,2),

    load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (customer_sk) REFERENCES dim_customer(customer_sk),

    FOREIGN KEY (product_sk) REFERENCES dim_product(product_sk),

    FOREIGN KEY (date_sk) REFERENCES dim_date(date_sk),

    INDEX idx_customer (customer_sk),

    INDEX idx_product (product_sk),

    INDEX idx_date (date_sk)

);


-- ===============================================

-- STEP 5: INSERT INITIAL SOURCE DATA

-- ===============================================


-- Initial Customer Data (Jan 2024)

INSERT INTO source_customer VALUES

(101, 'John Smith', 'john@email.com', '123 Oak Street, Boston', '555-0101', '2024-01-01'),

(102, 'Mary Johnson', 'mary@email.com', '456 Elm Avenue, NYC', '555-0102', '2024-01-01'),

(103, 'Bob Williams', 'bob@email.com', '789 Pine Road, Chicago', '555-0103', '2024-01-01');


-- Initial Product Data

INSERT INTO source_product VALUES

(201, 'Laptop Pro', 'Electronics', 1200.00, '2024-01-01'),

(202, 'Wireless Mouse', 'Electronics', 25.00, '2024-01-01'),

(203, 'Office Chair', 'Furniture', 350.00, '2024-01-01');


-- Initial Sales (Jan-Mar 2024)

INSERT INTO source_sales VALUES

(1001, 101, 201, '2024-01-15', 1, 1200.00),

(1002, 102, 202, '2024-02-10', 2, 50.00),

(1003, 101, 203, '2024-03-05', 1, 350.00),

(1004, 103, 201, '2024-03-20', 1, 1200.00);


-- ===============================================

-- STEP 6: INITIAL DIMENSION LOAD (FIRST RUN)

-- ===============================================


-- Load dim_customer - Initial Load

INSERT INTO dim_customer (customer_id, customer_name, email, address, phone, effective_date, end_date, is_current)

SELECT 

    customer_id,

    customer_name,

    email,

    address,

    phone,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM source_customer;


-- Load dim_product - Initial Load

INSERT INTO dim_product (product_id, product_name, category, price, effective_date, end_date, is_current)

SELECT 

    product_id,

    product_name,

    category,

    price,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM source_product;


-- Load dim_date (simplified - just the dates we need)

INSERT INTO dim_date VALUES

(20240115, '2024-01-15', 2024, 1, 1, 15, 'Monday'),

(20240210, '2024-02-10', 2024, 1, 2, 10, 'Saturday'),

(20240305, '2024-03-05', 2024, 1, 3, 5, 'Tuesday'),

(20240320, '2024-03-20', 2024, 1, 3, 20, 'Wednesday'),

(20240725, '2024-07-25', 2024, 3, 7, 25, 'Thursday'),

(20240815, '2024-08-15', 2024, 3, 8, 15, 'Thursday');


-- Insert default/unknown records

INSERT INTO dim_customer (customer_sk, customer_id, customer_name, email, address, phone, effective_date, is_current) VALUES

(-1, -1, 'Unknown', 'unknown@unknown.com', 'Unknown', 'Unknown', '1900-01-01', 1);


INSERT INTO dim_product (product_sk, product_id, product_name, category, price, effective_date, is_current) VALUES

(-1, -1, 'Unknown', 'Unknown', 0.00, '1900-01-01', 1);


-- ===============================================

-- STEP 7: INITIAL FACT TABLE LOAD

-- ===============================================


-- Load fact_sales with surrogate key lookups

INSERT INTO fact_sales (sale_id, customer_sk, product_sk, date_sk, quantity, amount)

SELECT 

    s.sale_id,

    COALESCE(dc.customer_sk, -1) as customer_sk,  -- Get surrogate key

    COALESCE(dp.product_sk, -1) as product_sk,    -- Get surrogate key

    CAST(DATE_FORMAT(s.sale_date, '%Y%m%d') AS UNSIGNED) as date_sk,

    s.quantity,

    s.amount

FROM source_sales s

LEFT JOIN dim_customer dc 

    ON s.customer_id = dc.customer_id 

    AND dc.is_current = 1                          -- Get current version

LEFT JOIN dim_product dp 

    ON s.product_id = dp.product_id 

    AND dp.is_current = 1;


-- View initial results

SELECT 'Initial Dimension Data' as Step;

SELECT * FROM dim_customer WHERE customer_id > 0;


SELECT 'Initial Fact Data with Dimension Info' as Step;

SELECT 

    f.sale_id,

    dc.customer_name,

    dc.address as customer_address,

    dp.product_name,

    f.amount,

    f.quantity

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

JOIN dim_product dp ON f.product_sk = dp.product_sk

ORDER BY f.sale_id;


-- ===============================================

-- STEP 8: SIMULATE CHANGES (July 2024)

-- ===============================================


-- Customer 101 moves to a new address

UPDATE source_customer 

SET address = '999 Maple Drive, Boston', 

    last_modified_date = '2024-07-01'

WHERE customer_id = 101;


-- Customer 102 changes email

UPDATE source_customer 

SET email = 'mary.johnson@newemail.com',

    last_modified_date = '2024-07-01'

WHERE customer_id = 102;


-- Product price change

UPDATE source_product 

SET price = 1150.00,

    last_modified_date = '2024-07-01'

WHERE product_id = 201;


-- New sales AFTER the changes

INSERT INTO source_sales VALUES

(1005, 101, 202, '2024-07-25', 3, 75.00),   -- John at NEW address

(1006, 102, 201, '2024-08-15', 1, 1150.00); -- Mary with NEW email, product at NEW price


-- ===============================================

-- STEP 9: SCD TYPE 2 UPDATE PROCESS

-- ===============================================


-- Extract changes to staging

TRUNCATE staging_customer;

INSERT INTO staging_customer 

SELECT * FROM source_customer;


-- Process SCD Type 2 for dim_customer

-- Step 9a: Identify changed records

CREATE TEMPORARY TABLE changed_customers AS

SELECT 

    s.customer_id,

    s.customer_name,

    s.email,

    s.address,

    s.phone,

    s.last_modified_date

FROM staging_customer s

JOIN dim_customer d 

    ON s.customer_id = d.customer_id 

    AND d.is_current = 1

WHERE 

    s.customer_name != d.customer_name 

    OR s.email != d.email 

    OR s.address != d.address 

    OR s.phone != d.phone;


-- Step 9b: Expire old records (set is_current = 0, add end_date)

UPDATE dim_customer d

JOIN changed_customers c ON d.customer_id = c.customer_id

SET 

    d.is_current = 0,

    d.end_date = DATE_SUB(c.last_modified_date, INTERVAL 1 DAY)

WHERE d.is_current = 1;


-- Step 9c: Insert new versions

INSERT INTO dim_customer (customer_id, customer_name, email, address, phone, effective_date, end_date, is_current)

SELECT 

    customer_id,

    customer_name,

    email,

    address,

    phone,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM changed_customers;


-- Similar process for products

CREATE TEMPORARY TABLE changed_products AS

SELECT 

    s.product_id,

    s.product_name,

    s.category,

    s.price,

    s.last_modified_date

FROM source_product s

JOIN dim_product d 

    ON s.product_id = d.product_id 

    AND d.is_current = 1

WHERE 

    s.product_name != d.product_name 

    OR s.category != d.category 

    OR s.price != d.price;


UPDATE dim_product d

JOIN changed_products c ON d.product_id = c.product_id

SET 

    d.is_current = 0,

    d.end_date = DATE_SUB(c.last_modified_date, INTERVAL 1 DAY)

WHERE d.is_current = 1;


INSERT INTO dim_product (product_id, product_name, category, price, effective_date, end_date, is_current)

SELECT 

    product_id,

    product_name,

    category,

    price,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM changed_products;


-- ===============================================

-- STEP 10: LOAD NEW FACT RECORDS

-- ===============================================


-- Load new sales with CURRENT surrogate keys

INSERT INTO fact_sales (sale_id, customer_sk, product_sk, date_sk, quantity, amount)

SELECT 

    s.sale_id,

    COALESCE(dc.customer_sk, -1) as customer_sk,

    COALESCE(dp.product_sk, -1) as product_sk,

    CAST(DATE_FORMAT(s.sale_date, '%Y%m%d') AS UNSIGNED) as date_sk,

    s.quantity,

    s.amount

FROM source_sales s

LEFT JOIN dim_customer dc 

    ON s.customer_id = dc.customer_id 

    AND dc.is_current = 1                    -- Gets NEW version!

LEFT JOIN dim_product dp 

    ON s.product_id = dp.product_id 

    AND dp.is_current = 1

WHERE s.sale_id NOT IN (SELECT sale_id FROM fact_sales);


-- ===============================================

-- STEP 11: VERIFICATION QUERIES

-- ===============================================


-- Show SCD Type 2 in action - Customer History

SELECT 

    'Customer 101 History - Shows Both Versions' as Description,

    customer_sk,

    customer_id,

    customer_name,

    address,

    effective_date,

    end_date,

    is_current

FROM dim_customer

WHERE customer_id = 101

ORDER BY effective_date;


-- Show Product History

SELECT 

    'Product 201 History - Price Change' as Description,

    product_sk,

    product_id,

    product_name,

    price,

    effective_date,

    end_date,

    is_current

FROM dim_product

WHERE product_id = 201

ORDER BY effective_date;


-- Show ALL sales with historical accuracy

SELECT 

    'All Sales with Historical Context' as Description,

    f.sale_id,

    f.date_sk as sale_date,

    dc.customer_name,

    dc.address as customer_address_at_sale_time,

    dc.effective_date as customer_version_from,

    dc.end_date as customer_version_to,

    dp.product_name,

    dp.price as product_price_at_sale_time,

    f.quantity,

    f.amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

JOIN dim_product dp ON f.product_sk = dp.product_sk

ORDER BY f.sale_id;


-- Show customer's sales across address changes

SELECT 

    'Customer 101 Sales - Different Addresses at Different Times' as Description,

    f.sale_id,

    f.date_sk as sale_date,

    dc.address as address_at_time_of_sale,

    dc.is_current as is_current_address,

    dp.product_name,

    f.amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

JOIN dim_product dp ON f.product_sk = dp.product_sk

WHERE dc.customer_id = 101

ORDER BY f.sale_id;


-- Analytical Query: Sales by Customer (all versions aggregated by business key)

SELECT 

    'Sales Summary by Customer (All Versions)' as Description,

    dc.customer_id,

    MAX(CASE WHEN dc.is_current = 1 THEN dc.customer_name END) as current_name,

    COUNT(DISTINCT f.sale_id) as total_sales_count,

    SUM(f.amount) as total_sales_amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

GROUP BY dc.customer_id

ORDER BY total_sales_amount DESC;


-- Show current view (most common query pattern)

SELECT 

    'Current Snapshot - Latest Customer Info with All Sales' as Description,

    curr_dc.customer_name,

    curr_dc.address as current_address,

    COUNT(f.sale_id) as total_sales,

    SUM(f.amount) as total_amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk  -- Historical join

JOIN dim_customer curr_dc 

    ON dc.customer_id = curr_dc.customer_id 

    AND curr_dc.is_current = 1                          -- Current info

GROUP BY curr_dc.customer_sk, curr_dc.customer_name, curr_dc.address;


-- ===============================================

-- CLEANUP TEMPORARY TABLES

-- ===============================================

DROP TEMPORARY TABLE IF EXISTS changed_customers;

DROP TEMPORARY TABLE IF EXISTS changed_products;

Wednesday, 10 December 2025

Scala Spark Hive

import org.apache.spark.sql.{SaveMode, SparkSession, functions => F}
import io.delta.tables._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType
import org.apache.log4j.{Level, Logger}
case class Customer(customer_id:Int,name:String,email:String,city:String,country:String)
object Main {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CDC Example")
.master("local[*]")
.config("spark.sql.warehouse.dir", "/home/kaustubh/Documents/")
.config("spark.sql.catalogImplementation", "hive")
.enableHiveSupport()
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.adaptive.enabled","true")
.getOrCreate()
Logger.getLogger("akka").setLevel(Level.ERROR)
import spark.implicits._
// spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")
spark.sql("USE sales_db")
spark.sql("DROP TABLE IF EXISTS customers");
spark.sql("DROP TABLE IF EXISTS customers_managed");
println("customers table deleted successfully")
// val Customers = Seq(
// Customer(1, "John Doe", "john@email.com", "New York", "USA"),
// Customer(2, "Jane Smith", "jane@email.com", "London", "UK"),
// Customer(3, "Bob Johnson", "bob@email.com", "Toronto", "Canada")
// ).toDF()
// Customers.write.mode(SaveMode.Overwrite).saveAsTable("sales_db.customer_data")
// spark.sql("SELECT * FROM sales_db.customer_data").show()

spark.sql(
"""
|CREATE TABLE IF NOT EXISTS customers_managed(
|customer_id INT,
|name STRING,
|email STRING,
|city STRING,
|country STRING)
|STORED AS ORC
|""".stripMargin)
spark.sql(
"""
|CREATE TABLE IF NOT EXISTS customers(
|customer_id INT,
|name STRING,
|email STRING,
|city STRING,
|country STRING
|) ROW FORMAT DELIMITED
|FIELDS TERMINATED BY ','
|STORED AS TEXTFILE
|LOCATION '/home/kaustubh/Desktop/'
|TBLPROPERTIES ("skip.header.line.count"="1");
|""".stripMargin)
val df = spark.read
.option("header","true")
.option("inferSchema","false")
.csv("/home/kaustubh/Downloads/customers.csv")
df.write
.mode("overwrite")
.saveAsTable("customers")
spark.sql("SELECT * FROM customers").show()
spark.sql(
"""
|INSERT INTO TABLE customers_managed
|SELECT
|CAST(customer_id AS INT) AS customer_id,
|name,
|email,
|city,
|country
|FROM customers
|""".stripMargin)
spark.sql("SELECT * FROM customers_managed").show()
spark.sql(
"""
|CREATE TABLE IF NOT EXISTS orders(
|order_id INT,
|customer_id INT,
|order_date DATE,
|amount DECIMAL(10,2),
|status STRING)
|ROW FORMAT DELIMITED
|FIELDS TERMINATED BY ','
|STORED AS TEXTFILE
|LOCATION '/home/kaustubh/Desktop/'
|TBLPROPERTIES ("skip.header.line.count"="1");
|""".stripMargin)
spark.sql(
"""
|CREATE TABLE IF NOT EXISTS orders_managed(
|order_id INT,
|customer_id INT,
|order_date DATE,
|amount DECIMAL(10,2),
|status STRING)
|STORED AS ORC
|""".stripMargin
)
val df_orders = spark.read
.option("header","true")
.option("inferSchema","false")
.csv("/home/kaustubh/Downloads/orders.csv")
df_orders.write
.mode("overwrite")
.saveAsTable("orders")
spark.sql("SELECT * FROM orders").show()
spark.sql(
"""
|INSERT INTO TABLE orders_managed
|SELECT
|order_id, customer_id,order_date,amount,status
|FROM orders
|""".stripMargin)
spark.sql("SELECT * FROM orders_managed").show()

// System.in.read()
spark.stop()
}
}

Schema Evolution - Basics

import org.apache.spark.sql.{SparkSession, DataFrame}

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import io.delta.tables._

object DeltaSchemaEvolutionBasics {

  // Initialize Spark with Delta Lake support
  val spark = SparkSession.builder()
    .appName("DeltaSchemaEvolutionBasics")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()

  import spark.implicits._

  // ============================================
  // EXAMPLE 1: Basic Table Creation
  // ============================================
  def example1_CreateInitialTable(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 1: Creating Your First Delta Table")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/users"

    // Create simple data
    val users = Seq(
      (1, "Alice", "alice@email.com"),
      (2, "Bob", "bob@email.com"),
      (3, "Charlie", "charlie@email.com")
    ).toDF("id", "name", "email")

    println("\nOriginal data:")
    users.show()
    users.printSchema()

    // Write as Delta table
    users.write
      .format("delta")
      .mode("overwrite")
      .save(tablePath)

    println(s"\n✓ Delta table created at: $tablePath")

    // Read it back
    val savedTable = spark.read.format("delta").load(tablePath)
    println("\nReading the saved table:")
    savedTable.show()
  }

  // ============================================
  // EXAMPLE 2: Adding New Columns (Schema Evolution)
  // ============================================
  def example2_AddNewColumns(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 2: Adding New Columns to Existing Table")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/users"

    println("\nCurrent schema:")
    spark.read.format("delta").load(tablePath).printSchema()

    // Create new data with additional columns
    val newUsers = Seq(
      (4, "David", "david@email.com", 30, "New York"),
      (5, "Emma", "emma@email.com", 25, "London")
    ).toDF("id", "name", "email", "age", "city")

    println("\nNew data with extra columns (age, city):")
    newUsers.show()
    newUsers.printSchema()

    // Try to append WITHOUT schema evolution (will fail)
    println("\n❌ Without mergeSchema option - this would fail:")
    println("users.write.format('delta').mode('append').save(tablePath)")

    // Append WITH schema evolution
    println("\n✓ With mergeSchema option - this works:")
    newUsers.write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true")  // KEY OPTION for schema evolution
      .save(tablePath)

    println("\nUpdated table with new columns:")
    val updatedTable = spark.read.format("delta").load(tablePath)
    updatedTable.printSchema()
    updatedTable.show()

    println("\nNotice: Old records have NULL for new columns (age, city)")
  }

  // ============================================
  // EXAMPLE 3: Overwriting Schema Completely
  // ============================================
  def example3_OverwriteSchema(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 3: Completely Replacing Table Schema")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/products"

    // Create initial table
    val initialProducts = Seq(
      (1, "Laptop", 999.99),
      (2, "Mouse", 29.99)
    ).toDF("product_id", "product_name", "price")

    println("\nInitial products table:")
    initialProducts.show()
    initialProducts.printSchema()

    initialProducts.write
      .format("delta")
      .mode("overwrite")
      .save(tablePath)

    // Create completely new schema
    val newProducts = Seq(
      (101, "Monitor", "Electronics", 299.99, 10),
      (102, "Keyboard", "Electronics", 79.99, 25)
    ).toDF("id", "name", "category", "price", "stock")

    println("\nNew products with different schema:")
    newProducts.show()
    newProducts.printSchema()

    // Overwrite with new schema
    newProducts.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")  // KEY OPTION to replace schema
      .save(tablePath)

    println("\n✓ Schema completely replaced:")
    val updatedProducts = spark.read.format("delta").load(tablePath)
    updatedProducts.printSchema()
    updatedProducts.show()
  }

  // ============================================
  // EXAMPLE 4: Data Type Widening (Automatic)
  // ============================================
  def example4_DataTypeWidening(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 4: Automatic Data Type Widening")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/sales"

    // Create table with Int
    println("Step 1: Creating table with Integer salary")
    val initialData = Seq(
      (1, "John", 50000),
      (2, "Mary", 60000),
      (3, "Bob", 55000)
    ).toDF("id", "name", "salary")

    initialData.write.format("delta").mode("overwrite").save(tablePath)

    println("\nInitial Schema:")
    spark.read.format("delta").load(tablePath).printSchema()
    spark.read.format("delta").load(tablePath).show()

    // Step 2: Read, cast, and overwrite with new schema
    println("\nStep 2: Widening salary from INT to BIGINT")
    val df = spark.read.format("delta").load(tablePath)
    val widenedDF = df.select(
      $"id",
      $"name",
      $"salary".cast(LongType).as("salary")
    )

    widenedDF.write
      .format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .save(tablePath)

    println("\nSchema after widening:")
    spark.read.format("delta").load(tablePath).printSchema()

    // Step 3: Insert new data with larger salary values
    println("\nStep 3: Inserting data with large salary values")
    val newData = Seq(
      (4, "Alice", 5000000000L)  // Large value that needs BIGINT
    ).toDF("id", "name", "salary")

    newData.write.format("delta").mode("append").save(tablePath)

    println("\nFinal data:")
    spark.read.format("delta").load(tablePath).show()

    println("\n✓ Type widening successful!")
    println("Salary column changed from INT to BIGINT without losing any data")
  }

  // ============================================
  // EXAMPLE 5: Using ALTER TABLE to Add Columns
  // ============================================
  def example5_AlterTableAddColumn(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 5: Using SQL ALTER TABLE to Add Columns")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/employees"

    // Create initial table
    val employees = Seq(
      (1, "John", "Engineering"),
      (2, "Sarah", "Marketing")
    ).toDF("emp_id", "emp_name", "department")

    employees.write.format("delta").mode("overwrite").save(tablePath)

    // Create a temporary view for SQL operations
    spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")

    println("\nInitial employee table:")
    spark.sql("SELECT * FROM employees").show()
    spark.sql("DESCRIBE employees").show()

    // Add column using SQL
    println("\nAdding 'salary' column using ALTER TABLE:")
    spark.sql(s"""
      ALTER TABLE delta.`$tablePath`
      ADD COLUMNS (salary DOUBLE)
    """)

    spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
    println("\n✓ Column added:")
    spark.sql("DESCRIBE employees").show()
    spark.sql("SELECT * FROM employees").show()

    // You can also add column with comment
    spark.sql(s"""
      ALTER TABLE delta.`$tablePath`
      ADD COLUMNS (hire_date STRING COMMENT 'Employee hire date')
    """)

    spark.read.format("delta").load(tablePath).createOrReplaceTempView("employees")
    println("\n✓ Another column added with comment:")
    spark.sql("DESCRIBE employees").show()
  }

  // ============================================
  // EXAMPLE 6: Handling Schema Mismatches
  // ============================================
  def example6_SchemaMismatchHandling(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 6: Understanding Schema Mismatch Errors")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/orders"

    // Create initial table
    val orders = Seq(
      (1, "Product A", 50.0),
      (2, "Product B", 75.0)
    ).toDF("order_id", "product", "amount")

    orders.write.format("delta").mode("overwrite").save(tablePath)

    println("\nOriginal orders table:")
    spark.read.format("delta").load(tablePath).show()

    // Try to append data with different column order
    val newOrders = Seq(
      (100.0, 3, "Product C"),  // Different order!
      (125.0, 4, "Product D")
    ).toDF("amount", "order_id", "product")

    println("\nNew data (columns in different order):")
    newOrders.show()

    println("\n✓ Delta Lake matches by column NAME, not position:")
    newOrders.write
      .format("delta")
      .mode("append")
      .save(tablePath)

    println("\nAll orders (correctly matched despite different column order):")
    spark.read.format("delta").load(tablePath).show()

    // Example of incompatible schema (will fail)
    println("\n❌ Incompatible schema example:")
    println("If you try to change data types incompatibly (e.g., String to Int),")
    println("Delta Lake will raise an error to protect data integrity.")
  }

  // ============================================
  // EXAMPLE 7: Viewing Schema History
  // ============================================
  def example7_SchemaHistory(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 7: Tracking Schema Changes Over Time")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/users"
    val deltaTable = DeltaTable.forPath(spark, tablePath)

    println("\nTable history (shows all operations including schema changes):")
    deltaTable.history().select("version", "timestamp", "operation", "operationMetrics")
      .show(false)

    // Read table at different versions to see schema evolution
    println("\nSchema at version 0 (initial creation):")
    spark.read.format("delta").option("versionAsOf", 0).load(tablePath).printSchema()

    println("\nSchema at latest version (after adding columns):")
    spark.read.format("delta").load(tablePath).printSchema()

    println("\nYou can always time-travel back to previous schemas!")
  }

  // ============================================
  // EXAMPLE 8: Merge with Schema Evolution
  // ============================================
  def example8_MergeWithSchemaEvolution(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 8: MERGE Operations with Schema Evolution")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/customers"

    // Create target table
    val customers = Seq(
      (1, "Alice", "alice@email.com"),
      (2, "Bob", "bob@email.com")
    ).toDF("customer_id", "name", "email")

    customers.write.format("delta").mode("overwrite").save(tablePath)

    println("\nExisting customers table:")
    spark.read.format("delta").load(tablePath).show()

    // New data with additional column
    val updates = Seq(
      (2, "Bob Smith", "bob.smith@email.com", "555-1234"),  // Update existing
      (3, "Charlie", "charlie@email.com", "555-5678")       // Insert new
    ).toDF("customer_id", "name", "email", "phone")

    println("\nUpdates with new 'phone' column:")
    updates.show()

    val deltaTable = DeltaTable.forPath(spark, tablePath)

    // Running the merge at this point would fail: the source has an extra 'phone'
    // column and schema auto-merge is not enabled yet.
    println("\n❌ Without schema evolution, this would fail!")
    println("You need to enable schema evolution for MERGE operations:")
    spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    println("\n✓ With schema evolution enabled:")
    deltaTable.as("target")
      .merge(
        updates.as("source"),
        "target.customer_id = source.customer_id"
      )
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()

    println("\nFinal customers table with new column:")
    spark.read.format("delta").load(tablePath).show()
  }

  // ============================================
  // EXAMPLE 9: Practical Use Case - Adding Audit Columns
  // ============================================
  def example9_AddAuditColumns(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 9: Practical Example - Adding Audit Columns")
    println("="*70)

    val tablePath = "/tmp/delta/beginner/transactions"

    // Initial table without audit columns
    val transactions = Seq(
      (1, 1001, 250.0),
      (2, 1002, 175.5)
    ).toDF("transaction_id", "account_id", "amount")

    println("\nOriginal transactions (no audit info):")
    transactions.show()

    transactions.write.format("delta").mode("overwrite").save(tablePath)

    // Add audit columns to new records
    val newTransactions = Seq(
      (3, 1003, 320.0, "2024-01-15", "system_user"),
      (4, 1004, 450.75, "2024-01-15", "admin_user")
    ).toDF("transaction_id", "account_id", "amount", "created_date", "created_by")

    println("\nNew transactions with audit columns:")
    newTransactions.show()

    // Append with schema evolution
    newTransactions.write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true")
      .save(tablePath)

    println("\n✓ Table now has audit columns:")
    val allTransactions = spark.read.format("delta").load(tablePath)
    allTransactions.show()

    println("\nOld records have NULL for audit columns - you can update them:")
    val deltaTable = DeltaTable.forPath(spark, tablePath)
    deltaTable.update(
      condition = col("created_date").isNull,
      set = Map(
        "created_date" -> lit("2024-01-01"),
        "created_by" -> lit("legacy_system")
      )
    )

    println("\nAfter backfilling audit columns:")
    spark.read.format("delta").load(tablePath).show()
  }

  // ============================================
  // EXAMPLE 10: Best Practices Summary
  // ============================================
  def example10_BestPractices(): Unit = {
    println("\n" + "="*70)
    println("EXAMPLE 10: Schema Evolution Best Practices")
    println("="*70)

    println("""
      |
      |📋 SCHEMA EVOLUTION BEST PRACTICES:
      |
      |1. ENABLE SCHEMA EVOLUTION EXPLICITLY
      |   ✓ Use .option("mergeSchema", "true") for append operations
      |   ✓ Use .option("overwriteSchema", "true") for full schema replacement
      |   ✓ Set spark.databricks.delta.schema.autoMerge.enabled for MERGE
      |
      |2. UNDERSTAND WHAT'S ALLOWED
      |   ✓ Adding new columns: ALLOWED
      |   ✓ Data type widening (Int → Long): ALLOWED
      |   ✓ Changing column order: NO PROBLEM (Delta uses names)
      |   ❌ Dropping columns: NOT DIRECTLY SUPPORTED
      |   ❌ Data type narrowing (Long → Int): NOT ALLOWED
      |   ❌ Changing column data types incompatibly: NOT ALLOWED
      |
      |3. USE TIME TRAVEL FOR SAFETY
      |   ✓ Always check table history before major changes
      |   ✓ You can read previous schemas using versionAsOf
      |   ✓ Test schema changes on a copy first
      |
      |4. PLAN FOR NULLS
      |   ✓ New columns will have NULL for existing records
      |   ✓ Consider adding default values or backfilling data
      |   ✓ Document what NULL means in your schema
      |
      |5. DOCUMENT SCHEMA CHANGES
      |   ✓ Use column comments in ALTER TABLE statements
      |   ✓ Keep track of why columns were added
      |   ✓ Review table history regularly
      |
      |6. TEST IN DEVELOPMENT FIRST
      |   ✓ Never test schema evolution on production data
      |   ✓ Validate that downstream applications handle new schemas
      |   ✓ Check query performance after schema changes
      |""".stripMargin)
  }

  // ============================================
  // MAIN - RUN ALL BEGINNER EXAMPLES
  // ============================================
  def main(args: Array[String]): Unit = {
    println("="*70)
    println("DELTA LAKE SCHEMA EVOLUTION - BEGINNER'S GUIDE")
    println("="*70)

    try {
      // Run all examples in sequence
      example1_CreateInitialTable()
      example2_AddNewColumns()
      example3_OverwriteSchema()
      example4_DataTypeWidening()
      example5_AlterTableAddColumn()
      example6_SchemaMismatchHandling()
      example7_SchemaHistory()
      example8_MergeWithSchemaEvolution()
      example9_AddAuditColumns()
      example10_BestPractices()

      println("\n" + "="*70)
      println("✓ ALL EXAMPLES COMPLETED SUCCESSFULLY!")
      println("="*70)
    } catch {
      case e: Exception =>
        println(s"\n❌ Error occurred: ${e.getMessage}")
        e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}