* [RFC PATCHv1 0/2] Parallel git submodule fetching
@ 2015-09-11 23:09 Stefan Beller
  2015-09-11 23:09 ` [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr Stefan Beller
  2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller
  0 siblings, 2 replies; 13+ messages in thread

From: Stefan Beller @ 2015-09-11 23:09 UTC (permalink / raw)
  To: git
  Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich, Stefan Beller

This is a new approach to making git submodule fetching faster without using threads: we run the fetch commands in parallel directly, without an intermediate thread pool.

Patch 1 fixes an error that is not much of a problem in a single-child world, but was hard to track down in the parallel one.

The second patch works fine (I'd love to hear feedback on the perceived change of output), though it still has two rough edges I want to tackle:

 * The fcntl call that makes the pipes nonblocking is not checked for errors.
 * We also need to communicate back if a process could not be spawned and the like, i.e. the combined error reporting for all subprocesses still needs to be added. I am thinking about adding another callback which is called to finish up a child process.

Earlier attempts used a thread pool [1], which made the conversion very easy: existing code barely needs to be touched, as you just replace the run_command(..) call with an add_task(..). The thread pool would abstract away the whole parallelism and its problems. It would also provide backpressure to the main program: add_task would simply not return if there was no room left in the queue.

[1] http://www.spinics.net/lists/git/msg258435.html

Any feedback welcome!
Thanks,
Stefan

Jonathan Nieder (1):
  Sending "Fetching submodule <foo>" output to stderr

Stefan Beller (1):
  fetch: fetch submodules in parallel

 Documentation/fetch-options.txt |   7 ++
 builtin/fetch.c                 |   6 +-
 builtin/pull.c                  |   6 ++
 run-command.c                   | 144 ++++++++++++++++++++++++++++++++++++++++
 run-command.h                   |  29 ++++++++
 strbuf.c                        |  31 +++++++++
 strbuf.h                        |   1 +
 submodule.c                     |  99 +++++++++++++++++----------
 submodule.h                     |   2 +-
 t/t0061-run-command.sh          |  16 +++++
 t/t5526-fetch-submodules.sh     |  70 ++++++++++++-------
 test-run-command.c              |  23 +++++++
 12 files changed, 373 insertions(+), 61 deletions(-)

-- 
2.6.0.rc0.131.gf624c3d

^ permalink raw reply	[flat|nested] 13+ messages in thread
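[Editorial note: the cover letter mentions that the fcntl call making the pipes nonblocking is not yet checked for errors. A minimal sketch of what that check could look like is below; the helper name set_nonblocking() is made up for illustration and is not part of the series.]

```c
#include <fcntl.h>
#include <unistd.h>

/*
 * Sketch only: make a pipe fd nonblocking, checking both fcntl()
 * calls for errors (the series does not do this yet).
 * Returns 0 on success, -1 on error with errno set by fcntl().
 */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);
	if (flags < 0)
		return -1;
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		return -1;
	return 0;
}
```

A caller in the spirit of the patch would then do `if (set_nonblocking(children[i].err) < 0) die_errno(...)` instead of ignoring the return values.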
* [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr 2015-09-11 23:09 [RFC PATCHv1 0/2] Parallel git submodule fetching Stefan Beller @ 2015-09-11 23:09 ` Stefan Beller 2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller 1 sibling, 0 replies; 13+ messages in thread From: Stefan Beller @ 2015-09-11 23:09 UTC (permalink / raw) To: git Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich, Stefan Beller From: Jonathan Nieder <jrnieder@gmail.com> The "Pushing submodule <foo>" progress output correctly goes to stderr, but "Fetching submodule <foo>" is going to stdout by mistake. Fix it to write to stderr. Noticed while trying to implement a parallel submodule fetch. When this particular output line went to a different file descriptor, it was buffered separately, resulting in wrongly interleaved output if we copied it to the terminal naively. Signed-off-by: Jonathan Nieder <jrnieder@gmail.com> Signed-off-by: Stefan Beller <sbeller@google.com> --- submodule.c | 2 +- t/t5526-fetch-submodules.sh | 51 +++++++++++++++++++++++---------------------- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/submodule.c b/submodule.c index 9fcc86f..1d64e57 100644 --- a/submodule.c +++ b/submodule.c @@ -694,7 +694,7 @@ int fetch_populated_submodules(const struct argv_array *options, git_dir = submodule_git_dir.buf; if (is_directory(git_dir)) { if (!quiet) - printf("Fetching submodule %s%s\n", prefix, ce->name); + fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name); cp.dir = submodule_path.buf; argv_array_push(&argv, default_argv); argv_array_push(&argv, "--submodule-prefix"); diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh index a4532b0..17759b1 100755 --- a/t/t5526-fetch-submodules.sh +++ b/t/t5526-fetch-submodules.sh @@ -16,7 +16,8 @@ add_upstream_commit() { git add subfile && git commit -m new subfile && head2=$(git rev-parse --short HEAD) && - echo "From $pwd/submodule" > 
../expect.err && + echo "Fetching submodule submodule" > ../expect.err && + echo "From $pwd/submodule" >> ../expect.err && echo " $head1..$head2 master -> origin/master" >> ../expect.err ) && ( @@ -27,6 +28,7 @@ add_upstream_commit() { git add deepsubfile && git commit -m new deepsubfile && head2=$(git rev-parse --short HEAD) && + echo "Fetching submodule submodule/subdir/deepsubmodule" >> ../expect.err echo "From $pwd/deepsubmodule" >> ../expect.err && echo " $head1..$head2 master -> origin/master" >> ../expect.err ) @@ -56,9 +58,7 @@ test_expect_success setup ' ( cd downstream && git submodule update --init --recursive - ) && - echo "Fetching submodule submodule" > expect.out && - echo "Fetching submodule submodule/subdir/deepsubmodule" >> expect.out + ) ' test_expect_success "fetch --recurse-submodules recurses into submodules" ' @@ -67,7 +67,7 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" ' cd downstream && git fetch --recurse-submodules >../actual.out 2>../actual.err ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -96,7 +96,7 @@ test_expect_success "using fetchRecurseSubmodules=true in .gitmodules recurses i git config -f .gitmodules submodule.submodule.fetchRecurseSubmodules true && git fetch >../actual.out 2>../actual.err ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -127,7 +127,7 @@ test_expect_success "--recurse-submodules overrides fetchRecurseSubmodules setti git config --unset -f .gitmodules submodule.submodule.fetchRecurseSubmodules && git config --unset submodule.submodule.fetchRecurseSubmodules ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -146,7 +146,7 @@ test_expect_success "--dry-run propagates to submodules" ' cd downstream && git fetch --recurse-submodules --dry-run >../actual.out 2>../actual.err ) 
&& - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -155,7 +155,7 @@ test_expect_success "Without --dry-run propagates to submodules" ' cd downstream && git fetch --recurse-submodules >../actual.out 2>../actual.err ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -166,7 +166,7 @@ test_expect_success "recurseSubmodules=true propagates into submodules" ' git config fetch.recurseSubmodules true git fetch >../actual.out 2>../actual.err ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -180,7 +180,7 @@ test_expect_success "--recurse-submodules overrides config in submodule" ' ) && git fetch --recurse-submodules >../actual.out 2>../actual.err ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -214,16 +214,15 @@ test_expect_success "Recursion stops when no new submodule commits are fetched" git add submodule && git commit -m "new submodule" && head2=$(git rev-parse --short HEAD) && - echo "Fetching submodule submodule" > expect.out.sub && echo "From $pwd/." 
> expect.err.sub && echo " $head1..$head2 master -> origin/master" >>expect.err.sub && - head -2 expect.err >> expect.err.sub && + head -3 expect.err >> expect.err.sub && ( cd downstream && git fetch >../actual.out 2>../actual.err ) && test_i18ncmp expect.err.sub actual.err && - test_i18ncmp expect.out.sub actual.out + test_must_be_empty actual.out ' test_expect_success "Recursion doesn't happen when new superproject commits don't change any submodules" ' @@ -269,7 +268,7 @@ test_expect_success "Recursion picks up config in submodule" ' ) ) && test_i18ncmp expect.err.sub actual.err && - test_i18ncmp expect.out actual.out + test_must_be_empty actual.out ' test_expect_success "Recursion picks up all submodules when necessary" ' @@ -285,7 +284,8 @@ test_expect_success "Recursion picks up all submodules when necessary" ' git add subdir/deepsubmodule && git commit -m "new deepsubmodule" head2=$(git rev-parse --short HEAD) && - echo "From $pwd/submodule" > ../expect.err.sub && + echo "Fetching submodule submodule" > ../expect.err.sub && + echo "From $pwd/submodule" >> ../expect.err.sub && echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub ) && head1=$(git rev-parse --short HEAD) && @@ -295,13 +295,13 @@ test_expect_success "Recursion picks up all submodules when necessary" ' echo "From $pwd/." 
> expect.err.2 && echo " $head1..$head2 master -> origin/master" >> expect.err.2 && cat expect.err.sub >> expect.err.2 && - tail -2 expect.err >> expect.err.2 && + tail -3 expect.err >> expect.err.2 && ( cd downstream && git fetch >../actual.out 2>../actual.err ) && test_i18ncmp expect.err.2 actual.err && - test_i18ncmp expect.out actual.out + test_must_be_empty actual.out ' test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no new commits are fetched in the superproject (and ignores config)" ' @@ -317,7 +317,8 @@ test_expect_success "'--recurse-submodules=on-demand' doesn't recurse when no ne git add subdir/deepsubmodule && git commit -m "new deepsubmodule" && head2=$(git rev-parse --short HEAD) && - echo "From $pwd/submodule" > ../expect.err.sub && + echo Fetching submodule submodule > ../expect.err.sub && + echo "From $pwd/submodule" >> ../expect.err.sub && echo " $head1..$head2 master -> origin/master" >> ../expect.err.sub ) && ( @@ -335,7 +336,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess git add submodule && git commit -m "new submodule" && head2=$(git rev-parse --short HEAD) && - tail -2 expect.err > expect.err.deepsub && + tail -3 expect.err > expect.err.deepsub && echo "From $pwd/." > expect.err && echo " $head1..$head2 master -> origin/master" >>expect.err && cat expect.err.sub >> expect.err && @@ -354,7 +355,7 @@ test_expect_success "'--recurse-submodules=on-demand' recurses as deep as necess git config --unset -f .gitmodules submodule.subdir/deepsubmodule.fetchRecursive ) ) && - test_i18ncmp expect.out actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err actual.err ' @@ -388,7 +389,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config head2=$(git rev-parse --short HEAD) && echo "From $pwd/." 
> expect.err.2 && echo " $head1..$head2 master -> origin/master" >>expect.err.2 && - head -2 expect.err >> expect.err.2 && + head -3 expect.err >> expect.err.2 && ( cd downstream && git config fetch.recurseSubmodules on-demand && @@ -399,7 +400,7 @@ test_expect_success "'fetch.recurseSubmodules=on-demand' overrides global config cd downstream && git config --unset fetch.recurseSubmodules ) && - test_i18ncmp expect.out.sub actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err.2 actual.err ' @@ -416,7 +417,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override head2=$(git rev-parse --short HEAD) && echo "From $pwd/." > expect.err.2 && echo " $head1..$head2 master -> origin/master" >>expect.err.2 && - head -2 expect.err >> expect.err.2 && + head -3 expect.err >> expect.err.2 && ( cd downstream && git config submodule.submodule.fetchRecurseSubmodules on-demand && @@ -427,7 +428,7 @@ test_expect_success "'submodule.<sub>.fetchRecurseSubmodules=on-demand' override cd downstream && git config --unset submodule.submodule.fetchRecurseSubmodules ) && - test_i18ncmp expect.out.sub actual.out && + test_must_be_empty actual.out && test_i18ncmp expect.err.2 actual.err ' -- 2.6.0.rc0.131.gf624c3d ^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 2/2] fetch: fetch submodules in parallel
  2015-09-11 23:09 [RFC PATCHv1 0/2] Parallel git submodule fetching Stefan Beller
  2015-09-11 23:09 ` [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr Stefan Beller
@ 2015-09-11 23:09 ` Stefan Beller
  2015-09-12 19:11   ` Junio C Hamano
  1 sibling, 1 reply; 13+ messages in thread

From: Stefan Beller @ 2015-09-11 23:09 UTC (permalink / raw)
  To: git
  Cc: peff, gitster, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich, Stefan Beller

If we run external commands in parallel, we cannot pipe their output directly to our stdout/stderr, as it would mix up. So each process's output flows through a pipe, which we buffer. One subprocess can be piped directly to our stdout/stderr for low-latency feedback to the user.

Example: assume we have five submodules A, B, C, D, E, and each fetch takes a different amount of time as the submodules vary in size. The output of fetches in sequential order might look like this:

 time -->
 output: |---A---| |-B-| |----C-----------| |-D-| |-E-|

When we schedule these submodules into at most two parallel processes, a schedule and sample output over time may look like this:

 process 1: |---A---|      |-D-| |-E-|
 process 2:   |-B-| |----C-----------|
 output:    |---A---|B|------C-------|DE

So A is perceived just as it would be in the single-child version. As B has finished by the time A is done, we can dump its whole progress buffer to stderr, so it looks as if it finished in no time. Once that is done, C is determined to be the visible child and its progress is reported in real time.

This way of presenting output is well suited for human consumption, as it only changes the timing, not the actual content. For machine consumption the output would need to be prepared by the tasks themselves, e.g. by prefixing each line or block to indicate whose task's output is being displayed.
Signed-off-by: Stefan Beller <sbeller@google.com> --- Documentation/fetch-options.txt | 7 ++ builtin/fetch.c | 6 +- builtin/pull.c | 6 ++ run-command.c | 144 ++++++++++++++++++++++++++++++++++++++++ run-command.h | 29 ++++++++ strbuf.c | 31 +++++++++ strbuf.h | 1 + submodule.c | 99 +++++++++++++++++---------- submodule.h | 2 +- t/t0061-run-command.sh | 16 +++++ t/t5526-fetch-submodules.sh | 19 ++++++ test-run-command.c | 23 +++++++ 12 files changed, 347 insertions(+), 36 deletions(-) diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt index 45583d8..e2a59c3 100644 --- a/Documentation/fetch-options.txt +++ b/Documentation/fetch-options.txt @@ -100,6 +100,13 @@ ifndef::git-pull[] reference to a commit that isn't already in the local submodule clone. +-j:: +--jobs=<n>:: + Number of parallel children to be used for fetching submodules. + Each will fetch from different submodules, such that fetching many + submodules will be faster. By default submodules will be fetched + one at a time + --no-recurse-submodules:: Disable recursive fetching of submodules (this has the same effect as using the '--recurse-submodules=no' option). 
diff --git a/builtin/fetch.c b/builtin/fetch.c index ee1f1a9..09ff837 100644 --- a/builtin/fetch.c +++ b/builtin/fetch.c @@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */ static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity; static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT; static int tags = TAGS_DEFAULT, unshallow, update_shallow; +static int max_children = 1; static const char *depth; static const char *upload_pack; static struct strbuf default_rla = STRBUF_INIT; @@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = { N_("fetch all tags and associated objects"), TAGS_SET), OPT_SET_INT('n', NULL, &tags, N_("do not fetch all tags (--no-tags)"), TAGS_UNSET), + OPT_INTEGER('j', "jobs", &max_children, + N_("number of threads used for fetching")), OPT_BOOL('p', "prune", &prune, N_("prune remote-tracking branches no longer on remote")), { OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"), @@ -1217,7 +1220,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix) result = fetch_populated_submodules(&options, submodule_prefix, recurse_submodules, - verbosity < 0); + verbosity < 0, + max_children); argv_array_clear(&options); } diff --git a/builtin/pull.c b/builtin/pull.c index 722a83c..fbbda67 100644 --- a/builtin/pull.c +++ b/builtin/pull.c @@ -94,6 +94,7 @@ static int opt_force; static char *opt_tags; static char *opt_prune; static char *opt_recurse_submodules; +static char *max_children; static int opt_dry_run; static char *opt_keep; static char *opt_depth; @@ -177,6 +178,9 @@ static struct option pull_options[] = { N_("on-demand"), N_("control recursive fetching of submodules"), PARSE_OPT_OPTARG), + OPT_PASSTHRU('j', "jobs", &max_children, N_("n"), + N_("number of threads used for fetching submodules"), + PARSE_OPT_OPTARG), OPT_BOOL(0, "dry-run", &opt_dry_run, N_("dry run")), OPT_PASSTHRU('k', "keep", &opt_keep, NULL, @@ -524,6 +528,8 @@ static int run_fetch(const char 
*repo, const char **refspecs) argv_array_push(&args, opt_prune); if (opt_recurse_submodules) argv_array_push(&args, opt_recurse_submodules); + if (max_children) + argv_array_push(&args, max_children); if (opt_dry_run) argv_array_push(&args, "--dry-run"); if (opt_keep) diff --git a/run-command.c b/run-command.c index 28e1d55..b8ff67b 100644 --- a/run-command.c +++ b/run-command.c @@ -852,3 +852,147 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint) close(cmd->out); return finish_command(cmd); } + +int run_processes_async(int n, get_next_task fn, void *data) +{ + int i, wait_status; + pid_t pid; + + /* no more tasks. Also set when aborting early. */ + int all_tasks_started = 0; + int nr_processes = 0; + int child_in_foreground = 0; + struct timeval timeout; + struct child_process *children = xcalloc(n, sizeof(*children)); + char *slots = xcalloc(n, sizeof(*slots)); + struct strbuf *err = xcalloc(n, sizeof(*err)); + fd_set fdset; + int maxfd; + struct strbuf finished_children = STRBUF_INIT; + int flags; + for (i = 0; i < n; i++) + strbuf_init(&err[i], 0); + + while (!all_tasks_started || nr_processes > 0) { + /* Start new processes. 
*/ + while (!all_tasks_started && nr_processes < n) { + for (i = 0; i < n; i++) + if (!slots[i]) + break; /* found an empty slot */ + if (i == n) + die("BUG: bookkeeping is hard"); + + if (fn(data, &children[i], &err[i])) { + all_tasks_started = 1; + break; + } + if (start_command(&children[i])) + die(_("Could not start child process")); + flags = fcntl(children[i].err, F_GETFL); + fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK); + nr_processes++; + slots[i] = 1; + } + + /* prepare data for select call */ + FD_ZERO(&fdset); + maxfd = 0; + for (i = 0; i < n; i++) { + if (!slots[i]) + continue; + FD_SET(children[i].err, &fdset); + if (children[i].err > maxfd) + maxfd = children[i].err; + } + timeout.tv_sec = 0; + timeout.tv_usec = 500000; + + i = select(maxfd + 1, &fdset, NULL, NULL, &timeout); + if (i < 0) { + if (errno == EINTR) + /* A signal was caught; try again */ + continue; + else if (errno == ENOMEM) + die_errno("BUG: keeping track of fds is hard"); + else if (errno == EINVAL) + die_errno("BUG: invalid arguments to select"); + else if (errno == EBADF) + die_errno("BUG: keeping track of fds is hard"); + else + die_errno("Unknown error with select"); + } + + /* Buffer output from all pipes. */ + for (i = 0; i < n; i++) { + if (!slots[i]) + continue; + if (FD_ISSET(children[i].err, &fdset)) + strbuf_read_noblock(&err[i], children[i].err, 0); + if (child_in_foreground == i) { + fputs(err[i].buf, stderr); + strbuf_reset(&err[i]); + fflush(stderr); + } + } + + /* Collect finished child processes. */ + while (nr_processes > 0) { + pid = waitpid(-1, &wait_status, WNOHANG); + if (pid == 0) + /* no child finished */ + break; + + if (pid < 0) { + if (errno == EINTR) + break; /* just try again next time */ + if (errno == EINVAL || errno == ECHILD) + die_errno("wait"); + } else { + /* Find the finished child. 
*/ + for (i = 0; i < n; i++) + if (slots[i] && pid == children[i].pid) + break; + if (i == n) + /* waitpid returned another process id which + * we are not waiting on, so ignore it*/ + break; + } + + strbuf_read_noblock(&err[i], children[i].err, 0); + argv_array_clear(&children[i].args); + argv_array_clear(&children[i].env_array); + + slots[i] = 0; + nr_processes--; + + if (i != child_in_foreground) { + strbuf_addbuf(&finished_children, &err[i]); + strbuf_reset(&err[i]); + } else { + fputs(err[i].buf, stderr); + strbuf_reset(&err[i]); + + /* Output all other finished child processes */ + fputs(finished_children.buf, stderr); + strbuf_reset(&finished_children); + + /* + * Pick next process to output live. + * There can be no active process if n==1 + * NEEDSWORK: + * For now we pick it randomly by doing a round + * robin. Later we may want to pick the one with + * the most output or the longest or shortest + * running process time. + */ + for (i = 0; i < n; i++) + if (slots[(child_in_foreground + i) % n]) + break; + child_in_foreground = (child_in_foreground + i) % n; + fputs(err[child_in_foreground].buf, stderr); + strbuf_reset(&err[child_in_foreground]); + } + } + } + return 0; +} diff --git a/run-command.h b/run-command.h index 5b4425a..8f53ad6 100644 --- a/run-command.h +++ b/run-command.h @@ -119,4 +119,33 @@ struct async { int start_async(struct async *async); int finish_async(struct async *async); +/** + * Return 0 if the next child is ready to run. + * This callback takes care to initialize the child process and preload the + * out and error channel. The preloading of these outpout channels is useful + * if you want to have a message printed directly before the output of the + * child process. + * + * Return != 0 if there are no more tasks to be processed. + */ +typedef int (*get_next_task)(void *data, + struct child_process *cp, + struct strbuf *err); + +/** + * Runs up to n processes at the same time. 
Whenever a process can + * be started, the callback `get_next_task` is called to obtain the + * data fed to the child process. + * + * The children started via this function run in parallel and their output + * to both stdout and stderr is buffered, while one of the children will + * directly output to stdout/stderr. + * + * This leads to a problem with output from processes which put out to + * stdout/err alternatingly as the buffering will not be able to replay + * the + */ + +int run_processes_async(int n, get_next_task fn, void *data); + #endif diff --git a/strbuf.c b/strbuf.c index cce5eed..7f866c3 100644 --- a/strbuf.c +++ b/strbuf.c @@ -384,6 +384,37 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint) return sb->len - oldlen; } +ssize_t strbuf_read_noblock(struct strbuf *sb, int fd, size_t hint) +{ + size_t oldlen = sb->len; + size_t oldalloc = sb->alloc; + + strbuf_grow(sb, hint ? hint : 8192); + for (;;) { + ssize_t cnt; + + cnt = read(fd, sb->buf + sb->len, sb->alloc - sb->len - 1); + if (cnt < 0) { + if (errno == EINTR) + continue; + if (errno == EAGAIN) + break; + if (oldalloc == 0) + strbuf_release(sb); + else + strbuf_setlen(sb, oldlen); + return -1; + } + if (!cnt) + break; + sb->len += cnt; + strbuf_grow(sb, 8192); + } + + sb->buf[sb->len] = '\0'; + return sb->len - oldlen; +} + #define STRBUF_MAXLINK (2*PATH_MAX) int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint) diff --git a/strbuf.h b/strbuf.h index aef2794..7ea462b 100644 --- a/strbuf.h +++ b/strbuf.h @@ -365,6 +365,7 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *); * any partial read is undone. */ extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint); +extern ssize_t strbuf_read_noblock(struct strbuf *, int fd, size_t hint); /** * Read the contents of a file, specified by its path. 
The third argument diff --git a/submodule.c b/submodule.c index 1d64e57..6d757c6 100644 --- a/submodule.c +++ b/submodule.c @@ -12,6 +12,7 @@ #include "sha1-array.h" #include "argv-array.h" #include "blob.h" +#include "thread-utils.h" static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND; static struct string_list changed_submodule_paths; @@ -615,37 +616,61 @@ static void calculate_changed_submodule_paths(void) initialized_fetch_ref_tips = 0; } +struct submodule_parallel_fetch { + int count; + struct argv_array args; + const char *work_tree; + const char *prefix; + int command_line_option; + int quiet; +}; +#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL} + +int get_next_submodule(void *data, struct child_process *cp, + struct strbuf *err); + int fetch_populated_submodules(const struct argv_array *options, const char *prefix, int command_line_option, - int quiet) + int quiet, int max_parallel_jobs) { int i, result = 0; - struct child_process cp = CHILD_PROCESS_INIT; - struct argv_array argv = ARGV_ARRAY_INIT; - const char *work_tree = get_git_work_tree(); - if (!work_tree) + struct submodule_parallel_fetch spf = SPF_INIT; + spf.work_tree = get_git_work_tree(); + spf.command_line_option = command_line_option; + spf.quiet = quiet; + spf.prefix = prefix; + if (!spf.work_tree) goto out; if (read_cache() < 0) die("index file corrupt"); - argv_array_push(&argv, "fetch"); + argv_array_push(&spf.args, "fetch"); for (i = 0; i < options->argc; i++) - argv_array_push(&argv, options->argv[i]); - argv_array_push(&argv, "--recurse-submodules-default"); + argv_array_push(&spf.args, options->argv[i]); + argv_array_push(&spf.args, "--recurse-submodules-default"); /* default value, "--submodule-prefix" and its value are added later */ - cp.env = local_repo_env; - cp.git_cmd = 1; - cp.no_stdin = 1; - calculate_changed_submodule_paths(); + run_processes_async(max_parallel_jobs, get_next_submodule, &spf); + + argv_array_clear(&spf.args); +out: + 
string_list_clear(&changed_submodule_paths, 1); + return result; +} + +int get_next_submodule(void *data, struct child_process *cp, + struct strbuf *err) +{ + int ret = 0; + struct submodule_parallel_fetch *spf = data; - for (i = 0; i < active_nr; i++) { + for ( ; spf->count < active_nr; spf->count++) { struct strbuf submodule_path = STRBUF_INIT; struct strbuf submodule_git_dir = STRBUF_INIT; struct strbuf submodule_prefix = STRBUF_INIT; - const struct cache_entry *ce = active_cache[i]; + const struct cache_entry *ce = active_cache[spf->count]; const char *git_dir, *default_argv; const struct submodule *submodule; @@ -657,7 +682,7 @@ int fetch_populated_submodules(const struct argv_array *options, submodule = submodule_from_name(null_sha1, ce->name); default_argv = "yes"; - if (command_line_option == RECURSE_SUBMODULES_DEFAULT) { + if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) { if (submodule && submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) { @@ -680,40 +705,46 @@ int fetch_populated_submodules(const struct argv_array *options, default_argv = "on-demand"; } } - } else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) { + } else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) { if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name)) continue; default_argv = "on-demand"; } - strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name); + strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name); strbuf_addf(&submodule_git_dir, "%s/.git", submodule_path.buf); - strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name); + strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name); git_dir = read_gitfile(submodule_git_dir.buf); if (!git_dir) git_dir = submodule_git_dir.buf; if (is_directory(git_dir)) { - if (!quiet) - fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name); - cp.dir = submodule_path.buf; - argv_array_push(&argv, default_argv); - argv_array_push(&argv, "--submodule-prefix"); - 
argv_array_push(&argv, submodule_prefix.buf); - cp.argv = argv.argv; - if (run_command(&cp)) - result = 1; - argv_array_pop(&argv); - argv_array_pop(&argv); - argv_array_pop(&argv); + child_process_init(cp); + cp->dir = strbuf_detach(&submodule_path, NULL); + cp->git_cmd = 1; + cp->no_stdout = 1; + cp->no_stdin = 1; + cp->stdout_to_stderr = 1; + cp->err = -1; + cp->env = local_repo_env; + if (!spf->quiet) + strbuf_addf(err, "Fetching submodule %s%s\n", + spf->prefix, ce->name); + argv_array_init(&cp->args); + argv_array_pushv(&cp->args, spf->args.argv); + argv_array_push(&cp->args, default_argv); + argv_array_push(&cp->args, "--submodule-prefix"); + argv_array_push(&cp->args, submodule_prefix.buf); + ret = 1; } strbuf_release(&submodule_path); strbuf_release(&submodule_git_dir); strbuf_release(&submodule_prefix); + if (ret) { + spf->count++; + return 0; + } } - argv_array_clear(&argv); -out: - string_list_clear(&changed_submodule_paths, 1); - return result; + return 1; } unsigned is_submodule_modified(const char *path, int ignore_untracked) diff --git a/submodule.h b/submodule.h index 5507c3d..cbc0003 100644 --- a/submodule.h +++ b/submodule.h @@ -31,7 +31,7 @@ void set_config_fetch_recurse_submodules(int value); void check_for_new_submodule_commits(unsigned char new_sha1[20]); int fetch_populated_submodules(const struct argv_array *options, const char *prefix, int command_line_option, - int quiet); + int quiet, int max_parallel_jobs); unsigned is_submodule_modified(const char *path, int ignore_untracked); int submodule_uses_gitfile(const char *path); int ok_to_remove_submodule(const char *path); diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh index 9acf628..0970fb0 100755 --- a/t/t0061-run-command.sh +++ b/t/t0061-run-command.sh @@ -47,4 +47,20 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' ' test_cmp expect actual ' +cat >expect <<-EOF +Now running instance 0 +Hello World +Now running instance 1 +Hello World +Now running 
instance 2 +Hello World +Now running instance 3 +Hello World +EOF + +test_expect_success 'run_command runs in parallel' ' + test-run-command run-command-async sh -c "echo Hello World >&2;" 2>actual && + test_cmp expect actual +' + test_done diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh index 17759b1..1b4ce69 100755 --- a/t/t5526-fetch-submodules.sh +++ b/t/t5526-fetch-submodules.sh @@ -71,6 +71,16 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" ' test_i18ncmp expect.err actual.err ' +test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" ' + add_upstream_commit && + ( + cd downstream && + git fetch --recurse-submodules -j2 2>../actual.err + ) && + test_must_be_empty actual.out && + test_i18ncmp expect.err actual.err +' + test_expect_success "fetch alone only fetches superproject" ' add_upstream_commit && ( @@ -140,6 +150,15 @@ test_expect_success "--quiet propagates to submodules" ' ! test -s actual.err ' +test_expect_success "--quiet propagates to parallel submodules" ' + ( + cd downstream && + git fetch --recurse-submodules -j 2 --quiet >../actual.out 2>../actual.err + ) && + ! test -s actual.out && + ! 
test -s actual.err +' + test_expect_success "--dry-run propagates to submodules" ' add_upstream_commit && ( diff --git a/test-run-command.c b/test-run-command.c index 89c7de2..4817f6e 100644 --- a/test-run-command.c +++ b/test-run-command.c @@ -10,9 +10,29 @@ #include "git-compat-util.h" #include "run-command.h" +#include "argv-array.h" +#include "strbuf.h" #include <string.h> #include <errno.h> +static int number_callbacks; +int run_processes_async_next(void *data, + struct child_process *cp, + struct strbuf *err) +{ + struct child_process *d = data; + if (number_callbacks >= 4) + return 1; + + argv_array_pushv(&cp->args, d->argv); + cp->stdout_to_stderr = 1; + cp->no_stdin = 1; + cp->err = -1; + strbuf_addf(err, "Now running instance %d\n", number_callbacks); + number_callbacks++; + return 0; +} + int main(int argc, char **argv) { struct child_process proc = CHILD_PROCESS_INIT; @@ -30,6 +50,9 @@ int main(int argc, char **argv) if (!strcmp(argv[1], "run-command")) exit(run_command(&proc)); + if (!strcmp(argv[1], "run-command-async")) + exit(run_processes_async(4, run_processes_async_next, &proc)); + fprintf(stderr, "check usage\n"); return 1; } -- 2.6.0.rc0.131.gf624c3d ^ permalink raw reply related [flat|nested] 13+ messages in thread
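[Editorial note: the heart of strbuf_read_noblock() added by this patch is a read loop that drains whatever is currently available from a nonblocking fd instead of blocking. A simplified standalone sketch, with a fixed-size buffer standing in for the strbuf and a made-up function name:]

```c
#include <errno.h>
#include <unistd.h>

/*
 * Simplified sketch of the strbuf_read_noblock() idea: read from a
 * nonblocking fd until it would block (EAGAIN), retrying on EINTR.
 * Returns the number of bytes appended to buf, or -1 on a real error.
 * The name read_available() is illustrative, not from the patch.
 */
static ssize_t read_available(int fd, char *buf, size_t size)
{
	size_t len = 0;

	while (len < size) {
		ssize_t cnt = read(fd, buf + len, size - len);
		if (cnt < 0) {
			if (errno == EINTR)
				continue;	/* interrupted; retry */
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				break;		/* pipe drained for now */
			return -1;
		}
		if (!cnt)
			break;			/* EOF: writer closed the pipe */
		len += cnt;
	}
	return (ssize_t)len;
}
```

This is what lets the parent buffer each child's partial output between select() wakeups without ever stalling on a quiet child.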
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller @ 2015-09-12 19:11 ` Junio C Hamano 2015-09-14 16:46 ` Stefan Beller 0 siblings, 1 reply; 13+ messages in thread From: Junio C Hamano @ 2015-09-12 19:11 UTC (permalink / raw) To: Stefan Beller Cc: git, peff, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich Stefan Beller <sbeller@google.com> writes: > diff --git a/run-command.c b/run-command.c > index 28e1d55..b8ff67b 100644 > --- a/run-command.c > +++ b/run-command.c > @@ -852,3 +852,147 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint) > close(cmd->out); > return finish_command(cmd); > } > + > +int run_processes_async(int n, get_next_task fn, void *data) > +{ > + int i, wait_status; > + pid_t pid; > + > + /* no more tasks. Also set when aborting early. */ > + int all_tasks_started = 0; > + int nr_processes = 0; > + int child_in_foreground = 0; > + struct timeval timeout; > + struct child_process *children = xcalloc(n, sizeof(*children)); > + char *slots = xcalloc(n, sizeof(*slots)); > + struct strbuf *err = xcalloc(n, sizeof(*err)); > + fd_set fdset; > + int maxfd; > + struct strbuf finished_children = STRBUF_INIT; > + int flags; > + for (i = 0; i < n; i++) > + strbuf_init(&err[i], 0); > + > + while (!all_tasks_started || nr_processes > 0) { > + /* Start new processes. 
*/ > + while (!all_tasks_started && nr_processes < n) { > + for (i = 0; i < n; i++) > + if (!slots[i]) > + break; /* found an empty slot */ > + if (i == n) > + die("BUG: bookkeeping is hard"); > + > + if (fn(data, &children[i], &err[i])) { > + all_tasks_started = 1; > + break; > + } > + if (start_command(&children[i])) > + die(_("Could not start child process")); > + flags = fcntl(children[i].err, F_GETFL); > + fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK); This function in run-command.c looks as if it is a generic helper to be called by anybody, but it seems to only care about the standard error and not the standard output stream, which means potential users that do not dup them together cannot use it. Is that a big downside, or is it sufficient to document the API to say that children must do so? I offhand do not think the latter is unreasonable, but that may be only because I haven't thought things through. > + nr_processes++; > + slots[i] = 1; > + } > + > + /* prepare data for select call */ > + FD_ZERO(&fdset); > + maxfd = 0; > + for (i = 0; i < n; i++) { > + if (!slots[i]) > + continue; > + FD_SET(children[i].err, &fdset); > + if (children[i].err > maxfd) > + maxfd = children[i].err; > + } > + timeout.tv_sec = 0; > + timeout.tv_usec = 500000; > + > + i = select(maxfd + 1, &fdset, NULL, NULL, &timeout); I thought we try to use poll() and on systems with only select we allow compat/ to emulate in our code. > + if (i < 0) { > + if (errno == EINTR) > + /* A signal was caught; try again */ > + continue; > + else if (errno == ENOMEM) > + die_errno("BUG: keeping track of fds is hard"); > + else if (errno == EINVAL) > + die_errno("BUG: invalid arguments to select"); > + else if (errno == EBADF) > + die_errno("BUG: keeping track of fds is hard"); > + else > + die_errno("Unknown error with select"); I doubt that the later part of elseif cascade adds any value. You will see errno printed anyway. ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-12 19:11 ` Junio C Hamano @ 2015-09-14 16:46 ` Stefan Beller 2015-09-14 17:17 ` Jeff King 0 siblings, 1 reply; 13+ messages in thread From: Stefan Beller @ 2015-09-14 16:46 UTC (permalink / raw) To: Junio C Hamano Cc: git@vger.kernel.org, Jeff King, Jonathan Nieder, Johannes Schindelin, Jens Lehmann, Vitali Lovich On Sat, Sep 12, 2015 at 12:11 PM, Junio C Hamano <gitster@pobox.com> wrote: >> + if (start_command(&children[i])) >> + die(_("Could not start child process")); >> + flags = fcntl(children[i].err, F_GETFL); >> + fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK); > > This function in run-command.c looks as if it is a generic helper to > be called by anybody, but it seems to only care about the standard > error and not the standard output stream, which means potential > users that do not dup them together cannot use it. Is that a big > downside, or is it sufficient to document the API to say that > children must do so? I offhand do not think the latter is > unreasonable, but that may be only because I haven't thought things > through. Yes, it ought to become a generic helper eventually. I tried implementing a buffering solution for both stdout and stderr, but that doesn't really work out well if you consider interleaved output on the pipes, as we cannot accurately replay it later on. To do that we would need to store the timing information of the channels, at least their relative order, like: (stdout, First comes text to stdout), (stderr, interrupted by text in stderr) (stdout, but stdout doesn't bother, blasting more text) (stderr, continues to interrupt) Obtaining that information is inherently racy: all we can do is poll/read from both stdout/err as fast as possible, but without proper synchronization mechanisms we cannot be sure of the actual order. I will add documentation explaining why the async output case will only deal with one channel.
I chose stderr as that's already available and needed in this use case. > >> + nr_processes++; >> + slots[i] = 1; >> + } >> + >> + /* prepare data for select call */ >> + FD_ZERO(&fdset); >> + maxfd = 0; >> + for (i = 0; i < n; i++) { >> + if (!slots[i]) >> + continue; >> + FD_SET(children[i].err, &fdset); >> + if (children[i].err > maxfd) >> + maxfd = children[i].err; >> + } >> + timeout.tv_sec = 0; >> + timeout.tv_usec = 500000; >> + >> + i = select(maxfd + 1, &fdset, NULL, NULL, &timeout); > > I thought we try to use poll() and on systems with only select we > allow compat/ to emulate in our code. I did not know that. I'll rewrite the patch to use poll instead. > >> + if (i < 0) { >> + if (errno == EINTR) >> + /* A signal was caught; try again */ >> + continue; >> + else if (errno == ENOMEM) >> + die_errno("BUG: keeping track of fds is hard"); >> + else if (errno == EINVAL) >> + die_errno("BUG: invalid arguments to select"); >> + else if (errno == EBADF) >> + die_errno("BUG: keeping track of fds is hard"); >> + else >> + die_errno("Unknown error with select"); > > I doubt that the later part of elseif cascade adds any value. You > will see errno printed anyway. ok. ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-14 16:46 ` Stefan Beller @ 2015-09-14 17:17 ` Jeff King 2015-09-14 17:47 ` Stefan Beller ` (2 more replies) 0 siblings, 3 replies; 13+ messages in thread From: Jeff King @ 2015-09-14 17:17 UTC (permalink / raw) To: Stefan Beller Cc: Junio C Hamano, git@vger.kernel.org, Jonathan Nieder, Johannes Schindelin, Jens Lehmann, Vitali Lovich On Mon, Sep 14, 2015 at 09:46:58AM -0700, Stefan Beller wrote: > I tried implementing a buffering solution for both stdout and stderr, > but that doesn't really workout well if you consider interleaved output > on the pipes as we cannot accurately replay that later on. To do that > we would need to store the timing information of the channels, at least > the relative order of it like: > > (stdout, First comes text to stdout), > (stderr, interrupted by text in stderr) > (stdout, but stdout doesn't bother, blasting more text) > (stderr, continues to interrupt) > > obtaining the information is inherently racy, as all we can do is > polling/reading from both stdout/err as fast as possible but without > proper synchronization mechanisms we cannot be sure. I don't think you need exact timing information. This is no different than running the commands themselves, with stdout and stderr writing to a pty that your terminal emulator will then read() from. If the program produces intermingled stdout/stderr that clogs up the terminal, that is its problem. The only difference is that we're going to save it and later replay it all very quickly. So I think it would be sufficient just to retain the original order. > I will add documentation explaining why the async output case > will only deal with one channel. I chose stderr as that's already > available and needed in this use case. I suspect you could just set child->stdout_to_stderr in this case, and then you get your ordering for free. 
But probably in the general case people would want to run inspection commands that produce a useful stdout. To handle multiple channels, I think you could just do a linked list of buffers rather than a single strbuf. Like: struct io_chunk { int channel; char *buf; size_t len; struct io_chunk *next; }; and just keep appending chunks to the list (and to dump them, just walk the list, writing each to the appropriate channel descriptor). -Peff ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-14 17:17 ` Jeff King @ 2015-09-14 17:47 ` Stefan Beller 2015-09-14 17:55 ` Jonathan Nieder 2015-09-14 17:56 ` [PATCH 2/2] fetch: " Junio C Hamano 2 siblings, 0 replies; 13+ messages in thread From: Stefan Beller @ 2015-09-14 17:47 UTC (permalink / raw) To: Jeff King Cc: Junio C Hamano, git@vger.kernel.org, Jonathan Nieder, Johannes Schindelin, Jens Lehmann, Vitali Lovich On Mon, Sep 14, 2015 at 10:17 AM, Jeff King <peff@peff.net> wrote: > On Mon, Sep 14, 2015 at 09:46:58AM -0700, Stefan Beller wrote: > >> I tried implementing a buffering solution for both stdout and stderr, >> but that doesn't really workout well if you consider interleaved output >> on the pipes as we cannot accurately replay that later on. To do that >> we would need to store the timing information of the channels, at least >> the relative order of it like: >> >> (stdout, First comes text to stdout), >> (stderr, interrupted by text in stderr) >> (stdout, but stdout doesn't bother, blasting more text) >> (stderr, continues to interrupt) >> >> obtaining the information is inherently racy, as all we can do is >> polling/reading from both stdout/err as fast as possible but without >> proper synchronization mechanisms we cannot be sure. > > I don't think you need exact timing information. This is no different > than running the commands themselves, with stdout and stderr writing to > a pty that your terminal emulator will then read() from. If the program > produces intermingled stdout/stderr that clogs up the terminal, that is > its problem. > > The only difference is that we're going to save it and later replay it > all very quickly. So I think it would be sufficient just to retain the > original order. > >> I will add documentation explaining why the async output case >> will only deal with one channel. I chose stderr as that's already >> available and needed in this use case. 
> > I suspect you could just set child->stdout_to_stderr in this case, and > then you get your ordering for free. But probably in the general case > people would want to run inspection commands that produce a useful > stdout. > > To handle multiple channels, I think you could just do a linked list of > buffers rather than a single strbuf. Like: I will have no problem coding such a thing in a user program, but how do you obtain this non-racily from the child using the POSIX API? A poll/select call may report more than one fd ready at once, and then you don't know the order in which to replay the output; this may introduce subtle bugs. So I'd rather come up with a solution buffering two channels once we need it, keeping stdout_to_stderr as a requirement for now. > > struct io_chunk { > int channel; > char *buf; > size_t len; > struct io_chunk *next; > }; > > and just keep appending chunks to the list (and to dump them, just walk > the list, writing each to the appropriate channel descriptor). > > -Peff ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-14 17:17 ` Jeff King 2015-09-14 17:47 ` Stefan Beller @ 2015-09-14 17:55 ` Jonathan Nieder 2015-09-14 18:07 ` Jeff King 2015-09-14 17:56 ` [PATCH 2/2] fetch: " Junio C Hamano 2 siblings, 1 reply; 13+ messages in thread From: Jonathan Nieder @ 2015-09-14 17:55 UTC (permalink / raw) To: Jeff King Cc: Stefan Beller, Junio C Hamano, git@vger.kernel.org, Johannes Schindelin, Jens Lehmann, Vitali Lovich Jeff King wrote: > On Mon, Sep 14, 2015 at 09:46:58AM -0700, Stefan Beller wrote: >> I tried implementing a buffering solution for both stdout and stderr, >> but that doesn't really workout well if you consider interleaved output >> on the pipes as we cannot accurately replay that later on. [...] >> obtaining the information is inherently racy [...] > I don't think you need exact timing information. This is no different > than running the commands themselves, with stdout and stderr writing to > a pty that your terminal emulator will then read() from. If the program > produces intermingled stdout/stderr that clogs up the terminal, that is > its problem. The difference is that when stdout and stderr write to a pty, they write to the same pty. That is, suppose a child process does write(1, "A\n", 2); write(2, "B\n", 2); write(1, "C\n", 2); Then the output that should be echoed to the terminal is A B C Now the parent might do for (;;) { int n = select(...); ... do stuff ... } If all three writes happen during the "do stuff" step, then *if* the child's stdout and stderr went to different pipes, all the parent sees is child's stdout: A\nC\n child's stderr: B\n There is not sufficient information to recover the original output order. (Linux provides a pipe2(..., O_DIRECT) that almost provides sufficient information --- it tells you child's stdout: "A\n", "C\n" child's stderr: "B\n" but still doesn't give information about ordering.)
That's probably okay: in most git commands, stderr shows a combination of diagnostic output and progress information and stdout shows the actual result, so interleaving between the two is not too common. One can imagine a "git grep --recurse-submodules" that wants to run a grep in each submodule and combine their output in some appropriate way. It's not clear what order is best for that use case: stderr (errors, plus progress in some imaginary future) at the beginning to show the story of how output was generated before the output? stderr at the end so errors are not hidden way up on the screen? Some kind of interleaving that pays attention to the format of stdout? That is more complicated than the "fetch --recurse-submodules" case that Stefan is currently tackling, so it seems wise to me to punt for now. Thanks and hope that helps, Jonathan ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-14 17:55 ` Jonathan Nieder @ 2015-09-14 18:07 ` Jeff King 2015-09-14 21:50 ` [PATCHv2] " Stefan Beller 0 siblings, 1 reply; 13+ messages in thread From: Jeff King @ 2015-09-14 18:07 UTC (permalink / raw) To: Jonathan Nieder Cc: Stefan Beller, Junio C Hamano, git@vger.kernel.org, Johannes Schindelin, Jens Lehmann, Vitali Lovich On Mon, Sep 14, 2015 at 10:55:09AM -0700, Jonathan Nieder wrote: > > I don't think you need exact timing information. This is no different > > than running the commands themselves, with stdout and stderr writing to > > a pty that your terminal emulator will then read() from. If the program > > produces intermingled stdout/stderr that clogs up the terminal, that is > > its problem. > > The difference is that when stdout and stderr write to a pty, they write > to the same pty. That is, suppose a child process does > > write(1, "A\n", 2); > write(2, "B\n", 2); > write(1, "C\n", 2); Ah, right. The pty is where things get mixed, not the read() from the terminal. So it depends on the write() order. Thanks for the explanation. > One can imagine a "git grep --recurse-submodules" that wants to run a > grep in each submodule and combine their output in some appropriate > way. It's not clear what order is best for that use case: stderr > (errors, plus progress in some imaginary future) at the beginning to > show the story of how output was generated before the output? stderr > at the end so errors are not hidden way up on the screen? Some kind > of interleaving that pays attention to the format of stdout? I'd suggest a "best effort" interleaving, where we select and preserve the read() order. That makes the easy cases work (you get things in the original order), and the hard cases at least do something reasonable (we may reorder two items which come in the same atomic "tick" of our select, but at least they are nearby). That's just my gut feeling, though.
> That is more complicated than the "fetch --recurse-submodules" case > that Stefan is currently tackling, so it seems wise to me to punt for > now. I can live with that. -Peff ^ permalink raw reply [flat|nested] 13+ messages in thread
* [PATCHv2] fetch submodules in parallel 2015-09-14 18:07 ` Jeff King @ 2015-09-14 21:50 ` Stefan Beller 2015-09-14 21:50 ` [PATCHv2] fetch: " Stefan Beller 2015-09-14 22:06 ` [PATCHv2] " Junio C Hamano 0 siblings, 2 replies; 13+ messages in thread From: Stefan Beller @ 2015-09-14 21:50 UTC (permalink / raw) To: gitster Cc: peff, git, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich, Stefan Beller This replaces the last patch of the "Parallel git submodule fetching" series. Changes: * have correct return code in submodule fetching when one submodule fails * use poll instead of select now * broke down into more smaller functions instead of one giant. (I think it is an improvement, but I wouldn't be surprised if someone objects) * closed memory leaks * document the need for stdout_to_stderr I don't deem it RFC-ish any more but good to go. Any feedback welcome! Thanks, Stefan Stefan Beller (1): fetch: fetch submodules in parallel Documentation/fetch-options.txt | 7 + builtin/fetch.c | 6 +- builtin/pull.c | 6 + run-command.c | 278 ++++++++++++++++++++++++++++++++++++---- run-command.h | 36 ++++++ strbuf.c | 31 +++++ strbuf.h | 1 + submodule.c | 119 ++++++++++++----- submodule.h | 2 +- t/t0061-run-command.sh | 20 +++ t/t5526-fetch-submodules.sh | 19 +++ test-run-command.c | 24 ++++ 12 files changed, 490 insertions(+), 59 deletions(-) Interdiff to RFCv1: diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt index d432f98..6b109f6 100644 --- a/Documentation/fetch-options.txt +++ b/Documentation/fetch-options.txt @@ -105,7 +105,7 @@ ifndef::git-pull[] Number of parallel children to be used for fetching submodules. Each will fetch from different submodules, such that fetching many submodules will be faster. By default submodules will be fetched - one at a time + one at a time. 
--no-recurse-submodules:: Disable recursive fetching of submodules (this has the same effect as diff --git a/builtin/fetch.c b/builtin/fetch.c index a1520bb..f28eac6 100644 --- a/builtin/fetch.c +++ b/builtin/fetch.c @@ -101,7 +101,7 @@ static struct option builtin_fetch_options[] = { OPT_SET_INT('n', NULL, &tags, N_("do not fetch all tags (--no-tags)"), TAGS_UNSET), OPT_INTEGER('j', "jobs", &max_children, - N_("number of threads used for fetching")), + N_("number of submodules fetched in parallel")), OPT_BOOL('p', "prune", &prune, N_("prune remote-tracking branches no longer on remote")), { OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"), diff --git a/builtin/pull.c b/builtin/pull.c index bc117e9..f0af196 100644 --- a/builtin/pull.c +++ b/builtin/pull.c @@ -179,7 +179,7 @@ static struct option pull_options[] = { N_("control recursive fetching of submodules"), PARSE_OPT_OPTARG), OPT_PASSTHRU('j', "jobs", &max_children, N_("n"), - N_("number of threads used for fetching submodules"), + N_("number of submodules pulled in parallel"), PARSE_OPT_OPTARG), OPT_BOOL(0, "dry-run", &opt_dry_run, N_("dry run")), diff --git a/run-command.c b/run-command.c index b8ff67b..6f6f9fb 100644 --- a/run-command.c +++ b/run-command.c @@ -232,6 +232,35 @@ static inline void set_cloexec(int fd) fcntl(fd, F_SETFD, flags | FD_CLOEXEC); } +static int determine_return_value(int wait_status, + int *result, + int *error_code, + const char *argv0) +{ + if (WIFSIGNALED(wait_status)) { + *result = WTERMSIG(wait_status); + if (*result != SIGINT && *result != SIGQUIT) + error("%s died of signal %d", argv0, *result); + /* + * This return value is chosen so that code & 0xff + * mimics the exit code that a POSIX shell would report for + * a program that died from this signal. + */ + *result += 128; + } else if (WIFEXITED(wait_status)) { + *result = WEXITSTATUS(wait_status); + /* + * Convert special exit code when execvp failed. 
+ */ + if (*result == 127) { + *result = -1; + *error_code = ENOENT; + } + } else + return 1; + return 0; +} + static int wait_or_whine(pid_t pid, const char *argv0) { int status, code = -1; @@ -244,29 +273,10 @@ static int wait_or_whine(pid_t pid, const char *argv0) if (waiting < 0) { failed_errno = errno; error("waitpid for %s failed: %s", argv0, strerror(errno)); - } else if (waiting != pid) { - error("waitpid is confused (%s)", argv0); - } else if (WIFSIGNALED(status)) { - code = WTERMSIG(status); - if (code != SIGINT && code != SIGQUIT) - error("%s died of signal %d", argv0, code); - /* - * This return value is chosen so that code & 0xff - * mimics the exit code that a POSIX shell would report for - * a program that died from this signal. - */ - code += 128; - } else if (WIFEXITED(status)) { - code = WEXITSTATUS(status); - /* - * Convert special exit code when execvp failed. - */ - if (code == 127) { - code = -1; - failed_errno = ENOENT; - } } else { - error("waitpid is confused (%s)", argv0); + if (waiting != pid + || (determine_return_value(status, &code, &failed_errno, argv0) < 0)) + error("waitpid is confused (%s)", argv0); } clear_child_for_cleanup(pid); @@ -853,146 +863,226 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint) return finish_command(cmd); } -int run_processes_async(int n, get_next_task fn, void *data) +static void unblock_fd(int fd) { - int i, wait_status; - pid_t pid; + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + warning("Could not get file status flags, " + "output will be degraded"); + return; + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) { + warning("Could not set file status flags, " + "output will be degraded"); + return; + } +} - /* no more tasks. Also set when aborting early. 
*/ - int all_tasks_started = 0; - int nr_processes = 0; - int child_in_foreground = 0; - struct timeval timeout; - struct child_process *children = xcalloc(n, sizeof(*children)); - char *slots = xcalloc(n, sizeof(*slots)); - struct strbuf *err = xcalloc(n, sizeof(*err)); - fd_set fdset; - int maxfd; - struct strbuf finished_children = STRBUF_INIT; - int flags; - for (i = 0; i < n; i++) - strbuf_init(&err[i], 0); - - while (!all_tasks_started || nr_processes > 0) { - /* Start new processes. */ - while (!all_tasks_started && nr_processes < n) { - for (i = 0; i < n; i++) - if (!slots[i]) - break; /* found an empty slot */ - if (i == n) - die("BUG: bookkeeping is hard"); - - if (fn(data, &children[i], &err[i])) { - all_tasks_started = 1; - break; - } - if (start_command(&children[i])) - die(_("Could not start child process")); - flags = fcntl(children[i].err, F_GETFL); - fcntl(children[i].err, F_SETFL, flags | O_NONBLOCK); - nr_processes++; - slots[i] = 1; - } +struct parallel_processes { + int max_number_processes; + void *data; + get_next_task fn; + handle_child_starting_failure fn_err; + handle_child_return_value fn_exit; + + int nr_processes; + int all_tasks_started; + int foreground_child; + char *slots; + struct child_process *children; + struct pollfd *pfd; + struct strbuf *err; + struct strbuf finished_children; +}; + +static void run_processes_parallel_init(struct parallel_processes *pp, + int n, void *data, + get_next_task fn, + handle_child_starting_failure fn_err, + handle_child_return_value fn_exit) +{ + int i; + + pp->max_number_processes = n; + pp->data = data; + pp->fn = fn; + pp->fn_err = fn_err; + pp->fn_exit = fn_exit; + + pp->nr_processes = 0; + pp->all_tasks_started = 0; + pp->foreground_child = 0; + pp->slots = xcalloc(n, sizeof(*pp->slots)); + pp->children = xcalloc(n, sizeof(*pp->children)); + pp->pfd = xcalloc(n, sizeof(*pp->pfd)); + pp->err = xcalloc(n, sizeof(*pp->err)); + strbuf_init(&pp->finished_children, 0); + + for (i = 0; i < n; i++) { 
+ strbuf_init(&pp->err[i], 0); + pp->pfd[i].events = POLLIN; + pp->pfd[i].fd = -1; + } +} + +static void run_processes_parallel_cleanup(struct parallel_processes *pp) +{ + int i; + for (i = 0; i < pp->max_number_processes; i++) + strbuf_release(&pp->err[i]); + + free(pp->children); + free(pp->slots); + free(pp->pfd); + free(pp->err); + strbuf_release(&pp->finished_children); +} - /* prepare data for select call */ - FD_ZERO(&fdset); - maxfd = 0; - for (i = 0; i < n; i++) { - if (!slots[i]) - continue; - FD_SET(children[i].err, &fdset); - if (children[i].err > maxfd) - maxfd = children[i].err; +static void run_processes_parallel_start_new(struct parallel_processes *pp) +{ + int i; + /* Start new processes. */ + while (!pp->all_tasks_started + && pp->nr_processes < pp->max_number_processes) { + for (i = 0; i < pp->max_number_processes; i++) + if (!pp->slots[i]) + break; /* found an empty slot */ + if (i == pp->max_number_processes) + die("BUG: bookkeeping is hard"); + + if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) { + pp->all_tasks_started = 1; + break; } - timeout.tv_sec = 0; - timeout.tv_usec = 500000; + if (start_command(&pp->children[i])) + pp->fn_err(pp->data, &pp->children[i], &pp->err[i]); - i = select(maxfd + 1, &fdset, NULL, NULL, &timeout); - if (i < 0) { - if (errno == EINTR) - /* A signal was caught; try again */ - continue; - else if (errno == ENOMEM) - die_errno("BUG: keeping track of fds is hard"); - else if (errno == EINVAL) - die_errno("BUG: invalid arguments to select"); - else if (errno == EBADF) - die_errno("BUG: keeping track of fds is hard"); - else - die_errno("Unknown error with select"); + unblock_fd(pp->children[i].err); + + pp->nr_processes++; + pp->slots[i] = 1; + pp->pfd[i].fd = pp->children[i].err; + } +} + +static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp) +{ + int i; + i = poll(pp->pfd, pp->max_number_processes, 100); + if (i < 0) { + if (errno == EINTR) + /* A signal was caught; try again */ + 
return -1; + else { + run_processes_parallel_cleanup(pp); + die_errno("poll"); } + } - /* Buffer output from all pipes. */ - for (i = 0; i < n; i++) { - if (!slots[i]) - continue; - if (FD_ISSET(children[i].err, &fdset)) - strbuf_read_noblock(&err[i], children[i].err, 0); - if (child_in_foreground == i) { - fputs(err[i].buf, stderr); - strbuf_reset(&err[i]); - fflush(stderr); - } + /* Buffer output from all pipes. */ + for (i = 0; i < pp->max_number_processes; i++) { + if (!pp->slots[i]) + continue; + if (pp->pfd[i].revents & POLLIN) + strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0); + if (pp->foreground_child == i) { + fputs(pp->err[i].buf, stderr); + strbuf_reset(&pp->err[i]); } + } + return 0; +} - /* Collect finished child processes. */ - while (nr_processes > 0) { - pid = waitpid(-1, &wait_status, WNOHANG); - if (pid == 0) - /* no child finished */ - break; - - if (pid < 0) { - if (errno == EINTR) - break; /* just try again next time */ - if (errno == EINVAL || errno == ECHILD) - die_errno("wait"); - } else { - /* Find the finished child. */ - for (i = 0; i < n; i++) - if (slots[i] && pid == children[i].pid) - break; - if (i == n) - /* waitpid returned another process id which - * we are not waiting on, so ignore it*/ + +static void run_processes_parallel_collect_finished(struct parallel_processes *pp) +{ + int i = 0; + pid_t pid; + int wait_status, code; + int n = pp->max_number_processes; + /* Collect finished child processes. */ + while (pp->nr_processes > 0) { + pid = waitpid(-1, &wait_status, WNOHANG); + if (pid == 0) + return; /* no child finished */ + + if (pid < 0) { + if (errno == EINTR) + return; /* just try again next time */ + if (errno == EINVAL || errno == ECHILD) + die_errno("wait"); + } else { + /* Find the finished child. 
*/ + for (i = 0; i < pp->max_number_processes; i++) + if (pp->slots[i] && pid == pp->children[i].pid) break; - } + if (i == pp->max_number_processes) + /* + * waitpid returned another process id + * which we are not waiting for. + */ + return; + } + strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0); - strbuf_read_noblock(&err[i], children[i].err, 0); - argv_array_clear(&children[i].args); - argv_array_clear(&children[i].env_array); + if (determine_return_value(wait_status, &code, &errno, + pp->children[i].argv[0]) < 0) + error("waitpid is confused (%s)", + pp->children[i].argv[0]); - slots[i] = 0; - nr_processes--; + pp->fn_exit(pp->data, &pp->children[i], code); - if (i != child_in_foreground) { - strbuf_addbuf(&finished_children, &err[i]); - strbuf_reset(&err[i]); - } else { - fputs(err[i].buf, stderr); - strbuf_reset(&err[i]); + argv_array_clear(&pp->children[i].args); + argv_array_clear(&pp->children[i].env_array); - /* Output all other finished child processes */ - fputs(finished_children.buf, stderr); - strbuf_reset(&finished_children); + pp->nr_processes--; + pp->slots[i] = 0; + pp->pfd[i].fd = -1; - /* - * Pick next process to output live. - * There can be no active process if n==1 - * NEEDSWORK: - * For now we pick it randomly by doing a round - * robin. Later we may want to pick the one with - * the most output or the longest or shortest - * running process time. 
- */ - for (i = 0; i < n; i++) - if (slots[(child_in_foreground + i) % n]) - break; - child_in_foreground = (child_in_foreground + i) % n; - fputs(err[child_in_foreground].buf, stderr); - strbuf_reset(&err[child_in_foreground]); - } + if (i != pp->foreground_child) { + strbuf_addbuf(&pp->finished_children, &pp->err[i]); + strbuf_reset(&pp->err[i]); + } else { + fputs(pp->err[i].buf, stderr); + strbuf_reset(&pp->err[i]); + + /* Output all other finished child processes */ + fputs(pp->finished_children.buf, stderr); + strbuf_reset(&pp->finished_children); + + /* + * Pick next process to output live. + * NEEDSWORK: + * For now we pick it randomly by doing a round + * robin. Later we may want to pick the one with + * the most output or the longest or shortest + * running process time. + */ + for (i = 0; i < n; i++) + if (pp->slots[(pp->foreground_child + i) % n]) + break; + pp->foreground_child = (pp->foreground_child + i) % n; + fputs(pp->err[pp->foreground_child].buf, stderr); + strbuf_reset(&pp->err[pp->foreground_child]); } } +} + +int run_processes_parallel(int n, void *data, + get_next_task fn, + handle_child_starting_failure fn_err, + handle_child_return_value fn_exit) +{ + struct parallel_processes pp; + run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit); + + while (!pp.all_tasks_started || pp.nr_processes > 0) { + run_processes_parallel_start_new(&pp); + if (run_processes_parallel_buffer_stderr(&pp)) + continue; + run_processes_parallel_collect_finished(&pp); + } + run_processes_parallel_cleanup(&pp); + return 0; } diff --git a/run-command.h b/run-command.h index 8f53ad6..0487f71 100644 --- a/run-command.h +++ b/run-command.h @@ -120,32 +120,39 @@ int start_async(struct async *async); int finish_async(struct async *async); /** - * Return 0 if the next child is ready to run. - * This callback takes care to initialize the child process and preload the - * out and error channel. 
The preloading of these outpout channels is useful - * if you want to have a message printed directly before the output of the - * child process. + * This callback should initialize the child process and preload the + * error channel. The preloading of is useful if you want to have a message + * printed directly before the output of the child process. + * You MUST set stdout_to_stderr. * + * Return 0 if the next child is ready to run. * Return != 0 if there are no more tasks to be processed. */ typedef int (*get_next_task)(void *data, struct child_process *cp, struct strbuf *err); +typedef void (*handle_child_starting_failure)(void *data, + struct child_process *cp, + struct strbuf *err); + +typedef void (*handle_child_return_value)(void *data, + struct child_process *cp, + int result); + /** - * Runs up to n processes at the same time. Whenever a process can - * be started, the callback `get_next_task` is called to obtain the - * data fed to the child process. + * Runs up to n processes at the same time. Whenever a process can be + * started, the callback `get_next_task` is called to obtain the data + * fed to the child process. * * The children started via this function run in parallel and their output - * to both stdout and stderr is buffered, while one of the children will - * directly output to stdout/stderr. - * - * This leads to a problem with output from processes which put out to - * stdout/err alternatingly as the buffering will not be able to replay - * the + * to stderr is buffered, while one of the children will directly output + * to stderr. 
*/ -int run_processes_async(int n, get_next_task fn, void *data); +int run_processes_parallel(int n, void *data, + get_next_task fn, + handle_child_starting_failure, + handle_child_return_value); #endif diff --git a/submodule.c b/submodule.c index 6d757c6..a0e06e8 100644 --- a/submodule.c +++ b/submodule.c @@ -623,17 +623,32 @@ struct submodule_parallel_fetch { const char *prefix; int command_line_option; int quiet; + int result; }; -#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL} +#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0} int get_next_submodule(void *data, struct child_process *cp, struct strbuf *err); +void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err) +{ + struct submodule_parallel_fetch *spf = data; + spf->result = 1; +} + +void handle_submodule_fetch_finish( void *data, struct child_process *cp, int retvalue) +{ + struct submodule_parallel_fetch *spf = data; + + if (retvalue) + spf->result = 1; +} + int fetch_populated_submodules(const struct argv_array *options, const char *prefix, int command_line_option, int quiet, int max_parallel_jobs) { - int i, result = 0; + int i; struct submodule_parallel_fetch spf = SPF_INIT; spf.work_tree = get_git_work_tree(); spf.command_line_option = command_line_option; @@ -652,12 +667,15 @@ int fetch_populated_submodules(const struct argv_array *options, /* default value, "--submodule-prefix" and its value are added later */ calculate_changed_submodule_paths(); - run_processes_async(max_parallel_jobs, get_next_submodule, &spf); + run_processes_parallel(max_parallel_jobs, &spf, + get_next_submodule, + handle_submodule_fetch_start_err, + handle_submodule_fetch_finish); argv_array_clear(&spf.args); out: string_list_clear(&changed_submodule_paths, 1); - return result; + return spf.result; } int get_next_submodule(void *data, struct child_process *cp, diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh index 0970fb0..37c89b9 100755 --- a/t/t0061-run-command.sh +++ 
b/t/t0061-run-command.sh @@ -48,18 +48,22 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' ' ' cat >expect <<-EOF -Now running instance 0 -Hello World -Now running instance 1 -Hello World -Now running instance 2 -Hello World -Now running instance 3 -Hello World +preloaded output of a child +Hello +World +preloaded output of a child +Hello +World +preloaded output of a child +Hello +World +preloaded output of a child +Hello +World EOF test_expect_success 'run_command runs in parallel' ' - test-run-command run-command-async sh -c "echo Hello World >&2;" 2>actual && + test-run-command run-command-async sh -c "printf \"%s\n%s\n\" Hello World" 2>actual && test_cmp expect actual ' diff --git a/test-run-command.c b/test-run-command.c index 4817f6e..71fd3ca 100644 --- a/test-run-command.c +++ b/test-run-command.c @@ -16,9 +16,9 @@ #include <errno.h> static int number_callbacks; -int run_processes_async_next(void *data, - struct child_process *cp, - struct strbuf *err) +int parallel_next(void *data, + struct child_process *cp, + struct strbuf *err) { struct child_process *d = data; if (number_callbacks >= 4) @@ -28,7 +28,7 @@ int run_processes_async_next(void *data, cp->stdout_to_stderr = 1; cp->no_stdin = 1; cp->err = -1; - strbuf_addf(err, "Now running instance %d\n", number_callbacks); + strbuf_addf(err, "preloaded output of a child\n"); number_callbacks++; return 0; } @@ -51,7 +51,8 @@ int main(int argc, char **argv) exit(run_command(&proc)); if (!strcmp(argv[1], "run-command-async")) - exit(run_processes_async(4, run_processes_async_next, &proc)); + exit(run_processes_parallel(4, &proc, parallel_next, + NULL, NULL)); fprintf(stderr, "check usage\n"); return 1; -- 2.6.0.rc0.131.gf624c3d ^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCHv2] fetch: fetch submodules in parallel 2015-09-14 21:50 ` [PATCHv2] " Stefan Beller @ 2015-09-14 21:50 ` Stefan Beller 2015-09-14 22:06 ` [PATCHv2] " Junio C Hamano 1 sibling, 0 replies; 13+ messages in thread From: Stefan Beller @ 2015-09-14 21:50 UTC (permalink / raw) To: gitster Cc: peff, git, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich, Stefan Beller If we run external commands in parallel we cannot pipe their output directly to our stdout/err as it would get intermingled. So each process's output will flow through a pipe, which we buffer. One subprocess can be directly piped to our stdout/err for low-latency feedback to the user. Example: Let's assume we have 5 submodules A,B,C,D,E and each fetch takes a different amount of time as the submodules vary in size; then the output of fetches in sequential order might look like this: time --> output: |---A---| |-B-| |----C-----------| |-D-| |-E-| When we schedule these submodules into at most two parallel processes, a schedule and sample output over time may look like this: thread 1: |---A---| |-D-| |-E-| thread 2: |-B-| |----C-----------| output: |---A---|B|------C-------|DE So A will be perceived as if it ran normally in the single-child version. As B has finished by the time A is done, we can dump its whole progress buffer on stderr, such that it looks like it finished in no time. Once that is done, C is determined to be the visible child and its progress will be reported in real time. This way of presenting output works well for human consumption, as it only changes the timing, not the actual output. For machine consumption the output needs to be prepared in the tasks, by either having a prefix per line or per block to indicate whose task's output is being displayed. 
Signed-off-by: Stefan Beller <sbeller@google.com> --- Documentation/fetch-options.txt | 7 + builtin/fetch.c | 6 +- builtin/pull.c | 6 + run-command.c | 278 ++++++++++++++++++++++++++++++++++++---- run-command.h | 36 ++++++ strbuf.c | 31 +++++ strbuf.h | 1 + submodule.c | 119 ++++++++++++----- submodule.h | 2 +- t/t0061-run-command.sh | 20 +++ t/t5526-fetch-submodules.sh | 19 +++ test-run-command.c | 24 ++++ 12 files changed, 490 insertions(+), 59 deletions(-) diff --git a/Documentation/fetch-options.txt b/Documentation/fetch-options.txt index 45583d8..6b109f6 100644 --- a/Documentation/fetch-options.txt +++ b/Documentation/fetch-options.txt @@ -100,6 +100,13 @@ ifndef::git-pull[] reference to a commit that isn't already in the local submodule clone. +-j:: +--jobs=<n>:: + Number of parallel children to be used for fetching submodules. + Each will fetch from different submodules, such that fetching many + submodules will be faster. By default submodules will be fetched + one at a time. + --no-recurse-submodules:: Disable recursive fetching of submodules (this has the same effect as using the '--recurse-submodules=no' option). 
diff --git a/builtin/fetch.c b/builtin/fetch.c index ee1f1a9..f28eac6 100644 --- a/builtin/fetch.c +++ b/builtin/fetch.c @@ -37,6 +37,7 @@ static int prune = -1; /* unspecified */ static int all, append, dry_run, force, keep, multiple, update_head_ok, verbosity; static int progress = -1, recurse_submodules = RECURSE_SUBMODULES_DEFAULT; static int tags = TAGS_DEFAULT, unshallow, update_shallow; +static int max_children = 1; static const char *depth; static const char *upload_pack; static struct strbuf default_rla = STRBUF_INIT; @@ -99,6 +100,8 @@ static struct option builtin_fetch_options[] = { N_("fetch all tags and associated objects"), TAGS_SET), OPT_SET_INT('n', NULL, &tags, N_("do not fetch all tags (--no-tags)"), TAGS_UNSET), + OPT_INTEGER('j', "jobs", &max_children, + N_("number of submodules fetched in parallel")), OPT_BOOL('p', "prune", &prune, N_("prune remote-tracking branches no longer on remote")), { OPTION_CALLBACK, 0, "recurse-submodules", NULL, N_("on-demand"), @@ -1217,7 +1220,8 @@ int cmd_fetch(int argc, const char **argv, const char *prefix) result = fetch_populated_submodules(&options, submodule_prefix, recurse_submodules, - verbosity < 0); + verbosity < 0, + max_children); argv_array_clear(&options); } diff --git a/builtin/pull.c b/builtin/pull.c index 722a83c..f0af196 100644 --- a/builtin/pull.c +++ b/builtin/pull.c @@ -94,6 +94,7 @@ static int opt_force; static char *opt_tags; static char *opt_prune; static char *opt_recurse_submodules; +static char *max_children; static int opt_dry_run; static char *opt_keep; static char *opt_depth; @@ -177,6 +178,9 @@ static struct option pull_options[] = { N_("on-demand"), N_("control recursive fetching of submodules"), PARSE_OPT_OPTARG), + OPT_PASSTHRU('j', "jobs", &max_children, N_("n"), + N_("number of submodules pulled in parallel"), + PARSE_OPT_OPTARG), OPT_BOOL(0, "dry-run", &opt_dry_run, N_("dry run")), OPT_PASSTHRU('k', "keep", &opt_keep, NULL, @@ -524,6 +528,8 @@ static int run_fetch(const char 
*repo, const char **refspecs) argv_array_push(&args, opt_prune); if (opt_recurse_submodules) argv_array_push(&args, opt_recurse_submodules); + if (max_children) + argv_array_push(&args, max_children); if (opt_dry_run) argv_array_push(&args, "--dry-run"); if (opt_keep) diff --git a/run-command.c b/run-command.c index 28e1d55..6f6f9fb 100644 --- a/run-command.c +++ b/run-command.c @@ -232,6 +232,35 @@ static inline void set_cloexec(int fd) fcntl(fd, F_SETFD, flags | FD_CLOEXEC); } +static int determine_return_value(int wait_status, + int *result, + int *error_code, + const char *argv0) +{ + if (WIFSIGNALED(wait_status)) { + *result = WTERMSIG(wait_status); + if (*result != SIGINT && *result != SIGQUIT) + error("%s died of signal %d", argv0, *result); + /* + * This return value is chosen so that code & 0xff + * mimics the exit code that a POSIX shell would report for + * a program that died from this signal. + */ + *result += 128; + } else if (WIFEXITED(wait_status)) { + *result = WEXITSTATUS(wait_status); + /* + * Convert special exit code when execvp failed. + */ + if (*result == 127) { + *result = -1; + *error_code = ENOENT; + } + } else + return 1; + return 0; +} + static int wait_or_whine(pid_t pid, const char *argv0) { int status, code = -1; @@ -244,29 +273,10 @@ static int wait_or_whine(pid_t pid, const char *argv0) if (waiting < 0) { failed_errno = errno; error("waitpid for %s failed: %s", argv0, strerror(errno)); - } else if (waiting != pid) { - error("waitpid is confused (%s)", argv0); - } else if (WIFSIGNALED(status)) { - code = WTERMSIG(status); - if (code != SIGINT && code != SIGQUIT) - error("%s died of signal %d", argv0, code); - /* - * This return value is chosen so that code & 0xff - * mimics the exit code that a POSIX shell would report for - * a program that died from this signal. - */ - code += 128; - } else if (WIFEXITED(status)) { - code = WEXITSTATUS(status); - /* - * Convert special exit code when execvp failed. 
- */ - if (code == 127) { - code = -1; - failed_errno = ENOENT; - } } else { - error("waitpid is confused (%s)", argv0); + if (waiting != pid + || (determine_return_value(status, &code, &failed_errno, argv0) < 0)) + error("waitpid is confused (%s)", argv0); } clear_child_for_cleanup(pid); @@ -852,3 +862,227 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint) close(cmd->out); return finish_command(cmd); } + +static void unblock_fd(int fd) +{ + int flags = fcntl(fd, F_GETFL); + if (flags < 0) { + warning("Could not get file status flags, " + "output will be degraded"); + return; + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK)) { + warning("Could not set file status flags, " + "output will be degraded"); + return; + } +} + +struct parallel_processes { + int max_number_processes; + void *data; + get_next_task fn; + handle_child_starting_failure fn_err; + handle_child_return_value fn_exit; + + int nr_processes; + int all_tasks_started; + int foreground_child; + char *slots; + struct child_process *children; + struct pollfd *pfd; + struct strbuf *err; + struct strbuf finished_children; +}; + +static void run_processes_parallel_init(struct parallel_processes *pp, + int n, void *data, + get_next_task fn, + handle_child_starting_failure fn_err, + handle_child_return_value fn_exit) +{ + int i; + + pp->max_number_processes = n; + pp->data = data; + pp->fn = fn; + pp->fn_err = fn_err; + pp->fn_exit = fn_exit; + + pp->nr_processes = 0; + pp->all_tasks_started = 0; + pp->foreground_child = 0; + pp->slots = xcalloc(n, sizeof(*pp->slots)); + pp->children = xcalloc(n, sizeof(*pp->children)); + pp->pfd = xcalloc(n, sizeof(*pp->pfd)); + pp->err = xcalloc(n, sizeof(*pp->err)); + strbuf_init(&pp->finished_children, 0); + + for (i = 0; i < n; i++) { + strbuf_init(&pp->err[i], 0); + pp->pfd[i].events = POLLIN; + pp->pfd[i].fd = -1; + } +} + +static void run_processes_parallel_cleanup(struct parallel_processes *pp) +{ + int i; + for (i = 0; i < 
pp->max_number_processes; i++) + strbuf_release(&pp->err[i]); + + free(pp->children); + free(pp->slots); + free(pp->pfd); + free(pp->err); + strbuf_release(&pp->finished_children); +} + +static void run_processes_parallel_start_new(struct parallel_processes *pp) +{ + int i; + /* Start new processes. */ + while (!pp->all_tasks_started + && pp->nr_processes < pp->max_number_processes) { + for (i = 0; i < pp->max_number_processes; i++) + if (!pp->slots[i]) + break; /* found an empty slot */ + if (i == pp->max_number_processes) + die("BUG: bookkeeping is hard"); + + if (pp->fn(pp->data, &pp->children[i], &pp->err[i])) { + pp->all_tasks_started = 1; + break; + } + if (start_command(&pp->children[i])) + pp->fn_err(pp->data, &pp->children[i], &pp->err[i]); + + unblock_fd(pp->children[i].err); + + pp->nr_processes++; + pp->slots[i] = 1; + pp->pfd[i].fd = pp->children[i].err; + } +} + +static int run_processes_parallel_buffer_stderr(struct parallel_processes *pp) +{ + int i; + i = poll(pp->pfd, pp->max_number_processes, 100); + if (i < 0) { + if (errno == EINTR) + /* A signal was caught; try again */ + return -1; + else { + run_processes_parallel_cleanup(pp); + die_errno("poll"); + } + } + + /* Buffer output from all pipes. */ + for (i = 0; i < pp->max_number_processes; i++) { + if (!pp->slots[i]) + continue; + if (pp->pfd[i].revents & POLLIN) + strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0); + if (pp->foreground_child == i) { + fputs(pp->err[i].buf, stderr); + strbuf_reset(&pp->err[i]); + } + } + return 0; +} + + +static void run_processes_parallel_collect_finished(struct parallel_processes *pp) +{ + int i = 0; + pid_t pid; + int wait_status, code; + int n = pp->max_number_processes; + /* Collect finished child processes. 
*/ + while (pp->nr_processes > 0) { + pid = waitpid(-1, &wait_status, WNOHANG); + if (pid == 0) + return; /* no child finished */ + + if (pid < 0) { + if (errno == EINTR) + return; /* just try again next time */ + if (errno == EINVAL || errno == ECHILD) + die_errno("wait"); + } else { + /* Find the finished child. */ + for (i = 0; i < pp->max_number_processes; i++) + if (pp->slots[i] && pid == pp->children[i].pid) + break; + if (i == pp->max_number_processes) + /* + * waitpid returned another process id + * which we are not waiting for. + */ + return; + } + strbuf_read_noblock(&pp->err[i], pp->children[i].err, 0); + + if (determine_return_value(wait_status, &code, &errno, + pp->children[i].argv[0]) < 0) + error("waitpid is confused (%s)", + pp->children[i].argv[0]); + + pp->fn_exit(pp->data, &pp->children[i], code); + + argv_array_clear(&pp->children[i].args); + argv_array_clear(&pp->children[i].env_array); + + pp->nr_processes--; + pp->slots[i] = 0; + pp->pfd[i].fd = -1; + + if (i != pp->foreground_child) { + strbuf_addbuf(&pp->finished_children, &pp->err[i]); + strbuf_reset(&pp->err[i]); + } else { + fputs(pp->err[i].buf, stderr); + strbuf_reset(&pp->err[i]); + + /* Output all other finished child processes */ + fputs(pp->finished_children.buf, stderr); + strbuf_reset(&pp->finished_children); + + /* + * Pick next process to output live. + * NEEDSWORK: + * For now we pick it randomly by doing a round + * robin. Later we may want to pick the one with + * the most output or the longest or shortest + * running process time. 
+ */ + for (i = 0; i < n; i++) + if (pp->slots[(pp->foreground_child + i) % n]) + break; + pp->foreground_child = (pp->foreground_child + i) % n; + fputs(pp->err[pp->foreground_child].buf, stderr); + strbuf_reset(&pp->err[pp->foreground_child]); + } + } +} + +int run_processes_parallel(int n, void *data, + get_next_task fn, + handle_child_starting_failure fn_err, + handle_child_return_value fn_exit) +{ + struct parallel_processes pp; + run_processes_parallel_init(&pp, n, data, fn, fn_err, fn_exit); + + while (!pp.all_tasks_started || pp.nr_processes > 0) { + run_processes_parallel_start_new(&pp); + if (run_processes_parallel_buffer_stderr(&pp)) + continue; + run_processes_parallel_collect_finished(&pp); + } + run_processes_parallel_cleanup(&pp); + + return 0; +} diff --git a/run-command.h b/run-command.h index 5b4425a..0487f71 100644 --- a/run-command.h +++ b/run-command.h @@ -119,4 +119,40 @@ struct async { int start_async(struct async *async); int finish_async(struct async *async); +/** + * This callback should initialize the child process and preload the + * error channel. The preloading is useful if you want to have a message + * printed directly before the output of the child process. + * You MUST set stdout_to_stderr. + * + * Return 0 if the next child is ready to run. + * Return != 0 if there are no more tasks to be processed. + */ +typedef int (*get_next_task)(void *data, + struct child_process *cp, + struct strbuf *err); + +typedef void (*handle_child_starting_failure)(void *data, + struct child_process *cp, + struct strbuf *err); + +typedef void (*handle_child_return_value)(void *data, + struct child_process *cp, + int result); + +/** + * Runs up to n processes at the same time. Whenever a process can be + * started, the callback `get_next_task` is called to obtain the data + * fed to the child process. 
+ * + * The children started via this function run in parallel and their output + * to stderr is buffered, while one of the children will directly output + * to stderr. + */ + +int run_processes_parallel(int n, void *data, + get_next_task fn, + handle_child_starting_failure, + handle_child_return_value); + #endif diff --git a/strbuf.c b/strbuf.c index cce5eed..7f866c3 100644 --- a/strbuf.c +++ b/strbuf.c @@ -384,6 +384,37 @@ ssize_t strbuf_read(struct strbuf *sb, int fd, size_t hint) return sb->len - oldlen; } +ssize_t strbuf_read_noblock(struct strbuf *sb, int fd, size_t hint) +{ + size_t oldlen = sb->len; + size_t oldalloc = sb->alloc; + + strbuf_grow(sb, hint ? hint : 8192); + for (;;) { + ssize_t cnt; + + cnt = read(fd, sb->buf + sb->len, sb->alloc - sb->len - 1); + if (cnt < 0) { + if (errno == EINTR) + continue; + if (errno == EAGAIN) + break; + if (oldalloc == 0) + strbuf_release(sb); + else + strbuf_setlen(sb, oldlen); + return -1; + } + if (!cnt) + break; + sb->len += cnt; + strbuf_grow(sb, 8192); + } + + sb->buf[sb->len] = '\0'; + return sb->len - oldlen; +} + #define STRBUF_MAXLINK (2*PATH_MAX) int strbuf_readlink(struct strbuf *sb, const char *path, size_t hint) diff --git a/strbuf.h b/strbuf.h index aef2794..7ea462b 100644 --- a/strbuf.h +++ b/strbuf.h @@ -365,6 +365,7 @@ extern size_t strbuf_fread(struct strbuf *, size_t, FILE *); * any partial read is undone. */ extern ssize_t strbuf_read(struct strbuf *, int fd, size_t hint); +extern ssize_t strbuf_read_noblock(struct strbuf *, int fd, size_t hint); /** * Read the contents of a file, specified by its path. 
The third argument diff --git a/submodule.c b/submodule.c index 1d64e57..a0e06e8 100644 --- a/submodule.c +++ b/submodule.c @@ -12,6 +12,7 @@ #include "sha1-array.h" #include "argv-array.h" #include "blob.h" +#include "thread-utils.h" static int config_fetch_recurse_submodules = RECURSE_SUBMODULES_ON_DEMAND; static struct string_list changed_submodule_paths; @@ -615,37 +616,79 @@ static void calculate_changed_submodule_paths(void) initialized_fetch_ref_tips = 0; } +struct submodule_parallel_fetch { + int count; + struct argv_array args; + const char *work_tree; + const char *prefix; + int command_line_option; + int quiet; + int result; +}; +#define SPF_INIT {0, ARGV_ARRAY_INIT, NULL, NULL, 0, 0, 0} + +int get_next_submodule(void *data, struct child_process *cp, + struct strbuf *err); + +void handle_submodule_fetch_start_err(void *data, struct child_process *cp, struct strbuf *err) +{ + struct submodule_parallel_fetch *spf = data; + spf->result = 1; +} + +void handle_submodule_fetch_finish( void *data, struct child_process *cp, int retvalue) +{ + struct submodule_parallel_fetch *spf = data; + + if (retvalue) + spf->result = 1; +} + int fetch_populated_submodules(const struct argv_array *options, const char *prefix, int command_line_option, - int quiet) + int quiet, int max_parallel_jobs) { - int i, result = 0; - struct child_process cp = CHILD_PROCESS_INIT; - struct argv_array argv = ARGV_ARRAY_INIT; - const char *work_tree = get_git_work_tree(); - if (!work_tree) + int i; + struct submodule_parallel_fetch spf = SPF_INIT; + spf.work_tree = get_git_work_tree(); + spf.command_line_option = command_line_option; + spf.quiet = quiet; + spf.prefix = prefix; + if (!spf.work_tree) goto out; if (read_cache() < 0) die("index file corrupt"); - argv_array_push(&argv, "fetch"); + argv_array_push(&spf.args, "fetch"); for (i = 0; i < options->argc; i++) - argv_array_push(&argv, options->argv[i]); - argv_array_push(&argv, "--recurse-submodules-default"); + 
argv_array_push(&spf.args, options->argv[i]); + argv_array_push(&spf.args, "--recurse-submodules-default"); /* default value, "--submodule-prefix" and its value are added later */ - cp.env = local_repo_env; - cp.git_cmd = 1; - cp.no_stdin = 1; - calculate_changed_submodule_paths(); + run_processes_parallel(max_parallel_jobs, &spf, + get_next_submodule, + handle_submodule_fetch_start_err, + handle_submodule_fetch_finish); + + argv_array_clear(&spf.args); +out: + string_list_clear(&changed_submodule_paths, 1); + return spf.result; +} + +int get_next_submodule(void *data, struct child_process *cp, + struct strbuf *err) +{ + int ret = 0; + struct submodule_parallel_fetch *spf = data; - for (i = 0; i < active_nr; i++) { + for ( ; spf->count < active_nr; spf->count++) { struct strbuf submodule_path = STRBUF_INIT; struct strbuf submodule_git_dir = STRBUF_INIT; struct strbuf submodule_prefix = STRBUF_INIT; - const struct cache_entry *ce = active_cache[i]; + const struct cache_entry *ce = active_cache[spf->count]; const char *git_dir, *default_argv; const struct submodule *submodule; @@ -657,7 +700,7 @@ int fetch_populated_submodules(const struct argv_array *options, submodule = submodule_from_name(null_sha1, ce->name); default_argv = "yes"; - if (command_line_option == RECURSE_SUBMODULES_DEFAULT) { + if (spf->command_line_option == RECURSE_SUBMODULES_DEFAULT) { if (submodule && submodule->fetch_recurse != RECURSE_SUBMODULES_NONE) { @@ -680,40 +723,46 @@ int fetch_populated_submodules(const struct argv_array *options, default_argv = "on-demand"; } } - } else if (command_line_option == RECURSE_SUBMODULES_ON_DEMAND) { + } else if (spf->command_line_option == RECURSE_SUBMODULES_ON_DEMAND) { if (!unsorted_string_list_lookup(&changed_submodule_paths, ce->name)) continue; default_argv = "on-demand"; } - strbuf_addf(&submodule_path, "%s/%s", work_tree, ce->name); + strbuf_addf(&submodule_path, "%s/%s", spf->work_tree, ce->name); strbuf_addf(&submodule_git_dir, "%s/.git", 
submodule_path.buf); - strbuf_addf(&submodule_prefix, "%s%s/", prefix, ce->name); + strbuf_addf(&submodule_prefix, "%s%s/", spf->prefix, ce->name); git_dir = read_gitfile(submodule_git_dir.buf); if (!git_dir) git_dir = submodule_git_dir.buf; if (is_directory(git_dir)) { - if (!quiet) - fprintf(stderr, "Fetching submodule %s%s\n", prefix, ce->name); - cp.dir = submodule_path.buf; - argv_array_push(&argv, default_argv); - argv_array_push(&argv, "--submodule-prefix"); - argv_array_push(&argv, submodule_prefix.buf); - cp.argv = argv.argv; - if (run_command(&cp)) - result = 1; - argv_array_pop(&argv); - argv_array_pop(&argv); - argv_array_pop(&argv); + child_process_init(cp); + cp->dir = strbuf_detach(&submodule_path, NULL); + cp->git_cmd = 1; + cp->no_stdout = 1; + cp->no_stdin = 1; + cp->stdout_to_stderr = 1; + cp->err = -1; + cp->env = local_repo_env; + if (!spf->quiet) + strbuf_addf(err, "Fetching submodule %s%s\n", + spf->prefix, ce->name); + argv_array_init(&cp->args); + argv_array_pushv(&cp->args, spf->args.argv); + argv_array_push(&cp->args, default_argv); + argv_array_push(&cp->args, "--submodule-prefix"); + argv_array_push(&cp->args, submodule_prefix.buf); + ret = 1; } strbuf_release(&submodule_path); strbuf_release(&submodule_git_dir); strbuf_release(&submodule_prefix); + if (ret) { + spf->count++; + return 0; + } } - argv_array_clear(&argv); -out: - string_list_clear(&changed_submodule_paths, 1); - return result; + return 1; } unsigned is_submodule_modified(const char *path, int ignore_untracked) diff --git a/submodule.h b/submodule.h index 5507c3d..cbc0003 100644 --- a/submodule.h +++ b/submodule.h @@ -31,7 +31,7 @@ void set_config_fetch_recurse_submodules(int value); void check_for_new_submodule_commits(unsigned char new_sha1[20]); int fetch_populated_submodules(const struct argv_array *options, const char *prefix, int command_line_option, - int quiet); + int quiet, int max_parallel_jobs); unsigned is_submodule_modified(const char *path, int 
ignore_untracked); int submodule_uses_gitfile(const char *path); int ok_to_remove_submodule(const char *path); diff --git a/t/t0061-run-command.sh b/t/t0061-run-command.sh index 9acf628..37c89b9 100755 --- a/t/t0061-run-command.sh +++ b/t/t0061-run-command.sh @@ -47,4 +47,24 @@ test_expect_success POSIXPERM,SANITY 'unreadable directory in PATH' ' test_cmp expect actual ' +cat >expect <<-EOF +preloaded output of a child +Hello +World +preloaded output of a child +Hello +World +preloaded output of a child +Hello +World +preloaded output of a child +Hello +World +EOF + +test_expect_success 'run_command runs in parallel' ' + test-run-command run-command-async sh -c "printf \"%s\n%s\n\" Hello World" 2>actual && + test_cmp expect actual +' + test_done diff --git a/t/t5526-fetch-submodules.sh b/t/t5526-fetch-submodules.sh index 17759b1..1b4ce69 100755 --- a/t/t5526-fetch-submodules.sh +++ b/t/t5526-fetch-submodules.sh @@ -71,6 +71,16 @@ test_expect_success "fetch --recurse-submodules recurses into submodules" ' test_i18ncmp expect.err actual.err ' +test_expect_success "fetch --recurse-submodules -j2 has the same output behaviour" ' + add_upstream_commit && + ( + cd downstream && + git fetch --recurse-submodules -j2 2>../actual.err + ) && + test_must_be_empty actual.out && + test_i18ncmp expect.err actual.err +' + test_expect_success "fetch alone only fetches superproject" ' add_upstream_commit && ( @@ -140,6 +150,15 @@ test_expect_success "--quiet propagates to submodules" ' ! test -s actual.err ' +test_expect_success "--quiet propagates to parallel submodules" ' + ( + cd downstream && + git fetch --recurse-submodules -j 2 --quiet >../actual.out 2>../actual.err + ) && + ! test -s actual.out && + ! 
test -s actual.err +' + test_expect_success "--dry-run propagates to submodules" ' add_upstream_commit && ( diff --git a/test-run-command.c b/test-run-command.c index 89c7de2..71fd3ca 100644 --- a/test-run-command.c +++ b/test-run-command.c @@ -10,9 +10,29 @@ #include "git-compat-util.h" #include "run-command.h" +#include "argv-array.h" +#include "strbuf.h" #include <string.h> #include <errno.h> +static int number_callbacks; +int parallel_next(void *data, + struct child_process *cp, + struct strbuf *err) +{ + struct child_process *d = data; + if (number_callbacks >= 4) + return 1; + + argv_array_pushv(&cp->args, d->argv); + cp->stdout_to_stderr = 1; + cp->no_stdin = 1; + cp->err = -1; + strbuf_addf(err, "preloaded output of a child\n"); + number_callbacks++; + return 0; +} + int main(int argc, char **argv) { struct child_process proc = CHILD_PROCESS_INIT; @@ -30,6 +50,10 @@ int main(int argc, char **argv) if (!strcmp(argv[1], "run-command")) exit(run_command(&proc)); + if (!strcmp(argv[1], "run-command-async")) + exit(run_processes_parallel(4, &proc, parallel_next, + NULL, NULL)); + fprintf(stderr, "check usage\n"); return 1; } -- 2.6.0.rc0.131.gf624c3d ^ permalink raw reply related [flat|nested] 13+ messages in thread
* Re: [PATCHv2] fetch submodules in parallel 2015-09-14 21:50 ` [PATCHv2] " Stefan Beller 2015-09-14 21:50 ` [PATCHv2] fetch: " Stefan Beller @ 2015-09-14 22:06 ` Junio C Hamano 1 sibling, 0 replies; 13+ messages in thread From: Junio C Hamano @ 2015-09-14 22:06 UTC (permalink / raw) To: Stefan Beller Cc: peff, git, jrnieder, johannes.schindelin, Jens.Lehmann, vlovich Stefan Beller <sbeller@google.com> writes: > This replaces the last patch of the "Parallel git submodule fetching" > series. Changes: > > * have correct return code in submodule fetching when one submodule fails > * use poll instead of select now > * broke down into more smaller functions instead of one giant. > (I think it is an improvement, but I wouldn't be surprised if someone objects) > * closed memory leaks > * document the need for stdout_to_stderr > > I don't deem it RFC-ish any more but good to go. I didn't say this in the previous round because it smelled like an RFC, but for a real submission, 2/2 may be doing too many things at once. I suspect this is more or less "taste" thing, so I won't mind too much as long as the reviewers are OK with it. ^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [PATCH 2/2] fetch: fetch submodules in parallel 2015-09-14 17:17 ` Jeff King 2015-09-14 17:47 ` Stefan Beller 2015-09-14 17:55 ` Jonathan Nieder @ 2015-09-14 17:56 ` Junio C Hamano 2 siblings, 0 replies; 13+ messages in thread From: Junio C Hamano @ 2015-09-14 17:56 UTC (permalink / raw) To: Jeff King Cc: Stefan Beller, git@vger.kernel.org, Jonathan Nieder, Johannes Schindelin, Jens Lehmann, Vitali Lovich Jeff King <peff@peff.net> writes: > I don't think you need exact timing information. This is no different > than running the commands themselves, with stdout and stderr writing to > a pty that your terminal emulator will then read() from. If the program > produces intermingled stdout/stderr that clogs up the terminal, that is > its problem. > > The only difference is that we're going to save it and later replay it > all very quickly. So I think it would be sufficient just to retain the > original order. > >> I will add documentation explaining why the async output case >> will only deal with one channel. I chose stderr as that's already >> available and needed in this use case. > > I suspect you could just set child->stdout_to_stderr in this case, and > then you get your ordering for free. I think we are in agreement; that is exactly what I wanted to say when I said "I offhand do not think the latter [i.e. the callers have to dup them together] is unreasonable". Thanks for stating it more clearly and explicitly. > To handle multiple channels, I think you could just do a linked list of > buffers rather than a single strbuf. Like: > > struct io_chunk { > int channel; > char *buf; > size_t len; > struct io_chunk *next; > }; > > and just keep appending chunks to the list (and to dump them, just walk > the list, writing each to the appropriate channel descriptor). Perhaps, but let's not overdesign things before we have a concrete example codepath that benefits from such a thing. It is hard to detect a misdesign without a real usage pattern. 
end of thread, other threads:[~2015-09-14 22:06 UTC | newest]

Thread overview: 13+ messages
2015-09-11 23:09 [RFC PATCHv1 0/2] Parallel git submodule fetching Stefan Beller
2015-09-11 23:09 ` [PATCH 1/2] Sending "Fetching submodule <foo>" output to stderr Stefan Beller
2015-09-11 23:09 ` [PATCH 2/2] fetch: fetch submodules in parallel Stefan Beller
2015-09-12 19:11   ` Junio C Hamano
2015-09-14 16:46     ` Stefan Beller
2015-09-14 17:17       ` Jeff King
2015-09-14 17:47         ` Stefan Beller
2015-09-14 17:55         ` Jonathan Nieder
2015-09-14 18:07           ` Jeff King
2015-09-14 21:50             ` [PATCHv2] " Stefan Beller
2015-09-14 21:50               ` [PATCHv2] fetch: " Stefan Beller
2015-09-14 22:06               ` [PATCHv2] " Junio C Hamano
2015-09-14 17:56       ` [PATCH 2/2] fetch: " Junio C Hamano