更换文档检测模型
This commit is contained in:
320
paddle_detection/deploy/pipeline/ppvehicle/vehicle_retrograde.py
Normal file
320
paddle_detection/deploy/pipeline/ppvehicle/vehicle_retrograde.py
Normal file
@@ -0,0 +1,320 @@
|
||||
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import numpy as np
|
||||
import math
|
||||
|
||||
|
||||
class VehicleRetrogradeRecognizer(object):
|
||||
def __init__(self, cfg):
|
||||
self.cfg = cfg
|
||||
self.filter_horizontal_flag = self.cfg['filter_horizontal_flag']
|
||||
self.deviation = self.cfg['deviation']
|
||||
self.move_scale = self.cfg['move_scale']
|
||||
self.keep_right_flag = self.cfg['keep_right_flag']
|
||||
self.center_traj_retrograde = [{}] #retrograde recognizer record use
|
||||
self.fence_line = None if len(self.cfg[
|
||||
'fence_line']) == 0 else self.cfg['fence_line']
|
||||
|
||||
def update_center_traj(self, mot_res, max_len):
|
||||
from collections import deque, defaultdict
|
||||
if mot_res is not None:
|
||||
ids = mot_res['boxes'][:, 0]
|
||||
scores = mot_res['boxes'][:, 2]
|
||||
boxes = mot_res['boxes'][:, 3:]
|
||||
boxes[:, 2] = boxes[:, 2] - boxes[:, 0]
|
||||
boxes[:, 3] = boxes[:, 3] - boxes[:, 1]
|
||||
else:
|
||||
boxes = np.zeros([0, 4])
|
||||
ids = np.zeros([0])
|
||||
scores = np.zeros([0])
|
||||
|
||||
# single class, still need to be defaultdict type for ploting
|
||||
num_classes = 1
|
||||
online_tlwhs = defaultdict(list)
|
||||
online_scores = defaultdict(list)
|
||||
online_ids = defaultdict(list)
|
||||
online_tlwhs[0] = boxes
|
||||
online_ids[0] = ids
|
||||
|
||||
if mot_res is not None:
|
||||
for cls_id in range(num_classes):
|
||||
tlwhs = online_tlwhs[cls_id]
|
||||
obj_ids = online_ids[cls_id]
|
||||
for i, tlwh in enumerate(tlwhs):
|
||||
x1, y1, w, h = tlwh
|
||||
center = tuple(map(int, (x1 + w / 2., y1 + h)))
|
||||
obj_id = int(obj_ids[i])
|
||||
if self.center_traj_retrograde is not None:
|
||||
if obj_id not in self.center_traj_retrograde[cls_id]:
|
||||
self.center_traj_retrograde[cls_id][obj_id] = deque(
|
||||
maxlen=max_len)
|
||||
self.center_traj_retrograde[cls_id][obj_id].append(
|
||||
center)
|
||||
|
||||
def get_angle(self, array):
|
||||
|
||||
x1, y1, x2, y2 = array
|
||||
a_x = x2 - x1
|
||||
a_y = y2 - y1
|
||||
angle1 = math.atan2(a_y, a_x)
|
||||
angle1 = int(angle1 * 180 / math.pi)
|
||||
|
||||
a_x = x2 - x1 if y2 >= y1 else x1 - x2
|
||||
a_y = y2 - y1 if y2 >= y1 else y1 - y2
|
||||
angle2 = math.atan2(a_y, a_x)
|
||||
angle2 = int(angle2 * 180 / math.pi)
|
||||
if angle2 > 90:
|
||||
angle2 = 180 - angle2
|
||||
|
||||
return angle1, angle2
|
||||
|
||||
def is_move(self, array, frame_shape):
|
||||
x1, y1, x2, y2 = array
|
||||
h, w, _ = frame_shape
|
||||
|
||||
if abs(x1 - x2) > w * self.move_scale or abs(y1 -
|
||||
y2) > h * self.move_scale:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_distance_point2line(self, point, line):
|
||||
|
||||
line_point1, line_point2 = np.array(line[0:2]), np.array(line[2:])
|
||||
vec1 = line_point1 - point
|
||||
vec2 = line_point2 - point
|
||||
distance = np.abs(np.cross(vec1, vec2)) / np.linalg.norm(line_point1 -
|
||||
line_point2)
|
||||
|
||||
return distance
|
||||
|
||||
def driving_direction(self, line1, line2, is_init=False):
|
||||
x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
|
||||
x2, y2 = line2[0] - line1[0], line2[1] - line1[1]
|
||||
result = x1 * y2 - x2 * y1
|
||||
|
||||
distance = self.get_distance_point2line([x2, y2], line1)
|
||||
|
||||
if result < 0:
|
||||
result = 1
|
||||
elif result == 0:
|
||||
if line2[3] >= line2[1]:
|
||||
return -1
|
||||
else:
|
||||
return 1
|
||||
else:
|
||||
result = -1
|
||||
|
||||
return result, distance
|
||||
|
||||
def get_long_fence_line(self, h, w, line):
|
||||
|
||||
x1, y1, x2, y2 = line
|
||||
if x1 == x2:
|
||||
return [x1, 0, x1, h]
|
||||
if y1 == y2:
|
||||
return [0, y1, w, y1]
|
||||
k = (y2 - y1) / (x2 - x1)
|
||||
b = y1 - k * x1
|
||||
|
||||
if k == 1 and b == 0:
|
||||
return [0, 0, w, h]
|
||||
if k == -1 and b == 0:
|
||||
return [w, 0, h, h]
|
||||
|
||||
top = [-b / k, 0]
|
||||
left = [0, b]
|
||||
right = [w, k * w + b]
|
||||
bottom = [(h - b) / k, h]
|
||||
candidate = np.array([top, left, right, bottom])
|
||||
|
||||
flag = np.array([0, 0, 0, 0])
|
||||
|
||||
if top[0] >= 0 and top[0] <= w:
|
||||
flag[0] = 1
|
||||
if left[1] > 0 and left[1] <= h:
|
||||
flag[1] = 1
|
||||
if right[1] > 0 and right[1] <= h:
|
||||
flag[2] = 1
|
||||
if bottom[0] > 0 and bottom[0] < w:
|
||||
flag[3] = 1
|
||||
|
||||
ind = np.where(flag == 1)
|
||||
candidate = candidate[ind]
|
||||
candidate_sort = candidate[candidate[:, 1].argsort()]
|
||||
|
||||
return [
|
||||
int(candidate_sort[0][0]), int(candidate_sort[0][1]),
|
||||
int(candidate_sort[1][0]), int(candidate_sort[1][1])
|
||||
]
|
||||
|
||||
def init_fence_line(self, lanes, pos_dir_traj, neg_dir_traj, frame_shape):
|
||||
|
||||
fence_lines_candidate = None
|
||||
h, w, _ = frame_shape
|
||||
abs_distance = h * h + w * w
|
||||
|
||||
for lane in lanes[0]:
|
||||
pos_dir_distansce = h * h + w * w
|
||||
neg_dir_distansce = h * h + w * w
|
||||
pos_dir = 0
|
||||
neg_dir = 0
|
||||
|
||||
for traj_line in pos_dir_traj:
|
||||
dir_result, distansce = self.driving_direction(
|
||||
lane, traj_line['traj_line'])
|
||||
if dir_result > 0:
|
||||
pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
|
||||
pos_dir = 1
|
||||
else:
|
||||
neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
|
||||
neg_dir = 1
|
||||
|
||||
if pos_dir > 0 and neg_dir > 0:
|
||||
continue
|
||||
|
||||
for traj_line in neg_dir_traj:
|
||||
|
||||
dir_result, distansce = self.driving_direction(
|
||||
lane, traj_line['traj_line'])
|
||||
|
||||
if dir_result > 0:
|
||||
pos_dir_distansce = distansce if distansce < pos_dir_distansce else pos_dir_distansce
|
||||
pos_dir = 1
|
||||
else:
|
||||
neg_dir_distansce = distansce if distansce < neg_dir_distansce else neg_dir_distansce
|
||||
neg_dir = 1
|
||||
|
||||
if pos_dir > 0 and neg_dir > 0:
|
||||
diff_dir_distance = abs(pos_dir_distansce - neg_dir_distansce)
|
||||
if diff_dir_distance < abs_distance:
|
||||
fence_lines_candidate = lane
|
||||
abs_distance = diff_dir_distance
|
||||
|
||||
if fence_lines_candidate is None:
|
||||
return None
|
||||
|
||||
fence_lines_candidate = self.get_long_fence_line(h, w,
|
||||
fence_lines_candidate)
|
||||
|
||||
return fence_lines_candidate
|
||||
|
||||
def judge_retrograde(self, traj_line):
|
||||
|
||||
line1 = self.fence_line
|
||||
x1, y1 = line1[2] - line1[0], line1[3] - line1[1]
|
||||
|
||||
line2 = traj_line['traj_line']
|
||||
x2_start_point, y2_start_point = line2[0] - line1[0], line2[1] - line1[
|
||||
1]
|
||||
x2_end_point, y2_end_point = line2[2] - line1[0], line2[3] - line1[1]
|
||||
|
||||
start_point_dir = x1 * y2_start_point - x2_start_point * y1
|
||||
end_point_dir = x1 * y2_end_point - x2_end_point * y1
|
||||
|
||||
if start_point_dir < 0:
|
||||
start_point_dir = 1
|
||||
|
||||
elif start_point_dir == 0:
|
||||
if line2[3] >= line2[1]:
|
||||
start_point_dir = -1
|
||||
else:
|
||||
start_point_dir = 1
|
||||
else:
|
||||
start_point_dir = -1
|
||||
|
||||
if end_point_dir < 0:
|
||||
end_point_dir = 1
|
||||
|
||||
elif end_point_dir == 0:
|
||||
if line2[3] >= line2[1]:
|
||||
end_point_dir = -1
|
||||
else:
|
||||
end_point_dir = 1
|
||||
else:
|
||||
end_point_dir = -1
|
||||
|
||||
if self.keep_right_flag:
|
||||
driver_dir = -1 if (line2[3] - line2[1]) >= 0 else 1
|
||||
else:
|
||||
driver_dir = -1 if (line2[3] - line2[1]) <= 0 else 1
|
||||
|
||||
return start_point_dir == driver_dir and start_point_dir == end_point_dir
|
||||
|
||||
def mot_run(self, lanes_res, det_res, frame_shape):
|
||||
|
||||
det = det_res['boxes']
|
||||
directions = lanes_res['directions']
|
||||
lanes = lanes_res['output']
|
||||
if len(directions) > 0:
|
||||
direction = directions[0]
|
||||
else:
|
||||
return [], self.fence_line
|
||||
|
||||
if len(det) == 0:
|
||||
return [], self.fence_line
|
||||
|
||||
traj_lines = []
|
||||
pos_dir_traj = []
|
||||
neg_dir_traj = []
|
||||
for i in range(len(det)):
|
||||
class_id = int(det[i][1])
|
||||
mot_id = int(det[i][0])
|
||||
traj_i = self.center_traj_retrograde[class_id][mot_id]
|
||||
if len(traj_i) < 2:
|
||||
continue
|
||||
|
||||
traj_line = {
|
||||
'index': i,
|
||||
'mot_id': mot_id,
|
||||
'traj_line':
|
||||
[traj_i[0][0], traj_i[0][1], traj_i[-1][0], traj_i[-1][1]]
|
||||
}
|
||||
|
||||
if not self.is_move(traj_line['traj_line'], frame_shape):
|
||||
continue
|
||||
angle, angle_deviation = self.get_angle(traj_line['traj_line'])
|
||||
if direction is not None and self.filter_horizontal_flag:
|
||||
if abs(angle_deviation - direction) > self.deviation:
|
||||
continue
|
||||
|
||||
traj_line['angle'] = angle
|
||||
traj_lines.append(traj_line)
|
||||
|
||||
if self.fence_line is None:
|
||||
if angle >= 0:
|
||||
pos_dir_traj.append(traj_line)
|
||||
else:
|
||||
neg_dir_traj.append(traj_line)
|
||||
|
||||
if len(traj_lines) == 0:
|
||||
return [], self.fence_line
|
||||
|
||||
if self.fence_line is None:
|
||||
|
||||
if len(pos_dir_traj) < 1 or len(neg_dir_traj) < 1:
|
||||
return [], None
|
||||
|
||||
self.fence_line = self.init_fence_line(lanes, pos_dir_traj,
|
||||
neg_dir_traj, frame_shape)
|
||||
return [], self.fence_line
|
||||
|
||||
else:
|
||||
retrograde_list = []
|
||||
for traj_line in traj_lines:
|
||||
if self.judge_retrograde(traj_line) == False:
|
||||
retrograde_list.append(det[traj_line['index']][0])
|
||||
|
||||
return retrograde_list, self.fence_line
|
||||
Reference in New Issue
Block a user