# -*- coding: utf-8 -*-
|
|
# %% Import libraries
|
|
|
|
|
|
import os
|
|
from collections import defaultdict
|
|
import dask.dataframe as dd
|
|
# import dask.delayed
|
|
import pandas as pd
|
|
import numpy as np
|
|
import re
|
|
import warnings
|
|
from scipy.optimize import curve_fit
|
|
from numpy.polynomial import Polynomial
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
from glob import glob
|
|
import datetime
|
|
import struct
|
|
import zipfile
|
|
from dask import delayed
|
|
|
|
import time
|
|
|
|
import ipywidgets as widgets
|
|
from IPython.display import display, clear_output
|
|
|
|
|
|
from matplotlib.backends.backend_pdf import PdfPages
|
|
|
|
# %% Functions for listing files with specific string in name
|
|
|
|
def find_files(directory, string, avoid='xxxxxxxxxx'):
|
|
'''
|
|
This function finds all the files that contain 'string' in a directory and its subdirs.
|
|
    If the same file exists in both .csv and .zip format, only the .csv version is kept in the list.
|
|
|
|
Parameters
|
|
----------
|
|
    directory : str
        Root directory that is searched recursively.
    string : str
        Substring that must appear in the file name.
    avoid : str, optional
        Files whose parent path contains this substring are skipped.

    Returns
    -------
    filtered_paths : list of str
        Full paths of the matching files.
|
|
|
|
'''
|
|
# Step 1: Find all matching files first
|
|
all_matching_files = []
|
|
for root, dirs, files in os.walk(directory):
|
|
for file in files:
|
|
if string in file and avoid not in root:
|
|
all_matching_files.append(os.path.join(root, file))
|
|
|
|
# Step 2: Organize files by their basename
|
|
files_by_basename = defaultdict(list)
|
|
for path in all_matching_files:
|
|
basename = os.path.splitext(os.path.basename(path))[0]
|
|
files_by_basename[basename].append(path)
|
|
|
|
# Step 3: Filter based on extension priority
|
|
filtered_paths = []
|
|
for basename, paths in files_by_basename.items():
|
|
if any(path.endswith('.csv') for path in paths) and any(path.endswith('.zip') for path in paths):
|
|
# Add only the CSV version
|
|
filtered_paths.extend([path for path in paths if path.endswith('.csv')])
|
|
else:
|
|
# If only one version exists, add it without checking the extension
|
|
filtered_paths.extend(paths)
|
|
|
|
# Step 4: Filter based on size ?
|
|
|
|
return filtered_paths
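
# Example usage of find_files (illustrative sketch; the directory and search
# strings below are hypothetical and not taken from the original workflow):
#
#     pbp_files = find_files('/data/SP2XR/raw', 'PbP', avoid='backup')
#     hk_files = find_files('/data/SP2XR/raw', 'hk')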
|
|
|
|
def get_file_dict(directory, file_type, level='hour'):
|
|
"""
|
|
Creates a dictionary with date and hour as keys and file paths as values
|
|
for the given directory.
|
|
"""
|
|
file_dict = {}
|
|
# Assuming the files have a structure like /path/to/directory/YYYYMMDD/HH/file.parquet
|
|
for file_path in glob(f'{directory}/**/*.parquet', recursive=True):
|
|
# Extracting date and hour from the file path
|
|
parts = file_path.split(os.sep)
|
|
date = parts[-3]
|
|
hour = parts[-2]
|
|
if level == 'hour':
|
|
file_dict[(date, hour)] = os.path.join('/', *parts[:-1])
|
|
elif level == 'date':
|
|
file_dict[(date)] = os.path.join('/', *parts[:-2])
|
|
return file_dict
|
|
|
|
|
|
def chunks(lst, n):
|
|
"""Yield successive n-sized chunks from lst."""
|
|
for i in range(0, len(lst), n):
|
|
yield lst[i:i + n]
|
|
|
|
def list_files_and_dirs(base_dir):
|
|
""" List full paths of all files and directories in the given directory, including nested ones """
|
|
file_list = []
|
|
subdir_list = []
|
|
|
|
# os.walk yields a tuple (dirpath, dirnames, filenames)
|
|
for dirpath, dirnames, filenames in os.walk(base_dir):
|
|
# Add directories to subdir_list
|
|
for dirname in dirnames:
|
|
subdir_full_path = os.path.join(dirpath, dirname)
|
|
subdir_list.append(subdir_full_path)
|
|
|
|
# Add files to file_list
|
|
for filename in filenames:
|
|
file_full_path = os.path.join(dirpath, filename)
|
|
file_list.append(file_full_path)
|
|
|
|
return subdir_list, file_list
|
|
|
|
|
|
def list_first_level_subdirs(base_dir):
|
|
""" List full paths of all first-level subdirectories within the given directory """
|
|
subdir_list = []
|
|
# List all entries in the base directory
|
|
for entry in os.listdir(base_dir):
|
|
# Create full path
|
|
full_path = os.path.join(base_dir, entry)
|
|
# Check if this path is a directory
|
|
if os.path.isdir(full_path):
|
|
subdir_list.append(full_path)
|
|
return subdir_list
|
|
|
|
|
|
#%% Functions to read .csv or .zip PbP or HK files
|
|
|
|
def extract_datetime(df):
|
|
'''
|
|
    This function extracts the date and time from the SP2XR file name stored in the 'path' column.
|
|
|
|
Parameters
|
|
----------
|
|
df : Pandas DataFrame
|
|
        DataFrame containing the column 'path'.
|
|
|
|
Returns
|
|
-------
|
|
Pandas Series
|
|
        Date and time corresponding to the date present in the 'path' column.
|
|
|
|
'''
|
|
# return pd.to_datetime(df['orig_file_name'].split('_')[-2])
|
|
return pd.to_datetime(df['path'].split('_')[-2])
|
|
|
|
|
|
def calculate_delta_sec(df):
|
|
'''
|
|
This function calculates the difference in seconds between the columns
|
|
'Time (sec)' and 'first_val' present in the input dataframe
|
|
|
|
Parameters
|
|
----------
|
|
df : pandas dataframe
|
|
The columns 'Time (sec)' and 'first_val' must be present in the DataFrame.
|
|
|
|
Returns
|
|
-------
|
|
    pandas Series
        Difference, in whole seconds, between the values in the two columns.
|
|
|
|
'''
|
|
return np.floor(df['Time (sec)']) - df['first_val']
|
|
|
|
|
|
@delayed
|
|
def read_csv_files_with_dask(file_path, meta_pbp, meta_hk, target_directory):
|
|
'''
|
|
This function reads Pbp or HK files from the SP2XR
|
|
|
|
Parameters
|
|
----------
|
|
file_path : str
|
|
Complete path of the file to read.
|
|
    meta_pbp : pandas DataFrame
        Empty DataFrame with the structure expected for a PbP file.
        Used to infer the column dtypes and as a fallback when the file is empty.
    meta_hk : pandas DataFrame
        Empty DataFrame with the structure expected for a HK file.
    target_directory : str
        Directory where the resulting parquet dataset (partitioned by date and hour) is written.

    Returns
    -------
    None
        The data are written to `target_directory` as parquet; nothing is returned.

    '''
|
|
if file_path:
|
|
tmp_hk = pd.DataFrame()
|
|
|
|
hk_0001 = re.sub(r'PbP', 'hk', file_path)
|
|
hk_0001 = re.sub(r'(_x)\d{4}', r'\g<1>0001', hk_0001)
|
|
hk_0001 = re.sub(r'\.(csv|zip)$', '', hk_0001)
|
|
if os.path.exists(f"{hk_0001}.csv"):
|
|
try:
|
|
tmp_hk = pd.read_csv(f"{hk_0001}.csv", nrows=1, parse_dates=['Time Stamp'], usecols=['Time Stamp', 'Time (sec)'])
|
|
except pd.errors.EmptyDataError:
|
|
tmp_hk = pd.DataFrame()
|
|
except zipfile.BadZipFile:
|
|
print(f'!! Bad file: {file_path}')
|
|
tmp_hk = pd.DataFrame()
|
|
elif os.path.exists(f"{hk_0001}.zip"):
|
|
try:
|
|
tmp_hk = pd.read_csv(f"{hk_0001}.zip", nrows=1, parse_dates=['Time Stamp'], usecols=['Time Stamp', 'Time (sec)'])
|
|
except pd.errors.EmptyDataError:
|
|
tmp_hk = pd.DataFrame()
|
|
|
|
if not tmp_hk.empty:
|
|
first_val, t0 = tmp_hk[['Time (sec)', 'Time Stamp']].values[0]
|
|
if 'PbP' in file_path:
|
|
temp = meta_pbp
|
|
data_type = pd.Series(temp.dtypes.values, index=temp.columns).to_dict()
|
|
try:
|
|
df = dd.read_csv(file_path, dtype=data_type, blocksize=None)#, include_path_column=True)
|
|
df = df.fillna(0) # is this because otherwise we cannot calculate the time_lag?
|
|
# df['time_lag'] = df['Incand Peak Time'] - df['Scatter Peak Time'] # 02.09.2024 this line implies that for particles with nan in the scatt transit time time_lag=incand transit time. better to calculate timelag for particles with both scatt and incand and set 0 for particles with only incand
|
|
#!!! MISSING CORRECT TIME LAG CALCULATIONS
|
|
except zipfile.BadZipFile:
|
|
print(f'!! Bad zip file: {file_path}')
|
|
df = pd.DataFrame()
|
|
|
|
elif 'hk' in file_path:
|
|
temp = meta_hk
|
|
data_type = pd.Series(temp.dtypes.values, index=temp.columns).to_dict()
|
|
filtered_dtype_dict = {key: value for key, value in data_type.items() if value != 'datetime64[ns]'}
|
|
try:
|
|
df = dd.read_csv(file_path, dtype=filtered_dtype_dict, parse_dates=['Time Stamp'], blocksize=None, assume_missing=True)
|
|
# df = dd.read_csv(file_path, dtype=data_type, parse_dates=['Time Stamp'], blocksize=None)#, assume_missing=True)
|
|
'''if 'Time Stamp' in df.columns:
|
|
datetime_format = '%m/%d/%Y %H:%M:%S.%f'
|
|
df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, format=datetime_format, meta=('Time Stamp', 'datetime64[ns]'))
|
|
'''
|
|
except ValueError as e:
|
|
# Handle the error if the 'Time Stamp' column is missing or any other parsing error occurs
|
|
if 'Missing column provided to \'parse_dates\'' in str(e):
|
|
print(f"Error for {file_path}: Missing column provided to 'parse_dates': 'Time Stamp'")
|
|
df = pd.DataFrame()
|
|
except pd.errors.EmptyDataError:
|
|
df = pd.DataFrame()
|
|
except zipfile.BadZipFile:
|
|
print(f'!! Bad zip file: {file_path}')
|
|
df = pd.DataFrame()
|
|
|
|
if len(df.columns) > 0:
|
|
df = df.loc[~df.isna().all(axis=1)]
|
|
|
|
df['path'] = str(file_path)
|
|
df['first_val'] = first_val
|
|
df['t0'] = t0
|
|
file_name_cut = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
|
|
df['file'] = file_name_cut
|
|
folder_name = file_path.split('\\')[-1].split('_')[-2]
|
|
df['folder_name'] = folder_name
|
|
|
|
|
|
if 'Time Stamp' in df.columns:
|
|
df['Time Stamp'] = df['Time Stamp'].map_partitions(pd.to_datetime, meta=('Time Stamp', 'datetime64[ns]'))
|
|
|
|
df['delta_sec'] = df.map_partitions(calculate_delta_sec, meta=('delta_sec', 'float64'))
|
|
df['calculated_time'] = df['t0'] + dd.to_timedelta(df['delta_sec'], unit='s')
|
|
df['file_datetime'] = df.apply(extract_datetime, axis=1, meta=('file_datetime', 'datetime64[ns]'))
|
|
df['date_floored'] = df['calculated_time'].dt.floor('H')
|
|
df["date"] = df["calculated_time"].dt.date.astype("date64[pyarrow]")
|
|
df['hour'] = df['calculated_time'].dt.hour.astype('i8')
|
|
df['floor_time'] = df['calculated_time'].dt.floor('S')
|
|
df['Secs_2GB'] = df['Time (sec)'].apply(np.floor, meta=('Secs_2GB', 'i8'))
|
|
|
|
|
|
fn = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
|
|
def name(part_idx):
|
|
return f'{fn}.parquet'
|
|
|
|
df = df.set_index('calculated_time', drop=True, sort=False, sorted=True)
|
|
|
|
|
|
|
|
df.to_parquet(path=target_directory,
|
|
engine='pyarrow',
|
|
partition_on=['date', 'hour'],
|
|
coerce_timestamps='us',
|
|
allow_truncated_timestamps=True,
|
|
name_function=name,
|
|
write_index=True,
|
|
append=False)
|
|
del df
|
|
else:
|
|
warnings.warn("tmp_hk empty or not existing")
|
|
|
|
|
|
else:
|
|
raise ValueError("No CSV files found.")
|
|
|
|
|
|
# %% Functions to read sp2b files
|
|
|
|
@delayed
|
|
def read_sp2b_from_sp2xr_zipped_2(file_path, meta, target_directory):
|
|
labview_epoch = datetime.datetime(1904, 1, 1)
|
|
results = []
|
|
'''empty_file = pd.DataFrame(columns=['ch0', 'ch1', 'flag', 'time stamp',
|
|
'res1', 'event index', 'Time/10000',
|
|
'Time remainder', 'res5', 'res6',
|
|
'res7', 'res8', 'array_2_dim', 'array_2', 'Time new'])'''
|
|
|
|
empty_file = pd.DataFrame({'ch0': pd.Series(dtype='int'),
|
|
'ch1': pd.Series(dtype='int'),
|
|
'flag': pd.Series(dtype='int'),
|
|
'time stamp': pd.Series(dtype='int'),
|
|
'res1': pd.Series(dtype='int'),
|
|
'event index': pd.Series(dtype='int'),
|
|
'Time/10000': pd.Series(dtype='int'),
|
|
'Time remainder': pd.Series(dtype='int'),
|
|
'res5': pd.Series(dtype='int'),
|
|
'res6': pd.Series(dtype='int'),
|
|
'res7': pd.Series(dtype='int'),
|
|
'res8': pd.Series(dtype='int'),
|
|
'array_2_dim': pd.Series(dtype='int'),
|
|
'array_2': pd.Series(dtype='int'),
|
|
'Time new': pd.Series(dtype='int'),
|
|
})
|
|
|
|
|
|
def process_block(f, file_size):
|
|
while f.tell() < file_size:
|
|
initial_pos = f.tell()
|
|
size_2d_array = struct.unpack("> 2i", f.read(8))
|
|
|
|
if size_2d_array[0] == 0:
|
|
# End of data marker found
|
|
break
|
|
|
|
expected_data_size = size_2d_array[0] * size_2d_array[1] * 4
|
|
|
|
if f.tell() + expected_data_size + 2 + 7*4 + 2*8 + 4 > file_size:
|
|
# Not enough data left for the expected block size, indicating incomplete block
|
|
break
|
|
|
|
# If sufficient data is present, proceed to read and unpack the block as before
|
|
array_data = struct.unpack(f"> {size_2d_array[1] * size_2d_array[0]}i", f.read(expected_data_size))
|
|
event_df = pd.DataFrame({'ch0': [array_data[::2]], 'ch1': [array_data[1::2]]})
|
|
event_df[['flag']] = struct.unpack("> H", f.read(2))
|
|
time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6 = struct.unpack("> 7f", f.read(7 * 4))
|
|
event_df[['time stamp', 'res1', 'event index', 'Time/10000', 'Time remainder', 'res5', 'res6']] = time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6
|
|
event_df[['res7', 'res8']] = struct.unpack("> 2d", f.read(2 * 8))
|
|
array2_dim = struct.unpack("> i", f.read(4))[0]
|
|
event_df['array_2_dim'] = array2_dim
|
|
array_data_2 = [struct.unpack(f"< {array2_dim}i", f.read(array2_dim * 4))]
|
|
event_df['array_2'] = [array_data_2]
|
|
event_df['Time new'] = labview_epoch + datetime.timedelta(seconds=Time_div10000*10000+Time_remainder)
|
|
|
|
results.append(event_df)
|
|
|
|
if file_path[-3:] == 'zip':
|
|
try:
|
|
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
|
for file_name in zip_ref.namelist():
|
|
with zip_ref.open(file_name, 'r') as f:
|
|
file_size = zip_ref.getinfo(file_name).file_size
|
|
if file_size > 100000:
|
|
process_block(f, file_size)
|
|
else:
|
|
print(f'{file_path} is too small')
|
|
results.append(empty_file)
|
|
except zipfile.BadZipFile:
|
|
print(f'!! Bad file: {file_path}')
|
|
results.append(empty_file)
|
|
elif file_path[-3:] == 'p2b':
|
|
file_size = os.path.getsize(file_path)
|
|
if file_size > 100:
|
|
with open(file_path, 'rb') as f:
|
|
process_block(f, file_size)
|
|
else:
|
|
results.append(empty_file)
|
|
results = pd.concat(results, axis=0)
|
|
if not results.empty:
|
|
# results = pd.concat(results, axis=0)
|
|
ch0 = pd.DataFrame(results['ch0'].to_list(), index=results['Time new'])
|
|
ch1 = pd.DataFrame(results['ch1'].to_list(), index=results['Time new'])
|
|
orig_time_stamp = pd.DataFrame(results['time stamp'].to_list(), index=results['Time new'], columns=['orig_time_stamp'])
|
|
|
|
|
|
if 8182 in ch0:
|
|
wrong_lines = ch0[ch0.loc[:, 8182].notna()].index
|
|
ch0.drop(wrong_lines, inplace=True)
|
|
ch0.dropna(axis=1, how='all', inplace=True)
|
|
ch1.drop(wrong_lines, inplace=True)
|
|
ch1.dropna(axis=1, how='all', inplace=True)
|
|
orig_time_stamp.drop(wrong_lines, inplace=True)
|
|
orig_time_stamp.dropna(axis=1, how='all', inplace=True)
|
|
|
|
ch_all = pd.merge(ch0, ch1, how="outer", suffixes=("_ch0", "_ch1"), left_index=True, right_index=True)
|
|
ch_all['path'] = file_path
|
|
ch_all2 = pd.merge(ch_all, orig_time_stamp, left_index=True, right_index=True, how="outer")
|
|
#ch_all['orig_time_stamp'] = orig_time_stamp#.loc[ch_all.index]
|
|
#return ch_all2 #dd.from_pandas(ch_all2, npartitions=1)
|
|
|
|
|
|
if not ch_all2.empty:
|
|
ch_all2 = dd.from_pandas(ch_all2, npartitions=1)
|
|
ch_all2['Time'] = ch_all2.index.map_partitions(lambda index: index, meta=('Time', 'datetime64[ns]'))
|
|
ch_all2['Time_round'] = ch_all2['Time'].dt.round('min').astype('datetime64[ns]')
|
|
#combined_ddf['date'] = combined_ddf.index.map_partitions(lambda index: index.normalize(), meta=('date', 'datetime64[ns]'))
|
|
# combined_ddf['hour'] = combined_ddf.index.map_partitions(lambda index: index.hour, meta=('hour', 'i8'))
|
|
ch_all2["date"] = ch_all2["Time"].dt.date.astype("date64[pyarrow]")
|
|
ch_all2['hour'] = ch_all2['Time'].dt.hour.astype('i8')
|
|
|
|
|
|
|
|
|
|
def extract_folder_name(df):
|
|
return df['path'].split('\\')[-1].split('_')[-2]
|
|
ch_all2['folder_name'] = ch_all2.apply(extract_folder_name, axis=1, meta=('folder_name', 'str'))
|
|
|
|
|
|
def extract_file_name(df):
|
|
return df['path'].split('\\')[-1].split('_')[-2] + '_' + df['path'].split('\\')[-1].split('_')[-1].split('.')[-2]
|
|
ch_all2['file'] = ch_all2.apply(extract_file_name, axis=1, meta=('file', 'str'))
|
|
|
|
|
|
|
|
fn = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
|
|
def name(part_idx):
|
|
return f'{fn}.parquet'
|
|
|
|
ch_all2.to_parquet(path=target_directory,
|
|
engine='pyarrow',
|
|
partition_on=['date', 'hour'],
|
|
name_function=name,
|
|
write_index=True,
|
|
append=False, schema='infer')
|
|
|
|
|
|
else:
|
|
return meta
|
|
|
|
|
|
|
|
def read_sp2b_from_sp2xr_zipped(file_path, meta):
|
|
labview_epoch = datetime.datetime(1904, 1, 1)
|
|
results = []
|
|
'''empty_file = pd.DataFrame(columns=['ch0', 'ch1', 'flag', 'time stamp',
|
|
'res1', 'event index', 'Time/10000',
|
|
'Time remainder', 'res5', 'res6',
|
|
'res7', 'res8', 'array_2_dim', 'array_2', 'Time new'])'''
|
|
|
|
empty_file = pd.DataFrame({'ch0': pd.Series(dtype='int'),
|
|
'ch1': pd.Series(dtype='int'),
|
|
'flag': pd.Series(dtype='int'),
|
|
'time stamp': pd.Series(dtype='int'),
|
|
'res1': pd.Series(dtype='int'),
|
|
'event index': pd.Series(dtype='int'),
|
|
'Time/10000': pd.Series(dtype='int'),
|
|
'Time remainder': pd.Series(dtype='int'),
|
|
'res5': pd.Series(dtype='int'),
|
|
'res6': pd.Series(dtype='int'),
|
|
'res7': pd.Series(dtype='int'),
|
|
'res8': pd.Series(dtype='int'),
|
|
'array_2_dim': pd.Series(dtype='int'),
|
|
'array_2': pd.Series(dtype='int'),
|
|
'Time new': pd.Series(dtype='int'),
|
|
})
|
|
|
|
|
|
def process_block(f, file_size):
|
|
while f.tell() < file_size:
|
|
initial_pos = f.tell()
|
|
size_2d_array = struct.unpack("> 2i", f.read(8))
|
|
|
|
if size_2d_array[0] == 0:
|
|
# End of data marker found
|
|
break
|
|
|
|
expected_data_size = size_2d_array[0] * size_2d_array[1] * 4
|
|
|
|
if f.tell() + expected_data_size + 2 + 7*4 + 2*8 + 4 > file_size:
|
|
# Not enough data left for the expected block size, indicating incomplete block
|
|
break
|
|
|
|
# If sufficient data is present, proceed to read and unpack the block as before
|
|
array_data = struct.unpack(f"> {size_2d_array[1] * size_2d_array[0]}i", f.read(expected_data_size))
|
|
event_df = pd.DataFrame({'ch0': [array_data[::2]], 'ch1': [array_data[1::2]]})
|
|
event_df[['flag']] = struct.unpack("> H", f.read(2))
|
|
time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6 = struct.unpack("> 7f", f.read(7 * 4))
|
|
event_df[['time stamp', 'res1', 'event index', 'Time/10000', 'Time remainder', 'res5', 'res6']] = time_stamp, res1, event_index, Time_div10000, Time_remainder, res5, res6
|
|
event_df[['res7', 'res8']] = struct.unpack("> 2d", f.read(2 * 8))
|
|
array2_dim = struct.unpack("> i", f.read(4))[0]
|
|
event_df['array_2_dim'] = array2_dim
|
|
array_data_2 = [struct.unpack(f"< {array2_dim}i", f.read(array2_dim * 4))]
|
|
event_df['array_2'] = [array_data_2]
|
|
event_df['Time new'] = labview_epoch + datetime.timedelta(seconds=Time_div10000*10000+Time_remainder)
|
|
|
|
results.append(event_df)
|
|
|
|
if file_path[-3:] == 'zip':
|
|
try:
|
|
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
|
for file_name in zip_ref.namelist():
|
|
with zip_ref.open(file_name, 'r') as f:
|
|
file_size = zip_ref.getinfo(file_name).file_size
|
|
if file_size > 100:
|
|
process_block(f, file_size)
|
|
else:
|
|
results.append(empty_file)
|
|
except zipfile.BadZipFile:
|
|
print(f'!! Bad file: {file_path}')
|
|
results.append(empty_file)
|
|
elif file_path[-3:] == 'p2b':
|
|
file_size = os.path.getsize(file_path)
|
|
if file_size > 100:
|
|
with open(file_path, 'rb') as f:
|
|
process_block(f, file_size)
|
|
else:
|
|
results.append(empty_file)
|
|
results = pd.concat(results, axis=0)
|
|
if not results.empty:
|
|
# results = pd.concat(results, axis=0)
|
|
ch0 = pd.DataFrame(results['ch0'].to_list(), index=results['Time new'])
|
|
ch1 = pd.DataFrame(results['ch1'].to_list(), index=results['Time new'])
|
|
orig_time_stamp = pd.DataFrame(results['time stamp'].to_list(), index=results['Time new'], columns=['orig_time_stamp'])
|
|
|
|
|
|
if 8182 in ch0:
|
|
wrong_lines = ch0[ch0.loc[:, 8182].notna()].index
|
|
ch0.drop(wrong_lines, inplace=True)
|
|
ch0.dropna(axis=1, how='all', inplace=True)
|
|
ch1.drop(wrong_lines, inplace=True)
|
|
ch1.dropna(axis=1, how='all', inplace=True)
|
|
orig_time_stamp.drop(wrong_lines, inplace=True)
|
|
orig_time_stamp.dropna(axis=1, how='all', inplace=True)
|
|
|
|
ch_all = pd.merge(ch0, ch1, how="outer", suffixes=("_ch0", "_ch1"), left_index=True, right_index=True)
|
|
ch_all['path'] = file_path
|
|
ch_all2 = pd.merge(ch_all, orig_time_stamp, left_index=True, right_index=True, how="outer")
|
|
#ch_all['orig_time_stamp'] = orig_time_stamp#.loc[ch_all.index]
|
|
return ch_all2 #dd.from_pandas(ch_all2, npartitions=1)
|
|
else:
|
|
return meta
|
|
|
|
def read_and_process_sp2b(matches, target_directory, meta_file):
|
|
if len(matches) > 0:
|
|
|
|
delayed_results = [delayed(read_sp2b_from_sp2xr_zipped)(file, meta_file) for file in matches]
|
|
combined_ddf = dd.from_delayed(delayed_results)#.persist()
|
|
# ddf = [read_sp2b_from_sp2xr_zipped(file, meta_file) for file in matches]
|
|
# combined_ddf = dd.concat(ddf)
|
|
|
|
if combined_ddf.npartitions>0:
|
|
combined_ddf['Time'] = combined_ddf.index.map_partitions(lambda index: index, meta=('Time', 'datetime64[ns]'))
|
|
combined_ddf['Time_round'] = combined_ddf['Time'].dt.round('min').astype('datetime64[ns]')
|
|
#combined_ddf['date'] = combined_ddf.index.map_partitions(lambda index: index.normalize(), meta=('date', 'datetime64[ns]'))
|
|
# combined_ddf['hour'] = combined_ddf.index.map_partitions(lambda index: index.hour, meta=('hour', 'i8'))
|
|
combined_ddf["date"] = combined_ddf["Time"].dt.date.astype("date64[pyarrow]")
|
|
combined_ddf['hour'] = combined_ddf['Time'].dt.hour.astype('i8')
|
|
|
|
|
|
# combined_ddf.to_parquet(path=target_directory,
|
|
# engine='pyarrow',
|
|
# partition_on=['date','hour'],
|
|
# #save_divisons=True,
|
|
# append=True,
|
|
# write_index=True)
|
|
|
|
|
|
def extract_folder_name(df):
|
|
return df['path'].split('\\')[-1].split('_')[-2]
|
|
combined_ddf['folder_name'] = combined_ddf.apply(extract_folder_name, axis=1, meta=('folder_name', 'str'))
|
|
|
|
|
|
def extract_file_name(df):
|
|
return df['path'].split('\\')[-1].split('_')[-2] + '_' + df['path'].split('\\')[-1].split('_')[-1].split('.')[-2]
|
|
combined_ddf['file'] = combined_ddf.apply(extract_file_name, axis=1, meta=('file', 'str'))
|
|
|
|
|
|
# def extract_first_element(df):
|
|
# return df['orig_file_name2'].head(1)
|
|
# first_elements = combined_ddf.map_partitions(extract_first_element).compute().to_list() # !!! can i remove this compute?
|
|
|
|
|
|
# def name_function(part_idx):
|
|
# return f"{first_elements[part_idx]}.parquet"
|
|
|
|
fn = file_path.split('\\')[-1].split('_')[-2] + '_' + file_path.split('\\')[-1].split('_')[-1].split('.')[-2]
|
|
def name(part_idx):
|
|
return f'{fn}.parquet'
|
|
|
|
combined_ddf.to_parquet(path=target_directory,
|
|
engine='pyarrow',
|
|
partition_on=['date', 'hour'],
|
|
name_function=name,
|
|
write_index=True,
|
|
append=False, schema='infer')
|
|
|
|
|
|
|
|
return combined_ddf
|
|
return None
|
|
return None
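
# Example usage of read_and_process_sp2b (illustrative sketch; the paths and the
# meta frame are hypothetical, not taken from the original workflow):
#
#     sp2b_files = find_files('/data/SP2XR/raw', 'sp2b')
#     ddf_sp2b = read_and_process_sp2b(sp2b_files, '/data/SP2XR/parquet/sp2b', meta_sp2b)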
|
|
|
|
|
|
def process_sp2b_parquet(ddf_sp2xr,
|
|
scatt_sat_threshold = 1e9,
|
|
inc_sat_threshold = 1e8,
|
|
scatt_noise_threshold = 1e4,
|
|
|
|
end_bkgr_ch0 = 100,
|
|
end_bkgr_ch1 = 350,
|
|
output_path = ''
|
|
):
|
|
|
|
|
|
|
|
timestr = time.strftime("%Y%m%d-%H%M%S")
|
|
|
|
|
|
|
|
|
|
ch0_columns = [col for col in ddf_sp2xr.columns if '_ch0' in col]
|
|
ch0_sp2xr = ddf_sp2xr[ch0_columns]
|
|
ch0_sp2xr = ch0_sp2xr.rename(columns=lambda col: int(col.replace('_ch0', '')))
|
|
#ch0_sp2xr.index = ddf_sp2xr['Time new']
|
|
ch0_sp2xr = ch0_sp2xr.sub(ch0_sp2xr.loc[:, :end_bkgr_ch0].mean(axis=1), axis=0)
|
|
|
|
ch1_columns = [col for col in ddf_sp2xr.columns if '_ch1' in col]
|
|
ch1_sp2xr = ddf_sp2xr[ch1_columns]
|
|
ch1_sp2xr = ch1_sp2xr.rename(columns=lambda col: int(col.replace('_ch1', '')))
|
|
#ch1_sp2xr.index = ddf_sp2xr['Time new']
|
|
ch1_sp2xr = ch1_sp2xr.sub(ch1_sp2xr.loc[:, :end_bkgr_ch1].mean(axis=1), axis=0)
|
|
|
|
ch0_sp2xr_array = ch0_sp2xr.values
|
|
ch1_sp2xr_array = ch1_sp2xr.values
|
|
|
|
max_values_ch0 = ch0_sp2xr_array.max(axis=1)
|
|
max_values_ch1 = ch1_sp2xr_array.max(axis=1)
|
|
|
|
    max_indices_ch0 = ch0_sp2xr_array.argmax(axis=1)  # location of first occurrence is returned if the maximum value is repeated
|
|
max_indices_ch1 = ch1_sp2xr_array.argmax(axis=1)
|
|
|
|
flag_000 = (max_indices_ch1 > 0)
|
|
flag_111 = (max_values_ch0 < scatt_sat_threshold)
|
|
flag_222 = (max_values_ch1 < inc_sat_threshold)
|
|
flag_333 = (max_indices_ch1 >= max_indices_ch0)
|
|
|
|
array_test = ch0_sp2xr_array
|
|
sub_test = max_indices_ch1[:,None]
|
|
mask = sub_test>np.arange(array_test.shape[1])
|
|
array_test[~mask]=-1e3
|
|
|
|
max_values_ch0_modified = array_test.max(axis=1)
|
|
max_indices_ch0_modified = array_test.argmax(axis=1)
|
|
flag_444 = np.logical_or(flag_333, (max_values_ch0_modified > scatt_noise_threshold))
|
|
|
|
|
|
time_lag = max_indices_ch1 - max_indices_ch0
|
|
time_lag_modified = max_indices_ch1 - max_indices_ch0_modified
|
|
|
|
|
|
to_concat = [ddf_sp2xr.index.values, max_values_ch0, max_values_ch1,
|
|
max_indices_ch0, max_indices_ch1,
|
|
max_values_ch0_modified, max_indices_ch0_modified,
|
|
flag_000, flag_111, flag_222, flag_333, flag_444,
|
|
time_lag, time_lag_modified]
|
|
|
|
ddf = dd.concat([dd.from_dask_array(c) for c in to_concat], axis=1)
|
|
#ddf.index = ddf_sp2xr['Time new']
|
|
ddf.columns = ['Time new', 'max_values_ch0', 'max_values_ch1',
|
|
'max_indices_ch0', 'max_indices_ch1',
|
|
'max_values_ch0_modified', 'max_indices_ch0_modified',
|
|
'flag_000', 'flag_111', 'flag_222', 'flag_333', 'flag_444',
|
|
'time_lag', 'time_lag_modified']
|
|
ddf["date"] = ddf['Time new'].dt.date.astype("date64[pyarrow]")
|
|
ddf['hour'] = ddf['Time new'].dt.hour.astype('i8')
|
|
metadata = {
|
|
'author': 'Barbara Bertozzi',
|
|
'created': timestr,
|
|
'scatt_sat_threshold': str(scatt_sat_threshold),
|
|
'inc_sat_threshold': str(inc_sat_threshold),
|
|
'scatt_noise_threshold': str(scatt_noise_threshold),
|
|
'end_bkgr_ch0': str(end_bkgr_ch0),
|
|
'end_bkgr_ch1': str(end_bkgr_ch1)
|
|
}
|
|
|
|
ddf.to_parquet(path=output_path,
|
|
partition_on=['date', 'hour'],
|
|
engine='pyarrow', write_index=True, custom_metadata=metadata, write_metadata_file=True)
|
|
|
|
return ddf
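
# Example usage of process_sp2b_parquet (illustrative sketch; the paths are
# hypothetical, not taken from the original workflow): read back the raw-trace
# parquet written above and derive the per-particle peak/flag table.
#
#     ddf_sp2xr = dd.read_parquet('/data/SP2XR/parquet/sp2b')
#     ddf_peaks = process_sp2b_parquet(ddf_sp2xr, output_path='/data/SP2XR/parquet/sp2b_peaks')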
|
|
|
|
|
|
|
|
# %% Functions for BC calibration materials and conversions (mass to diam, diam to mass, ...)
|
|
|
|
#%% Conversions mass to D, N to mass
|
|
|
|
def mass2meqDiam(mass_array, rho=1800):
|
|
"""
|
|
Calculate mass equivalent diameters for the input array of mass values.
|
|
inputs: mass_array values in fg, particle density in kg m-3 (rho)
|
|
outputs: array of mass equivalent diameters in nm
|
|
"""
|
|
Dp = np.array([1e9*(6*m*1e-18/rho/np.pi)**(1/3) for m in mass_array])
|
|
return Dp
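
def _example_mass2meqDiam():
    # Illustrative sanity check (not part of the original toolkit): a 1 fg BC
    # particle with the default density of 1800 kg m-3 corresponds to a
    # mass-equivalent diameter of roughly 102 nm.
    dp = mass2meqDiam([1.0])
    assert np.isclose(dp[0], 102.0, atol=0.5)
    return dp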
|
|
|
|
def dNdlogDp_to_dMdlogDp(N, Dp, rho=1800):
|
|
    '''
    This is from functions.py from Rob.

    Given a normalized number size distribution [dN/dlogDp, cm^-3] defined over
    the diameters given by Dp [nm], return the corresponding mass size
    distribution [dM/dlogDp, ug/m3].

    Optional density input given in units of kg/m3.
    '''
|
|
#M = N*((np.pi*(Dp*1e-9)**3)/6)*rho*1e18
|
|
M = N * np.pi / 6 * rho * (Dp**3) * 1e-12
|
|
return M
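
def _example_dNdlogDp_to_dMdlogDp():
    # Illustrative sanity check (not part of the original toolkit):
    # dN/dlogDp = 1 cm^-3 at Dp = 100 nm with rho = 1800 kg m-3 gives
    # dM/dlogDp of roughly 9.4e-4 ug m-3.
    M = dNdlogDp_to_dMdlogDp(1.0, 100.0, rho=1800)
    assert np.isclose(M, 9.42e-4, rtol=0.01)
    return M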
|
|
|
|
|
|
def BC_mass_to_diam(mass, rho_eff=1800, BC_type=''):
|
|
'''
|
|
    This function calculates the effective density and mobility (volume equivalent?) diameter
|
|
for BC calibration materials applying different parameterizations.
|
|
|
|
Parameters
|
|
----------
|
|
mass : float or array of floats
|
|
BC mass in fg.
|
|
rho_eff : float, optional
|
|
Effective density [kg/m3] to use in case BC_type='constant_mobility_density'. The default is 1800.
|
|
BC_type : string, optional
|
|
Identification string for the effective density parameterization to use. The default is ''.
|
|
|
|
Raises
|
|
------
|
|
ValueError
|
|
        If the string provided in BC_type is not included in the list below, an error is raised.
|
|
|
|
Returns
|
|
-------
|
|
density : float of array of floats
|
|
Effective density [kg/m3] corresponding to the input mass(es).
|
|
diam : float or array of floats
|
|
Diameter [nm] corresponding to the mass and effective density.
|
|
|
|
'''
|
|
if BC_type == 'constant_effective_density':
|
|
density = np.zeros_like(mass)+rho_eff
|
|
diam = 1e3*((6*mass)/(np.pi*rho_eff))**(1/3)
|
|
elif BC_type == 'Aquadag_RCAST_2009':
|
|
SplineCoefM2D, nSplineSegmM2D = Aquadag_RhoVsLogMspline_Var25(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
elif BC_type == 'Aquadag_Moteki_AST2010':
|
|
SplineCoefM2D, nSplineSegmM2D = Aquadag_RhoVsLogMspline_Var25(SplineCoef=[]) # this line only loads the coefficients from Moteki 2010
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
elif BC_type == 'Aquadag_PSI_2010':
|
|
SplineCoefM2D, nSplineSegmM2D = Aquadag_RhoVsLogMspline_Var8(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
elif BC_type == 'FS_RCAST_2009':
|
|
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var2(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
elif BC_type == 'FS_Moteki_2010':
|
|
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var5(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
elif BC_type == 'FS_PSI_2010':
|
|
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var8(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
elif BC_type == 'FS_PSI_2010_old':
|
|
SplineCoefM2D, nSplineSegmM2D = Fullerene_RhoVsLogMspline_Var8_old(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(mass, SplineCoefM2D)
|
|
    elif BC_type == 'Glassy_Carbon':
        density = np.zeros_like(mass) + 1420  # bulk density of glassy carbon [kg/m3], as in GlassyCarbonAlpha_Mass2Diam
        diam = GlassyCarbonAlpha_Mass2Diam(mass)
|
|
else:
|
|
raise ValueError("Calibration material not defined")
|
|
|
|
diam = 1e3*(6*mass/np.pi/density)**(1/3) # 1e3: unit conversion
|
|
|
|
return density, diam
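
def _example_BC_mass_to_diam():
    # Illustrative sketch (not part of the original toolkit): with a constant
    # effective density of 1800 kg m-3, 1 fg of BC corresponds to ~102 nm,
    # consistent with mass2meqDiam above.
    masses = np.array([0.1, 1.0, 10.0])  # fg
    density, diam = BC_mass_to_diam(masses, rho_eff=1800, BC_type='constant_effective_density')
    assert np.isclose(diam[1], 102.0, atol=0.5)
    return density, diam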
|
|
|
|
|
|
def BC_diam_to_mass(diam, rho_eff=1800, BC_type=''):
|
|
|
|
if BC_type == '': # pers comm Kondo 2009, spline fit
|
|
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var2(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
|
|
elif BC_type == 'FS_Moteki_2010': # Moteki et al., AST, 2010
|
|
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var5(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
|
|
elif BC_type == 'FS_PSI_2010':
|
|
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var8(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
|
|
elif BC_type == 'FS_PSI_2010_old':
|
|
SplineCoefD2M, nSplineSegmD2M = Fullerene_RhoVsLogDspline_Var8_old(SplineCoef=[])
|
|
density = SP2_calibCurveSpline(diam, SplineCoefD2M)
|
|
|
|
else:
|
|
raise ValueError("Calibration material not defined")
|
|
|
|
mass = 1e-9 * density * np.pi / 6 * np.array(diam)**3
|
|
|
|
return density, mass
|
|
|
|
|
|
def SP2_calibCurveSpline(mass, SplineCoef):
|
|
'''
|
|
Given the mass and the spline coefficients list, it calculates the corresponding density.
|
|
|
|
Parameters
|
|
----------
|
|
mass : float or array of floats
|
|
BC mass [fg].
|
|
    SplineCoef : list of floats
        Coefficients and lower-cut values of the spline to be used.
|
|
|
|
Returns
|
|
-------
|
|
values : float or array of floats
|
|
BC effective density [kg/m3] corresponding to the mass and specific parameterization.
|
|
|
|
'''
|
|
coefficients_array = np.array(SplineCoef)
|
|
segments = np.delete(coefficients_array, np.arange(3, len(coefficients_array), 4))
|
|
segments_2 = np.array([segments[i:i+3] for i in range(0, len(segments), 3)])[::-1]
|
|
lowercuts = coefficients_array[3::4]
|
|
sorted_lowercuts = np.sort(np.append(lowercuts, [np.log10(mass[0]), np.log10(mass[-1])]))
|
|
|
|
values = np.zeros_like(mass) # Initialize an array to store interpolated values
|
|
|
|
for i in range(len(sorted_lowercuts) - 1):
|
|
if i == (len(sorted_lowercuts)-2):
|
|
mask = (np.log10(mass) >= sorted_lowercuts[i]) & (np.log10(mass) <= sorted_lowercuts[i + 1])
|
|
else:
|
|
mask = (np.log10(mass) >= sorted_lowercuts[i]) & (np.log10(mass) < sorted_lowercuts[i + 1])
|
|
if np.any(mask):
|
|
poly = Polynomial(segments_2[i])
|
|
# values[mask] = poly(np.log10(mass[mask]))
|
|
values[mask] = poly(np.log10(np.array(mass)[mask]))
|
|
|
|
return values
|
|
|
|
|
|
def Aquadag_RhoVsLogMspline_Var25(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
SplineCoef.append(1392.54)
|
|
SplineCoef.append(-618.898)
|
|
SplineCoef.append(104.364)
|
|
SplineCoef.append(1.87)
|
|
SplineCoef.append(689)
|
|
SplineCoef.append(133.555)
|
|
SplineCoef.append(-96.8268)
|
|
SplineCoef.append(0.64)
|
|
SplineCoef.append(785.701)
|
|
SplineCoef.append(-168.635)
|
|
SplineCoef.append(139.258)
|
|
return SplineCoef, int(SP2_calibCurveSplineCheck(SplineCoef))#SP2_calibCurveSplineCheck(SplineCoef)
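
def _example_aquadag_density_spline():
    # Illustrative sketch (not part of the original toolkit): effective density
    # of Aquadag as a function of particle mass, combining the Var25 spline
    # coefficients with SP2_calibCurveSpline, as done inside BC_mass_to_diam.
    # Note that, as written, SP2_calibCurveSpline appears to assume the mass
    # array is sorted in ascending order and spans beyond the outermost
    # lower-cut values of the spline.
    masses = np.logspace(np.log10(0.5), np.log10(200.0), 50)  # fg
    coef, _ = Aquadag_RhoVsLogMspline_Var25(SplineCoef=[])
    rho = SP2_calibCurveSpline(masses, coef)
    return masses, rho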
|
|
|
|
|
|
def Aquadag_RhoVsLogMspline_Var8(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
SplineCoef.append(1662.41)
|
|
SplineCoef.append(-1087.41)
|
|
SplineCoef.append(214.636)
|
|
SplineCoef.append(1.7)
|
|
SplineCoef.append(689.745)
|
|
SplineCoef.append(56.897)
|
|
SplineCoef.append(-121.925)
|
|
SplineCoef.append(0.33)
|
|
SplineCoef.append(720.248)
|
|
SplineCoef.append(-127.967)
|
|
SplineCoef.append(158.172)
|
|
SplineCoef.append(-1.23)
|
|
SplineCoef.append(299.662)
|
|
SplineCoef.append(-811.846)
|
|
SplineCoef.append(-119.828)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
|
|
def Fullerene_RhoVsLogMspline_Var2(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
SplineCoef.append(1180.1)
|
|
SplineCoef.append(-540.043)
|
|
SplineCoef.append(106.852)
|
|
SplineCoef.append(1.8)
|
|
SplineCoef.append(713.102)
|
|
SplineCoef.append(-21.1519)
|
|
SplineCoef.append(-37.2841)
|
|
SplineCoef.append(0.55)
|
|
SplineCoef.append(916.24)
|
|
SplineCoef.append(-759.835)
|
|
SplineCoef.append(634.246)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Fullerene_RhoVsLogMspline_Var5(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
SplineCoef.append(1539.19)
|
|
SplineCoef.append(-904.484)
|
|
SplineCoef.append(189.201)
|
|
SplineCoef.append(1.83)
|
|
SplineCoef.append(503.999)
|
|
SplineCoef.append(226.877)
|
|
SplineCoef.append(-119.914)
|
|
SplineCoef.append(0.8)
|
|
SplineCoef.append(718.903)
|
|
SplineCoef.append(-310.384)
|
|
SplineCoef.append(215.874)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Fullerene_RhoVsLogMspline_Var8(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
SplineCoef.append(62.7393)
|
|
SplineCoef.append(781.804)
|
|
SplineCoef.append(-325.113)
|
|
SplineCoef.append(1.6)
|
|
SplineCoef.append(652.935)
|
|
SplineCoef.append(44.0588)
|
|
SplineCoef.append(-94.5682)
|
|
SplineCoef.append(0.6)
|
|
SplineCoef.append(750.496)
|
|
SplineCoef.append(-281.145)
|
|
SplineCoef.append(176.435)
|
|
SplineCoef.append(-0.7)
|
|
SplineCoef.append(573.019)
|
|
SplineCoef.append(-788.222)
|
|
SplineCoef.append(-185.763)
|
|
SplineCoef.append(-1.7)
|
|
SplineCoef.append(1016.31)
|
|
SplineCoef.append(-266.707)
|
|
SplineCoef.append(-32.3765)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Fullerene_RhoVsLogMspline_Var8_old(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
SplineCoef.append(984.914)
|
|
SplineCoef.append(-468.01)
|
|
SplineCoef.append(101)
|
|
SplineCoef.append(1.19)
|
|
SplineCoef.append(568.667)
|
|
SplineCoef.append(231.565)
|
|
SplineCoef.append(-192.939)
|
|
SplineCoef.append(0.715)
|
|
SplineCoef.append(747.715)
|
|
SplineCoef.append(-269.27)
|
|
SplineCoef.append(157.295)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def GlassyCarbonAlpha_Mass2Diam(mass):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
diam = 1e3* ( 6 * mass / np.pi / 1420 )**(1/3)
|
|
return diam
|
|
|
|
def Fullerene_RhoVsLogDspline_Var2(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# experimental mobility/mass data taken from: personal communication with the Kondo research group on 04/09/2009;
|
|
SplineCoef.append(8799.39)
|
|
SplineCoef.append(-5478.89)
|
|
SplineCoef.append(904.097)
|
|
SplineCoef.append(2.8)
|
|
SplineCoef.append(-115.387)
|
|
SplineCoef.append(888.81)
|
|
SplineCoef.append(-232.992)
|
|
SplineCoef.append(2.35)
|
|
SplineCoef.append(17587.6)
|
|
SplineCoef.append(-14177.6)
|
|
SplineCoef.append(2972.62)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Fullerene_RhoVsLogDspline_Var5(SplineCoef=[]):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# note: experimental mobility/mass data taken from: Moteki et al., Aerosol Sci. Technol., 44, 663-675, 2010;
|
|
SplineCoef.append(13295.3)
|
|
SplineCoef.append(-8550.29)
|
|
SplineCoef.append(1423.82)
|
|
SplineCoef.append(2.8)
|
|
SplineCoef.append(-4979.54)
|
|
SplineCoef.append(4503.18)
|
|
SplineCoef.append(-907.154)
|
|
SplineCoef.append(2.45)
|
|
SplineCoef.append(8500)
|
|
SplineCoef.append(-6500.53)
|
|
SplineCoef.append(1338.5)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
|
|
def Fullerene_RhoVsLogDspline_Var8(SplineCoef):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# note: experimental mobility/mass data taken from: PSI data for fullerene soot sample obtained from Shuka Schwarz, measured at ETH in Oct. 2010;
|
|
SplineCoef.append(-2281.86)
|
|
SplineCoef.append(2687.72)
|
|
SplineCoef.append(-613.973)
|
|
SplineCoef.append(2.735)
|
|
SplineCoef.append(-1987.85)
|
|
SplineCoef.append(2472.72)
|
|
SplineCoef.append(-574.667)
|
|
SplineCoef.append(2.35)
|
|
SplineCoef.append(7319.9)
|
|
SplineCoef.append(-5448.76)
|
|
SplineCoef.append(1110.76)
|
|
SplineCoef.append(1.85)
|
|
SplineCoef.append(-520.822)
|
|
SplineCoef.append(3027.69)
|
|
SplineCoef.append(-1180.18)
|
|
SplineCoef.append(1.48)
|
|
SplineCoef.append(1421.03)
|
|
SplineCoef.append(403.564)
|
|
SplineCoef.append(-293.649)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
|
|
def Fullerene_RhoVsLogDspline_Var8_old(SplineCoef):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# note: experimental mobility/mass data taken from: PSI data for fullerene soot sample obtained from Shuka Schwarz, measured at ETH in Oct. 2010;
|
|
SplineCoef.append(6744.81)
|
|
SplineCoef.append(-4200.82)
|
|
SplineCoef.append(700)
|
|
SplineCoef.append(2.57)
|
|
SplineCoef.append(-6902.08)
|
|
SplineCoef.append(6419.32)
|
|
SplineCoef.append(-1366.18)
|
|
SplineCoef.append(2.4)
|
|
SplineCoef.append(7288.07)
|
|
SplineCoef.append(-5405.8)
|
|
SplineCoef.append(1097.39)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Aquadag_RhoVsLogDspline_Var25(SplineCoef):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# note: experimental mobility/mass data from: Moteki et al., Aerosol Sci. Technol., 44, 663-675, 2010 (and they agree with pers. comm. from 04/09/2009)
|
|
SplineCoef.append(10000)
|
|
SplineCoef.append(-6088.15)
|
|
SplineCoef.append(974.717)
|
|
SplineCoef.append(2.8)
|
|
SplineCoef.append(-3200)
|
|
SplineCoef.append(3340.42)
|
|
SplineCoef.append(-708.956)
|
|
SplineCoef.append(2.35)
|
|
SplineCoef.append(6490.76)
|
|
SplineCoef.append(-4907.03)
|
|
SplineCoef.append(1045.82)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Aquadag_RhoVsLogDspline_Var8(SplineCoef):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# note: experimental mobility/mass data taken from: "grand average" of PSI's (APM at ETH in Oct. 2010) and DMT's (Subu; CPMA in 2011 by XXXX) data for classic and new aquadag batches;
|
|
SplineCoef.append(10092.9)
|
|
SplineCoef.append(-6171.83)
|
|
SplineCoef.append(970.172)
|
|
SplineCoef.append(2.75)
|
|
SplineCoef.append(-2627.45)
|
|
SplineCoef.append(3079.33)
|
|
SplineCoef.append(-711.856)
|
|
SplineCoef.append(2.25)
|
|
SplineCoef.append(6140.89)
|
|
SplineCoef.append(-4714.75)
|
|
SplineCoef.append(1020.16)
|
|
SplineCoef.append(1.65)
|
|
SplineCoef.append(1063.19)
|
|
SplineCoef.append(1440.04)
|
|
SplineCoef.append(-844.928)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
def Aquadag_RhoVsLogDspline_Var8_old(SplineCoef):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function creates the spline fit coefficients for the relationship: density versus log(diam)
|
|
# units: rho [kg/m³]; diameter [nm]
|
|
# note: experimental mobility/mass data taken from: "grand average" of PSI's (APM at ETH in Oct. 2010) and DMT's (Subu; CPMA in 2011 by XXXX) data for classic and new aquadag batches;
|
|
    # use append as in the other wrapper functions, so that an empty list can be passed in
    SplineCoef.append(7900.57)
    SplineCoef.append(-4964.2)
    SplineCoef.append(830.317)
    SplineCoef.append(2.6)
    SplineCoef.append(-5371.15)
    SplineCoef.append(5244.82)
    SplineCoef.append(-1132.96)
    SplineCoef.append(2.35)
    SplineCoef.append(5809.57)
    SplineCoef.append(-4270.69)
    SplineCoef.append(891.62)
    SplineCoef.append(1.6)
    SplineCoef.append(904.355)
    SplineCoef.append(1860.83)
    SplineCoef.append(-1024.48)
|
|
return SplineCoef, SP2_calibCurveSplineCheck(SplineCoef)
|
|
|
|
|
|
def GlassyCarbonAlpha_Diam2Mass(DiamMob):
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# This function calculates the mass of a glassy carbon particle of a given mobility diameter.
|
|
# exact description: Glassy carbon spherical powder, 0.4-12 micron, type 2, Alpha Aesar Art. No. 38008
|
|
# note: the bulk density is taken from the supplier's data sheet and an email by Ronald Becht (Technical service, Alfa Aesar)
|
|
kGlassyCarbonAlphaRho = 1420 # kg/m3
|
|
mass = kGlassyCarbonAlphaRho * np.pi / 6 * DiamMob**3 * 1e-9
|
|
return mass
|
|
|
|
|
|
def SP2_calibCurveSplineCheck(CalibCoef):
|
|
# this function comes from the sp2 toolkit
|
|
# This function checks the dimension of the calibration coefficient wave.
|
|
nCoef = len(CalibCoef)
|
|
if np.mod(nCoef, 4) != 3:
|
|
raise ValueError("The number of coefficients handed over to the function must be 3,7,11, or ...!")
|
|
return np.ceil(nCoef/4)
|
|
|
|
|
|
|
|
# %% Function for calibration (part B)
|
|
|
|
def MultiChargeSizesEqualMob(diam, ne, TC=20, press=1013):
|
|
# This function returns the size of a particle carrying multiple charges but having the same electrical mobility as a singly charged particle with diameter diam.
|
|
# this function comes from the sp2 toolkit. Substantially it is a wrapper for the calib coeff
|
|
# diam in nm
|
|
if ne == 1:
|
|
return diam
|
|
else:
|
|
Zel = DiamToElectrMobility_Tp(1, TC, press, diam)
|
|
return ElectrMobilityToDiam_Tp(ne, TC, press, Zel)
|
|
|
|
|
|
def ElectrMobilityToDiam_Tp(ne, TC, press, Zel, precision=0.001, GasType=0):
|
|
# This function calculates the particle diameter (units: [nm]) as a function of the number of
|
|
# elementary charges, temperature, pressure, and electrical mobility
|
|
# T(C), press=mbar, Zel (electrical mobility [m²/Vs]), precision (%), Gas type default air
|
|
|
|
IterDiam = 100 # initial guess=100 nm diameter
|
|
Lambda = MeanFreePathBaron4_6(TC, press, GasType=GasType)
|
|
Visc = ViscosityOfGases(GasType, TC)
|
|
|
|
Cc = CunninghamSlipCorrBaron4_8(Lambda, IterDiam)
|
|
prevDiam = 1e9 * ne * 1.602e-19 * Cc / (3 * np.pi * Visc * Zel)
|
|
|
|
while ((np.abs(IterDiam - prevDiam) / min(IterDiam, prevDiam)) > (precision / 100)):
|
|
IterDiam = prevDiam
|
|
Cc = CunninghamSlipCorrBaron4_8(Lambda, prevDiam)
|
|
prevDiam = 1e9 * ne * 1.602e-19 * Cc / (3 * np.pi * Visc * Zel)
|
|
return prevDiam
|
|
|
|
|
|
|
|
def MeanFreePathBaron4_6(T, press, GasType=0):
|
|
'''
|
|
This function calculates the mean free path (units: [nm]) of gas molecules as a function of temperature and pressure.
|
|
Source: Baron&Willeke, 2001, eq. 4-6, p.65
|
|
martin.gysel@psi.ch; 17/08/2005; 24/05/2016
|
|
T: temperature, [°C]
|
|
press: pressure, [mbar=hectopascal]
|
|
GasType: 0: air //default
|
|
1: O2 //likely not yet implemented; see sub-procedure
|
|
2: N2 //likely not yet implemented; see sub-procedure
|
|
3: Argon //implemented
|
|
4: Helium //likely not yet implemented; see sub-procedure
|
|
5: CO2 //likely not yet implemented; see sub-procedure
|
|
6: CO //likely not yet implemented; see sub-procedure
|
|
'''
|
|
# unit conversion
|
|
kTzero = -273.15 # absolute zero temperature [°C]; source: http://de.wikipedia.org/wiki/Absoluter_Nullpunkt (note remark on 273.16)
|
|
T -= kTzero # convert [°C] to [K]
|
|
# gas specific settings
|
|
# variable Lref,S,Tref,pref
|
|
if GasType == 0: # air
|
|
Lref = 66.4 # reference lambda at NTP (20 °C, 1013 mbar)
|
|
# note: TSI uses 66.5 at NTP
|
|
S = 110.4 # Sutherland constant
|
|
Tref = 293.15 # reference temperature [K]
|
|
pref = 1013 # reference pressure [mbar]
|
|
elif GasType == 3: # argon
|
|
Lref = 69.4 # reference lambda at NTP (20 °C, 1013 mbar)
|
|
S = 141.4 # Sutherland constant
|
|
Tref = 293.15 # reference temperature [K]
|
|
pref = 1013 # reference pressure [mbar]
|
|
# calculate mean free path
|
|
Lambda = Lref * (pref / press) * (T / Tref) * (1 + S / Tref) / (1 + S / T)
|
|
return Lambda
|
|
|
|
def CunninghamSlipCorrBaron4_8(Lambda, Diam):
|
|
'''
|
|
This function calculates the Cunningham slip correction factor for solid particles as a function of the mean free path of the air molecules.
|
|
slightly different parameter for oil droplets
|
|
Source: Baron, 2001, eq. 4-8, p. 66
|
|
Note: That is the parametrisation that I use for the SMPS inversion.
|
|
martin.gysel@psi.ch, 17.08.2005
|
|
Lambda: mean free path of the air molecules, [nm]
|
|
Diam: particle diameter, [nm]
|
|
'''
|
|
Diam *= 1e-9 # convert from [nm] to [m]
|
|
Lambda *= 1e-9 # convert from [nm] to [m]
|
|
|
|
Kn = 2 * Lambda / Diam
|
|
a = 1.142
|
|
b = 0.558
|
|
g = 0.999
|
|
Cc = 1 + Kn * (a + b * np.exp(-g / Kn))
|
|
return Cc
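
def _example_cunningham_slip():
    # Illustrative sanity check (not part of the original toolkit): for air at
    # NTP (mean free path ~66.4 nm) a 100 nm particle has a slip correction
    # factor of roughly 2.87.
    cc = CunninghamSlipCorrBaron4_8(66.4, 100.0)
    assert 2.85 < cc < 2.89
    return cc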
|
|
|
|
def ViscosityOfGases(GasType=0, TC=20):
|
|
'''
|
|
This function returns the viscosity of a certain gas at a certain temperature TC
|
|
return value: viscosity of the specified gas at the temperature TC [kg/m/s]=[Pa*s]
|
|
Adapted from IGOR function by martin.gysel@psi.ch; 24/05/2016;
|
|
TC : temperature, [°C]
|
|
GasType: 0: air //default
|
|
1: O2 //likely not yet implemented; see sub-procedure
|
|
2: N2 //likely not yet implemented; see sub-procedure
|
|
3: Argon //implemented
|
|
4: Helium //likely not yet implemented; see sub-procedure
|
|
5: CO2 //likely not yet implemented; see sub-procedure
|
|
6: CO //likely not yet implemented; see sub-procedure
|
|
'''
|
|
if GasType == 0: # air
|
|
visc = ViscosityOfAirBaron4_10(TC)
|
|
elif GasType == 3: # argon
|
|
visc = ViscosityOfArgon(TC)
|
|
    else:
        raise ValueError('Specified gas not yet implemented')
|
|
return visc
|
|
|
|
def ViscosityOfArgon(TC):
|
|
'''
|
|
This function calculates the dynamic viscosity of argon (units: [kg/m/s]=[Pa*s]) as a function of air temperature.
|
|
Source:
|
|
Baron&Willeke, 2001, eq. 4-10, p. 66
|
|
Reference value and Sutherland constant => B & W p. 65
|
|
Adapted from IGOR function by martin.gysel@psi.ch; 24/05/2016
|
|
TC : temperature, [°C]
|
|
'''
|
|
S = 141.4 # Sutherland constant for Argon
|
|
    TC += 273.15  # convert [°C] -> [K]
|
|
Vref = 2.2292E-5 # reference viscosity of Argon at 293.15 K
|
|
Tref = 293.15
|
|
visc = Vref * (Tref+S) / (TC+S) * (TC/Tref)**(3/2)
|
|
return visc
|
|
|
|
def ViscosityOfAirBaron4_10(TC):
|
|
'''
|
|
This function calculates the dynamic viscosity of air (units: [kg/m/s]=[Pa*s]) as a function of air temperature.
|
|
Source: Baron&Willeke, 2001, eq. 4-10, p. 66
|
|
Function adapted from IGOR function by martin.gysel@psi.ch
|
|
mg, 17.08.2005, tested against Ernest Weingartners "VOODOO"
|
|
TC : temperature, [°C]
|
|
'''
|
|
S = 110.4 # Sutherland constant for air
|
|
TC += 273.15 # convert [°C] -> [K]
|
|
Vref = 1.8325e-5 # reference viscosity at NTP (20 °C, 1013 mbar)
|
|
# note: TSI uses 1.8203e-5 at NTP
|
|
Tref=293.15
|
|
visc = Vref * (Tref+S) / (TC+S) * (TC/Tref)**(3/2)
|
|
return visc
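
def _example_ntp_reference_values():
    # Illustrative sanity check (not part of the original toolkit): at the NTP
    # reference conditions (20 degC, 1013 mbar) the two functions return exactly
    # their reference values for air, ~66.4 nm mean free path and ~1.8325e-5 Pa s.
    lam = MeanFreePathBaron4_6(20, 1013)
    visc = ViscosityOfAirBaron4_10(20)
    assert np.isclose(lam, 66.4) and np.isclose(visc, 1.8325e-5)
    return lam, visc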
|
|
|
|
def DiamToElectrMobility_Tp(ne, T, press, Diam, GasType=0):
|
|
# This function calculates the electrical mobility (units: [m²/Vs]) of a particle as a function of number of
|
|
# elementary charges, temperature (C), pressure (mbar), and particle diameter (nm)
|
|
# return value: electrical mobility [m²/Vs]
|
|
|
|
keCharge = 1.6021892E-19 # elementary charge [C]
|
|
|
|
Lambda = MeanFreePathBaron4_6(T, press, GasType=GasType)
|
|
Cc = CunninghamSlipCorrBaron4_8(Lambda,Diam)
|
|
Visc = ViscosityOfGases(GasType, T)
|
|
Zel = ne * keCharge * Cc / (3 * np.pi * Visc * 1e-9 * Diam)
|
|
return Zel
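
def _example_mobility_round_trip():
    # Illustrative consistency check (not part of the original toolkit): convert
    # a 200 nm singly charged particle to its electrical mobility and back.
    # The recovered diameter should agree with the input to well within 1%.
    zel = DiamToElectrMobility_Tp(1, 20, 1013, 200.0)
    diam = ElectrMobilityToDiam_Tp(1, 20, 1013, zel)
    assert np.isclose(diam, 200.0, rtol=1e-2)
    return zel, diam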
|
|
|
|
|
|
|
|
|
|
def lognormal(x, *params):
|
|
'''
|
|
Lognormal function for a generic number of modes
|
|
|
|
Parameters
|
|
----------
|
|
x : np array
|
|
x-array.
|
|
*params : list
|
|
        [mu1, mu2, ..., sigma1, sigma2, ..., N1, N2, ...] : Parameters of the lognormal
        modes, grouped by parameter (all mu values first, then all sigma, then all N).
|
|
|
|
Returns
|
|
-------
|
|
Yfit : np array
|
|
y-array of the (multi mode) lognormal distribution.
|
|
|
|
'''
|
|
Yfit = np.zeros(len(x))
|
|
n_modes = int(len(params) // 3)
|
|
for i in np.arange(0, n_modes):
|
|
mu, sigma, N = params[i], params[i + n_modes], params[i + 2*n_modes]
|
|
Yfit += N / (np.log10(sigma) * np.sqrt(2 * np.pi)) * np.exp(-((np.log10(x / mu)) ** 2) / (2 * np.log10(sigma) ** 2))
|
|
return Yfit
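
def _example_lognormal_single_mode():
    # Illustrative sketch (not part of the original toolkit): a single lognormal
    # mode peaks at x = mu when evaluated on a log-spaced grid.
    x = np.logspace(4, 9, 501)
    y = lognormal(x, 1e6, 1.5, 100.0)  # mu, sigma, N
    assert np.isclose(x[np.argmax(y)], 1e6, rtol=0.05)
    return x, y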
|
|
|
|
|
|
def lognorm_fit_peak(data, bin_centers, ini_guess, fit_min_relPeak=1e4, fit_max_relPeak=1e9):
|
|
'''
|
|
    This function fits multiple lognormally distributed curves simultaneously.
|
|
The number of lognormal curves is defined by the shape of the initial guess.
|
|
|
|
Parameters
|
|
----------
|
|
data : numpy array
|
|
Distribution of peak heights.
|
|
bin_centers : list
|
|
Locations of the bin centers used to calculate the distribution "data".
|
|
ini_guess : list of list
|
|
List containing three lists (mu, sigma, N).
|
|
        The lengths of the three sublists must be equal; this common length defines the number of modes to be fitted.
|
|
fit_min_relPeak : float, optional
|
|
Lower limit of peak height to consider for the fit. The default is 1e4.
|
|
fit_max_relPeak : float, optional
|
|
Upper limit of peak height to consider for the fit. The default is 1e9.
|
|
|
|
Returns
|
|
-------
|
|
    popt : numpy array
        Fitted parameters, in the same (grouped) order as the initial guess.
|
|
|
|
'''
|
|
|
|
# Initial guess for fitting parameters:
|
|
p0 = ini_guess[0] + ini_guess[1] + ini_guess[2]
|
|
# Select only subset of peak heights
|
|
mask = (bin_centers>=fit_min_relPeak) & (bin_centers<=fit_max_relPeak)
|
|
x = bin_centers[mask]
|
|
y = data[mask]
|
|
# Perform the fit
|
|
popt, _ = curve_fit(lognormal, x, y, p0=p0)
|
|
|
|
return popt
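
def _example_lognorm_fit_peak():
    # Illustrative sketch (not part of the original toolkit): generate a
    # noise-free single-mode histogram from `lognormal` and recover its
    # parameters with lognorm_fit_peak.
    bin_edges = np.logspace(4, 9, 100)
    bin_centers = bin_edges[:-1] + np.diff(bin_edges) / 2
    counts = lognormal(bin_centers, 2e6, 1.6, 500.0)
    ini_guess = [[1.5e6], [1.5], [400.0]]  # [mu], [sigma], [N]
    popt = lognorm_fit_peak(counts, bin_centers, ini_guess)
    assert np.isclose(popt[0], 2e6, rtol=0.05)
    return popt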
|
|
|
|
def polynomial(x, *coefficients):
|
|
'''
|
|
    Function to evaluate a generic polynomial curve. The number of input coefficients defines the degree of the polynomial curve.
|
|
|
|
Parameters
|
|
----------
|
|
x : np array
|
|
x-array for the evaluation of the polynomial curve.
|
|
*coefficients : array of floats
|
|
Coefficients for the polynomial curve. First value is for order 0, and so on.
|
|
|
|
Returns
|
|
-------
|
|
result : np array
|
|
Evaluation of the polynomial curve at x-array.
|
|
|
|
'''
|
|
result = 0
|
|
for i, coef in enumerate(coefficients):
|
|
result += coef * (x ** i)
|
|
return result
|
|
|
|
def powerlaw(x, A, B, C):
|
|
y = A*(x**B) + C
|
|
return y
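
def _example_fit_helpers():
    # Illustrative sanity checks (not part of the original toolkit) for the two
    # generic fit functions: polynomial(x, c0, c1, c2) = c0 + c1*x + c2*x**2 and
    # powerlaw(x, A, B, C) = A*x**B + C.
    assert np.isclose(polynomial(2.0, 1.0, 2.0, 3.0), 17.0)
    assert np.isclose(powerlaw(4.0, 2.0, 0.5, 1.0), 5.0)
    return True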
|
|
|
|
|
|
|
|
def calculate_calib_coeff(pbp_data, calib_dict,
|
|
size_selection_method='',
|
|
calib_material='', rho_eff=1800,
|
|
fit=None, fit_p0=[], bounds=[], save_calib_coeff=False,
|
|
append_calib_curve={},
|
|
save_peak_hist_lognorm_fit_params=False,
|
|
do_peak_histogram_plots=False, save_peak_histogram_plots=False,
|
|
do_calib_curve_plot=False, save_calib_curve_plot=False,
|
|
plot_dir=''):
|
|
'''
|
|
|
|
|
|
    Parameters
    ----------
    pbp_data : pandas or dask DataFrame
        Single-particle data; must contain the columns 'folder_name' and 'Incand relPeak'.
    calib_dict : dict
        One entry per calibration point. Each entry must provide 'folder_name',
        'type', 'mu', 'sigma', 'N', 'nbins', 'mass', 'diam', 'bins_range' and 'fit_range'.
    size_selection_method : str, optional
        'APM' (mass-selected particles) or 'DMA' (mobility-selected particles). The default is ''.
    calib_material : str, optional
        Effective density parameterization passed to BC_mass_to_diam / BC_diam_to_mass. The default is ''.
        Possible inputs are: constant_effective_density, Aquadag_RCAST_2009, Aquadag_Moteki_AST2010,
        Aquadag_PSI_2010, FS_RCAST_2009, FS_Moteki_2010, FS_PSI_2010, FS_PSI_2010_old, Glassy_Carbon.
    rho_eff : float, optional
        Effective density [kg/m3] to use for calib_material='constant_effective_density'. The default is 1800.
    fit : str, optional
        Type of calibration curve to fit: 'polynomial' or 'powerlaw'. The default is None.
    fit_p0 : list, optional
        Initial guess for the calibration curve fit. The default is [].
    bounds : list, optional
        Parameter bounds passed to scipy.optimize.curve_fit. The default is [].
    save_calib_coeff : bool, optional
        Whether to save the fitted calibration coefficients. The default is False.
    append_calib_curve : dict, optional
        Previously derived calibration curves to overlay on the calibration plot; each entry
        needs 'peak', 'mass', 'label', 'color', 'ls' and 'marker'. The default is {}.
    save_peak_hist_lognorm_fit_params : bool, optional
        If True, the lognormal fit parameters (and the corresponding mass/diameter)
        are stored back into calib_dict. The default is False.
    do_peak_histogram_plots : bool, optional
        Plot the peak-height histogram and its lognormal fit for each calibration point. The default is False.
    save_peak_histogram_plots : bool, optional
        Save the peak-height histogram plots to plot_dir. The default is False.
    do_calib_curve_plot : bool, optional
        Plot the resulting calibration curve. The default is False.
    save_calib_curve_plot : bool, optional
        Save the calibration curve plot to plot_dir. The default is False.
    plot_dir : str, optional
        Directory where plots are saved. The default is ''.

    Raises
    ------
    ValueError
        If no size selection method is given, or if the mass (APM) or diameter (DMA)
        is missing for a calibration point.

    Returns
    -------
    calib_dict : dict
        Input dictionary, optionally updated with the lognormal fit results.
    popt : numpy array
        Fitted coefficients of the calibration curve.

    '''
|
|
tmp_peak_height_fit = []
|
|
tmp_mass_fit = []
|
|
tmp_diam_fit = []
|
|
color_list = ['C0', 'C1', 'C2', 'C3']
|
|
tmp_color = []
|
|
|
|
    # *** Check which size selection method is used and make sure the diameter or the mass is present in calib_dict
|
|
if size_selection_method=='APM':
|
|
for key in calib_dict.keys():
|
|
if calib_dict[key]['mass'] is None:
|
|
raise ValueError(f"Missing mass value for {key}")
|
|
elif size_selection_method=='DMA':
|
|
for key in calib_dict.keys():
|
|
if calib_dict[key]['diam'] is None:
|
|
raise ValueError(f"Missing diameter value for {key}")
|
|
else:
|
|
raise ValueError("You have to select a size selection method first!")
|
|
|
|
|
|
for key in calib_dict.keys():
|
|
folder_name = calib_dict[key]['folder_name']
|
|
aerosol_type = calib_dict[key]['type']
|
|
n_modes = len(calib_dict[key]['mu'])
|
|
nbins = calib_dict[key]['nbins']
|
|
mass = calib_dict[key]['mass']
|
|
diam = calib_dict[key]['diam']
|
|
|
|
tmp_pbp = pbp_data.query('folder_name==@folder_name').loc[:, 'Incand relPeak']
|
|
|
|
ini_guess = [calib_dict[key]['mu'], calib_dict[key]['sigma'], calib_dict[key]['N']]
|
|
|
|
# If not specified by the user select a default range for the histogram:
|
|
if len(calib_dict[key]['bins_range'])==0:
|
|
min_bin_relPeak = 1e4
|
|
max_bin_relPeak = 1e9
|
|
else:
|
|
min_bin_relPeak = calib_dict[key]['bins_range'][0]
|
|
max_bin_relPeak = calib_dict[key]['bins_range'][1]
|
|
|
|
# If not specified by the user select a default range to fit the histogram:
|
|
if len(calib_dict[key]['fit_range'])==0:
|
|
fit_min_relPeak = 1e4
|
|
fit_max_relPeak = 1e9
|
|
else:
|
|
fit_min_relPeak = calib_dict[key]['fit_range'][0]
|
|
fit_max_relPeak = calib_dict[key]['fit_range'][1]
|
|
|
|
|
|
bin_edges = np.logspace(np.log10(min_bin_relPeak), np.log10(max_bin_relPeak), nbins)
|
|
bin_centers = bin_edges[:-1] + np.diff(bin_edges) / 2
|
|
        # Create the histogram:
|
|
count, _ = np.histogram(tmp_pbp, bins=bin_edges)
|
|
        # Fit the histogram with a number of lognormal curves defined by the length of the initial guess:
|
|
pu = lognorm_fit_peak(count, bin_centers, ini_guess,
|
|
fit_min_relPeak=fit_min_relPeak, fit_max_relPeak=fit_max_relPeak)
|
|
|
|
tmp_peak_height_fit.append(pu[0:n_modes])
|
|
ne = np.arange(n_modes)+1
|
|
if size_selection_method == 'APM':
|
|
mass_list = [mass]*ne
|
|
tmp_mass_fit.append(mass_list)
|
|
diam_list = BC_mass_to_diam(mass_list, BC_type=calib_material)[1]
|
|
tmp_diam_fit.append(diam_list)
|
|
elif size_selection_method == 'DMA':
|
|
diam_list = [MultiChargeSizesEqualMob(diam, e, TC=20, press=1013) for e in ne]
|
|
tmp_diam_fit.append(diam_list)
|
|
mass_list = BC_diam_to_mass(diam_list, BC_type=calib_material)[1]
|
|
tmp_mass_fit.append(mass_list)
|
|
|
|
#tmp_diam_fit.append()
|
|
tmp_color.append(color_list[:n_modes])
|
|
|
|
|
|
if save_peak_hist_lognorm_fit_params:
|
|
calib_dict[key]['peak_height_lognorm_fit'] = pu[0:n_modes]
|
|
if size_selection_method == 'APM':
|
|
calib_dict[key]['peak_height_corresponding_mass'] = mass_list
|
|
calib_dict[key]['peak_height_corresponding_diam'] = diam_list
|
|
y_fit = lognormal(bin_centers, *np.ravel(pu))
|
|
|
|
|
|
|
|
if do_peak_histogram_plots:
|
|
fig, axs = plt.subplots()
|
|
if size_selection_method == 'APM':
|
|
axs.set_title(f'file:{folder_name} - mass={mass} fg - aerosol:{aerosol_type}')
|
|
elif size_selection_method == 'DMA':
|
|
axs.set_title(f'file:{folder_name} - diam={diam} nm - aerosol:{aerosol_type}')
|
|
sns.histplot(tmp_pbp, stat='frequency', log_scale=(True, False), bins=nbins, color='grey', alpha=0.5)
|
|
|
|
axst = axs.twinx()
|
|
axst.plot(bin_centers, count, lw=2, c='k')
|
|
axst.plot(bin_centers[(bin_centers>=fit_min_relPeak) & (bin_centers<=fit_max_relPeak)],
|
|
count[(bin_centers>fit_min_relPeak) & (bin_centers<fit_max_relPeak)], c='r', lw=2)
|
|
axst.plot(bin_centers, y_fit, c='k', lw=2, ls='--')
|
|
for i in np.arange(n_modes):
|
|
axs.axvline(pu[i], c='g', lw=2)
|
|
axst.set_ylim(0, )
|
|
if save_peak_histogram_plots:
|
|
if size_selection_method == 'APM':
|
|
plt.savefig(plot_dir+f'/hist_plot_mass_{mass}.png', dpi=600)
|
|
elif size_selection_method == 'DMA':
|
|
plt.savefig(plot_dir+f'/hist_plot_diam_{diam}.png', dpi=600)
|
|
tmp_peak_height_fit = np.concatenate( tmp_peak_height_fit, axis=0 )
|
|
if len(tmp_mass_fit)>0:
|
|
tmp_mass_or_diam_fit = np.concatenate( tmp_mass_fit, axis=0 )
|
|
elif len(tmp_diam_fit)>0:
|
|
tmp_mass_or_diam_fit = np.concatenate( tmp_diam_fit, axis=0 )
|
|
tmp_color = np.concatenate( tmp_color, axis=0 )
|
|
|
|
if fit == 'polynomial':
|
|
popt, _ = curve_fit(polynomial, tmp_peak_height_fit, tmp_mass_or_diam_fit, p0=fit_p0, bounds=bounds)
|
|
elif fit == 'powerlaw':
|
|
if len(fit_p0) != 3:
|
|
print(f'Problem with the initial guess provided for power law fit. Number of coeff should be 3, and not {len(fit_p0)}')
|
|
else:
|
|
popt, _ = curve_fit(powerlaw, tmp_peak_height_fit, tmp_mass_or_diam_fit, p0=fit_p0, bounds=bounds)
|
|
|
|
|
|
|
|
if do_calib_curve_plot:
|
|
fig, axs = plt.subplots(layout='tight')
|
|
axs.scatter(tmp_peak_height_fit, tmp_mass_or_diam_fit, c=tmp_color, label=f'current calib data ({folder_name})')
|
|
axs.set_xscale('log')
|
|
axs.set_yscale('log')
|
|
axs.grid(which='both')
|
|
|
|
if len(append_calib_curve)>0:
|
|
for key in append_calib_curve:
|
|
axs.plot(append_calib_curve[key]['peak'], append_calib_curve[key]['mass'],
|
|
label=append_calib_curve[key]['label'], c=append_calib_curve[key]['color'],
|
|
ls=append_calib_curve[key]['ls'], marker=append_calib_curve[key]['marker'])
|
|
axs.legend()
|
|
|
|
if fit == 'polynomial':
|
|
calib_curve_fit = polynomial(bin_centers, *np.ravel(popt))
|
|
elif fit == 'powerlaw':
|
|
calib_curve_fit = powerlaw(bin_centers, *np.ravel(popt))
|
|
if fit:
|
|
axs.plot(bin_centers, calib_curve_fit, lw=2, c='C0')
|
|
|
|
if save_calib_curve_plot:
|
|
plt.savefig(plot_dir+'mass_peakH.png', dpi=600)
|
|
|
|
return calib_dict, popt
|
|
|
|
|
|
|
|
|
|
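# Example (sketch, not used by the pipeline): structure expected for the
# `append_calib_curve` argument of the calibration routine above. The variable
# name, numbers and labels are made up; only the keys ('peak', 'mass', 'label',
# 'color', 'ls', 'marker') reflect what the plotting code actually reads.
example_append_calib_curve = {
    'previous_calib': {
        'peak': [1e5, 1e6, 1e7],     # incandescence peak heights (hypothetical)
        'mass': [0.5, 3.0, 20.0],    # corresponding rBC masses [fg] (hypothetical)
        'label': 'previous Aquadag calibration',
        'color': 'C4',
        'ls': '--',
        'marker': 'o',
    }
}
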
# %% Function to read config file


def read_xr_ini_file(fname):
    """
    Read an SP2-XR .ini file and save the parameters of interest in a dictionary
    """
    params = {}
    with open(fname, 'r') as f:
        for line in f:
            if 'Save Every Nth Particle' in line:
                try:
                    params['SaveRate'] = float(line.split('=')[-1].replace('\n', ''))
                except ValueError:
                    params['SaveRate'] = 1
            if ('Incand Transit Time Min' in line) and ('Disqualified' not in line):
                params['IncTransitMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Incand Transit Time Max' in line) and ('Disqualified' not in line):
                params['IncTransitMax'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Incand FWHM Min' in line) and ('Disqualified' not in line):
                params['IncFWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Incand HWHM Min' in line) and ('Disqualified' not in line):
                params['IncHWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Incand FWHM Max' in line) and ('Disqualified' not in line):
                params['IncFWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Incand HWHM Max' in line) and ('Disqualified' not in line):
                params['IncHWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Incand Inter Particle Time Min' in line) and ('Disqualified' not in line):
                params['IncInterTimeMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter Transit Time Min' in line) and ('Disqualified' not in line):
                params['ScattTransitMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter Transit Time Max' in line) and ('Disqualified' not in line):
                params['ScattTransitMax'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter FWHM Min' in line) and ('Disqualified' not in line):
                params['ScattFWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter HWHM Min' in line) and ('Disqualified' not in line):
                params['ScattHWHMMin'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter FWHM Max' in line) and ('Disqualified' not in line):
                params['ScattFWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter HWHM Max' in line) and ('Disqualified' not in line):
                params['ScattHWHMMax'] = float(line.split('=')[-1].replace('\n', ''))
            if ('Scatter Inter Particle Time Min' in line) and ('Disqualified' not in line):
                params['ScattInterTimeMin'] = float(line.split('=')[-1].replace('\n', ''))
    return params

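# %% Example (sketch): reading the disqualification thresholds from a config file
# A minimal usage sketch for read_xr_ini_file, assuming an SP2-XR calibration
# .ini file exists at the (hypothetical) path passed in. The function below is
# only an illustration and is not called anywhere in this module.
def example_read_ini(example_ini_path='/path/to/SP2XR_Calibration.ini'):
    if os.path.isfile(example_ini_path):
        ini_params = read_xr_ini_file(example_ini_path)
        for name, value in ini_params.items():
            print(f'{name}: {value}')
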
#%% Functions to create histograms and distributions


def bin_ctrs_to_lims(x_ctrs):
    """
    Take an array of bin centers and return an array 1 element larger of bin limits.
    (This is from functions.py from Rob.)
    """
    delta = np.diff(x_ctrs)/2
    x_lims = np.append(x_ctrs[0]-delta[0], x_ctrs[:-1]+delta)
    x_lims = np.append(x_lims, x_ctrs[-1]+delta[-1])
    return x_lims


def bin_lims_to_ctrs(x_lims):
    """
    Take an array of bin limits and return an array 1 element smaller of bin centers
    """
    x_ctrs = (x_lims[1:] + x_lims[:-1])/2
    return x_ctrs


def get_dlogp(Dp):
    """
    Given an array of diameters Dp, calculate the array of bin widths in log space (dlogdp).
    (This is from functions.py from Rob.)
    """
    dlogdp = np.log10(bin_ctrs_to_lims(Dp)[1:]) - np.log10(bin_ctrs_to_lims(Dp)[:-1])
    return dlogdp

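# %% Example (sketch): round trip between bin centers and limits
# A minimal, self-contained check of the helpers above with ad hoc numbers:
# for centers [10, 20, 40] the limits are [5, 15, 30, 50] and dlogDp is roughly
# [0.477, 0.301, 0.222]. The function is illustrative only and never called.
def example_bin_helpers():
    ctrs = np.array([10.0, 20.0, 40.0])
    lims = bin_ctrs_to_lims(ctrs)    # -> array([ 5., 15., 30., 50.])
    back = bin_lims_to_ctrs(lims)    # -> array([10. , 22.5, 40. ]); not an exact round trip for uneven bins
    dlogdp = get_dlogp(ctrs)         # -> array([0.477..., 0.301..., 0.221...])
    return lims, back, dlogdp
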
def counts2numConc(numb, flow, t=1):
    """
    Calculate number concentration from counts, flow rate in [ccm] and sampling time t in [secs].
    Return numConc with units of [cm^-3].
    (This is from functions.py from Rob.)
    """
    conc = numb.divide(flow * (t/60), axis=0)
    return conc


def calculate_histogram(series, bin_lims=np.logspace(np.log10(0.3), np.log10(400), 50)):
    if series.dropna().empty:
        # Return NaNs for the counts if the series is empty
        counts = np.full(len(bin_lims) - 1, np.nan, dtype=float)
    else:
        counts, _ = np.histogram(series.dropna(), bins=bin_lims)
    return counts.tolist()

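# %% Example (sketch): counts -> number concentration on synthetic data
# A minimal sketch with made-up numbers: 1 s counts in two size bins and a
# constant 120 ccm sample flow give dN in [cm^-3] per bin. Illustrative only,
# never called by the pipeline.
def example_counts_to_conc():
    idx = pd.date_range('2024-01-01 00:00:00', periods=3, freq='1s')
    counts = pd.DataFrame({'bin_0': [10, 20, 30], 'bin_1': [5, 0, 15]}, index=idx)
    flow = pd.Series(120.0, index=idx)          # sample flow [ccm]
    conc = counts2numConc(counts, flow, t=1)    # counts / (120 ccm * 1/60 min) = counts / 2 cm^3
    return conc
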
def process_hist_and_dist(df, col, flag_col, flag_value, bin_lims, bin_ctrs, dt_str, calculate_conc=True, flow=None, rho_eff=1800, BC_type='', t=1):
    # Apply the flag condition if provided
    if flag_col and flag_value is not None:
        df_filtered = df.loc[df[flag_col] == flag_value]
    else:
        df_filtered = df

    if flag_col is not None and 'cnts' in flag_col:
        flag_col = flag_col[5:]

    # Resample and calculate the histogram
    ddf_hist_compact = df_filtered[col].resample(dt_str).agg(
        {'result': lambda x: calculate_histogram(x, bin_lims=bin_lims)})  # I might want to add a .fillna(0) here; this would solve the fillna problem in the resampling function

    # Add and filter based on 'original_idx'
    ddf_hist_compact[['original_idx']] = df_filtered[['temporary_col']].resample(dt_str).count()
    ddf_hist_compact = ddf_hist_compact[ddf_hist_compact['original_idx'] != 0]
    ddf_hist_compact = ddf_hist_compact.drop('original_idx', axis=1)

    # Initialize the DataFrame depending on whether the result is empty or not
    list_col = 'result'
    max_list_length = len(bin_ctrs)

    if ddf_hist_compact.empty:
        columns = [f'{list_col}_{i}' for i in range(max_list_length)]
        ddf_hist = pd.DataFrame(np.nan, columns=columns, index=flow.index)
    else:
        ddf_hist = ddf_hist_compact[list_col].apply(pd.Series)
        ddf_hist.index = ddf_hist_compact.index

    if calculate_conc:
        # Join with the sample flow controller data
        inc_hist_flow = ddf_hist.join(flow)

        if rho_eff is not None and BC_type is not None:
            # Calculate diameters and densities
            density, Dmev = BC_mass_to_diam(bin_ctrs, rho_eff=rho_eff, BC_type=BC_type)

            # Calculate the number concentration
            dNdlogDmev = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t) / get_dlogp(Dmev)
            if flag_col is None:
                dNdlogDmev.columns = [f'dNdlogDmev_all_{i:.2f}' for i in Dmev]
            else:
                dNdlogDmev.columns = [f'dNdlogDmev_{flag_col}_{i:.2f}' for i in Dmev]

            # Calculate the mass concentration
            dMdlogDmev = dNdlogDp_to_dMdlogDp(dNdlogDmev, Dmev, rho=density)
            if flag_col is None:
                dMdlogDmev.columns = [f'dMdlogDmev_all_{i:.2f}' for i in Dmev]
            else:
                dMdlogDmev.columns = [f'dMdlogDmev_{flag_col}_{i:.2f}' for i in Dmev]

        else:
            # Calculate the number concentration (no mass conversion, e.g. for purely scattering particles)
            dNdlogDmev = counts2numConc(inc_hist_flow.iloc[:, :-1], inc_hist_flow.iloc[:, -1], t=t) / get_dlogp(bin_ctrs)
            if flag_col is None:
                dNdlogDmev.columns = [f'dNdlogDsc_all_{i:.2f}' for i in bin_ctrs]
            else:
                dNdlogDmev.columns = [f'dNdlogDsc_{flag_col}_{i:.2f}' for i in bin_ctrs]

            dMdlogDmev = None

        return dNdlogDmev, dMdlogDmev

    else:
        return ddf_hist, None

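# %% Example (sketch): per-second histograms from synthetic single-particle data
# A minimal sketch of process_hist_and_dist on made-up data, using
# calculate_conc=False so that no flow data or mass-to-diameter conversion is
# needed. Column names, timestamps and bin limits are arbitrary; the function
# is illustrative only and never called by the pipeline.
def example_process_hist():
    idx = pd.to_datetime(['2024-01-01 00:00:00.1', '2024-01-01 00:00:00.7',
                          '2024-01-01 00:00:01.3', '2024-01-01 00:00:01.9'])
    df = pd.DataFrame({'BC mass': [0.5, 2.0, 1.0, 8.0], 'temporary_col': 1}, index=idx)
    bin_lims = np.logspace(np.log10(0.3), np.log10(10), 5)
    bin_ctrs = bin_lims_to_ctrs(bin_lims)
    hist_per_s, _ = process_hist_and_dist(df, 'BC mass', None, None, bin_lims, bin_ctrs,
                                          '1s', calculate_conc=False)
    return hist_per_s  # one row per second, one column per mass bin
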
#%% Function to process single particle data already stored as parquet files


@delayed
def process_pbp_parquet(dir_path_pbp, dir_path_hk,
                        dt=1,
                        rho_eff=1800, BC_type='constant_effective_density',
                        inc_calib_curve='', inc_calib_params=[],
                        scatt_calib_curve='', scatt_calib_params=[],
                        config_file_dir='',
                        minM=None, maxM=None, n_incbins=None,
                        minOptD=None, maxOptD=None, n_scattbins=None,
                        minTL=None, maxTL=None, n_timelag=None,
                        save_final_data=True, path_parquet='', partition_on=['date', 'hour']
                        ):

    read_dir_pbp = dir_path_pbp
    read_dir_hk = dir_path_hk
    ddf_pbp = pd.read_parquet(read_dir_pbp)  # , calculate_divisions=True)#.repartition(freq='1h')
    ddf_hk = pd.read_parquet(read_dir_hk)  # , calculate_divisions=True)#.repartition(freq='1h')

    ini_params = read_xr_ini_file(glob(f'{config_file_dir}/*Calibration*.ini')[0])
    ini_params['IncSatPoint'] = 1.7e9  # Guess for now, need to see where to obtain this from. 2e9 is probably a better guess
    ini_params['ScattSatPoint'] = 1.7e9  # Guess for now, need to see where to obtain this from. 1.85e9 is probably better
    ini_params['DelayTimeThresh'] = 200  # Also a guess for now, to be updated

    inc_mass_bin_lims = np.logspace(np.log10(minM), np.log10(maxM), n_incbins)
    inc_mass_bin_lims_for_pd_cut = np.logspace(np.log10(minM), np.log10(maxM+0.01), n_incbins)
    inc_mass_bin_ctrs = bin_lims_to_ctrs(inc_mass_bin_lims)

    dt_str = f'{dt}s'

    # ***** ***** *****
    # Apply calibration
    # ***** ***** *****

    if inc_calib_curve:
        if inc_calib_curve == 'polynomial':
            ddf_pbp['BC mass'] = ddf_pbp['Incand relPeak'].apply(lambda x: polynomial(x, *inc_calib_params))  # , meta=('BC mass', 'float64'))
            minM_timelag = polynomial(5e6, *inc_calib_params)
        elif inc_calib_curve == 'powerlaw':
            ddf_pbp['BC mass'] = ddf_pbp['Incand relPeak'].apply(lambda x: powerlaw(x, *inc_calib_params))  # , meta=('BC mass', 'float64'))
            minM_timelag = powerlaw(5e6, *inc_calib_params)
    else:
        ddf_pbp['BC mass'] = ddf_pbp['Incand Mass (fg)']
        minM_timelag = ddf_pbp.loc[(ddf_pbp['Incand relPeak'] <= 5.05e6) & (ddf_pbp['Incand relPeak'] >= 4.95e6), 'Incand Mass (fg)'].mean()  # this could create problems if there are no incandescence signals in the selected range

    ddf_pbp.loc[ddf_pbp['Incand relPeak'] == 0, 'BC mass'] = np.nan

    ddf_pbp['BC mass bin'], mass_bins = pd.cut(ddf_pbp['BC mass'], bins=inc_mass_bin_lims_for_pd_cut, right=False, include_lowest=True, retbins=True)

    if scatt_calib_curve:
        if scatt_calib_curve == 'polynomial':
            ddf_pbp['Opt diam'] = ddf_pbp['Scatter relPeak'].apply(lambda x: polynomial(x, *scatt_calib_params))  # , meta=('Opt diam', 'float64'))
        elif scatt_calib_curve == 'powerlaw':
            ddf_pbp['Opt diam'] = ddf_pbp['Scatter relPeak'].apply(lambda x: powerlaw(x, *scatt_calib_params))  # , meta=('Opt diam', 'float64'))
    else:
        ddf_pbp['Opt diam'] = ddf_pbp['Scatter Size (nm)']

    ddf_pbp.loc[ddf_pbp['Scatter relPeak'] == 0, 'Opt diam'] = np.nan

    ddf_pbp['time_lag_new'] = ddf_pbp['Incand Peak Time'] - ddf_pbp['Scatter Peak Time']

    # ***** ***** ***** *****
    # Set flags
    # ***** ***** ***** *****
    flag_inc_transit_time = (ddf_pbp['Incand Transit Time'] >= ini_params['IncTransitMin']) & (ddf_pbp['Incand Transit Time'] <= ini_params['IncTransitMax'])
    flag_inc_fwhm = (ddf_pbp['Incand FWHM'] >= ini_params['IncFWHMMin']) & (ddf_pbp['Incand FWHM'] <= ini_params['IncFWHMMax'])
    flag_inc_not_sat = (ddf_pbp['Incand relPeak'] < ini_params['IncSatPoint'])

    flag_scatt_transit_time = (ddf_pbp['Scatter Transit Time'] >= ini_params['ScattTransitMin']) & (ddf_pbp['Scatter Transit Time'] <= ini_params['ScattTransitMax'])
    flag_scatt_fwhm = (ddf_pbp['Scatter FWHM'] >= ini_params['ScattFWHMMin']) & (ddf_pbp['Scatter FWHM'] <= ini_params['ScattFWHMMax'])
    flag_scatt_not_sat = (ddf_pbp['Scatter relPeak'] < ini_params['ScattSatPoint'])

    flag_inc = (flag_inc_transit_time & flag_inc_fwhm)
    flag_inc_in_range = flag_inc & (ddf_pbp['BC mass'] >= minM) & (ddf_pbp['BC mass'] <= maxM)
    flag_inc_in_range_tl_analysis = flag_inc & (ddf_pbp['BC mass'] >= minM_timelag) & (ddf_pbp['BC mass'] <= maxM)

    flag_scatt = (flag_scatt_transit_time & flag_scatt_fwhm)
    flag_scatt_in_range = flag_scatt & (ddf_pbp['Opt diam'] >= minOptD) & (ddf_pbp['Opt diam'] <= maxOptD)

    flag_negative_timelag = ddf_pbp['time_lag_new'] < -10
    flag_extreme_positive_timelag = ddf_pbp['time_lag_new'] >= 400
    flag_timelag_0_50 = (ddf_pbp['time_lag_new'] < 50) & (ddf_pbp['time_lag_new'] >= -10)
    flag_timelag_greater_50 = (ddf_pbp['time_lag_new'] >= 50) & (ddf_pbp['time_lag_new'] < 400)

    ddf_pbp['ratio_inc_scatt'] = np.log10(ddf_pbp['Incand relPeak']) / np.log10(ddf_pbp['Scatter relPeak'])
    flag_low_ratio_inc_scatt = ddf_pbp['ratio_inc_scatt'] < 1.1

    # ***** ***** ***** ***** ***** ***** *****
    # Create column for purely scattering particle analysis
    # ***** ***** ***** ***** ***** ***** *****
    ddf_pbp['Opt diam scatt only'] = ddf_pbp['Opt diam'].where(flag_scatt & ~flag_inc, np.nan)  # ddf_pbp['Opt diam'].mask(ddf_pbp['BC mass'].notnull(), np.nan)

    # ***** ***** *****
    # Set flag for thin/thick
    # ***** ***** *****

    ddf_pbp['cnts_thin'] = 0
    ddf_pbp['cnts_thin_noScatt'] = 0
    ddf_pbp['cnts_thick'] = 0
    ddf_pbp['cnts_thick_sat'] = 0
    ddf_pbp['cnts_thin_sat'] = 0
    ddf_pbp['cnts_ntl_sat'] = 0
    ddf_pbp['cnts_ntl'] = 0
    ddf_pbp['cnts_extreme_positive_timelag'] = 0
    ddf_pbp['cnts_thin_low_inc_scatt_ratio'] = 0
    ddf_pbp['cnts_particles_for_tl_dist'] = 0  # this flag is used for the calculation of the time lag distributions below (it includes the particles classified as "thin" or "thick")

    ddf_pbp.loc[~flag_scatt & flag_inc_in_range, 'cnts_thin_noScatt'] = 1
    ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_timelag_0_50 & ~flag_low_ratio_inc_scatt, 'cnts_thin'] = 1
    ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_timelag_0_50 & flag_low_ratio_inc_scatt, 'cnts_thin_low_inc_scatt_ratio'] = 1
    ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_timelag_greater_50 & ~flag_extreme_positive_timelag, 'cnts_thick'] = 1
    ddf_pbp.loc[flag_scatt & ~flag_scatt_not_sat & flag_inc_in_range & flag_timelag_greater_50 & ~flag_extreme_positive_timelag, 'cnts_thick_sat'] = 1
    ddf_pbp.loc[flag_scatt & ~flag_scatt_not_sat & flag_inc_in_range & flag_timelag_0_50, 'cnts_thin_sat'] = 1
    ddf_pbp.loc[flag_scatt & ~flag_scatt_not_sat & flag_inc_in_range & flag_negative_timelag, 'cnts_ntl_sat'] = 1
    ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & flag_negative_timelag, 'cnts_ntl'] = 1
    ddf_pbp.loc[flag_scatt & flag_inc_in_range & flag_extreme_positive_timelag, 'cnts_extreme_positive_timelag'] = 1
    ddf_pbp.loc[flag_scatt & flag_scatt_not_sat & flag_inc_in_range & ((flag_timelag_0_50 & ~flag_low_ratio_inc_scatt) | (flag_timelag_greater_50 & ~flag_extreme_positive_timelag)), 'cnts_particles_for_tl_dist'] = 1

    ddf_pbp['cnts_thin_total'] = ddf_pbp['cnts_thin'] + ddf_pbp['cnts_thin_noScatt']
    ddf_pbp['cnts_thick_total'] = ddf_pbp['cnts_thick'] + ddf_pbp['cnts_thick_sat'] + ddf_pbp['cnts_ntl_sat'] + ddf_pbp['cnts_ntl'] + ddf_pbp['cnts_thin_sat']
    ddf_pbp['cnts_unclassified'] = ddf_pbp['cnts_extreme_positive_timelag'] + ddf_pbp['cnts_thin_low_inc_scatt_ratio']

    ddf_pbp['temporary_col'] = 1  # this is to keep track of time stamps that are not originally in the file but are added by the resampling

    # Create two new columns where the particles out of range are changed to np.nan
    # (!! do not change them to 0, otherwise below, where we resample and count, the 0s would be counted as not-null values !!)
    ddf_pbp['BC mass within range'] = ddf_pbp['BC mass'].where(flag_inc_in_range, np.nan)  # ddf_pbp['BC mass'].mask(mask_BC_within_range)
    ddf_pbp['Opt diam scatt only within range'] = ddf_pbp['Opt diam scatt only'].where(flag_scatt_in_range & ~flag_inc, np.nan)

    ddf_pbp_1s = ddf_pbp[['Dropped Records',
                          'Incand Mass (fg)',  # original column from the file, with the calibration uploaded in the software
                          'BC mass',  # mass calculated from the calibration parameters provided; equal to 'Incand Mass (fg)' if none are provided
                          'BC mass within range',  # only the masses within the range specified by the user
                          'cnts_thin',
                          'cnts_thin_noScatt',
                          'cnts_thick',
                          'cnts_thick_sat',
                          'cnts_thin_sat',
                          'cnts_ntl_sat',
                          'cnts_ntl',
                          'cnts_extreme_positive_timelag',
                          'cnts_thin_low_inc_scatt_ratio',
                          'cnts_thin_total',
                          'cnts_thick_total',
                          'cnts_unclassified'
                          ]].resample(dt_str).sum()

    ddf_pbp_1s[['BC numb from file',  # number of BC particles counted from the column 'Incand Mass (fg)'
                'BC numb',  # number of BC particles counted from the column 'BC mass'
                'BC numb within range',  # number of BC particles counted from the column 'BC mass within range'
                'scatter numb from file',  # number of scattering particles counted from the column 'Scatter Size (nm)'
                'Scatt numb',  # number of purely scattering particles counted from the column 'Opt diam'
                'Scatt numb within range',  # number of purely scattering particles counted from the column 'Opt diam within range'
                'original_idx',
                ]] = ddf_pbp[['Incand Mass (fg)',
                              'BC mass',
                              'BC mass within range',
                              'Scatter Size (nm)',
                              'Opt diam scatt only',
                              'Opt diam scatt only within range',
                              'temporary_col'
                              ]].resample(dt_str).count()

    ddf_pbp_1s[['Secs_2GB_mean']] = ddf_pbp[['Secs_2GB']].resample(dt_str).mean()

    # ddf_pbp_1s = ddf_pbp_1s.map_partitions(add_date_column)
    ddf_pbp_1s['date'] = ddf_pbp_1s.index.normalize()
    ddf_pbp_1s['hour'] = ddf_pbp_1s.index.hour

    ddf_pbp_1s = ddf_pbp_1s[ddf_pbp_1s['original_idx'] != 0]
    ddf_pbp_1s = ddf_pbp_1s.drop('original_idx', axis=1)  # .persist()

    # **** **** **** **** **** **** ****
    # Create a 1s ddf for the hk file
    # **** **** **** **** **** **** ****

    ddf_hk['temporary_col'] = 1

    ddf_hk_1s = ddf_hk[['Sample Flow Controller Read (sccm)',
                        'Sample Flow Controller Read (vccm)']].resample(dt_str).mean()  # do I need a fillna here?
    ddf_hk_1s[['original_idx']] = ddf_hk[['temporary_col']].resample(dt_str).count()

    ddf_hk_1s = ddf_hk_1s[ddf_hk_1s['original_idx'] != 0]
    ddf_hk_1s = ddf_hk_1s.drop('original_idx', axis=1)  # .persist()

    # **** **** **** **** **** **** ****
    # Create a 1s joint ddf for the pbp_1s and hk_1s data
    # **** **** **** **** **** **** ****

    ddf_pbp_hk = pd.merge(ddf_pbp_1s, ddf_hk_1s, how='left', left_index=True, right_index=True)
    # ddf_pbp_hk = ddf_pbp_hk.map_partitions(calculate_concentrations, t=dt)
    ddf_pbp_hk['BC_massConc_std'] = ddf_pbp_hk['BC mass'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60) * 1e-6)
    ddf_pbp_hk['BC_massConc_vol'] = ddf_pbp_hk['BC mass'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60) * 1e-6)

    ddf_pbp_hk['BC_numConc_std'] = ddf_pbp_hk['BC numb'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
    ddf_pbp_hk['BC_numConc_vol'] = ddf_pbp_hk['BC numb'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))

    ddf_pbp_hk['BC_massConc_within_range_std'] = ddf_pbp_hk['BC mass within range'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60) * 1e-6)
    ddf_pbp_hk['BC_massConc_within_range_vol'] = ddf_pbp_hk['BC mass within range'] * 1e-9 / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60) * 1e-6)

    ddf_pbp_hk['BC_numConc_within_range_std'] = ddf_pbp_hk['BC numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
    ddf_pbp_hk['BC_numConc_within_range_vol'] = ddf_pbp_hk['BC numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))

    ddf_pbp_hk['S_numConc_std'] = ddf_pbp_hk['Scatt numb'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
    ddf_pbp_hk['S_numConc_vol'] = ddf_pbp_hk['Scatt numb'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))

    ddf_pbp_hk['S_numConc_within_range_std'] = ddf_pbp_hk['Scatt numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (sccm)'] * (dt/60))
    ddf_pbp_hk['S_numConc_within_range_vol'] = ddf_pbp_hk['Scatt numb within range'] / (ddf_pbp_hk['Sample Flow Controller Read (vccm)'] * (dt/60))

    # Calculate histograms of the different classifications/flags:
    ddf_pbp['temporary_col'] = 1

    dNdlogDmev, dMdlogDmev = process_hist_and_dist(ddf_pbp, 'BC mass within range', None, None, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thin, dMdlogDmev_thin = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thin_noScatt, dMdlogDmev_thin_noScatt = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_noScatt', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thick, dMdlogDmev_thick = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thick', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thick_sat, dMdlogDmev_thick_sat = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thick_sat', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thin_sat, dMdlogDmev_thin_sat = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_sat', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_ntl_sat, dMdlogDmev_ntl_sat = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_ntl_sat', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_ntl, dMdlogDmev_ntl = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_ntl', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_extreme_positive_timelag, dMdlogDmev_extreme_positive_timelag = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_extreme_positive_timelag', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thin_low_inc_scatt_ratio, dMdlogDmev_thin_low_inc_scatt_ratio = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_low_inc_scatt_ratio', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thin_total, dMdlogDmev_thin_total = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thin_total', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_thick_total, dMdlogDmev_thick_total = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_thick_total', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)
    dNdlogDmev_unclassified, dMdlogDmev_unclassified = process_hist_and_dist(ddf_pbp, 'BC mass within range', 'cnts_unclassified', 1, inc_mass_bin_lims, inc_mass_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=rho_eff, BC_type=BC_type, t=dt)

    scatt_bin_lims = np.logspace(np.log10(minOptD), np.log10(maxOptD), n_scattbins)
    scatt_bin_ctrs = bin_lims_to_ctrs(scatt_bin_lims)

    dNdlogDsc, _ = process_hist_and_dist(ddf_pbp, 'Opt diam scatt only', None, None, scatt_bin_lims, scatt_bin_ctrs, dt_str, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=None, BC_type=None, t=dt)

    timelag_bins_lims = np.linspace(minTL, maxTL, n_timelag)
    timelag_bin_ctrs = bin_lims_to_ctrs(timelag_bins_lims)

    list_hist = []
    for idx, (name, group) in enumerate(ddf_pbp.groupby('BC mass bin')):
        a, _ = process_hist_and_dist(group, 'time_lag_new', 'cnts_particles_for_tl_dist', 1, timelag_bins_lims, timelag_bin_ctrs, dt_str, calculate_conc=True, flow=ddf_pbp_hk['Sample Flow Controller Read (vccm)'], rho_eff=None, BC_type=None, t=dt)
        a.columns = [f'dNdlogDmev_{inc_mass_bin_ctrs[idx]:.2f}_timelag_{i:.1f}' for i in timelag_bin_ctrs]

        list_hist.append(a)
    time_lag_hists = pd.concat(list_hist, axis=1)

    # **** **** **** **** **** ****
    # Merge everything in one ddf and save
    # **** **** **** **** **** ****

    final_df = pd.concat([ddf_pbp_hk,
                          dNdlogDmev, dMdlogDmev,
                          dNdlogDmev_thin, dMdlogDmev_thin,
                          dNdlogDmev_thin_noScatt, dMdlogDmev_thin_noScatt,
                          dNdlogDmev_thick, dMdlogDmev_thick,
                          dNdlogDmev_thick_sat, dMdlogDmev_thick_sat,
                          dNdlogDmev_thin_sat, dMdlogDmev_thin_sat,
                          dNdlogDmev_ntl_sat, dMdlogDmev_ntl_sat,
                          dNdlogDmev_ntl, dMdlogDmev_ntl,
                          dNdlogDmev_extreme_positive_timelag, dMdlogDmev_extreme_positive_timelag,
                          dNdlogDmev_thin_low_inc_scatt_ratio, dMdlogDmev_thin_low_inc_scatt_ratio,
                          dNdlogDmev_thin_total, dMdlogDmev_thin_total,
                          dNdlogDmev_thick_total, dMdlogDmev_thick_total,
                          dNdlogDmev_unclassified, dMdlogDmev_unclassified,
                          dNdlogDsc,
                          time_lag_hists],
                         axis=1)

    final_df['date'] = final_df.index.normalize()
    final_df['hour'] = final_df.index.hour
    final_df["date"] = final_df["date"].astype("date64[pyarrow]")

    if save_final_data:
        dd.from_pandas(final_df.sort_index(), npartitions=1).to_parquet(path=path_parquet,
                                                                        engine='pyarrow',
                                                                        partition_on=partition_on,
                                                                        write_index=True
                                                                        )

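# %% Example (sketch): running the delayed processing for a list of hours
# A hedged sketch of how the delayed function above could be driven with dask.
# All paths, the file-type strings and the numeric ranges are hypothetical
# placeholders; the function is illustrative only and nothing runs on import.
def example_run_pbp_processing(pbp_root='/path/to/pbp_parquet',
                               hk_root='/path/to/hk_parquet',
                               config_dir='/path/to/config',
                               out_dir='/path/to/output_parquet'):
    pbp_dirs = get_file_dict(pbp_root, 'pbp', level='hour')   # (date, hour) -> directory
    hk_dirs = get_file_dict(hk_root, 'hk', level='hour')
    tasks = [process_pbp_parquet(pbp_dirs[key], hk_dirs[key],
                                 dt=1,
                                 config_file_dir=config_dir,
                                 minM=0.3, maxM=400, n_incbins=50,
                                 minOptD=100, maxOptD=500, n_scattbins=30,
                                 minTL=-10, maxTL=400, n_timelag=42,
                                 save_final_data=True, path_parquet=out_dir)
             for key in pbp_dirs if key in hk_dirs]
    # from dask import compute; compute(*tasks)   # uncomment to actually execute the delayed tasks
    return tasks
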
#%% Function to resample pbp data to a new time interval


def resample_to_dt(dir_path_pbp, dt=60, path_parquet='', save_final_data=False):

    dd_data = pd.read_parquet(dir_path_pbp)

    dd_data['Time'] = dd_data.index
    dd_data['date'] = dd_data['Time'].dt.date.astype("date64[pyarrow]")

    dd_data['temporary_col'] = 1

    cols_for_mean = [col for col in dd_data.columns if any(substring in col for substring in ['date', 'Conc', 'dNdlogDmev', 'dMdlogDmev', 'dNdlogDsc', 'Flow'])]
    # thin_cols_list = [col for col in dd_data.columns if 'thin' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
    # thick_cols_list = [col for col in dd_data.columns if 'thick' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
    # cols_for_sum = [col for col in dd_data.columns if any(substring in col for substring in ['timelag'])] + thin_cols_list + thick_cols_list
    timelag_hist_cols = [col for col in dd_data.columns if 'timelag' in col and all(substring not in col for substring in ['cnts', 'dNdlogDmev', 'dMdlogDmev'])]
    cnts_cols = [col for col in dd_data.columns if 'cnts' in col]
    additional_cols = [col for col in dd_data.columns if col in ['Dropped Records', 'Incand Mass (fg)', 'BC mass', 'BC mass within range',
                                                                 'BC numb from file', 'BC numb', 'BC numb within range',
                                                                 'scatter numb from file', 'Scatt numb', 'Scatt numb within range']]
    cols_for_sum = timelag_hist_cols + cnts_cols + additional_cols
    cols_for_count = ['temporary_col']

    data_resampled_mean = dd_data[cols_for_mean].fillna(0).resample(f'{dt}s').mean()
    data_resampled_sum = dd_data[cols_for_sum].fillna(0).resample(f'{dt}s').sum()
    data_resampled_count = dd_data[cols_for_count].resample(f'{dt}s').count()

    # merged = dd.merge(data_resampled_mean, data_resampled_sum, left_index=True, right_index=True, how='outer')
    merged = pd.concat([data_resampled_mean, data_resampled_sum], axis=1)
    # merged['date'] = merged.index.normalize()
    merged = merged[merged['date'].notna()]
    # merged["date"] = merged["date"].astype("date64[pyarrow]")

    if save_final_data:
        dd.from_pandas(merged.sort_index(), npartitions=1).to_parquet(path=path_parquet,
                                                                      engine='pyarrow',
                                                                      partition_on=['date'],
                                                                      coerce_timestamps='us',
                                                                      allow_truncated_timestamps=True,
                                                                      write_index=True,
                                                                      append=False)

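# %% Example (sketch): resampling the 1 s product to 60 s averages
# A hedged usage sketch for resample_to_dt with hypothetical input/output paths;
# illustrative only and not called anywhere in this module.
def example_resample(in_dir='/path/to/1s_parquet', out_dir='/path/to/60s_parquet'):
    resample_to_dt(in_dir, dt=60, path_parquet=out_dir, save_final_data=True)
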
#%% Widget to plot raw traces


def raw_traces_plot(ch0_plot=[], ch1_plot=[],
                    add_str_title='',
                    xmin=None, xmax=None,
                    ddf_processed=False, verbose=False,
                    ):
    current_event_index = 1

    def update_plot(event_index):
        fig, ax1 = plt.subplots(figsize=(5, 3), layout='constrained')

        if verbose and ddf_processed.any().any():
            ax1.text(0.75, 0.4, 'ch0_flag_000: '+str(ddf_processed.iloc[event_index]['ch0_flag_000']), fontsize=8, transform=ax1.transAxes)

        ax1.set_title(add_str_title+' , idx: '+str(event_index)+'\n'+str(ch0_plot.iloc[event_index].name))
        ax1t = ax1.twinx()

        if len(ch0_plot) != 0:
            ax1.plot(ch0_plot.iloc[event_index], label='ch0', c='C0', ls='-')
        if len(ch1_plot) != 0:
            ax1t.plot(ch1_plot.iloc[event_index], label='ch1', c='C1', ls='-')

        ax1.set_ylabel('Scattering', color='tab:blue')
        ax1.tick_params(axis='y', labelcolor='tab:blue')
        ax1.legend(loc=1)
        ax1t.legend(loc=4)

        if xmin is not None and xmax is not None:
            ax1.set_xlim(xmin, xmax)

        plt.show()

    def handle_forward_button_click(b):
        nonlocal current_event_index
        current_event_index = min(current_event_index + 1, len(ch0_plot) - 1)
        event_slider.value = current_event_index

    def handle_backward_button_click(b):
        nonlocal current_event_index
        current_event_index = max(current_event_index - 1, 0)
        event_slider.value = current_event_index

    event_slider = widgets.IntSlider(min=0, max=len(ch0_plot) - 1, step=1, value=current_event_index, description='Event:')
    forward_button = widgets.Button(description='Forward')
    backward_button = widgets.Button(description='Backward')

    forward_button.on_click(handle_forward_button_click)
    backward_button.on_click(handle_backward_button_click)

    display(widgets.HBox([backward_button, forward_button]))
    interactive_plot = widgets.interactive(update_plot, event_index=event_slider)
    display(interactive_plot)

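# %% Example (sketch): browsing synthetic traces with the widget
# A hedged sketch for raw_traces_plot using random data; it only illustrates the
# expected input shape (one row per event, one column per sample) and is meant
# to be run from a notebook. Not called anywhere in this module.
def example_raw_traces():
    t_idx = pd.date_range('2024-01-01', periods=3, freq='1s')
    ch0 = pd.DataFrame(np.random.rand(3, 100), index=t_idx)   # synthetic scattering traces
    ch1 = pd.DataFrame(np.random.rand(3, 100), index=t_idx)   # synthetic incandescence traces
    raw_traces_plot(ch0_plot=ch0, ch1_plot=ch1, add_str_title='synthetic traces')
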
#%% Functions for plots


def save_image(filename):

    with PdfPages(filename) as p:

        # plt.get_fignums returns the list of the currently open figure numbers
        fig_nums = plt.get_fignums()
        figs = [plt.figure(n) for n in fig_nums]

        # iterate over the open figures and append each one to the pdf
        for fig in figs:
            fig.savefig(p, format='pdf')
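

# %% Example (sketch): saving all open figures to a single pdf
# A hedged usage sketch for save_image; the output path is a hypothetical
# placeholder and the function is never called by this module.
def example_save_all_figures(pdf_path='/path/to/overview.pdf'):
    save_image(pdf_path)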