From 20f86f825bea718148d626e07f9f72485173547d Mon Sep 17 00:00:00 2001 From: smathis Date: Wed, 23 Jul 2025 09:13:02 +0200 Subject: [PATCH] Convert value from caproto type to Python types --- common.py | 68 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/common.py b/common.py index a4a4e40..459aea4 100644 --- a/common.py +++ b/common.py @@ -4,6 +4,33 @@ import os import pytest import yaml from caproto.sync.client import read, write +from caproto import CaprotoTimeoutError, ChannelType + +FTYPE_TO_TYPE = { + ChannelType.STRING: str, + ChannelType.INT: int, + ChannelType.FLOAT: float, + ChannelType.ENUM: int, + ChannelType.CHAR: bytes, + ChannelType.LONG: int, + ChannelType.DOUBLE: float, + + ChannelType.TIME_STRING: str, + ChannelType.TIME_INT: int, + ChannelType.TIME_FLOAT: float, + ChannelType.TIME_ENUM: int, + ChannelType.TIME_CHAR: bytes, + ChannelType.TIME_LONG: int, + ChannelType.TIME_DOUBLE: float, + + ChannelType.CTRL_STRING: str, + ChannelType.CTRL_INT: int, + ChannelType.CTRL_FLOAT: float, + ChannelType.CTRL_ENUM: int, + ChannelType.CTRL_CHAR: bytes, + ChannelType.CTRL_LONG: int, + ChannelType.CTRL_DOUBLE: float, +} def read_config(): @@ -16,7 +43,7 @@ class MajorState(Exception): pass -class Axis: +class Motor: # Motor record fields fields = { @@ -60,11 +87,38 @@ class Axis: def write_field(self, fieldname, value): self.write('.' + self.fields[fieldname], value) - def read(self, suffix): - return read(self.pv + suffix).data + def read(self, suffix, as_string=False): + response = read(self.pv + suffix).data + return self._convert_value(response, as_string) - def read_field(self, fieldname): - return self.read('.' + self.fields[fieldname]) + def _convert_value(self, response, as_string=False): + + # By default, the response data is always a list to cover all possible + # readback values from EPICS (lists, strings, chars, numbers). The last + # two cases need to be treated in a special way. They can be identified + # by the list length being 1. + if len(response.data) == 1: + value = response.data[0] + if isinstance(value, bytes): + return value.decode() + + # If an empty string is returned, the data has a single entry + # (the NULL terminator) + if as_string: + return bytes(response.data).rstrip(b'\x00').decode( + encoding='utf-8', errors='ignore') + + return value + + # Strings are read with their NULL terminator, hence it needs to be + # stripped before decoding + if as_string: + return bytes(response.data).rstrip(b'\x00').decode( + encoding='utf-8', errors='ignore') + return response.data + + def read_field(self, fieldname, as_string=False): + return self.read('.' + self.fields[fieldname], as_string) def move_and_wait(self, target): """ @@ -107,7 +161,7 @@ class Axis: return bool(str[15]) -class SinqMotor(Axis): +class SinqMotor(Motor): # PV suffixes used in SinqMotor drivers suffixes = { 'enable': ':Enable', @@ -130,5 +184,5 @@ class SinqMotor(Axis): return self.read('.' + self.fields[fieldname]) -class TurboPMAC(Axis): +class TurboPMAC(SinqMotor): pass