import os import json import subprocess from bec_lib.core.file_utils import FileWriterMixin #from bec_lib.core.bec_service import SERVICE_CONFIG class csaxs_archiver: """Class to archive data from a beamtime. To run this script from a shell, go to discovery.psi.ch and copy your token. Afterwards, get a Keberos token (kinit) for yourself in the shell. Activate the bec_venv by doing "source bec_venv/bin/activate" and then run this code via python $filename. As a last step, adjust the dictionary below in if __name__ == '__main__' with your token as well as information about the experiment """ def __init__( self, start_scan: int, stop_scan: int, base_path: str, log_path: str, eacc: str, pi: str, pi_email: str, token: str, type: str = "raw", overwrite: bool = False, online: bool = True, ): self.start_scan = start_scan self.stop_scan = stop_scan self.log_path = os.path.expanduser(log_path) self.eacc = eacc self.pi = pi self.pi_email = pi_email self.token = token self.type = type self.overwrite = overwrite self.online = online #from bec_lib.core.bec_service import SERVICE_CONFIG #SERVICE_CONFIG.config["service_config"]["file_writer"] self._load_datacatalogue_module() self._create_directory(base_path) self.service_cfg = {'base_path' : f'{self.base_path}'} self.file_writer = FileWriterMixin(self.service_cfg) def _create_directory(self, base_path: str) -> None: if self.online: self.base_path = os.path.expanduser("~/Data10") else: self.base_path = base_path if not os.path.exists(self.log_path): os.makedirs(self.log_path) def _load_datacatalogue_module(self): command = 'module add datacatalog/1.1.9' os.popen(command) # result = subprocess.run( # command, # shell=False, # stdout=subprocess.PIPE, # stderr=subprocess.PIPE, # universal_newlines=True, # ) # if result.returncode == 0: # print(f"Command {command} was succesful") # else: # print(f"Failed to run command {command} with return message {result.returncode}") def prep_metadata(self, scannr: int) -> dict: user_metadata = {} user_metadata.update( { "principalInvestigator": self.pi_email, "owner": self.pi, "ownerEmail": self.pi_email, "sourceFolder": self.base_path, "creationLocation": "/PSI/SLS/CSAXS", "type": "raw", "ownerGroup": f"p{self.eacc.strip('e')}", "datasetName": f"S{scannr:05d}", } ) return user_metadata def _write_ingestion_log(self, scannr: int) -> None: ... def run_for_all_scans(self): for scan in range(self.start_scan, self.stop_scan + 1): print(f"Start ingestion for scan {scan}") fname = os.path.join(os.path.expanduser(self.log_path), f"ingestion_log_S{scan:05d}") self.datafile_name = f"{fname}.txt" if os.path.isfile(self.datafile_name) and not self.overwrite == True: print( f"Skipping scan {scan}, already ingested due to logs, moving on to next scan {scan+1}" ) continue user_metadata = self.prep_metadata(scan) # Write metadata file in json file self.metadata_file = f"{fname}.json" with open(self.metadata_file, "w") as file: json.dump(user_metadata, file) # Compile datapath based on structure a cSAXS datadir_path = os.path.join('data', self.file_writer.get_scan_directory(scan, 1000, 5)) print(f"Archiving directory {datadir_path}") if not os.path.isdir(os.path.join(self.base_path, datadir_path)): print(f"Did not find directory {datadir_path}, skipping scan {scan}") continue # Create datafile path for archiving with open(self.datafile_name, "w") as file: file.write(datadir_path) print(f"Starting ingestion for S#{scan}") command = f'datasetIngestor -allowexistingsource -ingest -autoarchive -noninteractive -token {self.token} {self.metadata_file} {self.datafile_name}' rtr = os.popen(command) #with open(os.path.join(fname,'_log.txt'), "w") as file: # print(f'Writing reponse to file') # file.write(rtr.read()) # result = subprocess.run(command, shell=False, stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines=True) # if result.returncode == 0: # print(f"Command {command} was succesful") # else: # print(f"Failed to run command {command} with return message {result.returncode}") if __name__ == "__main__": # Generate dictionary with user input. user_input = { "base_path": "~/Data10", "eacc": "e20643", "pi": "Marianne Liebi", "pi_email": "marianne.liebi@psi.ch", 'log_path' : '~/Data10/documentation/ingestion_logs/' } archiver = csaxs_archiver(start_scan=1, stop_scan=349, token='', **user_input) archiver.run_for_all_scans()