MQTT
import sys
import datetime
import random
import time

import paho.mqtt.client as mqtt

mqtt_hostname = "192.168.56.117"
mqtt_queue = "hostqueue"

while True:
    dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # Simulated sensor readings; on a real Raspberry Pi these would come from the sensors
    temperature = round(random.uniform(20.0, 32.0), 2)
    humidity = round(random.uniform(60.0, 72.0), 2)
    pressure = round(random.uniform(100.0, 125.0), 2)
    waterLevel = round(random.uniform(40.0, 60.0), 2)
    # With simulated values this check always passes; it matters when the
    # readings come from a real sensor, where a failed read returns None
    if humidity is not None and temperature is not None:
        # CSV payload: timestamp, device name, then the four readings
        body = "%s,RaspberryPi-4,%s,%s,%s,%s" % (dt, temperature, humidity, pressure, waterLevel)
        print("body \n", body)
        # This is the Publisher: connect, publish one message, disconnect
        client = mqtt.Client()
        client.connect(mqtt_hostname, 1883, 60)
        client.publish(mqtt_queue, str(body))
        client.disconnect()
        time.sleep(15)
    else:
        print('Failed to get reading. Try again!')
        sys.exit(1)
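To confirm that readings are actually arriving, a quick subscriber can be pointed at the same broker and topic. A minimal sketch using paho-mqtt, assuming the broker address and topic above:

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    # Subscribe inside on_connect so a reconnect re-subscribes automatically
    client.subscribe("hostqueue")

def on_message(client, userdata, msg):
    print("%s -> %s" % (msg.topic, msg.payload.decode()))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("192.168.56.117", 1883, 60)
client.loop_forever()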
MQTT to Kafka
import socket
import time

import paho.mqtt.client as mqtt
from json import dumps
from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable

topic_mqtt = "hostqueue"
topic_kafka = "kfktpkOne"

# This is the Subscriber

## KAFKA
def send_message_to_kafka(message):
    # producer is created in __main__ below
    print(str(message).replace("\"", ""))
    print("sending message to kafka: %s" % message)
    producer.send(topic_kafka, message)

def mqtt_to_kafka_run():
    # Pick messages off the MQTT queue and put them on Kafka
    # (the original os.getenv("localhost") always returned None;
    # socket.gethostname() gives the unique per-host client id intended here)
    client_name = "home_connector_%s" % socket.gethostname()
    print("\n Client Name ", client_name)
    client = mqtt.Client(client_id=client_name)
    client.on_connect = lambda client, userdata, flags, rc: client.subscribe(topic_mqtt)
    client.on_message = lambda client, userdata, msg: send_message_to_kafka(msg.payload.decode())
    client.on_disconnect = lambda client, userdata, rc: print(
        "Disconnected client: %s userdata: %s rc: %s" % (client, userdata, rc))
    client.connect("192.168.56.117", 1883, 60)
    client.loop_forever()

if __name__ == '__main__':
    attempts = 0
    while attempts < 10:
        try:
            # dumps() wraps the CSV string in quotes; the Spark job strips them again
            producer = KafkaProducer(bootstrap_servers=['192.168.56.117:9092'],
                                     value_serializer=lambda x: dumps(x).encode('utf-8'))
            mqtt_to_kafka_run()
        except NoBrokersAvailable:
            print("No Brokers. Attempt %s" % attempts)
            attempts = attempts + 1
            time.sleep(2)
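Before wiring up Spark, it is worth checking that records are landing on the Kafka topic. A minimal sketch with kafka-python, assuming the broker and topic above:

from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer("kfktpkOne",
                         bootstrap_servers=["192.168.56.117:9092"],
                         auto_offset_reset="earliest",
                         value_deserializer=lambda b: loads(b.decode("utf-8")))
for record in consumer:
    # Each value is the CSV string published by the bridge
    print(record.value)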
Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, split, window

# The Kafka source is not bundled with Spark; launch with the connector on the
# classpath, e.g. spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
# (match the version to your Spark build)
spark = SparkSession.builder.appName("BigData Workshop 14").master("local[2]").getOrCreate()

kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.56.117:9092") \
    .option("subscribe", "kfktpkOne") \
    .load()
# Kafka delivers the value as binary; cast it to a string first
kafka_df_1 = kafka_df.selectExpr("cast(value as string)")

# Sample payload: "2020-09-26 16:31:48,RaspberryPi-4,23.1,70.53,124.24,57.29"
# Strip the quotes added by the producer's JSON serializer, then split the CSV
kafka_df_2 = kafka_df_1.withColumn("body", regexp_replace(kafka_df_1.value, "\"", ""))
kafka_df_3 = kafka_df_2.withColumn("body_1", split(kafka_df_2.body, ","))
# Cast Timestamp to a real timestamp type: window() below requires it and
# fails on a string column
kafka_df_4 = kafka_df_3.withColumn("Timestamp", col("body_1").getItem(0).cast("Timestamp")) \
    .withColumn("Device_Name", col("body_1").getItem(1)) \
    .withColumn("Temperature", col("body_1").getItem(2).cast("Double")) \
    .withColumn("Humidity", col("body_1").getItem(3).cast("Double")) \
    .withColumn("Pressure", col("body_1").getItem(4).cast("Double")) \
    .withColumn("WaterLevel", col("body_1").getItem(5).cast("Double")) \
    .drop("body_1", "body", "value")
kafka_df_4.printSchema()
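printSchema() should report something like the following (Timestamp shows as a timestamp rather than a string because of the cast above):

root
 |-- Timestamp: timestamp (nullable = true)
 |-- Device_Name: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Pressure: double (nullable = true)
 |-- WaterLevel: double (nullable = true)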
# 5-second tumbling windows per device; mean() averages every numeric column.
# orderBy on a stream is only allowed because the sinks below run in complete mode.
df_window = kafka_df_4.groupBy(window(kafka_df_4.Timestamp, "5 seconds"), "Device_Name").mean().orderBy("window")
df_window.printSchema()
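At this point the schema nests the bucket boundaries inside a window struct, roughly:

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- Device_Name: string (nullable = true)
 |-- avg(Temperature): double (nullable = true)
 |-- avg(Humidity): double (nullable = true)
 |-- avg(Pressure): double (nullable = true)
 |-- avg(WaterLevel): double (nullable = true)

which is why the next step flattens window.start and window.end into top-level columns.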
# withColumnRenamed cannot rename the nested window.start / window.end fields,
# so select() with aliases is used instead:
df_window_1 = df_window.select(col("window.start").alias("StartTime"),
                               col("window.end").alias("EndTime"),
                               "Device_Name",
                               col("avg(Temperature)").alias("Avg_Temperature"),
                               col("avg(Humidity)").alias("Avg_Humidity"),
                               col("avg(Pressure)").alias("Avg_Pressure"),
                               col("avg(WaterLevel)").alias("Avg_WaterLevel"))
df_window_1.printSchema()
# Columns before the select: window | Device_Name | avg(Temperature) | avg(Humidity) | avg(Pressure) | avg(WaterLevel)
# Columns after the select:  StartTime | EndTime | Device_Name | Avg_Temperature | Avg_Humidity | Avg_Pressure | Avg_WaterLevel
import psycopg2

class AggInsertPostgres:
    # foreach() writer: Spark calls process() once for every output row
    def process(self, row):
        StartTime = str(row.StartTime)
        EndTime = str(row.EndTime)
        Device_Name = row.Device_Name
        Temperature = row.Avg_Temperature
        Humidity = row.Avg_Humidity
        Pressure = row.Avg_Pressure
        WaterLevel = row.Avg_WaterLevel
        connection = None
        cursor = None
        try:
            # Opening a connection per row keeps the example simple; foreachBatch
            # with one connection per micro-batch is the usual optimisation
            connection = psycopg2.connect(user="postgres",
                                          password="postgres",
                                          host="192.168.99.100",
                                          port="5432",
                                          database="sample_project")
            cursor = connection.cursor()
            # Parameterized insert: psycopg2 handles the quoting, and the averages
            # stay floats (the original "%d" formatting truncated them to integers)
            sql_insert_query = """INSERT INTO agg_rpy (StartTime, EndTime, Device_Name, Avg_Temperature, Avg_Humidity, Avg_Pressure, Avg_WaterLevel)
                                  VALUES (%s, %s, %s, %s, %s, %s, %s)"""
            cursor.execute(sql_insert_query, (StartTime, EndTime, Device_Name,
                                              Temperature, Humidity, Pressure, WaterLevel))
            connection.commit()
            print(cursor.rowcount, "Record inserted successfully into agg_rpy table")
        except (Exception, psycopg2.Error) as error:
            print("Failed inserting record into table {}".format(error))
        finally:
            # close the cursor and connection even when the insert fails
            if cursor:
                cursor.close()
            if connection:
                connection.close()
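The writer assumes the agg_rpy table already exists. A minimal one-off setup sketch, with column names taken from the insert above; the exact column types are an assumption inferred from the values being written:

import psycopg2

# Hypothetical setup script; types are inferred, adjust to taste
ddl = """CREATE TABLE IF NOT EXISTS agg_rpy (
    StartTime        timestamp,
    EndTime          timestamp,
    Device_Name      varchar(64),
    Avg_Temperature  double precision,
    Avg_Humidity     double precision,
    Avg_Pressure     double precision,
    Avg_WaterLevel   double precision
)"""

connection = psycopg2.connect(user="postgres", password="postgres",
                              host="192.168.99.100", port="5432",
                              database="sample_project")
cursor = connection.cursor()
cursor.execute(ddl)
connection.commit()
cursor.close()
connection.close()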
# Start all three queries: the windowed averages go to the console and to Postgres
# (complete mode, which the aggregation plus orderBy requires), and the raw parsed
# rows go to the console in update mode. Note that complete mode re-emits every
# window on each trigger, so agg_rpy accumulates repeated rows over time.
df_window_1.writeStream.format("console").outputMode("complete").option("truncate", "false").start()
df_window_1.writeStream.outputMode("complete").foreach(AggInsertPostgres()).start()
kafka_df_4.writeStream.format("console").outputMode("update").option("truncate", "false").start()
# Block until any query stops; awaitTermination() on only the last query
# would ignore failures in the other two
spark.streams.awaitAnyTermination()