In this post, we will walk through the code flow of the above project, phase by phase.
---------------------------------------------------------------------------
Presented By Dineshkumar Selvaraj
The phases are:
- Kafka streaming
- Table creation in MySQL
- Spark Structured Streaming (Python)
- Sqoop incremental job
- Hive tables
- Spark-Hive aggregation
- Airflow scheduler
- Grafana
1. Kafka Streaming :-
Here we use Apache Kafka 2.3.2. Follow the steps below to kick-start your Kafka cluster and create the topic; a small producer sketch follows these steps.
- Run ZooKeeper:
- zkServer.sh start
- Start the Kafka server:
- ./bin/kafka-server-start.sh ./config/server.properties --override delete.topic.enable=true
- Create the Kafka topic:
- ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kfktpkOne
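To feed the pipeline, weblog lines have to be published into the kfktpkOne topic. The producer code is not part of this post, so here is a minimal sketch, assuming the kafka-python package is installed and that each message is a comma-separated line in the field order the Spark job in step 3 expects (datevalue, ipaddress, host, url, responsecode):

from kafka import KafkaProducer

# Connect to the local broker started above
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# One sample weblog line; the date matches the 'yyyy/MM/dd HH:mm:ss' pattern parsed in step 3
line = "2019/10/28 12:59:06,10.128.2.1,nhmasternode,GET /css/main.css HTTP/1.1,200"
producer.send("kfktpkOne", line.encode("utf-8"))
producer.flush()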
2. MySQL Queries :-
>>use test1;
>>create table weblogdetails (id int NOT NULL AUTO_INCREMENT,datevalue timestamp, ipaddress varchar(150),host varchar(150),url varchar(150),responsecode integer,PRIMARY KEY (id));
>>INSERT INTO weblogdetails (datevalue,ipaddress,host,url,responsecode) VALUES ('2019-10-28 12:59:06','10.128.2.1','nhmasternode','GET /css/main.css HTTP/1.1','200');
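A quick way to confirm the table and the sample row from Python (a sketch, assuming the mysql-connector-python package and the root credentials used elsewhere in this post):

import mysql.connector

# Connect to the test1 database created above
conn = mysql.connector.connect(host="localhost", user="root", password="root", database="test1")
cur = conn.cursor()

# Fetch the latest rows to verify the insert landed
cur.execute("SELECT id, datevalue, ipaddress, host, url, responsecode FROM weblogdetails ORDER BY id DESC LIMIT 5")
for row in cur.fetchall():
    print(row)

cur.close()
conn.close()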
3. PYSPARK Streaming:-
from test.dbLoad import ConnectDatabase  # custom ForeachWriter that pushes rows into MySQL
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

# Spark 2.x entry point
spark = SparkSession.builder.appName("Kafka_Spark").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Subscribe to the kfktpkOne topic
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kfktpkOne").load()

# Kafka delivers the payload as bytes; cast it to a string
df1 = df.selectExpr("CAST(value AS STRING)")

# Split the comma-separated weblog line into typed columns
df2 = df1.withColumn("value_split", split(col("value"), ",")) \
    .withColumn("datevalue", to_timestamp(col("value_split").getItem(0), 'yyyy/MM/dd HH:mm:ss')) \
    .withColumn("ipaddress", col("value_split").getItem(1)) \
    .withColumn("Host", col("value_split").getItem(2)) \
    .withColumn("ReqURL", col("value_split").getItem(3)) \
    .withColumn("ResponseCode", col("value_split").getItem(4).cast("Integer")) \
    .drop("value_split", "value")

df2.printSchema()

# Two sinks: one console stream for debugging, one foreach stream that writes each row to MySQL
df2.writeStream.format("console").option("truncate", "false").outputMode("append").start()
df2.writeStream.outputMode("update").foreach(ConnectDatabase()).start().awaitTermination()
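The ConnectDatabase class imported at the top (test/dbLoad.py) is not shown in this post. For reference, here is a minimal sketch of what such a Structured Streaming ForeachWriter can look like, assuming mysql-connector-python and the weblogdetails table from step 2; the actual implementation may differ:

import mysql.connector

class ConnectDatabase:
    # Called once per partition/epoch to open resources
    def open(self, partition_id, epoch_id):
        self.conn = mysql.connector.connect(host="localhost", user="root", password="root", database="test1")
        self.cursor = self.conn.cursor()
        return True

    # Called for every streaming row
    def process(self, row):
        self.cursor.execute(
            "INSERT INTO weblogdetails (datevalue, ipaddress, host, url, responsecode) VALUES (%s, %s, %s, %s, %s)",
            (row.datevalue, row.ipaddress, row.Host, row.ReqURL, row.ResponseCode))

    # Called when the partition/epoch finishes (or fails)
    def close(self, error):
        self.conn.commit()
        self.cursor.close()
        self.conn.close()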
4. SQOOP Command:-
sqoop job --create Sqoop_weblogdetails_test1 -- import \
--connect jdbc:mysql://localhost:3306/test1 \
--username root \
--password-file file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password \
--table weblogdetails \
--target-dir /airflowproject/Sqoop_weblogdetails_test1 \
--incremental append \
--check-column id \
--last-value 0 \
-m 1 \
--direct
sqoop job --exec Sqoop_weblogdetails_test1
5. Hive Command:-
use test1;
create external table weblog_external (id int, datevalue string,ipaddress string, host string, url string, responsecode int) row format delimited fields terminated by ',' stored as textfile location '/airflowproject/Sqoop_weblogdetails_test1';
select * from weblog_external limit 5;
select count(*) from weblog_external;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
create table weblog_Dynamicpart_sqoop_internal (id int, datevalue string, ipaddress string, url string, responsecode int) partitioned by (host string) row format delimited fields terminated by ',' stored as textfile;
insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
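The same partitioned table can also be inspected from PySpark instead of the Hive CLI. A short check (a sketch, assuming Spark is configured against the same Hive metastore, as in step 6):

from pyspark.sql.session import SparkSession

# Hive-enabled session so Spark can see the test1 database
spark = SparkSession.builder.appName("Hive_Partition_Check").enableHiveSupport().getOrCreate()
spark.sql("use test1")

# List the dynamic partitions created per host and count the rows loaded so far
spark.sql("show partitions weblog_Dynamicpart_sqoop_internal").show(truncate=False)
spark.sql("select count(*) as total_rows from weblog_Dynamicpart_sqoop_internal").show()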
6. PYSPARK Aggregation:-
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

# Spark 2.x session with Hive support so we can query the warehouse tables
spark = SparkSession.builder.appName("Hive_Spark").master("local[*]").enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

spark.sql("use test1")
df = spark.sql("select * from weblog_Dynamicpart_sqoop_internal order by id")
df.show()

# Aggregate per host/ipaddress: total of the response codes and count of URLs hit
df1 = df.groupBy(['host', 'ipaddress']).agg({'responsecode': 'sum', 'url': 'count'}) \
    .withColumnRenamed("sum(responsecode)", "Total_ResponseCode") \
    .withColumnRenamed("count(url)", "Total_URL")
df1.show()

# Push the aggregated result into the MySQL hive_agg table
sqlproperties = {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"}
print("\n", "Mysql Ingestion started", "\n")
df1.write.jdbc(url="jdbc:mysql://localhost:3306/test1", table="hive_agg", mode="overwrite", properties=sqlproperties)
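To confirm the load, the hive_agg table can be read back through the same JDBC connection (a sketch reusing the sqlproperties dictionary defined above):

# Read the aggregated table back from MySQL and display it
agg_check = spark.read.jdbc(url="jdbc:mysql://localhost:3306/test1", table="hive_agg", properties=sqlproperties)
agg_check.show()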
7. Airflow DAG:-
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator

DAG_NAME = 'test_project_airflow_9'

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
}

dag_prjt_main = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval='* * * * *'  # or "@once"
)
# Task 1: run the Sqoop incremental import job created in step 4
SQOOP_Task1 = BashOperator(
    task_id="Sqoop_Incremental",
    bash_command='sqoop job --exec Sqoop_weblogdetails_test1',
    dag=dag_prjt_main)
my_query = """
USE {{ params.db }};
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
"""
# Task 2: seed the dynamically partitioned Hive table from the external table (step 5)
hive_Task2 = HiveOperator(
    task_id="Hive_Seeding",
    hive_cli_conn_id='hive_local',
    hql=my_query,
    params={'db': 'test1'},
    dag=dag_prjt_main)
# Task 3: run the PySpark aggregation from step 6 through spark-submit
spark_submit_Task3 = SparkSubmitOperator(
    task_id="sparksubmit_Aggregate",
    application='/home/bigdatapedia/PycharmProjects/Airflow_Demo/test/testspark_hiveagg.py',
    conn_id='spark_local',
    dag=dag_prjt_main)

# Dependency chain: Sqoop import -> Hive seeding -> Spark aggregation
SQOOP_Task1 >> hive_Task2 >> spark_submit_Task3
if __name__ == '__main__':
    dag_prjt_main.cli()