import logging import os import subprocess import sys from datetime import datetime, timezone import zoneinfo from typing import List import h5py import numpy as np from orsopy import fileio from orsopy.fileio.model_language import SampleModel from . import const from .header import Header from .instrument import Detector from .options import ExperimentConfig, ReaderConfig try: from . import nb_helpers except Exception: nb_helpers = None class AmorData: """read meta-data and event streams from .hdf file(s), apply filters and conversions""" chopperDetectorDistance: float chopperDistance: float chopperPhase: float chopperSpeed: float div: float data_file_numbers: List[int] delta_z: np.ndarray detZ_e: np.ndarray lamda_e: np.ndarray wallTime_e: np.ndarray kad: float kap: float lambdaMax: float lambda_e: np.ndarray #monitor: float mu: float nu: float tau: float tofCut: float start_date: str monitorType: str seriesStartTime = None #------------------------------------------------------------------------------------------------- def __init__(self, header: Header, reader_config: ReaderConfig, config: ExperimentConfig, short_notation:str, norm=False): #self.startTime = reader_config.startTime self.header = header self.config = config self.reader_config = reader_config self.expand_file_list(short_notation) self.read_data(norm=norm) #------------------------------------------------------------------------------------------------- def read_data(self, norm=False): self.file_list = [] for number in self.data_file_numbers: self.file_list.append(self.path_generator(number)) ## read specific meta data and measurement from first file if norm: self.readHeaderInfo = False else: self.readHeaderInfo = True _detZ_e = [] _lamda_e = [] _wallTime_e = [] #_monitor = 0 _monitorPerPulse = [] _pulseTimeS = [] for file in self.file_list: self.read_individual_data(file, norm) _detZ_e = np.append(_detZ_e, self.detZ_e) _lamda_e = np.append(_lamda_e, self.lamda_e) _wallTime_e = np.append(_wallTime_e, self.wallTime_e) _monitorPerPulse = np.append(_monitorPerPulse, self.monitorPerPulse) _pulseTimeS = np.append(_pulseTimeS, self.pulseTimeS) #_monitor += self.monitor self.detZ_e = _detZ_e self.lamda_e = _lamda_e self.wallTime_e = _wallTime_e #self.monitor = _monitor self.monitorPerPulse = _monitorPerPulse self.pulseTimeS = _pulseTimeS #------------------------------------------------------------------------------------------------- #def path_generator(self, number): # fileName = f'amor{self.reader_config.year}n{number:06d}.hdf' # if os.path.exists(os.path.join(self.reader_config.dataPath,fileName)): # path = self.reader_config.dataPath # elif os.path.exists(fileName): # path = '.' # elif os.path.exists(os.path.join('.','raw', fileName)): # path = os.path.join('.','raw') # elif os.path.exists(os.path.join('..','raw', fileName)): # path = os.path.join('..','raw') # elif os.path.exists(f'/afs/psi.ch/project/sinqdata/{self.reader_config.year}/amor/{int(number/1000)}/{fileName}'): # path = f'/afs/psi.ch/project/sinqdata/{self.reader_config.year}/amor/{int(number/1000)}' # else: # sys.exit(f'# ERROR: the file {fileName} is nowhere to be found!') # return os.path.join(path, fileName) #------------------------------------------------------------------------------------------------- def path_generator(self, number): fileName = f'amor{self.reader_config.year}n{number:06d}.hdf' path = '' for rawd in self.reader_config.rawPath: if os.path.exists(os.path.join(rawd,fileName)): path = rawd break if not path: if os.path.exists(f'/afs/psi.ch/project/sinqdata/{self.reader_config.year}/amor/{int(number/1000)}/{fileName}'): path = f'/afs/psi.ch/project/sinqdata/{self.reader_config.year}/amor/{int(number/1000)}' else: sys.exit(f'# ERROR: the file {fileName} can not be found in {self.reader_config.rawPath}') return os.path.join(path, fileName) #------------------------------------------------------------------------------------------------- def expand_file_list(self, short_notation): """Evaluate string entry for file number lists""" #log().debug('Executing get_flist') file_list=[] for i in short_notation.split(','): if '-' in i: if ':' in i: step = i.split(':', 1)[1] file_list += range(int(i.split('-', 1)[0]), int((i.rsplit('-', 1)[1]).split(':', 1)[0])+1, int(step)) else: step = 1 file_list += range(int(i.split('-', 1)[0]), int(i.split('-', 1)[1])+1, int(step)) else: file_list += [int(i)] self.data_file_numbers=sorted(file_list) #------------------------------------------------------------------------------------------------- def resolve_pixels(self): """determine spatial coordinats and angles from pixel number""" nPixel = Detector.nWires * Detector.nStripes * Detector.nBlades pixelID = np.arange(nPixel) (bladeNr, bPixel) = np.divmod(pixelID, Detector.nWires * Detector.nStripes) (bZi, detYi) = np.divmod(bPixel, Detector.nStripes) # z index on blade, y index on detector detZi = bladeNr * Detector.nWires + bZi # z index on detector detX = bZi * Detector.dX # x position in detector # detZ = Detector.zero - bladeNr * Detector.bladeZ - bZi * Detector.dZ # z position on detector bladeAngle = np.rad2deg( 2. * np.arcsin(0.5*Detector.bladeZ / Detector.distance) ) delta = (Detector.nBlades/2. - bladeNr) * bladeAngle \ - np.rad2deg( np.arctan(bZi*Detector.dZ / ( Detector.distance + bZi * Detector.dX) ) ) self.delta_z = delta[detYi==1] return np.vstack((detYi.T, detZi.T, detX.T, delta.T)).T #------------------------------------------------------------------------------------------------- def read_individual_data(self, fileName, norm=False): self.hdf = h5py.File(fileName, 'r', swmr=True) if self.readHeaderInfo: self.read_header_info() logging.warning(f' from file: {fileName}') self.read_individual_header() # add header content if self.readHeaderInfo: self.readHeaderInfo = False self.header.measurement_instrument_settings = fileio.InstrumentSettings( incident_angle = fileio.ValueRange(round(self.mu+self.kap+self.kad-0.5*self.div, 3), round(self.mu+self.kap+self.kad+0.5*self.div, 3), 'deg'), wavelength = fileio.ValueRange(const.lamdaCut, self.config.lambdaRange[1], 'angstrom'), polarization = fileio.Polarization.unpolarized, ) self.header.measurement_instrument_settings.mu = fileio.Value(round(self.mu, 3), 'deg', comment='sample angle to horizon') self.header.measurement_instrument_settings.nu = fileio.Value(round(self.nu, 3), 'deg', comment='detector angle to horizon') self.header.measurement_instrument_settings.div = fileio.Value(round(self.div, 3), 'deg', comment='incoming beam divergence') self.header.measurement_instrument_settings.kap = fileio.Value(round(self.kap, 3), 'deg', comment='incoming beam inclination') if abs(self.kad)>0.02: self.header.measurement_instrument_settings.kad = fileio.Value(round(self.kad, 3), 'deg', comment='incoming beam angular offset') if norm: self.header.measurement_additional_files.append(fileio.File(file=fileName.split('/')[-1], timestamp=self.fileDate)) else: self.header.measurement_data_files.append(fileio.File(file=fileName.split('/')[-1], timestamp=self.fileDate)) logging.info(f' mu = {self.mu:6.3f}, nu = {self.nu:6.3f}, kap = {self.kap:6.3f}, kad = {self.kad:6.3f}') self.read_event_stream() totalNumber = np.shape(self.tof_e)[0] # check for empty event stream if totalNumber == 0: logging.error('empty event stream: can not determine end time') sys.exit() self.sort_pulses() self.associate_pulse_with_monitor() self.extract_walltime(norm) # following lines: debugging output to trace the time-offset of proton current and neutron pulses if self.config.monitorType == 'x': cpp, t_bins = np.histogram(self.wallTime_e, self.pulseTimeS) np.savetxt('tme.hst', np.vstack((self.pulseTimeS[:-1], cpp, self.monitorPerPulse[:-1])).T) #self.average_events_per_pulse() # for debugging only. VERY time consuming!!! self.monitor_threshold() self.filter_strange_times() self.merge_frames() self.filter_project_x() self.correct_for_chopper_opening() self.calculate_derived_properties() self.filter_qz_range(norm) logging.info(f' number of events: total = {totalNumber:7d}, filtered = {np.shape(self.lamda_e)[0]:7d}') def sort_pulses(self): chopperPeriod = np.int64(2*self.tau*1e9) pulseTime = np.sort(self.dataPacketTime_p) pulseTime = pulseTime[np.abs(pulseTime[:]-np.roll(pulseTime, 1)[:])>5] pulseTime -= np.int64(self.seriesStartTime) self.stopTime = pulseTime[-1] pulseTime = pulseTime[pulseTime>=0] # fill in missing pulse times # TODO: check for real end time self.pulseTimeS = np.array([], dtype=np.int64) try: # further files # TODO: use the first pulse of the respective measurement #nextPulseTime = startTime % np.int64(self.tau*2e9) #nextPulseTime = self.pulseTimeS[-1] + chopperPeriod nextPulseTime = pulseTime[0] except AttributeError: # first file nextPulseTime = pulseTime[0] % np.int64(self.tau*2e9) for tt in pulseTime: while tt - nextPulseTime > self.tau*1e9: self.pulseTimeS = np.append(self.pulseTimeS, nextPulseTime) nextPulseTime += chopperPeriod self.pulseTimeS = np.append(self.pulseTimeS, tt) nextPulseTime = self.pulseTimeS[-1] + chopperPeriod def get_current_per_pulse(self, pulseTimeS, currentTimeS, currents): # add currents for early pulses and current time value after last pulse (j+1) currentTimeS = np.hstack([[0], currentTimeS, [pulseTimeS[-1]+1]]) currents = np.hstack([[0], currents]) pulseCurrentS = np.zeros(pulseTimeS.shape[0], dtype=float) j = 0 for i, ti in enumerate(pulseTimeS): if ti >= currentTimeS[j+1]: j += 1 pulseCurrentS[i] = currents[j] #print(f' {i} {pulseTimeS[i]} {pulseCurrentS[i]}') return pulseCurrentS def associate_pulse_with_monitor(self): if self.config.monitorType == 'p': # protonCharge self.currentTime -= np.int64(self.seriesStartTime) self.currentTime -= np.int64(16e9) # time offset of proton current signal self.monitorPerPulse = self.get_current_per_pulse(self.pulseTimeS, self.currentTime, self.current) * 2*self.tau * 1e-3 # filter low-current pulses self.monitorPerPulse = np.where(self.monitorPerPulse > 2*self.tau * self.config.lowCurrentThreshold * 1e-3, self.monitorPerPulse, 0) elif self.config.monitorType == 't': # countingTime self.monitorPerPulse = np.ones(np.shape(self.pulseTimeS)[0])*self.tau else: self.monitorPerPulse = 1./np.shape(pulseTimeS)[1] def extract_walltime(self, norm): if nb_helpers: self.wallTime_e = nb_helpers.extract_walltime(self.tof_e, self.dataPacket_p, self.dataPacketTime_p) else: self.wallTime_e = np.empty(np.shape(self.tof_e)[0], dtype=np.int64) for i in range(len(self.dataPacket_p)-1): self.wallTime_e[self.dataPacket_p[i]:self.dataPacket_p[i+1]] = self.dataPacketTime_p[i] self.wallTime_e[self.dataPacket_p[-1]:] = self.dataPacketTime_p[-1] self.wallTime_e -= np.int64(self.seriesStartTime) logging.debug(f' wall time from {self.wallTime_e[0]/1e9:6.1f} s to {self.wallTime_e[-1]/1e9:6.1f} s') def average_events_per_pulse(self): if self.config.monitorType == 'p': for i, time in enumerate(self.pulseTimeS): events = np.shape(self.wallTime_e[self.wallTime_e == time])[0] logging.info(f'pulse: {i:6.0f}, events: {events:6.0f}, monitor: {self.monitorPerPulse[i]:6.2f}') def monitor_threshold(self): if self.config.monitorType == 'p': # fix to check for file compatibility goodTimeS = self.pulseTimeS[self.monitorPerPulse!=0] filter_e = np.where(np.isin(self.wallTime_e, goodTimeS), True, False) self.tof_e = self.tof_e[filter_e] self.pixelID_e = self.pixelID_e[filter_e] self.wallTime_e = self.wallTime_e[filter_e] logging.info(f' rejected {np.shape(self.monitorPerPulse)[0]-np.shape(goodTimeS)[0]} out of {np.shape(self.monitorPerPulse)[0]} pulses') logging.info(f' with {np.shape(filter_e)[0]-np.shape(self.tof_e)[0]} events due to low beam current') logging.info(f' average counts per pulse = {np.shape(self.tof_e)[0] / np.shape(goodTimeS[goodTimeS!=0])[0]:7.1f}') def filter_qz_range(self, norm): if self.config.qzRange[1]<0.3 and not norm: self.mask_e = np.logical_and(self.mask_e, (self.config.qzRange[0]<=self.qz_e) & (self.qz_e<=self.config.qzRange[1])) self.detZ_e = self.detZ_e[self.mask_e] self.lamda_e = self.lamda_e[self.mask_e] self.wallTime_e = self.wallTime_e[self.mask_e] def calculate_derived_properties(self): self.lamdaMax = const.lamdaCut+1.e13*self.tau*const.hdm/(self.chopperDetectorDistance+124.) if nb_helpers: self.lamda_e, self.qz_e, self.mask_e = nb_helpers.calculate_derived_properties_focussing( self.tof_e, self.detXdist_e, self.delta_e, self.mask_e, self.config.lambdaRange[0], self.config.lambdaRange[1], self.nu, self.mu, self.chopperDetectorDistance, const.hdm ) return # lambda self.lamda_e = (1.e13*const.hdm)*self.tof_e/(self.chopperDetectorDistance+self.detXdist_e) self.mask_e = np.logical_and(self.mask_e, (self.config.lambdaRange[0]<=self.lamda_e) & ( self.lamda_e<=self.config.lambdaRange[1])) # alpha_f # q_z if self.config.incidentAngle == 'alphaF': alphaF_e = self.nu - self.mu + self.delta_e self.qz_e = 4*np.pi*(np.sin(np.deg2rad(alphaF_e))/self.lamda_e) # qx_e = 0. self.header.measurement_scheme = 'angle- and energy-dispersive' elif self.config.incidentAngle == 'nu': alphaF_e = (self.nu + self.delta_e + self.kap + self.kad) / 2. self.qz_e = 4*np.pi*(np.sin(np.deg2rad(alphaF_e))/self.lamda_e) # qx_e = 0. self.header.measurement_scheme = 'energy-dispersive' else: alphaF_e = self.nu - self.mu + self.delta_e alphaI = self.kap + self.kad + self.mu self.qz_e = 2*np.pi * ((np.sin(np.deg2rad(alphaF_e)) + np.sin(np.deg2rad(alphaI)))/self.lamda_e) self.qx_e = 2*np.pi * ((np.cos(np.deg2rad(alphaF_e)) - np.cos(np.deg2rad(alphaI)))/self.lamda_e) self.header.measurement_scheme = 'energy-dispersive' def correct_for_chopper_opening(self): # correct tof for beam size effect at chopper: t_cor = (delta / 180 deg) * tau if self.config.incidentAngle == 'alphaF': self.tof_e -= ( self.delta_e / 180. ) * self.tau else: # TODO: check sign of correction self.tof_e -= ( self.kad / 180. ) * self.tau def filter_project_x(self): pixelLookUp = self.resolve_pixels() if nb_helpers: (self.detZ_e, self.detXdist_e, self.delta_e, self.mask_e) = nb_helpers.filter_project_x( pixelLookUp, self.pixelID_e.astype(np.int64), self.config.yRange[0], self.config.yRange[1] ) else: # resolve pixel ID into y and z indicees, x position and angle (detY_e, self.detZ_e, self.detXdist_e, self.delta_e) = pixelLookUp[np.int_(self.pixelID_e)-1, :].T # define mask and filter y range self.mask_e = (self.config.yRange[0]<=detY_e) & (detY_e<=self.config.yRange[1]) def merge_frames(self): total_offset = self.tofCut+self.tau*self.config.chopperPhaseOffset/180. if nb_helpers: self.tof_e = nb_helpers.merge_frames(self.tof_e, self.tofCut, self.tau, total_offset) else: self.tof_e = np.remainder(self.tof_e-(self.tofCut-self.tau), self.tau)+total_offset # tof shifted to 1 frame def filter_strange_times(self): # 'strange' tof times are those with t > 2 tau (originating from the efu) filter_e = (self.tof_e<=2*self.tau) self.tof_e = self.tof_e[filter_e] self.pixelID_e = self.pixelID_e[filter_e] self.wallTime_e = self.wallTime_e[filter_e] if np.shape(filter_e)[0]-np.shape(self.tof_e)[0]>0.5: logging.warning(f' strange times: {np.shape(filter_e)[0]-np.shape(self.tof_e)[0]}') def read_event_stream(self): self.tof_e = np.array(self.hdf['/entry1/Amor/detector/data/event_time_offset'][:])/1.e9 self.pixelID_e = np.array(self.hdf['/entry1/Amor/detector/data/event_id'][:], dtype=np.int64) self.dataPacket_p = np.array(self.hdf['/entry1/Amor/detector/data/event_index'][:], dtype=np.uint64) self.dataPacketTime_p = np.array(self.hdf['/entry1/Amor/detector/data/event_time_zero'][:], dtype=np.int64) if self.config.monitorType in ['auto', 'p']: try: self.currentTime = np.array(self.hdf['entry1/Amor/detector/proton_current/time'][:], dtype=np.int64) self.current = np.array(self.hdf['entry1/Amor/detector/proton_current/value'][:,0], dtype=float) if len(self.current)>4: self.config.monitorType = 'p' else: self.config.monitorType = 't' except(KeyError, IndexError): self.config.monitorType = 't' else: self.config.monitorType = 't' #TODO: protonMonitor def read_individual_header(self): self.chopperDistance = float(np.take(self.hdf['entry1/Amor/chopper/pair_separation'], 0)) self.detectorDistance = float(np.take(self.hdf['entry1/Amor/detector/transformation/distance'], 0)) self.chopperDetectorDistance = self.detectorDistance-float(np.take(self.hdf['entry1/Amor/chopper/distance'], 0)) self.tofCut = const.lamdaCut*self.chopperDetectorDistance/const.hdm*1.e-13 try: self.mu = float(np.take(self.hdf['/entry1/Amor/master_parameters/mu/value'], 0)) self.nu = float(np.take(self.hdf['/entry1/Amor/master_parameters/nu/value'], 0)) self.kap = float(np.take(self.hdf['/entry1/Amor/master_parameters/kap/value'], 0)) self.kad = float(np.take(self.hdf['/entry1/Amor/master_parameters/kad/value'], 0)) self.div = float(np.take(self.hdf['/entry1/Amor/master_parameters/div/value'], 0)) self.chopperSpeed = float(np.take(self.hdf['/entry1/Amor/chopper/rotation_speed/value'], 0)) self.chopperPhase = float(np.take(self.hdf['/entry1/Amor/chopper/phase/value'], 0)) except(KeyError, IndexError): logging.warning(" using parameters from nicos cache") year_date = str(self.start_date).replace('-', '/', 1) #cachePath = '/home/amor/nicosdata/amor/cache/' #cachePath = '/home/nicos/amorcache/' cachePath = '/home/amor/cache/' value = str(subprocess.getoutput(f'/usr/bin/grep "value" {cachePath}nicos-mu/{year_date}')).split('\t')[-1] self.mu = float(value) value = str(subprocess.getoutput(f'/usr/bin/grep "value" {cachePath}nicos-nu/{year_date}')).split('\t')[-1] self.nu = float(value) value = str(subprocess.getoutput(f'/usr/bin/grep "value" {cachePath}nicos-kap/{year_date}')).split('\t')[-1] self.kap = float(value) value = str(subprocess.getoutput(f'/usr/bin/grep "value" {cachePath}nicos-kad/{year_date}')).split('\t')[-1] self.kad = float(value) value = str(subprocess.getoutput(f'/usr/bin/grep "value" {cachePath}nicos-div/{year_date}')).split('\t')[-1] self.div = float(value) value = str(subprocess.getoutput(f'/usr/bin/grep "value" {cachePath}nicos-ch1_speed/{year_date}')).split('\t')[-1] self.chopperSpeed = float(value) self.chopperPhase = self.config.chopperPhase self.tau = 30. / self.chopperSpeed if self.config.muOffset: self.mu += self.config.muOffset if self.config.mu: self.mu = self.config.mu if self.config.nu: self.nu = self.config.nu # extract start time as unix time, adding UTC offset of 1h to time string tz = zoneinfo.ZoneInfo(key='Europe/Zurich') dz = datetime.fromisoformat(self.hdf['/entry1/start_time'][0].decode('utf-8')) self.fileDate=datetime(dz.year, dz.month, dz.day, dz.hour, dz.minute, dz.second, tzinfo=tz) #timeOffset = f'{int(str(tz.utcoffset(dz)).split(':')[0]):+03d}' #self.fileDate = datetime.fromisoformat( self.hdf['/entry1/start_time'][0].decode('utf-8')+timeOffset ) self.startTime = np.int64( (self.fileDate.timestamp() ) * 1e9 ) if self.seriesStartTime is None: self.seriesStartTime = self.startTime def read_header_info(self): # read general information and first data set logging.info(f' meta data from: {self.file_list[0]}') self.hdf = h5py.File(self.file_list[0], 'r', swmr=True) title = self.hdf['entry1/title'][0].decode('utf-8') proposal_id = self.hdf['entry1/proposal_id'][0].decode('utf-8') user_name = self.hdf['entry1/user/name'][0].decode('utf-8') user_affiliation = 'unknown' user_email = self.hdf['entry1/user/email'][0].decode('utf-8') user_orcid = None sampleName = self.hdf['entry1/sample/name'][0].decode('utf-8') model = self.hdf['entry1/sample/model'][0].decode('utf-8') instrumentName = 'Amor' source = self.hdf['entry1/Amor/source/name'][0].decode('utf-8') sourceProbe = 'neutron' start_time = self.hdf['entry1/start_time'][0].decode('utf-8') self.start_date = start_time.split(' ')[0] if self.config.sampleModel: model = self.config.sampleModel # assembling orso header information self.header.owner = fileio.Person( name=user_name, affiliation=user_affiliation, contact=user_email, ) if user_orcid: self.header.owner.orcid = user_orcid self.header.experiment = fileio.Experiment( title=title, instrument=instrumentName, start_date=self.start_date, probe=sourceProbe, facility=source, proposalID=proposal_id ) self.header.sample = fileio.Sample( name=sampleName, model=SampleModel(stack=model), sample_parameters=None, ) self.header.measurement_scheme = 'angle- and energy-dispersive'