'''
The operator interface for the LArPix DAQ system.
'''
import xylem
from larpixdaq.core import CORE_PORT
end_receive_loop_headers = {
'ACTIONS',
'ACTION RESULT',
'STATE',
'STATE REQUEST',
'CLIENTS',
}
[docs]class Operator(object):
"""The LArPix DAQ Operator module provides the interface into the
DAQ core for all DAQ operations.
The Operator class handles all of the needs of an DAQ operator:
start/end runs, load/validate configurations, run calibrations,
examine data samples and data rates, etc.
Operator methods interact with the DAQ Core to accomplish the
desired behavior. The DAQ Core can send
multiple responses for a single request - e.g. an immediate
acknowledgement of receipt and then the eventual result. The
methods implementing these interactions return generator
iterators <https://docs.python.org/3/glossary.html#term-generator>
rather than values. The way to call these functions usually looks
like::
o = Operator()
final_responses = []
for response in o.run_routine('example'):
print(response)
# interact with response object within loop
# When the loop ends, the last response received is still saved in
# the response object
final_responses.append(response)
Each method has an optional keyword parameter ``timeout``. If
omitted (or if ``None``), then the Operator will wait forever for a
response from the DAQ Core. If ``timeout`` is provided, then the
Operator will yield ``None`` as an indication of an error after a
maximum wait of 10 times the given timeout, in seconds.
.. note:: if the timeout limit is reached and the "missing" message
arrives later, it will be confused with future results.
Operator objects do not maintain useful internal state, so it is
acceptable (and recommended!) to initialize a new Operator if
the current Operator ran into a timeout issue.
:param address: the TCP address of the DAQ Core. The port will be
added automatically. (Optional, if omitted or ``None``, will
default to ``tcp://127.0.0.1``.)
"""
def __init__(self, address=None):
if address is None:
address = 'tcp://127.0.0.1:%d' % CORE_PORT
else:
address += ':%d' % CORE_PORT
self._controller = xylem.Controller(address)
[docs] def cleanup(self):
"""Clean up the ZMQ objects used in the Operator.
Only necessary if you are initializing and destroying multiple
Operators in one session.
"""
self._controller.cleanup()
def _receive_loop(self, timeout=None):
"""Loop through receiving incoming messages, up to a timeout.
The loop terminates if either:
- the message header is one of the selected "end transmission"
headers listed in this module's ``end_receive_loop_headers``
list, or
- 10 calls to receive have been made (and possibly timed-out)
"""
header = None
max_loops = 10
nloops = 0
while (header not in end_receive_loop_headers
and nloops < max_loops):
result = self._controller.receive(timeout)
nloops += 1
if result is not None:
header = result['header']
yield result
if result is None:
yield None
[docs] def get_boards(self, timeout=None):
"""Get a list of board (PCB) names available to load."""
self._controller.send_action('LArPix board', 'get_boards', [])
for result in self._receive_loop(timeout):
yield result
[docs] def load_board(self, filename, timeout=None):
"""Load the particular board (PCB) into LArPix Control as a
``Controller`` configuration.
:param filename: the file to load, e.g.
``'controller/pcb-1_chip_info.json'``
"""
self._controller.send_action('LArPix board', 'load_board',
[filename])
for result in self._receive_loop(timeout):
yield result
[docs] def retrieve_pixel_layout(self, timeout=None):
"""Fetch the currently-loaded pixel geometry layout."""
self._controller.send_action('Online monitor', 'retrieve_pixel_layout',
[])
for result in self._receive_loop(timeout):
yield result
[docs] def load_pixel_layout(self, filename, timeout=None):
"""Load the specified pixel layout into the online monitor.
:param filename: the file name of the layout, e.g.
``sensor_plane_28_full.yaml``
"""
self._controller.send_action('Online monitor', 'load_pixel_layout',
[filename])
for result in self._receive_loop(timeout):
yield result
### Configurations
[docs] def write_configuration(self, chip, timeout=None):
"""Send the configuration values from software onto the ASIC.
:param chip: the chip key as a string
"""
self._controller.send_action('LArPix board', 'write_config',
[chip])
for result in self._receive_loop(timeout):
yield result
[docs] def read_configuration(self, chip, timeout=None):
"""Read the configuration values from the ASIC.
:param chip: the chip key as a string
"""
self._controller.send_action('LArPix board', 'read_config',
[chip])
for result in self._receive_loop(timeout):
yield result
[docs] def validate_configuration(self, chip, timeout=None):
"""Read the configuration from the specified LArPix ASIC and return
``(True/False, {name: (actual, stored)})``.
:param chip: the chip key as a string
"""
self._controller.send_action('LArPix board', 'validate_config',
[chip])
for result in self._receive_loop(timeout):
yield result
[docs] def retrieve_configuration(self, chip, timeout=None):
"""Return a dict of the current configuration stored in software
for the given chipid.
:param chip: the chip key as a string
:returns: a dict mapping the configuration item name (could be
multiple or part of a register) to the value.
"""
self._controller.send_action('LArPix board', 'retrieve_config',
[chip])
for result in self._receive_loop():
yield result
[docs] def send_configuration(self, updates, timeout=None):
"""Send the given configuration updates to the LArPix control
software.
:param updates: a dict mapping chip keys to a dict readable by
the LArPix Control ``Configuration.from_dict``. Note that
omitted registers will not be updated (but also will not be
deleted or reset).
"""
self._controller.send_action('LArPix board', 'send_config',
[updates])
for result in self._receive_loop():
yield result
### Calibrations
[docs] def list_routines(self):
'''
Return a list of routines/calibrations.
'''
self._controller.send_action('LArPix board',
'list_routines', [])
for result in self._receive_loop():
yield result
[docs] def load_routines(self, location):
"""Load any routines saved at ``location`` into the
LArPix Producer.
:param location: the directory containing the routines files to
load.
:returns: the new list of routines (same as subsequently calling
``list_routines``)
"""
self._controller.send_action('LArPix board', 'load_routines',
[location])
for result in self._receive_loop():
yield result
[docs] def run_routine(self, name, *args, timeout=None):
'''
Run the given routine and return the routine's output.
'''
self._controller.send_action('LArPix board',
'run_routine', [name] + list(args))
for result in self._receive_loop(timeout):
yield result
### Physics runs
[docs] def prepare_physics_run(self, timeout=None):
'''
Enter the "READY" DAQ state so that all DAQ components are ready
to begin the physics run.
'''
self._controller.request_state_change('READY')
for result in self._receive_loop(timeout):
yield result
[docs] def begin_physics_run(self, timeout=None):
'''
Begin taking physics data, activate online data monitoring and
analytics, and store the data in offline storage.
'''
self._controller.request_state_change('RUN')
for result in self._receive_loop(timeout):
yield result
[docs] def end_physics_run(self, timeout=None):
'''
Stop taking physics data, deactivate online data monitoring and
analytics, and finalize the offline storage.
'''
self._controller.request_state_change('STOP')
for result in self._receive_loop(timeout):
yield result