Until an effective vaccine is developed, the world is expected to battle the COVID-19 pandemic with precautionary measures. This project proposes a real-time safety monitoring system for COVID-19 built on an Internet of Things (IoT) framework, which collects real-time data from users to monitor whether COVID-19 precautions are being followed.
- In this system, a camera detects whether a person entering is wearing a mask or not. The door leading to the store/room opens only when a person wearing a mask is detected.
- Secondly, it counts the number of people in the store and displays the count on the screen.
- In addition, a threshold is kept on the number of people inside. As the count increases or decreases, the display updates to show how many more people can enter the store.
Methodology of Safety Monitoring System
The project is divided into two major parts:
A) Face Mask Detection
- Visitors entering a room/store will be asked to stand in a single file outside the main door. A camera with an OLED/LCD display is placed at the entry.
- The camera feed will be used to perform face mask detection using OpenCV and TensorFlow.
- The door leading to the store/room will open only when a person with a face mask is detected.
- Once a mask is detected, the script triggers the ESP32 through the web to open the door using a servo motor.
B) Crowd Counting
- A crowd counting algorithm will monitor the number of people inside the room.
- A threshold is maintained on the number of people; as the count increases or decreases, the display updates accordingly.
- The door is opened using the servo motor if the count is below the threshold, and the number of people that can still enter is displayed on the OLED/LCD.
- The OLED/LCD screen will subsequently show the number of people present inside the room/store. (A sketch of the overall control flow follows this list.)
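Before the detailed walkthrough, a minimal sketch of how the two parts fit together may help. The function below is illustrative only: get_frame, detect_mask, count_people, and push_to_channel are hypothetical placeholders, not the actual implementations described later.

import time

def safety_monitor_loop(get_frame, detect_mask, count_people, push_to_channel):
    # Runs both checks on each camera frame and publishes the results
    # to the cloud channels that the ESP32 polls.
    while True:
        frame = get_frame()
        # Part A: gate entry on mask detection
        if detect_mask(frame):
            push_to_channel("door", 1)  # the controller opens the servo-driven door
        # Part B: the occupancy count drives the LCD and the entry threshold
        push_to_channel("count", count_people(frame))
        time.sleep(15)  # matches the 15-second update cadence used below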
Pin Connection
Output
Code
In this project, we define two channels: one for mask detection and one for crowd detection. The face mask detection script pushes 1 when a mask is detected, holds it at 1 for 15 seconds, and then pushes 0 again. During this 15-second span, the controller reads the 1 on the channel, opens the door for a few seconds, closes it again, and waits until the channel next receives a 1 (i.e., a mask is detected). The crowd detection script pushes the count (number of people) to its channel after every 15 predictions, taking the most frequently predicted value as the final count. Finally, the controller reads the count value and displays it on the LCD, as shown in the output video.
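While wiring things up, it helps to confirm what the controller will actually read. Below is a quick check from Python using ThingSpeak's read API; the channel ID and read key are placeholders for your own.

import requests

CHANNEL_ID = "enter-your-channel-id"
READ_API_KEY = "enter-your-read-api-key"

# fetch the most recent value of field1, i.e. what the ESP32 will see
url = "https://api.thingspeak.com/channels/{}/fields/1/last.json".format(CHANNEL_ID)
resp = requests.get(url, params={"api_key": READ_API_KEY})
print(resp.json())  # e.g. {'created_at': '...', 'entry_id': 42, 'field1': '1'}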
Face Mask Detection Code Explanation:
First, we import the required libraries:
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import load_model
from imutils.video import VideoStream
import numpy as np
import imutils
import time
import cv2
import os
import requests
import json
Now enter your API key, obtained from ThingSpeak by creating a channel.
baseURL = "https://api.thingspeak.com/update?api_key=enter-your-api-key-here"
Now we define a function that will push 1 to the channel when a face mask is detected, using the baseURL link that contains a valid API key.
def pushtoWebServer(isMasked: bool):
    if isMasked:
        print("Person with Mask Detected")
        time.sleep(2)
        # Push to the web server that the person is masked
        params = {'field1': 1}
        dooropen = requests.post(baseURL, data=params)
        print(dooropen.status_code)
    else:
        print("Person with No Mask Detected")
After that, we define a function that pushes 0 to the channel on the same link, acting as a reset for the hardware after an interval of 15 seconds. (The 15-second hold also respects ThingSpeak's free-tier limit of one channel update every 15 seconds.)
def reset():
    time.sleep(15)
    print("Entering reset")
    params2 = {'field1': 0}
    doorclose = requests.post(baseURL, data=params2)
    print(doorclose.status_code)
We then define the function for face mask detection and prediction
def detect_and_predict_mask(frame, faceNet, maskNet):
    # grab the dimensions of the frame and then construct a blob
    # from it
    (h, w) = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 1.0, (224, 224),
        (104.0, 177.0, 123.0))

    # pass the blob through the network and obtain the face detections
    faceNet.setInput(blob)
    detections = faceNet.forward()

    # initialize our list of faces, their corresponding locations,
    # and the list of predictions from our face mask network
    faces = []
    locs = []
    preds = []

    # loop over the detections
    for i in range(0, detections.shape[2]):
        # extract the confidence (i.e., probability) associated with
        # the detection
        confidence = detections[0, 0, i, 2]

        # filter out weak detections by ensuring the confidence is
        # greater than the minimum confidence
        if confidence > 0.5:
            # compute the (x, y)-coordinates of the bounding box for
            # the object
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")

            # ensure the bounding boxes fall within the dimensions of
            # the frame
            (startX, startY) = (max(0, startX), max(0, startY))
            (endX, endY) = (min(w - 1, endX), min(h - 1, endY))

            # extract the face ROI, convert it from BGR to RGB channel
            # ordering, resize it to 224x224, and preprocess it
            face = frame[startY:endY, startX:endX]
            face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)
            face = cv2.resize(face, (224, 224))
            face = img_to_array(face)
            face = preprocess_input(face)

            # add the face and bounding boxes to their respective
            # lists
            faces.append(face)
            locs.append((startX, startY, endX, endY))

    # only make predictions if at least one face was detected
    if len(faces) > 0:
        # for faster inference we'll make batch predictions on *all*
        # faces at the same time rather than one-by-one predictions
        # in the above `for` loop
        faces = np.array(faces, dtype="float32")
        preds = maskNet.predict(faces, batch_size=32)

    # return a 2-tuple of the face locations and their corresponding
    # predictions
    return (locs, preds)
Next, we define the paths to our models:
# load our serialized face detector model from disk
prototxtPath = r"D:\Face-mask-detection-master\face_detector\deploy.prototxt"
weightsPath = r"D:\Face-mask-detection-master\face_detector\res10_300x300_ssd_iter_140000.caffemodel"
faceNet = cv2.dnn.readNet(prototxtPath, weightsPath)
# load the face mask detector model from disk
maskNet = load_model(r"D:\Face-mask-detection-master\mask_detector.model")
Finally, we initialize the video stream and run the mask detection model on each frame:
# initialize the video stream
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()

# loop over the frames from the video stream
while True:
    # grab the frame from the threaded video stream and resize it
    # to have a width of 800 pixels
    frame = vs.read()
    frame = imutils.resize(frame, width=800)

    # detect faces in the frame and determine if they are wearing a
    # face mask or not
    (locs, preds) = detect_and_predict_mask(frame, faceNet, maskNet)

    # loop over the detected face locations and their corresponding
    # predictions
    for (box, pred) in zip(locs, preds):
        # unpack the bounding box and predictions
        (startX, startY, endX, endY) = box
        (mask, withoutMask) = pred

        # push to the web server only when the mask prediction is confident
        if mask >= 0.8:
            pushtoWebServer(True)
            reset()
        else:
            pushtoWebServer(False)

        # determine the class label and color we'll use to draw
        # the bounding box and text
        label = "Mask" if mask > withoutMask else "No Mask"
        color = (0, 255, 0) if label == "Mask" else (0, 0, 255)

        # include the probability in the label
        label = "{}: {:.2f}%".format(label, max(mask, withoutMask) * 100)

        # display the label and bounding box rectangle on the output
        # frame
        cv2.putText(frame, label, (startX, startY - 10),
            cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2)
        cv2.rectangle(frame, (startX, startY), (endX, endY), color, 2)

    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF

    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break

# do a bit of cleanup
cv2.destroyAllWindows()
vs.stop()
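One caveat: reset() sleeps for 15 seconds inside the detection loop, which freezes the preview window while the door signal is held high. A minimal non-blocking sketch, assuming the reset() function defined above, runs it in a daemon thread; calling reset_async() in place of reset() keeps the frames flowing (note that rapid repeated detections would spawn overlapping resets):

import threading

def reset_async():
    # run the 15-second reset in the background so the video window keeps updating
    threading.Thread(target=reset, daemon=True).start()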
Crowd Detection Code Explanation:
First, we import the required libraries:
import numpy as np
import tensorflow as tf
import cv2
import time
import requests
import json
Now enter your API key, obtained from ThingSpeak by creating a channel; we are using the update link provided by ThingSpeak. We also define an empty dictionary for storing the counts. The code pushes the number of people based on how often each predicted count repeats: it predicts the count (number of people) 15 times, and the value predicted the most times is pushed to the channel.
baseURL = "https://api.thingspeak.com/update?api_key=enter-your-api-key-here"
pushcount = {}
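The script below tallies these votes by hand in the pushcount dictionary; the same majority-vote idea can be seen in compact form using collections.Counter (the prediction values here are hypothetical):

from collections import Counter

predictions = [3, 3, 4, 3, 2, 3, 3, 4, 3, 3, 3, 2, 3, 3, 4]  # 15 per-frame counts
finalcount = Counter(predictions).most_common(1)[0][0]
print(finalcount)  # -> 3, the value that would be pushed to the channel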
Then we define a class named DetectorAPI for count detection; for this, we use the prebuilt Faster R-CNN Inception V2 COCO model.
class DetectorAPI:
    def __init__(self, path_to_ckpt):
        self.path_to_ckpt = path_to_ckpt

        # load the frozen detection graph from disk
        self.detection_graph = tf.Graph()
        with self.detection_graph.as_default():
            od_graph_def = tf.compat.v1.GraphDef()
            with tf.compat.v2.io.gfile.GFile(self.path_to_ckpt, 'rb') as fid:
                serialized_graph = fid.read()
                od_graph_def.ParseFromString(serialized_graph)
                tf.import_graph_def(od_graph_def, name='')

        self.sess = tf.compat.v1.Session(graph=self.detection_graph)

        # Definite input and output Tensors for detection_graph
        self.image_tensor = self.detection_graph.get_tensor_by_name('image_tensor:0')
        # Each box represents a part of the image where a particular object was detected.
        self.detection_boxes = self.detection_graph.get_tensor_by_name('detection_boxes:0')
        # Each score represents the level of confidence for each of the objects.
        # The score is shown on the result image, together with the class label.
        self.detection_scores = self.detection_graph.get_tensor_by_name('detection_scores:0')
        self.detection_classes = self.detection_graph.get_tensor_by_name('detection_classes:0')
        self.num_detections = self.detection_graph.get_tensor_by_name('num_detections:0')

    def processFrame(self, image):
        # Expand dimensions since the trained_model expects images to have shape: [1, None, None, 3]
        image_np_expanded = np.expand_dims(image, axis=0)

        # Actual detection.
        start_time = time.time()
        (boxes, scores, classes, num) = self.sess.run(
            [self.detection_boxes, self.detection_scores, self.detection_classes, self.num_detections],
            feed_dict={self.image_tensor: image_np_expanded})
        end_time = time.time()
        print("Elapsed Time:", end_time - start_time)

        # convert the normalized box coordinates to pixel coordinates
        im_height, im_width, _ = image.shape
        boxes_list = [None for i in range(boxes.shape[1])]
        for i in range(boxes.shape[1]):
            boxes_list[i] = (int(boxes[0, i, 0] * im_height),
                             int(boxes[0, i, 1] * im_width),
                             int(boxes[0, i, 2] * im_height),
                             int(boxes[0, i, 3] * im_width))

        return boxes_list, scores[0].tolist(), [int(x) for x in classes[0].tolist()], int(num[0])

    def close(self):
        self.sess.close()
Then we define the variables required: the model path and the threshold value. For count detection, we use a recorded video rather than a live feed from a webcam.
if __name__ == "__main__":
    model_path = 'D:\\Face-mask-detection-master\\faster_rcnn_inception_v2_coco_2018_01_28\\frozen_inference_graph.pb'
    odapi = DetectorAPI(path_to_ckpt=model_path)
    threshold = 0.7
    cap = cv2.VideoCapture('D:\\Face-mask-detection-master\\final2.mp4')
    times = 0
    temp = 0
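If you want to test with a live webcam instead of the recorded video, you can swap the capture source for the default camera index:

    cap = cv2.VideoCapture(0)  # 0 selects the default webcam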
Finally, we define the loop that runs continuously over the frames fetched from the video for count detection:
    while True:
        r, img = cap.read()
        if not r:
            break
        temp = temp + 1

        # process every third frame to reduce the load
        if temp % 3 != 0:
            continue
        img = cv2.resize(img, (720, 480))
        boxes, scores, classes, num = odapi.processFrame(img)

        # Visualization of the results of a detection.
        count = 0
        for i in range(len(boxes)):
            # Class 1 represents a human
            if classes[i] == 1 and scores[i] > threshold:
                box = boxes[i]
                cv2.rectangle(img, (box[1], box[0]), (box[3], box[2]), (255, 0, 0), 2)
                count = count + 1

        # record a vote for this frame's count
        times = times + 1
        pushcount[count] = pushcount.get(count, 0) + 1
        print(count, pushcount)

        # after 15 votes, push the most frequent count to the channel
        if times == 15:
            finalcount = max(pushcount, key=lambda x: pushcount[x])
            params = {'field1': finalcount}
            push = requests.post(baseURL, data=params)
            print("Count pushed {}, status code {}".format(finalcount, push.status_code))
            times = 0
            pushcount.clear()

        cv2.imshow("preview", img)
        key = cv2.waitKey(30)
        if key & 0xFF == ord('q'):
            break
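As written, the script leaves the video capture and the TensorFlow session open when the loop exits; a small cleanup block placed after the loop releases them:

    # release the video source, close the preview window, and free the session
    cap.release()
    cv2.destroyAllWindows()
    odapi.close()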
Hardware Code Explanation:
First, we include the required libraries. As we are using an ESP32, we include the ESP32Servo library; depending on your controller, this library would change.
#include <ESP32Servo.h>
#include <WiFi.h>
#include <WiFiClient.h>
#include <WiFiServer.h>
#include <LiquidCrystal.h>
#include <ThingSpeak.h>
Now we define the network parameters, i.e., the WiFi SSID and password.
const char* ssid = "Enter-your-WiFi-name";
const char* password = "Enter-your-WiFi-password";
Then we define some variables and the ThingSpeak channel information required to read the channel data.
char* server = "api.thingspeak.com";
unsigned long DchannelID = Enter-your-channel-id;
unsigned long CchannelID = Enter-your-channel-id;
char* DreadAPIKey = "Enter-api-key";
char* CreadAPIKey = "Enter-api-key";
unsigned int dataFieldDoor = 1;
unsigned int dataFieldCount = 1;
Next, we define the objects, variables, and the pin connections of the LCD.
Servo myservo; // create servo object to control a servo
LiquidCrystal lcd(22, 23, 5, 18, 19, 21);
// you can change the LCD pins based on your connections

// Global variables
int dOperation = 0;  // last value read from the mask detection channel
int cOperation = 0;  // last value read from the count detection channel
int pos = 0;         // current servo position
WiFiClient client;
Next, we define the function to connect to the WiFi, and then we initialize ThingSpeak.
int connectWiFi() {
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("Connected");
  ThingSpeak.begin(client);
  return 1; // report success
}
Then we define the functions to read both channels (the mask detection and crowd detection channels):
float DreadTSData(long TSChannel, unsigned int TSField) {
  int data = ThingSpeak.readIntField(TSChannel, TSField, DreadAPIKey);
  return data;
}

float CreadTSData(long TSChannel, unsigned int TSField) {
  int data = ThingSpeak.readIntField(TSChannel, TSField, CreadAPIKey);
  return data;
}
Then we define the setup function for initializing all the hardware and the serial port:
void setup() {
  Serial.begin(9600);
  Serial.println("Start");
  myservo.attach(13);  // servo signal pin
  myservo.write(pos);  // start with the door closed
  lcd.begin(16, 2);
  lcd.clear();
  lcd.setCursor(1, 0);
  lcd.print("Safety Monitor");
  lcd.setCursor(5, 1);
  lcd.print("System");
  connectWiFi();
  delay(2000);
}
Finally, we define the loop function, which continuously monitors the count and mask values and triggers the respective hardware and messages on the LCD:
void loop() {
  delay(3000);
  Serial.println("Waiting...");

  // read the people count from ThingSpeak
  cOperation = CreadTSData(CchannelID, dataFieldCount);
  Serial.println(" Data read from Count Detection Channel: " + String(cOperation));
  lcd.clear();
  lcd.print("Human Detected " + String(cOperation));
  lcd.setCursor(2, 1);
  lcd.print("Maximum : 4");
  delay(2000);

  // read the mask detection flag from ThingSpeak
  dOperation = DreadTSData(DchannelID, dataFieldDoor);
  Serial.println(" Data read from Mask Detection Channel: " + String(dOperation));

  if (cOperation >= 4) {
    // the room is full: keep the door closed regardless of the mask flag
    lcd.clear();
    lcd.setCursor(0, 0);
    lcd.print("MaxCount Reached");
    lcd.setCursor(2, 1);
    lcd.print("Please Wait");
  }
  else if (dOperation == 1) {
    // mask detected and room below threshold: cycle the door open and closed
    lcd.clear();
    lcd.setCursor(1, 0);
    lcd.print("Mask Detected");
    lcd.setCursor(2, 1);
    lcd.print("Door Opened");
    pos = 100;
    myservo.write(pos);
    delay(10000);
    pos = pos - 100;
    myservo.write(pos);
    lcd.clear();
    lcd.setCursor(2, 0);
    lcd.print("Door Closed");
    delay(2000);
  }
  else {
    lcd.clear();
    lcd.print("No Mask Detected");
    lcd.setCursor(0, 1);
    lcd.print("Look into Camera");
  }
}
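You can exercise the hardware without running either detection script by pushing test values to the channels from Python and watching the servo and LCD react. The two write keys below are placeholders; each channel has its own:

import requests

DOOR_URL = "https://api.thingspeak.com/update?api_key=door-channel-write-key"
COUNT_URL = "https://api.thingspeak.com/update?api_key=count-channel-write-key"

requests.post(DOOR_URL, data={"field1": 1})   # simulate "mask detected": the door should open
requests.post(COUNT_URL, data={"field1": 3})  # simulate three people inside: the LCD should update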
Benefits of Safety Monitoring System
- Firstly, the system restricts access for those not wearing masks and notifies the authorities.
- Secondly, the system is easy to implement.
- Thirdly, it is easy to access and control.
- Moreover, the system can be connected to your existing setup, e.g., CCTV cameras.
- Partially occluded faces, whether covered by a mask, hair, or a hand, can still be detected.
Credits
- Mehul Jain
- Aishwarya Kanchan
- Vaishnavi Tiwari
- Jaibhan Singh Gaur