# SP2XR/SP2XR_toolkit.py
# -*- coding: utf-8 -*-
# %% Import libraries
import os
from collections import defaultdict
import dask.dataframe as dd
# import dask.delayed
import pandas as pd
import numpy as np
import re
import warnings
from scipy.optimize import curve_fit
from numpy.polynomial import Polynomial
import matplotlib.pyplot as plt
import seaborn as sns
from glob import glob
import datetime
import struct
import zipfile
from dask import delayed
import time
import ipywidgets as widgets
from IPython.display import display, clear_output
from matplotlib.backends.backend_pdf import PdfPages
# %% Functions for listing files with specific string in name
def find_files(directory, string, avoid='xxxxxxxxxx'):
'''
This function finds all the files that contain 'string' in a directory and its subdirs.
If the same file exists in both .csv and .zip format, only the .csv file is kept in the list.
Parameters
----------
directory : str
Root directory to search (searched recursively).
string : str
Substring that must be present in the file name.
avoid : str, optional
Files located in paths containing this substring are skipped. The default matches nothing.
Returns
-------
filtered_paths : list of str
Full paths of the matching files, with .csv preferred over .zip duplicates.
'''
# Step 1: Find all matching files first
all_matching_files = []
for root, dirs, files in os.walk(directory):
for file in files:
if string in file and avoid not in root:
all_matching_files.append(os.path.join(root, file))
# Step 2: Organize files by their basename
files_by_basename = defaultdict(list)
for path in all_matching_files:
basename = os.path.splitext(os.path.basename(path))[0]
files_by_basename[basename].append(path)
# Step 3: Filter based on extension priority
filtered_paths = []
for basename, paths in files_by_basename.items():
if any(path.endswith('.csv') for path in paths) and any(path.endswith('.zip') for path in paths):
# Add only the CSV version
filtered_paths.extend([path for path in paths if path.endswith('.csv')])
else:
# If only one version exists, add it without checking the extension
filtered_paths.extend(paths)
# Step 4: Filter based on size ?
return filtered_paths
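# Example (a minimal sketch with hypothetical paths, not part of the original toolkit):
# the calls below would collect all PbP (resp. hk) files under a raw-data folder, skip anything
# whose path contains 'Calib', and keep only the .csv version when a .csv/.zip pair exists.
#   pbp_files = find_files(r'D:\SP2XR\raw', 'PbP', avoid='Calib')
#   hk_files = find_files(r'D:\SP2XR\raw', 'hk')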
def get_file_dict(directory, file_type, level='hour'):
"""
Creates a dictionary with date and hour as keys and file paths as values
for the given directory.
"""
file_dict = {}
# Assuming the files have a structure like /path/to/directory/YYYYMMDD/HH/file.parquet
for file_path in glob(f'{directory}/**/*.parquet', recursive=True):
# Extracting date and hour from the file path
parts = file_path.split(os.sep)
date = parts[-3]
hour = parts[-2]
if level == 'hour':
file_dict[(date, hour)] = os.path.join('/', *parts[:-1])
elif level == 'date':
file_dict[(date)] = os.path.join('/', *parts[:-2])
return file_dict
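# Example of the directory layout assumed by get_file_dict (hypothetical paths, a sketch only):
#   /processed/pbp/20230101/05/part.parquet  ->  key ('20230101', '05') for level='hour'
#                                                key '20230101'          for level='date'
# The returned values are the directories containing the parquet files, e.g.
#   file_dict = get_file_dict('/processed/pbp', 'parquet', level='hour')
#   file_dict[('20230101', '05')]  ->  '/processed/pbp/20230101/05'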
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
def list_files_and_dirs(base_dir):
""" List full paths of all files and directories in the given directory, including nested ones """
file_list = []
subdir_list = []
# os.walk yields a tuple (dirpath, dirnames, filenames)
for dirpath, dirnames, filenames in os.walk(base_dir):
# Add directories to subdir_list
for dirname in dirnames:
subdir_full_path = os.path.join(dirpath, dirname)
subdir_list.append(subdir_full_path)
# Add files to file_list
for filename in filenames:
file_full_path = os.path.join(dirpath, filename)
file_list.append(file_full_path)
return subdir_list, file_list
def list_first_level_subdirs(base_dir):
""" List full paths of all first-level subdirectories within the given directory """
subdir_list = []
# List all entries in the base directory
for entry in os.listdir(base_dir):
# Create full path
full_path = os.path.join(base_dir, entry)
# Check if this path is a directory
if os.path.isdir(full_path):
subdir_list.append(full_path)
return subdir_list
#%% Functions to read .csv or .zip PbP or HK files
def extract_datetime(df):
'''
This function extracts the datetime from the SP2XR file name.
Parameters
----------
df : pandas DataFrame row (Series)
Row containing the column 'path' with the full path of the original file.
Returns
-------
pandas Timestamp
Date and time encoded in the file name stored in the 'path' column.
'''
# return pd.to_datetime(df['orig_file_name'].split('_')[-2])
return pd.to_datetime(df['path'].split('_')[-2])
def calculate_delta_sec(df):
'''
This function calculates the difference in seconds between the columns
'Time (sec)' and 'first_val' present in the input dataframe
Parameters
----------
df : pandas dataframe
The columns 'Time (sec)' and 'first_val' must be present in the DataFrame.
Returns
-------
float
Floored difference in seconds between the values in the two columns.
'''
return np.floor(df['Time (sec)']) - df['first_val']
@delayed
def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
'''
This function reads PbP or HK files from the SP2XR and writes them to parquet, partitioned by date and hour.
Parameters
----------
file_path : str
Complete path of the file to read.
meta_pbp, meta_hk : pandas DataFrame
Empty DataFrames with the structure expected for PbP and HK files, respectively.
They are used to set the column dtypes; if the corresponding hk file is empty nothing is written.
target_directory : str
Directory where the parquet output is written.
Returns
-------
None
The content of the file is written to parquet; nothing is returned.
'''
if file_path:
tmp_hk = pd.DataFrame()
hk_0001 = re.sub(r'PbP', 'hk', file_path)
hk_0001 = re.sub(r'(_x)\d{4}', r'\g<1>0001', hk_0001)
hk_0001 = re.sub(r'\.(csv|zip)$', '', hk_0001)
if os.path.exists(f"{hk_0001}.csv"):
try:
tmp_hk = pd.read_csv(f"{hk_0001}.csv", nrows=1, parse_dates=['Time Stamp'], usecols=['Time Stamp', 'Time (sec)'])
except pd.errors.EmptyDataError:
tmp_hk = pd.DataFrame()
except zipfile.BadZipFile:
print(f'!! Bad file: {file_path}')
tmp_hk = pd.DataFrame()
elif os.path.exists(f"{hk_0001}.zip"):
try:
tmp_hk = pd.read_csv(f"{hk_0001}.zip", nrows=1, parse_dates=['Time Stamp'], usecols=['Time Stamp', 'Time (sec)'])
except pd.errors.EmptyDataError:
tmp_hk = pd.DataFrame()
if not tmp_hk.empty:
first_val, t0 = tmp_hk[['Time (sec)', 'Time Stamp']].values[0]
if 'PbP' in file_path:
temp = meta_pbp
data_type = pd.Series(temp.dtypes.values, index=temp.columns).to_dict()
try:
df = dd.read_csv(file_path, dtype=data_type, blocksize=None)#, include_path_column=True)
df = df.fillna(0) # is this because otherwise we cannot calculate the time_lag?
# df['time_lag'] = df['Incand Peak Time'] - df['Scatter Peak Time'] # 02.09.2024 this line implies that for particles with nan in the scatt transit time time_lag=incand transit time. better to calculate timelag for particles with both scatt and incand and set 0 for particles with only incand
#!!! MISSING CORRECT TIME LAG CALCULATIONS
except zipfile.BadZipFile:
print(f'!! Bad zip file: {file_path}')
df = pd.DataFrame()
elif 'hk' in file_path:
temp = meta_hk
data_type = pd.Series(temp.dtypes.values, index=temp.columns).to_dict()
filtered_dtype_dict = {key: value for key, value in data_type.items() if value != 'datetime64[ns]'}
try:
df = dd.read_csv(file_path, dtype=filtered_dtype_dict, parse_dates=['Time Stamp'], blocksize=None, assume_missing=True)
# df = dd.read_csv(file_path, dtype=data_type, parse_dates=['Time Stamp'], blocksize=None)#, assume_missing=True)
'''if 'Time Stamp' in df.columns:
datetime_format = '%m/%d/%Y %H:%M:%S.%f'
df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, format=datetime_format, meta=('Time Stamp', 'datetime64[ns]'))
'''
except ValueError as e:
# Handle the error if the 'Time Stamp' column is missing or any other parsing error occurs
if 'Missing column provided to \'parse_dates\'' in str(e):
print(f"Error for {file_path}: Missing column provided to 'parse_dates': 'Time Stamp'")
df = pd.DataFrame()
except pd.errors.EmptyDataError:
df = pd.DataFrame()
except zipfile.BadZipFile:
print(f'!! Bad zip file: {file_path}')
df = pd.DataFrame()
if len(df.columns) > 0:
df = df.loc[~df.isna().all(axis=1)]
df['path'] = str(file_path)
df['first_val'] = first_val
df['t0'] = t0
file_name_cut = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
df['file'] = file_name_cut
folder_name = file_path.split('\\')[-1].split('_')[-2]
df['folder_name'] = folder_name
if 'Time Stamp' in df.columns:
df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, meta=('Time Stamp', 'datetime64[ns]'))
df['delta_sec'] = df.map_partitions(calculate_delta_sec, meta=('delta_sec', 'float64'))
df['calculated_time'] = df['t0'] + dd.to_timedelta(df['delta_sec'], unit='s')
df['file_datetime'] = df.apply(extract_datetime, axis=1, meta=('file_datetime', 'datetime64[ns]'))
df['date_floored'] = df['calculated_time'].dt.floor('H')
df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
df['hour'] = df['calculated_time'].dt.hour.astype('i8')
df['floor_time'] = df['calculated_time'].dt.floor('S')
df['Secs_2GB'] = df['Time (sec)'].apply(np.floor, meta=('Secs_2GB', 'i8'))
fn = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
def name(part_idx):
return f'{fn}.parquet'
df = df.set_index('calculated_time', drop=True, sort=False, sorted=True)
df.to_parquet(path=target_directory,
engine='pyarrow',
partition_on=['date', 'hour'],
coerce_timestamps='us',
allow_truncated_timestamps=True,
name_function=name,
write_index=True,
append=False)
del df
else:
warnings.warn("tmp_hk empty or not existing")
else:
raise ValueError("No CSV files found.")
# %% Functions to read sp2b files
@delayed
def read_sp2b_from_sp2xr_zipped_2(file_path, meta, target_directory):
labview_epoch = datetime.datetime(1904, 1, 1)
results = []
'''empty_file = pd.DataFrame(columns=['ch0', 'ch1', 'flag', 'time stamp',
'res1', 'event index', 'Time/10000',
'Time remainder', 'res5', 'res6',
'res7', 'res8', 'array_2_dim', 'array_2', 'Time new'])'''
empty_file = pd.DataFrame({'ch0': pd.Series(dtype='int'),
'ch1': pd.Series(dtype='int'),
'flag': pd.Series(dtype='int'),
'time stamp': pd.Series(dtype='int'),
'res1': pd.Series(dtype='int'),
'event index': pd.Series(dtype='int'),
'Time/10000': pd.Series(dtype='int'),
'Time remainder': pd.Series(dtype='int'),
'res5': pd.Series(dtype='int'),
'res6': pd.Series(dtype='int'),
'res7': pd.Series(dtype='int'),
'res8': pd.Series(dtype='int'),
'array_2_dim': pd.Series(dtype='int'),
'array_2': pd.Series(dtype='int'),
'Time new': pd.Series(dtype='int'),
})
def process_block(f, file_size):
while f.tell() < file_size:
initial_pos = f.tell()
size_2d_array = struct.unpack("> 2i", f.read(8))
if size_2d_array[0] == 0:
# End of data marker found
break
expected_data_size = size_2d_array[0] * size_2d_array[1] * 4
if f.tell() + expected_data_size + 2 + 7*4 + 2*8 + 4 > file_size:
# Not enough data left for the expected block size, indicating incomplete block
break
# If sufficient data is present, proceed to read and unpack the block as before
array_data = struct.unpack(f"> {size_2d_array[1] * size_2d_array[0]}i", f.read(expected_data_size))
event_df = pd.DataFrame({'ch0': [array_data[::2]], 'ch1': [array_data[1::2]]})
event_df[['flag']] = struct.unpack("> H", f.read(2))
time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6 = struct.unpack("> 7f", f.read(7 * 4))
event_df[['time stamp', 'res1', 'event index', 'Time/10000', 'Time remainder', 'res5', 'res6']] = time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6
event_df[['res7', 'res8']] = struct.unpack("> 2d", f.read(2 * 8))
array2_dim = struct.unpack("> i", f.read(4))[0]
event_df['array_2_dim'] = array2_dim
array_data_2 = [struct.unpack(f"< {array2_dim}i", f.read(array2_dim * 4))]
event_df['array_2'] = [array_data_2]
event_df['Time new'] = labview_epoch + datetime.timedelta(seconds=Time_div10000*10000+Time_remainder)
results.append(event_df)
if file_path[-3:] == 'zip':
try:
with zipfile.ZipFile(file_path, 'r') as zip_ref:
for file_name in zip_ref.namelist():
with zip_ref.open(file_name, 'r') as f:
file_size = zip_ref.getinfo(file_name).file_size
if file_size > 100000:
process_block(f, file_size)
else:
print(f'{file_path} is too small')
results.append(empty_file)
except zipfile.BadZipFile:
print(f'!! Bad file: {file_path}')
results.append(empty_file)
elif file_path[-3:] == 'p2b':
file_size = os.path.getsize(file_path)
if file_size > 100:
with open(file_path, 'rb') as f:
process_block(f, file_size)
else:
results.append(empty_file)
results = pd.concat(results, axis=0)
if not results.empty:
# results = pd.concat(results, axis=0)
ch0 = pd.DataFrame(results['ch0'].to_list(), index=results['Time new'])
ch1 = pd.DataFrame(results['ch1'].to_list(), index=results['Time new'])
orig_time_stamp = pd.DataFrame(results['time stamp'].to_list(), index=results['Time new'], columns=['orig_time_stamp'])
if 8182 in ch0:
wrong_lines = ch0[ch0.loc[:, 8182].notna()].index
ch0.drop(wrong_lines, inplace=True)
ch0.dropna(axis=1, how='all', inplace=True)
ch1.drop(wrong_lines, inplace=True)
ch1.dropna(axis=1, how='all', inplace=True)
orig_time_stamp.drop(wrong_lines, inplace=True)
orig_time_stamp.dropna(axis=1, how='all', inplace=True)
ch_all = pd.merge(ch0, ch1, how="outer", suffixes=("_ch0", "_ch1"), left_index=True, right_index=True)
ch_all['path'] = file_path
ch_all2 = pd.merge(ch_all, orig_time_stamp, left_index=True, right_index=True, how="outer")
#ch_all['orig_time_stamp'] = orig_time_stamp#.loc[ch_all.index]
#return ch_all2 #dd.from_pandas(ch_all2, npartitions=1)
if not ch_all2.empty:
ch_all2 = dd.from_pandas(ch_all2, npartitions=1)
ch_all2['Time'] = ch_all2.index.map_partitions(lambda index: index, meta=('Time', 'datetime64[ns]'))
ch_all2['Time_round'] = ch_all2['Time'].dt.round('min').astype('datetime64[ns]')
#combined_ddf['date'] = combined_ddf.index.map_partitions(lambda index: index.normalize(), meta=('date', 'datetime64[ns]'))
# combined_ddf['hour'] = combined_ddf.index.map_partitions(lambda index: index.hour, meta=('hour', 'i8'))
ch_all2["date"] = ch_all2["Time"].dt.date.astype("date64[pyarrow]")
ch_all2['hour'] = ch_all2['Time'].dt.hour.astype('i8')
def extract_folder_name(df):
return df['path'].split('\\')[-1].split('_')[-2]
ch_all2['folder_name'] = ch_all2.apply(extract_folder_name, axis=1, meta=('folder_name', 'str'))
def extract_file_name(df):
return df['path'].split('\\')[-1].split('_')[-2] + '_' + df['path'].split('\\')[-1].split('_')[-1].split('.')[-2]
ch_all2['file'] = ch_all2.apply(extract_file_name, axis=1, meta=('file', 'str'))
fn = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
def name(part_idx):
return f'{fn}.parquet'
ch_all2.to_parquet(path=target_directory,
engine='pyarrow',
partition_on=['date', 'hour'],
name_function=name,
write_index=True,
append=False, schema='infer')
else:
return meta
def read_sp2b_from_sp2xr_zipped(file_path, meta):
labview_epoch = datetime.datetime(1904, 1, 1)
results = []
'''empty_file = pd.DataFrame(columns=['ch0', 'ch1', 'flag', 'time stamp',
'res1', 'event index', 'Time/10000',
'Time remainder', 'res5', 'res6',
'res7', 'res8', 'array_2_dim', 'array_2', 'Time new'])'''
empty_file = pd.DataFrame({'ch0': pd.Series(dtype='int'),
'ch1': pd.Series(dtype='int'),
'flag': pd.Series(dtype='int'),
'time stamp': pd.Series(dtype='int'),
'res1': pd.Series(dtype='int'),
'event index': pd.Series(dtype='int'),
'Time/10000': pd.Series(dtype='int'),
'Time remainder': pd.Series(dtype='int'),
'res5': pd.Series(dtype='int'),
'res6': pd.Series(dtype='int'),
'res7': pd.Series(dtype='int'),
'res8': pd.Series(dtype='int'),
'array_2_dim': pd.Series(dtype='int'),
'array_2': pd.Series(dtype='int'),
'Time new': pd.Series(dtype='int'),
})
def process_block(f, file_size):
while f.tell() < file_size:
initial_pos = f.tell()
size_2d_array = struct.unpack("> 2i", f.read(8))
if size_2d_array[0] == 0:
# End of data marker found
break
expected_data_size = size_2d_array[0] * size_2d_array[1] * 4
if f.tell() + expected_data_size + 2 + 7*4 + 2*8 + 4 > file_size:
# Not enough data left for the expected block size, indicating incomplete block
break
# If sufficient data is present, proceed to read and unpack the block as before
array_data = struct.unpack(f"> {size_2d_array[1] * size_2d_array[0]}i", f.read(expected_data_size))
event_df = pd.DataFrame({'ch0': [array_data[::2]], 'ch1': [array_data[1::2]]})
event_df[['flag']] = struct.unpack("> H", f.read(2))
time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6 = struct.unpack("> 7f", f.read(7 * 4))
event_df[['time stamp', 'res1', 'event index', 'Time/10000', 'Time remainder', 'res5', 'res6']] = time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6
event_df[['res7', 'res8']] = struct.unpack("> 2d", f.read(2 * 8))
array2_dim = struct.unpack("> i", f.read(4))[0]
event_df['array_2_dim'] = array2_dim
array_data_2 = [struct.unpack(f"< {array2_dim}i", f.read(array2_dim * 4))]
event_df['array_2'] = [array_data_2]
event_df['Time new'] = labview_epoch + datetime.timedelta(seconds=Time_div10000*10000+Time_remainder)
results.append(event_df)
if file_path[-3:] == 'zip':
try:
with zipfile.ZipFile(file_path, 'r') as zip_ref:
for file_name in zip_ref.namelist():
with zip_ref.open(file_name, 'r') as f:
file_size = zip_ref.getinfo(file_name).file_size
if file_size > 100:
process_block(f, file_size)
else:
results.append(empty_file)
except zipfile.BadZipFile:
print(f'!! Bad file: {file_path}')
results.append(empty_file)
elif file_path[-3:] == 'p2b':
file_size = os.path.getsize(file_path)
if file_size > 100:
with open(file_path, 'rb') as f:
process_block(f, file_size)
else:
results.append(empty_file)
results = pd.concat(results, axis=0)
if not results.empty:
# results = pd.concat(results, axis=0)
ch0 = pd.DataFrame(results['ch0'].to_list(), index=results['Time new'])
ch1 = pd.DataFrame(results['ch1'].to_list(), index=results['Time new'])
orig_time_stamp = pd.DataFrame(results['time stamp'].to_list(), index=results['Time new'], columns=['orig_time_stamp'])
if 8182 in ch0:
wrong_lines = ch0[ch0.loc[:, 8182].notna()].index
ch0.drop(wrong_lines, inplace=True)
ch0.dropna(axis=1, how='all', inplace=True)
ch1.drop(wrong_lines, inplace=True)
ch1.dropna(axis=1, how='all', inplace=True)
orig_time_stamp.drop(wrong_lines, inplace=True)
orig_time_stamp.dropna(axis=1, how='all', inplace=True)
ch_all = pd.merge(ch0, ch1, how="outer", suffixes=("_ch0", "_ch1"), left_index=True, right_index=True)
ch_all['path'] = file_path
ch_all2 = pd.merge(ch_all, orig_time_stamp, left_index=True, right_index=True, how="outer")
#ch_all['orig_time_stamp'] = orig_time_stamp#.loc[ch_all.index]
return ch_all2 #dd.from_pandas(ch_all2, npartitions=1)
else:
return meta
def read_and_process_sp2b(matches, target_directory, meta_file):
if len(matches) > 0:
delayed_results = [delayed(read_sp2b_from_sp2xr_zipped)(file, meta_file) for file in matches]
combined_ddf = dd.from_delayed(delayed_results)#.persist()
# ddf = [read_sp2b_from_sp2xr_zipped(file, meta_file) for file in matches]
# combined_ddf = dd.concat(ddf)
if combined_ddf.npartitions>0:
combined_ddf['Time'] = combined_ddf.index.map_partitions(lambda index: index, meta=('Time', 'datetime64[ns]'))
combined_ddf['Time_round'] = combined_ddf['Time'].dt.round('min').astype('datetime64[ns]')
#combined_ddf['date'] = combined_ddf.index.map_partitions(lambda index: index.normalize(), meta=('date', 'datetime64[ns]'))
# combined_ddf['hour'] = combined_ddf.index.map_partitions(lambda index: index.hour, meta=('hour', 'i8'))
combined_ddf["date"] = combined_ddf["Time"].dt.date.astype("date64[pyarrow]")
combined_ddf['hour'] = combined_ddf['Time'].dt.hour.astype('i8')
# combined_ddf.to_parquet(path=target_directory,
# engine='pyarrow',
# partition_on=['date','hour'],
# #save_divisons=True,
# append=True,
# write_index=True)
def extract_folder_name(df):
return df['path'].split('\\')[-1].split('_')[-2]
combined_ddf['folder_name'] = combined_ddf.apply(extract_folder_name, axis=1, meta=('folder_name', 'str'))
def extract_file_name(df):
return df['path'].split('\\')[-1].split('_')[-2] + '_' + df['path'].split('\\')[-1].split('_')[-1].split('.')[-2]
combined_ddf['file'] = combined_ddf.apply(extract_file_name, axis=1, meta=('file', 'str'))
# def extract_first_element(df):
# return df['orig_file_name2'].head(1)
# first_elements = combined_ddf.map_partitions(extract_first_element).compute().to_list() # !!! can i remove this compute?
# def name_function(part_idx):
# return f"{first_elements[part_idx]}.parquet"
# note: derive the output name from the first matching file ('file_path' is not defined in this scope)
fn = matches[0].split('\\')[-1].split('_')[-2] + '_' + matches[0].split('\\')[-1].split('_')[-1].split('.')[-2]
def name(part_idx):
return f'{fn}.parquet'
combined_ddf.to_parquet(path=target_directory,
engine='pyarrow',
partition_on=['date', 'hour'],
name_function=name,
write_index=True,
append=False, schema='infer')
return combined_ddf
return None
return None
def process_sp2b_parquet(ddf_sp2xr,
scatt_sat_threshold = 1e9,
inc_sat_threshold = 1e8,
scatt_noise_threshold = 1e4,
end_bkgr_ch0 = 100,
end_bkgr_ch1 = 350,
output_path = ''
):
timestr = time.strftime("%Y%m%d-%H%M%S")
ch0_columns = [col for col in ddf_sp2xr.columns if '_ch0' in col]
ch0_sp2xr = ddf_sp2xr[ch0_columns]
ch0_sp2xr = ch0_sp2xr.rename(columns=lambda col: int(col.replace('_ch0', '')))
#ch0_sp2xr.index = ddf_sp2xr['Time new']
ch0_sp2xr = ch0_sp2xr.sub(ch0_sp2xr.loc[:, :end_bkgr_ch0].mean(axis=1), axis=0)
ch1_columns = [col for col in ddf_sp2xr.columns if '_ch1' in col]
ch1_sp2xr = ddf_sp2xr[ch1_columns]
ch1_sp2xr = ch1_sp2xr.rename(columns=lambda col: int(col.replace('_ch1', '')))
#ch1_sp2xr.index = ddf_sp2xr['Time new']
ch1_sp2xr = ch1_sp2xr.sub(ch1_sp2xr.loc[:, :end_bkgr_ch1].mean(axis=1), axis=0)
ch0_sp2xr_array = ch0_sp2xr.values
ch1_sp2xr_array = ch1_sp2xr.values
max_values_ch0 = ch0_sp2xr_array.max(axis=1)
max_values_ch1 = ch1_sp2xr_array.max(axis=1)
max_indices_ch0 = ch0_sp2xr_array.argmax(axis=1) # location of the first occurrence is returned if the maximum value is repeated
max_indices_ch1 = ch1_sp2xr_array.argmax(axis=1)
flag_000 = (max_indices_ch1 > 0)
flag_111 = (max_values_ch0 < scatt_sat_threshold)
flag_222 = (max_values_ch1 < inc_sat_threshold)
flag_333 = (max_indices_ch1 >= max_indices_ch0)
array_test = ch0_sp2xr_array
sub_test = max_indices_ch1[:,None]
mask = sub_test>np.arange(array_test.shape[1])
array_test[~mask]=-1e3
max_values_ch0_modified = array_test.max(axis=1)
max_indices_ch0_modified = array_test.argmax(axis=1)
flag_444 = np.logical_or(flag_333, (max_values_ch0_modified > scatt_noise_threshold))
time_lag = max_indices_ch1 - max_indices_ch0
time_lag_modified = max_indices_ch1 - max_indices_ch0_modified
to_concat = [ddf_sp2xr.index.values, max_values_ch0, max_values_ch1,
max_indices_ch0, max_indices_ch1,
max_values_ch0_modified, max_indices_ch0_modified,
flag_000, flag_111, flag_222, flag_333, flag_444,
time_lag, time_lag_modified]
ddf = dd.concat([dd.from_dask_array(c) for c in to_concat], axis=1)
#ddf.index = ddf_sp2xr['Time new']
ddf.columns = ['Time new', 'max_values_ch0', 'max_values_ch1',
'max_indices_ch0', 'max_indices_ch1',
'max_values_ch0_modified', 'max_indices_ch0_modified',
'flag_000', 'flag_111', 'flag_222', 'flag_333', 'flag_444',
'time_lag', 'time_lag_modified']
ddf["date"] = ddf['Time new'].dt.date.astype("date64[pyarrow]")
ddf['hour'] = ddf['Time new'].dt.hour.astype('i8')
metadata = {
'author': 'Barbara Bertozzi',
'created': timestr,
'scatt_sat_threshold': str(scatt_sat_threshold),
'inc_sat_threshold': str(inc_sat_threshold),
'scatt_noise_threshold': str(scatt_noise_threshold),
'end_bkgr_ch0': str(end_bkgr_ch0),
'end_bkgr_ch1': str(end_bkgr_ch1)
}
ddf.to_parquet(path=output_path,
partition_on=['date', 'hour'],
engine='pyarrow', write_index=True, custom_metadata=metadata, write_metadata_file=True)
return ddf
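# Example usage of process_sp2b_parquet (a sketch with hypothetical paths): the input is the dask
# DataFrame written by read_sp2b_from_sp2xr_zipped_2 / read_and_process_sp2b, which holds the raw
# ch0/ch1 waveforms in columns named like '0_ch0', '1_ch0', ..., '0_ch1', ... and 'Time new' as index.
#   ddf_sp2xr = dd.read_parquet(r'D:\SP2XR\parquet\sp2b')
#   ddf_peaks = process_sp2b_parquet(ddf_sp2xr, output_path=r'D:\SP2XR\parquet\sp2b_peaks')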
# %% Functions for BC calibration materials and conversions (mass to diam, diam to mass, ...)
#%% Conversions mass to D, N to mass
def mass2meqDiam(mass_array, rho=1800):
"""
Calculate mass equivalent diameters for the input array of mass values.
inputs: mass_array values in fg, particle density in kg m-3 (rho)
outputs: array of mass equivalent diameters in nm
"""
Dp = np.array([1e9*(6*m*1e-18/rho/np.pi)**(1/3) for m in mass_array])
return Dp
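# Worked example for mass2meqDiam (values rounded): a 1 fg BC core at the default void-free
# density of 1800 kg m-3 corresponds to a mass equivalent diameter of roughly 102 nm,
#   mass2meqDiam([1.0])        -> array([~102.])         # nm
#   mass2meqDiam([1.0, 10.0])  -> array([~102., ~220.])  # nm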
def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800):
'''
This is from functions.py from Rob.
Given a normalized number size distribution [dN/dlogDp, cm^-3] defined over the diameters given by Dp [nm],
return the corresponding mass size distribution [dM/dlogDp, ug/m3].
Optional density input given in units of kg/m3.
'''
#M = N*((np.pi*(Dp*1e-9)**3)/6)*rho*1e18
M = N * np.pi / 6 * rho * (Dp**3) * 1e-12
return M
def BC_mass_to_diam(mass, rho_eff=1800, BC_type=''):
'''
This function calculates the effective density and mobility (volume equivalent?) diameter
for BC calibration materials applying different parameterizations.
Parameters
----------
mass : float or array of floats
BC mass in fg.
rho_eff : float, optional
Effective density [kg/m3] to use in case BC_type='constant_effective_density'. The default is 1800.
BC_type : string, optional
Identification string for the effective density parameterization to use. The default is ''.
Raises
------
ValueError
If the string provided in BC_type is not included in the list below, an error is raised.
Returns
-------
density : float or array of floats
Effective density [kg/m3] corresponding to the input mass(es).
diam : float or array of floats
Diameter [nm] corresponding to the mass and effective density.
'''
if BC_type == 'constant_effective_density':
density = np.zeros_like(mass)+rho_eff
diam = 1e3*((6*mass)/(np.pi*rho_eff))**(1/3)
elif BC_type == 'Aquadag_RCAST_2009':
SplineCoefM2D, nSplineSegmM2D = Aquadag_RhoVsLogMspline_Var25(SplineCoef=[])
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'Aquadag_Moteki_AST2010':
SplineCoefM2D, nSplineSegmM2D = Aquadag_RhoVsLogMspline_Var25(SplineCoef=[]) # this line only loads the coefficients from Moteki 2010
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'Aquadag_PSI_2010':
SplineCoefM2D, nSplineSegmM2D = Aquadag_RhoVsLogMspline_Var8(SplineCoef=[])
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'FS_RCAST_2009':
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var2(SplineCoef=[])
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'FS_Moteki_2010':
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var5(SplineCoef=[])
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'FS_PSI_2010':
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var8(SplineCoef=[])
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'FS_PSI_2010_old':
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var8_old(SplineCoef=[])
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
elif BC_type == 'Glassy_Carbon':
density = np.zeros_like(mass) + 1420 # bulk density of glassy carbon [kg/m3] (see GlassyCarbonAlpha_Mass2Diam)
diam = GlassyCarbonAlpha_Mass2Diam(mass)
else:
raise ValueError("Calibration material not defined")
diam = 1e3*(6*mass/np.pi/density)**(1/3) # 1e3: unit conversion
return density, diam
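# Example (a sketch): convert a set of calibration masses to diameters using one of the
# effective density parameterizations listed above; the same identification strings are
# accepted by calculate_calib_coeff through its calib_material argument.
#   masses = np.array([0.5, 1.0, 5.0, 10.0])  # fg
#   rho, dmev = BC_mass_to_diam(masses, BC_type='Aquadag_PSI_2010')
#   rho_const, d_const = BC_mass_to_diam(masses, rho_eff=1800, BC_type='constant_effective_density')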
def BC_diam_to_mass(diam, rho_eff=1800, BC_type=''):
if BC_type == '': # pers comm Kondo 2009, spline fit
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var2(SplineCoef=[])
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
elif BC_type == 'FS_Moteki_2010': # Moteki et al., AST, 2010
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var5(SplineCoef=[])
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
elif BC_type == 'FS_PSI_2010':
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var8(SplineCoef=[])
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
elif BC_type == 'FS_PSI_2010_old':
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var8_old(SplineCoef=[])
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
else:
raise ValueError("Calibration material not defined")
mass = 1e-9 * density * np.pi / 6 * np.array(diam)**3
return density, mass
def SP2_calibCurveSpline(mass, SplineCoef):
'''
Given the mass (or diameter) and the spline coefficient list, calculate the corresponding effective density.
Parameters
----------
mass : float or array of floats
BC mass [fg] (or diameter [nm] when a diameter spline is used).
SplineCoef : list of floats
Coefficients and lower cuts of the spline to be used (see the layout note after the function).
Returns
-------
values : float or array of floats
BC effective density [kg/m3] corresponding to the mass and the specific parameterization.
'''
coefficients_array = np.array(SplineCoef)
segments = np.delete(coefficients_array, np.arange(3, len(coefficients_array), 4))
segments_2 = np.array([segments[i:i+3] for i in range(0, len(segments), 3)])[::-1]
lowercuts = coefficients_array[3::4]
sorted_lowercuts = np.sort(np.append(lowercuts, [np.log10(mass[0]), np.log10(mass[-1])]))
values = np.zeros_like(mass) # Initialize an array to store interpolated values
for i in range(len(sorted_lowercuts) - 1):
if i == (len(sorted_lowercuts)-2):
mask = (np.log10(mass) >= sorted_lowercuts[i]) & (np.log10(mass) <= sorted_lowercuts[i + 1])
else:
mask = (np.log10(mass) >= sorted_lowercuts[i]) & (np.log10(mass) < sorted_lowercuts[i + 1])
if np.any(mask):
poly = Polynomial(segments_2[i])
# values[mask] = poly(np.log10(mass[mask]))
values[mask] = poly(np.log10(np.array(mass)[mask]))
return values
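# Layout of the SplineCoef lists read by SP2_calibCurveSpline (as implied by the code above):
# the list is a flat sequence [a0, a1, a2, cut, b0, b1, b2, cut, c0, c1, c2, ...], i.e. groups of
# three polynomial coefficients (rho = a0 + a1*log10(x) + a2*log10(x)**2, with x the mass in fg or
# the diameter in nm) separated by the log10 lower cuts of the segments; the last group has no cut,
# and the first triple applies to the highest log10(x) segment. Note also that several of the
# wrapper functions below use a mutable default argument (SplineCoef=[]); callers should pass a
# fresh list, as BC_mass_to_diam and BC_diam_to_mass do, otherwise repeated calls keep appending
# to the same list.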
def Aquadag_RhoVsLogMspline_Var25(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
SplineCoef.append(1392.54)
SplineCoef.append(-618.898)
SplineCoef.append(104.364)
SplineCoef.append(1.87)
SplineCoef.append(689)
SplineCoef.append(133.555)
SplineCoef.append(-96.8268)
SplineCoef.append(0.64)
SplineCoef.append(785.701)
SplineCoef.append(-168.635)
SplineCoef.append(139.258)
return SplineCoef, int(SP2_calibCurveSplineCheck(SplineCoef))
def Aquadag_RhoVsLogMspline_Var8(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
SplineCoef.append(1662.41)
SplineCoef.append(-1087.41)
SplineCoef.append(214.636)
SplineCoef.append(1.7)
SplineCoef.append(689.745)
SplineCoef.append(56.897)
SplineCoef.append(-121.925)
SplineCoef.append(0.33)
SplineCoef.append(720.248)
SplineCoef.append(-127.967)
SplineCoef.append(158.172)
SplineCoef.append(-1.23)
SplineCoef.append(299.662)
SplineCoef.append(-811.846)
SplineCoef.append(-119.828)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogMspline_Var2(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
SplineCoef.append(1180.1)
SplineCoef.append(-540.043)
SplineCoef.append(106.852)
SplineCoef.append(1.8)
SplineCoef.append(713.102)
SplineCoef.append(-21.1519)
SplineCoef.append(-37.2841)
SplineCoef.append(0.55)
SplineCoef.append(916.24)
SplineCoef.append(-759.835)
SplineCoef.append(634.246)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogMspline_Var5(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
SplineCoef.append(1539.19)
SplineCoef.append(-904.484)
SplineCoef.append(189.201)
SplineCoef.append(1.83)
SplineCoef.append(503.999)
SplineCoef.append(226.877)
SplineCoef.append(-119.914)
SplineCoef.append(0.8)
SplineCoef.append(718.903)
SplineCoef.append(-310.384)
SplineCoef.append(215.874)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogMspline_Var8(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
SplineCoef.append(62.7393)
SplineCoef.append(781.804)
SplineCoef.append(-325.113)
SplineCoef.append(1.6)
SplineCoef.append(652.935)
SplineCoef.append(44.0588)
SplineCoef.append(-94.5682)
SplineCoef.append(0.6)
SplineCoef.append(750.496)
SplineCoef.append(-281.145)
SplineCoef.append(176.435)
SplineCoef.append(-0.7)
SplineCoef.append(573.019)
SplineCoef.append(-788.222)
SplineCoef.append(-185.763)
SplineCoef.append(-1.7)
SplineCoef.append(1016.31)
SplineCoef.append(-266.707)
SplineCoef.append(-32.3765)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogMspline_Var8_old(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
SplineCoef.append(984.914)
SplineCoef.append(-468.01)
SplineCoef.append(101)
SplineCoef.append(1.19)
SplineCoef.append(568.667)
SplineCoef.append(231.565)
SplineCoef.append(-192.939)
SplineCoef.append(0.715)
SplineCoef.append(747.715)
SplineCoef.append(-269.27)
SplineCoef.append(157.295)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def GlassyCarbonAlpha_Mass2Diam(mass):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
diam = 1e3* ( 6 * mass / np.pi / 1420 )**(1/3)
return diam
def Fullerene_RhoVsLogDspline_Var2(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# experimental mobility/mass data taken from: personal communication with the Kondo research group on 04/09/2009;
SplineCoef.append(8799.39)
SplineCoef.append(-5478.89)
SplineCoef.append(904.097)
SplineCoef.append(2.8)
SplineCoef.append(-115.387)
SplineCoef.append(888.81)
SplineCoef.append(-232.992)
SplineCoef.append(2.35)
SplineCoef.append(17587.6)
SplineCoef.append(-14177.6)
SplineCoef.append(2972.62)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogDspline_Var5(SplineCoef=[]):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# note: experimental mobility/mass data taken from: Moteki et al., Aerosol Sci. Technol., 44, 663-675, 2010;
SplineCoef.append(13295.3)
SplineCoef.append(-8550.29)
SplineCoef.append(1423.82)
SplineCoef.append(2.8)
SplineCoef.append(-4979.54)
SplineCoef.append(4503.18)
SplineCoef.append(-907.154)
SplineCoef.append(2.45)
SplineCoef.append(8500)
SplineCoef.append(-6500.53)
SplineCoef.append(1338.5)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogDspline_Var8(SplineCoef):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# note: experimental mobility/mass data taken from: PSI data for fullerene soot sample obtained from Shuka Schwarz, measured at ETH in Oct. 2010;
SplineCoef.append(-2281.86)
SplineCoef.append(2687.72)
SplineCoef.append(-613.973)
SplineCoef.append(2.735)
SplineCoef.append(-1987.85)
SplineCoef.append(2472.72)
SplineCoef.append(-574.667)
SplineCoef.append(2.35)
SplineCoef.append(7319.9)
SplineCoef.append(-5448.76)
SplineCoef.append(1110.76)
SplineCoef.append(1.85)
SplineCoef.append(-520.822)
SplineCoef.append(3027.69)
SplineCoef.append(-1180.18)
SplineCoef.append(1.48)
SplineCoef.append(1421.03)
SplineCoef.append(403.564)
SplineCoef.append(-293.649)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Fullerene_RhoVsLogDspline_Var8_old(SplineCoef):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# note: experimental mobility/mass data taken from: PSI data for fullerene soot sample obtained from Shuka Schwarz, measured at ETH in Oct. 2010;
SplineCoef.append(6744.81)
SplineCoef.append(-4200.82)
SplineCoef.append(700)
SplineCoef.append(2.57)
SplineCoef.append(-6902.08)
SplineCoef.append(6419.32)
SplineCoef.append(-1366.18)
SplineCoef.append(2.4)
SplineCoef.append(7288.07)
SplineCoef.append(-5405.8)
SplineCoef.append(1097.39)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Aquadag_RhoVsLogDspline_Var25(SplineCoef):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# note: experimental mobility/mass data from: Moteki et al., Aerosol Sci. Technol., 44, 663-675, 2010 (and they agree with pers. comm. from 04/09/2009)
SplineCoef.append(10000)
SplineCoef.append(-6088.15)
SplineCoef.append(974.717)
SplineCoef.append(2.8)
SplineCoef.append(-3200)
SplineCoef.append(3340.42)
SplineCoef.append(-708.956)
SplineCoef.append(2.35)
SplineCoef.append(6490.76)
SplineCoef.append(-4907.03)
SplineCoef.append(1045.82)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Aquadag_RhoVsLogDspline_Var8(SplineCoef):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# note: experimental mobility/mass data taken from: "grand average" of PSI's (APM at ETH in Oct. 2010) and DMT's (Subu; CPMA in 2011 by XXXX) data for classic and new aquadag batches;
SplineCoef.append(10092.9)
SplineCoef.append(-6171.83)
SplineCoef.append(970.172)
SplineCoef.append(2.75)
SplineCoef.append(-2627.45)
SplineCoef.append(3079.33)
SplineCoef.append(-711.856)
SplineCoef.append(2.25)
SplineCoef.append(6140.89)
SplineCoef.append(-4714.75)
SplineCoef.append(1020.16)
SplineCoef.append(1.65)
SplineCoef.append(1063.19)
SplineCoef.append(1440.04)
SplineCoef.append(-844.928)
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def Aquadag_RhoVsLogDspline_Var8_old(SplineCoef):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
# units: rho [kg/m³]; diameter [nm]
# note: experimental mobility/mass data taken from: "grand average" of PSI's (APM at ETH in Oct. 2010) and DMT's (Subu; CPMA in 2011 by XXXX) data for classic and new aquadag batches;
SplineCoef[0]=7900.57
SplineCoef[1]=-4964.2
SplineCoef[2]=830.317
SplineCoef[3]=2.6
SplineCoef[4]=-5371.15
SplineCoef[5]=5244.82
SplineCoef[6]=-1132.96
SplineCoef[7]=2.35
SplineCoef[8]=5809.57
SplineCoef[9]=-4270.69
SplineCoef[10]=891.62
SplineCoef[11]=1.6
SplineCoef[12]=904.355
SplineCoef[13]=1860.83
SplineCoef[14]=-1024.48
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
def GlassyCarbonAlpha_Diam2Mass(DiamMob):
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# This function calculates the mass of a glassy carbon particle of a given mobility diameter.
# exact description: Glassy carbon spherical powder, 0.4-12 micron, type 2, Alpha Aesar Art. No. 38008
# note: the bulk density is taken from the supplier's data sheet and an email by Ronald Becht (Technical service, Alfa Aesar)
kGlassyCarbonAlphaRho = 1420 # kg/m3
mass = kGlassyCarbonAlphaRho * np.pi / 6 * DiamMob**3 * 1e-9
return mass
def SP2_calibCurveSplineCheck(CalibCoef):
# this function comes from the sp2 toolkit
# This function checks the dimension of the calibration coefficient wave.
nCoef = len(CalibCoef)
if np.mod(nCoef, 4) != 3:
raise ValueError("The number of coefficients handed over to the function must be 3,7,11, or ...!")
return np.ceil(nCoef/4)
# %% Function for calibration (part B)
def MultiChargeSizesEqualMob(diam, ne, TC=20, press=1013):
# This function returns the size of a particle carrying multiple charges but having the same electrical mobility as a singly charged particle with diameter diam.
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
# diam in nm
if ne == 1:
return diam
else:
Zel = DiamToElectrMobility_Tp(1, TC, press, diam)
return ElectrMobilityToDiam_Tp(ne, TC, press, Zel)
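# Example (a sketch): when the DMA is set to 200 nm, the doubly and triply charged particles
# transmitted together with the singly charged ones have the diameters returned by
#   [MultiChargeSizesEqualMob(200, ne) for ne in (1, 2, 3)]
# at the default conditions of 20 degC and 1013 mbar (same electrical mobility, larger size).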
def ElectrMobilityToDiam_Tp(ne, TC, press, Zel, precision=0.001, GasType=0):
# This function calculates the particle diameter (units: [nm]) as a function of the number of
# elementary charges, temperature, pressure, and electrical mobility
# T(C), press=mbar, Zel (electrical mobility [m²/Vs]), precision (%), Gas type default air
IterDiam = 100 # initial guess=100 nm diameter
Lambda = MeanFreePathBaron4_6(TC, press, GasType=GasType)
Visc = ViscosityOfGases(GasType, TC)
Cc = CunninghamSlipCorrBaron4_8(Lambda, IterDiam)
prevDiam = 1e9 * ne * 1.602e-19 * Cc / (3 * np.pi * Visc * Zel)
while ((np.abs(IterDiam - prevDiam) / min(IterDiam, prevDiam)) > (precision / 100)):
IterDiam = prevDiam
Cc = CunninghamSlipCorrBaron4_8(Lambda, prevDiam)
prevDiam = 1e9 * ne * 1.602e-19 * Cc / (3 * np.pi * Visc * Zel)
return prevDiam
def MeanFreePathBaron4_6(T, press, GasType=0):
'''
This function calculates the mean free path (units: [nm]) of gas molecules as a function of temperature and pressure.
Source: Baron&Willeke, 2001, eq. 4-6, p.65
martin.gysel@psi.ch; 17/08/2005; 24/05/2016
T: temperature, [°C]
press: pressure, [mbar=hectopascal]
GasType: 0: air //default
1: O2 //likely not yet implemented; see sub-procedure
2: N2 //likely not yet implemented; see sub-procedure
3: Argon //implemented
4: Helium //likely not yet implemented; see sub-procedure
5: CO2 //likely not yet implemented; see sub-procedure
6: CO //likely not yet implemented; see sub-procedure
'''
# unit conversion
kTzero = -273.15 # absolute zero temperature [°C]; source: http://de.wikipedia.org/wiki/Absoluter_Nullpunkt (note remark on 273.16)
T -= kTzero # convert [°C] to [K]
# gas specific settings
# variable Lref,S,Tref,pref
if GasType == 0: # air
Lref = 66.4 # reference lambda at NTP (20 °C, 1013 mbar)
# note: TSI uses 66.5 at NTP
S = 110.4 # Sutherland constant
Tref = 293.15 # reference temperature [K]
pref = 1013 # reference pressure [mbar]
elif GasType == 3: # argon
Lref = 69.4 # reference lambda at NTP (20 °C, 1013 mbar)
S = 141.4 # Sutherland constant
Tref = 293.15 # reference temperature [K]
pref = 1013 # reference pressure [mbar]
# calculate mean free path
Lambda = Lref * (pref / press) * (T / Tref) * (1 + S / Tref) / (1 + S / T)
return Lambda
def CunninghamSlipCorrBaron4_8(Lambda, Diam):
'''
This function calculates the Cunningham slip correction factor for solid particles as a function of the mean free path of the air molecules.
slightly different parameter for oil droplets
Source: Baron, 2001, eq. 4-8, p. 66
Note: That is the parametrisation that I use for the SMPS inversion.
martin.gysel@psi.ch, 17.08.2005
Lambda: mean free path of the air molecules, [nm]
Diam: particle diameter, [nm]
'''
Diam *= 1e-9 # convert from [nm] to [m]
Lambda *= 1e-9 # convert from [nm] to [m]
Kn = 2 * Lambda / Diam
a = 1.142
b = 0.558
g = 0.999
Cc = 1 + Kn * (a + b * np.exp(-g / Kn))
return Cc
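# Worked example (approximate): for air at NTP the mean free path is ~66.4 nm, so for a 100 nm
# particle Kn = 2*66.4/100 = 1.33 and
#   CunninghamSlipCorrBaron4_8(66.4, 100.)  ->  ~2.87
# Note that the function rescales its arguments in place (Diam *= 1e-9), so pass plain floats
# rather than arrays whose values you want to keep in [nm].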
def ViscosityOfGases(GasType=0, TC=20):
'''
This function returns the viscosity of a certain gas at a certain temperature TC
return value: viscosity of the specified gas at the temperature TC [kg/m/s]=[Pa*s]
Adapted from IGOR function by martin.gysel@psi.ch; 24/05/2016;
TC : temperature, [°C]
GasType: 0: air //default
1: O2 //likely not yet implemented; see sub-procedure
2: N2 //likely not yet implemented; see sub-procedure
3: Argon //implemented
4: Helium //likely not yet implemented; see sub-procedure
5: CO2 //likely not yet implemented; see sub-procedure
6: CO //likely not yet implemented; see sub-procedure
'''
if GasType == 0: # air
visc = ViscosityOfAirBaron4_10(TC)
elif GasType == 3: # argon
visc = ViscosityOfArgon(TC)
else:
print('Specified gas not yet implemented')
return visc
def ViscosityOfArgon(TC):
'''
This function calculates the dynamic viscosity of argon (units: [kg/m/s]=[Pa*s]) as a function of air temperature.
Source:
Baron&Willeke, 2001, eq. 4-10, p. 66
Reference value and Sutherland constant => B & W p. 65
Adapted from IGOR function by martin.gysel@psi.ch; 24/05/2016
TC : temperature, [°C]
'''
S = 141.4 # Sutherland constant for Argon
TC += 273.15 # convert [°C] -> [K]
Vref = 2.2292E-5 # reference viscosity of Argon at 293.15 K
Tref = 293.15
visc = Vref * (Tref+S) / (TC+S) * (TC/Tref)**(3/2)
return visc
def ViscosityOfAirBaron4_10(TC):
'''
This function calculates the dynamic viscosity of air (units: [kg/m/s]=[Pa*s]) as a function of air temperature.
Source: Baron&Willeke, 2001, eq. 4-10, p. 66
Function adapted from IGOR function by martin.gysel@psi.ch
mg, 17.08.2005, tested against Ernest Weingartners "VOODOO"
TC : temperature, [°C]
'''
S = 110.4 # Sutherland constant for air
TC += 273.15 # convert [°C] -> [K]
Vref = 1.8325e-5 # reference viscosity at NTP (20 °C, 1013 mbar)
# note: TSI uses 1.8203e-5 at NTP
Tref=293.15
visc = Vref * (Tref+S) / (TC+S) * (TC/Tref)**(3/2)
return visc
def DiamToElectrMobility_Tp(ne, T, press, Diam, GasType=0):
# This function calculates the electrical mobility (units: [m²/Vs]) of a particle as a function of number of
# elementary charges, temperature (C), pressure (mbar), and particle diameter (nm)
# return value: electrical mobility [m²/Vs]
keCharge = 1.6021892E-19 # elementary charge [C]
Lambda = MeanFreePathBaron4_6(T, press, GasType=GasType)
Cc = CunninghamSlipCorrBaron4_8(Lambda,Diam)
Visc = ViscosityOfGases(GasType, T)
Zel = ne * keCharge * Cc / (3 * np.pi * Visc * 1e-9 * Diam)
return Zel
def lognormal(x, *params):
'''
Lognormal function for a generic number of modes
Parameters
----------
x : np array
x-array.
*params : list
[mu1, mu2, ..., sigma1, sigma2, ..., N1, N2, ...] : Parameters of the lognormal distribution,
ordered as all mode centers first, then all geometric standard deviations, then all amplitudes.
Returns
-------
Yfit : np array
y-array of the (multi mode) lognormal distribution.
'''
Yfit = np.zeros(len(x))
n_modes = int(len(params) // 3)
for i in np.arange(0, n_modes):
mu, sigma, N = params[i], params[i + n_modes], params[i + 2*n_modes]
Yfit += N / (np.log10(sigma) * np.sqrt(2 * np.pi)) * np.exp(-((np.log10(x / mu)) ** 2) / (2 * np.log10(sigma) ** 2))
return Yfit
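# Example of the parameter ordering expected by lognormal() (see the docstring above): for two
# modes the call is lognormal(x, mu1, mu2, sigma1, sigma2, N1, N2), i.e. all mode centers first,
# then all geometric standard deviations, then all amplitudes. A single mode evaluated at its
# center returns N / (log10(sigma) * sqrt(2*pi)):
#   x = np.logspace(1, 3, 100)
#   y = lognormal(x, 100., 1.6, 1000.)   # one mode centred at 100 with sigma_g = 1.6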
def lognorm_fit_peak(data, bin_centers, ini_guess, fit_min_relPeak=1e4, fit_max_relPeak=1e9):
'''
This function fits multiple lognormally distributed curves simultaneously.
The number of lognormal curves is defined by the shape of the initial guess.
Parameters
----------
data : numpy array
Distribution of peak heights.
bin_centers : numpy array
Locations of the bin centers used to calculate the distribution "data".
ini_guess : list of lists
List containing three lists (mu, sigma, N).
The three sublists must have equal length, which defines the number of modes to be fitted.
fit_min_relPeak : float, optional
Lower limit of peak height to consider for the fit. The default is 1e4.
fit_max_relPeak : float, optional
Upper limit of peak height to consider for the fit. The default is 1e9.
Returns
-------
popt : numpy array
Fitted parameters, ordered as [mu1, ..., sigma1, ..., N1, ...].
'''
# Initial guess for fitting parameters:
p0 = ini_guess[0] + ini_guess[1] + ini_guess[2]
# Select only subset of peak heights
mask = (bin_centers>=fit_min_relPeak) & (bin_centers<=fit_max_relPeak)
x = bin_centers[mask]
y = data[mask]
# Perform the fit
popt, _ = curve_fit(lognormal, x, y, p0=p0)
return popt
def polynomial(x, *coefficients):
'''
Function to generate a generic polynomial curve. The number of input coefficients defines the degree of the polynomial curve.
Parameters
----------
x : np array
x-array for the evaluation of the polynomial curve.
*coefficients : array of floats
Coefficients of the polynomial curve. The first value is the order-0 coefficient, and so on.
Returns
-------
result : np array
Evaluation of the polynomial curve at the x-array.
'''
result = 0
for i, coef in enumerate(coefficients):
result += coef * (x ** i)
return result
def powerlaw(x, A, B, C):
y = A*(x**B) + C
return y
def calculate_calib_coeff(pbp_data, calib_dict,
size_selection_method='',
calib_material='', rho_eff=1800,
fit=None, fit_p0=[], bounds=[], save_calib_coeff=False,
append_calib_curve={},
save_peak_hist_lognorm_fit_params=False,
do_peak_histogram_plots=False, save_peak_histogram_plots=False,
do_calib_curve_plot=False, save_calib_curve_plot=False,
plot_dir=''):
'''
Derive the calibration curve (incandescence peak height vs BC mass or diameter) from a set of calibration measurements.
Parameters
----------
pbp_data : pandas DataFrame
Single-particle (PbP) data; must contain the columns 'folder_name' and 'Incand relPeak'.
calib_dict : dict
One entry per calibration point with the keys 'folder_name', 'type', 'mass', 'diam',
'mu', 'sigma', 'N', 'nbins', 'bins_range' and 'fit_range' (see the example after the function).
size_selection_method : str
'APM' (mass selected, 'mass' required) or 'DMA' (mobility selected, 'diam' required). Mandatory.
calib_material : str, optional
Effective density parameterization passed to BC_mass_to_diam / BC_diam_to_mass.
Possible inputs are: constant_effective_density, Aquadag_RCAST_2009, Aquadag_Moteki_AST2010, Aquadag_PSI_2010, FS_RCAST_2009, FS_Moteki_2010, FS_PSI_2010, FS_PSI_2010_old, Glassy_Carbon
rho_eff : float, optional
Effective density [kg/m3] to use for calib_material='constant_effective_density'. The default is 1800.
fit : str, optional
Type of calibration curve to fit: 'polynomial', 'powerlaw' or None. The default is None.
fit_p0 : list, optional
Initial guess for the calibration curve fit (3 coefficients for 'powerlaw'). The default is [].
bounds : list or tuple, optional
Bounds passed to scipy.optimize.curve_fit. The default is [].
save_calib_coeff : bool, optional
Flag to save the calibration coefficients (currently unused). The default is False.
append_calib_curve : dict, optional
Previously derived calibration curves to overlay in the calibration plot; each entry needs
'peak', 'mass', 'label', 'color', 'ls' and 'marker'. The default is {}.
save_peak_hist_lognorm_fit_params : bool, optional
If True, store the fitted lognormal parameters (and corresponding mass/diam) in calib_dict. The default is False.
do_peak_histogram_plots, save_peak_histogram_plots : bool, optional
Plot (and save) the peak height histograms with their lognormal fits. The defaults are False.
do_calib_curve_plot, save_calib_curve_plot : bool, optional
Plot (and save) the calibration curve. The defaults are False.
plot_dir : str, optional
Directory where plots are saved. The default is ''.
Raises
------
ValueError
If the size selection method is missing or the required mass/diam values are not provided in calib_dict.
Returns
-------
calib_dict : dict
The input dictionary, optionally extended with the lognormal fit results.
popt : numpy array
Fitted coefficients of the calibration curve.
'''
tmp_peak_height_fit = []
tmp_mass_fit = []
tmp_diam_fit = []
color_list = ['C0', 'C1', 'C2', 'C3']
tmp_color = []
# *** Check which size selection method is used and make sure the diam or the mass values are present in the calib_dict
if size_selection_method=='APM':
for key in calib_dict.keys():
if calib_dict[key]['mass'] is None:
raise ValueError(f"Missing mass value for {key}")
elif size_selection_method=='DMA':
for key in calib_dict.keys():
if calib_dict[key]['diam'] is None:
raise ValueError(f"Missing diameter value for {key}")
else:
raise ValueError("You have to select a size selection method first!")
for key in calib_dict.keys():
folder_name = calib_dict[key]['folder_name']
aerosol_type = calib_dict[key]['type']
n_modes = len(calib_dict[key]['mu'])
nbins = calib_dict[key]['nbins']
mass = calib_dict[key]['mass']
diam = calib_dict[key]['diam']
tmp_pbp = pbp_data.query('folder_name==@folder_name').loc[:, 'Incand relPeak']
ini_guess = [calib_dict[key]['mu'], calib_dict[key]['sigma'], calib_dict[key]['N']]
# If not specified by the user select a default range for the histogram:
if len(calib_dict[key]['bins_range'])==0:
min_bin_relPeak = 1e4
max_bin_relPeak = 1e9
else:
min_bin_relPeak = calib_dict[key]['bins_range'][0]
max_bin_relPeak = calib_dict[key]['bins_range'][1]
# If not specified by the user select a default range to fit the histogram:
if len(calib_dict[key]['fit_range'])==0:
fit_min_relPeak = 1e4
fit_max_relPeak = 1e9
else:
fit_min_relPeak = calib_dict[key]['fit_range'][0]
fit_max_relPeak = calib_dict[key]['fit_range'][1]
bin_edges = np.logspace(np.log10(min_bin_relPeak), np.log10(max_bin_relPeak), nbins)
bin_centers = bin_edges[:-1] + np.diff(bin_edges) / 2
# Create the histogram:
count, _ = np.histogram(tmp_pbp, bins=bin_edges)
# Fit the histogram with a number of lognormal curves defined by the length of the initial guess:
pu = lognorm_fit_peak(count, bin_centers, ini_guess,
fit_min_relPeak=fit_min_relPeak, fit_max_relPeak=fit_max_relPeak)
tmp_peak_height_fit.append(pu[0:n_modes])
ne = np.arange(n_modes)+1
if size_selection_method == 'APM':
mass_list = [mass]*ne # elementwise with the charge array: multiply charged particles carry integer multiples of the selected mass
tmp_mass_fit.append(mass_list)
diam_list = BC_mass_to_diam(mass_list, BC_type=calib_material)[1]
tmp_diam_fit.append(diam_list)
elif size_selection_method == 'DMA':
diam_list = [MultiChargeSizesEqualMob(diam, e, TC=20, press=1013) for e in ne]
tmp_diam_fit.append(diam_list)
mass_list = BC_diam_to_mass(diam_list, BC_type=calib_material)[1]
tmp_mass_fit.append(mass_list)
#tmp_diam_fit.append()
tmp_color.append(color_list[:n_modes])
if save_peak_hist_lognorm_fit_params:
calib_dict[key]['peak_height_lognorm_fit'] = pu[0:n_modes]
if size_selection_method == 'APM':
calib_dict[key]['peak_height_corresponding_mass'] = mass_list
calib_dict[key]['peak_height_corresponding_diam'] = diam_list
y_fit = lognormal(bin_centers, *np.ravel(pu))
if do_peak_histogram_plots:
fig, axs = plt.subplots()
if size_selection_method == 'APM':
axs.set_title(f'file:{folder_name} - mass={mass} fg - aerosol:{aerosol_type}')
elif size_selection_method == 'DMA':
axs.set_title(f'file:{folder_name} - diam={diam} nm - aerosol:{aerosol_type}')
sns.histplot(tmp_pbp, stat='frequency', log_scale=(True, False), bins=nbins, color='grey', alpha=0.5)
axst = axs.twinx()
axst.plot(bin_centers, count, lw=2, c='k')
axst.plot(bin_centers[(bin_centers>=fit_min_relPeak) & (bin_centers<=fit_max_relPeak)],
count[(bin_centers>fit_min_relPeak) & (bin_centers<fit_max_relPeak)], c='r', lw=2)
axst.plot(bin_centers, y_fit, c='k', lw=2, ls='--')
for i in np.arange(n_modes):
axs.axvline(pu[i], c='g', lw=2)
axst.set_ylim(0, )
if save_peak_histogram_plots:
if size_selection_method == 'APM':
plt.savefig(plot_dir+f'/hist_plot_mass_{mass}.png', dpi=600)
elif size_selection_method == 'DMA':
plt.savefig(plot_dir+f'/hist_plot_diam_{diam}.png', dpi=600)
tmp_peak_height_fit = np.concatenate( tmp_peak_height_fit, axis=0 )
if len(tmp_mass_fit)>0:
tmp_mass_or_diam_fit = np.concatenate( tmp_mass_fit, axis=0 )
elif len(tmp_diam_fit)>0:
tmp_mass_or_diam_fit = np.concatenate( tmp_diam_fit, axis=0 )
tmp_color = np.concatenate( tmp_color, axis=0 )
if fit == 'polynomial':
popt, _ = curve_fit(polynomial, tmp_peak_height_fit, tmp_mass_or_diam_fit, p0=fit_p0, bounds=bounds)
elif fit == 'powerlaw':
if len(fit_p0) != 3:
print(f'Problem with the initial guess provided for power law fit. Number of coeff should be 3, and not {len(fit_p0)}')
else:
popt, _ = curve_fit(powerlaw, tmp_peak_height_fit, tmp_mass_or_diam_fit, p0=fit_p0, bounds=bounds)
if do_calib_curve_plot:
fig, axs = plt.subplots(layout='tight')
axs.scatter(tmp_peak_height_fit, tmp_mass_or_diam_fit, c=tmp_color, label=f'current calib data ({folder_name})')
axs.set_xscale('log')
axs.set_yscale('log')
axs.grid(which='both')
if len(append_calib_curve)>0:
for key in append_calib_curve:
axs.plot(append_calib_curve[key]['peak'], append_calib_curve[key]['mass'],
label=append_calib_curve[key]['label'], c=append_calib_curve[key]['color'],
ls=append_calib_curve[key]['ls'], marker=append_calib_curve[key]['marker'])
axs.legend()
if fit == 'polynomial':
calib_curve_fit = polynomial(bin_centers, *np.ravel(popt))
elif fit == 'powerlaw':
calib_curve_fit = powerlaw(bin_centers, *np.ravel(popt))
if fit:
axs.plot(bin_centers, calib_curve_fit, lw=2, c='C0')
if save_calib_curve_plot:
plt.savefig(plot_dir+'mass_peakH.png', dpi=600)
return calib_dict, popt
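# Example of the calib_dict structure expected by calculate_calib_coeff (a sketch; the key names
# follow the look-ups in the function body, the values are hypothetical):
#   calib_dict = {
#       'point_1': {'folder_name': '20230101x001',   # value of the 'folder_name' column in pbp_data
#                   'type': 'Aquadag',
#                   'mass': 5.0,                      # selected mass [fg], required for 'APM'
#                   'diam': None,                     # selected mobility diameter [nm], required for 'DMA'
#                   'mu': [2e6], 'sigma': [1.3], 'N': [500],   # one initial guess per lognormal mode
#                   'nbins': 100,
#                   'bins_range': [1e4, 1e9],         # histogram range; [] -> default 1e4..1e9
#                   'fit_range': [1e5, 1e8]},         # fit range; [] -> default 1e4..1e9
#   }
#   calib_dict, popt = calculate_calib_coeff(pbp_data, calib_dict,
#                                            size_selection_method='APM',
#                                            calib_material='Aquadag_PSI_2010',
#                                            fit='powerlaw', fit_p0=[1e-7, 1., 0.],
#                                            bounds=(-np.inf, np.inf))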
# %% Function to read config file
def read_xr_ini_file(fname):
"""
Read a .ini SP2-XR file, save parameters of interest in a dictionary
"""
params = {}
with open(fname, 'r') as f:
for line in f:
if 'Save Every Nth Particle' in line:
try:
params['SaveRate'] = float(line.split('=')[-1].replace('\n', ''))
except ValueError:
params['SaveRate'] = 1
if ('Incand Transit Time Min' in line) and ('Disqualified' not in line):
params['IncTransitMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Incand Transit Time Max' in line) and ('Disqualified' not in line):
params['IncTransitMax'] = float(line.split('=')[-1].replace('\n', ''))
if ('Incand FWHM Min' in line) and ('Disqualified' not in line):
params['IncFWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Incand HWHM Min' in line) and ('Disqualified' not in line):
params['IncHWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Incand FWHM Max' in line) and ('Disqualified' not in line):
params['IncFWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
if ('Incand HWHM Max' in line) and ('Disqualified' not in line):
params['IncHWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
if ('Incand Inter Particle Time Min' in line) and ('Disqualified' not in line):
params['IncInterTimeMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter Transit Time Min' in line) and ('Disqualified' not in line):
params['ScattTransitMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter Transit Time Max' in line) and ('Disqualified' not in line):
params['ScattTransitMax'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter FWHM Min' in line) and ('Disqualified' not in line):
params['ScattFWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter HWHM Min' in line) and ('Disqualified' not in line):
params['ScattHWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter FWHM Max' in line) and ('Disqualified' not in line):
params['ScattFWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter HWHM Max' in line) and ('Disqualified' not in line):
params['ScattHWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
if ('Scatter Inter Particle Time Min' in line) and ('Disqualified' not in line):
params['ScattInterTimeMin'] = float(line.split('=')[-1].replace('\n', ''))
return params
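# Example (a sketch; the exact key/value lines depend on the instrument configuration file):
# the parser above splits each matching line on '=' and keeps the numeric value, so a line such as
#   Incand Transit Time Min = 5.000000
# ends up as params['IncTransitMin'] == 5.0.
#   params = read_xr_ini_file(r'D:\SP2XR\config\SP2XR.ini')
#   save_rate = params.get('SaveRate', 1)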
#%% Functions to create histograms and distributions
def bin_ctrs_to_lims(x_ctrs):
    """
    Take an array of bin centers and return an array 1 element larger of bin limits.
    (From functions.py by Rob.)
    """
delta = np.diff(x_ctrs)/2
x_lims = np.append(x_ctrs[0]-delta[0], x_ctrs[:-1]+delta)
x_lims = np.append(x_lims, x_ctrs[-1]+delta[-1])
return x_lims
def bin_lims_to_ctrs(x_lims):
"""
Take an array of bin limits and return an array 1 element smaller of bin centers
"""
x_ctrs = (x_lims[1:] + x_lims[:-1])/2
return x_ctrs
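# Worked example (illustrative):
#   bin_ctrs_to_lims(np.array([1., 2., 4.]))       -> array([0.5, 1.5, 3. , 5. ])
#   bin_lims_to_ctrs(np.array([0.5, 1.5, 3., 5.])) -> array([1.  , 2.25, 4.  ])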
def get_dlogp(Dp):
    """
    Given an array of bin-center diameters Dp, calculate the array of logarithmic
    bin widths dlogDp, i.e. the difference of log10 of consecutive bin limits.
    (From functions.py by Rob.)
    """
dlogdp = np.log10(bin_ctrs_to_lims(Dp)[1:]) - np.log10(bin_ctrs_to_lims(Dp)[:-1])
return dlogdp
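# Worked example (illustrative): for Dp = np.array([100., 200., 400.]) the bin limits are
# [50, 150, 300, 500], so get_dlogp(Dp) -> [log10(3), log10(2), log10(5/3)] ≈ [0.48, 0.30, 0.22]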
def counts2numConc(numb, flow, t=1):
    """
    Calculate number concentration from counts (numb), flow rate in [ccm] (flow)
    and sampling time in [secs] (t).
    Return numConc with units of [cm^-3].
    (From functions.py by Rob.)
    """
conc = numb.divide(flow * (t/60), axis=0)
return conc
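# Worked example (illustrative): 120 counts collected in 1 s at a sample flow of 30 ccm give
#   counts2numConc(counts, flow, t=1) = 120 / (30 * 1/60) = 240 cm^-3
# (counts and flow are pandas objects aligned on the same time index)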
def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400), 50)):
    """Histogram the non-NaN values of a series into the given bin limits and return the counts as a list."""
    if series.dropna().empty:
        # Return NaNs for the counts if the series contains no valid values
        counts = np.full(len(bin_lims) - 1, np.nan, dtype=float)
else:
counts, _ = np.histogram(series.dropna(), bins=bin_lims)
return counts.tolist()
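# Example (illustrative):
#   calculate_histogram(pd.Series([0.5, 5.0, 5.5]), bin_lims=np.array([0.3, 1, 10, 100])) -> [1, 2, 0]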
def process_hist_and_dist(df, col, flag_col, flag_value, bin_lims, bin_ctrs, dt_str, calculate_conc=True, flow=None, rho_eff=1800, BC_type='', t=1):
    """
    Resample the single-particle column `col` to `dt_str` intervals and histogram it into `bin_lims`.
    If `flag_col`/`flag_value` are given, only the rows where df[flag_col] == flag_value are used.
    If `calculate_conc` is True, the counts are converted to number (and, when rho_eff/BC_type
    are given, mass) size distributions using the sample flow; otherwise the raw per-bin counts
    are returned.
    """
    # Apply the flag condition if provided
    if flag_col and flag_value is not None:
        df_filtered = df.loc[df[flag_col] == flag_value]
    else:
        df_filtered = df
    if flag_col is not None and 'cnts' in flag_col:
        flag_col = flag_col[5:]
    # Resample and calculate the histogram for each interval
    ddf_hist_compact = df_filtered[col].resample(dt_str).agg(
        {'result': lambda x: calculate_histogram(x, bin_lims=bin_lims)})  # NOTE: a .fillna(0) here might remove the need for the fillna in the resampling function
# Add and filter based on 'original_idx'
ddf_hist_compact[['original_idx']] = df_filtered[['temporary_col']].resample(dt_str).count()
ddf_hist_compact = ddf_hist_compact[ddf_hist_compact['original_idx'] != 0]
ddf_hist_compact = ddf_hist_compact.drop('original_idx', axis=1)
# Initialize DataFrame based on whether the result is empty or not
list_col = 'result'
max_list_length = len(bin_ctrs)
if ddf_hist_compact.empty:
columns = [f'{list_col}_{i}' for i in range(max_list_length)]
ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index)
else:
ddf_hist = ddf_hist_compact[list_col].apply(pd.Series)
ddf_hist.index = ddf_hist_compact.index
if calculate_conc:
# Join with the sample flow controller data
inc_hist_flow = ddf_hist.join(flow)
if rho_eff is not None and BC_type is not None:
# Calculate diameters and densities
density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)
# Calculate number concentration
dNdlogDmev = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t) / get_dlogp(Dmev)
if flag_col is None:
dNdlogDmev.columns = [f'dNdlogDmev_all_{i:.2f}' for i in Dmev]
else:
dNdlogDmev.columns = [f'dNdlogDmev_{flag_col}_{i:.2f}' for i in Dmev]
# Calculate mass concentration
dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density)
if flag_col is None:
dMdlogDmev.columns = [f'dMdlogDmev_all_{i:.2f}' for i in Dmev]
else:
dMdlogDmev.columns = [f'dMdlogDmev_{flag_col}_{i:.2f}' for i in Dmev]
else:
# Calculate number concentration
dNdlogDmev = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t) / get_dlogp(bin_ctrs)
if flag_col is None:
dNdlogDmev.columns = [f'dNdlogDsc_all_{i:.2f}' for i in bin_ctrs]
else:
dNdlogDmev.columns = [f'dNdlogDsc_{flag_col}_{i:.2f}' for i in bin_ctrs]
dMdlogDmev = None
return dNdlogDmev, dMdlogDmev
else:
return ddf_hist, None
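# Example usage (illustrative; df_pbp, flow_1s and the mass bin arrays are placeholders for the
# objects built in process_pbp_parquet below):
#   dN, dM = process_hist_and_dist(df_pbp, 'BC mass within range', 'cnts_thin', 1,
#                                  inc_mass_bin_lims, inc_mass_bin_ctrs, '1s',
#                                  flow=flow_1s, rho_eff=1800, BC_type='constant_effective_density', t=1)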
#%% Function to process single particle data already stored as parquet files
@delayed
def process_pbp_parquet(dir_path_pbp, dir_path_hk,
dt=1,
rho_eff=1800, BC_type='constant_effective_density',
inc_calib_curve='', inc_calib_params=[],
scatt_calib_curve='', scatt_calib_params=[],
config_file_dir='',
minM=None, maxM=None, n_incbins=None,
minOptD=None, maxOptD=None, n_scattbins=None,
minTL=None, maxTL=None, n_timelag=None,
save_final_data=True, path_parquet='', partition_on=['date', 'hour']
):
read_dir_pbp = dir_path_pbp
read_dir_hk = dir_path_hk
ddf_pbp = pd.read_parquet(read_dir_pbp)#, calculate_divisions=True)#.repartition(freq='1h')
ddf_hk = pd.read_parquet(read_dir_hk)#, calculate_divisions=True)#.repartition(freq='1h')
ini_params = read_xr_ini_file(glob(f'{config_file_dir}/*Calibration*.ini')[0])
    ini_params['IncSatPoint'] = 1.7e9  # 2e9  # guess for now, still need to find where to obtain this from; 2e9 is probably a better guess
    ini_params['ScattSatPoint'] = 1.7e9  # guess for now, still need to find where to obtain this from; 1.85e9 is probably better
    ini_params['DelayTimeThresh'] = 200  # also a guess for now, to be updated
inc_mass_bin_lims = np.logspace(np.log10(minM), np.log10(maxM), n_incbins)
inc_mass_bin_lims_for_pd_cut = np.logspace(np.log10(minM), np.log10(maxM+0.01), n_incbins)
inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)
dt_str = f'{dt}s'
# ***** ***** *****
# Apply calibration
# ***** ***** *****
if inc_calib_curve:
if inc_calib_curve == 'polynomial':
ddf_pbp['BC mass'] = ddf_pbp['Incand relPeak'].apply(lambda x: polynomial(x, *inc_calib_params))#, meta=('BC mass', 'float64'))
minM_timelag = polynomial(5e6, *inc_calib_params)
elif inc_calib_curve == 'powerlaw':
            ddf_pbp['BC mass'] = ddf_pbp['Incand relPeak'].apply(lambda x: powerlaw(x, *inc_calib_params))#, meta=('BC mass', 'float64'))
            minM_timelag = powerlaw(5e6, *inc_calib_params)
else:
ddf_pbp['BC mass'] = ddf_pbp['Incand Mass (fg)']
minM_timelag = ddf_pbp.loc[(ddf_pbp['Incand relPeak']<=5.05e6) & (ddf_pbp['Incand relPeak']>=4.95e6), 'Incand Mass (fg)'].mean() # this could create problems if there are no incandescence signals in the selected range
ddf_pbp.loc[ddf_pbp['Incand relPeak']==0, 'BC mass'] = np.nan
ddf_pbp['BC mass bin'], mass_bins = pd.cut(ddf_pbp['BC mass'], bins=inc_mass_bin_lims_for_pd_cut, right=False, include_lowest=True, retbins=True)
if scatt_calib_curve:
if scatt_calib_curve == 'polynomial':
ddf_pbp['Opt diam'] = ddf_pbp['Scatter relPeak'].apply(lambda x: polynomial(x, *scatt_calib_params))#, meta=('Opt diam', 'float64'))
elif scatt_calib_curve == 'powerlaw':
ddf_pbp['Opt diam'] = ddf_pbp['Scatter relPeak'].apply(lambda x: powerlaw(x, *scatt_calib_params))#, meta=('Opt diam', 'float64'))
else:
ddf_pbp['Opt diam'] = ddf_pbp['Scatter Size (nm)']
ddf_pbp.loc[ddf_pbp['Scatter relPeak']==0, 'Opt diam'] = np.nan
ddf_pbp['time_lag_new'] = ddf_pbp['Incand Peak Time'] - ddf_pbp['Scatter Peak Time']
# ***** ***** ***** *****
# Set flags
# ***** ***** ***** *****
flag_inc_transit_time = (ddf_pbp['Incand Transit Time']>=ini_params['IncTransitMin']) & (ddf_pbp['Incand Transit Time']<=ini_params['IncTransitMax'])
flag_inc_fwhm = (ddf_pbp['Incand FWHM']>=ini_params['IncFWHMMin']) & (ddf_pbp['Incand FWHM']<=ini_params['IncFWHMMax'])
flag_inc_not_sat = (ddf_pbp['Incand relPeak']<ini_params['IncSatPoint'])
flag_scatt_transit_time = (ddf_pbp['Scatter Transit Time']>=ini_params['ScattTransitMin']) & (ddf_pbp['Scatter Transit Time']<=ini_params['ScattTransitMax'])
flag_scatt_fwhm = (ddf_pbp['Scatter FWHM']>=ini_params['ScattFWHMMin']) & (ddf_pbp['Scatter FWHM']<=ini_params['ScattFWHMMax'])
flag_scatt_not_sat = (ddf_pbp['Scatter relPeak']<ini_params['ScattSatPoint'])
flag_inc = (flag_inc_transit_time & flag_inc_fwhm)
flag_inc_in_range = flag_inc & (ddf_pbp['BC mass'] >= minM) & (ddf_pbp['BC mass'] <= maxM)
flag_inc_in_range_tl_analysis = flag_inc & (ddf_pbp['BC mass'] >= minM_timelag) & (ddf_pbp['BC mass'] <= maxM)
flag_scatt = (flag_scatt_transit_time & flag_scatt_fwhm)
flag_scatt_in_range = flag_scatt & (ddf_pbp['Opt diam'] >= minOptD) & (ddf_pbp['Opt diam'] <= maxOptD)
flag_negative_timelag = ddf_pbp['time_lag_new']<-10
flag_extreme_positive_timelag = ddf_pbp['time_lag_new']>=400
flag_timelag_0_50 = (ddf_pbp['time_lag_new']<50) & (ddf_pbp['time_lag_new']>=-10)
flag_timelag_greater_50 = (ddf_pbp['time_lag_new']>=50) & (ddf_pbp['time_lag_new']<400)
ddf_pbp['ratio_inc_scatt'] = np.log10(ddf_pbp['Incand relPeak']) / np.log10(ddf_pbp['Scatter relPeak'])
flag_low_ratio_inc_scatt = ddf_pbp['ratio_inc_scatt']<1.1
# ***** ***** ***** ***** ***** ***** *****
# Create column for purely scattering particle analysis
# ***** ***** ***** ***** ***** ***** *****
ddf_pbp['Opt diam scatt only'] = ddf_pbp['Opt diam'].where(flag_scatt & ~flag_inc, np.nan) #ddf_pbp['Opt diam'].mask(ddf_pbp['BC mass'].notnull(), np.nan)
# ***** ***** *****
# Set flag for thin/thick
# ***** ***** *****
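    # Classification sketch (as implemented below): incandescent particles within the mass range
    # are counted as "thin"-coated when the scattering-to-incandescence time lag is small
    # (-10 <= time lag < 50) and as "thick"-coated when it is large (50 <= time lag < 400);
    # negative or extreme time lags, saturated scattering signals and low incand/scatt peak
    # ratios are tracked in separate counter columns.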
ddf_pbp['cnts_thin'] = 0
ddf_pbp['cnts_thin_noScatt'] = 0
ddf_pbp['cnts_thick'] = 0
ddf_pbp['cnts_thick_sat'] = 0
ddf_pbp['cnts_thin_sat'] = 0
ddf_pbp['cnts_ntl_sat'] = 0
ddf_pbp['cnts_ntl'] = 0
ddf_pbp['cnts_extreme_positive_timelag'] = 0
ddf_pbp['cnts_thin_low_inc_scatt_ratio'] = 0
    ddf_pbp['cnts_particles_for_tl_dist'] = 0  # this flag is used for the calculation of the time-lag distributions below (it includes the particles classified as "thin" or "thick")
ddf_pbp.loc[~flag_scatt & flag_inc_in_range, 'cnts_thin_noScatt'] = 1
ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_timelag_0_50 & ~flag_low_ratio_inc_scatt, 'cnts_thin'] = 1
ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_timelag_0_50 & flag_low_ratio_inc_scatt, 'cnts_thin_low_inc_scatt_ratio'] = 1
ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_timelag_greater_50 & ~flag_extreme_positive_timelag, 'cnts_thick'] = 1
ddf_pbp.loc[flag_scatt & ~flag_scatt_not_sat & flag_inc_in_range & flag_timelag_greater_50 & ~flag_extreme_positive_timelag, 'cnts_thick_sat'] = 1
ddf_pbp.loc[flag_scatt & ~flag_scatt_not_sat & flag_inc_in_range & flag_timelag_0_50, 'cnts_thin_sat'] = 1
ddf_pbp.loc[flag_scatt & ~flag_scatt_not_sat & flag_inc_in_range & flag_negative_timelag, 'cnts_ntl_sat'] = 1
ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_negative_timelag, 'cnts_ntl'] = 1
ddf_pbp.loc[flag_scatt & flag_inc_in_range & flag_extreme_positive_timelag, 'cnts_extreme_positive_timelag'] = 1
ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & ((flag_timelag_0_50 & ~flag_low_ratio_inc_scatt) | (flag_timelag_greater_50 & ~flag_extreme_positive_timelag)), 'cnts_particles_for_tl_dist'] = 1
ddf_pbp['cnts_thin_total'] = ddf_pbp['cnts_thin'] + ddf_pbp['cnts_thin_noScatt']
ddf_pbp['cnts_thick_total'] = ddf_pbp['cnts_thick'] + ddf_pbp['cnts_thick_sat'] + ddf_pbp['cnts_ntl_sat'] + ddf_pbp['cnts_ntl'] + ddf_pbp['cnts_thin_sat']
ddf_pbp['cnts_unclassified'] = ddf_pbp['cnts_extreme_positive_timelag'] + ddf_pbp['cnts_thin_low_inc_scatt_ratio']
    ddf_pbp['temporary_col'] = 1  # this keeps track of time stamps that are not in the original file but are added during the resampling
    # Create two new columns where out-of-range particles are set to np.nan (do NOT set them to 0, otherwise below, where we resample and count, the 0s would be counted as non-null values!)
ddf_pbp['BC mass within range'] = ddf_pbp['BC mass'].where(flag_inc_in_range, np.nan) # ddf_pbp['BC mass'].mask(mask_BC_within_range)
ddf_pbp['Opt diam scatt only within range'] = ddf_pbp['Opt diam scatt only'].where(flag_scatt_in_range & ~flag_inc, np.nan)
ddf_pbp_1s = ddf_pbp[['Dropped Records',
                          'Incand Mass (fg)', # This is the original column from the file; the calibration applied is the one loaded in the software
                          'BC mass', # If calibration parameters are provided, this is the mass calculated from those parameters; if not, it is equal to 'Incand Mass (fg)'
'BC mass within range', # In this column only the masses within the range specified by the user are considered
'cnts_thin',
'cnts_thin_noScatt',
'cnts_thick',
'cnts_thick_sat',
'cnts_thin_sat',
'cnts_ntl_sat',
'cnts_ntl',
'cnts_extreme_positive_timelag',
'cnts_thin_low_inc_scatt_ratio',
'cnts_thin_total',
'cnts_thick_total',
'cnts_unclassified'
]].resample(dt_str).sum()
ddf_pbp_1s[['BC numb from file', # number of BC particles counted from the column 'Incand Mass (fg)'
'BC numb', # number of BC particles counted from the column 'BC mass'
'BC numb within range', # number of BC particles counted from the column 'BC mass within range'
'scatter numb from file', # number of scattering particles counted from the column 'Scatter Size (nm)'
'Scatt numb', # number of purely scattering particles counted from the column 'Opt diam'
'Scatt numb within range', # number of purely scattering particles counted from the column 'Opt diam within range'
'original_idx',
]] = ddf_pbp[['Incand Mass (fg)',
'BC mass',
'BC mass within range',
'Scatter Size (nm)',
'Opt diam scatt only',
'Opt diam scatt only within range',
'temporary_col'
]].resample(dt_str).count()
ddf_pbp_1s[['Secs_2GB_mean']] = ddf_pbp[['Secs_2GB']].resample(dt_str).mean()
# ddf_pbp_1s = ddf_pbp_1s.map_partitions(add_date_column)
ddf_pbp_1s['date'] = ddf_pbp_1s.index.normalize()
ddf_pbp_1s['hour'] = ddf_pbp_1s.index.hour
ddf_pbp_1s = ddf_pbp_1s[ddf_pbp_1s['original_idx'] != 0]
ddf_pbp_1s = ddf_pbp_1s.drop('original_idx', axis=1)#.persist()
# **** **** **** **** **** **** ****
# Create a 1s ddf for the hk file
# **** **** **** **** **** **** ****
ddf_hk['temporary_col'] = 1
ddf_hk_1s = ddf_hk[['Sample Flow Controller Read (sccm)',
'Sample Flow Controller Read (vccm)']].resample(dt_str).mean() # do i need a fill na here?
ddf_hk_1s[['original_idx']] = ddf_hk[['temporary_col']].resample(dt_str).count()
ddf_hk_1s = ddf_hk_1s[ddf_hk_1s['original_idx'] != 0]
ddf_hk_1s = ddf_hk_1s.drop('original_idx', axis=1)#.persist()
# **** **** **** **** **** **** ****
# Create a 1s join ddf for the pbp_1s and hk_1s data
# **** **** **** **** **** **** ****
ddf_pbp_hk = pd.merge(ddf_pbp_1s, ddf_hk_1s, how='left', left_index=True, right_index=True)
# ddf_pbp_hk = ddf_pbp_hk.map_partitions(calculate_concentrations, t=dt)
ddf_pbp_hk['BC_massConc_std'] = ddf_pbp_hk['BC mass'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60) * 1e-6)
ddf_pbp_hk['BC_massConc_vol'] = ddf_pbp_hk['BC mass'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60) * 1e-6)
ddf_pbp_hk['BC_numConc_std'] = ddf_pbp_hk['BC numb'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
ddf_pbp_hk['BC_numConc_vol'] = ddf_pbp_hk['BC numb'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))
ddf_pbp_hk['BC_massConc_within_range_std'] = ddf_pbp_hk['BC mass within range'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60) * 1e-6)
ddf_pbp_hk['BC_massConc_within_range_vol'] = ddf_pbp_hk['BC mass within range'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60) * 1e-6)
ddf_pbp_hk['BC_numConc_within_range_std'] = ddf_pbp_hk['BC numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
ddf_pbp_hk['BC_numConc_within_range_vol'] = ddf_pbp_hk['BC numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))
ddf_pbp_hk['S_numConc_std'] = ddf_pbp_hk['Scatt numb'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
ddf_pbp_hk['S_numConc_vol'] = ddf_pbp_hk['Scatt numb'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))
ddf_pbp_hk['S_numConc_within_range_std'] = ddf_pbp_hk['Scatt numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
ddf_pbp_hk['S_numConc_within_range_vol'] = ddf_pbp_hk['Scatt numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))
# Calculate histograms of different classifications/flags:
ddf_pbp['temporary_col'] = 1
dNdlogDmev, dMdlogDmev = process_hist_and_dist(ddf_pbp, 'BC mass within range', None, None, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thin, dMdlogDmev_thin = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thin_noScatt, dMdlogDmev_thin_noScatt = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_noScatt', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thick, dMdlogDmev_thick = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thick', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thick_sat, dMdlogDmev_thick_sat = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thick_sat', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thin_sat, dMdlogDmev_thin_sat = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_sat', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_ntl_sat, dMdlogDmev_ntl_sat = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_ntl_sat', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_ntl, dMdlogDmev_ntl = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_ntl', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_extreme_positive_timelag, dMdlogDmev_extreme_positive_timelag = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_extreme_positive_timelag', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thin_low_inc_scatt_ratio, dMdlogDmev_thin_low_inc_scatt_ratio = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_low_inc_scatt_ratio', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thin_total, dMdlogDmev_thin_total = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_total', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_thick_total, dMdlogDmev_thick_total = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thick_total', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
dNdlogDmev_unclassified, dMdlogDmev_unclassified = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_unclassified', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
scatt_bin_lims = np.logspace(np.log10(minOptD), np.log10(maxOptD), n_scattbins)
scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)
dNdlogDsc, _ = process_hist_and_dist(ddf_pbp, 'Opt diam scatt only', None, None, scatt_bin_lims, scatt_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=None, BC_type=None, t=dt)
timelag_bins_lims = np.linspace(minTL, maxTL, n_timelag)
timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)
list_hist = []
for idx, (name, group) in enumerate(ddf_pbp.groupby('BC mass bin')):
a, _ = process_hist_and_dist(group, 'time_lag_new', 'cnts_particles_for_tl_dist', 1, timelag_bins_lims, timelag_bin_ctrs, dt_str, calculate_conc=True, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=None, BC_type=None, t=dt)
a.columns = [f'dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag_{i:.1f}' for i in timelag_bin_ctrs]
list_hist.append(a)
time_lag_hists = pd.concat(list_hist, axis=1)
# **** **** **** **** **** ****
# Merge everything in 1 ddf and save
# **** **** **** **** **** ****
final_df = pd.concat([ddf_pbp_hk,
dNdlogDmev, dMdlogDmev,
dNdlogDmev_thin, dMdlogDmev_thin,
dNdlogDmev_thin_noScatt, dMdlogDmev_thin_noScatt,
dNdlogDmev_thick, dMdlogDmev_thick,
dNdlogDmev_thick_sat, dMdlogDmev_thick_sat,
dNdlogDmev_thin_sat, dMdlogDmev_thin_sat,
dNdlogDmev_ntl_sat, dMdlogDmev_ntl_sat,
dNdlogDmev_ntl, dMdlogDmev_ntl,
dNdlogDmev_extreme_positive_timelag, dMdlogDmev_extreme_positive_timelag,
dNdlogDmev_thin_low_inc_scatt_ratio, dMdlogDmev_thin_low_inc_scatt_ratio,
dNdlogDmev_thin_total, dMdlogDmev_thin_total,
dNdlogDmev_thick_total, dMdlogDmev_thick_total,
dNdlogDmev_unclassified, dMdlogDmev_unclassified,
dNdlogDsc,
time_lag_hists],
axis=1)
final_df['date'] = final_df.index.normalize()
final_df['hour'] = final_df.index.hour
final_df["date"] = final_df["date"].astype("date64[pyarrow]")
    if save_final_data:
dd.from_pandas(final_df.sort_index(), npartitions=1).to_parquet(path = path_parquet,
engine='pyarrow',
partition_on=partition_on,
write_index=True
)
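# Example usage (illustrative; all paths and bin settings are placeholders):
#   task = process_pbp_parquet('/data/parquet/pbp', '/data/parquet/hk', dt=1,
#                              config_file_dir='/data/config',
#                              minM=0.3, maxM=400, n_incbins=50,
#                              minOptD=100, maxOptD=500, n_scattbins=30,
#                              minTL=-10, maxTL=400, n_timelag=42,
#                              save_final_data=True, path_parquet='/data/parquet/processed_1s')
#   task.compute()  # the function is wrapped with dask.delayed, so it only runs on compute()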
#%% Function to resample pbp to new time interval
def resample_to_dt(dir_path_pbp, dt=60, path_parquet='', save_final_data=False):
dd_data = pd.read_parquet(dir_path_pbp)
dd_data['Time'] = dd_data.index
dd_data['date'] = dd_data['Time'].dt.date.astype("date64[pyarrow]")
dd_data['temporary_col'] = 1
cols_for_mean = [col for col in dd_data.columns if any(substring in col for substring in ['date', 'Conc', 'dNdlogDmev', 'dMdlogDmev', 'dNdlogDsc', 'Flow'])]
#thin_cols_list = [col for col in dd_data.columns if 'thin' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
#thick_cols_list = [col for col in dd_data.columns if 'thick' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
#cols_for_sum = [col for col in dd_data.columns if any(substring in col for substring in ['timelag'])]+thin_cols_list+thick_cols_list
timelag_hist_cols = [col for col in dd_data.columns if 'timelag' in col and all(substring not in col for substring in ['cnts', 'dNdlogDmev', 'dMdlogDmev'])]
cnts_cols = [col for col in dd_data.columns if 'cnts' in col]
    additional_cols = [col for col in dd_data.columns if col in [
        'Dropped Records', 'Incand Mass (fg)', 'BC mass', 'BC mass within range',
        'BC numb from file', 'BC numb', 'BC numb within range',
        'scatter numb from file', 'Scatt numb', 'Scatt numb within range']]
    cols_for_sum = timelag_hist_cols + cnts_cols + additional_cols
cols_for_count = ['temporary_col']
data_resampled_mean = dd_data[cols_for_mean].fillna(0).resample(f'{dt}s').mean()
data_resampled_sum = dd_data[cols_for_sum].fillna(0).resample(f'{dt}s').sum()
data_resampled_count = dd_data[cols_for_count].resample(f'{dt}s').count()
#merged = dd.merge(data_resampled_mean, data_resampled_sum, left_index=True, right_index=True, how='outer')
merged = pd.concat([data_resampled_mean, data_resampled_sum], axis=1)
#merged['date'] = merged.index.normalize()
merged = merged[merged['date'].notna()]
#merged['date'] = merged.index.normalize()
#merged["date"] = merged["date"].astype("date64[pyarrow]")
if save_final_data:
dd.from_pandas(merged.sort_index(), npartitions=1).to_parquet(path=path_parquet,
engine='pyarrow',
partition_on=['date'],
coerce_timestamps='us',
allow_truncated_timestamps=True,
write_index=True,
append=False)
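# Example usage (illustrative; paths are placeholders):
#   resample_to_dt('/data/parquet/processed_1s', dt=60, path_parquet='/data/parquet/processed_60s', save_final_data=True)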
#%% Widget to plot raw traces
def raw_traces_plot(ch0_plot=[], ch1_plot=[],
add_str_title='',
xmin=None, xmax=None,
ddf_processed=False, verbose=False,
):
current_event_index = 1
def update_plot(event_index):
fig, ax1 = plt.subplots(figsize=(5, 3), layout='constrained')
        if verbose and not isinstance(ddf_processed, bool) and ddf_processed.any().any():  # guard against the default ddf_processed=False
ax1.text(0.75, 0.4, 'ch0_flag_000: '+str(ddf_processed.iloc[event_index]['ch0_flag_000']), fontsize=8, transform=ax1.transAxes)
ax1.set_title(add_str_title+' , idx: '+str(event_index)+'\n'+str(ch0_plot.iloc[event_index].name))
ax1t = ax1.twinx()
if len(ch0_plot)!=0:
ax1.plot(ch0_plot.iloc[event_index], label='ch0', c='C0', ls='-')
if len(ch1_plot)!=0:
ax1t.plot(ch1_plot.iloc[event_index], label='ch1', c='C1', ls='-')
ax1.set_ylabel('Scattering', color='tab:blue')
ax1.tick_params(axis='y', labelcolor='tab:blue')
ax1.legend(loc=1)
ax1t.legend(loc=4)
        if xmin is not None and xmax is not None:
ax1.set_xlim(xmin, xmax)
plt.show()
def handle_forward_button_click(b):
nonlocal current_event_index
current_event_index = min(current_event_index + 1, len(ch0_plot) - 1)
event_slider.value = current_event_index
def handle_backward_button_click(b):
nonlocal current_event_index
current_event_index = max(current_event_index - 1, 0)
event_slider.value = current_event_index
event_slider = widgets.IntSlider(min=0, max=len(ch0_plot) - 1, step=1, value=current_event_index, description='Event:')
forward_button = widgets.Button(description='Forward')
backward_button = widgets.Button(description='Backward')
forward_button.on_click(handle_forward_button_click)
backward_button.on_click(handle_backward_button_click)
display(widgets.HBox([backward_button, forward_button]))
interactive_plot = widgets.interactive(update_plot, event_index=event_slider)
display(interactive_plot)
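# Example usage in a Jupyter notebook (illustrative; ch0_traces/ch1_traces are placeholder
# DataFrames with one raw trace per row):
#   raw_traces_plot(ch0_plot=ch0_traces, ch1_plot=ch1_traces, add_str_title='test flight', xmin=0, xmax=100)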
#%% Functions for plots
def save_image(filename):
    """Save all currently open matplotlib figures into a single multi-page PDF."""
    with PdfPages(filename) as p:
        # plt.get_fignums() returns the numbers of all existing figures
        fig_nums = plt.get_fignums()
        figs = [plt.figure(n) for n in fig_nums]
        # save each figure as one page of the PDF
        for fig in figs:
            fig.savefig(p, format='pdf')
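# Example usage (illustrative): collect all currently open figures into a single multi-page PDF:
#   save_image('overview_plots.pdf')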