Monday, September 28, 2020

Real Time Aggregation Using Spark Structured Streaming




Real Time Aggregation Using Spark Structured Streaming




MQTT Publisher (simulated sensor readings)

import sys
import datetime
import json
import paho.mqtt.client as mqtt
import random
import time

# Broker address and topic that the simulated readings are published to.
mqtt_hostname = "192.168.56.117"
mqtt_queue = "hostqueue"

# Publish one simulated sensor reading every 15 seconds until the process is stopped.
# Payload is one CSV line without a header:
#   timestamp,device,temperature,humidity,pressure,water_level
while True:
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    temp, humid, press, water = (
        round(random.uniform(low, high), 2)
        for low, high in ((20.0, 32.0), (60.0, 72.0), (100.0, 125.0), (40.0, 60.0))
    )

    # round() always returns a float here, so this guard cannot trigger with
    # simulated values; it only matters if real sensor reads replace random.uniform.
    if temp is None or humid is None:
        print('Failed to get reading. Try again!')
        sys.exit(1)

    body = ",".join([stamp, "RaspberryPi-4", str(temp), str(humid), str(press), str(water)])
    print("body \n", body)

    # Publisher: open a fresh connection for every message, publish, then disconnect.
    publisher = mqtt.Client()
    publisher.connect(mqtt_hostname, 1883, 60)
    publisher.publish(mqtt_queue, body)
    publisher.disconnect()
    time.sleep(15)


MQTT to Kafka

import os
import time
from kafka import KafkaProducer
import paho.mqtt.client as mqtt
from json import dumps
from kafka.errors import KafkaError, NoBrokersAvailable

topic_mqtt = "hostqueue"
topic_kafka = "kfktpkOne"
# This is the Subscriber
## KAFKA
def send_message_to_kafka(message):
    """Log *message* and hand it to the Kafka producer for ``topic_kafka``.

    Uses the module-level ``producer`` created in the ``__main__`` block.
    ``producer.send`` is called without waiting on its result.
    """
    unquoted = str(message).replace("\"", "")
    print(unquoted)
    print("sending message to kafka: %s" % message)
    producer.send(topic_kafka, message)


def mqtt_to_kafka_run(mqtt_host="192.168.56.117", mqtt_port=1883, keepalive=60):
    """Bridge MQTT to Kafka: forward every message on ``topic_mqtt`` to Kafka.

    Connects to the MQTT broker, subscribes to ``topic_mqtt`` on every
    (re)connect, and passes each UTF-8-decoded payload to
    ``send_message_to_kafka``. Blocks in ``client.loop_forever()``.

    Args:
        mqtt_host: MQTT broker host (defaults to the broker used by the publisher).
        mqtt_port: MQTT broker port.
        keepalive: MQTT keepalive interval in seconds.
    """
    import socket  # only needed to derive a per-host client id

    # os.getenv("localhost") looked up an environment variable literally named
    # "localhost". That variable is normally unset, so every bridge used the
    # id "home_connector_None". An MQTT broker disconnects an existing client
    # when another one connects with the same id, so derive the id from the
    # machine's host name instead.
    client_name = "home_connector_%s" % socket.gethostname()
    print("\n Client Name ", client_name)
    client = mqtt.Client(client_id=client_name)

    def on_connect(client, userdata, flags, rc):
        # Subscribing in the connect callback renews the subscription whenever
        # the network loop reconnects.
        client.subscribe(topic_mqtt)

    def on_message(client, userdata, msg):
        send_message_to_kafka(msg.payload.decode())

    def on_disconnect(client, user_data, rc):
        print("""Disconnected client: %s user_data: %s rc: %s """ % (client, user_data, rc))

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    client.connect(mqtt_host, mqtt_port, keepalive)
    client.loop_forever()


