| import paho.mqtt.client as mqtt |
| import time |
| import os |
| import sys |
| import threading |
| import logging as log |
| from multiprocessing import Process, Queue, Value, Array, Lock |
| import threading |
| from collections import defaultdict |
| |
| from roc import Roc |
| import config |
| |
| try: |
| from greenlet import getcurrent as get_ident |
| except ImportError: |
| try: |
| from thread import get_ident |
| except ImportError: |
| from _thread import get_ident |
| |
| |
| class CameraEvent(object): |
| """An Event-like class that signals all active clients when a new frame is |
| available. |
| """ |
| def __init__(self): |
| self.events = {} |
| |
| def wait(self): |
| """Invoked from each client's thread to wait for the next frame.""" |
| ident = get_ident() |
| if ident not in self.events: |
| # this is a new client |
| # add an entry for it in the self.events dict |
| # each entry has two elements, a threading.Event() and a timestamp |
| self.events[ident] = [threading.Event(), time.time()] |
| return self.events[ident][0].wait(timeout=10.0) |
| |
| def set(self): |
| """Invoked by the camera thread when a new frame is available.""" |
| now = time.time() |
| remove = None |
| for ident, event in self.events.items(): |
| if not event[0].isSet(): |
| # if this client's event is not set, then set it |
| # also update the last set timestamp to now |
| event[0].set() |
| event[1] = now |
| else: |
| # if the client's event is already set, it means the client |
| # did not process a previous frame |
| # if the event stays set for more than 2 minutes, then assume |
| # the client is gone and remove it |
| if now - event[1] > 120: |
| remove = ident |
| #if remove: |
| # del self.events[remove] |
| |
| def clear(self): |
| """Invoked from each client's thread after a frame was processed.""" |
| self.events[get_ident()][0].clear() |
| |
| |
| class BaseCamera(object): |
| frame = {} |
| event = {} |
| |
| deviceQ = {} |
| cameras = [0]*len(config.cameras) |
| |
| lock = Lock() |
| activity_counter = Value('i', 0) |
| |
| def __init__(self, device, user, password, mbrlow, mbrhigh, devicegroup, noroc): |
| self.mqttBroker = "localhost" |
| self.device = device |
| self.mbrlow = mbrlow |
| self.mbrhigh = mbrhigh |
| self.devicegroup = devicegroup |
| self.noroc = noroc |
| self.roc = Roc(user, password) |
| |
| |
| """Start the background camera process if it isn't running yet.""" |
| if BaseCamera.cameras[int(self.device)] == 0: |
| BaseCamera.cameras[int(self.device)] = 1 |
| BaseCamera.event[self.device] = CameraEvent() |
| self.last_detected = None |
| self.timer = None |
| self.detected = False |
| BaseCamera.deviceQ[self.device] = Queue(100) |
| self.set_resolution(self.device, "low") |
| threading.Thread(target=self._thread).start() |
| # start background frame process |
| Process(target=self._process).start() |
| # wait until frames are available |
| _ = self.get_frame() |
| log.info("Start camera {} feed to {}".format(self.device, self.client)) |
| |
| def get_frame(self): |
| """Return the current camera frame.""" |
| while not BaseCamera.event[self.device].wait(): |
| log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident())) |
| BaseCamera.event[self.device].clear() |
| return BaseCamera.frame[self.device] |
| |
| def frames(self): |
| """"Generator that returns frames from the camera.""" |
| raise NotImplementedError('Must be implemented by subclasses.') |
| |
| def _thread(self): |
| while True: |
| BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True) |
| BaseCamera.event[self.device].set() # send signal to clients |
| time.sleep(0) |
| |
| |
| def _process(self): |
| """Camera background process.""" |
| frames_iterator = self.frames() |
| for frame in frames_iterator: |
| BaseCamera.deviceQ[self.device].put(frame, block=True) |
| |
| |
| def person_detected(self, num): |
| self.last_detected = time.time() |
| if not self.detected: |
| BaseCamera.lock.acquire() |
| BaseCamera.activity_counter.value += 1 |
| BaseCamera.lock.release() |
| self.set_resolution_high() |
| if not self.noroc: |
| self.roc.set_mbr(self.devicegroup, self.mbrhigh) |
| self.detected = True |
| self.start_timer() |
| |
| def no_person_detected(self): |
| self.detected = False |
| self.timer = None |
| BaseCamera.lock.acquire() |
| BaseCamera.activity_counter.value -=1 |
| if BaseCamera.activity_counter.value <= 0: |
| BaseCamera.activity_counter.value = 0 |
| self.set_resolution_low() |
| if not self.noroc: |
| self.roc.set_mbr(self.devicegroup, self.mbrlow) |
| BaseCamera.lock.release() |
| |
| |
| def start_timer(self): |
| # log.info("Start timer for device {}".format(device)) |
| self.timer = threading.Timer(10.0, self.timer_expiry) |
| self.timer.start() |
| |
| |
| def set_resolution_high(self): |
| for device in range(0, len(config.cameras)): |
| self.set_resolution(str(device), "high") |
| |
| |
| def set_resolution_low(self): |
| for device in range(0, len(config.cameras)): |
| self.set_resolution(str(device), "low") |
| |
| |
| def set_resolution(self, device, level): |
| log.info("Setting camera {} resolution to {}".format(device, level)) |
| client = mqtt.Client() |
| client.connect(self.mqttBroker) |
| client.publish("camera/" + str(5000 + int(device)), level) |
| |
| |
| def timer_expiry(self): |
| now = time.time() |
| diff = now - self.last_detected |
| log.debug("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected)) |
| if diff > 5.0: |
| self.no_person_detected() |
| else: |
| # Restart timer since person detected not too long back |
| self.start_timer() |