"""Concrete implementation with niscope (:mod:`pymanip.aiodaq.scope`)
=====================================================================
This module is a concrete implementation of the :class:`~pymanip.aiodaq.AcquisitionCard`
class using the :mod:`niscope` module.
.. note::
We have tested this module only for with a PXI-5922 card.
.. autoclass:: ScopeSystem
:members:
:private-members:
.. autofunction:: get_device_list
"""
import asyncio
import time
import warnings
import subprocess as sp
from niScope import Scope, SLOPE, TRIGGER_SOURCE, ScopeException
from pymanip.aiodaq import TriggerConfig, AcquisitionCard, TimeoutException
try:
from pymanip.aiodaq.daqmx import get_device_list as daqmx_get_devices
has_daqmx = True
except ImportError:
has_daqmx = False
try:
from pymanip.nisyscfg import scope_devices
has_nisyscfg = True
except (ImportError, OSError):
has_nisyscfg = False
possible_sample_rates = [60e6 / n for n in range(4, 1201)]
[docs]class ScopeSystem(AcquisitionCard):
"""This class is the concrete implentation for NI Scope cards.
:param scope_name: the name of the scope device, e.g. "Dev1"
:type scope_name: str
"""
def __init__(self, scope_name=None):
super(ScopeSystem, self).__init__()
self.scope_name = scope_name
if scope_name:
self.scope = Scope(scope_name)
else:
self.scope = None
self.trigger_set = False
@property
def samp_clk_max_rate(self):
"""Maximum rate for the board clock.
"""
return max(possible_sample_rates)
[docs] def possible_trigger_channels(self):
"""This method returns the list of possible channels for external triggering.
"""
return ["Ext"] + self.channels
[docs] def close(self):
"""This method closes the connection to the board.
"""
if self.scope:
self.scope.close()
self.scope = None
[docs] def add_channel(self, channel_name, terminal_config, voltage_range):
"""Concrete implementation of :meth:`pymanip.aiodaq.AcquisitionCard.add_channel`.
"""
cn = str(channel_name)
if "/" in cn:
scope_name, cn = cn.split("/")
else:
scope_name = self.scope_name
if not self.scope:
self.scope_name = scope_name
self.scope = Scope(scope_name)
if cn not in self.channels:
self.channels.append(cn)
self.scope.ConfigureVertical(channelList=cn, voltageRange=voltage_range)
actual_range = self.scope.ActualVoltageRange(cn)
self.actual_ranges.append(actual_range)
if actual_range != voltage_range:
warnings.warn(
f"Actual range is {actual_range:} " f"for chan {cn:}.",
RuntimeWarning,
)
[docs] def start(self):
"""Concrete implementation for :meth:`pymanip.aiodaq.AcquisitionCard.start`
"""
if not self.trigger_set:
self.configure_trigger()
self.scope.InitiateAcquisition()
self.running = True
[docs] async def stop(self):
"""Concrete implementation for :meth:`pymanip.aiodaq.AcquisitionCard.stop`
"""
if self.running:
self.running = False
while self.reading:
await asyncio.sleep(1.0)
self.scope.Abort()
[docs] async def read(self, tmo=None):
"""Concrete implementation for :meth:`pymanip.aiodaq.AcquisitionCard.read`
"""
self.reading = True
start = time.monotonic()
data = None
while self.running:
try:
data = await self.loop.run_in_executor(
None, self.scope.Fetch, ",".join(self.channels), None, 1.0
)
break
except ScopeException:
if tmo and time.monotonic() - start > tmo:
raise TimeoutException()
else:
continue
if data is not None:
self.last_read = time.monotonic()
if len(self.channels) > 1:
data = data.T
else:
data = data[:, 0]
self.reading = False
return data
[docs]def get_device_list(daqmx_devices=None, verbose=False):
"""This function gets the list of Scope device in the system. If NI System
Configuration is available, the list is grabbed from this library. Otherwise,
the function attempts to use the `nilsdev` command line tool.
Because `nilsdev` returns both DAQmx and Scope devices, the list of DAQmx devices
is queried to remove them from the returned list. If the user code has already
queried them, it is possible to pass them to avoid unnecessary double query.
:param daqmx_devices: the list of DAQmx devices.
:type daqmx_devices: list, optional
:param verbose: sets verbosity level
:type verbose: bool, optional
"""
if has_nisyscfg:
return {
f"{devname:} (scope)": [f"{devname:}/{ii:d}" for ii in range(2)]
for devname in scope_devices()
}
else:
# NI-Scope backend
# In principle, use nisyscfg, see essai_nisyscfg.py (does not work
# on old Linux where NI System Configuration is too old.)
# Workaround: call nilsdev. Any device not previously found by the
# DAQmx backend is a NI-Scope (presumably)
# (Only works on Linux, because there is no nilsdev on Windows)
raw_data = sp.run("/usr/local/bin/nilsdev", capture_output=True)
boards = dict()
for line in raw_data.stdout.decode("ascii").split("\n"):
line = line.strip()
if "[Not Present]" in line:
continue
if not line:
continue
board_type, desc = line.split(":")
desc = desc.strip()
board_type = board_type.strip()
if desc.startswith('"') and desc.endswith('"'):
devname = desc[1:-1]
boards[devname] = board_type
else:
print("Wrong format", desc)
if daqmx_devices is None:
if has_daqmx:
daqmx_devices = daqmx_get_devices()
else:
print("Could not make sure boards are NI-Scope and not DAQmx")
daqmx_devices = dict()
for daqmx_board_desc, daqmx_devlist in daqmx_devices.items():
board, devnum = daqmx_devlist[0].split("/")
if verbose:
print("DAQmx board:", board)
if board in boards:
del boards[board]
if verbose:
print(f"Removing board {board:} from NI-Scope list.")
return {
f"{devname:} ({board_type:})": [f"{devname:}/{ii:d}" for ii in range(2)]
for devname, board_type in boards.items()
}