if __name__ == '__main__':
    # Kafka may still be starting up: retry creating the producer a few times.
    # The producer is bound at module level because send_message_to_kafka
    # reads the global ``producer``.
    max_attempts = 10
    producer = None
    for attempt in range(max_attempts):
        try:
            producer = KafkaProducer(bootstrap_servers=['192.168.56.117:9092'],
                                     value_serializer=lambda x: dumps(x).encode('utf-8'))
            break
        except NoBrokersAvailable:
            print("No Brokers. Attempt %s" % attempt)
            time.sleep(2)

    if producer is None:
        # Previously the script fell out of the loop and exited with status 0.
        raise SystemExit("No Kafka brokers available after %d attempts" % max_attempts)

    try:
        # Blocks forwarding MQTT messages to Kafka.
        mqtt_to_kafka_run()
    finally:
        # close() waits for buffered messages to be sent before releasing the producer.
        producer.close()


Spark Structured Streaming


from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

spark = (
    SparkSession.builder
    .appName("BigData Workshop 14")
    .master("local[2]")
    .getOrCreate()
)

# Raw Kafka stream from topic kfktpkOne; only the message value is used below.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "192.168.56.117:9092")
    .option("subscribe", "kfktpkOne")
    .load()
)

# A message value looks like this, surrounding quotes included (hence the
# regexp_replace that strips every double quote before splitting):
# "2020-09-26 16:31:48,RaspberryPi-4,23.1,70.53,124.24,57.29"
payload_fields = split(regexp_replace(col("value"), "\"", ""), ",")

kafka_df_4 = kafka_df.selectExpr("cast(value as string)").select(
    payload_fields.getItem(0).alias("Timestamp"),
    payload_fields.getItem(1).alias("Device_Name"),
    payload_fields.getItem(2).cast("Double").alias("Temperature"),
    payload_fields.getItem(3).cast("Double").alias("Humidity"),
    payload_fields.getItem(4).cast("Double").alias("Pressure"),
    payload_fields.getItem(5).cast("Double").alias("WaterLevel"),
)

kafka_df_4.printSchema()

# 5-second windows per device. mean() averages every numeric column, giving
# avg(Temperature), avg(Humidity), avg(Pressure) and avg(WaterLevel).
# NOTE(review): Timestamp is still a string column here, and window() relies on
# Spark casting it to a timestamp. An explicit to_timestamp() would be clearer.
df_window = (
    kafka_df_4
    .groupBy(window(kafka_df_4.Timestamp, "5 seconds"), "Device_Name")
    .mean()
    .orderBy("window")
)

df_window.printSchema()

# Flatten the window struct and give the averages SQL-friendly names.
df_window_1 = df_window.select(
    col("window.start").alias("StartTime"),
    col("window.end").alias("EndTime"),
    "Device_Name",
    col("avg(Temperature)").alias("Avg_Temperature"),
    col("avg(Humidity)").alias("Avg_Humidity"),
    col("avg(Pressure)").alias("Avg_Pressure"),
    col("avg(WaterLevel)").alias("Avg_WaterLevel"),
)

df_window_1.printSchema()
# Column mapping, df_window -> df_window_1 (the lowercase names presumably match
# the agg_rpy table columns, since Postgres folds unquoted identifiers to lowercase):
#   window.start / window.end -> StartTime / EndTime     (starttime / endtime)
#   Device_Name               -> Device_Name             (device_name)
#   avg(X)                    -> Avg_X                   (avg_x)


import psycopg2

