Advanced usage¶
In this section, we illustrate a more advanced usage from one of our own use case. We need simultaneous acquisition on two cameras. The framerate is too fast to wait for each frame to be saved before grabbing the next one. But we don’t want to delay until the end of the acquisition (which might still be long) to start saving, because we don’t want to loose all the data in case something bad happens, and we wish to be able to have a look at the picture before the acquisition ends.
So, in this example, we implement simple queues in which the frames
are stored, and there is a fourth task which gets the frames from this queue and saves them,
(at a lower rate than the acquisition rate). When the acquisition is stopped,
this last task finishes the saving.
In addition, we want to save acquisition parameters with an
AsyncSession
object.
To summarize the four tasks:
task0: acquisition on cam 0
task1: acquisition on cam 1
task2: software trigger when cams are ready
task3: background saving of images
The first 3 tasks are similar to those in the previous section. We use
Python standard library SimpleQueue
class to implement the frame
queue.
We comment the various parts of this script in the following subsections.
Preambule¶
First the import statements, and definition of some global parameters (video parameter, as well as names for output files).
from queue import SimpleQueue
import asyncio
import os
import cv2
from datetime import datetime
from pymanip.instruments import Agilent33220a
from pymanip.video.avt import AVT_Camera
from pymanip.asyncsession import AsyncSession
from progressbar import ProgressBar
# User inputs
compression_level = 3
exposure_time = 10e-3
cam_type = "avt"
fps = 2
num_imgs = 10
# Paths to save data
current_date = datetime.today().strftime("%Y%m%d")
current_dir = os.path.dirname(os.path.realpath(__file__))
saving_dir_date = f"{current_dir}\\data\\{current_date}\\"
if not os.path.isdir(saving_dir_date):
os.makedirs(saving_dir_date)
num_dir = len(os.listdir(saving_dir_date))
saving_dir_run = f"{saving_dir_date}run{num_dir+1:02.0f}\\"
if not os.path.isdir(saving_dir_run):
os.makedirs(saving_dir_run)
basename = f"{saving_dir_run}session"
Acquisition task (task 0 and task 1)¶
This task is responsible for grabbing frames for the camera, and putting
it in the queue. Note that we must copy the image because the numpy
array yielded by the acquisition_async()
generator uses shared memory (and would no longer hold this particular frame
on subsequent iterations of the generator). The queues are created in the main
function, and are called im_buffer0
for camera 0, and im_buffer1
for camera 1.
In addition, we want to be able to abort the script. We use the mechanisms
defined in the pymanip.asyncsession.AsyncSession
class which set ups
signal handling for interrupt signal. It basically defines a running
attribute that is set to False
when the program should stop. The
acquisition task must check this variable to cleanup stop grabbing the
frames if the user has sent the interrupt signal.
async def acquire_images(sesn, cam, num_cam, initialing_cams):
global num_imgs
kk = 0
bar = ProgressBar(min_value=0, max_value=num_imgs, initial_value=kk)
gen = cam.acquisition_async(num_imgs, initialising_cams=initialing_cams)
async for im in gen:
if num_cam == 0:
sesn.im_buffer0.put(im.copy())
elif num_cam == 1:
sesn.im_buffer1.put(im.copy())
kk += 1
bar.update(kk)
if not sesn.running:
num_imgs = kk
success = await gen.asend(True)
if not success:
print("Unable to stop camera acquisition")
break
bar.finish()
print(f"Camera acquisition stopped ({kk:d} images recorded).")
sesn.running = False
Software trigger task (task 2)¶
This task monitor the set of initialising cams, which gets empty when all the cameras are ready to grab frames. Then, it triggers the generator function.
async def start_clock(cams):
# Start clocks once all camera are done initializing
while len(cams) > 0:
await asyncio.sleep(1e-3)
gbf.trigger()
return datetime.now().timestamp()
Background saving of images (task 3)¶
This task looks for images in the im_buffer0
and im_buffer1
queues, as long as
the acquisition is still running or that the queues are not empty.
The images are saved using OpenCV imwrite()
function that we run in an executor (i.e. in a
separate thread), so as not to block the acquisition tasks.
async def save_images(sesn):
params = (cv2.IMWRITE_PNG_COMPRESSION, compression_level)
loop = asyncio.get_event_loop()
i = 0
bar = None
while sesn.running or not sesn.im_buffer0.empty() or not sesn.im_buffer1.empty():
if sesn.im_buffer0.empty() and sesn.im_buffer1.empty():
await asyncio.sleep(1.0)
else:
if not im_buffer0.empty():
im0 = sesn.im_buffer0.get()
filename0 = f"{saving_dir_run}\\cam0_{i:04d}.png"
await loop.run_in_executor(None, cv2.imwrite, filename0, im0, params)
i += 1
if not im_buffer1.empty():
im1 = sesn.im_buffer1.get()
filename1 = f"{saving_dir_run}\\cam1_{i:04d}.png"
await loop.run_in_executor(None, cv2.imwrite, filename1, im1, params)
if not sesn.running:
if bar is None:
print("Saving is terminating...")
bar = ProgressBar(
min_value=0, max_value=2 * num_imgs, initial_value=2 * i
)
else:
bar.update(2 * i)
if bar is not None:
bar.finish()
print(f"{2*i:d} images saved.")
One important point is that this task is the only task which access disk storage. The acquisition tasks work solely on memory, so they are not slowed down by the saving task.
Main function and setup¶
The main function sets up the function generator and the cameras, and start the tasks. It must also create the queues.
async def main():
with AsyncSession(basename) as sesn, \
Agilent33220a("USB0::2391::1031::MY44052515::INSTR") as sesn.gbf:
# Configure function generator
sesn.gbf.configure_burst(fps, num_imgs)
sesn.save_parameter(fps=fps, num_imgs=num_imgs)
# Prepare buffer queues
sesn.im_buffer0 = SimpleQueue()
sesn.im_buffer1 = SimpleQueue()
# Prepare camera and start tasks
with AVT_Camera(0) as cam0, \
AVT_Camera(1) as cam1:
# External trigger and camera properties
cam0.set_trigger_mode(True)
cam1.set_trigger_mode(True)
cam0.set_exposure_time(exposure_time)
cam1.set_exposure_time(exposure_time)
sesn.save_parameter(exposure_time=exposure_time)
cam0.camera.IIDCPacketSizeAuto = "Off"
cam0.camera.IIDCPacketSize = 5720
cam1.camera.IIDCPacketSizeAuto = "Off"
cam1.camera.IIDCPacketSize = 8192 // 2
# Set up tasks
initialing_cams = {cam0, cam1}
task0 = acquire_images(sesn, cam0, 0, initialing_cams)
task1 = acquire_images(sesn, cam1, 1, initialing_cams)
task2 = start_clock(initialing_cams)
task3 = save_images(sesn)
# We use AsyncSession monitor co-routine which set ups the signal
# handling. We don't need remote access, so server_port=None.
# Alternative:
# await asyncio.gather(task0, task1, task2, task3)
await sesn.monitor(task0, task1, task2, task3,
server_port=None)
asyncio.run(main())