Update the delete-partition function to also delete the global metadata, preventing errors when rewriting parquet files
@@ -181,14 +181,33 @@ def get_time_chunks_from_range(
 def delete_partition_if_exists(output_path, partition_values):
     """
-    output_path: root output directory
-    partition_values: dict like {'date': '2025-08-19', 'hour': 14}
+    Delete partition directory and remove global metadata files.
+    The metadata will be regenerated on the next write to reflect current partitions.
+
+    Args:
+        output_path: root output directory
+        partition_values: dict like {'date': '2025-08-19', 'hour': 14}
     """
     parts = [f"{k}={v}" for k, v in partition_values.items()]
     partition_path = Path(output_path, *parts)
     if partition_path.exists():
         print(f"Deleting existing partition: {partition_path}")
         shutil.rmtree(partition_path)
+
+    # Remove global metadata files - they will be regenerated on next write
+    # This prevents overlapping divisions errors when restarting jobs
+    metadata_files = [
+        Path(output_path) / "_metadata",
+        Path(output_path) / "_common_metadata",
+    ]
+
+    for meta_file in metadata_files:
+        if meta_file.exists():
+            print(
+                f"Removing global metadata file: {meta_file} (will be regenerated)"
+            )
+            meta_file.unlink()
 
 
 def _expand(p):
     return None if p is None else os.path.expandvars(os.path.expanduser(p))
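
For context, here is a minimal usage sketch of the restart scenario the new comments describe, assuming a Dask-based write path. The module name `partition_utils`, the sample data, and the `to_parquet` arguments are illustrative assumptions and not part of this commit; only `delete_partition_if_exists` and the hive-style `date=.../hour=...` layout come from the diff above.

# Usage sketch only: the import path and the Dask write call are assumptions.
import dask.dataframe as dd
import pandas as pd

from partition_utils import delete_partition_if_exists  # hypothetical module name

output_path = "/data/output"
chunk = {"date": "2025-08-19", "hour": 14}

# Drop the stale date=2025-08-19/hour=14 directory together with the
# dataset-level _metadata / _common_metadata files, so the metadata that is
# regenerated on the next write only describes partitions present on disk.
delete_partition_if_exists(output_path, chunk)

# Rewrite the chunk. When appending, a leftover _metadata file that still
# references the deleted row groups is what leads to the overlapping-divisions
# errors mentioned in the code comments.
ddf = dd.from_pandas(
    pd.DataFrame({"date": ["2025-08-19"], "hour": [14], "value": [1.0]}),
    npartitions=1,
)
ddf.to_parquet(
    output_path,
    partition_on=["date", "hour"],
    append=True,
    write_metadata_file=True,
)

Deleting the footer files outright, rather than patching them in place, is the simpler recovery path: as the new comment notes, they are regenerated on the next write, and until then readers simply fall back to scanning per-file parquet footers.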