In this project, we will see how to build a simple self-driving car using a Raspberry Pi, one that drives itself along a track using the power of image processing.
Here’s a demo of the Car in Action.
CODE:
The entire code for this project is available at the following GitHub repository: https://github.com/htgdokania/Self-Driving-Car
Structure/Work Flow:
- STEP1: Stream the Pi Camera feed to a computer for further processing.
- STEP2: Read the frame on the server side and start processing it.
- STEP3: Check for the red stop marker and send info to the Pi if found.
- STEP4: Detect the slope of the lines in the frame.
- First, we apply Canny edge detection.
- Next, we filter out the required region of interest (bird's-eye view).
- Next, we detect lines using HoughLinesP and get the slope of each detected line in the region of interest.
- Using this information, we find the number of left lines and right lines.
- STEP5: Send info back to the Pi (number of left lines, number of right lines, and red marker status) to move the car accordingly.
- STEP6: Raspberry Pi client-side code:
- Receive data from the computer about the processed frame (number of left lines, right lines, and red marker status).
- Check for obstacles/pedestrians using an ultrasonic sensor on the Pi.
- Finally, make the car move based on all this information.
STEP1: Stream Pi Camera feed to a computer to process further
- First, set up the Raspberry Pi with the Raspbian operating system. Refer here.
- Also, install the picamera library on the Raspberry Pi and make sure the camera is enabled. Refer here.
Now, we need two scripts:
- A server (presumably on a faster machine; in my case a Windows desktop) that listens for a connection from the Raspberry Pi, and
- A client that runs on the Raspberry Pi and sends a continuous stream of images to the server.
For reference, read the picamera documentation (HERE).
NOTE: Always run the server first, before running the client code on the Raspberry Pi.
Code for the client, VideoStreamClient.py (to be run on the Raspberry Pi):
import io
import socket
import struct
import time
import picamera

# create a socket and connect to the server
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(('192.168.31.7', 8000))  # replace with the server IP address, port=8000
connection = client_socket.makefile('wb')
try:
    with picamera.PiCamera() as camera:
        camera.resolution = (640, 480)  # pi camera resolution
        camera.framerate = 15           # 15 frames/sec
        time.sleep(2)                   # give the camera 2 secs to initialize
        start = time.time()
        stream = io.BytesIO()
        # send a JPEG-format video stream
        for foo in camera.capture_continuous(stream, 'jpeg', use_video_port=True):
            connection.write(struct.pack('<L', stream.tell()))
            connection.flush()
            stream.seek(0)
            connection.write(stream.read())
            stream.seek(0)
            stream.truncate()
            print('New frame sent.')
        connection.write(struct.pack('<L', 0))
finally:
    connection.close()
    client_socket.close()
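On the wire, each frame is a 4-byte little-endian length prefix followed by the JPEG payload, with a zero-length header marking the end of the stream. As a minimal sketch (not part of the repository code), here is how a receiver could parse that framing, using io.BytesIO to stand in for the network stream:

```python
import io
import struct

def pack_frame(jpeg_bytes):
    # 4-byte little-endian length prefix, then the JPEG payload
    return struct.pack('<L', len(jpeg_bytes)) + jpeg_bytes

def read_frames(stream):
    # yield payloads until the zero-length terminator sent by the client
    while True:
        header = stream.read(4)
        length = struct.unpack('<L', header)[0]
        if length == 0:
            return
        yield stream.read(length)

# simulate the network stream with BytesIO
fake = io.BytesIO(pack_frame(b'\xff\xd8...frame1...\xff\xd9') +
                  pack_frame(b'\xff\xd8...frame2...\xff\xd9') +
                  struct.pack('<L', 0))
frames = list(read_frames(fake))
print(len(frames))  # 2
```

The server in this project instead scans the raw byte stream for the JPEG start/end markers (see STEP2), which achieves the same split without using the length prefix.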
Code for the server part (run on the desktop): for the server we write main.py, which reads frames over the network stream and processes them for further actions (explained in the following steps).
STEP2: Read the frame on the server-side and start processing it.[main.py]
- First import the Required Libraries.
import numpy as np
import cv2
import socket
import send_data_pi
Next, we define a class named VideoStreaming(). The required functions are defined within it.
- The first step is to initialize the required variables for the image-streaming part within __init__().
- Finally, call the streaming() function to start reading the frames.
class VideoStreaming(object):
    def __init__(self, host, port):
        self.server_socket = socket.socket()
        self.server_socket.bind(('', port))
        self.server_socket.listen(0)
        self.connection, self.client_address = self.server_socket.accept()
        self.connection = self.connection.makefile('rb')
        self.host_name = socket.gethostname()
        self.host_ip = socket.gethostbyname(self.host_name)
        self.streaming()
Next, define the streaming() function. First we read a frame and store it in "image". Within this function we then call all the other functions that process this "image", including checkforred(), canny(), region_of_interest(), average_slope_intercept(), and display_lines().
    def streaming(self):
        try:
            print("Host: ", self.host_name + ' ' + self.host_ip)
            print("Connection from: ", self.client_address)
            print("Streaming...")
            print("Press 'q' to exit")
            # need bytes here
            stream_bytes = b' '
            while True:
                stream_bytes += self.connection.read(1024)
                first = stream_bytes.find(b'\xff\xd8')
                last = stream_bytes.find(b'\xff\xd9')
                if first != -1 and last != -1:
                    jpg = stream_bytes[first:last + 2]
                    stream_bytes = stream_bytes[last + 2:]
                    image = cv2.imdecode(np.frombuffer(jpg, dtype=np.uint8), cv2.IMREAD_COLOR)
At this point we have successfully read the current frame. We can display it with:
cv2.imshow("frame",image)
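The marker-based framing above works because every JPEG begins with the SOI marker bytes 0xFFD8 and ends with the EOI marker 0xFFD9. A small self-contained sketch of the same extraction logic on a plain byte buffer (in practice the marker bytes can occasionally appear inside compressed data, but this works well for this stream):

```python
def extract_jpeg(buffer):
    """Return (jpeg, remainder) if a complete frame is in the buffer, else (None, buffer)."""
    first = buffer.find(b'\xff\xd8')  # SOI (start of image)
    last = buffer.find(b'\xff\xd9')   # EOI (end of image)
    if first != -1 and last != -1:
        return buffer[first:last + 2], buffer[last + 2:]
    return None, buffer

buf = b'noise\xff\xd8frame-data\xff\xd9\xff\xd8next'
jpg, buf = extract_jpeg(buf)
print(jpg)  # b'\xff\xd8frame-data\xff\xd9'
print(buf)  # b'\xff\xd8next'
```

Any leading noise before the SOI marker is discarded along with the consumed frame, and bytes belonging to the next frame stay in the buffer for the following iteration.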
- Now, let's process the image. We first make a copy of the image and store it as lane_image.
- Next, check for the red marker; if present, send info to the Pi to stop the car.
lane_image = np.copy(image)
lane_image, red = self.checkforred(lane_image)
if red:
    self.sendinfoback(0, 0, 1)
- Otherwise, process further to detect lines in the current frame:
- Perform Canny edge detection.
- Define the region of interest.
- Perform HoughLinesP detection.
- Classify each line as left or right based on its slope.
- Send this info to the Pi.
- Finally, draw the lines on the frame and display it.
else:
    canny = self.canny(lane_image)
    roi = self.region_of_interest(canny)
    lane = cv2.bitwise_and(canny, roi)
    lines = cv2.HoughLinesP(lane, 1, np.pi / 180, 30, np.array([]), minLineLength=20, maxLineGap=5)
    self.average_slope_intercept(lines, lane_image)
    line_image = self.display_lines(lines, lane_image)
    lane_image = cv2.addWeighted(lane_image, 1, line_image, 1, 0)
cv2.imshow('frame', lane_image)  # display image
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
    send_data_pi.Tcp_Close()
    break
finally:
    self.connection.close()
    self.server_socket.close()
STEP3: Function to check for the red stop marker and send info to the Pi.
- Define the checkforred() function, which is called within the streaming function above.
We look for contours within the HSV range for red to detect our marker. If a contour's area is greater than our threshold value, we set the status to 1 and return.
    def checkforred(self, image):
        font = cv2.FONT_HERSHEY_SIMPLEX
        hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
        # red HSV range
        low_red = np.array([157, 56, 0])
        high_red = np.array([179, 255, 255])
        mask = cv2.inRange(hsv, low_red, high_red)
        blur = cv2.GaussianBlur(mask, (15, 15), 0)
        contours, _ = cv2.findContours(blur, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
        status = 0
        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 20000:
                status = 1
                cv2.drawContours(image, contour, -1, (0, 0, 255), 3)
                cv2.putText(image, 'RED STOP', (240, 320), font, 2, (0, 0, 255), 2, cv2.LINE_AA)
        return (image, status)
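Note that in OpenCV's HSV space hue runs 0–179 and red sits at the wrap-around point, so the range [157, 179] above only catches the upper red hues; pure red lands at hue 0. A quick stdlib check of this (colorsys reports hue in [0, 1), so we scale to OpenCV's 0–179 range):

```python
import colorsys

def opencv_hue(r, g, b):
    # colorsys takes RGB components in [0, 1] and returns hue in [0, 1);
    # OpenCV scales hue to 0-179, so convert accordingly
    h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255)
    return round(h * 179)

print(opencv_hue(255, 0, 0))   # pure red -> 0 (outside the 157-179 range)
print(opencv_hue(255, 0, 60))  # pinkish red -> ~172, inside the 157-179 range
```

If the marker in your track is a pure red, you may need a second mask covering the low hues (roughly 0–10) and OR the two masks together; the single upper range is an assumption that works for the marker used here.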
- If red is detected, we send this info back to the Pi using the sendinfoback() function. Here we send the info as bytes.
    def sendinfoback(self, l, r, red):
        D = b''
        D += bytes([l, r, red])  # leftlines, rightlines, redstatus
        print('here inside sendinfo', D)
        send_data_pi.Tcp_Write(D)
STEP4: Detect the slope of the lines and count left and right lines.
Canny edge detection:
- First we need to detect edges using Canny edge detection, which finds edges based on the change in adjacent pixel values in both the x and y directions. We set the sensitivity using lower and upper threshold values.
Let's define the canny() function:
    def canny(self, image):
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # frames decoded by cv2.imdecode are BGR
        blur = cv2.GaussianBlur(gray, (7, 7), 0)
        canny = cv2.Canny(blur, 50, 150)  # lowerThreshold=50, upperThreshold=150
        return canny
Region of interest
Next we define the region of interest, so that we look for lines only in that region and not in the entire frame.
    def region_of_interest(self, image):
        height = image.shape[0]
        width = image.shape[1]
        region = np.array([[(100, height), (width - 100, height), (width - 100, height - 120), (100, height - 120)]])
        mask = np.zeros_like(image)
        cv2.fillPoly(mask, region, 255)
        return mask
Next we simply perform a bitwise_and operation on the above two results to keep only the edges inside the region of interest.
Then we look for lines using the HoughLinesP() function, passing in the masked image. This returns a list of line segments.
Hough Transform
The probabilistic Hough line transform in OpenCV is cv2.HoughLinesP(), which we use to detect straight lines:
lines = cv2.HoughLinesP(image, ρ accuracy, θ accuracy, threshold, minLineLength, maxLineGap)
lines = cv2.HoughLinesP(lane, 1, np.pi/180, 30, np.array([]), minLineLength=20, maxLineGap=5)
- lines: a vector of detected line segments, each given by its endpoints (x1, y1, x2, y2).
- ρ: resolution of the parameter ρ in pixels.
- θ: resolution of the parameter θ in radians.
- threshold: minimum number of intersections (votes) needed to detect a line.
- minLineLength: segments shorter than this are rejected.
- maxLineGap: maximum allowed gap between points on the same line to link them.
Next, we check the slope of the lines by calling the average_slope_intercept() function and send the number of left and right lines detected to the Pi to move the car accordingly.
    def average_slope_intercept(self, lines, image):
        left_fit = []
        right_fit = []
        if lines is not None:
            for line in lines:
                x1, y1, x2, y2 = line.reshape(4)
                parameters = np.polyfit((x1, x2), (y1, y2), 1)
                slope = parameters[0]
                intercept = parameters[1]
                if slope < 0:
                    right_fit.append((slope, intercept))
                else:
                    left_fit.append((slope, intercept))
        left_fitavg = np.average(left_fit, axis=0)
        right_fitavg = np.average(right_fit, axis=0)
        print("left slope", left_fitavg, "right slope", right_fitavg)
        self.sendinfoback(len(left_fit), len(right_fit), red=0)  # send number of left and right lines detected
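The classification above can be illustrated without OpenCV or NumPy: each Hough segment (x1, y1, x2, y2) is fit to y = mx + b and bucketed by the sign of m. A minimal pure-Python sketch of the same idea (remember that image y grows downward, so the sign convention is flipped relative to the usual math convention):

```python
def classify_lines(segments):
    """Split (x1, y1, x2, y2) segments into left/right buckets by slope sign."""
    left, right = [], []
    for x1, y1, x2, y2 in segments:
        if x2 == x1:
            continue  # skip vertical segments (undefined slope)
        slope = (y2 - y1) / (x2 - x1)
        intercept = y1 - slope * x1
        if slope < 0:
            right.append((slope, intercept))  # same convention as the method above
        else:
            left.append((slope, intercept))
    return left, right

# two positive-slope segments and one negative-slope segment (image coordinates)
segs = [(0, 0, 10, 10), (5, 2, 15, 14), (0, 10, 10, 0)]
left, right = classify_lines(segs)
print(len(left), len(right))  # 2 1
```

np.polyfit with degree 1 computes exactly this slope and intercept for a two-point fit; the pure-arithmetic version just makes the classification rule explicit.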
Next, call display_lines() to draw the segments returned by HoughLinesP() for visual representation.
    def display_lines(self, lines, image):
        line_image = np.zeros_like(image)
        if lines is not None:
            for line in lines:
                if len(line) > 0:
                    x1, y1, x2, y2 = line.reshape(4)
                    cv2.line(line_image, (x1, y1), (x2, y2), [0, 255, 0], 10)
        return line_image
Finally, blend this image with the original frame for a better view of the detected lines.
STEP5: Send info back to the Pi (number of left lines, right lines, and red marker status)
First, we write the server part for this (on a separate port), which we call inside the average_slope_intercept() function.
This sends the number of left lines, the number of right lines, and the red marker status for the current frame.
Code [send_data_pi.py]:
import socket

def Tcp_server_wait(numofclientwait, port):
    global s2
    s2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s2.bind(('', port))
    s2.listen(numofclientwait)

def Tcp_server_next():
    global s
    s = s2.accept()[0]

def Tcp_Write(D):
    s.send(D + b'\r')
    return

def Tcp_Close():
    s.close()
    s2.close()
    return

Tcp_server_wait(5, 17098)  # this data is sent over a different port
Tcp_server_next()
We import the above module in main.py and call the function below whenever we need to send information to the Pi.
    def sendinfoback(self, l, r, red):
        D = b''
        D += bytes([l, r, red])  # leftlines, rightlines, redstatus
        print('here inside sendinfo', D)
        send_data_pi.Tcp_Write(D)
Here the information is sent as bytes.
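Since each value fits in a single byte (0–255), the whole message is just three raw bytes plus the '\r' delimiter appended by Tcp_Write(). A sketch of both ends of this tiny protocol (standalone helpers for illustration, not repository functions):

```python
def pack_info(left, right, red):
    # three unsigned bytes followed by a carriage-return delimiter
    return bytes([left, right, red]) + b'\r'

def unpack_info(message):
    # strip the trailing delimiter and expose the three fields as ints
    left, right, red = message.rstrip(b'\r')
    return left, right, red

msg = pack_info(2, 1, 0)
print(msg)               # b'\x02\x01\x00\r'
print(unpack_info(msg))  # (2, 1, 0)
```

Iterating over a bytes object in Python 3 yields ints directly, which is why the Pi-side loop in STEP6 can copy the received bytes straight into an integer array.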
STEP6: Raspberry Pi client-side code
On the Pi, within a separate script [main.py]:
- First, we read the information sent from the desktop computer (left lines, right lines, and red marker status).
- Also, get the distance from the ultrasonic sensor attached to the Pi.
- Based on this information, make the car move.
First import the libraries. The code for each module is given below [main.py]:
from time import sleep
import CarMove
import ActionClientRead
from ultrasonic import UltraSonic

US = UltraSonic()
m1 = CarMove.move()
Let us now look at the code for each of these imported modules.
CarMove.py:
This module controls the car's movement.
Connections:
Working:
We keep the car's wheels always moving forward, and by changing only the PWM values on the left and right enable pins we control left, right, and stop motion, since the PWM duty cycle sets the motor speed.
Import libraries
import RPi.GPIO as GPIO
from time import sleep
Define the GPIO pins to which the connections are made:
#Connections from Motor Driver to Pi GPIO
Rin1 = 21
Rin2 = 20
Ren = 12 # Right Enable
Lin1 = 13
Lin2 = 19
Len = 26 #left Enable
Set the pins as input or output and initialize the enable pins:
initialvaluespeed=30 # This should be between 0 to 100
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
#Initialization for right motor
GPIO.setup(Rin1,GPIO.OUT)
GPIO.setup(Rin2,GPIO.OUT)
GPIO.setup(Ren,GPIO.OUT)
GPIO.output(Rin1,GPIO.LOW)
GPIO.output(Rin2,GPIO.LOW)
Rp=GPIO.PWM(Ren,1000)
Rp.start(initialvaluespeed)
#Initialization for left motor
GPIO.setup(Lin1,GPIO.OUT)
GPIO.setup(Lin2,GPIO.OUT)
GPIO.setup(Len,GPIO.OUT)
GPIO.output(Lin1,GPIO.LOW)
GPIO.output(Lin2,GPIO.LOW)
Lp=GPIO.PWM(Len,1000)
Lp.start(initialvaluespeed)
Finally, create a class move() with functions for the different movements:
class move():
    def __init__(self):
        print("starting")

    def Rspeed(self, val):
        Rp.ChangeDutyCycle(initialvaluespeed + val)

    def Lspeed(self, val):
        Lp.ChangeDutyCycle(initialvaluespeed + val)

    def forward(self):
        GPIO.output(Rin2, GPIO.LOW)
        GPIO.output(Rin1, GPIO.HIGH)
        GPIO.output(Lin1, GPIO.LOW)
        GPIO.output(Lin2, GPIO.HIGH)

    def escape(self):
        GPIO.cleanup()
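One caveat with Rspeed()/Lspeed(): ChangeDutyCycle() only accepts values in the 0–100 range, so initialvaluespeed + val must stay within it (the calls in STEP6 do). As a safeguard one could clamp the result first; this helper is a hypothetical addition, not part of the repository code:

```python
def clamp_duty(base, delta):
    """Clamp base + delta into the 0-100 range accepted by ChangeDutyCycle()."""
    return max(0, min(100, base + delta))

print(clamp_duty(30, 10))   # 40
print(clamp_duty(30, -30))  # 0   (wheel stopped)
print(clamp_duty(30, 90))   # 100 (capped at full duty cycle)
```

With this in place, Rspeed() would call Rp.ChangeDutyCycle(clamp_duty(initialvaluespeed, val)) and out-of-range requests could never raise an error mid-drive.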
ActionClientRead.py:
This module reads the bytes sent on port 17098 to get the left-line, right-line, and red-marker information.
import socket, time

def Tcp_connect(HostIp, Port):
    global s
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((HostIp, Port))
    return

def Tcp_Read():
    a = b''
    b = b''
    while a != b'\r':
        a = s.recv(1)
        b = b + a
    return b

def Tcp_Close():
    s.close()
    return

Tcp_connect('192.168.31.7', 17098)  # replace with your server IP, port=17098
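The '\r'-delimited framing used by Tcp_Read() can be exercised locally with socket.socketpair() (stdlib, no network needed); the reader below mirrors Tcp_Read()'s byte-at-a-time loop:

```python
import socket

def read_until_cr(sock):
    # read one byte at a time until the carriage-return delimiter, like Tcp_Read()
    data = b''
    while not data.endswith(b'\r'):
        data += sock.recv(1)
    return data

server, client = socket.socketpair()
server.send(bytes([2, 1, 0]) + b'\r')  # left=2, right=1, red=0
msg = read_until_cr(client)
print(list(msg))  # [2, 1, 0, 13]
server.close(); client.close()
```

Note that the trailing delimiter (byte 13) is included in the returned buffer; the main loop in STEP6 simply ignores it because it only reads the first three slots of its array.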
Ultrasonic.py:
Connections:
It contains a Distance() function within the class UltraSonic that returns the current distance in front of the ultrasonic sensor. This information is used to stop the car when an obstacle (a person, or another car in traffic) suddenly appears in front of it.
First import the libraries and initialize the pins:
import RPi.GPIO as GPIO
import time
GPIO.setmode(GPIO.BCM)
GPIO_TRIGGER = 4
GPIO_ECHO = 17
GPIO.setwarnings(False)
GPIO.setup(GPIO_TRIGGER, GPIO.OUT)
GPIO.setup(GPIO_ECHO, GPIO.IN)
GPIO.output(GPIO_TRIGGER, False)
print ("Waiting For Sensor To Settle")
time.sleep(2)
Next, create the class UltraSonic():
class UltraSonic():
    def __init__(self):
        print("started")

    def Distance(self):
        # trigger the ultrasonic sensor for a very short period (10 us)
        GPIO.output(GPIO_TRIGGER, True)
        time.sleep(0.00001)
        GPIO.output(GPIO_TRIGGER, False)
        while GPIO.input(GPIO_ECHO) == 0:
            pass
        StartTime = time.time()  # start the timer once the pulse is sent and echo goes high
        while GPIO.input(GPIO_ECHO) == 1:
            pass
        StopTime = time.time()  # stop the timer once the echo goes low again
        TimeElapsed = StopTime - StartTime  # duration for which the echo pin was high
        speed = 34300  # speed of sound in air: 343 m/s = 34300 cm/s
        twicedistance = TimeElapsed * speed  # the elapsed time covers the round trip (out and back)
        distance = twicedistance / 2  # halve it to get the actual distance
        time.sleep(.01)
        return round(distance, 2)  # rounded to 2 decimal places
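The distance arithmetic in Distance() is worth sanity-checking on its own: sound travels at roughly 34300 cm/s, and the measured echo time covers the round trip, so distance = elapsed × 34300 / 2 cm. Factored into a standalone helper (hypothetical, for illustration):

```python
def echo_to_distance_cm(elapsed_s, speed_cm_s=34300):
    """Convert an echo round-trip time (seconds) to a one-way distance in cm."""
    # the pulse travels out and back, so halve the round-trip distance
    return round(elapsed_s * speed_cm_s / 2, 2)

# a 1 ms echo corresponds to an obstacle about 17 cm away,
# just above the 15 cm stop threshold used in STEP6
print(echo_to_distance_cm(0.001))  # 17.15
```

This also shows why the sensor is suited to the 15 cm stop threshold: at that range the echo returns in well under a millisecond, far faster than one loop iteration.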
Now let's continue our code... [main.py]
In the lines above we imported the libraries. Next, within an infinite loop, we first gather all the information (number of left lines, number of right lines, red status, ultrasonic distance) and then move the car accordingly:
dat = [0, 0, 0, 0, 0]  # initialize array to store info
while 1:
    c = 0
    D = ActionClientRead.Tcp_Read()  # read the data
    for b in D:
        dat[c] = b
        c += 1
    left = dat[0]   # number of left lines detected
    right = dat[1]  # number of right lines detected
    red = dat[2]    # whether the red marker is present (1) or not (0)
    dis = US.Distance()  # get current distance from the ultrasonic sensor
    print("distance=", dis)
    print("left=", left)
    print("right=", right)
    speedR, speedL = 0, 0  # default: go straight ahead
    if red or dis < 15:  # stop the car if either condition is true
        speedR = -1 * CarMove.initialvaluespeed
        speedL = -1 * CarMove.initialvaluespeed
    elif left > right:  # more left lines ==> turn left by stopping the left wheel and speeding up the right
        speedR = 10
        speedL = -1 * CarMove.initialvaluespeed
    elif right > left:  # more right lines ==> turn right by stopping the right wheel and speeding up the left
        speedL = 10
        speedR = -1 * CarMove.initialvaluespeed
    m1.Rspeed(speedR)
    m1.Lspeed(speedL)
    m1.forward()
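The control logic above can be factored into a pure function, which makes the steering rules easy to test off the car (decide_speeds is a hypothetical refactoring, not in the repository):

```python
BASE_SPEED = 30  # mirrors CarMove.initialvaluespeed

def decide_speeds(left, right, red, distance_cm, base=BASE_SPEED):
    """Return (speedR, speedL) deltas matching the rules in the main loop."""
    if red or distance_cm < 15:
        return -base, -base  # stop: cancel the base duty cycle on both wheels
    if left > right:
        return 10, -base     # steer left: stop left wheel, speed up right
    if right > left:
        return -base, 10     # steer right: stop right wheel, speed up left
    return 0, 0              # balanced lines: keep going straight

print(decide_speeds(2, 1, 0, 50))  # (10, -30)   steer left
print(decide_speeds(0, 0, 1, 50))  # (-30, -30)  red marker seen, stop
print(decide_speeds(1, 1, 0, 10))  # (-30, -30)  obstacle closer than 15 cm, stop
```

The main loop would then reduce to m1.Rspeed(speedR); m1.Lspeed(speedL); m1.forward() after a single decide_speeds() call, and each steering rule can be asserted without any GPIO hardware attached.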