import org.apache.spark.sql.{SaveMode, SparkSession, functions => F}
import io.delta.tables._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType
import org.apache.log4j.{Level, Logger}
// Row model for the customers table (id, contact details, location).
// Currently referenced only by the commented-out seed-data example inside main.
case class Customer(customer_id:Int,name:String,email:String,city:String,country:String)
object Main {

  /** Loads customer and order CSV exports into external Hive text tables,
    * then copies them into managed ORC tables with explicit type casts.
    *
    * Fixes over the original:
    *  - removed trailing ';' inside spark.sql("""…""") strings (raises a
    *    ParseException on several Spark versions);
    *  - the managed-table INSERTs now CAST every column — the original
    *    copy-pasted DDL fragments into the SELECT list ("name STRING"
    *    parses as the alias `name AS STRING`, not a cast), and the orders
    *    insert pushed raw strings into DATE/DECIMAL columns;
    *  - the two external tables no longer share one LOCATION (each would
    *    otherwise have scanned *all* files in the shared directory).
    *
    * NOTE(review): all file paths are hard-coded for a single developer
    * machine; parameterize them (args/config) before any real deployment.
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CDC Example")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "/home/kaustubh/Documents/")
      .config("spark.sql.catalogImplementation", "hive")
      .enableHiveSupport()
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // Keep framework chatter out of the console ("org" also covers Spark's own loggers).
    Logger.getLogger("akka").setLevel(Level.ERROR)
    Logger.getLogger("org").setLevel(Level.ERROR)

    import spark.implicits._

    // spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")
    spark.sql("USE sales_db")

    // Start from a clean slate so repeated runs are idempotent.
    spark.sql("DROP TABLE IF EXISTS customers")
    spark.sql("DROP TABLE IF EXISTS customers_managed")
    println("customers table deleted successfully")

    // Managed ORC target for typed customer rows.
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS customers_managed(
        |customer_id INT,
        |name STRING,
        |email STRING,
        |city STRING,
        |country STRING)
        |STORED AS ORC
        |""".stripMargin)

    // External text staging table. NOTE: distinct sub-directory per table —
    // the original pointed customers AND orders at the same LOCATION, so
    // each table would have scanned the other's data files.
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS customers(
        |customer_id INT,
        |name STRING,
        |email STRING,
        |city STRING,
        |country STRING
        |) ROW FORMAT DELIMITED
        |FIELDS TERMINATED BY ','
        |STORED AS TEXTFILE
        |LOCATION '/home/kaustubh/Desktop/customers'
        |TBLPROPERTIES ("skip.header.line.count"="1")
        |""".stripMargin)

    // Load the raw CSV; inferSchema=false keeps every column as STRING,
    // so the casting below on INSERT is what establishes the real types.
    val customersDf = spark.read
      .option("header", "true")
      .option("inferSchema", "false")
      .csv("/home/kaustubh/Downloads/customers.csv")

    // NOTE(review): saveAsTable with Overwrite drops the external text-table
    // definition above and recreates `customers` in Spark's default format;
    // use customersDf.write.insertInto("customers") if the Hive text layout
    // must survive.
    customersDf.write
      .mode(SaveMode.Overwrite)
      .saveAsTable("customers")
    spark.sql("SELECT * FROM customers").show()

    // Copy into the managed table with explicit casts (source cols are STRING).
    spark.sql(
      """
        |INSERT INTO TABLE customers_managed
        |SELECT
        |CAST(customer_id AS INT),
        |CAST(name AS STRING),
        |CAST(email AS STRING),
        |CAST(city AS STRING),
        |CAST(country AS STRING)
        |FROM customers
        |""".stripMargin)
    spark.sql("SELECT * FROM customers_managed").show()

    // Same pattern for orders: external text staging table …
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS orders(
        |order_id INT,
        |customer_id INT,
        |order_date DATE,
        |amount DECIMAL(10,2),
        |status STRING)
        |ROW FORMAT DELIMITED
        |FIELDS TERMINATED BY ','
        |STORED AS TEXTFILE
        |LOCATION '/home/kaustubh/Desktop/orders'
        |TBLPROPERTIES ("skip.header.line.count"="1")
        |""".stripMargin)

    // … and a managed ORC target.
    spark.sql(
      """
        |CREATE TABLE IF NOT EXISTS orders_managed(
        |order_id INT,
        |customer_id INT,
        |order_date DATE,
        |amount DECIMAL(10,2),
        |status STRING)
        |STORED AS ORC
        |""".stripMargin)

    val ordersDf = spark.read
      .option("header", "true")
      .option("inferSchema", "false")
      .csv("/home/kaustubh/Downloads/orders.csv")
    ordersDf.write
      .mode(SaveMode.Overwrite)
      .saveAsTable("orders")
    spark.sql("SELECT * FROM orders").show()

    // Explicit casts here too — the original inserted raw STRING columns
    // into DATE and DECIMAL(10,2) target columns.
    spark.sql(
      """
        |INSERT INTO TABLE orders_managed
        |SELECT
        |CAST(order_id AS INT),
        |CAST(customer_id AS INT),
        |CAST(order_date AS DATE),
        |CAST(amount AS DECIMAL(10,2)),
        |CAST(status AS STRING)
        |FROM orders
        |""".stripMargin)
    spark.sql("SELECT * FROM orders_managed").show()

    spark.stop()
  }
}
// Source: blog post, Wednesday, 10 December 2025 — tags: Scala, Spark, Hive.
// (Trailing page chrome — "Subscribe to: Post Comments (Atom)", comment form —
// removed: it was scraped HTML residue, not Scala, and broke compilation.)