Monday, September 28, 2020

Real Time Aggregation Using Spark Structured Streaming




Real Time Aggregation Using Spark Structured Streaming




MQTT publisher (simulated Raspberry Pi sensor readings)

import sys
import datetime
import json
import paho.mqtt.client as mqtt
import random
import time

mqtt_hostname = "192.168.56.117"
mqtt_queue = "hostqueue"
mqtt_port = 1883
mqtt_keepalive = 60  # seconds
publish_interval = 15  # seconds between published readings
device_name = "RaspberryPi-4"


def read_sensors():
    """Return one simulated ``(temperature, humidity, pressure, water_level)`` reading.

    Each value is drawn uniformly from a fixed range and rounded to 2 decimals.
    """
    temperature = round(random.uniform(20.0, 32.0), 2)
    humidity = round(random.uniform(60.0, 72.0), 2)
    pressure = round(random.uniform(100.0, 125.0), 2)
    water_level = round(random.uniform(40.0, 60.0), 2)
    return temperature, humidity, pressure, water_level


def format_reading(dt, temperature, humidity, pressure, water_level, device=device_name):
    """Return the CSV line that is published to MQTT.

    Field order is ``dt,device,temperature,humidity,pressure,water_level``;
    consumers split on commas and read the fields by position.
    """
    return "%s,%s,%s,%s,%s,%s" % (dt, device, temperature, humidity, pressure, water_level)


if __name__ == "__main__":
    while True:
        dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        temperature, humidity, pressure, waterLevel = read_sensors()

        # The simulated readings are never None; this guard presumably remains
        # for swapping read_sensors() with a real sensor driver that may
        # return None on a failed read.
        if humidity is None or temperature is None:
            print('Failed to get reading. Try again!')
            sys.exit(1)

        body = format_reading(dt, temperature, humidity, pressure, waterLevel)
        print("body \n", body)

        # Publisher: open a fresh connection for each reading, publish, close.
        client = mqtt.Client()
        client.connect(mqtt_hostname, mqtt_port, mqtt_keepalive)
        client.publish(mqtt_queue, body)
        client.disconnect()
        time.sleep(publish_interval)


MQTT to Kafka bridge

import os
import time
from kafka import KafkaProducer
import paho.mqtt.client as mqtt
from json import dumps
from kafka.errors import KafkaError, NoBrokersAvailable

topic_mqtt = "hostqueue"  # MQTT topic the bridge subscribes to
topic_kafka = "kfktpkOne"  # Kafka topic the bridge publishes to


# Subscriber side: every MQTT message is forwarded to Kafka.
def send_message_to_kafka(message):
    """Print ``message`` and hand it to the module-level Kafka ``producer``.

    ``producer`` is bound in the ``__main__`` block before the MQTT loop
    starts, so this must only be called after that assignment.
    """
    without_quotes = str(message).replace('"', "")
    print(without_quotes)
    print("sending message to kafka: %s" % message)
    producer.send(topic_kafka, message)


def mqtt_to_kafka_run(mqtt_host="192.168.56.117", mqtt_port=1883, keepalive=60):
    """Pick messages off the MQTT topic and put them on Kafka.

    Subscribes to ``topic_mqtt`` whenever the broker accepts the connection and
    forwards each decoded payload via ``send_message_to_kafka``. Blocks inside
    ``loop_forever()``.

    :param mqtt_host: MQTT broker address (default is the previously hard-coded host).
    :param mqtt_port: MQTT broker port.
    :param keepalive: MQTT keepalive interval in seconds.
    """
    import socket

    # Derive the client id from the machine's host name so bridges on different
    # hosts do not share an id. The previous os.getenv("localhost") read an
    # environment variable literally named "localhost", which is normally unset
    # and produced "home_connector_None".
    client_name = "home_connector_%s" % socket.gethostname()
    print("\n Client Name ", client_name)
    client = mqtt.Client(client_id=client_name)

    def on_connect(client, userdata, flags, rc):
        # Subscribing in the connect callback re-establishes the subscription
        # after every reconnect; rc != 0 means the broker refused us.
        if rc == 0:
            client.subscribe(topic_mqtt)
        else:
            print("MQTT connection refused, rc: %s" % rc)

    def on_message(client, userdata, msg):
        send_message_to_kafka(msg.payload.decode())

    def on_disconnect(client, user_data, rc):
        print("""Disconnected client: %s user_data: %s rc: %s """ % (client, user_data, rc))

    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect

    client.connect(mqtt_host, mqtt_port, keepalive)
    client.loop_forever()


