From: Stefan Beller <sbeller@google.com>
To: git@vger.kernel.org
Cc: gitster@pobox.com, jrnieder@gmail.com, hvoigt@hvoigt.net,
jens.lehmann@web.de, Stefan Beller <sbeller@google.com>
Subject: [RFC PATCH 2/3] run-commands: add an async queue processor
Date: Thu, 20 Aug 2015 18:40:36 -0700 [thread overview]
Message-ID: <1440121237-24576-2-git-send-email-sbeller@google.com> (raw)
In-Reply-To: <1440121237-24576-1-git-send-email-sbeller@google.com>
This adds functionality to do work in parallel.
The whole life cycle of such a thread pool would look like
struct task_queue * tq = create_task_queue(32); // no of threads
for (...)
add_task(tq, process_one_item_function, item); // non blocking
...
int ret = finish_task_queue(tq); // blocks until all tasks are done
if (ret)
die ("Not all items were processed");
The caller must take care of handling the output.
Signed-off-by: Stefan Beller <sbeller@google.com>
---
I sent this a while ago to the list, no comments on it :(
The core functionality stayed the same, but I hope to have improved the
naming and location of the code.
The WIP is only for the NO_PTHREADS case.
run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
run-command.h | 30 +++++++++
2 files changed, 230 insertions(+), 12 deletions(-)
diff --git a/run-command.c b/run-command.c
index 28e1d55..4029011 100644
--- a/run-command.c
+++ b/run-command.c
@@ -4,6 +4,21 @@
#include "sigchain.h"
#include "argv-array.h"
+#ifdef NO_PTHREADS
+
+#else
+
+#include "thread-utils.h"
+
+#include <pthread.h>
+#include <semaphore.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#endif
+
+#include "git-compat-util.h"
+
void child_process_init(struct child_process *child)
{
memset(child, 0, sizeof(*child));
@@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
#endif
+/*
+ * One-time initialisation of the state that identifies the main thread.
+ *
+ * On the first call this records the calling thread in `main_thread`,
+ * creates the `async_key` and `async_die_counter` thread-specific keys and
+ * installs the async-aware die routines.  Every later call is a no-op.
+ *
+ * NOTE(review): the `main_thread_set` check is not synchronised; this
+ * presumably relies on the first call happening before any other thread
+ * runs -- confirm when adding new callers.
+ */
+void setup_main_thread(void)
+{
+	if (main_thread_set)
+		return;
+
+	/*
+	 * We assume that the first caller (start_async or
+	 * create_task_queue) is running on the main thread.
+	 */
+	main_thread_set = 1;
+	main_thread = pthread_self();
+	pthread_key_create(&async_key, NULL);
+	pthread_key_create(&async_die_counter, NULL);
+	set_die_routine(die_async);
+	set_die_is_recursing_routine(async_die_is_recursing);
+}
+
int start_async(struct async *async)
{
int need_in, need_out;
@@ -740,18 +771,7 @@ int start_async(struct async *async)
else if (async->out)
close(async->out);
#else
- if (!main_thread_set) {
- /*
- * We assume that the first time that start_async is called
- * it is from the main thread.
- */
- main_thread_set = 1;
- main_thread = pthread_self();
- pthread_key_create(&async_key, NULL);
- pthread_key_create(&async_die_counter, NULL);
- set_die_routine(die_async);
- set_die_is_recursing_routine(async_die_is_recursing);
- }
+ setup_main_thread();
if (proc_in >= 0)
set_cloexec(proc_in);
@@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
close(cmd->out);
return finish_command(cmd);
}
+
+#ifndef NO_PTHREADS
+/*
+ * One pending entry of a task queue; entries form a singly linked list
+ * that is appended to in add_task and consumed from the front in get_task.
+ */
+struct job_list {
+ /* callback invoked by a worker thread with the owning queue and `task` */
+ int (*fct)(struct task_queue *aq, void *task);
+ /* opaque argument passed through to `fct` */
+ void *task;
+ /* following entry, or NULL if this is the last one */
+ struct job_list *next;
+};
+#endif
+
+/*
+ * State of one task queue as handed out by create_task_queue.
+ */
+struct task_queue {
+#ifndef NO_PTHREADS
+ /*
+ * To avoid deadlocks always acquire the semaphores with lowest priority
+ * first, priorities are in descending order as listed.
+ *
+ * The `mutex` is a general purpose lock for modifying data in the async
+ * queue, such as adding a new task or adding a return value from
+ * an already run task.
+ *
+ * `workingcount` and `freecount` are opposing semaphores, the sum of
+ * their values should equal `max_threads` at any time while the `mutex`
+ * is available.
+ *
+ * `workingcount` is posted by add_task and waited on by get_task;
+ * `freecount` is posted by get_task and waited on by add_task.
+ */
+ sem_t mutex;
+ sem_t workingcount;
+ sem_t freecount;
+
+ /* array of `max_threads` worker thread handles */
+ pthread_t *threads;
+ unsigned max_threads;
+
+ /* head and tail of the pending job list; both NULL when it is empty */
+ struct job_list *first;
+ struct job_list *last;
+#endif
+ /* OR of the return values reported by tasks; result of finish_task_queue */
+ int early_return;
+};
+
+#ifndef NO_PTHREADS
+
+/*
+ * Remove the oldest job from the queue, waiting until one is available.
+ *
+ * aq:           queue to take the job from
+ * fct, task:    receive the callback and argument of the dequeued job
+ * early_return: in: return value of the caller's previous task, which is
+ *               OR-ed into the queue-wide flag; out: the resulting
+ *               queue-wide flag
+ *
+ * Dies if the job counter says a job is available but the list is empty.
+ */
+static void get_task(struct task_queue *aq,
+		     int (**fct)(struct task_queue *aq, void *task),
+		     void **task,
+		     int *early_return)
+{
+	struct job_list *head;
+
+	/* counting semaphore first, then the lock: same order as add_task */
+	sem_wait(&aq->workingcount);
+	sem_wait(&aq->mutex);
+
+	head = aq->first;
+	if (!head)
+		die("BUG: internal error with dequeuing jobs for threads");
+
+	/* unlink the head; an emptied list has no tail either */
+	aq->first = head->next;
+	if (!aq->first)
+		aq->last = NULL;
+
+	/* merge this thread's status into the shared one and read it back */
+	aq->early_return |= *early_return;
+	*early_return = aq->early_return;
+
+	*fct = head->fct;
+	*task = head->task;
+
+	/* one more slot is free for add_task */
+	sem_post(&aq->freecount);
+	sem_post(&aq->mutex);
+
+	free(head);
+}
+
+/*
+ * Thread entry point of every worker belonging to a task queue.
+ *
+ * args: the `struct task_queue` this worker serves.
+ *
+ * The worker keeps dequeuing jobs until it receives one whose callback is
+ * NULL, which finish_task_queue enqueues once per worker as the shutdown
+ * signal.  Once any task has reported a non-zero return value, the
+ * remaining jobs are still dequeued (so producers blocked in add_task and
+ * the shutdown handshake keep making progress) but are no longer run.
+ *
+ * Always returns NULL.
+ */
+static void *dispatcher(void *args)
+{
+	struct task_queue *aq = args;
+	int (*fct)(struct task_queue *aq, void *data);
+	void *task;
+	int early_return = 0;
+
+	get_task(aq, &fct, &task, &early_return);
+	/*
+	 * Loop on `fct` alone: the previous condition also entered the
+	 * body when `fct` was NULL but early_return was set, calling a
+	 * NULL function pointer.
+	 */
+	while (fct) {
+		if (!early_return)
+			early_return = fct(aq, task);
+		/* reports our status and picks up the queue-wide one */
+		get_task(aq, &fct, &task, &early_return);
+	}
+
+	return NULL;
+}
+#endif
+
+/*
+ * Allocate and initialise a task queue.
+ *
+ * max_threads: number of worker threads to start; 0 means use the value
+ *              of online_cpus().
+ *
+ * With pthreads the workers are started here and wait for jobs; without
+ * pthreads only the early-return flag is set up.  Dies if a semaphore or
+ * a thread cannot be created, because finish_task_queue would otherwise
+ * join thread handles that were never initialised.
+ *
+ * Returns the new queue, which finish_task_queue frees.
+ */
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+	struct task_queue *aq = xmalloc(sizeof(*aq));
+#ifndef NO_PTHREADS
+	unsigned i;
+#endif
+
+	aq->early_return = 0;
+
+#ifndef NO_PTHREADS
+	aq->max_threads = max_threads ? max_threads : online_cpus();
+	aq->first = NULL;
+	aq->last = NULL;
+
+	if (sem_init(&aq->mutex, 0, 1) ||
+	    sem_init(&aq->workingcount, 0, 0) ||
+	    sem_init(&aq->freecount, 0, aq->max_threads))
+		die("unable to initialize task queue semaphores: %s",
+		    strerror(errno));
+
+	aq->threads = xmalloc(aq->max_threads * sizeof(*aq->threads));
+
+	/* finish all set-up before the first worker exists */
+	setup_main_thread();
+
+	for (i = 0; i < aq->max_threads; i++) {
+		int err = pthread_create(&aq->threads[i], NULL,
+					 dispatcher, aq);
+		if (err)
+			die("unable to create task queue thread: %s",
+			    strerror(err));
+	}
+#endif
+
+	return aq;
+}
+
+/*
+ * Hand one task to the queue.
+ *
+ * aq:   queue created by create_task_queue
+ * fct:  callback to run; NULL is reserved for the shutdown signal that
+ *       finish_task_queue sends to the workers
+ * task: opaque argument passed to `fct`
+ *
+ * With pthreads the job is appended to the list and a worker is woken;
+ * the call waits while `max_threads` jobs are already pending.  Without
+ * pthreads the task runs right here, unless an earlier task already
+ * reported a non-zero return value.
+ */
+void add_task(struct task_queue *aq,
+	      int (*fct)(struct task_queue *aq, void *task),
+	      void *task)
+{
+#ifndef NO_PTHREADS
+	struct job_list *job_list;
+
+	job_list = xmalloc(sizeof(*job_list));
+	job_list->task = task;
+	job_list->fct = fct;
+	job_list->next = NULL;
+
+	/* wait for a free slot first, then take the lock (see task_queue) */
+	sem_wait(&aq->freecount);
+	sem_wait(&aq->mutex);
+
+	if (!aq->last) {
+		aq->last = job_list;
+		aq->first = aq->last;
+	} else {
+		aq->last->next = job_list;
+		aq->last = aq->last->next;
+	}
+
+	/* announce the new job to the workers */
+	sem_post(&aq->workingcount);
+	sem_post(&aq->mutex);
+#else
+	/*
+	 * The former code here used members (`ret`, `function`) and a
+	 * variable (`job`) that do not exist; run the task inline and
+	 * keep its status in the only field this build has.
+	 */
+	if (fct && !aq->early_return)
+		aq->early_return = fct(aq, task);
+#endif
+}
+
+/*
+ * Shut the queue down and release it.
+ *
+ * With pthreads, one NULL job per worker is enqueued, all workers are
+ * joined and the semaphores and thread array are destroyed; it is a BUG
+ * if jobs are still listed afterwards.  `aq` itself is freed in every
+ * build, so it must not be used after this call.
+ *
+ * Returns the queue's accumulated `early_return` value.
+ */
+int finish_task_queue(struct task_queue *aq)
+{
+	int result;
+#ifndef NO_PTHREADS
+	unsigned remaining = aq->max_threads;
+	pthread_t *worker;
+	pthread_t *end = aq->threads + aq->max_threads;
+
+	/* a NULL callback tells exactly one worker to stop */
+	while (remaining--)
+		add_task(aq, NULL, NULL);
+
+	for (worker = aq->threads; worker != end; worker++)
+		pthread_join(*worker, 0);
+
+	sem_destroy(&aq->mutex);
+	sem_destroy(&aq->workingcount);
+	sem_destroy(&aq->freecount);
+
+	if (aq->first)
+		die("BUG: internal error with queuing jobs for threads");
+
+	free(aq->threads);
+#endif
+	result = aq->early_return;
+	free(aq);
+
+	return result;
+}
+
diff --git a/run-command.h b/run-command.h
index 5b4425a..c2cfd49 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,34 @@ struct async {
int start_async(struct async *async);
int finish_async(struct async *async);
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active at a time, processing the enqueued
+ * tasks in first-in, first-out order.
+ *
+ * If `max_threads` is zero the number of cores available will be used.
+ *
+ * Currently this only works in environments with pthreads; in other
+ * environments, the task will be processed sequentially in `add_task`.
+ *
+ * The returned queue is freed by `finish_task_queue`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * The function and data are put into the task queue; `task` is passed
+ * unchanged to `fct` when the task is run.
+ *
+ * The function `fct` must not be NULL, as that's used internally
+ * in `finish_task_queue` to signal shutdown. If the return code
+ * of `fct` is unequal to 0, the tasks will stop eventually,
+ * the current parallel tasks will be flushed out.
+ *
+ * With pthreads this call waits while `max_threads` tasks are already
+ * queued and not yet picked up by a thread.
+ */
+void add_task(struct task_queue *aq,
+ int (*fct)(struct task_queue *aq, void *task),
+ void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object, so `aq` must not
+ * be used after this call. The return code is zero if all enqueued tasks
+ * were processed.
+ */
+int finish_task_queue(struct task_queue *aq);
+
#endif
--
2.5.0.264.g01b5c38.dirty
next prev parent reply other threads:[~2015-08-21 1:40 UTC|newest]
Thread overview: 16+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-08-21 1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-21 1:40 ` Stefan Beller [this message]
2015-08-21 19:05 ` [RFC PATCH 2/3] run-commands: add an async queue processor Junio C Hamano
2015-08-21 19:44 ` Jeff King
2015-08-21 19:48 ` Stefan Beller
2015-08-21 19:51 ` Jeff King
2015-08-21 20:12 ` Stefan Beller
2015-08-21 20:41 ` Junio C Hamano
2015-08-21 23:40 ` Stefan Beller
2015-08-24 21:22 ` Junio C Hamano
2015-08-21 19:45 ` Stefan Beller
2015-08-21 20:47 ` Junio C Hamano
2015-08-21 20:56 ` Stefan Beller
2015-08-21 1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
2015-08-21 19:23 ` Junio C Hamano
2015-08-21 20:21 ` Stefan Beller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1440121237-24576-2-git-send-email-sbeller@google.com \
--to=sbeller@google.com \
--cc=git@vger.kernel.org \
--cc=gitster@pobox.com \
--cc=hvoigt@hvoigt.net \
--cc=jens.lehmann@web.de \
--cc=jrnieder@gmail.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.