from __future__ import absolute_import
from __future__ import print_function
import argparse
import logging
import time
import ast
import json
import random
from xylem import Producer
import larpix.quickstart as larpix_quickstart
from larpix.io.fakeio import FakeIO
from larpix.io.zmq_io import ZMQ_IO
from larpix.io.multizmq_io import MultiZMQ_IO
import larpix.configs as configs
import larpix.larpix as larpix
from larpixdaq.packetformat import toBytes
from larpixdaq.routines import ROUTINES, init_routines
from larpixdaq.logger_producer import DAQLogger
from larpixdaq.core import CORE_PORT
class LArPixProducer(object):
    """The entry point of LArPix data into the xylem DAQ pipeline.

    On initialization, the LArPixProducer object will be connected to
    the DAQ Core, loaded up with the ``'pcb-1'`` Controller
    configuration, connected to the appropriate IO instance, and set up
    with a ``larpix.logger.Logger`` subclass instance that logs all data
    into xylem (disabled on initialization, accessible at
    ``self.board.logger``).

    The producer is the component with direct contact into the LArPix
    environment. As such, the producer receives data from the data board
    and sends it into the DAQ chain. It also sends configuration
    commands to the LArPix ASICs and runs custom DAQ routines such as
    threshold scans and calibrations.

    Implementation-wise, this all happens via a larpix-control
    Controller object (not to be confused with the DAQ Controller).

    Custom routines can be implemented using the
    :py:mod:`larpixdaq.routines` package. Custom
    Routines are managed in a Routine object, in which you should store
    the routine name, function handle/reference, and list of parameters.
    (TODO!!! allow for documentation for custom routines.) Routines can
    access the DAQ functionality via their arguments controller,
    send_data, send_info. They can also accept additional arguments.
    Routines must return a tuple of (controller, result) where result is
    the output of the routine (e.g. a list of thresholds, or even simply
    the string "success") which must be JSON-serializable.

    :var producer: the xylem Producer object used to send data
    :var board: the ``larpix.larpix.Controller`` instance used to gather
        data
    :var current_boardname: the short name of the layout/configuration
        used for the Controller object, e.g. ``'pcb-1'``.
    :var state: the DAQ State of the xylem Producer component

    :param output_address: the full TCP address (including port number)
        that data will be published to
    :param core_address: the full TCP address (including port number) of
        the DAQ Core
    :param log_address: the full TCP address (including port number) of
        the DAQ Log
    :param io_config: a list with the IO class name in position 0, and,
        optionally, the positional arguments to pass to the IO class
        constructor. E.g. ``['FakeIO']`` or ``['MultiZMQ_IO',
        'io/default.json']``.
    :raises ValueError: if ``io_config[0]`` is not one of ``'FakeIO'``,
        ``'ZMQ_IO'`` or ``'MultiZMQ_IO'``
    """

    # Methods exposed to the DAQ as actions, in registration order.
    ACTION_NAMES = (
        'write_config',
        'read_config',
        'validate_config',
        'retrieve_config',
        'send_config',
        'get_boards',
        'load_board',
        'list_routines',
        'run_routine',
        'sleep',
    )

    # Board layouts reported by get_boards().
    BOARD_NAMES = (
        'pcb-1',
        'pcb-2',
        'pcb-3',
        'pcb-4',
        'pcb-5',
        'pcb-6',
        'pcb-10',
    )

    def __init__(self, output_address, core_address, log_address,
                 io_config):
        self.producer = Producer(
            output_address,
            name='LArPix board',
            group='BOARD',
            core_address=core_address,
            log_address=log_address,
            heartbeat_time_ms=300,
        )
        self.board = larpix.Controller()

        # Built here rather than at class level so the IO classes are
        # only looked up when a producer is actually constructed.
        io_classes = {
            'FakeIO': FakeIO,
            'ZMQ_IO': ZMQ_IO,
            'MultiZMQ_IO': MultiZMQ_IO,
        }
        io_class = io_config[0]
        io_args = io_config[1:]
        try:
            io_factory = io_classes[io_class]
        except (KeyError, TypeError):
            # TypeError covers an unhashable io_class; either way the
            # caller supplied something that is not a known class name.
            raise ValueError('Invalid IO class from --io-config: %s' %
                             io_class)
        self.board.io = io_factory(*io_args)

        self.board.load('controller/pcb-1_chip_info.json')
        self.current_boardname = 'pcb-1'
        self.board.logger = DAQLogger(self.producer)
        self.state = ''

        ############
        # Routines #
        ############
        init_routines()
        for action_name in self.ACTION_NAMES:
            self.producer.register_action(
                *self._get_register_action_args(action_name))
        self.producer.request_state()

    def _get_register_action_args(self, name):
        """Return a tuple that can be passed as
        ``*self._get_register_action_args(name)`` to
        producer.register_action.

        The tuple is ``(name, bound method, method docstring)``.

        :param name: the (string) name of the method to use for the
            routine. Doubles as the routine name.
        """
        method = getattr(self, name)
        return (name, method, method.__doc__)

    @staticmethod
    def _parse_registers(registers_str):
        """Convert a register specification into a value for the board.

        :param registers_str: an int, a list of ints, or a string
            holding a literal int or list of ints (e.g.
            ``'[1, 2, 10]'``). Any falsy value (``''``, ``None``, and
            also a bare integer ``0`` or an empty list) selects no
            specific registers.
        :returns: ``None`` for a falsy input, otherwise the int or list
            of ints described by ``registers_str``
        :raises ValueError, SyntaxError: if a string is given that is
            not a valid Python literal
        """
        if not registers_str:
            return None
        if isinstance(registers_str, (int, list, tuple)):
            # Already parsed; ast.literal_eval would reject these.
            return registers_str
        return ast.literal_eval(registers_str)

    def _queue_fake_config_read(self, key):
        """Queue a fake configuration-read reply on a FakeIO object.

        The chip's software configuration is rendered as write packets,
        relabelled as read packets (with parity re-assigned to match),
        and appended to the FakeIO queue so the next board read returns
        them.

        :param key: the chip key whose configuration should be echoed
        """
        chip = self.board.get_chip(key)
        packets = chip.get_configuration_packets(
            larpix.Packet.CONFIG_WRITE_PACKET)
        for packet in packets:
            packet.packet_type = larpix.Packet.CONFIG_READ_PACKET
            packet.assign_parity()
        self.board.io.queue.append((packets, b'some bytes'))

    def write_config(self, key, registers_str=''):
        """Send the given configuration to the board.

        :param key: the chip key to send the configuration to
        :param registers_str: the configuration registers to send,
            specified as an int, a list of ints, or a string specifying a
            literal int or list of ints (e.g. ``'[1, 2, 10]'``). If
            empty, no register selection is passed to the board.
        :returns: the string ``'success'``
        """
        registers = self._parse_registers(registers_str)
        self.board.write_configuration(key, registers, 0, None)
        return 'success'

    def read_config(self, key, registers_str=''):
        """Read configurations from the board.

        :param key: the chip key to read the configuration from
        :param registers_str: the configuration registers to read,
            specified as an int, a list of ints, or a string specifying a
            literal int or list of ints (e.g. ``'[1, 2, 10]'``). If
            empty, no register selection is passed to the board.
        :returns: the configuration-read packets from the most recent
            read, one ``str(packet)`` per line
        """
        registers = self._parse_registers(registers_str)
        if isinstance(self.board.io, FakeIO):
            self._queue_fake_config_read(key)
        self.board.read_configuration(key, registers, message=None)
        packets = self.board.reads[-1]
        return '\n'.join(
            str(p) for p in packets
            if p.packet_type == p.CONFIG_READ_PACKET)

    def validate_config(self, key):
        """Read configurations from the board and compare to those stored
        in software, returning ``True`` if they're equal.

        :param key: the chip key whose configuration will be validated
        :returns: the result of ``self.board.verify_configuration(key)``
        """
        if isinstance(self.board.io, FakeIO):
            self._queue_fake_config_read(key)
        return self.board.verify_configuration(key)

    def retrieve_config(self, key):
        """Return the current configuration stored in software for the
        given chip.

        :param key: the chip key whose configuration will be retrieved
        :returns: the chip configuration as a dict
        """
        chip = self.board.get_chip(key)
        return chip.config.to_dict()

    def send_config(self, updates):
        """Apply the given updates to the software configuration.

        :param updates: a dict mapping chip keys to configuration
            register updates compatible with
            ``larpix.larpix.Config.from_dict``.
        :returns: the string ``'success'``
        """
        for key, chip_updates in updates.items():
            chip = self.board.get_chip(key)
            chip.config.from_dict(chip_updates)
        return 'success'

    def get_boards(self):
        """List the available boards and chip keys, and the current board
        name.

        :returns: a dict with ``'data'`` (a list of ``{'name', 'chips'}``
            dicts, one per known board) and ``'current'`` (the current
            board name)
        """
        board_data = []
        for boardname in self.BOARD_NAMES:
            info = configs.load('controller/' + boardname +
                                '_chip_info.json')
            board_data.append({
                'name': info['name'],
                'chips': info['chip_list'],
            })
        return {'data': board_data, 'current': self.current_boardname}

    def load_board(self, filename):
        """Load the board (Controller) configuration located at the given
        filename.

        :param filename: the file name to load. If using a pre-installed
            configuration, the file name must begin with the standard
            ``'controller/'`` directory prefix.
        :returns: the string ``'success'``
        """
        data = configs.load(filename)
        self.current_boardname = data['name']
        self.board.load(filename)
        return 'success'

    @staticmethod
    def list_routines():
        """List the available routines.

        :returns: a list of ``{'name', 'params'}`` dicts, where
            ``'params'`` is a list of ``{'name', 'type': 'input'}``
            dicts, one per routine parameter
        """
        init_routines()
        return [
            {
                'name': name,
                'params': [{'name': p, 'type': 'input'}
                           for p in routine.params],
            }
            for name, routine in ROUTINES.items()
        ]

    @staticmethod
    def load_routines(location):
        """Load the routines saved at ``location``.

        :param location: the directory to load routines from
        :returns: the routine listing, as from :py:meth:`list_routines`
        """
        init_routines(location)
        # A staticmethod has no ``self``; go through the class instead.
        return LArPixProducer.list_routines()

    def run_routine(self, name, *args):
        """Run the given routine.

        The routine's returned controller replaces ``self.board``.

        :param name: the name of the routine to run
        :param args: all subsequent arguments are passed in order to the
            routine as parameters
        :returns: the routine's result
        """
        def send_data(packet_list, metadata=None):
            self.producer.produce(toBytes(packet_list), metadata)

        routine = ROUTINES[name]
        self.board, result = routine.func(
            self.board, send_data, self.producer.send_info, *args)
        return result

    @staticmethod
    def sleep(time_in_sec):
        """Sleep and return success.

        :param time_in_sec: the delay in seconds; converted with ``int``
        :returns: the string ``'success'``
        """
        delay = int(time_in_sec)
        time.sleep(delay)
        return 'success'

    def _queue_fake_packets(self, first_timestamp, count=300):
        """Queue a batch of random fake data packets on a FakeIO object.

        :param first_timestamp: the running timestamp counter to assign
            to the first packet
        :param count: the number of packets to generate (must be at
            least 1)
        :returns: the timestamp counter to use for the next batch
        """
        chips = list(self.board.chips.values())
        packets = []
        timestamp = first_timestamp
        for _ in range(count):
            packet = larpix.Packet()
            # 16777216 == 2**24: keep the value within 24 bits.
            packet.timestamp = timestamp % 16777216
            packet.dataword = int(
                sum(random.random() for _ in range(256)))
            chip = random.choice(chips)
            packet.chipid = chip.chip_id
            packet.channel_id = random.randint(0, 31)
            timestamp += 1
            packet.assign_parity()
            packet.chip_key = '%d-%d-%d' % (1, 1, chip.chip_id)
            packets.append(packet)
        self.board.io.queue.append(
            (packets, packets[-1].bytes() + b'\x00'))
        return timestamp

    def run(self):
        """Event loop of checking for DAQ commands, checking for new
        data, and repeating.

        If the DAQ State is RUN, the data logger will be enabled so that
        newly-arrived data will be sent to the xylem pipeline. In all
        other DAQ States, the data logger will be disabled so data will
        not be sent down the pipeline.

        If the IO object on ``self.board`` is a FakeIO object, fake data
        will be generated to mimic data arriving from the LArPix board.

        This method loops forever; it only exits via an exception.
        """
        fake_timestamp = 0
        while True:
            self.producer.receive(0.25)
            new_state = self.producer.state
            if self.state != new_state:
                print('State update: New state: %s' % new_state)
                if self.state == 'RUN':
                    self.producer.send_info('Ending run')
                    self.board.logger.disable()
                if new_state == 'RUN':
                    self.producer.send_info('Beginning run')
                    self.board.logger.enable()
                    # Each run restarts the fake timestamp counter.
                    fake_timestamp = 0
                self.state = new_state

            if self.state == 'RUN':
                if not self.board.io.is_listening:
                    logging.debug('about to start listening')
                    self.board.start_listening()
                if isinstance(self.board.io, FakeIO):
                    fake_timestamp = self._queue_fake_packets(
                        fake_timestamp)
                # The return value is not used here.
                self.board.read()
            elif self.board.io.is_listening:
                self.board.stop_listening()
