New handling of conversion from CSV/ZIP to Parquet via config file
config/conversion_config_template.yaml (new file, 62 lines)
@@ -0,0 +1,62 @@
# SP2XR CSV/ZIP to Parquet Conversion Configuration Template
#
# This file contains all parameters for converting raw SP2XR data files
# to time-indexed Parquet format.
#
# USAGE:
# 1. Update the paths and parameters below for your dataset
# 2. Run: python scripts/sp2xr_csv2parquet.py --config my_conversion_config.yaml
#
# NOTES:
# - For local execution, the script auto-detects available CPU cores and memory
# - Output files are organized by date and hour: target_directory/date=YYYY-MM-DD/hour=HH/
# - Processing is parallelized using Dask for efficient handling of large datasets
# - You can monitor progress via the Dask dashboard (URL printed when the script starts)


# Directory containing your raw SP2XR files (CSV or ZIP format)
source_directory: data/SP2XR_orig_files

# Output directory for converted Parquet files
target_directory: data/pbp_files_parquet_2

# Path to your data schema config file (generated by sp2xr_generate_config.py)
schema_config: config/config_schema_with_mapping.yaml


# Pattern to filter which files to process
# "PbP" for particle-by-particle data, "hk" for housekeeping data
file_filter: PbP


# Number of files to process in each batch
# Larger values = more memory usage but potentially faster
# Smaller values = less memory usage but more overhead
chunk_size: 100


# Execution mode: "local" or "slurm"
# - local: use your local machine (laptop/desktop)
# - slurm: use a SLURM cluster (HPC environment)
execution_mode: local


# --- SLURM-specific parameters (ignored if execution_mode: local) ---

# Number of CPU cores per SLURM job
slurm_cores: 64

# Memory per SLURM job (e.g., "128GB", "256GB")
slurm_memory: 128GB

# SLURM partition to use
# Common values: "hourly", "daily", "general"
slurm_partition: daily

# Walltime for the SLURM job (e.g., "01:00:00" for 1 hour, "23:59:00" for 1 day)
# If not specified, defaults based on partition:
# - hourly: 00:59:00
# - daily: 23:59:00
# - general: 7-00:00:00
slurm_walltime: null
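For orientation (not part of this commit): the NOTES above describe a date=YYYY-MM-DD/hour=HH partition layout, which Dask can read back directly. A minimal sketch, assuming the template's example target_directory and that the partition directories are exposed as ordinary columns:

import dask.dataframe as dd

# Illustrative read-back of the converted, partitioned output.
ddf = dd.read_parquet("data/pbp_files_parquet_2")
print(ddf.columns)      # the date= and hour= directory levels appear as columns
print(ddf.npartitions)  # roughly one partition per written file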
scripts/sp2xr_csv2parquet.py
@@ -4,6 +4,8 @@ import gc
 import dask
 import multiprocessing
 import psutil
+import yaml
+from pathlib import Path
 from dask.distributed import Client, LocalCluster
 from dask_jobqueue import SLURMCluster
 from dask import delayed
@@ -13,58 +15,63 @@ from sp2xr.helpers import find_files, chunks

 def main():
     parser = argparse.ArgumentParser(
-        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)"
+        description="Batch convert SP2XR zip to parquet using Dask (local or SLURM cluster)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Example:
+    python sp2xr_csv2parquet.py --config my_conversion_config.yaml
+
+See config/conversion_config_template.yaml for configuration options.
+""",
     )

     parser.add_argument(
-        "--source", required=True, help="Directory containing input zip/CSV files"
-    )
-    parser.add_argument(
-        "--target", required=True, help="Output directory for parquet files"
-    )
-    parser.add_argument(
-        "--config", required=True, help="Path to YAML schema config file"
-    )
-    parser.add_argument(
-        "--filter", default="PbP", help="Pattern to filter files (default: PbP)"
-    )
-    parser.add_argument(
-        "--chunk",
-        type=int,
-        default=100,
-        help="Number of files per batch (default: 100)",
-    )
-    parser.add_argument(
-        "--local",
-        action="store_true",
-        help="Use local Dask client (auto-detects resources) instead of SLURM cluster",
-    )
-    parser.add_argument(
-        "--cores",
-        type=int,
-        default=64,
-        help="Cores per SLURM job (ignored for --local, default: 64)",
-    )
-    parser.add_argument(
-        "--memory",
-        default="128GB",
-        help="Memory per SLURM job (ignored for --local, default: 128GB)",
-    )
-    parser.add_argument(
-        "--walltime",
-        default=None,
-        help="Walltime for SLURM (ignored for --local, default depends on partition)",
-    )
-    parser.add_argument(
-        "--partition",
-        default="daily",
-        help="SLURM partition (ignored for --local, default: daily)",
+        "--config",
+        required=True,
+        help="Path to conversion config YAML file",
     )

     args = parser.parse_args()

+    # Load config from file
+    config_path = Path(args.config)
+    if not config_path.exists():
+        print(f"Error: Config file not found: {config_path}")
+        return 1
+
+    with open(config_path, "r") as f:
+        config = yaml.safe_load(f)
+
+    print(f"Loaded conversion config from: {config_path}")
+
+    # Extract parameters from config
+    source = config.get("source_directory")
+    target = config.get("target_directory")
+    schema_config = config.get("schema_config")
+    file_filter = config.get("file_filter", "PbP")
+    chunk_size = config.get("chunk_size", 100)
+    execution_mode = config.get("execution_mode", "local")
+
+    # SLURM parameters
+    slurm_cores = config.get("slurm_cores", 64)
+    slurm_memory = config.get("slurm_memory", "128GB")
+    slurm_partition = config.get("slurm_partition", "daily")
+    slurm_walltime = config.get("slurm_walltime")
+
+    # Validate required parameters
+    if not source or not target or not schema_config:
+        print("Error: Missing required parameters in config file:")
+        print(" - source_directory")
+        print(" - target_directory")
+        print(" - schema_config")
+        return 1
+
+    use_local = execution_mode == "local"
+
     # --- Setup cluster (local or SLURM) ---
     start_time = time.time()

-    if args.local:
+    if use_local:
         # Local execution: auto-detect resources
         total_cores = multiprocessing.cpu_count()
         total_memory = psutil.virtual_memory().total  # in bytes
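The local branch above only shows the resource auto-detection; the cluster construction itself lies outside this hunk. Purely as a hypothetical sketch of how detected cores and memory can be turned into a LocalCluster (not the script's actual code):

import multiprocessing
import psutil
from dask.distributed import Client, LocalCluster

# Hypothetical sizing, for illustration only.
total_cores = multiprocessing.cpu_count()
total_memory = psutil.virtual_memory().total  # bytes

n_workers = max(1, total_cores - 1)                       # leave one core for the OS
memory_per_worker = int(0.8 * total_memory / n_workers)   # keep ~20% headroom

cluster = LocalCluster(
    n_workers=n_workers,
    threads_per_worker=1,
    memory_limit=memory_per_worker,
)
client = Client(cluster)
print(f"Dask dashboard: {client.dashboard_link}")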
@@ -83,26 +90,26 @@ def main():
         print(f"Dask dashboard: {client.dashboard_link}")
     else:
         # SLURM execution
-        if args.walltime is None:
-            if args.partition == "hourly":
-                args.walltime = "00:59:00"
-            elif args.partition == "daily":
-                args.walltime = "23:59:00"
-            elif args.partition == "general":
-                args.walltime = "7-00:00:00"
+        if slurm_walltime is None:
+            if slurm_partition == "hourly":
+                slurm_walltime = "00:59:00"
+            elif slurm_partition == "daily":
+                slurm_walltime = "23:59:00"
+            elif slurm_partition == "general":
+                slurm_walltime = "7-00:00:00"
             else:
-                args.walltime = "00:59:00"
+                slurm_walltime = "00:59:00"

         print("Running in SLURM mode")
-        print(f"Resources: {args.cores} cores, {args.memory} memory")
-        print(f"Partition: {args.partition}, Walltime: {args.walltime}")
+        print(f"Resources: {slurm_cores} cores, {slurm_memory} memory")
+        print(f"Partition: {slurm_partition}, Walltime: {slurm_walltime}")

         cluster = SLURMCluster(
-            cores=args.cores,
-            processes=args.cores,
-            memory=args.memory,
-            walltime=args.walltime,
-            job_extra_directives=[f"--partition={args.partition}"],
+            cores=slurm_cores,
+            processes=slurm_cores,
+            memory=slurm_memory,
+            walltime=slurm_walltime,
+            job_extra_directives=[f"--partition={slurm_partition}"],
         )
         cluster.scale(1)
         client = Client(cluster)
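When debugging the SLURM path, it can help to inspect the batch script that dask_jobqueue generates from these settings before scaling further. A small sketch using the template's default values (nothing here is introduced by this commit):

from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(
    cores=64,
    processes=64,
    memory="128GB",
    walltime="23:59:00",
    job_extra_directives=["--partition=daily"],
)
print(cluster.job_script())  # the sbatch script that will be submitted
cluster.scale(1)             # request one SLURM job's worth of workers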
@@ -110,19 +117,19 @@ def main():

     try:
         # --- Find files ---
-        files = find_files(args.source, args.filter)
-        print(f"Found {len(files)} files matching pattern '{args.filter}'")
+        files = find_files(source, file_filter)
+        print(f"Found {len(files)} files matching pattern '{file_filter}'")
         if len(files) == 0:
             print("No files found matching the filter pattern")
             return

         # --- Process in chunks ---
         i = 0
-        total_chunks = (len(files) - 1) // args.chunk + 1
-        for chunk in chunks(files, args.chunk):
+        total_chunks = (len(files) - 1) // chunk_size + 1
+        for chunk in chunks(files, chunk_size):
             print(f"Processing chunk {i+1} / {total_chunks}")
             tasks = [
-                delayed(process_sp2xr_file)(f, args.config, args.target) for f in chunk
+                delayed(process_sp2xr_file)(f, schema_config, target) for f in chunk
             ]
             dask.compute(*tasks)
             gc.collect()
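The loop above keeps memory bounded by computing one batch of delayed tasks at a time and collecting garbage between batches. A generic, self-contained sketch of that pattern; the chunks helper below is a stand-in for sp2xr.helpers.chunks, and process_one stands in for process_sp2xr_file:

import gc
import dask
from dask import delayed

def chunks(seq, size):
    # Stand-in for sp2xr.helpers.chunks: yield successive batches of `size` items.
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

def process_one(item):
    # Placeholder for the real per-file conversion work.
    return item

files = list(range(250))  # dummy work list: 250 "files" in batches of 100
for batch in chunks(files, 100):
    tasks = [delayed(process_one)(f) for f in batch]
    dask.compute(*tasks)  # only this batch's task graph is held in memory
    gc.collect()          # drop references before starting the next batch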