blob: 0998e1ab9561286ee27c4fe51ddbee318372bcba [file] [log] [blame]
import threading
from queue import Queue
class BaseCamera(object):
thread = {} # background thread that reads frames from camera
frame = {} # frame queue
def __init__(self, device=None, idle=False):
"""Start the background camera thread if it isn't running yet."""
self.device = device
if self.device not in BaseCamera.thread:
BaseCamera.thread[self.device] = None
if BaseCamera.thread[self.device] is None:
self.frame[device] = Queue(100)
# start background frame thread
BaseCamera.thread[self.device] = threading.Thread(target=self._thread, args=(self.device))
BaseCamera.thread[self.device].start()
# wait until frames are available
_ = self.get_frame()
def get_frame(self):
"""Return the current camera frame."""
# blocks
return BaseCamera.frame[self.device].get(block=True)
def frames(self):
""""Generator that returns frames from the camera."""
raise NotImplementedError('Must be implemented by subclasses.')
def _thread(self, device):
"""Camera background thread."""
frames_iterator = self.frames()
for frame in frames_iterator:
BaseCamera.frame[device].put(frame, block=True)
BaseCamera.thread[device] = None