The Linux Kernel Mailing List
 help / color / mirror / Atom feed
From: Alex Markuze <amarkuze@redhat.com>
To: ceph-devel@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, idryomov@gmail.com,
	vdubeyko@redhat.com, Alex Markuze <amarkuze@redhat.com>
Subject: [PATCH v4 07/11] selftests: ceph: add reset consistency checker
Date: Thu,  7 May 2026 12:27:33 +0000	[thread overview]
Message-ID: <20260507122737.2804094-8-amarkuze@redhat.com> (raw)
In-Reply-To: <20260507122737.2804094-1-amarkuze@redhat.com>

Add a Python post-run validator for the CephFS client reset stress
test.  The script reads data files written by the stress runner and
checks that every file was either written completely or is missing,
with no partial or corrupted content.

This is a prerequisite for the stress test script which invokes it
after each run.

Signed-off-by: Alex Markuze <amarkuze@redhat.com>
---
 .../filesystems/ceph/validate_consistency.py  | 297 ++++++++++++++++++
 1 file changed, 297 insertions(+)
 create mode 100755 tools/testing/selftests/filesystems/ceph/validate_consistency.py

diff --git a/tools/testing/selftests/filesystems/ceph/validate_consistency.py b/tools/testing/selftests/filesystems/ceph/validate_consistency.py
new file mode 100755
index 000000000000..c230a59bdb3a
--- /dev/null
+++ b/tools/testing/selftests/filesystems/ceph/validate_consistency.py
@@ -0,0 +1,297 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+
+import argparse
+import bisect
+import hashlib
+import json
+import os
+from pathlib import Path
+
+
+def sha256_file(path: Path) -> str:
+    digest = hashlib.sha256()
+    with path.open("rb") as handle:
+        while True:
+            chunk = handle.read(1 << 20)
+            if not chunk:
+                break
+            digest.update(chunk)
+    return digest.hexdigest()
+
+
+def parse_io_log(path: Path):
+    records = []
+    if not path.exists():
+        return records
+    with path.open("r", encoding="utf-8") as handle:
+        for line_no, line in enumerate(handle, 1):
+            line = line.strip()
+            if not line:
+                continue
+            parts = line.split(",")
+            if len(parts) != 5:
+                raise ValueError(f"io log line {line_no}: expected 5 columns, got {len(parts)}")
+            ts_ms, seq, logical_id, relpath, digest = parts
+            records.append(
+                {
+                    "ts_ms": int(ts_ms),
+                    "seq": int(seq),
+                    "logical_id": int(logical_id),
+                    "relpath": relpath,
+                    "digest": digest,
+                }
+            )
+    return records
+
+
+def parse_rename_log(path: Path):
+    records = []
+    if not path.exists():
+        return records
+    with path.open("r", encoding="utf-8") as handle:
+        for line_no, line in enumerate(handle, 1):
+            line = line.strip()
+            if not line:
+                continue
+            parts = line.split(",")
+            if len(parts) == 6:
+                ts_ms, seq, logical_id, src_rel, dst_rel, rc = parts
+            elif len(parts) == 7:
+                ts_ms, worker_id, seq, logical_id, src_rel, dst_rel, rc = parts
+                _ = worker_id  # worker id is informational only
+            else:
+                raise ValueError(
+                    f"rename log line {line_no}: expected 6 or 7 columns, got {len(parts)}"
+                )
+            records.append(
+                {
+                    "ts_ms": int(ts_ms),
+                    "seq": int(seq),
+                    "logical_id": int(logical_id),
+                    "src_rel": src_rel,
+                    "dst_rel": dst_rel,
+                    "rc": int(rc),
+                }
+            )
+    return records
+
+
+def parse_reset_log(path: Path):
+    records = []
+    if not path.exists():
+        return records
+    with path.open("r", encoding="utf-8") as handle:
+        for line_no, line in enumerate(handle, 1):
+            line = line.strip()
+            if not line:
+                continue
+            parts = line.split(",")
+            if len(parts) != 4:
+                raise ValueError(f"reset log line {line_no}: expected 4 columns, got {len(parts)}")
+            ts_ms, seq, reason, rc = parts
+            records.append(
+                {
+                    "ts_ms": int(ts_ms),
+                    "seq": int(seq),
+                    "reason": reason,
+                    "rc": int(rc),
+                }
+            )
+    return records
+
+
+def parse_status_file(path: Path):
+    status = {}
+    if not path.exists():
+        return status
+    with path.open("r", encoding="utf-8") as handle:
+        for line in handle:
+            line = line.strip()
+            if not line or ":" not in line:
+                continue
+            key, value = line.split(":", 1)
+            status[key.strip()] = value.strip()
+    return status
+
+
+def to_int(value: str, default: int = 0):
+    try:
+        return int(value)
+    except Exception:
+        return default
+
+
+def validate_namespace(data_dir: Path, file_count: int, issues):
+    actual_locations = {}
+    actual_paths = {}
+    for logical_id in range(file_count):
+        name = f"file_{logical_id:05d}"
+        found = []
+        for subdir in ("A", "B"):
+            candidate = data_dir / subdir / name
+            if candidate.exists():
+                found.append((subdir, candidate))
+        if len(found) != 1:
+            issues.append(
+                f"namespace invariant failed for logical_id={logical_id:05d}: expected exactly one file in A/B, found {len(found)}"
+            )
+            continue
+        actual_locations[logical_id] = found[0][0]
+        actual_paths[logical_id] = found[0][1]
+    return actual_locations, actual_paths
+
+
+def validate_rename_invariant(rename_records, actual_locations, issues):
+    expected_locations = {}
+    for rec in rename_records:
+        if rec["rc"] != 0:
+            continue
+        dst = rec["dst_rel"]
+        if "/" not in dst:
+            continue
+        expected_locations[rec["logical_id"]] = dst.split("/", 1)[0]
+
+    for logical_id, expected in expected_locations.items():
+        actual = actual_locations.get(logical_id)
+        if actual is None:
+            continue
+        if actual != expected:
+            issues.append(
+                f"rename invariant failed for logical_id={logical_id:05d}: expected location={expected}, actual={actual}"
+            )
+
+
+def validate_data_invariant(io_records, actual_paths, issues):
+    expected_hash = {}
+    for rec in io_records:
+        digest = rec["digest"]
+        if not digest:
+            continue
+        expected_hash[rec["logical_id"]] = digest
+
+    for logical_id, digest in expected_hash.items():
+        path = actual_paths.get(logical_id)
+        if path is None:
+            continue
+        actual_digest = sha256_file(path)
+        if digest != actual_digest:
+            issues.append(
+                f"data invariant failed for logical_id={logical_id:05d}: expected digest={digest}, actual digest={actual_digest}"
+            )
+
+
+def validate_reset_and_slo(args, reset_records, io_records, rename_records, status, issues):
+    if not args.expect_reset:
+        return
+
+    successful_reset_times = [rec["ts_ms"] for rec in reset_records if rec["rc"] == 0]
+    if not successful_reset_times:
+        issues.append("expected reset activity but no successful reset trigger was observed")
+
+    phase = status.get("phase")
+    blocked_requests = to_int(status.get("blocked_requests", "0"), default=-1)
+    last_errno = to_int(status.get("last_errno", "0"), default=1)
+    failure_count = to_int(status.get("failure_count", "0"), default=-1)
+
+    if phase is None:
+        issues.append("missing final reset status file or phase field")
+    elif phase.lower() != "idle":
+        issues.append(f"recovery invariant failed: phase={phase}, expected idle")
+
+    if blocked_requests != 0:
+        issues.append(f"recovery invariant failed: blocked_requests={blocked_requests}, expected 0")
+    if last_errno != 0:
+        issues.append(f"recovery invariant failed: last_errno={last_errno}, expected 0")
+    if failure_count > 0:
+        issues.append(
+            f"recovery invariant failed: failure_count={failure_count}, "
+            "one or more resets failed during the run"
+        )
+
+    op_times = [rec["ts_ms"] for rec in io_records]
+    op_times.extend(rec["ts_ms"] for rec in rename_records if rec["rc"] == 0)
+    op_times.sort()
+
+    if successful_reset_times and not op_times:
+        issues.append("recovery SLO failed: no workload completion events were recorded")
+        return
+
+    slo_ms = args.slo_seconds * 1000
+    for ts in successful_reset_times:
+        idx = bisect.bisect_left(op_times, ts)
+        if idx >= len(op_times):
+            issues.append(f"recovery SLO failed: no operation completion observed after reset at ts_ms={ts}")
+            continue
+        delta = op_times[idx] - ts
+        if delta > slo_ms:
+            issues.append(
+                f"recovery SLO failed: first post-reset completion at {delta}ms exceeds threshold {slo_ms}ms (reset ts_ms={ts})"
+            )
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Validate Ceph reset stress artifacts")
+    parser.add_argument("--data-dir", required=True)
+    parser.add_argument("--file-count", required=True, type=int)
+    parser.add_argument("--io-log", required=True)
+    parser.add_argument("--rename-log", required=True)
+    parser.add_argument("--reset-log", required=True)
+    parser.add_argument("--status-final", required=False, default="")
+    parser.add_argument("--slo-seconds", required=False, type=int, default=30)
+    parser.add_argument("--expect-reset", action="store_true")
+    parser.add_argument("--report-json", required=False, default="")
+    args = parser.parse_args()
+
+    data_dir = Path(args.data_dir)
+    io_log = Path(args.io_log)
+    rename_log = Path(args.rename_log)
+    reset_log = Path(args.reset_log)
+    status_final = Path(args.status_final) if args.status_final else Path("__missing_status__")
+
+    issues = []
+
+    if not data_dir.exists():
+        issues.append(f"data directory is missing: {data_dir}")
+
+    try:
+        io_records = parse_io_log(io_log)
+        rename_records = parse_rename_log(rename_log)
+        reset_records = parse_reset_log(reset_log)
+    except Exception as exc:
+        issues.append(f"log parsing failed: {exc}")
+        io_records = []
+        rename_records = []
+        reset_records = []
+
+    status = parse_status_file(status_final)
+
+    actual_locations, actual_paths = validate_namespace(data_dir, args.file_count, issues)
+    validate_rename_invariant(rename_records, actual_locations, issues)
+    validate_data_invariant(io_records, actual_paths, issues)
+    validate_reset_and_slo(args, reset_records, io_records, rename_records, status, issues)
+
+    report = {
+        "file_count": args.file_count,
+        "io_records": len(io_records),
+        "rename_records": len(rename_records),
+        "reset_records": len(reset_records),
+        "expect_reset": args.expect_reset,
+        "issues": issues,
+    }
+
+    if args.report_json:
+        report_path = Path(args.report_json)
+        report_path.write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8")
+
+    if issues:
+        print("FAIL: consistency validation found issues")
+        for issue in issues:
+            print(f"  - {issue}")
+        raise SystemExit(1)
+
+    print("PASS: consistency validation succeeded")
+
+
+if __name__ == "__main__":
+    main()
-- 
2.34.1


  parent reply	other threads:[~2026-05-07 12:27 UTC|newest]

