# -*- coding: utf-8 -*-
"""
Created on Mon May 27 11:45:19 2024

@author: bertoz_b

Convert raw SP2XR files (PbP, HK, sp2b) to parquet, then process and resample
them with dask on a SLURM cluster.
"""

import gc
import time

import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from SP2XR_toolkit import (
    find_files,
    read_and_process_sp2b,
    read_csv_files_with_dask_2,
    process_sp2b_parquet,
    resample_to_dt,
    chunks,
    list_first_level_subdirs,
    process_pbp_parquet,
    get_file_dict,
)

# %% Define directories and folders
parent_directory = "/data/user/bertoz_b/SP2XR/data/NyA"

source_directory = parent_directory + "/SP2XR_files"

target_directory_pbp_parquet = parent_directory + "/SP2XR_pbp_parquet"
filter_string_pbp = "PbP"

target_directory_sp2b_parquet = parent_directory + "/SP2XR_sp2b_parquet"
filter_string_sp2b = "sp2b"

target_directory_hk_parquet = parent_directory + "/SP2XR_hk_parquet"
filter_string_hk = "hk"

# Meta files for the PbP, HK and sp2b readers
meta_file_pbp = pd.read_parquet(
    "/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet",
    engine="fastparquet",
)
meta_file_hk = pd.read_parquet(
    "/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet",
    engine="fastparquet",
)
meta_file_sp2b = pd.read_parquet(
    "/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet",
    engine="pyarrow",
)

matching_files_pbp = find_files(source_directory, filter_string_pbp)
matching_files_hk = find_files(source_directory, filter_string_hk)
# Only a subset of the sp2b files is processed here
matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]

# %% PBP: From csv/zip to parquet
start_time = time.time()

cluster = SLURMCluster(
    cores=64,
    processes=64,
    memory="128GB",
    walltime="05:59:00",
    job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_pbp in chunks(matching_files_pbp, 100):
    print(f"chunk: {i}")
    dask.compute(
        *[
            read_csv_files_with_dask_2(
                f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet
            )
            for f in chunk_pbp
        ]
    )
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))

# %% HK: From csv/zip to parquet
start_time = time.time()

cluster = SLURMCluster(
    cores=16,
    processes=16,
    memory="128GB",
    walltime="07:59:00",
    job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_hk in chunks(matching_files_hk, 100):
    print(f"chunk: {i}")
    dask.compute(
        *[
            read_csv_files_with_dask_2(
                f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet
            )
            for f in chunk_hk
        ]
    )
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))

# %% SP2B: From csv/zip to parquet
start_time = time.time()

cluster = SLURMCluster(
    cores=64,
    processes=64,
    memory="128GB",  # 2 GB/process should be sufficient
    walltime="02:59:00",
    job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

sp2b_orig = read_and_process_sp2b(
    matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b
)

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
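# %% Reference: chunked submission pattern
# The conversion loops above and the processing loops below submit delayed
# tasks to the SLURM cluster in fixed-size batches via the toolkit's `chunks`
# helper, forcing a garbage collection between batches. The sketch below only
# illustrates the assumed batching behaviour; it is not the SP2XR_toolkit
# implementation and is not used by this script.
def _chunks_sketch(seq, n):
    """Yield successive n-sized slices of seq."""
    for j in range(0, len(seq), n):
        yield seq[j:j + n]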
# %% PBP: from single particle to specified dt
# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
config_path = "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903"

# The level keyword controls the partitioning columns of the processed PbP files;
# it has to match the partition_on keyword of the process_pbp_parquet call below.
files_pbp = get_file_dict(target_directory_pbp_parquet, "PbP", level="date")
files_hk = get_file_dict(target_directory_hk_parquet, "HK", level="date")

list_pbp = []
list_hk = []
for key in files_pbp:
    if key in files_hk:
        list_pbp.append(files_pbp[key])
        list_hk.append(files_hk[key])

start_time = time.time()

cluster = SLURMCluster(
    cores=8,
    processes=8,
    memory="128GB",
    walltime="11:59:00",
    job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

# With level="hour" the chunks can be up to 50 (hours); with level="date" they
# should be at most 25 (days) when using the cluster settings above (8 cores, 128 GB).
i = 0
for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
    print(f"chunk: {i}")
    dask.compute(
        *[
            process_pbp_parquet(
                dir_path_pbp,
                dir_path_hk,
                dt=60,
                rho_eff=1800,
                BC_type="constant_effective_density",
                inc_calib_curve="polynomial",
                inc_calib_params=[0.05, 2.0470000507725255e-07],
                scatt_calib_curve="powerlaw",
                scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
                config_file_dir=config_path,
                minM=0.3,
                maxM=400,
                n_incbins=50,
                minOptD=100,
                maxOptD=500,
                n_scattbins=20,
                minTL=-10,
                maxTL=400,
                n_timelag=100,
                path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/",
                partition_on=["date"],
                save_final_data=True,
            )
            for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)
        ]
    )
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))

# %% PBP: Resample
dir_pbp_1s = list_first_level_subdirs(
    "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new"
)

start_time = time.time()

cluster = SLURMCluster(
    cores=8,
    processes=8,
    memory="32GB",
    walltime="00:20:00",
    job_extra_directives=["--partition=hourly"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

i = 0
for chunk_pbp in chunks(dir_pbp_1s, 50):
    print(f"chunk: {i}")
    dask.compute(
        *[
            resample_to_dt(
                dir_path_pbp,
                dt=60,
                path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/",
                save_final_data=True,
            )
            for dir_path_pbp in chunk_pbp
        ]
    )
    gc.collect()
    i += 1

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))

# %% SP2B: Process parquet files
# Note: this is a very old function and may need revisiting.
start_time = time.time()

cluster = SLURMCluster(
    cores=8,
    processes=8,
    memory="16GB",
    walltime="00:30:00",
    job_extra_directives=["--partition=hourly"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

sp2_raw_traces = dd.read_parquet(
    "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet", calculate_divisions=True
)  # .repartition(freq='1h')

test2 = process_sp2b_parquet(
    sp2_raw_traces,
    scatt_sat_threshold=1e9,
    inc_sat_threshold=1e8,
    scatt_noise_threshold=1e4,
    end_bkgr_ch0=100,
    end_bkgr_ch1=350,
    output_path="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed",
)

client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
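# %% Optional: quick look at the resampled PbP output
# A minimal sanity check, not part of the original processing chain: read the
# 1-minute parquet store written by resample_to_dt above back into a dask
# dataframe and print the first rows. Column names depend on the SP2XR_toolkit
# processing functions and are not assumed here.
pbp_1min = dd.read_parquet(
    "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/"
)
print(pbp_1min.head())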