All of lore.kernel.org
 help / color / mirror / Atom feed
From: "Nguyễn Thái Ngọc Duy" <pclouds@gmail.com>
To: git@vger.kernel.org
Cc: "Nguyễn Thái Ngọc Duy" <pclouds@gmail.com>
Subject: [PATCH 4/4] index-pack: support multithreaded delta resolving
Date: Sat, 25 Feb 2012 17:56:16 +0700	[thread overview]
Message-ID: <1330167376-24859-5-git-send-email-pclouds@gmail.com> (raw)
In-Reply-To: <1330167376-24859-1-git-send-email-pclouds@gmail.com>


Signed-off-by: Nguyễn Thái Ngọc Duy <pclouds@gmail.com>
---
 Documentation/config.txt         |    4 +
 Documentation/git-index-pack.txt |   10 ++
 Makefile                         |    2 +-
 builtin/index-pack.c             |  197 ++++++++++++++++++++++++++++++++------
 4 files changed, 182 insertions(+), 31 deletions(-)

diff --git a/Documentation/config.txt b/Documentation/config.txt
index e55dae1..965304b 100644
--- a/Documentation/config.txt
+++ b/Documentation/config.txt
@@ -445,6 +445,10 @@ for all users/operating systems, except on the largest projects.
 You probably do not need to adjust this value.
 +
 Common unit suffixes of 'k', 'm', or 'g' are supported.
++
+When gitlink:git-index-pack[1] runs on more than one thread, this
+value is applied per thread so the total amount of used memory depends
+on how many threads are used.
 
 core.bigFileThreshold::
 	Files larger than this size are stored deflated, without
diff --git a/Documentation/git-index-pack.txt b/Documentation/git-index-pack.txt
index 909687f..7e5f61b 100644
--- a/Documentation/git-index-pack.txt
+++ b/Documentation/git-index-pack.txt
@@ -74,6 +74,16 @@ OPTIONS
 --strict::
 	Die, if the pack contains broken objects or links.
 
+--threads=<n>::
+	Specifies the number of threads to spawn when resolving
+	deltas. This requires that index-pack be compiled with
+	pthreads otherwise this option is ignored with a warning.
+	This is meant to reduce packing time on multiprocessor
+	machines. The required amount of memory for the delta search
+	window is however multiplied by the number of threads.
+	Specifying 0 will cause git to auto-detect the number of CPU's
+	and set the number of threads accordingly.
+
 
 Note
 ----
diff --git a/Makefile b/Makefile
index 1fb1705..5fae875 100644
--- a/Makefile
+++ b/Makefile
@@ -2159,7 +2159,7 @@ builtin/branch.o builtin/checkout.o builtin/clone.o builtin/reset.o branch.o tra
 builtin/bundle.o bundle.o transport.o: bundle.h
 builtin/bisect--helper.o builtin/rev-list.o bisect.o: bisect.h
 builtin/clone.o builtin/fetch-pack.o transport.o: fetch-pack.h
-builtin/grep.o builtin/pack-objects.o transport-helper.o thread-utils.o: thread-utils.h
+builtin/index-pack.o builtin/grep.o builtin/pack-objects.o transport-helper.o thread-utils.o: thread-utils.h
 builtin/send-pack.o transport.o: send-pack.h
 builtin/log.o builtin/shortlog.o: shortlog.h
 builtin/prune.o builtin/reflog.o reachable.o: reachable.h
diff --git a/builtin/index-pack.c b/builtin/index-pack.c
index e1e858a..120195a 100644
--- a/builtin/index-pack.c
+++ b/builtin/index-pack.c
@@ -9,6 +9,7 @@
 #include "progress.h"
 #include "fsck.h"
 #include "exec_cmd.h"
+#include "thread-utils.h"
 
 static const char index_pack_usage[] =
 "git index-pack [-v] [-o <index-file>] [--keep | --keep=<msg>] [--verify] [--strict] (<pack-file> | --stdin [--fix-thin] [<pack-file>])";
@@ -38,6 +39,15 @@ struct base_data {
 	int ofs_first, ofs_last;
 };
 
