# Files
# swissmx_tools/jfj/jfjoch_device.py
# 2025-06-13 13:44:40 +02:00
#
# 130 lines
# 3.6 KiB
# Python

import sys
sys.path.append('/sf/cristallina/applications/mx/jungfraujoch_openapi_python')
from datetime import datetime
from time import sleep
import string
import openapi_client
from openapi_client.models.dataset_settings import DatasetSettings
from openapi_client.rest import ApiException
# Exception ``status`` value that JFJ.actually_wait_till_done() treats as
# "not finished yet" and retries on (504 is the HTTP "Gateway Timeout" code).
ERROR_STATUS_WAITING = 504
# Characters that character_cleanup() leaves untouched by default:
# ASCII letters, digits and the punctuation "_-+/".
ALLOWED_CHARS = set(string.ascii_letters) | set(string.digits) | set("_-+/")


def character_cleanup(s, default="_", allowed=ALLOWED_CHARS):
    """Return ``s`` with every character outside ``allowed`` replaced.

    Args:
        s: Iterable of characters (normally a string) to sanitize.
        default: Replacement inserted for each disallowed character.
        allowed: Container of characters that are kept as they are.

    Returns:
        The sanitized string; one output item per input character.
    """

    def keep_or_replace(char):
        return char if char in allowed else default

    return "".join(map(keep_or_replace, s))
class JFJ:
    """Small convenience wrapper around the ``openapi_client`` DAQ API.

    Creating an instance builds the API client for ``host`` and makes sure
    the DAQ reports the ``"Idle"`` state, initializing it if it does not.
    """

    def __init__(self, host):
        """Build the API client for ``host`` and bring the DAQ to ``"Idle"``.

        Args:
            host: Base URL of the service, e.g. ``"http://sf-daq-2:8080"``.

        Raises:
            RuntimeError: If the DAQ is still not ``"Idle"`` after being
                initialized.
        """
        configuration = openapi_client.Configuration(host=host)
        api_client = openapi_client.ApiClient(configuration)
        self.api = openapi_client.DefaultApi(api_client)
        self.actually_init()

    def actually_init(self):
        """Initialize the DAQ unless it already reports ``"Idle"``.

        Prints the state seen on entry. When that state is not ``"Idle"``,
        an initialization is requested, awaited, and the state is checked
        again.

        Raises:
            RuntimeError: If the state is not ``"Idle"`` after initialization.
        """
        status = self.get_daq_status()
        print(f"{status=}")
        if status != "Idle":
            self.api.initialize_post()
            self.actually_wait_till_done()
            status = self.get_daq_status()
            if status != "Idle":
                raise RuntimeError(f"status is not Idle but {status}")

    def actually_wait_till_done(self):
        """Block until ``wait_till_done_post`` reports completion.

        The call is repeated for as long as it raises an exception whose
        ``status`` equals ``ERROR_STATUS_WAITING``. A ``None`` result ends
        the wait.

        Raises:
            Exception: Any other exception raised by ``wait_till_done_post``
                is printed and re-raised unchanged.
        """
        while True:
            try:
                done = self.api.wait_till_done_post()
            except Exception as exc:
                # Exceptions that are not API errors may have no ``status``
                # attribute; reading it unguarded would replace the real
                # failure with an AttributeError.
                status = getattr(exc, "status", None)
                if status is None or status != ERROR_STATUS_WAITING:
                    print(exc)
                    raise  # bare raise keeps the original traceback
            else:
                # TODO: use number_of_triggers_left for progress...
                # None seems to be returned instead of: status == 200
                if done is None:
                    return
            # TODO: consider a short sleep (e.g. sleep(0.1)) between polls

    def get_daq_status(self):
        """Return the ``state`` field of the API's ``status_get`` response."""
        return self.api.status_get().state

    def get_detector_status(self):
        """Return the full ``detector_status_get`` response object."""
        return self.api.detector_status_get()

    def get_detectors(self):
        """Return the ``config_select_detector_get`` response."""
        return self.api.config_select_detector_get()

    def get_detector_config(self):
        """Return the ``config_detector_get`` response."""
        return self.api.config_detector_get()

    def acquire(self, file_prefix, **kwargs):
        """Start an acquisition; returns without waiting for it to finish.

        Args:
            file_prefix: Output file prefix. Characters outside
                ``ALLOWED_CHARS`` are replaced via ``character_cleanup``.
            **kwargs: Further settings passed unchanged to
                ``openapi_client.DatasetSettings``.

        Raises:
            RuntimeError: If the DAQ state is not ``"Idle"`` on entry.
        """
        status = self.get_daq_status()
        if status != "Idle":
            raise RuntimeError(f"status is not Idle but {status}")
        file_prefix = character_cleanup(file_prefix)
        dataset_settings = openapi_client.DatasetSettings(file_prefix=file_prefix, **kwargs)
        self.api.start_post(dataset_settings=dataset_settings)
        # Deliberately no actually_wait_till_done() here: the caller decides
        # when to wait.

    def take_pedestal(self):
        """Request a pedestal run and block until it has finished."""
        self.api.pedestal_post()
        self.actually_wait_till_done()
if __name__ == "__main__":
    # Manual smoke test against a live service: print the detector
    # information, then start a short test acquisition.
    # (Alternative endpoint used previously: "http://sf-daq-2:5232")
    jfj = JFJ("http://sf-daq-2:8080")

    dets = jfj.get_detectors()
    print(f"{dets=}")

    cfg = jfj.get_detector_config()
    print(f"{cfg=}")

    stat = jfj.get_detector_status()
    print(f"{stat=}")

    # A pedestal run (jfj.take_pedestal()) is currently not taken here.

    now = datetime.now().strftime("%Y%m%d-%H%M")
    name = f"test_{now}"
    # To exercise the file-prefix cleanup, use e.g. name = "!@#$%" instead.

    # Keyword order matches the order in which the settings are handed over.
    settings = {
        "beam_x_pxl": 1613,
        "beam_y_pxl": 1666,
        "detector_distance_mm": 151,
        "incident_energy_keV": 10.9,
        "sample_name": name,
        "file_prefix": name,
        "ntrigger": 10,
    }
    jfj.acquire(**settings)

    # Setting names kept for reference (presumably the fields accepted by
    # DatasetSettings -- verify against the openapi_client model):
    # ["images_per_trigger", "ntrigger", "summation", "beam_x_pxl", "beam_y_pxl", "detector_distance_mm", "incident_energy_keV", "file_prefix", "images_per_file", "space_group_number", "sample_name", "fpga_output", "compression", "total_flux", "transmission", "goniometer", "header_appendix", "image_appendix", "energy_multiplier", "data_reduction_factor_serialmx", "run_number", "experiment_group", "unit_cell"]