class AggInsertPostgres:
    """Spark ``foreach`` sink that inserts one aggregated window row into Postgres.

    Each call to :meth:`process` opens its own connection, inserts the row into
    ``agg_rpy`` and commits. Errors are printed and swallowed, so one bad row
    does not stop the streaming query.
    """

    # Parameterized statement: the driver binds the values instead of having
    # them %-formatted into the SQL text. This (a) keeps the averaged doubles
    # intact, where the old ``%d`` placeholders truncated them to integers,
    # and (b) prevents SQL injection through Device_Name, which comes straight
    # from the Kafka payload.
    # NOTE(review): assumes the Avg_* columns are floating-point; confirm the
    # agg_rpy schema.
    _INSERT_SQL = (
        "INSERT INTO agg_rpy (StartTime, EndTime, Device_Name, Avg_Temperature, "
        "Avg_Humidity, Avg_Pressure, Avg_WaterLevel) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s)"
    )

    @staticmethod
    def _row_params(row):
        """Return the INSERT parameters for *row*, in ``_INSERT_SQL`` column order."""
        return (
            str(row.StartTime),
            str(row.EndTime),
            row.Device_Name,
            row.Avg_Temperature,
            row.Avg_Humidity,
            row.Avg_Pressure,
            row.Avg_WaterLevel,
        )

    def process(self, row):
        """Insert one aggregated *row* into ``agg_rpy``.

        Args:
            row: a row exposing StartTime, EndTime, Device_Name and the four
                Avg_* attributes (as produced by df_window_1).
        """
        params = self._row_params(row)
        # Pre-bind both names so the finally block is safe even if connect()
        # or cursor() raises (previously this caused UnboundLocalError).
        connection = None
        cursor = None
        try:
            connection = psycopg2.connect(user="postgres",
                                          password="postgres",
                                          host="192.168.99.100",
                                          port="5432",
                                          database="sample_project")
            cursor = connection.cursor()

            print("\nsql_insert_query ", self._INSERT_SQL, params)

            cursor.execute(self._INSERT_SQL, params)
            connection.commit()
            print(cursor.rowcount, "Record inserted successfully into agg_rpy table")
        except (Exception, psycopg2.Error) as error:
            # Best-effort sink: report the failure and keep the stream running.
            print("Failed inserting record into table {}".format(error))
        finally:
            # Close the database connection (and its cursor) if they were opened.
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
                # print("PostgreSQL connection is closed")


# Start the three sinks: windowed averages to the console, windowed averages
# to Postgres, and the parsed raw readings to the console.
df_window_1.writeStream.format("console").outputMode("complete").option("truncate", "false").start()

df_window_1.writeStream.outputMode("complete").foreach(AggInsertPostgres()).start()

kafka_df_4.writeStream.format("console").outputMode("update").option("truncate", "false").start()

# Block until ANY query stops. Awaiting only the last query meant that a failed
# Postgres sink went unnoticed while the driver kept running.
# awaitAnyTermination() re-raises the exception of a query that failed.
spark.streams.awaitAnyTermination()

Sunday, January 19, 2020

End-to-End Big Data Project with 8 Emerging Open-Source Tools


In this post, we walk through the code flow of the project above, phase by phase.

---------------------------------------------------------------------------
Presented By Dineshkumar Selvaraj
---------------------------------------------------------------------------

The phases are:

  1. Kafka Streaming
  2. Table creation in MySQL
  3. Spark structured streaming (Python)
  4. SQOOP Incremental job
  5. Hive Tables
  6. Spark - Hive Aggregation
  7. Airflow scheduler
  8. Grafana

1. Kafka Streaming :-

                  Here we used Apache Kafka version 2.3.2. Follow the steps below to start your Kafka cluster and create the topic:
  1. Start ZooKeeper
    • zkServer.sh start
  2. Start the Kafka server
    • ./bin/kafka-server-start.sh ./config/server.properties --override delete.topic.enable=true
  3. Create the Kafka topic
    • ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kfktpkOne 

2. MySQL Queries:-

-- MySQL statements (strip the leading ">>" marker before running each one).
>>use test1;
-- Source table for the Sqoop incremental import; id is the --check-column.
>>create table weblogdetails (id int NOT NULL AUTO_INCREMENT,datevalue timestamp, ipaddress varchar(150),host varchar(150),url varchar(150),responsecode integer,PRIMARY KEY (id));

-- Sample row; id is generated by AUTO_INCREMENT.
>>INSERT INTO weblogdetails (datevalue,ipaddress,host,url,responsecode) VALUES ('2019-10-28 12:59:06','10.128.2.1','nhmasternode','GET /css/main.css HTTP/1.1','200');


3. PYSPARK Streaming:-

