From: Viacheslav Dubeyko <vdubeyko@redhat.com>
To: Alex Markuze <amarkuze@redhat.com>, ceph-devel@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, idryomov@gmail.com
Subject: Re: [EXTERNAL] [PATCH v4 07/11] selftests: ceph: add reset consistency checker
Date: Thu, 07 May 2026 12:24:30 -0700
Message-ID: <c7ec42b9399464cfc0c4f7707df810e07e4a2630.camel@redhat.com>
In-Reply-To: <20260507122737.2804094-8-amarkuze@redhat.com>
On Thu, 2026-05-07 at 12:27 +0000, Alex Markuze wrote:
> Add a Python post-run validator for the CephFS client reset stress
> test. The script reads data files written by the stress runner and
> checks that every file was either written completely or is missing,
> with no partial or corrupted content.
>
> This is a prerequisite for the stress test script, which invokes it
> after each run.
>
> Signed-off-by: Alex Markuze <amarkuze@redhat.com>
> ---
> .../filesystems/ceph/validate_consistency.py | 297 ++++++++++++++++++
> 1 file changed, 297 insertions(+)
> create mode 100755 tools/testing/selftests/filesystems/ceph/validate_consistency.py
>
> diff --git a/tools/testing/selftests/filesystems/ceph/validate_consistency.py b/tools/testing/selftests/filesystems/ceph/validate_consistency.py
> new file mode 100755
> index 000000000000..c230a59bdb3a
> --- /dev/null
> +++ b/tools/testing/selftests/filesystems/ceph/validate_consistency.py
> @@ -0,0 +1,297 @@
> +#!/usr/bin/env python3
> +# SPDX-License-Identifier: GPL-2.0
> +
> +import argparse
> +import bisect
> +import hashlib
> +import json
> +import os
> +from pathlib import Path
> +
> +
> +def sha256_file(path: Path) -> str:
> +    digest = hashlib.sha256()
> +    with path.open("rb") as handle:
> +        while True:
> +            chunk = handle.read(1 << 20)
> +            if not chunk:
> +                break
> +            digest.update(chunk)
> +    return digest.hexdigest()
> +
> +
> +def parse_io_log(path: Path):
> +    records = []
> +    if not path.exists():
> +        return records
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line_no, line in enumerate(handle, 1):
> +            line = line.strip()
> +            if not line:
> +                continue
> +            parts = line.split(",")
> +            if len(parts) != 5:
> +                raise ValueError(f"io log line {line_no}: expected 5 columns, got {len(parts)}")
> +            ts_ms, seq, logical_id, relpath, digest = parts
> +            records.append(
> +                {
> +                    "ts_ms": int(ts_ms),
> +                    "seq": int(seq),
> +                    "logical_id": int(logical_id),
> +                    "relpath": relpath,
> +                    "digest": digest,
> +                }
> +            )
> +    return records
> +
> +
> +def parse_rename_log(path: Path):
> +    records = []
> +    if not path.exists():
> +        return records
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line_no, line in enumerate(handle, 1):
> +            line = line.strip()
> +            if not line:
> +                continue
> +            parts = line.split(",")
> +            if len(parts) == 6:
> +                ts_ms, seq, logical_id, src_rel, dst_rel, rc = parts
> +            elif len(parts) == 7:
> +                ts_ms, worker_id, seq, logical_id, src_rel, dst_rel, rc = parts
> +                _ = worker_id  # worker id is informational only
> +            else:
> +                raise ValueError(
> +                    f"rename log line {line_no}: expected 6 or 7 columns, got {len(parts)}"
> +                )
> +            records.append(
> +                {
> +                    "ts_ms": int(ts_ms),
> +                    "seq": int(seq),
> +                    "logical_id": int(logical_id),
> +                    "src_rel": src_rel,
> +                    "dst_rel": dst_rel,
> +                    "rc": int(rc),
> +                }
> +            )
> +    return records
> +
> +
> +def parse_reset_log(path: Path):
> +    records = []
> +    if not path.exists():
> +        return records
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line_no, line in enumerate(handle, 1):
> +            line = line.strip()
> +            if not line:
> +                continue
> +            parts = line.split(",")
> +            if len(parts) != 4:
> +                raise ValueError(f"reset log line {line_no}: expected 4 columns, got {len(parts)}")
> +            ts_ms, seq, reason, rc = parts
> +            records.append(
> +                {
> +                    "ts_ms": int(ts_ms),
> +                    "seq": int(seq),
> +                    "reason": reason,
> +                    "rc": int(rc),
> +                }
> +            )
> +    return records
> +
> +
> +def parse_status_file(path: Path):
> +    status = {}
> +    if not path.exists():
> +        return status
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line in handle:
> +            line = line.strip()
> +            if not line or ":" not in line:
> +                continue
> +            key, value = line.split(":", 1)
> +            status[key.strip()] = value.strip()
> +    return status
> +
> +
> +def to_int(value: str, default: int = 0):
> +    try:
> +        return int(value)
> +    except Exception:
> +        return default
> +
> +
> +def validate_namespace(data_dir: Path, file_count: int, issues):
> +    actual_locations = {}
> +    actual_paths = {}
> +    for logical_id in range(file_count):
> +        name = f"file_{logical_id:05d}"
> +        found = []
> +        for subdir in ("A", "B"):
> +            candidate = data_dir / subdir / name
> +            if candidate.exists():
> +                found.append((subdir, candidate))
> +        if len(found) != 1:
> +            issues.append(
> +                f"namespace invariant failed for logical_id={logical_id:05d}: expected exactly one file in A/B, found {len(found)}"
> +            )
> +            continue
> +        actual_locations[logical_id] = found[0][0]
> +        actual_paths[logical_id] = found[0][1]
> +    return actual_locations, actual_paths
> +
> +
> +def validate_rename_invariant(rename_records, actual_locations, issues):
> +    expected_locations = {}
> +    for rec in rename_records:
> +        if rec["rc"] != 0:
> +            continue
> +        dst = rec["dst_rel"]
> +        if "/" not in dst:
> +            continue
> +        expected_locations[rec["logical_id"]] = dst.split("/", 1)[0]
> +
> +    for logical_id, expected in expected_locations.items():
> +        actual = actual_locations.get(logical_id)
> +        if actual is None:
> +            continue
> +        if actual != expected:
> +            issues.append(
> +                f"rename invariant failed for logical_id={logical_id:05d}: expected location={expected}, actual={actual}"
> +            )
> +
> +
> +def validate_data_invariant(io_records, actual_paths, issues):
> +    expected_hash = {}
> +    for rec in io_records:
> +        digest = rec["digest"]
> +        if not digest:
> +            continue
> +        expected_hash[rec["logical_id"]] = digest
> +
> +    for logical_id, digest in expected_hash.items():
> +        path = actual_paths.get(logical_id)
> +        if path is None:
> +            continue
> +        actual_digest = sha256_file(path)
> +        if digest != actual_digest:
> +            issues.append(
> +                f"data invariant failed for logical_id={logical_id:05d}: expected digest={digest}, actual digest={actual_digest}"
> +            )
> +
> +
> +def validate_reset_and_slo(args, reset_records, io_records, rename_records, status, issues):
> +    if not args.expect_reset:
> +        return
> +
> +    successful_reset_times = [rec["ts_ms"] for rec in reset_records if rec["rc"] == 0]
> +    if not successful_reset_times:
> +        issues.append("expected reset activity but no successful reset trigger was observed")
> +
> +    phase = status.get("phase")
> +    blocked_requests = to_int(status.get("blocked_requests", "0"), default=-1)
> +    last_errno = to_int(status.get("last_errno", "0"), default=1)
> +    failure_count = to_int(status.get("failure_count", "0"), default=-1)
> +
> +    if phase is None:
> +        issues.append("missing final reset status file or phase field")
> +    elif phase.lower() != "idle":
> +        issues.append(f"recovery invariant failed: phase={phase}, expected idle")
> +
> +    if blocked_requests != 0:
> +        issues.append(f"recovery invariant failed: blocked_requests={blocked_requests}, expected 0")
> +    if last_errno != 0:
> +        issues.append(f"recovery invariant failed: last_errno={last_errno}, expected 0")
> +    if failure_count > 0:
> +        issues.append(
> +            f"recovery invariant failed: failure_count={failure_count}, "
> +            "one or more resets failed during the run"
> +        )
> +
> +    op_times = [rec["ts_ms"] for rec in io_records]
> +    op_times.extend(rec["ts_ms"] for rec in rename_records if rec["rc"] == 0)
> +    op_times.sort()
> +
> +    if successful_reset_times and not op_times:
> +        issues.append("recovery SLO failed: no workload completion events were recorded")
> +        return
> +
> +    slo_ms = args.slo_seconds * 1000
> +    for ts in successful_reset_times:
> +        idx = bisect.bisect_left(op_times, ts)
> +        if idx >= len(op_times):
> +            issues.append(f"recovery SLO failed: no operation completion observed after reset at ts_ms={ts}")
> +            continue
> +        delta = op_times[idx] - ts
> +        if delta > slo_ms:
> +            issues.append(
> +                f"recovery SLO failed: first post-reset completion at {delta}ms exceeds threshold {slo_ms}ms (reset ts_ms={ts})"
> +            )
> +
> +
> +def main():
> +    parser = argparse.ArgumentParser(description="Validate Ceph reset stress artifacts")
> +    parser.add_argument("--data-dir", required=True)
> +    parser.add_argument("--file-count", required=True, type=int)
> +    parser.add_argument("--io-log", required=True)
> +    parser.add_argument("--rename-log", required=True)
> +    parser.add_argument("--reset-log", required=True)
> +    parser.add_argument("--status-final", required=False, default="")
> +    parser.add_argument("--slo-seconds", required=False, type=int, default=30)
> +    parser.add_argument("--expect-reset", action="store_true")
> +    parser.add_argument("--report-json", required=False, default="")
> +    args = parser.parse_args()
> +
> +    data_dir = Path(args.data_dir)
> +    io_log = Path(args.io_log)
> +    rename_log = Path(args.rename_log)
> +    reset_log = Path(args.reset_log)
> +    status_final = Path(args.status_final) if args.status_final else Path("__missing_status__")
> +
> +    issues = []
> +
> +    if not data_dir.exists():
> +        issues.append(f"data directory is missing: {data_dir}")
> +
> +    try:
> +        io_records = parse_io_log(io_log)
> +        rename_records = parse_rename_log(rename_log)
> +        reset_records = parse_reset_log(reset_log)
> +    except Exception as exc:
> +        issues.append(f"log parsing failed: {exc}")
> +        io_records = []
> +        rename_records = []
> +        reset_records = []
> +
> +    status = parse_status_file(status_final)
> +
> +    actual_locations, actual_paths = validate_namespace(data_dir, args.file_count, issues)
> +    validate_rename_invariant(rename_records, actual_locations, issues)
> +    validate_data_invariant(io_records, actual_paths, issues)
> +    validate_reset_and_slo(args, reset_records, io_records, rename_records, status, issues)
> +
> +    report = {
> +        "file_count": args.file_count,
> +        "io_records": len(io_records),
> +        "rename_records": len(rename_records),
> +        "reset_records": len(reset_records),
> +        "expect_reset": args.expect_reset,
> +        "issues": issues,
> +    }
> +
> +    if args.report_json:
> +        report_path = Path(args.report_json)
> +        report_path.write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8")
> +
> +    if issues:
> +        print("FAIL: consistency validation found issues")
> +        for issue in issues:
> +            print(f" - {issue}")
> +        raise SystemExit(1)
> +
> +    print("PASS: consistency validation succeeded")
> +
> +
> +if __name__ == "__main__":
> +    main()
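The bisect-based recovery SLO check in validate_reset_and_slo() looks right to me. For other reviewers, the core of it can be sketched in isolation like this (first_completion_delta() is my own illustrative name, not something in the patch):

```python
import bisect

def first_completion_delta(op_times, reset_ts):
    """Milliseconds from a reset to the first operation completion
    at or after it, or None if nothing completed afterwards.
    op_times must be sorted, as the validator ensures."""
    idx = bisect.bisect_left(op_times, reset_ts)
    if idx >= len(op_times):
        return None
    return op_times[idx] - reset_ts

# ops completed at 100ms, 250ms and 900ms; a reset fired at 200ms
print(first_completion_delta([100, 250, 900], 200))   # -> 50
print(first_completion_delta([100, 250, 900], 1000))  # -> None
```

Note that a completion stamped exactly at the reset timestamp counts as post-reset (delta 0), which is what bisect_left() gives you; that seems like the right call here.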
Reviewed-by: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>
Thanks,
Slava.
Thread overview: 24+ messages
2026-05-07 12:27 [PATCH v4 00/11] ceph: manual client session reset Alex Markuze
2026-05-07 12:27 ` [PATCH v4 01/11] ceph: convert inode flags to named bit positions and atomic bitops Alex Markuze
2026-05-07 18:35 ` Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 02/11] ceph: use proper endian conversion for flock_len in reconnect Alex Markuze
2026-05-07 12:27 ` [PATCH v4 03/11] ceph: harden send_mds_reconnect and handle active-MDS peer reset Alex Markuze
2026-05-07 18:43 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 04/11] ceph: add diagnostic timeout loop to wait_caps_flush() Alex Markuze
2026-05-07 19:01 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 05/11] ceph: add client reset state machine and session teardown Alex Markuze
2026-05-07 19:17 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 06/11] ceph: add manual reset debugfs control and tracepoints Alex Markuze
2026-05-07 19:22 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 07/11] selftests: ceph: add reset consistency checker Alex Markuze
2026-05-07 19:24 ` Viacheslav Dubeyko [this message]
2026-05-07 12:27 ` [PATCH v4 08/11] selftests: ceph: add reset stress test Alex Markuze
2026-05-07 19:29 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 09/11] selftests: ceph: add reset corner-case tests Alex Markuze
2026-05-07 19:31 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 10/11] selftests: ceph: add validation harness Alex Markuze
2026-05-07 19:33 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 11/11] selftests: ceph: wire up Ceph reset kselftests and documentation Alex Markuze
2026-05-07 19:38 ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 18:28 ` [EXTERNAL] [PATCH v4 00/11] ceph: manual client session reset Viacheslav Dubeyko
2026-05-08 17:49 ` Viacheslav Dubeyko