style: apply Black auto‑formatting

This commit is contained in:
Bertozzi Barbara
2025-07-24 19:53:51 +02:00
parent 34367c54af
commit 68f466c139
2 changed files with 1913 additions and 1147 deletions

View File

@@ -24,50 +24,66 @@ import gc
from SP2XR_toolkit import *
#%% Define directories and folders
# %% Define directories and folders
parent_directory = '/data/user/bertoz_b/SP2XR/data/NyA'
source_directory = parent_directory + '/SP2XR_files'
parent_directory = "/data/user/bertoz_b/SP2XR/data/NyA"
source_directory = parent_directory + "/SP2XR_files"
target_directory_pbp_parquet = parent_directory + '/SP2XR_pbp_parquet'
filter_string_pbp = 'PbP'
target_directory_pbp_parquet = parent_directory + "/SP2XR_pbp_parquet"
filter_string_pbp = "PbP"
target_directory_sp2b_parquet = parent_directory + '/SP2XR_sp2b_parquet'
filter_string_sp2b = 'sp2b'
target_directory_sp2b_parquet = parent_directory + "/SP2XR_sp2b_parquet"
filter_string_sp2b = "sp2b"
target_directory_hk_parquet = parent_directory + '/SP2XR_hk_parquet'
filter_string_hk = 'hk'
target_directory_hk_parquet = parent_directory + "/SP2XR_hk_parquet"
filter_string_hk = "hk"
meta_file_pbp = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet', engine='fastparquet')
meta_file_hk = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet', engine='fastparquet')
meta_file_sp2b = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet', engine='pyarrow')
meta_file_pbp = pd.read_parquet(
"/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet",
engine="fastparquet",
)
meta_file_hk = pd.read_parquet(
"/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet",
engine="fastparquet",
)
meta_file_sp2b = pd.read_parquet(
"/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet",
engine="pyarrow",
)
matching_files_pbp = find_files(source_directory, filter_string_pbp)
matching_files_hk = find_files(source_directory, filter_string_hk)
# matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]
#%% PBP: From csv/zip to parquet
# %% PBP: From csv/zip to parquet
start_time = time.time()
cluster = SLURMCluster(cores=64,
processes=64,
memory="128GB",
walltime="05:59:00",
job_extra_directives=['--partition=daily']
)
cluster = SLURMCluster(
cores=64,
processes=64,
memory="128GB",
walltime="05:59:00",
job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
i = 0
for chunk_pbp in chunks(matching_files_pbp, 100):
print(f'chunk: {i}')
print(f"chunk: {i}")
dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet) for f in chunk_pbp])
dask.compute(
*[
read_csv_files_with_dask_2(
f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet
)
for f in chunk_pbp
]
)
gc.collect()
i += 1
@@ -77,27 +93,34 @@ cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
#%% HK: From csv/zip to parquet
# %% HK: From csv/zip to parquet
start_time = time.time()
cluster = SLURMCluster(cores=16,
processes=16,
memory="128GB",
walltime="07:59:00",
job_extra_directives=['--partition=daily']
)
cluster = SLURMCluster(
cores=16,
processes=16,
memory="128GB",
walltime="07:59:00",
job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
i = 0
for chunk_hk in chunks(matching_files_hk, 100):
print(f'chunk: {i}')
print(f"chunk: {i}")
dask.compute(
*[
read_csv_files_with_dask_2(
f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet
)
for f in chunk_hk
]
)
dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet) for f in chunk_hk])
gc.collect()
i += 1
@@ -105,38 +128,40 @@ client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
#%% SP2B: From csv/zip to parquet
# %% SP2B: From csv/zip to parquet
start_time = time.time()
cluster = SLURMCluster(cores=64,
processes=64,
memory="128GB", # 2GB/process should sufficient
walltime="02:59:00",
job_extra_directives=['--partition=daily']
)
cluster = SLURMCluster(
cores=64,
processes=64,
memory="128GB", # 2GB/process should sufficient
walltime="02:59:00",
job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
sp2b_orig = read_and_process_sp2b(matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b)
sp2b_orig = read_and_process_sp2b(
matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b
)
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
#%% PBP: from single particle to specified dt
# %% PBP: from single particle to specified dt
# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'
config_path = "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903"
# The level keyword has been added to control the partitioning columns of the processed pbp files,
# this level has to match the partition_on keyword in the process_pbp_parquet function below
files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP', level='date')
files_hk = get_file_dict(target_directory_hk_parquet, 'HK', level='date')
files_pbp = get_file_dict(target_directory_pbp_parquet, "PbP", level="date")
files_hk = get_file_dict(target_directory_hk_parquet, "HK", level="date")
list_pbp = []
@@ -145,17 +170,17 @@ for key in files_pbp:
if key in files_hk:
list_pbp.append(files_pbp[key])
list_hk.append(files_hk[key])
start_time = time.time()
cluster = SLURMCluster(cores=8,
processes=8,
memory="128GB",
walltime="11:59:00",
job_extra_directives=['--partition=daily']
)
cluster = SLURMCluster(
cores=8,
processes=8,
memory="128GB",
walltime="11:59:00",
job_extra_directives=["--partition=daily"],
)
cluster.scale(1)
client = Client(cluster)
@@ -164,45 +189,62 @@ print(client.dashboard_link)
# when at hour level the chunks can also be of 50 (hours), when level is day the chuncks should be max 25 (days)
# when using the cluster settings above (8cores, 128GB)
i = 0
for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
print(f'chunk: {i}')
for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
print(f"chunk: {i}")
dask.compute(*[process_pbp_parquet(dir_path_pbp, dir_path_hk, dt=60,
rho_eff=1800, BC_type='constant_effective_density',
inc_calib_curve='polynomial', inc_calib_params=[0.05, 2.0470000507725255e-07],
scatt_calib_curve='powerlaw', scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
config_file_dir=config_path,
minM=0.3, maxM=400, n_incbins=50,
minOptD=100, maxOptD=500, n_scattbins=20,
minTL=-10, maxTL=400, n_timelag=100,
path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/', partition_on=['date'],
save_final_data=True) for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
dask.compute(
*[
process_pbp_parquet(
dir_path_pbp,
dir_path_hk,
dt=60,
rho_eff=1800,
BC_type="constant_effective_density",
inc_calib_curve="polynomial",
inc_calib_params=[0.05, 2.0470000507725255e-07],
scatt_calib_curve="powerlaw",
scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
config_file_dir=config_path,
minM=0.3,
maxM=400,
n_incbins=50,
minOptD=100,
maxOptD=500,
n_scattbins=20,
minTL=-10,
maxTL=400,
n_timelag=100,
path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/",
partition_on=["date"],
save_final_data=True,
)
for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)
]
)
gc.collect()
i += 1
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
# %% PBP: Resample
dir_pbp_1s = list_first_level_subdirs(
"/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new"
)
#%% PBP: Resample
dir_pbp_1s = list_first_level_subdirs('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new')
start_time = time.time()
cluster = SLURMCluster(cores=8,
processes=8,
memory="32GB",
walltime="00:20:00",
job_extra_directives=['--partition=hourly']
)
cluster = SLURMCluster(
cores=8,
processes=8,
memory="32GB",
walltime="00:20:00",
job_extra_directives=["--partition=hourly"],
)
cluster.scale(1)
client = Client(cluster)
@@ -211,49 +253,57 @@ print(client.dashboard_link)
i = 0
for chunk_pbp in chunks(dir_pbp_1s, 50):
print(f'chunk: {i}')
print(f"chunk: {i}")
dask.compute(*[resample_to_dt(dir_path_pbp, dt=60,
path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/',
save_final_data=True) for dir_path_pbp in chunk_pbp])
dask.compute(
*[
resample_to_dt(
dir_path_pbp,
dt=60,
path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/",
save_final_data=True,
)
for dir_path_pbp in chunk_pbp
]
)
gc.collect()
i += 1
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
#%% SP2B: Process parquet files
# %% SP2B: Process parquet files
# This is a very old function, it might need revisiting
start_time = time.time()
cluster = SLURMCluster(cores=8,
processes=8,
memory="16GB",
walltime="00:30:00",
job_extra_directives=['--partition=hourly']
)
cluster = SLURMCluster(
cores=8,
processes=8,
memory="16GB",
walltime="00:30:00",
job_extra_directives=["--partition=hourly"],
)
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
sp2_raw_traces = dd.read_parquet(
"/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet", calculate_divisions=True
) # .repartition(freq='1h')
sp2_raw_traces = dd.read_parquet('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet', calculate_divisions=True)#.repartition(freq='1h')
test2 = process_sp2b_parquet(sp2_raw_traces,
scatt_sat_threshold = 1e9,
inc_sat_threshold = 1e8,
scatt_noise_threshold = 1e4,
end_bkgr_ch0 = 100,
end_bkgr_ch1 = 350,
output_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed'
)
test2 = process_sp2b_parquet(
sp2_raw_traces,
scatt_sat_threshold=1e9,
inc_sat_threshold=1e8,
scatt_noise_threshold=1e4,
end_bkgr_ch0=100,
end_bkgr_ch1=350,
output_path="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed",
)
client.close()