基于计算机视觉和OpenCV:创建一个能够计算道路交通流量的应用

2017年09月19日 由 yining 发表 357818 0
基于计算机视觉和OpenCV:创建一个能够计算道路交通流量的应用

本文将介绍如何在不需要大量的深度学习算法的情况下,基于计算机视觉来计算道路交通流量。本教程只使用Python和OpenCV,在背景差分算法的帮助下,实现非常简单的运动检测方法。

本项目所需要的代码:

https://github.com/creotiv/object_detection_projects/tree/master/opencv_traffic_counting

主要内容

1.了解用于前景检测的背景差分算法的主旨
2.OpenCV图像滤波器
3.物体轮廓的检测
4.为进一步的数据操作构建处理管道

项目最终效果:

[video width="1280" height="720" mp4="http://imgcdn.atyun.com/2017/09/videoplayback.mp4"][/video]

视频来源: https://youtu.be/_o5iLbRHKao


背景差分算法

基于计算机视觉和OpenCV:创建一个能够计算道路交通流量的应用

背景差分有很多不同的算法,但它们的主旨非常简单。假设你有一个你房间的视频,在这个视频里没有出现人或者宠物,所以基本上这个房间是静态的,我们把它叫做背景层。为了获得在视频中移动的物体我们需要做的是:

foreground_objects = current_frame - background_layer(前景物体=当前帧 - 背景层)

但是在某些情况下,因为光线总是在改变,还有一些被人为移动或者本身可以运动的物体,我们不能得到静态帧。在这种情况下我们保存一些的帧数,并试图找出在大多数像素中,它们哪些是相同的,那么这些像素就会成为背景层的一部分。问题是,我们如何得到这个背景层和额外的滤波,从而使选择更加准确。

所以,我们将使用MOG算法来进行背景差分,在处理之后,它看起来是这样的:

基于计算机视觉和OpenCV:创建一个能够计算道路交通流量的应用

原图(上),使用MOG差分前景(带有阴影检测)(下)


可以看到在前景掩模上有一些噪声,我们会用一些标准的滤波技术去除。现在代码看上去是这样的:
import os
import logging
import logging.handlers
import random

import numpy as np
import skvideo.io
import cv2
import matplotlib.pyplot as plt

import utils
# without this some strange errors happen
cv2.ocl.setUseOpenCL(False)
random.seed(123)

# ============================================================================
IMAGE_DIR = "./out"
VIDEO_SOURCE = "input.mp4"
SHAPE = (720, 1280)  # HxW
# ============================================================================


def train_bg_subtractor(inst, cap, num=500):
    '''
        BG substractor need process some amount of frames to start giving result
    '''
    print ('Training BG Subtractor...')
    i = 0
    for frame in cap:
        inst.apply(frame, None, 0.001)
        i += 1
        if i >= num:
            return cap


def main():
    log = logging.getLogger("main")

    # creting MOG bg subtractor with 500 frames in cache
    # and shadow detction
    bg_subtractor = cv2.createBackgroundSubtractorMOG2(
        history=500, detectShadows=True)

    # Set up image source
    # You can use also CV2, for some reason it not working for me
    cap = skvideo.io.vreader(VIDEO_SOURCE)

    # skipping 500 frames to train bg subtractor
    train_bg_subtractor(bg_subtractor, cap, num=500)

    frame_number = -1
    for frame in cap:
        if not frame.any():
            log.error("Frame capture failed, stopping...")
            break

        frame_number += 1

        utils.save_frame(frame, "./out/frame_%04d.png" % frame_number)

        fg_mask = bg_subtractor.apply(frame, None, 0.001)
        
        utils.save_frame(frame, "./out/fg_mask_%04d.png" % frame_number)

# ============================================================================

if __name__ == "__main__":
    log = utils.init_logging()

    if not os.path.exists(IMAGE_DIR):
        log.debug("Creating image directory `%s`...", IMAGE_DIR)
        os.makedirs(IMAGE_DIR)

    main()

滤波

对于我们的情况,我们需要这些滤波器:“Threshold”, “Erode, Dilate, Opening, Closing”。请通过阅读下面链接内容查看这些滤波器是如何工作的。

http://docs.opencv.org/3.1.0/d7/d4d/tutorial_py_thresholding.html

http://docs.opencv.org/3.1.0/d9/d61/tutorial_py_morphological_ops.html

