level keyword added to control at which level the parquet files are partitioned

2025-06-06 11:45:59 +02:00
parent a5bf02ec69
commit 9e0e541c9a


@@ -11,6 +11,7 @@ import numpy as np
 import sys
 import pickle
 import dask.dataframe as dd
+import dask
 from dask.distributed import Client
 from dask_jobqueue import SLURMCluster
 import datetime
@@ -132,9 +133,10 @@ print("--- %s seconds ---" % (time.time() - start_time))
 # config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
 config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'
-files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP')
-files_hk = get_file_dict(target_directory_hk_parquet, 'HK')
+# The level keyword has been added to control the partitioning columns of the processed pbp files;
+# this level has to match the partition_on keyword in the process_pbp_parquet function below
+files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP', level='date')
+files_hk = get_file_dict(target_directory_hk_parquet, 'HK', level='date')

 list_pbp = []
@@ -151,20 +153,22 @@ start_time = time.time()
 cluster = SLURMCluster(cores=8,
                        processes=8,
                        memory="128GB",
-                       walltime="10:59:00",
-                       job_extra_directives=['--partition=general']
+                       walltime="11:59:00",
+                       job_extra_directives=['--partition=daily']
                        )
 cluster.scale(1)
 client = Client(cluster)
 print(client.dashboard_link)

+# when at hour level the chunks can also be of 50 (hours); when level is day the chunks should be max 25 (days)
+# when using the cluster settings above (8 cores, 128GB)
 i = 0
-for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 50), chunks(list_hk, 50)):
+for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
     print(f'chunk: {i}')
-    dask.compute(*[process_new_test(dir_path_pbp, dir_path_hk, dt=1,
+    dask.compute(*[process_pbp_parquet(dir_path_pbp, dir_path_hk, dt=60,
                                     rho_eff=1800, BC_type='constant_effective_density',
                                     inc_calib_curve='polynomial', inc_calib_params=[0.05, 2.0470000507725255e-07],
                                     scatt_calib_curve='powerlaw', scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
@@ -175,7 +179,7 @@ for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 50), chunks(list_hk, 50)):
                                     minOptD=100, maxOptD=500, n_scattbins=20,
                                     minTL=-10, maxTL=400, n_timelag=100,
-                                    path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new/',
+                                    path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/', partition_on=['date'],
                                     save_final_data=True) for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
     gc.collect()
     i += 1
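
For context on why the new level keyword has to match partition_on, here is a minimal, self-contained sketch (the toy data, output path, and column values are illustrative only; get_file_dict and process_pbp_parquet are repository functions not reproduced here). dask.dataframe writes one sub-directory per value of each partition_on column, so any code that later lists or reads the processed files has to walk the directory tree at that same level ('date' in this commit).

import pandas as pd
import dask.dataframe as dd

# Toy per-particle table with a 'date' column to partition on (illustrative data only).
pdf = pd.DataFrame({
    'date': ['2020-09-22', '2020-09-22', '2020-09-23'],
    'inc_ampl': [0.12, 0.34, 0.56],
})
ddf = dd.from_pandas(pdf, npartitions=1)

# partition_on=['date'] creates one sub-directory per date value,
# e.g. out_pbp_parquet/date=2020-09-22/part.0.parquet
dd.to_parquet(ddf, 'out_pbp_parquet', partition_on=['date'], engine='pyarrow')

# Reading (or listing) the output afterwards has to address the tree at the same
# level, which is what level='date' in get_file_dict is meant to select.
ddf_back = dd.read_parquet('out_pbp_parquet', engine='pyarrow',
                           filters=[('date', '==', '2020-09-23')])
print(ddf_back.compute())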