linux-fsdevel.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>
To: Alex Markuze <amarkuze@redhat.com>,
	"ceph-devel@vger.kernel.org" <ceph-devel@vger.kernel.org>
Cc: Viacheslav Dubeyko <vdubeyko@redhat.com>,
	"idryomov@gmail.com" <idryomov@gmail.com>,
	"linux-fsdevel@vger.kernel.org" <linux-fsdevel@vger.kernel.org>
Subject: Re:  [PATCH v2 3/3] ceph: add subvolume metrics collection and reporting
Date: Tue, 2 Dec 2025 22:54:14 +0000	[thread overview]
Message-ID: <0011d9cc8b461616324a70211cc9e1b3b1ea5d0e.camel@ibm.com> (raw)
In-Reply-To: <20251202155750.2565696-4-amarkuze@redhat.com>

On Tue, 2025-12-02 at 15:57 +0000, Alex Markuze wrote:
> Add complete subvolume metrics infrastructure for tracking and reporting
> per-subvolume I/O metrics to the MDS. This enables administrators to
> monitor I/O patterns at the subvolume granularity.
> 
> The implementation includes:
> 
> - New CEPHFS_FEATURE_SUBVOLUME_METRICS feature flag for MDS negotiation
> - Red-black tree based metrics tracker (subvolume_metrics.c/h)
> - Wire format encoding matching the MDS C++ AggregatedIOMetrics struct
> - Integration with the existing metrics reporting infrastructure
> - Recording of I/O operations from file read/write paths
> - Debugfs interface for monitoring collected metrics
> 
> Metrics tracked per subvolume:
> - Read/write operation counts
> - Read/write byte counts
> - Read/write latency sums (for average calculation)
> 
> The metrics are periodically sent to the MDS as part of the existing
> CLIENT_METRICS message when the MDS advertises support for the
> SUBVOLUME_METRICS feature.
> 
> Signed-off-by: Alex Markuze <amarkuze@redhat.com>
> ---
>  fs/ceph/Makefile            |   2 +-
>  fs/ceph/addr.c              |  10 +
>  fs/ceph/debugfs.c           | 153 ++++++++++++++
>  fs/ceph/file.c              |  58 ++++-
>  fs/ceph/mds_client.c        |  70 +++++--
>  fs/ceph/mds_client.h        |  13 +-
>  fs/ceph/metric.c            | 172 ++++++++++++++-
>  fs/ceph/metric.h            |  27 ++-
>  fs/ceph/subvolume_metrics.c | 408 ++++++++++++++++++++++++++++++++++++
>  fs/ceph/subvolume_metrics.h |  68 ++++++
>  fs/ceph/super.c             |   1 +
>  fs/ceph/super.h             |   1 +
>  12 files changed, 957 insertions(+), 26 deletions(-)
>  create mode 100644 fs/ceph/subvolume_metrics.c
>  create mode 100644 fs/ceph/subvolume_metrics.h
> 
> diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
> index 1f77ca04c426..ebb29d11ac22 100644
> --- a/fs/ceph/Makefile
> +++ b/fs/ceph/Makefile
> @@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
>  ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
>  	export.o caps.o snap.o xattr.o quota.o io.o \
>  	mds_client.o mdsmap.o strings.o ceph_frag.o \
> -	debugfs.o util.o metric.o
> +	debugfs.o util.o metric.o subvolume_metrics.o
>  
>  ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
>  ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
> diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
> index 322ed268f14a..feae80dc2816 100644
> --- a/fs/ceph/addr.c
> +++ b/fs/ceph/addr.c
> @@ -19,6 +19,7 @@
>  #include "mds_client.h"
>  #include "cache.h"
>  #include "metric.h"
> +#include "subvolume_metrics.h"
>  #include "crypto.h"
>  #include <linux/ceph/osd_client.h>
>  #include <linux/ceph/striper.h>
> @@ -823,6 +824,10 @@ static int write_folio_nounlock(struct folio *folio,
>  
>  	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
>  				  req->r_end_latency, len, err);
> +	if (err >= 0 && len > 0)

Do we really need to take into account the err value here?

> +		ceph_subvolume_metrics_record_io(fsc->mdsc, ci, true, len,
> +						 req->r_start_latency,
> +						 req->r_end_latency);
>  	fscrypt_free_bounce_page(bounce_page);
>  	ceph_osdc_put_request(req);
>  	if (err == 0)
> @@ -963,6 +968,11 @@ static void writepages_finish(struct ceph_osd_request *req)
>  	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
>  				  req->r_end_latency, len, rc);
>  
> +	if (rc >= 0 && len > 0)

Ditto. Do we really need to take into account the rc value?

> +		ceph_subvolume_metrics_record_io(mdsc, ci, true, len,
> +						 req->r_start_latency,
> +						 req->r_end_latency);
> +
>  	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
>  
>  	osd_data = osd_req_op_extent_osd_data(req, 0);
> diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
> index f3fe786b4143..a7b1c63783a1 100644
> --- a/fs/ceph/debugfs.c
> +++ b/fs/ceph/debugfs.c
> @@ -9,11 +9,13 @@
>  #include <linux/seq_file.h>
>  #include <linux/math64.h>
>  #include <linux/ktime.h>
> +#include <linux/atomic.h>
>  
>  #include <linux/ceph/libceph.h>
>  #include <linux/ceph/mon_client.h>
>  #include <linux/ceph/auth.h>
>  #include <linux/ceph/debugfs.h>
> +#include <linux/ceph/decode.h>
>  
>  #include "super.h"
>  
> @@ -21,6 +23,31 @@
>  
>  #include "mds_client.h"
>  #include "metric.h"
> +#include "subvolume_metrics.h"
> +
> +extern bool disable_send_metrics;

Maybe, it should be part of some structure? This solution looks not very nice.

