Remove example_processing_code.py
This commit is contained in:
@@ -1,313 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Created on Mon May 27 11:45:19 2024
|
||||
|
||||
@author: bertoz_b
|
||||
"""
|
||||
|
||||
import time
|
||||
import pandas as pd
|
||||
import dask.dataframe as dd
|
||||
import dask
|
||||
from dask.distributed import Client
|
||||
from dask_jobqueue import SLURMCluster
|
||||
import gc
|
||||
|
||||
from SP2XR_toolkit import (
|
||||
find_files,
|
||||
read_and_process_sp2b,
|
||||
read_csv_files_with_dask_2,
|
||||
process_sp2b_parquet,
|
||||
resample_to_dt,
|
||||
chunks,
|
||||
list_first_level_subdirs,
|
||||
process_pbp_parquet,
|
||||
get_file_dict,
|
||||
)
|
||||
|
||||
|
||||
# %% Define directories and folders
|
||||
|
||||
parent_directory = "/data/user/bertoz_b/SP2XR/data/NyA"
|
||||
source_directory = parent_directory + "/SP2XR_files"
|
||||
|
||||
target_directory_pbp_parquet = parent_directory + "/SP2XR_pbp_parquet"
|
||||
filter_string_pbp = "PbP"
|
||||
|
||||
target_directory_sp2b_parquet = parent_directory + "/SP2XR_sp2b_parquet"
|
||||
filter_string_sp2b = "sp2b"
|
||||
|
||||
target_directory_hk_parquet = parent_directory + "/SP2XR_hk_parquet"
|
||||
filter_string_hk = "hk"
|
||||
|
||||
meta_file_pbp = pd.read_parquet(
|
||||
"/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet",
|
||||
engine="fastparquet",
|
||||
)
|
||||
meta_file_hk = pd.read_parquet(
|
||||
"/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet",
|
||||
engine="fastparquet",
|
||||
)
|
||||
meta_file_sp2b = pd.read_parquet(
|
||||
"/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet",
|
||||
engine="pyarrow",
|
||||
)
|
||||
|
||||
matching_files_pbp = find_files(source_directory, filter_string_pbp)
|
||||
matching_files_hk = find_files(source_directory, filter_string_hk)
|
||||
matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]
|
||||
|
||||
|
||||
# %% PBP: From csv/zip to parquet
|
||||
|
||||
|
||||
start_time = time.time()
|
||||
cluster = SLURMCluster(
|
||||
cores=64,
|
||||
processes=64,
|
||||
memory="128GB",
|
||||
walltime="05:59:00",
|
||||
job_extra_directives=["--partition=daily"],
|
||||
)
|
||||
cluster.scale(1)
|
||||
client = Client(cluster)
|
||||
print(client.dashboard_link)
|
||||
|
||||
|
||||
i = 0
|
||||
for chunk_pbp in chunks(matching_files_pbp, 100):
|
||||
print(f"chunk: {i}")
|
||||
|
||||
dask.compute(
|
||||
*[
|
||||
read_csv_files_with_dask_2(
|
||||
f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet
|
||||
)
|
||||
for f in chunk_pbp
|
||||
]
|
||||
)
|
||||
|
||||
gc.collect()
|
||||
i += 1
|
||||
|
||||
client.close()
|
||||
cluster.close()
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
|
||||
# %% HK: From csv/zip to parquet
|
||||
|
||||
|
||||
start_time = time.time()
|
||||
cluster = SLURMCluster(
|
||||
cores=16,
|
||||
processes=16,
|
||||
memory="128GB",
|
||||
walltime="07:59:00",
|
||||
job_extra_directives=["--partition=daily"],
|
||||
)
|
||||
cluster.scale(1)
|
||||
client = Client(cluster)
|
||||
print(client.dashboard_link)
|
||||
|
||||
i = 0
|
||||
for chunk_hk in chunks(matching_files_hk, 100):
|
||||
print(f"chunk: {i}")
|
||||
|
||||
dask.compute(
|
||||
*[
|
||||
read_csv_files_with_dask_2(
|
||||
f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet
|
||||
)
|
||||
for f in chunk_hk
|
||||
]
|
||||
)
|
||||
|
||||
gc.collect()
|
||||
i += 1
|
||||
|
||||
client.close()
|
||||
cluster.close()
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
# %% SP2B: From csv/zip to parquet
|
||||
|
||||
|
||||
start_time = time.time()
|
||||
cluster = SLURMCluster(
|
||||
cores=64,
|
||||
processes=64,
|
||||
memory="128GB", # 2GB/process should sufficient
|
||||
walltime="02:59:00",
|
||||
job_extra_directives=["--partition=daily"],
|
||||
)
|
||||
cluster.scale(1)
|
||||
client = Client(cluster)
|
||||
print(client.dashboard_link)
|
||||
|
||||
sp2b_orig = read_and_process_sp2b(
|
||||
matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b
|
||||
)
|
||||
|
||||
client.close()
|
||||
cluster.close()
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
|
||||
# %% PBP: from single particle to specified dt
|
||||
|
||||
|
||||
# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
|
||||
config_path = "/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903"
|
||||
|
||||
# The level keyword has been added to control the partitioning columns of the processed pbp files,
|
||||
# this level has to match the partition_on keyword in the process_pbp_parquet function below
|
||||
files_pbp = get_file_dict(target_directory_pbp_parquet, "PbP", level="date")
|
||||
files_hk = get_file_dict(target_directory_hk_parquet, "HK", level="date")
|
||||
|
||||
|
||||
list_pbp = []
|
||||
list_hk = []
|
||||
for key in files_pbp:
|
||||
if key in files_hk:
|
||||
list_pbp.append(files_pbp[key])
|
||||
list_hk.append(files_hk[key])
|
||||
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
cluster = SLURMCluster(
|
||||
cores=8,
|
||||
processes=8,
|
||||
memory="128GB",
|
||||
walltime="11:59:00",
|
||||
job_extra_directives=["--partition=daily"],
|
||||
)
|
||||
|
||||
cluster.scale(1)
|
||||
client = Client(cluster)
|
||||
print(client.dashboard_link)
|
||||
|
||||
# when at hour level the chunks can also be of 50 (hours), when level is day the chuncks should be max 25 (days)
|
||||
# when using the cluster settings above (8cores, 128GB)
|
||||
i = 0
|
||||
for chunk_pbp, chunk_hk in zip(chunks(list_pbp, 25), chunks(list_hk, 25)):
|
||||
print(f"chunk: {i}")
|
||||
|
||||
dask.compute(
|
||||
*[
|
||||
process_pbp_parquet(
|
||||
dir_path_pbp,
|
||||
dir_path_hk,
|
||||
dt=60,
|
||||
rho_eff=1800,
|
||||
BC_type="constant_effective_density",
|
||||
inc_calib_curve="polynomial",
|
||||
inc_calib_params=[0.05, 2.0470000507725255e-07],
|
||||
scatt_calib_curve="powerlaw",
|
||||
scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
|
||||
config_file_dir=config_path,
|
||||
minM=0.3,
|
||||
maxM=400,
|
||||
n_incbins=50,
|
||||
minOptD=100,
|
||||
maxOptD=500,
|
||||
n_scattbins=20,
|
||||
minTL=-10,
|
||||
maxTL=400,
|
||||
n_timelag=100,
|
||||
path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_20250212/",
|
||||
partition_on=["date"],
|
||||
save_final_data=True,
|
||||
)
|
||||
for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)
|
||||
]
|
||||
)
|
||||
gc.collect()
|
||||
i += 1
|
||||
|
||||
client.close()
|
||||
cluster.close()
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
|
||||
# %% PBP: Resample
|
||||
|
||||
dir_pbp_1s = list_first_level_subdirs(
|
||||
"/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new"
|
||||
)
|
||||
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
cluster = SLURMCluster(
|
||||
cores=8,
|
||||
processes=8,
|
||||
memory="32GB",
|
||||
walltime="00:20:00",
|
||||
job_extra_directives=["--partition=hourly"],
|
||||
)
|
||||
|
||||
cluster.scale(1)
|
||||
client = Client(cluster)
|
||||
print(client.dashboard_link)
|
||||
|
||||
|
||||
i = 0
|
||||
for chunk_pbp in chunks(dir_pbp_1s, 50):
|
||||
print(f"chunk: {i}")
|
||||
|
||||
dask.compute(
|
||||
*[
|
||||
resample_to_dt(
|
||||
dir_path_pbp,
|
||||
dt=60,
|
||||
path_parquet="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/",
|
||||
save_final_data=True,
|
||||
)
|
||||
for dir_path_pbp in chunk_pbp
|
||||
]
|
||||
)
|
||||
gc.collect()
|
||||
i += 1
|
||||
|
||||
client.close()
|
||||
cluster.close()
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
|
||||
|
||||
# %% SP2B: Process parquet files
|
||||
# This is a very old function, it might need revisiting
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
cluster = SLURMCluster(
|
||||
cores=8,
|
||||
processes=8,
|
||||
memory="16GB",
|
||||
walltime="00:30:00",
|
||||
job_extra_directives=["--partition=hourly"],
|
||||
)
|
||||
cluster.scale(1)
|
||||
client = Client(cluster)
|
||||
print(client.dashboard_link)
|
||||
|
||||
|
||||
sp2_raw_traces = dd.read_parquet(
|
||||
"/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet", calculate_divisions=True
|
||||
) # .repartition(freq='1h')
|
||||
|
||||
test2 = process_sp2b_parquet(
|
||||
sp2_raw_traces,
|
||||
scatt_sat_threshold=1e9,
|
||||
inc_sat_threshold=1e8,
|
||||
scatt_noise_threshold=1e4,
|
||||
end_bkgr_ch0=100,
|
||||
end_bkgr_ch1=350,
|
||||
output_path="/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed",
|
||||
)
|
||||
|
||||
|
||||
client.close()
|
||||
cluster.close()
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
Reference in New Issue
Block a user