from test.dbLoad import ConnectDatabase
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from os import *

spark = SparkSession.builder.appName("Kafka_Spark").getOrCreate()  # Spark 2.x

spark.sparkContext.setLogLevel("ERROR")

# Weblog events from Kafka, one comma-separated record per message:
#   datevalue,ipaddress,host,url,responsecode
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "kfktpkOne").load()

df1 = df.selectExpr("CAST(value AS STRING)")

# NOTE(review): datevalue is parsed with 'yyyy/MM/dd HH:mm:ss', but the MySQL sample
# row uses '2019-10-28 12:59:06'. If producers send '-' separators, to_timestamp will
# not parse the value. Confirm the producer's format.
df2 = df1.withColumn("value_split", split(col("value"), ",")) \
    .withColumn("datevalue", to_timestamp(col("value_split").getItem(0), 'yyyy/MM/dd HH:mm:ss'))\
    .withColumn("ipaddress", col("value_split").getItem(1)) \
    .withColumn("Host", col("value_split").getItem(2)) \
    .withColumn("ReqURL", col("value_split").getItem(3)) \
    .withColumn("ResponseCode", col("value_split").getItem(4).cast("Integer")) \
    .drop("value_split","value")

df2.printSchema()

# Echo parsed rows to the console and pass each row to ConnectDatabase
# (defined in test.dbLoad, not shown here).
df2.writeStream.format("console").option("truncate", "false").outputMode("append").start()

df2.writeStream.outputMode("update").foreach(ConnectDatabase()).start()

# Block until ANY query stops. Awaiting only the database query meant that a
# failure of the console query went unnoticed. awaitAnyTermination()
# re-raises the exception of a query that failed.
spark.streams.awaitAnyTermination()


4. SQOOP Command:-

# Create a saved incremental Sqoop job. Each run appends weblogdetails rows with
# id greater than the stored --last-value from MySQL database test1 into HDFS.
sqoop job --create Sqoop_weblogdetails_test1 -- import \
--connect jdbc:mysql://localhost:3306/test1 \
--username root \
--password-file  file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password \
--table weblogdetails \
--target-dir /airflowproject/Sqoop_weblogdetails_test1 \
--incremental append \
--check-column id \
--last-value 0 \
-m1 \
--direct

# The same job as a single line. Run only ONE of the two --create commands,
# because Sqoop refuses to create a saved job whose name already exists.
# It uses database test1 to match the multi-line form above.
sqoop job --create Sqoop_weblogdetails_test1 -- import --connect jdbc:mysql://localhost:3306/test1 --username root --password-file  file:///home/bigdatapedia/00HADOOP/00EcoSystem/sqoopmysql.password --table weblogdetails --target-dir /airflowproject/Sqoop_weblogdetails_test1 --incremental append --check-column id --last-value 0 -m1 --direct

# Execute the saved job; Sqoop stores the new --last-value in the job after each run.
sqoop job --exec Sqoop_weblogdetails_test1


5. Hive Command:-

use test1;

-- External table over the HDFS directory that the Sqoop job appends to.
-- Dropping the table leaves the files in place. Sqoop writes comma-delimited text by default.
create external table weblog_external (id int, datevalue string,ipaddress string, host string, url string, responsecode int) row format delimited fields terminated by ',' stored as textfile location '/airflowproject/Sqoop_weblogdetails_test1';

-- Sanity checks on the imported data.
select * from weblog_external limit 5;
select count(*) from weblog_external;

-- Allow fully dynamic partitioning, so the insert below derives the host partition from the data.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;

-- Managed table partitioned by host; the partition column is not repeated among the regular columns.
create table weblog_Dynamicpart_sqoop_internal (id int, datevalue string, ipaddress string, url string, responsecode int) partitioned by (host string) row format delimited fields terminated by ',' stored as textfile;

-- Copy only rows whose id is not present yet, so re-running after each incremental
-- Sqoop import does not duplicate rows. The dynamic partition column (host) must be last in the select list.
insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);