> +
> +struct ceph_session_feature_desc {
> +	unsigned int bit;

What bit means here? Maybe, it makes sense to have comments for the structure?

> +	const char *name;
> +};
> +
> +static const struct ceph_session_feature_desc ceph_session_feature_table[] = {
> +	{ CEPHFS_FEATURE_METRIC_COLLECT, "METRIC_COLLECT" },
> +	{ CEPHFS_FEATURE_REPLY_ENCODING, "REPLY_ENCODING" },
> +	{ CEPHFS_FEATURE_RECLAIM_CLIENT, "RECLAIM_CLIENT" },
> +	{ CEPHFS_FEATURE_LAZY_CAP_WANTED, "LAZY_CAP_WANTED" },
> +	{ CEPHFS_FEATURE_MULTI_RECONNECT, "MULTI_RECONNECT" },
> +	{ CEPHFS_FEATURE_DELEG_INO, "DELEG_INO" },
> +	{ CEPHFS_FEATURE_ALTERNATE_NAME, "ALTERNATE_NAME" },
> +	{ CEPHFS_FEATURE_NOTIFY_SESSION_STATE, "NOTIFY_SESSION_STATE" },
> +	{ CEPHFS_FEATURE_OP_GETVXATTR, "OP_GETVXATTR" },
> +	{ CEPHFS_FEATURE_32BITS_RETRY_FWD, "32BITS_RETRY_FWD" },
> +	{ CEPHFS_FEATURE_NEW_SNAPREALM_INFO, "NEW_SNAPREALM_INFO" },
> +	{ CEPHFS_FEATURE_HAS_OWNER_UIDGID, "HAS_OWNER_UIDGID" },
> +	{ CEPHFS_FEATURE_MDS_AUTH_CAPS_CHECK, "MDS_AUTH_CAPS_CHECK" },
> +	{ CEPHFS_FEATURE_SUBVOLUME_METRICS, "SUBVOLUME_METRICS" },
> +};
>  
>  static int mdsmap_show(struct seq_file *s, void *p)
>  {
> @@ -360,6 +387,60 @@ static int status_show(struct seq_file *s, void *p)
>  	return 0;
>  }
>  
> +static int subvolume_metrics_show(struct seq_file *s, void *p)
> +{
> +	struct ceph_fs_client *fsc = s->private;
> +	struct ceph_mds_client *mdsc = fsc->mdsc;
> +	struct ceph_subvol_metric_snapshot *snapshot = NULL;
> +	u32 nr = 0;
> +	u64 total_sent = 0;
> +	u64 nonzero_sends = 0;
> +	u32 i;
> +
> +	if (!mdsc) {

Does it really possible that mdsc could be not available? Could we have race
conditions here? Because, I don't see any use of lock here.

> +		seq_puts(s, "mds client unavailable\n");
> +		return 0;
> +	}
> +
> +	mutex_lock(&mdsc->subvol_metrics_last_mutex);
> +	if (mdsc->subvol_metrics_last && mdsc->subvol_metrics_last_nr) {
> +		nr = mdsc->subvol_metrics_last_nr;

Could be nr unreasonably huge enough here? Does it make sense to check this
value before kmemdup call?

Maybe, kmemdup_array() will be more useful here?

> +		snapshot = kmemdup(mdsc->subvol_metrics_last,
> +				   nr * sizeof(*snapshot),
> +				   GFP_KERNEL);
> +		if (!snapshot)
> +			nr = 0;
> +	}
> +	total_sent = mdsc->subvol_metrics_sent;
> +	nonzero_sends = mdsc->subvol_metrics_nonzero_sends;
> +	mutex_unlock(&mdsc->subvol_metrics_last_mutex);
> +
> +	seq_puts(s, "Last sent subvolume metrics:\n");
> +	if (!nr) {
> +		seq_puts(s, "  (none)\n");
> +	} else {
> +		seq_puts(s, "  subvol_id          rd_ops    wr_ops    rd_bytes       wr_bytes       rd_lat_us      wr_lat_us\n");
> +		for (i = 0; i < nr; i++) {
> +			const struct ceph_subvol_metric_snapshot *e = &snapshot[i];
> +
> +			seq_printf(s, "  %-18llu %-9llu %-9llu %-14llu %-14llu %-14llu %-14llu\n",
> +				   e->subvolume_id,
> +				   e->read_ops, e->write_ops,
> +				   e->read_bytes, e->write_bytes,
> +				   e->read_latency_us, e->write_latency_us);
> +		}
> +	}
> +	kfree(snapshot);
> +
> +	seq_puts(s, "\nStatistics:\n");
> +	seq_printf(s, "  entries_sent:      %llu\n", total_sent);
> +	seq_printf(s, "  non_zero_sends:    %llu\n", nonzero_sends);
> +
> +	seq_puts(s, "\nPending (unsent) subvolume metrics:\n");
> +	ceph_subvolume_metrics_dump(&mdsc->subvol_metrics, s);
> +	return 0;
> +}
> +
>  DEFINE_SHOW_ATTRIBUTE(mdsmap);
>  DEFINE_SHOW_ATTRIBUTE(mdsc);
>  DEFINE_SHOW_ATTRIBUTE(caps);
> @@ -369,7 +450,72 @@ DEFINE_SHOW_ATTRIBUTE(metrics_file);
>  DEFINE_SHOW_ATTRIBUTE(metrics_latency);
>  DEFINE_SHOW_ATTRIBUTE(metrics_size);
>  DEFINE_SHOW_ATTRIBUTE(metrics_caps);
> +DEFINE_SHOW_ATTRIBUTE(subvolume_metrics);
> +
> +static int metric_features_show(struct seq_file *s, void *p)
> +{
> +	struct ceph_fs_client *fsc = s->private;
> +	struct ceph_mds_client *mdsc = fsc->mdsc;
> +	unsigned long session_features = 0;
> +	bool have_session = false;
> +	bool metric_collect = false;
> +	bool subvol_support = false;
> +	bool metrics_enabled = false;
> +	bool subvol_enabled = false;
> +	int i;
> +
> +	if (!mdsc) {

Ditto. Please, see my questions above.

> +		seq_puts(s, "mds client unavailable\n");
> +		return 0;
> +	}
> +
> +	mutex_lock(&mdsc->mutex);
> +	if (mdsc->metric.session) {
> +		have_session = true;
> +		session_features = mdsc->metric.session->s_features;
> +	}
> +	mutex_unlock(&mdsc->mutex);
> +
> +	if (have_session) {
> +		metric_collect =
> +			test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
> +				 &session_features);
> +		subvol_support =
> +			test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS,
> +				 &session_features);
> +	}
> +
> +	metrics_enabled = !disable_send_metrics && have_session && metric_collect;
> +	subvol_enabled = metrics_enabled && subvol_support;

Maybe, static inline function can contain this check?

> +
> +	seq_printf(s,
> +		   "metrics_enabled: %s (disable_send_metrics=%d, session=%s, metric_collect=%s)\n",
> +		   metrics_enabled ? "yes" : "no",
> +		   disable_send_metrics ? 1 : 0,
> +		   have_session ? "yes" : "no",
> +		   metric_collect ? "yes" : "no");
> +	seq_printf(s, "subvolume_metrics_enabled: %s\n",
> +		   subvol_enabled ? "yes" : "no");
> +	seq_printf(s, "session_feature_bits: 0x%lx\n", session_features);
> +
> +	if (!have_session) {
> +		seq_puts(s, "(no active MDS session for metrics)\n");
> +		return 0;
> +	}
> +
> +	for (i = 0; i < ARRAY_SIZE(ceph_session_feature_table); i++) {
> +		const struct ceph_session_feature_desc *desc =
> +			&ceph_session_feature_table[i];
> +		bool set = test_bit(desc->bit, &session_features);
> +
> +		seq_printf(s, "  %-24s : %s\n", desc->name,
> +			   set ? "yes" : "no");
> +	}
> +
> +	return 0;
> +}
>  
> +DEFINE_SHOW_ATTRIBUTE(metric_features);
>  
>  /*
>   * debugfs
> @@ -404,6 +550,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
>  	debugfs_remove(fsc->debugfs_caps);
>  	debugfs_remove(fsc->debugfs_status);
>  	debugfs_remove(fsc->debugfs_mdsc);
> +	debugfs_remove(fsc->debugfs_subvolume_metrics);
>  	debugfs_remove_recursive(fsc->debugfs_metrics_dir);
>  	doutc(fsc->client, "done\n");
>  }
> @@ -468,6 +615,12 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
>  			    &metrics_size_fops);
>  	debugfs_create_file("caps", 0400, fsc->debugfs_metrics_dir, fsc,
>  			    &metrics_caps_fops);
> +	debugfs_create_file("metric_features", 0400, fsc->debugfs_metrics_dir,
> +			    fsc, &metric_features_fops);
> +	fsc->debugfs_subvolume_metrics =
> +		debugfs_create_file("subvolumes", 0400,
> +				    fsc->debugfs_metrics_dir, fsc,
> +				    &subvolume_metrics_fops);
>  	doutc(fsc->client, "done\n");
>  }
>  
> diff --git a/fs/ceph/file.c b/fs/ceph/file.c
> index 99b30f784ee2..8c0e29c464b7 100644
> --- a/fs/ceph/file.c
> +++ b/fs/ceph/file.c
> @@ -19,6 +19,19 @@
>  #include "cache.h"
>  #include "io.h"
>  #include "metric.h"
> +#include "subvolume_metrics.h"
> +
> +static inline void ceph_record_subvolume_io(struct inode *inode, bool is_write,
> +					    ktime_t start, ktime_t end,
> +					    size_t bytes)
> +{
> +	if (!bytes)
> +		return;
> +
> +	ceph_subvolume_metrics_record_io(ceph_sb_to_mdsc(inode->i_sb),
> +					 ceph_inode(inode),
> +					 is_write, bytes, start, end);
> +}
>  
>  static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
>  {
> @@ -1140,6 +1153,11 @@ ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
>  					 req->r_start_latency,
>  					 req->r_end_latency,
>  					 read_len, ret);
> +		if (ret > 0)

What's about ret == 0? Do we need to take into account ret value at all?

> +			ceph_record_subvolume_io(inode, false,
> +						 req->r_start_latency,
> +						 req->r_end_latency,
> +						 ret);
>  
>  		if (ret > 0)
>  			objver = req->r_version;
> @@ -1385,12 +1403,23 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
>  
>  	/* r_start_latency == 0 means the request was not submitted */
>  	if (req->r_start_latency) {
> -		if (aio_req->write)
> +		if (aio_req->write) {
>  			ceph_update_write_metrics(metric, req->r_start_latency,
>  						  req->r_end_latency, len, rc);
> -		else
> +			if (rc >= 0 && len)
> +				ceph_record_subvolume_io(inode, true,
> +							 req->r_start_latency,
> +							 req->r_end_latency,
> +							 len);
> +		} else {
>  			ceph_update_read_metrics(metric, req->r_start_latency,
>  						 req->r_end_latency, len, rc);
> +			if (rc > 0)

What's about rc == 0?

> +				ceph_record_subvolume_io(inode, false,
> +							 req->r_start_latency,
> +							 req->r_end_latency,
> +							 rc);
> +		}
>  	}
>  
>  	put_bvecs(osd_data->bvec_pos.bvecs, osd_data->num_bvecs,
> @@ -1614,12 +1643,23 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
>  		ceph_osdc_start_request(req->r_osdc, req);
>  		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
>  
> -		if (write)
> +		if (write) {
>  			ceph_update_write_metrics(metric, req->r_start_latency,
>  						  req->r_end_latency, len, ret);
> -		else
> +			if (ret >= 0 && len)
> +				ceph_record_subvolume_io(inode, true,
> +							 req->r_start_latency,
> +							 req->r_end_latency,
> +							 len);
> +		} else {
>  			ceph_update_read_metrics(metric, req->r_start_latency,
>  						 req->r_end_latency, len, ret);
> +			if (ret > 0)

What's about ret == 0?

> +				ceph_record_subvolume_io(inode, false,
> +							 req->r_start_latency,
> +							 req->r_end_latency,
> +							 ret);
> +		}
>  
>  		size = i_size_read(inode);
>  		if (!write) {
> @@ -1872,6 +1912,11 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
>  						 req->r_start_latency,
>  						 req->r_end_latency,
>  						 read_len, ret);
> +			if (ret > 0)

What's about ret == 0?

> +				ceph_record_subvolume_io(inode, false,
> +							 req->r_start_latency,
> +							 req->r_end_latency,
> +							 ret);
>  
>  			/* Ok if object is not already present */
>  			if (ret == -ENOENT) {
> @@ -2036,6 +2081,11 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
>  
>  		ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
>  					  req->r_end_latency, len, ret);
> +		if (ret >= 0 && write_len)
> +			ceph_record_subvolume_io(inode, true,
> +						 req->r_start_latency,
> +						 req->r_end_latency,
> +						 write_len);
>  		ceph_osdc_put_request(req);
>  		if (ret != 0) {
>  			doutc(cl, "osd write returned %d\n", ret);
> diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
> index 099b8f22683b..2b831f48c844 100644
> --- a/fs/ceph/mds_client.c
> +++ b/fs/ceph/mds_client.c
> @@ -67,6 +67,22 @@ static void ceph_cap_reclaim_work(struct work_struct *work);
>  
>  static const struct ceph_connection_operations mds_con_ops;
>  
> +static void ceph_metric_bind_session(struct ceph_mds_client *mdsc,
> +				     struct ceph_mds_session *session)
> +{
> +	struct ceph_mds_session *old;
> +
> +	if (!mdsc || !session || disable_send_metrics)

Maybe, disable_send_metrics could be the part of struct ceph_mds_session or
struct ceph_mds_client?

> +		return;
> +
> +	old = mdsc->metric.session;
> +	mdsc->metric.session = ceph_get_mds_session(session);
> +	if (old)
> +		ceph_put_mds_session(old);
> +
> +	metric_schedule_delayed(&mdsc->metric);
> +}
> +
>  
>  /*
>   * mds reply parsing
> @@ -95,21 +111,23 @@ static int parse_reply_info_quota(void **p, void *end,
>  	return -EIO;
>  }
>  
> -/*
> - * parse individual inode info
> - */
>  static int parse_reply_info_in(void **p, void *end,
>  			       struct ceph_mds_reply_info_in *info,
> -			       u64 features)
> +			       u64 features,
> +			       struct ceph_mds_client *mdsc)
>  {
>  	int err = 0;
>  	u8 struct_v = 0;
> +	u8 struct_compat = 0;
> +	u32 struct_len = 0;
> +	struct ceph_client *cl = mdsc ? mdsc->fsc->client : NULL;
> +
> +	info->subvolume_id = 0;
> +	doutc(cl, "subv_metric parse start features=0x%llx\n", features);
>  
>  	info->subvolume_id = 0;
>  
>  	if (features == (u64)-1) {
> -		u32 struct_len;
> -		u8 struct_compat;
>  		ceph_decode_8_safe(p, end, struct_v, bad);
>  		ceph_decode_8_safe(p, end, struct_compat, bad);
>  		/* struct_v is expected to be >= 1. we only understand
> @@ -389,12 +407,13 @@ static int parse_reply_info_lease(void **p, void *end,
>   */
>  static int parse_reply_info_trace(void **p, void *end,
>  				  struct ceph_mds_reply_info_parsed *info,
> -				  u64 features)
> +				  u64 features,
> +				  struct ceph_mds_client *mdsc)
>  {
>  	int err;
>  
>  	if (info->head->is_dentry) {
> -		err = parse_reply_info_in(p, end, &info->diri, features);
> +		err = parse_reply_info_in(p, end, &info->diri, features, mdsc);
>  		if (err < 0)
>  			goto out_bad;
>  
> @@ -414,7 +433,8 @@ static int parse_reply_info_trace(void **p, void *end,
>  	}
>  
>  	if (info->head->is_target) {
> -		err = parse_reply_info_in(p, end, &info->targeti, features);
> +		err = parse_reply_info_in(p, end, &info->targeti, features,
> +					  mdsc);
>  		if (err < 0)
>  			goto out_bad;
>  	}
> @@ -435,7 +455,8 @@ static int parse_reply_info_trace(void **p, void *end,
>   */
>  static int parse_reply_info_readdir(void **p, void *end,
>  				    struct ceph_mds_request *req,
> -				    u64 features)
> +				    u64 features,
> +				    struct ceph_mds_client *mdsc)
>  {
>  	struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
>  	struct ceph_client *cl = req->r_mdsc->fsc->client;
> @@ -550,7 +571,7 @@ static int parse_reply_info_readdir(void **p, void *end,
>  		rde->name_len = oname.len;
>  
>  		/* inode */
> -		err = parse_reply_info_in(p, end, &rde->inode, features);
> +		err = parse_reply_info_in(p, end, &rde->inode, features, mdsc);
>  		if (err < 0)
>  			goto out_bad;
>  		/* ceph_readdir_prepopulate() will update it */
> @@ -758,7 +779,8 @@ static int parse_reply_info_extra(void **p, void *end,
>  	if (op == CEPH_MDS_OP_GETFILELOCK)
>  		return parse_reply_info_filelock(p, end, info, features);
>  	else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
> -		return parse_reply_info_readdir(p, end, req, features);
> +		return parse_reply_info_readdir(p, end, req, features,
> +						req->r_mdsc);
>  	else if (op == CEPH_MDS_OP_CREATE)
>  		return parse_reply_info_create(p, end, info, features, s);
>  	else if (op == CEPH_MDS_OP_GETVXATTR)
> @@ -787,7 +809,8 @@ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
>  	ceph_decode_32_safe(&p, end, len, bad);
>  	if (len > 0) {
>  		ceph_decode_need(&p, end, len, bad);
> -		err = parse_reply_info_trace(&p, p+len, info, features);
> +		err = parse_reply_info_trace(&p, p + len, info, features,
> +					     s->s_mdsc);
>  		if (err < 0)
>  			goto out_bad;
>  	}
> @@ -796,7 +819,7 @@ static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
>  	ceph_decode_32_safe(&p, end, len, bad);
>  	if (len > 0) {
>  		ceph_decode_need(&p, end, len, bad);
> -		err = parse_reply_info_extra(&p, p+len, req, features, s);
> +		err = parse_reply_info_extra(&p, p + len, req, features, s);

Does this change really necessary? :)

>  		if (err < 0)
>  			goto out_bad;
>  	}
> @@ -4326,6 +4349,11 @@ static void handle_session(struct ceph_mds_session *session,
>  		}
>  		mdsc->s_cap_auths_num = cap_auths_num;
>  		mdsc->s_cap_auths = cap_auths;
> +
> +		session->s_features = features;
> +		if (test_bit(CEPHFS_FEATURE_METRIC_COLLECT,
> +			     &session->s_features))
> +			ceph_metric_bind_session(mdsc, session);
>  	}
>  	if (op == CEPH_SESSION_CLOSE) {
>  		ceph_get_mds_session(session);
> @@ -4352,7 +4380,11 @@ static void handle_session(struct ceph_mds_session *session,
>  			pr_info_client(cl, "mds%d reconnect success\n",
>  				       session->s_mds);
>  
> -		session->s_features = features;
> +		if (test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS,
> +			     &session->s_features))
> +			ceph_subvolume_metrics_enable(&mdsc->subvol_metrics, true);
> +		else
> +			ceph_subvolume_metrics_enable(&mdsc->subvol_metrics, false);
>  		if (session->s_state == CEPH_MDS_SESSION_OPEN) {
>  			pr_notice_client(cl, "mds%d is already opened\n",
>  					 session->s_mds);
> @@ -5591,6 +5623,12 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
>  	err = ceph_metric_init(&mdsc->metric);
>  	if (err)
>  		goto err_mdsmap;
> +	ceph_subvolume_metrics_init(&mdsc->subvol_metrics);
> +	mutex_init(&mdsc->subvol_metrics_last_mutex);
> +	mdsc->subvol_metrics_last = NULL;
> +	mdsc->subvol_metrics_last_nr = 0;
> +	mdsc->subvol_metrics_sent = 0;
> +	mdsc->subvol_metrics_nonzero_sends = 0;
>  
>  	spin_lock_init(&mdsc->dentry_list_lock);
>  	INIT_LIST_HEAD(&mdsc->dentry_leases);
> @@ -6123,6 +6161,8 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)
>  	ceph_mdsc_stop(mdsc);
>  
>  	ceph_metric_destroy(&mdsc->metric);
> +	ceph_subvolume_metrics_destroy(&mdsc->subvol_metrics);
> +	kfree(mdsc->subvol_metrics_last);

Why do not free everything in ceph_subvolume_metrics_destroy()?

>  
>  	fsc->mdsc = NULL;
>  	kfree(mdsc);
> diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
> index bd3690baa65c..4e6c87f8414c 100644
> --- a/fs/ceph/mds_client.h
> +++ b/fs/ceph/mds_client.h
> @@ -18,6 +18,7 @@
>  
>  #include "mdsmap.h"
>  #include "metric.h"
> +#include "subvolume_metrics.h"
>  #include "super.h"
>  
>  /* The first 8 bits are reserved for old ceph releases */
> @@ -36,8 +37,9 @@ enum ceph_feature_type {
>  	CEPHFS_FEATURE_NEW_SNAPREALM_INFO,
>  	CEPHFS_FEATURE_HAS_OWNER_UIDGID,
>  	CEPHFS_FEATURE_MDS_AUTH_CAPS_CHECK,
> +	CEPHFS_FEATURE_SUBVOLUME_METRICS,
>  
> -	CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_MDS_AUTH_CAPS_CHECK,
> +	CEPHFS_FEATURE_MAX = CEPHFS_FEATURE_SUBVOLUME_METRICS,
>  };
>  
>  #define CEPHFS_FEATURES_CLIENT_SUPPORTED {	\
> @@ -54,6 +56,7 @@ enum ceph_feature_type {
>  	CEPHFS_FEATURE_32BITS_RETRY_FWD,	\
>  	CEPHFS_FEATURE_HAS_OWNER_UIDGID,	\
>  	CEPHFS_FEATURE_MDS_AUTH_CAPS_CHECK,	\
> +	CEPHFS_FEATURE_SUBVOLUME_METRICS,	\
>  }
>  
>  /*
> @@ -537,6 +540,14 @@ struct ceph_mds_client {
>  	struct list_head  dentry_dir_leases; /* lru list */
>  
>  	struct ceph_client_metric metric;
> +	struct ceph_subvolume_metrics_tracker subvol_metrics;
> +
> +	/* Subvolume metrics send tracking */
> +	struct mutex		subvol_metrics_last_mutex;
> +	struct ceph_subvol_metric_snapshot *subvol_metrics_last;
> +	u32			subvol_metrics_last_nr;
> +	u64			subvol_metrics_sent;
> +	u64			subvol_metrics_nonzero_sends;
>  
>  	spinlock_t		snapid_map_lock;
>  	struct rb_root		snapid_map_tree;
> diff --git a/fs/ceph/metric.c b/fs/ceph/metric.c
> index 871c1090e520..8ff6bcb50bc4 100644
> --- a/fs/ceph/metric.c
> +++ b/fs/ceph/metric.c
> @@ -4,10 +4,85 @@
>  #include <linux/types.h>
>  #include <linux/percpu_counter.h>
>  #include <linux/math64.h>
> +#include <linux/ratelimit.h>
> +
> +#include <linux/ceph/decode.h>
>  
>  #include "metric.h"
>  #include "mds_client.h"
>  
> +static DEFINE_RATELIMIT_STATE(metrics_no_session_rs, HZ, 1);
> +static bool metrics_disable_warned;

Ditto. Why does it not the part of any structure?

> +
> +static inline u32 ceph_subvolume_entry_payload_len(void)
> +{
> +	return sizeof(struct ceph_subvolume_metric_entry_wire);
> +}
> +
> +static inline u32 ceph_subvolume_entry_encoded_len(void)
> +{
> +	return CEPH_ENCODING_START_BLK_LEN +
> +		ceph_subvolume_entry_payload_len();
> +}
> +
> +static inline u32 ceph_subvolume_outer_payload_len(u32 nr_subvols)
> +{
> +	/* count is encoded as le64 (size_t on wire) to match FUSE client */
> +	return sizeof(__le64) +
> +		nr_subvols * ceph_subvolume_entry_encoded_len();
> +}
> +
> +static inline u32 ceph_subvolume_metric_data_len(u32 nr_subvols)
> +{
> +	return CEPH_ENCODING_START_BLK_LEN +
> +		ceph_subvolume_outer_payload_len(nr_subvols);
> +}
> +
> +static inline u32 ceph_subvolume_clamp_u32(u64 val)

What is the point of such function?

> +{
> +	return val > U32_MAX ? U32_MAX : (u32)val;
> +}
> +
> +static void ceph_init_subvolume_wire_entry(
> +	struct ceph_subvolume_metric_entry_wire *dst,
> +	const struct ceph_subvol_metric_snapshot *src)
> +{
> +	dst->subvolume_id = cpu_to_le64(src->subvolume_id);
> +	dst->read_ops = cpu_to_le32(ceph_subvolume_clamp_u32(src->read_ops));
> +	dst->write_ops = cpu_to_le32(ceph_subvolume_clamp_u32(src->write_ops));
> +	dst->read_bytes = cpu_to_le64(src->read_bytes);
> +	dst->write_bytes = cpu_to_le64(src->write_bytes);
> +	dst->read_latency_us = cpu_to_le64(src->read_latency_us);
> +	dst->write_latency_us = cpu_to_le64(src->write_latency_us);
> +	dst->time_stamp = 0;
> +}
> +
> +static int ceph_encode_subvolume_metrics(void **p, void *end,
> +					 struct ceph_subvol_metric_snapshot *subvols,
> +					 u32 nr_subvols)
> +{
> +	u32 i;
> +
> +	ceph_start_encoding(p, 1, 1,
> +			    ceph_subvolume_outer_payload_len(nr_subvols));
> +	/* count is encoded as le64 (size_t on wire) to match FUSE client */
> +	ceph_encode_64_safe(p, end, (u64)nr_subvols, enc_err);
> +
> +	for (i = 0; i < nr_subvols; i++) {
> +		struct ceph_subvolume_metric_entry_wire wire_entry;
> +
> +		ceph_init_subvolume_wire_entry(&wire_entry, &subvols[i]);
> +		ceph_start_encoding(p, 1, 1,
> +				    ceph_subvolume_entry_payload_len());
> +		ceph_encode_copy_safe(p, end, &wire_entry,
> +				      sizeof(wire_entry), enc_err);
> +	}
> +
> +	return 0;
> +enc_err:
> +	return -ERANGE;
> +}
> +
>  static void ktime_to_ceph_timespec(struct ceph_timespec *ts, ktime_t val)
>  {
>  	struct timespec64 t = ktime_to_timespec64(val);
> @@ -29,10 +104,14 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
>  	struct ceph_read_io_size *rsize;
>  	struct ceph_write_io_size *wsize;
>  	struct ceph_client_metric *m = &mdsc->metric;
> +	struct ceph_subvol_metric_snapshot *subvols = NULL;
>  	u64 nr_caps = atomic64_read(&m->total_caps);
>  	u32 header_len = sizeof(struct ceph_metric_header);
>  	struct ceph_client *cl = mdsc->fsc->client;
>  	struct ceph_msg *msg;
> +	u32 nr_subvols = 0;
> +	size_t subvol_len = 0;
> +	void *cursor;
>  	s64 sum;
>  	s32 items = 0;
>  	s32 len;
> @@ -45,15 +124,37 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
>  	}
>  	mutex_unlock(&mdsc->mutex);
>  
> +	if (ceph_subvolume_metrics_enabled(&mdsc->subvol_metrics) &&
> +	    test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS, &s->s_features)) {
> +		int ret;
> +
> +		ret = ceph_subvolume_metrics_snapshot(&mdsc->subvol_metrics,
> +						      &subvols, &nr_subvols,
> +						      true);
> +		if (ret) {
> +			pr_warn_client(cl, "failed to snapshot subvolume metrics: %d\n",
> +				       ret);
> +			nr_subvols = 0;
> +			subvols = NULL;
> +		}
> +	}
> +
> +	if (nr_subvols) {
> +		/* type (le32) + ENCODE_START payload - no metric header */
> +		subvol_len = sizeof(__le32) +
> +			     ceph_subvolume_metric_data_len(nr_subvols);
> +	}
> +
>  	len = sizeof(*head) + sizeof(*cap) + sizeof(*read) + sizeof(*write)
>  	      + sizeof(*meta) + sizeof(*dlease) + sizeof(*files)
>  	      + sizeof(*icaps) + sizeof(*inodes) + sizeof(*rsize)
> -	      + sizeof(*wsize);
> +	      + sizeof(*wsize) + subvol_len;
>  
>  	msg = ceph_msg_new(CEPH_MSG_CLIENT_METRICS, len, GFP_NOFS, true);
>  	if (!msg) {
>  		pr_err_client(cl, "to mds%d, failed to allocate message\n",
>  			      s->s_mds);
> +		kfree(subvols);

It is not clear here. Where subvols have been allocated before?

>  		return false;
>  	}
>  
> @@ -172,13 +273,56 @@ static bool ceph_mdsc_send_metrics(struct ceph_mds_client *mdsc,
>  	wsize->total_size = cpu_to_le64(m->metric[METRIC_WRITE].size_sum);
>  	items++;
>  
> +	cursor = wsize + 1;
> +
> +	if (nr_subvols) {
> +		void *payload;
> +		void *payload_end;
> +		int ret;
> +
> +		/* Emit only the type (le32), no ver/compat/data_len */
> +		ceph_encode_32(&cursor, CLIENT_METRIC_TYPE_SUBVOLUME_METRICS);
> +		items++;
> +
> +		payload = cursor;
> +		payload_end = (char *)payload +
> +			      ceph_subvolume_metric_data_len(nr_subvols);
> +
> +		ret = ceph_encode_subvolume_metrics(&payload, payload_end,
> +						    subvols, nr_subvols);
> +		if (ret) {
> +			pr_warn_client(cl,
> +				       "failed to encode subvolume metrics\n");
> +			kfree(subvols);

Ditto.

> +			ceph_msg_put(msg);
> +			return false;
> +		}
> +
> +		WARN_ON(payload != payload_end);
> +		cursor = payload;
> +	}
> +
>  	put_unaligned_le32(items, &head->num);
> -	msg->front.iov_len = len;
> +	msg->front.iov_len = (char *)cursor - (char *)head;
>  	msg->hdr.version = cpu_to_le16(1);
>  	msg->hdr.compat_version = cpu_to_le16(1);
>  	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
> +
>  	ceph_con_send(&s->s_con, msg);
>  
> +	if (nr_subvols) {
> +		mutex_lock(&mdsc->subvol_metrics_last_mutex);
> +		kfree(mdsc->subvol_metrics_last);
> +		mdsc->subvol_metrics_last = subvols;
> +		mdsc->subvol_metrics_last_nr = nr_subvols;
> +		mdsc->subvol_metrics_sent += nr_subvols;
> +		mdsc->subvol_metrics_nonzero_sends++;
> +		mutex_unlock(&mdsc->subvol_metrics_last_mutex);
> +
> +		subvols = NULL;
> +	}
> +	kfree(subvols);

Maybe, it makes sense to jump here from all places of calling kfree(subvols)?

> +
>  	return true;
>  }
>  
> @@ -201,6 +345,12 @@ static void metric_get_session(struct ceph_mds_client *mdsc)
>  		 */
>  		if (check_session_state(s) &&
>  		    test_bit(CEPHFS_FEATURE_METRIC_COLLECT, &s->s_features)) {
> +			if (ceph_subvolume_metrics_enabled(&mdsc->subvol_metrics) &&
> +			    !test_bit(CEPHFS_FEATURE_SUBVOLUME_METRICS,
> +				      &s->s_features)) {
> +				ceph_put_mds_session(s);
> +				continue;
> +			}
>  			mdsc->metric.session = s;
>  			break;
>  		}
> @@ -217,8 +367,17 @@ static void metric_delayed_work(struct work_struct *work)
>  	struct ceph_mds_client *mdsc =
>  		container_of(m, struct ceph_mds_client, metric);
>  
> -	if (mdsc->stopping || disable_send_metrics)
> +	if (mdsc->stopping)
> +		return;
> +
> +	if (disable_send_metrics) {
> +		if (!metrics_disable_warned) {
> +			pr_err("ceph: metrics worker disabled via module parameter\n");

It looks like not error but why pr_err() was used here?

> +			metrics_disable_warned = true;
> +		}
>  		return;
> +	}
> +	metrics_disable_warned = false;
>  
>  	if (!m->session || !check_session_state(m->session)) {
>  		if (m->session) {
> @@ -227,10 +386,15 @@ static void metric_delayed_work(struct work_struct *work)
>  		}
>  		metric_get_session(mdsc);
>  	}
> +
>  	if (m->session) {
>  		ceph_mdsc_send_metrics(mdsc, m->session);
> -		metric_schedule_delayed(m);
> +	} else {
> +		if (__ratelimit(&metrics_no_session_rs))
> +			pr_warn("ceph: metrics worker missing MDS session\n");

Why not pr_warn_ratelimited()?

>  	}
> +
> +	metric_schedule_delayed(m);
>  }
>  
>  int ceph_metric_init(struct ceph_client_metric *m)
> diff --git a/fs/ceph/metric.h b/fs/ceph/metric.h
> index 0d0c44bd3332..7e4aac63f6a6 100644
> --- a/fs/ceph/metric.h
> +++ b/fs/ceph/metric.h
> @@ -25,8 +25,9 @@ enum ceph_metric_type {
>  	CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,
>  	CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,
>  	CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
> +	CLIENT_METRIC_TYPE_SUBVOLUME_METRICS,
>  
> -	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY,
> +	CLIENT_METRIC_TYPE_MAX = CLIENT_METRIC_TYPE_SUBVOLUME_METRICS,
>  };
>  
>  /*
> @@ -50,6 +51,7 @@ enum ceph_metric_type {
>  	CLIENT_METRIC_TYPE_STDEV_WRITE_LATENCY,	   \
>  	CLIENT_METRIC_TYPE_AVG_METADATA_LATENCY,   \
>  	CLIENT_METRIC_TYPE_STDEV_METADATA_LATENCY, \
> +	CLIENT_METRIC_TYPE_SUBVOLUME_METRICS,	   \
>  						   \
>  	CLIENT_METRIC_TYPE_MAX,			   \
>  }
> @@ -139,6 +141,29 @@ struct ceph_write_io_size {
>  	__le64 total_size;
>  } __packed;
>  
> +/* Wire format for subvolume metrics - matches C++ AggregatedIOMetrics */
> +struct ceph_subvolume_metric_entry_wire {
> +	__le64 subvolume_id;
> +	__le32 read_ops;
> +	__le32 write_ops;
> +	__le64 read_bytes;
> +	__le64 write_bytes;
> +	__le64 read_latency_us;
> +	__le64 write_latency_us;
> +	__le64 time_stamp;
> +} __packed;

Why do we not provide detailed comments of fields?

> +
> +/* Old struct kept for internal tracking, not used on wire */
> +struct ceph_subvolume_metric_entry {
> +	__le64 subvolume_id;
> +	__le64 read_ops;
> +	__le64 write_ops;
> +	__le64 read_bytes;
> +	__le64 write_bytes;
> +	__le64 read_latency_us;
> +	__le64 write_latency_us;
> +} __packed;

Ditto.

> +
>  struct ceph_metric_head {
>  	__le32 num;	/* the number of metrics that will be sent */
>  } __packed;

Structure with one member?

> diff --git a/fs/ceph/subvolume_metrics.c b/fs/ceph/subvolume_metrics.c
> new file mode 100644
> index 000000000000..7036ff5418ab
> --- /dev/null
> +++ b/fs/ceph/subvolume_metrics.c
> @@ -0,0 +1,408 @@
> +// SPDX-License-Identifier: GPL-2.0
> +#include <linux/ceph/ceph_debug.h>
> +
> +#include <linux/math64.h>
> +#include <linux/slab.h>
> +#include <linux/seq_file.h>
> +
> +#include "subvolume_metrics.h"
> +#include "mds_client.h"
> +#include "super.h"
> +
> +struct ceph_subvol_metric_rb_entry {
> +	struct rb_node node;
> +	u64 subvolume_id;
> +	u64 read_ops;
> +	u64 write_ops;
> +	u64 read_bytes;
> +	u64 write_bytes;
> +	u64 read_latency_us;
> +	u64 write_latency_us;
> +};

What's about comments for fields?

> +
> +void ceph_subvolume_metrics_init(struct ceph_subvolume_metrics_tracker *tracker)
> +{
> +	spin_lock_init(&tracker->lock);
> +	tracker->tree = RB_ROOT_CACHED;
> +	tracker->nr_entries = 0;
> +	tracker->enabled = false;
> +	atomic64_set(&tracker->snapshot_attempts, 0);
> +	atomic64_set(&tracker->snapshot_empty, 0);
> +	atomic64_set(&tracker->snapshot_failures, 0);
> +	atomic64_set(&tracker->record_calls, 0);
> +	atomic64_set(&tracker->record_disabled, 0);
> +	atomic64_set(&tracker->record_no_subvol, 0);
> +	atomic64_set(&tracker->total_read_ops, 0);
> +	atomic64_set(&tracker->total_read_bytes, 0);
> +	atomic64_set(&tracker->total_write_ops, 0);
> +	atomic64_set(&tracker->total_write_bytes, 0);
> +}
> +
> +static struct ceph_subvol_metric_rb_entry *
> +__lookup_entry(struct ceph_subvolume_metrics_tracker *tracker, u64 subvol_id)
> +{
> +	struct rb_node *node;
> +
> +	node = tracker->tree.rb_root.rb_node;
> +	while (node) {
> +		struct ceph_subvol_metric_rb_entry *entry =
> +			rb_entry(node, struct ceph_subvol_metric_rb_entry, node);
> +
> +		if (subvol_id < entry->subvolume_id)
> +			node = node->rb_left;
> +		else if (subvol_id > entry->subvolume_id)
> +			node = node->rb_right;
> +		else
> +			return entry;
> +	}
> +
> +	return NULL;
> +}
> +
> +static struct ceph_subvol_metric_rb_entry *
> +__insert_entry(struct ceph_subvolume_metrics_tracker *tracker,
> +	       struct ceph_subvol_metric_rb_entry *entry)
> +{
> +	struct rb_node **link = &tracker->tree.rb_root.rb_node;
> +	struct rb_node *parent = NULL;
> +	bool leftmost = true;
> +
> +	while (*link) {
> +		struct ceph_subvol_metric_rb_entry *cur =
> +			rb_entry(*link, struct ceph_subvol_metric_rb_entry, node);
> +
> +		parent = *link;
> +		if (entry->subvolume_id < cur->subvolume_id)
> +			link = &(*link)->rb_left;
> +		else if (entry->subvolume_id > cur->subvolume_id) {
> +			link = &(*link)->rb_right;
> +			leftmost = false;
> +		} else
> +			return cur;
> +	}
> +
> +	rb_link_node(&entry->node, parent, link);
> +	rb_insert_color_cached(&entry->node, &tracker->tree, leftmost);
> +	tracker->nr_entries++;
> +	return entry;
> +}
> +
> +static void ceph_subvolume_metrics_clear_locked(
> +		struct ceph_subvolume_metrics_tracker *tracker)
> +{
> +	struct rb_node *node = rb_first_cached(&tracker->tree);
> +
> +	while (node) {
> +		struct ceph_subvol_metric_rb_entry *entry =
> +			rb_entry(node, struct ceph_subvol_metric_rb_entry, node);
> +		struct rb_node *next = rb_next(node);
> +
> +		rb_erase_cached(&entry->node, &tracker->tree);
> +		tracker->nr_entries--;
> +		kfree(entry);
> +		node = next;
> +	}
> +
> +	tracker->tree = RB_ROOT_CACHED;
> +}
> +
> +void ceph_subvolume_metrics_destroy(struct ceph_subvolume_metrics_tracker *tracker)
> +{
> +	spin_lock(&tracker->lock);
> +	ceph_subvolume_metrics_clear_locked(tracker);
> +	tracker->enabled = false;
> +	spin_unlock(&tracker->lock);
> +}
> +
> +void ceph_subvolume_metrics_enable(struct ceph_subvolume_metrics_tracker *tracker,
> +				   bool enable)
> +{
> +	spin_lock(&tracker->lock);
> +	if (enable) {

Probably, we don't need curly braces here.

> +		tracker->enabled = true;
> +	} else {
> +		tracker->enabled = false;
> +		ceph_subvolume_metrics_clear_locked(tracker);
> +	}
> +	spin_unlock(&tracker->lock);
> +}
> +
> +void ceph_subvolume_metrics_record(struct ceph_subvolume_metrics_tracker *tracker,
> +				   u64 subvol_id, bool is_write,
> +				   size_t size, u64 latency_us)
> +{
> +	struct ceph_subvol_metric_rb_entry *entry, *new_entry = NULL;
> +	bool retry = false;
> +
> +	/* 0 means unknown/unset subvolume (matches FUSE client convention) */
> +	if (!READ_ONCE(tracker->enabled) || !subvol_id || !size || !latency_us)
> +		return;
> +
> +	do {
> +		spin_lock(&tracker->lock);
> +		if (!tracker->enabled) {
> +			spin_unlock(&tracker->lock);
> +			kfree(new_entry);
> +			return;
> +		}
> +
> +		entry = __lookup_entry(tracker, subvol_id);
> +		if (!entry) {
> +			if (!new_entry) {
> +				spin_unlock(&tracker->lock);
> +				new_entry = kzalloc(sizeof(*new_entry), GFP_NOFS);

Do we need kmem_cache here?

> +				if (!new_entry)
> +					return;
> +				new_entry->subvolume_id = subvol_id;
> +				retry = true;
> +				continue;
> +			}
> +			entry = __insert_entry(tracker, new_entry);
> +			if (entry != new_entry) {
> +				/* raced with another insert */
> +				spin_unlock(&tracker->lock);
> +				kfree(new_entry);
> +				new_entry = NULL;
> +				retry = true;
> +				continue;
> +			}
> +			new_entry = NULL;
> +		}
> +
> +		if (is_write) {
> +			entry->write_ops++;
> +			entry->write_bytes += size;
> +			entry->write_latency_us += latency_us;
> +			atomic64_inc(&tracker->total_write_ops);
> +			atomic64_add(size, &tracker->total_write_bytes);
> +		} else {
> +			entry->read_ops++;
> +			entry->read_bytes += size;
> +			entry->read_latency_us += latency_us;
> +			atomic64_inc(&tracker->total_read_ops);
> +			atomic64_add(size, &tracker->total_read_bytes);
> +		}
> +		spin_unlock(&tracker->lock);
> +		kfree(new_entry);
> +		return;
> +	} while (retry);
> +}
> +
> +int ceph_subvolume_metrics_snapshot(struct ceph_subvolume_metrics_tracker *tracker,
> +				    struct ceph_subvol_metric_snapshot **out,
> +				    u32 *nr, bool consume)
> +{
> +	struct ceph_subvol_metric_snapshot *snap = NULL;
> +	struct rb_node *node;
> +	u32 count = 0, idx = 0;
> +	int ret = 0;
> +
> +	*out = NULL;
> +	*nr = 0;
> +
> +	if (!READ_ONCE(tracker->enabled))
> +		return 0;
> +
> +	atomic64_inc(&tracker->snapshot_attempts);
> +
> +	spin_lock(&tracker->lock);
> +	for (node = rb_first_cached(&tracker->tree); node; node = rb_next(node)) {
> +		struct ceph_subvol_metric_rb_entry *entry =
> +			rb_entry(node, struct ceph_subvol_metric_rb_entry, node);
> +
> +		/* Include entries with ANY I/O activity (read OR write) */
> +		if (entry->read_ops || entry->write_ops)
> +			count++;
> +	}
> +	spin_unlock(&tracker->lock);
> +
> +	if (!count) {
> +		atomic64_inc(&tracker->snapshot_empty);
> +		return 0;
> +	}
> +
> +	snap = kcalloc(count, sizeof(*snap), GFP_NOFS);
> +	if (!snap) {
> +		atomic64_inc(&tracker->snapshot_failures);
> +		return -ENOMEM;
> +	}
> +
> +	spin_lock(&tracker->lock);
> +	node = rb_first_cached(&tracker->tree);
> +	while (node) {
> +		struct ceph_subvol_metric_rb_entry *entry =
> +			rb_entry(node, struct ceph_subvol_metric_rb_entry, node);
> +		struct rb_node *next = rb_next(node);
> +
> +		/* Skip entries with NO I/O activity at all */
> +		if (!entry->read_ops && !entry->write_ops) {
> +			rb_erase_cached(&entry->node, &tracker->tree);
> +			tracker->nr_entries--;
> +			kfree(entry);
> +			node = next;
> +			continue;
> +		}
> +
> +		if (idx >= count) {
> +			pr_warn("ceph: subvol metrics snapshot race (idx=%u count=%u)\n",
> +				idx, count);
> +			break;
> +		}
> +
> +		snap[idx].subvolume_id = entry->subvolume_id;
> +		snap[idx].read_ops = entry->read_ops;
> +		snap[idx].write_ops = entry->write_ops;
> +		snap[idx].read_bytes = entry->read_bytes;
> +		snap[idx].write_bytes = entry->write_bytes;
> +		snap[idx].read_latency_us = entry->read_latency_us;
> +		snap[idx].write_latency_us = entry->write_latency_us;
> +		idx++;
> +
> +		if (consume) {
> +			entry->read_ops = 0;
> +			entry->write_ops = 0;
> +			entry->read_bytes = 0;
> +			entry->write_bytes = 0;
> +			entry->read_latency_us = 0;
> +			entry->write_latency_us = 0;
> +			rb_erase_cached(&entry->node, &tracker->tree);
> +			tracker->nr_entries--;
> +			kfree(entry);
> +		}
> +		node = next;
> +	}
> +	spin_unlock(&tracker->lock);
> +
> +	if (!idx) {
> +		kfree(snap);
> +		snap = NULL;
> +		ret = 0;
> +	} else {
> +		*nr = idx;
> +		*out = snap;
> +	}
> +
> +	return ret;
> +}
> +
> +void ceph_subvolume_metrics_free_snapshot(struct ceph_subvol_metric_snapshot *snapshot)
> +{
> +	kfree(snapshot);
> +}
> +
> +static u64 div_rem(u64 dividend, u64 divisor)

Do we really need to introduce such function?

> +{
> +	return divisor ? div64_u64(dividend, divisor) : 0;
> +}
> +
> +void ceph_subvolume_metrics_dump(struct ceph_subvolume_metrics_tracker *tracker,
> +				 struct seq_file *s)
> +{
> +	struct rb_node *node;
> +	struct ceph_subvol_metric_snapshot *snapshot = NULL;
> +	u32 count = 0, idx = 0;
> +
> +	spin_lock(&tracker->lock);
> +	if (!tracker->enabled) {
> +		spin_unlock(&tracker->lock);
> +		seq_puts(s, "subvolume metrics disabled\n");
> +		return;
> +	}
> +
> +	for (node = rb_first_cached(&tracker->tree); node; node = rb_next(node)) {
> +		struct ceph_subvol_metric_rb_entry *entry =
> +			rb_entry(node, struct ceph_subvol_metric_rb_entry, node);
> +
> +		if (entry->read_ops || entry->write_ops)
> +			count++;
> +	}
> +	spin_unlock(&tracker->lock);
> +
> +	if (!count) {
> +		seq_puts(s, "(no subvolume metrics collected)\n");
> +		return;
> +	}
> +

Maybe, it make sense to check the count value before trying to allocate memory?

> +	snapshot = kcalloc(count, sizeof(*snapshot), GFP_KERNEL);
> +	if (!snapshot) {
> +		seq_puts(s, "(unable to allocate memory for snapshot)\n");
> +		return;

Why do we not return error code, finally?

> +	}
> +
> +	spin_lock(&tracker->lock);
> +	for (node = rb_first_cached(&tracker->tree); node; node = rb_next(node)) {
> +		struct ceph_subvol_metric_rb_entry *entry =
> +			rb_entry(node, struct ceph_subvol_metric_rb_entry, node);
> +
> +		if (!entry->read_ops && !entry->write_ops)
> +			continue;
> +
> +		if (idx >= count)
> +			break;
> +
> +		snapshot[idx].subvolume_id = entry->subvolume_id;
> +		snapshot[idx].read_ops = entry->read_ops;
> +		snapshot[idx].write_ops = entry->write_ops;
> +		snapshot[idx].read_bytes = entry->read_bytes;
> +		snapshot[idx].write_bytes = entry->write_bytes;
> +		snapshot[idx].read_latency_us = entry->read_latency_us;
> +		snapshot[idx].write_latency_us = entry->write_latency_us;
> +		idx++;
> +	}
> +	spin_unlock(&tracker->lock);
> +
> +	seq_puts(s, "subvol_id       rd_ops    rd_bytes    rd_avg_lat_us  wr_ops    wr_bytes    wr_avg_lat_us\n");
> +	seq_puts(s, "------------------------------------------------------------------------------------------------\n");
> +
> +	for (idx = 0; idx < count; idx++) {
> +		u64 avg_rd_lat = div_rem(snapshot[idx].read_latency_us,
> +					 snapshot[idx].read_ops);
> +		u64 avg_wr_lat = div_rem(snapshot[idx].write_latency_us,
> +					 snapshot[idx].write_ops);
> +
> +		seq_printf(s, "%-15llu%-10llu%-12llu%-16llu%-10llu%-12llu%-16llu\n",
> +			   snapshot[idx].subvolume_id,
> +			   snapshot[idx].read_ops,
> +			   snapshot[idx].read_bytes,
> +			   avg_rd_lat,
> +			   snapshot[idx].write_ops,
> +			   snapshot[idx].write_bytes,
> +			   avg_wr_lat);
> +	}
> +
> +	kfree(snapshot);
> +}
> +
> +void ceph_subvolume_metrics_record_io(struct ceph_mds_client *mdsc,
> +				      struct ceph_inode_info *ci,
> +				      bool is_write, size_t bytes,
> +				      ktime_t start, ktime_t end)
> +{
> +	struct ceph_subvolume_metrics_tracker *tracker;
> +	u64 subvol_id;
> +	s64 delta_us;
> +
> +	if (!mdsc || !ci || !bytes)
> +		return;
> +
> +	tracker = &mdsc->subvol_metrics;
> +	atomic64_inc(&tracker->record_calls);
> +
> +	if (!ceph_subvolume_metrics_enabled(tracker)) {
> +		atomic64_inc(&tracker->record_disabled);
> +		return;
> +	}
> +
> +	subvol_id = READ_ONCE(ci->i_subvolume_id);
> +	if (!subvol_id) {
> +		atomic64_inc(&tracker->record_no_subvol);
> +		return;
> +	}
> +
> +	delta_us = ktime_to_us(ktime_sub(end, start));
> +	if (delta_us <= 0)
> +		delta_us = 1;
> +
> +	ceph_subvolume_metrics_record(tracker, subvol_id, is_write,
> +				      bytes, (u64)delta_us);
> +}
> diff --git a/fs/ceph/subvolume_metrics.h b/fs/ceph/subvolume_metrics.h
> new file mode 100644
> index 000000000000..872867c75c41
> --- /dev/null
> +++ b/fs/ceph/subvolume_metrics.h
> @@ -0,0 +1,68 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +#ifndef _FS_CEPH_SUBVOLUME_METRICS_H
> +#define _FS_CEPH_SUBVOLUME_METRICS_H
> +
> +#include <linux/types.h>
> +#include <linux/rbtree.h>
> +#include <linux/spinlock.h>
> +#include <linux/ktime.h>
> +#include <linux/atomic.h>
> +
> +struct seq_file;
> +struct ceph_mds_client;
> +struct ceph_inode_info;
> +
> +struct ceph_subvol_metric_snapshot {
> +	u64 subvolume_id;
> +	u64 read_ops;
> +	u64 write_ops;
> +	u64 read_bytes;
> +	u64 write_bytes;
> +	u64 read_latency_us;
> +	u64 write_latency_us;
> +};

Why do we not provides comments for the fields?

> +
> +struct ceph_subvolume_metrics_tracker {
> +	spinlock_t lock;
> +	struct rb_root_cached tree;
> +	u32 nr_entries;
> +	bool enabled;
> +	atomic64_t snapshot_attempts;
> +	atomic64_t snapshot_empty;
> +	atomic64_t snapshot_failures;
> +	atomic64_t record_calls;
> +	atomic64_t record_disabled;
> +	atomic64_t record_no_subvol;
> +	/* Cumulative counters (survive snapshots) */
> +	atomic64_t total_read_ops;
> +	atomic64_t total_read_bytes;
> +	atomic64_t total_write_ops;
> +	atomic64_t total_write_bytes;
> +};

Ditto.

Thanks,
Slava.

> +
> +void ceph_subvolume_metrics_init(struct ceph_subvolume_metrics_tracker *tracker);
> +void ceph_subvolume_metrics_destroy(struct ceph_subvolume_metrics_tracker *tracker);
> +void ceph_subvolume_metrics_enable(struct ceph_subvolume_metrics_tracker *tracker,
> +				   bool enable);
> +void ceph_subvolume_metrics_record(struct ceph_subvolume_metrics_tracker *tracker,
> +				   u64 subvol_id, bool is_write,
> +				   size_t size, u64 latency_us);
> +int ceph_subvolume_metrics_snapshot(struct ceph_subvolume_metrics_tracker *tracker,
> +				    struct ceph_subvol_metric_snapshot **out,
> +				    u32 *nr, bool consume);
> +void ceph_subvolume_metrics_free_snapshot(struct ceph_subvol_metric_snapshot *snapshot);
> +void ceph_subvolume_metrics_dump(struct ceph_subvolume_metrics_tracker *tracker,
> +				 struct seq_file *s);
> +
> +void ceph_subvolume_metrics_record_io(struct ceph_mds_client *mdsc,
> +				      struct ceph_inode_info *ci,
> +				      bool is_write, size_t bytes,
> +				      ktime_t start, ktime_t end);
> +
> +static inline bool ceph_subvolume_metrics_enabled(
> +		const struct ceph_subvolume_metrics_tracker *tracker)
> +{
> +	return READ_ONCE(tracker->enabled);
> +}
> +
> +#endif /* _FS_CEPH_SUBVOLUME_METRICS_H */
> diff --git a/fs/ceph/super.c b/fs/ceph/super.c
> index f6bf24b5c683..528452aa8beb 100644
> --- a/fs/ceph/super.c
> +++ b/fs/ceph/super.c
> @@ -21,6 +21,7 @@
>  #include "mds_client.h"
>  #include "cache.h"
>  #include "crypto.h"
> +#include "subvolume_metrics.h"
>  
>  #include <linux/ceph/ceph_features.h>
>  #include <linux/ceph/decode.h>
> diff --git a/fs/ceph/super.h b/fs/ceph/super.h
> index c0372a725960..a03c373efd52 100644
> --- a/fs/ceph/super.h
> +++ b/fs/ceph/super.h
> @@ -167,6 +167,7 @@ struct ceph_fs_client {
>  	struct dentry *debugfs_status;
>  	struct dentry *debugfs_mds_sessions;
>  	struct dentry *debugfs_metrics_dir;
> +	struct dentry *debugfs_subvolume_metrics;
>  #endif
>  
>  #ifdef CONFIG_CEPH_FSCACHE

  reply	other threads:[~2025-12-02 22:54 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2025-12-02 15:57 [PATCH v2 0/3] ceph: add subvolume metrics reporting support Alex Markuze
2025-12-02 15:57 ` [PATCH v2 1/3] ceph: handle InodeStat v8 versioned field in reply parsing Alex Markuze
2025-12-02 20:44   ` Viacheslav Dubeyko
2025-12-02 15:57 ` [PATCH v2 2/3] ceph: parse subvolume_id from InodeStat v9 and store in inode Alex Markuze
2025-12-02 20:50   ` Viacheslav Dubeyko
2025-12-03 15:48     ` Alex Markuze
2025-12-02 15:57 ` [PATCH v2 3/3] ceph: add subvolume metrics collection and reporting Alex Markuze
2025-12-02 22:54   ` Viacheslav Dubeyko [this message]
2025-12-03 15:57     ` Alex Markuze

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=0011d9cc8b461616324a70211cc9e1b3b1ea5d0e.camel@ibm.com \
    --to=slava.dubeyko@ibm.com \
    --cc=amarkuze@redhat.com \
    --cc=ceph-devel@vger.kernel.org \
    --cc=idryomov@gmail.com \
    --cc=linux-fsdevel@vger.kernel.org \
    --cc=vdubeyko@redhat.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).