From: Vincent Donnefort <vdonnefort@google.com>
To: Steven Rostedt <rostedt@goodmis.org>
Cc: mhiramat@kernel.org, linux-kernel@vger.kernel.org,
linux-trace-kernel@vger.kernel.org,
mathieu.desnoyers@efficios.com, kernel-team@android.com
Subject: Re: [PATCH v16 2/6] ring-buffer: Introducing ring-buffer mapping functions
Date: Mon, 12 Feb 2024 10:44:26 +0000
Message-ID: <Zcn2iotAnIWT4F_O@google.com>
In-Reply-To: <20240211171837.6a4ea1a5@rorschach.local.home>
[...]
> > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > +
> > + meta->reader.read = cpu_buffer->reader_page->read;
> > + meta->reader.id = cpu_buffer->reader_page->id;
> > + meta->reader.lost_events = cpu_buffer->lost_events;
> > +
> > + meta->entries = local_read(&cpu_buffer->entries);
> > + meta->overrun = local_read(&cpu_buffer->overrun);
> > + meta->read = cpu_buffer->read;
> > +
> > + /* Some archs do not have data cache coherency between kernel and user-space */
> > + flush_dcache_folio(virt_to_folio(cpu_buffer->meta_page));
> > +}
> > +
> > static void
> > rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> > {
> > @@ -5204,6 +5227,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
> > cpu_buffer->lost_events = 0;
> > cpu_buffer->last_overrun = 0;
> >
> > + if (READ_ONCE(cpu_buffer->mapped))
>
> Isn't the buffer_mutex held when we modify mapped? I believe it's held
> here. I don't think we need a READ_ONCE() here. Is there a reason for it?
>
> Hmm, looking down, it looks like you take the buffer->mutex after
> setting mapped, is that necessary? If we take the buffer->mutex we can
> sync the reset with mapping.
The idea was to avoid taking the buffer mutex or the reader lock when the
refcount is simply incremented or decremented. Those locks are only taken when
the meta-page is installed or uninstalled.
The WRITE_ONCE/READ_ONCE pair is only there to make sure no compiler
optimisation leads a reader to misinterpret the refcount while it is being
incremented or decremented. That is probably not necessary, and I'm happy either
to drop it completely or to replace it by taking the buffer mutex and the reader
lock whenever the refcount changes.
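To make the scheme concrete, here is a minimal userspace sketch, not the kernel
code. Everything named model_* is hypothetical, the READ_ONCE/WRITE_ONCE macros
are simplified stand-ins for the kernel ones, and a single pthread mutex
(heavy_lock) stands in for both buffer->mutex and the reader lock. It only shows
that the heavy lock is needed on the 0 <-> 1 transitions.

```c
#include <errno.h>
#include <limits.h>
#include <pthread.h>

/* Simplified userspace stand-ins: force one plain access to the variable. */
#define READ_ONCE(x)     (*(const volatile __typeof__(x) *)&(x))
#define WRITE_ONCE(x, v) (*(volatile __typeof__(x) *)&(x) = (v))

struct model_cpu_buffer {
	pthread_mutex_t mapping_lock; /* serialises map/unmap callers */
	pthread_mutex_t heavy_lock;   /* stands in for buffer->mutex + reader_lock */
	unsigned int mapped;          /* refcount, 0 means no meta-page installed */
	unsigned int heavy_taken;     /* how often heavy_lock was needed */
};

static int model_map(struct model_cpu_buffer *b)
{
	int err = 0;

	pthread_mutex_lock(&b->mapping_lock);
	if (b->mapped) {
		/* Already installed: only bump the refcount. */
		if (b->mapped == UINT_MAX)
			err = -EBUSY;
		else
			WRITE_ONCE(b->mapped, b->mapped + 1);
		goto out;
	}

	/* First user: this is where the meta-page would be installed. */
	pthread_mutex_lock(&b->heavy_lock);
	b->heavy_taken++;
	b->mapped = 1;
	pthread_mutex_unlock(&b->heavy_lock);
out:
	pthread_mutex_unlock(&b->mapping_lock);
	return err;
}

static int model_unmap(struct model_cpu_buffer *b)
{
	int err = 0;

	pthread_mutex_lock(&b->mapping_lock);
	if (!b->mapped) {
		err = -ENODEV;
	} else if (b->mapped > 1) {
		/* Other users remain: only drop the refcount. */
		WRITE_ONCE(b->mapped, b->mapped - 1);
	} else {
		/* Last user: this is where the meta-page would be removed. */
		pthread_mutex_lock(&b->heavy_lock);
		b->heavy_taken++;
		b->mapped = 0;
		pthread_mutex_unlock(&b->heavy_lock);
	}
	pthread_mutex_unlock(&b->mapping_lock);
	return err;
}

/* Lockless check, as done on the reset/read_page paths in the patch. */
static int model_is_mapped(struct model_cpu_buffer *b)
{
	return READ_ONCE(b->mapped) != 0;
}
```

In this model, two map calls followed by two unmap calls take heavy_lock twice
in total: once for the first map and once for the last unmap.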
>
> > + rb_update_meta_page(cpu_buffer);
> > +
> > rb_head_page_activate(cpu_buffer);
> > cpu_buffer->pages_removed = 0;
> > }
> > @@ -5418,6 +5444,12 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
> > cpu_buffer_a = buffer_a->buffers[cpu];
> > cpu_buffer_b = buffer_b->buffers[cpu];
> >
> > + /* It's up to the callers to not try to swap mapped buffers */
> > + if (WARN_ON_ONCE(cpu_buffer_a->mapped || cpu_buffer_b->mapped)) {
> > + ret = -EBUSY;
> > + goto out;
> > + }
> > +
> > /* At least make sure the two buffers are somewhat the same */
> > if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
> > goto out;
> > @@ -5682,7 +5714,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
> > * Otherwise, we can simply swap the page with the one passed in.
> > */
> > if (read || (len < (commit - read)) ||
> > - cpu_buffer->reader_page == cpu_buffer->commit_page) {
> > + cpu_buffer->reader_page == cpu_buffer->commit_page ||
> > + READ_ONCE(cpu_buffer->mapped)) {
>
> Here the buffer_mutex isn't held, but still, what's the purpose of the READ_ONCE?
>
> I wonder if we just grab the reader_lock when setting the mapped
> variable if that would be better than using READ/WRITE_ONCE, which I'm
> not sure is helpful.
Ditto, see my answer above.
>
>
> > struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
> > unsigned int rpos = read;
> > unsigned int pos = 0;
> > @@ -5901,6 +5934,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
> >
> > cpu_buffer = buffer->buffers[cpu];
> >
> > + if (READ_ONCE(cpu_buffer->mapped)) {
> > + err = -EBUSY;
> > + goto error;
> > + }
> > +
> > /* Update the number of pages to match the new size */
> > nr_pages = old_size * buffer->buffers[cpu]->nr_pages;
> > nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size);
> > @@ -6002,6 +6040,304 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
> > }
> > EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set);
> >
> > +#define subbuf_page(off, start) \
> > + virt_to_page((void *)(start + (off << PAGE_SHIFT)))
> > +
> > +#define foreach_subbuf_page(sub_order, start, page) \
> > + page = subbuf_page(0, (start)); \
> > + for (int __off = 0; __off < (1 << (sub_order)); \
> > + __off++, page = subbuf_page(__off, (start)))
> > +
> > +static inline void subbuf_map_prepare(unsigned long subbuf_start, int order)
> > +{
> > + struct page *page;
> > +
> > + /*
> > + * When allocating order > 0 pages, only the first struct page has a
> > + * refcount > 1. Increasing the refcount here ensures none of the struct
> > + * page composing the sub-buffer is freeed when the mapping is closed.
> > + */
> > + foreach_subbuf_page(order, subbuf_start, page)
> > + page_ref_inc(page);
> > +}
> > +
> > +static inline void subbuf_unmap(unsigned long subbuf_start, int order)
> > +{
> > + struct page *page;
> > +
> > + foreach_subbuf_page(order, subbuf_start, page) {
> > + page_ref_dec(page);
> > + page->mapping = NULL;
> > + }
> > +}
> > +
> > +static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + int sub_id;
> > +
> > + for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++)
> > + subbuf_unmap(cpu_buffer->subbuf_ids[sub_id],
> > + cpu_buffer->buffer->subbuf_order);
> > +
> > + kfree(cpu_buffer->subbuf_ids);
> > + cpu_buffer->subbuf_ids = NULL;
> > +}
> > +
> > +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + if (cpu_buffer->meta_page)
> > + return 0;
> > +
> > + cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO));
>
> The above is the main reason I'm looking for a v17. If that
> alloc_page() fails, what exactly is going to be returned here?
>
> #define page_to_virt(x) __va(PFN_PHYS(page_to_pfn(x)))
> #define __va(x) ((void *)((unsigned long)(x)+PAGE_OFFSET))
>
> I don't think that will be the result you expect. We need to do the
> alloc_page(), test the result of that, and then call page_to_virt() on a
> page that is not NULL.
Ouch, you're right!
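For illustration, a small userspace model of why the NULL test comes too late
in v16. It mirrors the two macros quoted above using plain integer arithmetic.
All model_* names and the address constants are assumptions picked for the
example (real values depend on the architecture and configuration), and the
page_to_pfn() step assumes a vmemmap-style layout.

```c
#include <errno.h>
#include <stdint.h>

/* Illustrative constants only. */
#define MODEL_PAGE_OFFSET    UINT64_C(0xffff888000000000)
#define MODEL_VMEMMAP_START  UINT64_C(0xffffea0000000000)
#define MODEL_PAGE_SHIFT     12
#define MODEL_STRUCT_PAGE_SZ 64

/* Model of page_to_virt(): pure arithmetic, no NULL check anywhere. */
static uint64_t model_page_to_virt(uint64_t page)
{
	uint64_t pfn = (page - MODEL_VMEMMAP_START) / MODEL_STRUCT_PAGE_SZ;

	return (pfn << MODEL_PAGE_SHIFT) + MODEL_PAGE_OFFSET;
}

/* Model of alloc_page(): returns 0 (NULL) on failure. */
static uint64_t model_alloc_page(int fail)
{
	return fail ? 0 : MODEL_VMEMMAP_START + 5 * MODEL_STRUCT_PAGE_SZ;
}

/* v16 pattern: the test runs on the already converted address. */
static int model_alloc_meta_v16(int fail, uint64_t *meta)
{
	*meta = model_page_to_virt(model_alloc_page(fail));

	return *meta ? 0 : -ENOMEM;
}

/* Pattern asked for in the review: test the page, then convert it. */
static int model_alloc_meta_fixed(int fail, uint64_t *meta)
{
	uint64_t page = model_alloc_page(fail);

	if (!page) {
		*meta = 0;
		return -ENOMEM;
	}

	*meta = model_page_to_virt(page);
	return 0;
}
```

With these constants, a failed allocation makes model_alloc_meta_v16() report
success with a bogus non-zero address, while model_alloc_meta_fixed() returns
-ENOMEM and leaves the pointer cleared.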
>
> > + if (!cpu_buffer->meta_page)
> > + return -ENOMEM;
> > +
> > + return 0;
> > +}
> > +
> > +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + unsigned long addr = (unsigned long)cpu_buffer->meta_page;
> > +
>
> I'm thinking for extra safety we could add:
>
> if (!addr)
> return;
>
> So this function could be called twice without issue.
Ack.
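A minimal userspace sketch of that early return, with malloc()/free() standing
in for the page allocator. The model_buf structure and its frees counter are
hypothetical and exist only to make the behaviour observable.

```c
#include <stdlib.h>

struct model_buf {
	void *meta_page; /* NULL when nothing is allocated */
	int frees;       /* counts real frees, for the sketch only */
};

static void model_free_meta_page(struct model_buf *b)
{
	/* Nothing to do if the page was never allocated or is already freed. */
	if (!b->meta_page)
		return;

	free(b->meta_page);
	b->frees++;
	b->meta_page = NULL;
}
```

Calling model_free_meta_page() twice in a row releases the memory once, and the
second call returns immediately.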
>
> > + virt_to_page((void *)addr)->mapping = NULL;
> > + free_page(addr);
> > + cpu_buffer->meta_page = NULL;
> > +}
> > +
> > +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
> > + unsigned long *subbuf_ids)
> > +{
> > + struct trace_buffer_meta *meta = cpu_buffer->meta_page;
> > + unsigned int nr_subbufs = cpu_buffer->nr_pages + 1;
> > + struct buffer_page *first_subbuf, *subbuf;
> > + int id = 0;
> > +
> > + subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
> > + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> > + cpu_buffer->reader_page->id = id++;
> > +
> > + first_subbuf = subbuf = rb_set_head_page(cpu_buffer);
> > + do {
> > + if (id >= nr_subbufs) {
> > + WARN_ON(1);
> > + break;
> > + }
>
> if (WARN_ON(id >= nr_subbufs))
> break;
>
> > +
> > + subbuf_ids[id] = (unsigned long)subbuf->page;
> > + subbuf->id = id;
> > + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order);
> > +
> > + rb_inc_page(&subbuf);
> > + id++;
> > + } while (subbuf != first_subbuf);
> > +
> > + /* install subbuf ID to kern VA translation */
> > + cpu_buffer->subbuf_ids = subbuf_ids;
> > +
> > + meta->meta_page_size = PAGE_SIZE;
> > + meta->meta_struct_len = sizeof(*meta);
> > + meta->nr_subbufs = nr_subbufs;
> > + meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE;
> > +
> > + rb_update_meta_page(cpu_buffer);
> > +}
> > +
> > +static inline struct ring_buffer_per_cpu *
> > +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > +
> > + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> > + return ERR_PTR(-EINVAL);
> > +
> > + cpu_buffer = buffer->buffers[cpu];
> > +
> > + mutex_lock(&cpu_buffer->mapping_lock);
> > +
> > + if (!cpu_buffer->mapped) {
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > + return ERR_PTR(-ENODEV);
> > + }
> > +
> > + return cpu_buffer;
> > +}
> > +
> > +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
> > +{
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > +}
> > +
> > +int ring_buffer_map(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > + unsigned long flags, *subbuf_ids;
> > + int err = 0;
> > +
> > + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> > + return -EINVAL;
> > +
> > + cpu_buffer = buffer->buffers[cpu];
> > +
> > + mutex_lock(&cpu_buffer->mapping_lock);
>
> Is there a reason not to take the buffer->mutex before setting mapped?
>
> > +
> > + if (cpu_buffer->mapped) {
> > + if (cpu_buffer->mapped == UINT_MAX)
> > + err = -EBUSY;
> > + else
> > + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
>
> As mentioned before, it may be better to take the reader_lock when
> setting mapped and that should add the protection we want with mapped
> set and other readers.
>
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > + return err;
> > + }
> > +
> > + /* prevent another thread from changing buffer/sub-buffer sizes */
> > + mutex_lock(&buffer->mutex);
> > +
> > + err = rb_alloc_meta_page(cpu_buffer);
> > + if (err)
> > + goto unlock;
> > +
> > + /* subbuf_ids include the reader while nr_pages does not */
> > + subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1),
> > + GFP_KERNEL);
>
> Instead use:
>
> subbuf_ids = kcalloc(cpu_buffer->nr_pages + 1, sizeof(*subbuf_ids), GFP_KERNEL);
>
> > + if (!subbuf_ids) {
> > + rb_free_meta_page(cpu_buffer);
> > + err = -ENOMEM;
> > + goto unlock;
> > + }
> > +
> > + atomic_inc(&cpu_buffer->resize_disabled);
> > +
> > + /*
> > + * Lock all readers to block any subbuf swap until the subbuf IDs are
> > + * assigned.
> > + */
> > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> > +
> > + rb_setup_ids_meta_page(cpu_buffer, subbuf_ids);
> > + cpu_buffer->mapped = 1;
> > +
> > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> > +unlock:
> > + mutex_unlock(&buffer->mutex);
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > +
> > + return err;
> > +}
> > +
> > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > + unsigned long flags;
> > + int err = 0;
> > +
> > + if (!cpumask_test_cpu(cpu, buffer->cpumask))
> > + return -EINVAL;
> > +
> > + cpu_buffer = buffer->buffers[cpu];
> > +
> > + mutex_lock(&cpu_buffer->mapping_lock);
> > +
> > + if (!cpu_buffer->mapped) {
> > + err = -ENODEV;
> > + goto out;
> > + } else if (cpu_buffer->mapped > 1) {
> > + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
> > + goto out;
> > + }
> > +
> > + mutex_lock(&buffer->mutex);
> > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> > +
> > + cpu_buffer->mapped = 0;
> > +
> > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> > +
> > + rb_free_subbuf_ids(cpu_buffer);
> > + rb_free_meta_page(cpu_buffer);
> > + atomic_dec(&cpu_buffer->resize_disabled);
> > +
> > + mutex_unlock(&buffer->mutex);
> > +out:
> > + mutex_unlock(&cpu_buffer->mapping_lock);
> > +
> > + return err;
> > +}
> > +
> > +/*
> > + * +--------------+ pgoff == 0
> > + * | meta page |
> > + * +--------------+ pgoff == 1
> > + * | subbuffer 0 |
> > + * +--------------+ pgoff == 1 + (1 << subbuf_order)
> > + * | subbuffer 1 |
> > + * ...
> > + */
> > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> > + unsigned long pgoff)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> > + unsigned long subbuf_id, subbuf_offset, addr;
> > + struct page *page;
> > +
> > + if (!pgoff)
> > + return virt_to_page((void *)cpu_buffer->meta_page);
> > +
> > + pgoff--;
> > +
> > + subbuf_id = pgoff >> buffer->subbuf_order;
> > + if (subbuf_id > cpu_buffer->nr_pages)
> > + return NULL;
> > +
> > + subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1);
> > + addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE);
> > + page = virt_to_page((void *)addr);
> > +
> > + return page;
> > +}
> > +
> > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu)
> > +{
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > + unsigned long reader_size;
> > + unsigned long flags;
> > +
> > + cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
> > + if (IS_ERR(cpu_buffer))
> > + return (int)PTR_ERR(cpu_buffer);
> > +
> > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> > +consume:
> > + if (rb_per_cpu_empty(cpu_buffer))
> > + goto out;
> > +
> > + reader_size = rb_page_size(cpu_buffer->reader_page);
> > +
> > + /*
> > + * There are data to be read on the current reader page, we can
> > + * return to the caller. But before that, we assume the latter will read
> > + * everything. Let's update the kernel reader accordingly.
> > + */
> > + if (cpu_buffer->reader_page->read < reader_size) {
> > + while (cpu_buffer->reader_page->read < reader_size)
> > + rb_advance_reader(cpu_buffer);
> > + goto out;
> > + }
> > +
> > + if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
> > + goto out;
> > +
> > + goto consume;
> > +out:
> > + rb_update_meta_page(cpu_buffer);
> > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> > + rb_put_mapped_buffer(cpu_buffer);
> > +
>
> Hmm, there's no protection here. If this task gets preempted for a long
> time, it is possible that the cpu_buffer->reader_page can become NULL,
> causing a NULL kernel dereference.
Right, this should be behind the reader lock as well :-\
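As a rough userspace sketch of the intended ordering (not the kernel code): the
access to reader_page that stands in for the cache flush happens before the
lock is dropped. The model_* names and the lock_held bookkeeping field are
hypothetical.

```c
#include <pthread.h>
#include <stddef.h>

struct model_reader {
	pthread_mutex_t reader_lock;
	int lock_held;          /* bookkeeping for the sketch only */
	char *reader_page;      /* may be swapped or cleared by other lock holders */
	int flushed_under_lock; /* recorded by model_flush() */
};

/* Stand-in for flush_dcache_folio(): it has to look at reader_page. */
static void model_flush(struct model_reader *r)
{
	r->flushed_under_lock = r->lock_held && r->reader_page != NULL;
}

static int model_get_reader(struct model_reader *r)
{
	pthread_mutex_lock(&r->reader_lock);
	r->lock_held = 1;

	/* ... consume events and swap the reader page here ... */

	/* Flush while reader_page cannot change under us. */
	model_flush(r);

	r->lock_held = 0;
	pthread_mutex_unlock(&r->reader_lock);

	return 0;
}
```

Nothing dereferences reader_page after the unlock, so a long preemption at that
point cannot expose a stale or NULL pointer in this model.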
>
> -- Steve
>
>
> > + /* Some archs do not have data cache coherency between kernel and user-space */
> > + flush_dcache_folio(virt_to_folio(cpu_buffer->reader_page->page));
> > +
> > + return 0;
> > +}
> > +
> > /*
> > * We only allocate new buffers, never free them if the CPU goes down.
> > * If we were to free the buffer, then the user would lose any trace that was in
>