def parse_io_config(raw_io_config):
    """Convert command-line ``--io-config`` values into constructor args.

    The first element (the IO class name) is kept as-is. Each remaining
    element is interpreted as a Python literal where possible (so
    ``'5'`` becomes ``5``); anything that is not a valid literal, such
    as a file path or a ``tcp://`` address, is kept as the raw string.

    :param raw_io_config: non-empty list of strings, IO class name first
    :returns: a new list with the class name followed by the parsed
        arguments
    """
    io_config = [raw_io_config[0]]
    for arg in raw_io_config[1:]:
        try:
            parsed_arg = ast.literal_eval(arg)
        except (ValueError, SyntaxError):
            # literal_eval raises SyntaxError (not ValueError) for
            # strings that are not even valid expressions, e.g.
            # 'tcp://127.0.0.1:5555'.
            parsed_arg = arg
        io_config.append(parsed_arg)
    return io_config


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Launch the data '
            'interface between LArPix and the xylem DAQ pipeline')
    parser.add_argument('address',
            help='The address to publish data to including port number')
    parser.add_argument('--core', default='tcp://127.0.0.1',
            help='The address of the DAQ Core, not including port number')
    parser.add_argument('--log-address', default='tcp://127.0.0.1:56789',
            help='Address to connect to global log, including port number')
    parser.add_argument('--io-config', nargs='+', required=True,
            help='<IO class> [constructor arguments], e.g. "ZMQ_IO io/default.json"')
    parser.add_argument('-d', '--debug', action='store_true',
            help='Enter debug (verbose) mode')
    args = parser.parse_args()
    if args.debug:
        logging.basicConfig(level=logging.DEBUG)

    address = args.address
    # --core omits the port; the DAQ Core's port is appended here.
    core_address = args.core + (':%d' % CORE_PORT)
    io_config = parse_io_config(args.io_config)

    producer = LArPixProducer(address, core_address, args.log_address,
                              io_config)
    try:
        producer.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the event loop.
        pass
    finally:
        producer.producer.cleanup()