Source code for pymanip.daq.DAQmx

"""DAQmx acquisition module (:mod:`pymanip.daq.DAQmx`)
======================================================

The :mod:`fluidlab.daq.daqmx` module is a simple functional front-end to
the third-party :mod:`PyDAQmx` module. It mainly provides two simple one-liner functions:

- :func:`~fluidlab.daq.daqmx.read_analog`

- :func:`~fluidlab.daq.daqmx.write_analog`

The :mod:`pymanip.daq.DAQmx` module is essentially based on its fluidlab counterpart, with
other choices for the default values of arguments, and an additional `autoset` feature for
the :func:`read_analog` function.
It also adds a convenience function for printing the list of DAQmx devices, used by the
pymanip command-line interface.

A discovery function is also added, :func:`~pymanip.daq.DAQmx.print_connected_devices`, based
on the dedicated :class:`~pymanip.daq.DAQmx.DAQDevice` class, which is used by the
`list_daq` sub-command of the pymanip command line.

.. autoclass:: DAQDevice
   :members:
   :private-members:

.. autofunction:: print_connected_devices

.. autofunction:: read_analog

"""

import ctypes
import numpy as np
from fluiddyn.util.terminal_colors import cprint
import fluidlab.daq.daqmx as daqmx
from fluidlab.daq.daqmx import write_analog


