blob: 953da51d4b9058c02150807ccd07cf6f80a6012e [file] [log] [blame]
"""
SPDX-FileCopyrightText: 2022-present Intel Corporation
SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org>
SPDX-License-Identifier: Apache-2.0
"""
import paho.mqtt.client as mqtt
import time
import threading
import logging as log
from multiprocessing import Process, Queue, Value, Lock
from roc import Roc
import config
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class CameraEvent(object):
"""An Event-like class that signals all active clients when a new frame is
available.
"""
def __init__(self):
self.events = {}
def wait(self):
"""Invoked from each client's thread to wait for the next frame."""
ident = get_ident()
if ident not in self.events:
# this is a new client
# add an entry for it in the self.events dict
# each entry has two elements, a threading.Event() and a timestamp
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait(timeout=10.0)
def set(self):
"""Invoked by the camera thread when a new frame is available."""
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
# if this client's event is not set, then set it
# also update the last set timestamp to now
event[0].set()
event[1] = now
else:
# if the client's event is already set, it means the client
# did not process a previous frame
# if the event stays set for more than 2 minutes, then assume
# the client is gone and remove it
if now - event[1] > 120:
remove = ident
#if remove:
# del self.events[remove]
def clear(self):
"""Invoked from each client's thread after a frame was processed."""
self.events[get_ident()][0].clear()
class BaseCamera(object):
frame = {}
event = {}
deviceQ = {}
cameras = [0]*len(config.cameras)
lock = Lock()
activity_counter = Value('i', 0)
def __init__(self, device, url, keycloak, enterprise, site, user, password, mbrlow, mbrhigh, devicegroup, noroc):
self.mqttBroker = "localhost"
self.device = device
self.mbrlow = mbrlow
self.mbrhigh = mbrhigh
self.devicegroup = devicegroup
self.noroc = noroc
self.roc = Roc(url, keycloak, user, password, enterprise, site)
"""Start the background camera process if it isn't running yet."""
if BaseCamera.cameras[int(self.device)] == 0:
BaseCamera.cameras[int(self.device)] = 1
BaseCamera.event[self.device] = CameraEvent()
self.last_detected = None
self.timer = None
self.detected = False
BaseCamera.deviceQ[self.device] = Queue(100)
self.set_resolution(self.device, "low")
threading.Thread(target=self._thread).start()
# start background frame process
Process(target=self._process).start()
# wait until frames are available
_ = self.get_frame()
log.info("Start camera {} feed to {}".format(self.device, self.client))
def get_frame(self):
"""Return the current camera frame."""
while not BaseCamera.event[self.device].wait():
log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident()))
BaseCamera.event[self.device].clear()
return BaseCamera.frame[self.device]
def frames(self):
""""Generator that returns frames from the camera."""
raise NotImplementedError('Must be implemented by subclasses.')
def _thread(self):
while True:
BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True)
BaseCamera.event[self.device].set() # send signal to clients
time.sleep(0)
def _process(self):
"""Camera background process."""
frames_iterator = self.frames()
for frame in frames_iterator:
BaseCamera.deviceQ[self.device].put(frame, block=True)
def person_detected(self, num):
self.last_detected = time.time()
if not self.detected:
BaseCamera.lock.acquire()
BaseCamera.activity_counter.value += 1
BaseCamera.lock.release()
self.set_resolution_high()
if not self.noroc:
self.roc.set_mbr(self.devicegroup, self.mbrhigh)
self.detected = True
self.start_timer()
def no_person_detected(self):
self.detected = False
self.timer = None
BaseCamera.lock.acquire()
BaseCamera.activity_counter.value -= 1
if BaseCamera.activity_counter.value <= 0:
BaseCamera.activity_counter.value = 0
self.set_resolution_low()
if not self.noroc:
self.roc.set_mbr(self.devicegroup, self.mbrlow)
BaseCamera.lock.release()
def start_timer(self):
# log.info("Start timer for device {}".format(device))
self.timer = threading.Timer(10.0, self.timer_expiry)
self.timer.start()
def set_resolution_high(self):
for device in range(0, len(config.cameras)):
self.set_resolution(str(device), "high")
def set_resolution_low(self):
for device in range(0, len(config.cameras)):
self.set_resolution(str(device), "low")
def set_resolution(self, device, level):
log.info("Setting camera {} resolution to {}".format(device, level))
client = mqtt.Client()
client.connect(self.mqttBroker)
client.publish("camera/" + str(5000 + int(device)), level)
def timer_expiry(self):
now = time.time()
diff = now - self.last_detected
log.debug("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected))
if diff > 5.0:
self.no_person_detected()
else:
# Restart timer since person detected not too long back
self.start_timer()