| import threading |
| from queue import Queue |
| |
| |
class BaseCamera(object):
    """Base class that serves camera frames produced by a background thread.

    Subclasses implement :meth:`frames` as a generator of frames. For each
    distinct ``device`` one reader thread is started; it pushes every frame
    yielded by :meth:`frames` into a bounded queue from which
    :meth:`get_frame` pops them.
    """

    # Both registries live on the class and are keyed by ``device`` so that all
    # instances created for the same device share one thread and one queue.
    thread = {}  # device -> background thread reading frames (None when not running)
    frame = {}  # device -> frame queue filled by that thread

    def __init__(self, device=None, idle=False):
        """Start the background camera thread if it isn't running yet.

        Blocks until the first frame is available; that frame is consumed
        here and not returned to the caller.

        Args:
            device: Hashable key identifying the camera. It is used to look up
                the shared thread/queue and is passed to the worker thread.
            idle: Accepted for interface compatibility; not used by this class.
        """
        self.device = device
        if BaseCamera.thread.get(device) is None:
            # A fresh queue per (re)start; at most 100 frames are buffered.
            BaseCamera.frame[device] = Queue(100)

            # start background frame thread
            # NOTE: ``args`` must be a real tuple. ``(device)`` is just
            # ``device``, which Thread would try to unpack with ``*``.
            worker = threading.Thread(target=self._thread, args=(device,))
            BaseCamera.thread[device] = worker
            worker.start()

        # wait until frames are available
        self.get_frame()

    def get_frame(self):
        """Return the next queued camera frame, blocking until one is available."""
        return BaseCamera.frame[self.device].get(block=True)

    def frames(self):
        """Generator that returns frames from the camera."""
        raise NotImplementedError('Must be implemented by subclasses.')

    def _thread(self, device):
        """Camera background thread.

        Pushes every frame yielded by :meth:`frames` into the queue for
        ``device``; ``put`` blocks while the queue is full.

        Args:
            device: Key of the queue/thread registry entry this worker serves.
        """
        try:
            for frame in self.frames():
                BaseCamera.frame[device].put(frame, block=True)
        finally:
            # Free the slot even when frames() raises, so that a later
            # instance for this device can start a new reader thread.
            BaseCamera.thread[device] = None