blob: 9d4be6a7de6eec8124015750f6d4bd968481cdd6 [file] [log] [blame]
#!/usr/bin/env python
"""
SPDX-FileCopyrightText: 2022-present Intel Corporation
SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org>
SPDX-License-Identifier: Apache-2.0
"""
import os
import logging as log
import sys
import flask
from argparse import ArgumentParser, SUPPRESS
import config
from person_detection import Camera
from roc import Roc
app = flask.Flask(__name__)
@app.route('/')
def index():
    """Serve the home page listing the available camera devices.

    Builds a mapping of device id (as a string) to a display label for
    each of the ``args.num`` devices, logs the caller's address and
    renders ``index.html`` with that mapping.
    """
    devices = {}
    for idx in range(args.num):
        device_id = str(idx)
        devices[device_id] = "Camera " + device_id
    client_addr = flask.request.remote_addr
    log.info("{} - connected".format(client_addr))
    return flask.render_template('index.html', devices=devices)
def gen(camera):
    """Yield an endless sequence of multipart chunks, one per frame.

    Each chunk is the ``--frame`` boundary, a JPEG content-type header,
    the bytes returned by ``camera.get_frame()`` and a trailing CRLF.
    """
    part_header = b'--frame\r\nContent-Type: image/jpeg\r\n\r\n'
    part_trailer = b'\r\n'
    while True:
        jpeg = camera.get_frame()
        chunk = part_header + jpeg + part_trailer
        yield chunk
@app.route('/video_feed/<device>')
def video_feed(device):
    """Stream frames for ``device`` as a multipart response.

    Intended to be used as the ``src`` attribute of an ``img`` tag. A
    ``Camera`` is created for the requested device and the caller's
    address, and its frames are wrapped by ``gen`` into a
    ``multipart/x-mixed-replace`` response.
    """
    client_addr = flask.request.remote_addr
    log.debug("{} - video feed {}".format(client_addr, device))
    frame_stream = gen(Camera(device, client_addr, args))
    return flask.Response(
        frame_stream,
        mimetype='multipart/x-mixed-replace; boundary=frame',
    )
def name_to_port(name):
    """Convert ``name`` to an integer using ``int()``."""
    port = int(name)
    return port
def build_argparser():
    """Create the command-line parser for the person detection app.

    All options are registered in a single 'Options' group. Defaults for
    the ROC, Keycloak, enterprise, site, device group, credentials,
    device count and MQTT options are read from the ``config`` module.

    Returns:
        The configured ``ArgumentParser``.
    """
    parser = ArgumentParser(add_help=False)
    group = parser.add_argument_group('Options')

    # Help is registered by hand because the parser is built with
    # add_help=False.
    group.add_argument('-h', '--help', action='help', default=SUPPRESS,
                       help='Show this help message and exit.')

    # (flags, keyword settings) pairs, registered in the order listed.
    option_specs = (
        (("-m", "--model"),
         dict(default="models/intel/person-detection-retail-0013/FP32/person-detection-retail-0013.xml",
              help="Path to model", type=str)),
        (("-i", "--input"),
         dict(help="Video input", default="gstreamer", type=str)),
        (("-pt", "--prob_threshold"),
         dict(default=0.75, help="Probability threshold for detection",
              type=float)),
        (("--noroc",),
         dict(action='store_true', help="No ROC")),
        (("--key",),
         dict(help="ROC api key", type=str)),
        (("--url",),
         dict(default=config.ROCURL, help="ROC url", type=str)),
        (("--keycloak",),
         dict(default=config.KEYCLOAKURL, help="Keycloak url", type=str)),
        (("--enterprise",),
         dict(default=config.ENTERPRISE, help="Enterprise ID", type=str)),
        (("--site",),
         dict(default=config.SITE, help="Site ID", type=str)),
        (("--devicegroup",),
         dict(default=config.DEVICEGROUP, help="Camera device group",
              type=str)),
        (("--user",),
         dict(default=config.ROCUSER, help="ROC username", type=str)),
        (("--password",),
         dict(default=config.ROCPASSWORD, help="ROC password", type=str)),
        (("--num",),
         dict(default=config.NUMDEVICES, help="Number of camera devices",
              type=int)),
        (("--mqttip",),
         dict(default=config.MQTTIP, help="IP address of MQTT", type=str)),
        (("--mbrlow",),
         dict(help="Low range of MBR", default=7000000, type=int)),
        (("--mbrhigh",),
         dict(help="High range of MBR", default=10000000, type=int)),
    )
    for flags, settings in option_specs:
        group.add_argument(*flags, **settings)

    return parser
if __name__ == '__main__':
    # Script entry point: configure logging, parse the command line,
    # optionally initialise the device group's MBR through ROC, then
    # start the Flask server.
    log.basicConfig(
        format='%(asctime)s %(levelname)-8s %(message)s',
        level=log.DEBUG,
        datefmt='%Y-%m-%d %H:%M:%S',
        stream=sys.stdout)
    log.debug("Starting person detection app")

    # Deliberately a module-level name: the Flask view functions in this
    # file read `args` as a global.
    args = build_argparser().parse_args()

    if not args.noroc:
        key = args.key
        has_credentials = (args.user is not None
                           and args.password is not None)

        if key is None and not has_credentials:
            log.error("Either key or user/password required")
            # Exit with a non-zero status so callers can detect the failure.
            sys.exit(1)

        # Build the ROC client whenever credentials are available, not only
        # when the key is missing. Previously, supplying --key left `roc`
        # unbound and the set_mbr call below raised NameError.
        roc = None
        if has_credentials:
            roc = Roc(args.url, args.keycloak, args.user, args.password,
                      args.enterprise, args.site)
            if key is None:
                key = roc.get_key()

        log.info("Device group:{} mbr:{}(low)".format(args.devicegroup,
                                                      args.mbrlow))
        if roc is not None:
            roc.set_mbr(args.devicegroup, args.mbrlow)
        else:
            # NOTE(review): no way to build a Roc client from a key alone is
            # visible here, so the MBR update is skipped instead of
            # crashing -- confirm this is the desired fallback.
            log.warning("No ROC user/password available; "
                        "skipping MBR update")

    # NOTE(review): debug=True enables the interactive debugger and the
    # reloader while listening on all interfaces; confirm this is only
    # used in trusted development environments.
    app.run(host='0.0.0.0', debug=True)