现在我们要用它们来去除前景蒙版上的一些噪声。首先,我们将使用“Closing”来移除区域的间隙,然后用“Opening”移除1–2 px点,然后用“Dilate”使物体变得bolder。
def filter_mask(img):

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

    # Fill any small holes
    closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
    # Remove noise
    opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

    # Dilate to merge adjacent blobs
    dilation = cv2.dilate(opening, kernel, iterations=2)

    # threshold
    th = dilation[dilation < 240] = 0

    return th

然后我们的前景看上去是这样的

基于计算机视觉和OpenCV:创建一个能够计算道路交通流量的应用

物体轮廓的检测

为此,我们将使用标准的带有参数的cv2.findContours方法:
cv2.CV_RETR_EXTERNAL — get only outer contours.
cv2.CV_CHAIN_APPROX_TC89_L1 - use Teh-Chin chain approximation algorithm (faster)

def get_centroid(x, y, w, h):
    x1 = int(w / 2)
    y1 = int(h / 2)

    cx = x + x1
    cy = y + y1

    return (cx, cy)

def detect_vehicles(fg_mask, min_contour_width=35, min_contour_height=35):

    matches = []

    # finding external contours
    im, contours, hierarchy = cv2.findContours(
        fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)

    # filtering by with, height
    for (i, contour) in enumerate(contours):
        (x, y, w, h) = cv2.boundingRect(contour)
        contour_valid = (w >= min_contour_width) and (
            h >= min_contour_height)

        if not contour_valid:
            continue
        
        # getting center of the bounding box
        centroid = get_centroid(x, y, w, h)

        matches.append(((x, y, w, h), centroid))

    return matches

构建处理管道

现在我们将构建简单的处理管道:
class PipelineRunner(object):
    '''
        Very simple pipline.
        Just run passed processors in order with passing context from one to 
        another.
        You can also set log level for processors.
    '''

    def __init__(self, pipeline=None, log_level=logging.DEBUG):
        self.pipeline = pipeline or []
        self.context = {}
        self.log = logging.getLogger(self.__class__.__name__)
        self.log.setLevel(log_level)
        self.log_level = log_level
        self.set_log_level()

    def set_context(self, data):
        self.context = data

    def add(self, processor):
        if not isinstance(processor, PipelineProcessor):
            raise Exception(
                'Processor should be an isinstance of PipelineProcessor.')
        processor.log.setLevel(self.log_level)
        self.pipeline.append(processor)

    def remove(self, name):
        for i, p in enumerate(self.pipeline):
            if p.__class__.__name__ == name:
                del self.pipeline[i]
                return True
        return False

    def set_log_level(self):
        for p in self.pipeline:
            p.log.setLevel(self.log_level)

    def run(self):
        for p in self.pipeline:
            self.context = p(self.context)

        self.log.debug("Frame #%d processed.", self.context['frame_number'])

        return self.context


class PipelineProcessor(object):
    '''
        Base class for processors.
    '''

    def __init__(self):
        self.log = logging.getLogger(self.__class__.__name__)

输入构造函数(input constructor)将会获取一个将按顺序运行的处理器列表。每个处理器都有各自的工作。因此,现在让我们来创建轮廓检测处理器。
class ContourDetection(PipelineProcessor):
    '''
        Detecting moving objects.
        Purpose of this processor is to subtrac background, get moving objects
        and detect them with a cv2.findContours method, and then filter off-by
        width and height. 
        bg_subtractor - background subtractor isinstance.
        min_contour_width - min bounding rectangle width.
        min_contour_height - min bounding rectangle height.
        save_image - if True will save detected objects mask to file.
        image_dir - where to save images(must exist).        
    '''

    def __init__(self, bg_subtractor, min_contour_width=35, min_contour_height=35, save_image=False, image_dir='images'):
        super(ContourDetection, self).__init__()

        self.bg_subtractor = bg_subtractor
        self.min_contour_width = min_contour_width
        self.min_contour_height = min_contour_height
        self.save_image = save_image
        self.image_dir = image_dir

    def filter_mask(self, img, a=None):
        '''
            This filters are hand-picked just based on visual tests
        '''

        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2, 2))

        # Fill any small holes
        closing = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel)
        # Remove noise
        opening = cv2.morphologyEx(closing, cv2.MORPH_OPEN, kernel)

        # Dilate to merge adjacent blobs
        dilation = cv2.dilate(opening, kernel, iterations=2)

        return dilation

    def detect_vehicles(self, fg_mask, context):

        matches = []

        # finding external contours
        im2, contours, hierarchy = cv2.findContours(
            fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_TC89_L1)

        for (i, contour) in enumerate(contours):
            (x, y, w, h) = cv2.boundingRect(contour)
            contour_valid = (w >= self.min_contour_width) and (
                h >= self.min_contour_height)

            if not contour_valid:
                continue

            centroid = utils.get_centroid(x, y, w, h)

            matches.append(((x, y, w, h), centroid))

        return matches

    def __call__(self, context):
        frame = context['frame'].copy()
        frame_number = context['frame_number']

        fg_mask = self.bg_subtractor.apply(frame, None, 0.001)
        # just thresholding values
        fg_mask[fg_mask < 240] = 0
        fg_mask = self.filter_mask(fg_mask, frame_number)

        if self.save_image:
            utils.save_frame(fg_mask, self.image_dir +
                             "/mask_%04d.png" % frame_number, flip=False)

        context['objects'] = self.detect_vehicles(fg_mask, context)
        context['fg_mask'] = fg_mask

        return contex

