fix: added find_reference and folerr public methods

This commit is contained in:
2024-07-02 17:00:00 +02:00
parent 977a2beb4c
commit 5648b4dbd6

View File

@@ -1,6 +1,6 @@
import functools
import threading
import time
import functools
import urllib.request
import xml.etree.ElementTree as ET
@@ -8,25 +8,25 @@ import numpy as np
from bec_lib.logger import bec_logger
from ophyd import Component as Cpt
from ophyd import Device, PositionerBase, Signal
from ophyd_devices.utils.controller import threadlocked
from ophyd.status import wait as status_wait
from ophyd.utils import LimitError
from ophyd_devices.utils.controller import threadlocked
from ophyd_devices.utils.socket import SocketIO, raise_if_disconnected
from csaxs_bec.devices.omny.galil.galil_ophyd import (
BECConfigError,
GalilAxesReferenced,
GalilCommunicationError,
GalilController,
GalilError,
GalilMotorIsMoving,
GalilSetpointSignal,
GalilSignalRO,
GalilCommunicationError,
GalilError,
)
logger = bec_logger.logger
def retry_once(fcn):
"""Decorator to rerun a function in case a Galil communication error was raised. This may happen if the buffer was not empty."""
@@ -45,9 +45,10 @@ class GalilMotorResolution(GalilSignalRO):
@retry_once
@threadlocked
def _socket_get(self):
#encoder steps per mm, for rotation we have 89565.8666667 instead. to be implemented
# encoder steps per mm, for rotation we have 89565.8666667 instead. to be implemented
return 51200
class GalilReadbackSignal(GalilSignalRO):
@retry_once
@threadlocked
@@ -66,15 +67,16 @@ class GalilReadbackSignal(GalilSignalRO):
def read(self):
self._metadata["timestamp"] = time.time()
val = super().read()
# if self.parent.axis_Id_numeric == 2:
# try:
# rt = self.parent.device_manager.devices[self.parent.rt]
# if rt.enabled:
# rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
# except KeyError:
# logger.warning("Failed to set RT value during readback.")
# if self.parent.axis_Id_numeric == 2:
# try:
# rt = self.parent.device_manager.devices[self.parent.rt]
# if rt.enabled:
# rt.obj.controller.set_rotation_angle(val[self.parent.name]["value"])
# except KeyError:
# logger.warning("Failed to set RT value during readback.")
return val
class OMNYGalilController(GalilController):
USER_ACCESS = [
"describe",
@@ -93,109 +95,108 @@ class OMNYGalilController(GalilController):
"_ogalil_switchsocket_are_all_on",
"_ogalil_folerr_not_ignore",
"_ogalil_folerr_reset_and_ignore",
]
def on(self) -> None:
"""Open a new socket connection to the controller"""
self._ogalil_switchsocket_switch_all_on()
time.sleep(0.3)
super().on()
def _ogalil_switchsocket(self, number:int, switch:bool):
#number is socket number ranging from 1 to 4
#switch is either 0 or 1
if number not in range(1,5):
def _ogalil_switchsocket(self, number: int, switch: bool):
# number is socket number ranging from 1 to 4
# switch is either 0 or 1
if number not in range(1, 5):
raise Exception("Socket number ranges from 1 to 4")
else:
contents = urllib.request.urlopen(f"http://mpc3217:8091/netio.cgi?pass=24A42C3929C5&output{number}={switch}").read()
#print(contents)
if b'OK' in contents:
contents = urllib.request.urlopen(
f"http://mpc3217:8091/netio.cgi?pass=24A42C3929C5&output{number}={switch}"
).read()
# print(contents)
if b"OK" in contents:
print(f"Controller number switchsocket {number} is now {switch}")
return(1)
return 1
else:
print(f"Failed to switch controller number {number}")
return(0)
return 0
def _ogalil_switchsocket_are_all_on(self):
if self._ogalil_switchsocket_status() == [1,1,1,1]:
return(1)
if self._ogalil_switchsocket_status() == [1, 1, 1, 1]:
return 1
else:
return(0)
return 0
def _ogalil_tempdisabledebug(self):
#sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],"WH\r\n") < 1) {
#ret_str = sock_get(_ogalil_debugging_host_and_port_str[ogalil_no][],_ogalil_prompt_str)
# sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],"WH\r\n") < 1) {
# ret_str = sock_get(_ogalil_debugging_host_and_port_str[ogalil_no][],_ogalil_prompt_str)
# printf("%s _ogalil_redirect_debug_output(%d): expected \"IHx\" received \"%s\"\n",\
#cmd_str = sprintf("CF %s\r\n",substr(ret_str,3,1))
#if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) {
#cmd_str = sprintf("CW 2\r\n")
#if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) {
# cmd_str = sprintf("CF %s\r\n",substr(ret_str,3,1))
# if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) {
# cmd_str = sprintf("CW 2\r\n")
# if (sock_put(_ogalil_debugging_host_and_port_str[ogalil_no][],cmd_str) < 1) {
print("not yet implemented")
return 0
def _ogalil_folerr_reset_and_ignore(self, device) -> None:
axis_id = device._config["deviceConfig"].get("axis_Id")
# axis_id_numeric = self.axis_id_to_numeric(axis_id)
# self.socket_put_confirmed(f"folaxerr[{axis_id_numeric}]=0")
def _ogalil_folerr_reset_and_ignore(self, axis_id_numeric: int) -> None:
# axis_id = device._config["deviceConfig"].get("axis_Id")
# axis_id_numeric = self.axis_id_to_numeric(axis_id)
self.socket_put_confirmed(f"folaxerr[{axis_id_numeric}]=0")
self.socket_put_confirmed(f"folerr=0")
self.socket_put_confirmed(f"IgNoFol=1")
# def _ogalil_set_axis_to_pos_wo_reference_search(mne, pos_mm) '{
# def _ogalil_set_axis_to_pos_wo_reference_search(mne, pos_mm) '{
# printf("be careful. this routine does not yet take the user motor direction into account\n")
# printf("be careful. this routine does not yet take the user motor direction into account\n")
# global _ogalil_mne_controller_no
# global _ogalil_mne_axis_no
# _ogalil_get_controller_and_axis_number(mne)
# global _ogalil_mne_controller_no
# global _ogalil_mne_axis_no
# _ogalil_get_controller_and_axis_number(mne)
# local pos_encoder
# _ogalil_put_confirmed(_ogalil_mne_controller_no, "IgNoFol=1")
# local pos_encoder
# _ogalil_put_confirmed(_ogalil_mne_controller_no, "IgNoFol=1")
# # pos_mm = pos_encoder / _ogalil_encoder_steps_per_mm
# pos_encoder = pos_mm * _ogalil_encoder_steps_per_mm
# # rotation stage
# if(_ogalil_mne_controller_no == 2 && _ogalil_mne_axis_no == 2)
# {
# pos_encoder = pos_mm * _ogalil_encoder_steps_per_degree
# }
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("DE%c=%d",_ogalil_mne_axis_no+65, pos_encoder))
# # pos_mm = pos_encoder / _ogalil_encoder_steps_per_mm
# pos_encoder = pos_mm * _ogalil_encoder_steps_per_mm
# # rotation stage
# if(_ogalil_mne_controller_no == 2 && _ogalil_mne_axis_no == 2)
# {
# pos_encoder = pos_mm * _ogalil_encoder_steps_per_degree
# }
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("DP%c=_TP%c*ratio[%d]", _ogalil_mne_axis_no+65, _ogalil_mne_axis_no+65, _ogalil_mne_axis_no))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("folaxerr[%d]=0",_ogalil_mne_axis_no))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("axisref[%d]=1",_ogalil_mne_axis_no))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, "folerr=0")
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("DE%c=%d",_ogalil_mne_axis_no+65, pos_encoder))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, "IgNoFol=0")
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("DP%c=_TP%c*ratio[%d]", _ogalil_mne_axis_no+65, _ogalil_mne_axis_no+65, _ogalil_mne_axis_no))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("folaxerr[%d]=0",_ogalil_mne_axis_no))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, sprintf("axisref[%d]=1",_ogalil_mne_axis_no))
# _ogalil_put_confirmed(_ogalil_mne_controller_no, "folerr=0")
# _ogalil_put_confirmed(_ogalil_mne_controller_no, "IgNoFol=0")
# read_motors(7)
# }'
# read_motors(7)
# }'
def _ogalil_folerr_not_ignore(self):
self.socket_put_confirmed("IgNoFol=0")
def _ogalil_switchsocket_switch_all_on(self):
if not self._ogalil_switchsocket_are_all_on():
for j in range (1,5):
self._ogalil_switchsocket(j,1)
for j in range(1, 5):
self._ogalil_switchsocket(j, 1)
time.sleep(0.4)
def _ogalil_switchsocket_status(self):
contents = urllib.request.urlopen("http://mpc3217:8091/netio.xml").read()
root = ET.fromstring(contents)
returnvalue=[]
for j in range(0,4):
returnvalue = []
for j in range(0, 4):
status = int(root[1][j][2].text)
returnvalue.append(int(root[1][j][2].text))
if status:
print(f"Controller {j+1} is ON")
else:
print(f"Controller {j+1} is OFF")
return(returnvalue)
return returnvalue
def show_status_other(self):
swver = float(self.socket_put_and_receive("MGswver"))
@@ -203,13 +204,15 @@ class OMNYGalilController(GalilController):
tempab = float(self.socket_put_and_receive("MGtempab"))
timeab = float(self.socket_put_and_receive("MGtimeab"))
IgNoFol = float(self.socket_put_and_receive("MGIgNoFol"))
print(f"OMNY galil firmware {swver:2.0f}, TempAbort: {tempab:1.0f}, Allaxref: {allaxref:1.0f}, TimeAbort: {timeab:1.0f}, Ignore Folerr: {IgNoFol:1.0f}\n")
print(
f"OMNY galil firmware {swver:2.0f}, TempAbort: {tempab:1.0f}, Allaxref: {allaxref:1.0f}, TimeAbort: {timeab:1.0f}, Ignore Folerr: {IgNoFol:1.0f}\n"
)
if self.sock.port == 8083:
self._ogalil_switchsocket_status()
class OMNYGalilMotor(Device, PositionerBase):
USER_ACCESS = ["controller"]
USER_ACCESS = ["controller", "find_reference", "_ogalil_folerr_reset_and_ignore"]
readback = Cpt(GalilReadbackSignal, signal_name="readback", kind="hinted")
user_setpoint = Cpt(GalilSetpointSignal, signal_name="setpoint")
motor_resolution = Cpt(GalilMotorResolution, signal_name="resolution", kind="config")
@@ -396,6 +399,16 @@ class OMNYGalilMotor(Device, PositionerBase):
"""The engineering units (EGU) for positions"""
return "mm"
def _ogalil_folerr_reset_and_ignore(self):
# pylint: disable=protected-access
self.controller._ogalil_folerr_reset_and_ignore(self.axis_Id_numeric)
def find_reference(self):
"""
Find the reference of the axis.
"""
self.controller.find_reference(self.axis_Id_numeric)
def stop(self, *, success=False):
self.controller.stop_all_axes()
return super().stop(success=success)