Blacking and cleanup of unused code

This commit is contained in:
gac-x06da
2025-01-24 15:42:46 +01:00
parent 82d51649ee
commit 21bd57393f
3 changed files with 23 additions and 55 deletions

View File

@@ -352,7 +352,7 @@ class AerotechAbrStage(PsiDeviceBase):
self.start_command.set(1).wait()
# Define wait until the busy flag goes high
def is_busy(*args, value, **kwargs):
def is_busy(*, value, **_):
return bool(value == 0)
# Subscribe and wait for update
@@ -376,7 +376,7 @@ class AerotechAbrStage(PsiDeviceBase):
"""
# Define wait until the busy flag goes down (excluding initial update)
def is_idle(*args, value, **kwargs):
def is_idle(*, value, **_):
return bool(value == 1)
# Subscribe and wait for update

View File

@@ -5,37 +5,25 @@ Created on Tue Jun 11 11:28:38 2024
@author: mohacsi_i
"""
import types
import math
from ophyd import (
Component,
PVPositioner,
Device,
Signal,
EpicsSignal,
EpicsSignalRO,
Kind,
PositionerBase,
)
from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
from ophyd.status import Status, MoveStatus
from .A3200enums import AbrMode
from bec_lib import bec_logger
logger = bec_logger.logger
from .A3200enums import *
# ABR_DONE = 0
# ABR_READY = 1
# ABR_BUSY = 2
# GRID_SCAN_BUSY = 0
# GRID_SCAN_DONE = 1
ABR_DONE = 0
ABR_READY = 1
ABR_BUSY = 2
GRID_SCAN_BUSY = 0
GRID_SCAN_DONE = 1
DIRECT_MODE = 0
MEASURING_MODE = 1
# DIRECT_MODE = 0
# MEASURING_MODE = 1
class A3200Axis(PVPositioner):
@@ -162,7 +150,7 @@ class A3200Axis(PVPositioner):
position,
wait=True,
timeout=None,
moved_cb=None,
# moved_cb=None,
velocity=None,
relative=False,
direct=False,
@@ -171,7 +159,7 @@ class A3200Axis(PVPositioner):
"""Native absolute/relative movement on the A3200"""
# Check if we're in direct movement mode
if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"):
if self.abr_mode.value not in (AbrMode.DIRECT, "DIRECT"):
if direct:
self.abr_mode_direct.set(1).wait()
else:
@@ -189,7 +177,7 @@ class A3200Axis(PVPositioner):
self.check_value(position)
# Get MoveStatus from parent of parent
status = PositionerBase.move(self, position=position, timeout=timeout)
status = PositionerBase.move(self, position=position, timeout=timeout, **kwargs)
has_done = self.done is not None
if not has_done:

View File

@@ -1,8 +1,6 @@
import time
from ophyd import Component, Device, Kind, Device, Signal, SignalRO
from ophyd.status import SubscriptionStatus
import requests
from ophyd import Component, Device, Kind, Signal, SignalRO
try:
from bec_lib import bec_logger
@@ -29,7 +27,7 @@ class SmarGonSignal(Signal):
self._limits = (low_limit, high_limit)
# self.get()
def put(self, value, *args, timestamp=None, **kwargs):
def put(self, value, *, timestamp=None, **kwargs):
"""Overriden put to add communication with smargopolo"""
# Validate new value
self.check_value(value)
@@ -38,7 +36,7 @@ class SmarGonSignal(Signal):
timestamp = time.time()
# Perform the actual write to SmargoPolo
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}")
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}", **kwargs)
old_value = self._readback
self._timestamp = timestamp
@@ -91,24 +89,6 @@ class SmarGonSignalRO(Signal):
return super().get(*args, **kwargs)
# class SmarGonMovingSignalRO(Signal):
# """Small helper class to read PVs that need to be processed first.
# TODO: Add monitoring
# """
# def __init__(self, *args, **kwargs):
# super().__init__(*args, **kwargs)
# self._metadata["write_access"] = False
# def get(self, *args, **kwargs):
# r = self.parent._go_n_get("readbackMCS")
# print(r['state'])
# moving_str = r["state"]["q1"] + r["state"]["q2"] + r["state"]["q3"] + r["state"]["q4"] + r["state"]["q5"][0]
# moving = int(moving_str)
# self.put(moving_str, force=True)
# return super().get(*args, **kwargs)
class SmarGonAxis(Device):
"""SmarGon client deice
@@ -147,7 +127,6 @@ class SmarGonAxis(Device):
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
super().__init__(
prefix=prefix,
name=name,
@@ -155,6 +134,7 @@ class SmarGonAxis(Device):
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
# device_manager=device_manager,
**kwargs,
)
self.sg_url._metadata["write_access"] = False
@@ -170,20 +150,20 @@ class SmarGonAxis(Device):
def _go_n_get(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.get(cmd, timeout=1)
r = requests.get(cmd, timeout=1, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}"
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()
def _go_n_put(self, address, **kwargs):
"""Helper function to connect to smargopolo"""
cmd = f"{self.sg_url.get()}/{address}"
r = requests.put(cmd, timeout=1)
r = requests.put(cmd, timeout=1, **kwargs)
if not r.ok:
raise RuntimeError(
f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}"
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
)
return r.json()