All of lore.kernel.org
 help / color / mirror / Atom feed
From: Stefan Beller <sbeller@google.com>
To: git@vger.kernel.org
Cc: peff@peff.net, jrnieder@gmail.com, gitster@pobox.com,
	johannes.schindelin@gmx.de, Stefan Beller <sbeller@google.com>
Subject: [PATCH 4/9] thread-utils: add a threaded task queue
Date: Thu, 27 Aug 2015 18:14:50 -0700	[thread overview]
Message-ID: <1440724495-708-5-git-send-email-sbeller@google.com> (raw)
In-Reply-To: <1440724495-708-1-git-send-email-sbeller@google.com>

This adds functionality to do work in parallel using threads.
The boilerplate code for setting up the threads, tearing them
down again, and queuing up tasks is hidden behind the new API.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 run-command.c  |  39 +++++++-----
 run-command.h  |   3 +
 thread-utils.c | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 thread-utils.h |  35 +++++++++++
 4 files changed, 253 insertions(+), 16 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..3d37f8c 100644
--- a/run-command.c
+++ b/run-command.c
@@ -610,10 +610,12 @@ static NORETURN void die_async(const char *err, va_list params)
 
 	if (!pthread_equal(main_thread, pthread_self())) {
 		struct async *async = pthread_getspecific(async_key);
-		if (async->proc_in >= 0)
-			close(async->proc_in);
-		if (async->proc_out >= 0)
-			close(async->proc_out);
+		if (async) {
+			if (async->proc_in >= 0)
+				close(async->proc_in);
+			if (async->proc_out >= 0)
+				close(async->proc_out);
+		}
 		pthread_exit((void *)128);
 	}
 
@@ -668,6 +670,22 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+/*
+ * Record the calling thread as the main thread and install the
+ * thread-aware die routines. Only the first call does anything;
+ * later calls return immediately.
+ */
+void setup_main_thread(void)
+{
+	if (main_thread_set)
+		return;
+
+	/*
+	 * We assume that the first call (made on behalf of start_async
+	 * or create_task_queue) happens on the main thread.
+	 */
+	main_thread_set = 1;
+	main_thread = pthread_self();
+	pthread_key_create(&async_key, NULL);
+	pthread_key_create(&async_die_counter, NULL);
+	set_die_routine(die_async);
+	set_die_is_recursing_routine(async_die_is_recursing);
+}
+
 int start_async(struct async *async)
 {
 	int need_in, need_out;
@@ -740,18 +758,7 @@ int start_async(struct async *async)
 	else if (async->out)
 		close(async->out);
 #else
-	if (!main_thread_set) {
-		/*
-		 * We assume that the first time that start_async is called
-		 * it is from the main thread.
-		 */
-		main_thread_set = 1;
-		main_thread = pthread_self();
-		pthread_key_create(&async_key, NULL);
-		pthread_key_create(&async_die_counter, NULL);
-		set_die_routine(die_async);
-		set_die_is_recursing_routine(async_die_is_recursing);
-	}
+	setup_main_thread();
 
 	if (proc_in >= 0)
 		set_cloexec(proc_in);
diff --git a/run-command.h b/run-command.h
index 5b4425a..176a5b2 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,7 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/*
+ * Die gracefully from within threads: remembers the calling thread as
+ * the main thread and installs the thread-aware die routine. Only the
+ * first call has an effect.
+ */
+void setup_main_thread(void);
+
 #endif
diff --git a/thread-utils.c b/thread-utils.c
index a2135e0..30ccd79 100644
--- a/thread-utils.c
+++ b/thread-utils.c
@@ -1,5 +1,7 @@
 #include "cache.h"
 #include "thread-utils.h"
+#include "run-command.h"
+#include "git-compat-util.h"
 
 #if defined(hpux) || defined(__hpux) || defined(_hpux)
 #  include <sys/pstat.h>
@@ -75,3 +77,193 @@ int init_recursive_mutex(pthread_mutex_t *m)
 	}
 	return ret;
 }
