Image Processing/Cv2

OpenCV를 이용해서 RTSP 동영상 받기 (ft. multithreading)

jinmc 2021. 6. 18. 15:29
반응형

OpenCV를 이용해 RTSP로 영상을 받고, 그 영상을 화면에 표시(display)하는 코드입니다.

 

import os

# Ask OpenCV's FFmpeg backend to use UDP as the RTSP transport. It is set here,
# before any capture is opened, so the option is in place when the stream opens.
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;udp"

import numpy as np
import cv2 as cv

# Minimal single-stream viewer: show frames from the RTSP source until the
# stream stops delivering frames or the user presses 'q'.
cap = cv.VideoCapture('rtsp://192.168.0.2:8554/')
if not cap.isOpened():
    # Without this message a failed open skips the loop and the script exits silently.
    print("Can't open stream. Exiting ...")
try:
    while cap.isOpened():
        ret, frame = cap.read()
        # if frame is read correctly ret is True
        if not ret:
            print("Can't receive frame (stream end?). Exiting ...")
            break
        cv.imshow('frame', frame)
        if cv.waitKey(1) == ord('q'):
            break
finally:
    # Release the capture and close windows even if read()/imshow() raises.
    cap.release()
    cv.destroyAllWindows()

 

하지만 위 코드는 하나의 영상이 끊임없이 들어오는 경우를 처리하기에는 충분하지만, 

만약 여러 개의 RTSP URL이 들어온다든지, 하나의 RTSP URL에서 여러 개의 동영상이 일정한 간격(interval)으로 끊겼다가 다시 들어오는 경우에는 어떻게 해야 할까요? 이런 경우에는 multithreading을 사용하면 됩니다.

 

Main file :

 

from datasets import LoadStreams

import threading
import os
import logging
import cv2
import torch
import time
# Module-level logger; select_device() reports the selected device through it.
logger = logging.getLogger(__name__)


def select_device(device='', batch_size=None):
    """Choose the torch device to run on and log the choice.

    Args:
        device: '' to pick automatically, 'cpu', or a comma-separated list of
            CUDA ids such as '0' or '0,1,2,3'.
        batch_size: when given and more than one GPU is visible, it must be a
            multiple of the GPU count.

    Returns:
        torch.device('cuda:0') when CUDA is used, otherwise torch.device('cpu').

    Raises:
        AssertionError: a CUDA device was requested but CUDA is unavailable, or
            batch_size is not divisible by the number of visible GPUs.
    """
    wants_cpu = device.lower() == 'cpu'
    if not wants_cpu and device:
        # Restrict torch to the requested GPUs via the environment.
        os.environ['CUDA_VISIBLE_DEVICES'] = device
        assert torch.cuda.is_available(), f'CUDA unavailable, invalid device {device} requested'

    use_cuda = (not wants_cpu) and torch.cuda.is_available()
    if not use_cuda:
        logger.info(f'Using torch {torch.__version__} CPU')
    else:
        mib = 1024 ** 2  # bytes per MB
        n_gpus = torch.cuda.device_count()
        if batch_size and n_gpus > 1:
            assert batch_size % n_gpus == 0, f'batch-size {batch_size} not multiple of GPU count {n_gpus}'
        props = [torch.cuda.get_device_properties(k) for k in range(n_gpus)]
        prefix = f'Using torch {torch.__version__} '
        for k, dev_id in enumerate((device or '0').split(',')):
            if k == 1:
                # Later lines are indented to line up under the first one.
                prefix = ' ' * len(prefix)
            logger.info(f"{prefix}CUDA:{dev_id} ({props[k].name}, {props[k].total_memory / mib}MB)")

    logger.info('')  # blank separator line
    return torch.device('cuda:0' if use_cuda else 'cpu')


def detect(rtsp_url):
    """Show frames from ``rtsp_url`` in an OpenCV window until the stream stops.

    A ``LoadStreams`` reader is created for the URL and every frame it yields
    is displayed with ``cv2.imshow``. The loop ends when the dataset stops
    iterating or when processing a frame raises. In every case the dataset is
    stopped before returning, so the background reader thread it started does
    not keep running across repeated calls.

    Args:
        rtsp_url: stream address passed to ``LoadStreams``.

    Returns:
        The string "good" once the stream has been shut down.
    """
    dataset = LoadStreams(rtsp_url)
    device = select_device('')  # not used yet (placeholder for a model); logs the device
    view_img = True

    try:
        for path, img, im0s, vid_cap in dataset:  # one item per frame
            im0 = im0s[0].copy()
            if view_img:
                cv2.imshow(str(path), im0)
    except Exception:
        # Catch Exception rather than everything so Ctrl-C (KeyboardInterrupt)
        # still propagates and can stop the caller's reconnect loop.
        print("finish execption")
        logger.debug("stream loop for %s ended with an exception", rtsp_url, exc_info=True)
    finally:
        # Always stop the reader thread, also when iteration ends normally.
        dataset.stop()
    return "good"

