* [PATCH 1/5] submodule: implement `module_clone` as a builtin helper
From: Stefan Beller @ 2015-08-27 0:52 UTC (permalink / raw)
To: git; +Cc: peff, jrnieder, gitster, Stefan Beller
`module_clone` is part of the update command, which I want to convert
to C next.
Signed-off-by: Stefan Beller <sbeller@google.com>
Signed-off-by: Junio C Hamano <gitster@pobox.com>
---
builtin/submodule--helper.c | 160 +++++++++++++++++++++++++++++++++++++++++++-
git-submodule.sh | 80 +---------------------
2 files changed, 162 insertions(+), 78 deletions(-)
diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index 4b32a3c..f11fb9c 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -8,6 +8,7 @@
#include "submodule.h"
#include "submodule-config.h"
#include "string-list.h"
+#include "run-command.h"
static const struct cache_entry **ce_entries;
static int ce_alloc, ce_used;
@@ -124,6 +125,160 @@ static int module_name(int argc, const char **argv, const char *prefix)
return 0;
}
+static int clone_submodule(const char *path, const char *gitdir, const char *url,
+ const char *depth, const char *reference, int quiet)
+{
+ struct child_process cp;
+ child_process_init(&cp);
+
+ argv_array_push(&cp.args, "clone");
+ argv_array_push(&cp.args, "--no-checkout");
+ if (quiet)
+ argv_array_push(&cp.args, "--quiet");
+ if (depth && strcmp(depth, "")) {
+ argv_array_push(&cp.args, "--depth");
+ argv_array_push(&cp.args, depth);
+ }
+ if (reference && strcmp(reference, "")) {
+ argv_array_push(&cp.args, "--reference");
+ argv_array_push(&cp.args, reference);
+ }
+ if (gitdir) {
+ argv_array_push(&cp.args, "--separate-git-dir");
+ argv_array_push(&cp.args, gitdir);
+ }
+ argv_array_push(&cp.args, url);
+ argv_array_push(&cp.args, path);
+
+ cp.git_cmd = 1;
+ cp.env = local_repo_env;
+
+ cp.no_stdin = 1;
+ cp.no_stdout = 1;
+ cp.no_stderr = 1;
+
+ return run_command(&cp);
+}
+
+/*
+ * Clone a submodule
+ *
+ * $1 = submodule path
+ * $2 = submodule name
+ * $3 = URL to clone
+ * $4 = reference repository to reuse (empty for independent)
+ * $5 = depth argument for shallow clones (empty for deep)
+ *
+ * Prior to calling, cmd_update checks that a possibly existing
+ * path is not a git repository.
+ * Likewise, cmd_add checks that path does not exist at all,
+ * since it is the location of a new submodule.
+ */
+static int module_clone(int argc, const char **argv, const char *prefix)
+{
+ const char *path = NULL, *name = NULL, *url = NULL, *reference = NULL, *depth = NULL;
+ int quiet = 0;
+ FILE *submodule_dot_git;
+ const char *sm_gitdir, *p;
+ struct strbuf rel_path = STRBUF_INIT;
+ struct strbuf sb = STRBUF_INIT;
+
+ struct option module_update_options[] = {
+ OPT_STRING(0, "prefix", &alternative_path,
+ N_("path"),
+ N_("alternative anchor for relative paths")),
+ OPT_STRING(0, "path", &path,
+ N_("path"),
+ N_("where the new submodule will be cloned to")),
+ OPT_STRING(0, "name", &name,
+ N_("string"),
+ N_("name of the new submodule")),
+ OPT_STRING(0, "url", &url,
+ N_("string"),
+ N_("url where to clone the submodule from")),
+ OPT_STRING(0, "reference", &reference,
+ N_("string"),
+ N_("reference repository")),
+ OPT_STRING(0, "depth", &depth,
+ N_("string"),
+ N_("depth for shallow clones")),
+ OPT_END()
+ };
+
+ static const char * const git_submodule_helper_usage[] = {
+ N_("git submodule--helper update [--prefix=<path>] [--quiet] [--remote] [-N|--no-fetch]"
+ "[-f|--force] [--rebase|--merge] [--reference <repository>]"
+ "[--depth <depth>] [--recursive] [--] [<path>...]"),
+ NULL
+ };
+
+ argc = parse_options(argc, argv, prefix, module_update_options,
+ git_submodule_helper_usage, 0);
+
+ if (getenv("GIT_QUIET"))
+ quiet = 1;
+
+ strbuf_addf(&sb, "%s/modules/%s", get_git_dir(), name);
+ sm_gitdir = strbuf_detach(&sb, NULL);
+
+ if (!file_exists(sm_gitdir)) {
+ safe_create_leading_directories_const(sm_gitdir);
+ if (clone_submodule(path, sm_gitdir, url, depth, reference, quiet))
+ die(N_("Clone of '%s' into submodule path '%s' failed"),
+ url, path);
+ } else {
+ safe_create_leading_directories_const(path);
+ unlink(sm_gitdir);
+ }
+
+ /* Write a .git file in the submodule to redirect to the superproject. */
+ if (alternative_path && !strcmp(alternative_path, "")) {
+ p = relative_path(path, alternative_path, &sb);
+ strbuf_reset(&sb);
+ } else
+ p = path;
+
+ if (safe_create_leading_directories_const(p) < 0)
+ die("Could not create directory '%s'", p);
+
+ strbuf_addf(&sb, "%s/.git", p);
+
+ if (safe_create_leading_directories_const(sb.buf) < 0)
+ die(_("could not create leading directories of '%s'"), sb.buf);
+ submodule_dot_git = fopen(sb.buf, "w");
+ if (!submodule_dot_git)
+ die ("Cannot open file '%s': %s", sb.buf, strerror(errno));
+
+ fprintf(submodule_dot_git, "gitdir: %s\n",
+ relative_path(sm_gitdir, path, &rel_path));
+ if (fclose(submodule_dot_git))
+ die("Could not close file %s", sb.buf);
+ strbuf_reset(&sb);
+
+ /* Redirect the worktree of the submodule in the superprojects config */
+ if (!is_absolute_path(sm_gitdir)) {
+ char *s = (char*)sm_gitdir;
+ if (strbuf_getcwd(&sb))
+ die_errno("unable to get current working directory");
+ strbuf_addf(&sb, "/%s", sm_gitdir);
+ sm_gitdir = strbuf_detach(&sb, NULL);
+ free(s);
+ }
+
+ if (strbuf_getcwd(&sb))
+ die_errno("unable to get current working directory");
+ strbuf_addf(&sb, "/%s", path);
+
+ p = git_pathdup_submodule(path, "config");
+ if (!p)
+ die("Could not get submodule directory for '%s'", path);
+ git_config_set_in_file(p, "core.worktree",
+ relative_path(sb.buf, sm_gitdir, &rel_path));
+ strbuf_release(&sb);
+ free((char*)sm_gitdir);
+ return 0;
+}
+
int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
{
if (argc < 2)
@@ -135,6 +290,9 @@ int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
if (!strcmp(argv[1], "module_name"))
return module_name(argc - 2, argv + 2, prefix);
+ if (!strcmp(argv[1], "module_clone"))
+ return module_clone(argc - 1, argv + 1, prefix);
+
usage:
- usage("git submodule--helper [module_list module_name]\n");
+ usage("git submodule--helper [module_list module_name module_clone]\n");
}
diff --git a/git-submodule.sh b/git-submodule.sh
index e6ff38d..fb5155e 100755
--- a/git-submodule.sh
+++ b/git-submodule.sh
@@ -178,80 +178,6 @@ get_submodule_config () {
printf '%s' "${value:-$default}"
}
-#
-# Clone a submodule
-#
-# $1 = submodule path
-# $2 = submodule name
-# $3 = URL to clone
-# $4 = reference repository to reuse (empty for independent)
-# $5 = depth argument for shallow clones (empty for deep)
-#
-# Prior to calling, cmd_update checks that a possibly existing
-# path is not a git repository.
-# Likewise, cmd_add checks that path does not exist at all,
-# since it is the location of a new submodule.
-#
-module_clone()
-{
- sm_path=$1
- name=$2
- url=$3
- reference="$4"
- depth="$5"
- quiet=
- if test -n "$GIT_QUIET"
- then
- quiet=-q
- fi
-
- gitdir=
- gitdir_base=
- base_name=$(dirname "$name")
-
- gitdir=$(git rev-parse --git-dir)
- gitdir_base="$gitdir/modules/$base_name"
- gitdir="$gitdir/modules/$name"
-
- if test -d "$gitdir"
- then
- mkdir -p "$sm_path"
- rm -f "$gitdir/index"
- else
- mkdir -p "$gitdir_base"
- (
- clear_local_git_env
- git clone $quiet ${depth:+"$depth"} -n ${reference:+"$reference"} \
- --separate-git-dir "$gitdir" "$url" "$sm_path"
- ) ||
- die "$(eval_gettext "Clone of '\$url' into submodule path '\$sm_path' failed")"
- fi
-
- # We already are at the root of the work tree but cd_to_toplevel will
- # resolve any symlinks that might be present in $PWD
- a=$(cd_to_toplevel && cd "$gitdir" && pwd)/
- b=$(cd_to_toplevel && cd "$sm_path" && pwd)/
- # Remove all common leading directories after a sanity check
- if test "${a#$b}" != "$a" || test "${b#$a}" != "$b"; then
- die "$(eval_gettext "Gitdir '\$a' is part of the submodule path '\$b' or vice versa")"
- fi
- while test "${a%%/*}" = "${b%%/*}"
- do
- a=${a#*/}
- b=${b#*/}
- done
- # Now chop off the trailing '/'s that were added in the beginning
- a=${a%/}
- b=${b%/}
-
- # Turn each leading "*/" component into "../"
- rel=$(printf '%s\n' "$b" | sed -e 's|[^/][^/]*|..|g')
- printf '%s\n' "gitdir: $rel/$a" >"$sm_path/.git"
-
- rel=$(printf '%s\n' "$a" | sed -e 's|[^/][^/]*|..|g')
- (clear_local_git_env; cd "$sm_path" && GIT_WORK_TREE=. git config core.worktree "$rel/$b")
-}
-
isnumber()
{
n=$(($1 + 0)) 2>/dev/null && test "$n" = "$1"
@@ -301,7 +227,7 @@ cmd_add()
shift
;;
--depth=*)
- depth=$1
+ depth="$1"
;;
--)
shift
@@ -412,7 +338,7 @@ Use -f if you really want to add it." >&2
echo "$(eval_gettext "Reactivating local git directory for submodule '\$sm_name'.")"
fi
fi
- module_clone "$sm_path" "$sm_name" "$realrepo" "$reference" "$depth" || exit
+ git submodule--helper module_clone --prefix "$wt_prefix" --path "$sm_path" --name "$sm_name" --url "$realrepo" "$reference" "$depth" || exit
(
clear_local_git_env
cd "$sm_path" &&
@@ -774,7 +700,7 @@ Maybe you want to use 'update --init'?")"
if ! test -d "$sm_path"/.git && ! test -f "$sm_path"/.git
then
- module_clone "$sm_path" "$name" "$url" "$reference" "$depth" || exit
+ git submodule--helper module_clone --prefix "$prefix" --path "$sm_path" --name "$name" --url "$url" "$reference" "$depth" || exit
cloned_modules="$cloned_modules;$name"
subsha1=
else
--
2.5.0.264.g784836d
* Re: [PATCH 1/5] submodule: implement `module_clone` as a builtin helper
From: Johannes Schindelin @ 2015-08-27 12:36 UTC (permalink / raw)
To: Stefan Beller; +Cc: git, peff, jrnieder, gitster
Hi Stefan,
thank you so much for doing this. `git submodule` is really, really slow on Windows...
On 2015-08-27 02:52, Stefan Beller wrote:
> diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
> index 4b32a3c..f11fb9c 100644
> --- a/builtin/submodule--helper.c
> +++ b/builtin/submodule--helper.c
> [...]
> +
> +/*
> + * Clone a submodule
> + *
> + * $1 = submodule path
> + * $2 = submodule name
> + * $3 = URL to clone
> + * $4 = reference repository to reuse (empty for independent)
> + * $5 = depth argument for shallow clones (empty for deep)
I think this description is now safely obsolete and can be deleted: you introduced explicit options like --depth to avoid the "magic" of positional parameters.
> +static int module_clone(int argc, const char **argv, const char *prefix)
> +{
> + const char *path = NULL, *name = NULL, *url = NULL, *reference =
> NULL, *depth = NULL;
This line is a little long ;-)
> @@ -135,6 +290,9 @@ int cmd_submodule__helper(int argc, const char
> **argv, const char *prefix)
> if (!strcmp(argv[1], "module_name"))
> return module_name(argc - 2, argv + 2, prefix);
>
> + if (!strcmp(argv[1], "module_clone"))
> + return module_clone(argc - 1, argv + 1, prefix);
> +
> usage:
> - usage("git submodule--helper [module_list module_name]\n");
> + usage("git submodule--helper [module_list module_name module_clone]\n");
Was the convention not to use ( ... | ... | ... )?
Thanks,
Dscho
* Re: [PATCH 1/5] submodule: implement `module_clone` as a builtin helper
From: Stefan Beller @ 2015-08-27 21:57 UTC (permalink / raw)
To: Johannes Schindelin
Cc: git@vger.kernel.org, Jeff King, Jonathan Nieder, Junio C Hamano
On Thu, Aug 27, 2015 at 5:36 AM, Johannes Schindelin
<johannes.schindelin@gmx.de> wrote:
> Hi Stefan,
>
> thank you so much for doing this. `git submodule` is really, really slow on Windows...
>
> On 2015-08-27 02:52, Stefan Beller wrote:
>
>> diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
>> index 4b32a3c..f11fb9c 100644
>> --- a/builtin/submodule--helper.c
>> +++ b/builtin/submodule--helper.c
>> [...]
>> +
>> +/*
>> + * Clone a submodule
>> + *
>> + * $1 = submodule path
>> + * $2 = submodule name
>> + * $3 = URL to clone
>> + * $4 = reference repository to reuse (empty for independent)
>> + * $5 = depth argument for shallow clones (empty for deep)
>
> I think this description is now safely obsolete and can be deleted: you introduced explicit options like --depth to avoid the "magic" of positional parameters.
ok I'll remove that in a reroll.
>
>> +static int module_clone(int argc, const char **argv, const char *prefix)
>> +{
>> + const char *path = NULL, *name = NULL, *url = NULL, *reference =
>> NULL, *depth = NULL;
>
> This line is a little long ;-)
fixed in reroll.
>
>> @@ -135,6 +290,9 @@ int cmd_submodule__helper(int argc, const char
>> **argv, const char *prefix)
>> if (!strcmp(argv[1], "module_name"))
>> return module_name(argc - 2, argv + 2, prefix);
>>
>> + if (!strcmp(argv[1], "module_clone"))
>> + return module_clone(argc - 1, argv + 1, prefix);
>> +
>> usage:
>> - usage("git submodule--helper [module_list module_name]\n");
>> + usage("git submodule--helper [module_list module_name module_clone]\n");
>
> Was the convention not to use ( ... | ... | ... )?
git tag puts it as
usage: git tag [-a | -s | -u <key-id>]
I looked at the other --helpers (git-difftool--helper, which has no help to
be invoked on the command line, and git-bisect--helper, which doesn't have
the problem as its help is clear).
So I'll go with [brackets | separated | by | pipes | inside] then,
just as git-tag.
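Concretely (not necessarily the final wording of the reroll), that would
look something like:

    usage("git submodule--helper [module_list | module_name | module_clone]\n");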
>
> Thanks,
> Dscho
* [PATCH 2/5] thread-utils: add a threaded task queue
From: Stefan Beller @ 2015-08-27 0:52 UTC (permalink / raw)
To: git; +Cc: peff, jrnieder, gitster, Stefan Beller
This adds functionality to do work in a parallel, threaded
fashion, while the boilerplate code for setting up and tearing
down threads, as well as queuing up tasks, is hidden behind the
new API.
Signed-off-by: Stefan Beller <sbeller@google.com>
---
run-command.c | 29 +++++---
thread-utils.c | 227 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
thread-utils.h | 35 +++++++++
3 files changed, 279 insertions(+), 12 deletions(-)
diff --git a/run-command.c b/run-command.c
index 28e1d55..cb15cd9 100644
--- a/run-command.c
+++ b/run-command.c
@@ -668,6 +668,22 @@ int git_atexit(void (*handler)(void))
#endif
+void setup_main_thread(void)
+{
+ if (!main_thread_set) {
+ /*
+ * We assume that the first time that start_async is called
+ * it is from the main thread.
+ */
+ main_thread_set = 1;
+ main_thread = pthread_self();
+ pthread_key_create(&async_key, NULL);
+ pthread_key_create(&async_die_counter, NULL);
+ set_die_routine(die_async);
+ set_die_is_recursing_routine(async_die_is_recursing);
+ }
+}
+
int start_async(struct async *async)
{
int need_in, need_out;
@@ -740,18 +756,7 @@ int start_async(struct async *async)
else if (async->out)
close(async->out);
#else
- if (!main_thread_set) {
- /*
- * We assume that the first time that start_async is called
- * it is from the main thread.
- */
- main_thread_set = 1;
- main_thread = pthread_self();
- pthread_key_create(&async_key, NULL);
- pthread_key_create(&async_die_counter, NULL);
- set_die_routine(die_async);
- set_die_is_recursing_routine(async_die_is_recursing);
- }
+ setup_main_thread();
if (proc_in >= 0)
set_cloexec(proc_in);
diff --git a/thread-utils.c b/thread-utils.c
index a2135e0..b45ab92 100644
--- a/thread-utils.c
+++ b/thread-utils.c
@@ -1,5 +1,7 @@
#include "cache.h"
#include "thread-utils.h"
+#include "run-command.h"
+#include "git-compat-util.h"
#if defined(hpux) || defined(__hpux) || defined(_hpux)
# include <sys/pstat.h>
@@ -75,3 +77,228 @@ int init_recursive_mutex(pthread_mutex_t *m)
}
return ret;
}
+
+#ifndef NO_PTHREADS
+struct job_list {
+ int (*fct)(struct task_queue *tq, void *task);
+ void *task;
+ struct job_list *next;
+};
+
+static pthread_t main_thread;
+static int main_thread_set;
+static pthread_key_t async_key;
+static pthread_key_t async_die_counter;
+
+static NORETURN void die_async(const char *err, va_list params)
+{
+ vreportf("fatal: ", err, params);
+
+ if (!pthread_equal(main_thread, pthread_self()))
+ pthread_exit((void *)128);
+
+ exit(128);
+}
+
+static int async_die_is_recursing(void)
+{
+ void *ret = pthread_getspecific(async_die_counter);
+ pthread_setspecific(async_die_counter, (void *)1);
+ return ret != NULL;
+}
+
+/* FIXME: deduplicate this code with run-command.c */
+static void setup_main_thread(void)
+{
+ if (!main_thread_set) {
+ main_thread_set = 1;
+ main_thread = pthread_self();
+ pthread_key_create(&async_key, NULL);
+ pthread_key_create(&async_die_counter, NULL);
+ set_die_routine(die_async);
+ set_die_is_recursing_routine(async_die_is_recursing);
+ }
+}
+
+struct task_queue {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond_non_empty;
+
+ int queued_tasks;
+ struct job_list *first;
+ struct job_list *last;
+
+ pthread_t *threads;
+ unsigned max_threads;
+ unsigned max_tasks;
+
+ void (*finish_function)(struct task_queue *tq);
+ int early_return;
+};
+
+static void next_task(struct task_queue *tq,
+ int (**fct)(struct task_queue *tq, void *task),
+ void **task,
+ int *early_return)
+{
+ struct job_list *job = NULL;
+
+ pthread_mutex_lock(&tq->mutex);
+ while (tq->queued_tasks == 0)
+ pthread_cond_wait(&tq->cond_non_empty, &tq->mutex);
+
+ tq->early_return |= *early_return;
+
+ if (!tq->early_return) {
+ job = tq->first;
+ tq->first = job->next;
+ if (!tq->first)
+ tq->last = NULL;
+ tq->queued_tasks--;
+ }
+
+ pthread_mutex_unlock(&tq->mutex);
+
+ if (job) {
+ *fct = job->fct;
+ *task = job->task;
+ } else {
+ *fct = NULL;
+ *task = NULL;
+ }
+
+ free(job);
+}
+
+static void *dispatcher(void *args)
+{
+ void *task;
+ int (*fct)(struct task_queue *tq, void *task);
+ int early_return = 0;
+ struct task_queue *tq = args;
+
+ next_task(tq, &fct, &task, &early_return);
+ while (fct && !early_return) {
+ early_return = fct(tq, task);
+ next_task(tq, &fct, &task, &early_return);
+ }
+
+ if (tq->finish_function)
+ tq->finish_function(tq);
+
+ pthread_exit(0);
+}
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+ struct task_queue *tq = xmalloc(sizeof(*tq));
+
+ int i, ret;
+ if (!max_threads)
+ tq->max_threads = online_cpus();
+ else
+ tq->max_threads = max_threads;
+
+ pthread_mutex_init(&tq->mutex, NULL);
+ pthread_cond_init(&tq->cond_non_empty, NULL);
+
+ tq->threads = xmalloc(tq->max_threads * sizeof(pthread_t));
+
+ tq->queued_tasks = 0;
+ tq->first = NULL;
+ tq->last = NULL;
+
+ setup_main_thread();
+
+ for (i = 0; i < tq->max_threads; i++) {
+ ret = pthread_create(&tq->threads[i], 0, &dispatcher, tq);
+ if (ret)
+ die("unable to create thread: %s", strerror(ret));
+ }
+
+ tq->early_return = 0;
+
+ return tq;
+}
+
+void add_task(struct task_queue *tq,
+ int (*fct)(struct task_queue *tq, void *task),
+ void *task)
+{
+ struct job_list *job_list;
+
+ job_list = xmalloc(sizeof(*job_list));
+ job_list->task = task;
+ job_list->fct = fct;
+ job_list->next = NULL;
+
+ pthread_mutex_lock(&tq->mutex);
+
+ if (!tq->last) {
+ tq->last = job_list;
+ tq->first = tq->last;
+ } else {
+ tq->last->next = job_list;
+ tq->last = tq->last->next;
+ }
+ tq->queued_tasks++;
+
+ pthread_mutex_unlock(&tq->mutex);
+ pthread_cond_signal(&tq->cond_non_empty);
+}
+
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+ int ret;
+ int i;
+
+ tq->finish_function = fct;
+
+ for (i = 0; i < tq->max_threads; i++)
+ add_task(tq, NULL, NULL);
+
+ for (i = 0; i < tq->max_threads; i++)
+ pthread_join(tq->threads[i], 0);
+
+ pthread_mutex_destroy(&tq->mutex);
+ pthread_cond_destroy(&tq->cond_non_empty);
+
+ if (tq->first)
+ die("BUG: internal error with queuing jobs for threads");
+
+ free(tq->threads);
+ ret = tq->early_return;
+
+ free(tq);
+ return ret;
+}
+#else /* NO_PTHREADS */
+
+struct task_queue {
+ int early_return;
+};
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+ struct task_queue *tq = xmalloc(sizeof(*tq));
+
+ tq->early_return = 0;
+}
+
+void add_task(struct task_queue *tq,
+ int (*fct)(struct task_queue *tq, void *task),
+ void *task)
+{
+ if (tq->early_return)
+ return;
+
+ tq->early_return |= fct(tq, task);
+}
+
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq))
+{
+ int ret = tq->early_return;
+ free(tq);
+ return ret;
+}
+#endif
diff --git a/thread-utils.h b/thread-utils.h
index d9a769d..f41cfb1 100644
--- a/thread-utils.h
+++ b/thread-utils.h
@@ -12,4 +12,39 @@ extern int init_recursive_mutex(pthread_mutex_t*);
#define online_cpus() 1
#endif
+
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active to process the enqueued tasks
+ * processing the tasks in a first in first out order.
+ *
+ * If `max_threads` is zero the number of cores available will be used.
+ *
+ * Currently this only works in environments with pthreads, in other
+ * environments, the task will be processed sequentially in `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * The function and data are put into the task queue.
+ *
+ * The function `fct` must not be NULL, as that's used internally
+ * in `finish_task_queue` to signal shutdown. If the return code
+ * of `fct` is unequal to 0, the tasks will stop eventually,
+ * the current parallel tasks will be flushed out.
+ */
+void add_task(struct task_queue *tq,
+ int (*fct)(struct task_queue *tq, void *task),
+ void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed.
+ *
+ * The function `fct` is called once in each thread after the last task
+ * for that thread was processed. If no thread local cleanup needs to be
+ * performed, pass NULL.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue *tq));
+
#endif /* THREAD_COMPAT_H */
--
2.5.0.264.g784836d
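To illustrate the intended calling sequence of the thread-utils API added
above, here is a minimal sketch; process_item(), process_all() and the
items array are made up for the example and are not part of the patch:

    #include "cache.h"
    #include "thread-utils.h"

    static int process_item(struct task_queue *tq, void *data)
    {
            const char *name = data;

            /* do the per-item work here; it must not depend on other items */
            fprintf(stderr, "processed %s\n", name);
            return 0; /* a nonzero return asks the queue to wind down early */
    }

    static void process_all(void)
    {
            const char *items[] = { "one", "two", "three" };
            struct task_queue *tq;
            int i;

            tq = create_task_queue(0); /* 0: use one thread per core */
            for (i = 0; i < ARRAY_SIZE(items); i++)
                    add_task(tq, process_item, (void *)items[i]);

            if (finish_task_queue(tq, NULL))
                    die("at least one task reported an error");
    }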
* Re: [PATCH 2/5] thread-utils: add a threaded task queue
From: Johannes Schindelin @ 2015-08-27 12:59 UTC (permalink / raw)
To: Stefan Beller; +Cc: git, peff, jrnieder, gitster
Hi Stefan,
On 2015-08-27 02:52, Stefan Beller wrote:
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..cb15cd9 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -668,6 +668,22 @@ int git_atexit(void (*handler)(void))
>
> #endif
>
> +void setup_main_thread(void)
> [...]
>
> diff --git a/thread-utils.c b/thread-utils.c
> index a2135e0..b45ab92 100644
> --- a/thread-utils.c
> +++ b/thread-utils.c
> [...]
> +/* FIXME: deduplicate this code with run-command.c */
> +static void setup_main_thread(void)
Do you remember off-hand why the code could not be moved to thread-utils.c wholesale? Just curious.
> +#else /* NO_PTHREADS */
> +
> +struct task_queue {
> + int early_return;
> +};
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> + struct task_queue *tq = xmalloc(sizeof(*tq));
> +
> + tq->early_return = 0;
> +}
> +
> +void add_task(struct task_queue *tq,
> + int (*fct)(struct task_queue *tq, void *task),
Might make sense to typedef this... Maybe task_t?
> + void *task)
> +{
> + if (tq->early_return)
> + return;
Ah, so "early_return" actually means "interrupted" or "canceled"?
I guess I will have to set aside some time to wrap my head around the way tasks are handled here, in particular how the two `early_return` variables (`dispatcher()`'s local variable and the field in the `task_queue`) interact.
Thanks!
Dscho
* Re: [PATCH 2/5] thread-utils: add a threaded task queue
From: Stefan Beller @ 2015-08-27 17:02 UTC (permalink / raw)
To: Johannes Schindelin
Cc: git@vger.kernel.org, Jeff King, Jonathan Nieder, Junio C Hamano
On Thu, Aug 27, 2015 at 5:59 AM, Johannes Schindelin
<johannes.schindelin@gmx.de> wrote:
> Hi Stefan,
>
> On 2015-08-27 02:52, Stefan Beller wrote:
>
>> diff --git a/run-command.c b/run-command.c
>> index 28e1d55..cb15cd9 100644
>> --- a/run-command.c
>> +++ b/run-command.c
>> @@ -668,6 +668,22 @@ int git_atexit(void (*handler)(void))
>>
>> #endif
>>
>> +void setup_main_thread(void)
>> [...]
>>
>> diff --git a/thread-utils.c b/thread-utils.c
>> index a2135e0..b45ab92 100644
>> --- a/thread-utils.c
>> +++ b/thread-utils.c
>> [...]
>> +/* FIXME: deduplicate this code with run-command.c */
>> +static void setup_main_thread(void)
>
> Do you remember off-hand why the code could not be moved to thread-utils.c wholesale? Just curious.
The code in run-command.c has a few things regarding the struct async
handling in there, which we don't need/want. I just realized there is
some duplicated code, but I couldn't cleanly cut it out.
>
>> +#else /* NO_PTHREADS */
>> +
>> +struct task_queue {
>> + int early_return;
>> +};
>> +
>> +struct task_queue *create_task_queue(unsigned max_threads)
>> +{
>> + struct task_queue *tq = xmalloc(sizeof(*tq));
>> +
>> + tq->early_return = 0;
>> +}
>> +
>> +void add_task(struct task_queue *tq,
>> + int (*fct)(struct task_queue *tq, void *task),
>
> Might make sense to typedef this... Maybe task_t?
>
>> + void *task)
>> +{
>> + if (tq->early_return)
>> + return;
>
> Ah, so "early_return" actually means "interrupted" or "canceled"?
The early_return is meant to return early in case of an error in some thread.
In the threaded version, the `dispatcher` is executed in each thread. It gets
its new tasks via `next_task`, which takes the early_return value from the
thread and ORs it into the early_return of the task queue (which all threads
have access to).
By ORing it into the task queue's early_return, the signal to abort early is
propagated to a place all threads can see. And once that value is set,
`next_task` will return NULL as an indication to clean up and pthread_exit.
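So from a caller's perspective the whole thing looks roughly like this
(a sketch only; do_one() and the "bad" check are made up):

    static int do_one(struct task_queue *tq, void *task)
    {
            const char *path = task;

            /* any nonzero return is ORed into tq->early_return by next_task() */
            if (!strcmp(path, "bad"))
                    return 1;
            return 0;
    }

and an error in any task then surfaces as a nonzero return from
finish_task_queue(), so the caller can report that the queue was
aborted early.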
>
> I guess I will have to set aside some time to wrap my head around the way tasks are handled here, in particular how the two `early_return` variables (`dispatcher()`'s local variable and the field in the `task_queue`) interact.
>
> Thanks!
> Dscho
* Re: [PATCH 2/5] thread-utils: add a threaded task queue
From: Junio C Hamano @ 2015-08-28 15:34 UTC (permalink / raw)
To: Johannes Schindelin; +Cc: Stefan Beller, git, peff, jrnieder
Johannes Schindelin <johannes.schindelin@gmx.de> writes:
>> +void add_task(struct task_queue *tq,
>> + int (*fct)(struct task_queue *tq, void *task),
>
> Might make sense to typedef this... Maybe task_t?
Let's not introduce user defined type that ends with _t that is seen
globally.
>> + void *task)
>> +{
>> + if (tq->early_return)
>> + return;
>
> Ah, so "early_return" actually means "interrupted" or "canceled"?
>
> I guess I will have to set aside some time to wrap my head around the
> way tasks are handled here, in particular how the two `early_return`
> variables (`dispatcher()`'s local variable and the field in the
> task_queue`) interact.
We had a very similar conversation in $gmane/276324, as the
early-return and get_task interaction was not quite intuitive.
I thought Stefan said something about this part of the logic being
unreadable and needing rework. Perhaps that will come in the next
reroll, or something?
I tend to agree with you that interrupted or cancelled would be a
better name for this thing; at least it would make it easier to
understand what is going on than "early-return" does.
Thanks.
* [PATCH 3/5] submodule: helper to run foreach in parallel
From: Stefan Beller @ 2015-08-27 0:52 UTC (permalink / raw)
To: git; +Cc: peff, jrnieder, gitster, Stefan Beller
Similar to `git submodule foreach` the new command
`git submodule foreach_parallel` will run a command
on each submodule.
The commands are run in parallel, up to the number of
cores by default, or you can specify '-j 4' to run with
just 4 threads, for example.
One major difference to `git submodule foreach` is the
handling of input and output to the commands. Because
of the parallel nature of the execution, it is not trivial
to schedule the std{in,out,err} channels for the submodule
the command is run in. So in this patch there is no
support for stdin.
The goal for std{out,err} output is to look like the
single-threaded version as much as possible, so stdout
and stderr from one submodule operation are buffered
together in one single channel and emitted together
once the output channel becomes available.
To do that, we'll have a mutex for the output: each
thread tries to acquire it, and a thread that is lucky
enough to get the mutex pipes its output directly to
standard output. A thread that does not hold the mutex
buffers its output instead (see the sketch after the
example below).
Example:
Let's assume we have 5 submodules A,B,C,D,E and the
operation on each submodule takes a different amount
of time (say `git fetch`), then the output of
`git submodule foreach` might look like this:
time -->
output: |---A---| |-B-| |----C-----------| |-D-| |-E-|
When we schedule these threads into two threads, a schedule
and sample output over time may look like this:
thread 1: |---A---| |-D-| |-E-|
thread 2: |-B-| |----C-----------|
output: |---A---| B |----C-------| E D
So A will be perceived as if it ran normally in the
single-threaded version of foreach. As B has finished
by the time the mutex becomes available, its whole buffer
will just be dumped to standard output. This will be
perceived by the user as a 'very fast' operation of B.
Once that is done, C takes the mutex and flushes its piled-up
buffer to standard output. In case the subcommand is a
git command, we have a progress display, which will just
look like the first half of C happened really quickly.
Notice how E and D are put out in a different order than the
original as the new parallel foreach doesn't care about the
order.
So this way of presenting output is good for human consumption
and not for machine consumption: you always see the progress,
but it is not easy to tell which output comes from which
command, as there is no indication other than the
"Entering <submodule path>" line at the beginning of each
section of output.
Maybe we want to integrate the unthreaded foreach eventually
into the new code base in C and have special cases for that,
such as accepting stdin again.
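The trylock idea sketched above boils down to roughly the following
(a condensed sketch; output_mutex and read_more_output() are stand-ins,
the real code is in run_cmd_submodule() below):

    struct strbuf out = STRBUF_INIT;
    int lock_acquired = 0;

    while (read_more_output(&out)) {
            if (!lock_acquired && !pthread_mutex_trylock(&output_mutex))
                    lock_acquired = 1;
            if (lock_acquired) {
                    /* we own the channel, stream directly */
                    fputs(out.buf, stderr);
                    strbuf_reset(&out);
            }
            /* otherwise keep buffering until the mutex becomes available */
    }
    if (!lock_acquired)
            pthread_mutex_lock(&output_mutex); /* block to flush at the end */
    fputs(out.buf, stderr);
    pthread_mutex_unlock(&output_mutex);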
Signed-off-by: Stefan Beller <sbeller@google.com>
---
builtin/submodule--helper.c | 160 +++++++++++++++++++++++++++++++++++++++++++-
git-submodule.sh | 11 ++-
2 files changed, 168 insertions(+), 3 deletions(-)
diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index f11fb9c..2c06f28 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -8,8 +8,11 @@
#include "submodule.h"
#include "submodule-config.h"
#include "string-list.h"
+#include "thread-utils.h"
#include "run-command.h"
-
+#ifndef NO_PTHREADS
+#include <semaphore.h>
+#endif
static const struct cache_entry **ce_entries;
static int ce_alloc, ce_used;
static const char *alternative_path;
@@ -279,6 +282,155 @@ static int module_clone(int argc, const char **argv, const char *prefix)
return 0;
}
+struct submodule_args {
+ const char *name;
+ const char *path;
+ const char *sha1;
+ const char *toplevel;
+ const char *prefix;
+ const char **cmd;
+ pthread_mutex_t *mutex;
+};
+
+int run_cmd_submodule(struct task_queue *aq, void *task)
+{
+ int i, lock_acquired = 0;
+ struct submodule_args *args = task;
+ struct strbuf out = STRBUF_INIT;
+ struct strbuf sb = STRBUF_INIT;
+ struct child_process *cp = xmalloc(sizeof(*cp));
+ char buf[1024];
+
+ strbuf_addf(&out, N_("Entering %s\n"), relative_path(args->path, args->prefix, &sb));
+
+ child_process_init(cp);
+ argv_array_pushv(&cp->args, args->cmd);
+
+ argv_array_pushf(&cp->env_array, "name=%s", args->name);
+ argv_array_pushf(&cp->env_array, "path=%s", args->path);
+ argv_array_pushf(&cp->env_array, "sha1=%s", args->sha1);
+ argv_array_pushf(&cp->env_array, "toplevel=%s", args->toplevel);
+
+ for (i = 0; local_repo_env[i]; i++)
+ argv_array_push(&cp->env_array, local_repo_env[i]);
+
+ cp->no_stdin = 1;
+ cp->out = 0;
+ cp->err = -1;
+ cp->dir = args->path;
+ cp->stdout_to_stderr = 1;
+ cp->use_shell = 1;
+
+ if (start_command(cp)) {
+ die("Could not start command");
+ for (i = 0; cp->args.argv; i++)
+ fprintf(stderr, "%s\n", cp->args.argv[i]);
+ }
+
+ while (1) {
+ ssize_t len = xread(cp->err, buf, sizeof(buf));
+ if (len < 0)
+ die("Read from child failed");
+ else if (len == 0)
+ break;
+ else {
+ strbuf_add(&out, buf, len);
+ }
+ if (!pthread_mutex_trylock(args->mutex))
+ lock_acquired = 1;
+ if (lock_acquired) {
+ fputs(out.buf, stderr);
+ strbuf_reset(&out);
+ }
+ }
+ if (finish_command(cp))
+ die("command died with error");
+
+ if (!lock_acquired)
+ pthread_mutex_lock(args->mutex);
+
+ fputs(out.buf, stderr);
+ pthread_mutex_unlock(args->mutex);
+
+ return 0;
+}
+
+int module_foreach_parallel(int argc, const char **argv, const char *prefix)
+{
+ int i, recursive = 0, number_threads = 0, quiet = 0;
+ static struct pathspec pathspec;
+ struct strbuf sb = STRBUF_INIT;
+ struct task_queue *aq;
+ char **cmd;
+ const char **nullargv = {NULL};
+ pthread_mutex_t mutex;
+
+ struct option module_update_options[] = {
+ OPT_STRING(0, "prefix", &alternative_path,
+ N_("path"),
+ N_("alternative anchor for relative paths")),
+ OPT_STRING(0, "cmd", &cmd,
+ N_("string"),
+ N_("command to run")),
+ OPT_BOOL('r', "--recursive", &recursive,
+ N_("Recurse into nexted submodules")),
+ OPT_INTEGER('j', "jobs", &number_threads,
+ N_("Recurse into nexted submodules")),
+ OPT__QUIET(&quiet, N_("Suppress output")),
+ OPT_END()
+ };
+
+ static const char * const git_submodule_helper_usage[] = {
+ N_("git submodule--helper foreach [--prefix=<path>] [<path>...]"),
+ NULL
+ };
+
+ argc = parse_options(argc, argv, prefix, module_update_options,
+ git_submodule_helper_usage, 0);
+
+ if (module_list_compute(0, nullargv, NULL, &pathspec) < 0)
+ return 1;
+
+ gitmodules_config();
+
+ pthread_mutex_init(&mutex, NULL);
+ aq = create_task_queue(number_threads);
+
+ for (i = 0; i < ce_used; i++) {
+ const struct submodule *sub;
+ const struct cache_entry *ce = ce_entries[i];
+ struct submodule_args *args = malloc(sizeof(*args));
+
+ if (ce_stage(ce))
+ args->sha1 = xstrdup(sha1_to_hex(null_sha1));
+ else
+ args->sha1 = xstrdup(sha1_to_hex(ce->sha1));
+
+ strbuf_reset(&sb);
+ strbuf_addf(&sb, "%s/.git", ce->name);
+ if (!file_exists(sb.buf)) {
+ free(args);
+ continue;
+ }
+
+ args->path = ce->name;
+ sub = submodule_from_path(null_sha1, args->path);
+ if (!sub)
+ die("No submodule mapping found in .gitmodules for path '%s'", args->path);
+
+ args->name = sub->name;
+ args->toplevel = xgetcwd();
+ args->cmd = argv;
+ args->mutex = &mutex;
+ args->prefix = alternative_path;
+ add_task(aq, run_cmd_submodule, args);
+ }
+
+ finish_task_queue(aq, NULL);
+ pthread_mutex_destroy(&mutex);
+ return 0;
+}
+
int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
{
if (argc < 2)
@@ -293,6 +445,10 @@ int cmd_submodule__helper(int argc, const char **argv, const char *prefix)
if (!strcmp(argv[1], "module_clone"))
return module_clone(argc - 1, argv + 1, prefix);
+ if (!strcmp(argv[1], "foreach_parallel"))
+ return module_foreach_parallel(argc - 1, argv + 1, prefix);
+
usage:
- usage("git submodule--helper [module_list module_name module_clone]\n");
+ fprintf(stderr, "%s", argv[1]);
+ usage("git submodule--helper [module_list module_name module_clone foreach_parallel]\n");
}
diff --git a/git-submodule.sh b/git-submodule.sh
index fb5155e..f06488a 100755
--- a/git-submodule.sh
+++ b/git-submodule.sh
@@ -431,6 +431,15 @@ cmd_foreach()
}
#
+# Execute an arbitrary command sequence in each checked out
+# submodule in parallel.
+#
+cmd_foreach_parallel()
+{
+ git submodule--helper foreach_parallel --prefix "$wt_prefix" $@
+}
+
+#
# Register submodules in .git/config
#
# $@ = requested paths (default to all)
@@ -1225,7 +1234,7 @@ cmd_sync()
while test $# != 0 && test -z "$command"
do
case "$1" in
- add | foreach | init | deinit | update | status | summary | sync)
+ add | foreach | foreach_parallel | init | deinit | update | status | summary | sync)
command=$1
;;
-q|--quiet)
--
2.5.0.264.g784836d
* [PATCH 4/5] index-pack: Use the new worker pool
From: Stefan Beller @ 2015-08-27 0:52 UTC (permalink / raw)
To: git; +Cc: peff, jrnieder, gitster, Stefan Beller
This demonstrates how the new threading API may be used.
There is no change in the workflow, just using the new
threading API instead of keeping track of the pthreads
ourselves.
Signed-off-by: Stefan Beller <sbeller@google.com>
---
builtin/index-pack.c | 23 ++++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..187b281 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -1075,7 +1075,7 @@ static void resolve_base(struct object_entry *obj)
}
#ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
{
set_thread_data(data);
for (;;) {
@@ -1096,7 +1096,7 @@ static void *threaded_second_pass(void *data)
resolve_base(&objects[i]);
}
- return NULL;
+ return 0;
}
#endif
@@ -1195,18 +1195,19 @@ static void resolve_deltas(void)
nr_ref_deltas + nr_ofs_deltas);
#ifndef NO_PTHREADS
- nr_dispatched = 0;
+
if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+ struct task_queue *tq;
+ nr_dispatched = 0;
init_thread();
- for (i = 0; i < nr_threads; i++) {
- int ret = pthread_create(&thread_data[i].thread, NULL,
- threaded_second_pass, thread_data + i);
- if (ret)
- die(_("unable to create thread: %s"),
- strerror(ret));
- }
+
+ tq = create_task_queue(nr_threads);
for (i = 0; i < nr_threads; i++)
- pthread_join(thread_data[i].thread, NULL);
+ add_task(tq, threaded_second_pass, thread_data + i);
+
+ if (finish_task_queue(tq, NULL))
+ die("Not all threads have finished");
+
cleanup_thread();
return;
}
--
2.5.0.264.g784836d
* [PATCH 5/5] pack-objects: Use new worker pool
From: Stefan Beller @ 2015-08-27 0:52 UTC (permalink / raw)
To: git; +Cc: peff, jrnieder, gitster, Stefan Beller
Before we had <n> threads doing the delta finding work, and the main thread
was load balancing the threads, i.e. moving work from a thread with a large
amount left to an idle thread whenever such a situation arose.
This moves the load balancing into the threads themselves. As soon as one
thread is done working, it will look at its peer threads and pick up
half the workload from the thread with the largest pending load.
By having the load balancing as part of the threads, the locking and
communication model becomes simpler, such that we don't need so many
mutexes any more.
It also demonstrates that the new threading pool is easily
applied in different situations.
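In other words, each worker now runs a loop along these lines (a sketch
of the idea only; the actual code is in threaded_find_deltas() and
threaded_split_largest_workload() in the patch below):

    while (me->remaining) {
            find_deltas(me->list, &me->remaining,
                        me->window, me->depth, me->processed);
            /* nothing left of our own; steal half from the busiest peer */
            threaded_split_largest_workload(me);
    }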
Signed-off-by: Stefan Beller <sbeller@google.com>
---
builtin/pack-objects.c | 175 ++++++++++++++++---------------------------------
1 file changed, 57 insertions(+), 118 deletions(-)
diff --git a/builtin/pack-objects.c b/builtin/pack-objects.c
index 62cc16d..f46d2df 100644
--- a/builtin/pack-objects.c
+++ b/builtin/pack-objects.c
@@ -17,6 +17,7 @@
#include "pack-objects.h"
#include "progress.h"
#include "refs.h"
+#include "run-command.h"
#include "streaming.h"
#include "thread-utils.h"
#include "pack-bitmap.h"
@@ -1887,26 +1888,12 @@ static void try_to_free_from_threads(size_t size)
static try_to_free_t old_try_to_free_routine;
-/*
- * The main thread waits on the condition that (at least) one of the workers
- * has stopped working (which is indicated in the .working member of
- * struct thread_params).
- * When a work thread has completed its work, it sets .working to 0 and
- * signals the main thread and waits on the condition that .data_ready
- * becomes 1.
- */
-
struct thread_params {
- pthread_t thread;
struct object_entry **list;
unsigned list_size;
unsigned remaining;
int window;
int depth;
- int working;
- int data_ready;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
unsigned *processed;
};
@@ -1933,7 +1920,52 @@ static void cleanup_threaded_search(void)
pthread_mutex_destroy(&progress_mutex);
}
-static void *threaded_find_deltas(void *arg)
+static struct thread_params *p;
+
+static void threaded_split_largest_workload(struct thread_params *target)
+{
+ int i;
+
+ struct object_entry **list;
+ struct thread_params *victim = NULL;
+ unsigned sub_size = 0;
+
+ /* Find a victim */
+ progress_lock();
+ for (i = 0; i < delta_search_threads; i++)
+ if (p[i].remaining > 2*window &&
+ (!victim || victim->remaining < p[i].remaining))
+ victim = &p[i];
+
+ if (victim) {
+ sub_size = victim->remaining / 2;
+ list = victim->list + victim->list_size - sub_size;
+ while (sub_size && list[0]->hash &&
+ list[0]->hash == list[-1]->hash) {
+ list++;
+ sub_size--;
+ }
+ if (!sub_size) {
+ /*
+ * It is possible for some "paths" to have
+ * so many objects that no hash boundary
+ * might be found. Let's just steal the
+ * exact half in that case.
+ */
+ sub_size = victim->remaining / 2;
+ list -= sub_size;
+ }
+ victim->list_size -= sub_size;
+ victim->remaining -= sub_size;
+
+ target->list = list;
+ target->list_size = sub_size;
+ target->remaining = sub_size;
+ }
+ progress_unlock();
+}
+
+static int threaded_find_deltas(struct task_queue *tq, void *arg)
{
struct thread_params *me = arg;
@@ -1941,34 +1973,17 @@ static void *threaded_find_deltas(void *arg)
find_deltas(me->list, &me->remaining,
me->window, me->depth, me->processed);
- progress_lock();
- me->working = 0;
- pthread_cond_signal(&progress_cond);
- progress_unlock();
-
- /*
- * We must not set ->data_ready before we wait on the
- * condition because the main thread may have set it to 1
- * before we get here. In order to be sure that new
- * work is available if we see 1 in ->data_ready, it
- * was initialized to 0 before this thread was spawned
- * and we reset it to 0 right away.
- */
- pthread_mutex_lock(&me->mutex);
- while (!me->data_ready)
- pthread_cond_wait(&me->cond, &me->mutex);
- me->data_ready = 0;
- pthread_mutex_unlock(&me->mutex);
+ threaded_split_largest_workload(me);
}
- /* leave ->working 1 so that this doesn't get more work assigned */
- return NULL;
+
+ return 0;
}
static void ll_find_deltas(struct object_entry **list, unsigned list_size,
int window, int depth, unsigned *processed)
{
- struct thread_params *p;
- int i, ret, active_threads = 0;
+ struct task_queue *tq;
+ int i;
init_threaded_search();
@@ -1980,8 +1995,11 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
if (progress > pack_to_stdout)
fprintf(stderr, "Delta compression using up to %d threads.\n",
delta_search_threads);
+
p = xcalloc(delta_search_threads, sizeof(*p));
+ tq = create_task_queue(delta_search_threads);
+
/* Partition the work amongst work threads. */
for (i = 0; i < delta_search_threads; i++) {
unsigned sub_size = list_size / (delta_search_threads - i);
@@ -1993,8 +2011,6 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
p[i].window = window;
p[i].depth = depth;
p[i].processed = processed;
- p[i].working = 1;
- p[i].data_ready = 0;
/* try to split chunks on "path" boundaries */
while (sub_size && sub_size < list_size &&
@@ -2008,87 +2024,10 @@ static void ll_find_deltas(struct object_entry **list, unsigned list_size,
list += sub_size;
list_size -= sub_size;
+ add_task(tq, threaded_find_deltas, &p[i]);
}
- /* Start work threads. */
- for (i = 0; i < delta_search_threads; i++) {
- if (!p[i].list_size)
- continue;
- pthread_mutex_init(&p[i].mutex, NULL);
- pthread_cond_init(&p[i].cond, NULL);
- ret = pthread_create(&p[i].thread, NULL,
- threaded_find_deltas, &p[i]);
- if (ret)
- die("unable to create thread: %s", strerror(ret));
- active_threads++;
- }
-
- /*
- * Now let's wait for work completion. Each time a thread is done
- * with its work, we steal half of the remaining work from the
- * thread with the largest number of unprocessed objects and give
- * it to that newly idle thread. This ensure good load balancing
- * until the remaining object list segments are simply too short
- * to be worth splitting anymore.
- */
- while (active_threads) {
- struct thread_params *target = NULL;
- struct thread_params *victim = NULL;
- unsigned sub_size = 0;
-
- progress_lock();
- for (;;) {
- for (i = 0; !target && i < delta_search_threads; i++)
- if (!p[i].working)
- target = &p[i];
- if (target)
- break;
- pthread_cond_wait(&progress_cond, &progress_mutex);
- }
-
- for (i = 0; i < delta_search_threads; i++)
- if (p[i].remaining > 2*window &&
- (!victim || victim->remaining < p[i].remaining))
- victim = &p[i];
- if (victim) {
- sub_size = victim->remaining / 2;
- list = victim->list + victim->list_size - sub_size;
- while (sub_size && list[0]->hash &&
- list[0]->hash == list[-1]->hash) {
- list++;
- sub_size--;
- }
- if (!sub_size) {
- /*
- * It is possible for some "paths" to have
- * so many objects that no hash boundary
- * might be found. Let's just steal the
- * exact half in that case.
- */
- sub_size = victim->remaining / 2;
- list -= sub_size;
- }
- target->list = list;
- victim->list_size -= sub_size;
- victim->remaining -= sub_size;
- }
- target->list_size = sub_size;
- target->remaining = sub_size;
- target->working = 1;
- progress_unlock();
-
- pthread_mutex_lock(&target->mutex);
- target->data_ready = 1;
- pthread_cond_signal(&target->cond);
- pthread_mutex_unlock(&target->mutex);
-
- if (!sub_size) {
- pthread_join(target->thread, NULL);
- pthread_cond_destroy(&target->cond);
- pthread_mutex_destroy(&target->mutex);
- active_threads--;
- }
- }
+ finish_task_queue(tq, NULL);
cleanup_threaded_search();
free(p);
}
--
2.5.0.264.g784836d