| import time |
| import threading |
| try: |
| from greenlet import getcurrent as get_ident |
| except ImportError: |
| try: |
| from thread import get_ident |
| except ImportError: |
| from _thread import get_ident |
| |
| |
class CameraEvent(object):
    """An Event-like class that signals all active clients when a new frame is
    available.

    Every client is identified by the value returned by ``get_ident()`` and
    owns one entry in ``self.events``: a two-element list holding a
    ``threading.Event`` and the timestamp at which that event was last set.
    """

    def __init__(self):
        # client ident -> [threading.Event, time the event was last set]
        self.events = {}

    def wait(self):
        """Invoked from each client's thread to wait for the next frame.

        Registers the calling client on first use, then blocks on the
        client's own event. Returns whatever ``threading.Event.wait()``
        returns.
        """
        ident = get_ident()
        if ident not in self.events:
            # this is a new client
            # add an entry for it in the self.events dict
            # each entry has two elements, a threading.Event() and a timestamp
            self.events[ident] = [threading.Event(), time.time()]
        return self.events[ident][0].wait()

    def set(self):
        """Invoked by the camera thread when a new frame is available.

        Sets the event of every client that has consumed its previous frame
        and drops every client whose event has stayed set for more than
        5 seconds.
        """
        now = time.time()
        stale = []
        # Iterate over a snapshot: client threads may register themselves in
        # wait() while this loop runs, and inserting into a dict that is being
        # iterated raises RuntimeError.
        for ident, event in list(self.events.items()):
            if not event[0].is_set():
                # if this client's event is not set, then set it
                # also update the last set timestamp to now
                event[0].set()
                event[1] = now
            elif now - event[1] > 5:
                # if the client's event is already set, it means the client
                # did not process a previous frame
                # if the event stays set for more than 5 seconds, then assume
                # the client is gone and remove it
                stale.append(ident)
        # Removal is driven by the collected list rather than by the
        # truthiness of an ident, so falsy idents are removed as well.
        for ident in stale:
            self.events.pop(ident, None)

    def clear(self):
        """Invoked from each client's thread after a frame was processed.

        Raises KeyError if the calling client has no entry in ``self.events``.
        """
        self.events[get_ident()][0].clear()
| |
| |
class BaseCamera(object):
    """Base class for cameras whose frames are produced by a background thread.

    All per-camera state lives in class-level dicts keyed by ``device``, so
    every instance created for the same device shares one thread, one current
    frame and one ``CameraEvent``. Subclasses must implement ``frames()``.
    """
    thread = {}  # device -> background thread that reads frames from camera
    frame = {}  # device -> current frame, stored here by background thread
    last_access = {}  # device -> time of last client access to the camera
    event = {}  # device -> CameraEvent used to signal new frames to clients
    idle = False  # if True, stops thread if no client connected

    def __init__(self, device=None, idle=False):
        """Start the background camera thread if it isn't running yet.

        ``device`` is the key under which this camera's shared state is
        stored. ``idle`` enables stopping the background thread after
        10 seconds without any ``get_frame()`` call. When this call starts
        the thread it blocks until the first frame is available.
        """
        self.device = device
        self.idle = idle
        # Create the event only once per device: replacing it while the
        # thread is running would leave clients waiting on the old object,
        # which the camera thread no longer signals.
        if self.device not in BaseCamera.event:
            BaseCamera.event[self.device] = CameraEvent()
        if BaseCamera.thread.get(self.device) is None:
            BaseCamera.last_access[self.device] = time.time()

            # start background frame thread
            # args must be a tuple; "(self.device)" without the trailing
            # comma is just the bare value and gets unpacked by Thread
            BaseCamera.thread[self.device] = threading.Thread(
                target=self._thread, args=(self.device,))
            BaseCamera.thread[self.device].start()

            # wait until frames are available
            while self.get_frame() is None:
                time.sleep(0)

    def get_frame(self):
        """Return the current camera frame.

        Records the access time, blocks until the camera thread signals a
        new frame for this device, then returns the stored frame.
        """
        BaseCamera.last_access[self.device] = time.time()

        # wait for a signal from the camera thread
        BaseCamera.event[self.device].wait()
        BaseCamera.event[self.device].clear()

        return BaseCamera.frame[self.device]

    def frames(self):
        """Generator that returns frames from the camera."""
        raise NotImplementedError('Must be implemented by subclasses.')

    def _thread(self, device):
        """Camera background thread.

        Stores every frame yielded by ``frames()`` under ``device`` and
        signals the waiting clients. The thread slot for ``device`` is reset
        to None when the loop ends for any reason, so that a later instance
        can start a new thread.
        """
        try:
            frames_iterator = self.frames()
            for frame in frames_iterator:
                BaseCamera.frame[device] = frame
                BaseCamera.event[device].set()  # send signal to clients
                time.sleep(0)

                if self.idle:
                    # if there hasn't been any clients asking for frames in
                    # the last 10 seconds then stop the thread
                    if time.time() - BaseCamera.last_access[device] > 10:
                        frames_iterator.close()
                        print('Stopping camera thread due to inactivity.')
                        break
        finally:
            # also runs when frames() raises, so a failed camera does not
            # leave a dead thread object registered for the device
            BaseCamera.thread[device] = None