"""Asynchronous Acquisition Card (:mod:`pymanip.aiodaq`)
========================================================
This module defines an abstract base class for asynchronous communication
with acquisition cards.
This is used by the live oscilloscope command line tool.
Concrete implementations are:
- :class:`pymanip.aiodaq.daqmx.DAQmxSystem` for NI DAQmx cards;
- :class:`pymanip.aiodaq.scope.ScopeSystem` for NI Scope cards.
In principle, other library bindings could be implemented.
.. autoclass:: TerminalConfig
.. autoclass:: TriggerConfig
.. autoclass:: TimeoutException
.. autoclass:: AcquisitionCard
:members:
:private-members:
"""
import asyncio
from enum import IntEnum
from pymanip.asynctools import synchronize_function
[docs]class TerminalConfig(IntEnum):
RSE = 0
NRSE = 1
Diff = 2
PseudoDiff = 3
[docs]class TriggerConfig(IntEnum):
EdgeRising = 0
[docs]class TimeoutException(Exception):
pass
[docs]class AcquisitionCard:
"""Base class for all acquisition cards.
The constructor takes no argument. Channels
are added using the :meth:`~pymanip.aiodaq.daqmx.DAQmxSystem.add_channel` method,
and the clock is configured with the
:meth:`~pymanip.aiodaq.daqmx.DAQmxSystem.configure_clock` method.
"""
def __init__(self):
"""Constructor method
"""
self.channels = []
self.actual_ranges = []
self.running = False
self.last_read = 0
self.sample_rate = None
self.samples_per_chan = 1
self.loop = asyncio.get_event_loop()
def __enter__(self):
"""Context manager enter method
"""
return self
def __exit__(self, type_, value, cb):
"""Context manager exit method
"""
self.close()
@property
def samp_clk_max_rate(self):
"""Maximum sample clock rate
"""
return 0
[docs] def possible_trigger_channels(self):
"""This method returns the list of channels that can be used as trigger.
"""
return self.channels
[docs] def close(self):
"""This method closes the connection to the acquisition card.
"""
raise NotImplementedError()
[docs] def add_channel(self, channel_name, terminal_config, voltage_range):
"""This method adds a channel for acquisition.
:param channel_name: the channel to add, e.g. "Dev1/ai0"
:type channel_name: str
:param terminal_config: the configuration of the terminal, i.e. RSE, NRSE, DIFFERENTIAL or PSEUDODIFFERENTIAL
:type terminal_config: :class:`~pymanip.aiodaq.TerminalConfig`
:param voltage_range: the voltage range for the channel (actual value may differ)
:type voltage_range: float
"""
raise NotImplementedError()
[docs] def start(self):
"""This method starts the acquisition
"""
raise NotImplementedError()
[docs] async def stop(self):
"""This asynchronous method aborts the acquisition
"""
raise NotImplementedError()
[docs] async def read(self, tmo=None):
"""This asynchronous method reads data from the acquisition card.
"""
raise NotImplementedError()
[docs] async def start_read_stop(self, tmo=None):
"""This asynchronous method starts the acquisition, reads the data, and
stops the acquisition.
:param tmo: timeout for reading, defaults to None
:type tmo: float
"""
self.start()
data = await self.read(tmo)
await self.stop()
return data
[docs] def read_sync(self, tmo=None):
"""This method is a synchronous wrapper around
:meth:`~pymanip.aiodaq.AcquisitionCard.start_read_stop` method.
It is a convenience facility for simple usage.
"""
loop = asyncio.get_event_loop()
data = loop.run_until_complete(self.start_read_stop(tmo))
return data
[docs] async def read_analog(
self,
resource_names,
terminal_config,
volt_min=None,
volt_max=None,
samples_per_chan=1,
sample_rate=1,
coupling_types="DC",
output_filename=None,
verbose=True,
):
"""This asynchronous method is a high-level method for simple case. It
configures all the given channels, as well as the clock, then starts the
acquisition, read the data, and stops the acquisition.
It is essentially similar to :func:`pymanip.daq.DAQmx.read_analog`, except
asynchronous and functionnal for other cards than DAQmx cards.
:param resource_names: list of resources to read, e.g. ["Dev1/ai1", "Dev1/ai2"] for DAQmx cards, or name of the resource if only one channel is to be read.
:type resource_names: list or str
:param terminal_config: list of terminal configs for the channels
:type terminal_config: list
:param volt_min: minimum voltage expected on the channel
:type volt_min: float
:param volt_max: maximum voltage expected on the channel
:type volt_max: float
:param samples_per_chan: number of samples to read on each channel
:type samples_per_chan: int
:param sample_rate: frequency of the clock
:type sample_rate: float
:param coupling_type: coupling for the channel (e.g. AC or DC)
:type coupling_type: str
:param output_filename: filename for direct writting to the disk
:type output_filename: str, optional
:param verbose: verbosity level
:type verbose: bool, optional
"""
if isinstance(resource_names, str):
resource_names = [resource_names]
if isinstance(terminal_config, str):
terminal_config = [terminal_config]
try:
terminal_config[0]
except TypeError:
terminal_config = [terminal_config]
try:
volt_min[0]
except TypeError:
volt_min = [volt_min]
try:
volt_max[0]
except TypeError:
volt_max = [volt_max]
for chan_name, chan_tc, chan_vmin, chan_vmax in zip(
resource_names, terminal_config, volt_min, volt_max
):
volt_range = max([abs(chan_vmin), abs(chan_vmax)])
self.add_channel(chan_name, chan_tc, volt_range)
self.configure_clock(sample_rate, int(samples_per_chan))
self.configure_trigger(None)
data = await self.start_read_stop()
return data
[docs] def read_analog_sync(self, *args, **kwargs):
"""Synchronous wrapper around :meth:`pymanip.aiodaq.AcquisitionCard.read_analog`.
"""
loop = asyncio.get_event_loop()
data = loop.run_until_complete(self.read_analog(*args, **kwargs))
return data