blob: 53525abefe0984095fa7b965f5f21199a1d2a346 [file] [log] [blame]
Shad Ansari5e8d0692021-12-08 19:09:34 +00001import paho.mqtt.client as mqtt
2import time
3import os
4import sys
5import threading
6import logging as log
7from multiprocessing import Process, Queue, Value, Array, Lock
Shad Ansari717d0d02022-01-20 08:44:05 +00008import threading
9from collections import defaultdict
Shad Ansari5e8d0692021-12-08 19:09:34 +000010
shadf64b92a2022-01-13 19:06:29 +000011from roc import Roc
Shad Ansari925bfe32021-12-14 21:39:10 +000012import config
Shad Ansari30a23732021-09-29 23:07:21 -070013
Shad Ansari717d0d02022-01-20 08:44:05 +000014try:
15 from greenlet import getcurrent as get_ident
16except ImportError:
17 try:
18 from thread import get_ident
19 except ImportError:
20 from _thread import get_ident
21
22
class CameraEvent(object):
    """An Event-like class that signals all active clients when a new frame is
    available.

    Every client thread (keyed by ``get_ident()``) owns one entry in
    ``self.events``: a two-element list holding a ``threading.Event`` and the
    time at which that event was last set.
    """

    def __init__(self):
        # ident -> [threading.Event, timestamp of the last successful set()]
        self.events = {}

    def wait(self):
        """Invoked from each client's thread to wait for the next frame.

        Registers the calling thread as a client on first use.

        Returns:
            True once the client's event is set, False if 10 seconds pass
            without a new frame.
        """
        ident = get_ident()
        if ident not in self.events:
            # This is a new client: give it its own event and remember when
            # it was registered.
            self.events[ident] = [threading.Event(), time.time()]
        return self.events[ident][0].wait(timeout=10.0)

    def set(self):
        """Invoked by the camera thread when a new frame is available."""
        now = time.time()
        # Iterate over a snapshot: wait() may add a new client from another
        # thread while this loop runs, and mutating a dict during live
        # iteration raises RuntimeError.
        for _ident, entry in list(self.events.items()):
            if not entry[0].is_set():
                # The client consumed the previous frame; wake it up and
                # record when it was signalled.
                entry[0].set()
                entry[1] = now
            # Otherwise the client has not processed the previous frame yet.
            # Eviction of such stale clients is intentionally not performed
            # (it was disabled in the original code), so the entry is simply
            # left as it is.

    def clear(self):
        """Invoked from each client's thread after a frame was processed.

        Raises:
            KeyError: if the calling thread never called ``wait()``.
        """
        self.events[get_ident()][0].clear()