+struct thread_local {
+#ifndef NO_PTHREADS
+	pthread_t thread;
+#endif
+	struct base_data *base_cache;
+	size_t base_cache_used;
+	int nr_resolved_deltas;
+};
+
 /*
  * Even if sizeof(union delta_base) == 24 on 64-bit archs, we really want
  * to memcmp() only the first 20 bytes.
@@ -54,11 +64,12 @@ struct delta_entry {
 
 static struct object_entry *objects;
 static struct delta_entry *deltas;
-static struct base_data *base_cache;
-static size_t base_cache_used;
+static struct thread_local *thread_data;
 static int nr_objects;
+static int nr_processed;
 static int nr_deltas;
 static int nr_resolved_deltas;
+static int nr_threads;
 
 static int from_stdin;
 static int strict;
@@ -76,6 +87,42 @@ static git_SHA_CTX input_ctx;
 static uint32_t input_crc32;
 static int input_fd, output_fd, pack_fd;
 
+#ifndef NO_PTHREADS
+
+static pthread_mutex_t read_mutex;
+#define read_lock()		pthread_mutex_lock(&read_mutex)
+#define read_unlock()		pthread_mutex_unlock(&read_mutex)
+
+static pthread_mutex_t work_mutex;
+#define work_lock()		pthread_mutex_lock(&work_mutex)
+#define work_unlock()		pthread_mutex_unlock(&work_mutex)
+
+/*
+ * Mutex and conditional variable can't be statically-initialized on Windows.
+ */
+static void init_thread(void)
+{
+	init_recursive_mutex(&read_mutex);
+	pthread_mutex_init(&work_mutex, NULL);
+}
+
+static void cleanup_thread(void)
+{
+	pthread_mutex_destroy(&read_mutex);
+	pthread_mutex_destroy(&work_mutex);
+}
+
+#else
+
+#define read_lock()
+#define read_unlock()
+
+#define work_lock()
+#define work_unlock()
+
+#endif
+
+
 static int mark_link(struct object *obj, int type, void *data)
 {
 	if (!obj)
@@ -224,6 +271,18 @@ static NORETURN void bad_object(unsigned long offset, const char *format, ...)
 	die("pack has bad object at offset %lu: %s", offset, buf);
 }
 
