* [Patch 2/3] tabled: use argument of a thread
@ 2009-12-16 5:10 Pete Zaitcev
0 siblings, 0 replies; only message in thread
From: Pete Zaitcev @ 2009-12-16 5:10 UTC (permalink / raw)
To: Jeff Garzik; +Cc: Project Hail List
We replace a comment with code to better show the intent. Once we have
several threads, it will be easier to plug TLS (thread-local storage) into this.
Signed-off-by: Pete Zaitcev <zaitcev@redhat.com>
---
server/replica.c | 69 ++++++++++++++++++++++++++-------------------
1 file changed, 41 insertions(+), 28 deletions(-)
Not sure if we actually want this at this stage, but I coded it up
yesterday after we talked about argument passing and TLS, to see just
how many rep_xxx functions need extra arguments. It was not too bad.
commit 32c8b8072c901e549ecbd8f1a29581b37f6cec16
Author: Master <zaitcev@lembas.zaitcev.lan>
Date: Tue Dec 15 21:20:22 2009 -0700
Pass arguments to a thread by official means.
diff --git a/server/replica.c b/server/replica.c
index 067accb..83b559d 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -27,11 +27,16 @@
#include <elist.h>
#include "tabled.h"
+struct rep_arg {
+ struct event_base *evbase;
+};
+
/*
* Replication Job
*/
struct rep_job {
struct list_head jlink;
+ struct rep_arg *arg;
uint64_t oid;
uint64_t size; /* all of the object */
@@ -57,11 +62,17 @@ static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
-static struct event_base *evbase;
+/*
+ * These should actually be thread-local, but we only have one thread.
+ */
static struct event kscan_timer; /* db4 key rescan timer */
+static time_t kscan_last;
+
+/*
+ * These are module-scope things: global locks and flags, thread list, etc.
+ */
static bool kscan_enabled = false;
static GThread *scan_thread;
-static time_t kscan_last;
static void job_dispatch(void);
@@ -253,7 +264,7 @@ static void job_dispatch()
if (!job->buf)
goto err_malloc;
- rc = stor_open(&job->in_ce, job->src, evbase);
+ rc = stor_open(&job->in_ce, job->src, job->arg->evbase);
if (rc) {
applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
job->src->id, rc);
@@ -261,7 +272,7 @@ static void job_dispatch()
}
job->in_ce.cli = job;
- rc = stor_open(&job->out_ce, job->dst, evbase);
+ rc = stor_open(&job->out_ce, job->dst, job->arg->evbase);
if (rc) {
applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
job->dst->id, rc);
@@ -366,7 +377,8 @@ static struct rep_job *job_find_by_oid(uint64_t oid)
}
/* start replicating the key somewhere */
-static void rep_job_start(size_t klen, struct db_obj_key *key,
+static void rep_job_start(struct rep_arg *arg,
+ size_t klen, struct db_obj_key *key,
uint64_t oid, uint64_t objsize,
int nnum, struct storage_node *nvec[])
{
@@ -386,18 +398,15 @@ static void rep_job_start(size_t klen, struct db_obj_key *key,
job = job_alloc(klen, key);
if (!job)
goto err_alloc;
+ job->arg = arg;
job->oid = oid;
job->size = objsize;
job->src = job_select_src(nnum, nvec);
- if (!job->src) {
- /* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+ if (!job->src)
goto err_src;
- }
job->dst = job_select_dst(nnum, nvec);
- if (!job->dst) {
- /* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+ if (!job->dst)
goto err_dst;
- }
if (job->src->id == job->dst->id) {
/* Is this bad enough to invoke exit(1) right here? */
applog(LOG_ERR, "Internal error, copy from/to nid %u",
@@ -540,7 +549,8 @@ static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
}
/* meat of scan - check if replication is need on the key */
-static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+static void rep_scan_verify(struct rep_arg *arg,
+ struct cursor *cp, struct db_obj_ent *obj)
{
char bucket_name[65];
char object_name[1025];
@@ -595,9 +605,8 @@ static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
allcnt, redcnt);
if (redcnt < MAXWAY) { /* maybe have MINWAY too? */
- rep_job_start(cp->klen, cp->key, oid,
- GUINT64_FROM_LE(obj->size),
- redcnt, redvec);
+ rep_job_start(arg, cp->klen, cp->key, oid,
+ GUINT64_FROM_LE(obj->size), redcnt, redvec);
}
for (i = 0; i < redcnt; i++)
@@ -723,7 +732,7 @@ static void rep_retire(void)
}
}
-static void rep_scan(void)
+static void rep_scan(struct rep_arg *arg)
{
struct cursor cur;
struct db_obj_ent *obj;
@@ -767,7 +776,7 @@ static void rep_scan(void)
}
if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
- rep_scan_verify(&cur, obj);
+ rep_scan_verify(arg, &cur, obj);
free(obj);
kcnt++;
@@ -792,17 +801,20 @@ static void add_kscan_timer(void)
static void tdb_keyscan(int fd, short events, void *userdata)
{
+ struct rep_arg *arg = userdata;
+
if (kscan_enabled)
- rep_scan();
+ rep_scan(arg);
add_kscan_timer();
}
static gpointer rep_thread_func(gpointer data)
{
+ struct rep_arg *arg = data;
int rc;
- evtimer_set(&kscan_timer, tdb_keyscan, NULL);
- event_base_set(evbase, &kscan_timer);
+ evtimer_set(&kscan_timer, tdb_keyscan, arg);
+ event_base_set(arg->evbase, &kscan_timer);
/*
* We must add an event now, or else event_base_dispatch will
@@ -811,7 +823,7 @@ static gpointer rep_thread_func(gpointer data)
add_kscan_timer();
for (;;) {
- rc = event_base_dispatch(evbase);
+ rc = event_base_dispatch(arg->evbase);
applog(LOG_ERR, "rep event_base_dispatch exits (%d)", rc);
sleep(300); /* Should not happen, so maybe exit(1)? */
}
@@ -821,15 +833,16 @@ static gpointer rep_thread_func(gpointer data)
void rep_init(struct event_base *ev_base)
{
GError *error;
+ struct rep_arg *arg;
- /* We could pass this event_base as an arg to our replica thread
- * via g_thread_create(), but that seems pointless given that
- * we are storing the event base as a module-local static
- * anyway.
- */
- evbase = ev_base;
+ arg = malloc(sizeof(struct rep_arg));
+ if (!arg) {
+ applog(LOG_ERR, "No core");
+ exit(1);
+ }
+ arg->evbase = ev_base;
- scan_thread = g_thread_create(rep_thread_func, NULL, FALSE, &error);
+ scan_thread = g_thread_create(rep_thread_func, arg, FALSE, &error);
if (scan_thread == NULL) {
applog(LOG_ERR, "Failed to start replication thread: %s",
error->message);
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2009-12-16 5:10 UTC | newest]
Thread overview: (only message) (download: mbox.gz, follow: Atom feed)
-- links below jump to the message on this page --
2009-12-16 5:10 [Patch 2/3] tabled: use argument of a thread Pete Zaitcev
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.