Counting Cars OpenCV + Python Issue


Problem Description

I have been trying to count cars when crossing the line and it works, but the problem is it counts one car many times which is ridiculous because it should only be counted once.

Here is the code I am using:

import cv2
import numpy as np

bgsMOG = cv2.BackgroundSubtractorMOG()
cap    = cv2.VideoCapture("traffic.avi")
counter = 0

if cap:
    while True:
        ret, frame = cap.read()

        if ret:
            fgmask = bgsMOG.apply(frame, None, 0.01)
            cv2.line(frame, (0,60), (160,60), (255,255,0), 1)
            # To find the countours of the Cars
            contours, hierarchy = cv2.findContours(fgmask,
                                    cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_SIMPLE)

            try:
                hierarchy = hierarchy[0]

            except:
                hierarchy = []

            for contour, hier in zip(contours, hierarchy):
                (x, y, w, h) = cv2.boundingRect(contour)

                if w > 20 and h > 20:
                    cv2.rectangle(frame, (x,y), (x+w,y+h), (255, 0, 0), 1)

                    # To find the centroid of the car
                    x1 = w/2
                    y1 = h/2

                    cx = x+x1
                    cy = y+y1
##                    print "cy=", cy
##                    print "cx=", cx
                    centroid = (cx,cy)
##                    print "centoid=", centroid
                    # Draw the circle of Centroid
                    cv2.circle(frame,(int(cx),int(cy)),2,(0,0,255),-1)

                    # To make sure the Car crosses the line
##                    dy = cy-108
##                    print "dy", dy
                    if centroid > (27, 38) and centroid < (134, 108):
##                        if (cx <= 132)and(cx >= 20):
                        counter +=1
##                            print "counter=", counter
##                    if cy > 10 and cy < 160:
                    cv2.putText(frame, str(counter), (x,y-5),
                                        cv2.FONT_HERSHEY_SIMPLEX,
                                        0.5, (255, 0, 255), 2)
##            cv2.namedWindow('Output',cv2.cv.CV_WINDOW_NORMAL)
            cv2.imshow('Output', frame)
##          cv2.imshow('FGMASK', fgmask)


            key = cv2.waitKey(60)
            if key == 27:
                break

cap.release()
cv2.destroyAllWindows()

And the video is on my GitHub page @ https://github.com/Tes3awy/MATLAB-Tutorials/blob/f24b680f2215c1b1bb96c76f5ba81df533552983/traffic.avi (it's also a built-in video in the Matlab library)

How can I make it so that each car is only counted once?


The individual frames of the video look as follows:

Solution

Preparation

In order to understand what is happening, and eventually solve our problem, we first need to improve the script a little.

I've added logging of the important steps of your algorithm, refactored the code a little, added saving of the mask and processed images, and added the ability to run the script using individual frame images, along with some other modifications.

This is what the script looks like at this point:

import logging
import logging.handlers
import os
import time
import sys

import cv2
import numpy as np

from vehicle_counter import VehicleCounter

# ============================================================================

IMAGE_DIR = "images"
IMAGE_FILENAME_FORMAT = IMAGE_DIR + "/frame_%04d.png"

# Support either video file or individual frames
CAPTURE_FROM_VIDEO = False
if CAPTURE_FROM_VIDEO:
    IMAGE_SOURCE = "traffic.avi" # Video file
else:
    IMAGE_SOURCE = IMAGE_FILENAME_FORMAT # Image sequence

# Time to wait between frames, 0=forever
WAIT_TIME = 1 # 250 # ms

LOG_TO_FILE = True

# Colours for drawing on processed frames    
DIVIDER_COLOUR = (255, 255, 0)
BOUNDING_BOX_COLOUR = (255, 0, 0)
CENTROID_COLOUR = (0, 0, 255)

# ============================================================================

def init_logging():
    main_logger = logging.getLogger()

    formatter = logging.Formatter(
        fmt='%(asctime)s.%(msecs)03d %(levelname)-8s [%(name)s] %(message)s'
        , datefmt='%Y-%m-%d %H:%M:%S')

    handler_stream = logging.StreamHandler(sys.stdout)
    handler_stream.setFormatter(formatter)
    main_logger.addHandler(handler_stream)

    if LOG_TO_FILE:
        handler_file = logging.handlers.RotatingFileHandler("debug.log"
            , maxBytes = 2**24
            , backupCount = 10)
        handler_file.setFormatter(formatter)
        main_logger.addHandler(handler_file)

    main_logger.setLevel(logging.DEBUG)

    return main_logger

# ============================================================================

def save_frame(file_name_format, frame_number, frame, label_format):
    file_name = file_name_format % frame_number
    label = label_format % frame_number

    log.debug("Saving %s as '%s'", label, file_name)
    cv2.imwrite(file_name, frame)

# ============================================================================

def get_centroid(x, y, w, h):
    x1 = int(w / 2)
    y1 = int(h / 2)

    cx = x + x1
    cy = y + y1

    return (cx, cy)

# ============================================================================

def detect_vehicles(fg_mask):
    log = logging.getLogger("detect_vehicles")

    MIN_CONTOUR_WIDTH = 21
    MIN_CONTOUR_HEIGHT = 21

    # Find the contours of any vehicles in the image
    contours, hierarchy = cv2.findContours(fg_mask
        , cv2.RETR_EXTERNAL
        , cv2.CHAIN_APPROX_SIMPLE)

    log.debug("Found %d vehicle contours.", len(contours))

    matches = []
    for (i, contour) in enumerate(contours):
        (x, y, w, h) = cv2.boundingRect(contour)
        contour_valid = (w >= MIN_CONTOUR_WIDTH) and (h >= MIN_CONTOUR_HEIGHT)

        log.debug("Contour #%d: pos=(x=%d, y=%d) size=(w=%d, h=%d) valid=%s"
            , i, x, y, w, h, contour_valid)

        if not contour_valid:
            continue

        centroid = get_centroid(x, y, w, h)

        matches.append(((x, y, w, h), centroid))

    return matches

# ============================================================================

def filter_mask(fg_mask):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

    # Fill any small holes
    closing = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations = 2)

    return dilation

# ============================================================================

def process_frame(frame_number, frame, bg_subtractor, car_counter):
    log = logging.getLogger("process_frame")

    # Create a copy of source frame to draw into
    processed = frame.copy()

    # Draw dividing line -- we count cars as they cross this line.
    cv2.line(processed, (0, car_counter.divider), (frame.shape[1], car_counter.divider), DIVIDER_COLOUR, 1)

    # Remove the background
    fg_mask = bg_subtractor.apply(frame, None, 0.01)
    fg_mask = filter_mask(fg_mask)

    save_frame(IMAGE_DIR + "/mask_%04d.png"
        , frame_number, fg_mask, "foreground mask for frame #%d")

    matches = detect_vehicles(fg_mask)

    log.debug("Found %d valid vehicle contours.", len(matches))
    for (i, match) in enumerate(matches):
        contour, centroid = match

        log.debug("Valid vehicle contour #%d: centroid=%s, bounding_box=%s", i, centroid, contour)

        x, y, w, h = contour

        # Mark the bounding box and the centroid on the processed frame
        # NB: Fixed the off-by one in the bottom right corner
        cv2.rectangle(processed, (x, y), (x + w - 1, y + h - 1), BOUNDING_BOX_COLOUR, 1)
        cv2.circle(processed, centroid, 2, CENTROID_COLOUR, -1)

    log.debug("Updating vehicle count...")
    car_counter.update_count(matches, processed)

    return processed

# ============================================================================

def main():
    log = logging.getLogger("main")

    log.debug("Creating background subtractor...")
    bg_subtractor = cv2.BackgroundSubtractorMOG()

    log.debug("Pre-training the background subtractor...")
    default_bg = cv2.imread(IMAGE_FILENAME_FORMAT % 119)
    bg_subtractor.apply(default_bg, None, 1.0)

    car_counter = None # Will be created after first frame is captured

    # Set up image source
    log.debug("Initializing video capture device #%s...", IMAGE_SOURCE)
    cap = cv2.VideoCapture(IMAGE_SOURCE)

    frame_width = cap.get(cv2.cv.CV_CAP_PROP_FRAME_WIDTH)
    frame_height = cap.get(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT)
    log.debug("Video capture frame size=(w=%d, h=%d)", frame_width, frame_height)

    log.debug("Starting capture loop...")
    frame_number = -1
    while True:
        frame_number += 1
        log.debug("Capturing frame #%d...", frame_number)
        ret, frame = cap.read()
        if not ret:
            log.error("Frame capture failed, stopping...")
            break

        log.debug("Got frame #%d: shape=%s", frame_number, frame.shape)

        if car_counter is None:
            # We do this here, so that we can initialize with actual frame size
            log.debug("Creating vehicle counter...")
            car_counter = VehicleCounter(frame.shape[:2], frame.shape[0] / 2)

        # Archive raw frames from video to disk for later inspection/testing
        if CAPTURE_FROM_VIDEO:
            save_frame(IMAGE_FILENAME_FORMAT
                , frame_number, frame, "source frame #%d")

        log.debug("Processing frame #%d...", frame_number)
        processed = process_frame(frame_number, frame, bg_subtractor, car_counter)

        save_frame(IMAGE_DIR + "/processed_%04d.png"
            , frame_number, processed, "processed frame #%d")

        cv2.imshow('Source Image', frame)
        cv2.imshow('Processed Image', processed)

        log.debug("Frame #%d processed.", frame_number)

        c = cv2.waitKey(WAIT_TIME)
        if c == 27:
            log.debug("ESC detected, stopping...")
            break

    log.debug("Closing video capture device...")
    cap.release()
    cv2.destroyAllWindows()
    log.debug("Done.")

# ============================================================================

if __name__ == "__main__":
    log = init_logging()

    if not os.path.exists(IMAGE_DIR):
        log.debug("Creating image directory `%s`...", IMAGE_DIR)
        os.makedirs(IMAGE_DIR)

    main()

This script is responsible for processing the stream of images and identifying all the vehicles in each frame -- I refer to them as matches in the code.


The task of counting the detected vehicles is delegated to class VehicleCounter. The reason why I chose to make this a class will become evident as we progress. I did not implement your vehicle counting algorithm, because it will not work for reasons that will again become evident as we dig into this deeper.

File vehicle_counter.py contains the following code:

import logging

# ============================================================================

class VehicleCounter(object):
    def __init__(self, shape, divider):
        self.log = logging.getLogger("vehicle_counter")

        self.height, self.width = shape
        self.divider = divider

        self.vehicle_count = 0


    def update_count(self, matches, output_image = None):
        self.log.debug("Updating count using %d matches...", len(matches))

# ============================================================================


Finally, I wrote a script that will stitch all the generated images together, so it's easier to inspect them:

import cv2
import numpy as np

# ============================================================================

INPUT_WIDTH = 160
INPUT_HEIGHT = 120

OUTPUT_TILE_WIDTH = 10
OUTPUT_TILE_HEIGHT = 12

TILE_COUNT = OUTPUT_TILE_WIDTH * OUTPUT_TILE_HEIGHT

# ============================================================================

def stitch_images(input_format, output_filename):
    output_shape = (INPUT_HEIGHT * OUTPUT_TILE_HEIGHT
        , INPUT_WIDTH * OUTPUT_TILE_WIDTH
        , 3)
    output = np.zeros(output_shape, np.uint8)

    for i in range(TILE_COUNT):
        img = cv2.imread(input_format % i)
        cv2.rectangle(img, (0, 0), (INPUT_WIDTH - 1, INPUT_HEIGHT - 1), (0, 0, 255), 1)
        # Draw the frame number
        cv2.putText(img, str(i), (2, 10)
            , cv2.FONT_HERSHEY_PLAIN, 0.7, (255, 255, 255), 1)
        x = i % OUTPUT_TILE_WIDTH * INPUT_WIDTH
        y = i / OUTPUT_TILE_WIDTH * INPUT_HEIGHT
        output[y:y+INPUT_HEIGHT, x:x+INPUT_WIDTH,:] = img

    cv2.imwrite(output_filename, output)

# ============================================================================

stitch_images("images/frame_%04d.png", "stitched_frames.png")
stitch_images("images/mask_%04d.png", "stitched_masks.png")
stitch_images("images/processed_%04d.png", "stitched_processed.png")


Analysis

In order to solve this problem, we should have some idea about what results we expect to get. We should also label all the distinct cars in the video, so it's easier to talk about them.

If we run our script and stitch the images together, we get a number of useful files (stitched frames, masks, and processed frames) to help us analyze the problem.

Upon inspecting those, a number of issues become evident:

  • The foreground masks tend to be noisy. We should do some filtering (erode/dilate?) to get rid of the noise and narrow gaps.
  • Sometimes we miss vehicles (grey ones).
  • Some vehicles get detected twice in a single frame.
  • Vehicles are rarely detected in the upper regions of the frame.
  • The same vehicle is often detected in consecutive frames. We need to figure out a way of tracking the same vehicle in consecutive frames, and counting it only once.

Solution

1. Pre-Seeding the Background Subtractor

Our video is quite short, only 120 frames. With a learning rate of 0.01, it will take a substantial part of the video for the background detector to stabilize.

Fortunately, the last frame of the video (frame number 119) is completely devoid of vehicles, and therefore we can use it as our initial background image. (Other options for obtaining a suitable image are mentioned in the notes and comments.)

To use this initial background image, we simply load it, and apply it on the background subtractor with learning factor 1.0:

bg_subtractor = cv2.BackgroundSubtractorMOG()
default_bg = cv2.imread(IMAGE_FILENAME_FORMAT % 119)
bg_subtractor.apply(default_bg, None, 1.0)

When we look at the new mosaic of masks, we can see that there is less noise, and vehicle detection works better in the early frames.

2. Cleaning Up the Foreground Mask

A simple approach to improve our foreground mask is to apply a few morphological transformations.

def filter_mask(fg_mask):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))

    # Fill any small holes
    closing = cv2.morphologyEx(fg_mask, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations = 2)

    return dilation

Inspecting the masks, processed frames and the log file generated with filtering, we can see that we now detect vehicles more reliably, and have mitigated the issue of different parts of one vehicle being detected as separate objects.

3. Tracking Vehicles Between Frames

At this point, we need to go through our log file, and collect all the centroid coordinates for each vehicle. This will allow us to plot and inspect the path each vehicle traces across the image, and develop an algorithm to do this automatically. To make this process easier, we can create a reduced log by grepping out the relevant entries.
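
As a hypothetical illustration (the helper below is not part of the original scripts), the centroids can be pulled out of debug.log with a few lines of Python, relying on the "Processing frame #N..." and "centroid=(x, y)" entries written by the script above:

import re

# Hypothetical helper: collect centroid coordinates per frame from debug.log,
# based on the log formats produced by main() and process_frame() above.
FRAME_RE = re.compile(r"Processing frame #(\d+)")
CENTROID_RE = re.compile(r"centroid=\((\d+), (\d+)\)")

centroids_by_frame = {}
current_frame = None
with open("debug.log") as log_file:
    for line in log_file:
        frame_match = FRAME_RE.search(line)
        if frame_match:
            current_frame = int(frame_match.group(1))
            continue
        centroid_match = CENTROID_RE.search(line)
        if centroid_match and current_frame is not None:
            point = (int(centroid_match.group(1)), int(centroid_match.group(2)))
            centroids_by_frame.setdefault(current_frame, []).append(point)

for frame_number in sorted(centroids_by_frame):
    print("%d: %s" % (frame_number, centroids_by_frame[frame_number]))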

The lists of centroid coordinates:

traces = {
    'A': [(112, 36), (112, 45), (112, 52), (112, 54), (112, 63), (111, 73), (111, 86), (111, 91), (111, 97), (110, 105)]
    , 'B': [(119, 37), (120, 42), (121, 54), (121, 55), (123, 64), (124, 74), (125, 87), (127, 94), (125, 100), (126, 108)]
    , 'C': [(93, 23), (91, 27), (89, 31), (87, 36), (85, 42), (82, 49), (79, 59), (74, 71), (70, 82), (62, 86), (61, 92), (55, 101)]
    , 'D': [(118, 30), (124, 83), (125, 90), (116, 101), (122, 100)]
    , 'E': [(77, 27), (75, 30), (73, 33), (70, 37), (67, 42), (63, 47), (59, 53), (55, 59), (49, 67), (43, 75), (36, 85), (27, 92), (24, 97), (20, 102)]
    , 'F': [(119, 30), (120, 34), (120, 39), (122, 59), (123, 60), (124, 70), (125, 82), (127, 91), (126, 97), (128, 104)]
    , 'G': [(88, 37), (87, 41), (85, 48), (82, 55), (79, 63), (76, 74), (72, 87), (67, 92), (65, 98), (60, 106)]
    , 'H': [(124, 35), (123, 40), (125, 45), (127, 59), (126, 59), (128, 67), (130, 78), (132, 88), (134, 93), (135, 99), (135, 107)]
    , 'I': [(98, 26), (97, 30), (96, 34), (94, 40), (92, 47), (90, 55), (87, 64), (84, 77), (79, 87), (74, 93), (73, 102)]
    , 'J': [(123, 60), (125, 63), (125, 81), (127, 93), (126, 98), (125, 100)]
}

Individual vehicle traces plotted on the background:

Combined enlarged image of all the vehicle traces:

Vectors

In order to analyze the movement, we need to work with vectors (i.e. the distance and direction moved). The following diagram shows how the angles correspond to movement of vehicles in the image.

We can use the following function to calculate the vector between two points:

def get_vector(a, b):
    """Calculate vector (distance, angle in degrees) from point a to point b.

    Angle ranges from -180 to 180 degrees.
    Vector with angle 0 points straight down on the image.
    Values increase in clockwise direction.
    """
    dx = float(b[0] - a[0])
    dy = float(b[1] - a[1])

    distance = math.sqrt(dx**2 + dy**2)

    if dy > 0:
        angle = math.degrees(math.atan(-dx/dy))
    elif dy == 0:
        if dx < 0:
            angle = 90.0
        elif dx > 0:
            angle = -90.0
        else:
            angle = 0.0
    else:
        if dx < 0:
            angle = 180 - math.degrees(math.atan(dx/dy))
        elif dx > 0:
            angle = -180 - math.degrees(math.atan(dx/dy))
        else:
            angle = 180.0        

    return distance, angle
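
For example, taking two consecutive positions of vehicle A from the traces above, the movement is straight down the image, so the angle is 0:

# Vehicle A moving straight down the frame: angle ~0, distance 9.
print(get_vector((112, 36), (112, 45)))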

Categorization

One way we can look for patterns that could be used to categorize the movements as valid/invalid is to make a scatter plot (angle vs. distance); a sketch of producing such a plot follows the list below:

  • Green points represent valid movement, which we determined using the lists of points for each vehicle.
  • Red points represent invalid movement - vectors between points in adjacent traffic lanes.
  • I plotted two blue curves, which we can use to separate the two types of movements. Any point that lies below either curve can be considered as valid. The curves are:
    • distance = -0.008 * angle**2 + 0.4 * angle + 25.0
    • distance = 10.0
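
As a rough illustration of how the green points can be produced (assuming the traces dict and get_vector function from this answer are in scope, and matplotlib is available as an extra dependency):

import matplotlib.pyplot as plt

# Sketch: vectors between consecutive positions of each vehicle in `traces`,
# plotted as angle vs. distance. The invalid (red) points would be built
# analogously from pairs of points in adjacent lanes.
valid_vectors = [get_vector(a, b)
    for points in traces.values()
    for (a, b) in zip(points, points[1:])]

plt.scatter([angle for (_, angle) in valid_vectors]
    , [distance for (distance, _) in valid_vectors], c='g')
plt.xlabel("angle (degrees)")
plt.ylabel("distance")
plt.show()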

We can use the following function to categorize the movement vectors:

def is_valid_vector(a):
    distance, angle = a
    threshold_distance = max(10.0, -0.008 * angle**2 + 0.4 * angle + 25.0)
    return (distance <= threshold_distance)

NB: There is one outlier, which occurs due to our losing track of vehicle D in frames 43..48.

Algorithm

We will use class Vehicle to store information about each tracked vehicle:

  • Some kind of identifier
  • List of positions, most recent at front
  • Last-seen counter -- number of frames since we've last seen this vehicle
  • Flag to mark whether the vehicle was counted or not

Class VehicleCounter will store a list of currently tracked vehicles and keep track of the total count. On each frame, we will use the list of bounding boxes and positions of identified vehicles (candidate list) to update the state of VehicleCounter:

  1. Update currently tracked Vehicles:
    • For each vehicle
      • If there is any valid match for given vehicle, update vehicle position and reset its last-seen counter. Remove the match from the candidate list.
      • Otherwise, increase the last-seen counter for that vehicle.
  2. Create new Vehicles for any remaining matches
  3. Update vehicle count
    • For each vehicle
      • If the vehicle is past the divider and has not been counted yet, update the total count and mark the vehicle as counted
  4. Remove vehicles that are no longer visible
    • For each vehicle
      • If the last-seen counter exceeds threshold, remove the vehicle

4. Solution

We can reuse the main script with the final version of vehicle_counter.py, containing the implementation of our counting algorithm:

import logging
import math

import cv2
import numpy as np

# ============================================================================

CAR_COLOURS = [ (0,0,255), (0,106,255), (0,216,255), (0,255,182), (0,255,76)
    , (144,255,0), (255,255,0), (255,148,0), (255,0,178), (220,0,255) ]

# ============================================================================

class Vehicle(object):
    def __init__(self, id, position):
        self.id = id
        self.positions = [position]
        self.frames_since_seen = 0
        self.counted = False

    @property
    def last_position(self):
        return self.positions[-1]

    def add_position(self, new_position):
        self.positions.append(new_position)
        self.frames_since_seen = 0

    def draw(self, output_image):
        car_colour = CAR_COLOURS[self.id % len(CAR_COLOURS)]
        # Draw a dot at each recorded position
        for point in self.positions:
            cv2.circle(output_image, point, 2, car_colour, -1)
        # Draw the connecting path once per vehicle
        cv2.polylines(output_image, [np.int32(self.positions)]
            , False, car_colour, 1)


# ============================================================================

class VehicleCounter(object):
    def __init__(self, shape, divider):
        self.log = logging.getLogger("vehicle_counter")

        self.height, self.width = shape
        self.divider = divider

        self.vehicles = []
        self.next_vehicle_id = 0
        self.vehicle_count = 0
        self.max_unseen_frames = 7


    @staticmethod
    def get_vector(a, b):
        """Calculate vector (distance, angle in degrees) from point a to point b.

        Angle ranges from -180 to 180 degrees.
        Vector with angle 0 points straight down on the image.
        Values increase in clockwise direction.
        """
        dx = float(b[0] - a[0])
        dy = float(b[1] - a[1])

        distance = math.sqrt(dx**2 + dy**2)

        if dy > 0:
            angle = math.degrees(math.atan(-dx/dy))
        elif dy == 0:
            if dx < 0:
                angle = 90.0
            elif dx > 0:
                angle = -90.0
            else:
                angle = 0.0
        else:
            if dx < 0:
                angle = 180 - math.degrees(math.atan(dx/dy))
            elif dx > 0:
                angle = -180 - math.degrees(math.atan(dx/dy))
            else:
                angle = 180.0        

        return distance, angle 


    @staticmethod
    def is_valid_vector(a):
        distance, angle = a
        threshold_distance = max(10.0, -0.008 * angle**2 + 0.4 * angle + 25.0)
        return (distance <= threshold_distance)


    def update_vehicle(self, vehicle, matches):
        # Find if any of the matches fits this vehicle
        for i, match in enumerate(matches):
            contour, centroid = match

            vector = self.get_vector(vehicle.last_position, centroid)
            if self.is_valid_vector(vector):
                vehicle.add_position(centroid)
                self.log.debug("Added match (%d, %d) to vehicle #%d. vector=(%0.2f,%0.2f)"
                    , centroid[0], centroid[1], vehicle.id, vector[0], vector[1])
                return i

        # No matches fit...        
        vehicle.frames_since_seen += 1
        self.log.debug("No match for vehicle #%d. frames_since_seen=%d"
            , vehicle.id, vehicle.frames_since_seen)

        return None


    def update_count(self, matches, output_image = None):
        self.log.debug("Updating count using %d matches...", len(matches))

        # First update all the existing vehicles
        for vehicle in self.vehicles:
            i = self.update_vehicle(vehicle, matches)
            if i is not None:
                del matches[i]

        # Add new vehicles based on the remaining matches
        for match in matches:
            contour, centroid = match
            new_vehicle = Vehicle(self.next_vehicle_id, centroid)
            self.next_vehicle_id += 1
            self.vehicles.append(new_vehicle)
            self.log.debug("Created new vehicle #%d from match (%d, %d)."
                , new_vehicle.id, centroid[0], centroid[1])

        # Count any uncounted vehicles that are past the divider
        for vehicle in self.vehicles:
            if not vehicle.counted and (vehicle.last_position[1] > self.divider):
                self.vehicle_count += 1
                vehicle.counted = True
                self.log.debug("Counted vehicle #%d (total count=%d)."
                    , vehicle.id, self.vehicle_count)

        # Optionally draw the vehicles on an image
        if output_image is not None:
            for vehicle in self.vehicles:
                vehicle.draw(output_image)

            cv2.putText(output_image, ("%02d" % self.vehicle_count), (142, 10)
                , cv2.FONT_HERSHEY_PLAIN, 0.7, (127, 255, 255), 1)

        # Remove vehicles that have not been seen long enough
        removed = [ v.id for v in self.vehicles
            if v.frames_since_seen >= self.max_unseen_frames ]
        self.vehicles[:] = [ v for v in self.vehicles
            if not v.frames_since_seen >= self.max_unseen_frames ]
        for id in removed:
            self.log.debug("Removed vehicle #%d.", id)

        self.log.debug("Count updated, tracking %d vehicles.", len(self.vehicles))

# ============================================================================

The program now draws the historical paths of all currently tracked vehicles into the output image, along with the vehicle count. Each vehicle is assigned 1 of 10 colours.

Notice that vehicle D ends up being tracked twice; however, it is counted only once, since we lose track of it before it crosses the divider. Ideas on how to resolve this are mentioned in the appendix.

Based on the last processed frame generated by the script, the total vehicle count is 10. This is the correct result.

More details can be found in the output generated by the script.


A. Potential Improvements

  • Refactor, add unit tests.
  • Improve filtering/preprocessing of the foreground mask
    • Multiple iterations of filtering, fill holes using cv2.drawContours with CV_FILLED?
    • Watershed Algorithm?
  • Improve categorization of movement vectors
    • Create a predictor to estimate initial movement angle when vehicles are created (and only one position is known)... in order to be able to
    • Use change in direction rather than direction alone (I think this would cluster the angles of valid motion vectors close to zero).
  • Improve vehicle tracking
    • Predict position for frames where the vehicle is not seen (a minimal sketch follows below).
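
For instance, a minimal linear-extrapolation predictor (hypothetical, not part of the solution above) could look like this:

def predict_position(vehicle):
    # Hypothetical sketch: extrapolate the next centroid linearly from the
    # last two known positions; fall back to the last position when only
    # one sample is available.
    if len(vehicle.positions) < 2:
        return vehicle.last_position
    (x1, y1), (x2, y2) = vehicle.positions[-2:]
    return (2 * x2 - x1, 2 * y2 - y1)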

B. Notes

  • It seems it's not possible to directly extract the current background image from BackgroundSubtractorMOG in Python (at least in OpenCV 2.4.x), but there is a way to do it with a little work.
  • As suggested by Henrik, we can obtain a good estimate of the background using median blending (see the sketch below).
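
A minimal sketch of the median-blending idea, assuming the frame images on disk follow the IMAGE_FILENAME_FORMAT of the main script:

import cv2
import numpy as np

# Estimate the background as the per-pixel median over a sample of frames.
frames = [cv2.imread("images/frame_%04d.png" % i) for i in range(0, 120, 10)]
background = np.median(np.array(frames), axis=0).astype(np.uint8)
cv2.imwrite("median_background.png", background)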
