Files
2024-08-27 14:42:45 +08:00

321 lines
11 KiB
Python

# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import math
class VehicleRetrogradeRecognizer(object):
def __init__(self, cfg):
self.cfg = cfg
self.filter_horizontal_flag = self.cfg['filter_horizontal_flag']
self.deviation = self.cfg['deviation']
self.move_scale = self.cfg['move_scale']
self.keep_right_flag = self.cfg['keep_right_flag']
self.center_traj_retrograde = [{}] #retrograde recognizer record use
self.fence_line = None if len(self.cfg[
'fence_line']) == 0 else self.cfg['fence_line']
def update_center_traj(self, mot_res, max_len):
from collections import deque, defaultdict
if mot_res is not None:
ids = mot_res['boxes'][:, 0]
scores = mot_res['boxes'][:, 2]
boxes = mot_res['boxes'][:, 3:]
boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
else:
boxes = np.zeros([0, 4])
ids = np.zeros([0])
scores = np.zeros([0])
# single class, still need to be defaultdict type for ploting
num_classes = 1
online_tlwhs = defaultdict(list)
online_scores = defaultdict(list)
online_ids = defaultdict(list)
online_tlwhs[0] = boxes
online_ids[0] = ids
if mot_res is not None:
for cls_id in range(num_classes):
tlwhs = online_tlwhs[cls_id]
obj_ids = online_ids[cls_id]
for i, tlwh in enumerate(tlwhs):
x1, y1, w, h = tlwh
center = tuple(map(int, (x1 + w / 2., y1 + h)))
obj_id = int(obj_ids[i])
if self.center_traj_retrograde is not None:
if obj_id not in self.center_traj_retrograde[cls_id]:
self.center_traj_retrograde[cls_id][obj_id] = deque(
maxlen=max_len)
self.center_traj_retrograde[cls_id][obj_id].append(
center)
def get_angle(self, array):
x1, y1, x2, y2 = array
a_x = x2 - x1
a_y = y2 - y1
angle1 = math.atan2(a_y, a_x)
angle1 = int(angle1 * 180 / math.pi)
a_x = x2 - x1 if y2 >= y1 else x1 - x2
a_y = y2 - y1 if y2 >= y1 else y1 - y2
angle2 = math.atan2(a_y, a_x)
angle2 = int(angle2 * 180 / math.pi)
if angle2 > 90:
angle2 = 180 - angle2
return angle1, angle2
def is_move(self, array, frame_shape):
x1, y1, x2, y2 = array
h, w, _ = frame_shape
if abs(x1 - x2) > w * self.move_scale or abs(y1 -
y2) > h * self.move_scale:
return True
else:
return False
def get_distance_point2line(self, point, line):
line_point1, line_point2 = np.array(line[0:2]), np.array(line[2:])
vec1 = line_point1 - point
vec2 = line_point2 - point
distance = np.abs(np.cross(vec1, vec2)) / np.linalg.norm(line_point1 -
line_point2)
return distance
def driving_direction(self, line1, line2, is_init=False):
x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
x2, y2 = line2[0] - line1[0], line2[1] - line1[1]
result = x1 * y2 - x2 * y1
distance = self.get_distance_point2line([x2, y2], line1)
if result < 0:
result = 1
elif result == 0:
if line2[3] >= line2[1]:
return -1
else:
return 1
else:
result = -1
return result, distance
def get_long_fence_line(self, h, w, line):
x1, y1, x2, y2 = line
if x1 == x2:
return [x1, 0, x1, h]
if y1 == y2:
return [0, y1, w, y1]
k = (y2 - y1) / (x2 - x1)
b = y1 - k * x1
if k == 1 and b == 0:
return [0, 0, w, h]
if k == -1 and b == 0:
return [w, 0, h, h]
top = [-b / k, 0]
left = [0, b]
right = [w, k * w + b]
bottom = [(h - b) / k, h]
candidate = np.array([top, left, right, bottom])
flag = np.array([0, 0, 0, 0])
if top[0] >= 0 and top[0] <= w:
flag[0] = 1
if left[1] > 0 and left[1] <= h:
flag[1] = 1
if right[1] > 0 and right[1] <= h:
flag[2] = 1
if bottom[0] > 0 and bottom[0] < w:
flag[3] = 1
ind = np.where(flag == 1)
candidate = candidate[ind]
candidate_sort = candidate[candidate[:, 1].argsort()]
return [
int(candidate_sort[0][0]), int(candidate_sort[0][1]),
int(candidate_sort[1][0]), int(candidate_sort[1][1])
]
def init_fence_line(self, lanes, pos_dir_traj, neg_dir_traj, frame_shape):
fence_lines_candidate = None
h, w, _ = frame_shape
abs_distance = h * h + w * w
for lane in lanes[0]:
pos_dir_distansce = h * h + w * w
neg_dir_distansce = h * h + w * w
pos_dir = 0
neg_dir = 0
for traj_line in pos_dir_traj:
dir_result, distansce = self.driving_direction(
lane, traj_line['traj_line'])
if dir_result > 0:
pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
pos_dir = 1
else:
neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
neg_dir = 1
if pos_dir > 0 and neg_dir > 0:
continue
for traj_line in neg_dir_traj:
dir_result, distansce = self.driving_direction(
lane, traj_line['traj_line'])
if dir_result > 0:
pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
pos_dir = 1
else:
neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
neg_dir = 1
if pos_dir > 0 and neg_dir > 0:
diff_dir_distance = abs(pos_dir_distansce - neg_dir_distansce)
if diff_dir_distance < abs_distance:
fence_lines_candidate = lane
abs_distance = diff_dir_distance
if fence_lines_candidate is None:
return None
fence_lines_candidate = self.get_long_fence_line(h, w,
fence_lines_candidate)
return fence_lines_candidate
def judge_retrograde(self, traj_line):
line1 = self.fence_line
x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
line2 = traj_line['traj_line']
x2_start_point, y2_start_point = line2[0] - line1[0], line2[1] - line1[
1]
x2_end_point, y2_end_point = line2[2] - line1[0], line2[3] - line1[1]
start_point_dir = x1 * y2_start_point - x2_start_point * y1
end_point_dir = x1 * y2_end_point - x2_end_point * y1
if start_point_dir < 0:
start_point_dir = 1
elif start_point_dir == 0:
if line2[3] >= line2[1]:
start_point_dir = -1
else:
start_point_dir = 1
else:
start_point_dir = -1
if end_point_dir < 0:
end_point_dir = 1
elif end_point_dir == 0:
if line2[3] >= line2[1]:
end_point_dir = -1
else:
end_point_dir = 1
else:
end_point_dir = -1
if self.keep_right_flag:
driver_dir = -1 if (line2[3] - line2[1]) >= 0 else 1
else:
driver_dir = -1 if (line2[3] - line2[1]) <= 0 else 1
return start_point_dir == driver_dir and start_point_dir == end_point_dir
def mot_run(self, lanes_res, det_res, frame_shape):
det = det_res['boxes']
directions = lanes_res['directions']
lanes = lanes_res['output']
if len(directions) > 0:
direction = directions[0]
else:
return [], self.fence_line
if len(det) == 0:
return [], self.fence_line
traj_lines = []
pos_dir_traj = []
neg_dir_traj = []
for i in range(len(det)):
class_id = int(det[i][1])
mot_id = int(det[i][0])
traj_i = self.center_traj_retrograde[class_id][mot_id]
if len(traj_i) < 2:
continue
traj_line = {
'index': i,
'mot_id': mot_id,
'traj_line':
[traj_i[0][0], traj_i[0][1], traj_i[-1][0], traj_i[-1][1]]
}
if not self.is_move(traj_line['traj_line'], frame_shape):
continue
angle, angle_deviation = self.get_angle(traj_line['traj_line'])
if direction is not None and self.filter_horizontal_flag:
if abs(angle_deviation - direction) > self.deviation:
continue
traj_line['angle'] = angle
traj_lines.append(traj_line)
if self.fence_line is None:
if angle >= 0:
pos_dir_traj.append(traj_line)
else:
neg_dir_traj.append(traj_line)
if len(traj_lines) == 0:
return [], self.fence_line
if self.fence_line is None:
if len(pos_dir_traj) < 1 or len(neg_dir_traj) < 1:
return [], None
self.fence_line = self.init_fence_line(lanes, pos_dir_traj,
neg_dir_traj, frame_shape)
return [], self.fence_line
else:
retrograde_list = []
for traj_line in traj_lines:
if self.judge_retrograde(traj_line) == False:
retrograde_list.append(det[traj_line['index']][0])
return retrograde_list, self.fence_line