From: Stefan Beller <sbeller@google.com>
To: git@vger.kernel.org
Cc: ramsay@ramsayjones.plus.com, jacob.keller@gmail.com,
peff@peff.net, gitster@pobox.com, jrnieder@gmail.com,
johannes.schindelin@gmail.com, Jens.Lehmann@web.de,
vlovich@gmail.com, sunshine@sunshineco.com,
Stefan Beller <sbeller@google.com>
Subject: [PATCHv4 00/14] fetch submodules in parallel and a preview on parallel "submodule update"
Date: Tue, 22 Sep 2015 18:45:18 -0700 [thread overview]
Message-ID: <1442972732-12118-1-git-send-email-sbeller@google.com> (raw)
Today there was lots of discussion on the correct way of reading the
strbufs as well as some discussion on the structure of the
asynchronous parallel process loop.
Patches 1-8 bring parallel fetching of submodules; they have had good exposure
to review, and the feedback has been incorporated.
Patches 9-14 bring parallel submodule updates.
Patch 14 is not ready yet (it still causes test suite failures), but the preceding
cleanups in patches 9-13 can be reviewed without wasting time.
Any feedback welcome,
Thanks,
Stefan
Diff to v3 below. The patches can also be found at [1]
[1] https://github.com/stefanbeller/git/tree/submodulec_nonthreaded_parallel_4
Jonathan Nieder (1):
submodule: Send "Fetching submodule <foo>" to standard error
Stefan Beller (13):
xread: poll on non blocking fds
xread_nonblock: add functionality to read from fds without blocking
strbuf: add strbuf_read_once to read without blocking
run-command: factor out return value computation
run-command: add an asynchronous parallel child processor
fetch_populated_submodules: use new parallel job processing
submodules: allow parallel fetching, add tests and documentation
submodule-config: Untangle logic in parse_config
submodule config: keep update strategy around
git submodule update: cmd_update_recursive
git submodule update: cmd_update_clone
git submodule update: cmd_update_fetch
Rewrite submodule update in C
Documentation/fetch-options.txt | 7 +
builtin/fetch.c | 6 +-
builtin/pull.c | 6 +
builtin/submodule--helper.c | 251 +++++++++++++++++++++++++++++
git-compat-util.h | 1 +
git-submodule.sh | 339 ++++++++++++++--------------------------
run-command.c | 320 ++++++++++++++++++++++++++++++++++---
run-command.h | 36 +++++
strbuf.c | 11 ++
strbuf.h | 9 ++
submodule-config.c | 85 +++++-----
submodule-config.h | 1 +
submodule.c | 120 +++++++++-----
submodule.h | 2 +-
t/t0061-run-command.sh | 20 +++
t/t5526-fetch-submodules.sh | 70 ++++++---
test-run-command.c | 24 +++
wrapper.c | 35 ++++-
18 files changed, 987 insertions(+), 356 deletions(-)
diff --git a/builtin/submodule--helper.c b/builtin/submodule--helper.c
index baa7563..b79117a 100644
--- a/builtin/submodule--helper.c
+++ b/builtin/submodule--helper.c
@@ -382,10 +382,14 @@ static int update_next_task(void *data,
argv_array_pushf(&cp->env_array, "sm_path=%s", sub->path);
argv_array_pushf(&cp->env_array, "name=%s", sub->name);
argv_array_pushf(&cp->env_array, "url=%s", sub->url);
+ argv_array_pushf(&cp->env_array, "sha1=%s", sha1_to_hex(ce->sha1));
argv_array_pushf(&cp->env_array, "update_module=%s", update_module);
cp->git_cmd = 1;
+ cp->no_stdin = 1;
cp->stdout_to_stderr = 1;
+ cp->err = -1;
+ argv_array_init(&cp->args);
argv_array_push(&cp->args, "submodule");
if (!file_exists(sm_gitdir))
argv_array_push(&cp->args, "update_clone");
diff --git a/run-command.c b/run-command.c
index 06d5a5d..494e1f8 100644
--- a/run-command.c
+++ b/run-command.c
@@ -276,8 +276,10 @@ static int wait_or_whine(pid_t pid, const char *argv0)
failed_errno = errno;
error("waitpid for %s failed: %s", argv0, strerror(errno));
} else {
- if (waiting != pid
- || (determine_return_value(status, &code, &failed_errno, argv0) < 0))
+ if (waiting != pid || (determine_return_value(status,
+ &code,
+ &failed_errno,
+ argv0) < 0))
error("waitpid is confused (%s)", argv0);
}
@@ -870,7 +872,6 @@ struct parallel_processes {
int max_processes;
int nr_processes;
- unsigned all_tasks_started : 1;
get_next_task_fn get_next_task;
start_failure_fn start_failure;
@@ -899,9 +900,9 @@ void default_start_failure(void *data,
struct strbuf sb = STRBUF_INIT;
for (i = 0; cp->argv[i]; i++)
- strbuf_addf(&sb, "%s ", cp->argv[i]);
+ strbuf_addf(&sb, " %s", cp->argv[i]);
- die_errno("Starting a child failed:\n%s", sb.buf);
+ die_errno("Starting a child failed:%s", sb.buf);
}
void default_return_value(void *data,
@@ -915,12 +916,12 @@ void default_return_value(void *data,
return;
for (i = 0; cp->argv[i]; i++)
- strbuf_addf(&sb, "%s ", cp->argv[i]);
+ strbuf_addf(&sb, " %s", cp->argv[i]);
- die_errno("A child failed with return code:\n%s\n%d", sb.buf, result);
+ die_errno("A child failed with return code %d:%s", result, sb.buf);
}
-static void run_processes_parallel_init(struct parallel_processes *pp,
+static void pp_init(struct parallel_processes *pp,
int n, void *data,
get_next_task_fn get_next_task,
start_failure_fn start_failure,
@@ -941,7 +942,6 @@ static void run_processes_parallel_init(struct parallel_processes *pp,
pp->return_value = return_value ? return_value : default_return_value;
pp->nr_processes = 0;
- pp->all_tasks_started = 0;
pp->output_owner = 0;
pp->children = xcalloc(n, sizeof(*pp->children));
pp->pfd = xcalloc(n, sizeof(*pp->pfd));
@@ -954,9 +954,10 @@ static void run_processes_parallel_init(struct parallel_processes *pp,
}
}
-static void run_processes_parallel_cleanup(struct parallel_processes *pp)
+static void pp_cleanup(struct parallel_processes *pp)
{
int i;
+
for (i = 0; i < pp->max_processes; i++)
strbuf_release(&pp->children[i].err);
@@ -976,7 +977,8 @@ static void set_nonblocking(int fd)
"output will be degraded");
}
-static void run_processes_parallel_start_one(struct parallel_processes *pp)
+/* returns 0 if a process was started, 1 if there was no further task to start */
+static int pp_start_one(struct parallel_processes *pp)
{
int i;
@@ -988,10 +990,9 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp)
if (!pp->get_next_task(pp->data,
&pp->children[i].process,
- &pp->children[i].err)) {
- pp->all_tasks_started = 1;
- return;
- }
+ &pp->children[i].err))
+ return 1;
+
if (start_command(&pp->children[i].process))
pp->start_failure(pp->data,
&pp->children[i].process,
@@ -1002,23 +1003,17 @@ static void run_processes_parallel_start_one(struct parallel_processes *pp)
pp->nr_processes++;
pp->children[i].in_use = 1;
pp->pfd[i].fd = pp->children[i].process.err;
+ return 0;
}
-static void run_processes_parallel_start_as_needed(struct parallel_processes *pp)
-{
- while (pp->nr_processes < pp->max_processes &&
- !pp->all_tasks_started)
- run_processes_parallel_start_one(pp);
-}
-
-static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
+static void pp_buffer_stderr(struct parallel_processes *pp)
{
int i;
while ((i = poll(pp->pfd, pp->max_processes, 100)) < 0) {
if (errno == EINTR)
continue;
- run_processes_parallel_cleanup(pp);
+ pp_cleanup(pp);
die_errno("poll");
}
@@ -1033,7 +1028,7 @@ static void run_processes_parallel_buffer_stderr(struct parallel_processes *pp)
}
}
-static void run_processes_parallel_output(struct parallel_processes *pp)
+static void pp_output(struct parallel_processes *pp)
{
int i = pp->output_owner;
if (pp->children[i].in_use &&
@@ -1043,7 +1038,7 @@ static void run_processes_parallel_output(struct parallel_processes *pp)
}
}
-static void run_processes_parallel_collect_finished(struct parallel_processes *pp)
+static void pp_collect_finished(struct parallel_processes *pp)
{
int i = 0;
pid_t pid;
@@ -1063,17 +1058,11 @@ static void run_processes_parallel_collect_finished(struct parallel_processes *p
pid == pp->children[i].process.pid)
break;
if (i == pp->max_processes)
- /*
- * waitpid returned another process id
- * which we are not waiting for.
- */
- return;
-
- if (strbuf_read_once(&pp->children[i].err,
- pp->children[i].process.err, 0) < 0 &&
- errno != EAGAIN)
- die_errno("strbuf_read_once");
+ die("BUG: found a child process we were not aware of");
+ if (strbuf_read(&pp->children[i].err,
+ pp->children[i].process.err, 0) < 0)
+ die_errno("strbuf_read");
if (determine_return_value(wait_status, &code, &errno,
pp->children[i].process.argv[0]) < 0)
@@ -1122,18 +1111,20 @@ int run_processes_parallel(int n, void *data,
return_value_fn return_value)
{
struct parallel_processes pp;
- run_processes_parallel_init(&pp, n, data,
- get_next_task,
- start_failure,
- return_value);
-
- while (!pp.all_tasks_started || pp.nr_processes > 0) {
- run_processes_parallel_start_as_needed(&pp);
- run_processes_parallel_buffer_stderr(&pp);
- run_processes_parallel_output(&pp);
- run_processes_parallel_collect_finished(&pp);
+ pp_init(&pp, n, data, get_next_task, start_failure, return_value);
+
+ while (1) {
+ while (pp.nr_processes < pp.max_processes &&
+ !pp_start_one(&pp))
+ ; /* nothing */
+ if (!pp.nr_processes)
+ break;
+ pp_buffer_stderr(&pp);
+ pp_output(&pp);
+ pp_collect_finished(&pp);
}
- run_processes_parallel_cleanup(&pp);
+
+ pp_cleanup(&pp);
return 0;
}
diff --git a/run-command.h b/run-command.h
index 0c1b363..3807fd1 100644
--- a/run-command.h
+++ b/run-command.h
@@ -155,6 +155,4 @@ int run_processes_parallel(int n, void *data,
start_failure_fn,
return_value_fn);
-void run_processes_parallel_schedule_error(struct strbuf *err);
-
#endif
diff --git a/strbuf.h b/strbuf.h
index 4d4e5b1..ea69665 100644
--- a/strbuf.h
+++ b/strbuf.h
@@ -367,8 +367,11 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *);
extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint);
/**
- * Same as strbuf_read, just returns non-blockingly by ignoring EAGAIN.
- * The fd must have set O_NONBLOCK.
+ * Read from a file descriptor that is marked as O_NONBLOCK without
+ * blocking. Returns the number of new bytes appended to the sb.
+ * Negative return value signals there was an error returned from
+ * underlying read(2), in which case the caller should check errno.
+ * e.g. errno == EAGAIN when the read may have blocked.
*/
extern ssize_t strbuf_read_once(struct strbuf *, int fd, size_t hint);
diff --git a/submodule-config.c b/submodule-config.c
index 0298a60..8b8c7d1 100644
--- a/submodule-config.c
+++ b/submodule-config.c
@@ -258,93 +258,72 @@ static int parse_config(const char *var, const char *value, void *data)
if (!name_and_item_from_var(var, &name, &item))
return 0;
- submodule = lookup_or_create_by_name(me->cache, me->gitmodules_sha1,
- name.buf);
+ submodule = lookup_or_create_by_name(me->cache,
+ me->gitmodules_sha1,
+ name.buf);
if (!strcmp(item.buf, "path")) {
- struct strbuf path = STRBUF_INIT;
- if (!value) {
+ if (!value)
ret = config_error_nonbool(var);
- goto release_return;
- }
- if (!me->overwrite && submodule->path != NULL) {
+ else if (!me->overwrite && submodule->path != NULL)
warn_multiple_config(me->commit_sha1, submodule->name,
"path");
- goto release_return;
+ else {
+ if (submodule->path)
+ cache_remove_path(me->cache, submodule);
+ free((void *) submodule->path);
+ submodule->path = xstrdup(value);
+ cache_put_path(me->cache, submodule);
}
-
- if (submodule->path)
- cache_remove_path(me->cache, submodule);
- free((void *) submodule->path);
- strbuf_addstr(&path, value);
- submodule->path = strbuf_detach(&path, NULL);
- cache_put_path(me->cache, submodule);
} else if (!strcmp(item.buf, "fetchrecursesubmodules")) {
/* when parsing worktree configurations we can die early */
int die_on_error = is_null_sha1(me->gitmodules_sha1);
if (!me->overwrite &&
- submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) {
+ submodule->fetch_recurse != RECURSE_SUBMODULES_NONE)
warn_multiple_config(me->commit_sha1, submodule->name,
"fetchrecursesubmodules");
- goto release_return;
- }
-
- submodule->fetch_recurse = parse_fetch_recurse(var, value,
+ else
+ submodule->fetch_recurse = parse_fetch_recurse(
+ var, value,
die_on_error);
} else if (!strcmp(item.buf, "ignore")) {
- struct strbuf ignore = STRBUF_INIT;
- if (!me->overwrite && submodule->ignore != NULL) {
+ if (!value)
+ ret = config_error_nonbool(var);
+ else if (!me->overwrite && submodule->ignore != NULL)
warn_multiple_config(me->commit_sha1, submodule->name,
"ignore");
- goto release_return;
- }
- if (!value) {
- ret = config_error_nonbool(var);
- goto release_return;
- }
- if (strcmp(value, "untracked") && strcmp(value, "dirty") &&
- strcmp(value, "all") && strcmp(value, "none")) {
+ else if (strcmp(value, "untracked") &&
+ strcmp(value, "dirty") &&
+ strcmp(value, "all") &&
+ strcmp(value, "none"))
warning("Invalid parameter '%s' for config option "
"'submodule.%s.ignore'", value, var);
- goto release_return;
+ else {
+ free((void *) submodule->ignore);
+ submodule->ignore = xstrdup(value);
}
-
- free((void *) submodule->ignore);
- strbuf_addstr(&ignore, value);
- submodule->ignore = strbuf_detach(&ignore, NULL);
} else if (!strcmp(item.buf, "url")) {
- struct strbuf url = STRBUF_INIT;
if (!value) {
ret = config_error_nonbool(var);
- goto release_return;
- }
- if (!me->overwrite && submodule->url != NULL) {
+ } else if (!me->overwrite && submodule->url != NULL) {
warn_multiple_config(me->commit_sha1, submodule->name,
"url");
- goto release_return;
+ } else {
+ free((void *) submodule->url);
+ submodule->url = xstrdup(value);
}
-
- free((void *) submodule->url);
- strbuf_addstr(&url, value);
- submodule->url = strbuf_detach(&url, NULL);
} else if (!strcmp(item.buf, "update")) {
- struct strbuf update = STRBUF_INIT;
- if (!value) {
+ if (!value)
ret = config_error_nonbool(var);
- goto release_return;
- }
- if (!me->overwrite && submodule->update != NULL) {
+ else if (!me->overwrite && submodule->update != NULL)
warn_multiple_config(me->commit_sha1, submodule->name,
- "update");
- goto release_return;
+ "update");
+ else {
+ free((void *)submodule->update);
+ submodule->update = xstrdup(value);
}
-
- free((void *) submodule->update);
- strbuf_addstr(&update, value);
- submodule->update = strbuf_detach(&update, NULL);
}
-release_return:
strbuf_release(&name);
strbuf_release(&item);
diff --git a/submodule.c b/submodule.c
index d15364f..fdaf3e4 100644
--- a/submodule.c
+++ b/submodule.c
@@ -650,10 +650,12 @@ int fetch_populated_submodules(const struct argv_array *options,
{
int i;
struct submodule_parallel_fetch spf = SPF_INIT;
+
spf.work_tree = get_git_work_tree();
spf.command_line_option = command_line_option;
spf.quiet = quiet;
spf.prefix = prefix;
+
if (!spf.work_tree)
goto out;
@@ -738,12 +740,11 @@ int get_next_submodule(void *data, struct child_process *cp,
if (is_directory(git_dir)) {
child_process_init(cp);
cp->dir = strbuf_detach(&submodule_path, NULL);
+ cp->env = local_repo_env;
cp->git_cmd = 1;
- cp->no_stdout = 1;
cp->no_stdin = 1;
cp->stdout_to_stderr = 1;
cp->err = -1;
- cp->env = local_repo_env;
if (!spf->quiet)
strbuf_addf(err, "Fetching submodule %s%s\n",
spf->prefix, ce->name);
diff --git a/wrapper.c b/wrapper.c
index 54ce231..41a21e1 100644
--- a/wrapper.c
+++ b/wrapper.c
@@ -206,16 +206,10 @@ ssize_t xread(int fd, void *buf, size_t len)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
struct pollfd pfd;
- int i;
pfd.events = POLLIN;
pfd.fd = fd;
- i = poll(&pfd, 1, 100);
- if (i < 0) {
- if (errno == EINTR || errno == ENOMEM)
- continue;
- else
- die_errno("poll");
- }
+ /* We deliberately ignore the return value */
+ poll(&pfd, 1, -1);
}
}
return nr;
@@ -225,13 +219,13 @@ ssize_t xread(int fd, void *buf, size_t len)
/*
* xread_nonblock() is the same a read(), but it automatically restarts read()
* interrupted operations (EINTR). xread_nonblock() DOES NOT GUARANTEE that
- * "len" bytes is read even if the data is available.
+ * "len" bytes is read. EWOULDBLOCK is turned into EAGAIN.
*/
ssize_t xread_nonblock(int fd, void *buf, size_t len)
{
ssize_t nr;
if (len > MAX_IO_SIZE)
- len = MAX_IO_SIZE;
+ len = MAX_IO_SIZE;
while (1) {
nr = read(fd, buf, len);
if (nr < 0) {
--
2.5.0.272.ga84127c.dirty
next reply other threads:[~2015-09-23 1:45 UTC|newest]
Thread overview: 39+ messages / expand[flat|nested] mbox.gz Atom feed top
2015-09-23 1:45 Stefan Beller [this message]
2015-09-23 1:45 ` [PATCHv4 01/14] submodule: Send "Fetching submodule <foo>" to standard error Stefan Beller
2015-09-23 1:45 ` [PATCHv4 02/14] xread: poll on non blocking fds Stefan Beller
2015-09-23 1:45 ` [PATCHv4 03/14] xread_nonblock: add functionality to read from fds without blocking Stefan Beller
2015-09-23 1:45 ` [PATCHv4 04/14] strbuf: add strbuf_read_once to read " Stefan Beller
2015-09-23 1:45 ` [PATCHv4 05/14] run-command: factor out return value computation Stefan Beller
2015-09-23 1:45 ` [PATCHv4 06/14] run-command: add an asynchronous parallel child processor Stefan Beller
2015-09-23 6:29 ` Junio C Hamano
2015-09-23 17:53 ` Stefan Beller
2015-09-23 18:04 ` Junio C Hamano
2015-09-23 19:34 ` Junio C Hamano
2015-09-23 19:39 ` Stefan Beller
2015-09-23 19:47 ` Junio C Hamano
2015-09-23 6:47 ` Junio C Hamano
2015-09-23 14:59 ` Junio C Hamano
2015-09-23 17:54 ` Junio C Hamano
2015-09-23 23:41 ` [PATCHv5] Another squash on " Stefan Beller
2015-09-24 2:17 ` Junio C Hamano
2015-09-24 21:13 ` [PATCH 0/2] " Stefan Beller
2015-09-24 21:13 ` [PATCH 2/2] SQUASH for "fetch_populated_submodules: use new parallel job processing" Stefan Beller
2015-09-24 21:13 ` [PATCH 1/2] SQUASH??? Stefan Beller
2015-09-25 0:49 ` Junio C Hamano
2015-09-25 1:09 ` Junio C Hamano
2015-09-25 17:52 ` Stefan Beller
2015-09-25 17:56 ` Junio C Hamano
2015-09-25 1:08 ` [PATCH 0/2] Another squash on run-command: add an asynchronous parallel child processor Junio C Hamano
2015-09-25 18:56 ` Stefan Beller
2015-09-25 19:04 ` Junio C Hamano
2015-09-25 19:19 ` Stefan Beller
2015-09-25 19:32 ` Junio C Hamano
2015-09-23 1:45 ` [PATCHv4 07/14] fetch_populated_submodules: use new parallel job processing Stefan Beller
2015-09-23 1:45 ` [PATCHv4 08/14] submodules: allow parallel fetching, add tests and documentation Stefan Beller
2015-09-23 1:45 ` [PATCHv4 09/14] submodule-config: Untangle logic in parse_config Stefan Beller
2015-09-23 1:45 ` [PATCHv4 10/14] submodule config: keep update strategy around Stefan Beller
2015-09-23 1:45 ` [PATCHv4 11/14] git submodule update: cmd_update_recursive Stefan Beller
2015-09-23 1:45 ` [PATCHv4 12/14] git submodule update: cmd_update_clone Stefan Beller
2015-09-23 20:13 ` Junio C Hamano
2015-09-23 1:45 ` [PATCHv4 13/14] git submodule update: cmd_update_fetch Stefan Beller
2015-09-23 1:45 ` [PATCHv4 14/14] Rewrite submodule update in C Stefan Beller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1442972732-12118-1-git-send-email-sbeller@google.com \
--to=sbeller@google.com \
--cc=Jens.Lehmann@web.de \
--cc=git@vger.kernel.org \
--cc=gitster@pobox.com \
--cc=jacob.keller@gmail.com \
--cc=johannes.schindelin@gmail.com \
--cc=jrnieder@gmail.com \
--cc=peff@peff.net \
--cc=ramsay@ramsayjones.plus.com \
--cc=sunshine@sunshineco.com \
--cc=vlovich@gmail.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).