blob: 53525abefe0984095fa7b965f5f21199a1d2a346 [file] [log] [blame]
import paho.mqtt.client as mqtt
import time
import os
import sys
import threading
import logging as log
from multiprocessing import Process, Queue, Value, Array, Lock
import threading
from collections import defaultdict
from roc import Roc
import config
try:
from greenlet import getcurrent as get_ident
except ImportError:
try:
from thread import get_ident
except ImportError:
from _thread import get_ident
class CameraEvent(object):
"""An Event-like class that signals all active clients when a new frame is
available.
"""
def __init__(self):
self.events = {}
def wait(self):
"""Invoked from each client's thread to wait for the next frame."""
ident = get_ident()
if ident not in self.events:
# this is a new client
# add an entry for it in the self.events dict
# each entry has two elements, a threading.Event() and a timestamp
self.events[ident] = [threading.Event(), time.time()]
return self.events[ident][0].wait(timeout=10.0)
def set(self):
"""Invoked by the camera thread when a new frame is available."""
now = time.time()
remove = None
for ident, event in self.events.items():
if not event[0].isSet():
# if this client's event is not set, then set it
# also update the last set timestamp to now
event[0].set()
event[1] = now
else:
# if the client's event is already set, it means the client
# did not process a previous frame
# if the event stays set for more than 2 minutes, then assume
# the client is gone and remove it
if now - event[1] > 120:
remove = ident
#if remove:
# del self.events[remove]
def clear(self):
"""Invoked from each client's thread after a frame was processed."""
self.events[get_ident()][0].clear()
class BaseCamera(object):
frame = {}
event = {}
deviceQ = {}
cameras = [0]*len(config.cameras)
lock = Lock()
activity_counter = Value('i', 0)
def __init__(self, device, user, password, mbrlow, mbrhigh, devicegroup, noroc):
self.mqttBroker = "localhost"
self.device = device
self.mbrlow = mbrlow
self.mbrhigh = mbrhigh
self.devicegroup = devicegroup
self.noroc = noroc
self.roc = Roc(user, password)
"""Start the background camera process if it isn't running yet."""
if BaseCamera.cameras[int(self.device)] == 0:
BaseCamera.cameras[int(self.device)] = 1
BaseCamera.event[self.device] = CameraEvent()
self.last_detected = None
self.timer = None
self.detected = False
BaseCamera.deviceQ[self.device] = Queue(100)
self.set_resolution(self.device, "low")
threading.Thread(target=self._thread).start()
# start background frame process
Process(target=self._process).start()
# wait until frames are available
_ = self.get_frame()
log.info("Start camera {} feed to {}".format(self.device, self.client))
def get_frame(self):
"""Return the current camera frame."""
while not BaseCamera.event[self.device].wait():
log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident()))
BaseCamera.event[self.device].clear()
return BaseCamera.frame[self.device]
def frames(self):
""""Generator that returns frames from the camera."""
raise NotImplementedError('Must be implemented by subclasses.')
def _thread(self):
while True:
BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True)
BaseCamera.event[self.device].set() # send signal to clients
time.sleep(0)
def _process(self):
"""Camera background process."""
frames_iterator = self.frames()
for frame in frames_iterator:
BaseCamera.deviceQ[self.device].put(frame, block=True)
def person_detected(self, num):
self.last_detected = time.time()
if not self.detected:
BaseCamera.lock.acquire()
BaseCamera.activity_counter.value += 1
BaseCamera.lock.release()
self.set_resolution_high()
if not self.noroc:
self.roc.set_mbr(self.devicegroup, self.mbrhigh)
self.detected = True
self.start_timer()
def no_person_detected(self):
self.detected = False
self.timer = None
BaseCamera.lock.acquire()
BaseCamera.activity_counter.value -=1
if BaseCamera.activity_counter.value <= 0:
BaseCamera.activity_counter.value = 0
self.set_resolution_low()
if not self.noroc:
self.roc.set_mbr(self.devicegroup, self.mbrlow)
BaseCamera.lock.release()
def start_timer(self):
# log.info("Start timer for device {}".format(device))
self.timer = threading.Timer(10.0, self.timer_expiry)
self.timer.start()
def set_resolution_high(self):
for device in range(0, len(config.cameras)):
self.set_resolution(str(device), "high")
def set_resolution_low(self):
for device in range(0, len(config.cameras)):
self.set_resolution(str(device), "low")
def set_resolution(self, device, level):
log.info("Setting camera {} resolution to {}".format(device, level))
client = mqtt.Client()
client.connect(self.mqttBroker)
client.publish("camera/" + str(5000 + int(device)), level)
def timer_expiry(self):
now = time.time()
diff = now - self.last_detected
log.debug("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected))
if diff > 5.0:
self.no_person_detected()
else:
# Restart timer since person detected not too long back
self.start_timer()