From: Pete Zaitcev <zaitcev@redhat.com>
To: Jeff Garzik <jeff@garzik.org>
Cc: Project Hail List <hail-devel@vger.kernel.org>
Subject: [Patch 2/3] tabled: use argument of a thread
Date: Tue, 15 Dec 2009 22:10:02 -0700 [thread overview]
Message-ID: <20091215221002.30be525e@redhat.com> (raw)
We replace a comment with code to better show the intent. Once we have
several threads, it will be easier to plug TLS into this.
Signed-off-by: Pete Zaitcev <zaitcev@redhat.com>
---
server/replica.c | 69 ++++++++++++++++++++++++++-------------------
1 file changed, 41 insertions(+), 28 deletions(-)
Not sure if we actually want this at this stage, but I coded it up
yesterday after we talked about argument passing and TLS, to see just
how many rep_xxx functions need extra arguments. It was not too bad.
commit 32c8b8072c901e549ecbd8f1a29581b37f6cec16
Author: Master <zaitcev@lembas.zaitcev.lan>
Date: Tue Dec 15 21:20:22 2009 -0700
Pass arguments to a thread by official means.
diff --git a/server/replica.c b/server/replica.c
index 067accb..83b559d 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -27,11 +27,16 @@
#include <elist.h>
#include "tabled.h"
+struct rep_arg {
+ struct event_base *evbase;
+};
+
/*
* Replication Job
*/
struct rep_job {
struct list_head jlink;
+ struct rep_arg *arg;
uint64_t oid;
uint64_t size; /* all of the object */
@@ -57,11 +62,17 @@ static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
-static struct event_base *evbase;
+/*
+ * These should actually be thread-local, but we only have one thread.
+ */
static struct event kscan_timer; /* db4 key rescan timer */
+static time_t kscan_last;
+
+/*
+ * These are module-scope things: global locks and flags, thread list, etc.
+ */
static bool kscan_enabled = false;
static GThread *scan_thread;
-static time_t kscan_last;
static void job_dispatch(void);
@@ -253,7 +264,7 @@ static void job_dispatch()
if (!job->buf)
goto err_malloc;
- rc = stor_open(&job->in_ce, job->src, evbase);
+ rc = stor_open(&job->in_ce, job->src, job->arg->evbase);
if (rc) {
applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
job->src->id, rc);
@@ -261,7 +272,7 @@ static void job_dispatch()
}
job->in_ce.cli = job;
- rc = stor_open(&job->out_ce, job->dst, evbase);
+ rc = stor_open(&job->out_ce, job->dst, job->arg->evbase);
if (rc) {
applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
job->dst->id, rc);
@@ -366,7 +377,8 @@ static struct rep_job *job_find_by_oid(uint64_t oid)
}
/* start replicating the key somewhere */
-static void rep_job_start(size_t klen, struct db_obj_key *key,
+static void rep_job_start(struct rep_arg *arg,
+ size_t klen, struct db_obj_key *key,
uint64_t oid, uint64_t objsize,
int nnum, struct storage_node *nvec[])
{
@@ -386,18 +398,15 @@ static void rep_job_start(size_t klen, struct db_obj_key *key,
job = job_alloc(klen, key);
if (!job)
goto err_alloc;
+ job->arg = arg;
job->oid = oid;
job->size = objsize;
job->src = job_select_src(nnum, nvec);
- if (!job->src) {
- /* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+ if (!job->src)
goto err_src;
- }
job->dst = job_select_dst(nnum, nvec);
- if (!job->dst) {
- /* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+ if (!job->dst)
goto err_dst;
- }
if (job->src->id == job->dst->id) {
/* Is this bad enough to invoke exit(1) right here? */
applog(LOG_ERR, "Internal error, copy from/to nid %u",
@@ -540,7 +549,8 @@ static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
}
/* meat of scan - check if replication is need on the key */
-static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+static void rep_scan_verify(struct rep_arg *arg,
+ struct cursor *cp, struct db_obj_ent *obj)
{
char bucket_name[65];
char object_name[1025];
@@ -595,9 +605,8 @@ static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
allcnt, redcnt);
if (redcnt < MAXWAY) { /* maybe have MINWAY too? */
- rep_job_start(cp->klen, cp->key, oid,
- GUINT64_FROM_LE(obj->size),
- redcnt, redvec);
+ rep_job_start(arg, cp->klen, cp->key, oid,
+ GUINT64_FROM_LE(obj->size), redcnt, redvec);
}
for (i = 0; i < redcnt; i++)
@@ -723,7 +732,7 @@ static void rep_retire(void)
}
}
-static void rep_scan(void)
+static void rep_scan(struct rep_arg *arg)
{
struct cursor cur;
struct db_obj_ent *obj;
@@ -767,7 +776,7 @@ static void rep_scan(void)
}
if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
- rep_scan_verify(&cur, obj);
+ rep_scan_verify(arg, &cur, obj);
free(obj);
kcnt++;
@@ -792,17 +801,20 @@ static void add_kscan_timer(void)
static void tdb_keyscan(int fd, short events, void *userdata)
{
+ struct rep_arg *arg = userdata;
+
if (kscan_enabled)
- rep_scan();
+ rep_scan(arg);
add_kscan_timer();
}
static gpointer rep_thread_func(gpointer data)
{
+ struct rep_arg *arg = data;
int rc;
- evtimer_set(&kscan_timer, tdb_keyscan, NULL);
- event_base_set(evbase, &kscan_timer);
+ evtimer_set(&kscan_timer, tdb_keyscan, arg);
+ event_base_set(arg->evbase, &kscan_timer);
/*
* We must add an event now, or else event_base_dispatch will
@@ -811,7 +823,7 @@ static gpointer rep_thread_func(gpointer data)
add_kscan_timer();
for (;;) {
- rc = event_base_dispatch(evbase);
+ rc = event_base_dispatch(arg->evbase);
applog(LOG_ERR, "rep event_base_dispatch exits (%d)", rc);
sleep(300); /* Should not happen, so maybe exit(1)? */
}
@@ -821,15 +833,16 @@ static gpointer rep_thread_func(gpointer data)
void rep_init(struct event_base *ev_base)
{
GError *error;
+ struct rep_arg *arg;
- /* We could pass this event_base as an arg to our replica thread
- * via g_thread_create(), but that seems pointless given that
- * we are storing the event base as a module-local static
- * anyway.
- */
- evbase = ev_base;
+ arg = malloc(sizeof(struct rep_arg));
+ if (!arg) {
+ applog(LOG_ERR, "No core");
+ exit(1);
+ }
+ arg->evbase = ev_base;
- scan_thread = g_thread_create(rep_thread_func, NULL, FALSE, &error);
+ scan_thread = g_thread_create(rep_thread_func, arg, FALSE, &error);
if (scan_thread == NULL) {
applog(LOG_ERR, "Failed to start replication thread: %s",
error->message);
reply other threads:[~2009-12-16 5:10 UTC|newest]
Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20091215221002.30be525e@redhat.com \
--to=zaitcev@redhat.com \
--cc=hail-devel@vger.kernel.org \
--cc=jeff@garzik.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.