diff --git a/tests/tests_devices/test_galil.py b/tests/tests_devices/test_galil.py index 44c0476..10ff83d 100644 --- a/tests/tests_devices/test_galil.py +++ b/tests/tests_devices/test_galil.py @@ -272,33 +272,27 @@ def test_galil_rio_signal_read(galil_rio): ## Test read of all channels ########### - assert galil_rio.an_ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms + assert galil_rio.analog_in.ch0._READ_TIMEOUT == 0.1 # Default read timeout of 100ms # Mock the socket to return specific values - an_buffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n" - di_buffer = b"0.0\r\n" + analog_bufffer = b" 1.234 2.345 3.456 4.567 5.678 6.789 7.890 8.901\r\n" galil_rio.controller.sock.buffer_recv = [] # Clear any existing buffer - for name in galil_rio.component_names: - if name.startswith("an_ch"): - galil_rio.controller.sock.buffer_recv.append(an_buffer) - elif name.startswith("di_ou"): - galil_rio.controller.sock.buffer_recv.append(di_buffer) - galil_rio._last_readback = 0 # Force read from controller - + galil_rio.controller.sock.buffer_recv.append(analog_bufffer) read_values = galil_rio.read() - assert len(read_values) == 16 # 16 channels + assert len(read_values) == 8 # 8 channels + expected_values = { - galil_rio.an_ch0.name: {"value": 1.234}, - galil_rio.an_ch1.name: {"value": 2.345}, - galil_rio.an_ch2.name: {"value": 3.456}, - galil_rio.an_ch3.name: {"value": 4.567}, - galil_rio.an_ch4.name: {"value": 5.678}, - galil_rio.an_ch5.name: {"value": 6.789}, - galil_rio.an_ch6.name: {"value": 7.890}, - galil_rio.an_ch7.name: {"value": 8.901}, + galil_rio.analog_in.ch0.name: {"value": 1.234}, + galil_rio.analog_in.ch1.name: {"value": 2.345}, + galil_rio.analog_in.ch2.name: {"value": 3.456}, + galil_rio.analog_in.ch3.name: {"value": 4.567}, + galil_rio.analog_in.ch4.name: {"value": 5.678}, + galil_rio.analog_in.ch5.name: {"value": 6.789}, + galil_rio.analog_in.ch6.name: {"value": 7.890}, + galil_rio.analog_in.ch7.name: {"value": 8.901}, } # All timestamps should be the same assert all( - ret["timestamp"] == read_values[galil_rio.an_ch0.name]["timestamp"] + ret["timestamp"] == read_values[galil_rio.analog_in.ch0.name]["timestamp"] for signal_name, ret in read_values.items() ) # Check values @@ -308,7 +302,7 @@ def test_galil_rio_signal_read(galil_rio): # Check communication command to socker assert galil_rio.controller.sock.buffer_put == [ - b"MG@AN[0],@AN[1],@AN[2],@AN[3],@AN[4],@AN[5],@AN[6],@AN[7]\r" + b"MG@AN[0], @AN[1], @AN[2], @AN[3], @AN[4], @AN[5], @AN[6], @AN[7]\r" ] ########### @@ -320,11 +314,11 @@ def test_galil_rio_signal_read(galil_rio): def value_callback(value, old_value, **kwargs): obj = kwargs.get("obj") - galil = obj.parent + galil = obj.parent.parent readback = galil.read() value_callback_buffer.append(readback) - galil_rio.an_ch0.subscribe(value_callback, run=False) + galil_rio.analog_in.ch0.subscribe(value_callback, run=False) galil_rio.controller.sock.buffer_recv = [b" 2.5 2.6 2.7 2.8 2.9 3.0 3.1 3.2"] expected_values = [2.5, 2.6, 2.7, 2.8, 2.9, 3.0, 3.1, 3.2] @@ -335,12 +329,14 @@ def test_galil_rio_signal_read(galil_rio): # Should have used the cached value for walk in galil_rio.walk_signals(): walk.item._READ_TIMEOUT = 10 # Make sure cached read is used - ret = galil_rio.an_ch0.read() + ret = galil_rio.analog_in.ch0.read() # Should not trigger callback since value did not change - assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 1.234) + assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 1.234) # Same timestamp as for another channel as this is cached read - assert np.isclose(ret[galil_rio.an_ch0.name]["timestamp"], galil_rio.an_ch7.timestamp) + assert np.isclose( + ret[galil_rio.analog_in.ch0.name]["timestamp"], galil_rio.analog_in.ch7.timestamp + ) assert len(value_callback_buffer) == 0 ################## @@ -348,10 +344,10 @@ def test_galil_rio_signal_read(galil_rio): ################## # Now force a read from the controller - galil_rio._last_readback = 0 # Force read from controller - ret = galil_rio.an_ch0.read() + galil_rio._readback_metadata["last_readback"] = 0 # Force read from controller + ret = galil_rio.analog_in.ch0.read() - assert np.isclose(ret[galil_rio.an_ch0.name]["value"], 2.5) + assert np.isclose(ret[galil_rio.analog_in.ch0.name]["value"], 2.5) # Check callback invocation, but only 1 callback even with galil_rio.read() call in callback assert len(value_callback_buffer) == 1 @@ -359,7 +355,8 @@ def test_galil_rio_signal_read(galil_rio): assert np.isclose(values, expected_values).all() assert all( [ - value["timestamp"] == value_callback_buffer[0][galil_rio.an_ch0.name]["timestamp"] + value["timestamp"] + == value_callback_buffer[0][galil_rio.analog_in.ch0.name]["timestamp"] for value in value_callback_buffer[0].values() ] ) @@ -369,12 +366,34 @@ def test_galil_rio_digital_out_signal(galil_rio): """ Test that the Galil RIO digital output signal can be set correctly. """ + ## Test Read from digital output channels + buffer_receive = [] + excepted_put_buffer = [] + for ii in range(galil_rio.digital_out.ch0._NUM_DIGITAL_OUTPUT_CHANNELS): + cmd = f"MG@OUT[{ii}]\r".encode() + excepted_put_buffer.append(cmd) + recv = " 1.000".encode() + buffer_receive.append(recv) + + galil_rio.controller.sock.buffer_recv = buffer_receive # Mock response for readback + + digital_read = galil_rio.read_configuration() # Read to populate readback values + + for walk in galil_rio.digital_out.walk_signals(): + assert np.isclose(digital_read[walk.item.name]["value"], 1.0) + + assert galil_rio.controller.sock.buffer_put == excepted_put_buffer + + # Test writing to digital output channels + galil_rio.controller.sock.buffer_put = [] # Clear buffer put + galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback + # Set digital output channel 0 to high - galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback] - galil_rio.di_out0.put(1) + galil_rio.digital_out.ch0.put(1) assert galil_rio.controller.sock.buffer_put == [b"SB0\r"] - galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback # Set digital output channel 0 to low - galil_rio.di_out0.put(0) - assert galil_rio.controller.sock.buffer_put == [b"SB0\r", b"CB0\r"] + galil_rio.controller.sock.buffer_put = [] # Clear buffer put + galil_rio.controller.sock.buffer_recv = [b":"] # Mock response for readback + galil_rio.digital_out.ch0.put(0) + assert galil_rio.controller.sock.buffer_put == [b"CB0\r"]