class DAQDevice:
    """This class represents a DAQmx device.

    :param device_name: name of the DAQmx device, e.g. "Dev1"
    :type device_name: str

    It mostly implements a number of property getters, which are wrappers to the
    :mod:`PyDAQmx` low-level functions.

    In addition, it has a static method,
    :meth:`~pymanip.daq.DAQmx.DAQDevice.list_connected_devices`, to discover
    currently connected devices.
    """

    @staticmethod
    def _split_names(raw):
        """Split a comma-separated name list returned by DAQmx.

        :param raw: content of the buffer filled by a DAQmx getter
        :type raw: bytes
        :return: the stripped, ASCII-decoded names; empty entries are dropped,
            so an empty buffer yields an empty list
        :rtype: list of str
        """
        names = (item.strip().decode("ascii") for item in raw.split(b","))
        return [name for name in names if name]

    @staticmethod
    def list_connected_devices():
        """This static method discovers the connected devices.

        :return: connected devices, or None if :mod:`PyDAQmx` cannot be imported
        :rtype: list of :class:`pymanip.daq.DAQmx.DAQDevice` objects
        """
        # Only the import is guarded: errors raised by the driver call itself
        # must not be mistaken for a missing PyDAQmx installation.
        try:
            from PyDAQmx import DAQmxGetSystemInfoAttribute, DAQmx_Sys_DevNames
        except ImportError:
            print("Cannot list connected devices.")
            return None

        bufsize = 1024
        buf = ctypes.create_string_buffer(bufsize)
        DAQmxGetSystemInfoAttribute(DAQmx_Sys_DevNames, ctypes.byref(buf), bufsize)
        # An empty buffer means no device: return [] rather than a device
        # with an empty name.
        return [DAQDevice(name) for name in DAQDevice._split_names(buf.value)]

    def __init__(self, device_name):
        """Constructor method"""
        self.device_name = device_name

    def __repr__(self):
        return "{}({!r})".format(type(self).__name__, self.device_name)

    def _query_names(self, getter, bufsize=2048):
        """Call a DAQmx string getter for this device and split the result.

        :param getter: PyDAQmx function called as ``getter(device_name, buf, bufsize)``
        :param bufsize: size in bytes of the buffer handed to the getter
        :type bufsize: int
        :return: names written in the buffer (empty list if none)
        :rtype: list of str
        """
        buf = ctypes.create_string_buffer(bufsize)
        getter(self.device_name, buf, bufsize)
        return self._split_names(buf.value)

    def _query_uint32(self, getter):
        """Call a DAQmx getter which fills an unsigned 32-bit integer.

        :param getter: PyDAQmx function called as ``getter(device_name, byref(num))``
        :return: value written by the getter
        :rtype: int
        """
        num = ctypes.c_uint32(0)
        getter(self.device_name, ctypes.byref(num))
        return num.value

    @property
    def product_category(self):
        """Device product category (str), or None if PyDAQmx cannot be imported"""
        try:
            from PyDAQmx import (
                DAQmxGetDevProductCategory,
                DAQmx_Val_MSeriesDAQ,
                DAQmx_Val_XSeriesDAQ,
                DAQmx_Val_ESeriesDAQ,
                DAQmx_Val_SSeriesDAQ,
                DAQmx_Val_BSeriesDAQ,
                DAQmx_Val_SCSeriesDAQ,
                DAQmx_Val_USBDAQ,
                DAQmx_Val_AOSeries,
                DAQmx_Val_DigitalIO,
                DAQmx_Val_TIOSeries,
                DAQmx_Val_DynamicSignalAcquisition,
                DAQmx_Val_Switches,
                DAQmx_Val_CompactDAQChassis,
                DAQmx_Val_CSeriesModule,
                DAQmx_Val_SCXIModule,
                DAQmx_Val_SCCConnectorBlock,
                DAQmx_Val_SCCModule,
                DAQmx_Val_NIELVIS,
                DAQmx_Val_NetworkDAQ,
                DAQmx_Val_SCExpress,
                DAQmx_Val_Unknown,
            )
        except ImportError:
            return None

        category = ctypes.c_int32(DAQmx_Val_Unknown)
        DAQmxGetDevProductCategory(self.device_name, ctypes.byref(category))
        labels = {
            DAQmx_Val_MSeriesDAQ: "M Series DAQ",
            DAQmx_Val_XSeriesDAQ: "X Series DAQ",
            DAQmx_Val_ESeriesDAQ: "E Series DAQ",
            DAQmx_Val_SSeriesDAQ: "S Series DAQ",
            DAQmx_Val_BSeriesDAQ: "B Series DAQ",
            DAQmx_Val_SCSeriesDAQ: "SC Series DAQ",
            DAQmx_Val_USBDAQ: "USB DAQ",
            DAQmx_Val_AOSeries: "AO Series",
            DAQmx_Val_DigitalIO: "Digital I/O",
            DAQmx_Val_TIOSeries: "TIO Series",
            DAQmx_Val_DynamicSignalAcquisition: "Dynamic Signal Acquisition",
            DAQmx_Val_Switches: "Switches",
            DAQmx_Val_CompactDAQChassis: "CompactDAQ chassis",
            DAQmx_Val_CSeriesModule: "C Series I/O module",
            DAQmx_Val_SCXIModule: "SCXI module",
            DAQmx_Val_SCCConnectorBlock: "SCC Connector Block",
            DAQmx_Val_SCCModule: "SCC Module",
            DAQmx_Val_NIELVIS: "NI ELVIS",
            DAQmx_Val_NetworkDAQ: "Network DAQ",
            DAQmx_Val_SCExpress: "SC Express",
            DAQmx_Val_Unknown: "Unknown by DAQmx",
        }
        return labels.get(category.value, "Unknown")

    @property
    def product_type(self):
        """Device product type"""
        from PyDAQmx import DAQmxGetDevProductType

        bufsize = 1024
        buf = ctypes.create_string_buffer(bufsize)
        DAQmxGetDevProductType(self.device_name, buf, bufsize)
        return buf.value.decode("ascii")

    @property
    def product_num(self):
        """Device product num"""
        from PyDAQmx import DAQmxGetDevProductNum

        return self._query_uint32(DAQmxGetDevProductNum)

    @property
    def ai_chans(self):
        """List of the analog input channels on the device"""
        from PyDAQmx import DAQmxGetDevAIPhysicalChans

        return self._query_names(DAQmxGetDevAIPhysicalChans)

    @property
    def ao_chans(self):
        """List of the analog output channels on the device"""
        from PyDAQmx import DAQmxGetDevAOPhysicalChans

        return self._query_names(DAQmxGetDevAOPhysicalChans)

    @property
    def di_lines(self):
        """List of digital input lines on the device"""
        from PyDAQmx import DAQmxGetDevDILines

        return self._query_names(DAQmxGetDevDILines)

    @property
    def di_ports(self):
        """List of digital input ports on the device"""
        from PyDAQmx import DAQmxGetDevDIPorts

        return self._query_names(DAQmxGetDevDIPorts)

    @property
    def do_lines(self):
        """List of digital output lines on the device"""
        from PyDAQmx import DAQmxGetDevDOLines

        return self._query_names(DAQmxGetDevDOLines)

    @property
    def do_ports(self):
        """List of digital output ports on the device"""
        from PyDAQmx import DAQmxGetDevDOPorts

        return self._query_names(DAQmxGetDevDOPorts)

    @property
    def bus_type(self):
        """Bus type connection to the device"""
        from PyDAQmx import (
            DAQmxGetDevBusType,
            DAQmx_Val_PCI,
            DAQmx_Val_PXI,
            DAQmx_Val_SCXI,
            DAQmx_Val_PCCard,
            DAQmx_Val_USB,
            DAQmx_Val_Unknown,
        )

        t = ctypes.c_int32(0)
        DAQmxGetDevBusType(self.device_name, ctypes.byref(t))
        # Bus types which are not listed here are reported as "Unknown".
        return {
            DAQmx_Val_PCI: "PCI",
            DAQmx_Val_PXI: "PXI",
            DAQmx_Val_SCXI: "SCXI",
            DAQmx_Val_PCCard: "PCCard",
            DAQmx_Val_USB: "USB",
            DAQmx_Val_Unknown: "DAQmx unknown",
        }.get(t.value, "Unknown")

    @property
    def pci_busnum(self):
        """PCI Bus number"""
        from PyDAQmx import DAQmxGetDevPCIBusNum

        return self._query_uint32(DAQmxGetDevPCIBusNum)

    @property
    def pci_devnum(self):
        """PCI Device number"""
        from PyDAQmx import DAQmxGetDevPCIDevNum

        return self._query_uint32(DAQmxGetDevPCIDevNum)

    @property
    def pxi_chassisnum(self):
        """PXI Chassis number"""
        from PyDAQmx import DAQmxGetDevPXIChassisNum

        return self._query_uint32(DAQmxGetDevPXIChassisNum)

    @property
    def pxi_slotnum(self):
        """PXI Slot number"""
        from PyDAQmx import DAQmxGetDevPXISlotNum

        return self._query_uint32(DAQmxGetDevPXISlotNum)

    @property
    def location(self):
        """Description of the location (PCI bus and number, or PXI chassis and slot)"""
        from PyDAQmx import DAQError

        bus = self.bus_type
        if bus == "PCI":
            return "PCI {:d}, {:d}".format(self.pci_busnum, self.pci_devnum)
        if bus == "PXI":
            try:
                return "PXI chassis {:d} slot {:d}".format(
                    self.pxi_chassisnum, self.pxi_slotnum
                )
            except DAQError:
                # If the chassis is not identified, DAQmx cannot return the
                # information and raises an exception.
                return "PXI (unidentified)"
        # Any other bus: the bus type name is the only available description.
        return bus
