wip websocket refactoring

This commit is contained in:
2025-06-17 08:22:34 +02:00
parent 714a038cd9
commit f0af312fc9
3 changed files with 97 additions and 65 deletions

View File

@@ -11,6 +11,8 @@ if __name__ == """__main__""":
# Create a Timepix object
timepix = Timepix(name="TimePixDetector")
timepix.on_connected()
## LOOP for a scan
timepix.stage()
timepix.pre_scan()
print(f"State of timepix_fly_client {timepix.timepix_fly_client.state().state}")
@@ -28,5 +30,6 @@ if __name__ == """__main__""":
timepix.complete()
for ii, msg in enumerate(timepix._data_buffer):
print(f"Received data {ii} with message type {msg['type']}")
timepix.timepix_fly_client.stop()
timepix.unstage()
# timepix.destroy()
timepix.destroy()

View File

@@ -69,6 +69,7 @@ class Timepix(PSIDeviceBase, Device):
self._socket_server_buffer_size = 4096
#
self._data_buffer = []
self._global_buffer = "" # Global buffer to store received data
# Decoding
self._decoder = json.JSONDecoder()
@@ -129,7 +130,7 @@ class Timepix(PSIDeviceBase, Device):
def on_pre_scan(self) -> DeviceStatus | StatusBase | None:
"""Called right before the scan starts on all devices automatically."""
self.timepix_fly_client.start()
self.timepix_fly_client.start() # --> State goes to setup
self._wait_for_state_condition("setup", timeout=5.0)
def on_trigger(self) -> DeviceStatus | StatusBase | None:
@@ -137,7 +138,9 @@ class Timepix(PSIDeviceBase, Device):
def on_complete(self) -> DeviceStatus | StatusBase | None:
"""Called to inquire if a device has completed a scans."""
self._wait_for_state_condition("config", timeout=5.0)
self._wait_for_state_condition(
"config", timeout=5.0
) # During measurment, the state is set to collect. Goes back to config after the measurement is done.
def on_kickoff(self) -> DeviceStatus | StatusBase | None:
"""Called to kickoff a device for a fly scan. Has to be called explicitly."""
@@ -207,13 +210,16 @@ class Timepix(PSIDeviceBase, Device):
continue
logger.info(f"Received data: {len(chunk)} bytes")
buffer += chunk.decode("utf-8") # Trailing byte, i.e. -> "\n"
self._global_buffer += (
buffer # Append to global buffer, #TODO check why endfram misses
)
if buffer.endswith("}\n"):
self._decode_received_data(buffer)
buffer = "" # Reset buffer after processing
# Ignore timeout exception on socket aslong as server_thread_event is not set
except socket.timeout:
continue
pass
def _decode_received_data(self, buffer: str) -> None:
"""
@@ -227,6 +233,10 @@ class Timepix(PSIDeviceBase, Device):
logger.warning(f"Failed to decode JSON from buffer: {buffer}")
# If decoding fails, append the data to the buffer and wait for more data
# TD Spectra is organized as TROIN * num_energy_points (send in startframe) + energy_point -> 'p'
# TROIN = 10, energy_points = 2
# TROIN: (0,0), (0,1), (1,0), (1,1), (2,0), (2,1), (3,0), (3,1), (4,0), (4,1), etc...
def _reset_buffers(self):
"""Reset the data buffers."""
logger.info(f"Resetting data buffers for {self.name}.")
@@ -274,71 +284,72 @@ class Timepix(PSIDeviceBase, Device):
# pylint: disable=protected-access
if __name__ == "__main__":
# from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
# TimePixFlyMockServer,
# )
# mock_server = TimePixFlyMockServer()
# # Create a Timepix object
# timepix = Timepix(name="TimePixDetector")
# timepix.on_connected()
# timepix.stage()
# timepix.pre_scan()
# mock_server.start_acquisition()
# time.sleep(5)
# timepix.complete()
# print(timepix._data_buffer)
# timepix.unstage()
timepix = Timepix(name="TimePixDetector")
from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
TimePixFlyMockServer,
)
mock_server = TimePixFlyMockServer()
# Create a Timepix object
timepix = Timepix(name="TimePixDetector")
timepix.on_connected()
infos = socket.getaddrinfo("localhost", None)
print(infos)
print(f"Last error: {timepix.timepix_fly_client.last_error().message}")
print(f"TimePix version: {timepix.timepix_fly_client.version().version}")
print(f"TimePix state: {timepix.timepix_fly_client.state().state}")
timepix.timepix_fly_client.set_other_config(
OtherConfigModel(
output_uri=f"tcp:{timepix._data_server_host}:{timepix._data_server_port}",
TRoiStep=1,
TRoiN=5000,
)
)
print(f"Other config: {timepix.timepix_fly_client.get_other_config()}")
# print(f"Pixel map from file: {timepix.timepix_fly_client.get_pixel_map()}")
# PixelMap does not work yet
pixel_map = PixelMap(
chips=[
[{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
[{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
[{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
]
)
timepix.timepix_fly_client.set_pixel_map(pixel_map)
# test = timepix.timepix_fly_client.get_pixel_map() # TODO Throws an error at the moment
# print(test)
print(f"Initialized {timepix.name} with prefix {timepix.prefix}")
timepix.timepix_fly_client.start()
mock_server.start_acquisition() # Start the mock server acquisition
time.sleep(
1
) # Wait for server stats to go to ready to start acquisition# Start acquisition, this will go through EPICS interface probably
timepix._data_server_thread_event.wait(timeout=1.0)
timepix._data_server_thread_event.set()
timepix.stage()
timepix.pre_scan()
mock_server.start_acquisition()
time.sleep(5)
timepix.complete()
print(timepix._data_buffer)
# raise RuntimeError("Stopping the server thread for testing purposes.")
data = [el in timepix._data for el in timepix._data]
# timepix._stop_data_server_thread()
print("Data server thread stopped.")
print(timepix._global_buffer[-100:])
timepix.unstage()
# timepix = Timepix(name="TimePixDetector")
# from superxas_bec.devices.timepix.timepix_fly_client.timepix_fly_mock_server import (
# TimePixFlyMockServer,
# )
# mock_server = TimePixFlyMockServer()
# timepix.on_connected()
# infos = socket.getaddrinfo("localhost", None)
# print(infos)
# print(f"Last error: {timepix.timepix_fly_client.last_error().message}")
# print(f"TimePix version: {timepix.timepix_fly_client.version().version}")
# print(f"TimePix state: {timepix.timepix_fly_client.state().state}")
# timepix.timepix_fly_client.set_other_config(
# OtherConfigModel(
# output_uri=f"tcp:{timepix._data_server_host}:{timepix._data_server_port}",
# TRoiStep=1,
# TRoiN=5000,
# )
# )
# print(f"Other config: {timepix.timepix_fly_client.get_other_config()}")
# # print(f"Pixel map from file: {timepix.timepix_fly_client.get_pixel_map()}")
# # PixelMap does not work yet
# pixel_map = PixelMap(
# chips=[
# [{"i": 256 ^ 2 - 1, "p": [0, 1], "f": [0.5, 0.5]}],
# [{"i": 255 * 256, "p": [0, 1], "f": [0.5, 0.5]}],
# [{"i": 255, "p": [1, 2], "f": [0.5, 0.5]}],
# [{"i": 0, "p": [1, 2], "f": [0.5, 0.5]}],
# ]
# )
# timepix.timepix_fly_client.set_pixel_map(pixel_map)
# # test = timepix.timepix_fly_client.get_pixel_map() # TODO Throws an error at the moment
# # print(test)
# print(f"Initialized {timepix.name} with prefix {timepix.prefix}")
# timepix.timepix_fly_client.start()
# mock_server.start_acquisition() # Start the mock server acquisition
# time.sleep(
# 1
# ) # Wait for server stats to go to ready to start acquisition# Start acquisition, this will go through EPICS interface probably
# timepix._data_server_thread_event.wait(timeout=1.0)
# timepix._data_server_thread_event.set()
# print(timepix._data_buffer)
# # raise RuntimeError("Stopping the server thread for testing purposes.")
# data = [el in timepix._data for el in timepix._data]
# # timepix._stop_data_server_thread()
# print("Data server thread stopped.")

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python
"""Client using the asyncio API."""
import asyncio
from websockets.asyncio.client import connect
async def hello():
async with connect("ws://localhost:8452/ws") as websocket:
await websocket.send("Hello world!")
while True:
message = await websocket.recv()
print(message)
if __name__ == "__main__":
asyncio.run(hello())