把背景差分,滤波和检测的部分合并在一起。现在,让我们创建一个处理器,它将在不同的帧上链接检测到的对象,然后创建路径,并且还将计算出到达出口区的车辆数量。
 '''
        Counting vehicles that entered in exit zone.
        Purpose of this class based on detected object and local cache create
        objects pathes and count that entered in exit zone defined by exit masks.
        exit_masks - list of the exit masks.
        path_size - max number of points in a path.
        max_dst - max distance between two points.
    '''

    def __init__(self, exit_masks=[], path_size=10, max_dst=30, x_weight=1.0, y_weight=1.0):
        super(VehicleCounter, self).__init__()

        self.exit_masks = exit_masks

        self.vehicle_count = 0
        self.path_size = path_size
        self.pathes = []
        self.max_dst = max_dst
        self.x_weight = x_weight
        self.y_weight = y_weight

    def check_exit(self, point):
        for exit_mask in self.exit_masks:
            try:
                if exit_mask[point[1]][point[0]] == 255:
                    return True
            except:
                return True
        return False

    def __call__(self, context):
        objects = context['objects']
        context['exit_masks'] = self.exit_masks
        context['pathes'] = self.pathes
        context['vehicle_count'] = self.vehicle_count
        if not objects:
            return context

        points = np.array(objects)[:, 0:2]
        points = points.tolist()

        # add new points if pathes is empty
        if not self.pathes:
            for match in points:
                self.pathes.append([match])

        else:
            # link new points with old pathes based on minimum distance between
            # points
            new_pathes = []

            for path in self.pathes:
                _min = 999999
                _match = None
                for p in points:
                    if len(path) == 1:
                        # distance from last point to current
                        d = utils.distance(p[0], path[-1][0])
                    else:
                        # based on 2 prev points predict next point and calculate
                        # distance from predicted next point to current
                        xn = 2 * path[-1][0][0] - path[-2][0][0]
                        yn = 2 * path[-1][0][1] - path[-2][0][1]
                        d = utils.distance(
                            p[0], (xn, yn),
                            x_weight=self.x_weight,
                            y_weight=self.y_weight
                        )

                    if d < _min:
                        _min = d
                        _match = p

                if _match and _min <= self.max_dst:
                    points.remove(_match)
                    path.append(_match)
                    new_pathes.append(path)

                # do not drop path if current frame has no matches
                if _match is None:
                    new_pathes.append(path)

            self.pathes = new_pathes

            # add new pathes
            if len(points):
                for p in points:
                    # do not add points that already should be counted
                    if self.check_exit(p[1]):
                        continue
                    self.pathes.append([p])

        # save only last N points in path
        for i, _ in enumerate(self.pathes):
            self.pathes[i] = self.pathes[i][self.path_size * -1:]

        # count vehicles and drop counted pathes:
        new_pathes = []
        for i, path in enumerate(self.pathes):
            d = path[-2:]

            if (
                # need at list two points to count
                len(d) >= 2 and
                # prev point not in exit zone
                not self.check_exit(d[0][1]) and
                # current point in exit zone
                self.check_exit(d[1][1]) and
                # path len is bigger then min
                self.path_size <= len(path)
            ):
                self.vehicle_count += 1
            else:
                # prevent linking with path that already in exit zone
                add = True
                for p in path:
                    if self.check_exit(p[1]):
                        add = False
                        break
                if add:
                    new_pathes.append(path)

        self.pathes = new_pathes

        context['pathes'] = self.pathes
        context['objects'] = objects
        context['vehicle_count'] = self.vehicle_count

        self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count)

        return context

