All of lore.kernel.org
 help / color / mirror / Atom feed
* [PATCH] PPP: Optimize write buffer management
@ 2011-03-01 15:07 Patrick Porlan
  2011-03-02  3:47 ` Denis Kenzior
  0 siblings, 1 reply; 8+ messages in thread
From: Patrick Porlan @ 2011-03-01 15:07 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 9327 bytes --]

Replace modulo operations in ringbuffer.c by masking operations.
That's possible because the size of the ring buffers is always
a power of two, and yields a small performance improvement.

More importantly, extend the write buffer handling in gathdlc.c
to minimize stalling and process switching during large PPP transfers.
The single write buffer is replaced by a circular queue of buffers,
allowing for much larger emission windows without hugely impacting
memory consumption. This reduces the time required to send 50 MB
between a couple of local PPP interfaces on my laptop from 53s to 2.6s.
---
 gatchat/gathdlc.c    |   85 +++++++++++++++++++++++++++++++++++++-------------
 gatchat/ringbuffer.c |   14 +++++---
 2 files changed, 71 insertions(+), 28 deletions(-)

diff --git a/gatchat/gathdlc.c b/gatchat/gathdlc.c
index dd11043..9cc6a74 100644
--- a/gatchat/gathdlc.c
+++ b/gatchat/gathdlc.c
@@ -37,7 +37,9 @@
 #include "gatio.h"
 #include "gathdlc.h"
 
-#define BUFFER_SIZE 2048
+#define BUFFER_SIZE	4096
+#define MAX_BUFFERS	64	/* Maximum number of in-flight write buffers: this needs to be a power of 2 */
+#define HDLC_OVERHEAD	256	/* Rough estimate of HDLC protocol overhead for a frame */
 
 #define HDLC_FLAG	0x7e	/* Flag sequence */
 #define HDLC_ESCAPE	0x7d	/* Asynchronous control escape */
@@ -51,7 +53,9 @@
 struct _GAtHDLC {
 	gint ref_count;
 	GAtIO *io;
-	struct ring_buffer *write_buffer;
+	struct ring_buffer *wbuf_ring[MAX_BUFFERS]; /* Array of pointers to circular write buffers - itself managed as a ring */
+	gint wbuf_head; /* First unsent buffer index */
+	gint wbuf_tail; /* Last unsent buffer index */
 	unsigned char *decode_buffer;
 	guint decode_offset;
 	guint16 decode_fcs;
@@ -190,6 +194,7 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
 {
 	GAtHDLC *hdlc;
 	unsigned char *buf;
+	struct ring_buffer* write_buffer;
 
 	if (io == NULL)
 		return NULL;
@@ -207,16 +212,19 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
 	hdlc->xmit_accm[3] = 0x60000000; /* 0x7d, 0x7e */
 	hdlc->recv_accm = ~0U;
 
-	hdlc->write_buffer = ring_buffer_new(BUFFER_SIZE * 2);
-	if (!hdlc->write_buffer)
+	write_buffer = ring_buffer_new(BUFFER_SIZE);
+	if (!write_buffer)
 		goto error;
 
+	hdlc->wbuf_head = hdlc->wbuf_tail = 0;
+	hdlc->wbuf_ring[0] = write_buffer;
+
 	/* Write an initial 0x7e as wakeup character */
-	buf = ring_buffer_write_ptr(hdlc->write_buffer, 0);
+	buf = ring_buffer_write_ptr(write_buffer, 0);
 	*buf = HDLC_FLAG;
-	ring_buffer_write_advance(hdlc->write_buffer, 1);
+	ring_buffer_write_advance(write_buffer, 1);
 
-	hdlc->decode_buffer = g_try_malloc(BUFFER_SIZE * 2);
+	hdlc->decode_buffer = g_try_malloc(BUFFER_SIZE);
 	if (!hdlc->decode_buffer)
 		goto error;
 
@@ -228,8 +236,8 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
 	return hdlc;
 
 error:
-	if (hdlc->write_buffer)
-		ring_buffer_free(hdlc->write_buffer);
+	if (write_buffer)
+	    ring_buffer_free(write_buffer);
 
 	if (hdlc->decode_buffer)
 		g_free(hdlc->decode_buffer);
@@ -266,6 +274,8 @@ GAtHDLC *g_at_hdlc_ref(GAtHDLC *hdlc)
 
 void g_at_hdlc_unref(GAtHDLC *hdlc)
 {
+	int i;
+
 	if (hdlc == NULL)
 		return;
 
@@ -280,7 +290,11 @@ void g_at_hdlc_unref(GAtHDLC *hdlc)
 	g_at_io_unref(hdlc->io);
 	hdlc->io = NULL;
 
-	ring_buffer_free(hdlc->write_buffer);
+	for (i = hdlc->wbuf_head; i != hdlc->wbuf_tail; i = (i+1) % MAX_BUFFERS)
+	    ring_buffer_free(hdlc->wbuf_ring[i]);
+
+	ring_buffer_free(hdlc->wbuf_ring[i]);
+
 	g_free(hdlc->decode_buffer);
 
 	if (hdlc->in_read_handler)
@@ -314,17 +328,26 @@ static gboolean can_write_data(gpointer data)
 	unsigned int len;
 	unsigned char *buf;
 	gsize bytes_written;
+	struct ring_buffer* write_buffer = hdlc->wbuf_ring[hdlc->wbuf_head];
 
-	len = ring_buffer_len_no_wrap(hdlc->write_buffer);
-	buf = ring_buffer_read_ptr(hdlc->write_buffer, 0);
+	len = ring_buffer_len_no_wrap(write_buffer);
+	buf = ring_buffer_read_ptr(write_buffer, 0);
 
 	bytes_written = g_at_io_write(hdlc->io, (gchar *) buf, len);
 	hdlc_record(hdlc->record_fd, FALSE, buf, bytes_written);
-	ring_buffer_drain(hdlc->write_buffer, bytes_written);
+	ring_buffer_drain(write_buffer, bytes_written);
 
-	if (ring_buffer_len(hdlc->write_buffer) > 0)
+	if (ring_buffer_len(write_buffer) > 0)
 		return TRUE;
 
+	/* We're done with this buffer : adjust head if there is at least another buffer in flight */
+	if (hdlc->wbuf_head != hdlc->wbuf_tail) {
+		ring_buffer_free(write_buffer);
+		hdlc->wbuf_head = (hdlc->wbuf_head + 1) % MAX_BUFFERS;
+
+		return TRUE;
+	}
+
 	return FALSE;
 }
 
@@ -356,19 +379,37 @@ GAtIO *g_at_hdlc_get_io(GAtHDLC *hdlc)
 
 gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 {
-	unsigned int avail = ring_buffer_avail(hdlc->write_buffer);
-	unsigned int wrap = ring_buffer_avail_no_wrap(hdlc->write_buffer);
-	unsigned char *buf = ring_buffer_write_ptr(hdlc->write_buffer, 0);
+	struct ring_buffer* write_buffer = hdlc->wbuf_ring[hdlc->wbuf_tail];
+	unsigned int avail = ring_buffer_avail(write_buffer);
+	unsigned int wrap = ring_buffer_avail_no_wrap(write_buffer);
+	unsigned char *buf;
 	unsigned char tail[2];
 	unsigned int i = 0;
 	guint16 fcs = HDLC_INITFCS;
 	gboolean escape = FALSE;
 	gsize pos = 0;
 
-	if (avail < size)
-		return FALSE;
+	if (avail < size + HDLC_OVERHEAD) {
+		/* Switch to a new buffer */
+
+		if ((hdlc->wbuf_tail + 1) % MAX_BUFFERS == hdlc->wbuf_head)
+			return FALSE;	/* Ring full */
+
+		write_buffer = ring_buffer_new(BUFFER_SIZE);
+
+		if (!write_buffer)
+			return FALSE;
+
+		hdlc->wbuf_tail = (hdlc->wbuf_tail + 1) % MAX_BUFFERS;
+		hdlc->wbuf_ring[hdlc->wbuf_tail] = write_buffer;
+
+		avail = ring_buffer_avail(write_buffer);
+		wrap = ring_buffer_avail_no_wrap(write_buffer);
+		buf = ring_buffer_write_ptr(write_buffer, 0);
+	}
 
 	i = 0;
+	buf = ring_buffer_write_ptr(write_buffer, 0);
 
 	while (pos < avail && i < size) {
 		if (escape == TRUE) {
@@ -387,7 +428,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 		pos++;
 
 		if (pos == wrap)
-			buf = ring_buffer_write_ptr(hdlc->write_buffer, pos);
+			buf = ring_buffer_write_ptr(write_buffer, pos);
 	}
 
 	if (i < size)
@@ -414,7 +455,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 		pos++;
 
 		if (pos == wrap)
-			buf = ring_buffer_write_ptr(hdlc->write_buffer, pos);
+		    buf = ring_buffer_write_ptr(write_buffer, pos);
 	}
 
 	if (i < sizeof(tail))
@@ -426,7 +467,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 	*buf = HDLC_FLAG;
 	pos++;
 
-	ring_buffer_write_advance(hdlc->write_buffer, pos);
+	ring_buffer_write_advance(write_buffer, pos);
 
 	g_at_io_set_write_handler(hdlc->io, can_write_data, hdlc);
 
diff --git a/gatchat/ringbuffer.c b/gatchat/ringbuffer.c
index becd3f8..27be3a8 100644
--- a/gatchat/ringbuffer.c
+++ b/gatchat/ringbuffer.c
@@ -34,6 +34,7 @@
 struct ring_buffer {
 	unsigned char *buffer;
 	unsigned int size;
+	unsigned int mask;
 	unsigned int in;
 	unsigned int out;
 };
@@ -61,6 +62,7 @@ struct ring_buffer *ring_buffer_new(unsigned int size)
 	}
 
 	buffer->size = real_size;
+	buffer->mask = real_size - 1;
 	buffer->in = 0;
 	buffer->out = 0;
 
@@ -78,7 +80,7 @@ int ring_buffer_write(struct ring_buffer *buf, const void *data,
 	len = MIN(len, buf->size - buf->in + buf->out);
 
 	/* Determine how much to write before wrapping */
-	offset = buf->in % buf->size;
+	offset = buf->in & buf->mask;
 	end = MIN(len, buf->size - offset);
 	memcpy(buf->buffer+offset, d, end);
 
@@ -93,12 +95,12 @@ int ring_buffer_write(struct ring_buffer *buf, const void *data,
 unsigned char *ring_buffer_write_ptr(struct ring_buffer *buf,
 					unsigned int offset)
 {
-	return buf->buffer + (buf->in + offset) % buf->size;
+	return buf->buffer + ((buf->in + offset) & buf->mask);
 }
 
 int ring_buffer_avail_no_wrap(struct ring_buffer *buf)
 {
-	unsigned int offset = buf->in % buf->size;
+	unsigned int offset = buf->in & buf->mask;
 	unsigned int len = buf->size - buf->in + buf->out;
 
 	return MIN(len, buf->size - offset);
@@ -121,7 +123,7 @@ int ring_buffer_read(struct ring_buffer *buf, void *data, unsigned int len)
 	len = MIN(len, buf->in - buf->out);
 
 	/* Grab data from buffer starting at offset until the end */
-	offset = buf->out % buf->size;
+	offset = buf->out & buf->mask;
 	end = MIN(len, buf->size - offset);
 	memcpy(d, buf->buffer + offset, end);
 
@@ -150,7 +152,7 @@ int ring_buffer_drain(struct ring_buffer *buf, unsigned int len)
 
 int ring_buffer_len_no_wrap(struct ring_buffer *buf)
 {
-	unsigned int offset = buf->out % buf->size;
+	unsigned int offset = buf->out & buf->mask;
 	unsigned int len = buf->in - buf->out;
 
 	return MIN(len, buf->size - offset);
@@ -159,7 +161,7 @@ int ring_buffer_len_no_wrap(struct ring_buffer *buf)
 unsigned char *ring_buffer_read_ptr(struct ring_buffer *buf,
 					unsigned int offset)
 {
-	return buf->buffer + (buf->out + offset) % buf->size;
+	return buf->buffer + ((buf->out + offset) & buf->mask);
 }
 
 int ring_buffer_len(struct ring_buffer *buf)
-- 
1.7.1


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* Re: [PATCH] PPP: Optimize write buffer management
  2011-03-01 15:07 Patrick Porlan
@ 2011-03-02  3:47 ` Denis Kenzior
  2011-03-02  8:42   ` Patrick Porlan
  0 siblings, 1 reply; 8+ messages in thread
From: Denis Kenzior @ 2011-03-02  3:47 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 11545 bytes --]

Hi Patrick,

On 03/01/2011 09:07 AM, Patrick Porlan wrote:
> Replace modulo operations in ringbuffer.c by masking operations.
> That's possible because the size of the ring buffers is always
> a power of two, and yields a small performance improvement.
> 
> More importantly, extend the write buffer handling in gathdlc.c
> to minimize stalling and process switching during large PPP transfers.
> The single write buffer is replaced by a circular queue of buffers,
> allowing for much larger emission windows without hugely impacting
> memory consumption. This reduces the time required to send 50 MB
> between a couple of local PPP interfaces on my laptop from 53s to 2.6s.

This is a great description, but right after reading it one should
realize that this patch is better broken down into two.  The first patch
addressing the ringbuffer performance improvements and the second one
dealing with PPP/HDLC queuing.  In fact, if there were two patches, then
ringbuffer changes could go in right away.

> ---
>  gatchat/gathdlc.c    |   85 +++++++++++++++++++++++++++++++++++++-------------
>  gatchat/ringbuffer.c |   14 +++++---
>  2 files changed, 71 insertions(+), 28 deletions(-)
> 
> diff --git a/gatchat/gathdlc.c b/gatchat/gathdlc.c
> index dd11043..9cc6a74 100644
> --- a/gatchat/gathdlc.c
> +++ b/gatchat/gathdlc.c
> @@ -37,7 +37,9 @@
>  #include "gatio.h"
>  #include "gathdlc.h"
>  
> -#define BUFFER_SIZE 2048
> +#define BUFFER_SIZE	4096
> +#define MAX_BUFFERS	64	/* Maximum number of in-flight write buffers: this needs to be a power of 2 */
> +#define HDLC_OVERHEAD	256	/* Rough estimate of HDLC protocol overhead for a frame */
>  

Please try to avoid going over 80 characters a line.  Sometimes it is
unavoidable, but lets try very hard to avoid it.

>  #define HDLC_FLAG	0x7e	/* Flag sequence */
>  #define HDLC_ESCAPE	0x7d	/* Asynchronous control escape */
> @@ -51,7 +53,9 @@
>  struct _GAtHDLC {
>  	gint ref_count;
>  	GAtIO *io;
> -	struct ring_buffer *write_buffer;
> +	struct ring_buffer *wbuf_ring[MAX_BUFFERS]; /* Array of pointers to circular write buffers - itself managed as a ring */
> +	gint wbuf_head; /* First unsent buffer index */
> +	gint wbuf_tail; /* Last unsent buffer index */

Is there a particular reason why you chose to use a ring buffer of ring
buffers? A simple GQueue might be much easier to understand.  If you are
worried about 'infinite queuing' then a simple counter might help to
alleviate that.

>  	unsigned char *decode_buffer;
>  	guint decode_offset;
>  	guint16 decode_fcs;
> @@ -190,6 +194,7 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
>  {
>  	GAtHDLC *hdlc;
>  	unsigned char *buf;
> +	struct ring_buffer* write_buffer;
>  
>  	if (io == NULL)
>  		return NULL;
> @@ -207,16 +212,19 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
>  	hdlc->xmit_accm[3] = 0x60000000; /* 0x7d, 0x7e */
>  	hdlc->recv_accm = ~0U;
>  
> -	hdlc->write_buffer = ring_buffer_new(BUFFER_SIZE * 2);
> -	if (!hdlc->write_buffer)
> +	write_buffer = ring_buffer_new(BUFFER_SIZE);
> +	if (!write_buffer)

You change BUFFER_SIZE from 2048 to 4096 and remove the multiplication
here.  It is a good idea to send these types of changes in a separate
patch for two reasons:
	- Logically they should be separate
	- It is much easier to review for correctness outside the context of a
large patch

>  		goto error;
>  
> +	hdlc->wbuf_head = hdlc->wbuf_tail = 0;
> +	hdlc->wbuf_ring[0] = write_buffer;
> +
>  	/* Write an initial 0x7e as wakeup character */
> -	buf = ring_buffer_write_ptr(hdlc->write_buffer, 0);
> +	buf = ring_buffer_write_ptr(write_buffer, 0);
>  	*buf = HDLC_FLAG;
> -	ring_buffer_write_advance(hdlc->write_buffer, 1);
> +	ring_buffer_write_advance(write_buffer, 1);
>  
> -	hdlc->decode_buffer = g_try_malloc(BUFFER_SIZE * 2);
> +	hdlc->decode_buffer = g_try_malloc(BUFFER_SIZE);
>  	if (!hdlc->decode_buffer)
>  		goto error;
>  
> @@ -228,8 +236,8 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
>  	return hdlc;
>  
>  error:
> -	if (hdlc->write_buffer)
> -		ring_buffer_free(hdlc->write_buffer);
> +	if (write_buffer)
> +	    ring_buffer_free(write_buffer);

Please watch out for space / tab indentation mixups.  You have one here

>  
>  	if (hdlc->decode_buffer)
>  		g_free(hdlc->decode_buffer);
> @@ -266,6 +274,8 @@ GAtHDLC *g_at_hdlc_ref(GAtHDLC *hdlc)
>  
>  void g_at_hdlc_unref(GAtHDLC *hdlc)
>  {
> +	int i;
> +
>  	if (hdlc == NULL)
>  		return;
>  
> @@ -280,7 +290,11 @@ void g_at_hdlc_unref(GAtHDLC *hdlc)
>  	g_at_io_unref(hdlc->io);
>  	hdlc->io = NULL;
>  
> -	ring_buffer_free(hdlc->write_buffer);
> +	for (i = hdlc->wbuf_head; i != hdlc->wbuf_tail; i = (i+1) % MAX_BUFFERS)
> +	    ring_buffer_free(hdlc->wbuf_ring[i]);

And here as well

> +
> +	ring_buffer_free(hdlc->wbuf_ring[i]);
> +
>  	g_free(hdlc->decode_buffer);
>  
>  	if (hdlc->in_read_handler)
> @@ -314,17 +328,26 @@ static gboolean can_write_data(gpointer data)
>  	unsigned int len;
>  	unsigned char *buf;
>  	gsize bytes_written;
> +	struct ring_buffer* write_buffer = hdlc->wbuf_ring[hdlc->wbuf_head];
>  
> -	len = ring_buffer_len_no_wrap(hdlc->write_buffer);
> -	buf = ring_buffer_read_ptr(hdlc->write_buffer, 0);
> +	len = ring_buffer_len_no_wrap(write_buffer);
> +	buf = ring_buffer_read_ptr(write_buffer, 0);
>  
>  	bytes_written = g_at_io_write(hdlc->io, (gchar *) buf, len);
>  	hdlc_record(hdlc->record_fd, FALSE, buf, bytes_written);
> -	ring_buffer_drain(hdlc->write_buffer, bytes_written);
> +	ring_buffer_drain(write_buffer, bytes_written);
>  
> -	if (ring_buffer_len(hdlc->write_buffer) > 0)
> +	if (ring_buffer_len(write_buffer) > 0)
>  		return TRUE;
>  
> +	/* We're done with this buffer : adjust head if there is at least another buffer in flight */
> +	if (hdlc->wbuf_head != hdlc->wbuf_tail) {
> +		ring_buffer_free(write_buffer);
> +		hdlc->wbuf_head = (hdlc->wbuf_head + 1) % MAX_BUFFERS;
> +
> +		return TRUE;
> +	}
> +
>  	return FALSE;
>  }
>  
> @@ -356,19 +379,37 @@ GAtIO *g_at_hdlc_get_io(GAtHDLC *hdlc)
>  
>  gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
>  {
> -	unsigned int avail = ring_buffer_avail(hdlc->write_buffer);
> -	unsigned int wrap = ring_buffer_avail_no_wrap(hdlc->write_buffer);
> -	unsigned char *buf = ring_buffer_write_ptr(hdlc->write_buffer, 0);
> +	struct ring_buffer* write_buffer = hdlc->wbuf_ring[hdlc->wbuf_tail];
> +	unsigned int avail = ring_buffer_avail(write_buffer);
> +	unsigned int wrap = ring_buffer_avail_no_wrap(write_buffer);
> +	unsigned char *buf;
>  	unsigned char tail[2];
>  	unsigned int i = 0;
>  	guint16 fcs = HDLC_INITFCS;
>  	gboolean escape = FALSE;
>  	gsize pos = 0;
>  
> -	if (avail < size)
> -		return FALSE;
> +	if (avail < size + HDLC_OVERHEAD) {

So I think we have to be a bit careful here.  HDLC framing can in theory
(if you're maximally unlucky) result in doubling of the data size once
it is framed.  This means that we might have enough space in the current
buffer according to this estimate, but still exceed it once the actual
framing is performed.  If so, then we have to drop the frame.

There are two possibilities:
	- Retry again with a full buffer this time
	- Always pick a buffer if we have less than 2x size available

> +		/* Switch to a new buffer */
> +
> +		if ((hdlc->wbuf_tail + 1) % MAX_BUFFERS == hdlc->wbuf_head)
> +			return FALSE;	/* Ring full */
> +
> +		write_buffer = ring_buffer_new(BUFFER_SIZE);
> +
> +		if (!write_buffer)
> +			return FALSE;
> +
> +		hdlc->wbuf_tail = (hdlc->wbuf_tail + 1) % MAX_BUFFERS;
> +		hdlc->wbuf_ring[hdlc->wbuf_tail] = write_buffer;
> +
> +		avail = ring_buffer_avail(write_buffer);
> +		wrap = ring_buffer_avail_no_wrap(write_buffer);
> +		buf = ring_buffer_write_ptr(write_buffer, 0);
> +	}
>  
>  	i = 0;
> +	buf = ring_buffer_write_ptr(write_buffer, 0);
>  
>  	while (pos < avail && i < size) {
>  		if (escape == TRUE) {
> @@ -387,7 +428,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
>  		pos++;
>  
>  		if (pos == wrap)
> -			buf = ring_buffer_write_ptr(hdlc->write_buffer, pos);
> +			buf = ring_buffer_write_ptr(write_buffer, pos);
>  	}
>  
>  	if (i < size)
> @@ -414,7 +455,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
>  		pos++;
>  
>  		if (pos == wrap)
> -			buf = ring_buffer_write_ptr(hdlc->write_buffer, pos);
> +		    buf = ring_buffer_write_ptr(write_buffer, pos);
>  	}
>  
>  	if (i < sizeof(tail))
> @@ -426,7 +467,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
>  	*buf = HDLC_FLAG;
>  	pos++;
>  
> -	ring_buffer_write_advance(hdlc->write_buffer, pos);
> +	ring_buffer_write_advance(write_buffer, pos);
>  
>  	g_at_io_set_write_handler(hdlc->io, can_write_data, hdlc);
>  
> diff --git a/gatchat/ringbuffer.c b/gatchat/ringbuffer.c
> index becd3f8..27be3a8 100644
> --- a/gatchat/ringbuffer.c
> +++ b/gatchat/ringbuffer.c
> @@ -34,6 +34,7 @@
>  struct ring_buffer {
>  	unsigned char *buffer;
>  	unsigned int size;
> +	unsigned int mask;
>  	unsigned int in;
>  	unsigned int out;
>  };
> @@ -61,6 +62,7 @@ struct ring_buffer *ring_buffer_new(unsigned int size)
>  	}
>  
>  	buffer->size = real_size;
> +	buffer->mask = real_size - 1;
>  	buffer->in = 0;
>  	buffer->out = 0;
>  
> @@ -78,7 +80,7 @@ int ring_buffer_write(struct ring_buffer *buf, const void *data,
>  	len = MIN(len, buf->size - buf->in + buf->out);
>  
>  	/* Determine how much to write before wrapping */
> -	offset = buf->in % buf->size;
> +	offset = buf->in & buf->mask;
>  	end = MIN(len, buf->size - offset);
>  	memcpy(buf->buffer+offset, d, end);
>  
> @@ -93,12 +95,12 @@ int ring_buffer_write(struct ring_buffer *buf, const void *data,
>  unsigned char *ring_buffer_write_ptr(struct ring_buffer *buf,
>  					unsigned int offset)
>  {
> -	return buf->buffer + (buf->in + offset) % buf->size;
> +	return buf->buffer + ((buf->in + offset) & buf->mask);
>  }
>  
>  int ring_buffer_avail_no_wrap(struct ring_buffer *buf)
>  {
> -	unsigned int offset = buf->in % buf->size;
> +	unsigned int offset = buf->in & buf->mask;
>  	unsigned int len = buf->size - buf->in + buf->out;
>  
>  	return MIN(len, buf->size - offset);
> @@ -121,7 +123,7 @@ int ring_buffer_read(struct ring_buffer *buf, void *data, unsigned int len)
>  	len = MIN(len, buf->in - buf->out);
>  
>  	/* Grab data from buffer starting at offset until the end */
> -	offset = buf->out % buf->size;
> +	offset = buf->out & buf->mask;
>  	end = MIN(len, buf->size - offset);
>  	memcpy(d, buf->buffer + offset, end);
>  
> @@ -150,7 +152,7 @@ int ring_buffer_drain(struct ring_buffer *buf, unsigned int len)
>  
>  int ring_buffer_len_no_wrap(struct ring_buffer *buf)
>  {
> -	unsigned int offset = buf->out % buf->size;
> +	unsigned int offset = buf->out & buf->mask;
>  	unsigned int len = buf->in - buf->out;
>  
>  	return MIN(len, buf->size - offset);
> @@ -159,7 +161,7 @@ int ring_buffer_len_no_wrap(struct ring_buffer *buf)
>  unsigned char *ring_buffer_read_ptr(struct ring_buffer *buf,
>  					unsigned int offset)
>  {
> -	return buf->buffer + (buf->out + offset) % buf->size;
> +	return buf->buffer + ((buf->out + offset) & buf->mask);
>  }
>  
>  int ring_buffer_len(struct ring_buffer *buf)

Regards,
-Denis

^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [PATCH] PPP: Optimize write buffer management
  2011-03-02  3:47 ` Denis Kenzior
@ 2011-03-02  8:42   ` Patrick Porlan
  2011-03-02 15:28     ` Denis Kenzior
  0 siblings, 1 reply; 8+ messages in thread
From: Patrick Porlan @ 2011-03-02  8:42 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 2207 bytes --]

Hi Denis,

On Tue, 2011-03-01 at 21:47 -0600, Denis Kenzior wrote:
> This is a great description, but right after reading it one should
> realize that this patch is better broken down into two.  The first patch
> addressing the ringbuffer performance improvements and the second one
> dealing with PPP/HDLC queuing.  In fact, if there were two patches, then
> ringbuffer changes could go in right away.

Fair enough ; I'll separate the changes.


> Please try to avoid going over 80 characters a line.  Sometimes it is
> unavoidable, but lets try very hard to avoid it.

Understood.


> Is there a particular reason why you chose to use a ring buffer of ring
> buffers? A simple GQueue might be much easier to understand.  If you are
> worried about 'infinite queuing' then a simple counter might help to
> alleviate that.

Well, this implementation opens the door to further optimizations, such
as reusing cells (buffers) to avoid frequent allocation/deallocation.


> You change BUFFER_SIZE from 2048 to 4096 and remove the multiplication
> here.  It is a good idea to send these types of changes in a separate
> patch for two reasons:
> 	- Logically they should be separate
> 	- It is much easier to review for correctness outside the context of a
> large patch

Got it. By the way, why is it written this way? Is it to stress that we
want space for two frames?


> Please watch out for space / tab indentation mixups. 

OK. I'll fix that.


> So I think we have to be a bit careful here.  HDLC framing can in theory
> (if you're maximally unlucky) result in doubling of the data size once
> it is framed.  This means that we might have enough space in the current
> buffer according to this estimate, but still exceed it once the actual
> framing is performed.  If so, then we have to drop the frame.
> 
> There are two possibilities:
> 	- Retry again with a full buffer this time
> 	- Always pick a buffer if we have less than 2x size available
> 

Thanks for the insight. Picking a new buffer may be best. That will
cause more buffer "run away" though, strengthening the case for better
buffer recycling strategies.

Regards,

-- Patrick


^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [PATCH] PPP: Optimize write buffer management
  2011-03-02  8:42   ` Patrick Porlan
@ 2011-03-02 15:28     ` Denis Kenzior
  2011-03-08 16:08       ` Patrick Porlan
  0 siblings, 1 reply; 8+ messages in thread
From: Denis Kenzior @ 2011-03-02 15:28 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 2139 bytes --]

Hi Patrick,

>> Is there a particular reason why you chose to use a ring buffer of ring
>> buffers? A simple GQueue might be much easier to understand.  If you are
>> worried about 'infinite queuing' then a simple counter might help to
>> alleviate that.
> 
> Well, this implementation opens the door to further optimizations, such
> as reusing cells (buffers) to avoid frequent allocation/deallocation.
> 

I think I know where you're going with this, but I'm unconvinced the
circular buffer of buffers is buying you anything over the much simpler
to understand queue of buffers approach.

> 
>> You change BUFFER_SIZE from 2048 to 4096 and remove the multiplication
>> here.  It is a good idea to send these types of changes in a separate
>> patch for two reasons:
>> 	- Logically they should be separate
>> 	- It is much easier to review for correctness outside the context of a
>> large patch
> 
> Got it. By the way, why is it written this way? Is it to stress that we
> want space for two frames?
>

You'd have to ask Marcel, but yes I think this was the intent.

>> So I think we have to be a bit careful here.  HDLC framing can in theory
>> (if you're maximally unlucky) result in doubling of the data size once
>> it is framed.  This means that we might have enough space in the current
>> buffer according to this estimate, but still exceed it once the actual
>> framing is performed.  If so, then we have to drop the frame.
>>
>> There are two possibilities:
>> 	- Retry again with a full buffer this time
>> 	- Always pick a buffer if we have less than 2x size available
>>
> 
> Thanks for the insight. Picking a new buffer may be best. That will
> cause more buffer "run away" though, strengthening the case for better
> buffer recycling strategies.
> 

Yes, we definitely want to have some sort of buffer pool management
strategy to share a pool of buffers between different entities (e.g.
GAtMux, GAtServer, GAtHDLC).  Have you looked at g_slice* inside glib
yet?  I think we can take advantage of the fact that most of our ring
buffers are sized at 4k.

Regards,
-Denis

^ permalink raw reply	[flat|nested] 8+ messages in thread

* [PATCH] PPP: Optimize write buffer management
@ 2011-03-08 15:58 Patrick Porlan
  2011-03-15 19:28 ` Denis Kenzior
  0 siblings, 1 reply; 8+ messages in thread
From: Patrick Porlan @ 2011-03-08 15:58 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 6687 bytes --]

Extend the write buffer handling in gathdlc.c to minimize stalling and
process switching during large PPP transfers. The single write buffer
is replaced by a queue of buffers, allowing for much larger emission
windows without hugely impacting memory consumption. This reduces the
time required to send 50 MB between a couple of local PPP interfaces on
my laptop from ~53s to ~3s.
---
 gatchat/gathdlc.c |   95 ++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/gatchat/gathdlc.c b/gatchat/gathdlc.c
index 6c39e6c..893e061 100644
--- a/gatchat/gathdlc.c
+++ b/gatchat/gathdlc.c
@@ -37,7 +37,9 @@
 #include "gatio.h"
 #include "gathdlc.h"
 
-#define BUFFER_SIZE 2048
+#define BUFFER_SIZE	(2 * 2048)
+#define MAX_BUFFERS	64	/* Maximum number of in-flight write buffers */
+#define HDLC_OVERHEAD	256	/* Rough estimate of HDLC protocol overhead */
 
 #define HDLC_FLAG	0x7e	/* Flag sequence */
 #define HDLC_ESCAPE	0x7d	/* Asynchronous control escape */
@@ -51,7 +53,7 @@
 struct _GAtHDLC {
 	gint ref_count;
 	GAtIO *io;
-	struct ring_buffer *write_buffer;
+	GQueue *write_queue;	/* Write buffer queue */
 	unsigned char *decode_buffer;
 	guint decode_offset;
 	guint16 decode_fcs;
@@ -201,6 +203,7 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
 {
 	GAtHDLC *hdlc;
 	unsigned char *buf;
+	struct ring_buffer* write_buffer;
 
 	if (io == NULL)
 		return NULL;
@@ -218,16 +221,22 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
 	hdlc->xmit_accm[3] = 0x60000000; /* 0x7d, 0x7e */
 	hdlc->recv_accm = ~0U;
 
-	hdlc->write_buffer = ring_buffer_new(BUFFER_SIZE * 2);
-	if (!hdlc->write_buffer)
+	write_buffer = ring_buffer_new(BUFFER_SIZE);
+	if (!write_buffer)
+		goto error;
+
+	hdlc->write_queue = g_queue_new();
+	if (!hdlc->write_queue)
 		goto error;
 
+	g_queue_push_tail(hdlc->write_queue, write_buffer);
+
 	/* Write an initial 0x7e as wakeup character */
-	buf = ring_buffer_write_ptr(hdlc->write_buffer, 0);
+	buf = ring_buffer_write_ptr(write_buffer, 0);
 	*buf = HDLC_FLAG;
-	ring_buffer_write_advance(hdlc->write_buffer, 1);
+	ring_buffer_write_advance(write_buffer, 1);
 
-	hdlc->decode_buffer = g_try_malloc(BUFFER_SIZE * 2);
+	hdlc->decode_buffer = g_try_malloc(BUFFER_SIZE);
 	if (!hdlc->decode_buffer)
 		goto error;
 
@@ -239,8 +248,11 @@ GAtHDLC *g_at_hdlc_new_from_io(GAtIO *io)
 	return hdlc;
 
 error:
-	if (hdlc->write_buffer)
-		ring_buffer_free(hdlc->write_buffer);
+	if (hdlc->write_queue)
+		g_queue_free(hdlc->write_queue);
+
+	if (write_buffer)
+		ring_buffer_free(write_buffer);
 
 	if (hdlc->decode_buffer)
 		g_free(hdlc->decode_buffer);
@@ -277,6 +289,8 @@ GAtHDLC *g_at_hdlc_ref(GAtHDLC *hdlc)
 
 void g_at_hdlc_unref(GAtHDLC *hdlc)
 {
+	struct ring_buffer *write_buffer;
+
 	if (hdlc == NULL)
 		return;
 
@@ -294,7 +308,11 @@ void g_at_hdlc_unref(GAtHDLC *hdlc)
 	g_at_io_unref(hdlc->io);
 	hdlc->io = NULL;
 
-	ring_buffer_free(hdlc->write_buffer);
+	while ((write_buffer = g_queue_pop_head(hdlc->write_queue)))
+		ring_buffer_free(write_buffer);
+
+	g_queue_free(hdlc->write_queue);
+
 	g_free(hdlc->decode_buffer);
 
 	if (hdlc->in_read_handler)
@@ -328,15 +346,32 @@ static gboolean can_write_data(gpointer data)
 	unsigned int len;
 	unsigned char *buf;
 	gsize bytes_written;
+	struct ring_buffer* write_buffer;
 
-	len = ring_buffer_len_no_wrap(hdlc->write_buffer);
-	buf = ring_buffer_read_ptr(hdlc->write_buffer, 0);
+	/* Write data out from the head of the queue */
+	write_buffer = g_queue_peek_head(hdlc->write_queue);
+
+	len = ring_buffer_len_no_wrap(write_buffer);
+	buf = ring_buffer_read_ptr(write_buffer, 0);
 
 	bytes_written = g_at_io_write(hdlc->io, (gchar *) buf, len);
 	hdlc_record(hdlc->record_fd, FALSE, buf, bytes_written);
-	ring_buffer_drain(hdlc->write_buffer, bytes_written);
+	ring_buffer_drain(write_buffer, bytes_written);
 
-	if (ring_buffer_len(hdlc->write_buffer) > 0)
+	if (ring_buffer_len(write_buffer) > 0)
+		return TRUE;
+
+	/* All data in current buffer is written, free it
+	 * unless it's the last buffer in the queue.
+	 */
+	if ((ring_buffer_len(write_buffer) == 0) &&
+			(g_queue_get_length(hdlc->write_queue) > 1)) {
+		write_buffer = g_queue_pop_head(hdlc->write_queue);
+		ring_buffer_free(write_buffer);
+		write_buffer = g_queue_peek_head(hdlc->write_queue);
+	}
+
+	if (ring_buffer_len(write_buffer) > 0)
 		return TRUE;
 
 	return FALSE;
@@ -370,19 +405,35 @@ GAtIO *g_at_hdlc_get_io(GAtHDLC *hdlc)
 
 gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 {
-	unsigned int avail = ring_buffer_avail(hdlc->write_buffer);
-	unsigned int wrap = ring_buffer_avail_no_wrap(hdlc->write_buffer);
-	unsigned char *buf = ring_buffer_write_ptr(hdlc->write_buffer, 0);
+	struct ring_buffer* write_buffer = g_queue_peek_tail(hdlc->write_queue);
+
+	unsigned int avail = ring_buffer_avail(write_buffer);
+	unsigned int wrap = ring_buffer_avail_no_wrap(write_buffer);
+	unsigned char *buf;
 	unsigned char tail[2];
 	unsigned int i = 0;
 	guint16 fcs = HDLC_INITFCS;
 	gboolean escape = FALSE;
 	gsize pos = 0;
 
-	if (avail < size)
-		return FALSE;
+	if (avail < size + HDLC_OVERHEAD) {
+
+		if (g_queue_get_length(hdlc->write_queue) > MAX_BUFFERS)
+			return FALSE;	/* Too many pending buffers */
+
+		write_buffer = ring_buffer_new(BUFFER_SIZE);
+
+		if (!write_buffer)
+			return FALSE;
+
+		g_queue_push_tail(hdlc->write_queue, write_buffer);
+
+		avail = ring_buffer_avail(write_buffer);
+		wrap = ring_buffer_avail_no_wrap(write_buffer);
+	}
 
 	i = 0;
+	buf = ring_buffer_write_ptr(write_buffer, 0);
 
 	while (pos < avail && i < size) {
 		if (escape == TRUE) {
@@ -401,7 +452,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 		pos++;
 
 		if (pos == wrap)
-			buf = ring_buffer_write_ptr(hdlc->write_buffer, pos);
+			buf = ring_buffer_write_ptr(write_buffer, pos);
 	}
 
 	if (i < size)
@@ -428,7 +479,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 		pos++;
 
 		if (pos == wrap)
-			buf = ring_buffer_write_ptr(hdlc->write_buffer, pos);
+		    buf = ring_buffer_write_ptr(write_buffer, pos);
 	}
 
 	if (i < sizeof(tail))
@@ -440,7 +491,7 @@ gboolean g_at_hdlc_send(GAtHDLC *hdlc, const unsigned char *data, gsize size)
 	*buf = HDLC_FLAG;
 	pos++;
 
-	ring_buffer_write_advance(hdlc->write_buffer, pos);
+	ring_buffer_write_advance(write_buffer, pos);
 
 	g_at_io_set_write_handler(hdlc->io, can_write_data, hdlc);
 
-- 
1.7.1


^ permalink raw reply related	[flat|nested] 8+ messages in thread

* Re: [PATCH] PPP: Optimize write buffer management
  2011-03-02 15:28     ` Denis Kenzior
@ 2011-03-08 16:08       ` Patrick Porlan
  0 siblings, 0 replies; 8+ messages in thread
From: Patrick Porlan @ 2011-03-08 16:08 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 1786 bytes --]

Hi Denis,

On Wed, 2011-03-02 at 09:28 -0600, Denis Kenzior wrote:

> >> Is there a particular reason why you chose to use a ring buffer of ring
> >> buffers? A simple GQueue might be much easier to understand.  If you are
> >> worried about 'infinite queuing' then a simple counter might help to
> >> alleviate that.

I just sent a new version of my patch that does it (forgot to tag it as
v2, sorry).


> >> You change BUFFER_SIZE from 2048 to 4096 and remove the multiplication
> >> here.

Having a constant named BUFFER_SIZE whose value is half a buffer size
did not seem right though. I reinstantated the multiplication, but at
the macro definition level.


> >> So I think we have to be a bit careful here.  HDLC framing can in theory
> >> (if you're maximally unlucky) result in doubling of the data size once
> >> it is framed.  This means that we might have enough space in the current
> >> buffer according to this estimate, but still exceed it once the actual
> >> framing is performed.  If so, then we have to drop the frame.

I checked with random data, and get some frames dropped with a 128
margin, but none with 256. Using a much larger margin might be
detrimental to memory usage, so I chose to leave it like this. We can
address this in a further patch if you wish.


> Yes, we definitely want to have some sort of buffer pool management
> strategy to share a pool of buffers between different entities (e.g.
> GAtMux, GAtServer, GAtHDLC).  Have you looked at g_slice* inside glib
> yet?  I think we can take advantage of the fact that most of our ring
> buffers are sized at 4k.

It looks like this allocator should be used in ringbuffer.c. Let's
discuss that and follow up with another patch.

Regards,

-- Patrick




^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [PATCH] PPP: Optimize write buffer management
  2011-03-08 15:58 [PATCH] PPP: Optimize write buffer management Patrick Porlan
@ 2011-03-15 19:28 ` Denis Kenzior
  2011-03-16  9:00   ` Patrick Porlan
  0 siblings, 1 reply; 8+ messages in thread
From: Denis Kenzior @ 2011-03-15 19:28 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 1330 bytes --]

Hi Patrick,

On 03/08/2011 09:58 AM, Patrick Porlan wrote:
> Extend the write buffer handling in gathdlc.c to minimize stalling and
> process switching during large PPP transfers. The single write buffer
> is replaced by a queue of buffers, allowing for much larger emission
> windows without hugely impacting memory consumption. This reduces the
> time required to send 50 MB between a couple of local PPP interfaces on
> my laptop from ~53s to ~3s.
> ---
>  gatchat/gathdlc.c |   95 ++++++++++++++++++++++++++++++++++++++++------------
>  1 files changed, 73 insertions(+), 22 deletions(-)
> 

So I was pretty happy with this patch and went ahead and applied it.  I
made a few simple style modifications beforehand though.

<snip>

>  
> -	if (avail < size)
> -		return FALSE;
> +	if (avail < size + HDLC_OVERHEAD) {
> +

So I'm still a little worried about this part.  In theory the
HDLC_OVERHEAD should have two factors:
	- Our transmit ACCM
	- The size of the frame

Since we're dealing with MTUs of ~1500, our frames don't really ever
exceed it.  So 256 byte overhead is probably ok, but might be worth
checking if making this dynamic buys us anything.

Also, what do you think of growing the overhead and performing the
framing again in case our estimate is too low?

Regards,
-Denis

^ permalink raw reply	[flat|nested] 8+ messages in thread

* Re: [PATCH] PPP: Optimize write buffer management
  2011-03-15 19:28 ` Denis Kenzior
@ 2011-03-16  9:00   ` Patrick Porlan
  0 siblings, 0 replies; 8+ messages in thread
From: Patrick Porlan @ 2011-03-16  9:00 UTC (permalink / raw)
  To: ofono

[-- Attachment #1: Type: text/plain, Size: 1263 bytes --]

Hi Denis,

On Tue, 2011-03-15 at 14:28 -0500, Denis Kenzior wrote:

> Since we're dealing with MTUs of ~1500, our frames don't really ever
> exceed it.  So 256 byte overhead is probably ok, but might be worth
> checking if making this dynamic buys us anything.

Sounds like a good idea. We can expect to save a hundred of bytes or so
per 4 KB buffer in average: I witnessed overhead in the 100~150 bytes
but indeed it may be consistently lower than that, or higher than 256,
depending on the traffic.

> Also, what do you think of growing the overhead and performing the
> framing again in case our estimate is too low?

That becomes even more necessary if we start with a margin we expect to
bump into.

For the record, I tried the g_slice allocator in ringbuffer.c, and can't
really see a performance gain - or loss -, with the downside that there
is no "try" version, and that an allocation failure will therefore fault
the process. As for why it's so, I suppose that even a few thousand
allocations/deallocations only take a few ms. I'm also starting to think
that there is thread aware code in the glib that is not really needed
inside the ofono machinery. I'll send that patch to the list
nonetheless.

Regards,

-- Patrick


^ permalink raw reply	[flat|nested] 8+ messages in thread

end of thread, other threads:[~2011-03-16  9:00 UTC | newest]

Thread overview: 8+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2011-03-08 15:58 [PATCH] PPP: Optimize write buffer management Patrick Porlan
2011-03-15 19:28 ` Denis Kenzior
2011-03-16  9:00   ` Patrick Porlan
  -- strict thread matches above, loose matches on Subject: below --
2011-03-01 15:07 Patrick Porlan
2011-03-02  3:47 ` Denis Kenzior
2011-03-02  8:42   ` Patrick Porlan
2011-03-02 15:28     ` Denis Kenzior
2011-03-08 16:08       ` Patrick Porlan

This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.