All of lore.kernel.org
 help / color / mirror / Atom feed
From: Gao Xiang <xiang@kernel.org>
To: linux-erofs@lists.ozlabs.org
Cc: Gao Xiang <hsiangkao@linux.alibaba.com>,
	Yifan Zhao <zhaoyifan@sjtu.edu.cn>,
	Tong Xin <xin_tong@sjtu.edu.cn>
Subject: [PATCH v2 6/8] erofs-utils: mkfs: prepare inter-file multi-threaded compression
Date: Mon, 22 Apr 2024 08:34:48 +0800	[thread overview]
Message-ID: <20240422003450.19132-6-xiang@kernel.org> (raw)
In-Reply-To: <20240422003450.19132-1-xiang@kernel.org>

From: Yifan Zhao <zhaoyifan@sjtu.edu.cn>

This patch separates the compression process into two parts.

Specifically, erofs_begin_compressed_file() triggers compression, and
erofs_write_compressed_file() waits for the compression to finish and
then writes the compressed (meta)data.

Note that it's possible for erofs_begin_compressed_file() and
erofs_write_compressed_file() to run in different threads even when
the global inode context is used, so add another synchronization point.

Signed-off-by: Yifan Zhao <zhaoyifan@sjtu.edu.cn>
Co-authored-by: Tong Xin <xin_tong@sjtu.edu.cn>
Signed-off-by: Gao Xiang <hsiangkao@linux.alibaba.com>
---
 include/erofs/compress.h |   5 +-
 lib/compress.c           | 138 ++++++++++++++++++++++++++++-----------
 lib/inode.c              |  17 ++++-
 3 files changed, 118 insertions(+), 42 deletions(-)

diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 871db54..c9831a7 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -17,8 +17,11 @@ extern "C"
 #define EROFS_CONFIG_COMPR_MAX_SZ	(4000 * 1024)
 #define Z_EROFS_COMPR_QUEUE_SZ		(EROFS_CONFIG_COMPR_MAX_SZ * 2)
 
+struct z_erofs_compress_ictx;
+
 void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos);
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx);
 
 int z_erofs_compress_init(struct erofs_sb_info *sbi,
 			  struct erofs_buffer_head *bh);
diff --git a/lib/compress.c b/lib/compress.c
index 4ac4760..7fef698 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -109,6 +109,7 @@ struct erofs_compress_work {
 static struct {
 	struct erofs_workqueue wq;
 	struct erofs_compress_work *idle;
+	pthread_mutex_t mutex;
 } z_erofs_mt_ctrl;
 #endif
 
@@ -1312,11 +1313,13 @@ int z_erofs_mt_compress(struct z_erofs_compress_ictx *ictx)
 	pthread_cond_init(&ictx->cond, NULL);
 
 	for (i = 0; i < nsegs; i++) {
+		pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
 		cur = z_erofs_mt_ctrl.idle;
 		if (cur) {
 			z_erofs_mt_ctrl.idle = cur->next;
 			cur->next = NULL;
 		}
+		pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
 		if (!cur) {
 			cur = calloc(1, sizeof(*cur));
 			if (!cur)
@@ -1364,8 +1367,10 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
 	pthread_mutex_unlock(&ictx->mutex);
 
 	bh = erofs_balloc(DATA, 0, 0, 0);
-	if (IS_ERR(bh))
-		return PTR_ERR(bh);
+	if (IS_ERR(bh)) {
+		ret = PTR_ERR(bh);
+		goto out;
+	}
 
 	DBG_BUGON(!head);
 	blkaddr = erofs_mapbh(bh->block);
@@ -1389,27 +1394,31 @@ int erofs_mt_write_compressed_file(struct z_erofs_compress_ictx *ictx)
 			blkaddr = cur->ctx.blkaddr;
 		}
 
+		pthread_mutex_lock(&z_erofs_mt_ctrl.mutex);
 		cur->next = z_erofs_mt_ctrl.idle;
 		z_erofs_mt_ctrl.idle = cur;
-	} while(head);
+		pthread_mutex_unlock(&z_erofs_mt_ctrl.mutex);
+	} while (head);
 
 	if (ret)
-		return ret;
-
-	return erofs_commit_compressed_file(ictx, bh,
+		goto out;
+	ret = erofs_commit_compressed_file(ictx, bh,
 			blkaddr - compressed_blocks, compressed_blocks);
+
+out:
+	close(ictx->fd);
+	free(ictx);
+	return ret;
 }
 #endif
 
-int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
+static struct z_erofs_compress_ictx g_ictx;
+
+void *erofs_begin_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 {
-	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
-	struct erofs_buffer_head *bh;
-	static struct z_erofs_compress_ictx ctx;
-	static struct z_erofs_compress_sctx sctx;
-	erofs_blk_t blkaddr;
-	int ret;
 	struct erofs_sb_info *sbi = inode->sbi;
+	struct z_erofs_compress_ictx *ictx;
+	int ret;
 
 	/* initialize per-file compression setting */
 	inode->z_advise = 0;
@@ -1440,43 +1449,87 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 		}
 	}
 #endif
