#!/usr/bin/env python3 import argparse, math, os, struct, zlib from pathlib import Path import numpy as np import zipfile # ----------------------- # Defaults (from your YAML) # ----------------------- DEFAULT_MEAN_BYTES = 146_600_628 DEFAULT_STDEV_BYTES = 68_341_808 DEFAULT_RESIZE = 2_097_152 # 2 MiB DEFAULT_SAMPLES = 1 DEFAULT_DTYPE = "uint8" DEFAULT_SEED = 10 # ----------------------- # Helpers # ----------------------- DTYPE_SIZES = { "uint8": 1, "int8": 1, "uint16": 2, "int16": 2, "uint32": 4, "int32": 4, "float32": 4, "float64": 8, } def choose_target_bytes(mean_b, stdev_b, resize, randomize): if randomize and stdev_b > 0: draw = int(np.random.normal(loc=mean_b, scale=stdev_b)) draw = max(draw, 1) else: draw = int(mean_b) # Round to nearest multiple of resize return int(round(draw / resize) * resize) def choose_hw_for_bytes(total_bytes, samples, dtype_size): """ Choose H, W making H*W*samples*dtype_size == total_bytes. We factor total elements and spread powers of two across H and W to avoid super-skinny arrays. """ total_elems = total_bytes // (dtype_size * samples) if total_elems == 0: raise ValueError("Total elements computed as 0; check inputs.") n = total_elems # Factor out powers of two exp2 = (n & -n).bit_length() - 1 odd = n >> exp2 h = 1 << (exp2 // 2) w = (1 << (exp2 - exp2 // 2)) * odd return int(h), int(w) def save_npz(out_path: Path, *, mean_bytes, stdev_bytes, resize_bytes, samples, dtype_name, seed, compress, randomize): dtype = getattr(np, dtype_name) dtype_size = DTYPE_SIZES[dtype_name] np.random.seed(seed) target_bytes = choose_target_bytes(mean_bytes, stdev_bytes, resize_bytes, randomize) # Ensure divisibility: elems_per_sample = target_bytes // dtype_size // samples if elems_per_sample * dtype_size * samples != target_bytes: raise ValueError("Target bytes not divisible by dtype_size*samples; adjust params.") h, w = choose_hw_for_bytes(target_bytes, samples, dtype_size) x = np.random.randint(255, size=(h, w, samples), dtype=dtype if dtype_name == "uint8" else np.uint8) if dtype_name != "uint8": x = x.astype(dtype, copy=False) y = np.zeros((samples,), dtype=np.uint8) # matches DLIO NPZ generator convention out_path.parent.mkdir(parents=True, exist_ok=True) if compress: np.savez_compressed(out_path, x=x, y=y) else: np.savez(out_path, x=x, y=y) print(f"✅ Wrote {out_path}") try: sz = out_path.stat().st_size print(f" size={sz} bytes, x.shape={x.shape}, dtype={x.dtype}, samples={samples}") except FileNotFoundError: pass def list_and_crc(npz_path: Path, deep=False): print(f"📂 File: {npz_path}") with zipfile.ZipFile(npz_path, "r") as zf: names = zf.namelist() print(f"đŸ“Ļ Files in archive: {names}\n") for name in names: info = zf.getinfo(name) print(f"--- {name} ---") print(f"Stored CRC32 : 0x{info.CRC:08x}") print(f"Compressed Size : {info.compress_size}") print(f"Uncompressed Size : {info.file_size}") try: with zf.open(info) as f: _ = f.read() # will raise if CRC mismatch print("✅ CRC verified by zipfile.\n") except zipfile.BadZipFile as e: print(f"âš ī¸ CRC error via zipfile: {e}") if deep: ok = deep_crc_check(npz_path, info) print("🔎 Deep check :", "✅ OK\n" if ok else "❌ Mismatch\n") else: print("â„šī¸ Re-run with --deep-check to diagnose.\n") except Exception as e: print(f"❌ Unexpected error: {e}\n") def deep_crc_check(npz_path: Path, info: zipfile.ZipInfo) -> bool: """ Manual CRC of the *uncompressed* payload. Parse the local file header to find the compressed bytes, then decompress and compute CRC32 of the uncompressed stream. """ with npz_path.open("rb") as fh: fh.seek(info.header_offset) local = fh.read(30) # fixed part of local header # local file header sig 'PK\x03\x04' if local[:4] != b'PK\x03\x04': return False # filename length, extra length name_len, extra_len = struct.unpack("