blob: 953da51d4b9058c02150807ccd07cf6f80a6012e [file] [log] [blame]
Shad Ansarif53bc4f2022-06-07 14:31:11 -07001"""
2SPDX-FileCopyrightText: 2022-present Intel Corporation
3SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org>
4SPDX-License-Identifier: Apache-2.0
5"""
Shad Ansari5e8d0692021-12-08 19:09:34 +00006import paho.mqtt.client as mqtt
7import time
Shad Ansari5e8d0692021-12-08 19:09:34 +00008import threading
9import logging as log
Shad Ansari35019042022-03-29 20:15:54 +000010from multiprocessing import Process, Queue, Value, Lock
Shad Ansari5e8d0692021-12-08 19:09:34 +000011
shadf64b92a2022-01-13 19:06:29 +000012from roc import Roc
Shad Ansari925bfe32021-12-14 21:39:10 +000013import config
Shad Ansari30a23732021-09-29 23:07:21 -070014
Shad Ansari717d0d02022-01-20 08:44:05 +000015try:
16 from greenlet import getcurrent as get_ident
17except ImportError:
18 try:
19 from thread import get_ident
20 except ImportError:
21 from _thread import get_ident
22
23
class CameraEvent(object):
    """An Event-like class that signals all active clients when a new frame is
    available.

    Every client is keyed by the value returned by ``get_ident()`` and owns a
    two-element list in ``self.events``: a ``threading.Event`` and the
    timestamp at which that event was last set.
    """

    # Seconds wait() blocks before giving up and returning False.
    WAIT_TIMEOUT = 10.0
    # Seconds an event may remain set (frame not consumed) before the client
    # is considered gone and its entry is dropped.
    STALE_AFTER = 120

    def __init__(self):
        self.events = {}

    def wait(self):
        """Invoked from each client's thread to wait for the next frame.

        Registers the calling client on first use. Returns True when the
        client's event was set, or False if the wait timed out.
        """
        ident = get_ident()
        entry = self.events.get(ident)
        if entry is None:
            # this is a new client
            # add an entry for it in the self.events dict
            # each entry has two elements, a threading.Event() and a timestamp
            entry = self.events[ident] = [threading.Event(), time.time()]
        # Wait on the local reference so a concurrent removal in set() cannot
        # turn the lookup into a KeyError.
        return entry[0].wait(timeout=self.WAIT_TIMEOUT)

    def set(self):
        """Invoked by the camera thread when a new frame is available."""
        now = time.time()
        stale = []
        # Iterate over a snapshot: client threads may add entries through
        # wait() while this loop runs, which would otherwise raise
        # "dictionary changed size during iteration".
        for ident, entry in list(self.events.items()):
            flag = entry[0]
            if not flag.is_set():
                # if this client's event is not set, then set it
                # also update the last set timestamp to now
                flag.set()
                entry[1] = now
            elif now - entry[1] > self.STALE_AFTER:
                # the event is still set, so the client did not process a
                # previous frame; after STALE_AFTER seconds assume the
                # client is gone
                stale.append(ident)
        for ident in stale:
            self.events.pop(ident, None)

    def clear(self):
        """Invoked from each client's thread after a frame was processed."""
        entry = self.events.get(get_ident())
        # The entry may be missing if set() dropped it as stale, or if this
        # client never called wait(); there is nothing to clear in that case.
        if entry is not None:
            entry[0].clear()