Thread overview: 24+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-07 12:27 [PATCH v4 00/11] ceph: manual client session reset Alex Markuze
2026-05-07 12:27 ` [PATCH v4 01/11] ceph: convert inode flags to named bit positions and atomic bitops Alex Markuze
2026-05-07 18:35   ` Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 02/11] ceph: use proper endian conversion for flock_len in reconnect Alex Markuze
2026-05-07 12:27 ` [PATCH v4 03/11] ceph: harden send_mds_reconnect and handle active-MDS peer reset Alex Markuze
2026-05-07 18:43   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 04/11] ceph: add diagnostic timeout loop to wait_caps_flush() Alex Markuze
2026-05-07 19:01   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 05/11] ceph: add client reset state machine and session teardown Alex Markuze
2026-05-07 19:17   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 06/11] ceph: add manual reset debugfs control and tracepoints Alex Markuze
2026-05-07 19:22   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` Alex Markuze [this message]
2026-05-07 19:24   ` [EXTERNAL] [PATCH v4 07/11] selftests: ceph: add reset consistency checker Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 08/11] selftests: ceph: add reset stress test Alex Markuze
2026-05-07 19:29   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 09/11] selftests: ceph: add reset corner-case tests Alex Markuze
2026-05-07 19:31   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 10/11] selftests: ceph: add validation harness Alex Markuze
2026-05-07 19:33   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 11/11] selftests: ceph: wire up Ceph reset kselftests and documentation Alex Markuze
2026-05-07 19:38   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 18:28 ` [EXTERNAL] [PATCH v4 00/11] ceph: manual client session reset Viacheslav Dubeyko
2026-05-08 17:49   ` Viacheslav Dubeyko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20260507122737.2804094-8-amarkuze@redhat.com \
    --to=amarkuze@redhat.com \
    --cc=ceph-devel@vger.kernel.org \
    --cc=idryomov@gmail.com \
    --cc=linux-kernel@vger.kernel.org \
    --cc=vdubeyko@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox