# -*- coding: utf-8 -*-
"""
Created on Mon May 27 11:45:19 2024

Converts raw SP2-XR output (PbP, HK and sp2b files) to parquet and
post-processes it on a SLURM cluster with dask.

@author: bertoz_b
"""

import time
import gc

import pandas as pd
import dask  # needed for dask.compute() below
import dask.dataframe as dd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from SP2XR_toolkit import *  # find_files, chunks, get_file_dict, processing helpers
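# Assumed behavior of the toolkit's batching helper used throughout this
# script (sketch only -- the real implementation lives in SP2XR_toolkit):
#
#     def chunks(seq, n):
#         """Yield successive n-sized batches from seq."""
#         for j in range(0, len(seq), n):
#             yield seq[j:j + n]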


#%% Define directories and folders

parent_directory = '/data/user/bertoz_b/SP2XR/data/NyA'
source_directory = parent_directory + '/SP2XR_files'

target_directory_pbp_parquet = parent_directory + '/SP2XR_pbp_parquet'
filter_string_pbp = 'PbP'

target_directory_sp2b_parquet = parent_directory + '/SP2XR_sp2b_parquet'
filter_string_sp2b = 'sp2b'

target_directory_hk_parquet = parent_directory + '/SP2XR_hk_parquet'
filter_string_hk = 'hk'

meta_file_pbp = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet', engine='fastparquet')
meta_file_hk = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet', engine='fastparquet')
meta_file_sp2b = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet', engine='pyarrow')
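# NOTE (assumption): the meta parquet files appear to act as schema templates
# for the dask-based csv/zip-to-parquet conversion below.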

matching_files_pbp = find_files(source_directory, filter_string_pbp)
matching_files_hk = find_files(source_directory, filter_string_hk)
# Needed by the "SP2B: From csv/zip to parquet" cell below; the slice restricts
# the run to a subset of files -- drop it to convert everything.
matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]
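# Assumed behavior of find_files (sketch only): recursively collect the paths
# under `root` whose file name contains `pattern`, e.g.
#
#     def find_files(root, pattern):
#         import pathlib
#         return sorted(str(p) for p in pathlib.Path(root).rglob('*')
#                       if pattern in p.name)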


#%% PBP: From csv/zip to parquet

start_time = time.time()
cluster = SLURMCluster(cores=64,
                       processes=64,
                       memory="128GB",
                       walltime="05:59:00",
                       job_extra_directives=['--partition=daily'])
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
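# cores == processes gives 64 single-threaded worker processes splitting the
# job's 128 GB (~2 GB each); single-threaded workers suit GIL-bound CSV parsing.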

# Submit the conversion in batches of 100 files so the task graph handed to
# the scheduler stays small; gc.collect() drops references between batches.
for i, chunk_pbp in enumerate(chunks(matching_files_pbp, 100)):
    print(f'chunk: {i}')
    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet)
                   for f in chunk_pbp])
    gc.collect()

client.close()
cluster.close()
print(f"--- {time.time() - start_time} seconds ---")


#%% HK: From csv/zip to parquet

start_time = time.time()
cluster = SLURMCluster(cores=16,
                       processes=16,
                       memory="128GB",
                       walltime="07:59:00",
                       job_extra_directives=['--partition=daily'])
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
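# 16 single-threaded workers share this job's 128 GB (~8 GB per worker).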

for i, chunk_hk in enumerate(chunks(matching_files_hk, 100)):
    print(f'chunk: {i}')
    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet)
                   for f in chunk_hk])
    gc.collect()

client.close()
cluster.close()
print(f"--- {time.time() - start_time} seconds ---")


#%% SP2B: From csv/zip to parquet

start_time = time.time()
cluster = SLURMCluster(cores=64,
                       processes=64,
                       memory="128GB",  # ~2 GB/process should be sufficient
                       walltime="02:59:00",
                       job_extra_directives=['--partition=daily'])
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

sp2b_orig = read_and_process_sp2b(matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b)

client.close()
cluster.close()
print(f"--- {time.time() - start_time} seconds ---")


#%% PBP: from single particle to specified dt

# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'

files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP')
files_hk = get_file_dict(target_directory_hk_parquet, 'HK')

# Keep only the PbP/HK directory pairs that share a key, so the two lists stay
# aligned for the zipped iteration below.
list_pbp = []
list_hk = []
for key in files_pbp:
    if key in files_hk:
        list_pbp.append(files_pbp[key])
        list_hk.append(files_hk[key])
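# Equivalent, more compact pairing (ordering differs, pairs stay aligned):
#
#     common_keys = sorted(files_pbp.keys() & files_hk.keys())
#     list_pbp = [files_pbp[k] for k in common_keys]
#     list_hk = [files_hk[k] for k in common_keys]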

start_time = time.time()
cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="128GB",
                       walltime="10:59:00",
                       job_extra_directives=['--partition=general'])
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
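# Only 8 workers here (~16 GB each): the per-particle processing presumably
# holds far more data per task than the csv-to-parquet conversion did.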

for i, (chunk_pbp, chunk_hk) in enumerate(zip(chunks(list_pbp, 50), chunks(list_hk, 50))):
    print(f'chunk: {i}')
    dask.compute(*[process_new_test(dir_path_pbp, dir_path_hk, dt=1,
                                    rho_eff=1800, BC_type='constant_effective_density',
                                    inc_calib_curve='polynomial', inc_calib_params=[0.05, 2.0470000507725255e-07],
                                    scatt_calib_curve='powerlaw', scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
                                    config_file_dir=config_path,
                                    minM=0.3, maxM=400, n_incbins=50,
                                    minOptD=100, maxOptD=500, n_scattbins=20,
                                    minTL=-10, maxTL=400, n_timelag=100,
                                    path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new/',
                                    save_final_data=True)
                   for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
    gc.collect()

client.close()
cluster.close()
print(f"--- {time.time() - start_time} seconds ---")


#%% PBP: Resample

dir_pbp_1s = list_first_level_subdirs('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new')

start_time = time.time()
cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="32GB",
                       walltime="00:20:00",
                       job_extra_directives=['--partition=hourly'])
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
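# Resample the 1 s products written above to 1 min products (dt=60 s).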

for i, chunk_pbp in enumerate(chunks(dir_pbp_1s, 50)):
    print(f'chunk: {i}')
    dask.compute(*[resample_to_dt(dir_path_pbp, dt=60,
                                  path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/',
                                  save_final_data=True)
                   for dir_path_pbp in chunk_pbp])
    gc.collect()

client.close()
cluster.close()
print(f"--- {time.time() - start_time} seconds ---")


#%% SP2B: Process parquet files

# NOTE: process_sp2b_parquet is a very old function; it may need revisiting.

start_time = time.time()
cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="16GB",
                       walltime="00:30:00",
                       job_extra_directives=['--partition=hourly'])
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)

# calculate_divisions=True recovers the (time) index divisions from the parquet
# metadata, so index-aware operations such as the commented-out repartition can
# work without a shuffle.
sp2_raw_traces = dd.read_parquet('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet',
                                 calculate_divisions=True)  # .repartition(freq='1h')

test2 = process_sp2b_parquet(sp2_raw_traces,
                             scatt_sat_threshold=1e9,
                             inc_sat_threshold=1e8,
                             scatt_noise_threshold=1e4,
                             end_bkgr_ch0=100,
                             end_bkgr_ch1=350,
                             output_path='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed')

client.close()
cluster.close()
print(f"--- {time.time() - start_time} seconds ---")