Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.zip filter=lfs diff=lfs merge=lfs -text
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ __pycache__/
# IDE files
.idea
.vs_code/

data/
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "perception/third_party/sort"]
path = perception/third_party/sort
url = https://github.com/lawchekun/sort.git
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,43 @@ Misc code, camera calibration etc.
Code for specific tasks like

1. cross: cross detection
1. segmentation:
1. path_marker: path_marker detection
1. spinny_wheel_detection
2. segmentation:
3. path_marker: path_marker detection
4. spinny_wheel_detection

etc

In order to create your own algorithm to test:

1. Create <your_algo>.py and put it in one of the specific task folders in perception/tasks.

2. Create a class which extends the TaskPerceiver class. perception/tasks/TaskPerceiver.py includes a template with documentation for how to do this.

## vis:
Visualization tools
Code for testing tasks (ideally this should be placed in a separate folder called `tests`).

After writing the code for your specific task algorithm, you can do one of two things:

1. Add this to the end of <your algorithm>.py file:

if __name__ == '__main__':
from perception.vis.vis import run
run(<list of file/directory names>, <new instance of your class>, <save your video?>)
and then run

python <your algorithm>.py
2. Add this to the perception/__init__.py file:

import <path to your module>

ALGOS = {
'custom_name': <your module>.<your class reference>
}
and then run

python vis.py --algorithm custom_name [--data <path to file/directory>] [--profile <function name>] [--save_video]
The **algorithm** parameter is required. If **data** isn't specified, it'll default to your webcam. If **profile** isn't specified, it will be off by default. Add the **save_video** tag if you want to save your vis test as an mp4 file.

## wiki:
Flowchart on TaskPerceiver, TaskReceiver, AlgorithmRunner.
14 changes: 14 additions & 0 deletions perception/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import perception.vis.TestAlgo as TestAlgo
import perception.tasks.gate.GateCenterAlgo as GateSeg
import perception.tasks.gate.GateSegmentationAlgoA as GateSegA
import perception.tasks.gate.GateSegmentationAlgoB as GateSegB
import perception.tasks.gate.GateSegmentationAlgoC as GateSegC
# import perception.tasks as tasks

# Registry mapping the algorithm names accepted by the visualizer CLI
# (vis.py --algorithm <name>) to the TaskPerceiver subclasses that
# implement them. Imported by perception/vis/vis.py.
ALGOS = {
    'test': TestAlgo.TestAlgo,
    'gateseg': GateSeg.GateCenterAlgo,
    'gatesegA': GateSegA.GateSegmentationAlgoA,
    'gatesegB': GateSegB.GateSegmentationAlgoB,
    'gatesegC': GateSegC.GateSegmentationAlgoC
}
30 changes: 20 additions & 10 deletions perception/tasks/TaskPerceiver.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
from typing import Any
from typing import Any, Dict
import numpy as np
class TaskPerceiver:

def __init__(self):
self.time = 0

def analyze(self, frame: np.ndarray, debug: bool) -> Any:
class TaskPerceiver:
def __init__(self, **kwargs):
"""Initializes the TaskPerceiver.
Args:
kwargs: Each keyworded argument is of the form
var_name = (range, default_val), where range is the range of values
for the slider which controls this variable, and default_val is
the initial value of the slider.
"""
self.kwargs = kwargs

def analyze(self, frame: np.ndarray, debug: bool, slider_vals: Dict[str, int]) -> Any:
"""Runs the algorithm and returns the result.
Args:
frame: The frame to analyze
frame: The frame to analyze
debug: Whether or not to display intermediate images for debugging

Returns:
the result of the algorithm
slider_vals: A list of names of the variables which the user should be
able to control from the Visualizer, mapped to current slider
value for that variable
Returns:
the result of the algorithm
debug frames must each be same size as original input frame. Might change this in the future.
"""
raise NotImplementedError("Need to implement with child class.")

Empty file added perception/tasks/__init__.py
Empty file.
9 changes: 0 additions & 9 deletions perception/tasks/cross/CrossPerceiver.py

This file was deleted.

Empty file.
29 changes: 12 additions & 17 deletions perception/tasks/cross/cross_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
#############################################################################

sys.path.insert(0, '../background_removal')
from peak_removal_adaptive_thresholding import filter_out_highest_peak_multidim
from combined_filter import combined_filter
from perception.tasks.segmentation.peak_removal_adaptive_thresholding import filter_out_highest_peak_multidim
from perception.tasks.segmentation.combinedFilter import init_combined_filter

