Monday, 8 December 2025

Spark Project

SPARK CDC Code


// Spark Code

import org.apache.spark.sql.{SparkSession, functions => F}
import io.delta.tables._

object Main {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("CDC Example")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    import spark.implicits._

    val deltaTablePath = "/home/kaustubh/Documents/"

    // Initial Load
    val initialDF = spark.read
      .option("header", "true")
      .csv("/home/kaustubh/Documents/kaustubh/initialLoad/initial.csv")

    initialDF.write
      .format("delta")
      .mode("overwrite")
      .save(deltaTablePath)

    // Incremental Load
    val incrementalLoad = spark.read
      .option("header", "true")
      .csv("/home/kaustubh/Documents/kaustubh/incrementalLoad/incremental.csv")

    // Create a DeltaTable reference for the merge
    val deltaTable = DeltaTable.forPath(spark, deltaTablePath)

    deltaTable.as("t")
      .merge(
        incrementalLoad.as("s"),
        "t.id = s.id"
      )
      // UPDATE existing row
      .whenMatched(F.expr("s.op = 'U'"))
      .updateExpr(Map(
        "name" -> "s.name",
        "age" -> "s.age",
        "email" -> "s.email",
        "country" -> "s.country"
      ))
      // DELETE row
      .whenMatched(F.expr("s.op = 'D'"))
      .delete()
      // INSERT row
      .whenNotMatched(F.expr("s.op = 'I'"))
      .insertExpr(Map(
        "id" -> "s.id",
        "name" -> "s.name",
        "age" -> "s.age",
        "email" -> "s.email",
        "country" -> "s.country"
      ))
      .execute()

    println("CDC Successful")

    val finalTable = spark

      .read.format("delta")

      .option("versionAsOf",0)

      .load("/home/kaustubh/Documents/")

    finalTable.show()

   spark.stop()

  }

}
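
The post doesn't show the two CSV files, so here is one assumed layout that matches the columns the merge references: both files carry id, name, age, email and country, and the incremental file adds an op flag ('I' insert, 'U' update, 'D' delete) that drives the whenMatched/whenNotMatched clauses. Treat the rows below as illustrative sample data, not the author's actual files.

// Example initial.csv (assumed sample data)
id,name,age,email,country
1,Asha,29,asha@example.com,IN
2,Ravi,35,ravi@example.com,IN

// Example incremental.csv (assumed sample data; op drives the merge)
id,name,age,email,country,op
2,Ravi,36,ravi.new@example.com,IN,U
1,,,,,D
3,Meera,41,meera@example.com,IN,I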


// plugins.sbt file

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.3")


// sbt file

import sbtassembly.MergeStrategy

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.12.17"

lazy val root = (project in file("."))
  .settings(
    name := "untitled"
  )

libraryDependencies ++= Seq(
  // "org.json4s" %% "json4s-native" % "4.0.7",
  "com.typesafe" % "config" % "1.4.3",
  "org.scalatest" %% "scalatest" % "3.2.17" % Test,
  "com.oracle.database.jdbc" % "ojdbc8" % "21.1.0.0",
  // "org.apache.spark" %% "spark-core" % "3.5.1",
  // "org.apache.spark" %% "spark-sql" % "3.5.1",
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "io.delta" %% "delta-spark" % "3.2.0"
)

ThisBuild / assemblyMergeStrategy := {
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
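
The post doesn't say how the job is launched; the commands below are one assumed way to do it. Because spark-core and spark-sql are plain (not "provided") dependencies and the code hard-codes master("local[*]"), sbt run is enough for a local test; sbt assembly builds a fat JAR (sbt-assembly's default name is <name>-assembly-<version>.jar) that could instead be handed to spark-submit from a local Spark 3.5.x install.

// Run directly from sbt for a quick local test
sbt run

// Or build a fat JAR and submit it (assumed local Spark installation)
sbt assembly
spark-submit --class Main --master "local[*]" target/scala-2.12/untitled-assembly-0.1.0-SNAPSHOT.jar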