# Here read_analog is verbose=True by default, unlike in fluidlab,
# and an "autoset" feature is added when volt_min or volt_max is None.
def _autoset_bound(samples, full_scale=10.0, headroom=1.25, floor=1e-3):
    """Return the symmetric voltage bound chosen by the autoset feature.

    The bound is the largest absolute sample times `headroom`, clamped to
    [`floor`, `full_scale`]: it never exceeds the range used for probing, and
    never collapses to zero (which would give volt_min == volt_max).
    """
    bound = float(np.max(np.abs(samples))) * headroom
    return min(max(bound, floor), full_scale)


def _channel_limit(volt_min, volt_max, chan):
    """Return max(|volt_min|, |volt_max|) for channel `chan`.

    Each limit may independently be a scalar (shared by all channels) or a
    sequence indexed by channel.
    """
    low = volt_min if np.ndim(volt_min) == 0 else volt_min[chan]
    high = volt_max if np.ndim(volt_max) == 0 else volt_max[chan]
    return max(abs(low), abs(high))


def read_analog(
    resource_names,
    terminal_config,
    volt_min=None,
    volt_max=None,
    samples_per_chan=1,
    sample_rate=1,
    coupling_types="DC",
    output_filename=None,
    verbose=True,
):
    """This function reads signal from analog input.

    :param resource_names: names from MAX (Dev1/ai0)
    :type resource_names: str, or list
    :param terminal_config: "Diff", "RSE", "NRSE"
    :type terminal_config: str, or list
    :param volt_min: minimum voltage
    :type volt_min: float, or list, optional
    :param volt_max: maximum voltage
    :type volt_max: float, or list, optional
    :param samples_per_chan: Number of samples to be read per channel
    :type samples_per_chan: int
    :param sample_rate: Clock frequency
    :type sample_rate: float
    :param coupling_types: Coupling of the channels ("DC", "AC", "GND")
    :type coupling_types: str, or list
    :param output_filename: If not None, file to write the acquired data
    :type output_filename: str, optional
    :param verbose: Verbosity level. Defaults to True (unlike in Fluidlab)
    :type verbose: bool, optional
    :return: the data returned by :func:`fluidlab.daq.daqmx.read_analog`

    If the channel range is not specified (`volt_min` or `volt_max` is None),
    a 5.0 seconds sample is first acquired on the -10 V..+10 V range to
    determine an appropriate channel range (autoset feature). The range which
    is selected is 1.25 times the largest absolute value, and never exceeds
    the -10 V..+10 V range used for probing.
    """

    # The type coercions below are not very pythonic, but they are necessary
    # because PyDAQmx is a gateway to C code with implicit type expectations.

    # Ensure that samples_per_chan is integer
    samples_per_chan = int(samples_per_chan)

    # Ensure resource_names is str or list of str
    if isinstance(resource_names, str):
        num_channels = 1
        resource_names = str(resource_names)
    else:
        resource_names = [str(r) for r in resource_names]
        num_channels = len(resource_names)

    # If no range is provided, take a 5s sample
    if volt_min is None or volt_max is None:
        print("Sampling 5s data to determine channel range")
        probe_range = 10.0
        if num_channels == 1:
            volt_min = -probe_range
            volt_max = probe_range
        else:
            volt_min = [-probe_range] * num_channels
            volt_max = [probe_range] * num_channels
        # 50000 samples at 10 kHz, i.e. 5 seconds of signal
        probe = daqmx.read_analog(
            resource_names,
            terminal_config,
            volt_min,
            volt_max,
            samples_per_chan=50000,
            sample_rate=10e3,
            coupling_types=coupling_types,
            verbose=False,
        )
        if num_channels == 1:
            bound = _autoset_bound(probe, probe_range)
            volt_min = -bound
            volt_max = bound
        else:
            for chan in range(num_channels):
                bound = _autoset_bound(probe[chan], probe_range)
                volt_min[chan] = -bound
                volt_max[chan] = bound
                print(
                    "Channel", chan, "min max:", np.min(probe[chan]), np.max(probe[chan])
                )

    # Run fluidlab daqmx.read_analog with verbose=True by default
    data = daqmx.read_analog(
        resource_names,
        terminal_config,
        volt_min,
        volt_max,
        samples_per_chan,
        sample_rate,
        coupling_types,
        output_filename,
        verbose,
    )

    # If verbose, check that voltage range has not been reached and issue a
    # warning otherwise
    if verbose:
        if num_channels == 1:
            channel_range = np.max(np.abs([volt_min, volt_max]))
            if np.max(np.abs(data)) >= channel_range:
                cprint.red("WARNING: channel range too small!")
        else:
            for chan in range(num_channels):
                channel_range = _channel_limit(volt_min, volt_max, chan)
                if np.max(np.abs(data[chan])) >= channel_range:
                    cprint.red(
                        "WARNING: channel range is too small for channel "
                        + resource_names[chan]
                    )

    return data
# NOTE: write_analog is not wrapped here; it is imported as-is from
# fluidlab.daq.daqmx at the top of this module.


def write_analog_end_task(task, timeout=0.0):
    """Forward to :func:`fluidlab.daq.daqmx.write_analog_end_task`.

    :param task: task object, passed unchanged to fluidlab
    :param timeout: timeout, passed unchanged to fluidlab (defaults to 0.0)

    Nothing is returned.
    """
    daqmx.write_analog_end_task(task, timeout)


def measure_freq(resource_name, freq_min=1, freq_max=1000):
    """Forward to :func:`fluidlab.daq.daqmx.measure_freq`.

    :param resource_name: name of the resource, passed unchanged to fluidlab
    :param freq_min: lower frequency bound, passed unchanged (defaults to 1)
    :param freq_max: upper frequency bound, passed unchanged (defaults to 1000)
    :return: whatever the fluidlab function returns
    """
    result = daqmx.measure_freq(resource_name, freq_min, freq_max)
    return result