In this post, we will discuss the code flow of the above project, which is built in multiple phases.
---------------------------------------------------------------------------
Presented By Dineshkumar Selvaraj
The phases are:
- Kafka Streaming
- MySQL table creation
- Spark structured streaming (Python)
- SQOOP Incremental job
- Hive Tables
- Spark - Hive Aggregation
- Airflow scheduler
- Grafana
1. Kafka Streaming :-
Here we used Apache Kafka version 2.3.2. Please follow the steps below to start your Kafka cluster and create the topic; a sample producer sketch follows the list.
- Run ZooKeeper:
- zkServer.sh start
- Start the Kafka server:
- ./bin/kafka-server-start.sh ./config/server.properties --override delete.topic.enable=true
- Create the Kafka topic:
- ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kfktpkOne
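The weblog records themselves reach the kfktpkOne topic from a producer that is outside the commands above. A minimal sketch of such a producer, assuming the kafka-python package is installed and each record is a comma-separated line whose fields match the parser in step 3:
# Hypothetical producer sketch (not part of the original project code).
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
# Field order assumed by the PySpark parser: datevalue,ipaddress,host,url,responsecode
line = "2019/10/28 12:59:06,10.128.2.1,nhmasternode,GET /css/main.css HTTP/1.1,200"
producer.send("kfktpkOne", value=line.encode("utf-8"))
producer.flush()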
2. MySQL Queries :-
>>use test1;
>>create table weblogdetails (id int NOT NULL AUTO_INCREMENT,datevalue timestamp, ipaddress varchar(150),host varchar(150),url varchar(150),responsecode integer,PRIMARY KEY (id));
>>INSERT INTO weblogdetails (datevalue,ipaddress,host,url,responsecode) VALUES ('2019-10-28 12:59:06','10.128.2.1','nhmasternode','GET /css/main.css HTTP/1.1','200');
3. PYSPARK Streaming:-
from test.dbLoad import ConnectDatabase  # custom ForeachWriter that writes each row to MySQL
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import split, col, to_timestamp
spark = SparkSession.builder.appName("Kafka_Spark").getOrCreate()  # Spark 2.x
spark.sparkContext.setLogLevel("ERROR")
# Read the raw weblog lines from the Kafka topic
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kfktpkOne").load()
df1 = df.selectExpr("CAST(value AS STRING)")
# Split each comma-separated line into typed columns
df2 = df1.withColumn("value_split", split(col("value"), ",")) \
    .withColumn("datevalue", to_timestamp(col("value_split").getItem(0), 'yyyy/MM/dd HH:mm:ss')) \
    .withColumn("ipaddress", col("value_split").getItem(1)) \
    .withColumn("Host", col("value_split").getItem(2)) \
    .withColumn("ReqURL", col("value_split").getItem(3)) \
    .withColumn("ResponseCode", col("value_split").getItem(4).cast("Integer")) \
    .drop("value_split", "value")
df2.printSchema()
# Debug sink: print the parsed records to the console
df2.writeStream.format("console").option("truncate", "false").outputMode("append").start()
# Persist each record to MySQL through the custom ForeachWriter
df2.writeStream.outputMode("update").foreach(ConnectDatabase()).start().awaitTermination()
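The ConnectDatabase class imported from test.dbLoad is not shown in the post. A minimal sketch of what such a ForeachWriter could look like, assuming PyMySQL is available and the same root/root credentials used for the JDBC write in step 6:
# test/dbLoad.py -- a sketch only, not the original implementation.
import pymysql

class ConnectDatabase:
    """ForeachWriter that inserts each streaming row into test1.weblogdetails."""

    def open(self, partition_id, epoch_id):
        # Called once per partition/epoch: open a fresh connection.
        self.conn = pymysql.connect(host="localhost", user="root",
                                    password="root", database="test1")
        self.cursor = self.conn.cursor()
        return True

    def process(self, row):
        # Insert one parsed weblog record; id is AUTO_INCREMENT in MySQL.
        self.cursor.execute(
            "INSERT INTO weblogdetails (datevalue, ipaddress, host, url, responsecode) "
            "VALUES (%s, %s, %s, %s, %s)",
            (row.datevalue, row.ipaddress, row.Host, row.ReqURL, row.ResponseCode))

    def close(self, error):
        # Commit the partition's inserts and release the connection.
        self.conn.commit()
        self.cursor.close()
        self.conn.close()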
4. SQOOP Command:-
sqoop job --create Sqoop_weblogdetails_test1 -- import \
--connect jdbc:mysql://localhost:3306/test1 \
--username root \
--password-file file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password \
--table weblogdetails \
--target-dir /airflowproject/Sqoop_weblogdetails_test1 \
--incremental append \
--check-column id \
--last-value 0 \
-m 1 \
--direct
Execute the saved job; Sqoop records the new last-value in its job metastore after each successful incremental run:
sqoop job --exec Sqoop_weblogdetails_test1
5. Hive Command:-
use test1;
create external table weblog_external (id int, datevalue string,ipaddress string, host string, url string, responsecode int) row format delimited fields terminated by ',' stored as textfile location '/airflowproject/Sqoop_weblogdetails_test1';
select * from weblog_external limit 5;
select count(*) from weblog_external;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
create table weblog_Dynamicpart_sqoop_internal (id int, datevalue string, ipaddress string, url string, responsecode int) partitioned by (host string) row format delimited fields terminated by ',' stored as textfile;
insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
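As a quick sanity check that the NOT EXISTS insert only brings in new ids, the partitions and row counts can be inspected from a Hive-enabled Spark session (a sketch reusing the database and table names above):
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName("Hive_Check").enableHiveSupport().getOrCreate()
spark.sql("use test1")
# One partition is created per distinct host value by the dynamic-partition insert
spark.sql("show partitions weblog_Dynamicpart_sqoop_internal").show(truncate=False)
# Right after the insert, both tables should hold the same number of rows
spark.sql("select count(*) from weblog_external").show()
spark.sql("select count(*) from weblog_Dynamicpart_sqoop_internal").show()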
6. PYSPARK Aggregation:-
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName("Hive_Spark").master("local[*]").enableHiveSupport().getOrCreate() # Spark 2.x
spark.sparkContext.setLogLevel("ERROR")
spark.sql("use test1")
df = spark.sql("select * from weblog_Dynamicpart_sqoop_internal order by id")
df.show()
df1=df.groupBy(['host','ipaddress']).agg({'responsecode':'sum','url':'count'})\
.withColumnRenamed("sum(responsecode)","Total_ResponseCode")\
.withColumnRenamed("count(url)","Total_URL")
df1.show()
sqlproperties = {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"}
print("\n", "Mysql Ingestion started", "\n")
df1.write.jdbc(url="jdbc:mysql://localhost:3306/test1", table="hive_agg", mode="overwrite", properties=sqlproperties)
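To confirm that the overwrite landed in MySQL, the aggregate can be read back with the same connection properties (a short verification sketch appended to the script above):
# Read the aggregated table back from MySQL to verify the write (sketch).
check_df = spark.read.jdbc(url="jdbc:mysql://localhost:3306/test1",
                           table="hive_agg", properties=sqlproperties)
check_df.show()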
7. Airflow DAG:-
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator
DAG_NAME = 'test_project_airflow_9'
args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
}
dag_prjt_main = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval='* * * * *'  # "@once"
)
SQOOP_Task1 = BashOperator(task_id="Sqoop_Incremental",
                           bash_command='sqoop job --exec testweblog_ws1', dag=dag_prjt_main)
my_query = """
USE {{ params.db }};
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
"""
hive_Task2 = HiveOperator(
    task_id="Hive_Seeding",
    hive_cli_conn_id='hive_local',
    hql=my_query,
    params={'db': 'test'},
    dag=dag_prjt_main)
spark_submit_Task3 = SparkSubmitOperator(
    task_id="sparksubmit_Aggregate",
    application='/home/bigdatapedia/PycharmProjects/Airflow_Demo/test/testspark_hiveagg.py',
    conn_id='spark_local', dag=dag_prjt_main)
SQOOP_Task1 >> hive_Task2 >> spark_submit_Task3
if __name__ == '__main__':
    dag_prjt_main.cli()