Blacking and cleanup of unused code
This commit is contained in:
@@ -352,7 +352,7 @@ class AerotechAbrStage(PsiDeviceBase):
|
||||
self.start_command.set(1).wait()
|
||||
|
||||
# Define wait until the busy flag goes high
|
||||
def is_busy(*args, value, **kwargs):
|
||||
def is_busy(*, value, **_):
|
||||
return bool(value == 0)
|
||||
|
||||
# Subscribe and wait for update
|
||||
@@ -376,7 +376,7 @@ class AerotechAbrStage(PsiDeviceBase):
|
||||
"""
|
||||
|
||||
# Define wait until the busy flag goes down (excluding initial update)
|
||||
def is_idle(*args, value, **kwargs):
|
||||
def is_idle(*, value, **_):
|
||||
return bool(value == 1)
|
||||
|
||||
# Subscribe and wait for update
|
||||
|
||||
@@ -5,37 +5,25 @@ Created on Tue Jun 11 11:28:38 2024
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
import types
|
||||
import math
|
||||
from ophyd import (
|
||||
Component,
|
||||
PVPositioner,
|
||||
Device,
|
||||
Signal,
|
||||
EpicsSignal,
|
||||
EpicsSignalRO,
|
||||
Kind,
|
||||
PositionerBase,
|
||||
)
|
||||
from ophyd.status import Status, MoveStatus, SubscriptionStatus, DeviceStatus
|
||||
from ophyd import Component, PVPositioner, Signal, EpicsSignal, EpicsSignalRO, Kind, PositionerBase
|
||||
from ophyd.status import Status, MoveStatus
|
||||
|
||||
from .A3200enums import AbrMode
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
from .A3200enums import *
|
||||
# ABR_DONE = 0
|
||||
# ABR_READY = 1
|
||||
# ABR_BUSY = 2
|
||||
|
||||
# GRID_SCAN_BUSY = 0
|
||||
# GRID_SCAN_DONE = 1
|
||||
|
||||
ABR_DONE = 0
|
||||
ABR_READY = 1
|
||||
ABR_BUSY = 2
|
||||
|
||||
GRID_SCAN_BUSY = 0
|
||||
GRID_SCAN_DONE = 1
|
||||
|
||||
DIRECT_MODE = 0
|
||||
MEASURING_MODE = 1
|
||||
# DIRECT_MODE = 0
|
||||
# MEASURING_MODE = 1
|
||||
|
||||
|
||||
class A3200Axis(PVPositioner):
|
||||
@@ -162,7 +150,7 @@ class A3200Axis(PVPositioner):
|
||||
position,
|
||||
wait=True,
|
||||
timeout=None,
|
||||
moved_cb=None,
|
||||
# moved_cb=None,
|
||||
velocity=None,
|
||||
relative=False,
|
||||
direct=False,
|
||||
@@ -171,7 +159,7 @@ class A3200Axis(PVPositioner):
|
||||
"""Native absolute/relative movement on the A3200"""
|
||||
|
||||
# Check if we're in direct movement mode
|
||||
if self.abr_mode.value not in (DIRECT_MODE, "DIRECT"):
|
||||
if self.abr_mode.value not in (AbrMode.DIRECT, "DIRECT"):
|
||||
if direct:
|
||||
self.abr_mode_direct.set(1).wait()
|
||||
else:
|
||||
@@ -189,7 +177,7 @@ class A3200Axis(PVPositioner):
|
||||
self.check_value(position)
|
||||
|
||||
# Get MoveStatus from parent of parent
|
||||
status = PositionerBase.move(self, position=position, timeout=timeout)
|
||||
status = PositionerBase.move(self, position=position, timeout=timeout, **kwargs)
|
||||
|
||||
has_done = self.done is not None
|
||||
if not has_done:
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import time
|
||||
from ophyd import Component, Device, Kind, Device, Signal, SignalRO
|
||||
from ophyd.status import SubscriptionStatus
|
||||
|
||||
import requests
|
||||
from ophyd import Component, Device, Kind, Signal, SignalRO
|
||||
|
||||
try:
|
||||
from bec_lib import bec_logger
|
||||
@@ -29,7 +27,7 @@ class SmarGonSignal(Signal):
|
||||
self._limits = (low_limit, high_limit)
|
||||
# self.get()
|
||||
|
||||
def put(self, value, *args, timestamp=None, **kwargs):
|
||||
def put(self, value, *, timestamp=None, **kwargs):
|
||||
"""Overriden put to add communication with smargopolo"""
|
||||
# Validate new value
|
||||
self.check_value(value)
|
||||
@@ -38,7 +36,7 @@ class SmarGonSignal(Signal):
|
||||
timestamp = time.time()
|
||||
|
||||
# Perform the actual write to SmargoPolo
|
||||
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}")
|
||||
r = self.parent._go_n_put(f"{self.write_addr}?{self.addr.upper()}={value}", **kwargs)
|
||||
|
||||
old_value = self._readback
|
||||
self._timestamp = timestamp
|
||||
@@ -91,24 +89,6 @@ class SmarGonSignalRO(Signal):
|
||||
return super().get(*args, **kwargs)
|
||||
|
||||
|
||||
# class SmarGonMovingSignalRO(Signal):
|
||||
# """Small helper class to read PVs that need to be processed first.
|
||||
# TODO: Add monitoring
|
||||
# """
|
||||
|
||||
# def __init__(self, *args, **kwargs):
|
||||
# super().__init__(*args, **kwargs)
|
||||
# self._metadata["write_access"] = False
|
||||
|
||||
# def get(self, *args, **kwargs):
|
||||
# r = self.parent._go_n_get("readbackMCS")
|
||||
# print(r['state'])
|
||||
# moving_str = r["state"]["q1"] + r["state"]["q2"] + r["state"]["q3"] + r["state"]["q4"] + r["state"]["q5"][0]
|
||||
# moving = int(moving_str)
|
||||
# self.put(moving_str, force=True)
|
||||
# return super().get(*args, **kwargs)
|
||||
|
||||
|
||||
class SmarGonAxis(Device):
|
||||
"""SmarGon client deice
|
||||
|
||||
@@ -147,7 +127,6 @@ class SmarGonAxis(Device):
|
||||
self.__class__.__dict__["setpoint"].kwargs["low_limit"] = low_limit
|
||||
self.__class__.__dict__["setpoint"].kwargs["high_limit"] = high_limit
|
||||
|
||||
# super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
|
||||
super().__init__(
|
||||
prefix=prefix,
|
||||
name=name,
|
||||
@@ -155,6 +134,7 @@ class SmarGonAxis(Device):
|
||||
read_attrs=read_attrs,
|
||||
configuration_attrs=configuration_attrs,
|
||||
parent=parent,
|
||||
# device_manager=device_manager,
|
||||
**kwargs,
|
||||
)
|
||||
self.sg_url._metadata["write_access"] = False
|
||||
@@ -170,20 +150,20 @@ class SmarGonAxis(Device):
|
||||
def _go_n_get(self, address, **kwargs):
|
||||
"""Helper function to connect to smargopolo"""
|
||||
cmd = f"{self.sg_url.get()}/{address}"
|
||||
r = requests.get(cmd, timeout=1)
|
||||
r = requests.get(cmd, timeout=1, **kwargs)
|
||||
if not r.ok:
|
||||
raise RuntimeError(
|
||||
f"[{self.name}] Error getting {address}; server returned {r.status_code} => {r.reason}"
|
||||
f"[{self.name}] Error getting {address}; reply was {r.status_code} => {r.reason}"
|
||||
)
|
||||
return r.json()
|
||||
|
||||
def _go_n_put(self, address, **kwargs):
|
||||
"""Helper function to connect to smargopolo"""
|
||||
cmd = f"{self.sg_url.get()}/{address}"
|
||||
r = requests.put(cmd, timeout=1)
|
||||
r = requests.put(cmd, timeout=1, **kwargs)
|
||||
if not r.ok:
|
||||
raise RuntimeError(
|
||||
f"[{self.name}] Error putting {address}; server returned {r.status_code} => {r.reason}"
|
||||
f"[{self.name}] Error putting {address}; reply was {r.status_code} => {r.reason}"
|
||||
)
|
||||
return r.json()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user