ret, frame = True, cv2.imread('../data/cross/cross.png') # https://i.imgur.com/rjv1Vcy.png
ret, frame = True, cv2.imread('../data/cross/cross.png') # https://i.imgur.com/rjv1Vcy.png

# "hsv" = Apply hsv thresholding before trying to find the path marker
# "multidim" = Apply filter_out_highest_peak_multidim
Expand All @@ -29,7 +29,7 @@ def find_cross(frame, draw_figs=True):
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

ret, thresh = cv2.threshold(gray, 127, 255,0)
__, contours,hierarchy = cv2.findContours(thresh,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
__, contours,hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
contours.sort(key=lambda c: cv2.contourArea(c), reverse=True)

possible_crosses = []
Expand All @@ -44,17 +44,16 @@ def find_cross(frame, draw_figs=True):
if defects is not None and len(defects) == 4:
possible_crosses.append(defects)


if draw_figs:
img = frame.copy()
for defects in possible_crosses:
for i in range(defects.shape[0]):
s,e,f,d = defects[i,0]
s, e, f, d = defects[i, 0]
# start = tuple(cnt[s][0])
# end = tuple(cnt[e][0])
far = tuple(cnt[f][0])
# cv2.line(img,start,end,[0,255,0],2)
cv2.circle(img,far,5,[0,0,255],-1)
cv2.circle(img, far, 5, [0, 0, 255], -1)
cv2.imshow('cross at contour number ' + str(i),img)
cv2.imshow('original', frame)

Expand All @@ -64,15 +63,15 @@ def find_cross(frame, draw_figs=True):
###########################################
# Main Body
###########################################

# TODO: port to vis
if __name__ == "__main__":
combined_filter = init_combined_filter()

ret_tries = 0
while(1 and ret_tries < 50):
while 1 and ret_tries < 50:
# ret,frame = cap.read()

if ret == True:
if ret:
# frame = cv2.resize(frame, (0,0), fx=0.5, fy=0.5)

if thresholding == "multidim":
votes1, threshed = filter_out_highest_peak_multidim(frame)
threshed = cv2.morphologyEx(threshed, cv2.MORPH_OPEN, np.ones((5,5),np.uint8))
Expand All @@ -86,13 +85,9 @@ def find_cross(frame, draw_figs=True):

ret_tries = 0
k = cv2.waitKey(60) & 0xff
if k == 27: # esc
if testing:
print("hsv thresholds:")
print(thresholds_used)
if k == 27: # esc
break
else:
ret_tries += 1

cv2.destroyAllWindows()
cap.release()
99 changes: 99 additions & 0 deletions perception/tasks/gate/GateCenterAlgo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from perception.tasks.gate.GateSegmentationAlgoA import GateSegmentationAlgoA
from perception.tasks.TaskPerceiver import TaskPerceiver
from collections import namedtuple

import numpy as np
import math
import cv2 as cv
import statistics


class GateCenterAlgo(TaskPerceiver):
    """Estimates the gate center from GateSegmentationAlgoA's two side
    detections, smoothing the estimate with a 25-frame rolling average and,
    when the scene is moving fast, with dense (Farneback) optical flow.
    """

    output_class = namedtuple("GateOutput", ["centerx", "centery"])
    output_type = {'centerx': np.int16, 'centery': np.int16}

    def __init__(self):
        # Slider "optical_flow_c" spans [0, 100] with default 10; analyze()
        # divides the slider value by 100 to get the optical-flow gain.
        super().__init__(optical_flow_c=((0, 100), 10))
        # Rolling windows of recent center estimates. These must be instance
        # attributes: as class-level lists they would be shared (and mutated)
        # across every GateCenterAlgo instance.
        self.center_x_locs, self.center_y_locs = [], []
        self.gate_center = self.output_class(250, 250)
        self.use_optical_flow = False
        self.optical_flow_c = 0.1
        self.gate = GateSegmentationAlgoA()
        # Previous grayscale frame for optical flow; None until first frame.
        self.prvs = None

    # TODO: do input and return typing
    def analyze(self, frame, debug, slider_vals):
        """Run gate-center detection on one frame.

        Args:
            frame: BGR image (np.ndarray) to analyze.
            debug: if True, also return the intermediate debug frames.
            slider_vals: dict of slider values; reads 'optical_flow_c'
                (an int in [0, 100]).

        Returns:
            (center_x, center_y), plus a list of debug frames when debug
            is True.
        """
        self.optical_flow_c = slider_vals['optical_flow_c'] / 100
        rect, debug_filters = self.gate.analyze(frame, True)
        debug_filter = debug_filters[-1]
        debug_filters = debug_filters[:-1]

        if self.prvs is None:
            # First frame: just seed the optical-flow reference image.
            self.prvs = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        else:
            if rect[0] and rect[1]:
                self.gate_center = self.get_center(rect[0], rect[1], frame)
            # Orange marker when optical flow drove the estimate, red when
            # the rolling average did.
            if self.use_optical_flow:
                cv.circle(debug_filter, self.gate_center, 5, (3, 186, 252), -1)
            else:
                cv.circle(debug_filter, self.gate_center, 5, (0, 0, 255), -1)

        if debug:
            return (self.gate_center[0], self.gate_center[1]), list(debug_filters) + [debug_filter]
        return (self.gate_center[0], self.gate_center[1])

    def center_without_optical_flow(self, center_x, center_y):
        """Smooth (center_x, center_y) with a rolling average.

        Returns the raw value for the very first sample, the running mean
        while fewer than 25 samples have been seen, and afterwards a
        25-sample windowed mean that rejects new centers jumping more than
        10 px from the window average.
        """
        # Get starting center location, averaging over the first 25 frames.
        if len(self.center_x_locs) == 0:
            self.center_x_locs.append(center_x)
            self.center_y_locs.append(center_y)

        elif len(self.center_x_locs) < 25:
            self.center_x_locs.append(center_x)
            self.center_y_locs.append(center_y)
            center_x = int(statistics.mean(self.center_x_locs))
            center_y = int(statistics.mean(self.center_y_locs))

        # Use the new center location only when it is close to the previous
        # valid (windowed-average) location.
        else:
            self.center_x_locs.append(center_x)
            self.center_y_locs.append(center_y)
            self.center_x_locs.pop(0)
            self.center_y_locs.pop(0)
            x_temp_avg = int(statistics.mean(self.center_x_locs))
            y_temp_avg = int(statistics.mean(self.center_y_locs))
            if math.sqrt((center_x - x_temp_avg)**2 + (center_y - y_temp_avg)**2) > 10:
                center_x, center_y = int(x_temp_avg), int(y_temp_avg)

        return (center_x, center_y)

    def dense_optical_flow(self, frame):
        """Compute Farneback dense optical flow between the stored previous
        frame and this one.

        Returns:
            (next_frame, mag, ang): the new grayscale frame (caller stores it
            as self.prvs), plus per-pixel flow magnitude (normalized to
            [0, 255]) and angle.
        """
        next_frame = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
        flow = cv.calcOpticalFlowFarneback(self.prvs, next_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        mag, ang = cv.cartToPolar(flow[..., 0], flow[..., 1])
        mag = cv.normalize(mag, None, 0, 255, cv.NORM_MINMAX)
        return next_frame, mag, ang

    def get_center(self, rect1, rect2, frame):
        """Combine the two gate-side rectangles into a center estimate.

        Uses the rolling-average path while the window is still filling or
        motion is modest; switches to extrapolating the previous center
        along the mean optical-flow vector when motion is large.
        """
        x1, y1, w1, h1 = rect1
        x2, y2, w2, h2 = rect2
        center_x, center_y = (x1 + x2) // 2, ((y1 + h1 // 2) + (y2 + h2 // 2)) // 2
        self.prvs, mag, ang = self.dense_optical_flow(frame)

        # Stay on the rolling average while the window is filling, or while
        # motion is small and (if already in flow mode) the detection has
        # converged back near the current center.
        if len(self.center_x_locs) < 25 or (np.mean(mag) < 40 and ((not self.use_optical_flow) or
                (center_x - self.gate_center[0])**2 + (center_y - self.gate_center[1])**2 < 50)):
            self.use_optical_flow = False
            return self.center_without_optical_flow(center_x, center_y)
        self.use_optical_flow = True
        # Extrapolate the previous center along the mean flow vector.
        return (int(self.gate_center[0] + self.optical_flow_c * np.mean(mag * np.cos(ang))),
                int(self.gate_center[1] + self.optical_flow_c * np.mean(mag * np.sin(ang))))


if __name__ == '__main__':
    from perception.vis.vis import run
    # Forward-slash path: the original '..\..\..\data\...' was a non-raw
    # string whose backslashes survived only because none formed a
    # recognized escape sequence, and it was Windows-only. Forward slashes
    # are portable and accepted by OpenCV on all platforms.
    run(['../../../data/GOPR1142.MP4'], GateCenterAlgo(), False)
9 changes: 0 additions & 9 deletions perception/tasks/gate/GatePerceiver.py

This file was deleted.

Loading