Sunday, January 19, 2020

Bigdata Project with 8 Emerging Open Sources End to End


In this post, we will see discuss about the code flow on this above project with multiple phases.

---------------------------------------------------------------------------
Presented By Dineshkumar Selvaraj
---------------------------------------------------------------------------

Those are :

  1. Kafka Streaming
  2. Table creation on mySQL Table
  3. Spark structured streaming (Python)
  4. SQOOP Incremental job
  5. Hive Tables
  6. Spark - Hive Aggregation
  7. Airflow scheduler
  8. Grafana

1. Kafka Streaming :-

                  Here we used Apache Kafka 2.3.2 version, So please follow the below steps to kick start your kafka cluster and create the topic accordingly
  1. run Zookeeper 
    • zkServer.sh start
  2. start kafka server
    • ./bin/kafka-server-start.sh ./config/server.properties --override delete.topic.enable=true
  3. creating kafka topics
    • ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kfktpkOne 

2. mySQL Queries :-

>>use test1;
>>create table weblogdetails (id int NOT NULL AUTO_INCREMENT,datevalue timestamp, ipaddress varchar(150),host varchar(150),url varchar(150),responsecode integer,PRIMARY KEY (id));

>>INSERT INTO weblogdetails (datevalue,ipaddress,host,url,responsecode) VALUES ('2019-10-28 12:59:06','10.128.2.1','nhmasternode','GET /css/main.css HTTP/1.1','200');


3. PYSPARK Streaming:-

from test.dbLoad import ConnectDatabase
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from os import *

spark = SparkSession.builder.appName("Kafka_Spark").getOrCreate()  # Spark 2.x

spark.sparkContext.setLogLevel("ERROR")

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kfktpkOne").load()

df1 = df.selectExpr("CAST(value AS STRING)")

df2 = df1.withColumn("value_split", split(col("value"), ",")) \
    .withColumn("datevalue", to_timestamp(col("value_split").getItem(0), 'yyyy/MM/dd HH:mm:ss'))\
    .withColumn("ipaddress", col("value_split").getItem(1)) \
    .withColumn("Host", col("value_split").getItem(2)) \
    .withColumn("ReqURL", col("value_split").getItem(3)) \
    .withColumn("ResponseCode", col("value_split").getItem(4).cast("Integer")) \
    .drop("value_split","value")

df2.printSchema()

df2.writeStream.format("console").option("truncate", "false").outputMode("append").start() 

df2.writeStream.outputMode("update").foreach(ConnectDatabase()).start().awaitTermination()


4. SQOOP Command:-

sqoop job --create Sqoop_weblogdetails_test1 -- import \
--connect jdbc:mysql://localhost:3306/test1 \
--username root \
--password-file  file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password \
--table weblogdetails \
--target-dir /airflowproject/Sqoop_weblogdetails_test1 \
--incremental append \
--check-column id \
--last-value 0 \
-m1 \
--direct

sqoop job --create Sqoop_weblogdetails_test1 -- import --connect jdbc:mysql://localhost:3306/test --username root --password-file  file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password --table weblogdetails --target-dir /airflowproject/Sqoop_weblogdetails_test1 --incremental append --check-column id --last-value 0 -m1 --direct

sqoop job --exec Sqoop_weblogdetails_test1


5. Hive Command:-

use test1;

create external table weblog_external (id int, datevalue string,ipaddress string, host string, url string, responsecode int) row format delimited fields terminated by ',' stored as textfile location '/airflowproject/Sqoop_weblogdetails_test1';

select * from weblog_external limit 5;
select count(*) from weblog_external;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;

create table weblog_Dynamicpart_sqoop_internal (id int, datevalue string, ipaddress string, url string, responsecode int) partitioned by (host string) row format delimited fields terminated by ',' stored as textfile;

insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);

insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);


6. PYSPARK Aggregation:-

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from os import *

spark = SparkSession.builder.appName("Hive_Spark").master("local[*]").enableHiveSupport().getOrCreate()  # Spark 2.x

spark.sparkContext.setLogLevel("ERROR")

spark.sql("use test1")

df = spark.sql("select * from weblog_Dynamicpart_sqoop_internal order by id")

df.show()

df1=df.groupBy(['host','ipaddress']).agg({'responsecode':'sum','url':'count'})\
    .withColumnRenamed("sum(responsecode)","Total_ResponseCode")\
    .withColumnRenamed("count(url)","Total_URL")
df1.show()

sqlproperties = {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"}

print("\n", "Mysql Ingestion started", "\n")

df1.write.jdbc(url="jdbc:mysql://localhost:3306/test1", table="hive_agg", mode="overwrite", properties=sqlproperties)


7. Airflow DAG:-

import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator

DAG_NAME = 'test_project_airflow_9'
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
}

dag_prjt_main = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval='* * * * *' #"@once"
)

SQOOP_Task1 = BashOperator(task_id="Sqoop_Incremental",
                      bash_command='sqoop job --exec testweblog_ws1', dag=dag_prjt_main)

my_query = """
    USE {{ params.db }};
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;          
    set hive.exec.max.dynamic.partitions=1000;
    insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
    """

hive_Task2 = HiveOperator(
    task_id= "Hive_Seeding",
    hive_cli_conn_id='hive_local',
    hql = my_query,
    params={'db': 'test'},
    dag=dag_prjt_main)

spark_submit_Task3 = SparkSubmitOperator(
    task_id="sparksubmit_Aggregate",
    application='/home/bigdatapedia/PycharmProjects/Airflow_Demo/test/testspark_hiveagg.py',
    conn_id='spark_local', dag= dag_prjt_main)


SQOOP_Task1 >> hive_Task2 >> spark_submit_Task3

if __name__ == '__main__':
    dag_prjt_main.cli()