+
+#ifndef NO_PTHREADS
+/* One queued unit of work; nodes form a singly linked FIFO list. */
+struct job_list {
+	/* Function to run; NULL is the shutdown marker queued by finish_task_queue. */
+	int (*fct)(struct task_queue *tq, void *task);
+	/* Opaque argument passed through to fct. */
+	void *task;
+	/* Next job in the queue, or NULL for the last one. */
+	struct job_list *next;
+};
+
+/* Shared state of one pool of worker threads and its list of pending jobs. */
+struct task_queue {
+	/* Held while the job list, queued_tasks or early_return is changed. */
+	pthread_mutex_t mutex;
+	/* Signalled by add_task after a job has been appended. */
+	pthread_cond_t cond_non_empty;
+
+	/* Number of jobs currently linked between first and last. */
+	int queued_tasks;
+	/* Head of the job list; next_task dequeues here. */
+	struct job_list *first;
+	/* Tail of the job list; add_task appends here. */
+	struct job_list *last;
+
+	/* Worker thread handles, max_threads entries. */
+	pthread_t *threads;
+	/* Number of worker threads started by create_task_queue. */
+	unsigned max_threads;
+	/* NOTE(review): no visible code reads this field. */
+	unsigned max_tasks;
+
+	/* Optional hook each worker calls right before it exits. */
+	void (*finish_function)(struct task_queue *tq);
+	/* OR of the task functions' return values, merged in by next_task. */
+	int early_return;
+};
+
+/*
+ * Block until the queue holds at least one job, then hand the oldest
+ * job to the caller.
+ *
+ * `*early_return` is the caller's failure flag; it is OR-ed into
+ * tq->early_return. If tq->early_return is non-zero afterwards, nothing
+ * is dequeued and `*fct` and `*task` are set to NULL. Otherwise they
+ * receive the dequeued job's function and argument and the list node is
+ * freed. A dequeued shutdown marker therefore also yields a NULL `*fct`.
+ */
+static void next_task(struct task_queue *tq,
+		      int (**fct)(struct task_queue *tq, void *task),
+		      void **task,
+		      int *early_return)
+{
+	struct job_list *job = NULL;
+
+	pthread_mutex_lock(&tq->mutex);
+	/* Re-check the count after every wakeup instead of testing once. */
+	while (tq->queued_tasks == 0)
+		pthread_cond_wait(&tq->cond_non_empty, &tq->mutex);
+
+	/*
+	 * NOTE(review): the caller's failure is only published here, after
+	 * the wait, so a worker that failed while the queue was empty does
+	 * not report it until another job has been queued.
+	 */
+	tq->early_return |= *early_return;
+
+	if (!tq->early_return) {
+		job = tq->first;
+		tq->first = job->next;
+		/* Removing the only job empties the list: clear the tail too. */
+		if (!tq->first)
+			tq->last = NULL;
+		tq->queued_tasks--;
+	}
+
+	pthread_mutex_unlock(&tq->mutex);
+
+	if (job) {
+		*fct = job->fct;
+		*task = job->task;
+	} else {
+		*fct = NULL;
+		*task = NULL;
+	}
+
+	/* job is NULL on the early-return path; free(NULL) is a no-op. */
+	free(job);
+}
+
+/*
+ * Thread entry point of a worker. `args` is the struct task_queue the
+ * worker serves.
+ *
+ * Fetches jobs with next_task and runs them until it is handed a NULL
+ * function or one of its own tasks returns non-zero, then calls the
+ * queue's finish_function (if set) and exits the thread.
+ */
+static void *dispatcher(void *args)
+{
+	struct task_queue *tq = args;
+	int (*fct)(struct task_queue *tq, void *task);
+	void *task;
+	int early_return = 0;
+
+	for (;;) {
+		/* Also reports the previous task's result back to the queue. */
+		next_task(tq, &fct, &task, &early_return);
+		if (!fct || early_return)
+			break;
+		early_return = fct(tq, task);
+	}
+
+	if (tq->finish_function)
+		tq->finish_function(tq);
+
+	pthread_exit(NULL);
+}
+
+/*
+ * Allocate a task queue and start its worker threads.
+ *
+ * `max_threads` is the number of workers to start; zero means
+ * online_cpus(). Dies if a thread cannot be created. The returned
+ * queue is released by finish_task_queue().
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+	unsigned i;
+	int ret;
+
+	tq->max_threads = max_threads ? max_threads : online_cpus();
+	tq->max_tasks = 0;
+
+	pthread_mutex_init(&tq->mutex, NULL);
+	pthread_cond_init(&tq->cond_non_empty, NULL);
+
+	tq->threads = xmalloc(tq->max_threads * sizeof(pthread_t));
+
+	tq->queued_tasks = 0;
+	tq->first = NULL;
+	tq->last = NULL;
+
+	/*
+	 * Every field a worker may look at has to be valid before the
+	 * first thread exists. In particular a worker that stops early
+	 * reads finish_function before finish_task_queue has stored it,
+	 * so it must not be left as whatever xmalloc returned.
+	 */
+	tq->finish_function = NULL;
+	tq->early_return = 0;
+
+	setup_main_thread();
+
+	for (i = 0; i < tq->max_threads; i++) {
+		ret = pthread_create(&tq->threads[i], 0, &dispatcher, tq);
+		if (ret)
+			die("unable to create thread: %s", strerror(ret));
+	}
+
+	return tq;
+}
+
+/*
+ * Append a job to the tail of the queue and signal the non-empty
+ * condition.
+ *
+ * `fct` is the function a worker will call with `tq` and `task`;
+ * a NULL `fct` is how finish_task_queue tells a worker to exit.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	struct job_list *node = xmalloc(sizeof(*node));
+
+	node->fct = fct;
+	node->task = task;
+	node->next = NULL;
+
+	pthread_mutex_lock(&tq->mutex);
+	/* An empty list has no tail: the new node becomes the head as well. */
+	if (tq->last)
+		tq->last->next = node;
+	else
+		tq->first = node;
+	tq->last = node;
+	tq->queued_tasks++;
+	pthread_mutex_unlock(&tq->mutex);
+
+	/* Signal only after the mutex has been released. */
+	pthread_cond_signal(&tq->cond_non_empty);
+}
+
+/*
+ * Shut the queue down: queue one NULL job per worker, wait for all
+ * workers to exit, then free the queue.
+ *
+ * `fct`, if not NULL, is stored as the finish function that each worker
+ * calls before it exits.
+ *
+ * Returns the accumulated early_return flag, which is zero when no task
+ * function returned non-zero. `tq` must not be used afterwards.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret;
+	unsigned i;
+
+	tq->finish_function = fct;
+
+	for (i = 0; i < tq->max_threads; i++)
+		add_task(tq, NULL, NULL);
+
+	for (i = 0; i < tq->max_threads; i++)
+		pthread_join(tq->threads[i], 0);
+
+	pthread_mutex_destroy(&tq->mutex);
+	pthread_cond_destroy(&tq->cond_non_empty);
+
+	/*
+	 * Once early_return is set, workers exit without dequeuing, so
+	 * unprocessed jobs and shutdown markers are still linked here.
+	 * That is the expected outcome of an early return, not a bug:
+	 * release the list nodes. The task arguments are not touched.
+	 */
+	while (tq->early_return && tq->first) {
+		struct job_list *job = tq->first;
+
+		tq->first = job->next;
+		free(job);
+	}
+
+	/* Without an early return every job must have been consumed. */
+	if (tq->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	ret = tq->early_return;
+	free(tq->threads);
+	free(tq);
+
+	return ret;
+}
+#else /* NO_PTHREADS */
+
+/* Without threads the queue only has to remember whether a task failed. */
+struct task_queue {
+	/* OR of the return values of the task functions run by add_task. */
+	int early_return;
+};
+
+/*
+ * Sequential fallback: allocate a queue that only tracks the
+ * early-return flag. `max_threads` is ignored because add_task runs
+ * each task directly in the calling thread.
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *tq = xmalloc(sizeof(*tq));
+
+	tq->early_return = 0;
+
+	/* A non-void function must return; callers dereference this pointer. */
+	return tq;
+}
+
+/*
+ * Sequential fallback: run `fct(tq, task)` right away in the calling
+ * thread and fold its return value into tq->early_return. Once that
+ * flag is non-zero, later tasks are skipped without being run.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task)
+{
+	if (!tq->early_return)
+		tq->early_return |= fct(tq, task);
+}
+
+/*
+ * Sequential fallback: add_task has already run (or skipped) every
+ * task, so only the cleanup hook and the queue itself are left.
+ *
+ * `fct`, if not NULL, is called once, mirroring the threaded build in
+ * which each worker calls it after its last task. Returns the
+ * accumulated early_return flag and frees `tq`.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+	int ret = tq->early_return;
+
+	/* The callback may still inspect tq, so run it before the free. */
+	if (fct)
+		fct(tq);
+
+	free(tq);
+	return ret;
+}
+#endif
diff --git a/thread-utils.h b/thread-utils.h
index d9a769d..f41cfb1 100644
--- a/thread-utils.h
+++ b/thread-utils.h
@@ -12,4 +12,39 @@ extern int init_recursive_mutex(pthread_mutex_t*);
 #define online_cpus() 1
 
 #endif
