blob: bf849f1112e269e01a8d43a93fd4dff720991dcb [file] [log] [blame]
Shad Ansari5e8d0692021-12-08 19:09:34 +00001import paho.mqtt.client as mqtt
2import time
Shad Ansari5e8d0692021-12-08 19:09:34 +00003import threading
4import logging as log
Shad Ansari35019042022-03-29 20:15:54 +00005from multiprocessing import Process, Queue, Value, Lock
Shad Ansari5e8d0692021-12-08 19:09:34 +00006
shadf64b92a2022-01-13 19:06:29 +00007from roc import Roc
Shad Ansari925bfe32021-12-14 21:39:10 +00008import config
Shad Ansari30a23732021-09-29 23:07:21 -07009
Shad Ansari717d0d02022-01-20 08:44:05 +000010try:
11 from greenlet import getcurrent as get_ident
12except ImportError:
13 try:
14 from thread import get_ident
15 except ImportError:
16 from _thread import get_ident
17
18
19class CameraEvent(object):
20 """An Event-like class that signals all active clients when a new frame is
21 available.
22 """
23 def __init__(self):
24 self.events = {}
25
26 def wait(self):
27 """Invoked from each client's thread to wait for the next frame."""
28 ident = get_ident()
29 if ident not in self.events:
30 # this is a new client
31 # add an entry for it in the self.events dict
32 # each entry has two elements, a threading.Event() and a timestamp
33 self.events[ident] = [threading.Event(), time.time()]
34 return self.events[ident][0].wait(timeout=10.0)
35
36 def set(self):
37 """Invoked by the camera thread when a new frame is available."""
38 now = time.time()
39 remove = None
40 for ident, event in self.events.items():
41 if not event[0].isSet():
42 # if this client's event is not set, then set it
43 # also update the last set timestamp to now
44 event[0].set()
45 event[1] = now
46 else:
47 # if the client's event is already set, it means the client
48 # did not process a previous frame
49 # if the event stays set for more than 2 minutes, then assume
50 # the client is gone and remove it
51 if now - event[1] > 120:
52 remove = ident
53 #if remove:
54 # del self.events[remove]
55
56 def clear(self):
57 """Invoked from each client's thread after a frame was processed."""
58 self.events[get_ident()][0].clear()
59
Shad Ansari30a23732021-09-29 23:07:21 -070060
61class BaseCamera(object):
Shad Ansari717d0d02022-01-20 08:44:05 +000062 frame = {}
63 event = {}
64
65 deviceQ = {}
66 cameras = [0]*len(config.cameras)
67
Shad Ansari5e8d0692021-12-08 19:09:34 +000068 lock = Lock()
Shad Ansari717d0d02022-01-20 08:44:05 +000069 activity_counter = Value('i', 0)
Shad Ansari30a23732021-09-29 23:07:21 -070070
Shad Ansari2948cc02022-04-14 21:33:37 +000071 def __init__(self, device, url, keycloak, enterprise, site, user, password, mbrlow, mbrhigh, devicegroup, noroc):
Shad Ansari5e8d0692021-12-08 19:09:34 +000072 self.mqttBroker = "localhost"
Shad Ansaric0726e62021-10-04 22:38:53 +000073 self.device = device
Shad Ansari5e8d0692021-12-08 19:09:34 +000074 self.mbrlow = mbrlow
75 self.mbrhigh = mbrhigh
76 self.devicegroup = devicegroup
Shad Ansariec6bbd32021-12-10 20:57:16 +000077 self.noroc = noroc
Shad Ansaric9f48d32021-10-25 19:03:34 +000078
Shad Ansari2948cc02022-04-14 21:33:37 +000079 self.roc = Roc(url, keycloak, user, password, enterprise, site)
Shad Ansari717d0d02022-01-20 08:44:05 +000080
Shad Ansari5e8d0692021-12-08 19:09:34 +000081 """Start the background camera process if it isn't running yet."""
82 if BaseCamera.cameras[int(self.device)] == 0:
83 BaseCamera.cameras[int(self.device)] = 1
Shad Ansari717d0d02022-01-20 08:44:05 +000084 BaseCamera.event[self.device] = CameraEvent()
Shad Ansari5e8d0692021-12-08 19:09:34 +000085 self.last_detected = None
86 self.timer = None
87 self.detected = False
Shad Ansari717d0d02022-01-20 08:44:05 +000088 BaseCamera.deviceQ[self.device] = Queue(100)
Shad Ansariec6bbd32021-12-10 20:57:16 +000089 self.set_resolution(self.device, "low")
Shad Ansari717d0d02022-01-20 08:44:05 +000090 threading.Thread(target=self._thread).start()
Shad Ansari26682be2021-10-26 03:52:35 +000091 # start background frame process
Shad Ansari717d0d02022-01-20 08:44:05 +000092 Process(target=self._process).start()
Shad Ansari30a23732021-09-29 23:07:21 -070093 # wait until frames are available
Shad Ansari60ca8cc2021-11-02 18:46:44 +000094 _ = self.get_frame()
Shad Ansari2eddc1e2022-01-10 17:53:06 +000095 log.info("Start camera {} feed to {}".format(self.device, self.client))
Shad Ansari30a23732021-09-29 23:07:21 -070096
97 def get_frame(self):
98 """Return the current camera frame."""
Shad Ansari717d0d02022-01-20 08:44:05 +000099 while not BaseCamera.event[self.device].wait():
100 log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident()))
101 BaseCamera.event[self.device].clear()
102 return BaseCamera.frame[self.device]
Shad Ansari30a23732021-09-29 23:07:21 -0700103
Shad Ansari341ca3a2021-09-30 12:10:00 -0700104 def frames(self):
Shad Ansari30a23732021-09-29 23:07:21 -0700105 """"Generator that returns frames from the camera."""
Shad Ansari341ca3a2021-09-30 12:10:00 -0700106 raise NotImplementedError('Must be implemented by subclasses.')
Shad Ansari30a23732021-09-29 23:07:21 -0700107
Shad Ansari717d0d02022-01-20 08:44:05 +0000108 def _thread(self):
109 while True:
110 BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True)
111 BaseCamera.event[self.device].set() # send signal to clients
112 time.sleep(0)
113
Shad Ansari717d0d02022-01-20 08:44:05 +0000114 def _process(self):
Shad Ansari26682be2021-10-26 03:52:35 +0000115 """Camera background process."""
Shad Ansari30a23732021-09-29 23:07:21 -0700116 frames_iterator = self.frames()
117 for frame in frames_iterator:
Shad Ansari717d0d02022-01-20 08:44:05 +0000118 BaseCamera.deviceQ[self.device].put(frame, block=True)
Shad Ansari4ae11682021-10-22 18:51:53 +0000119
Shad Ansari5e8d0692021-12-08 19:09:34 +0000120 def person_detected(self, num):
121 self.last_detected = time.time()
122 if not self.detected:
123 BaseCamera.lock.acquire()
124 BaseCamera.activity_counter.value += 1
125 BaseCamera.lock.release()
126 self.set_resolution_high()
Shad Ansari2eddc1e2022-01-10 17:53:06 +0000127 if not self.noroc:
shadf64b92a2022-01-13 19:06:29 +0000128 self.roc.set_mbr(self.devicegroup, self.mbrhigh)
Shad Ansari5e8d0692021-12-08 19:09:34 +0000129 self.detected = True
130 self.start_timer()
131
132 def no_person_detected(self):
133 self.detected = False
134 self.timer = None
135 BaseCamera.lock.acquire()
Shad Ansari35019042022-03-29 20:15:54 +0000136 BaseCamera.activity_counter.value -= 1
Shad Ansari5e8d0692021-12-08 19:09:34 +0000137 if BaseCamera.activity_counter.value <= 0:
138 BaseCamera.activity_counter.value = 0
139 self.set_resolution_low()
Shad Ansari2eddc1e2022-01-10 17:53:06 +0000140 if not self.noroc:
shadf64b92a2022-01-13 19:06:29 +0000141 self.roc.set_mbr(self.devicegroup, self.mbrlow)
Shad Ansari5e8d0692021-12-08 19:09:34 +0000142 BaseCamera.lock.release()
143
Shad Ansari5e8d0692021-12-08 19:09:34 +0000144 def start_timer(self):
145 # log.info("Start timer for device {}".format(device))
146 self.timer = threading.Timer(10.0, self.timer_expiry)
147 self.timer.start()
148
Shad Ansari5e8d0692021-12-08 19:09:34 +0000149 def set_resolution_high(self):
Shad Ansari925bfe32021-12-14 21:39:10 +0000150 for device in range(0, len(config.cameras)):
Shad Ansari5e8d0692021-12-08 19:09:34 +0000151 self.set_resolution(str(device), "high")
152
Shad Ansari5e8d0692021-12-08 19:09:34 +0000153 def set_resolution_low(self):
Shad Ansari925bfe32021-12-14 21:39:10 +0000154 for device in range(0, len(config.cameras)):
Shad Ansari5e8d0692021-12-08 19:09:34 +0000155 self.set_resolution(str(device), "low")
156
Shad Ansari5e8d0692021-12-08 19:09:34 +0000157 def set_resolution(self, device, level):
158 log.info("Setting camera {} resolution to {}".format(device, level))
159 client = mqtt.Client()
160 client.connect(self.mqttBroker)
161 client.publish("camera/" + str(5000 + int(device)), level)
162
Shad Ansari5e8d0692021-12-08 19:09:34 +0000163 def timer_expiry(self):
164 now = time.time()
165 diff = now - self.last_detected
shadf64b92a2022-01-13 19:06:29 +0000166 log.debug("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected))
Shad Ansari5e8d0692021-12-08 19:09:34 +0000167 if diff > 5.0:
168 self.no_person_detected()
169 else:
170 # Restart timer since person detected not too long back
171 self.start_timer()