-- Identical re-run: unless weblog_external gained rows in between, this inserts nothing.
-- NOTE(review): possibly a copy-paste duplicate of the statement above; confirm it is intended.
insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);


6. PYSPARK Aggregation:-

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from os import *

spark = (
    SparkSession.builder
    .appName("Hive_Spark")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
)  # Spark 2.x
spark.sparkContext.setLogLevel("ERROR")

spark.sql("use test1")

# Whole partitioned weblog table, ordered by id so that show() lists the lowest ids first.
df = spark.sql("select * from weblog_Dynamicpart_sqoop_internal order by id")
df.show()

# Per (host, ipaddress): sum of responsecode values and number of URLs.
# NOTE(review): summing HTTP status codes gives a number with little meaning;
# a count per status code may be what was intended. Confirm.
friendly_names = {"sum(responsecode)": "Total_ResponseCode", "count(url)": "Total_URL"}
df1 = df.groupBy(['host', 'ipaddress']).agg({'responsecode': 'sum', 'url': 'count'})
for generated_name, friendly_name in friendly_names.items():
    df1 = df1.withColumnRenamed(generated_name, friendly_name)
df1.show()

sqlproperties = {"user": "root", "password": "root", "driver": "com.mysql.cj.jdbc.Driver"}

print("\n", "Mysql Ingestion started", "\n")

# Replace MySQL table test1.hive_agg with the fresh aggregate on every run.
(
    df1.write
    .mode("overwrite")
    .jdbc(url="jdbc:mysql://localhost:3306/test1", table="hive_agg", properties=sqlproperties)
)


7. Airflow DAG:-

import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.dummy_operator import DummyOperator

DAG_NAME = 'test_project_airflow_9'
args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(1),
}

# Runs every minute.
# catchup=False: without it the scheduler back-fills one run for every minute
#   since start_date (about 1440 runs for days_ago(1)).
# max_active_runs=1: runs must not overlap. Otherwise two `sqoop job --exec`
#   calls could both read the same saved --last-value and import the same rows twice.
dag_prjt_main = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval='* * * * *',  # "@once"
    catchup=False,
    max_active_runs=1,
)

# Step 1: incremental Sqoop import, MySQL -> HDFS.
# NOTE(review): the saved job created in the Sqoop section of this post is named
# Sqoop_weblogdetails_test1; confirm that a job named testweblog_ws1 exists.
SQOOP_Task1 = BashOperator(task_id="Sqoop_Incremental",
                           bash_command='sqoop job --exec testweblog_ws1', dag=dag_prjt_main)

# Step 2: load new rows into the partitioned Hive table.
# {{ params.db }} is rendered by Airflow's Jinja templating from `params`.
my_query = """
    USE {{ params.db }};
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;          
    set hive.exec.max.dynamic.partitions=1000;
    insert into weblog_Dynamicpart_sqoop_internal partition(host) select id, datevalue, ipaddress, url, responsecode, host from weblog_external as b where not exists (select a.id from weblog_Dynamicpart_sqoop_internal as a where a.id = b.id);
    """

hive_Task2 = HiveOperator(
    task_id="Hive_Seeding",
    hive_cli_conn_id='hive_local',
    hql=my_query,
    # NOTE(review): the Hive tables are created in database test1, and the Spark
    # aggregation script in this post runs `use test1`. Confirm 'test' is intended here.
    params={'db': 'test'},
    dag=dag_prjt_main)

# Step 3: Spark aggregation from Hive into MySQL.
spark_submit_Task3 = SparkSubmitOperator(
    task_id="sparksubmit_Aggregate",
    application='/home/bigdatapedia/PycharmProjects/Airflow_Demo/test/testspark_hiveagg.py',
    conn_id='spark_local', dag=dag_prjt_main)


# Pipeline order: Sqoop import -> Hive load -> Spark aggregation.
SQOOP_Task1 >> hive_Task2 >> spark_submit_Task3

if __name__ == '__main__':
    dag_prjt_main.cli()