aare/extra/compare_file_formats.py
Bechir Braham 5b7ab5a810
cluster finder (#72)
implement fixed size cluster finder algorithm 
clusterFile functions are commented and put on hold.

---------

Co-authored-by: Bechir <bechir.brahem420@gmail.com>
Co-authored-by: Erik Fröjdh <erik.frojdh@gmail.com>
2024-06-17 10:57:23 +02:00

180 lines
5.6 KiB
Python

import numpy as np
import zipfile
from zipfile import ZipFile as zf
import time
from pathlib import Path
import sqlite3
def timing_val(f):
def wrapper(*arg, **kw):
t1 = time.time()
res = f(*arg, **kw)
t2 = time.time()
return (t2 - t1), res, f.__name__
return wrapper
N_CLUSTERS = 1_000_000
"""
fixed size clusters:
header:
- magic_string: 4 bytes
- version: 1 byte
- n_records: 4 bytes
- indexed: 1 byte
- metadata_length: 1 byte (number of chars)
- metadata: metadata_length bytes (json string)
- field_count: 1 byte
- fields:
- field_label_length: 1 byte
- field_label: field_label_length bytes (string)
- dtype: 3 bytes (string)
- is_array: 1 byte (0: not array, 1:fixed_length_array, 2:variable_length_array)
- array_length: 4 bytes (used if is_array == 1)
data:
- field: (field_1_dtype_length bytes) or
"""
header_length = 4 + 1 + 4 + 1 + 1 + 2 + 1 + (1 + 5 + 3 + 1 + 4) * 3
cluster_dt = np.dtype([("x", "int16"), ("y", "int16"), ("data", "int32", (3, 3))])
def write_binary_clusters():
with open("fixed_size_clusters.bin", "wb") as f:
f.write(b"H" * header_length)
arr = np.zeros(N_CLUSTERS, dtype=cluster_dt)
f.write(arr.tobytes())
def write_numpy_clusters():
np.save("numpy_clusters.npy", np.zeros(N_CLUSTERS, dtype=cluster_dt))
def write_sqlite_clusters():
data = np.zeros(9, dtype=np.int32).tobytes()
c = conn.cursor()
c.execute("CREATE TABLE clusters (x int, y int, data blob)")
c.executemany("INSERT INTO clusters VALUES (?, ?, ?)", [(0, 0, data)] * N_CLUSTERS)
conn.commit()
READ_N_CLUSTERS = N_CLUSTERS
def read_binary_clusters():
with open("fixed_size_clusters.bin", "rb") as f:
f.read(header_length)
f.read(READ_N_CLUSTERS * cluster_dt.itemsize)
def read_numpy_clusters():
arr = np.load("numpy_clusters.npy")
def read_sqlite_clusters():
c = conn.cursor()
c.execute("SELECT * FROM clusters LIMIT ?", (READ_N_CLUSTERS,))
arr = c.fetchall()
N_APPEND_CLUSTERS = 100_000
def append_binary_clusters():
with open("fixed_size_clusters.bin", "ab") as f:
arr = np.zeros(N_APPEND_CLUSTERS, dtype=cluster_dt)
f.write(arr.tobytes())
def append_sqlite_clusters():
data = np.zeros(9, dtype=np.int32).tobytes()
c = conn.cursor()
c.executemany(
"INSERT INTO clusters VALUES (?, ?, ?)", [(0, 0, data)] * N_APPEND_CLUSTERS
)
conn.commit()
def p(write_time, file_size):
file_size = file_size / 1024 / 1024
print("%.3fs" % write_time, "%.3fMB" % file_size)
if __name__ == "__main__":
# setup
Path("fixed_size_clusters.bin").unlink(missing_ok=True)
Path("numpy_clusters.npy").unlink(missing_ok=True)
Path("sqlite_clusters.db").unlink(missing_ok=True)
Path("fixed_size_clusters.zip").unlink(missing_ok=True)
Path("numpy_clusters.zip").unlink(missing_ok=True)
Path("sqlite_clusters.zip").unlink(missing_ok=True)
conn = sqlite3.connect("sqlite_clusters.db")
# run
print("Testing file creation", f"(N_CLUSTERS={N_CLUSTERS}):")
print("Binary clusters:", end=" ")
bin_time, _, _ = timing_val(write_binary_clusters)()
bin_size = Path("fixed_size_clusters.bin").stat().st_size
p(bin_time, bin_size)
print("Numpy clusters:", end=" ")
np_time, _, _ = timing_val(write_numpy_clusters)()
np_size = Path("numpy_clusters.npy").stat().st_size
p(np_time, np_size)
print("SQLite clusters:", end=" ")
sql_time, _, _ = timing_val(write_sqlite_clusters)()
sql_size = Path("sqlite_clusters.db").stat().st_size
p(sql_time, sql_size)
print("\nTesting file reading", f"(READ_N_CLUSTERS={READ_N_CLUSTERS}):")
print("Binary clusters:", end=" ")
print("%.5fs" % timing_val(read_binary_clusters)()[0])
print("Numpy clusters:", end=" ")
print("%.5fs" % timing_val(read_numpy_clusters)()[0])
print("SQLite clusters:", end=" ")
print("%.5fs" % timing_val(read_sqlite_clusters)()[0])
print("\nTesting appending to file:")
print("Binary clusters:", end=" ")
print("%.5fs" % timing_val(append_binary_clusters)()[0])
print("SQLite clusters:", end=" ")
print("%.5fs" % timing_val(append_sqlite_clusters)()[0])
print("\nTesting zip compression:")
print("Binary clusters compressed:", end=" ")
with zf("fixed_size_clusters.zip", "w", zipfile.ZIP_DEFLATED) as z:
z.write("fixed_size_clusters.bin")
print(
"%.3fMB" % (Path("fixed_size_clusters.zip").stat().st_size / 1024 / 1024),
end=" ",
)
rate = (1 - Path("fixed_size_clusters.zip").stat().st_size / bin_size) * 100
print("rate:", "%.2f" % rate + "%")
print("Numpy clusters compressed:", end=" ")
with zf("numpy_clusters.zip", "w", zipfile.ZIP_DEFLATED) as z:
z.write("numpy_clusters.npy")
print("%.3fMB" % (Path("numpy_clusters.zip").stat().st_size / 1024 / 1024), end=" ")
rate = (1 - Path("numpy_clusters.zip").stat().st_size / bin_size) * 100
print("rate:", "%.2f" % rate + "%")
print("SQLite clusters compressed:", end=" ")
with zf("sqlite_clusters.zip", "w", zipfile.ZIP_DEFLATED) as z:
z.write("sqlite_clusters.db")
print(
"%.3fMB" % (Path("sqlite_clusters.zip").stat().st_size / 1024 / 1024), end=" "
)
rate = (1 - Path("sqlite_clusters.zip").stat().st_size / bin_size) * 100
print("rate:", "%.2f" % rate + "%")
# clean
conn.close()
# Path("fixed_size_clusters.bin").unlink(missing_ok=True)
# Path("numpy_clusters.npy").unlink(missing_ok=True)
# Path("sqlite_clusters.db").unlink(missing_ok=True)