# -*- coding: utf-8 -*-
"""
Created on Mon May 27 11:45:19 2024

@author: bertoz_b
"""

import time
import pandas as pd
import numpy as np
import sys
import pickle
import os
from glob import glob
import dask  # needed for dask.compute below; `from dask import delayed` alone does not bind `dask`
import dask.dataframe as dd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import datetime
import struct
import zipfile
from dask import delayed
import itertools
import gc

from SP2XR_toolkit_2 import *


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


# ***** Part A 1: from csv to parquet
parent_directory = '/data/user/bertoz_b/SP2XR/data/NyA'

source_directory = parent_directory + '/SP2XR_files'

target_directory_pbp_parquet = parent_directory + '/SP2XR_pbp_parquet'
filter_string_pbp = 'PbP'

target_directory_sp2b_parquet = parent_directory + '/SP2XR_sp2b_parquet'
filter_string_sp2b = 'sp2b'

target_directory_hk_parquet = parent_directory + '/SP2XR_hk_parquet'
filter_string_hk = 'hk'

meta_file_pbp = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet',
                                engine='fastparquet')
meta_file_hk = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet',
                               engine='fastparquet')
meta_file_sp2b = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet',
                                 engine='pyarrow')

matching_files_pbp = find_files(source_directory, filter_string_pbp)
matching_files_hk = find_files(source_directory, filter_string_hk)
# matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]


# PBP
'''
start_time = time.time()

cluster = SLURMCluster(cores=64,
                       processes=64,
                       memory="128GB",  # 2 GB/process should be sufficient
                       walltime="05:59:00",
                       job_extra_directives=['--partition=daily']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_pbp in chunks(matching_files_pbp[3200:], 100):
    print(f'chunk: {i}')
    # pbp_orig = read_and_process(matching_files_pbp, target_directory_pbp_parquet, meta_file_pbp, meta_file_hk)
    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet)
                   for f in chunk_pbp])
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''

# HK
'''
start_time = time.time()

cluster = SLURMCluster(cores=16,
                       processes=16,
                       memory="128GB",  # 8 GB/process with 16 processes
                       walltime="07:59:00",
                       job_extra_directives=['--partition=daily']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_hk in chunks(matching_files_hk, 100):
    print(f'chunk: {i}')
    # hk_orig = read_and_process(matching_files_hk, target_directory_hk_parquet, meta_file_pbp, meta_file_hk)
    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet)
                   for f in chunk_hk])
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''

# SP2B (requires matching_files_sp2b, currently commented out above)
'''
start_time = time.time()

cluster = SLURMCluster(cores=64,
                       processes=64,
                       memory="128GB",  # 2 GB/process should be sufficient
                       walltime="02:59:00",
                       job_extra_directives=['--partition=daily']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

sp2b_orig = read_and_process_sp2b(matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b)

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''
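# All three conversion blocks above follow the same chunked-submission
# pattern: build a small batch of delayed tasks, compute it, then free memory
# before the next batch, so the scheduler's task graph stays bounded. Below is
# a minimal, self-contained sketch of that pattern (illustration only, kept
# inert like the blocks above): `convert_one` is a hypothetical stand-in for a
# per-file converter such as read_csv_files_with_dask_2, and a LocalCluster
# replaces the SLURMCluster so the sketch runs without a batch system.
'''
import dask
from dask.distributed import Client, LocalCluster


@dask.delayed
def convert_one(path):
    # Hypothetical converter: read one raw file, write one parquet file.
    pass


client = Client(LocalCluster(n_workers=4))
for batch in chunks(matching_files_pbp, 100):
    # Computing batch-by-batch keeps the task graph small; gc between batches.
    dask.compute(*[convert_one(f) for f in batch])
    gc.collect()
client.close()
'''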
#%% Process pbp data
# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'


def get_file_dict(directory, file_type):
    """
    Create a dictionary mapping (date, hour) keys to the hourly directory
    that holds the parquet files. The file_type argument is currently unused.
    """
    file_dict = {}
    # Assuming the files have a structure like /path/to/directory/YYYYMMDD/HH/file.parquet
    for file_path in glob(f'{directory}/**/*.parquet', recursive=True):
        # Extract date and hour from the file path
        parts = file_path.split(os.sep)
        date = parts[-3]
        hour = parts[-2]
        file_dict[(date, hour)] = os.path.join('/', *parts[:-1])
    return file_dict


files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP')
files_hk = get_file_dict(target_directory_hk_parquet, 'HK')

# Keep only the (date, hour) pairs present in both the PbP and HK trees,
# so each PbP hour is processed together with its housekeeping data
# (see the sketch at the end of this cell).
list_pbp = []
list_hk = []
for key in files_pbp:
    if key in files_hk:
        list_pbp.append(files_pbp[key])
        list_hk.append(files_hk[key])

start_time = time.time()

cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="128GB",
                       walltime="10:59:00",
                       job_extra_directives=['--partition=general']
                       )
# cluster = SLURMCluster(cores=8,
#                        processes=8,
#                        memory="64GB",
#                        walltime="00:20:00",
#                        job_extra_directives=['--partition=hourly']
#                        )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_pbp, chunk_hk in zip(chunks(list_pbp[8000:], 50), chunks(list_hk[8000:], 50)):
    print(f'chunk: {i}')
    dask.compute(*[process_new_test(dir_path_pbp, dir_path_hk, dt=1,
                                    rho_eff=1800,
                                    BC_type='constant_effective_density',
                                    inc_calib_curve='polynomial',
                                    inc_calib_params=[0.05, 2.0470000507725255e-07],
                                    scatt_calib_curve='powerlaw',
                                    scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
                                    config_file_dir=config_path,
                                    minM=0.3, maxM=400, n_incbins=50,
                                    minOptD=100, maxOptD=500, n_scattbins=20,
                                    minTL=-10, maxTL=400, n_timelag=100,
                                    path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new/',
                                    save_final_data=True)
                   for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
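# Sketch of how the (date, hour) pairing above behaves, on hypothetical paths
# (the real trees are partitioned as <target_dir>/YYYYMMDD/HH/*.parquet).
# Only hours present in BOTH trees are kept; the 07:00 hour below is dropped
# because it has no matching housekeeping directory. Illustration only.
'''
files_pbp_demo = {('20200922', '06'): '/pbp/20200922/06',
                  ('20200922', '07'): '/pbp/20200922/07'}
files_hk_demo = {('20200922', '06'): '/hk/20200922/06'}

paired = [(files_pbp_demo[k], files_hk_demo[k])
          for k in files_pbp_demo if k in files_hk_demo]
print(paired)  # [('/pbp/20200922/06', '/hk/20200922/06')]
'''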
#%% Resample pbp to 1min
'''
def list_files_and_dirs(base_dir):
    """
    List full paths of all files and directories in the given directory,
    including nested ones.
    """
    file_list = []
    subdir_list = []
    # os.walk yields a tuple (dirpath, dirnames, filenames)
    for dirpath, dirnames, filenames in os.walk(base_dir):
        # Add directories to subdir_list
        for dirname in dirnames:
            subdir_full_path = os.path.join(dirpath, dirname)
            subdir_list.append(subdir_full_path)
        # Add files to file_list
        for filename in filenames:
            file_full_path = os.path.join(dirpath, filename)
            file_list.append(file_full_path)
    return subdir_list, file_list


def list_first_level_subdirs(base_dir):
    """
    List full paths of all first-level subdirectories within the given directory.
    """
    subdir_list = []
    # List all entries in the base directory
    for entry in os.listdir(base_dir):
        # Create full path
        full_path = os.path.join(base_dir, entry)
        # Check if this path is a directory
        if os.path.isdir(full_path):
            subdir_list.append(full_path)
    return subdir_list


def resample_to_dt(dir_path_pbp, dt=60, path_parquet='', save_final_data=False):
    dd_data = pd.read_parquet(dir_path_pbp)
    dd_data['Time'] = dd_data.index
    dd_data['date'] = dd_data['Time'].dt.date.astype("date64[pyarrow]")
    dd_data['temporary_col'] = 1

    # Intensive quantities (concentrations, size distributions, flows) are averaged;
    # counts and histogram columns are summed.
    cols_for_mean = [col for col in dd_data.columns
                     if any(substring in col for substring in
                            ['date', 'Conc', 'dNdlogDmev', 'dMdlogDmev', 'dNdlogDsc', 'Flow'])]
    # thin_cols_list = [col for col in dd_data.columns if 'thin' in col
    #                   and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
    # thick_cols_list = [col for col in dd_data.columns if 'thick' in col
    #                    and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
    # cols_for_sum = [col for col in dd_data.columns
    #                 if any(substring in col for substring in ['timelag'])] + thin_cols_list + thick_cols_list
    timelag_hist_cols = [col for col in dd_data.columns if 'timelag' in col
                         and all(substring not in col for substring in ['cnts', 'dNdlogDmev', 'dMdlogDmev'])]
    cnts_cols = [col for col in dd_data.columns if 'cnts' in col]
    additional_cols = [col for col in dd_data.columns
                       if col in ['Dropped Records', 'Incand Mass (fg)', 'BC mass',
                                  'BC mass within range', 'BC numb from file', 'BC numb',
                                  'BC numb within range', 'scatter numb from file',
                                  'Scatt numb', 'Scatt numb within range']]
    cols_for_sum = timelag_hist_cols + cnts_cols + additional_cols
    cols_for_count = ['temporary_col']

    data_resampled_mean = dd_data[cols_for_mean].resample(f'{dt}s').mean()
    data_resampled_sum = dd_data[cols_for_sum].resample(f'{dt}s').sum()
    data_resampled_count = dd_data[cols_for_count].resample(f'{dt}s').count()  # currently not merged into the output

    # merged = dd.merge(data_resampled_mean, data_resampled_sum, left_index=True, right_index=True, how='outer')
    merged = pd.concat([data_resampled_mean, data_resampled_sum], axis=1)
    # merged['date'] = merged.index.normalize()
    merged = merged[merged['date'].notna()]
    # merged['date'] = merged.index.normalize()
    # merged["date"] = merged["date"].astype("date64[pyarrow]")

    if save_final_data:
        dd.from_pandas(merged.sort_index(), npartitions=1).to_parquet(path=path_parquet,
                                                                      engine='pyarrow',
                                                                      partition_on=['date'],
                                                                      coerce_timestamps='us',
                                                                      allow_truncated_timestamps=True,
                                                                      write_index=True,
                                                                      append=False)


dir_pbp_1s = list_first_level_subdirs('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new')

start_time = time.time()

cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="32GB",
                       walltime="00:20:00",
                       job_extra_directives=['--partition=hourly']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_pbp in chunks(dir_pbp_1s, 50):
    print(f'chunk: {i}')
    dask.compute(*[resample_to_dt(dir_path_pbp, dt=60,
                                  path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/',
                                  save_final_data=True)
                   for dir_path_pbp in chunk_pbp])
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''

#%% Process sp2b parquet
'''
start_time = time.time()

cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="16GB",
                       walltime="00:30:00",
                       job_extra_directives=['--partition=hourly']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

sp2_raw_traces = dd.read_parquet('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet',
                                 calculate_divisions=True)  # .repartition(freq='1h')

test2 = process_sp2b_parquet(sp2_raw_traces,
                             scatt_sat_threshold=1e9,
                             inc_sat_threshold=1e8,
                             scatt_noise_threshold=1e4,
                             end_bkgr_ch0=100,
                             end_bkgr_ch1=350,
                             output_path='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed'
                             )

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''
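#%% Reading the processed output back (sketch)
# Each stage above writes parquet partitioned on the 'date' column
# (to_parquet(..., partition_on=['date'])), so a single day can be read back
# with a partition filter. A minimal sketch, illustration only: the path and
# date value are examples, and the filter value may need to match the
# partition's on-disk string form.
'''
df = dd.read_parquet('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min',
                     engine='pyarrow',
                     filters=[('date', '==', '2020-09-22')])
print(df.head())
'''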