* [PATCH 1/3] Add write ring buffer for non-blocking write
@ 2010-02-11 2:56 Zhenhua Zhang
2010-02-11 2:56 ` [PATCH 2/3] Add handle the case when write buffer is full Zhenhua Zhang
2010-02-11 18:44 ` [PATCH 1/3] Add write ring buffer for non-blocking write Denis Kenzior
0 siblings, 2 replies; 4+ messages in thread
From: Zhenhua Zhang @ 2010-02-11 2:56 UTC (permalink / raw)
To: ofono
[-- Attachment #1: Type: text/plain, Size: 6082 bytes --]
Use two layers to cache server-side response data for the client.
1. A fixed-length ring buffer.
2. A list of free ring buffers and a list of full ring buffers.
If the current ring buffer is full, put it into the full buffer list and
fetch a free buffer from the free list.
---
gatchat/gatserver.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 98 insertions(+), 8 deletions(-)
diff --git a/gatchat/gatserver.c b/gatchat/gatserver.c
index bf7e847..6e76016 100644
--- a/gatchat/gatserver.c
+++ b/gatchat/gatserver.c
@@ -32,6 +32,15 @@
#include "ringbuffer.h"
#include "gatserver.h"
+#define MAX_BUFFER_NUM 5
+#define READ_BUF_SIZE 4096
+/* #define WRITE_SCHEDULER_DEBUG 1 */
+#ifdef WRITE_SCHEDULER_DEBUG
+#define WRITE_BUF_SIZE 4
+#else
+#define WRITE_BUF_SIZE 4096
+#endif
+
enum ParserState {
PARSER_STATE_IDLE,
PARSER_STATE_A,
@@ -90,17 +99,50 @@ struct _GAtServer {
struct v250_settings v250; /* V.250 command setting */
GIOChannel *channel; /* Server IO */
guint read_watch; /* GSource read id, 0 if none */
+ guint write_watch; /* GSource write id, 0 if none */
guint read_so_far; /* Number of bytes processed */
GAtDisconnectFunc user_disconnect; /* User disconnect func */
gpointer user_disconnect_data; /* User disconnect data */
GAtDebugFunc debugf; /* Debugging output function */
gpointer debug_data; /* Data to pass to debug func */
struct ring_buffer *read_buf; /* Current read buffer */
+ struct ring_buffer *write_buf; /* Current write buffer */
+ GSList *full_list; /* List of full ring buffer */
+ GSList *free_list; /* List of free ring buffer */
guint max_read_attempts; /* Max reads per select */
enum ParserState parser_state;
gboolean destroyed; /* Re-entrancy guard */
};
+static void g_at_server_wakeup_writer(GAtServer *server);
+
+static gboolean alloc_free_list(GAtServer *server)
+{
+ struct ring_buffer *buf;
+ guint i;
+
+ for (i = 0; i < MAX_BUFFER_NUM; i++) {
+ buf = ring_buffer_new(WRITE_BUF_SIZE);
+ if (!buf)
+ return FALSE;
+
+ server->free_list = g_slist_prepend(server->free_list, buf);
+ }
+
+ return TRUE;
+}
+
+static void send_common(GAtServer *server, const char *buf)
+{
+ gsize avail = ring_buffer_avail(server->write_buf);
+ gsize len = strlen(buf);
+
+ if (avail >= len)
+ ring_buffer_write(server->write_buf, buf, len);
+
+ g_at_server_wakeup_writer(server);
+}
+
static void g_at_server_send_result(GAtServer *server, GAtServerResult result)
{
struct v250_settings v250 = server->v250;
@@ -108,7 +150,6 @@ static void g_at_server_send_result(GAtServer *server, GAtServerResult result)
char buf[1024];
char t = v250.s3;
char r = v250.s4;
- gsize wbuf;
if (v250.quiet)
return;
@@ -125,8 +166,7 @@ static void g_at_server_send_result(GAtServer *server, GAtServerResult result)
g_at_util_debug_chat(FALSE, buf, strlen(buf),
server->debugf, server->debug_data);
- g_io_channel_write(server->channel, (char *) buf, strlen(buf),
- &wbuf);
+ send_common(server, buf);
}
static inline gboolean is_at_command_prefix(const char c)
@@ -432,12 +472,30 @@ static gboolean received_data(GIOChannel *channel, GIOCondition cond,
return TRUE;
}
+static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
+ gpointer data)
+{
+ return FALSE;
+}
+
static void g_at_server_cleanup(GAtServer *server)
{
/* Cleanup all received data */
ring_buffer_free(server->read_buf);
server->read_buf = NULL;
+ /* Cleanup pending data to write */
+ ring_buffer_free(server->write_buf);
+ server->write_buf = NULL;
+
+ if (server->full_list)
+ g_slist_foreach(server->full_list, (GFunc)ring_buffer_free,
+ NULL);
+
+ if (server->free_list)
+ g_slist_foreach(server->free_list, (GFunc)ring_buffer_free,
+ NULL);
+
server->channel = NULL;
}
@@ -446,8 +504,6 @@ static void read_watcher_destroy_notify(GAtServer *server)
g_at_server_cleanup(server);
server->read_watch = 0;
- server->channel = NULL;
-
if (server->user_disconnect)
server->user_disconnect(server->user_disconnect_data);
@@ -455,6 +511,23 @@ static void read_watcher_destroy_notify(GAtServer *server)
g_free(server);
}
+static void write_watcher_destroy_notify(GAtServer *server)
+{
+ server->write_watch = 0;
+}
+
+static void g_at_server_wakeup_writer(GAtServer *server)
+{
+ if (server->write_watch != 0)
+ return;
+
+ server->write_watch = g_io_add_watch_full(server->channel,
+ G_PRIORITY_DEFAULT,
+ G_IO_OUT | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+ can_write_data, server,
+ (GDestroyNotify)write_watcher_destroy_notify);
+}
+
static void v250_settings_create(struct v250_settings *v250)
{
v250->s3 = '\r';
@@ -482,12 +555,19 @@ GAtServer *g_at_server_new(GIOChannel *io)
server->ref_count = 1;
v250_settings_create(&server->v250);
server->channel = io;
- server->read_buf = ring_buffer_new(4096);
- server->max_read_attempts = 3;
-
+ server->read_buf = ring_buffer_new(READ_BUF_SIZE);
if (!server->read_buf)
goto error;
+ server->write_buf = ring_buffer_new(WRITE_BUF_SIZE);
+ if (!server->write_buf)
+ goto error;
+
+ if (!alloc_free_list(server))
+ goto error;
+
+ server->max_read_attempts = 3;
+
if (!g_at_util_setup_io(server->channel, G_IO_FLAG_NONBLOCK))
goto error;
@@ -502,6 +582,13 @@ error:
if (server->read_buf)
ring_buffer_free(server->read_buf);
+ if (server->write_buf)
+ ring_buffer_free(server->write_buf);
+
+ if (server->free_list)
+ g_slist_foreach(server->free_list, (GFunc)ring_buffer_free,
+ NULL);
+
if (server)
g_free(server);
@@ -552,6 +639,9 @@ gboolean g_at_server_shutdown(GAtServer *server)
server->user_disconnect = NULL;
server->user_disconnect_data = NULL;
+ if (server->write_watch)
+ g_source_remove(server->write_watch);
+
if (server->read_watch)
g_source_remove(server->read_watch);
--
1.6.6.1
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 2/3] Add handle the case when write buffer is full
2010-02-11 2:56 [PATCH 1/3] Add write ring buffer for non-blocking write Zhenhua Zhang
@ 2010-02-11 2:56 ` Zhenhua Zhang
2010-02-11 2:56 ` [PATCH 3/3] Add write server response into non blocking IO Zhenhua Zhang
2010-02-11 18:44 ` [PATCH 1/3] Add write ring buffer for non-blocking write Denis Kenzior
1 sibling, 1 reply; 4+ messages in thread
From: Zhenhua Zhang @ 2010-02-11 2:56 UTC (permalink / raw)
To: ofono
[-- Attachment #1: Type: text/plain, Size: 2366 bytes --]
If the current write_buf is full, put it into the full list and fetch
a new ring buffer from the free list.
---
gatchat/gatserver.c | 63 ++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 60 insertions(+), 3 deletions(-)
diff --git a/gatchat/gatserver.c b/gatchat/gatserver.c
index 6e76016..2c160bf 100644
--- a/gatchat/gatserver.c
+++ b/gatchat/gatserver.c
@@ -132,13 +132,70 @@ static gboolean alloc_free_list(GAtServer *server)
return TRUE;
}
+static gboolean replace_write_buf(GAtServer *server)
+{
+ struct ring_buffer *free_buf;
+
+ /* Add current buffer into full list and replace current buffer
+ * by a new free buffer */
+ server->full_list = g_slist_append(server->full_list,
+ server->write_buf);
+
+ /* All free lists are exhausted. Allocate more free lists */
+ if (!server->free_list) {
+ if (!alloc_free_list(server)) {
+ /* Failed so shutdown the socket */
+ g_at_server_shutdown(server);
+
+ return FALSE;
+ }
+ }
+
+ free_buf = server->free_list->data;
+
+ server->free_list = g_slist_remove(server->free_list, free_buf);
+
+ server->write_buf = free_buf;
+
+ return TRUE;
+}
+
static void send_common(GAtServer *server, const char *buf)
{
gsize avail = ring_buffer_avail(server->write_buf);
- gsize len = strlen(buf);
+ gsize towrite = strlen(buf);
+ gsize bytes_written;
+ gsize offset = 0;
+
+ if (avail > towrite) {
+ ring_buffer_write(server->write_buf, buf, towrite);
- if (avail >= len)
- ring_buffer_write(server->write_buf, buf, len);
+ g_at_server_wakeup_writer(server);
+
+ return;
+ }
+
+ /* Write as much as we can */
+ bytes_written = ring_buffer_write(server->write_buf, buf, avail);
+ towrite -= bytes_written;
+ offset = bytes_written;
+
+ /* If current write buf is full, replace it with next free buffer */
+ if (!replace_write_buf(server))
+ return;
+
+ bytes_written = ring_buffer_write(server->write_buf, buf + offset,
+ towrite);
+ while (bytes_written < towrite) {
+ /* The next free buf is full, replace with next free one */
+ if (!replace_write_buf(server))
+ return;
+
+ towrite -= bytes_written;
+ offset += bytes_written;
+ bytes_written = ring_buffer_write(server->write_buf,
+ buf + offset, towrite);
+ }
g_at_server_wakeup_writer(server);
}
--
1.6.6.1
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 3/3] Add write server response into non blocking IO
2010-02-11 2:56 ` [PATCH 2/3] Add handle the case when write buffer is full Zhenhua Zhang
@ 2010-02-11 2:56 ` Zhenhua Zhang
0 siblings, 0 replies; 4+ messages in thread
From: Zhenhua Zhang @ 2010-02-11 2:56 UTC (permalink / raw)
To: ofono
[-- Attachment #1: Type: text/plain, Size: 2161 bytes --]
This patch and the previous one write the server response to the
non-blocking GIOChannel when the G_IO_OUT condition is signalled.
---
gatchat/gatserver.c | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 67 insertions(+), 0 deletions(-)
diff --git a/gatchat/gatserver.c b/gatchat/gatserver.c
index 2c160bf..4cc3cf6 100644
--- a/gatchat/gatserver.c
+++ b/gatchat/gatserver.c
@@ -532,6 +532,73 @@ static gboolean received_data(GIOChannel *channel, GIOCondition cond,
static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
gpointer data)
{
+ GAtServer *server = data;
+ GIOError err;
+ gsize bytes_written;
+ gsize towrite;
+ struct ring_buffer *write_buf;
+ unsigned char *buf;
+ gboolean write_again = FALSE;
+#ifdef WRITE_SCHEDULER_DEBUG
+ int limiter;
+#endif
+
+ if (cond & (G_IO_NVAL | G_IO_HUP | G_IO_ERR))
+ return FALSE;
+
+ /* If we have full buffer, write them out first */
+ if (server->full_list)
+ write_buf = server->full_list->data;
+ else
+ write_buf = server->write_buf;
+
+ buf = ring_buffer_read_ptr(write_buf, 0);
+
+ towrite = ring_buffer_len_no_wrap(write_buf);
+ if (towrite < (gsize)ring_buffer_len(write_buf))
+ write_again = TRUE;
+
+#ifdef WRITE_SCHEDULER_DEBUG
+ limiter = towrite;
+
+ if (limiter > 5)
+ limiter = 5;
+#endif
+
+ err = g_io_channel_write(server->channel,
+ (char *)buf,
+#ifdef WRITE_SCHEDULER_DEBUG
+ limiter,
+#else
+ towrite,
+#endif
+ &bytes_written);
+
+ if (err != G_IO_ERROR_NONE) {
+ g_source_remove(server->read_watch);
+ return FALSE;
+ }
+
+ g_at_util_debug_chat(FALSE, (char *)buf, bytes_written, server->debugf,
+ server->debug_data);
+
+ ring_buffer_drain(write_buf, bytes_written);
+
+ if (server->full_list) {
+ /* All data in current full list is written, free it */
+ if (ring_buffer_len(write_buf) == 0) {
+ server->full_list = g_slist_remove(server->full_list,
+ write_buf);
+
+ ring_buffer_free(write_buf);
+ }
+
+ return TRUE;
+ }
+
+ if (bytes_written < towrite || write_again == TRUE)
+ return TRUE;
+
return FALSE;
}
--
1.6.6.1
^ permalink raw reply related [flat|nested] 4+ messages in thread
* Re: [PATCH 1/3] Add write ring buffer for non-blocking write
2010-02-11 2:56 [PATCH 1/3] Add write ring buffer for non-blocking write Zhenhua Zhang
2010-02-11 2:56 ` [PATCH 2/3] Add handle the case when write buffer is full Zhenhua Zhang
@ 2010-02-11 18:44 ` Denis Kenzior
1 sibling, 0 replies; 4+ messages in thread
From: Denis Kenzior @ 2010-02-11 18:44 UTC (permalink / raw)
To: ofono
[-- Attachment #1: Type: text/plain, Size: 6899 bytes --]
Hi Zhenhua,
> Use two layers to cache server side response data to client.
> 1. A fixed-length ring buffer.
> 2. A list of free ring buffers and a list of empty full ring buffer.
>
> If current ring buffer is full, put it into full buffer list and fetch
> a free buffer frome free list.
> ---
> gatchat/gatserver.c | 106
> +++++++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 98
> insertions(+), 8 deletions(-)
>
> diff --git a/gatchat/gatserver.c b/gatchat/gatserver.c
> index bf7e847..6e76016 100644
> --- a/gatchat/gatserver.c
> +++ b/gatchat/gatserver.c
> @@ -32,6 +32,15 @@
> #include "ringbuffer.h"
> #include "gatserver.h"
>
> +#define MAX_BUFFER_NUM 5
> +#define READ_BUF_SIZE 4096
> +/* #define WRITE_SCHEDULER_DEBUG 1 */
> +#ifdef WRITE_SCHEDULER_DEBUG
> +#define WRITE_BUF_SIZE 4
> +#else
> +#define WRITE_BUF_SIZE 4096
> +#endif
> +
So I don't like this limitation; we should simply allocate a new buffer when we
run out of space.
> enum ParserState {
> PARSER_STATE_IDLE,
> PARSER_STATE_A,
> @@ -90,17 +99,50 @@ struct _GAtServer {
> struct v250_settings v250; /* V.250 command setting */
> GIOChannel *channel; /* Server IO */
> guint read_watch; /* GSource read id, 0 if none */
> + guint write_watch; /* GSource write id, 0 if none */
> guint read_so_far; /* Number of bytes processed */
> GAtDisconnectFunc user_disconnect; /* User disconnect func */
> gpointer user_disconnect_data; /* User disconnect data */
> GAtDebugFunc debugf; /* Debugging output function */
> gpointer debug_data; /* Data to pass to debug func */
> struct ring_buffer *read_buf; /* Current read buffer */
> + struct ring_buffer *write_buf; /* Current write buffer */
> + GSList *full_list; /* List of full ring buffer */
> + GSList *free_list; /* List of free ring buffer */
See above: we can really just maintain one list, the full list. The head and
tail pointers should be easily accessible (write data out from the head,
put new data on the tail). Maintain at least one buffer in the list.
> guint max_read_attempts; /* Max reads per select */
> enum ParserState parser_state;
> gboolean destroyed; /* Re-entrancy guard */
> };
>
> +static void g_at_server_wakeup_writer(GAtServer *server);
> +
> +static gboolean alloc_free_list(GAtServer *server)
> +{
> + struct ring_buffer *buf;
> + guint i;
> +
> + for (i = 0; i < MAX_BUFFER_NUM; i++) {
> + buf = ring_buffer_new(WRITE_BUF_SIZE);
> + if (!buf)
> + return FALSE;
> +
> + server->free_list = g_slist_prepend(server->free_list, buf);
> + }
> +
> + return TRUE;
> +}
> +
> +static void send_common(GAtServer *server, const char *buf)
> +{
> + gsize avail = ring_buffer_avail(server->write_buf);
> + gsize len = strlen(buf);
This part is wrong: the data can actually be binary as well, so strlen() cannot be relied on to give its length.
> +
> + if (avail >= len)
> + ring_buffer_write(server->write_buf, buf, len);
> +
> + g_at_server_wakeup_writer(server);
> +}
> +
> static void g_at_server_send_result(GAtServer *server, GAtServerResult
> result) {
> struct v250_settings v250 = server->v250;
> @@ -108,7 +150,6 @@ static void g_at_server_send_result(GAtServer *server,
> GAtServerResult result) char buf[1024];
> char t = v250.s3;
> char r = v250.s4;
> - gsize wbuf;
>
> if (v250.quiet)
> return;
> @@ -125,8 +166,7 @@ static void g_at_server_send_result(GAtServer *server,
> GAtServerResult result) g_at_util_debug_chat(FALSE, buf, strlen(buf),
> server->debugf, server->debug_data);
>
> - g_io_channel_write(server->channel, (char *) buf, strlen(buf),
> - &wbuf);
> + send_common(server, buf);
> }
>
> static inline gboolean is_at_command_prefix(const char c)
> @@ -432,12 +472,30 @@ static gboolean received_data(GIOChannel *channel,
> GIOCondition cond, return TRUE;
> }
>
> +static gboolean can_write_data(GIOChannel *channel, GIOCondition cond,
> + gpointer data)
> +{
> + return FALSE;
> +}
> +
> static void g_at_server_cleanup(GAtServer *server)
> {
> /* Cleanup all received data */
> ring_buffer_free(server->read_buf);
> server->read_buf = NULL;
>
> + /* Cleanup pending data to write */
> + ring_buffer_free(server->write_buf);
> + server->write_buf = NULL;
> +
> + if (server->full_list)
> + g_slist_foreach(server->full_list, (GFunc)ring_buffer_free,
> + NULL);
> +
> + if (server->free_list)
> + g_slist_foreach(server->free_list, (GFunc)ring_buffer_free,
> + NULL);
> +
> server->channel = NULL;
> }
>
> @@ -446,8 +504,6 @@ static void read_watcher_destroy_notify(GAtServer
> *server) g_at_server_cleanup(server);
> server->read_watch = 0;
>
> - server->channel = NULL;
> -
> if (server->user_disconnect)
> server->user_disconnect(server->user_disconnect_data);
>
> @@ -455,6 +511,23 @@ static void read_watcher_destroy_notify(GAtServer
> *server) g_free(server);
> }
>
> +static void write_watcher_destroy_notify(GAtServer *server)
> +{
> + server->write_watch = 0;
> +}
> +
> +static void g_at_server_wakeup_writer(GAtServer *server)
> +{
> + if (server->write_watch != 0)
> + return;
> +
> + server->write_watch = g_io_add_watch_full(server->channel,
> + G_PRIORITY_DEFAULT,
> + G_IO_OUT | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
> + can_write_data, server,
> + (GDestroyNotify)write_watcher_destroy_notify);
> +}
> +
> static void v250_settings_create(struct v250_settings *v250)
> {
> v250->s3 = '\r';
> @@ -482,12 +555,19 @@ GAtServer *g_at_server_new(GIOChannel *io)
> server->ref_count = 1;
> v250_settings_create(&server->v250);
> server->channel = io;
> - server->read_buf = ring_buffer_new(4096);
> - server->max_read_attempts = 3;
> -
> + server->read_buf = ring_buffer_new(READ_BUF_SIZE);
> if (!server->read_buf)
> goto error;
>
> + server->write_buf = ring_buffer_new(WRITE_BUF_SIZE);
> + if (!server->write_buf)
> + goto error;
> +
> + if (!alloc_free_list(server))
> + goto error;
> +
> + server->max_read_attempts = 3;
> +
> if (!g_at_util_setup_io(server->channel, G_IO_FLAG_NONBLOCK))
> goto error;
>
> @@ -502,6 +582,13 @@ error:
> if (server->read_buf)
> ring_buffer_free(server->read_buf);
>
> + if (server->write_buf)
> + ring_buffer_free(server->write_buf);
> +
> + if (server->free_list)
> + g_slist_foreach(server->free_list, (GFunc)ring_buffer_free,
> + NULL);
> +
> if (server)
> g_free(server);
>
> @@ -552,6 +639,9 @@ gboolean g_at_server_shutdown(GAtServer *server)
> server->user_disconnect = NULL;
> server->user_disconnect_data = NULL;
>
> + if (server->write_watch)
> + g_source_remove(server->write_watch);
> +
> if (server->read_watch)
> g_source_remove(server->read_watch);
>
Regards,
-Denis
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2010-02-11 18:44 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2010-02-11 2:56 [PATCH 1/3] Add write ring buffer for non-blocking write Zhenhua Zhang
2010-02-11 2:56 ` [PATCH 2/3] Add handle the case when write buffer is full Zhenhua Zhang
2010-02-11 2:56 ` [PATCH 3/3] Add write server response into non blocking IO Zhenhua Zhang
2010-02-11 18:44 ` [PATCH 1/3] Add write ring buffer for non-blocking write Denis Kenzior
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.