Files
csaxs_bec/bec_plugins/utils/csaxs_post_archive.py
2023-12-08 12:21:26 +01:00

183 lines
6.6 KiB
Python
Executable File

import os
import json
import subprocess
import requests
from bec_lib.file_utils import FileWriterMixin
#from bec_lib.bec_service import SERVICE_CONFIG
class csaxs_archiver:
    """Archive cSAXS beamtime data into the PSI data catalogue (SciCat).

    To run this script from a shell:
      1. Go to discovery.psi.ch and copy your access token.
      2. Fill in the ``user_input`` dict in the ``if __name__ == "__main__"``
         section of this file (token, e-account, PI, scan range, ...).
      3. Get a Kerberos ticket for yourself in the shell (``kinit``).
      4. Activate the BEC virtualenv (``source bec_venv/bin/activate``) and
         run this file with ``python <filename>``.
    """

    def __init__(
        self,
        start_scan: int,
        stop_scan: int,
        base_path: str,
        log_path: str,
        eacc: str,
        pi: str,
        pi_email: str,
        token: str,
        type: str = "raw",  # shadows the builtin; name kept for caller compatibility
        overwrite: bool = False,
        online: bool = True,
    ):
        """Set up the archiver and disable confirmation mails for the group.

        Args:
            start_scan: First scan number to ingest (inclusive).
            stop_scan: Last scan number to ingest (inclusive).
            base_path: Data root, used only when ``online`` is False.
            log_path: Directory for per-scan ingestion logs (``~`` expanded).
            eacc: E-account, e.g. "e20638"; the owner group becomes "p20638".
            pi: Principal investigator's name.
            pi_email: Principal investigator's email address.
            token: SciCat access token copied from discovery.psi.ch.
            type: Dataset type recorded in the metadata (default "raw").
            overwrite: Re-ingest scans that already have an ingestion log.
            online: If True, archive from ``~/Data10`` instead of ``base_path``.
        """
        self.start_scan = start_scan
        self.stop_scan = stop_scan
        self.log_path = os.path.expanduser(log_path)
        self.eacc = eacc
        self.pi = pi
        self.pi_email = pi_email
        self.token = token
        self.type = type
        self.overwrite = overwrite
        self.online = online
        self._load_datacatalogue_module()
        self._create_directory(base_path)
        self._disable_mail_confirmation()
        self.service_cfg = {"base_path": f"{self.base_path}"}
        self.file_writer = FileWriterMixin(self.service_cfg)

    @property
    def _owner_group(self) -> str:
        # E-account "eXXXXX" maps to owner group "pXXXXX". Single source of
        # truth; previously one call site used eacc[1:] and another
        # eacc.strip('e'), which disagree for accounts ending in 'e'.
        return f"p{self.eacc[1:]}"

    def _disable_mail_confirmation(self) -> None:
        """Disable archive-confirmation emails for the beamtime's owner group."""
        url = "https://dacat.psi.ch/api/v3/Policies/updatewhere"
        payload = {
            "ownerGroupList": self._owner_group,
            "data": '{"archiveEmailNotification": false, "accessGroups": ["slscsaxs"]}',
        }
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json",
        }
        # Debug output deliberately excludes the access token (credential).
        print(url, payload, headers)
        # The API expects the token as a query parameter; let requests encode it
        # instead of string-concatenating it into the URL.
        response = requests.post(
            url, params={"access_token": self.token}, data=payload, headers=headers
        )
        if response.status_code == 200:
            print("Request was successful.")
            print(response.json())
        else:
            print("Request failed with status code:", response.status_code)
            print(response.text)

    def _create_directory(self, base_path: str) -> None:
        """Resolve the data root and make sure the log directory exists."""
        if self.online:
            # Online beamtimes always live under the user's Data10 mount.
            self.base_path = os.path.expanduser("~/Data10")
        else:
            self.base_path = base_path
        os.makedirs(self.log_path, exist_ok=True)

    def _load_datacatalogue_module(self) -> None:
        """Best-effort load of the datacatalog environment module.

        NOTE(review): ``module`` is a shell function, so running it in a child
        shell cannot modify this process' environment — this only helps if
        ``datasetIngestor`` is already on PATH. Confirm before relying on it.
        """
        # .read() waits for the child shell to finish instead of leaking the pipe.
        os.popen("module add datacatalog/1.1.9").read()

    def prep_metadata(self, scannr: int) -> dict:
        """Build the SciCat ingestion metadata dict for a single scan.

        Args:
            scannr: Scan number; becomes dataset name "S<scannr:05d>".

        Returns:
            Metadata dict suitable for the datasetIngestor JSON file.
        """
        return {
            # NOTE(review): email (not name) intentionally used here? Confirm.
            "principalInvestigator": self.pi_email,
            "owner": self.pi,
            "ownerEmail": self.pi_email,
            "sourceFolder": self.base_path,
            "creationLocation": "/PSI/SLS/CSAXS",
            # Was hard-coded to "raw"; now honors the constructor argument
            # (default is still "raw", so existing callers are unaffected).
            "type": self.type,
            "ownerGroup": self._owner_group,
            "datasetName": f"S{scannr:05d}",
        }

    def _write_ingestion_log(self, scannr: int) -> None:
        # Placeholder — logs are currently written inline in run_for_all_scans.
        ...

    def run_for_all_scans(self) -> None:
        """Ingest every scan in [start_scan, stop_scan] via datasetIngestor.

        For each scan a metadata JSON and a datafile list are written to the
        log directory; the presence of the datafile list marks the scan as
        already ingested (skipped unless ``overwrite`` is set).
        """
        for scan in range(self.start_scan, self.stop_scan + 1):
            print(f"Start ingestion for scan {scan}")
            fname = os.path.join(
                os.path.expanduser(self.log_path), f"ingestion_log_S{scan:05d}"
            )
            self.datafile_name = f"{fname}.txt"
            if os.path.isfile(self.datafile_name) and not self.overwrite:
                print(
                    f"Skipping scan {scan}, already ingested due to logs, moving on to next scan {scan+1}"
                )
                continue
            user_metadata = self.prep_metadata(scan)
            # Write metadata for this scan to a JSON file for the ingestor.
            self.metadata_file = f"{fname}.json"
            with open(self.metadata_file, "w") as file:
                json.dump(user_metadata, file)
            # Compile the data path based on the directory structure at cSAXS.
            datadir_path = os.path.join(
                "data", self.file_writer.get_scan_directory(scan, 1000, 5)
            )
            print(f"Archiving directory {datadir_path}")
            if not os.path.isdir(os.path.join(self.base_path, datadir_path)):
                print(f"Did not find directory {datadir_path}, skipping scan {scan}")
                continue
            # The datafile list doubles as the "already ingested" marker.
            with open(self.datafile_name, "w") as file:
                file.write(datadir_path)
            print(f"Starting ingestion for S#{scan}")
            # SECURITY NOTE: the token is passed on the command line and is
            # therefore visible in `ps` output on shared hosts.
            command = (
                "datasetIngestor -allowexistingsource -ingest -autoarchive "
                f"-noninteractive -token {self.token} "
                f"{self.metadata_file} {self.datafile_name}"
            )
            # .read() blocks until the ingestor finishes; without it the loop
            # raced ahead of the child process.
            os.popen(command).read()
if __name__ == "__main__":
# Generate dictionary with user input.
user_input = {
"base_path": "~/Data10",
"eacc": "e20638",
"pi": "Emma Sparr",
"pi_email": "emma.sparr@fkem1.lu.se",
'log_path' : '~/Data10/documentation/ingestion_logs/',
'token' : 'YK8gkmQmEVxVxjiA57D6tVmpBVs7T235nWEuBT0behN9BPM2BdWARWPPgEsQVrPe',
'start_scan' : 1,
'stop_scan' : 450,
}
archiver = csaxs_archiver(**user_input)
archiver.run_for_all_scans()