From: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
To: Steven Rostedt <rostedt@goodmis.org>,
	LKML <linux-kernel@vger.kernel.org>
Cc: Linus Torvalds <torvalds@linux-foundation.org>,
	Andrew Morton <akpm@linux-foundation.org>,
	Peter Zijlstra <peterz@infradead.org>,
	Ingo Molnar <mingo@elte.hu>,
	Frederic Weisbecker <fweisbec@gmail.com>,
	Thomas Gleixner <tglx@linutronix.de>,
	Christoph Hellwig <hch@lst.de>,
	Mathieu Desnoyers <mathieu.desnoyers@efficios.com>,
	Li Zefan <lizf@cn.fujitsu.com>,
	Lai Jiangshan <laijs@cn.fujitsu.com>,
	Johannes Berg <johannes.berg@intel.com>,
	Masami Hiramatsu <masami.hiramatsu.pt@hitachi.com>,
	Arnaldo Carvalho de Melo <acme@infradead.org>,
	Tom Zanussi <tzanussi@gmail.com>,
	KOSAKI Motohiro <kosaki.motohiro@jp.fujitsu.com>,
	Andi Kleen <andi@firstfloor.org>
Subject: [patch 19/20] Ring Buffer: Basic API
Date: Fri, 09 Jul 2010 18:57:46 -0400
Message-ID: <20100709225819.667435403@efficios.com>
In-Reply-To: 20100709225727.312232266@efficios.com

[-- Attachment #1: ring-buffer-simple-api.patch --]
[-- Type: text/plain, Size: 45079 bytes --]

Offer basic APIs for pre-built ring buffer clients:

- Global Overwrite
- Global Discard

- Per-cpu Overwrite with channel-wide iterator
- Per-cpu Discard with channel-wide iterator

- Per-cpu Overwrite with buffer-local iterator
- Per-cpu Discard with buffer-local iterator
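
To illustrate the difference between the overwrite and discard modes listed
above, here is a minimal userspace sketch. It is not the code in this patch:
model_ring, model_init(), model_write(), model_read() and MODEL_SLOTS are
hypothetical names, records are fixed-size words rather than the
length-prefixed records the clients write, and -ENOBUFS is only an assumed
error code (the headers promise no more than a negative error value).

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical userspace model; none of these names exist in the patch. */
#define MODEL_SLOTS 4

enum model_mode { MODEL_DISCARD, MODEL_OVERWRITE };

struct model_ring {
	enum model_mode mode;
	uint32_t slot[MODEL_SLOTS];	/* one fixed-size record per slot */
	size_t head;			/* index of the oldest record */
	size_t count;			/* number of unread records */
	unsigned long lost;		/* records dropped or overwritten */
};

static void model_init(struct model_ring *r, enum model_mode mode)
{
	memset(r, 0, sizeof(*r));
	r->mode = mode;
}

/* Returns 0 on success, -ENOBUFS (assumed) when a discard ring is full. */
static int model_write(struct model_ring *r, uint32_t value)
{
	if (r->count == MODEL_SLOTS) {
		r->lost++;
		if (r->mode == MODEL_DISCARD)
			return -ENOBUFS;	/* drop the new record */
		/* Overwrite mode: evict the oldest record to make room. */
		r->head = (r->head + 1) % MODEL_SLOTS;
		r->count--;
	}
	r->slot[(r->head + r->count) % MODEL_SLOTS] = value;
	r->count++;
	return 0;
}

/* Returns 0 and stores the oldest record, or -EAGAIN when empty. */
static int model_read(struct model_ring *r, uint32_t *value)
{
	if (!r->count)
		return -EAGAIN;
	*value = r->slot[r->head];
	r->head = (r->head + 1) % MODEL_SLOTS;
	r->count--;
	return 0;
}
```

In this model, after six writes into a four-slot ring, a discard ring still
holds records 1-4 while an overwrite ring holds records 3-6; each has lost two
records.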

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
---
 include/linux/ringbuffer/global_discard.h         |   39 ++
 include/linux/ringbuffer/global_overwrite.h       |   40 ++
 include/linux/ringbuffer/percpu_discard.h         |   39 ++
 include/linux/ringbuffer/percpu_local_discard.h   |   47 ++
 include/linux/ringbuffer/percpu_local_overwrite.h |   47 ++
 include/linux/ringbuffer/percpu_overwrite.h       |   40 ++
 lib/ringbuffer/Makefile                           |   15 
 lib/ringbuffer/ring_buffer_global.c               |  317 ++++++++++++++++++
 lib/ringbuffer/ring_buffer_percpu.c               |  334 +++++++++++++++++++
 lib/ringbuffer/ring_buffer_percpu_local.c         |  371 ++++++++++++++++++++++
 10 files changed, 1283 insertions(+), 6 deletions(-)
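
Note on sizing: the create wrappers in ring_buffer_global.c and
ring_buffer_percpu.c split the requested size into 8 sub-buffers of at least
one page each. A rough userspace sketch of that arithmetic follows.
MODEL_PAGE_SIZE (assumed to be 4096) and the model_* names are hypothetical,
and the sketch leaves out the 4GB sub-buffer clamp and any power-of-2 rounding
done further down the stack.

```c
#include <stddef.h>

/* Assumption: 4 KiB pages. The kernel code uses PAGE_SIZE instead. */
#define MODEL_PAGE_SIZE		4096UL
/* Mirrors SG_SUBBUF_NUM_ORDER / SP_SUBBUF_NUM_ORDER from the patch. */
#define MODEL_SUBBUF_NUM_ORDER	3

struct model_geometry {
	size_t subbuf_size;		/* bytes per sub-buffer */
	unsigned int subbuf_num;	/* number of sub-buffers */
};

/*
 * Hypothetical helper: raise the request to the floor of one page per
 * sub-buffer, then divide it evenly between the 8 sub-buffers.
 */
static struct model_geometry model_compute_geometry(size_t buf_size)
{
	struct model_geometry g;
	size_t floor = MODEL_PAGE_SIZE << MODEL_SUBBUF_NUM_ORDER;

	if (buf_size < floor)
		buf_size = floor;
	g.subbuf_size = buf_size >> MODEL_SUBBUF_NUM_ORDER;
	g.subbuf_num = 1U << MODEL_SUBBUF_NUM_ORDER;
	return g;
}
```

Under these assumptions, a request for 1 byte yields 8 sub-buffers of 4096
bytes, and a 1MB request yields 8 sub-buffers of 128KB.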

Index: linux.trees.git/lib/ringbuffer/Makefile
===================================================================
--- linux.trees.git.orig/lib/ringbuffer/Makefile	2010-07-09 18:34:50.000000000 -0400
+++ linux.trees.git/lib/ringbuffer/Makefile	2010-07-09 18:35:16.000000000 -0400
@@ -1,6 +1,9 @@
-obj-y += ring_buffer_backend.o
-obj-y += ring_buffer_frontend.o
-obj-y += ring_buffer_iterator.o
-obj-y += ring_buffer_vfs.o
-obj-y += ring_buffer_splice.o
-obj-y += ring_buffer_mmap.o
+ring_buffer-objs := ring_buffer_backend.o ring_buffer_frontend.o \
+		ring_buffer_iterator.o ring_buffer_vfs.o \
+		ring_buffer_splice.o ring_buffer_mmap.o
+
+obj-$(CONFIG_LIB_RING_BUFFER) += ring_buffer.o
+
+obj-$(CONFIG_LIB_RING_BUFFER_CLIENTS) += ring_buffer_global.o
+obj-$(CONFIG_LIB_RING_BUFFER_CLIENTS) += ring_buffer_percpu.o
+obj-$(CONFIG_LIB_RING_BUFFER_CLIENTS) += ring_buffer_percpu_local.o
Index: linux.trees.git/include/linux/ringbuffer/global_discard.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/global_discard.h	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,39 @@
+#ifndef _RING_BUFFER_GLOBAL_DISCARD_H
+#define _RING_BUFFER_GLOBAL_DISCARD_H
+
+/*
+ * ring_buffer_global_discard.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Ring buffer global discard API. Drops records when buffer is full.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/types.h>
+
+/*
+ * ring_buffer_global_discard_create creates a global discard ring buffer.
+ * buf_size is the buffer size, which will be rounded up to the next power of 2
+ * (the floor value is 8*PAGE_SIZE). Returns the ring buffer channel address on
+ * success, NULL on error.
+ */
+extern struct channel *ring_buffer_global_discard_create(size_t buf_size);
+
+/*
+ * ring_buffer_global_discard_destroy finalizes every channel buffer, waits
+ * for readers to release all references, and destroys the channel.
+ */
+extern void ring_buffer_global_discard_destroy(struct channel *chan);
+
+/*
+ * ring_buffer_global_discard_write writes a record into the ring buffer. The
+ * record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+extern int ring_buffer_global_discard_write(struct channel *chan,
+					    const void *src, size_t len);
+
+#endif /* _RING_BUFFER_GLOBAL_DISCARD_H */
Index: linux.trees.git/include/linux/ringbuffer/global_overwrite.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/global_overwrite.h	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,40 @@
+#ifndef _RING_BUFFER_GLOBAL_OVERWRITE_H
+#define _RING_BUFFER_GLOBAL_OVERWRITE_H
+
+/*
+ * ring_buffer_global_overwrite.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Ring buffer global overwrite API. Overwrites oldest records when buffer is
+ * full.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/types.h>
+
+/*
+ * ring_buffer_global_overwrite_create creates a global overwrite ring buffer.
+ * buf_size is the buffer size, which will be rounded up to the next power of 2
+ * (the floor value is 8*PAGE_SIZE). Returns the ring buffer channel address on
+ * success, NULL on error.
+ */
+extern struct channel *ring_buffer_global_overwrite_create(size_t buf_size);
+
+/*
+ * ring_buffer_global_overwrite_destroy finalizes every channel buffer, waits
+ * for readers to release all references, and destroys the channel.
+ */
+extern void ring_buffer_global_overwrite_destroy(struct channel *chan);
+
+/*
+ * ring_buffer_global_overwrite_write writes a record into the ring buffer. The
+ * record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+extern int ring_buffer_global_overwrite_write(struct channel *chan,
+					      const void *src, size_t len);
+
+#endif /* _RING_BUFFER_GLOBAL_OVERWRITE_H */
Index: linux.trees.git/include/linux/ringbuffer/percpu_discard.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/percpu_discard.h	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,39 @@
+#ifndef _RING_BUFFER_PERCPU_DISCARD_H
+#define _RING_BUFFER_PERCPU_DISCARD_H
+
+/*
+ * ring_buffer_percpu_discard.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Per-CPU ring buffer discard API. Drops records when buffer is full.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/types.h>
+
+/*
+ * ring_buffer_percpu_discard_create creates a per-cpu discard ring buffer.
+ * buf_size is the buffer size, which will be rounded up to the next power of 2
+ * (the floor value is 8*PAGE_SIZE). Returns the ring buffer channel address on
+ * success, NULL on error.
+ */
+extern struct channel *ring_buffer_percpu_discard_create(size_t buf_size);
+
+/*
+ * ring_buffer_percpu_discard_destroy finalizes every channel buffer, waits
+ * for readers to release all references, and destroys the channel.
+ */
+extern void ring_buffer_percpu_discard_destroy(struct channel *chan);
+
+/*
+ * ring_buffer_percpu_discard_write writes a record into the ring buffer. The
+ * record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+extern int ring_buffer_percpu_discard_write(struct channel *chan,
+					    const void *src, size_t len);
+
+#endif /* _RING_BUFFER_PERCPU_DISCARD_H */
Index: linux.trees.git/include/linux/ringbuffer/percpu_local_discard.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/percpu_local_discard.h	2010-07-09 18:35:43.000000000 -0400
@@ -0,0 +1,47 @@
+#ifndef _RING_BUFFER_PERCPU_LOCAL_DISCARD_H
+#define _RING_BUFFER_PERCPU_LOCAL_DISCARD_H
+
+/*
+ * ring_buffer_percpu_local_discard.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Per-CPU local ring buffer discard API. Drops records when buffer is full.
+ * Presents per-cpu-buffer iterators.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/types.h>
+
+/*
+ * ring_buffer_percpu_local_discard_create creates a per-cpu producer-consumer
+ * ring buffer with local iterators. buf_size is the buffer size, which will be
+ * rounded up to the next power of 2 (the floor value is 2*PAGE_SIZE). Returns
+ * the ring buffer channel address on success, NULL on error.
+ * on_buffer_create and on_buffer_finalize are callbacks called whenever a
+ * per-cpu buffer is created or finalized. on_buffer_create returns 0 on
+ * success.
+ */
+extern
+struct channel *ring_buffer_percpu_local_discard_create(size_t buf_size,
+		int (*on_buffer_create)(struct ring_buffer *buf, int cpu),
+		void (*on_buffer_finalize)(struct ring_buffer *buf, int cpu));
+
+/*
+ * ring_buffer_percpu_local_discard_destroy finalizes every channel buffer,
+ * waits for readers to release all references, and destroys the channel.
+ */
+extern void ring_buffer_percpu_local_discard_destroy(struct channel *chan);
+
+/*
+ * ring_buffer_percpu_local_discard_write writes a record into the ring buffer.
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+extern
+int ring_buffer_percpu_local_discard_write(struct channel *chan,
+					   const void *src, size_t len);
+
+#endif /* _RING_BUFFER_PERCPU_LOCAL_DISCARD_H */
Index: linux.trees.git/include/linux/ringbuffer/percpu_local_overwrite.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/percpu_local_overwrite.h	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,47 @@
+#ifndef _RING_BUFFER_PERCPU_LOCAL_OVERWRITE_H
+#define _RING_BUFFER_PERCPU_LOCAL_OVERWRITE_H
+
+/*
+ * ring_buffer_percpu_local_overwrite.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Per-CPU local ring buffer overwrite API. Overwrites oldest records
+ * when buffer is full. Presents per-cpu-buffer iterators.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/types.h>
+
+/*
+ * ring_buffer_percpu_local_overwrite_create creates a per-cpu overwrite ring
+ * buffer with local iterators. buf_size is the buffer size, which will be
+ * rounded up to the next power of 2 (the floor value is 2*PAGE_SIZE). Returns
+ * the ring buffer channel address on success, NULL on error.
+ * on_buffer_create and on_buffer_finalize are callbacks called whenever a
+ * per-cpu buffer is created or finalized. on_buffer_create returns 0 on
+ * success.
+ */
+extern
+struct channel *ring_buffer_percpu_local_overwrite_create(size_t buf_size,
+		int (*on_buffer_create)(struct ring_buffer *buf, int cpu),
+		void (*on_buffer_finalize)(struct ring_buffer *buf, int cpu));
+
+/*
+ * ring_buffer_percpu_local_overwrite_destroy finalizes every channel buffer,
+ * waits for readers to release all references, and destroys the channel.
+ */
+extern void ring_buffer_percpu_local_overwrite_destroy(struct channel *chan);
+
+/*
+ * ring_buffer_percpu_local_overwrite_write writes a record into the ring
+ * buffer. The record starts at the "src" address and is "len" bytes long.
+ * Returns 0 on success, else it returns a negative error value.
+ */
+extern
+int ring_buffer_percpu_local_overwrite_write(struct channel *chan,
+					     const void *src, size_t len);
+
+#endif /* _RING_BUFFER_PERCPU_LOCAL_OVERWRITE_H */
Index: linux.trees.git/include/linux/ringbuffer/percpu_overwrite.h
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/percpu_overwrite.h	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,40 @@
+#ifndef _RING_BUFFER_PERCPU_OVERWRITE_H
+#define _RING_BUFFER_PERCPU_OVERWRITE_H
+
+/*
+ * ring_buffer_percpu_overwrite.h
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Per-CPU ring buffer overwrite API. Overwrites oldest records when
+ * buffer is full.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/types.h>
+
+/*
+ * ring_buffer_percpu_overwrite_create creates a per-cpu overwrite ring buffer.
+ * buf_size is the buffer size, which will be rounded up to the next power of 2
+ * (the floor value is 8*PAGE_SIZE). Returns the ring buffer channel address on
+ * success, NULL on error.
+ */
+extern struct channel *ring_buffer_percpu_overwrite_create(size_t buf_size);
+
+/*
+ * ring_buffer_percpu_overwrite_destroy finalizes every channel buffer, waits
+ * for readers to release all references, and destroys the channel.
+ */
+extern void ring_buffer_percpu_overwrite_destroy(struct channel *chan);
+
+/*
+ * ring_buffer_percpu_overwrite_write writes a record into the ring buffer. The
+ * record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+extern int ring_buffer_percpu_overwrite_write(struct channel *chan,
+					      const void *src, size_t len);
+
+#endif /* _RING_BUFFER_PERCPU_OVERWRITE_H */
Index: linux.trees.git/lib/ringbuffer/ring_buffer_global.c
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/ring_buffer_global.c	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,317 @@
+/*
+ * ring_buffer_global.c
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Ring buffer global library implementation.
+ * Creates instances of both overwrite and discard modes.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/types.h>
+#include <linux/module.h>
+#include <linux/ringbuffer/config.h>
+#include <linux/ringbuffer/global_overwrite.h>
+#include <linux/ringbuffer/global_discard.h>
+#include <linux/ringbuffer/vfs.h>
+
+struct subbuffer_header {
+	uint8_t header_end[0];		/* End of header */
+};
+
+struct record_header {
+	uint32_t len;			/* Size of record payload */
+	uint8_t header_end[0];		/* End of header */
+};
+
+static inline
+u64 ring_buffer_clock_read(struct channel *chan)
+{
+	return 0;
+}
+
+static inline
+unsigned char record_header_size(const struct ring_buffer_config *config,
+				 struct channel *chan, size_t offset,
+				 size_t data_size, size_t *pre_header_padding,
+				 unsigned int rflags,
+				 struct ring_buffer_ctx *ctx)
+{
+	return offsetof(struct record_header, header_end);
+}
+
+#include <linux/ringbuffer/api.h>
+
+static
+u64 client_ring_buffer_clock_read(struct channel *chan)
+{
+	return 0;
+}
+
+static
+size_t client_record_header_size(const struct ring_buffer_config *config,
+				 struct channel *chan, size_t offset,
+				 size_t data_size,
+				 size_t *pre_header_padding,
+				 unsigned int rflags,
+				 struct ring_buffer_ctx *ctx)
+{
+	return record_header_size(config, chan, offset, data_size,
+				  pre_header_padding, rflags, ctx);
+}
+
+static
+size_t client_subbuffer_header_size(void)
+{
+	return offsetof(struct subbuffer_header, header_end);
+}
+
+static
+void client_buffer_begin(struct ring_buffer *buf, u64 tsc,
+			 unsigned int subbuf_idx)
+{
+}
+
+static
+void client_buffer_end(struct ring_buffer *buf, u64 tsc,
+		       unsigned int subbuf_idx, unsigned long data_size)
+{
+}
+
+static
+int client_buffer_create(struct ring_buffer *buf, void *priv,
+			 int cpu, const char *name)
+{
+	return 0;
+}
+
+static
+void client_buffer_finalize(struct ring_buffer *buf, void *priv, int cpu)
+{
+}
+
+static
+void client_record_get(const struct ring_buffer_config *config,
+		       struct channel *chan, struct ring_buffer *buf,
+		       size_t offset, size_t *header_len,
+		       size_t *payload_len, u64 *timestamp)
+{
+	struct record_header header;
+	int ret;
+
+	ret = ring_buffer_read(&buf->backend, offset, &header,
+			       offsetof(struct record_header, header_end));
+	CHAN_WARN_ON(chan, ret != offsetof(struct record_header, header_end));
+	*header_len = offsetof(struct record_header, header_end);
+	*payload_len = header.len;
+	*timestamp = 0;
+}
+
+/*
+ * Typically 8 subbuffers of variable size.
+ * Maximum subbuffer size is 4GB. Allocate more subbuffers if more space is
+ * requested.
+ * Periodic buffer switch deferrable timer is set to 100ms.
+ * Periodic reader wakeup delivery timer is disabled. It is unnecessary because
+ * RING_BUFFER_WAKEUP_BY_WRITER is set.
+ */
+#define SG_SUBBUF_NUM_ORDER	3
+#define SG_SUBBUF_NUM		(1 << SG_SUBBUF_NUM_ORDER)
+#define SG_SWITCH_INTERVAL_MS	100U
+#define SG_SWITCH_INTERVAL_US	(SG_SWITCH_INTERVAL_MS * 1000)
+#define SG_READ_INTERVAL_US	0
+#define SG_U32_MAX		4294967295U	/* 2^32 - 1 */
+
+static const struct ring_buffer_config global_overwrite_config = {
+	.cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+	.cb.record_header_size = client_record_header_size,
+	.cb.subbuffer_header_size = client_subbuffer_header_size,
+	.cb.buffer_begin = client_buffer_begin,
+	.cb.buffer_end = client_buffer_end,
+	.cb.buffer_create = client_buffer_create,
+	.cb.buffer_finalize = client_buffer_finalize,
+	.cb.record_get = client_record_get,
+
+	.tsc_bits = 0,
+	.alloc = RING_BUFFER_ALLOC_GLOBAL,
+	.sync = RING_BUFFER_SYNC_GLOBAL,
+	.mode = RING_BUFFER_OVERWRITE,
+	.align = RING_BUFFER_PACKED,
+	.backend = RING_BUFFER_PAGE,
+	.output = RING_BUFFER_ITERATOR,
+	.oops = RING_BUFFER_NO_OOPS_CONSISTENCY,
+	.ipi = RING_BUFFER_NO_IPI_BARRIER,
+	.wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+};
+
+static const struct ring_buffer_config global_discard_config = {
+	.cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+	.cb.record_header_size = client_record_header_size,
+	.cb.subbuffer_header_size = client_subbuffer_header_size,
+	.cb.buffer_begin = client_buffer_begin,
+	.cb.buffer_end = client_buffer_end,
+	.cb.buffer_create = client_buffer_create,
+	.cb.buffer_finalize = client_buffer_finalize,
+	.cb.record_get = client_record_get,
+
+	.tsc_bits = 0,
+	.alloc = RING_BUFFER_ALLOC_GLOBAL,
+	.sync = RING_BUFFER_SYNC_GLOBAL,
+	.mode = RING_BUFFER_DISCARD,
+	.align = RING_BUFFER_PACKED,
+	.backend = RING_BUFFER_PAGE,
+	.output = RING_BUFFER_ITERATOR,
+	.oops = RING_BUFFER_NO_OOPS_CONSISTENCY,
+	.ipi = RING_BUFFER_NO_IPI_BARRIER,
+	.wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+};
+
+/* Wrapper library API */
+
+static
+struct channel *ring_buffer_global_create(
+				const struct ring_buffer_config *config,
+				size_t buf_size)
+{
+	size_t subbuf_size, subbuf_size_order;
+	unsigned int subbuf_num = SG_SUBBUF_NUM;
+
+	/* Typically use 8 subbuffers, minimum of PAGE_SIZE size each */
+	buf_size = max_t(size_t, buf_size, PAGE_SIZE << SG_SUBBUF_NUM_ORDER);
+	subbuf_size = buf_size >> SG_SUBBUF_NUM_ORDER;
+	/*
+	 * Ensure the payload size fits in the record header's u32 len.
+	 * Maximum subbuffer size is therefore 4GB.
+	 */
+	subbuf_size = min_t(size_t, SG_U32_MAX, subbuf_size);
+
+	/* Allocate more than 8 subbuffers if necessary. */
+	if (subbuf_size < (buf_size >> SG_SUBBUF_NUM_ORDER)) {
+		subbuf_size_order = get_count_order(subbuf_size);
+		subbuf_num = buf_size >> subbuf_size_order;
+	}
+
+	return channel_create(config, "sg", NULL, NULL,
+			      subbuf_size, subbuf_num,
+			      SG_SWITCH_INTERVAL_US, SG_READ_INTERVAL_US);
+}
+
+/**
+ * ring_buffer_global_overwrite_create - creates a global overwrite ring buffer.
+ * @buf_size: the buffer size
+ *
+ * Returns the ring buffer channel address on success, NULL on error.
+ */
+struct channel *ring_buffer_global_overwrite_create(size_t buf_size)
+{
+	return ring_buffer_global_create(&global_overwrite_config, buf_size);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_global_overwrite_create);
+
+/**
+ * ring_buffer_global_discard_create - creates a global discard ring buffer.
+ * @buf_size: the buffer size
+ *
+ * Returns the ring buffer channel address on success, NULL on error.
+ */
+
+struct channel *ring_buffer_global_discard_create(size_t buf_size)
+{
+	return ring_buffer_global_create(&global_discard_config, buf_size);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_global_discard_create);
+
+static
+void ring_buffer_global_destroy(struct channel *chan)
+{
+	channel_destroy(chan);
+}
+
+/**
+ * ring_buffer_global_overwrite_destroy - teardown global overwrite ring buffer.
+ * @chan: ring buffer channel
+ */
+void ring_buffer_global_overwrite_destroy(struct channel *chan)
+{
+	ring_buffer_global_destroy(chan);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_global_overwrite_destroy);
+
+/**
+ * ring_buffer_global_discard_destroy - teardown global discard ring buffer.
+ * @chan: ring buffer channel
+ */
+void ring_buffer_global_discard_destroy(struct channel *chan)
+{
+	ring_buffer_global_destroy(chan);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_global_discard_destroy);
+
+/**
+ * ring_buffer_global_write - writes a record into the ring buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+static
+int ring_buffer_global_write(const struct ring_buffer_config *config,
+			     struct channel *chan, const void *src, size_t len)
+{
+	struct record_header header;
+	struct ring_buffer_ctx ctx;
+	int ret;
+
+	ring_buffer_ctx_init(&ctx, chan, NULL, len, 0, 0);
+	ret = ring_buffer_reserve(config, &ctx);
+	if (ret)
+		goto end;
+	header.len = len;
+	ring_buffer_write(config, &ctx, &header,
+			  offsetof(struct record_header, header_end));
+	ring_buffer_write(config, &ctx, src, len);
+	ring_buffer_commit(config, &ctx);
+end:
+	return ret;
+
+}
+
+/**
+ * ring_buffer_global_overwrite_write - writes a record into the ring buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+int ring_buffer_global_overwrite_write(struct channel *chan, const void *src,
+				       size_t len)
+{
+	return ring_buffer_global_write(&global_overwrite_config, chan, src,
+					len);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_global_overwrite_write);
+
+/**
+ * ring_buffer_global_discard_write - writes a record into the ring buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+int ring_buffer_global_discard_write(struct channel *chan, const void *src,
+				     size_t len)
+{
+	return ring_buffer_global_write(&global_discard_config, chan, src, len);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_global_discard_write);
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Ring Buffer Library Global Client");
Index: linux.trees.git/lib/ringbuffer/ring_buffer_percpu.c
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/ring_buffer_percpu.c	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,334 @@
+/*
+ * ring_buffer_percpu.c
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Per-CPU ring buffer library implementation.
+ * Creates instances of both overwrite and discard modes.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/types.h>
+#include <linux/module.h>
+#include <linux/ringbuffer/config.h>
+#include <linux/ringbuffer/percpu_overwrite.h>
+#include <linux/ringbuffer/percpu_discard.h>
+#include <linux/ringbuffer/vfs.h>
+#include <linux/prio_heap.h>
+#include <linux/trace_clock.h>
+
+struct subbuffer_header {
+	uint8_t header_end[0];		/* End of header */
+};
+
+struct record_header {
+	uint64_t timestamp;		/* Record timestamp */
+	uint32_t len;			/* Size of record payload */
+	uint8_t header_end[0];		/* End of header */
+};
+
+static inline
+u64 ring_buffer_clock_read(struct channel *chan)
+{
+	return trace_clock();
+}
+
+static inline
+unsigned char record_header_size(const struct ring_buffer_config *config,
+				 struct channel *chan, size_t offset,
+				 size_t data_size, size_t *pre_header_padding,
+				 unsigned int rflags,
+				 struct ring_buffer_ctx *ctx)
+{
+	return offsetof(struct record_header, header_end);
+}
+
+#include <linux/ringbuffer/api.h>
+
+static
+u64 client_ring_buffer_clock_read(struct channel *chan)
+{
+	return ring_buffer_clock_read(chan);
+}
+
+static
+size_t client_record_header_size(const struct ring_buffer_config *config,
+				 struct channel *chan, size_t offset,
+				 size_t data_size,
+				 size_t *pre_header_padding,
+				 unsigned int rflags,
+				 struct ring_buffer_ctx *ctx)
+{
+	return record_header_size(config, chan, offset, data_size,
+				  pre_header_padding, rflags, ctx);
+}
+
+static
+size_t client_subbuffer_header_size(void)
+{
+	return offsetof(struct subbuffer_header, header_end);
+}
+
+static
+void client_buffer_begin(struct ring_buffer *buf, u64 tsc,
+			 unsigned int subbuf_idx)
+{
+}
+
+static
+void client_buffer_end(struct ring_buffer *buf, u64 tsc,
+		       unsigned int subbuf_idx, unsigned long data_size)
+{
+}
+
+static
+int client_buffer_create(struct ring_buffer *buf, void *priv,
+			 int cpu, const char *name)
+{
+	return 0;
+}
+
+static
+void client_buffer_finalize(struct ring_buffer *buf, void *priv, int cpu)
+{
+}
+
+static
+void client_record_get(const struct ring_buffer_config *config,
+		       struct channel *chan, struct ring_buffer *buf,
+		       size_t offset, size_t *header_len,
+		       size_t *payload_len, u64 *timestamp)
+{
+	int ret;
+	struct record_header header;
+
+	ret = ring_buffer_read(&buf->backend, offset, &header,
+			       offsetof(struct record_header, header_end));
+	CHAN_WARN_ON(chan, ret != offsetof(struct record_header, header_end));
+	*header_len = offsetof(struct record_header, header_end);
+	*payload_len = header.len;
+	*timestamp = header.timestamp;
+}
+
+/*
+ * Typically 8 subbuffers of variable size per CPU. We allocate more than 2
+ * subbuffers per cpu to provide room for the merge-sort.
+ * Maximum subbuffer size is 4GB. Allocate more subbuffers if more space is
+ * requested.
+ * Periodic buffer switch deferrable timer is set to 100ms. This will wake up
+ * blocking reads when partially filled subbuffers are ready for reading.
+ * Periodic reader wakeup delivery timer is disabled. It is unnecessary because
+ * RING_BUFFER_WAKEUP_BY_WRITER is set.
+ */
+#define SP_SUBBUF_NUM_ORDER	3
+#define SP_SUBBUF_NUM		(1 << SP_SUBBUF_NUM_ORDER)
+#define SP_SWITCH_INTERVAL_MS	100U
+#define SP_SWITCH_INTERVAL_US	(SP_SWITCH_INTERVAL_MS * 1000)
+#define SP_READ_INTERVAL_US	0
+#define SP_U32_MAX		4294967295U	/* 2^32 - 1 */
+
+static const struct ring_buffer_config percpu_overwrite_config = {
+	.cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+	.cb.record_header_size = client_record_header_size,
+	.cb.subbuffer_header_size = client_subbuffer_header_size,
+	.cb.buffer_begin = client_buffer_begin,
+	.cb.buffer_end = client_buffer_end,
+	.cb.buffer_create = client_buffer_create,
+	.cb.buffer_finalize = client_buffer_finalize,
+	.cb.record_get = client_record_get,
+
+	.tsc_bits = 64,
+	.alloc = RING_BUFFER_ALLOC_PER_CPU,
+	.sync = RING_BUFFER_SYNC_PER_CPU,
+	.mode = RING_BUFFER_OVERWRITE,
+	.align = RING_BUFFER_PACKED,
+	.backend = RING_BUFFER_PAGE,
+	.output = RING_BUFFER_ITERATOR,
+	.oops = RING_BUFFER_NO_OOPS_CONSISTENCY,
+	.ipi = RING_BUFFER_IPI_BARRIER,
+	.wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+};
+
+static const struct ring_buffer_config percpu_discard_config = {
+	.cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+	.cb.record_header_size = client_record_header_size,
+	.cb.subbuffer_header_size = client_subbuffer_header_size,
+	.cb.buffer_begin = client_buffer_begin,
+	.cb.buffer_end = client_buffer_end,
+	.cb.buffer_create = client_buffer_create,
+	.cb.buffer_finalize = client_buffer_finalize,
+	.cb.record_get = client_record_get,
+
+	.tsc_bits = 64,
+	.alloc = RING_BUFFER_ALLOC_PER_CPU,
+	.sync = RING_BUFFER_SYNC_PER_CPU,
+	.mode = RING_BUFFER_DISCARD,
+	.align = RING_BUFFER_PACKED,
+	.backend = RING_BUFFER_PAGE,
+	.output = RING_BUFFER_ITERATOR,
+	.oops = RING_BUFFER_NO_OOPS_CONSISTENCY,
+	.ipi = RING_BUFFER_IPI_BARRIER,
+	.wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+};
+
+/* Wrapper library API */
+
+static
+struct channel *ring_buffer_percpu_create(
+				const struct ring_buffer_config *config,
+				size_t buf_size)
+{
+	size_t subbuf_size, subbuf_size_order;
+	unsigned int subbuf_num = SP_SUBBUF_NUM;
+
+	/* Typically use 8 subbuffers, minimum of PAGE_SIZE size each */
+	buf_size = max_t(size_t, buf_size, PAGE_SIZE << SP_SUBBUF_NUM_ORDER);
+	subbuf_size = buf_size >> SP_SUBBUF_NUM_ORDER;
+	/*
+	 * Ensure the payload size fits in the record header's u32 len.
+	 * Maximum subbuffer size is therefore 4GB.
+	 */
+	subbuf_size = min_t(size_t, SP_U32_MAX, subbuf_size);
+
+	/* Allocate more than 8 subbuffers if necessary. */
+	if (subbuf_size < (buf_size >> SP_SUBBUF_NUM_ORDER)) {
+		subbuf_size_order = get_count_order(subbuf_size);
+		subbuf_num = buf_size >> subbuf_size_order;
+	}
+
+	return channel_create(config, "sp", NULL, NULL,
+			      subbuf_size, subbuf_num,
+			      SP_SWITCH_INTERVAL_US, SP_READ_INTERVAL_US);
+}
+
+/**
+ * ring_buffer_percpu_overwrite_create - creates a per-cpu overwrite ring
+ *                                       buffer.
+ * @buf_size: the buffer size
+ *
+ * Returns the ring buffer channel address on success, NULL on error.
+ */
+struct channel *ring_buffer_percpu_overwrite_create(size_t buf_size)
+{
+	return ring_buffer_percpu_create(&percpu_overwrite_config, buf_size);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_overwrite_create);
+
+/**
+ * ring_buffer_percpu_discard_create - creates a per-cpu discard ring buffer.
+ * @buf_size: the buffer size
+ *
+ * Returns the ring buffer channel address on success, NULL on error.
+ */
+
+struct channel *ring_buffer_percpu_discard_create(size_t buf_size)
+{
+	return ring_buffer_percpu_create(&percpu_discard_config, buf_size);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_discard_create);
+
+static
+void ring_buffer_percpu_destroy(struct channel *chan)
+{
+	channel_destroy(chan);
+}
+
+/**
+ * ring_buffer_percpu_overwrite_destroy - deletes a per-cpu
+ *                                        overwrite ring buffer.
+ * @chan: ring buffer channel
+ */
+void ring_buffer_percpu_overwrite_destroy(struct channel *chan)
+{
+	ring_buffer_percpu_destroy(chan);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_overwrite_destroy);
+
+/**
+ * ring_buffer_percpu_discard_destroy - deletes a per-cpu
+ *                                      discard ring buffer.
+ * @chan: ring buffer channel
+ */
+void ring_buffer_percpu_discard_destroy(struct channel *chan)
+{
+	ring_buffer_percpu_destroy(chan);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_discard_destroy);
+
+/**
+ * ring_buffer_percpu_write - writes a record into the ring buffer.
+ * @config: ring buffer client configuration
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * Copies "len" bytes from "src". Returns 0 on success, else a negative error.
+ */
+static
+int ring_buffer_percpu_write(const struct ring_buffer_config *config,
+			     struct channel *chan, const void *src, size_t len)
+{
+	struct percpu_private *priv = channel_get_private(chan);
+	struct record_header header;
+	struct ring_buffer_ctx ctx;
+	int ret, cpu;
+
+	cpu = ring_buffer_get_cpu(config);
+	if (cpu < 0) {
+		ret = cpu;
+		goto end;
+	}
+	ring_buffer_ctx_init(&ctx, chan, priv, len, 0, cpu);
+	ret = ring_buffer_reserve(config, &ctx);
+	if (ret)
+		goto put;
+	header.timestamp = ctx.tsc;
+	header.len = len;
+	ring_buffer_write(config, &ctx, &header,
+			  offsetof(struct record_header, header_end));
+	ring_buffer_write(config, &ctx, src, len);
+	ring_buffer_commit(config, &ctx);
+put:
+	ring_buffer_put_cpu(config);
+end:
+	return ret;
+}
+
+/**
+ * ring_buffer_percpu_overwrite_write - writes a record into the ring buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+int ring_buffer_percpu_overwrite_write(struct channel *chan, const void *src,
+				       size_t len)
+{
+	return ring_buffer_percpu_write(&percpu_overwrite_config, chan, src,
+					len);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_overwrite_write);
+
+/**
+ * ring_buffer_percpu_discard_write - writes a record into the ring buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+int ring_buffer_percpu_discard_write(struct channel *chan, const void *src,
+				     size_t len)
+{
+	return ring_buffer_percpu_write(&percpu_discard_config, chan, src,
+					len);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_discard_write);
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Ring Buffer Library Per-CPU Client");
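
The write path above frames each record as a small header (timestamp and
payload length) followed by the payload bytes. Here is a minimal user-space
sketch of that framing, for illustration only. Every demo_* name is
hypothetical, the field order of the header is an assumption, and the flat
byte array stands in for the reserve/write/commit sequence of the real
library.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for the client's record header (layout assumed). */
struct demo_record_header {
	uint64_t timestamp;	/* stand-in for ctx.tsc */
	uint32_t len;		/* size of record payload */
};

/*
 * Bytes actually written for the header. The patch gets this from
 * offsetof(..., header_end) on a zero-length array member; this is a
 * portable equivalent that stops right after "len".
 */
#define DEMO_HEADER_SIZE \
	(offsetof(struct demo_record_header, len) + sizeof(uint32_t))

/* Append header + payload at *offset. Returns 0, or -1 if it does not fit. */
static int demo_write(uint8_t *buf, size_t buf_len, size_t *offset,
		      uint64_t tsc, const void *src, uint32_t len)
{
	struct demo_record_header header;

	if (buf_len - *offset < DEMO_HEADER_SIZE + len)
		return -1;
	header.timestamp = tsc;
	header.len = len;
	memcpy(buf + *offset, &header, DEMO_HEADER_SIZE);
	memcpy(buf + *offset + DEMO_HEADER_SIZE, src, len);
	*offset += DEMO_HEADER_SIZE + len;
	return 0;
}

/* Read the header found at "offset" and return a pointer to its payload. */
static const uint8_t *demo_read(const uint8_t *buf, size_t offset,
				uint64_t *tsc, uint32_t *len)
{
	struct demo_record_header header;

	memcpy(&header, buf + offset, DEMO_HEADER_SIZE);
	*tsc = header.timestamp;
	*len = header.len;
	return buf + offset + DEMO_HEADER_SIZE;
}
```

A reader walks the buffer by calling demo_read() at offset 0 and then
advancing by DEMO_HEADER_SIZE plus the returned length, which is roughly the
role the record_get callback plays for the iterator.
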
Index: linux.trees.git/lib/ringbuffer/ring_buffer_percpu_local.c
===================================================================
--- /dev/null	1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/ring_buffer_percpu_local.c	2010-07-09 18:35:16.000000000 -0400
@@ -0,0 +1,372 @@
+/*
+ * ring_buffer_percpu_local.c
+ *
+ * Copyright (C) 2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * Per-CPU ring buffer library implementation.
+ * Creates instances of both overwrite and discard modes.
+ * Presents per-cpu-buffer iterators.
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/types.h>
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <linux/ringbuffer/config.h>
+#include <linux/ringbuffer/percpu_local_overwrite.h>
+#include <linux/ringbuffer/percpu_local_discard.h>
+#include <linux/ringbuffer/vfs.h>
+#include <linux/prio_heap.h>
+
+struct channel_priv {
+	/* Returns 0 on success */
+	int (*on_buffer_create)(struct ring_buffer *buf, int cpu);
+	void (*on_buffer_finalize)(struct ring_buffer *buf, int cpu);
+};
+
+struct subbuffer_header {
+	uint8_t header_end[0];		/* End of header */
+};
+
+struct record_header {
+	uint32_t len;			/* Size of record payload */
+	uint8_t header_end[0];		/* End of header */
+};
+
+static inline
+u64 ring_buffer_clock_read(struct channel *chan)
+{
+	return 0;
+}
+
+static inline
+unsigned char record_header_size(const struct ring_buffer_config *config,
+				 struct channel *chan, size_t offset,
+				 size_t data_size, size_t *pre_header_padding,
+				 unsigned int rflags,
+				 struct ring_buffer_ctx *ctx)
+{
+	return offsetof(struct record_header, header_end);
+}
+
+#include <linux/ringbuffer/api.h>
+
+static
+u64 client_ring_buffer_clock_read(struct channel *chan)
+{
+	return ring_buffer_clock_read(chan);
+}
+
+static
+size_t client_record_header_size(const struct ring_buffer_config *config,
+				 struct channel *chan, size_t offset,
+				 size_t data_size,
+				 size_t *pre_header_padding,
+				 unsigned int rflags,
+				 struct ring_buffer_ctx *ctx)
+{
+	return record_header_size(config, chan, offset, data_size,
+				  pre_header_padding, rflags, ctx);
+}
+
+static
+size_t client_subbuffer_header_size(void)
+{
+	return offsetof(struct subbuffer_header, header_end);
+}
+
+static
+void client_buffer_begin(struct ring_buffer *buf, u64 tsc,
+			 unsigned int subbuf_idx)
+{
+}
+
+static
+void client_buffer_end(struct ring_buffer *buf, u64 tsc,
+		       unsigned int subbuf_idx, unsigned long data_size)
+{
+}
+
+static
+int client_buffer_create(struct ring_buffer *buf, void *priv,
+			 int cpu, const char *name)
+{
+	struct channel_priv *chan_priv = priv;
+	return chan_priv->on_buffer_create(buf, cpu);
+}
+
+static
+void client_buffer_finalize(struct ring_buffer *buf, void *priv, int cpu)
+{
+	struct channel_priv *chan_priv = priv;
+	chan_priv->on_buffer_finalize(buf, cpu);
+}
+
+static
+void client_record_get(const struct ring_buffer_config *config,
+		       struct channel *chan, struct ring_buffer *buf,
+		       size_t offset, size_t *header_len,
+		       size_t *payload_len, u64 *timestamp)
+{
+	int ret;
+	struct record_header header;
+
+	ret = ring_buffer_read(&buf->backend, offset, &header,
+			       offsetof(struct record_header, header_end));
+	CHAN_WARN_ON(chan, ret != offsetof(struct record_header, header_end));
+	*header_len = offsetof(struct record_header, header_end);
+	*payload_len = header.len;
+	/* Timestamp is left unset. We don't use channel iterators. */
+}
+
+/*
+ * Typically 8 subbuffers of variable size per CPU.
+ * Maximum subbuffer size is 4GB. Allocate more subbuffers if more space is
+ * requested.
+ * The periodic, deferrable buffer-switch timer is set to 100ms. It wakes up
+ * blocking reads when partially filled subbuffers are ready for reading.
+ * The periodic reader wakeup delivery timer is disabled; it is unnecessary
+ * because RING_BUFFER_WAKEUP_BY_WRITER is set.
+ */
+#define SP_SUBBUF_NUM_ORDER	3
+#define SP_SUBBUF_NUM		(1 << SP_SUBBUF_NUM_ORDER)
+#define SP_SWITCH_INTERVAL_MS	100U
+#define SP_SWITCH_INTERVAL_US	(SP_SWITCH_INTERVAL_MS * 1000)
+#define SP_READ_INTERVAL_US	0
+#define SP_U32_MAX		4294967295U	/* 2^32 - 1 */
+
+static const struct ring_buffer_config percpu_local_overwrite_config = {
+	.cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+	.cb.record_header_size = client_record_header_size,
+	.cb.subbuffer_header_size = client_subbuffer_header_size,
+	.cb.buffer_begin = client_buffer_begin,
+	.cb.buffer_end = client_buffer_end,
+	.cb.buffer_create = client_buffer_create,
+	.cb.buffer_finalize = client_buffer_finalize,
+	.cb.record_get = client_record_get,
+
+	.tsc_bits = 64,
+	.alloc = RING_BUFFER_ALLOC_PER_CPU,
+	.sync = RING_BUFFER_SYNC_PER_CPU,
+	.mode = RING_BUFFER_OVERWRITE,
+	.align = RING_BUFFER_PACKED,
+	.backend = RING_BUFFER_PAGE,
+	.output = RING_BUFFER_ITERATOR,
+	.oops = RING_BUFFER_NO_OOPS_CONSISTENCY,
+	.ipi = RING_BUFFER_IPI_BARRIER,
+	.wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+};
+
+static const struct ring_buffer_config percpu_local_discard_config = {
+	.cb.ring_buffer_clock_read = client_ring_buffer_clock_read,
+	.cb.record_header_size = client_record_header_size,
+	.cb.subbuffer_header_size = client_subbuffer_header_size,
+	.cb.buffer_begin = client_buffer_begin,
+	.cb.buffer_end = client_buffer_end,
+	.cb.buffer_create = client_buffer_create,
+	.cb.buffer_finalize = client_buffer_finalize,
+	.cb.record_get = client_record_get,
+
+	.tsc_bits = 64,
+	.alloc = RING_BUFFER_ALLOC_PER_CPU,
+	.sync = RING_BUFFER_SYNC_PER_CPU,
+	.mode = RING_BUFFER_DISCARD,
+	.align = RING_BUFFER_PACKED,
+	.backend = RING_BUFFER_PAGE,
+	.output = RING_BUFFER_ITERATOR,
+	.oops = RING_BUFFER_NO_OOPS_CONSISTENCY,
+	.ipi = RING_BUFFER_IPI_BARRIER,
+	.wakeup = RING_BUFFER_WAKEUP_BY_WRITER,
+};
+
+/* Wrapper library API */
+
+static
+struct channel *ring_buffer_spl_create(const struct ring_buffer_config *config,
+		size_t buf_size,
+		int (*on_buffer_create)(struct ring_buffer *buf, int cpu),
+		void (*on_buffer_finalize)(struct ring_buffer *buf, int cpu))
+{
+	struct channel *chan;
+	size_t subbuf_size, subbuf_size_order;
+	unsigned int subbuf_num = SP_SUBBUF_NUM;
+	struct channel_priv *priv;
+
+	/* Typically use 8 subbuffers of at least PAGE_SIZE each. */
+	buf_size = max_t(size_t, buf_size, PAGE_SIZE << SP_SUBBUF_NUM_ORDER);
+	subbuf_size = buf_size >> SP_SUBBUF_NUM_ORDER;
+	/*
+	 * Ensure the record payload size fits in the u32 "len" header field.
+	 * Maximum subbuffer size is therefore 4GB.
+	 */
+	subbuf_size = min_t(size_t, SP_U32_MAX, subbuf_size);
+
+	/* Allocate more than 8 subbuffers if necessary. */
+	if (subbuf_size < (buf_size >> SP_SUBBUF_NUM_ORDER)) {
+		subbuf_size_order = get_count_order(subbuf_size);
+		subbuf_num = buf_size >> subbuf_size_order;
+	}
+
+	priv = kmalloc(sizeof(*priv), GFP_KERNEL);
+	if (!priv)
+		return NULL;
+	priv->on_buffer_create = on_buffer_create;
+	priv->on_buffer_finalize = on_buffer_finalize;
+
+	chan = channel_create(config, "spl", priv, NULL,
+			      subbuf_size, subbuf_num,
+			      SP_SWITCH_INTERVAL_US, SP_READ_INTERVAL_US);
+	if (!chan)
+		goto free_priv;
+	return chan;
+
+free_priv:
+	kfree(priv);
+	return NULL;
+}
+
+/**
+ * ring_buffer_percpu_local_overwrite_create - creates a per-cpu overwrite
+ *                                             ring buffer.
+ * @buf_size: the buffer size
+ * @on_buffer_create: callback to be called on per-cpu buffer creation
+ * @on_buffer_finalize: callback to be called on per-cpu buffer finalize
+ *
+ * Returns the ring buffer channel address on success, NULL on error.
+ */
+struct channel *ring_buffer_percpu_local_overwrite_create(size_t buf_size,
+		int (*on_buffer_create)(struct ring_buffer *buf, int cpu),
+		void (*on_buffer_finalize)(struct ring_buffer *buf, int cpu))
+
+{
+	return ring_buffer_spl_create(&percpu_local_overwrite_config, buf_size,
+				      on_buffer_create, on_buffer_finalize);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_local_overwrite_create);
+
+/**
+ * ring_buffer_percpu_local_discard_create - creates a per-cpu discard ring
+ *                                           buffer.
+ * @buf_size: the buffer size
+ * @on_buffer_create: callback to be called on per-cpu buffer creation
+ * @on_buffer_finalize: callback to be called on per-cpu buffer finalize
+ *
+ * Returns the ring buffer channel address on success, NULL on error.
+ */
+
+struct channel *ring_buffer_percpu_local_discard_create(size_t buf_size,
+		int (*on_buffer_create)(struct ring_buffer *buf, int cpu),
+		void (*on_buffer_finalize)(struct ring_buffer *buf, int cpu))
+{
+	return ring_buffer_spl_create(&percpu_local_discard_config, buf_size,
+				      on_buffer_create, on_buffer_finalize);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_local_discard_create);
+
+static
+void ring_buffer_percpu_local_destroy(struct channel *chan)
+{
+	struct channel_priv *priv;
+
+	priv = channel_destroy(chan);
+	kfree(priv);
+}
+
+/**
+ * ring_buffer_percpu_local_overwrite_destroy - deletes a per-cpu overwrite
+ *                                              ring buffer.
+ * @chan: ring buffer channel
+ */
+void ring_buffer_percpu_local_overwrite_destroy(struct channel *chan)
+{
+	ring_buffer_percpu_local_destroy(chan);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_local_overwrite_destroy);
+/**
+ * ring_buffer_percpu_local_discard_destroy - deletes a per-cpu discard
+ *                                            ring buffer.
+ * @chan: ring buffer channel
+ */
+void ring_buffer_percpu_local_discard_destroy(struct channel *chan)
+{
+	ring_buffer_percpu_local_destroy(chan);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_local_discard_destroy);
+
+/**
+ * ring_buffer_percpu_write - writes a record into the ring buffer.
+ * @config: ring buffer client configuration
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * Copies "len" bytes from "src". Returns 0 on success, else a negative error.
+ */
+static
+int ring_buffer_percpu_write(const struct ring_buffer_config *config,
+			 struct channel *chan, const void *src, size_t len)
+{
+	struct channel_priv *priv = channel_get_private(chan);
+	struct record_header header;
+	struct ring_buffer_ctx ctx;
+	int ret, cpu;
+
+	cpu = ring_buffer_get_cpu(config);
+	if (cpu < 0) {
+		ret = cpu;
+		goto end;
+	}
+	ring_buffer_ctx_init(&ctx, chan, priv, len, 0, cpu);
+	ret = ring_buffer_reserve(config, &ctx);
+	if (ret)
+		goto put;
+	header.len = len;
+	ring_buffer_write(config, &ctx, &header,
+			  offsetof(struct record_header, header_end));
+	ring_buffer_write(config, &ctx, src, len);
+	ring_buffer_commit(config, &ctx);
+put:
+	ring_buffer_put_cpu(config);
+end:
+	return ret;
+}
+
+/**
+ * ring_buffer_percpu_local_overwrite_write - writes a record into the ring
+ *                                            buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+int ring_buffer_percpu_local_overwrite_write(struct channel *chan,
+					     const void *src, size_t len)
+{
+	return ring_buffer_percpu_write(&percpu_local_overwrite_config, chan,
+					src, len);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_local_overwrite_write);
+
+/**
+ * ring_buffer_percpu_local_discard_write - writes a record into the ring buffer.
+ * @chan: ring buffer channel
+ * @src: start of input to copy from
+ * @len: length of record
+ *
+ * The record starts at the "src" address and is "len" bytes long. Returns 0 on
+ * success, else it returns a negative error value.
+ */
+int ring_buffer_percpu_local_discard_write(struct channel *chan,
+					   const void *src, size_t len)
+{
+	return ring_buffer_percpu_write(&percpu_local_discard_config, chan, src,
+					len);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_percpu_local_discard_write);
+
+MODULE_LICENSE("GPL and additional rights");
+MODULE_AUTHOR("Mathieu Desnoyers");
+MODULE_DESCRIPTION("Ring Buffer Library Per-CPU Local Client");
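
For reference, the common-case sizing done by ring_buffer_spl_create() can be
sketched in user space as follows. This is only an illustration: the demo_*
names are hypothetical, a 4 KiB page size is assumed, and the 4GB cap path
(which allocates more than 8 subbuffers) is deliberately left out.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_PAGE_SIZE		4096UL	/* assumption: 4 KiB pages */
#define DEMO_SUBBUF_NUM_ORDER	3	/* mirrors SP_SUBBUF_NUM_ORDER */

/* Hypothetical result type: what would be handed to channel_create(). */
struct demo_geometry {
	size_t subbuf_size;
	unsigned int subbuf_num;
};

/*
 * Round the requested per-cpu buffer size up to at least 8 pages, then
 * split it into 8 equally sized subbuffers. The 4GB cap is not modelled.
 */
static struct demo_geometry demo_compute_geometry(size_t buf_size)
{
	struct demo_geometry g;
	size_t min_size = DEMO_PAGE_SIZE << DEMO_SUBBUF_NUM_ORDER;

	if (buf_size < min_size)
		buf_size = min_size;
	g.subbuf_size = buf_size >> DEMO_SUBBUF_NUM_ORDER;
	g.subbuf_num = 1U << DEMO_SUBBUF_NUM_ORDER;
	return g;
}
```

Under these assumptions, a request smaller than 32 KiB is raised to eight
one-page subbuffers, and a 1 MiB request yields eight 128 KiB subbuffers.
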


Thread overview: 25+ messages
2010-07-09 22:57 [patch 00/20] Generic Ring Buffer Library Mathieu Desnoyers
2010-07-09 22:57 ` [patch 01/20] Create generic alignment API (v8) Mathieu Desnoyers
2010-07-09 22:57   ` Mathieu Desnoyers
2010-08-06 11:41   ` Alexander Shishkin
2010-08-06 14:48     ` Mathieu Desnoyers
2010-08-06 14:48       ` Mathieu Desnoyers
2010-07-09 22:57 ` [patch 02/20] notifier atomic call chain notrace Mathieu Desnoyers
2010-07-09 22:57 ` [patch 03/20] idle notifier standardization Mathieu Desnoyers
2010-07-09 22:57 ` [patch 04/20] idle notifier standardization x86_32 Mathieu Desnoyers
2010-07-09 22:57 ` [patch 05/20] Poll : add poll_wait_set_exclusive Mathieu Desnoyers
2010-07-09 22:57 ` [patch 06/20] prio_heap: heap_remove(), heap_maximum(), heap_replace() and heap_cherrypick() Mathieu Desnoyers
2010-07-09 22:57 ` [patch 07/20] kthread_kill_stop() Mathieu Desnoyers
2010-07-09 22:57 ` [patch 08/20] inline memcpy Mathieu Desnoyers
2010-07-09 22:57 ` [patch 09/20] x86 " Mathieu Desnoyers
2010-07-09 22:57 ` [patch 10/20] Trace clock - build standalone Mathieu Desnoyers
2010-07-09 22:57 ` [patch 11/20] Ftrace ring buffer renaming Mathieu Desnoyers
2010-07-09 22:57 ` [patch 12/20] ring buffer backend Mathieu Desnoyers
2010-07-09 22:57 ` [patch 13/20] ring buffer frontend Mathieu Desnoyers
2010-07-09 22:57 ` [patch 14/20] Ring buffer library - documentation Mathieu Desnoyers
2010-07-09 22:57 ` [patch 15/20] Ring buffer library - VFS operations Mathieu Desnoyers
2010-07-09 22:57 ` [patch 16/20] Ring buffer library - client sample Mathieu Desnoyers
2010-07-09 22:57 ` [patch 17/20] Ring buffer benchmark library Mathieu Desnoyers
2010-07-09 22:57 ` [patch 18/20] Ring Buffer Record Iterator Mathieu Desnoyers
2010-07-09 22:57 ` Mathieu Desnoyers [this message]
2010-07-09 22:57 ` [patch 20/20] Ring buffer: benchmark simple API Mathieu Desnoyers