这个类有点复杂,让我们通过分部来完成。

基于计算机视觉和OpenCV:创建一个能够计算道路交通流量的应用

图像上的绿色掩膜是出口区,是我们计算车辆的地方。使用掩膜是因为它比使用向量算法更有效,也更简单。只要使用“二进制”操作来检查该区域的那个点就可以了。下面是我们如何设置它的方法:



EXIT_PTS = np.array([
    [[732, 720], [732, 590], [1280, 500], [1280, 720]],
    [[0, 400], [645, 400], [645, 0], [0, 0]]
])

base = np.zeros(SHAPE + (3,), dtype='uint8')
exit_mask = cv2.fillPoly(base, EXIT_PTS, (255, 255, 255))[:, :, 0]

在路径上将点连接起来。
new_pathes = []

for path in self.pathes:
    _min = 999999
    _match = None
    for p in points:
        if len(path) == 1:
            # distance from last point to current
            d = utils.distance(p[0], path[-1][0])
        else:
            # based on 2 prev points predict next point and calculate
            # distance from predicted next point to current
            xn = 2 * path[-1][0][0] - path[-2][0][0]
            yn = 2 * path[-1][0][1] - path[-2][0][1]
            d = utils.distance(
                p[0], (xn, yn),
                x_weight=self.x_weight,
                y_weight=self.y_weight
            )

        if d < _min:
            _min = d
            _match = p

    if _match and _min <= self.max_dst:
        points.remove(_match)
        path.append(_match)
        new_pathes.append(path)

    # do not drop path if current frame has no matches
    if _match is None:
        new_pathes.append(path)

self.pathes = new_pathes

# add new pathes
if len(points):
    for p in points:
        # do not add points that already should be counted
        if self.check_exit(p[1]):
            continue
        self.pathes.append([p])

# save only last N points in path
for i, _ in enumerate(self.pathes):
    self.pathes[i] = self.pathes[i][self.path_size * -1:]

在第一帧,我们只是把所有的点都添加到新的路径中。

接下来,如果len(path)==1,那么对于缓存中的每条路径,我们将尝试从新检测到的对象中找到点(质心),这将是到路径的最后一点的最小的欧氏距离。







如果len(path) > 1,那么在路径的最后两个点上,我们就会在同一直线上预测新的点,并在它和当前点之间找到最小距离。最小距离添加到当前路径的末尾,并从列表中删除。如果在这之后留下一些点,我们将会把它们作为新的路径添加。同时我们也限制了路径上的点的个数。
# count vehicles and drop counted pathes: 
 new_pathes = [] 
 for i, path in enumerate(self.pathes): 
     d = path[-2:] 
 
 
    if ( 
         # need at list two points to count 
         len(d) >= 2 and 
         # prev point not in exit zone 
         not self.check_exit(d[0][1]) and 
         # current point in exit zone 
         self.check_exit(d[1][1]) and 
         # path len is bigger then min 
         self.path_size <= len(path) 
     ): 
         self.vehicle_count += 1 
    else: 
         # prevent linking with path that already in exit zone 
         add = True 
         for p in path: 
             if self.check_exit(p[1]): 
                add = False 
                 break 
        if add: 
            new_pathes.append(path) 
 
 
 self.pathes = new_pathes 
 
 
 context['pathes'] = self.pathes 
 context['objects'] = objects 
 context['vehicle_count'] = self.vehicle_count 
 
 
 self.log.debug('#VEHICLES FOUND: %s' % self.vehicle_count) 

 
 return context