if __name__ == '__main__':
    rtsp_url = "rtsp://192.168.0.2:8554/"
    retry_delay = 1.0  # seconds to wait before reconnecting after a failure
    while True:
        # Listing live threads every round makes leaked reader threads visible.
        for thread in threading.enumerate():
            print(thread.name)
        try:
            print(detect(rtsp_url))
        except Exception as exc:
            # LoadStreams raises when the stream cannot be opened (e.g. between
            # videos); wait and retry instead of crashing the whole loop.
            print(f"detect() failed for {rtsp_url}: {exc!r}; retrying in {retry_delay}s")
            time.sleep(retry_delay)

 

Dataset Class File : 

 

import glob
import logging
import math
import os
import random
import shutil
import time
import re
from itertools import repeat
from multiprocessing.pool import ThreadPool
from pathlib import Path
from threading import Thread

import cv2
import numpy as np
import torch
# Ask OpenCV's FFmpeg backend to use UDP as the RTSP transport. NOTE(review):
# this runs after `import cv2`; presumably it only has to be set before a
# VideoCapture is opened — confirm for the OpenCV build in use.
os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;udp"

class LoadStreams:  # single IP or RTSP camera (adapted from yolov5's multi-stream loader)
    """Read frames from one video stream in a background daemon thread.

    The reader thread grabs every frame and decodes every 4th one into
    ``self.imgs``. Iterating the object yields ``(sources, img, img0, None)``,
    where ``img0`` is a list holding the latest raw frame and ``img`` is the
    letterboxed, RGB, channel-first batch built from it.

    Call :meth:`stop` to end the reader thread. The capture is released by the
    reader thread itself after it leaves its loop, so it is never freed while
    ``grab()``/``retrieve()`` may still be using it.
    """

    def __init__(self, s='streams.txt', img_size=640):
        """Open source ``s`` and start the reader thread.

        Args:
            s: source string for ``cv2.VideoCapture`` (URL or path); a numeric
                string is converted to an integer device index.
            img_size: target size passed to ``letterbox``.

        Raises:
            AssertionError: the source could not be opened or its first frame
                could not be read.
        """
        self.mode = 'stream'
        self.img_size = img_size
        self.capture = None
        self.stopFlag = False
        self.thread = None

        self.imgs = [None]
        # One source only: clean the whole string (iterating `s` itself would
        # split the URL into single characters).
        self.sources = [clean_str(s)]
        # int() instead of eval(): never evaluate the source string as code.
        cap = cv2.VideoCapture(int(s) if s.isnumeric() else s)
        # Explicit raise instead of `assert` so the check survives `python -O`;
        # the exception type is kept for existing callers.
        if not cap.isOpened():
            raise AssertionError('Failed to open %s' % s)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS) % 100
        ok, self.imgs[0] = cap.read()  # guarantee first frame
        if not ok or self.imgs[0] is None:
            # letterbox() below would otherwise fail on a None frame.
            cap.release()
            raise AssertionError('Failed to read first frame from %s' % s)
        self.capture = cap
        self.thread = Thread(target=self.update, args=(0, cap), daemon=True)
        print(' success (%gx%g at %.2f FPS).' % (w, h, fps))
        self.thread.start()
        print('')  # newline

        # check for common shapes
        shapes = np.stack([letterbox(x, new_shape=self.img_size)[0].shape for x in self.imgs], 0)  # inference shapes
        self.rect = np.unique(shapes, axis=0).shape[0] == 1  # rect inference if all shapes equal
        if not self.rect:
            print('WARNING: Different stream shapes detected. For optimal performance supply similarly-shaped streams.')

    def update(self, index, cap):
        """Reader-thread loop: grab frames until stopped or the capture closes.

        Every frame is grabbed, but only every 4th one is decoded into
        ``self.imgs[index]``. The capture is released here, once the loop is
        over, because releasing it from another thread while ``grab()`` is
        running can crash the process.
        """
        n = 0
        try:
            while cap.isOpened() and not self.stopFlag:
                n += 1
                cap.grab()
                if n == 4:  # read every 4th frame
                    _, self.imgs[index] = cap.retrieve()
                    n = 0
                time.sleep(0.01)  # wait time
        finally:
            cap.release()

    def __iter__(self):
        self.count = -1
        return self

    def __next__(self):
        """Return the latest frame as ``(sources, img, img0, None)``.

        Raises:
            StopIteration: 'q' was pressed in an OpenCV window, or the reader
                thread stored no frame (a failed ``retrieve()`` yields None).
        """
        self.count += 1
        img0 = self.imgs.copy()  # snapshot; the reader thread replaces entries
        if cv2.waitKey(1) == ord('q'):  # q to quit
            # Stop via the flag instead of releasing the capture here: the
            # reader thread may still be inside grab().
            self.stop()
            print("raise stopiteration")
            raise StopIteration
        if any(x is None for x in img0):
            # Stream stopped delivering frames; letterbox() cannot handle None.
            self.stop()
            raise StopIteration

        # Letterbox
        img = [letterbox(x, new_shape=self.img_size, auto=self.rect)[0] for x in img0]

        # Stack
        img = np.stack(img, 0)

        # Convert
        img = img[:, :, :, ::-1].transpose(0, 3, 1, 2)  # BGR to RGB, to bsx3x416x416
        img = np.ascontiguousarray(img)

        return self.sources, img, img0, None

    def __len__(self):
        return 0  # live stream: length is unknown

    def stop(self, timeout=2.0):
        """Ask the reader thread to exit and wait up to ``timeout`` seconds.

        Safe to call more than once. If the thread is still blocked after the
        timeout it exits (and releases the capture) on its own once the
        blocking call returns.
        """
        self.stopFlag = True
        thread = self.thread
        if thread is not None and thread.is_alive():
            thread.join(timeout)
        print("stop!")