+
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks.
+ * `max_threads` worker threads process the enqueued tasks, taking
+ * them from the queue in first-in, first-out order.
+ *
+ * If `max_threads` is zero, the number of cores available is used.
+ *
+ * Threads are only used in environments with pthreads; in other
+ * environments each task is run sequentially inside `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * Puts the function `fct` and its argument `task` into the task queue.
+ *
+ * `fct` must not be NULL, because a NULL function is used internally
+ * by `finish_task_queue` to signal shutdown. If `fct` returns a
+ * non-zero value, no further tasks are started once that has been
+ * noticed; tasks that are already running are allowed to finish.
+ */
+void add_task(struct task_queue *tq,
+	      int (*fct)(struct task_queue *tq, void *task),
+	      void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed, and non-zero if a task
+ * function returned a non-zero value.
+ *
+ * The function `fct` is called once in each thread after the last task
+ * for that thread was processed. If no thread-local cleanup needs to be
+ * performed, pass NULL.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq));
+
 #endif /* THREAD_COMPAT_H */
-- 
2.5.0.264.g5e52b0d

  parent reply	other threads:[~2015-08-28  1:15 UTC|newest]

Thread overview: 33+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-08-28  1:14 [PATCH 0/9] Progress with git submodule Stefan Beller
2015-08-28  1:14 ` [PATCH 1/9] submodule: implement `module_list` as a builtin helper Stefan Beller
2015-08-28  1:14 ` [PATCH 2/9] submodule: implement `module_name` " Stefan Beller
2015-08-28  1:14 ` [PATCH 3/9] submodule: implement `module_clone` " Stefan Beller
2015-08-31 18:53   ` Junio C Hamano
2015-08-28  1:14 ` Stefan Beller [this message]
2015-08-28  1:14 ` [PATCH 5/9] run-command: add synced output Stefan Beller
2015-08-28  1:14 ` [PATCH 6/9] submodule: helper to run foreach in parallel Stefan Beller
2015-08-28 17:08   ` Stefan Beller
2015-08-28  1:14 ` [PATCH 7/9] fetch: fetch submodules " Stefan Beller
2015-08-28 17:00   ` Stefan Beller
2015-08-28 17:01     ` Jonathan Nieder
2015-08-28 17:12       ` Junio C Hamano
2015-08-28 17:45         ` Stefan Beller
2015-08-28 18:20         ` Jonathan Nieder
2015-08-28 18:27           ` Junio C Hamano
2015-08-28 18:35             ` Jeff King
2015-08-28 18:41               ` Junio C Hamano
2015-08-28 18:41               ` Stefan Beller
2015-08-28 18:44                 ` Jeff King
2015-08-28 18:50                   ` Jonathan Nieder
2015-08-28 18:53                     ` Jeff King
2015-08-28 19:02                       ` Stefan Beller
2015-08-28 18:59                   ` Stefan Beller
2015-08-28 18:44               ` Jonathan Nieder
2015-08-28 18:36             ` Stefan Beller
2015-08-28 18:42             ` Jonathan Nieder
2015-08-31 18:56   ` Junio C Hamano
2015-08-31 19:05     ` Jeff King
2015-08-28  1:14 ` [PATCH 8/9] index-pack: Use the new worker pool Stefan Beller
2015-08-28  1:14 ` [PATCH 9/9] pack-objects: Use " Stefan Beller
2015-08-28 10:09 ` [PATCH 0/9] Progress with git submodule Johannes Schindelin
2015-08-28 16:35   ` Stefan Beller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1440724495-708-5-git-send-email-sbeller@google.com \
    --to=sbeller@google.com \
    --cc=git@vger.kernel.org \
    --cc=gitster@pobox.com \
    --cc=johannes.schindelin@gmx.de \
    --cc=jrnieder@gmail.com \
    --cc=peff@peff.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.