-	ctx.ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
-	inode->z_algorithmtype[0] = ctx.ccfg->algorithmtype;
-	inode->z_algorithmtype[1] = 0;
-
 	inode->idata_size = 0;
 	inode->fragment_size = 0;
 
+	if (z_erofs_mt_enabled) {
+		ictx = malloc(sizeof(*ictx));
+		if (!ictx)
+			return ERR_PTR(-ENOMEM);
+		ictx->fd = dup(fd);
+	} else {
+#ifdef EROFS_MT_ENABLED
+		pthread_mutex_lock(&g_ictx.mutex);
+		if (g_ictx.seg_num)
+			pthread_cond_wait(&g_ictx.cond, &g_ictx.mutex);
+		g_ictx.seg_num = 1;
+		pthread_mutex_unlock(&g_ictx.mutex);
+#endif
+		ictx = &g_ictx;
+		ictx->fd = fd;
+	}
+
+	ictx->ccfg = &erofs_ccfg[inode->z_algorithmtype[0]];
+	inode->z_algorithmtype[0] = ictx->ccfg->algorithmtype;
+	inode->z_algorithmtype[1] = 0;
+
 	/*
 	 * Handle tails in advance to avoid writing duplicated
 	 * parts into the packed inode.
 	 */
 	if (cfg.c_fragments && !erofs_is_packed_inode(inode)) {
-		ret = z_erofs_fragments_dedupe(inode, fd, &ctx.tof_chksum);
+		ret = z_erofs_fragments_dedupe(inode, fd, &ictx->tof_chksum);
 		if (ret < 0)
-			return ret;
+			goto err_free_ictx;
 	}
 
-	ctx.inode = inode;
-	ctx.fd = fd;
-	ctx.fpos = fpos;
-	init_list_head(&ctx.extents);
-	ctx.fix_dedupedfrag = false;
-	ctx.fragemitted = false;
+	ictx->inode = inode;
+	ictx->fpos = fpos;
+	init_list_head(&ictx->extents);
+	ictx->fix_dedupedfrag = false;
+	ictx->fragemitted = false;
 
 	if (cfg.c_all_fragments && !erofs_is_packed_inode(inode) &&
 	    !inode->fragment_size) {
-		ret = z_erofs_pack_file_from_fd(inode, fd, ctx.tof_chksum);
+		ret = z_erofs_pack_file_from_fd(inode, fd, ictx->tof_chksum);
 		if (ret)
 			goto err_free_idata;
+	}
 #ifdef EROFS_MT_ENABLED
-	} else if (z_erofs_mt_enabled) {
-		ret = z_erofs_mt_compress(&ctx);
+	if (ictx != &g_ictx) {
+		ret = z_erofs_mt_compress(ictx);
 		if (ret)
 			goto err_free_idata;
-		return erofs_mt_write_compressed_file(&ctx);
+	}
 #endif
+	return ictx;
+
+err_free_idata:
+	if (inode->idata) {
+		free(inode->idata);
+		inode->idata = NULL;
 	}
+err_free_ictx:
+	if (ictx != &g_ictx)
+		free(ictx);
+	return ERR_PTR(ret);
+}
+
+int erofs_write_compressed_file(struct z_erofs_compress_ictx *ictx)
+{
+	static u8 g_queue[Z_EROFS_COMPR_QUEUE_SZ];
+	struct erofs_buffer_head *bh;
+	static struct z_erofs_compress_sctx sctx;
+	struct erofs_compress_cfg *ccfg = ictx->ccfg;
+	struct erofs_inode *inode = ictx->inode;
+	erofs_blk_t blkaddr;
+	int ret;
+
+#ifdef EROFS_MT_ENABLED
+	if (ictx != &g_ictx)
+		return erofs_mt_write_compressed_file(ictx);
+#endif
+
 	/* allocate main data buffer */
 	bh = erofs_balloc(DATA, 0, 0, 0);
 	if (IS_ERR(bh)) {
@@ -1485,11 +1538,11 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 	}
 	blkaddr = erofs_mapbh(bh->block); /* start_blkaddr */
 
-	ctx.seg_num = 1;
+	ictx->seg_num = 1;
 	sctx = (struct z_erofs_compress_sctx) {
-		.ictx = &ctx,
+		.ictx = ictx,
 		.queue = g_queue,
-		.chandle = &ctx.ccfg->handle,
+		.chandle = &ccfg->handle,
 		.remaining = inode->i_size - inode->fragment_size,
 		.seg_idx = 0,
 		.pivot = &dummy_pivot,
@@ -1499,19 +1552,26 @@ int erofs_write_compressed_file(struct erofs_inode *inode, int fd, u64 fpos)
 
 	ret = z_erofs_compress_segment(&sctx, -1, blkaddr);
 	if (ret)
-		goto err_bdrop;
-	list_splice_tail(&sctx.extents, &ctx.extents);
+		goto err_free_idata;
 
-	return erofs_commit_compressed_file(&ctx, bh, blkaddr,
-					    sctx.blkaddr - blkaddr);
+	list_splice_tail(&sctx.extents, &ictx->extents);
+	ret = erofs_commit_compressed_file(ictx, bh, blkaddr,
+					   sctx.blkaddr - blkaddr);
+	goto out;
 
-err_bdrop:
-	erofs_bdrop(bh, true);	/* revoke buffer */
 err_free_idata:
+	erofs_bdrop(bh, true);	/* revoke buffer */
 	if (inode->idata) {
 		free(inode->idata);
 		inode->idata = NULL;
 	}
+out:
+#ifdef EROFS_MT_ENABLED
+	pthread_mutex_lock(&ictx->mutex);
+	ictx->seg_num = 0;
+	pthread_cond_signal(&ictx->cond);
+	pthread_mutex_unlock(&ictx->mutex);
+#endif
 	return ret;
 }
 
@@ -1666,6 +1726,8 @@ int z_erofs_compress_init(struct erofs_sb_info *sbi, struct erofs_buffer_head *s
 					    z_erofs_mt_wq_tls_free);
 		z_erofs_mt_enabled = !ret;
 	}
+	pthread_mutex_init(&g_ictx.mutex, NULL);
+	pthread_cond_init(&g_ictx.cond, NULL);
 #endif
 	return 0;
 }
diff --git a/lib/inode.c b/lib/inode.c
index 1ff05e1..0d044f4 100644
--- a/lib/inode.c
+++ b/lib/inode.c
@@ -499,10 +499,15 @@ int erofs_write_file(struct erofs_inode *inode, int fd, u64 fpos)
 	DBG_BUGON(!inode->i_size);
 
 	if (cfg.c_compr_opts[0].alg && erofs_file_is_compressible(inode)) {
+		void *ictx;
 		int ret;
 
-		ret = erofs_write_compressed_file(inode, fd, fpos);
-		if (!ret || ret != -ENOSPC)
+		ictx = erofs_begin_compressed_file(inode, fd, fpos);
+		if (IS_ERR(ictx))
+			return PTR_ERR(ictx);
+
+		ret = erofs_write_compressed_file(ictx);
+		if (ret != -ENOSPC)
 			return ret;
 
 		if (lseek(fd, fpos, SEEK_SET) < 0)
@@ -1362,6 +1367,7 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 {
 	struct stat st;
 	struct erofs_inode *inode;
+	void *ictx;
 	int ret;
 
 	ret = lseek(fd, 0, SEEK_SET);
@@ -1392,7 +1398,12 @@ struct erofs_inode *erofs_mkfs_build_special_from_fd(int fd, const char *name)
 		inode->nid = inode->sbi->packed_nid;
 	}
 
-	ret = erofs_write_compressed_file(inode, fd, 0);
+	ictx = erofs_begin_compressed_file(inode, fd, 0);
+	if (IS_ERR(ictx))
+		return ERR_CAST(ictx);
+
+	DBG_BUGON(!ictx);
+	ret = erofs_write_compressed_file(ictx);
 	if (ret == -ENOSPC) {
 		ret = lseek(fd, 0, SEEK_SET);
 		if (ret < 0)
-- 
2.30.2


  parent reply	other threads:[~2024-04-22  0:36 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2024-04-22  0:34 [PATCH v2 1/8] erofs-utils: use erofs_atomic_t for inode->i_count Gao Xiang
2024-04-22  0:34 ` [PATCH v2 2/8] erofs-utils: lib: prepare for later deferred work Gao Xiang
2024-04-22  0:34 ` [PATCH v2 3/8] erofs-utils: lib: split out erofs_commit_compressed_file() Gao Xiang
2024-04-22  0:34 ` [PATCH v2 4/8] erofs-utils: rearrange several fields for multi-threaded mkfs Gao Xiang
2024-04-22  0:34 ` [PATCH v2 5/8] erofs-utils: lib: split up z_erofs_mt_compress() Gao Xiang
2024-04-22  0:34 ` Gao Xiang [this message]
2024-04-22  0:34 ` [PATCH v2 7/8] erofs-utils: lib: introduce non-directory jobitem context Gao Xiang
2024-04-22  0:34 ` [PATCH v2 8/8] erofs-utils: mkfs: enable inter-file multi-threaded compression Gao Xiang

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20240422003450.19132-6-xiang@kernel.org \
    --to=xiang@kernel.org \
    --cc=hsiangkao@linux.alibaba.com \
    --cc=linux-erofs@lists.ozlabs.org \
    --cc=xin_tong@sjtu.edu.cn \
    --cc=zhaoyifan@sjtu.edu.cn \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.