// Spark CDC (Change Data Capture) example using Delta Lake MERGE
import org.apache.spark.sql.{SparkSession,functions => F}
import io.delta.tables._
object Main {

  /**
   * Minimal CDC (Change Data Capture) pipeline with Delta Lake.
   *
   * Steps:
   *  1. Write an initial CSV extract to a Delta table (overwrite).
   *  2. Read an incremental CSV whose `op` column flags each row as
   *     'U' (update), 'D' (delete) or 'I' (insert).
   *  3. MERGE the incremental rows into the Delta table keyed on `id`.
   *  4. Show the latest (post-merge) table state.
   *
   * Usage: Main [baseDir]
   * `baseDir` defaults to /home/kaustubh/Documents for backward compatibility.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CDC Example")
      .master("local[*]")
      // Both configs are required for Delta Lake's SQL and catalog integration.
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Base directory generalized: overridable via first CLI arg (was hard-coded).
    val baseDir = args.headOption.getOrElse("/home/kaustubh/Documents")
    val deltaTablePath = s"$baseDir/"
    val initialCsvPath = s"$baseDir/kaustubh/initialLoad/initial.csv"
    val incrementalCsvPath = s"$baseDir/kaustubh/incrementalLoad/incremental.csv"

    // Initial load: seed the Delta table from the first CSV extract.
    val initialDF = spark.read.option("header", "true").csv(initialCsvPath)
    initialDF.write
      .format("delta")
      .mode("overwrite")
      .save(deltaTablePath)

    // Incremental load: change records carrying the `op` discriminator column.
    val incrementalLoad = spark.read
      .option("header", "true")
      .csv(incrementalCsvPath)

    // DeltaTable handle for the MERGE (target aliased "t", source aliased "s").
    val deltaTable = DeltaTable.forPath(spark, deltaTablePath)
    deltaTable.as("t")
      .merge(
        incrementalLoad.as("s"),
        "t.id = s.id"
      )
      // op = 'U': update the matched target row in place.
      .whenMatched(F.expr("s.op='U'"))
      .updateExpr(Map(
        "name" -> "s.name",
        "age" -> "s.age",
        "email" -> "s.email",
        "country" -> "s.country"
      ))
      // op = 'D': remove the matched target row.
      .whenMatched(F.expr("s.op='D'"))
      .delete()
      // op = 'I': insert source rows with no matching target `id`.
      .whenNotMatched(F.expr("s.op='I'"))
      .insertExpr(Map(
        "id" -> "s.id",
        "name" -> "s.name",
        "age" -> "s.age",
        "email" -> "s.email",
        "country" -> "s.country"
      ))
      .execute()
    println("CDC Successful")

    // BUG FIX: the original read used .option("versionAsOf", 0), which
    // time-travels back to the PRE-merge snapshot, so the CDC result was
    // never shown. Reading without versionAsOf returns the latest version.
    // Also reuse deltaTablePath instead of a duplicated path literal.
    val finalTable = spark
      .read.format("delta")
      .load(deltaTablePath)
    finalTable.show()

    spark.stop()
  }
}
// project/plugins.sbt — enables `sbt assembly` to build a fat (uber) JAR
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.3")
// build.sbt file
import sbtassembly.MergeStrategy
ThisBuild / version := "0.1.0-SNAPSHOT"
// Scala 2.12.x matches the Spark 3.5.0 _2.12 artifacts below.
ThisBuild / scalaVersion := "2.12.17"
lazy val root = (project in file("."))
.settings(
name := "untitled"
)
libraryDependencies ++= Seq(
// "org.json4s" %% "json4s-native" % "4.0.7",
"com.typesafe" % "config" % "1.4.3",
"org.scalatest" %% "scalatest" % "3.2.17" % Test,
"com.oracle.database.jdbc" % "ojdbc8" % "21.1.0.0",
//"org.apache.spark" %% "spark-core" % "3.5.1",
// "org.apache.spark" %% "spark-sql" % "3.5.1"
// NOTE(review): delta-spark 3.2.0 is built against Spark 3.5.x, so these
// versions must move together when upgrading.
"org.apache.spark" %% "spark-core" % "3.5.0",
"org.apache.spark" %% "spark-sql" % "3.5.0",
"io.delta" %% "delta-spark" % "3.2.0"
)
// Fat-JAR merge rules: concatenate DataSource service registrations so Spark
// can still discover the Delta source; drop other META-INF entries (signatures
// etc.); for any remaining duplicate, keep the first occurrence.
ThisBuild / assemblyMergeStrategy := {
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
// (end of listing — blog page footer removed)