git.vger.kernel.org archive mirror
 help / color / mirror / Atom feed
From: Stefan Beller <sbeller@google.com>
To: peff@peff.net
Cc: git@vger.kernel.org, jrnieder@gmail.com, gitster@pobox.com,
	Stefan Beller <sbeller@google.com>
Subject: [PATCH 4/5] index-pack: Use the new worker pool
Date: Tue, 25 Aug 2015 10:28:25 -0700	[thread overview]
Message-ID: <1440523706-23041-5-git-send-email-sbeller@google.com> (raw)
In-Reply-To: <1440523706-23041-1-git-send-email-sbeller@google.com>

By treating each object as its own task the workflow is easier to follow
as the function used in the worker threads doesn't need any control logic
any more.

Signed-off-by: Stefan Beller <sbeller@google.com>
---
 builtin/index-pack.c | 71 +++++++++++++++++++++++-----------------------------
 1 file changed, 32 insertions(+), 39 deletions(-)

diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index 3f10840..826bd22 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -11,6 +11,7 @@
 #include "exec_cmd.h"
 #include "streaming.h"
 #include "thread-utils.h"
+#include "run-command.h"
 
 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -95,7 +96,6 @@ static const char *curr_pack;
 #ifndef NO_PTHREADS
 
 static struct thread_local *thread_data;
-static int nr_dispatched;
 static int threads_active;
 
 static pthread_mutex_t read_mutex;
@@ -106,10 +106,6 @@ static pthread_mutex_t counter_mutex;
 #define counter_lock()		lock_mutex(&counter_mutex)
 #define counter_unlock()	unlock_mutex(&counter_mutex)
 
-static pthread_mutex_t work_mutex;
-#define work_lock()		lock_mutex(&work_mutex)
-#define work_unlock()		unlock_mutex(&work_mutex)
-
 static pthread_mutex_t deepest_delta_mutex;
 #define deepest_delta_lock()	lock_mutex(&deepest_delta_mutex)
 #define deepest_delta_unlock()	unlock_mutex(&deepest_delta_mutex)
@@ -140,7 +136,6 @@ static void init_thread(void)
 	int i;
 	init_recursive_mutex(&read_mutex);
 	pthread_mutex_init(&counter_mutex, NULL);
-	pthread_mutex_init(&work_mutex, NULL);
 	pthread_mutex_init(&type_cas_mutex, NULL);
 	if (show_stat)
 		pthread_mutex_init(&deepest_delta_mutex, NULL);
@@ -163,7 +158,6 @@ static void cleanup_thread(void)
 	threads_active = 0;
 	pthread_mutex_destroy(&read_mutex);
 	pthread_mutex_destroy(&counter_mutex);
-	pthread_mutex_destroy(&work_mutex);
 	pthread_mutex_destroy(&type_cas_mutex);
 	if (show_stat)
 		pthread_mutex_destroy(&deepest_delta_mutex);
@@ -181,9 +175,6 @@ static void cleanup_thread(void)
 #define counter_lock()
 #define counter_unlock()
 
-#define work_lock()
-#define work_unlock()
-
 #define deepest_delta_lock()
 #define deepest_delta_unlock()
 
@@ -1075,28 +1066,29 @@ static void resolve_base(struct object_entry *obj)
 }
 
 #ifndef NO_PTHREADS