63
Shad Ansari30a23732021-09-29 23:07:21 -070064
class BaseCamera(object):
    """Base class that captures frames for a camera device in a background
    process and hands the most recent frame to client threads.

    Subclasses must implement ``frames()``. State that is shared between all
    instances lives in the class attributes below.
    """

    # Most recent frame received for each device, keyed by device.
    frame = {}
    # CameraEvent used to signal clients of each device, keyed by device.
    event = {}

    # multiprocessing.Queue carrying frames out of each device's capture
    # process, keyed by device.
    deviceQ = {}
    # One slot per configured camera; 1 once the background workers for that
    # device index have been started.
    cameras = [0] * len(config.cameras)

    # multiprocessing lock guarding updates to activity_counter.
    lock = Lock()
    # Shared integer incremented in person_detected() and decremented in
    # no_person_detected().
    activity_counter = Value('i', 0)

    def __init__(self, device, user, password, mbrlow, mbrhigh, devicegroup, noroc):
        """Store the configuration and start the background workers for
        ``device`` if they are not running yet.

        Args:
            device: camera identifier; ``int(device)`` indexes ``cameras``.
            user: passed to ``Roc``.
            password: passed to ``Roc``.
            mbrlow: value given to ``roc.set_mbr`` when activity stops.
            mbrhigh: value given to ``roc.set_mbr`` when a person is detected.
            devicegroup: device group given to ``roc.set_mbr``.
            noroc: when true, ``roc.set_mbr`` is never called.
        """
        self.mqttBroker = "localhost"
        self.device = device
        self.mbrlow = mbrlow
        self.mbrhigh = mbrhigh
        self.devicegroup = devicegroup
        self.noroc = noroc
        self.roc = Roc(user, password)

        # Detection state is initialised on every instance so that
        # person_detected()/timer_expiry() never hit a missing attribute.
        self.last_detected = None
        self.timer = None
        self.detected = False

        # Start the background camera process if it isn't running yet.
        index = int(self.device)
        if BaseCamera.cameras[index] == 0:
            BaseCamera.cameras[index] = 1
            BaseCamera.event[self.device] = CameraEvent()
            BaseCamera.deviceQ[self.device] = Queue(100)
            self.set_resolution(self.device, "low")
            # Thread that moves frames from the queue into BaseCamera.frame.
            threading.Thread(target=self._thread).start()
            # start background frame process
            Process(target=self._process).start()
            # wait until frames are available
            _ = self.get_frame()

        # ``client`` is not assigned anywhere in this class; read it
        # defensively so construction does not fail with AttributeError when
        # a subclass has not set it.
        log.info("Start camera {} feed to {}".format(self.device, getattr(self, "client", None)))

    def get_frame(self):
        """Return the current camera frame.

        Blocks until the device's event is signalled, logging each time the
        wait times out, then clears the event for the calling thread.
        """
        signal = BaseCamera.event[self.device]
        while not signal.wait():
            log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident()))
        signal.clear()
        return BaseCamera.frame[self.device]

    def frames(self):
        """Generator that returns frames from the camera."""
        raise NotImplementedError('Must be implemented by subclasses.')

    def _thread(self):
        """Forever copy frames from the device queue into ``BaseCamera.frame``
        and signal waiting clients."""
        while True:
            BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True)
            BaseCamera.event[self.device].set()  # send signal to clients
            time.sleep(0)

    def _process(self):
        """Camera background process: push every frame produced by
        ``frames()`` onto the device queue, blocking while the queue is full."""
        for frame in self.frames():
            BaseCamera.deviceQ[self.device].put(frame, block=True)

    def person_detected(self, num):
        """Record a detection; on the first one, switch to high resolution.

        Args:
            num: unused by this method.
        """
        self.last_detected = time.time()
        if not self.detected:
            # ``with`` guarantees the shared lock is released even if the
            # update raises.
            with BaseCamera.lock:
                BaseCamera.activity_counter.value += 1
            self.set_resolution_high()
            if not self.noroc:
                self.roc.set_mbr(self.devicegroup, self.mbrhigh)
            self.detected = True
            self.start_timer()

    def no_person_detected(self):
        """Mark this camera idle and, when the activity counter reaches zero,
        switch all cameras to low resolution."""
        self.detected = False
        self.timer = None
        # The resolution/MBR calls stay inside the critical section as in the
        # original; ``with`` ensures a failure in them cannot leave the lock
        # held.
        with BaseCamera.lock:
            BaseCamera.activity_counter.value -= 1
            if BaseCamera.activity_counter.value <= 0:
                # Clamp so the counter never goes negative.
                BaseCamera.activity_counter.value = 0
                self.set_resolution_low()
                if not self.noroc:
                    self.roc.set_mbr(self.devicegroup, self.mbrlow)

    def start_timer(self):
        """Arm a 10 second one-shot timer that calls ``timer_expiry``."""
        self.timer = threading.Timer(10.0, self.timer_expiry)
        self.timer.start()

    def set_resolution_high(self):
        """Request "high" resolution from every configured camera."""
        for device in range(len(config.cameras)):
            self.set_resolution(str(device), "high")

    def set_resolution_low(self):
        """Request "low" resolution from every configured camera."""
        for device in range(len(config.cameras)):
            self.set_resolution(str(device), "low")

    def set_resolution(self, device, level):
        """Publish ``level`` on the MQTT topic ``camera/<5000 + device>``.

        Args:
            device: camera index (anything ``int()`` accepts).
            level: payload to publish, e.g. "low" or "high".
        """
        log.info("Setting camera {} resolution to {}".format(device, level))
        # NOTE(review): a new client is connected per call and never
        # explicitly disconnected - confirm this is acceptable for the broker.
        client = mqtt.Client()
        client.connect(self.mqttBroker)
        client.publish("camera/" + str(5000 + int(device)), level)

    def timer_expiry(self):
        """Timer callback: declare the camera idle if the last detection is
        more than 5 seconds old, otherwise re-arm the timer."""
        now = time.time()
        diff = now - self.last_detected
        log.debug("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected))
        if diff > 5.0:
            self.no_person_detected()
        else:
            # Restart timer since person detected not too long back
            self.start_timer()