Extended motor class

This commit is contained in:
mohacsi_i 2024-02-23 16:15:09 +01:00
parent a7593bb90b
commit 204274ff8c
2 changed files with 40 additions and 9 deletions

View File

@ -1,6 +1,7 @@
es1_roty: es1_roty:
description: 'Test rotation stage' description: 'Test rotation stage'
deviceClass: EpicsMotor # deviceClass: EpicsMotor
deviceClass: epics:devices:EpicsMotorX
deviceConfig: {prefix: 'X02DA-ES1-SMP1:ROTY'} deviceConfig: {prefix: 'X02DA-ES1-SMP1:ROTY'}
onFailure: buffer onFailure: buffer
enabled: true enabled: true

View File

@ -1,5 +1,6 @@
from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal
from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus
from ophyd.flyers import FlyerInterface
from time import sleep from time import sleep
import warnings import warnings
import numpy as np import numpy as np
@ -14,16 +15,45 @@ from typing import Union
from collections import OrderedDict from collections import OrderedDict
class EpicsMotorX(Device): #class EpicsMotorX(EpicsMotor):
mot = Component(EpicsMotor, "", name='mot') # pass
def move(position):
self.mot.move(position, wait=True)
class EpicsMotorX(EpicsMotor):
SUB_PROGRESS = "progress"
def __init__(self, prefix="", *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs):
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs)
self._startPosition = None
self._targetPosition = None
self.subscribe(self._progress_update, run=False)
def configure(self, d: dict):
if "target" in d:
self._targetPosition = d['target']
if "position" in d:
self._targetPosition = d['position']
#super().configure(d)
def kickoff(self):
self._startPosition = float( self.position)
return self.move(self._targetPosition, wait=False)
def _progress_update(self, value, **kwargs) -> None:
"""Progress update on the scan"""
if (self._startPosition is None) or (self._targetPosition is None) or (not self.moving):
self._run_subs( sub_type=self.SUB_PROGRESS, value=1, max_value=1, done=1, )
return
progress = np.abs( (value-self._startPosition)/(self._targetPosition-self._startPosition) )
max_value = 100
self._run_subs(
sub_type=self.SUB_PROGRESS,
value=int(100*progress), max_value=max_value, done=int(np.isclose(max_value, progress, 1e-3)), )
def goto(position, wait=True):
super().move(position, wait)
def sleep(time_ms):
sleep(time_ms)
def epicsCharArray2String(raw: bytes) -> str: def epicsCharArray2String(raw: bytes) -> str: