From: Han Young <hanyang.tony@bytedance.com>
To: git@vger.kernel.org
Cc: Han Young <hanyang.tony@bytedance.com>
Subject: [RFC PATCH 3/4] parallel-checkout: add parallel_unlink
Date: Sun, 3 Dec 2023 21:39:10 +0800 [thread overview]
Message-ID: <20231203133911.41594-4-hanyoung@protonmail.com> (raw)
In-Reply-To: <20231203133911.41594-1-hanyoung@protonmail.com>
From: Han Young <hanyang.tony@bytedance.com>
Add run_parallel_unlink() to parallel-checkout. It uses multiple threads to unlink the index entries marked for removal. Because the paths to be removed are sorted, the threads walk the entry list in an interleaved fashion (with n threads, thread i handles entries i, i + n, i + 2n, ...) to distribute the workload as evenly as possible. Because of the multithreaded nature, it is not possible to remove all the directories in one pass: a directory one thread is about to remove may still contain items that are being removed by another thread. Whenever we fail to remove a directory, we save it in a hashset, and once every thread has finished its job, we remove all the directories in the hashset.
---
Note that progress is displayed only after each thread is joined, so the progress count is updated once per thread instead of once per path.
During testing, the threads finished at almost the same time, which caused an abrupt jump in the progress display.
We could use a mutex to display progress from the worker threads, but that negated the optimization in environments where file deletion is fast.
parallel-checkout.c | 80 +++++++++++++++++++++++++++++++++++++++++++++
parallel-checkout.h | 25 ++++++++++++++
2 files changed, 105 insertions(+)
diff --git a/parallel-checkout.c b/parallel-checkout.c
index b5a714c711..6e62e044d8 100644
--- a/parallel-checkout.c
+++ b/parallel-checkout.c
@@ -328,6 +328,24 @@ static int close_and_clear(int *fd)
return ret;
}
+/*
+ * Thread body for run_parallel_unlink().
+ *
+ * Visits data->cache[start], data->cache[start + step], ... (indices below
+ * data->len) and calls threaded_unlink_entry() on every entry flagged
+ * CE_WT_REMOVE. data->removal_cache is this thread's own strbuf, passed
+ * through to threaded_unlink_entry() (presumably to collect directories
+ * whose removal must be retried later -- confirm against patch 2/4).
+ *
+ * Stores the number of flagged entries handled in data->cnt and returns a
+ * pointer to it; that pointer is valid for as long as `data` is.
+ */
+void *parallel_unlink_proc(void *_data)
+{
+	struct parallel_unlink_data *data = _data;
+	struct cache_def cache = CACHE_DEF_INIT;
+	size_t i;
+
+	data->cnt = 0;
+	for (i = data->start; i < data->len; i += data->step) {
+		const struct cache_entry *ce = data->cache[i];
+
+		if (!(ce->ce_flags & CE_WT_REMOVE))
+			continue;
+		data->cnt++;
+		threaded_unlink_entry(ce, data->super_prefix,
+				      data->removal_cache, &cache);
+	}
+
+	/* The leading-path cache owns a strbuf; free it before the thread exits. */
+	cache_def_clear(&cache);
+	return &data->cnt;
+}
+
void write_pc_item(struct parallel_checkout_item *pc_item,
struct checkout *state)
{
@@ -678,3 +696,65 @@ int run_parallel_checkout(struct checkout *state, int num_workers, int threshold
finish_parallel_checkout();
return ret;
}
+
+/*
+ * Decide whether unlinking is worth parallelizing: more than one worker
+ * must be requested and at least `threshold` entries must be flagged
+ * CE_WT_REMOVE. A non-positive threshold always allows parallelism.
+ * Counting stops as soon as the threshold is reached.
+ */
+static int should_unlink_in_parallel(struct index_state *index,
+				     int num_workers, int threshold)
+{
+	unsigned int i, unlink_cnt = 0;
+
+	if (num_workers <= 1)
+		return 0;
+	if (threshold <= 0)
+		return 1;
+	/* Not enough entries in total, so not enough removals either. */
+	if (index->cache_nr < (unsigned int)threshold)
+		return 0;
+	for (i = 0; i < index->cache_nr; i++) {
+		if ((index->cache[i]->ce_flags & CE_WT_REMOVE) &&
+		    ++unlink_cnt >= (unsigned int)threshold)
+			return 1;
+	}
+	return 0;
+}
+
+/*
+ * Unlink the CE_WT_REMOVE entries of `index` using `num_workers` threads.
+ * Thread i handles entries i, i + num_workers, ... and gets its own strbuf.
+ * After all threads are joined, the remaining scheduled directories are
+ * removed and the flagged entries are dropped from the index.
+ * Returns `cnt` plus the number of flagged entries processed.
+ */
+static unsigned parallel_unlink_threaded(struct index_state *index,
+					 struct progress *progress,
+					 const char *super_prefix,
+					 int num_workers, unsigned cnt)
+{
+	struct parallel_unlink_data *unlink_data;
+	struct strbuf *removal_caches;
+	int i, err;
+
+	CALLOC_ARRAY(unlink_data, num_workers);
+	/* Heap-allocated instead of a variable-length array. */
+	ALLOC_ARRAY(removal_caches, num_workers);
+	threaded_init_remove_scheduled_dirs();
+
+	for (i = 0; i < num_workers; i++) {
+		struct parallel_unlink_data *data = &unlink_data[i];
+
+		strbuf_init(&removal_caches[i], 50);
+		data->start = i;
+		data->cache = index->cache;
+		data->len = index->cache_nr;
+		data->step = num_workers;
+		data->super_prefix = super_prefix;
+		data->removal_cache = &removal_caches[i];
+		err = pthread_create(&data->pthread, NULL, parallel_unlink_proc, data);
+		if (err)
+			die(_("unable to create parallel_unlink thread: %s"), strerror(err));
+	}
+
+	for (i = 0; i < num_workers; i++) {
+		err = pthread_join(unlink_data[i].pthread, NULL);
+		if (err)
+			die(_("unable to join parallel_unlink thread: %s"), strerror(err));
+		/* Safe to read: the join orders the thread's writes before this. */
+		cnt += unlink_data[i].cnt;
+		/* Progress advances once per joined thread, not once per path. */
+		display_progress(progress, cnt);
+	}
+
+	threaded_remove_scheduled_dirs_clean_up();
+	for (i = 0; i < num_workers; i++)
+		threaded_remove_scheduled_dirs(&removal_caches[i]);
+	remove_marked_cache_entries(index, 0);
+
+	/* strbuf_release() is safe even if the buffer was already released. */
+	for (i = 0; i < num_workers; i++)
+		strbuf_release(&removal_caches[i]);
+	free(removal_caches);
+	free(unlink_data);
+	return cnt;
+}
+
+/*
+ * Unlink every index entry flagged CE_WT_REMOVE, drop those entries from
+ * the index, and remove the scheduled directories. Returns `cnt` plus the
+ * number of entries processed. Threads are used only when
+ * should_unlink_in_parallel() agrees; otherwise the work is sequential.
+ */
+unsigned run_parallel_unlink(struct index_state *index,
+			     struct progress *progress,
+			     const char *super_prefix, int num_workers, int threshold,
+			     unsigned cnt)
+{
+	unsigned int i;
+
+	if (should_unlink_in_parallel(index, num_workers, threshold))
+		return parallel_unlink_threaded(index, progress, super_prefix,
+						num_workers, cnt);
+
+	for (i = 0; i < index->cache_nr; i++) {
+		const struct cache_entry *ce = index->cache[i];
+
+		if (ce->ce_flags & CE_WT_REMOVE) {
+			display_progress(progress, ++cnt);
+			unlink_entry(ce, super_prefix);
+		}
+	}
+	remove_marked_cache_entries(index, 0);
+	remove_scheduled_dirs();
+	return cnt;
+}
diff --git a/parallel-checkout.h b/parallel-checkout.h
index c575284005..e851b773d9 100644
--- a/parallel-checkout.h
+++ b/parallel-checkout.h
@@ -43,6 +43,18 @@ size_t pc_queue_size(void);
int run_parallel_checkout(struct checkout *state, int num_workers, int threshold,
struct progress *progress, unsigned int *progress_cnt);
+/*
+ * Unlink all the index entries flagged CE_WT_REMOVE, drop them from the
+ * index, and remove the directories scheduled for removal. Returns the
+ * number of entries unlinked plus the original value of `cnt`, so callers
+ * can keep a running progress total.
+ *
+ * The work is done sequentially if `num_workers` <= 1 or if fewer than
+ * `threshold` entries are flagged for removal; otherwise `num_workers`
+ * threads are used. In the threaded case, `progress` advances only as
+ * each thread is joined, not once per path.
+ */
+unsigned run_parallel_unlink(struct index_state *index,
+ struct progress *progress,
+ const char *super_prefix,
+ int num_workers, int threshold,
+ unsigned cnt);
+
/****************************************************************
* Interface with checkout--worker
****************************************************************/
@@ -76,6 +88,19 @@ struct parallel_checkout_item {
struct stat st;
};
+/*
+ * Per-thread state for run_parallel_unlink(). A thread processes
+ * cache[start], cache[start + step], ... for indices below `len`.
+ * `start`, `step` and `len` all index the same array and share a type.
+ */
+struct parallel_unlink_data {
+	pthread_t pthread;
+	/* The index's cache array; the same pointer for every thread. */
+	struct cache_entry **cache;
+	/* This thread's own strbuf, passed to threaded_unlink_entry(). */
+	struct strbuf *removal_cache;
+	size_t len;
+	size_t start;
+	size_t step;
+	/* Output: number of CE_WT_REMOVE entries this thread processed. */
+	unsigned cnt;
+	const char *super_prefix;
+};
+
+/* Thread entry point; `_data` points to a struct parallel_unlink_data. */
+void *parallel_unlink_proc(void *_data);
+
/*
* The fixed-size portion of `struct parallel_checkout_item` that is sent to the
* workers. Following this will be 2 strings: ca.working_tree_encoding and
--
2.43.0
next prev parent reply other threads:[~2023-12-03 13:39 UTC|newest]
Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-12-03 13:39 [RFC PATCH 0/4] add parallel unlink Han Young
2023-12-03 13:39 ` [RFC PATCH 1/4] symlinks: add and export threaded rmdir variants Han Young
2023-12-03 13:39 ` [RFC PATCH 2/4] entry: add threaded_unlink_entry function Han Young
2023-12-03 13:39 ` Han Young [this message]
2023-12-03 13:39 ` [RFC PATCH 4/4] unpack-trees: introduce parallel_unlink Han Young
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20231203133911.41594-4-hanyoung@protonmail.com \
--to=hanyang.tony@bytedance.com \
--cc=git@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox