From 890314ad247708a2bafcd7416fde73f8da52acff Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Wed, 18 Sep 2024 14:03:37 +0200
Subject: [PATCH] Remove NyA code

---
 NyA_code/Processing_code_NyA.py | 363 --------------------------------
 1 file changed, 363 deletions(-)
 delete mode 100644 NyA_code/Processing_code_NyA.py

diff --git a/NyA_code/Processing_code_NyA.py b/NyA_code/Processing_code_NyA.py
deleted file mode 100644
index 8b8b897..0000000
--- a/NyA_code/Processing_code_NyA.py
+++ /dev/null
@@ -1,363 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on Mon May 27 11:45:19 2024
-
-@author: bertoz_b
-"""
-
-import time
-import pandas as pd
-import numpy as np
-import sys
-import pickle
-import dask.dataframe as dd
-from dask.distributed import Client
-from dask_jobqueue import SLURMCluster
-import datetime
-import struct
-import zipfile
-from dask import delayed
-import itertools
-import gc
-
-from SP2XR_toolkit_2 import *
-
-def chunks(lst, n):
-    """Yield successive n-sized chunks from lst."""
-    for i in range(0, len(lst), n):
-        yield lst[i:i + n]
-
-
-# ***** Part A 1 : from csv to parquet
-
-parent_directory = '/data/user/bertoz_b/SP2XR/data/NyA'
-source_directory = parent_directory + '/SP2XR_files'
-
-target_directory_pbp_parquet = parent_directory + '/SP2XR_pbp_parquet'
-filter_string_pbp = 'PbP'
-
-target_directory_sp2b_parquet = parent_directory + '/SP2XR_sp2b_parquet'
-filter_string_sp2b = 'sp2b'
-
-target_directory_hk_parquet = parent_directory + '/SP2XR_hk_parquet'
-filter_string_hk = 'hk'
-
-meta_file_pbp = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet', engine='fastparquet')
-meta_file_hk = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet', engine='fastparquet')
-meta_file_sp2b = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet', engine='pyarrow')
-
-matching_files_pbp = find_files(source_directory, filter_string_pbp)
-matching_files_hk = find_files(source_directory, filter_string_hk)
-# matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]
-
-
-# PBP
-'''
-start_time = time.time()
-cluster = SLURMCluster(cores=64,
-                       processes=64,
-                       memory="128GB",  # 2GB/process should sufficient
-                       walltime="05:59:00",
-                       job_extra_directives=['--partition=daily']
-                       )
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-
-
-i = 0
-for chunk_pbp in chunks(matching_files_pbp[3200:], 100):
-    print(f'chunk: {i}')
-
-    # pbp_orig = read_and_process(matching_files_pbp, target_directory_pbp_parquet, meta_file_pbp, meta_file_hk)
-    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet) for f in chunk_pbp])
-
-    gc.collect()
-    i += 1
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-'''
-
-
-# HK
-'''
-start_time = time.time()
-cluster = SLURMCluster(cores=16,
-                       processes=16,
-                       memory="128GB",  # 2GB/process should sufficient
-                       walltime="07:59:00",
-                       job_extra_directives=['--partition=daily']
-                       )
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-i = 0
-for chunk_hk in chunks(matching_files_hk, 100):
-    print(f'chunk: {i}')
-
-    # hk_orig = read_and_process(matching_files_hk, target_directory_hk_parquet, meta_file_pbp, meta_file_hk)
-    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet) for f in chunk_hk])
-
-    gc.collect()
-    i += 1
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-'''
-
-# SP2B
-'''
-start_time = time.time()
-cluster = SLURMCluster(cores=64,
-                       processes=64,
-                       memory="128GB",  # 2GB/process should sufficient
-                       walltime="02:59:00",
-                       job_extra_directives=['--partition=daily']
-                       )
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-sp2b_orig = read_and_process_sp2b(matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b)
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-'''
-
-
-#%% Process pbp data
-
-
-
-# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
-config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'
-
-def get_file_dict(directory, file_type):
-    """
-    Creates a dictionary with date and hour as keys and file paths as values
-    for the given directory.
-    """
-    file_dict = {}
-    # Assuming the files have a structure like /path/to/directory/YYYYMMDD/HH/file.parquet
-    for file_path in glob(f'{directory}/**/*.parquet', recursive=True):
-        # Extracting date and hour from the file path
-        parts = file_path.split(os.sep)
-        date = parts[-3]
-        hour = parts[-2]
-        file_dict[(date, hour)] = os.path.join('/', *parts[:-1])
-    return file_dict
-
-
-files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP')
-files_hk = get_file_dict(target_directory_hk_parquet, 'HK')
-
-list_pbp = []
-list_hk = []
-for key in files_pbp:
-    if key in files_hk:
-        list_pbp.append(files_pbp[key])
-        list_hk.append(files_hk[key])
-
-
-
-start_time = time.time()
-
-cluster = SLURMCluster(cores=8,
-                       processes=8,
-                       memory="128GB",
-                       walltime="10:59:00",
-                       job_extra_directives=['--partition=general']
-                       )
-# cluster = SLURMCluster(cores=8,
-#                        processes=8,
-#                        memory="64GB",
-#                        walltime="00:20:00",
-#                        job_extra_directives=['--partition=hourly']
-#                        )
-
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-i = 0
-for chunk_pbp, chunk_hk in zip(chunks(list_pbp[8000:], 50), chunks(list_hk[8000:], 50)):
-    print(f'chunk: {i}')
-
-
-    dask.compute(*[process_new_test(dir_path_pbp, dir_path_hk, dt=1,
-                                    rho_eff=1800, BC_type='constant_effective_density',
-                                    inc_calib_curve='polynomial', inc_calib_params=[0.05, 2.0470000507725255e-07],
-                                    scatt_calib_curve='powerlaw', scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
-                                    config_file_dir=config_path,
-
-
-                                    minM=0.3, maxM=400, n_incbins=50,
-                                    minOptD=100, maxOptD=500, n_scattbins=20,
-                                    minTL=-10, maxTL=400, n_timelag=100,
-
-                                    path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new/',
-                                    save_final_data=True) for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
-    gc.collect()
-    i += 1
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-
-
-#%% Resample pbp to 1min
-
-'''
-def list_files_and_dirs(base_dir):
-    """ List full paths of all files and directories in the given directory, including nested ones """
-    file_list = []
-    subdir_list = []
-
-    # os.walk yields a tuple (dirpath, dirnames, filenames)
-    for dirpath, dirnames, filenames in os.walk(base_dir):
-        # Add directories to subdir_list
-        for dirname in dirnames:
-            subdir_full_path = os.path.join(dirpath, dirname)
-            subdir_list.append(subdir_full_path)
-
-        # Add files to file_list
-        for filename in filenames:
-            file_full_path = os.path.join(dirpath, filename)
-            file_list.append(file_full_path)
-
-    return subdir_list, file_list
-
-
-def list_first_level_subdirs(base_dir):
-    """ List full paths of all first-level subdirectories within the given directory """
-    subdir_list = []
-    # List all entries in the base directory
-    for entry in os.listdir(base_dir):
-        # Create full path
-        full_path = os.path.join(base_dir, entry)
-        # Check if this path is a directory
-        if os.path.isdir(full_path):
-            subdir_list.append(full_path)
-    return subdir_list
-
-
-
-def resample_to_dt(dir_path_pbp, dt=60, path_parquet='', save_final_data=False):
-
-    dd_data = pd.read_parquet(dir_path_pbp)
-
-    dd_data['Time'] = dd_data.index
-    dd_data['date'] = dd_data['Time'].dt.date.astype("date64[pyarrow]")
-
-    dd_data['temporary_col'] = 1
-
-    cols_for_mean = [col for col in dd_data.columns if any(substring in col for substring in ['date', 'Conc', 'dNdlogDmev', 'dMdlogDmev', 'dNdlogDsc', 'Flow'])]
-    #thin_cols_list = [col for col in dd_data.columns if 'thin' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
-    #thick_cols_list = [col for col in dd_data.columns if 'thick' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
-    #cols_for_sum = [col for col in dd_data.columns if any(substring in col for substring in ['timelag'])]+thin_cols_list+thick_cols_list
-    timelag_hist_cols = [col for col in dd_data.columns if 'timelag' in col and all(substring not in col for substring in ['cnts', 'dNdlogDmev', 'dMdlogDmev'])]
-    cnts_cols = [col for col in dd_data.columns if 'cnts' in col]
-    addiotnal_cols = [col for col in dd_data.columns if col in ['Dropped Records', 'Incand Mass (fg)', 'BC mass', 'BC mass within range',
-                                                                'BC numb from file', 'BC numb', 'BC numb within range',
-                                                                'scatter numb from file', 'Scatt numb', 'Scatt numb within range']]
-    cols_for_sum = timelag_hist_cols + cnts_cols + addiotnal_cols
-    cols_for_count = ['temporary_col']
-
-    data_resampled_mean = dd_data[cols_for_mean].resample(f'{dt}s').mean()
-    data_resampled_sum = dd_data[cols_for_sum].resample(f'{dt}s').sum()
-    data_resampled_count = dd_data[cols_for_count].resample(f'{dt}s').count()
-
-
-    #merged = dd.merge(data_resampled_mean, data_resampled_sum, left_index=True, right_index=True, how='outer')
-    merged = pd.concat([data_resampled_mean, data_resampled_sum], axis=1)
-    #merged['date'] = merged.index.normalize()
-    merged = merged[merged['date'].notna()]
-    #merged['date'] = merged.index.normalize()
-    #merged["date"] = merged["date"].astype("date64[pyarrow]")
-
-
-    if save_final_data:
-        dd.from_pandas(merged.sort_index(), npartitions=1).to_parquet(path=path_parquet,
-                                                                      engine='pyarrow',
-                                                                      partition_on=['date'],
-                                                                      coerce_timestamps='us',
-                                                                      allow_truncated_timestamps=True,
-                                                                      write_index=True,
-                                                                      append=False)
-
-
-
-
-dir_pbp_1s = list_first_level_subdirs('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new')
-
-
-start_time = time.time()
-
-cluster = SLURMCluster(cores=8,
-                       processes=8,
-                       memory="32GB",
-                       walltime="00:20:00",
-                       job_extra_directives=['--partition=hourly']
-                       )
-
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-i = 0
-for chunk_pbp in chunks(dir_pbp_1s, 50):
-    print(f'chunk: {i}')
-
-
-    dask.compute(*[resample_to_dt(dir_path_pbp, dt=60,
-                                  path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/',
-                                  save_final_data=True) for dir_path_pbp in chunk_pbp])
-    gc.collect()
-    i += 1
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-'''
-
-
-#%% Process sp2b parquet
-
-'''
-start_time = time.time()
-
-cluster = SLURMCluster(cores=8,
-                       processes=8,
-                       memory="16GB",
-                       walltime="00:30:00",
-                       job_extra_directives=['--partition=hourly']
-                       )
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-
-
-sp2_raw_traces = dd.read_parquet('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet', calculate_divisions=True)#.repartition(freq='1h')
-
-test2 = process_sp2b_parquet(sp2_raw_traces,
-                             scatt_sat_threshold = 1e9,
-                             inc_sat_threshold = 1e8,
-                             scatt_noise_threshold = 1e4,
-
-                             end_bkgr_ch0 = 100,
-                             end_bkgr_ch1 = 350,
-                             output_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed'
-                             )
-
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-'''
\ No newline at end of file