The Linux Kernel Mailing List
 help / color / mirror / Atom feed
From: Viacheslav Dubeyko <vdubeyko@redhat.com>
To: Alex Markuze <amarkuze@redhat.com>, ceph-devel@vger.kernel.org
Cc: linux-kernel@vger.kernel.org, idryomov@gmail.com
Subject: Re: [EXTERNAL] [PATCH v4 07/11] selftests: ceph: add reset consistency checker
Date: Thu, 07 May 2026 12:24:30 -0700	[thread overview]
Message-ID: <c7ec42b9399464cfc0c4f7707df810e07e4a2630.camel@redhat.com> (raw)
In-Reply-To: <20260507122737.2804094-8-amarkuze@redhat.com>

On Thu, 2026-05-07 at 12:27 +0000, Alex Markuze wrote:
> Add a Python post-run validator for the CephFS client reset stress
> test.  The script reads data files written by the stress runner and
> checks that every file was either written completely or is missing,
> with no partial or corrupted content.
> 
> This is a prerequisite for the stress test script which invokes it
> after each run.
> 
> Signed-off-by: Alex Markuze <amarkuze@redhat.com>
> ---
>  .../filesystems/ceph/validate_consistency.py  | 297 ++++++++++++++++++
>  1 file changed, 297 insertions(+)
>  create mode 100755 tools/testing/selftests/filesystems/ceph/validate_consistency.py
> 
> diff --git a/tools/testing/selftests/filesystems/ceph/validate_consistency.py b/tools/testing/selftests/filesystems/ceph/validate_consistency.py
> new file mode 100755
> index 000000000000..c230a59bdb3a
> --- /dev/null
> +++ b/tools/testing/selftests/filesystems/ceph/validate_consistency.py
> @@ -0,0 +1,297 @@
> +#!/usr/bin/env python3
> +# SPDX-License-Identifier: GPL-2.0
> +
> +import argparse
> +import bisect
> +import hashlib
> +import json
> +import os
> +from pathlib import Path
> +
> +
> +def sha256_file(path: Path) -> str:
> +    digest = hashlib.sha256()
> +    with path.open("rb") as handle:
> +        while True:
> +            chunk = handle.read(1 << 20)
> +            if not chunk:
> +                break
> +            digest.update(chunk)
> +    return digest.hexdigest()
> +
> +
> +def parse_io_log(path: Path):
> +    records = []
> +    if not path.exists():
> +        return records
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line_no, line in enumerate(handle, 1):
> +            line = line.strip()
> +            if not line:
> +                continue
> +            parts = line.split(",")
> +            if len(parts) != 5:
> +                raise ValueError(f"io log line {line_no}: expected 5 columns, got {len(parts)}")
> +            ts_ms, seq, logical_id, relpath, digest = parts
> +            records.append(
> +                {
> +                    "ts_ms": int(ts_ms),
> +                    "seq": int(seq),
> +                    "logical_id": int(logical_id),
> +                    "relpath": relpath,
> +                    "digest": digest,
> +                }
> +            )
> +    return records
> +
> +
> +def parse_rename_log(path: Path):
> +    records = []
> +    if not path.exists():
> +        return records
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line_no, line in enumerate(handle, 1):
> +            line = line.strip()
> +            if not line:
> +                continue
> +            parts = line.split(",")
> +            if len(parts) == 6:
> +                ts_ms, seq, logical_id, src_rel, dst_rel, rc = parts
> +            elif len(parts) == 7:
> +                ts_ms, worker_id, seq, logical_id, src_rel, dst_rel, rc = parts
> +                _ = worker_id  # worker id is informational only
> +            else:
> +                raise ValueError(
> +                    f"rename log line {line_no}: expected 6 or 7 columns, got {len(parts)}"
> +                )
> +            records.append(
> +                {
> +                    "ts_ms": int(ts_ms),
> +                    "seq": int(seq),
> +                    "logical_id": int(logical_id),
> +                    "src_rel": src_rel,
> +                    "dst_rel": dst_rel,
> +                    "rc": int(rc),
> +                }
> +            )
> +    return records
> +
> +
> +def parse_reset_log(path: Path):
> +    records = []
> +    if not path.exists():
> +        return records
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line_no, line in enumerate(handle, 1):
> +            line = line.strip()
> +            if not line:
> +                continue
> +            parts = line.split(",")
> +            if len(parts) != 4:
> +                raise ValueError(f"reset log line {line_no}: expected 4 columns, got {len(parts)}")
> +            ts_ms, seq, reason, rc = parts
> +            records.append(
> +                {
> +                    "ts_ms": int(ts_ms),
> +                    "seq": int(seq),
> +                    "reason": reason,
> +                    "rc": int(rc),
> +                }
> +            )
> +    return records
> +
> +
> +def parse_status_file(path: Path):
> +    status = {}
> +    if not path.exists():
> +        return status
> +    with path.open("r", encoding="utf-8") as handle:
> +        for line in handle:
> +            line = line.strip()
> +            if not line or ":" not in line:
> +                continue
> +            key, value = line.split(":", 1)
> +            status[key.strip()] = value.strip()
> +    return status
> +
> +
> +def to_int(value: str, default: int = 0):
> +    try:
> +        return int(value)
> +    except Exception:
> +        return default
> +
> +
> +def validate_namespace(data_dir: Path, file_count: int, issues):
> +    actual_locations = {}
> +    actual_paths = {}
> +    for logical_id in range(file_count):
> +        name = f"file_{logical_id:05d}"
> +        found = []
> +        for subdir in ("A", "B"):
> +            candidate = data_dir / subdir / name
> +            if candidate.exists():
> +                found.append((subdir, candidate))
> +        if len(found) != 1:
> +            issues.append(
> +                f"namespace invariant failed for logical_id={logical_id:05d}: expected exactly one file in A/B, found {len(found)}"
> +            )
> +            continue
> +        actual_locations[logical_id] = found[0][0]
> +        actual_paths[logical_id] = found[0][1]
> +    return actual_locations, actual_paths
> +
> +
> +def validate_rename_invariant(rename_records, actual_locations, issues):
> +    expected_locations = {}
> +    for rec in rename_records:
> +        if rec["rc"] != 0:
> +            continue
> +        dst = rec["dst_rel"]
> +        if "/" not in dst:
> +            continue
> +        expected_locations[rec["logical_id"]] = dst.split("/", 1)[0]
> +
> +    for logical_id, expected in expected_locations.items():
> +        actual = actual_locations.get(logical_id)
> +        if actual is None:
> +            continue
> +        if actual != expected:
> +            issues.append(
> +                f"rename invariant failed for logical_id={logical_id:05d}: expected location={expected}, actual={actual}"
> +            )
> +
> +
> +def validate_data_invariant(io_records, actual_paths, issues):
> +    expected_hash = {}
> +    for rec in io_records:
> +        digest = rec["digest"]
> +        if not digest:
> +            continue
> +        expected_hash[rec["logical_id"]] = digest
> +
> +    for logical_id, digest in expected_hash.items():
> +        path = actual_paths.get(logical_id)
> +        if path is None:
> +            continue
> +        actual_digest = sha256_file(path)
> +        if digest != actual_digest:
> +            issues.append(
> +                f"data invariant failed for logical_id={logical_id:05d}: expected digest={digest}, actual digest={actual_digest}"
> +            )
> +
> +
> +def validate_reset_and_slo(args, reset_records, io_records, rename_records, status, issues):
> +    if not args.expect_reset:
> +        return
> +
> +    successful_reset_times = [rec["ts_ms"] for rec in reset_records if rec["rc"] == 0]
> +    if not successful_reset_times:
> +        issues.append("expected reset activity but no successful reset trigger was observed")
> +
> +    phase = status.get("phase")
> +    blocked_requests = to_int(status.get("blocked_requests", "0"), default=-1)
> +    last_errno = to_int(status.get("last_errno", "0"), default=1)
> +    failure_count = to_int(status.get("failure_count", "0"), default=-1)
> +
> +    if phase is None:
> +        issues.append("missing final reset status file or phase field")
> +    elif phase.lower() != "idle":
> +        issues.append(f"recovery invariant failed: phase={phase}, expected idle")
> +
> +    if blocked_requests != 0:
> +        issues.append(f"recovery invariant failed: blocked_requests={blocked_requests}, expected 0")
> +    if last_errno != 0:
> +        issues.append(f"recovery invariant failed: last_errno={last_errno}, expected 0")
> +    if failure_count > 0:
> +        issues.append(
> +            f"recovery invariant failed: failure_count={failure_count}, "
> +            "one or more resets failed during the run"
> +        )
> +
> +    op_times = [rec["ts_ms"] for rec in io_records]
> +    op_times.extend(rec["ts_ms"] for rec in rename_records if rec["rc"] == 0)
> +    op_times.sort()
> +
> +    if successful_reset_times and not op_times:
> +        issues.append("recovery SLO failed: no workload completion events were recorded")
> +        return
> +
> +    slo_ms = args.slo_seconds * 1000
> +    for ts in successful_reset_times:
> +        idx = bisect.bisect_left(op_times, ts)
> +        if idx >= len(op_times):
> +            issues.append(f"recovery SLO failed: no operation completion observed after reset at ts_ms={ts}")
> +            continue
> +        delta = op_times[idx] - ts
> +        if delta > slo_ms:
> +            issues.append(
> +                f"recovery SLO failed: first post-reset completion at {delta}ms exceeds threshold {slo_ms}ms (reset ts_ms={ts})"
> +            )
> +
> +
> +def main():
> +    parser = argparse.ArgumentParser(description="Validate Ceph reset stress artifacts")
> +    parser.add_argument("--data-dir", required=True)
> +    parser.add_argument("--file-count", required=True, type=int)
> +    parser.add_argument("--io-log", required=True)
> +    parser.add_argument("--rename-log", required=True)
> +    parser.add_argument("--reset-log", required=True)
> +    parser.add_argument("--status-final", required=False, default="")
> +    parser.add_argument("--slo-seconds", required=False, type=int, default=30)
> +    parser.add_argument("--expect-reset", action="store_true")
> +    parser.add_argument("--report-json", required=False, default="")
> +    args = parser.parse_args()
> +
> +    data_dir = Path(args.data_dir)
> +    io_log = Path(args.io_log)
> +    rename_log = Path(args.rename_log)
> +    reset_log = Path(args.reset_log)
> +    status_final = Path(args.status_final) if args.status_final else Path("__missing_status__")
> +
> +    issues = []
> +
> +    if not data_dir.exists():
> +        issues.append(f"data directory is missing: {data_dir}")
> +
> +    try:
> +        io_records = parse_io_log(io_log)
> +        rename_records = parse_rename_log(rename_log)
> +        reset_records = parse_reset_log(reset_log)
> +    except Exception as exc:
> +        issues.append(f"log parsing failed: {exc}")
> +        io_records = []
> +        rename_records = []
> +        reset_records = []
> +
> +    status = parse_status_file(status_final)
> +
> +    actual_locations, actual_paths = validate_namespace(data_dir, args.file_count, issues)
> +    validate_rename_invariant(rename_records, actual_locations, issues)
> +    validate_data_invariant(io_records, actual_paths, issues)
> +    validate_reset_and_slo(args, reset_records, io_records, rename_records, status, issues)
> +
> +    report = {
> +        "file_count": args.file_count,
> +        "io_records": len(io_records),
> +        "rename_records": len(rename_records),
> +        "reset_records": len(reset_records),
> +        "expect_reset": args.expect_reset,
> +        "issues": issues,
> +    }
> +
> +    if args.report_json:
> +        report_path = Path(args.report_json)
> +        report_path.write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8")
> +
> +    if issues:
> +        print("FAIL: consistency validation found issues")
> +        for issue in issues:
> +            print(f"  - {issue}")
> +        raise SystemExit(1)
> +
> +    print("PASS: consistency validation succeeded")
> +
> +
> +if __name__ == "__main__":
> +    main()

