Remove NyA code

This commit is contained in:
2024-09-18 14:03:37 +02:00
parent d2bd9f7c75
commit 890314ad24


@@ -1,363 +0,0 @@
# -*- coding: utf-8 -*-
"""
Created on Mon May 27 11:45:19 2024
@author: bertoz_b
"""
import time
import os              # used below for path handling (os.sep, os.path.join, os.walk, os.listdir)
from glob import glob  # used by get_file_dict
import pandas as pd
import numpy as np
import sys
import pickle
import dask            # dask.compute is called on the per-file tasks below
import dask.dataframe as dd
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import datetime
import struct
import zipfile
from dask import delayed
import itertools
import gc
from SP2XR_toolkit_2 import *
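# The helper functions used below (find_files, read_csv_files_with_dask_2, process_new_test,
# process_sp2b_parquet) are not defined in this script and are presumably provided by
# SP2XR_toolkit_2 through the star import above.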
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
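# Illustrative example (not part of the original pipeline):
# list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]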
# ***** Part A 1 : from csv to parquet
parent_directory = '/data/user/bertoz_b/SP2XR/data/NyA'
source_directory = parent_directory + '/SP2XR_files'
target_directory_pbp_parquet = parent_directory + '/SP2XR_pbp_parquet'
filter_string_pbp = 'PbP'
target_directory_sp2b_parquet = parent_directory + '/SP2XR_sp2b_parquet'
filter_string_sp2b = 'sp2b'
target_directory_hk_parquet = parent_directory + '/SP2XR_hk_parquet'
filter_string_hk = 'hk'
meta_file_pbp = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/pbp_meta.parquet', engine='fastparquet')
meta_file_hk = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/hk_meta.parquet', engine='fastparquet')
meta_file_sp2b = pd.read_parquet('/data/user/bertoz_b/SP2XR/final_test_script_and_data/meta_files/meta_sp2b_20240619.parquet', engine='pyarrow')
matching_files_pbp = find_files(source_directory, filter_string_pbp)
matching_files_hk = find_files(source_directory, filter_string_hk)
# matching_files_sp2b = find_files(source_directory, filter_string_sp2b)[10000:50000]
# PBP
'''
start_time = time.time()
cluster = SLURMCluster(cores=64,
                       processes=64,
                       memory="128GB",  # 2 GB/process should be sufficient
                       walltime="05:59:00",
                       job_extra_directives=['--partition=daily']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
i = 0
for chunk_pbp in chunks(matching_files_pbp[3200:], 100):
    print(f'chunk: {i}')
    # pbp_orig = read_and_process(matching_files_pbp, target_directory_pbp_parquet, meta_file_pbp, meta_file_hk)
    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_pbp_parquet) for f in chunk_pbp])
    gc.collect()
    i += 1
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''
# HK
'''
start_time = time.time()
cluster = SLURMCluster(cores=16,
                       processes=16,
                       memory="128GB",  # 2 GB/process should be sufficient
                       walltime="07:59:00",
                       job_extra_directives=['--partition=daily']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
i = 0
for chunk_hk in chunks(matching_files_hk, 100):
    print(f'chunk: {i}')
    # hk_orig = read_and_process(matching_files_hk, target_directory_hk_parquet, meta_file_pbp, meta_file_hk)
    dask.compute(*[read_csv_files_with_dask_2(f, meta_file_pbp, meta_file_hk, target_directory_hk_parquet) for f in chunk_hk])
    gc.collect()
    i += 1
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''
# SP2B
'''
start_time = time.time()
cluster = SLURMCluster(cores=64,
                       processes=64,
                       memory="128GB",  # 2 GB/process should be sufficient
                       walltime="02:59:00",
                       job_extra_directives=['--partition=daily']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
sp2b_orig = read_and_process_sp2b(matching_files_sp2b, target_directory_sp2b_parquet, meta_file_sp2b)
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''
#%% Process pbp data
# config_path = '/data/user/bertoz_b/SP2XR/data/Granada/SP2XR_files/20240612/20240612061852'
config_path = '/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_files/20200922/20200922064903'
def get_file_dict(directory, file_type):
    """
    Creates a dictionary with date and hour as keys and file paths as values
    for the given directory.
    """
    file_dict = {}
    # Assuming the files have a structure like /path/to/directory/YYYYMMDD/HH/file.parquet
    for file_path in glob(f'{directory}/**/*.parquet', recursive=True):
        # Extracting date and hour from the file path
        parts = file_path.split(os.sep)
        date = parts[-3]
        hour = parts[-2]
        file_dict[(date, hour)] = os.path.join('/', *parts[:-1])
    return file_dict
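# Illustrative result, assuming the .../YYYYMMDD/HH/*.parquet layout described above
# (paths are hypothetical; the file_type argument is currently unused):
# get_file_dict('/base/pbp', 'PbP')[('20200922', '06')] -> '/base/pbp/20200922/06'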
files_pbp = get_file_dict(target_directory_pbp_parquet, 'PbP')
files_hk = get_file_dict(target_directory_hk_parquet, 'HK')
list_pbp = []
list_hk = []
for key in files_pbp:
    if key in files_hk:
        list_pbp.append(files_pbp[key])
        list_hk.append(files_hk[key])
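# list_pbp and list_hk are now aligned pair-wise: entry i of each list points to the
# same (date, hour) directory, so PbP and HK data can be processed together below.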
start_time = time.time()
cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="128GB",
                       walltime="10:59:00",
                       job_extra_directives=['--partition=general']
                       )
# cluster = SLURMCluster(cores=8,
#                        processes=8,
#                        memory="64GB",
#                        walltime="00:20:00",
#                        job_extra_directives=['--partition=hourly']
#                        )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
i = 0
for chunk_pbp, chunk_hk in zip(chunks(list_pbp[8000:], 50), chunks(list_hk[8000:], 50)):
    print(f'chunk: {i}')
    dask.compute(*[process_new_test(dir_path_pbp, dir_path_hk, dt=1,
                                    rho_eff=1800, BC_type='constant_effective_density',
                                    inc_calib_curve='polynomial', inc_calib_params=[0.05, 2.0470000507725255e-07],
                                    scatt_calib_curve='powerlaw', scatt_calib_params=[17.21724257, 0.16908516, -1.49431104],
                                    config_file_dir=config_path,
                                    minM=0.3, maxM=400, n_incbins=50,
                                    minOptD=100, maxOptD=500, n_scattbins=20,
                                    minTL=-10, maxTL=400, n_timelag=100,
                                    path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new/',
                                    save_final_data=True) for dir_path_pbp, dir_path_hk in zip(chunk_pbp, chunk_hk)])
    gc.collect()
    i += 1
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
#%% Resample pbp to 1min
'''
def list_files_and_dirs(base_dir):
    """ List full paths of all files and directories in the given directory, including nested ones """
    file_list = []
    subdir_list = []
    # os.walk yields a tuple (dirpath, dirnames, filenames)
    for dirpath, dirnames, filenames in os.walk(base_dir):
        # Add directories to subdir_list
        for dirname in dirnames:
            subdir_full_path = os.path.join(dirpath, dirname)
            subdir_list.append(subdir_full_path)
        # Add files to file_list
        for filename in filenames:
            file_full_path = os.path.join(dirpath, filename)
            file_list.append(file_full_path)
    return subdir_list, file_list
def list_first_level_subdirs(base_dir):
    """ List full paths of all first-level subdirectories within the given directory """
    subdir_list = []
    # List all entries in the base directory
    for entry in os.listdir(base_dir):
        # Create full path
        full_path = os.path.join(base_dir, entry)
        # Check if this path is a directory
        if os.path.isdir(full_path):
            subdir_list.append(full_path)
    return subdir_list
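# Unlike list_files_and_dirs above, this does not recurse; it returns only the
# immediate subdirectories, e.g. (hypothetical layout):
# list_first_level_subdirs('/base') -> ['/base/20200922', '/base/20200923', ...]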
def resample_to_dt(dir_path_pbp, dt=60, path_parquet='', save_final_data=False):
    dd_data = pd.read_parquet(dir_path_pbp)
    dd_data['Time'] = dd_data.index
    dd_data['date'] = dd_data['Time'].dt.date.astype("date64[pyarrow]")
    dd_data['temporary_col'] = 1
    cols_for_mean = [col for col in dd_data.columns if any(substring in col for substring in ['date', 'Conc', 'dNdlogDmev', 'dMdlogDmev', 'dNdlogDsc', 'Flow'])]
    #thin_cols_list = [col for col in dd_data.columns if 'thin' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
    #thick_cols_list = [col for col in dd_data.columns if 'thick' in col and all(substring not in col for substring in ['BC', 'dNdlogDmev', 'dMdlogDmev'])]
    #cols_for_sum = [col for col in dd_data.columns if any(substring in col for substring in ['timelag'])]+thin_cols_list+thick_cols_list
    timelag_hist_cols = [col for col in dd_data.columns if 'timelag' in col and all(substring not in col for substring in ['cnts', 'dNdlogDmev', 'dMdlogDmev'])]
    cnts_cols = [col for col in dd_data.columns if 'cnts' in col]
    additional_cols = [col for col in dd_data.columns if col in ['Dropped Records', 'Incand Mass (fg)', 'BC mass', 'BC mass within range',
                                                                 'BC numb from file', 'BC numb', 'BC numb within range',
                                                                 'scatter numb from file', 'Scatt numb', 'Scatt numb within range']]
    cols_for_sum = timelag_hist_cols + cnts_cols + additional_cols
    cols_for_count = ['temporary_col']
    data_resampled_mean = dd_data[cols_for_mean].resample(f'{dt}s').mean()
    data_resampled_sum = dd_data[cols_for_sum].resample(f'{dt}s').sum()
    data_resampled_count = dd_data[cols_for_count].resample(f'{dt}s').count()
    #merged = dd.merge(data_resampled_mean, data_resampled_sum, left_index=True, right_index=True, how='outer')
    merged = pd.concat([data_resampled_mean, data_resampled_sum], axis=1)
    #merged['date'] = merged.index.normalize()
    merged = merged[merged['date'].notna()]
    #merged['date'] = merged.index.normalize()
    #merged["date"] = merged["date"].astype("date64[pyarrow]")
    if save_final_data:
        dd.from_pandas(merged.sort_index(), npartitions=1).to_parquet(path=path_parquet,
                                                                      engine='pyarrow',
                                                                      partition_on=['date'],
                                                                      coerce_timestamps='us',
                                                                      allow_truncated_timestamps=True,
                                                                      write_index=True,
                                                                      append=False)
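# Summary of resample_to_dt as written above: concentration, size-distribution and flow
# columns are averaged over each dt-second window, count-like and timelag-histogram
# columns are summed, and (if save_final_data) the merged result is written as a
# single-partition parquet dataset partitioned on 'date'. Note that data_resampled_count
# is computed but not included in the merged output.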
dir_pbp_1s = list_first_level_subdirs('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new')
start_time = time.time()
cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="32GB",
                       walltime="00:20:00",
                       job_extra_directives=['--partition=hourly']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
i = 0
for chunk_pbp in chunks(dir_pbp_1s, 50):
    print(f'chunk: {i}')
    dask.compute(*[resample_to_dt(dir_path_pbp, dt=60,
                                  path_parquet='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_pbp_processed_new_1min/',
                                  save_final_data=True) for dir_path_pbp in chunk_pbp])
    gc.collect()
    i += 1
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''
#%% Process sp2b parquet
'''
start_time = time.time()
cluster = SLURMCluster(cores=8,
                       processes=8,
                       memory="16GB",
                       walltime="00:30:00",
                       job_extra_directives=['--partition=hourly']
                       )
cluster.scale(1)
client = Client(cluster)
print(client.dashboard_link)
sp2_raw_traces = dd.read_parquet('/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_parquet', calculate_divisions=True)#.repartition(freq='1h')
test2 = process_sp2b_parquet(sp2_raw_traces,
                             scatt_sat_threshold=1e9,
                             inc_sat_threshold=1e8,
                             scatt_noise_threshold=1e4,
                             end_bkgr_ch0=100,
                             end_bkgr_ch1=350,
                             output_path='/data/user/bertoz_b/SP2XR/data/NyA/SP2XR_sp2b_processed'
                             )
client.close()
cluster.close()
print("--- %s seconds ---" % (time.time() - start_time))
'''