Sunday, January 19, 2020

Big Data Project with 8 Emerging Open-Source Tools, End to End


In this post, we walk through the code for each phase of this project.

---------------------------------------------------------------------------
Presented By Dineshkumar Selvaraj
---------------------------------------------------------------------------

The eight phases are:

  1. Kafka streaming
  2. Table creation in MySQL
  3. Spark Structured Streaming (Python)
  4. Sqoop incremental job
  5. Hive tables
  6. Spark-Hive aggregation
  7. Airflow scheduler
  8. Grafana

1. Kafka Streaming :-

Here we used Apache Kafka version 2.3.2. Follow the steps below to start the Kafka cluster and create the topic:
  1. Start ZooKeeper
    • zkServer.sh start
  2. Start the Kafka server (the override explicitly enables topic deletion)
    • ./bin/kafka-server-start.sh ./config/server.properties --override delete.topic.enable=true
  3. Create the Kafka topic
    • ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kfktpkOne
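The post does not show its Kafka producer, but the Spark job in step 3 splits each message on commas and parses the first field with the pattern 'yyyy/MM/dd HH:mm:ss', so messages presumably look like "datevalue,ipaddress,host,url,responsecode". The helper below is a hypothetical sketch (the name format_weblog_message is ours, not the author's) that builds one message in that layout and refuses fields containing a comma, since a comma would shift every later column on the Spark side.

```python
from datetime import datetime


def format_weblog_message(datevalue, ipaddress, host, url, responsecode):
    """Build one Kafka message in the field order the Spark job splits on.

    Hypothetical helper: the post does not show its producer.
    """
    fields = [
        # Same layout as the Spark pattern 'yyyy/MM/dd HH:mm:ss'
        datevalue.strftime("%Y/%m/%d %H:%M:%S"),
        ipaddress,
        host,
        url,
        str(responsecode),
    ]
    for field in fields:
        if "," in field:
            # split(value, ",") on the Spark side would shift every later column
            raise ValueError(f"field contains a comma: {field!r}")
    return ",".join(fields)


if __name__ == "__main__":
    # One line per message, ready to pipe into kafka-console-producer.sh
    print(format_weblog_message(datetime(2019, 10, 28, 12, 59, 6),
                                "10.128.2.1", "nhmasternode",
                                "GET /css/main.css HTTP/1.1", 200))
```

Each printed line becomes one Kafka message if piped into ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kfktpkOne.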

2. MySQL Queries :-

Create the weblogdetails table in the test1 database and insert a sample row:

use test1;
create table weblogdetails (id int NOT NULL AUTO_INCREMENT, datevalue timestamp, ipaddress varchar(150), host varchar(150), url varchar(150), responsecode integer, PRIMARY KEY (id));

INSERT INTO weblogdetails (datevalue, ipaddress, host, url, responsecode) VALUES ('2019-10-28 12:59:06', '10.128.2.1', 'nhmasternode', 'GET /css/main.css HTTP/1.1', 200);


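3. PySpark Structured Streaming:-

# ConnectDatabase is the author's own foreach() sink (module test/dbLoad.py, not shown in this post)
from test.dbLoad import ConnectDatabase
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, split, to_timestamp

# The Kafka source needs the spark-sql-kafka-0-10 package on the classpath, e.g.
# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_<scala-version>:<spark-version> ...
spark = SparkSession.builder.appName("Kafka_Spark").getOrCreate()  # Spark 2.x

spark.sparkContext.setLogLevel("ERROR")

# Subscribe to the topic created in step 1
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kfktpkOne").load()

# Kafka delivers the payload as bytes; cast it to a string
df1 = df.selectExpr("CAST(value AS STRING)")

# Each message is "datevalue,ipaddress,host,url,responsecode"
df2 = df1.withColumn("value_split", split(col("value"), ",")) \
    .withColumn("datevalue", to_timestamp(col("value_split").getItem(0), 'yyyy/MM/dd HH:mm:ss')) \
    .withColumn("ipaddress", col("value_split").getItem(1)) \
    .withColumn("Host", col("value_split").getItem(2)) \
    .withColumn("ReqURL", col("value_split").getItem(3)) \
    .withColumn("ResponseCode", col("value_split").getItem(4).cast("Integer")) \
    .drop("value_split", "value")

df2.printSchema()

# Debug sink: print every micro-batch to the console
df2.writeStream.format("console").option("truncate", "false").outputMode("append").start()

# Main sink: hand each row to ConnectDatabase, which writes it to MySQL; block until the query stops
df2.writeStream.outputMode("update").foreach(ConnectDatabase()).start().awaitTermination()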
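ConnectDatabase itself is not shown in the post. Spark's foreach() sink accepts any object that has a process(row) method and, optionally, open(partition_id, epoch_id) and close(error). The class below is a hypothetical stand-in (ConnectDatabaseSketch is our name), assuming the target is the weblogdetails table from step 2 and that df2's columns Host, ReqURL and ResponseCode map to host, url and responsecode. It takes any DB-API connection factory; with a MySQL driver the placeholder would be "%s", while the test below uses sqlite3 only so the sketch runs without a MySQL server.

```python
import sqlite3  # stand-in driver so the sketch runs without MySQL


class ConnectDatabaseSketch:
    """Hypothetical foreach() sink; NOT the author's test.dbLoad.ConnectDatabase.

    Spark calls open() once per partition and epoch, process() for every row,
    and close() when the partition is done, all on an executor.
    """

    def __init__(self, connect, placeholder="%s"):
        self.connect = connect          # zero-argument DB-API connection factory
        self.placeholder = placeholder  # "%s" for MySQL drivers, "?" for sqlite3
        self.conn = None

    def open(self, partition_id, epoch_id):
        self.conn = self.connect()
        return True  # True tells Spark to process this partition

    def process(self, row):
        p = self.placeholder
        sql = ("INSERT INTO weblogdetails (datevalue, ipaddress, host, url, responsecode) "
               f"VALUES ({p}, {p}, {p}, {p}, {p})")
        # df2's column names mapped onto the MySQL columns (assumed mapping)
        params = (row["datevalue"], row["ipaddress"], row["Host"],
                  row["ReqURL"], row["ResponseCode"])
        cur = self.conn.cursor()
        cur.execute(sql, params)
        self.conn.commit()  # one commit per row: simple, not fast

    def close(self, error):
        if self.conn is not None:
            self.conn.close()
```

Committing per row keeps the sketch short; batching rows and committing once in close() would cut round trips, at the cost of losing the whole partition's rows if the commit fails.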


4. SQOOP Command:-

sqoop job --create Sqoop_weblogdetails_test1 -- import \
--connect jdbc:mysql://localhost:3306/test1 \
--username root \
--password-file file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password \
--table weblogdetails \
--target-dir /airflowproject/Sqoop_weblogdetails_test1 \
--incremental append \
--check-column id \
--last-value 0 \
-m1 \
--direct

sqoop job --exec Sqoop_weblogdetails_test1
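With --incremental append --check-column id --last-value 0, the first run imports every row with id > 0, and the saved job then records the highest id it imported, so each later sqoop job --exec pulls only rows added since. The function below is a hypothetical plain-Python illustration of that bookkeeping (incremental_append is our name, not part of Sqoop), run against sqlite3 instead of MySQL.

```python
import sqlite3


def incremental_append(conn, table, check_column, last_value):
    """Fetch rows whose check column is greater than last_value.

    Hypothetical illustration of the idea behind Sqoop's
    --incremental append --check-column id --last-value N; not Sqoop's code.
    Returns (rows, new_last_value).
    """
    # Identifiers are formatted into the SQL, so pass only trusted names
    cur = conn.execute(
        f"SELECT * FROM {table} WHERE {check_column} > ? ORDER BY {check_column}",
        (last_value,),
    )
    rows = cur.fetchall()
    columns = [d[0] for d in cur.description]
    position = columns.index(check_column)
    # Like a saved Sqoop job, remember the highest value seen for the next run
    new_last_value = rows[-1][position] if rows else last_value
    return rows, new_last_value
```

Sqoop additionally caps each run at the check column's maximum observed when the job starts, so rows inserted during an import are picked up on the next run.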


5. Hive Command:-

use test1;

create external table weblog_external (id int, datevalue string,ipaddress string, host string, url string, responsecode int) row format delimited fields terminated by ',' stored as textfile location '/airflowproject/Sqoop_weblogdetails_test1';

select * from weblog_external limit 5;
select count(*) from weblog_external;

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;

create table weblog_Dynamicpart_sqoop_internal (id int, datevalue string, ipaddress string, url string, responsecode int) partitioned by (host string) row format delimited fields terminated by ',' stored as textfile;

insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
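This insert can be re-run on every schedule: the NOT EXISTS subquery acts as an anti-join that skips ids already present in the internal table, and partition(host) in nonstrict mode lets Hive take each row's partition from the last column of the SELECT. The hypothetical helper below (insert_new_by_partition is our name) mirrors that logic in plain Python to show which rows land in which partition.

```python
from collections import defaultdict


def insert_new_by_partition(existing_ids, source_rows, partition_column="host"):
    """Plain-Python picture of the Hive insert above (hypothetical helper).

    existing_ids: ids already in weblog_Dynamicpart_sqoop_internal
    source_rows:  dicts read from weblog_external
    Returns {partition value: [rows to insert into that partition]}.
    """
    existing = set(existing_ids)
    partitions = defaultdict(list)
    for row in source_rows:
        # WHERE NOT EXISTS (... a.id = b.id): skip ids that are already loaded
        if row["id"] in existing:
            continue
        # Dynamic partitioning: the host value of the row picks its partition
        partitions[row[partition_column]].append(row)
    return dict(partitions)
```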


6. PySpark Aggregation:-

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName("Hive_Spark").master("local[*]").enableHiveSupport().getOrCreate()  # Spark 2.x

spark.sparkContext.setLogLevel("ERROR")

spark.sql("use test1")

# Read the partitioned Hive table loaded in step 5
df = spark.sql("select * from weblog_Dynamicpart_sqoop_internal order by id")

df.show()

# One row per (host, ipaddress): sum of response codes and number of URLs
df1 = df.groupBy(['host', 'ipaddress']).agg({'responsecode': 'sum', 'url': 'count'}) \
    .withColumnRenamed("sum(responsecode)", "Total_ResponseCode") \
    .withColumnRenamed("count(url)", "Total_URL")
df1.show()

# MySQL Connector/J 8.x driver class; its jar must be on the Spark classpath
sqlproperties = {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"}

print("\n", "MySQL ingestion started", "\n")

# mode="overwrite" replaces the hive_agg table on every run
df1.write.jdbc(url="jdbc:mysql://localhost:3306/test1", table="hive_agg", mode="overwrite", properties=sqlproperties)
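The groupBy/agg step produces one row per (host, ipaddress) pair, holding the sum of responsecode and the count of non-null url values. As a hypothetical plain-Python equivalent (aggregate_by_host_ip is our name), useful for checking a few rows by hand:

```python
from collections import defaultdict


def aggregate_by_host_ip(rows):
    """Plain-Python equivalent of groupBy(host, ipaddress) with sum(responsecode)
    and count(url). Hypothetical helper for checking results by hand.

    Unlike Spark, a group whose responsecode values are all NULL gets 0 here
    instead of NULL.
    """
    totals = defaultdict(lambda: {"Total_ResponseCode": 0, "Total_URL": 0})
    for row in rows:
        key = (row["host"], row["ipaddress"])
        if row["responsecode"] is not None:  # sum() ignores NULLs
            totals[key]["Total_ResponseCode"] += row["responsecode"]
        if row["url"] is not None:  # count(url) counts only non-NULL values
            totals[key]["Total_URL"] += 1
    return dict(totals)
```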


7. Airflow DAG:-

# Import paths as in Airflow 1.10
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

DAG_NAME = 'test_project_airflow_9'
args = {
    'owner': 'Airflow',
    'start_date': days_ago(1),
}

dag_prjt_main = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval='* * * * *'  # every minute; use "@once" for a single run
)

# Step 4: execute the saved Sqoop incremental job
SQOOP_Task1 = BashOperator(task_id="Sqoop_Incremental",
                           bash_command='sqoop job --exec Sqoop_weblogdetails_test1', dag=dag_prjt_main)

# Step 5: copy new rows from the external table into the partitioned table
my_query = """
    USE {{ params.db }};
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    set hive.exec.max.dynamic.partitions=1000;
    insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
    """

hive_Task2 = HiveOperator(
    task_id="Hive_Seeding",
    hive_cli_conn_id='hive_local',
    hql=my_query,
    params={'db': 'test1'},  # the Hive tables were created in test1
    dag=dag_prjt_main)

# Step 6: submit the PySpark aggregation script
spark_submit_Task3 = SparkSubmitOperator(
    task_id="sparksubmit_Aggregate",
    application='/home/bigdatapedia/PycharmProjects/Airflow_Demo/test/testspark_hiveagg.py',
    conn_id='spark_local', dag=dag_prjt_main)

# Run the three tasks in sequence
SQOOP_Task1 >> hive_Task2 >> spark_submit_Task3

if __name__ == '__main__':
    dag_prjt_main.cli()


