How to have full control over a process (start/terminate) that runs in parallel with a Flask application?

Question

This is my application architecture:

In my code there is a pedestrian.py file that uses a while loop to read frames from an RTSP link and, after running the pedestrian detection process (available in this link), caches each frame in Redis.

(Please note that on each pass through the loop the new output frame replaces the previous one, so only one frame exists in Redis at any moment.)

Then, in the Flask application, I read the processed frame from Redis and send it to the clients.

This is the code for my pedestrian detection:

from redis import Redis
from concurrent.futures import ThreadPoolExecutor
import cv2
import torch
from os import environ


r = Redis('111.222.333.444')

class RealTimeTracking(object):
    """
    This class is built to get frame from rtsp link and continuously
    save each frame in the output directory. then we use flask to give it
    as service to client.
    Args:
        args: parse_args inputs
        cfg: deepsort dict and yolo-model cfg from server_cfg file

    """

    def __init__(self, cfg, args):
        # Create a VideoCapture object
        self.cfg = cfg
        self.args = args
        use_cuda = self.args.use_cuda and torch.cuda.is_available()

        if not use_cuda:
            raise UserWarning("Running in cpu mode!")

        self.detector = build_detector(cfg, use_cuda=use_cuda)
        self.deepsort = build_tracker(cfg, use_cuda=use_cuda)
        self.class_names = self.detector.class_names

        self.vdo = cv2.VideoCapture(self.args.input)
        self.status, self.frame = None, None
        self.total_frames = int(self.vdo.get(cv2.CAP_PROP_FRAME_COUNT))
        self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT))

        self.output_frame = None

        self.thread = ThreadPoolExecutor(max_workers=1)
        self.thread.submit(self.update)
        print('streaming started ...')

    def update(self):
        while True:
            if self.vdo.isOpened():
                (self.status, self.frame) = self.vdo.read()

    def run(self):
        while True:
            try:
                if self.status:
                    frame = self.frame.copy()
                    # frame = cv2.resize(frame, (640, 480))
                    self.detection(frame=frame)
                    frame_to_bytes = cv2.imencode('.jpg', frame)[1].tobytes()
                    r.set('frame', frame_to_bytes)
            except AttributeError:
                pass

    def detection(self, frame):
        im = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # do detection
        bbox_xywh, cls_conf, cls_ids = self.detector(im)
        if bbox_xywh is not None:
            # select person class
            mask = cls_ids == 0

            bbox_xywh = bbox_xywh[mask]
            bbox_xywh[:, 3:] *= 1.2  # bbox dilation just in case bbox too small
            cls_conf = cls_conf[mask]

            # do tracking
            outputs = self.deepsort.update(bbox_xywh, cls_conf, im)

            # draw boxes for visualization
            if len(outputs) > 0:
                self.draw_boxes(img=frame, output=outputs)

    @staticmethod
    def draw_boxes(img, output, offset=(0, 0)):
        for i, box in enumerate(output):
            x1, y1, x2, y2, identity = [int(ii) for ii in box]
            x1 += offset[0]
            x2 += offset[0]
            y1 += offset[1]
            y2 += offset[1]

            # box text and bar
            color = compute_color_for_labels(identity)
            label = '{}{:d}'.format("", identity)
            t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
            cv2.rectangle(img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
            cv2.putText(img, label, (x1, y1 + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
        return img


if __name__ == "__main__":

    args = parse_args() # argument: --rtsp_link = 'rtsp://me@111.222.333.444/Channels/105'
    cfg = get_config()
    cfg.merge_from_dict(model)
    cfg.merge_from_dict(deep_sort_dict)
    vdo_trk = RealTimeTracking(cfg, args)
    vdo_trk.run()

This is the code for the Flask server, app.py:

from dotenv import load_dotenv
from time import sleep
from os import getenv
from os.path import join
import subprocess
from flask import Response, Flask

from config.config import DevelopmentConfig
from redis import Redis

r = Redis('111.222.333.444')
app = Flask(__name__)

def gen():
    while True:
        frame = r.get('frame')
        if frame is not None:
            yield b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'

@app.route('/')
def video_feed():
    """Video streaming route. Put this in the src attribute of an img tag."""
    return Response(gen(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


if __name__ == '__main__':
    load_dotenv()
    app.config.from_object(DevelopmentConfig)
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/105']
    p = subprocess.Popen(cmd)
    sleep(6)
    app.run()

This code runs perfectly on my system.
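
A side note on the `gen()` loop above: it polls Redis as fast as it can, even when the stored frame has not changed. The sketch below shows one possible paced variant of the same multipart framing. The names `mjpeg_part` and `paced_frames`, the `interval` default, and the `max_parts` cut-off (only there so the generator can be exercised without a server) are assumptions for illustration, not part of the original code.

```python
import time


def mjpeg_part(jpeg_bytes, boundary=b"frame"):
    """Wrap one JPEG image as a part of a multipart/x-mixed-replace body."""
    return (b"--" + boundary + b"\r\n"
            + b"Content-Type: image/jpeg\r\n\r\n"
            + jpeg_bytes + b"\r\n")


def paced_frames(get_frame, interval=0.04, max_parts=None):
    """Yield multipart chunks, pausing `interval` seconds between polls.

    get_frame: callable returning JPEG bytes or None (for example
               lambda: r.get('frame') with the Redis client from app.py).
    max_parts: hypothetical cut-off for testing; None streams forever.
    """
    sent = 0
    while max_parts is None or sent < max_parts:
        frame = get_frame()
        if frame is not None:
            yield mjpeg_part(frame)
            sent += 1
        # Sleep between polls so the loop does not spin on the cache.
        time.sleep(interval)
```

With the Flask route shown earlier, this would be used as `Response(paced_frames(lambda: r.get('frame')), mimetype='multipart/x-mixed-replace; boundary=frame')`. The interval is a trade-off between latency and load and would need tuning for the actual camera frame rate.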

As you can see, I use the cmd command to start pedestrian detection on the RTSP link before running the Flask server.

But what I really need for this task is the ability to switch between different cameras. That is, while the Flask server is running, I want to be able to terminate the pedestrian.py process whenever a request comes in and restart pedestrian.py with a new --rtsp_link argument (switching to another camera).

Something like this:

@app.route('/cam1')
def cam1():
    stop('pedestrian.py')  # placeholder: this is the part I don't know how to implement
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/101']
    p = subprocess.Popen(cmd)
    return 'switched to cam1'

@app.route('/cam2')
def cam2():
    stop('pedestrian.py')  # placeholder, as above
    cmd = ['python', join("my_project.dir", "pedestrian.py"), '--rtsp_link=rtsp://me@111.222.333.444/Channels/110']
    p = subprocess.Popen(cmd)
    return 'switched to cam2'

My Flask knowledge might not be good enough. I probably need to use the POST method and also authentication for that.

Could you tell me how I can implement this in my code?

Recommended answer

I found a way to automate starting and stopping the pedestrian detection. More details are available in my repo:

from os.path import join
from os import getenv, environ
from dotenv import load_dotenv
import argparse
from threading import Thread

from redis import Redis
from flask import Response, Flask, jsonify, request, abort

from rtsp_threaded_tracker import RealTimeTracking
from server_cfg import model, deep_sort_dict
from config.config import DevelopmentConfig
from utils.parser import get_config

redis_cache = Redis('127.0.0.1')
app = Flask(__name__)
environ['in_progress'] = 'off'


def parse_args():
    """
    Parses the arguments
    Returns:
        argparse Namespace
    """
    assert 'project_root' in environ.keys()
    project_root = getenv('project_root')
    parser = argparse.ArgumentParser()

    parser.add_argument("--input",
                        type=str,
                        default=getenv('camera_stream'))

    parser.add_argument("--model",
                        type=str,
                        default=join(project_root,
                                     getenv('model_type')))

    parser.add_argument("--cpu",
                        dest="use_cuda",
                        action="store_false", default=True)
    args = parser.parse_args()

    return args


def gen():
    """
    Returns: video frames from redis cache
    """
    while True:
        frame = redis_cache.get('frame')
        if frame is not None:
            yield b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'


def pedestrian_tracking(cfg, args):
    """
    starts the pedestrian detection on rtsp link
    Args:
        cfg:
        args:
    Returns:
    """
    tracker = RealTimeTracking(cfg, args)
    tracker.run()


def trigger_process(cfg, args):
    """
    triggers pedestrian_tracking process on rtsp link using a thread
    Args:
        cfg:
        args:
    Returns:
    """
    try:
        t = Thread(target=pedestrian_tracking, args=(cfg, args))
        t.start()
        return jsonify({"message": "Pedestrian detection started successfully"})
    except Exception:
        return jsonify({'message': "Unexpected exception occurred in process"})


@app.errorhandler(400)
def bad_argument(error):
    return jsonify({'message': error.description['message']})


# Routes
@app.route('/stream', methods=['GET'])
def stream():
    """
    Provides video frames on http link
    Returns:
    """
    return Response(gen(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route("/run", methods=['GET'])
def process_manager():
    """
    request parameters:
    run (bool): 1  -> start the pedestrian tracking
                0  -> stop it
    camera_stream: str -> rtsp link to security camera
    :return:
    """
    data = request.args
    status = data['run']
    status = int(status) if status.isnumeric() else abort(400, {'message': f"bad argument for run {data['run']}"})
    if status == 1:
        # if pedestrian tracking is not running, start it off!
        try:
            if environ.get('in_progress', 'off') == 'off':
                global cfg, args
                vdo = data.get('camera_stream')
                if vdo is not None:
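                    # a numeric value is a local device index; anything else (e.g. an rtsp:// link) stays a string
                    args.input = int(vdo) if vdo.isdigit() else vdo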
                environ['in_progress'] = 'on'
                return trigger_process(cfg, args)
            elif environ.get('in_progress') == 'on':
                # if pedestrian tracking is running, don't start another one (we are short of gpu resources)
                return jsonify({"message": " Pedestrian detection is already in progress."})
        except Exception:
            environ['in_progress'] = 'off'
            return abort(503)
    elif status == 0:
        if environ.get('in_progress', 'off') == 'off':
            return jsonify({"message": "pedestrian detection is already terminated!"})
        else:
            environ['in_progress'] = 'off'
            return jsonify({"message": "Pedestrian detection terminated!"})


if __name__ == '__main__':
    load_dotenv()
    app.config.from_object(DevelopmentConfig)

    # BackProcess Initialization
    args = parse_args()
    cfg = get_config()
    cfg.merge_from_dict(model)
    cfg.merge_from_dict(deep_sort_dict)
    # Start the flask app
    app.run()   
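
Note that the `/run` route above only flips the `in_progress` flag; the tracking thread ends only if the loop inside `RealTimeTracking.run` checks that flag, which is not shown in the listing. A minimal sketch of that cooperative-stop pattern, using a `threading.Event` instead of an environment variable, might look like the following. The class name `StoppableWorker` and its `step` callable (standing in for "read one frame, detect, cache") are assumptions for illustration.

```python
import threading
import time


class StoppableWorker:
    """Runs `step` repeatedly in a background thread until stop() is called."""

    def __init__(self, step, interval=0.01):
        self._step = step              # hypothetical stand-in for one detection pass
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread = None

    def is_running(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        """Start the loop; returns False if it is already running."""
        if self.is_running():
            return False
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        return True

    def _loop(self):
        # The loop re-checks the event on every pass, so stop() takes effect
        # after at most one step plus one interval.
        while not self._stop_event.is_set():
            self._step()
            self._stop_event.wait(self._interval)

    def stop(self, timeout=2.0):
        """Ask the loop to exit and wait for it; returns True once it has ended."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout)
        return not self.is_running()
```

A thread cannot be killed from outside in Python, so the worker has to poll a flag like this. If a single step can block for a long time (for example a stalled RTSP read), a separate process that can be terminated may be the safer choice.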

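For readers who prefer to stay with the subprocess approach from the question, the start/terminate step can also be sketched with `subprocess.Popen` directly. This is only an illustration under stated assumptions: the `WorkerProcess` class and its method names are hypothetical, and the command passed to it would be the `pedestrian.py` invocation from the question.

```python
import subprocess
import sys
import threading


class WorkerProcess:
    """Owns at most one child process and replaces it on demand."""

    def __init__(self, base_cmd):
        self._base_cmd = list(base_cmd)
        self._proc = None
        # Flask may handle requests on several threads, so guard the handle.
        self._lock = threading.Lock()

    def _stop_locked(self, timeout):
        proc, self._proc = self._proc, None
        if proc is None or proc.poll() is not None:
            return  # nothing is running
        proc.terminate()  # polite request first
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()  # force it if the child ignores the request
            proc.wait()

    def stop(self, timeout=5.0):
        with self._lock:
            self._stop_locked(timeout)

    def restart(self, *extra_args, timeout=5.0):
        """Stop the current child (if any) and start a new one."""
        with self._lock:
            self._stop_locked(timeout)
            self._proc = subprocess.Popen(self._base_cmd + list(extra_args))
            return self._proc.pid

    def is_running(self):
        with self._lock:
            return self._proc is not None and self._proc.poll() is None


# Example wiring (paths and flag taken from the question):
#   worker = WorkerProcess([sys.executable, join("my_project.dir", "pedestrian.py")])
#   worker.restart('--rtsp_link=rtsp://me@111.222.333.444/Channels/101')
```

A camera-switching route would then call `worker.restart(...)` with the new `--rtsp_link` value. Because the detector runs in its own process, terminating it also releases its GPU memory, which the thread-based variant does not guarantee.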