Thursday, 26 February 2026

AWS Data Engineering Project Documentation


PART 1 — IAM Roles
Role 1 — Lambda Role
IAM → Roles → Create role
  •  Trusted entity type: AWS Service
  • Service: Lambda
  • Click Next
  •  Skip adding managed policies for now → click Next
  • Role name: lambda-pipeline-execution-role
  • Create role
Now add permissions — search for your new role and click it:

Permissions tab → Add permissions → Create inline policy
Click JSON tab → paste this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:StartCrawler",
        "glue:GetCrawler",
        "glue:StartJobRun",
        "glue:GetJobRun"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::e-commerce-data-kpc",
        "arn:aws:s3:::e-commerce-data-kpc/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

Click Next
Policy name: lambda-pipeline-policy
Create policy
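The same wiring can also be done programmatically. Below is a hedged boto3 sketch (not part of the original console steps) that attaches the inline policy above to the role; it assumes AWS credentials are configured, so the actual API call is gated behind `__main__`:

```python
import json

ROLE_NAME = "lambda-pipeline-execution-role"
POLICY_NAME = "lambda-pipeline-policy"


def build_policy() -> dict:
    """Return the same inline policy document used in the console steps."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow",
             "Action": ["glue:StartCrawler", "glue:GetCrawler",
                        "glue:StartJobRun", "glue:GetJobRun"],
             "Resource": "*"},
            {"Effect": "Allow",
             "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
             "Resource": ["arn:aws:s3:::e-commerce-data-kpc",
                          "arn:aws:s3:::e-commerce-data-kpc/*"]},
            {"Effect": "Allow",
             "Action": ["logs:CreateLogGroup", "logs:CreateLogStream",
                        "logs:PutLogEvents"],
             "Resource": "*"},
        ],
    }


def attach_policy():
    import boto3  # requires AWS credentials when actually called
    iam = boto3.client("iam")
    iam.put_role_policy(
        RoleName=ROLE_NAME,
        PolicyName=POLICY_NAME,
        PolicyDocument=json.dumps(build_policy()),
    )


if __name__ == "__main__":
    attach_policy()
```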


Role 2 — Glue Role

IAM → Roles → Create role
  • Trusted entity type: AWS Service
  • Service: Glue
  • Click Next
  • Search "AWSGlueServiceRole" → check it → Next
  • Role name: glue-pipeline-service-role
  • Create role

One Role — Two Policies Inside It

glue-pipeline-service-role
    ├── AWSGlueServiceRole    ← managed policy (attached during role creation)
    └── glue-s3-access        ← inline policy (you add this after)

Permissions tab → Add permissions → Create inline policy
JSON tab → paste this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::e-commerce-data-kpc",
        "arn:aws:s3:::e-commerce-data-kpc/*"
      ]
    }
  ]
}

  • Policy name: glue-s3-access
  • Create policy

PART 2 — S3 Folders

S3 → e-commerce-data-kpc → Create folder


Create these 4 folders one by one:

| Folder name |
------------------------
| raw |
| etl_processed_data |
| athena-results |
| glue-scripts |

Then upload `orders.json` into the `raw/` folder and the `etl_ecommerce.py` script into the `glue-scripts/` folder.
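The same setup can be scripted. This is a hypothetical boto3 sketch mirroring the console steps (bucket, folder, and file names come from the steps above); it assumes AWS credentials, so the calls are gated behind `__main__`:

```python
BUCKET = "e-commerce-data-kpc"
FOLDERS = ["raw/", "etl_processed_data/", "athena-results/", "glue-scripts/"]


def create_folders_and_upload():
    import boto3  # requires AWS credentials when actually called
    s3 = boto3.client("s3")
    for key in FOLDERS:
        # A zero-byte object whose key ends in "/" is what the console
        # "Create folder" button creates.
        s3.put_object(Bucket=BUCKET, Key=key)
    s3.upload_file("orders.json", BUCKET, "raw/orders.json")
    s3.upload_file("etl_ecommerce.py", BUCKET, "glue-scripts/etl_ecommerce.py")


if __name__ == "__main__":
    create_folders_and_upload()
```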


PART 3 — Glue Setup

Create Database

Glue → Databases → Add database
Name: ecommerce_db
Create

Create Crawler

Glue → Crawlers → Create crawler
Name: ecommerce-crawler
Next
Add data source → S3
S3 path: s3://e-commerce-data-kpc/raw/
Add → Next
IAM Role: glue-pipeline-service-role
Next
Target database: ecommerce_db
Next → Create crawler
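For reference, a hypothetical boto3 equivalent of the crawler setup above; the settings mirror the console choices. The API call is gated behind `__main__` since it needs AWS credentials:

```python
CRAWLER_CONFIG = {
    "Name": "ecommerce-crawler",
    "Role": "glue-pipeline-service-role",
    "DatabaseName": "ecommerce_db",
    "Targets": {"S3Targets": [{"Path": "s3://e-commerce-data-kpc/raw/"}]},
}


def create_crawler():
    import boto3  # requires AWS credentials when actually called
    boto3.client("glue").create_crawler(**CRAWLER_CONFIG)


if __name__ == "__main__":
    create_crawler()
```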

Create ETL Job

Glue → ETL Jobs → Script editor
  • Engine: Spark
  • Start fresh
  • Job name: etl_ecommerce
  • IAM Role: glue-pipeline-service-role
  • Glue version: Glue 4.0
  • Worker type: G.1X
  • Number of workers: 2
Paste the ETL script in the editor and click Save
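The actual `etl_ecommerce.py` is the one uploaded to `glue-scripts/` in Part 2 and is not reproduced here. As a rough, hypothetical sketch only, a minimal Glue job of this shape might read the raw JSON, do a basic clean-up, and write Parquet to `etl_processed_data/`:

```python
OUTPUT_BASE = "s3://e-commerce-data-kpc/etl_processed_data/"


def output_path_for(table: str) -> str:
    """Build the S3 prefix a processed table is written to."""
    return OUTPUT_BASE + table + "/"


def main():
    # Glue-specific imports stay inside main() so the helper above can be
    # read and tested without a Glue runtime installed.
    import sys
    from awsglue.context import GlueContext
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    print(f"Starting job {args['JOB_NAME']}")

    glue_ctx = GlueContext(SparkContext())
    spark = glue_ctx.spark_session

    # Read raw JSON, drop duplicates and fully-empty rows, write Parquet
    df = spark.read.json("s3://e-commerce-data-kpc/raw/")
    cleaned = df.dropDuplicates().na.drop(how="all")
    cleaned.write.mode("overwrite").parquet(output_path_for("orders"))


if __name__ == "__main__":
    main()
```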


PART 4 — Lambda Functions

Lambda 1 — s3-crawler-trigger

Lambda → Create function
  • Function name: s3-crawler-trigger
  • Runtime: Python 3.12
  • Permissions → Use existing role → lambda-pipeline-execution-role
  • Create function
Paste code → click Deploy:

import boto3

CRAWLER_NAME = 'ecommerce-crawler'

def lambda_handler(event, context):
    glue = boto3.client('glue')
    
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    print(f"File uploaded: s3://{bucket}/{key}")
    
    response = glue.get_crawler(Name=CRAWLER_NAME)
    state = response['Crawler']['State']
    print(f"Crawler state: {state}")
    
    if state == 'RUNNING':
        print("Crawler already running — skipping")
        return {"status": "skipped"}
    
    glue.start_crawler(Name=CRAWLER_NAME)
    print("Crawler started!")
    
    return {"status": "success"}

Set timeout:

Configuration → General configuration → Edit
Timeout: 1 min 0 sec → Save
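To test the function in the console before wiring the S3 notification, an event of this shape (a hypothetical minimal S3 put event, with only the fields the handler actually reads) can be used:

```python
# Minimal sketch of an S3 "object created" test event; only the fields the
# s3-crawler-trigger handler reads are included.
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "e-commerce-data-kpc"},
                "object": {"key": "raw/orders.json"}}}
    ]
}


def extract_upload(event: dict) -> str:
    """Mirror how the handler derives the uploaded object's S3 URI."""
    rec = event["Records"][0]["s3"]
    return f"s3://{rec['bucket']['name']}/{rec['object']['key']}"
```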


Lambda 2 — etl-job-trigger

Lambda → Create function
  • Function name: etl-job-trigger
  • Runtime: Python 3.12
  • Permissions → Use existing role → lambda-pipeline-execution-role
  • Create function
Paste code → click Deploy:

import boto3

ETL_JOB_NAME = 'etl_ecommerce'

def lambda_handler(event, context):
    glue = boto3.client('glue')
    
    print(f"Event received: {event}")
    
    state = event['detail']['state']
    crawler_name = event['detail']['crawlerName']
    print(f"Crawler: {crawler_name} | State: {state}")
    
    if state != 'Succeeded':
        print(f"Crawler state was {state} — ETL not triggered")
        return {"status": "skipped"}
    
    response = glue.start_job_run(JobName=ETL_JOB_NAME)
    print(f"ETL job started! RunId: {response['JobRunId']}")
    
    return {"status": "success", "jobRunId": response['JobRunId']}


Set timeout:

Configuration → General configuration → Edit
Timeout: 1 min 0 sec → Save




PART 5 — S3 Event Notification

S3 → e-commerce-data-kpc → Properties
Event notifications → Create event notification

| Field | Value |
---------------------------------------------------
| Event name | json-upload-trigger |
| Prefix | raw/ |
| Suffix | .json |
| Event types | All object create events |
| Destination | Lambda function |
| Lambda function | `s3-crawler-trigger` |

Save changes

This automatically adds the resource-based permission that allows S3 to invoke Lambda 1.

PART 6 — EventBridge Rule

EventBridge → Rules → Create rule
Name: on-crawler-complete
Event bus: default
Rule type: Rule with an event pattern
Next


In event pattern section:

Event source: AWS services
AWS service: Glue
Event type: Glue Crawler State Change
Switch to "Edit pattern" and paste:


{
  "source": ["aws.glue"],
  "detail-type": ["Glue Crawler State Change"],
  "detail": {
    "crawlerName": ["ecommerce-crawler"],
    "state": ["Succeeded"]
  }
}

Next
Target type: AWS service
Select a target: Lambda function
Function: etl-job-trigger
Next → Next → Create rule

This automatically adds the resource-based permission that allows EventBridge to invoke Lambda 2.
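To test Lambda 2 in the console, an event matching the pattern above can be used. This is a hypothetical minimal sketch of the Glue crawler state-change event, with only the fields `etl-job-trigger` reads:

```python
# Sketch of the event EventBridge delivers when the crawler finishes
sample_event = {
    "source": "aws.glue",
    "detail-type": "Glue Crawler State Change",
    "detail": {"crawlerName": "ecommerce-crawler", "state": "Succeeded"},
}


def should_start_etl(event: dict) -> bool:
    """Mirror the guard inside etl-job-trigger."""
    return event["detail"]["state"] == "Succeeded"
```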

PART 7 — Athena Setup

Athena → Settings → Manage
Query result location: s3://e-commerce-data-kpc/athena-results/
Save
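Once the result location is set, queries can also be submitted programmatically. A hedged boto3 sketch (the table name `raw` is an assumption: the crawler usually names the table after the source folder); the API call is gated behind `__main__`:

```python
DATABASE = "ecommerce_db"
RESULTS = "s3://e-commerce-data-kpc/athena-results/"
# Table name is an assumption; verify it in the Glue Data Catalog first.
QUERY = "SELECT * FROM raw LIMIT 10;"


def run_query() -> str:
    import boto3  # requires AWS credentials when actually called
    athena = boto3.client("athena")
    resp = athena.start_query_execution(
        QueryString=QUERY,
        QueryExecutionContext={"Database": DATABASE},
        ResultConfiguration={"OutputLocation": RESULTS},
    )
    return resp["QueryExecutionId"]


if __name__ == "__main__":
    print(run_query())
```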


Final Checklist

IAM: lambda-pipeline-execution-role created with inline policy
IAM: glue-pipeline-service-role created with AWSGlueServiceRole + inline policy
S3: folders created (raw, etl_processed_data, athena-results, glue-scripts)
S3: ETL script uploaded to glue-scripts/
Glue: ecommerce_db database created
Glue: ecommerce-crawler created → points to raw/
Glue: etl_ecommerce job created with script
Lambda 1: s3-crawler-trigger deployed with 1 min timeout
Lambda 2: etl-job-trigger deployed with 1 min timeout
S3 Event Notification → s3-crawler-trigger (auto-adds permission)
EventBridge Rule → etl-job-trigger (auto-adds permission)
Athena: result location set

Tuesday, 16 December 2025

Spark Projects

1. E-Commerce Product Catalog with SCD Type 2

Develop a product inventory system that tracks price changes over time using the Slowly Changing Dimension Type 2 approach.


- Ingest product data from a REST API (like the Fake Store API)

- Parse JSON responses and transform data using Spark

- Handle missing values, null values, and duplicate records

- Calculate the percentage of bad records and good records

- Process JSON data with different optimization techniques (caching, partitioning, bucketing, broadcast joins)

- Implement SCD2 in Delta tables to maintain price history

- Store results in both Delta and PostgreSQL

- Query historical prices and current prices

- Structure the project into packages: create separate modules for extraction, transformation, and loading

- Generate different test case scenarios to check the validity of methods

- Use Jenkins for CI/CD

- Use Linux commands to create a shell script to execute the Spark jar file

-----------------------------------------------------------------------------------------------------------------------

2. Weather Data Pipeline with Performance Optimization

Create a batch pipeline that fetches weather data and optimizes Spark processing.


- Pull historical weather data from the OpenWeather API

- Handle missing values, null values, and duplicate records

- Load clean data

- Process JSON data with different optimization techniques (caching, partitioning, bucketing, broadcast joins)

- Store results in PostgreSQL for reporting

- Package as separate modules for extraction, transformation, and loading

- Generate different test case scenarios to check the validity of methods

- Prepare JAR file for execution

- Use Jenkins for CI/CD

- Use Linux commands to create a shell script to execute the Spark jar file

---------------------------------------------------------------------------------------------------------------------------

3. User Activity Tracker with SCD Type 1

Build a batch system tracking user profile updates where only the current state matters.


1) Generate mock user activity data or pull from a user API

2) Implement SCD1 logic to overwrite old records

3) Use Spark to process updates in daily batches

4) Store the final state in both Delta and PostgreSQL


Data Modeling Concepts:


1) Slowly Changing Dimension Type 1: Overwrite strategy, no history

2) Natural Keys vs Surrogate Keys: Use email as a natural key, generate a surrogate for joins

3) Denormalization: Flatten nested JSON (address, location) for analytics

4) Check invalid, null, or missing records (perform Data Quality checks)

5) Data Types: Proper type selection (VARCHAR, TIMESTAMP, BOOLEAN)

6) Optimization techniques (repartitioning, coalesce)

7) Constraints: Primary keys, unique constraints, and not null constraints. Indexing Strategy: Index on frequently queried columns (user_id, email, last_updated)


1) Generate different test case scenarios to check the validity of methods

2) Scala Package Structure: model, service, scd_type, main

3) Prepare JAR file for execution 

4) Use Jenkins for CI/CD

5) Use Linux commands to create a shell script to execute the Spark jar file 
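The SCD1 overwrite strategy at the heart of this project can be sketched without Spark. A minimal, framework-free illustration using plain dicts (field names are illustrative only; the real project would do this with Delta merge or DataFrame upserts):

```python
def scd1_merge(current: dict, updates: list, key: str = "email") -> dict:
    """SCD Type 1: each update simply replaces the old record; no history kept."""
    merged = dict(current)
    for row in updates:
        merged[row[key]] = row  # overwrite in place, keyed by the natural key
    return merged


# One existing profile, then a daily batch with one update and one new user
profiles = {"a@x.com": {"email": "a@x.com", "city": "Boston"}}
batch = [{"email": "a@x.com", "city": "Chicago"},
         {"email": "b@x.com", "city": "NYC"}]
result = scd1_merge(profiles, batch)
# "a@x.com" now holds only the latest city; the old value is gone
```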

---------------------------------------------------------------------------------------------------------------------------

4. Stock Market Historical Analyzer with SCD Type 3

Track limited stock history (current price, previous price, last change date) using batch loads.


1) Fetch historical stock data from Alpha Vantage API

2) Apply Schema to the data

3) Check invalid, null, or missing records (perform Data Quality checks)

4) Implement SCD3 with current and previous value columns

5) Use Spark SQL for transformations

6) Perform optimization using Caching and Persist

7) Create a Delta table with optimized partitioning by date

8) Scala Package Structure: API, DELTA/POSTGRE, Main Logic

9) Generate different test case scenarios to check the validity of methods

10) Prepare JAR file for execution

11) Use Jenkins for CI/CD

12) Use Linux Commands to create shell scripts to execute the Spark jar file
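The SCD3 idea (current value, previous value, last change date) can be sketched in plain Python; field names here are illustrative, not from the project spec:

```python
def scd3_update(row: dict, new_price: float, change_date: str) -> dict:
    """SCD Type 3: shift current_price into previous_price and record the date."""
    if row["current_price"] == new_price:
        return dict(row)  # no change, nothing to shift
    updated = dict(row)
    updated["previous_price"] = row["current_price"]
    updated["current_price"] = new_price
    updated["last_change_date"] = change_date
    return updated


stock = {"symbol": "AAPL", "current_price": 190.0,
         "previous_price": None, "last_change_date": None}
stock = scd3_update(stock, 195.5, "2024-07-01")
```

Note that only one level of history survives: a second update would overwrite `previous_price`, which is exactly the SCD3 trade-off.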

----------------------------------------------------------------------------------------------------------------------------

5. Customer Orders Data Warehouse

Build a mini data warehouse combining all three SCD types with batch processing.


1) Customer data (SCD2 - track address changes over time)

2) Product prices (SCD1 - keep only current price)

3) Payment methods (SCD3 - current and previous method)

4) Ingest from multiple JSON files via API

5) Handle missing and null values as Data Quality checks

6) Implement daily incremental loads with Delta merge operations

7) Scala Package Structure:

com.datawarehouse.orders

    ├── domain

    │   ├── customer (Customer models + SCD2)

    │   ├── product (Product models + SCD1)

    │   ├── payment (Payment models + SCD3)

    │   ├── dimension (all dimension tables)

    │   └── fact (OrderFact, InventorySnapshot)

    ├── etl

    │   ├── extract

    │   ├── transform

    │   └── load

    ├── shared (common utilities)

    └── app (main application)

8) Generate different test case scenarios to check the validity of methods

9) Prepare JAR file for execution

10) Use Jenkins for CI/CD

11) Use Linux Commands to create shell scripts to execute the Spark jar file

----------------------------------------------------------------------------------------------------------------------------

6. Application Log Batch Processor

Process application logs in batches with a focus on Spark optimization techniques.


1) Read JSON logs from files or a REST endpoint

2) Handle Data Quality issues: missing, null, and bad records

3) Apply various optimization techniques: predicate pushdown, column pruning, and handling data skewness, if any, with the salting technique

4) Include various Spark transformations and actions

5) Implement CDC

6) Store records in PostgreSQL

7) Execute PostgreSQL scripts, Temp Views, Stored Procedures, and User Defined Functions

8) Generate different test case scenarios to check the validity of methods

9) Scala Package Structure: API, Schema, metrics, json_parser

10) Prepare JAR file for execution

11) Create a shell script to execute the Spark JAR file using Linux Commands

12) Use Jenkins for CI/CD



Monday, 15 December 2025

SCD TYPE 2 IMPLEMENTATION WITH DIM AND FACT TABLES

 -- ===============================================

-- COMPLETE SCD TYPE 2 ETL EXAMPLE

-- ===============================================

-- This example shows the complete flow from source to fact tables

-- with SCD Type 2 dimension handling


-- ===============================================

-- STEP 1: CREATE SOURCE TABLES (Simulating OLTP)

-- ===============================================


-- Source Customer Table (Business System)

CREATE TABLE source_customer (

    customer_id INT PRIMARY KEY,

    customer_name VARCHAR(100),

    email VARCHAR(100),

    address VARCHAR(200),

    phone VARCHAR(20),

    last_modified_date DATE

);


-- Source Product Table

CREATE TABLE source_product (

    product_id INT PRIMARY KEY,

    product_name VARCHAR(100),

    category VARCHAR(50),

    price DECIMAL(10,2),

    last_modified_date DATE

);


-- Source Sales Table

CREATE TABLE source_sales (

    sale_id INT PRIMARY KEY,

    customer_id INT,

    product_id INT,

    sale_date DATE,

    quantity INT,

    amount DECIMAL(10,2)

);


-- ===============================================

-- STEP 2: CREATE STAGING TABLES

-- ===============================================


CREATE TABLE staging_customer (

    customer_id INT,

    customer_name VARCHAR(100),

    email VARCHAR(100),

    address VARCHAR(200),

    phone VARCHAR(20),

    last_modified_date DATE

);


CREATE TABLE staging_sales (

    sale_id INT,

    customer_id INT,

    product_id INT,

    sale_date DATE,

    quantity INT,

    amount DECIMAL(10,2)

);


-- ===============================================

-- STEP 3: CREATE DIMENSION TABLES (SCD TYPE 2)

-- ===============================================


CREATE TABLE dim_customer (

    customer_sk INT PRIMARY KEY AUTO_INCREMENT,  -- Surrogate Key

    customer_id INT NOT NULL,                     -- Business Key

    customer_name VARCHAR(100),

    email VARCHAR(100),

    address VARCHAR(200),

    phone VARCHAR(20),

    effective_date DATE NOT NULL,

    end_date DATE,

    is_current BOOLEAN NOT NULL DEFAULT 1,

    created_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_business_key (customer_id, is_current),

    INDEX idx_date_range (customer_id, effective_date, end_date)

);


CREATE TABLE dim_product (

    product_sk INT PRIMARY KEY AUTO_INCREMENT,

    product_id INT NOT NULL,

    product_name VARCHAR(100),

    category VARCHAR(50),

    price DECIMAL(10,2),

    effective_date DATE NOT NULL,

    end_date DATE,

    is_current BOOLEAN NOT NULL DEFAULT 1,

    created_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_business_key (product_id, is_current)

);


-- Date Dimension (Type 1 - no history needed)

CREATE TABLE dim_date (

    date_sk INT PRIMARY KEY,

    full_date DATE,

    year INT,

    quarter INT,

    month INT,

    day INT,

    day_of_week VARCHAR(20)

);


-- ===============================================

-- STEP 4: CREATE FACT TABLE

-- ===============================================


CREATE TABLE fact_sales (

    sale_id INT PRIMARY KEY,

    customer_sk INT NOT NULL,           -- Links to dimension surrogate key

    product_sk INT NOT NULL,             -- Links to dimension surrogate key

    date_sk INT NOT NULL,

    quantity INT,

    amount DECIMAL(10,2),

    load_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (customer_sk) REFERENCES dim_customer(customer_sk),

    FOREIGN KEY (product_sk) REFERENCES dim_product(product_sk),

    FOREIGN KEY (date_sk) REFERENCES dim_date(date_sk),

    INDEX idx_customer (customer_sk),

    INDEX idx_product (product_sk),

    INDEX idx_date (date_sk)

);


-- ===============================================

-- STEP 5: INSERT INITIAL SOURCE DATA

-- ===============================================


-- Initial Customer Data (Jan 2024)

INSERT INTO source_customer VALUES

(101, 'John Smith', 'john@email.com', '123 Oak Street, Boston', '555-0101', '2024-01-01'),

(102, 'Mary Johnson', 'mary@email.com', '456 Elm Avenue, NYC', '555-0102', '2024-01-01'),

(103, 'Bob Williams', 'bob@email.com', '789 Pine Road, Chicago', '555-0103', '2024-01-01');


-- Initial Product Data

INSERT INTO source_product VALUES

(201, 'Laptop Pro', 'Electronics', 1200.00, '2024-01-01'),

(202, 'Wireless Mouse', 'Electronics', 25.00, '2024-01-01'),

(203, 'Office Chair', 'Furniture', 350.00, '2024-01-01');


-- Initial Sales (Jan-Mar 2024)

INSERT INTO source_sales VALUES

(1001, 101, 201, '2024-01-15', 1, 1200.00),

(1002, 102, 202, '2024-02-10', 2, 50.00),

(1003, 101, 203, '2024-03-05', 1, 350.00),

(1004, 103, 201, '2024-03-20', 1, 1200.00);


-- ===============================================

-- STEP 6: INITIAL DIMENSION LOAD (FIRST RUN)

-- ===============================================


-- Load dim_customer - Initial Load

INSERT INTO dim_customer (customer_id, customer_name, email, address, phone, effective_date, end_date, is_current)

SELECT 

    customer_id,

    customer_name,

    email,

    address,

    phone,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM source_customer;


-- Load dim_product - Initial Load

INSERT INTO dim_product (product_id, product_name, category, price, effective_date, end_date, is_current)

SELECT 

    product_id,

    product_name,

    category,

    price,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM source_product;


-- Load dim_date (simplified - just the dates we need)

INSERT INTO dim_date VALUES

(20240115, '2024-01-15', 2024, 1, 1, 15, 'Monday'),

(20240210, '2024-02-10', 2024, 1, 2, 10, 'Saturday'),

(20240305, '2024-03-05', 2024, 1, 3, 5, 'Tuesday'),

(20240320, '2024-03-20', 2024, 1, 3, 20, 'Wednesday'),

(20240725, '2024-07-25', 2024, 3, 7, 25, 'Thursday'),

(20240815, '2024-08-15', 2024, 3, 8, 15, 'Thursday');


-- Insert default/unknown records

INSERT INTO dim_customer (customer_sk, customer_id, customer_name, email, address, phone, effective_date, is_current) VALUES

(-1, -1, 'Unknown', 'unknown@unknown.com', 'Unknown', 'Unknown', '1900-01-01', 1);


INSERT INTO dim_product (product_sk, product_id, product_name, category, price, effective_date, is_current) VALUES

(-1, -1, 'Unknown', 'Unknown', 0.00, '1900-01-01', 1);


-- ===============================================

-- STEP 7: INITIAL FACT TABLE LOAD

-- ===============================================


-- Load fact_sales with surrogate key lookups

INSERT INTO fact_sales (sale_id, customer_sk, product_sk, date_sk, quantity, amount)

SELECT 

    s.sale_id,

    COALESCE(dc.customer_sk, -1) as customer_sk,  -- Get surrogate key

    COALESCE(dp.product_sk, -1) as product_sk,    -- Get surrogate key

    CAST(DATE_FORMAT(s.sale_date, '%Y%m%d') AS UNSIGNED) as date_sk,

    s.quantity,

    s.amount

FROM source_sales s

LEFT JOIN dim_customer dc 

    ON s.customer_id = dc.customer_id 

    AND dc.is_current = 1                          -- Get current version

LEFT JOIN dim_product dp 

    ON s.product_id = dp.product_id 

    AND dp.is_current = 1;


-- View initial results

SELECT 'Initial Dimension Data' as Step;

SELECT * FROM dim_customer WHERE customer_id > 0;


SELECT 'Initial Fact Data with Dimension Info' as Step;

SELECT 

    f.sale_id,

    dc.customer_name,

    dc.address as customer_address,

    dp.product_name,

    f.amount,

    f.quantity

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

JOIN dim_product dp ON f.product_sk = dp.product_sk

ORDER BY f.sale_id;


-- ===============================================

-- STEP 8: SIMULATE CHANGES (July 2024)

-- ===============================================


-- Customer 101 moves to a new address

UPDATE source_customer 

SET address = '999 Maple Drive, Boston', 

    last_modified_date = '2024-07-01'

WHERE customer_id = 101;


-- Customer 102 changes email

UPDATE source_customer 

SET email = 'mary.johnson@newemail.com',

    last_modified_date = '2024-07-01'

WHERE customer_id = 102;


-- Product price change

UPDATE source_product 

SET price = 1150.00,

    last_modified_date = '2024-07-01'

WHERE product_id = 201;


-- New sales AFTER the changes

INSERT INTO source_sales VALUES

(1005, 101, 202, '2024-07-25', 3, 75.00),   -- John at NEW address

(1006, 102, 201, '2024-08-15', 1, 1150.00); -- Mary with NEW email, product at NEW price


-- ===============================================

-- STEP 9: SCD TYPE 2 UPDATE PROCESS

-- ===============================================


-- Extract changes to staging

TRUNCATE staging_customer;

INSERT INTO staging_customer 

SELECT * FROM source_customer;


-- Process SCD Type 2 for dim_customer

-- Step 9a: Identify changed records

CREATE TEMPORARY TABLE changed_customers AS

SELECT 

    s.customer_id,

    s.customer_name,

    s.email,

    s.address,

    s.phone,

    s.last_modified_date

FROM staging_customer s

JOIN dim_customer d 

    ON s.customer_id = d.customer_id 

    AND d.is_current = 1

WHERE 

    s.customer_name != d.customer_name 

    OR s.email != d.email 

    OR s.address != d.address 

    OR s.phone != d.phone;


-- Step 9b: Expire old records (set is_current = 0, add end_date)

UPDATE dim_customer d

JOIN changed_customers c ON d.customer_id = c.customer_id

SET 

    d.is_current = 0,

    d.end_date = DATE_SUB(c.last_modified_date, INTERVAL 1 DAY)

WHERE d.is_current = 1;


-- Step 9c: Insert new versions

INSERT INTO dim_customer (customer_id, customer_name, email, address, phone, effective_date, end_date, is_current)

SELECT 

    customer_id,

    customer_name,

    email,

    address,

    phone,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM changed_customers;


-- Similar process for products

CREATE TEMPORARY TABLE changed_products AS

SELECT 

    s.product_id,

    s.product_name,

    s.category,

    s.price,

    s.last_modified_date

FROM source_product s

JOIN dim_product d 

    ON s.product_id = d.product_id 

    AND d.is_current = 1

WHERE 

    s.product_name != d.product_name 

    OR s.category != d.category 

    OR s.price != d.price;


UPDATE dim_product d

JOIN changed_products c ON d.product_id = c.product_id

SET 

    d.is_current = 0,

    d.end_date = DATE_SUB(c.last_modified_date, INTERVAL 1 DAY)

WHERE d.is_current = 1;


INSERT INTO dim_product (product_id, product_name, category, price, effective_date, end_date, is_current)

SELECT 

    product_id,

    product_name,

    category,

    price,

    last_modified_date as effective_date,

    NULL as end_date,

    1 as is_current

FROM changed_products;
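The expire-and-insert logic of steps 9a-9c can also be sketched as a small in-memory Python function (an illustrative analogue of the SQL above, not a replacement for it; field names follow dim_customer):

```python
from datetime import date, timedelta


def scd2_apply(dim_rows: list, incoming: list, key: str = "customer_id") -> list:
    """When an attribute of the current version differs from the incoming row,
    expire the current version (9b) and append a new current version (9c)."""
    current = {r[key]: r for r in dim_rows if r["is_current"]}
    out = list(dim_rows)
    for src in incoming:
        cur = current.get(src[key])
        if cur is None:
            continue  # brand-new keys would be a plain insert (not shown)
        attrs = [k for k in src if k not in (key, "effective_date")]
        if any(cur[k] != src[k] for k in attrs):                       # 9a: detect change
            cur["is_current"] = False                                  # 9b: expire
            cur["end_date"] = src["effective_date"] - timedelta(days=1)
            out.append({**src, "end_date": None, "is_current": True})  # 9c: new version
    return out


# Customer 101 moves, as in step 8
dim = [{"customer_id": 101, "address": "123 Oak Street, Boston",
        "effective_date": date(2024, 1, 1), "end_date": None, "is_current": True}]
rows = scd2_apply(dim, [{"customer_id": 101,
                         "address": "999 Maple Drive, Boston",
                         "effective_date": date(2024, 7, 1)}])
```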


-- ===============================================

-- STEP 10: LOAD NEW FACT RECORDS

-- ===============================================


-- Load new sales with CURRENT surrogate keys

INSERT INTO fact_sales (sale_id, customer_sk, product_sk, date_sk, quantity, amount)

SELECT 

    s.sale_id,

    COALESCE(dc.customer_sk, -1) as customer_sk,

    COALESCE(dp.product_sk, -1) as product_sk,

    CAST(DATE_FORMAT(s.sale_date, '%Y%m%d') AS UNSIGNED) as date_sk,

    s.quantity,

    s.amount

FROM source_sales s

LEFT JOIN dim_customer dc 

    ON s.customer_id = dc.customer_id 

    AND dc.is_current = 1                    -- Gets NEW version!

LEFT JOIN dim_product dp 

    ON s.product_id = dp.product_id 

    AND dp.is_current = 1

WHERE s.sale_id NOT IN (SELECT sale_id FROM fact_sales);


-- ===============================================

-- STEP 11: VERIFICATION QUERIES

-- ===============================================


-- Show SCD Type 2 in action - Customer History

SELECT 

    'Customer 101 History - Shows Both Versions' as Description,

    customer_sk,

    customer_id,

    customer_name,

    address,

    effective_date,

    end_date,

    is_current

FROM dim_customer

WHERE customer_id = 101

ORDER BY effective_date;


-- Show Product History

SELECT 

    'Product 201 History - Price Change' as Description,

    product_sk,

    product_id,

    product_name,

    price,

    effective_date,

    end_date,

    is_current

FROM dim_product

WHERE product_id = 201

ORDER BY effective_date;


-- Show ALL sales with historical accuracy

SELECT 

    'All Sales with Historical Context' as Description,

    f.sale_id,

    f.date_sk as sale_date,

    dc.customer_name,

    dc.address as customer_address_at_sale_time,

    dc.effective_date as customer_version_from,

    dc.end_date as customer_version_to,

    dp.product_name,

    dp.price as product_price_at_sale_time,

    f.quantity,

    f.amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

JOIN dim_product dp ON f.product_sk = dp.product_sk

ORDER BY f.sale_id;


-- Show customer's sales across address changes

SELECT 

    'Customer 101 Sales - Different Addresses at Different Times' as Description,

    f.sale_id,

    f.date_sk as sale_date,

    dc.address as address_at_time_of_sale,

    dc.is_current as is_current_address,

    dp.product_name,

    f.amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

JOIN dim_product dp ON f.product_sk = dp.product_sk

WHERE dc.customer_id = 101

ORDER BY f.sale_id;


-- Analytical Query: Sales by Customer (all versions aggregated by business key)

SELECT 

    'Sales Summary by Customer (All Versions)' as Description,

    dc.customer_id,

    MAX(CASE WHEN dc.is_current = 1 THEN dc.customer_name END) as current_name,

    COUNT(DISTINCT f.sale_id) as total_sales_count,

    SUM(f.amount) as total_sales_amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk

GROUP BY dc.customer_id

ORDER BY total_sales_amount DESC;


-- Show current view (most common query pattern)

SELECT 

    'Current Snapshot - Latest Customer Info with All Sales' as Description,

    curr_dc.customer_name,

    curr_dc.address as current_address,

    COUNT(f.sale_id) as total_sales,

    SUM(f.amount) as total_amount

FROM fact_sales f

JOIN dim_customer dc ON f.customer_sk = dc.customer_sk  -- Historical join

JOIN dim_customer curr_dc 

    ON dc.customer_id = curr_dc.customer_id 

    AND curr_dc.is_current = 1                          -- Current info

GROUP BY curr_dc.customer_sk, curr_dc.customer_name, curr_dc.address;


-- ===============================================

-- CLEANUP TEMPORARY TABLES

-- ===============================================

DROP TEMPORARY TABLE IF EXISTS changed_customers;

DROP TEMPORARY TABLE IF EXISTS changed_products;