if __name__ == '__main__':
    # Constructing KafkaProducer raises NoBrokersAvailable when no bootstrap
    # server answers, so retry only that step a bounded number of times.
    max_attempts = 10
    attempts = 0
    while attempts < max_attempts:
        try:
            producer = KafkaProducer(bootstrap_servers=['192.168.56.117:9092'],
                                     value_serializer=lambda x: dumps(x).encode('utf-8'))
        except NoBrokersAvailable:
            print("No Brokers. Attempt %s" % attempts)
            attempts = attempts + 1
            time.sleep(2)
            continue

        try:
            # Blocks in the MQTT network loop; send_message_to_kafka uses the
            # module-level `producer` bound just above.
            mqtt_to_kafka_run()
        finally:
            # Release the producer (and deliver anything still buffered)
            # instead of leaking it when the bridge stops.
            producer.close()
        break
    else:
        # Previously the script fell out of the loop and exited with status 0.
        raise SystemExit("Giving up: no Kafka brokers available after %d attempts" % max_attempts)


Spark Structured Streaming: windowed averages from Kafka into PostgreSQL


from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("BigData Workshop 14").master("local[2]").getOrCreate()

# Streaming source: every record on the Kafka topic "kfktpkOne".
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.56.117:9092") \
    .option("subscribe", "kfktpkOne") \
    .load()

# Keep only the Kafka `value` column, cast to a string.
kafka_df_1 = kafka_df.selectExpr("cast(value as string)")

# A value looks like "2020-09-26 16:31:48,RaspberryPi-4,23.1,70.53,124.24,57.29",
# possibly wrapped in double quotes (a JSON-encoding producer adds them), so
# every '"' is stripped before splitting on commas.
kafka_df_2 = kafka_df_1.withColumn("body", regexp_replace(kafka_df_1.value, "\"", ""))

kafka_df_3 = kafka_df_2.withColumn("body_1", split(kafka_df_2.body, ","))

# Positional CSV fields: timestamp text, device name, then four numeric readings.
kafka_df_4 = kafka_df_3.withColumn("Timestamp", col("body_1").getItem(0)) \
    .withColumn("Device_Name", col("body_1").getItem(1)) \
    .withColumn("Temperature", col("body_1").getItem(2).cast("Double")) \
    .withColumn("Humidity", col("body_1").getItem(3).cast("Double")) \
    .withColumn("Pressure", col("body_1").getItem(4).cast("Double")) \
    .withColumn("WaterLevel", col("body_1").getItem(5).cast("Double")) \
    .drop("body_1", "body", "value")

kafka_df_4.printSchema()

# Parse the event time explicitly (pattern matches "YYYY-MM-DD HH:MM:SS" text)
# instead of relying on an implicit string->timestamp cast inside window().
# The Timestamp column of kafka_df_4 itself stays a string.
event_time = to_timestamp(col("Timestamp"), "yyyy-MM-dd HH:mm:ss")

# Average each reading per device over 5-second tumbling windows. Explicit
# aliases replace the auto-generated "avg(<column>)" names of mean(), which
# otherwise have to be referenced verbatim.
df_window = kafka_df_4.groupBy(window(event_time, "5 seconds"), "Device_Name") \
    .agg(avg("Temperature").alias("Avg_Temperature"),
         avg("Humidity").alias("Avg_Humidity"),
         avg("Pressure").alias("Avg_Pressure"),
         avg("WaterLevel").alias("Avg_WaterLevel")) \
    .orderBy("window")

