tests: fix tests after refactoring

This commit is contained in:
2026-02-11 08:33:53 +01:00
parent 16ea7f410e
commit 5811e445fe
+54 -35
View File
@@ -272,33 +272,27 @@ def test_galil_rio_signal_read(galil_rio):
## Test read of all channels
###########
assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
assert galil_rio.analog_in.ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms
# Mock the socket to return specific values
an_buffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
di_buffer = b"0.0\r\n"
analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n"
galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer
for name in galil_rio.component_names:
if name.startswith("an_ch"):
galil_rio.controller.sock.buffer_recv.append(an_buffer)
elif name.startswith("di_ou"):
galil_rio.controller.sock.buffer_recv.append(di_buffer)
galil_rio._last_readback = 0 # Force read from controller
galil_rio.controller.sock.buffer_recv.append(analog_bufffer)
read_values = galil_rio.read()
assert len(read_values) == 16 # 16 channels
assert len(read_values) == 8 # 8 channels
expected_values = {
galil_rio.an_ch0.name: {"value": 1.234},
galil_rio.an_ch1.name: {"value": 2.345},
galil_rio.an_ch2.name: {"value": 3.456},
galil_rio.an_ch3.name: {"value": 4.567},
galil_rio.an_ch4.name: {"value": 5.678},
galil_rio.an_ch5.name: {"value": 6.789},
galil_rio.an_ch6.name: {"value": 7.890},
galil_rio.an_ch7.name: {"value": 8.901},
galil_rio.analog_in.ch0.name: {"value": 1.234},
galil_rio.analog_in.ch1.name: {"value": 2.345},
galil_rio.analog_in.ch2.name: {"value": 3.456},
galil_rio.analog_in.ch3.name: {"value": 4.567},
galil_rio.analog_in.ch4.name: {"value": 5.678},
galil_rio.analog_in.ch5.name: {"value": 6.789},
galil_rio.analog_in.ch6.name: {"value": 7.890},
galil_rio.analog_in.ch7.name: {"value": 8.901},
}
# All timestamps should be the same
assert all(
ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"]
ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"]
for signal_name, ret in read_values.items()
)
# Check values
@@ -308,7 +302,7 @@ def test_galil_rio_signal_read(galil_rio):
# Check communication command to socker
assert galil_rio.controller.sock.buffer_put == [
b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r"
b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r"
]
###########
@@ -320,11 +314,11 @@ def test_galil_rio_signal_read(galil_rio):
def value_callback(value, old_value, **kwargs):
obj = kwargs.get("obj")
galil = obj.parent
galil = obj.parent.parent
readback = galil.read()
value_callback_buffer.append(readback)
galil_rio.an_ch0.subscribe(value_callback, run=False)
galil_rio.analog_in.ch0.subscribe(value_callback, run=False)
galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"]
expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2]
@@ -335,12 +329,14 @@ def test_galil_rio_signal_read(galil_rio):
# Should have used the cached value
for walk in galil_rio.walk_signals():
walk.item._READ_TIMEOUT = 10 # Make sure cached read is used
ret = galil_rio.an_ch0.read()
ret = galil_rio.analog_in.ch0.read()
# Should not trigger callback since value did not change
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234)
# Same timestamp as for another channel as this is cached read
assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp)
assert np.isclose(
ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp
)
assert len(value_callback_buffer) == 0
##################
@@ -348,10 +344,10 @@ def test_galil_rio_signal_read(galil_rio):
##################
# Now force a read from the controller
galil_rio._last_readback = 0 # Force read from controller
ret = galil_rio.an_ch0.read()
galil_rio._readback_metadata["last_readback"] = 0 # Force read from controller
ret = galil_rio.analog_in.ch0.read()
assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5)
assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5)
# Check callback invocation, but only 1 callback even with galil_rio.read() call in callback
assert len(value_callback_buffer) == 1
@@ -359,7 +355,8 @@ def test_galil_rio_signal_read(galil_rio):
assert np.isclose(values, expected_values).all()
assert all(
[
value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"]
value["timestamp"]
== value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"]
for value in value_callback_buffer[0].values()
]
)
@@ -369,12 +366,34 @@ def test_galil_rio_digital_out_signal(galil_rio):
"""
Test that the Galil RIO digital output signal can be set correctly.
"""
## Test Read from digital output channels
buffer_receive = []
excepted_put_buffer = []
for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS):
cmd = f"MG@OUT[{ii}]\r".encode()
excepted_put_buffer.append(cmd)
recv = " 1.000".encode()
buffer_receive.append(recv)
galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback
digital_read = galil_rio.read_configuration() # Read to populate readback values
for walk in galil_rio.digital_out.walk_signals():
assert np.isclose(digital_read[walk.item.name]["value"], 1.0)
assert galil_rio.controller.sock.buffer_put == excepted_put_buffer
# Test writing to digital output channels
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
# Set digital output channel 0 to high
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback]
galil_rio.di_out0.put(1)
galil_rio.digital_out.ch0.put(1)
assert galil_rio.controller.sock.buffer_put == [b"SB0\r"]
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
# Set digital output channel 0 to low
galil_rio.di_out0.put(0)
assert galil_rio.controller.sock.buffer_put == [b"SB0\r", b"CB0\r"]
galil_rio.controller.sock.buffer_put = [] # Clear buffer put
galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback
galil_rio.digital_out.ch0.put(0)
assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]