"""Video acquisition module (:mod:`pymanip.video`)
==================================================
This module defines the :class:`~pymanip.video.Camera` abstract base class,
which implements common methods such as the live video preview, and higher
level simple methods to quickly set up a video recording. It also
defines common useful functions, and a simple extension of Numpy arrays to
hold metadata (such as frame timestamp).
.. autoclass:: Camera
:members:
:private-members:
.. autoclass:: CameraTimeout
.. autoclass:: MetadataArray
:members:
:private-members:
:show-inheritance:
.. attribute:: metadata
dictionnary attribute containing user-defined key-value pairs
.. autofunction:: save_image
"""
import sys
import signal
import os
import gzip
import numpy as np
try:
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
from PyQt5.QtWidgets import (
QMainWindow,
QWidget,
QPushButton,
QVBoxLayout,
QHBoxLayout,
QApplication,
QLabel,
QCheckBox,
QDoubleSpinBox,
)
has_qtgraph = True
except ModuleNotFoundError:
has_qtgraph = False
print("pyqtgraph is not installed.")
import cv2
import h5py
import time
from progressbar import ProgressBar
import asyncio
from pymanip.asynctools import synchronize_function, synchronize_generator
class CameraTimeout(Exception):
    """Exception raised when a camera frame acquisition times out."""
def save_image(
    im,
    ii,
    basename,
    zerofill,
    file_format,
    compression,
    compression_level,
    color_order=None,
):
    """Save one input image from the camera to disk.

    The output file is named ``<basename>-<frame number>.<ext>``, where the
    frame number is ``ii + 1`` zero-padded to :obj:`zerofill` digits.

    :param im: input image
    :type im: :class:`~pymanip.video.MetadataArray`
    :param ii: zero-based frame index (the file is numbered ``ii + 1``)
    :type ii: int
    :param basename: file basename
    :type basename: str
    :param zerofill: number of digits for the frame number
    :type zerofill: int
    :param file_format: image file format on disk. Possible values are: "raw", "npy", "npy.gz", "hdf5" (or "hdf"), "png", or a file extension that OpenCV imwrite supports
    :type file_format: str
    :param compression: the compression argument "gzip" or "lzf" to pass to :meth:`h5py.create_dataset` if file_format is "hdf5"
    :type compression: str
    :param compression_level: the png compression level passed to opencv for the "png" file format
    :type compression_level: int
    :param color_order: channel order of :obj:`im`. It is stored as an attribute in HDF5 files; for OpenCV formats, "RGB" images are converted to BGR before writing. Defaults to None.
    :type color_order: str, optional
    """
    stem = f"{basename}-{ii + 1:0{zerofill}d}"

    if file_format == "raw":
        # Plain dump of the array buffer, no header.
        im.tofile(stem + ".li16")
    elif file_format == "npy":
        np.save(stem + ".npy", im)
    elif file_format == "npy.gz":
        with gzip.open(stem + ".npy.gz", "wb") as f:
            np.save(f, im)
    elif file_format in ("hdf", "hdf5"):
        # The timestamp may be a datetime-like object or already a number;
        # accept both, like acquire_to_files_async does.
        timestamp = im.metadata["timestamp"]
        try:
            timestamp = timestamp.timestamp()
        except AttributeError:
            pass
        with h5py.File(stem + ".hdf5", "w") as f:
            f.attrs["counter"] = im.metadata["counter"]
            f.attrs["timestamp"] = timestamp
            if color_order is not None:
                f.attrs["color_order"] = color_order
            # compression='gzip' is too slow for 30 fps
            # compression='lzf' is almost good enough but a bit slow at 30 fps
            f.create_dataset("image", data=im, compression=compression)
    else:
        # Any other value is used as the file extension for OpenCV imwrite.
        filename = f"{stem}.{file_format}"
        if file_format == "png":
            params = (cv2.IMWRITE_PNG_COMPRESSION, compression_level)
        else:
            params = None
        if color_order == "RGB":
            # OpenCV writes images in BGR order.
            im = cv2.cvtColor(im, cv2.COLOR_RGB2BGR)
        cv2.imwrite(filename, im, params)
