All of lore.kernel.org
 help / color / mirror / Atom feed
From: Junio C Hamano <gitster@pobox.com>
To: Stefan Beller <sbeller@google.com>
Cc: git@vger.kernel.org, jrnieder@gmail.com,
	Johannes Sixt <j6t@kdbg.org>, Jeff King <peff@peff.net>,
	hvoigt@hvoigt.net, jens.lehmann@web.de
Subject: Re: [RFC PATCH 2/3] run-commands: add an async queue processor
Date: Fri, 21 Aug 2015 12:05:13 -0700	[thread overview]
Message-ID: <xmqqegiw5uom.fsf@gitster.dls.corp.google.com> (raw)
In-Reply-To: <1440121237-24576-2-git-send-email-sbeller@google.com> (Stefan Beller's message of "Thu, 20 Aug 2015 18:40:36 -0700")

Stefan Beller <sbeller@google.com> writes:

> This adds functionality to do work in parallel.
>
> The whole life cycle of such a thread pool would look like
>
>     struct task_queue * tq = create_task_queue(32); // no of threads
>     for (...)
>         add_task(tq, process_one_item_function, item); // non blocking
>     ...
>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>     if (!tq)
>         die ("Not all items were be processed");
>
> The caller must take care of handling the output.
>
> Signed-off-by: Stefan Beller <sbeller@google.com>
> ---
>
> I sent this a while ago to the list, no comments on it :(

The primary reason, I suspect, is that you sent it to the wrong set of
people.  Submodule folks have largely been working in the scripted
ones, and may not necessarily be the ones who are most familiar with
the run-command infrastructure.

"shortlog --no-merges" tells me that the obvious suspects are j6t
and peff.

> The core functionality stayed the same, but I hope to improved naming and
> location of the code.
>
> The WIP is only for the NO_PTHREADS case.

>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  run-command.h |  30 +++++++++
>  2 files changed, 230 insertions(+), 12 deletions(-)
>
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..4029011 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -4,6 +4,21 @@
>  #include "sigchain.h"
>  #include "argv-array.h"
>  
> +#ifdef NO_PTHREADS
> +
> +#else
> +
> +#include "thread-utils.h"
> +
> +#include <pthread.h>
> +#include <semaphore.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#endif
> +
> +#include "git-compat-util.h"
> +

This goes against the way we have been organizing the header files.

http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>  
>  #endif
>  
> +void setup_main_thread()

void setup_main_thread(void)

> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>  	close(cmd->out);
>  	return finish_command(cmd);
>  }
> +
> +#ifndef NO_PTHREADS
> +struct job_list {
> +	int (*fct)(struct task_queue *aq, void *task);
> +	void *task;
> +	struct job_list *next;
> +};
> +#endif
> +
> +struct task_queue {
> +#ifndef NO_PTHREADS
> +	/*
> +	 * To avoid deadlocks always aquire the semaphores with lowest priority

acquire.

> +	 * first, priorites are in descending order as listed.
> +	 *
> +	 * The `mutex` is a general purpose lock for modifying data in the async
> +	 * queue, such as adding a new task or adding a return value from
> +	 * an already run task.
> +	 *
> +	 * `workingcount` and `freecount` are opposing semaphores, the sum of
> +	 * their values should equal `max_threads` at any time while the `mutex`
> +	 * is available.
> +	 */
> +	sem_t mutex;
> +	sem_t workingcount;
> +	sem_t freecount;
> +
> +	pthread_t *threads;
> +	unsigned max_threads;
> +
> +	struct job_list *first;
> +	struct job_list *last;
> +#endif
> +	int early_return;
> +};
> +
> +#ifndef NO_PTHREADS
> +
> +static void get_task(struct task_queue *aq,
> +		     int (**fct)(struct task_queue *aq, void *task),
> +		     void **task,
> +		     int *early_return)
> +{
> +	struct job_list *job;
> +
> +	sem_wait(&aq->workingcount);
> +	sem_wait(&aq->mutex);
> +
> +	if (!aq->first)
> +		die("BUG: internal error with dequeuing jobs for threads");
> +	job = aq->first;
> +	*fct = job->fct;
> +	*task = job->task;
> +	aq->early_return |= *early_return;
> +	*early_return = aq->early_return;
> +	aq->first = job->next;
> +	if (!aq->first)
> +		aq->last = NULL;
> +
> +	sem_post(&aq->freecount);
> +	sem_post(&aq->mutex);
> +
> +	free(job);
> +}
> +
> +static void* dispatcher(void *args)

static void *dispatcher(....)

> +{
> +	void *task;
> +	int (*fct)(struct task_queue *aq, void *data);

s/data/task/?

> +	int early_return = 0;
> +	struct task_queue *aq = args;
> +
> +	get_task(aq, &fct, &task, &early_return);
> +	while (fct || early_return != 0) {
> +		early_return = fct(aq, task);
> +		get_task(aq, &fct, &task, &early_return);
> +	}

If the func said "we are done, you may stop dispatching now", do you
still want to do another get_task()?

> +	pthread_exit(0);
> +}
> +#endif
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> +	struct task_queue *aq = xmalloc(sizeof(*aq));
> +
> +#ifndef NO_PTHREADS
> +	int i;
> +	if (!max_threads)
> +		aq->max_threads = online_cpus();
> +	else
> +		aq->max_threads = max_threads;
> +
> +	sem_init(&aq->mutex, 0, 1);
> +	sem_init(&aq->workingcount, 0, 0);
> +	sem_init(&aq->freecount, 0, aq->max_threads);
> +	aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
> +
> +	for (i = 0; i < aq->max_threads; i++)
> +		pthread_create(&aq->threads[i], 0, &dispatcher, aq);
> +
> +	aq->first = NULL;
> +	aq->last = NULL;


Shouldn't these be initialized before letting threads call into
dispatcher?  The workingcount semaphore that is initialized to 0 may
prevent them from peeking into these pointers and barfing, but still...

> +
> +	setup_main_thread();
> +#endif
> +	aq->early_return = 0;
> +
> +	return aq;
> +}

  reply	other threads:[~2015-08-21 19:05 UTC|newest]

Thread overview: 16+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-08-21  1:40 [PATCH 1/3] submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-21  1:40 ` [RFC PATCH 2/3] run-commands: add an async queue processor Stefan Beller
2015-08-21 19:05   ` Junio C Hamano [this message]
2015-08-21 19:44     ` Jeff King
2015-08-21 19:48       ` Stefan Beller
2015-08-21 19:51         ` Jeff King
2015-08-21 20:12           ` Stefan Beller
2015-08-21 20:41       ` Junio C Hamano
2015-08-21 23:40       ` Stefan Beller
2015-08-24 21:22         ` Junio C Hamano
2015-08-21 19:45     ` Stefan Beller
2015-08-21 20:47       ` Junio C Hamano
2015-08-21 20:56         ` Stefan Beller
2015-08-21  1:40 ` [WIP/PATCH 3/3] submodule: helper to run foreach in parallel Stefan Beller
2015-08-21 19:23   ` Junio C Hamano
2015-08-21 20:21     ` Stefan Beller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=xmqqegiw5uom.fsf@gitster.dls.corp.google.com \
    --to=gitster@pobox.com \
    --cc=git@vger.kernel.org \
    --cc=hvoigt@hvoigt.net \
    --cc=j6t@kdbg.org \
    --cc=jens.lehmann@web.de \
    --cc=jrnieder@gmail.com \
    --cc=peff@peff.net \
    --cc=sbeller@google.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.