From a0666be19f380781a371bf1c685a81075339b12b Mon Sep 17 00:00:00 2001
From: Barbara Bertozzi
Date: Thu, 11 Sep 2025 12:01:39 +0200
Subject: [PATCH] Remove example_processing_code.py

---
 example_processing_code.py | 313 -------------------------------------
 1 file changed, 313 deletions(-)
 delete mode 100644 example_processing_code.py

diff --git a/example_processing_code.py b/example_processing_code.py
deleted file mode 100644
index c01193b..0000000
--- a/example_processing_code.py
+++ /dev/null
@@ -1,313 +0,0 @@
-# -*- coding: utf-8 -*-
-"""
-Created on Mon May 27 11:45:19 2024
-
-@author: bertoz_b
-"""
-
-import time
-import pandas as pd
-import dask.dataframe as dd
-import dask
-from dask.distributed import Client
-from dask_jobqueue import SLURMCluster
-import gc
-
-from SP2XR_toolkit import (
-    find_files,
-    read_and_process_sp2b,
-    read_csv_files_with_dask_2,
-    process_sp2b_parquet,
-    resample_to_dt,
-    chunks,
-    list_first_level_subdirs,
-    process_pbp_parquet,
-    get_file_dict,
-)
-
-
-# %% Define directories and folders
-
-parent_directory = "/data/user/bertoz_b/SP2XR/data/NyA"
-source_directory = parent_directory + "/SP2XR_files"
-
-target_directory_pbp_parquet = parent_directory + "/SP2XR_pbp_parquet"
-filter_string_pbp = "PbP"
-
-target_directory_sp2b_parquet = parent_directory + "/SP2XR_sp2b_parquet"
-filter_string_sp2b = "sp2b"
-
-target_directory_hk_parquet = parent_directory + "/SP2XR_hk_parquet"
-filter_string_hk = "hk"
-
-meta_file_pbp = pd.read_parquet(
-    "/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet",
-    engine="fastparquet",
-)
-meta_file_hk = pd.read_parquet(
-    "/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet",
-    engine="fastparquet",
-)
-meta_file_sp2b = pd.read_parquet(
-    "/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet",
-    engine="pyarrow",
-)
-
-matching_files_pbp = find_files(source_directory, filter_string_pbp)
-matching_files_hk = find_files(source_directory, filter_string_hk)
-matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]
-
-
-# %% PBP: From csv/zip to parquet
-
-
-start_time = time.time()
-cluster = SLURMCluster(
-    cores=64,
-    processes=64,
-    memory="128GB",
-    walltime="05:59:00",
-    job_extra_directives=["--partition=daily"],
-)
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-
-i = 0
-for chunk_pbp in chunks(matching_files_pbp, 100):
-    print(f"chunk: {i}")
-
-    dask.compute(
-        *[
-            read_csv_files_with_dask_2(
-                f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet
-            )
-            for f in chunk_pbp
-        ]
-    )
-
-    gc.collect()
-    i += 1
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-
-# %% HK: From csv/zip to parquet
-
-
-start_time = time.time()
-cluster = SLURMCluster(
-    cores=16,
-    processes=16,
-    memory="128GB",
-    walltime="07:59:00",
-    job_extra_directives=["--partition=daily"],
-)
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-i = 0
-for chunk_hk in chunks(matching_files_hk, 100):
-    print(f"chunk: {i}")
-
-    dask.compute(
-        *[
-            read_csv_files_with_dask_2(
-                f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet
-            )
-            for f in chunk_hk
-        ]
-    )
-
-    gc.collect()
-    i += 1
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-# %% SP2B: From csv/zip to parquet
-
-
-start_time = time.time()
-cluster = SLURMCluster(
-    cores=64,
-    processes=64,
-    memory="128GB",  # 2GB/process should be sufficient
-    walltime="02:59:00",
-    job_extra_directives=["--partition=daily"],
-)
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-sp2b_orig = read_and_process_sp2b(
-    matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b
-)
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-
-# %% PBP: from single particle to specified dt
-
-
-# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
-config_path = "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903"
-
-# The level keyword controls the partitioning columns of the processed pbp files;
-# it has to match the partition_on keyword of the process_pbp_parquet call below.
-files_pbp = get_file_dict(target_directory_pbp_parquet, "PbP", level="date")
-files_hk = get_file_dict(target_directory_hk_parquet, "HK", level="date")
-
-
-list_pbp = []
-list_hk = []
-for key in files_pbp:
-    if key in files_hk:
-        list_pbp.append(files_pbp[key])
-        list_hk.append(files_hk[key])
-
-
-start_time = time.time()
-
-cluster = SLURMCluster(
-    cores=8,
-    processes=8,
-    memory="128GB",
-    walltime="11:59:00",
-    job_extra_directives=["--partition=daily"],
-)
-
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-# With the cluster settings above (8 cores, 128GB): at hour level the chunks
-# can be up to 50 (hours); at day level they should be at most 25 (days).
-i = 0
-for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
-    print(f"chunk: {i}")
-
-    dask.compute(
-        *[
-            process_pbp_parquet(
-                dir_path_pbp,
-                dir_path_hk,
-                dt=60,
-                rho_eff=1800,
-                BC_type="constant_effective_density",
-                inc_calib_curve="polynomial",
-                inc_calib_params=[0.05, 2.0470000507725255e-07],
-                scatt_calib_curve="powerlaw",
-                scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
-                config_file_dir=config_path,
-                minM=0.3,
-                maxM=400,
-                n_incbins=50,
-                minOptD=100,
-                maxOptD=500,
-                n_scattbins=20,
-                minTL=-10,
-                maxTL=400,
-                n_timelag=100,
-                path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/",
-                partition_on=["date"],
-                save_final_data=True,
-            )
-            for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)
-        ]
-    )
-    gc.collect()
-    i += 1
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-
-# %% PBP: Resample
-
-dir_pbp_1s = list_first_level_subdirs(
-    "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new"
-)
-
-
-start_time = time.time()
-
-cluster = SLURMCluster(
-    cores=8,
-    processes=8,
-    memory="32GB",
-    walltime="00:20:00",
-    job_extra_directives=["--partition=hourly"],
-)
-
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-
-i = 0
-for chunk_pbp in chunks(dir_pbp_1s, 50):
-    print(f"chunk: {i}")
-
-    dask.compute(
-        *[
-            resample_to_dt(
-                dir_path_pbp,
-                dt=60,
-                path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/",
-                save_final_data=True,
-            )
-            for dir_path_pbp in chunk_pbp
-        ]
-    )
-    gc.collect()
-    i += 1
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
-
-
-# %% SP2B: Process parquet files
-# This is a very old function; it might need revisiting.
-
-start_time = time.time()
-
-cluster = SLURMCluster(
-    cores=8,
-    processes=8,
-    memory="16GB",
-    walltime="00:30:00",
-    job_extra_directives=["--partition=hourly"],
-)
-cluster.scale(1)
-client = Client(cluster)
-print(client.dashboard_link)
-
-
-sp2_raw_traces = dd.read_parquet(
-    "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet", calculate_divisions=True
-)  # .repartition(freq='1h')
-
-test2 = process_sp2b_parquet(
-    sp2_raw_traces,
-    scatt_sat_threshold=1e9,
-    inc_sat_threshold=1e8,
-    scatt_noise_threshold=1e4,
-    end_bkgr_ch0=100,
-    end_bkgr_ch1=350,
-    output_path="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed",
-)
-
-
-client.close()
-cluster.close()
-print("--- %s seconds ---" % (time.time() - start_time))
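
For reference, the removed script repeated one core pattern: a chunked
fan-out of delayed tasks over a SLURM-backed Dask cluster, with an explicit
gc.collect() between batches. A minimal, self-contained sketch of that
pattern follows; chunks() and process_one() are illustrative stand-ins,
not part of the SP2XR_toolkit API, and the cluster settings are
placeholders:

    import gc

    import dask
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster


    def chunks(seq, n):
        # Yield successive n-sized slices of seq (the role played by
        # SP2XR_toolkit.chunks in the removed script).
        for i in range(0, len(seq), n):
            yield seq[i : i + n]


    @dask.delayed
    def process_one(path):
        # Hypothetical stand-in for a per-file task such as
        # read_csv_files_with_dask_2.
        return path


    cluster = SLURMCluster(
        cores=8,
        processes=8,
        memory="32GB",
        walltime="00:59:00",
        job_extra_directives=["--partition=hourly"],
    )
    cluster.scale(1)
    client = Client(cluster)

    files = [f"file_{i}.csv" for i in range(250)]  # stand-in for find_files()
    for i, chunk in enumerate(chunks(files, 100)):
        print(f"chunk: {i}")
        # Compute one bounded batch at a time so the scheduler never holds
        # the full task graph; free leftovers between batches.
        dask.compute(*[process_one(f) for f in chunk])
        gc.collect()

    client.close()
    cluster.close()

Batching the delayed calls, rather than submitting every file at once, is
what keeps scheduler memory flat on long file lists; the chunk size is the
only tuning knob.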
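The removed comments also noted that the level keyword of get_file_dict has
to match the partition_on keyword of process_pbp_parquet. That corresponds
to the Hive-style layout Dask writes: one directory level per partition
column. A minimal sketch with plain dask.dataframe, assuming a writable
local path "out" and an illustrative "date" column:

    import pandas as pd
    import dask.dataframe as dd

    # partition_on=["date"] writes one directory per distinct value,
    # e.g. out/date=20200922/...
    df = pd.DataFrame({"date": ["20200922", "20200923"], "value": [1.0, 2.0]})
    dd.from_pandas(df, npartitions=1).to_parquet("out", partition_on=["date"])

    # A helper that groups files per date (the role get_file_dict plays with
    # level="date") therefore walks exactly one directory level; reading the
    # dataset back restores "date" as a column.
    print(dd.read_parquet("out").compute())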