Add support for multiple web clients
Change-Id: I7efb6c179a18c6baace08db544fee5e15af28056
diff --git a/person_detection/base_camera.py b/person_detection/base_camera.py
index e0018f5..53525ab 100644
--- a/person_detection/base_camera.py
+++ b/person_detection/base_camera.py
@@ -5,17 +5,72 @@
import threading
import logging as log
from multiprocessing import Process, Queue, Value, Array, Lock
+import threading
+from collections import defaultdict
from roc import Roc
import config
+try:
+ from greenlet import getcurrent as get_ident
+except ImportError:
+ try:
+ from thread import get_ident
+ except ImportError:
+ from _thread import get_ident
+
+
+class CameraEvent(object):
+ """An Event-like class that signals all active clients when a new frame is
+ available.
+ """
+ def __init__(self):
+ self.events = {}
+
+ def wait(self):
+ """Invoked from each client's thread to wait for the next frame."""
+ ident = get_ident()
+ if ident not in self.events:
+ # this is a new client
+ # add an entry for it in the self.events dict
+ # each entry has two elements, a threading.Event() and a timestamp
+ self.events[ident] = [threading.Event(), time.time()]
+ return self.events[ident][0].wait(timeout=10.0)
+
+ def set(self):
+ """Invoked by the camera thread when a new frame is available."""
+ now = time.time()
+ remove = None
+ for ident, event in self.events.items():
+ if not event[0].isSet():
+ # if this client's event is not set, then set it
+ # also update the last set timestamp to now
+ event[0].set()
+ event[1] = now
+ else:
+ # if the client's event is already set, it means the client
+ # did not process a previous frame
+ # if the event stays set for more than 2 minutes, then assume
+ # the client is gone and remove it
+ if now - event[1] > 120:
+ remove = ident
+ #if remove:
+ # del self.events[remove]
+
+ def clear(self):
+ """Invoked from each client's thread after a frame was processed."""
+ self.events[get_ident()][0].clear()
+
class BaseCamera(object):
- process = {} # background process that reads frames from camera
- frame = {} # frame queue
- activity_counter = Value('i', 0)
- cameras = Array('i', [0]*len(config.cameras))
+ frame = {}
+ event = {}
+
+ deviceQ = {}
+ cameras = [0]*len(config.cameras)
+
lock = Lock()
+ activity_counter = Value('i', 0)
def __init__(self, device, user, password, mbrlow, mbrhigh, devicegroup, noroc):
self.mqttBroker = "localhost"
@@ -26,38 +81,47 @@
self.noroc = noroc
self.roc = Roc(user, password)
+
"""Start the background camera process if it isn't running yet."""
if BaseCamera.cameras[int(self.device)] == 0:
BaseCamera.cameras[int(self.device)] = 1
+ BaseCamera.event[self.device] = CameraEvent()
self.last_detected = None
self.timer = None
self.detected = False
- BaseCamera.frame[self.device] = Queue(100)
+ BaseCamera.deviceQ[self.device] = Queue(100)
self.set_resolution(self.device, "low")
+ threading.Thread(target=self._thread).start()
# start background frame process
- BaseCamera.process[self.device] = Process(target=self._process, args=(self.device))
- BaseCamera.process[self.device].start()
+ Process(target=self._process).start()
# wait until frames are available
_ = self.get_frame()
log.info("Start camera {} feed to {}".format(self.device, self.client))
def get_frame(self):
"""Return the current camera frame."""
-
- # blocks
- return BaseCamera.frame[self.device].get(block=True)
+ while not BaseCamera.event[self.device].wait():
+ log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident()))
+ BaseCamera.event[self.device].clear()
+ return BaseCamera.frame[self.device]
def frames(self):
""""Generator that returns frames from the camera."""
raise NotImplementedError('Must be implemented by subclasses.')
- def _process(self, device):
+ def _thread(self):
+ while True:
+ BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True)
+ BaseCamera.event[self.device].set() # send signal to clients
+ time.sleep(0)
+
+
+ def _process(self):
"""Camera background process."""
frames_iterator = self.frames()
for frame in frames_iterator:
- BaseCamera.frame[device].put(frame, block=True)
+ BaseCamera.deviceQ[self.device].put(frame, block=True)
- BaseCamera.process[device] = None
def person_detected(self, num):
self.last_detected = time.time()