diff --git a/example_processing_code.py b/example_processing_code.py
index 04856a4..261398f 100644
--- a/example_processing_code.py
+++ b/example_processing_code.py
@@ -11,6 +11,7 @@ import numpy as np
 import sys
 import pickle
 import dask.dataframe as dd
+import dask
 from dask.distributed import Client
 from dask_jobqueue import SLURMCluster
 import datetime
@@ -132,9 +133,10 @@ print("--- %s seconds ---" % (time.time() - start_time))
 # config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
 config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'
-
-files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP')
-files_hk = get_file_dict(target_directory_hk_parquet, 'HK')
+# The level keyword has been added to control the partitioning columns of the processed PbP files;
+# it has to match the partition_on keyword passed to process_pbp_parquet below.
+files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP', level='date')
+files_hk = get_file_dict(target_directory_hk_parquet, 'HK', level='date')
 
 list_pbp = []
@@ -151,20 +153,22 @@ start_time = time.time()
 cluster = SLURMCluster(cores=8,
                        processes=8,
                        memory="128GB",
-                       walltime="10:59:00",
-                       job_extra_directives=['--partition=general']
+                       walltime="11:59:00",
+                       job_extra_directives=['--partition=daily']
                        )
 cluster.scale(1)
 client = Client(cluster)
 print(client.dashboard_link)
 
+# With level='hour' the chunk size can be up to 50 (hours); with level='date'
+# it should be at most 25 (days) for the cluster settings above (8 cores, 128 GB).
 i = 0
-for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 50), chunks(list_hk, 50)):
+for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
     print(f'chunk: {i}')
-    dask.compute(*[process_new_test(dir_path_pbp, dir_path_hk, dt=1,
+    dask.compute(*[process_pbp_parquet(dir_path_pbp, dir_path_hk, dt=60,
                                        rho_eff=1800, BC_type='constant_effective_density',
                                        inc_calib_curve='polynomial', inc_calib_params=[0.05, 2.0470000507725255e-07],
                                        scatt_calib_curve='powerlaw', scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
@@ -175,7 +179,7 @@ for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 50), chunks(list_hk, 50)):
                                        minOptD=100, maxOptD=500, n_scattbins=20,
                                        minTL=-10, maxTL=400, n_timelag=100,
-                                       path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new/',
+                                       path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/',
                                        partition_on=['date'],
                                        save_final_data=True) for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
     gc.collect()
     i += 1
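
For context on the `level='date'` / `partition_on=['date']` pairing: when `partition_on` is set, Dask writes one hive-style `date=<value>/` subdirectory per distinct value, and the file-dict builder has to walk that same layout. A minimal sketch using only the public `dask.dataframe` API (the `processed_example/` path and the toy data are made up for illustration; `get_file_dict` itself is not shown in the diff):

```python
import pandas as pd
import dask.dataframe as dd

# Toy data, for illustration only.
df = pd.DataFrame({'date': ['2020-09-22', '2020-09-23'], 'value': [1, 2]})
ddf = dd.from_pandas(df, npartitions=1)

# partition_on=['date'] produces one subdirectory per date value, e.g.
#   processed_example/date=2020-09-22/...
#   processed_example/date=2020-09-23/...
# so a reader at level='date' must index files at exactly that depth.
ddf.to_parquet('processed_example/', partition_on=['date'])
```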
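The batching around `dask.compute` assumes a `chunks()` helper and a delayed `process_pbp_parquet`, neither of which appears in the diff. A minimal sketch of that pattern, with `process_one` as a hypothetical stand-in for the real delayed function:

```python
import dask

# Hypothetical chunks() helper (the real implementation may differ):
# yield successive slices of at most `size` items from a list.
def chunks(lst, size):
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

# Stand-in for the delayed process_pbp_parquet function.
@dask.delayed
def process_one(path):
    return path

# Compute one bounded batch of tasks at a time, so the scheduler never holds
# more than `size` task graphs (and their results) in memory at once.
for chunk in chunks([f'dir_{n}' for n in range(100)], 25):
    dask.compute(*[process_one(p) for p in chunk])
```

This is why the chunk size interacts with the partitioning level: a chunk of 25 day-level directories carries far more data per task than 25 hour-level ones, hence the smaller cap noted in the diff's comment.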