+static struct thread_local *get_thread_data()
+{
+#ifndef NO_PTHREADS
+	int i;
+	pthread_t self = pthread_self();
+	for (i = 1; i < nr_threads; i++)
+		if (self == thread_data[i].thread)
+			return &thread_data[i];
+#endif
+	return &thread_data[0];
+}
+
 static struct base_data *alloc_base_data(void)
 {
 	struct base_data *base = xmalloc(sizeof(struct base_data));
@@ -238,15 +297,16 @@ static void free_base_data(struct base_data *c)
 	if (c->data) {
 		free(c->data);
 		c->data = NULL;
-		base_cache_used -= c->size;
+		get_thread_data()->base_cache_used -= c->size;
 	}
 }
 
 static void prune_base_data(struct base_data *retain)
 {
 	struct base_data *b;
-	for (b = base_cache;
-	     base_cache_used > delta_base_cache_limit && b;
+	struct thread_local *data = get_thread_data();
+	for (b = data->base_cache;
+	     data->base_cache_used > delta_base_cache_limit && b;
 	     b = b->child) {
 		if (b->data && b != retain)
 			free_base_data(b);
@@ -258,22 +318,23 @@ static void link_base_data(struct base_data *base, struct base_data *c)
 	if (base)
 		base->child = c;
 	else
-		base_cache = c;
+		get_thread_data()->base_cache = c;
 
 	c->base = base;
 	c->child = NULL;
 	if (c->data)
-		base_cache_used += c->size;
+		get_thread_data()->base_cache_used += c->size;
 	prune_base_data(c);
 }
 
 static void unlink_base_data(struct base_data *c)
 {
-	struct base_data *base = c->base;
+	struct base_data *base;
+	base = c->base;
 	if (base)
 		base->child = NULL;
 	else
-		base_cache = NULL;
+		get_thread_data()->base_cache = NULL;
 	free_base_data(c);
 }
 
@@ -503,19 +564,25 @@ static void sha1_object(const void *data, unsigned long size,
 {
 	if (data)
 		hash_sha1_file(data, size, typename(type), sha1);
-	if (data && (strict || !verify) && has_sha1_file(sha1)) {
-		void *has_data;
-		enum object_type has_type;
-		unsigned long has_size;
-		has_data = read_sha1_file(sha1, &has_type, &has_size);
-		if (!has_data)
-			die("cannot read existing object %s", sha1_to_hex(sha1));
-		if (size != has_size || type != has_type ||
-		    memcmp(data, has_data, size) != 0)
-			die("SHA1 COLLISION FOUND WITH %s !", sha1_to_hex(sha1));
-		free(has_data);
+	if (data && (strict || !verify)) {
+		read_lock();
+		if (has_sha1_file(sha1)) {
+			void *has_data;
+			enum object_type has_type;
+			unsigned long has_size;
+			has_data = read_sha1_file(sha1, &has_type, &has_size);
+			read_unlock();
+			if (!has_data)
+				die("cannot read existing object %s", sha1_to_hex(sha1));
+			if (size != has_size || type != has_type ||
+			    memcmp(data, has_data, size) != 0)
+				die("SHA1 COLLISION FOUND WITH %s !", sha1_to_hex(sha1));
+			free(has_data);
+		} else
+			read_unlock();
 	}
 	if (strict) {
+		read_lock();
 		if (type == OBJ_BLOB) {
 			struct blob *blob = lookup_blob(sha1);
 			if (blob)
@@ -549,6 +616,7 @@ static void sha1_object(const void *data, unsigned long size,
 			}
 			obj->flags |= FLAG_CHECKED;
 		}
+		read_unlock();
 	}
 }
 
@@ -589,7 +657,7 @@ static void *get_base_data(struct base_data *c)
 		if (!delta_nr) {
 			c->data = get_data_from_pack(obj);
 			c->size = obj->size;
-			base_cache_used += c->size;
+			get_thread_data()->base_cache_used += c->size;
 			prune_base_data(c);
 		}
 		for (; delta_nr > 0; delta_nr--) {
@@ -605,7 +673,7 @@ static void *get_base_data(struct base_data *c)
 			free(raw);
 			if (!c->data)
 				bad_object(obj->idx.offset, "failed to apply delta");
-			base_cache_used += c->size;
+			get_thread_data()->base_cache_used += c->size;
 			prune_base_data(c);
 		}
 		free(delta);
@@ -633,7 +701,7 @@ static void resolve_delta(struct object_entry *delta_obj,
 		bad_object(delta_obj->idx.offset, "failed to apply delta");
 	sha1_object(result->data, result->size, delta_obj->real_type,
 		    delta_obj->idx.sha1);
-	nr_resolved_deltas++;
+	get_thread_data()->nr_resolved_deltas++;
 }
 
 static struct base_data *find_unresolved_deltas_1(struct base_data *base,
@@ -745,7 +813,30 @@ static void second_pass(struct object_entry *obj)
 	base_obj->obj = obj;
 	base_obj->data = NULL;
 	find_unresolved_deltas(base_obj);
-	display_progress(progress, nr_resolved_deltas);
+}
+
+static void *threaded_second_pass(void *arg)
+{
+	struct thread_local *data = get_thread_data();
+	for (;;) {
+		int i;
+		work_lock();
+		nr_resolved_deltas += data->nr_resolved_deltas;
+		display_progress(progress, nr_resolved_deltas);
+		data->nr_resolved_deltas = 0;
+		while (nr_processed < nr_objects &&
+		       is_delta_type(objects[nr_processed].type))
+			nr_processed++;
+		if (nr_processed == nr_objects) {
+			work_unlock();
+			break;
+		}
+		i = nr_processed++;
+		work_unlock();
+
+		second_pass(&objects[i]);
+	}
+	return NULL;
 }
 
 /* Parse all objects and return the pack content SHA1 hash */
@@ -804,14 +895,26 @@ static void parse_pack_objects(unsigned char *sha1)
 
 	if (verbose)
 		progress = start_progress("Resolving deltas", nr_deltas);
-	for (i = 0; i < nr_objects; i++) {
-		struct object_entry *obj = &objects[i];
-
-		if (is_delta_type(obj->type))
-			continue;
 
-		second_pass(obj);
+	nr_processed = 0;
+#ifndef NO_PTHREADS
+	if (nr_threads > 1) {
+		init_thread();
+		for (i = 1; i < nr_threads; i++) {
+			int ret = pthread_create(&thread_data[i].thread, NULL,
+						 threaded_second_pass, NULL);
+			if (ret)
+				die("unable to create thread: %s", strerror(ret));
+		}
+		for (i = 1; i < nr_threads; i++) {
+			pthread_join(thread_data[i].thread, NULL);
+			thread_data[i].thread = 0;
+		}
+		cleanup_thread();
+		return;
 	}
+#endif
+	threaded_second_pass(thread_data);
 }
 
 static int write_compressed(struct sha1file *f, void *in, unsigned int size)
@@ -1017,6 +1120,17 @@ static int git_index_pack_config(const char *k, const char *v, void *cb)
 			die("bad pack.indexversion=%"PRIu32, opts->version);
 		return 0;
 	}
+	if (!strcmp(k, "pack.threads")) {
+		nr_threads = git_config_int(k, v);
+		if (nr_threads < 0)
+			die("invalid number of threads specified (%d)",
+			    nr_threads);
+#ifdef NO_PTHREADS
+		if (nr_threads != 1)
+			warning("no threads support, ignoring %s", k);
+#endif
+		return 0;
+	}
 	return git_default_config(k, v, cb);
 }
 
@@ -1175,6 +1289,16 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix)
 				keep_msg = "";
 			} else if (!prefixcmp(arg, "--keep=")) {
 				keep_msg = arg + 7;
+			} else if (!prefixcmp(arg, "--threads=")) {
+				char *end;
+				nr_threads = strtoul(arg+10, &end, 0);
+				if (!arg[10] || *end || nr_threads < 0)
+					usage(index_pack_usage);
+#ifdef NO_PTHREADS
+				if (nr_threads != 1)
+					warning("no threads support, "
+						"ignoring %s", arg);
+#endif
 			} else if (!prefixcmp(arg, "--pack_header=")) {
 				struct pack_header *hdr;
 				char *c;
@@ -1246,6 +1370,19 @@ int cmd_index_pack(int argc, const char **argv, const char *prefix)
 	if (strict)
 		opts.flags |= WRITE_IDX_STRICT;
 
+#ifndef NO_PTHREADS
+	if (!nr_threads)
+		nr_threads = online_cpus();
+	/* reserve thread_data[0] for the main thread */
+	if (nr_threads > 1)
+		nr_threads++;
+#else
+	if (nr_threads != 1)
+		warning("no threads support, ignoring --threads");
+	nr_threads = 1;
+#endif
+	thread_data = xcalloc(nr_threads, sizeof(*thread_data));
+
 	curr_pack = open_pack_file(pack_name);
 	parse_pack_header();
 	objects = xcalloc(nr_objects + 1, sizeof(struct object_entry));
-- 
1.7.8.36.g69ee2

      parent reply	other threads:[~2012-02-25 10:54 UTC|newest]

Thread overview: 5+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2012-02-25 10:56 [PATCH 0/4] index-pack improvements Nguyễn Thái Ngọc Duy
2012-02-25 10:56 ` [PATCH 1/4] index-pack --verify: skip sha-1 collision test Nguyễn Thái Ngọc Duy
2012-02-25 10:56 ` [PATCH 2/4] index-pack: reduce memory usage when the pack has large blobs Nguyễn Thái Ngọc Duy
2012-02-25 10:56 ` [PATCH 3/4] index-pack: move second pass code into separate function Nguyễn Thái Ngọc Duy
2012-02-25 10:56 ` Nguyễn Thái Ngọc Duy [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1330167376-24859-5-git-send-email-pclouds@gmail.com \
    --to=pclouds@gmail.com \
    --cc=git@vger.kernel.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.