"""Thin convenience wrapper around the Jungfraujoch OpenAPI Python client."""

import sys
# The generated client is not installed as a package; make it importable first.
sys.path.append('/sf/cristallina/applications/mx/jungfraujoch_openapi_python')

from datetime import datetime
from time import sleep
import string

import openapi_client
from openapi_client.models.dataset_settings import DatasetSettings
from openapi_client.rest import ApiException


# Exception status that actually_wait_till_done() treats as
# "not finished yet, poll again" instead of as a failure.
ERROR_STATUS_WAITING = 504

# Characters that character_cleanup() leaves untouched by default.
ALLOWED_CHARS = set(
    string.ascii_letters + string.digits + "_-+/"
)


def character_cleanup(s, default="_", allowed=ALLOWED_CHARS):
    """Return *s* with every character not in *allowed* replaced by *default*.

    Args:
        s: Iterable of characters (normally a str) to sanitise.
        default: Replacement inserted for each disallowed character.
        allowed: Container of characters that are kept as they are.

    Returns:
        The sanitised string.
    """
    return "".join(i if i in allowed else default for i in s)


class JFJ:
    """Small facade over ``openapi_client.DefaultApi`` for one host."""

    def __init__(self, host):
        """Create the API client for *host* and bring the DAQ to "Idle".

        Args:
            host: Base URL passed to ``openapi_client.Configuration``.

        Raises:
            RuntimeError: If the DAQ is not "Idle" after initialisation.
        """
        configuration = openapi_client.Configuration(
            host=host
        )
        api_client = openapi_client.ApiClient(configuration)
        self.api = openapi_client.DefaultApi(api_client)
        self.actually_init()

    def _ensure_idle(self):
        """Raise RuntimeError unless the current DAQ status is "Idle"."""
        status = self.get_daq_status()
        if status != "Idle":
            raise RuntimeError(f"status is not Idle but {status}")

    def actually_init(self):
        """Initialise the DAQ if it is not already "Idle".

        Prints the status found on entry. When that status is not "Idle",
        posts an initialise request, waits for completion and verifies that
        the status has become "Idle".

        Raises:
            RuntimeError: If the status is still not "Idle" afterwards.
        """
        status = self.get_daq_status()
        print(f"{status=}")
        if status != "Idle":
            self.api.initialize_post()
            self.actually_wait_till_done()
            self._ensure_idle()

    def actually_wait_till_done(self):
        """Block until ``wait_till_done_post()`` reports completion.

        The call is repeated for as long as it raises an ``ApiException``
        whose status equals ``ERROR_STATUS_WAITING``. The method returns once
        the call succeeds and yields ``None``.

        Raises:
            ApiException: Printed and re-raised for any other status.
        """
        while True:
            try:
                done = self.api.wait_till_done_post()
            except ApiException as e:
                # Only API errors carry a .status; anything else (connection
                # failures, client-side validation errors, ...) propagates
                # unchanged instead of being masked by an AttributeError.
                if e.status != ERROR_STATUS_WAITING:
                    print(e)
                    raise
            else:
                #TODO: use number_of_triggers_left for progress...
                if done is None:  # this seems to be coming instead of: status == 200
                    return
            #sleep(0.1) #TODO

    def get_daq_status(self):
        """Return the ``state`` attribute of ``status_get()``."""
        return self.api.status_get().state

    def get_detector_status(self):
        """Return the full result of ``detector_status_get()``."""
        return self.api.detector_status_get()  #.state

    def get_detectors(self):
        """Return the result of ``config_select_detector_get()``."""
        return self.api.config_select_detector_get()

    def get_detector_config(self):
        """Return the result of ``config_detector_get()``."""
        return self.api.config_detector_get()

    def acquire(self, file_prefix, **kwargs):
        """Start an acquisition; does not wait for it to finish.

        Args:
            file_prefix: Output file prefix. Characters outside
                ``ALLOWED_CHARS`` are replaced via ``character_cleanup``.
            **kwargs: Forwarded unchanged to ``openapi_client.DatasetSettings``.

        Raises:
            RuntimeError: If the DAQ is not "Idle" when called.
        """
        self._ensure_idle()

        file_prefix = character_cleanup(file_prefix)
        dataset_settings = openapi_client.DatasetSettings(file_prefix=file_prefix, **kwargs)

        self.api.start_post(dataset_settings=dataset_settings)
        # NOTE: deliberately not blocking here; callers that need to wait
        # can call actually_wait_till_done() themselves.

    def take_pedestal(self):
        """Post a pedestal request and block until it is done."""
        self.api.pedestal_post()
        self.actually_wait_till_done()


if __name__ == "__main__":
    #jfj = JFJ("http://sf-daq-2:5232")
    jfj = JFJ("http://sf-daq-2:8080")

    dets = jfj.get_detectors()
    print(f"{dets=}")

    cfg = jfj.get_detector_config()
    print(f"{cfg=}")

    stat = jfj.get_detector_status()
    print(f"{stat=}")

    #jfj.take_pedestal()

    now = datetime.now().strftime("%Y%m%d-%H%M")
    name = f"test_{now}"
    # name = f"!@#$%"

    jfj.acquire(
        beam_x_pxl = 1613,
        beam_y_pxl = 1666,
        detector_distance_mm = 151,
        incident_energy_keV = 10.9,
        sample_name = name,
        file_prefix = name,
        ntrigger = 10
    )

    # Keyword names accepted by DatasetSettings (kept for reference):
    #["images_per_trigger", "ntrigger", "summation", "beam_x_pxl", "beam_y_pxl", "detector_distance_mm", "incident_energy_keV", "file_prefix", "images_per_file", "space_group_number", "sample_name", "fpga_output", "compression", "total_flux", "transmission", "goniometer", "header_appendix", "image_appendix", "energy_multiplier", "data_reduction_factor_serialmx", "run_number", "experiment_group", "unit_cell"]