class Camera:
    """This class is the abstract base class for all other concrete camera classes.

    The concrete sub-classes *must* implement the following methods:

    - :meth:`acquisition_oneshot` method
    - :meth:`acquisition` and :meth:`acquisition_async` generator methods
    - :attr:`resolution`, :attr:`name` and :attr:`bitdepth` properties

    The concrete sub-classes will also probably have to override the constructor
    method, and the enter/exit context manager method, as well as common property getters
    and setters:

    - :meth:`set_exposure_time`
    - :meth:`set_trigger_mode`
    - :meth:`set_roi`
    - :meth:`set_frame_rate`

    It may also define specialized getters for the camera which support them:

    - :meth:`set_adc_operating_mode`: ADC operating mode
    - :meth:`set_pixel_rate`: pixel rate sensor readout (in Hz)
    - :meth:`set_delay_exposuretime`
    """

    def acquisition_oneshot(self):
        """This method must be implemented in the sub-classes.

        It starts the camera, grabs one frame, stops the camera, and returns the frame. It is useful for testing
        purposes, or in cases where only one frame is desired between very long time delays. It takes no input parameters.
        Returns an "autonomous" array (the buffer is independent of the camera object).

        :return: frame
        :rtype: :class:`~pymanip.video.MetadataArray`
        """
        raise NotImplementedError()

    def acquisition(
        self,
        num=np.inf,
        timeout=1000,
        raw=False,
        initialising_cams=None,
        raise_on_timeout=True,
    ):
        """This generator method is the main method that sub-classes must implement, along with the asynchronous variant.

        It is used by all the other higher-level methods, and can also be used directly in user code.
        The default implementation simply wraps :meth:`acquisition_async` with
        :func:`pymanip.asynctools.synchronize_generator`, forwarding all the parameters.

        :param num: number of frames to acquire, defaults to float("inf").
        :type num: int, or float("inf"), optional
        :param timeout: timeout for frame acquisition (in milliseconds)
        :type timeout: int, optional
        :param raw: if True, returns bytes from the camera without any conversion. Defaults to False.
        :type raw: bool, optional
        :param initialising_cams: None, or set of camera objects. This camera object will remove itself from this set, once it is ready to grab frames. Useful in the case of multi camera acquisitions, to determine when all cameras are ready to grab frames. Defaults to None.
        :type initialising_cams: set, optional
        :param raise_on_timeout: boolean indicating whether to actually raise an exception when timeout occurs
        :type raise_on_timeout: bool, optional

        It starts the camera, yields :obj:`num` images, and closes the camera.
        It can be aborted when sent a true-truth value object. It then cleanly
        stops the camera and finally yields True as a confirmation that the stop_signal has been caught before returning.
        Sub-classes must therefore read the possible stop_signal when yielding the frame, and act accordingly.

        The :class:`~pymanip.video.MetadataArray` objects yielded by this generator use a shared memory buffer which may
        be overridden for the next frame, and which is no longer defined when the generator object is cleaned up.
        The users are responsible for copying the array, if they want a persistent copy.

        User-level code will use the generator in this manner:

        .. code-block:: python

            gen = cam.acquisition()
            for frame in gen:
                # .. do something with frame ..
                if I_want_to_stop:
                    clean = gen.send(True)
                    if not clean:
                        print('Warning generator not cleaned')
                    # no need to break here because the gen will be automatically exhausted
        """
        # Forward initialising_cams too, so the argument is not silently dropped.
        yield from synchronize_generator(
            self.acquisition_async,
            num,
            timeout,
            raw,
            initialising_cams,
            raise_on_timeout,
        )

    async def acquisition_async(
        self,
        num=np.inf,
        timeout=1000,
        raw=False,
        initialising_cams=None,
        raise_on_timeout=True,
    ):
        """This asynchronous generator method is similar to the :meth:`~pymanip.video.Camera.acquisition` generator method,
        except asynchronous. So much so, that in the general case, the latter can be defined simply by yielding from this
        asynchronous generator (so that the code is written once for both use cases), i.e.

        .. code-block:: python

            from pymanip.asynctools import synchronize_generator

            def acquisition(
                self,
                num=np.inf,
                timeout=1000,
                raw=False,
                initialising_cams=None,
                raise_on_timeout=True,
            ):
                yield from synchronize_generator(
                    self.acquisition_async,
                    num,
                    timeout,
                    raw,
                    initialising_cams,
                    raise_on_timeout,
                )

        It starts the camera, yields :obj:`num` images, and closes the camera.
        It can stop yielding images by sending the generator object a true-truth value object. It then cleanly
        stops the camera and finally yields True as a confirmation that the stop_signal has been caught before returning.
        Sub-classes must therefore read the possible stop_signal when yielding the frame, and act accordingly.

        The :class:`~pymanip.video.MetadataArray` objects yielded by this generator use a shared memory buffer which may
        be overridden for the next frame, and which is no longer defined when the generator object is cleaned up.
        The users are responsible for copying the array, if they want a persistent copy.

        The user API is similar, except with asynchronous calls, i.e.

        .. code-block:: python

            gen = cam.acquisition_async()
            async for frame in gen:
                # .. do something with frame ..
                if I_want_to_stop:
                    clean = await gen.asend(True)
                    if not clean:
                        print('Warning generator not cleaned')
                    # no need to break here because the gen will be automatically exhausted

        :raises NotImplementedError: on first iteration, in this base class
        """
        raise NotImplementedError()
        # Unreachable on purpose: the presence of ``yield`` makes this function an
        # asynchronous generator, so that ``async for`` on the base class raises
        # NotImplementedError instead of a TypeError about a coroutine object.
        yield  # pragma: no cover

    def __enter__(self):
        """Context manager enter method"""
        return self

    def __exit__(self, type_, value, cb):
        """Context manager exit method.

        Drops the reference to the live-preview generator, if one was created.
        """
        if hasattr(self, "preview_generator"):
            self.preview_generator = None

    def preview(self, backend="cv", slice_=None, zoom=0.5, rotate=0):
        """This method starts and synchronously runs the live-preview GUI.

        :param backend: GUI library to use. Possible values: "cv" for OpenCV GUI, "qt" for PyQtGraph GUI.
        :type backend: str
        :param slice_: coordinate of the region of interest to show, defaults to None
        :type slice_: Iterable[int], optional
        :param zoom: zoom factor, defaults to 0.5
        :type zoom: float, optional
        :param rotate: image rotation angle, defaults to 0
        :type rotate: float, optional
        :raises RuntimeError: if :obj:`backend` is not "cv" or "qt"
        """
        if backend == "cv":
            self.preview_cv(slice_, zoom, rotate)
        elif backend == "qt":
            self.preview_qt(slice_, zoom, None, rotate)
        else:
            # format() rather than concatenation, so that a non-str backend
            # still produces the intended RuntimeError.
            raise RuntimeError('Unknown backend "{}"'.format(backend))

    async def preview_async_cv(self, slice_, zoom, name, rotate=0):
        """This method starts and asynchronously runs the live-preview with OpenCV GUI.

        The params are identical to the :meth:`~pymanip.video.Camera.preview` method,
        with the addition of :obj:`name`, the title of the OpenCV window.
        The preview stops when the Escape or "s" key is pressed in the window.
        """
        cv2.namedWindow(name)
        try:
            preview_generator = self.acquisition_async()
            async for im in preview_generator:
                # The contrast is stretched from the extrema of each frame.
                minimum = np.min(im)
                maximum = np.max(im)
                maxint = np.iinfo(im.dtype).max
                span = maximum - minimum
                # A uniform frame has a zero span: skip the division.
                scale = maxint // span if span else 1

                # Only quarter turns are supported; other angles are ignored.
                if rotate == 90.0:
                    im = cv2.rotate(im, cv2.ROTATE_90_COUNTERCLOCKWISE)
                elif rotate == -90.0:
                    im = cv2.rotate(im, cv2.ROTATE_90_CLOCKWISE)
                color = im.ndim > 2

                if slice_:
                    # slice_ is (row_start, row_stop, col_start, col_stop)
                    roi = im[slice_[0] : slice_[1], slice_[2] : slice_[3]]
                else:
                    roi = im
                img = scale * (roi - minimum)

                # Zoom relative to what is displayed (the ROI), so that its
                # aspect ratio is preserved.
                rows, cols = img.shape[:2]
                img = cv2.resize(img, (int(cols * zoom), int(rows * zoom)))
                if color and getattr(self, "color_order", None) == "RGB":
                    # OpenCV works in BGR order.
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
                cv2.imshow(name, img)
                k = cv2.waitKey(1)
                if k in (0x1B, ord("s")):
                    clean = await preview_generator.asend(True)
                    if not clean:
                        print("Generator not cleaned")
                    break
                await asyncio.sleep(0.001)
        except KeyboardInterrupt:
            pass
        finally:
            cv2.destroyAllWindows()

    def preview_cv(self, slice_, zoom, rotate=0):
        """This method starts and synchronously runs the live-preview with OpenCV GUI.

        It is a wrapper around the :meth:`pymanip.video.Camera.preview_async_cv` method.
        The params are identical to the :meth:`~pymanip.video.Camera.preview` method.
        """
        return synchronize_function(
            self.preview_async_cv, slice_, zoom, name="Preview", rotate=rotate
        )

    def preview_exitHandler(self):
        """This method sends a stop signal to the camera acquisition generator of the
        live-preview GUI.

        It does nothing if there is no live-preview generator, or if it is
        already exhausted.
        """
        generator = getattr(self, "preview_generator", None)
        if generator is None:
            return
        try:
            clean = generator.send(True)
        except StopIteration:
            # The generator has already finished: nothing left to stop.
            return
        if not clean:
            print("Generator not cleaned")

    def display_crosshair(self):
        """This method adds a centered crosshair for self-reflection to the live-preview
        window (qt backend only)
        """
        # NOTE(review): relies on a ``camera`` attribute with ``Width`` and
        # ``Height``, which this base class does not define -- presumably
        # provided by the concrete sub-class; confirm.
        if self.crosshair_chkbox.isChecked():
            self.vLine = pg.InfiniteLine(
                pos=(self.camera.Width / 2, 0), angle=90, movable=False
            )
            self.hLine = pg.InfiniteLine(
                pos=(0, self.camera.Height / 2), angle=0, movable=False
            )
            self.image_view.addItem(self.vLine, ignoreBounds=True)
            self.image_view.addItem(self.hLine, ignoreBounds=True)
        else:
            self.image_view.removeItem(self.vLine)
            self.image_view.removeItem(self.hLine)

    def preview_qt(self, slice, zoom, app=None, rotate=0):
        """This method starts and synchronously runs the live-preview with Qt GUI.

        The params are identical to the :meth:`~pymanip.video.Camera.preview` method,
        with the addition of :obj:`app`, an optional existing Qt application object.
        The :obj:`slice` parameter is currently not applied by this backend.

        Each call shows at most one frame, and schedules the next call with a
        10 ms single-shot timer. The call which creates the Qt application also
        runs its event loop, and therefore blocks until the application quits.
        """
        if app:
            self.app = app
            just_started = False
        elif not hasattr(self, "app"):
            self.app = QApplication([])
            self.app.aboutToQuit.connect(self.preview_exitHandler)
            just_started = True
        else:
            just_started = False

        # create window if it does not already exist
        if not hasattr(self, "window"):
            self.window = QMainWindow()
            # self.window.resize(*self.resolution)
            self.window.resize(800, 600)
            self.window.setWindowTitle(self.name)
            self.image_view = pg.ImageView()
            self.window.setCentralWidget(self.image_view)
            self.range_set = False

            # adding widget for controlling the background subtraction
            # and a crosshair overlay
            self.central_widget = QWidget()
            self.tools_widget = QWidget()
            self.central_layout = QVBoxLayout(self.central_widget)
            self.tools_layout = QHBoxLayout(self.tools_widget)
            self.crosshair_chkbox = QCheckBox("Crosshair", self.tools_widget)
            self.subtraction_chkbox = QCheckBox(
                "Background subtraction", self.tools_widget
            )
            self.learning_label = QLabel(
                "Learning rate [0, 1] :", parent=self.tools_widget
            )
            # The number of decimals must be set before the range and the
            # value, because Qt rounds them to the current precision.
            self.spnbx_learning = QDoubleSpinBox(parent=self.tools_widget)
            self.spnbx_learning.setDecimals(3)
            self.spnbx_learning.setRange(0, 1)
            self.spnbx_learning.setSingleStep(0.01)
            self.spnbx_learning.setValue(0.05)
            self.acq_btn = QPushButton("Acquisition", self.tools_widget)
            self.exposure_label = QLabel(
                "Exposure time (s) :", parent=self.tools_widget
            )
            self.spnbox_exposure = QDoubleSpinBox(parent=self.tools_widget)
            self.spnbox_exposure.setDecimals(4)
            self.spnbox_exposure.setRange(0.000033, 67.108895)
            self.spnbox_exposure.setSingleStep(0.0001)
            self.spnbox_exposure.setValue(0.001)
            self.tools_layout.addWidget(self.crosshair_chkbox)
            self.tools_layout.addWidget(self.subtraction_chkbox)
            self.tools_layout.addWidget(self.learning_label)
            self.tools_layout.addWidget(self.spnbx_learning)
            self.tools_layout.addWidget(self.exposure_label)
            self.tools_layout.addWidget(self.spnbox_exposure)
            self.tools_layout.addWidget(self.acq_btn)
            self.central_layout.addWidget(self.image_view)
            self.central_layout.addWidget(self.tools_widget)
            self.window.setCentralWidget(self.central_widget)
            self.crosshair_chkbox.stateChanged.connect(self.display_crosshair)

            # hide useless buttons
            self.image_view.ui.roiBtn.hide()
            self.image_view.ui.menuBtn.hide()
            self.window.show()

        # instantiate generator (also when a previous one was dropped by
        # the context manager exit method)
        if getattr(self, "preview_generator", None) is None:
            self.preview_generator = self.acquisition(
                timeout=1000, raise_on_timeout=False
            )

        # The background is reset when the application starts, and must exist
        # as well when the application object was provided by the caller.
        if just_started or not hasattr(self, "bkgrd"):
            self.bkgrd = None

        # update view with latest image if it is ready
        # do nothing otherwise (to allow GUI interaction while waiting
        # for camera reading)
        img = next(self.preview_generator)
        if img is not None:
            rows, cols = img.shape[:2]
            color = img.ndim > 2
            if zoom != 1.0:
                img = cv2.resize(img, (int(cols * zoom), int(rows * zoom)))
            # Only quarter turns are supported; other angles are ignored.
            if rotate == 90.0:
                img = cv2.rotate(img, cv2.ROTATE_90_COUNTERCLOCKWISE)
            elif rotate == -90.0:
                img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)

            if self.subtraction_chkbox.isChecked():
                if self.bkgrd is None:
                    self.bkgrd = img
                    self.range_set = False
                # Running average of the frames, weighted by the learning rate
                learning_rate = self.spnbx_learning.value()
                self.bkgrd = (1 - learning_rate) * self.bkgrd + learning_rate * img
                self.bkgrd = self.bkgrd.astype(np.int32)
                img = img - self.bkgrd
                img[img < 0] = 0
                img = img.astype(np.uint16)

            # The first two axes are swapped before the image is handed to
            # the ImageView.
            if not color:
                img = img.T
            else:
                img = np.transpose(img, axes=(1, 0, 2))
                if getattr(self, "color_order", None) == "BGR":
                    # Qt, like Matplotlib, works in RGB order.
                    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            self.image_view.setImage(
                img, autoRange=False, autoLevels=False, autoHistogramRange=False
            )
            if not self.range_set:
                self.image_view.autoRange()
                self.image_view.autoLevels()
                self.range_set = True

        # set timer for refreshing in 10 ms
        QtCore.QTimer.singleShot(
            10, lambda: self.preview_qt(slice, zoom, self.app, rotate)
        )

        if just_started:
            self.app.exec_()

    def acquire_to_files(self, *args, **kwargs):
        """This method starts the camera, acquires images and saves them to the disk.

        It is a simple wrapper around the :meth:`pymanip.video.Camera.acquire_to_files_async` asynchronous
        method. The parameters are identical.
        """
        return synchronize_function(self.acquire_to_files_async, *args, **kwargs)

    def acquire_signalHandler(self, *args, **kwargs):
        """This method sends a stop signal to the :meth:`~pymanip.video.Camera.acquire_to_files_async` method."""
        self.acqinterrupted = True

    def _frame_color_order(self, im):
        """Return the channel order to pass to :func:`save_image` for the frame :obj:`im`.

        It is None for frames with less than three dimensions, or if the camera
        object has no :attr:`color_order` attribute.
        """
        if im.ndim < 3:
            return None
        return getattr(self, "color_order", None)

    async def acquire_to_files_async(
        self,
        num,
        basename,
        zerofill=4,
        dryrun=False,
        file_format="png",
        compression=None,
        compression_level=3,
        verbose=True,
        delay_save=False,
        progressbar=True,
        initialising_cams=None,
        **kwargs
    ):
        """This asynchronous method starts the camera, acquires :obj:`num` images and saves them to the disk. It is
        a simple quick way to perform camera acquisition (one-liner in the user code).

        :param num: number of frames to acquire
        :type num: int
        :param basename: basename for image filenames to be saved on disk
        :type basename: str
        :param zerofill: number of digits for the framenumber for image filename, defaults to 4
        :type zerofill: int, optional
        :param dryrun: do the acquisition, but saves nothing (testing purposes), defaults to False
        :type dryrun: bool, optional
        :param file_format: format for the image files, defaults to "png". Possible values are "raw", "npy", "npy.gz", "hdf5", "png" or any other extension supported by OpenCV imwrite.
        :type file_format: str, optional
        :param compression: compression option for HDF5 format ("gzip", "lzf"), defaults to None.
        :type compression: str, optional
        :param compression_level: png compression level for PNG format, defaults to 3.
        :type compression_level: int, optional
        :param verbose: prints information message, defaults to True.
        :type verbose: bool, optional
        :param delay_save: records all the frame in RAM, and saves at the end. This is useful for fast framerates when saving time is too slow. Defaults to False.
        :type delay_save: bool, optional
        :param progressbar: use :mod:`progressbar` module to show a progress bar. Defaults to True.
        :type progressbar: bool, optional
        :param initialising_cams: None, or set of camera objects. This camera object will remove itself from this set, once it is ready to grab frames. Useful in the case of multi camera acquisitions, to determine when all cameras are ready to grab frames. Defaults to None.
        :type initialising_cams: set, optional
        :param kwargs: additional keyword arguments are passed to :meth:`acquisition_async`
        :return: image_counter, frame_datetime
        :rtype: list, :class:`numpy.ndarray`

        The details of the file format are given in this table:

        ===============     ========================================================================
        file_format         description
        ===============     ========================================================================
        raw                 native 16 bits integers, i.e. li16 (little-endian) on Intel CPUs
        npy                 numpy npy file (warning: depends on pickle format)
        npy.gz              gzip compressed numpy file
        hdf5                hdf5, with optional compression
        png, jpg, tif       image format with opencv imwrite with optional compression level for PNG
        ===============     ========================================================================

        Typical usage of the function for one camera:

        .. code-block:: python

            async def main():
                with Camera() as cam:
                    counts, times = await cam.acquire_to_files_async(num=20, basename='img-')

            asyncio.run(main())
        """
        # signal handling
        # NOTE(review): the handlers installed here are not removed when the
        # acquisition ends.
        if sys.platform == "win32":
            signal.signal(signal.SIGINT, self.acquire_signalHandler)
        else:
            loop = asyncio.get_event_loop()
            for signame in ("SIGINT", "SIGTERM"):
                loop.add_signal_handler(
                    getattr(signal, signame), self.acquire_signalHandler
                )

        dirname = os.path.dirname(basename)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

        count = []
        dt = []
        if verbose:
            dateformat = "%A %d %B %Y - %X"
            starttime_str = time.strftime(dateformat, time.localtime(time.time()))
            print("Camera acquisition started: " + starttime_str)
        if progressbar:
            bar = ProgressBar(max_value=num)

        computation_time = 0.0  # CPU time spent in save_image
        images = []  # copies of the frames, kept only if delay_save
        n_frames = 0  # number of frames received from the camera
        n_saved = 0  # number of frames written to disk
        acqgen = self.acquisition_async(
            num, initialising_cams=initialising_cams, **kwargs
        )
        self.acqinterrupted = False
        async for im in acqgen:
            if verbose and n_frames == 0:
                print(im.dtype)
            if not dryrun:
                if delay_save:
                    # The frame buffer may be reused for the next frame
                    images.append(im.copy())
                else:
                    start_time = time.process_time()
                    save_image(
                        im,
                        n_frames,
                        basename,
                        zerofill,
                        file_format,
                        compression,
                        compression_level,
                        color_order=self._frame_color_order(im),
                    )
                    computation_time += time.process_time() - start_time
                    n_saved += 1
            if hasattr(im, "metadata"):
                count.append(im.metadata["counter"])
                ts = im.metadata["timestamp"]
                try:
                    ts = ts.timestamp()
                except AttributeError:
                    # already a number
                    pass
                dt.append(ts)
            n_frames += 1
            if progressbar:
                try:
                    bar.update(n_frames)
                except Exception:
                    # best effort: fall back to printing the frame number
                    print(n_frames)
            # give the hand to the other tasks of the event loop
            await asyncio.sleep(0.001)
            if self.acqinterrupted:
                print("")
                print("Signal caught... Stopping camera acquisition...")
                clean = await acqgen.asend(True)
                if not clean:
                    print("Camera was not successfully interrupted")
                break
        if progressbar:
            print("")

        if delay_save and not dryrun:
            print("Acquisition complete. Saving to disk...")
            if progressbar:
                bar = ProgressBar(max_value=n_frames)
            for index, im in enumerate(images):
                start_time = time.process_time()
                save_image(
                    im,
                    index,
                    basename,
                    zerofill,
                    file_format,
                    compression,
                    compression_level,
                    color_order=self._frame_color_order(im),
                )
                computation_time += time.process_time() - start_time
                n_saved += 1
                if progressbar:
                    try:
                        bar.update(index + 1)
                    except Exception:
                        print(index)
                await asyncio.sleep(0.0001)
            if progressbar:
                print("")

        dt = np.array(dt)
        if verbose:
            print(
                "Average saving time per image:",
                1000 * computation_time / max(n_saved, 1),
                "ms",
            )
            # The frame rate needs at least two timestamps
            if dt.size > 1:
                print("average fps =", 1 / np.mean(dt[1:] - dt[:-1]))
            if images:
                print("image size:", images[0].shape)
                print("image dtype:", images[0].dtype)
        return count, dt