def clean_str(s):
    """Return ``s`` with every character from a fixed special-character set replaced by '_'."""
    special_chars = r"[|@#!¡·$€%&()=?¿^*;:,¨´><+]"
    return re.sub(special_chars, "_", s)


def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32):
    """Resize ``img`` to fit ``new_shape`` keeping its aspect ratio, then pad it.

    Resize image to a stride-multiple rectangle https://github.com/ultralytics/yolov3/issues/232

    Args:
        img: image array; ``img.shape[:2]`` is read as (height, width).
        new_shape: target (height, width), or a single int for a square target.
        color: border fill value passed to ``cv2.copyMakeBorder``.
        auto: pad only up to the next multiple of ``stride`` (minimum
            rectangle) instead of all the way to ``new_shape``.
        scaleFill: when ``auto`` is False, stretch to exactly ``new_shape``
            with no padding (aspect ratio not preserved).
        scaleup: if False, only shrink the image, never enlarge it.
        stride: padding multiple used when ``auto`` is True (32 by default,
            the value that used to be hard-coded).

    Returns:
        ``(img, ratio, (dw, dh))``: the padded image, the (width, height)
        scale ratios, and the padding per side in pixels (may be fractional).
    """
    shape = img.shape[:2]  # current shape [height, width]
    if isinstance(new_shape, int):
        new_shape = (new_shape, new_shape)

    # Scale ratio (new / old)
    r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
    if not scaleup:  # only scale down, do not scale up (for better test mAP)
        r = min(r, 1.0)

    # Compute padding
    ratio = r, r  # width, height ratios
    new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r))
    dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]  # wh padding
    if auto:  # minimum rectangle
        dw, dh = np.mod(dw, stride), np.mod(dh, stride)  # wh padding
    elif scaleFill:  # stretch
        dw, dh = 0.0, 0.0
        new_unpad = (new_shape[1], new_shape[0])
        ratio = new_shape[1] / shape[1], new_shape[0] / shape[0]  # width, height ratios

    dw /= 2  # divide padding into 2 sides
    dh /= 2

    if shape[::-1] != new_unpad:  # resize
        img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR)
    # The -/+0.1 before rounding splits an odd total padding between the two sides.
    top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
    left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
    img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color)  # add border
    return img, ratio, (dw, dh)

여기서 대부분의 코드는 yolov5 github에서 가져온 것입니다. 

https://github.com/ultralytics/yolov5 

 

하지만 중요한 부분은, detect 함수를 호출할 때마다 LoadStreams가 새로운 thread를 생성하고, 

그 thread가 무한루프를 계속 돌기 때문에, 이 무한루프를 멈춰줘야 thread가 종료되고 thread가 계속 쌓이는 것을 방지할 수 있다는 점입니다.

 

제가 추가한 코드는 다음과 같습니다.

 

self.stopFlag = False

def stop(self):

    self.stopFlag = True

while cap.isOpened() and not self.stopFlag:

 

그리고 detect 함수에서 에러가 생겼을 때(예: 스트림이 끊겼을 때) stop 함수를 호출해 준 것이 전부입니다.

multithreading으로 잘 동작하는 것을 확인하였고, 좋은 경험이었으며 많은 것을 배웠습니다.

 

아, 그 전에 시행착오로 stop() 안에서 cap.release()를 호출했었는데, 

cap.release()는 비디오에 관련된 하드웨어 자원과 내부 포인터를 해제합니다.

그런데 update thread의 while loop는 여전히 같은 cap으로 grab()/retrieve()를 호출하고 있기 때문에 이미 해제된 메모리에 접근하게 되고,

그 결과 segmentation fault가 일어납니다. 따라서 cap.release()는 thread가 loop를 빠져나온 뒤에 호출해야 안전합니다.

 

반응형