64
Shad Ansari30a23732021-09-29 23:07:21 -070065
class BaseCamera(object):
    """Base class for a camera feed; subclasses supply frames via ``frames()``.

    The first instance created for a device starts a background process that
    pushes frames from ``frames()`` onto a per-device queue, and a thread that
    moves them from the queue into ``BaseCamera.frame`` and signals waiting
    clients. Detection callbacks switch camera resolution over MQTT and,
    unless ``noroc`` is set, call ``Roc.set_mbr`` for the device group.
    """

    # device -> most recent frame taken from that device's queue
    frame = {}
    # device -> CameraEvent used to signal clients waiting in get_frame()
    event = {}

    # device -> multiprocessing Queue filled by _process()
    deviceQ = {}
    # per camera index: 1 once the background workers have been started
    cameras = [0] * len(config.cameras)

    # guards activity_counter
    lock = Lock()
    # shared integer, incremented in person_detected() and decremented in
    # no_person_detected()
    activity_counter = Value('i', 0)

    def __init__(self, device, url, keycloak, enterprise, site, user, password, mbrlow, mbrhigh, devicegroup, noroc):
        """Store settings and start the background workers if needed.

        ``device`` must be convertible with ``int()``; it indexes
        ``BaseCamera.cameras`` and is added to 5000 to form the MQTT topic.
        ``url``, ``keycloak``, ``user``, ``password``, ``enterprise`` and
        ``site`` are passed to ``Roc``. ``mbrlow`` / ``mbrhigh`` are the
        values handed to ``Roc.set_mbr`` for ``devicegroup``; ``noroc``
        disables those calls when true.
        """
        self.mqttBroker = "localhost"
        self.device = device
        self.mbrlow = mbrlow
        self.mbrhigh = mbrhigh
        self.devicegroup = devicegroup
        self.noroc = noroc

        self.roc = Roc(url, keycloak, user, password, enterprise, site)

        # Detection state is initialised on every instance, not only on the
        # one that starts the workers, so person_detected() and friends never
        # hit a missing attribute.
        self.last_detected = None
        self.timer = None
        self.detected = False

        # Start the background camera process if it isn't running yet.
        index = int(self.device)
        if BaseCamera.cameras[index] == 0:
            BaseCamera.cameras[index] = 1
            BaseCamera.event[self.device] = CameraEvent()
            BaseCamera.deviceQ[self.device] = Queue(100)
            self.set_resolution(self.device, "low")
            threading.Thread(target=self._thread).start()
            # start background frame process
            Process(target=self._process).start()
            # wait until frames are available
            _ = self.get_frame()
            # ``client`` is not assigned anywhere in this class; fall back to
            # None rather than raising if a subclass has not provided it.
            log.info("Start camera {} feed to {}".format(self.device, getattr(self, "client", None)))

    def get_frame(self):
        """Return the current camera frame.

        Blocks until this device's event is signalled, logging each time the
        wait times out, then clears the caller's event and returns the frame
        stored for the device.
        """
        signal = BaseCamera.event[self.device]
        while not signal.wait():
            log.info("get_frame timeout device:{}, thread:{}".format(self.device, get_ident()))
        signal.clear()
        return BaseCamera.frame[self.device]

    def frames(self):
        """Generator that returns frames from the camera."""
        raise NotImplementedError('Must be implemented by subclasses.')

    def _thread(self):
        """Move frames from the device queue into ``BaseCamera.frame`` forever."""
        while True:
            BaseCamera.frame[self.device] = BaseCamera.deviceQ[self.device].get(block=True)
            BaseCamera.event[self.device].set()  # send signal to clients
            time.sleep(0)

    def _process(self):
        """Camera background process: push every frame onto the device queue."""
        for frame in self.frames():
            BaseCamera.deviceQ[self.device].put(frame, block=True)

    def person_detected(self, num):
        """Record a detection and, on the first one, raise resolution and MBR.

        ``num`` is accepted but not used here.
        """
        self.last_detected = time.time()
        if not self.detected:
            with BaseCamera.lock:
                BaseCamera.activity_counter.value += 1
            self.set_resolution_high()
            if not self.noroc:
                self.roc.set_mbr(self.devicegroup, self.mbrhigh)
            self.detected = True
            self.start_timer()

    def no_person_detected(self):
        """Mark this camera idle; lower resolution and MBR when none is active."""
        self.detected = False
        self.timer = None
        # ``with`` guarantees the shared lock is released even if the MQTT
        # publish or the ROC call below raises.
        with BaseCamera.lock:
            BaseCamera.activity_counter.value -= 1
            if BaseCamera.activity_counter.value <= 0:
                BaseCamera.activity_counter.value = 0
                self.set_resolution_low()
                if not self.noroc:
                    self.roc.set_mbr(self.devicegroup, self.mbrlow)

    def start_timer(self):
        """Schedule ``timer_expiry`` to run in 10 seconds."""
        # log.info("Start timer for device {}".format(device))
        self.timer = threading.Timer(10.0, self.timer_expiry)
        self.timer.start()

    def set_resolution_high(self):
        """Request "high" resolution for every index in ``config.cameras``."""
        for device in range(len(config.cameras)):
            self.set_resolution(str(device), "high")

    def set_resolution_low(self):
        """Request "low" resolution for every index in ``config.cameras``."""
        for device in range(len(config.cameras)):
            self.set_resolution(str(device), "low")

    def set_resolution(self, device, level):
        """Publish ``level`` to the MQTT topic ``camera/<5000 + device>``."""
        log.info("Setting camera {} resolution to {}".format(device, level))
        client = mqtt.Client()
        client.connect(self.mqttBroker)
        try:
            client.publish("camera/" + str(5000 + int(device)), level)
        finally:
            # A new connection is opened on every call; close it so
            # connections do not accumulate.
            client.disconnect()

    def timer_expiry(self):
        """Timer callback: go idle if the last detection is older than 5 s."""
        now = time.time()
        diff = now - self.last_detected
        log.debug("timer_expiry() - now:{}, last_detected:{}".format(now, self.last_detected))
        if diff > 5.0:
            self.no_person_detected()
        else:
            # Restart timer since person detected not too long back
            self.start_timer()