From 40c58d97ecbe1dfd1a6f32c2f4507d6f5f31f768 Mon Sep 17 00:00:00 2001 From: gac-x07da Date: Tue, 30 Jul 2024 18:06:32 +0200 Subject: [PATCH] feat: initial commit --- ophyd_devices/devices/panda_box.py | 190 +++++++++++++++++++++++++++++ pyproject.toml | 1 + 2 files changed, 191 insertions(+) create mode 100644 ophyd_devices/devices/panda_box.py diff --git a/ophyd_devices/devices/panda_box.py b/ophyd_devices/devices/panda_box.py new file mode 100644 index 0000000..bec45e6 --- /dev/null +++ b/ophyd_devices/devices/panda_box.py @@ -0,0 +1,190 @@ + +from ophyd_devices.utils.controller import ControllerError +from pandablocks.blocking import BlockingClient +from pandablocks.responses import ReadyData, EndData +from pandablocks.commands import GetState, SetState, Arm, Disarm, Raw +import os +import threading +from ophyd import Device, DeviceStatus, Component, Kind +from ophyd_devices.sim.sim_signals import SetableSignal +from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.utils import bec_utils +from bec_lib.messages import DeviceMessage +from bec_lib.endpoints import MessageEndpoints +import numpy as np + + +class PandaControllerError(ControllerError): + pass + + +class PandaController(Device): + + capture_signal_names = Component(SetableSignal, value=[""], kind=Kind.normal) + config_path = Component(SetableSignal, value="", kind=Kind.normal) + + SUB_VALUE = "" + + def __init__(self, *, name, socket_host, kind=None, read_attrs=None, configuration_attrs=None, parent=None, device_manager=None,**kwargs): + super().__init__(name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) + self.connector = None + self.scaninfo = None + self.data_thread = None + self.kickoff_thread = None + self.started_event = None + self.end_event = None + self.socket_host = socket_host + if device_manager: + self._update_service_config() + self.device_manager = device_manager + else: + self.device_manager = bec_utils.DMMock() + base_path = kwargs["basepath"] if "basepath" in kwargs else "." + self.service_cfg = {"base_path": os.path.abspath(base_path)} + + self.connector = self.device_manager.connector + self.data_bucket = [] + self._stream_ttl = 1800 + + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode + """ + self.scaninfo = BecScaninfoMixin(self.device_manager) + self.scaninfo.load_scan_metadata() + + def _get_capture_signal_keys(self) -> list[str]: + ret = self.send_command(Raw("*CAPTURE?")) + return [key.split(" ")[0].strip("!") for key in ret if key.strip(".")] + + def write_state_to_disk(self, filename:str): + state = "\n".join(self.get_state_panda()) + with open(filename, "w") as f: + f.write(state) + + def get_state_panda(self) -> list[str]: + with BlockingClient(self.socket_host) as client: + out = client.send(GetState()) + return out + + def load_state_from_disk(self, filename:str): + if not os.path.exists(filename): + raise PandaControllerError(f"Could not find state file {filename}.") + with open(filename, "r") as f: + input_data = f.read().splitlines() + with BlockingClient(self.socket_host) as client: + client.send(SetState(input_data)) + + def _send_data_to_bec(self, data:np.array) -> None: + out = defaultdict(list) + keys = data.dtype.names + for entry in data: + for i in range(len(keys)): + out[keys[i]].append(entry[i]) + msg = DeviceMessage(signals=out, metadata={})#TODO add here scan_msg metadata + done flag + self.connector.xadd( + topic=MessageEndpoints.device_async_readback(scan_id=self.scaninfo.scan_id, device=self.name), + msg={"data" : msg}, + expire=self._stream_ttl + ) + + + def _run_data_readout(self): + with BlockingClient(self.socket_host) as client: + try: + for data in client.data(scaled=False): + if isinstance(data, ReadyData): + # print(f"received ready data: {data}") + self.started_event.set() + continue + + if isinstance(data, dict): + # print(f"received frame data: {data}") + self.data_bucket.append(data) + continue + + if isinstance(data, EndData): + print(f"received end data {data}") + self.end_event.set() + break + finally: + client.send(Disarm()) + + def run(self): + self.started_event = threading.Event() + self.end_event = threading.Event() + self.data_thread = threading.Thread(target=self._run_data_readout, daemon=True) + self.data_thread.start() + + def stage(self) -> list[object]: + self.capture_signal_names.set(self._get_capture_signal_keys()) + self.run() + return super().stage() + + def send_command(self, command): + with BlockingClient(self.socket_host) as client: + out = client.send(command) + return out + + def _wait_for_scan_start(self, status:DeviceStatus): + try: + self.started_event.wait(10) + self.send_command(Arm()) + status.set_finished() + except Exception as exc: + status.set_exception(exc) + + def kickoff(self) -> DeviceStatus: + # wait for the ready data to arrive + status = DeviceStatus(device=self) + if self.started_event.is_set(): + self.send_command(Arm()) + status.set_finished() + else: + self.kickoff_thread = threading.Thread(target=self._wait_for_scan_start, args=(status,), daemon=True) + self.kickoff_thread.start() + return status + + def stop(self, success:bool=False): + self.send_command(Disarm()) + if self.kickoff_thread: + self.end_event.wait(10) + self.kickoff_thread.join() + if self.data_thread: + self.data_thread.join() + self.data_thread = None + self.kickoff_thread = None + self.started_event = None + self.end_event = None + return super().stop(success=success) + + + def unstage(self): + self.stop(success=True) + return super().unstage() + +if __name__ == "__main__": + import time + controller = PandaController(name="redpanda", socket_host="x02da-panda-2.psi.ch") + # controller.write_state_to_disk("test_config_3.ini") + controller.load_state_from_disk("test_config.ini") + # start_time = time.time() + # controller.stage() + # print(f"\n Time after stage: {time.time()- start_time}\n") + # controller.kickoff().wait() + # print(f"\nTime after kickoff {time.time()- start_time}\n") + # time.sleep(2) + # print(f"\nTime after sleep {time.time()- start_time}\n") + # controller.unstage() + # print(f"\nTime after unstage {time.time()- start_time}\n") + + # print("-----------") + # print(len(controller.data_bucket)) + # print(sum([len(data)for data in controller.data_bucket])) + + + # try: + # controller.on() + # controller.write_state_to_disk("test_config.ini") + # finally: + # controller.off() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e1c4afe..7c8ed28 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "pytest ~= 8.0", "h5py ~= 3.10", "hdf5plugin ~= 4.3", + "pandablocks ~= 0.10", ] [project.optional-dependencies]