Reviewed-by: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>

Thanks,
Slava.


  reply	other threads:[~2026-05-07 19:24 UTC|newest]

Thread overview: 24+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2026-05-07 12:27 [PATCH v4 00/11] ceph: manual client session reset Alex Markuze
2026-05-07 12:27 ` [PATCH v4 01/11] ceph: convert inode flags to named bit positions and atomic bitops Alex Markuze
2026-05-07 18:35   ` Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 02/11] ceph: use proper endian conversion for flock_len in reconnect Alex Markuze
2026-05-07 12:27 ` [PATCH v4 03/11] ceph: harden send_mds_reconnect and handle active-MDS peer reset Alex Markuze
2026-05-07 18:43   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 04/11] ceph: add diagnostic timeout loop to wait_caps_flush() Alex Markuze
2026-05-07 19:01   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 05/11] ceph: add client reset state machine and session teardown Alex Markuze
2026-05-07 19:17   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 06/11] ceph: add manual reset debugfs control and tracepoints Alex Markuze
2026-05-07 19:22   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 07/11] selftests: ceph: add reset consistency checker Alex Markuze
2026-05-07 19:24   ` Viacheslav Dubeyko [this message]
2026-05-07 12:27 ` [PATCH v4 08/11] selftests: ceph: add reset stress test Alex Markuze
2026-05-07 19:29   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 09/11] selftests: ceph: add reset corner-case tests Alex Markuze
2026-05-07 19:31   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 10/11] selftests: ceph: add validation harness Alex Markuze
2026-05-07 19:33   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 12:27 ` [PATCH v4 11/11] selftests: ceph: wire up Ceph reset kselftests and documentation Alex Markuze
2026-05-07 19:38   ` [EXTERNAL] " Viacheslav Dubeyko
2026-05-07 18:28 ` [EXTERNAL] [PATCH v4 00/11] ceph: manual client session reset Viacheslav Dubeyko
2026-05-08 17:49   ` Viacheslav Dubeyko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=c7ec42b9399464cfc0c4f7707df810e07e4a2630.camel@redhat.com \
    --to=vdubeyko@redhat.com \
    --cc=amarkuze@redhat.com \
    --cc=ceph-devel@vger.kernel.org \
    --cc=idryomov@gmail.com \
    --cc=linux-kernel@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox