From mboxrd@z Thu Jan  1 00:00:00 1970
Message-ID: <4D755DC0.2010001@linux.vnet.ibm.com>
Date: Mon, 07 Mar 2011 16:35:44 -0600
From: Michael Roth
MIME-Version: 1.0
References: <1299528642-23631-1-git-send-email-mdroth@linux.vnet.ibm.com> <1299528642-23631-5-git-send-email-mdroth@linux.vnet.ibm.com> <1299533059.2267.48.camel@aglitke>
In-Reply-To: <1299533059.2267.48.camel@aglitke>
Content-Type: text/plain; charset=UTF-8; format=flowed
Content-Transfer-Encoding: 7bit
Subject: [Qemu-devel] Re: [RFC][PATCH v7 04/16] virtagent: bi-directional RPC handling logic
List-Id: qemu-devel.nongnu.org
To: Adam Litke
Cc: stefanha@linux.vnet.ibm.com, markus_mueller@de.ibm.com, qemu-devel@nongnu.org, abeekhof@redhat.com, aliguori@linux.vnet.ibm.com, Jes.Sorensen@redhat.com

On 03/07/2011 03:24 PM, Adam Litke wrote:
> On Mon, 2011-03-07 at 14:10 -0600, Michael Roth wrote:
>> This implements the state machine/logic used to manage
>> send/receive/execute phases of RPCs we send or receive. It does so using
>> a set of abstract methods we implement with the application and
>> transport level code which will follow.
>>
>> Signed-off-by: Michael Roth
>> ---
>>  virtagent-manager.c |  326 +++++++++++++++++++++++++++++++++++++++
>>  virtagent-manager.h |  130 ++++++++++++++++++++
>>  2 files changed, 456 insertions(+), 0 deletions(-)
>>  create mode 100644 virtagent-manager.c
>>  create mode 100644 virtagent-manager.h
>>
>> diff --git a/virtagent-manager.c b/virtagent-manager.c
>> new file mode 100644
>> index 0000000..51d26a3
>> --- /dev/null
>> +++ b/virtagent-manager.c
>> @@ -0,0 +1,326 @@
>> +/*
>> + * virtagent - job queue management
>> + *
>> + * Copyright IBM Corp. 2011
>> + *
>> + * Authors:
>> + *  Michael Roth
>> + *
>> + * This work is licensed under the terms of the GNU GPL, version 2 or later.
>> + * See the COPYING file in the top-level directory.
>> + *
>> + */
>> +
>> +#include "virtagent-common.h"
>> +
>> +typedef struct VAServerJob {
>> +    char tag[64];
>> +    void *opaque;
>> +    VAServerJobOps ops;
>> +    QTAILQ_ENTRY(VAServerJob) next;
>> +    enum {
>> +        VA_SERVER_JOB_STATE_NEW = 0,
>> +        VA_SERVER_JOB_STATE_BUSY,
>> +        VA_SERVER_JOB_STATE_EXECUTED,
>> +        VA_SERVER_JOB_STATE_SENT,
>> +        VA_SERVER_JOB_STATE_DONE,
>> +    } state;
>> +} VAServerJob;
>> +
>> +typedef struct VAClientJob {
>> +    char tag[64];
>> +    void *opaque;
>> +    void *resp_opaque;
>> +    VAClientJobOps ops;
>> +    QTAILQ_ENTRY(VAClientJob) next;
>> +    enum {
>> +        VA_CLIENT_JOB_STATE_NEW = 0,
>> +        VA_CLIENT_JOB_STATE_BUSY,
>> +        VA_CLIENT_JOB_STATE_SENT,
>> +        VA_CLIENT_JOB_STATE_READ,
>> +        VA_CLIENT_JOB_STATE_DONE,
>> +    } state;
>> +} VAClientJob;
>> +
>> +#define SEND_COUNT_MAX 1
>> +#define EXECUTE_COUNT_MAX 4
>
> It's not immediately clear what the difference between SEND_COUNT_MAX
> and EXECUTE_COUNT_MAX is. Some comments would help. Also, will the
> code work if these numbers are changed? If not, a note about what
> someone needs to look at when changing these would seem appropriate.
>

Basically SEND_COUNT_MAX is the number of RPCs the client can have in
flight at a time. EXECUTE_COUNT_MAX is the number of jobs the server can
execute concurrently/asynchronously (execute as in actually do the
"execute corresponding RPC" phase of a server job's lifecycle). These
should be tweakable without much side-effect.

These aren't currently that important since a monitor tends to limit us
to 1 RPC at a time, and the guest agent doesn't make any substantial use
of guest->host RPCs atm, so SEND_COUNT_MAX has little impact. We don't
currently execute RPCs concurrently/asynchronously either, so
EXECUTE_COUNT_MAX doesn't do much. But when threaded RPC execution is
re-implemented this will come back into play.

I'll make sure to add some comments on this.
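Roughly along these lines (exact wording TBD):

    /* Max number of in-flight sends (client job requests + server job
     * responses) we allow at a time. Should be tweakable, but anything
     * > 1 risks starving out the other end's requests. */
    #define SEND_COUNT_MAX 1

    /* Max number of server jobs we can execute concurrently. Only
     * meaningful once threaded RPC execution is re-implemented;
     * currently execution is serialized. */
    #define EXECUTE_COUNT_MAX 4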
>> +
>> +struct VAManager {
>> +    int send_count; /* sends in flight */
>> +    int execute_count; /* number of jobs currently executing */
>> +    QTAILQ_HEAD(, VAServerJob) server_jobs;
>> +    QTAILQ_HEAD(, VAClientJob) client_jobs;
>> +};
>> +
>> +/* server job operations/helpers */
>> +
>> +static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j;
>> +    QTAILQ_FOREACH(j, &m->server_jobs, next) {
>> +        if (strcmp(j->tag, tag) == 0) {
>> +            return j;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +int va_server_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAServerJobOps ops)
>> +{
>> +    VAServerJob *j = qemu_mallocz(sizeof(VAServerJob));
>> +    TRACE("called");
>
> Qemu has a good tracing infrastructure. If this trace point is
> useful enough to keep around, it should try to use that. If it's not
> that important, I'd remove it entirely. I believe this has been flagged
> in an earlier RFC too.

These are really just to aid in development. I plan on NOOPing these via
the DEBUG_VA flag before merge. Can also remove them if it's too nasty.
Only a very small subset of these would be useful for the trace
facility; I'll have a better idea of which ones once I stop relying on
the TRACE() stuff.
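The NOOPing would amount to something like this in virtagent-common.h
(a sketch only; assumes TRACE() stays a printf-style macro):

    #ifdef DEBUG_VA
    #define TRACE(msg, ...) do { \
        fprintf(stderr, "%s:%s():L%d: " msg "\n", \
                __FILE__, __func__, __LINE__, ## __VA_ARGS__); \
    } while (0)
    #else
    #define TRACE(msg, ...) do {} while (0)
    #endif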
>
>> +    j->state = VA_SERVER_JOB_STATE_NEW;
>> +    j->ops = ops;
>> +    j->opaque = opaque;
>> +    memset(j->tag, 0, 64);
>> +    pstrcpy(j->tag, 63, tag);
>
> Magic numbers. Should use something like #define TAG_LEN 64
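Good point, will fix. Something like this (VA_TAG_LEN is just a
placeholder name):

    #define VA_TAG_LEN 64
    ...
    typedef struct VAServerJob {
        char tag[VA_TAG_LEN];
        ...
    } VAServerJob;
    ...
    pstrcpy(j->tag, VA_TAG_LEN, tag);

pstrcpy() already reserves a byte for the NUL terminator, so we can pass
the full buffer size there, and the memset() is redundant after
qemu_mallocz() anyway.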
>
>> +    QTAILQ_INSERT_TAIL(&m->server_jobs, j, next);
>> +    va_kick(m);
>> +    return 0;
>> +}
>> +
>> +static void va_server_job_execute(VAServerJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_SERVER_JOB_STATE_BUSY;
>> +    j->ops.execute(j->opaque, j->tag);
>> +}
>> +
>> +/* TODO: need a way to pass information back */
>> +void va_server_job_execute_done(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j = va_server_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("server job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_SERVER_JOB_STATE_EXECUTED;
>> +    va_kick(m);
>> +}
>> +
>> +static void va_server_job_send(VAServerJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_SERVER_JOB_STATE_BUSY;
>> +    j->ops.send(j->opaque, j->tag);
>> +}
>> +
>> +void va_server_job_send_done(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j = va_server_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("server job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_SERVER_JOB_STATE_SENT;
>> +    m->send_count--;
>> +    va_kick(m);
>> +}
>> +
>> +static void va_server_job_callback(VAServerJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_SERVER_JOB_STATE_BUSY;
>> +    if (j->ops.callback) {
>> +        j->ops.callback(j->opaque, j->tag);
>> +    }
>> +    j->state = VA_SERVER_JOB_STATE_DONE;
>> +}
>> +
>> +void va_server_job_cancel(VAManager *m, const char *tag)
>> +{
>> +    VAServerJob *j = va_server_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("server job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    /* TODO: need to decrement sends/execs in flight appropriately */
>> +    /* make callback and move to done state, kick() will handle cleanup */
>> +    va_server_job_callback(j);
>> +    va_kick(m);
>> +}
>> +
>> +/* client job operations */
>> +
>> +static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag)
>> +{
>> +    VAClientJob *j;
>> +    QTAILQ_FOREACH(j, &m->client_jobs, next) {
>> +        if (strcmp(j->tag, tag) == 0) {
>> +            return j;
>> +        }
>> +    }
>> +    return NULL;
>> +}
>> +
>> +int va_client_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAClientJobOps ops)
>> +{
>> +    VAClientJob *j = qemu_mallocz(sizeof(VAClientJob));
>> +    TRACE("called");
>> +    j->ops = ops;
>> +    j->opaque = opaque;
>> +    memset(j->tag, 0, 64);
>> +    pstrcpy(j->tag, 63, tag);
>> +    QTAILQ_INSERT_TAIL(&m->client_jobs, j, next);
>> +    va_kick(m);
>> +    return 0;
>> +}
>> +
>> +static void va_client_job_send(VAClientJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_CLIENT_JOB_STATE_BUSY;
>> +    j->ops.send(j->opaque, j->tag);
>> +}
>> +
>> +void va_client_job_send_done(VAManager *m, const char *tag)
>> +{
>> +    VAClientJob *j = va_client_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("client job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_CLIENT_JOB_STATE_SENT;
>> +    m->send_count--;
>> +    va_kick(m);
>> +}
>> +
>> +void va_client_job_read_done(VAManager *m, const char *tag, void *resp)
>> +{
>> +    VAClientJob *j = va_client_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("client job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    j->state = VA_CLIENT_JOB_STATE_READ;
>> +    j->resp_opaque = resp;
>> +    va_kick(m);
>> +}
>> +
>> +static void va_client_job_callback(VAClientJob *j)
>> +{
>> +    TRACE("called");
>> +    j->state = VA_CLIENT_JOB_STATE_BUSY;
>> +    if (j->ops.callback) {
>> +        j->ops.callback(j->opaque, j->resp_opaque, j->tag);
>> +    }
>> +    j->state = VA_CLIENT_JOB_STATE_DONE;
>> +}
>> +
>> +void va_client_job_cancel(VAManager *m, const char *tag)
>> +{
>> +    VAClientJob *j = va_client_job_by_tag(m, tag);
>> +    TRACE("called");
>> +    if (!j) {
>> +        LOG("client job with tag \"%s\" not found", tag);
>> +        return;
>> +    }
>> +    /* TODO: need to decrement sends/execs in flight appropriately */
>> +    /* make callback and move to done state, kick() will handle cleanup */
>> +    va_client_job_callback(j);
>> +    va_kick(m);
>> +}
>> +
>> +/* general management functions */
>> +
>> +VAManager *va_manager_new(void)
>> +{
>> +    VAManager *m = qemu_mallocz(sizeof(VAManager));
>> +    QTAILQ_INIT(&m->client_jobs);
>> +    QTAILQ_INIT(&m->server_jobs);
>> +    return m;
>> +}
>> +
>> +static void va_process_server_job(VAManager *m, VAServerJob *sj)
>> +{
>> +    switch (sj->state) {
>> +    case VA_SERVER_JOB_STATE_NEW:
>> +        TRACE("marker");
>> +        va_server_job_execute(sj);
>> +        break;
>> +    case VA_SERVER_JOB_STATE_EXECUTED:
>> +        TRACE("marker");
>> +        if (m->send_count < SEND_COUNT_MAX) {
>> +            TRACE("marker");
>> +            va_server_job_send(sj);
>> +            m->send_count++;
>> +        }
>> +        break;
>> +    case VA_SERVER_JOB_STATE_SENT:
>> +        TRACE("marker");
>> +        va_server_job_callback(sj);
>> +        break;
>> +    case VA_SERVER_JOB_STATE_BUSY:
>> +        TRACE("marker, server job currently busy");
>> +        break;
>> +    case VA_SERVER_JOB_STATE_DONE:
>> +        TRACE("marker");
>> +        QTAILQ_REMOVE(&m->server_jobs, sj, next);
>> +        break;
>> +    default:
>> +        LOG("error, unknown server job state");
>> +        break;
>> +    }
>> +}
>> +
>> +static void va_process_client_job(VAManager *m, VAClientJob *cj)
>> +{
>> +    switch (cj->state) {
>> +    case VA_CLIENT_JOB_STATE_NEW:
>> +        TRACE("marker");
>> +        if (m->send_count < SEND_COUNT_MAX) {
>> +            TRACE("marker");
>> +            va_client_job_send(cj);
>> +            m->send_count++;
>> +        }
>> +        break;
>> +    case VA_CLIENT_JOB_STATE_SENT:
>> +        TRACE("marker");
>> +        /* nothing to do here, awaiting read_done() */
>> +        break;
>> +    case VA_CLIENT_JOB_STATE_READ:
>> +        TRACE("marker");
>> +        va_client_job_callback(cj);
>> +        break;
>> +    case VA_CLIENT_JOB_STATE_DONE:
>> +        TRACE("marker");
>> +        QTAILQ_REMOVE(&m->client_jobs, cj, next);
>> +        break;
>> +    case VA_CLIENT_JOB_STATE_BUSY:
>> +        TRACE("marker, client job currently busy");
>> +        break;
>> +    default:
>> +        LOG("error, unknown client job state");
>> +        break;
>> +    }
>> +}
>> +
>> +void va_kick(VAManager *m)
>> +{
>> +    VAServerJob *sj, *sj_tmp;
>> +    VAClientJob *cj, *cj_tmp;
>> +
>> +    TRACE("called");
>> +    TRACE("send_count: %u, execute_count: %u", m->send_count, m->execute_count);
>> +
>> +    /* TODO: make sure there is no starvation of jobs/operations here */
>> +
>> +    /* look for any work to be done among pending server jobs */
>> +    QTAILQ_FOREACH_SAFE(sj, &m->server_jobs, next, sj_tmp) {
>> +        TRACE("marker, server tag: %s", sj->tag);
>> +        va_process_server_job(m, sj);
>> +    }
>> +
>> +    /* look for work to be done among pending client jobs */
>> +    QTAILQ_FOREACH_SAFE(cj, &m->client_jobs, next, cj_tmp) {
>> +        TRACE("marker, client tag: %s", cj->tag);
>> +        va_process_client_job(m, cj);
>> +    }
>> +}
>> diff --git a/virtagent-manager.h b/virtagent-manager.h
>> new file mode 100644
>> index 0000000..7b463fb
>> --- /dev/null
>> +++ b/virtagent-manager.h
>> @@ -0,0 +1,130 @@
>> +#ifndef VIRTAGENT_MANAGER_H
>> +#define VIRTAGENT_MANAGER_H
>> +
>> +#include "qemu-common.h"
>> +#include "qemu-queue.h"
>> +
>> +/*
>> + * Protocol Overview:
>> + *
>> + * The virtagent protocol depends on a state machine to manage communication
>> + * over a single connection stream, currently a virtio or isa serial channel.
>> + * The basic characterization of the work being done is that clients
>> + * send/handle client jobs locally, which are then read/handled remotely as
>> + * server jobs. A client job consists of a request which is sent, and a
>> + * response which is eventually recieved. A server job consists of a request
>> + * which is recieved from the other end, and a response which is sent back.
>
> "i before e, except after c ..." (I misspell receive all the time too).
>

TIL about vim's spell check feature :)

>> + *
>> + * Server jobs are given priority over client jobs, i.e. if we send a client
>> + * job (our request) and recieve a server job (their request), rather than
>> + * await a response to the client job, we immediately begin processing the
>> + * server job and then send back the response. This prevents us from being
>> + * deadlocked in a situation where both sides have sent a client job and are
>> + * awaiting the response before handling the other side's client job.
>> + *
>> + * Multiple in-flight requests are supported, but high request rates can
>> + * potentially starve out the other side's client jobs / requests, so
>> + * well-behaved participants should periodically back off on high request
>> + * rates, or limit themselves to 1 request at a time (anything more than 1
>> + * can still potentially remove any window for the other end to service its
>> + * own client jobs, since we can begin sending the next request before it
>> + * begins sending the response for the 2nd).
>> + *
>> + * On a related note, in the future, bidirectional user/session-level guest
>> + * agents may also be supported via a forwarding service made available
>> + * through the system-level guest agent. In this case it is up to the
>> + * system-level agent to handle forwarding requests in such a way that we
>> + * don't starve the host-side service out sheerly by having too many
>> + * sessions/users trying to send RPCs at a constant rate. This would be
>> + * supported through this job Manager via an additional "forwarder" job type.
>> + *
>> + * To encapsulate some of this logic, we define here a "Manager" class, which
>> + * provides an abstract interface to a state machine which handles most of
>> + * the above logic transparently to the transport/application-level code.
>> + * This also makes it possible to utilize alternative
>> + * transport/application-level protocols in the future.
>> + *
>> + */
>> +
>> +/*
>> + * Two types of jobs are generated from various components of virtagent.
>> + * Each job type has a priority, and a set of prioritized functions as well.
>> + *
>> + * The read handler generates new server jobs as it recieves requests from
>> + * the channel. Server jobs make progress through the following operations:
>> + *
>> + * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE
>> + *
>> + * EXECUTE (provided by user, manager calls)
>> + * When server jobs are added, eventually (as execution slots become
>> + * available) an execute() will be called to begin executing the job. An
>> + * error value will be returned if there is no room in the queue for another
>> + * server job.
>> + *
>> + * EXECUTE_DONE (provided by manager, user calls)
>> + * As server jobs complete, execute_completed() is called to update execution
>> + * status of that job (failure/success), inject the payload, and kick off the
>> + * next operation.
>> + *
>> + * SEND (provided by user, manager calls)
>> + * Eventually the send() operation is made. This will cause the send handler
>> + * to begin sending the response.
>> + *
>> + * SEND_DONE (provided by manager, user calls)
>> + * Upon completion of that send, the send_completed() operation will be
>> + * called. This will free up the job, and kick off the next operation.
>> + */
>
> Very helpful protocol overview. Thanks for adding this.
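Thanks. To make the EXECUTE->EXECUTE_DONE->SEND->SEND_DONE flow above
concrete, driving the server side looks roughly like this from the
transport/app code's point of view (illustrative sketch only;
my_execute()/my_send()/request_state are made-up names, not from this
series):

    /* illustrative sketch, not from the series */
    #include "virtagent-manager.h"

    static VAManager *manager; /* created elsewhere via va_manager_new() */

    /* EXECUTE op: the manager calls this when an execution slot frees up */
    static int my_execute(void *opaque, const char *tag)
    {
        /* ... run the RPC described by opaque ... */
        /* EXECUTE_DONE: report completion so the send phase can begin */
        va_server_job_execute_done(manager, tag);
        return 0;
    }

    /* SEND op: the manager calls this when a send slot frees up */
    static int my_send(void *opaque, const char *tag)
    {
        /* ... queue the response for xmit; the transport layer calls
         * va_server_job_send_done(manager, tag) (SEND_DONE) once the
         * response has been written out ... */
        return 0;
    }

    static VAServerJobOps my_server_ops = {
        .execute = my_execute,
        .send = my_send,
        .callback = NULL, /* optional per-job completion hook */
    };

    /* in the read handler, upon parsing a new request: */
    static void my_read_handler(void *request_state, const char *tag)
    {
        va_server_job_add(manager, tag, request_state, my_server_ops);
    }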
>
>> +typedef int (va_job_op)(void *opaque, const char *tag);
>> +typedef struct VAServerJobOps {
>> +    va_job_op *execute;
>> +    va_job_op *send;
>> +    va_job_op *callback;
>> +} VAServerJobOps;
>> +
>> +/*
>> + * The client component generates new client jobs as they're made by
>> + * virtagent in response to monitored events or user-issued commands.
>> + * Client jobs progress via the following operations:
>> + *
>> + * SEND->SEND_DONE->READ_DONE
>> + *
>> + * SEND (provided by user, called by manager)
>> + * After client jobs are added, send() will eventually be called to queue
>> + * the job up for xmit over the channel.
>> + *
>> + * SEND_DONE (provided by manager, called by user)
>> + * Upon completion of the send, send_completed() should be called with
>> + * failure/success indication.
>> + *
>> + * READ_DONE (provided by manager, called by user)
>> + * When a response for the request is read back via the transport layer,
>> + * read_done() will be called by the user to indicate success/failure,
>> + * inject the response, and make the associated callback.
>> + */
>> +typedef int (va_client_job_cb)(void *opaque, void *resp_opaque,
>> +                               const char *tag);
>> +typedef struct VAClientJobOps {
>> +    va_job_op *send;
>> +    va_client_job_cb *callback;
>> +} VAClientJobOps;
>> +
>> +typedef struct VAManager VAManager;
>> +
>> +VAManager *va_manager_new(void);
>> +void va_kick(VAManager *m);
>> +
>> +/* interfaces for server jobs */
>> +int va_server_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAServerJobOps ops);
>> +void va_server_job_execute_done(VAManager *m, const char *tag);
>> +void va_server_job_send_done(VAManager *m, const char *tag);
>> +void va_server_job_cancel(VAManager *m, const char *tag);
>> +
>> +/* interfaces for client jobs */
>> +int va_client_job_add(VAManager *m, const char *tag, void *opaque,
>> +                      VAClientJobOps ops);
>> +void va_client_job_cancel(VAManager *m, const char *tag);
>> +void va_client_job_send_done(VAManager *m, const char *tag);
>> +void va_client_job_read_done(VAManager *m, const char *tag, void *resp);
>> +
>> +#endif /* VIRTAGENT_MANAGER_H */
>
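For completeness, the client side implied by the above looks roughly like
this (again an illustrative sketch; my_request_send() etc. are placeholder
names, the real callers arrive in the transport/application patches later
in this series):

    /* illustrative sketch, not from the series */
    #include "virtagent-manager.h"

    static VAManager *manager; /* same manager as the server side */

    /* SEND op: the manager calls this when a send slot frees up */
    static int my_request_send(void *opaque, const char *tag)
    {
        /* ... queue the request for xmit; the transport layer calls
         * va_client_job_send_done(manager, tag) once it's on the wire,
         * then va_client_job_read_done(manager, tag, resp) when the
         * response is read back ... */
        return 0;
    }

    /* callback: the response is in, the job is complete */
    static int my_request_cb(void *opaque, void *resp_opaque, const char *tag)
    {
        /* ... hand resp_opaque back to whoever issued the RPC ... */
        return 0;
    }

    static VAClientJobOps my_client_ops = {
        .send = my_request_send,
        .callback = my_request_cb,
    };

    static void my_issue_rpc(void *request_state)
    {
        va_client_job_add(manager, "client-rpc-0", request_state,
                          my_client_ops);
    }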