Project Aim: Building an automated data ingestion pipeline from an OLTP system to an OLAP system
Project Description: A financial bank issues cards to its customers. Card transactions take place every day through these cards and are handled by an Online Transaction Processing (OLTP) system backed by a MySQL database server; some records also arrive as JSON files directly in HDFS. The data is collected from these sources and stored in the OLAP layer, i.e., a Hive data warehouse, for further analysis.
Tools used:
1) MySQL
2) Apache Sqoop
3) Apache Hive
4) Apache Airflow
Project Code:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import settings
from airflow.models import Connection
dag = DAG(
dag_id = 'simple_pipeline',
schedule_interval = '@daily',
start_date = days_ago(1)
)
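# The SSHOperator tasks below use ssh_conn_id='itversity'. The settings and
# Connection imports above can register that connection programmatically if it
# is not already present in the Airflow metadata database. This is a hedged
# sketch: the host, login and password are placeholders, not real credentials.
session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == 'itversity').first():
    session.add(Connection(
        conn_id='itversity',
        conn_type='ssh',
        host='gw02.itversity.com',   # placeholder gateway/edge-node host
        login='itv005018',           # placeholder user
        password='<ssh-password>'    # placeholder; a real setup would not hard-code this
    ))
    session.commit()
session.close()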
# Command to create a project folder on the local (edge node) file system
create_folder_command = 'rm -rf project && mkdir -p project'
# Command to create a project folder in HDFS
create_folder_inside_hadoop_command = 'hdfs dfs -rm -R -f real_project && hdfs dfs -mkdir -p real_project'
# Command to copy the card transaction data into the local project folder
copy_to_folder_command = 'cp -f card_transactions-221229-131756.csv project'
# Command to copy the card transaction data from the local project folder into the HDFS folder
copy_file_to_hadoop_folder = 'hdfs dfs -put project/card_transactions-221229-131756.csv real_project'
# Command to run the Sqoop export shell script, exporting the card_transactions file from HDFS to the MySQL table
export_sqoop_command = './sqoop_export.sh ms.itversity.com:3306 retail_export retail_user card_transaction_table staging_card_transaction_table'
# Command to run the Sqoop eval shell script, deleting the existing records from the MySQL table
truncate_sqoop_command = './sqoop_eval_trunc.sh ms.itversity.com:3306 retail_export retail_user card_transaction_table'
truncate_table = SSHOperator(
task_id = 'truncate_data',
ssh_conn_id = 'itversity',
command = truncate_sqoop_command,
dag=dag
)
export_data_file = SSHOperator(
task_id = 'export_data',
ssh_conn_id = 'itversity',
command = export_sqoop_command,
dag=dag
)
create_folder_hadoop = SSHOperator(
task_id = 'create_folder_hadoop',
ssh_conn_id = 'itversity',
command = create_folder_inside_hadoop_command,
dag=dag
)
download_to_edgenode1 = SSHOperator(
task_id = 'create_folder',
ssh_conn_id = 'itversity',
command = create_folder_command,
dag=dag
)
copy_to_folder = SSHOperator(
task_id = 'copy_file',
ssh_conn_id = 'itversity',
command = copy_to_folder_command,
dag=dag
)
dummy1 = DummyOperator(
task_id = 'dummy1',
dag=dag
)
download_to_edgenode1 >> create_folder_hadoop >> copy_to_folder >> dummy1
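# Note: the copy_file_to_hadoop_folder command defined above is never wrapped in a
# task, and truncate_data / export_data are left out of the chain above. The wiring
# below is an assumed completion of the pipeline (local copy -> HDFS put -> truncate
# the MySQL table -> Sqoop export), not part of the original run.
copy_file_to_hadoop = SSHOperator(
    task_id='copy_file_to_hadoop',
    ssh_conn_id='itversity',
    command=copy_file_to_hadoop_folder,
    dag=dag
)
copy_to_folder >> copy_file_to_hadoop >> dummy1
dummy1 >> truncate_table >> export_data_file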
# Shell script sqoop_eval_trunc.sh: sqoop eval command that deletes the existing records from the MySQL table (created in the vi editor)
[itv005018@g01 ~]$ vi sqoop_eval_trunc.sh
DBSERVER=${1}
DBNAME=${2}
DBUSER=${3}
DBTABLE=${4}
sqoop eval \
-Dhadoop.security.credential.provider.path=jceks://hdfs/user/itv005018/mysql.dbpassword.jceks \
--connect jdbc:mysql://${DBSERVER}/${DBNAME} \
--username ${DBUSER} \
--password-alias mysql.banking.password \
--query "DELETE FROM ${DBTABLE}"
# Shell script sqoop_export.sh: sqoop export command that loads the HDFS file into the MySQL table through a staging table (created in the vi editor)
[itv005018@g01 ~]$ vi sqoop_export.sh
DBSERVER=${1}
DBNAME=${2}
DBUSER=${3}
DBTABLE=${4}
DBTABLE2=${5}
sqoop export \
-Dhadoop.security.credential.provider.path=jceks://hdfs/user/itv005018/mysql.dbpassword.jceks \
--connect jdbc:mysql://${DBSERVER}/${DBNAME} \
--username ${DBUSER} \
--password-alias mysql.banking.password \
--table ${DBTABLE} \
--staging-table ${DBTABLE2} \
--export-dir real_project/card_transactions-221229-131756.csv \
--fields-terminated-by ','
# The Airflow DAG defined above was then executed to run these commands.
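Before triggering the run, a quick sanity check is to load the DAG file through Airflow's DagBag and confirm it parses and all tasks are registered. This is a minimal sketch; it assumes the file sits in the configured DAGs folder, and the expected task list simply mirrors the operators defined above.

from airflow.models import DagBag

# Parse the DAGs folder and confirm 'simple_pipeline' loads without import errors.
dag_bag = DagBag()
assert not dag_bag.import_errors, dag_bag.import_errors
pipeline = dag_bag.get_dag('simple_pipeline')
print(sorted(task.task_id for task in pipeline.tasks))
# Expected task_ids: copy_file, create_folder, create_folder_hadoop, dummy1,
# export_data, truncate_data (plus copy_file_to_hadoop if the assumed wiring is kept).

The MySQL source tables referenced by the pipeline were created and loaded as follows: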
mysql> create table card_transaction
-> (card_id bigint not null,
-> member_id bigint not null,
-> amount int not null,
-> postcode int not null,
-> post_id bigint not null,
-> transaction_dt varchar(255) not null,
-> txn_category varchar(50)
-> );
mysql> LOAD DATA INFILE '/home/cloudera/Desktop/shared/card_transactions-221026-145049.csv'
-> INTO TABLE card_transaction
-> FIELDS TERMINATED BY ','
-> ENCLOSED BY '"'
-> LINES TERMINATED BY '\n';
mysql> create table card_member
-> (card_id bigint not null,
-> member_id bigint not null,
-> member_joining_dt varchar(255) not null,
-> card_purchase_dt varchar(255) not null,
-> country varchar(50),
-> city varchar(50),
-> zone varchar(50),
-> PRIMARY KEY (member_id)
-> );
mysql> LOAD DATA INFILE '/home/cloudera/Desktop/shared/cardmembers-210304-135700.csv'
-> INTO TABLE card_member
-> FIELDS TERMINATED BY ','
-> ENCLOSED BY '"'
-> LINES TERMINATED BY '\n';
mysql> create table member_score
-> (member_id bigint not null,
-> score int,
-> score_category varchar(255),
-> PRIMARY KEY (member_id)
-> );
mysql> LOAD DATA INFILE '/home/cloudera/Desktop/shared/memberscore1-210824-132632.csv'
-> INTO TABLE member_score
-> FIELDS TERMINATED BY ','
-> ENCLOSED BY '"'
-> LINES TERMINATED BY '\n';
---------------------------------------------------------------------------------------------------------------------
[cloudera@quickstart ~]$ hadoop credential create mysql.banking.password -provider jceks://hdfs/user/cloudera/mysql.password.jceks
sqoop job \
> -Dhadoop.security.credential.provider.path=jceks://hdfs/user/cloudera/mysql.password.jceks \
> --create job_members_1 \
> -- import \
> --connect jdbc:mysql://quickstart.cloudera:3306/bank \
> --username root \
> --password-alias mysql.banking.password \
> --table card_member \
> --warehouse-dir /member_data \
> --incremental append \
> --check-column member_id \
> --last-value 0 \
> --compress
sqoop job --exec job_members_1
hadoop fs -ls /member_data/card_member
Found 4 items
-rw-r--r-- 1 cloudera cloudera 5608 2022-12-03 23:46 /member_data/card_member/part-m-00000.gz
-rw-r--r-- 1 cloudera cloudera 5978 2022-12-03 23:46 /member_data/card_member/part-m-00001.gz
-rw-r--r-- 1 cloudera cloudera 5842 2022-12-03 23:46 /member_data/card_member/part-m-00002.gz
-rw-r--r-- 1 cloudera cloudera 5647 2022-12-03 23:46 /member_data/card_member/part-m-00003.gz
hive> create table card_member (
> card_id bigint,
> member_id bigint,
> member_joining_dt string,
> card_purchase_dt string,
> country string,
> city string,
> zone string)
> row format delimited fields terminated by ',';
hive> load data inpath '/member_data/card_member/*' into table card_member;
hive> select * from card_member limit 10;
OK
348702000000000 37495066290 38:43.0 Feb-18 United States Avon Center North
5189560000000000 117826000000 41:33.0 Apr-15 United States Medina East
5407070000000000 1147920000000 13:06.0 Jun-15 United States Watertown East
378304000000000 1314070000000 01:08.0 Jan-17 United States East Lake-Orient Park South
348413000000000 1739550000000 11:19.0 Jul-17 United States East Independence North
348537000000000 3761430000000 17:03.0 Nov-15 United States Dover North
5515990000000000 4494070000000 21:12.0 Jul-13 United States Estero East
5400250000000000 6836120000000 03:30.0 Sep-17 United States Indian Trail East
4573340000000000 6991870000000 27:14.0 Feb-14 United States Rio Rico South
4708910000000000 7955570000000 24:04.0 Jan-18 United States Fond du Lac South
hive> SET hive.exec.dynamic.partition=true;
hive> SET hive.exec.dynamic.partition.mode = nonstrict;
hive> SET hive.enforce.bucketing=true;
hive> create external table ext_card_member (
> card_id bigint,
> member_id bigint,
> member_joining_dt string,
> card_purchase_dt string,
> country string,
> city string)
> partitioned by (zone string)
> clustered by (member_id) into 4 buckets
> stored as orc;
hive> insert into table ext_card_member partition (zone)
> select card_id,member_id,member_joining_dt,card_purchase_dt,country,city,zone from card_member;
hive> show partitions ext_card_member;
OK
zone=East
zone=North
zone=South
zone=West
hive> SELECT * FROM ext_card_member TABLESAMPLE(bucket 1 out of 4);
[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/ext_card_member/
Found 4 items
drwxrwxrwx - cloudera supergroup 0 2022-12-04 02:32 /user/hive/warehouse/ext_card_member/zone=East
drwxrwxrwx - cloudera supergroup 0 2022-12-04 02:32 /user/hive/warehouse/ext_card_member/zone=North
drwxrwxrwx - cloudera supergroup 0 2022-12-04 02:32 /user/hive/warehouse/ext_card_member/zone=South
drwxrwxrwx - cloudera supergroup 0 2022-12-04 02:32 /user/hive/warehouse/ext_card_member/zone=West
sqoop job \
> -Dhadoop.security.credential.provider.path=jceks://hdfs/user/cloudera/mysql.password.jceks \
> --create job_members \
> -- import \
> --connect jdbc:mysql://quickstart.cloudera:3306/bank \
> --username root \
> --password-alias mysql.banking.password \
> --table card_transaction \
> --warehouse-dir /member_data \
> --split-by card_id \
> --incremental append \
> --check-column card_id \
> --last-value 0 \
> --compress
sqoop job --exec job_members
hadoop fs -ls /member_data/card_transaction
Found 4 items
-rw-r--r-- 1 cloudera cloudera 305821 2022-11-26 06:48 /member_data/card_transaction/part-m-00000.gz
-rw-r--r-- 1 cloudera cloudera 20 2022-11-26 06:48 /member_data/card_transaction/part-m-00001.gz
-rw-r--r-- 1 cloudera cloudera 260541 2022-11-26 06:48 /member_data/card_transaction/part-m-00002.gz
-rw-r--r-- 1 cloudera cloudera 517629 2022-11-26 06:48 /member_data/card_transaction/part-m-00003.gz
hive> create table card_transaction (
> card_id bigint,
> member_id bigint,
> amount int,
> postcode int,
> post_id bigint,
> transaction_dt string,
> txn_category string)
> row format delimited fields terminated by ',';
hive> load data inpath '/member_data/card_transaction/*' into table card_transaction;
hive> select * from card_transaction limit 10;
OK
348702330256514 37495066290 9084849 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 330148 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 136052 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 4310362 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 9097094 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 2291118 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 4900011 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 633447 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 6259303 33946 614677375609919 11-02-2018 00:00 GENUINE
348702330256514 37495066290 369067 33946 614677375609919 11-02-2018 00:00 GENUINE
hive> SET hive.exec.dynamic.partition=true;
hive> SET hive.exec.dynamic.partition.mode = nonstrict;
hive> SET hive.enforce.bucketing=true;
hive> create external table ext_card_transaction (
> card_id bigint,
> member_id bigint,
> amount int,
> postcode int,
> post_id bigint,
> transaction_dt string)
> partitioned by (txn_category string)
> clustered by (card_id) into 4 buckets
> stored as orc;
hive> insert into table ext_card_transaction partition (txn_category)
> select card_id,member_id,amount,postcode,post_id,transaction_dt,txn_category from card_transaction;
hive> show partitions ext_card_transaction;
OK
txn_category=FRAUD
txn_category=GENUINE
hive> SELECT * FROM ext_card_transaction TABLESAMPLE(bucket 1 out of 4);
[cloudera@quickstart ~]$ hadoop fs -ls /user/hive/warehouse/ext_card_transaction/
Found 2 items
drwxrwxrwx - cloudera supergroup 0 2022-11-28 19:06 /user/hive/warehouse/ext_card_transaction/txn_category=FRAUD
drwxrwxrwx - cloudera supergroup 0 2022-11-28 19:06 /user/hive/warehouse/ext_card_transaction/txn_category=GENUINE
hadoop fs -cat /user/hive/warehouse/ext_card_transaction/txn_category=FRAUD/*
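With both ORC tables partitioned and bucketed, the warehouse is ready for the analysis mentioned in the project aim. As one illustrative example (a sketch, not part of the executed transcript; column names are taken from the DDL above), fraudulent spend can be summarised per member zone by joining the transaction and member tables:

-- Fraud count and total fraudulent amount per zone
SELECT m.zone,
       COUNT(*)      AS fraud_txns,
       SUM(t.amount) AS fraud_amount
FROM ext_card_transaction t
JOIN ext_card_member m
  ON t.member_id = m.member_id
WHERE t.txn_category = 'FRAUD'
GROUP BY m.zone;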