-static void *threaded_second_pass(void *data)
+static int threaded_second_pass(struct task_queue *tq, void *data)
 {
-	set_thread_data(data);
-	for (;;) {
-		int i;
-		counter_lock();
-		display_progress(progress, nr_resolved_deltas);
-		counter_unlock();
-		work_lock();
-		while (nr_dispatched < nr_objects &&
-		       is_delta_type(objects[nr_dispatched].type))
-			nr_dispatched++;
-		if (nr_dispatched >= nr_objects) {
-			work_unlock();
-			break;
-		}
-		i = nr_dispatched++;
-		work_unlock();
+	if (!get_thread_data()) {
+		struct thread_local *t = xmalloc(sizeof(*t));
+		t->pack_fd = open(curr_pack, O_RDONLY);
+		if (t->pack_fd == -1)
+			die_errno(_("unable to open %s"), curr_pack);
 
-		resolve_base(&objects[i]);
+		set_thread_data(t);
 	}
-	return NULL;
+
+	resolve_base(data);
+
+	counter_lock();
+	display_progress(progress, nr_resolved_deltas);
+	counter_unlock();
+	return 0;
+}
+
+void cleanup_threaded_second_pass(struct task_queue *aq)
+{
+	struct thread_local *t = get_thread_data();
+	free(t);
 }
 #endif
 
@@ -1195,18 +1187,19 @@ static void resolve_deltas(void)
 					  nr_ref_deltas + nr_ofs_deltas);
 
 #ifndef NO_PTHREADS
-	nr_dispatched = 0;
 	if (nr_threads > 1 || getenv("GIT_FORCE_THREADS")) {
+		struct task_queue *tq;
+
 		init_thread();
-		for (i = 0; i < nr_threads; i++) {
-			int ret = pthread_create(&thread_data[i].thread, NULL,
-						 threaded_second_pass, thread_data + i);
-			if (ret)
-				die(_("unable to create thread: %s"),
-				    strerror(ret));
-		}
-		for (i = 0; i < nr_threads; i++)
-			pthread_join(thread_data[i].thread, NULL);
+		tq = create_task_queue(nr_threads);
+
+		for (i = 0; i < nr_objects; i++)
+			if (!is_delta_type(objects[i].type))
+				add_task(tq, threaded_second_pass, &objects[i]);
+
+		if (finish_task_queue(tq, cleanup_threaded_second_pass))
+			die("Not all threads have finished");
+
 		cleanup_thread();
 		return;
 	}
-- 
2.5.0.400.gff86faf

  parent reply	other threads:[~2015-08-25 17:28 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2015-08-25 17:28 [RFC PATCH 0/5] Demonstrate new parallel threading API Stefan Beller
2015-08-25 17:28 ` [PATCH 1/5] FIXUP submodule: implement `module_clone` as a builtin helper Stefan Beller
2015-08-25 17:28 ` [PATCH 2/5] thread-utils: add a threaded task queue Stefan Beller
2015-08-25 17:28 ` [PATCH 3/5] submodule: helper to run foreach in parallel Stefan Beller
2015-08-25 21:09   ` Junio C Hamano
2015-08-25 21:42     ` Stefan Beller
2015-08-25 22:23       ` Junio C Hamano
2015-08-25 22:44         ` Junio C Hamano
2015-08-26 17:06   ` Jeff King
2015-08-26 17:21     ` Stefan Beller
2015-08-25 17:28 ` Stefan Beller [this message]
2015-08-25 19:03   ` [PATCH 4/5] index-pack: Use the new worker pool Jeff King
2015-08-25 19:23     ` Stefan Beller
2015-08-25 20:41     ` Junio C Hamano
2015-08-25 20:59       ` Stefan Beller
2015-08-25 21:12         ` Junio C Hamano
2015-08-25 22:39           ` Stefan Beller
2015-08-25 22:50             ` Junio C Hamano
2015-08-25 17:28 ` [PATCH 5/5] pack-objects: Use " Stefan Beller
  -- strict thread matches above, loose matches on Subject: below --
2015-08-27  0:52 [RFC PATCH 0/5] Progressing with `git submodule foreach_parallel` Stefan Beller
2015-08-27  0:52 ` [PATCH 4/5] index-pack: Use the new worker pool Stefan Beller

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1440523706-23041-5-git-send-email-sbeller@google.com \
    --to=sbeller@google.com \
    --cc=git@vger.kernel.org \
    --cc=gitster@pobox.com \
    --cc=jrnieder@gmail.com \
    --cc=peff@peff.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).