| import paho.mqtt.client as mqtt |
| import time |
| import os |
| import sys |
| import threading |
| import logging as log |
| from multiprocessing import Process, Queue, Value, Array, Lock |
| |
| import roc |
| |
| |
| class BaseCamera(object): |
| process = {} # background process that reads frames from camera |
| frame = {} # frame queue |
| activity_counter = Value('i', 0) |
| cameras = Array('i', [0, 0, 0, 0]) |
| lock = Lock() |
| |
| def __init__(self, device, key, mbrlow, mbrhigh, devicegroup): |
| self.mqttBroker = "localhost" |
| self.device = device |
| self.key = key |
| self.mbrlow = mbrlow |
| self.mbrhigh = mbrhigh |
| self.devicegroup = devicegroup |
| |
| """Start the background camera process if it isn't running yet.""" |
| if BaseCamera.cameras[int(self.device)] == 0: |
| BaseCamera.cameras[int(self.device)] = 1 |
| self.last_detected = None |
| self.timer = None |
| self.detected = False |
| BaseCamera.frame[self.device] = Queue(100) |
| # start background frame process |
| BaseCamera.process[self.device] = Process(target=self._process, args=(self.device)) |
| BaseCamera.process[self.device].start() |
| # wait until frames are available |
| _ = self.get_frame() |
| |
| def get_frame(self): |
| """Return the current camera frame.""" |
| |
| # blocks |
| return BaseCamera.frame[self.device].get(block=True) |
| |
| def frames(self): |
| """"Generator that returns frames from the camera.""" |
| raise NotImplementedError('Must be implemented by subclasses.') |
| |
| def _process(self, device): |
| """Camera background process.""" |
| frames_iterator = self.frames() |
| for frame in frames_iterator: |
| BaseCamera.frame[device].put(frame, block=True) |
| |
| BaseCamera.process[device] = None |
| |
| def person_detected(self, num): |
| self.last_detected = time.time() |
| if not self.detected: |
| BaseCamera.lock.acquire() |
| BaseCamera.activity_counter.value += 1 |
| BaseCamera.lock.release() |
| self.set_resolution_high() |
| roc.set_mbr(self.key, self.devicegroup, self.mbrhigh) |
| self.detected = True |
| self.start_timer() |
| |
| def no_person_detected(self): |
| self.detected = False |
| self.timer = None |
| BaseCamera.lock.acquire() |
| BaseCamera.activity_counter.value -=1 |
| if BaseCamera.activity_counter.value <= 0: |
| BaseCamera.activity_counter.value = 0 |
| self.set_resolution_low() |
| roc.set_mbr(self.key, self.devicegroup, self.mbrlow) |
| BaseCamera.lock.release() |
| |
| |
| def start_timer(self): |
| # log.info("Start timer for device {}".format(device)) |
| self.timer = threading.Timer(10.0, self.timer_expiry) |
| self.timer.start() |
| |
| |
| def set_resolution_high(self): |
| for device in range(0, 4): |
| self.set_resolution(str(device), "high") |
| |
| |
| def set_resolution_low(self): |
| for device in range(0, 4): |
| self.set_resolution(str(device), "low") |
| |
| |
| def set_resolution(self, device, level): |
| log.info("Setting camera {} resolution to {}".format(device, level)) |
| client = mqtt.Client() |
| client.connect(self.mqttBroker) |
| client.publish("camera/" + str(5000 + int(device)), level) |
| |
| |
| def timer_expiry(self): |
| now = time.time() |
| diff = now - self.last_detected |
| log.info("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected)) |
| if diff > 5.0: |
| self.no_person_detected() |
| else: |
| # Restart timer since person detected not too long back |
| self.start_timer() |