* [PATCH 2/3] tracing: block-able ring_buffer consumer
@ 2009-08-27 3:03 Lai Jiangshan
2009-08-29 10:21 ` Frederic Weisbecker
2009-09-09 21:06 ` Steven Rostedt
0 siblings, 2 replies; 8+ messages in thread
From: Lai Jiangshan @ 2009-08-27 3:03 UTC (permalink / raw)
To: Ingo Molnar, Steven Rostedt, Frederic Weisbecker, LKML
Make the consumer side (per_cpu/cpu#/trace_pipe_raw) block-able,
which is a TODO in trace.c.
Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
---
diff --git a/include/linux/ftrace.h b/include/linux/ftrace.h
index dc3b132..b5dcf34 100644
--- a/include/linux/ftrace.h
+++ b/include/linux/ftrace.h
@@ -512,4 +512,10 @@ static inline void trace_hw_branch_oops(void) {}
#endif /* CONFIG_HW_BRANCH_TRACER */
+#ifdef CONFIG_TRACING
+void tracing_notify(void);
+#else
+static inline void tracing_notify(void) {}
+#endif
+
#endif /* _LINUX_FTRACE_H */
diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 7fca716..b81ceed 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -185,6 +185,10 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data);
int ring_buffer_read_page(struct ring_buffer *buffer, void **data_page,
size_t len, int cpu, int full);
+void ring_buffer_notify(struct ring_buffer *buffer);
+signed long ring_buffer_wait_page(struct ring_buffer *buffer, int cpu,
+ signed long timeout);
+
struct trace_seq;
int ring_buffer_print_entry_header(struct trace_seq *s);
diff --git a/kernel/timer.c b/kernel/timer.c
index 6e712df..79f5596 100644
--- a/kernel/timer.c
+++ b/kernel/timer.c
@@ -39,6 +39,7 @@
#include <linux/kallsyms.h>
#include <linux/perf_counter.h>
#include <linux/sched.h>
+#include <linux/ftrace.h>
#include <asm/uaccess.h>
#include <asm/unistd.h>
@@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
printk_tick();
scheduler_tick();
run_posix_cpu_timers(p);
+ tracing_notify();
}
/*
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index f1e1533..db82b38 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -443,6 +443,7 @@ struct ring_buffer_per_cpu {
u64 write_stamp;
u64 read_stamp;
atomic_t record_disabled;
+ wait_queue_head_t sleepers;
};
struct ring_buffer {
@@ -999,6 +999,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
spin_lock_init(&cpu_buffer->reader_lock);
lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
+ init_waitqueue_head(&cpu_buffer->sleepers);
bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
GFP_KERNEL, cpu_to_node(cpu));
@@ -3318,6 +3319,77 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
EXPORT_SYMBOL_GPL(ring_buffer_read);
/**
+ * ring_buffer_notify - notify the sleepers when there is any available page
+ * @buffer: The ring buffer.
+ */
+void ring_buffer_notify(struct ring_buffer *buffer)
+{
+ unsigned long flags;
+ struct ring_buffer_per_cpu *cpu_buffer;
+
+ cpu_buffer = buffer->buffers[smp_processor_id()];
+
+ if (!spin_trylock_irqsave(&cpu_buffer->reader_lock, flags))
+ return;
+
+ if (waitqueue_active(&cpu_buffer->sleepers)) {
+ struct buffer_page *reader_page;
+ struct buffer_page *commit_page;
+
+ reader_page = cpu_buffer->reader_page;
+ commit_page = ACCESS_ONCE(cpu_buffer->commit_page);
+
+ /*
+ * ring_buffer_notify() is fast path, so we don't use the slow
+ * rb_get_reader_page(cpu_buffer, 1) to detect available pages.
+ */
+ if (reader_page == commit_page)
+ goto out;
+
+ if (reader_page->read < rb_page_commit(reader_page)
+ || rb_set_head_page(cpu_buffer) != commit_page)
+ wake_up(&cpu_buffer->sleepers);
+ }
+
+out:
+ spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+}
+
+static
+int rb_page_available(struct ring_buffer_per_cpu *cpu_buffer)
+{
+ struct buffer_page *reader_page;
+
+ spin_lock_irq(&cpu_buffer->reader_lock);
+ reader_page = rb_get_reader_page(cpu_buffer, 1);
+ spin_unlock_irq(&cpu_buffer->reader_lock);
+
+ return !!reader_page;
+}
+
+/**
+ * ring_buffer_wait_page - wait until there are available pages to read
+ * @buffer: The ring buffer.
+ * @cpu: The CPU buffer to wait on
+ * @timeout: timeout value in jiffies
+ *
+ * Make the current task sleep until there are available pages to read or
+ * until @timeout jiffies have elapsed or until it is interrupted by signals.
+ *
+ * The function returns 0 if the @timeout elapsed, -ERESTARTSYS if it
+ * was interrupted by a signal, or the remaining jiffies if pages
+ * became available to read before the timeout elapsed.
+ */
+signed long
+ring_buffer_wait_page(struct ring_buffer *buffer, int cpu, signed long timeout)
+{
+ struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+
+ return wait_event_interruptible_timeout(cpu_buffer->sleepers,
+ rb_page_available(cpu_buffer), timeout);
+}
+
+/**
* ring_buffer_size - return the size of the ring buffer (in bytes)
* @buffer: The ring buffer.
*/
diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
index b7d873b..ee435ed 100644
--- a/kernel/trace/trace.c
+++ b/kernel/trace/trace.c
@@ -2457,6 +2457,12 @@ int tracing_update_buffers(void)
return ret;
}
+void tracing_notify(void)
+{
+ if (global_trace.buffer)
+ ring_buffer_notify(global_trace.buffer);
+}
+
struct trace_option_dentry;
static struct trace_option_dentry *
@@ -3232,12 +3237,30 @@ tracing_buffers_read(struct file *filp, char __user *ubuf,
info->read = 0;
+ /*
+ * We try our best to read from full page,
+ * but we wait 2 seconds at most.
+ */
+ if (count >= PAGE_SIZE && !(filp->f_flags & O_NONBLOCK))
+ ring_buffer_wait_page(info->tr->buffer, info->cpu, HZ * 2);
+
+again:
ret = ring_buffer_read_page(info->tr->buffer,
&info->spare,
count,
info->cpu, 0);
- if (ret < 0)
- return 0;
+
+ if (ret < 0) {
+ ret = 0;
+ if (!(filp->f_flags & O_NONBLOCK)) {
+ ret = ring_buffer_wait_page(info->tr->buffer,
+ info->cpu, MAX_SCHEDULE_TIMEOUT);
+ if (ret > 0)
+ goto again;
+ }
+
+ return ret;
+ }
pos = ring_buffer_page_len(info->spare);
@@ -3363,6 +3386,7 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
len &= PAGE_MASK;
}
+again:
entries = ring_buffer_entries_cpu(info->tr->buffer, info->cpu);
for (i = 0; i < PIPE_BUFFERS && len && entries; i++, len -= PAGE_SIZE) {
@@ -3416,9 +3440,13 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
if (!spd.nr_pages) {
if (flags & SPLICE_F_NONBLOCK)
ret = -EAGAIN;
- else
- ret = 0;
- /* TODO: block */
+ else {
+ ret = ring_buffer_wait_page(info->tr->buffer,
+ info->cpu, MAX_SCHEDULE_TIMEOUT);
+ if ((ssize_t)ret > 0)
+ goto again;
+ }
+
return ret;
}
^ permalink raw reply related [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-08-27 3:03 [PATCH 2/3] tracing: block-able ring_buffer consumer Lai Jiangshan
@ 2009-08-29 10:21 ` Frederic Weisbecker
2009-09-01 12:53 ` Lai Jiangshan
2009-09-09 21:10 ` Steven Rostedt
2009-09-09 21:06 ` Steven Rostedt
1 sibling, 2 replies; 8+ messages in thread
From: Frederic Weisbecker @ 2009-08-29 10:21 UTC (permalink / raw)
To: Lai Jiangshan; +Cc: Ingo Molnar, Steven Rostedt, LKML
On Thu, Aug 27, 2009 at 11:03:04AM +0800, Lai Jiangshan wrote:
>
> makes consumer side(per_cpu/cpu#/trace_pipe_raw) block-able,
> which is a TODO in trace.c
>
> Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
> ---
> diff --git a/include/linux/ftrace.h b/include/linux/ftrace.h
> index dc3b132..b5dcf34 100644
> --- a/include/linux/ftrace.h
> +++ b/include/linux/ftrace.h
> @@ -512,4 +512,10 @@ static inline void trace_hw_branch_oops(void) {}
>
> #endif /* CONFIG_HW_BRANCH_TRACER */
>
> +#ifdef CONFIG_TRACING
> +void tracing_notify(void);
> +#else
> +static inline void tracing_notify(void) {}
> +#endif
> +
> #endif /* _LINUX_FTRACE_H */
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index 7fca716..b81ceed 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -185,6 +185,10 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data);
> int ring_buffer_read_page(struct ring_buffer *buffer, void **data_page,
> size_t len, int cpu, int full);
>
> +void ring_buffer_notify(struct ring_buffer *buffer);
> +signed long ring_buffer_wait_page(struct ring_buffer *buffer, int cpu,
No need to use signed, it's implicit in the long type.
> + signed long timeout);
> +
> struct trace_seq;
>
> int ring_buffer_print_entry_header(struct trace_seq *s);
> diff --git a/kernel/timer.c b/kernel/timer.c
> index 6e712df..79f5596 100644
> --- a/kernel/timer.c
> +++ b/kernel/timer.c
> @@ -39,6 +39,7 @@
> #include <linux/kallsyms.h>
> #include <linux/perf_counter.h>
> #include <linux/sched.h>
> +#include <linux/ftrace.h>
>
> #include <asm/uaccess.h>
> #include <asm/unistd.h>
> @@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
> printk_tick();
> scheduler_tick();
> run_posix_cpu_timers(p);
> + tracing_notify();
Hmm, that really doesn't look like a good idea. Tracing should never impact
the system when it is inactive,
especially in such a fast path as the timer interrupt.
> }
>
> /*
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index f1e1533..db82b38 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -443,6 +443,7 @@ struct ring_buffer_per_cpu {
> u64 write_stamp;
> u64 read_stamp;
> atomic_t record_disabled;
> + wait_queue_head_t sleepers;
That seems too generic a name. Maybe consumer_queue?
> };
>
> struct ring_buffer {
> @@ -999,6 +999,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
> spin_lock_init(&cpu_buffer->reader_lock);
> lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
> cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
> + init_waitqueue_head(&cpu_buffer->sleepers);
>
> bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> GFP_KERNEL, cpu_to_node(cpu));
> @@ -3318,6 +3319,77 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
> EXPORT_SYMBOL_GPL(ring_buffer_read);
>
> /**
> + * ring_buffer_notify - notify the sleepers when there is any available page
> + * @buffer: The ring buffer.
> + */
> +void ring_buffer_notify(struct ring_buffer *buffer)
> +{
> + unsigned long flags;
> + struct ring_buffer_per_cpu *cpu_buffer;
> +
> + cpu_buffer = buffer->buffers[smp_processor_id()];
> +
> + if (!spin_trylock_irqsave(&cpu_buffer->reader_lock, flags))
> + return;
> +
> + if (waitqueue_active(&cpu_buffer->sleepers)) {
> + struct buffer_page *reader_page;
> + struct buffer_page *commit_page;
> +
> + reader_page = cpu_buffer->reader_page;
> + commit_page = ACCESS_ONCE(cpu_buffer->commit_page);
ACCESS_ONCE makes sense if you loop, to ensure the value
is not cached across iterations, but here I'm not sure it is
useful.
> +
> + /*
> + * ring_buffer_notify() is fast path, so we don't use the slow
> + * rb_get_reader_page(cpu_buffer, 1) to detect available pages.
> + */
> + if (reader_page == commit_page)
> + goto out;
> +
> + if (reader_page->read < rb_page_commit(reader_page)
> + || rb_set_head_page(cpu_buffer) != commit_page)
This may need a small comment to explain that you are checking that the
reader page is not completely consumed.
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-08-29 10:21 ` Frederic Weisbecker
@ 2009-09-01 12:53 ` Lai Jiangshan
2009-09-01 23:05 ` Frederic Weisbecker
2009-09-09 21:10 ` Steven Rostedt
1 sibling, 1 reply; 8+ messages in thread
From: Lai Jiangshan @ 2009-09-01 12:53 UTC (permalink / raw)
To: Frederic Weisbecker; +Cc: Ingo Molnar, Steven Rostedt, LKML
Frederic Weisbecker wrote:
> On Thu, Aug 27, 2009 at 11:03:04AM +0800, Lai Jiangshan wrote:
>> makes consumer side(per_cpu/cpu#/trace_pipe_raw) block-able,
>> which is a TODO in trace.c
>>
>> Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
>> ---
>> diff --git a/include/linux/ftrace.h b/include/linux/ftrace.h
>> index dc3b132..b5dcf34 100644
>> --- a/include/linux/ftrace.h
>> +++ b/include/linux/ftrace.h
>> @@ -512,4 +512,10 @@ static inline void trace_hw_branch_oops(void) {}
>>
>> #endif /* CONFIG_HW_BRANCH_TRACER */
>>
>> +#ifdef CONFIG_TRACING
>> +void tracing_notify(void);
>> +#else
>> +static inline void tracing_notify(void) {}
>> +#endif
>> +
>> #endif /* _LINUX_FTRACE_H */
>> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
>> index 7fca716..b81ceed 100644
>> --- a/include/linux/ring_buffer.h
>> +++ b/include/linux/ring_buffer.h
>> @@ -185,6 +185,10 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data);
>> int ring_buffer_read_page(struct ring_buffer *buffer, void **data_page,
>> size_t len, int cpu, int full);
>>
>> +void ring_buffer_notify(struct ring_buffer *buffer);
>> +signed long ring_buffer_wait_page(struct ring_buffer *buffer, int cpu,
>
>
> No need to use signed, it's implicit in the long type.
Ouch, I tried my best to make it return the same type as
schedule_timeout() returns and forgot that "long" == "signed long".
>
>
>
>> + signed long timeout);
>> +
>> struct trace_seq;
>>
>> int ring_buffer_print_entry_header(struct trace_seq *s);
>> diff --git a/kernel/timer.c b/kernel/timer.c
>> index 6e712df..79f5596 100644
>> --- a/kernel/timer.c
>> +++ b/kernel/timer.c
>> @@ -39,6 +39,7 @@
>> #include <linux/kallsyms.h>
>> #include <linux/perf_counter.h>
>> #include <linux/sched.h>
>> +#include <linux/ftrace.h>
>>
>> #include <asm/uaccess.h>
>> #include <asm/unistd.h>
>> @@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
>> printk_tick();
>> scheduler_tick();
>> run_posix_cpu_timers(p);
>> + tracing_notify();
>
>
>
> Hmm, that looks really not a good idea. The tracing shouldn't ever impact
> the system when it is inactive.
> Especially in such a fast path like the timer interrupt.
It does nothing most of the time.
It just calls tracing_notify() and then returns, but...
it still has several mb()s; I can remove these mb()s in the next patch.
We cannot call wake_up() on the tracing write side, so we delay
the wake_up() until the next timer interrupt.
>
>
>
>> }
>>
>> /*
>> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
>> index f1e1533..db82b38 100644
>> --- a/kernel/trace/ring_buffer.c
>> +++ b/kernel/trace/ring_buffer.c
>> @@ -443,6 +443,7 @@ struct ring_buffer_per_cpu {
>> u64 write_stamp;
>> u64 read_stamp;
>> atomic_t record_disabled;
>> + wait_queue_head_t sleepers;
>
>
> That seems a too generic name. May be consumer_queue?
>
>
>> };
>>
>> struct ring_buffer {
>> @@ -999,6 +999,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
>> spin_lock_init(&cpu_buffer->reader_lock);
>> lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
>> cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
>> + init_waitqueue_head(&cpu_buffer->sleepers);
>>
>> bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
>> GFP_KERNEL, cpu_to_node(cpu));
>> @@ -3318,6 +3319,77 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
>> EXPORT_SYMBOL_GPL(ring_buffer_read);
>>
>> /**
>> + * ring_buffer_notify - notify the sleepers when there is any available page
>> + * @buffer: The ring buffer.
>> + */
>> +void ring_buffer_notify(struct ring_buffer *buffer)
>> +{
>> + unsigned long flags;
>> + struct ring_buffer_per_cpu *cpu_buffer;
>> +
>> + cpu_buffer = buffer->buffers[smp_processor_id()];
>> +
>> + if (!spin_trylock_irqsave(&cpu_buffer->reader_lock, flags))
>> + return;
>> +
>> + if (waitqueue_active(&cpu_buffer->sleepers)) {
>> + struct buffer_page *reader_page;
>> + struct buffer_page *commit_page;
>> +
>> + reader_page = cpu_buffer->reader_page;
>> + commit_page = ACCESS_ONCE(cpu_buffer->commit_page);
>
>
> ACCESS_ONCE makes sense if you loop, to ensure the value
> is not cached through iteration, but there I'm not sure this is
> useful.
>
commit_page is used twice here; it needs to be loaded only once.
>
>
>> +
>> + /*
>> + * ring_buffer_notify() is fast path, so we don't use the slow
>> + * rb_get_reader_page(cpu_buffer, 1) to detect available pages.
>> + */
>> + if (reader_page == commit_page)
>> + goto out;
>> +
>> + if (reader_page->read < rb_page_commit(reader_page)
>> + || rb_set_head_page(cpu_buffer) != commit_page)
>
>
>
> This may need a small comment to explain you are checking that the reader
> is not completely consumed.
>
>
Yes, it does.
Thanks a lot.
Lai.
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-09-01 12:53 ` Lai Jiangshan
@ 2009-09-01 23:05 ` Frederic Weisbecker
2009-09-01 23:12 ` Frederic Weisbecker
0 siblings, 1 reply; 8+ messages in thread
From: Frederic Weisbecker @ 2009-09-01 23:05 UTC (permalink / raw)
To: Lai Jiangshan; +Cc: Ingo Molnar, Steven Rostedt, LKML
On Tue, Sep 01, 2009 at 08:53:03PM +0800, Lai Jiangshan wrote:
> >> @@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
> >> printk_tick();
> >> scheduler_tick();
> >> run_posix_cpu_timers(p);
> >> + tracing_notify();
> >
> >
> >
> > Hmm, that looks really not a good idea. The tracing shouldn't ever impact
> > the system when it is inactive.
> > Especially in such a fast path like the timer interrupt.
>
> It do nothing at most time.
> It just calls tracing_notify() and then returns, but...
> it still has several mb()s, I can remove this mb()s in next patch.
The timer interrupt is one of the real fast paths of the kernel.
Adding an extra call there plus some condition checks even when
tracing is off is not a nice thing.
I bet we could even measure the overhead of this, especially
in a 1000 Hz kernel.
Also do we have a low latency requirement that justifies this check
on every tick? I don't think so. It's just about having tracing
results.
> We cannot call wake_up() and the tracing write side, so we delay
> the wake_up() until next timer interrupt.
Indeed we can't because of a random cpu rq lock that we may be
already holding.
Maybe try something similar to what we are doing for sensitive tracers.
Normal tracers use default_wait_pipe() which uses a common wake up.
The others that may wake up while already holding a rq lock use
a polling wait:
void poll_wait_pipe(struct trace_iterator *iter)
{
set_current_state(TASK_INTERRUPTIBLE);
/* sleep for 100 msecs, and try again. */
schedule_timeout(HZ / 10);
}
On the worst case the reader will wait for 1/100 secs
(the comment is wrong).
You can probably use the same thing for ring buffer splice
and poll waiters.
Frederic.
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-09-01 23:05 ` Frederic Weisbecker
@ 2009-09-01 23:12 ` Frederic Weisbecker
0 siblings, 0 replies; 8+ messages in thread
From: Frederic Weisbecker @ 2009-09-01 23:12 UTC (permalink / raw)
To: Lai Jiangshan; +Cc: Ingo Molnar, Steven Rostedt, LKML
On Wed, Sep 02, 2009 at 01:05:10AM +0200, Frederic Weisbecker wrote:
> void poll_wait_pipe(struct trace_iterator *iter)
> {
> set_current_state(TASK_INTERRUPTIBLE);
> /* sleep for 100 msecs, and try again. */
> schedule_timeout(HZ / 10);
> }
>
> On the worst case the reader will wait for 1/100 secs
> (the comment is wrong).
No, the comment is not wrong. It's just that I really suck at math :-)
>
> You can probably use the same thing for ring buffer splice
> and poll waiters.
>
> Frederic.
>
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-08-27 3:03 [PATCH 2/3] tracing: block-able ring_buffer consumer Lai Jiangshan
2009-08-29 10:21 ` Frederic Weisbecker
@ 2009-09-09 21:06 ` Steven Rostedt
1 sibling, 0 replies; 8+ messages in thread
From: Steven Rostedt @ 2009-09-09 21:06 UTC (permalink / raw)
To: Lai Jiangshan; +Cc: Ingo Molnar, Frederic Weisbecker, LKML
On Thu, 2009-08-27 at 11:03 +0800, Lai Jiangshan wrote:
> makes consumer side(per_cpu/cpu#/trace_pipe_raw) block-able,
> which is a TODO in trace.c
I'd break this up into two patches. One that adds the ring buffer notify
infrastructure to the ring buffer. The other that adds the ftrace user.
>
> Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
> ---
> diff --git a/include/linux/ftrace.h b/include/linux/ftrace.h
> index dc3b132..b5dcf34 100644
> --- a/include/linux/ftrace.h
> +++ b/include/linux/ftrace.h
> @@ -512,4 +512,10 @@ static inline void trace_hw_branch_oops(void) {}
>
> #endif /* CONFIG_HW_BRANCH_TRACER */
>
> +#ifdef CONFIG_TRACING
> +void tracing_notify(void);
> +#else
> +static inline void tracing_notify(void) {}
> +#endif
> +
> #endif /* _LINUX_FTRACE_H */
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index 7fca716..b81ceed 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -185,6 +185,10 @@ void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data);
> int ring_buffer_read_page(struct ring_buffer *buffer, void **data_page,
> size_t len, int cpu, int full);
>
> +void ring_buffer_notify(struct ring_buffer *buffer);
> +signed long ring_buffer_wait_page(struct ring_buffer *buffer, int cpu,
> + signed long timeout);
> +
> struct trace_seq;
>
> int ring_buffer_print_entry_header(struct trace_seq *s);
> diff --git a/kernel/timer.c b/kernel/timer.c
> index 6e712df..79f5596 100644
> --- a/kernel/timer.c
> +++ b/kernel/timer.c
> @@ -39,6 +39,7 @@
> #include <linux/kallsyms.h>
> #include <linux/perf_counter.h>
> #include <linux/sched.h>
> +#include <linux/ftrace.h>
>
> #include <asm/uaccess.h>
> #include <asm/unistd.h>
> @@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
> printk_tick();
> scheduler_tick();
> run_posix_cpu_timers(p);
> + tracing_notify();
> }
>
> /*
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index f1e1533..db82b38 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -443,6 +443,7 @@ struct ring_buffer_per_cpu {
> u64 write_stamp;
> u64 read_stamp;
> atomic_t record_disabled;
> + wait_queue_head_t sleepers;
I prefer "waiters".
> };
>
> struct ring_buffer {
> @@ -999,6 +999,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
> spin_lock_init(&cpu_buffer->reader_lock);
> lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
> cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
> + init_waitqueue_head(&cpu_buffer->sleepers);
>
> bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> GFP_KERNEL, cpu_to_node(cpu));
> @@ -3318,6 +3319,77 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
> EXPORT_SYMBOL_GPL(ring_buffer_read);
>
> /**
> + * ring_buffer_notify - notify the sleepers when there is any available page
> + * @buffer: The ring buffer.
> + */
> +void ring_buffer_notify(struct ring_buffer *buffer)
> +{
> + unsigned long flags;
> + struct ring_buffer_per_cpu *cpu_buffer;
> +
> + cpu_buffer = buffer->buffers[smp_processor_id()];
> +
> + if (!spin_trylock_irqsave(&cpu_buffer->reader_lock, flags))
> + return;
> +
> + if (waitqueue_active(&cpu_buffer->sleepers)) {
> + struct buffer_page *reader_page;
> + struct buffer_page *commit_page;
> +
> + reader_page = cpu_buffer->reader_page;
> + commit_page = ACCESS_ONCE(cpu_buffer->commit_page);
> +
> + /*
> + * ring_buffer_notify() is fast path, so we don't use the slow
> + * rb_get_reader_page(cpu_buffer, 1) to detect available pages.
> + */
> + if (reader_page == commit_page)
> + goto out;
> +
> + if (reader_page->read < rb_page_commit(reader_page)
> + || rb_set_head_page(cpu_buffer) != commit_page)
> + wake_up(&cpu_buffer->sleepers);
> + }
> +
> +out:
> + spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +}
> +
> +static
> +int rb_page_available(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> + struct buffer_page *reader_page;
> +
> + spin_lock_irq(&cpu_buffer->reader_lock);
> + reader_page = rb_get_reader_page(cpu_buffer, 1);
> + spin_unlock_irq(&cpu_buffer->reader_lock);
> +
> + return !!reader_page;
> +}
> +
> +/**
> + * ring_buffer_wait_page - wait until there are available pages to read
> + * @buffer: The ring buffer.
> + * @cpu: The CPU buffer to be wait
> + * @timeout: timeout value in jiffies
> + *
> + * Make the current task sleep until there are available pages to read or
> + * until @timeout jiffies have elapsed or until it is interrupted by signals.
> + *
> + * The function returns 0 if the @timeout elapsed, -ERESTARTSYS if it
> + * was interrupted by a signal, and the remaining jiffies otherwise
> + * if there are available pages to read before the timeout elapsed.
> + */
> +signed long
> +ring_buffer_wait_page(struct ring_buffer *buffer, int cpu, signed long timeout)
> +{
> + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> +
> + return wait_event_interruptible_timeout(cpu_buffer->sleepers,
> + rb_page_available(cpu_buffer), timeout);
> +}
> +
> +/**
> * ring_buffer_size - return the size of the ring buffer (in bytes)
> * @buffer: The ring buffer.
> */
> diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
> index b7d873b..ee435ed 100644
> --- a/kernel/trace/trace.c
> +++ b/kernel/trace/trace.c
> @@ -2457,6 +2457,12 @@ int tracing_update_buffers(void)
> return ret;
> }
>
> +void tracing_notify(void)
> +{
> + if (global_trace.buffer)
> + ring_buffer_notify(global_trace.buffer);
> +}
> +
> struct trace_option_dentry;
>
> static struct trace_option_dentry *
> @@ -3232,12 +3237,30 @@ tracing_buffers_read(struct file *filp, char __user *ubuf,
>
> info->read = 0;
>
> + /*
> + * We try our best to read from full page,
> + * but we wait 2 seconds at most.
> + */
> + if (count >= PAGE_SIZE && !(filp->f_flags & O_NONBLOCK))
> + ring_buffer_wait_page(info->tr->buffer, info->cpu, HZ * 2);
Why do we always wait if count >= PAGE_SIZE? I don't see any check for
whether we already have data available.
> +
> +again:
> ret = ring_buffer_read_page(info->tr->buffer,
> &info->spare,
> count,
> info->cpu, 0);
> - if (ret < 0)
> - return 0;
> +
> + if (ret < 0) {
> + ret = 0;
> + if (!(filp->f_flags & O_NONBLOCK)) {
> + ret = ring_buffer_wait_page(info->tr->buffer,
> + info->cpu, MAX_SCHEDULE_TIMEOUT);
> + if (ret > 0)
> + goto again;
> + }
> +
> + return ret;
> + }
>
> pos = ring_buffer_page_len(info->spare);
>
> @@ -3363,6 +3386,7 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
> len &= PAGE_MASK;
> }
>
> +again:
> entries = ring_buffer_entries_cpu(info->tr->buffer, info->cpu);
>
> for (i = 0; i < PIPE_BUFFERS && len && entries; i++, len -= PAGE_SIZE) {
> @@ -3416,9 +3440,13 @@ tracing_buffers_splice_read(struct file *file, loff_t *ppos,
> if (!spd.nr_pages) {
> if (flags & SPLICE_F_NONBLOCK)
> ret = -EAGAIN;
> - else
> - ret = 0;
> - /* TODO: block */
> + else {
> + ret = ring_buffer_wait_page(info->tr->buffer,
> + info->cpu, MAX_SCHEDULE_TIMEOUT);
> + if ((ssize_t)ret > 0)
> + goto again;
> + }
> +
> return ret;
> }
Other than my comments, I like this patch.
-- Steve
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-08-29 10:21 ` Frederic Weisbecker
2009-09-01 12:53 ` Lai Jiangshan
@ 2009-09-09 21:10 ` Steven Rostedt
2009-09-10 2:06 ` Frederic Weisbecker
1 sibling, 1 reply; 8+ messages in thread
From: Steven Rostedt @ 2009-09-09 21:10 UTC (permalink / raw)
To: Frederic Weisbecker; +Cc: Lai Jiangshan, Ingo Molnar, LKML
On Sat, 2009-08-29 at 12:21 +0200, Frederic Weisbecker wrote:
> On Thu, Aug 27, 2009 at 11:03:04AM +0800, Lai Jiangshan wrote:
> >
> > makes consumer side(per_cpu/cpu#/trace_pipe_raw) block-able,
> > which is a TODO in trace.c
> >
> > Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
> > ---
> >
> > int ring_buffer_print_entry_header(struct trace_seq *s);
> > diff --git a/kernel/timer.c b/kernel/timer.c
> > index 6e712df..79f5596 100644
> > --- a/kernel/timer.c
> > +++ b/kernel/timer.c
> > @@ -39,6 +39,7 @@
> > #include <linux/kallsyms.h>
> > #include <linux/perf_counter.h>
> > #include <linux/sched.h>
> > +#include <linux/ftrace.h>
> >
> > #include <asm/uaccess.h>
> > #include <asm/unistd.h>
> > @@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
> > printk_tick();
> > scheduler_tick();
> > run_posix_cpu_timers(p);
> > + tracing_notify();
>
>
>
> Hmm, that looks really not a good idea. The tracing shouldn't ever impact
> the system when it is inactive.
> Especially in such a fast path like the timer interrupt.
>
Perhaps we should put a trace point there instead. Then we could add a
probe to it (doesn't need to be an event).
trace_update_process_times() ?
>
>
> > }
> >
> > /*
> > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> > index f1e1533..db82b38 100644
> > --- a/kernel/trace/ring_buffer.c
> > +++ b/kernel/trace/ring_buffer.c
> > @@ -443,6 +443,7 @@ struct ring_buffer_per_cpu {
> > u64 write_stamp;
> > u64 read_stamp;
> > atomic_t record_disabled;
> > + wait_queue_head_t sleepers;
>
>
> That seems a too generic name. May be consumer_queue?
"waiters" is what is usually used.
>
>
> > };
> >
> > struct ring_buffer {
> > @@ -999,6 +999,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
> > spin_lock_init(&cpu_buffer->reader_lock);
> > lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
> > cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
> > + init_waitqueue_head(&cpu_buffer->sleepers);
> >
> > bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> > GFP_KERNEL, cpu_to_node(cpu));
> > @@ -3318,6 +3319,77 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
> > EXPORT_SYMBOL_GPL(ring_buffer_read);
> >
> > /**
> > + * ring_buffer_notify - notify the sleepers when there is any available page
> > + * @buffer: The ring buffer.
> > + */
> > +void ring_buffer_notify(struct ring_buffer *buffer)
> > +{
> > + unsigned long flags;
> > + struct ring_buffer_per_cpu *cpu_buffer;
> > +
> > + cpu_buffer = buffer->buffers[smp_processor_id()];
> > +
> > + if (!spin_trylock_irqsave(&cpu_buffer->reader_lock, flags))
> > + return;
> > +
> > + if (waitqueue_active(&cpu_buffer->sleepers)) {
> > + struct buffer_page *reader_page;
> > + struct buffer_page *commit_page;
> > +
> > + reader_page = cpu_buffer->reader_page;
> > + commit_page = ACCESS_ONCE(cpu_buffer->commit_page);
>
>
> ACCESS_ONCE makes sense if you loop, to ensure the value
> is not cached through iteration, but there I'm not sure this is
> useful.
ACCESS_ONCE is fine. Otherwise we may read it again on the check below.
Of course the worst that will happen is we don't wake up on this tick.
>
>
>
> > +
> > + /*
> > + * ring_buffer_notify() is fast path, so we don't use the slow
> > + * rb_get_reader_page(cpu_buffer, 1) to detect available pages.
> > + */
> > + if (reader_page == commit_page)
> > + goto out;
> > +
> > + if (reader_page->read < rb_page_commit(reader_page)
> > + || rb_set_head_page(cpu_buffer) != commit_page)
>
>
>
> This may need a small comment to explain you are checking that the reader
> is not completely consumed.
Heh, it was obvious for me ;-)
-- Steve
^ permalink raw reply [flat|nested] 8+ messages in thread
* Re: [PATCH 2/3] tracing: block-able ring_buffer consumer
2009-09-09 21:10 ` Steven Rostedt
@ 2009-09-10 2:06 ` Frederic Weisbecker
0 siblings, 0 replies; 8+ messages in thread
From: Frederic Weisbecker @ 2009-09-10 2:06 UTC (permalink / raw)
To: Steven Rostedt; +Cc: Lai Jiangshan, Ingo Molnar, LKML
On Wed, Sep 09, 2009 at 05:10:02PM -0400, Steven Rostedt wrote:
> On Sat, 2009-08-29 at 12:21 +0200, Frederic Weisbecker wrote:
> > On Thu, Aug 27, 2009 at 11:03:04AM +0800, Lai Jiangshan wrote:
> > >
> > > makes consumer side(per_cpu/cpu#/trace_pipe_raw) block-able,
> > > which is a TODO in trace.c
> > >
> > > Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
> > > ---
> > >
> > > int ring_buffer_print_entry_header(struct trace_seq *s);
> > > diff --git a/kernel/timer.c b/kernel/timer.c
> > > index 6e712df..79f5596 100644
> > > --- a/kernel/timer.c
> > > +++ b/kernel/timer.c
> > > @@ -39,6 +39,7 @@
> > > #include <linux/kallsyms.h>
> > > #include <linux/perf_counter.h>
> > > #include <linux/sched.h>
> > > +#include <linux/ftrace.h>
> > >
> > > #include <asm/uaccess.h>
> > > #include <asm/unistd.h>
> > > @@ -1178,6 +1179,7 @@ void update_process_times(int user_tick)
> > > printk_tick();
> > > scheduler_tick();
> > > run_posix_cpu_timers(p);
> > > + tracing_notify();
> >
> >
> >
> > Hmm, that really doesn't look like a good idea. The tracing shouldn't ever impact
> > the system when it is inactive.
> > Especially in such a fast path like the timer interrupt.
> >
>
> Perhaps we should put a trace point there instead. Then we could add a
> probe to it (doesn't need to be an event).
>
> trace_update_process_times() ?
Yeah, that would do the trick, although I still doubt the need
to do this check at every tick.
>
> >
> >
> > > }
> > >
> > > /*
> > > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> > > index f1e1533..db82b38 100644
> > > --- a/kernel/trace/ring_buffer.c
> > > +++ b/kernel/trace/ring_buffer.c
> > > @@ -443,6 +443,7 @@ struct ring_buffer_per_cpu {
> > > u64 write_stamp;
> > > u64 read_stamp;
> > > atomic_t record_disabled;
> > > + wait_queue_head_t sleepers;
> >
> >
> > > That seems too generic a name. Maybe consumer_queue?
>
> "waiters" is what is usually used.
Yeah.
>
> >
> >
> > > };
> > >
> > > struct ring_buffer {
> > > @@ -999,6 +999,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
> > > spin_lock_init(&cpu_buffer->reader_lock);
> > > lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
> > > cpu_buffer->lock = (raw_spinlock_t)__RAW_SPIN_LOCK_UNLOCKED;
> > > + init_waitqueue_head(&cpu_buffer->sleepers);
> > >
> > > bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> > > GFP_KERNEL, cpu_to_node(cpu));
> > > @@ -3318,6 +3319,77 @@ ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
> > > EXPORT_SYMBOL_GPL(ring_buffer_read);
> > >
> > > /**
> > > + * ring_buffer_notify - notify the sleepers when there is any available page
> > > + * @buffer: The ring buffer.
> > > + */
> > > +void ring_buffer_notify(struct ring_buffer *buffer)
> > > +{
> > > + unsigned long flags;
> > > + struct ring_buffer_per_cpu *cpu_buffer;
> > > +
> > > + cpu_buffer = buffer->buffers[smp_processor_id()];
> > > +
> > > + if (!spin_trylock_irqsave(&cpu_buffer->reader_lock, flags))
> > > + return;
> > > +
> > > + if (waitqueue_active(&cpu_buffer->sleepers)) {
> > > + struct buffer_page *reader_page;
> > > + struct buffer_page *commit_page;
> > > +
> > > + reader_page = cpu_buffer->reader_page;
> > > + commit_page = ACCESS_ONCE(cpu_buffer->commit_page);
> >
> >
> > ACCESS_ONCE makes sense if you loop, to ensure the value
> > is not cached through iteration, but there I'm not sure this is
> > useful.
>
> ACCESS_ONCE is fine. Otherwise we may read it again on the check below.
> Of course the worst that will happen is we don't wake up on this tick.
Yeah, I hadn't noticed that we may check it more than once there.
> >
> >
> >
> > > +
> > > + /*
> > > + * ring_buffer_notify() is fast path, so we don't use the slow
> > > + * rb_get_reader_page(cpu_buffer, 1) to detect available pages.
> > > + */
> > > + if (reader_page == commit_page)
> > > + goto out;
> > > +
> > > + if (reader_page->read < rb_page_commit(reader_page)
> > > + || rb_set_head_page(cpu_buffer) != commit_page)
> >
> >
> >
> > This may need a small comment to explain you are checking that the reader
> > is not completely consumed.
>
> Heh, it was obvious for me ;-)
For you, of course ;-)
>
> -- Steve
>
>
^ permalink raw reply [flat|nested] 8+ messages in thread
end of thread, other threads:[~2009-09-10 2:06 UTC | newest]
Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2009-08-27 3:03 [PATCH 2/3] tracing: block-able ring_buffer consumer Lai Jiangshan
2009-08-29 10:21 ` Frederic Weisbecker
2009-09-01 12:53 ` Lai Jiangshan
2009-09-01 23:05 ` Frederic Weisbecker
2009-09-01 23:12 ` Frederic Weisbecker
2009-09-09 21:10 ` Steven Rostedt
2009-09-10 2:06 ` Frederic Weisbecker
2009-09-09 21:06 ` Steven Rostedt
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).