Convert value from caproto type to Python types

This commit is contained in:
2025-07-23 09:13:02 +02:00
parent 962f9bcebf
commit 20f86f825b

View File

@@ -4,6 +4,33 @@ import os
import pytest
import yaml
from caproto.sync.client import read, write
from caproto import CaprotoTimeoutError, ChannelType
FTYPE_TO_TYPE = {
ChannelType.STRING: str,
ChannelType.INT: int,
ChannelType.FLOAT: float,
ChannelType.ENUM: int,
ChannelType.CHAR: bytes,
ChannelType.LONG: int,
ChannelType.DOUBLE: float,
ChannelType.TIME_STRING: str,
ChannelType.TIME_INT: int,
ChannelType.TIME_FLOAT: float,
ChannelType.TIME_ENUM: int,
ChannelType.TIME_CHAR: bytes,
ChannelType.TIME_LONG: int,
ChannelType.TIME_DOUBLE: float,
ChannelType.CTRL_STRING: str,
ChannelType.CTRL_INT: int,
ChannelType.CTRL_FLOAT: float,
ChannelType.CTRL_ENUM: int,
ChannelType.CTRL_CHAR: bytes,
ChannelType.CTRL_LONG: int,
ChannelType.CTRL_DOUBLE: float,
}
def read_config():
@@ -16,7 +43,7 @@ class MajorState(Exception):
pass
class Axis:
class Motor:
# Motor record fields
fields = {
@@ -60,11 +87,38 @@ class Axis:
def write_field(self, fieldname, value):
self.write('.' + self.fields[fieldname], value)
def read(self, suffix):
return read(self.pv + suffix).data
def read(self, suffix, as_string=False):
response = read(self.pv + suffix).data
return self._convert_value(response, as_string)
def read_field(self, fieldname):
return self.read('.' + self.fields[fieldname])
def _convert_value(self, response, as_string=False):
# By default, the response data is always a list to cover all possible
# readback values from EPICS (lists, strings, chars, numbers). The last
# two cases need to be treated in a special way. They can be identified
# by the list length being 1.
if len(response.data) == 1:
value = response.data[0]
if isinstance(value, bytes):
return value.decode()
# If an empty string is returned, the data has a single entry
# (the NULL terminator)
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return value
# Strings are read with their NULL terminator, hence it needs to be
# stripped before decoding
if as_string:
return bytes(response.data).rstrip(b'\x00').decode(
encoding='utf-8', errors='ignore')
return response.data
def read_field(self, fieldname, as_string=False):
return self.read('.' + self.fields[fieldname], as_string)
def move_and_wait(self, target):
"""
@@ -107,7 +161,7 @@ class Axis:
return bool(str[15])
class SinqMotor(Axis):
class SinqMotor(Motor):
# PV suffixes used in SinqMotor drivers
suffixes = {
'enable': ':Enable',
@@ -130,5 +184,5 @@ class SinqMotor(Axis):
return self.read('.' + self.fields[fieldname])
class TurboPMAC(Axis):
class TurboPMAC(SinqMotor):
pass