现在我们将尝试计算进入出口区的车辆。要做到这一点,我们只需要在路径中取两个最后的点,并检查它们在出口区域中的最后一个点,以及之前没有的点,并且检查len(path)是否应该大于限制。后面的部分是防止将新点与出口区的点反向连接起来。
最后两个处理器是CSV写入器,用于创建报告CSV文件,以及用于调试和图片的可视化。
class CsvWriter(PipelineProcessor):

    def __init__(self, path, name, start_time=0, fps=15):
        super(CsvWriter, self).__init__()

        self.fp = open(os.path.join(path, name), 'w')
        self.writer = csv.DictWriter(self.fp, fieldnames=['time', 'vehicles'])
        self.writer.writeheader()
        self.start_time = start_time
        self.fps = fps
        self.path = path
        self.name = name
        self.prev = None

    def __call__(self, context):
        frame_number = context['frame_number']
        count = _count = context['vehicle_count']

        if self.prev:
            _count = count - self.prev

        time = ((self.start_time + int(frame_number / self.fps)) * 100 
                + int(100.0 / self.fps) * (frame_number % self.fps))
        self.writer.writerow({'time': time, 'vehicles': _count})
        self.prev = count

        return context


class Visualizer(PipelineProcessor):

    def __init__(self, save_image=True, image_dir='images'):
        super(Visualizer, self).__init__()

        self.save_image = save_image
        self.image_dir = image_dir

    def check_exit(self, point, exit_masks=[]):
        for exit_mask in exit_masks:
            if exit_mask[point[1]][point[0]] == 255:
                return True
        return False

    def draw_pathes(self, img, pathes):
        if not img.any():
            return

        for i, path in enumerate(pathes):
            path = np.array(path)[:, 1].tolist()
            for point in path:
                cv2.circle(img, point, 2, CAR_COLOURS[0], -1)
                cv2.polylines(img, [np.int32(path)], False, CAR_COLOURS[0], 1)

        return img

    def draw_boxes(self, img, pathes, exit_masks=[]):
        for (i, match) in enumerate(pathes):

            contour, centroid = match[-1][:2]
            if self.check_exit(centroid, exit_masks):
                continue

            x, y, w, h = contour

            cv2.rectangle(img, (x, y), (x + w - 1, y + h - 1),
                          BOUNDING_BOX_COLOUR, 1)
            cv2.circle(img, centroid, 2, CENTROID_COLOUR, -1)

        return img

    def draw_ui(self, img, vehicle_count, exit_masks=[]):

        # this just add green mask with opacity to the image
        for exit_mask in exit_masks:
            _img = np.zeros(img.shape, img.dtype)
            _img[:, :] = EXIT_COLOR
            mask = cv2.bitwise_and(_img, _img, mask=exit_mask)
            cv2.addWeighted(mask, 1, img, 1, 0, img)

        # drawing top block with counts
        cv2.rectangle(img, (0, 0), (img.shape[1], 50), (0, 0, 0), cv2.FILLED)
        cv2.putText(img, ("Vehicles passed: {total} ".format(total=vehicle_count)), (30, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 1)
        return img

    def __call__(self, context):
        frame = context['frame'].copy()
        frame_number = context['frame_number']
        pathes = context['pathes']
        exit_masks = context['exit_masks']
        vehicle_count = context['vehicle_count']

        frame = self.draw_ui(frame, vehicle_count, exit_masks)
        frame = self.draw_pathes(frame, pathes)
        frame = self.draw_boxes(frame, pathes, exit_masks)

        utils.save_frame(frame, self.image_dir +
                         "/processed_%04d.png" % frame_number)

        return context

CSV写入器是按时间保存数据的,因为我们需要将它进一步分析。因此,我使用这个公式向unixtimestamp添加额外的帧计时:











time = ((self.start_time + int(frame_number / self.fps)) * 100 
        + int(100.0 / self.fps) * (frame_number % self.fps))



所以在起始时间=1 000 000和fps=10时,会得到这样的结果
帧1=1 000000010
帧1=1 000000020

在得到完整的csv报告之后,你可以按照你的需要聚合这些数据。




结论


正如你们所看到的,这个操作并不像很多人想象的那么难。但是,如果你运行脚本,会看到这个解决方案并不理想,并且在前景对象的重叠问题上存在问题,也没有对车辆类型进行分类。但是,因为摄像机所处于位置很好(在路面之上),所以它的准确率相当高。这告诉我们,只要用对了方法,即使是使用的简单的算法也能得到很好的结果。



那么,我们能做些什么来解决当前的问题呢? 一种方法是尝试添加一些额外的滤波,试图分离对象以获得更好的检测。另一种方法是使用更复杂的算法,比如深度卷积网络。
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
PilotAILabs
30000~60000/年 深圳市
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
Maluuba
20000~40000/月
写评论取消
回复取消