df_window.printSchema()

# Flatten the window struct into plain columns:
# StartTime | EndTime | Device_Name | Avg_Temperature | Avg_Humidity | Avg_Pressure | Avg_WaterLevel
df_window_1 = df_window.select(col("window.start").alias("StartTime"),
                               col("window.end").alias("EndTime"),
                               "Device_Name",
                               "Avg_Temperature",
                               "Avg_Humidity",
                               "Avg_Pressure",
                               "Avg_WaterLevel")

df_window_1.printSchema()


import psycopg2

class AggInsertPostgres:
    """Spark ``foreach`` writer that inserts each windowed-average row into
    the Postgres table ``agg_rpy``.

    Spark calls :meth:`process` once per output row. A connection is opened
    and closed for every row; failures are printed and swallowed so a database
    problem does not stop the streaming query.

    Connection settings default to the previously hard-coded values; any
    ``psycopg2.connect`` keyword (``host``, ``port``, ``user``, ``password``,
    ``database``, ...) passed to the constructor overrides the default.
    """

    # Defaults preserve the original hard-coded connection target.
    DEFAULT_CONNECTION = {
        "user": "postgres",
        "password": "postgres",
        "host": "192.168.99.100",
        "port": "5432",
        "database": "sample_project",
    }

    # Parameterized statement: psycopg2 escapes every value (the device name
    # arrives from the network), keeps float averages intact (the former '%d'
    # formatting truncated them to integers) and sends None as NULL.
    INSERT_SQL = (
        "INSERT INTO agg_rpy (StartTime, EndTime, Device_Name, Avg_Temperature, "
        "Avg_Humidity, Avg_Pressure, Avg_WaterLevel) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s)"
    )

    def __init__(self, **connection_overrides):
        """Store ``DEFAULT_CONNECTION`` updated with ``connection_overrides``."""
        self.connection_params = {**self.DEFAULT_CONNECTION, **connection_overrides}

    def process(self, row):
        """Insert one aggregated row; errors are printed, not raised.

        ``row`` must expose StartTime, EndTime, Device_Name, Avg_Temperature,
        Avg_Humidity, Avg_Pressure and Avg_WaterLevel attributes.
        """
        values = (
            None if row.StartTime is None else str(row.StartTime),
            None if row.EndTime is None else str(row.EndTime),
            row.Device_Name,
            row.Avg_Temperature,
            row.Avg_Humidity,
            row.Avg_Pressure,
            row.Avg_WaterLevel,
        )
        # Bind both names up front so the finally clause is safe even when
        # connect() itself fails (previously an UnboundLocalError).
        connection = None
        cursor = None
        try:
            connection = psycopg2.connect(**self.connection_params)
            cursor = connection.cursor()

            print("\nsql_insert_query ", self.INSERT_SQL, values)

            cursor.execute(self.INSERT_SQL, values)
            connection.commit()
            print(cursor.rowcount, "Record inserted successfully into agg_rpy table")
        except Exception as error:
            # Deliberate best effort: report and keep the stream running.
            print("Failed inserting record into table {}".format(error))
        finally:
            if cursor is not None:
                cursor.close()
            if connection is not None:
                connection.close()
                # print("PostgreSQL connection is closed")


# Start the three streaming queries; each start() returns immediately.
df_window_1.writeStream.format("console").outputMode("complete").option("truncate", "false").start()

df_window_1.writeStream.outputMode("complete").foreach(AggInsertPostgres()).start()

kafka_df_4.writeStream.format("console").outputMode("update").option("truncate", "false").start()

# Block until ANY query stops. Previously only the raw console query was
# awaited, so a failed aggregate/Postgres query died without stopping the
# driver; awaitAnyTermination re-raises the failing query's exception.
spark.streams.awaitAnyTermination()