* [PATCH 0/2] fsck.erofs: introduce multi-threaded decompression
From: Nithurshen @ 2026-05-23 0:37 UTC
To: linux-erofs; +Cc: hsiangkao, xiang, Nithurshen
Hi,
As part of my GSoC 2026 proposal to introduce Multi-Threaded
Decompression Support in fsck.erofs, I am submitting this two-patch
series which establishes the core workqueue offloading infrastructure.
Baseline profiling of fsck.erofs extracting LZ4HC images with 4K pclusters
showed the main thread bottlenecked on synchronous VFS writes, which in
turn stalled decompression. This series offloads the decompression work
to the existing erofs_workqueue.
- Patch 1 introduces the baseline producer-consumer logic. To avoid
heavy futex scheduling overhead on tiny 4K clusters, it implements
a batching context that groups sequential pclusters into a single
erofs_work unit. Ownership of the raw and output buffers passes to
the workers, and output buffers are allocated with calloc() so that
no uninitialized bytes leak into extracted files.
- Patch 2 implements dynamic, algorithm-aware batching. Fast algorithms
(LZ4) are permitted to utilize the maximum batch size (32 pclusters)
to hide scheduling latency, whereas compute-heavy algorithms (LZMA)
trigger much smaller batches (8 pclusters) to prevent memory bloat
and keep the thread pool continuously fed.
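The batching idea in the two items above can be sketched in isolation. This is a minimal, single-threaded illustration and not the code from the series: `demo_batcher`, `demo_batcher_add()` and `demo_batcher_flush()` are hypothetical names, and "submitting" a batch only bumps a counter where the real code would queue an erofs_work unit.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_BATCH_MAX 32	/* mirrors the 32-pcluster cap described above */

/* Hypothetical stand-in for one work unit that collects several pclusters. */
struct demo_batcher {
	int items[DEMO_BATCH_MAX];	/* ids of the pclusters collected so far */
	unsigned int nr;		/* number of collected items */
	unsigned int limit;		/* flush threshold, 1..DEMO_BATCH_MAX */
	unsigned int submitted;		/* work units handed off so far */
	unsigned int items_done;	/* items covered by those work units */
};

/* Hand the current batch off as one work unit; a no-op when it is empty. */
static void demo_batcher_flush(struct demo_batcher *b)
{
	if (!b->nr)
		return;
	b->submitted++;
	b->items_done += b->nr;
	b->nr = 0;
}

/* Append one item and submit automatically once the threshold is reached. */
static void demo_batcher_add(struct demo_batcher *b, int item)
{
	b->items[b->nr++] = item;
	if (b->nr >= b->limit)
		demo_batcher_flush(b);
}
```

With a limit of 32, feeding 70 items produces two automatic submissions plus one final flush for the remaining 6, so the wake-up cost is paid three times rather than 70 times.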
The implementation has been verified to produce bit-perfect extractions
against heavily packed LZ4HC test images.
Nithurshen (2):
fsck.erofs: introduce multi-threaded decompression with static
batching
fsck.erofs: implement dynamic pcluster batching based on algorithm
complexity
fsck/main.c | 234 +++++++++++++++++----------------------
include/erofs/internal.h | 18 ++-
lib/data.c | 206 ++++++++++++++++++++++++----------
3 files changed, 268 insertions(+), 190 deletions(-)
--
2.52.0
* [PATCH 1/2] fsck.erofs: introduce multi-threaded decompression with static batching
From: Nithurshen @ 2026-05-23 0:37 UTC
To: linux-erofs; +Cc: hsiangkao, xiang, Nithurshen

Currently, fsck.erofs extracts files synchronously. When decompressing
heavily packed images (like LZ4HC with 4K pclusters), the main thread
spends the majority of its time blocked on a combination of synchronous
vfs_write() syscalls and LZ4_decompress_safe(), bottlenecking overall
extraction speed.

This patch introduces a scalable, multi-threaded decompression framework
using the existing erofs_workqueue infrastructure to decouple compute
from the main thread's I/O.

To prevent massive scheduling overhead (futex contention) where worker
threads spend more CPU time waking up than actually decompressing small
4KB clusters, this implementation introduces a batching context. The
main thread collects an array of sequential pclusters (temporarily hard-
capped at Z_EROFS_PCLUSTER_BATCH_SIZE = 32) before submitting a single
erofs_work unit.

Key details of this implementation:
- The worker pool is dynamically sized based on available system CPUs.
- Decompression tasks take strict ownership of the raw and output
  buffers (safely tracking memory via a `free_out` flag) to prevent
  data races and memory leaks.
- Output buffers are explicitly zero-initialized via calloc() to
  prevent trailing garbage bytes from leaking into extracted files.
- Tail-end packed fragments are processed synchronously by the main
  thread, as their minimal overhead does not benefit from asynchronous
  offloading.

Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
---
 fsck/main.c              | 234 +++++++++++++++++----------------------
 include/erofs/internal.h |  18 ++-
 lib/data.c               | 203 +++++++++++++++++++++++----------
 3 files changed, 265 insertions(+), 190 deletions(-)

diff --git a/fsck/main.c b/fsck/main.c
index 16cc627..d7810e8 100644
--- a/fsck/main.c
+++ b/fsck/main.c
@@ -8,14 +8,18 @@
 #include <time.h>
 #include <utime.h>
 #include <unistd.h>
+#include <pthread.h>
 #include <sys/stat.h>
 #include "erofs/print.h"
 #include "erofs/decompress.h"
 #include "erofs/dir.h"
 #include "erofs/xattr.h"
+#include "erofs/workqueue.h"
 #include "../lib/compressor.h"
 #include "../lib/liberofs_compress.h"
 
+extern struct erofs_workqueue erofs_wq;
+
 static int erofsfsck_check_inode(erofs_nid_t pnid, erofs_nid_t nid);
 
 struct erofsfsck_dirstack {
@@ -505,135 +509,95 @@ out:
 
 static int erofs_verify_inode_data(struct erofs_inode *inode, int outfd)
 {
-	struct erofs_map_blocks map = {
-		.buf = __EROFS_BUF_INITIALIZER,
-	};
-	bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
-	int ret = 0;
-	bool compressed;
-	erofs_off_t pos = 0;
-	u64 pchunk_len = 0;
-	unsigned int raw_size = 0, buffer_size = 0;
-	char *raw = NULL, *buffer = NULL;
-
-	erofs_dbg("verify data chunk of nid(%llu): type(%d)",
-		  inode->nid | 0ULL, inode->datalayout);
-
-	compressed = erofs_inode_is_data_compressed(inode->datalayout);
-	while (pos < inode->i_size) {
-		unsigned int alloc_rawsize;
-
-		map.m_la = pos;
-		ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
-		if (ret)
-			goto out;
-
-		if (!compressed && map.m_llen != map.m_plen) {
-			erofs_err("broken chunk length m_la %" PRIu64 " m_llen %" PRIu64 " m_plen %" PRIu64,
-				  map.m_la, map.m_llen, map.m_plen);
-			ret = -EFSCORRUPTED;
-			goto out;
-		}
-
-		/* the last lcluster can be divided into 3 parts */
-		if (map.m_la + map.m_llen > inode->i_size)
-			map.m_llen = inode->i_size - map.m_la;
-
-		pchunk_len += map.m_plen;
-		pos += map.m_llen;
-
-		/* should skip decomp? */
-		if (map.m_la >= inode->i_size || !needdecode)
-			continue;
+    struct erofs_map_blocks map = { .buf = __EROFS_BUF_INITIALIZER };
+    bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
+    int ret = 0;
+    bool compressed = erofs_inode_is_data_compressed(inode->datalayout);
+    erofs_off_t pos = 0;
+    u64 pchunk_len = 0;
+
+    struct z_erofs_read_ctx ctx = {
+        .pending_tasks = 0,
+        .final_err = 0,
+        .outfd = outfd,
+        .free_out = true,
+        .current_task = NULL
+    };
+    pthread_mutex_init(&ctx.lock, NULL);
+    pthread_cond_init(&ctx.cond, NULL);
+
+    erofs_dbg("verify data chunk of nid(%llu): type(%d)", inode->nid | 0ULL, inode->datalayout);
+
+    while (pos < inode->i_size) {
+        map.m_la = pos;
+        ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
+        if (ret) goto out;
+
+        if (map.m_la + map.m_llen > inode->i_size)
+            map.m_llen = inode->i_size - map.m_la;
+
+        pchunk_len += map.m_plen;
+        pos += map.m_llen;
+
+        if (map.m_la >= inode->i_size || !needdecode)
+            continue;
+
+        if (outfd >= 0 && !(map.m_flags & EROFS_MAP_MAPPED)) {
+            ret = lseek(outfd, map.m_llen, SEEK_CUR);
+            if (ret < 0) {
+                ret = -errno;
+                goto out;
+            }
+            continue;
+        }
+
+        if (compressed) {
+            char *raw = malloc(map.m_plen);
+            size_t buffer_size = map.m_llen > erofs_blksiz(inode->sbi) ? map.m_llen : erofs_blksiz(inode->sbi);
+            char *buffer = calloc(1, buffer_size);
+
+            if (!raw || !buffer) {
+                free(raw); free(buffer);
+                ret = -ENOMEM;
+                goto out;
+            }
+
+            ret = z_erofs_read_one_data(inode, &map, raw, buffer, 0, map.m_llen, false, map.m_la, &ctx);
+            if (ret) {
+                /* DO NOT free(raw) or free(buffer) here. z_erofs_read_one_data took ownership! */
+                goto out;
+            }
+        } else {
+            char *raw = calloc(1, map.m_llen);
+            ret = erofs_read_one_data(inode, &map, raw, 0, map.m_llen);
+            if (ret >= 0 && outfd >= 0)
+                pwrite(outfd, raw, map.m_llen, map.m_la);
+            free(raw);
+            if (ret) goto out;
+        }
+    }
+    z_erofs_read_ctx_enqueue(&ctx);
 
-		if (outfd >= 0 && !(map.m_flags & EROFS_MAP_MAPPED)) {
-			ret = lseek(outfd, map.m_llen, SEEK_CUR);
-			if (ret < 0) {
-				ret = -errno;
-				goto out;
-			}
-			continue;
-		}
-
-		if (map.m_plen > Z_EROFS_PCLUSTER_MAX_SIZE) {
-			if (compressed && !(map.m_flags & __EROFS_MAP_FRAGMENT)) {
-				erofs_err("invalid pcluster size %" PRIu64 " @ offset %" PRIu64 " of nid %" PRIu64,
-					  map.m_plen, map.m_la,
-					  inode->nid | 0ULL);
-				ret = -EFSCORRUPTED;
-				goto out;
-			}
-			alloc_rawsize = Z_EROFS_PCLUSTER_MAX_SIZE;
-		} else {
-			alloc_rawsize = map.m_plen;
-		}
-
-		if (alloc_rawsize > raw_size) {
-			char *newraw = realloc(raw, alloc_rawsize);
-
-			if (!newraw) {
-				ret = -ENOMEM;
-				goto out;
-			}
-			raw = newraw;
-			raw_size = alloc_rawsize;
-		}
-
-		if (compressed) {
-			if (map.m_llen > buffer_size) {
-				char *newbuffer;
-
-				buffer_size = map.m_llen;
-				newbuffer = realloc(buffer, buffer_size);
-				if (!newbuffer) {
-					ret = -ENOMEM;
-					goto out;
-				}
-				buffer = newbuffer;
-			}
-			ret = z_erofs_read_one_data(inode, &map, raw, buffer,
-						    0, map.m_llen, false);
-			if (ret)
-				goto out;
-
-			if (outfd >= 0 && write(outfd, buffer, map.m_llen) < 0)
-				goto fail_eio;
-		} else {
-			u64 p = 0;
-
-			do {
-				u64 count = min_t(u64, alloc_rawsize,
-						  map.m_llen);
-
-				ret = erofs_read_one_data(inode, &map, raw, p, count);
-				if (ret)
-					goto out;
-
-				if (outfd >= 0 && write(outfd, raw, count) < 0)
-					goto fail_eio;
-				map.m_llen -= count;
-				p += count;
-			} while (map.m_llen);
-		}
-	}
-
-	if (fsckcfg.print_comp_ratio) {
-		if (!erofs_is_packed_inode(inode))
-			fsckcfg.logical_blocks += BLK_ROUND_UP(inode->sbi, inode->i_size);
-		fsckcfg.physical_blocks += BLK_ROUND_UP(inode->sbi, pchunk_len);
-	}
 out:
-	if (raw)
-		free(raw);
-	if (buffer)
-		free(buffer);
-	return ret < 0 ? ret : 0;
-
-fail_eio:
-	erofs_err("I/O error occurred when verifying data chunk @ nid %llu",
-		  inode->nid | 0ULL);
-	ret = -EIO;
-	goto out;
+    pthread_mutex_lock(&ctx.lock);
+    while (ctx.pending_tasks > 0)
+        pthread_cond_wait(&ctx.cond, &ctx.lock);
+    if (ctx.final_err < 0 && ret >= 0)
+        ret = ctx.final_err;
+    pthread_mutex_unlock(&ctx.lock);
+
+    if (fsckcfg.print_comp_ratio) {
+        if (!erofs_is_packed_inode(inode))
+            fsckcfg.logical_blocks += BLK_ROUND_UP(inode->sbi, inode->i_size);
+        fsckcfg.physical_blocks += BLK_ROUND_UP(inode->sbi, pchunk_len);
+    }
+
+    if (outfd >= 0 && ret >= 0)
+        ftruncate(outfd, inode->i_size);
+
+    pthread_mutex_destroy(&ctx.lock);
+    pthread_cond_destroy(&ctx.cond);
+    return ret < 0 ? ret : 0;
 }
 
 static inline int erofs_extract_dir(struct erofs_inode *inode)
@@ -1043,9 +1007,14 @@ int erofsfsck_fuzz_one(int argc, char *argv[])
 int main(int argc, char *argv[])
 #endif
 {
-	int err;
+    int err;
+    int workers;
+
+    erofs_init_configure();
 
-	erofs_init_configure();
+    workers = sysconf(_SC_NPROCESSORS_ONLN);
+    if (workers < 1) workers = 1;
+    erofs_alloc_workqueue(&erofs_wq, workers, 256, NULL, NULL);
 
 	fsckcfg.physical_blocks = 0;
 	fsckcfg.logical_blocks = 0;
@@ -1179,9 +1148,10 @@ exit_put_super:
 exit_dev_close:
 	erofs_dev_close(&g_sbi);
 exit:
-	erofs_blob_closeall(&g_sbi);
-	erofs_exit_configure();
-	return err ? 1 : 0;
+    erofs_blob_closeall(&g_sbi);
+    erofs_exit_configure();
+    erofs_destroy_workqueue(&erofs_wq);
+    return err ? 1 : 0;
 }
 
 #ifdef FUZZING
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 671880f..38020ee 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -62,6 +62,7 @@ struct erofs_buf {
 #define erofs_pos(sbi, nr)	((erofs_off_t)(nr) << (sbi)->blkszbits)
 #define BLK_ROUND_UP(sbi, addr)	\
 	(roundup(addr, erofs_blksiz(sbi)) >> (sbi)->blkszbits)
+#define Z_EROFS_PCLUSTER_BATCH_SIZE 32
 
 struct erofs_buffer_head;
 struct erofs_bufmgr;
@@ -442,6 +443,20 @@ struct z_erofs_paramset {
 	char *extraopts;
 };
 
+struct z_erofs_decompress_task;
+
+struct z_erofs_read_ctx {
+	pthread_mutex_t lock;
+	pthread_cond_t cond;
+	int pending_tasks;
+	int final_err;
+	int outfd;
+	bool free_out;
+	struct z_erofs_decompress_task *current_task;
+};
+
+void z_erofs_read_ctx_enqueue(struct z_erofs_read_ctx *ctx);
+
 int liberofs_global_init(void);
 void liberofs_global_exit(void);
 
@@ -478,7 +493,8 @@ int erofs_read_one_data(struct erofs_inode *inode, struct erofs_map_blocks *map,
 			char *buffer, u64 offset, size_t len);
 int z_erofs_read_one_data(struct erofs_inode *inode,
 			struct erofs_map_blocks *map, char *raw, char *buffer,
-			erofs_off_t skip, erofs_off_t length, bool trimmed);
+			erofs_off_t skip, erofs_off_t length, bool trimmed,
+			erofs_off_t out_offset, struct z_erofs_read_ctx *ctx);
 void *erofs_read_metadata(struct erofs_sb_info *sbi, erofs_nid_t nid,
 			  erofs_off_t *offset, int *lengthp);
 int z_erofs_parse_cfgs(struct erofs_sb_info *sbi, struct erofs_super_block *dsb);
diff --git a/lib/data.c b/lib/data.c
index 6fd1389..fa36899 100644
--- a/lib/data.c
+++ b/lib/data.c
@@ -9,6 +9,64 @@
 #include "erofs/trace.h"
 #include "erofs/decompress.h"
 #include "liberofs_fragments.h"
+#include "erofs/workqueue.h"
+#include <pthread.h>
+
+struct erofs_workqueue erofs_wq;
+
+struct z_erofs_decompress_task {
+	struct erofs_work work;
+	struct z_erofs_read_ctx *ctx;
+	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	char *raw_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	char *out_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	unsigned int out_lengths[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	unsigned int nr_reqs;
+};
+
+void z_erofs_read_ctx_enqueue(struct z_erofs_read_ctx *ctx)
+{
+	if (ctx && ctx->current_task) {
+		erofs_queue_work(&erofs_wq, &ctx->current_task->work);
+		ctx->current_task = NULL;
+	}
+}
+
+static void z_erofs_decompress_worker(struct erofs_work *work, void *tlsp)
+{
+	struct z_erofs_decompress_task *task = (struct z_erofs_decompress_task *)work;
+	struct z_erofs_read_ctx *ctx = task->ctx;
+	int i, ret = 0, first_err = 0;
+
+	for (i = 0; i < task->nr_reqs; ++i) {
+		ret = z_erofs_decompress(&task->reqs[i]);
+
+		if (ret >= 0 && ctx && ctx->outfd >= 0) {
+			if (pwrite(ctx->outfd, task->out_bufs[i],
+				   task->out_lengths[i], task->out_offsets[i]) < 0)
+				ret = -errno;
+		}
+
+		if (ret < 0 && first_err == 0)
+			first_err = ret;
+
+		free(task->raw_bufs[i]);
+		if (ctx && ctx->free_out)
+			free(task->out_bufs[i]);
+	}
+
+	if (ctx) {
+		pthread_mutex_lock(&ctx->lock);
+		if (first_err < 0 && ctx->final_err == 0)
+			ctx->final_err = first_err;
+		ctx->pending_tasks--;
+		if (ctx->pending_tasks == 0)
+			pthread_cond_signal(&ctx->cond);
+		pthread_mutex_unlock(&ctx->lock);
+	}
+	free(task);
+}
 
 void *erofs_bread(struct erofs_buf *buf, erofs_off_t offset, bool need_kmap)
 {
@@ -277,7 +335,8 @@ static int erofs_read_raw_data(struct erofs_inode *inode, char *buffer,
 
 int z_erofs_read_one_data(struct erofs_inode *inode,
 			struct erofs_map_blocks *map, char *raw, char *buffer,
-			erofs_off_t skip, erofs_off_t length, bool trimmed)
+			erofs_off_t skip, erofs_off_t length, bool trimmed,
+			erofs_off_t out_offset, struct z_erofs_read_ctx *ctx)
 {
 	struct erofs_sb_info *sbi = inode->sbi;
 	struct erofs_map_dev mdev;
@@ -285,77 +344,101 @@ int z_erofs_read_one_data(struct erofs_inode *inode,
 
 	if (map->m_flags & __EROFS_MAP_FRAGMENT) {
 		if (__erofs_unlikely(inode->nid == sbi->packed_nid)) {
-			erofs_err("fragment should not exist in the packed inode %llu",
-				  sbi->packed_nid | 0ULL);
-			return -EFSCORRUPTED;
+			ret = -EFSCORRUPTED;
+			goto err_out;
 		}
 
-		return erofs_packedfile_read(sbi, buffer, length - skip,
-					     inode->fragmentoff + skip);
+		ret = erofs_packedfile_read(sbi, buffer, length - skip,
+					    inode->fragmentoff + skip);
+
+		if (ret >= 0 && ctx && ctx->outfd >= 0) {
+			if (pwrite(ctx->outfd, buffer, length - skip, out_offset) < 0)
+				ret = -errno;
+		}
+		goto err_out;
 	}
 
-	/* no device id here, thus it will always succeed */
-	mdev = (struct erofs_map_dev) {
-		.m_pa = map->m_pa,
-	};
+	mdev = (struct erofs_map_dev) { .m_pa = map->m_pa };
 	ret = erofs_map_dev(sbi, &mdev);
-	if (ret) {
-		DBG_BUGON(1);
-		return ret;
-	}
+	if (ret) goto err_out;
 
 	ret = erofs_dev_read(sbi, mdev.m_deviceid, raw, mdev.m_pa, map->m_plen);
-	if (ret < 0)
-		return ret;
+	if (ret < 0) goto err_out;
+
+	struct z_erofs_decompress_task *task = ctx->current_task;
+	if (!task) {
+		task = calloc(1, sizeof(*task));
+		task->ctx = ctx;
+		task->work.fn = z_erofs_decompress_worker;
+		ctx->current_task = task;
+
+		pthread_mutex_lock(&ctx->lock);
+		ctx->pending_tasks++;
+		pthread_mutex_unlock(&ctx->lock);
+	}
 
-	ret = z_erofs_decompress(&(struct z_erofs_decompress_req) {
-			.sbi = sbi,
-			.in = raw,
-			.out = buffer,
-			.decodedskip = skip,
-			.interlaced_offset =
-				map->m_algorithmformat == Z_EROFS_COMPRESSION_INTERLACED ?
-					erofs_blkoff(sbi, map->m_la) : 0,
-			.inputsize = map->m_plen,
-			.decodedlength = length,
-			.alg = map->m_algorithmformat,
-			.partial_decoding = trimmed ? true :
-				!(map->m_flags & EROFS_MAP_FULL_MAPPED) ||
-					(map->m_flags & EROFS_MAP_PARTIAL_REF),
-			 });
-	if (ret < 0)
-		return ret;
+	int idx = task->nr_reqs++;
+	task->reqs[idx] = (struct z_erofs_decompress_req) {
+		.sbi = sbi,
+		.in = raw,
+		.out = buffer,
+		.decodedskip = skip,
+		.interlaced_offset =
+			map->m_algorithmformat == Z_EROFS_COMPRESSION_INTERLACED ?
+				erofs_blkoff(sbi, map->m_la) : 0,
+		.inputsize = map->m_plen,
+		.decodedlength = length,
+		.alg = map->m_algorithmformat,
+		.partial_decoding = trimmed ? true :
+			!(map->m_flags & EROFS_MAP_FULL_MAPPED) ||
+				(map->m_flags & EROFS_MAP_PARTIAL_REF),
+	};
+	task->raw_bufs[idx] = raw;
+	task->out_bufs[idx] = buffer;
+	task->out_offsets[idx] = out_offset;
+	task->out_lengths[idx] = length;
+
+	if (task->nr_reqs == Z_EROFS_PCLUSTER_BATCH_SIZE) {
+		z_erofs_read_ctx_enqueue(ctx);
+	}
 	return 0;
+
+err_out:
+	if (ctx && ctx->free_out) free(buffer);
+	free(raw);
+	return ret;
 }
 
 static int z_erofs_read_data(struct erofs_inode *inode, char *buffer,
-			     erofs_off_t size, erofs_off_t offset)
+				erofs_off_t size, erofs_off_t offset)
 {
 	erofs_off_t end, length, skip;
 	struct erofs_map_blocks map = {
 		.buf = __EROFS_BUF_INITIALIZER,
 	};
 	bool trimmed;
-	unsigned int bufsize = 0;
-	char *raw = NULL;
 	int ret = 0;
 
+	struct z_erofs_read_ctx ctx = {
+		.pending_tasks = 0,
+		.final_err = 0,
+		.outfd = -1,
+		.free_out = false,
+		.current_task = NULL
+	};
+	pthread_mutex_init(&ctx.lock, NULL);
+	pthread_cond_init(&ctx.cond, NULL);
+
 	end = offset + size;
 	while (end > offset) {
 		map.m_la = end - 1;
 
 		ret = z_erofs_map_blocks_iter(inode, &map, 0);
-		if (ret)
-			break;
+		if (ret) break;
 
-		/*
-		 * trim to the needed size if the returned extent is quite
-		 * larger than requested, and set up partial flag as well.
-		 */
 		if (end < map.m_la + map.m_llen) {
 			length = end - map.m_la;
 			trimmed = true;
 		} else {
-			DBG_BUGON(end != map.m_la + map.m_llen);
 			length = map.m_llen;
 			trimmed = false;
 		}
@@ -374,25 +457,31 @@ static int z_erofs_read_data(struct erofs_inode *inode, char *buffer,
 			continue;
 		}
 
-		if (map.m_plen > bufsize) {
-			char *newraw;
-
-			bufsize = map.m_plen;
-			newraw = realloc(raw, bufsize);
-			if (!newraw) {
-				ret = -ENOMEM;
-				break;
-			}
-			raw = newraw;
+		char *raw = malloc(map.m_plen);
+		if (!raw) {
+			ret = -ENOMEM;
+			break;
 		}
 
 		ret = z_erofs_read_one_data(inode, &map, raw,
-				buffer + end - offset, skip, length, trimmed);
-		if (ret < 0)
+				buffer + end - offset, skip, length, trimmed, 0, &ctx);
+		if (ret < 0) {
 			break;
+		}
 	}
-	if (raw)
-		free(raw);
+	z_erofs_read_ctx_enqueue(&ctx);
+
+	pthread_mutex_lock(&ctx.lock);
+	while (ctx.pending_tasks > 0)
+		pthread_cond_wait(&ctx.cond, &ctx.lock);
+
+	if (ctx.final_err < 0 && ret == 0)
+		ret = ctx.final_err;
+
+	pthread_mutex_unlock(&ctx.lock);
+	pthread_mutex_destroy(&ctx.lock);
+	pthread_cond_destroy(&ctx.cond);
+
 	return ret < 0 ? ret : 0;
 }
 
--
2.52.0
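The patch above sizes the worker pool from sysconf(_SC_NPROCESSORS_ONLN) and stores the result in an int. A small sketch of the same idea follows, keeping the long return type and adding an upper bound; `demo_worker_count()` and its `cap` parameter are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <unistd.h>

/*
 * Derive a worker count from the number of online CPUs.
 * sysconf() returns a long and may return -1, so fall back to one
 * worker in that case. A cap of 0 means "no upper bound".
 */
static unsigned int demo_worker_count(unsigned int cap)
{
	long n = sysconf(_SC_NPROCESSORS_ONLN);

	if (n < 1)
		n = 1;
	if (cap && (unsigned long)n > cap)
		n = cap;
	return (unsigned int)n;
}
```

A bound of this kind may be worth considering on machines with many cores, since every queued batch owns its raw and output buffers until a worker finishes with them.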
* Re: [PATCH 1/2] fsck.erofs: introduce multi-threaded decompression with static batching
From: Gao Xiang @ 2026-06-07 1:50 UTC
To: Nithurshen; +Cc: linux-erofs, hsiangkao, xiang

Hi Nithurshen,

On Sat, May 23, 2026 at 06:07:56AM +0530, Nithurshen wrote:
> Currently, fsck.erofs extracts files synchronously. When decompressing
> heavily packed images (like LZ4HC with 4K pclusters), the main thread
> spends the majority of its time blocked on a combination of synchronous
> vfs_write() syscalls and LZ4_decompress_safe(), bottlenecking overall
> extraction speed.
>
> This patch introduces a scalable, multi-threaded decompression framework
> using the existing erofs_workqueue infrastructure to decouple compute
> from the main thread's I/O.
>
> To prevent massive scheduling overhead (futex contention) where worker
> threads spend more CPU time waking up than actually decompressing small
> 4KB clusters, this implementation introduces a batching context. The
> main thread collects an array of sequential pclusters (temporarily hard-
> capped at Z_EROFS_PCLUSTER_BATCH_SIZE = 32) before submitting a single
> erofs_work unit.
>
> Key details of this implementation:
> - The worker pool is dynamically sized based on available system CPUs.
> - Decompression tasks take strict ownership of the raw and output
>   buffers (safely tracking memory via a `free_out` flag) to prevent
>   data races and memory leaks.
> - Output buffers are explicitly zero-initialized via calloc() to
>   prevent trailing garbage bytes from leaking into extracted files.
> - Tail-end packed fragments are processed synchronously by the main
>   thread, as their minimal overhead does not benefit from asynchronous
>   offloading.
>
> Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
> ---
>  fsck/main.c              | 234 +++++++++++++++++----------------------
>  include/erofs/internal.h |  18 ++-
>  lib/data.c               | 203 +++++++++++++++++++++++----------
>  3 files changed, 265 insertions(+), 190 deletions(-)
>
> diff --git a/fsck/main.c b/fsck/main.c
> index 16cc627..d7810e8 100644
> --- a/fsck/main.c
> +++ b/fsck/main.c
> @@ -8,14 +8,18 @@
>  #include <time.h>
>  #include <utime.h>
>  #include <unistd.h>
> +#include <pthread.h>
>  #include <sys/stat.h>
>  #include "erofs/print.h"
>  #include "erofs/decompress.h"
>  #include "erofs/dir.h"
>  #include "erofs/xattr.h"
> +#include "erofs/workqueue.h"
>  #include "../lib/compressor.h"
>  #include "../lib/liberofs_compress.h"
>
> +extern struct erofs_workqueue erofs_wq;
> +
>  static int erofsfsck_check_inode(erofs_nid_t pnid, erofs_nid_t nid);
>
>  struct erofsfsck_dirstack {
> @@ -505,135 +509,95 @@ out:
>
>  static int erofs_verify_inode_data(struct erofs_inode *inode, int outfd)
>  {
> -	struct erofs_map_blocks map = {
> -		.buf = __EROFS_BUF_INITIALIZER,
> -	};
> -	bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
> -	int ret = 0;
> -	bool compressed;
> -	erofs_off_t pos = 0;
> -	u64 pchunk_len = 0;
> -	unsigned int raw_size = 0, buffer_size = 0;
> -	char *raw = NULL, *buffer = NULL;
> -
> -	erofs_dbg("verify data chunk of nid(%llu): type(%d)",
> -		  inode->nid | 0ULL, inode->datalayout);
> -
> -	compressed = erofs_inode_is_data_compressed(inode->datalayout);
> -	while (pos < inode->i_size) {
> -		unsigned int alloc_rawsize;
> -
> -		map.m_la = pos;
> -		ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
> -		if (ret)
> -			goto out;
> -
> -		if (!compressed && map.m_llen != map.m_plen) {
> -			erofs_err("broken chunk length m_la %" PRIu64 " m_llen %" PRIu64 " m_plen %" PRIu64,
> -				  map.m_la, map.m_llen, map.m_plen);
> -			ret = -EFSCORRUPTED;
> -			goto out;
> -		}
> -
> -		/* the last lcluster can be divided into 3 parts */
> -		if (map.m_la + map.m_llen > inode->i_size)
> -			map.m_llen = inode->i_size - map.m_la;
> -
> -		pchunk_len += map.m_plen;
> -		pos += map.m_llen;
> -
> -		/* should skip decomp? */
> -		if (map.m_la >= inode->i_size || !needdecode)
> -			continue;
> +    struct erofs_map_blocks map = { .buf = __EROFS_BUF_INITIALIZER };
> +    bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
> +    int ret = 0;
> +    bool compressed = erofs_inode_is_data_compressed(inode->datalayout);
> +    erofs_off_t pos = 0;
> +    u64 pchunk_len = 0;
> +
> +    struct z_erofs_read_ctx ctx = {
> +        .pending_tasks = 0,
> +        .final_err = 0,
> +        .outfd = outfd,
> +        .free_out = true,
> +        .current_task = NULL
> +    };
> +    pthread_mutex_init(&ctx.lock, NULL);
> +    pthread_cond_init(&ctx.cond, NULL);

Please avoid using the bare pthread interface. For example,
erofs_mutex_t is needed for erofs-utils instead of pthread_mutex_t;
pthread_cond_init needs to be replaced by an abstraction too.

Also, I may have forgotten to mention that your new implementation
should be workable for both EROFS_MT_ENABLED and !EROFS_MT_ENABLED,
not only EROFS_MT_ENABLED.

> +
> +    erofs_dbg("verify data chunk of nid(%llu): type(%d)", inode->nid | 0ULL, inode->datalayout);
> +
> +    while (pos < inode->i_size) {
> +        map.m_la = pos;
> +        ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
> +        if (ret) goto out;
> +
> +        if (map.m_la + map.m_llen > inode->i_size)
> +            map.m_llen = inode->i_size - map.m_la;
> +
> +        pchunk_len += map.m_plen;
> +        pos += map.m_llen;
> +
> +        if (map.m_la >= inode->i_size || !needdecode)
> +            continue;
> +
> +        if (outfd >= 0 && !(map.m_flags & EROFS_MAP_MAPPED)) {
> +            ret = lseek(outfd, map.m_llen, SEEK_CUR);
> +            if (ret < 0) {
> +                ret = -errno;
> +                goto out;
> +            }
> +            continue;
> +        }
> +
> +        if (compressed) {
> +            char *raw = malloc(map.m_plen);
> +            size_t buffer_size = map.m_llen > erofs_blksiz(inode->sbi) ? map.m_llen : erofs_blksiz(inode->sbi);
> +            char *buffer = calloc(1, buffer_size);
> +
> +            if (!raw || !buffer) {
> +                free(raw); free(buffer);
> +                ret = -ENOMEM;
> +                goto out;
> +            }
> +
> +            ret = z_erofs_read_one_data(inode, &map, raw, buffer, 0, map.m_llen, false, map.m_la, &ctx);
> +            if (ret) {
> +                /* DO NOT free(raw) or free(buffer) here. z_erofs_read_one_data took ownership! */
> +                goto out;
> +            }
> +        } else {
> +            char *raw = calloc(1, map.m_llen);
> +            ret = erofs_read_one_data(inode, &map, raw, 0, map.m_llen);
> +            if (ret >= 0 && outfd >= 0)
> +                pwrite(outfd, raw, map.m_llen, map.m_la);
> +            free(raw);
> +            if (ret) goto out;
> +        }

I think the file read should be multithreaded too, especially since
the inode could be large.

Thanks,
Gao Xiang
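The request in the review above, a locking layer that compiles both with and without multi-threading, could look roughly like the sketch below. It is an illustration only: the `demo_*` names and the `DEMO_MT_ENABLED` switch are hypothetical stand-ins, not the erofs-utils API, and the single-threaded branch assumes that work units run inline in the caller.

```c
#include <assert.h>

#ifdef DEMO_MT_ENABLED
#include <pthread.h>
typedef pthread_mutex_t demo_mutex_t;
typedef pthread_cond_t demo_cond_t;
#define demo_mutex_init(m)	pthread_mutex_init((m), NULL)
#define demo_mutex_lock(m)	pthread_mutex_lock(m)
#define demo_mutex_unlock(m)	pthread_mutex_unlock(m)
#define demo_cond_init(c)	pthread_cond_init((c), NULL)
#define demo_cond_wait(c, m)	pthread_cond_wait((c), (m))
#define demo_cond_signal(c)	pthread_cond_signal(c)
#else
/* Single-threaded build: work runs inline, so locking collapses to no-ops. */
typedef struct { char unused; } demo_mutex_t;
typedef struct { char unused; } demo_cond_t;
#define demo_mutex_init(m)	((void)(m))
#define demo_mutex_lock(m)	((void)(m))
#define demo_mutex_unlock(m)	((void)(m))
#define demo_cond_init(c)	((void)(c))
#define demo_cond_signal(c)	((void)(c))
#endif

/* Completion tracking for a group of queued work units. */
struct demo_ctx {
	demo_mutex_t lock;
	demo_cond_t cond;
	int pending;
};

static void demo_ctx_init(struct demo_ctx *ctx)
{
	demo_mutex_init(&ctx->lock);
	demo_cond_init(&ctx->cond);
	ctx->pending = 0;
}

/* Called by the submitter before it queues one work unit. */
static void demo_ctx_get(struct demo_ctx *ctx)
{
	demo_mutex_lock(&ctx->lock);
	ctx->pending++;
	demo_mutex_unlock(&ctx->lock);
}

/* Called by whoever finishes a work unit (a worker, or the caller inline). */
static void demo_ctx_put(struct demo_ctx *ctx)
{
	demo_mutex_lock(&ctx->lock);
	if (--ctx->pending == 0)
		demo_cond_signal(&ctx->cond);
	demo_mutex_unlock(&ctx->lock);
}

/* Wait for all work units; returns the number still pending (normally 0). */
static int demo_ctx_wait(struct demo_ctx *ctx)
{
	int left;

	demo_mutex_lock(&ctx->lock);
#ifdef DEMO_MT_ENABLED
	while (ctx->pending > 0)
		demo_cond_wait(&ctx->cond, &ctx->lock);
#endif
	left = ctx->pending;
	demo_mutex_unlock(&ctx->lock);
	return left;
}
```

Callers only see the `demo_ctx_*` functions, so the same fsck code path can be built either way; in the single-threaded build a non-zero return from `demo_ctx_wait()` would indicate a unit that was queued but never run inline.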
* [PATCH 2/2] fsck.erofs: implement dynamic pcluster batching based on algorithm complexity
From: Nithurshen @ 2026-05-23 0:37 UTC
To: linux-erofs; +Cc: hsiangkao, xiang, Nithurshen

While static batching successfully overlaps I/O and compute, different
compression algorithms exhibit vastly different scheduling thresholds.
Extremely fast algorithms like LZ4 require large batches (e.g., 32
pclusters) to effectively hide the synchronization overhead of the
thread pool.

Conversely, applying this large batch size to compute-heavy algorithms
like LZMA or ZSTD causes memory bloat and thread starvation, as the
main thread spends too much time reading and accumulating memory before
waking up the background workers.

This patch modifies the workqueue submission logic in z_erofs_read_one_data
to dynamically scale the batch size based on the algorithm format. LZ4
is permitted to utilize the Z_EROFS_PCLUSTER_MAX_BATCH_SIZE, while
other heavier algorithms trigger workqueue submission at a much lower
threshold (8 pclusters) to ensure a steady pipeline of work and a
bounded memory footprint.

Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
---
 include/erofs/internal.h |  2 +-
 lib/data.c               | 15 +++++++++------
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 38020ee..c8f056f 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -62,7 +62,7 @@ struct erofs_buf {
 #define erofs_pos(sbi, nr)	((erofs_off_t)(nr) << (sbi)->blkszbits)
 #define BLK_ROUND_UP(sbi, addr)	\
 	(roundup(addr, erofs_blksiz(sbi)) >> (sbi)->blkszbits)
-#define Z_EROFS_PCLUSTER_BATCH_SIZE 32
+#define Z_EROFS_PCLUSTER_MAX_BATCH_SIZE 32
 
 struct erofs_buffer_head;
 struct erofs_bufmgr;
diff --git a/lib/data.c b/lib/data.c
index fa36899..a06f4c2 100644
--- a/lib/data.c
+++ b/lib/data.c
@@ -17,11 +17,11 @@ struct erofs_workqueue erofs_wq;
 struct z_erofs_decompress_task {
 	struct erofs_work work;
 	struct z_erofs_read_ctx *ctx;
-	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	char *raw_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	char *out_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	unsigned int out_lengths[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	char *raw_bufs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	char *out_bufs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	unsigned int out_lengths[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
 	unsigned int nr_reqs;
 };
 
@@ -397,7 +397,10 @@ int z_erofs_read_one_data(struct erofs_inode *inode,
 	task->out_offsets[idx] = out_offset;
 	task->out_lengths[idx] = length;
 
-	if (task->nr_reqs == Z_EROFS_PCLUSTER_BATCH_SIZE) {
+	int batch_limit = (map->m_algorithmformat == Z_EROFS_COMPRESSION_LZ4) ?
+		Z_EROFS_PCLUSTER_MAX_BATCH_SIZE : 8;
+
+	if (task->nr_reqs >= batch_limit) {
 		z_erofs_read_ctx_enqueue(ctx);
 	}
 	return 0;
--
2.52.0
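The submission rule in this patch reduces to a fixed per-algorithm threshold. A standalone sketch follows, assuming a hypothetical `enum demo_alg` in place of the real algorithm format constants and reusing the 32 and 8 values from the commit message.

```c
#include <assert.h>

/* Hypothetical algorithm ids; the patch itself tests m_algorithmformat. */
enum demo_alg { DEMO_ALG_LZ4, DEMO_ALG_LZMA, DEMO_ALG_ZSTD };

#define DEMO_MAX_BATCH   32	/* fast algorithms: amortize wake-up cost */
#define DEMO_HEAVY_BATCH 8	/* slow algorithms: keep workers fed sooner */

/* Map an algorithm to the number of pclusters that triggers submission. */
static unsigned int demo_batch_limit(enum demo_alg alg)
{
	return alg == DEMO_ALG_LZ4 ? DEMO_MAX_BATCH : DEMO_HEAVY_BATCH;
}

/* Returns 1 when a batch already holding nr_reqs requests should be queued. */
static int demo_should_submit(enum demo_alg alg, unsigned int nr_reqs)
{
	return nr_reqs >= demo_batch_limit(alg);
}
```

The comparison is `>=` rather than `==`, as in the diff: the limit is derived from the pcluster being added, so a batch that has already grown past 8 is submitted as soon as a pcluster with the lower limit joins it.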
* Re: [PATCH 2/2] fsck.erofs: implement dynamic pcluster batching based on algorithm complexity
From: Gao Xiang @ 2026-06-07 1:52 UTC
To: Nithurshen; +Cc: linux-erofs, hsiangkao, xiang

On Sat, May 23, 2026 at 06:07:57AM +0530, Nithurshen wrote:
> While static batching successfully overlaps I/O and compute, different
> compression algorithms exhibit vastly different scheduling thresholds.
> Extremely fast algorithms like LZ4 require large batches (e.g., 32
> pclusters) to effectively hide the synchronization overhead of the
> thread pool.
>
> Conversely, applying this large batch size to compute-heavy algorithms
> like LZMA or ZSTD causes memory bloat and thread starvation, as the
> main thread spends too much time reading and accumulating memory before
> waking up the background workers.
>
> This patch modifies the workqueue submission logic in z_erofs_read_one_data
> to dynamically scale the batch size based on the algorithm format. LZ4
> is permitted to utilize the Z_EROFS_PCLUSTER_MAX_BATCH_SIZE, while
> other heavier algorithms trigger workqueue submission at a much lower
> threshold (8 pclusters) to ensure a steady pipeline of work and a
> bounded memory footprint.
>
> Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
> ---
>  include/erofs/internal.h |  2 +-
>  lib/data.c               | 15 +++++++++------
>  2 files changed, 10 insertions(+), 7 deletions(-)
>
> diff --git a/include/erofs/internal.h b/include/erofs/internal.h
> index 38020ee..c8f056f 100644
> --- a/include/erofs/internal.h
> +++ b/include/erofs/internal.h
> @@ -62,7 +62,7 @@ struct erofs_buf {
>  #define erofs_pos(sbi, nr)	((erofs_off_t)(nr) << (sbi)->blkszbits)
>  #define BLK_ROUND_UP(sbi, addr)	\
>  	(roundup(addr, erofs_blksiz(sbi)) >> (sbi)->blkszbits)
> -#define Z_EROFS_PCLUSTER_BATCH_SIZE 32
> +#define Z_EROFS_PCLUSTER_MAX_BATCH_SIZE 32
>
>  struct erofs_buffer_head;
>  struct erofs_bufmgr;
> diff --git a/lib/data.c b/lib/data.c
> index fa36899..a06f4c2 100644
> --- a/lib/data.c
> +++ b/lib/data.c
> @@ -17,11 +17,11 @@ struct erofs_workqueue erofs_wq;
>  struct z_erofs_decompress_task {
>  	struct erofs_work work;
>  	struct z_erofs_read_ctx *ctx;
> -	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_BATCH_SIZE];
> -	char *raw_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
> -	char *out_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
> -	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_BATCH_SIZE];
> -	unsigned int out_lengths[Z_EROFS_PCLUSTER_BATCH_SIZE];
> +	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
> +	char *raw_bufs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
> +	char *out_bufs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
> +	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
> +	unsigned int out_lengths[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
>  	unsigned int nr_reqs;
>  };
>
> @@ -397,7 +397,10 @@ int z_erofs_read_one_data(struct erofs_inode *inode,
>  	task->out_offsets[idx] = out_offset;
>  	task->out_lengths[idx] = length;
>
> -	if (task->nr_reqs == Z_EROFS_PCLUSTER_BATCH_SIZE) {
> +	int batch_limit = (map->m_algorithmformat == Z_EROFS_COMPRESSION_LZ4) ?
> +		Z_EROFS_PCLUSTER_MAX_BATCH_SIZE : 8;

Why is it called dynamic decompression batching?

> +
> +	if (task->nr_reqs >= batch_limit) {
>  		z_erofs_read_ctx_enqueue(ctx);
>  	}
>  	return 0;
> --
> 2.52.0
>
* [PATCH v2 0/2] fsck.erofs: add multi-threaded decompression
2026-05-23 0:37 [PATCH 0/2] fsck.erofs: introduce multi-threaded decompression Nithurshen
2026-05-23 0:37 ` [PATCH 1/2] fsck.erofs: introduce multi-threaded decompression with static batching Nithurshen
2026-05-23 0:37 ` [PATCH 2/2] fsck.erofs: implement dynamic pcluster batching based on algorithm complexity Nithurshen
@ 2026-06-08 5:07 ` Nithurshen
2026-06-08 5:07 ` [PATCH v2 1/2] fsck.erofs: introduce multi-threaded decompression with static batching Nithurshen
2026-06-08 5:07 ` [PATCH v2 2/2] fsck.erofs: implement algorithm-aware pcluster batching Nithurshen
2 siblings, 2 replies; 9+ messages in thread
From: Nithurshen @ 2026-06-08 5:07 UTC (permalink / raw)
To: nithurshen.dev; +Cc: hsiangkao, linux-erofs, xiang

Hi Xiang,

Thank you for the review. I have addressed the feedback:

1. Replaced bare pthread calls with EROFS abstractions (erofs_mutex_t,
   erofs_cond_t) to ensure the code remains portable and works when
   !EROFS_MT_ENABLED.
2. Updated "dynamic" batching terminology to "algorithm-aware" batching
   for better clarity.

This series introduces multi-threaded decompression to fsck.erofs to
decouple I/O from decompression, significantly improving extraction
throughput on multicore systems.

Best regards,
Nithurshen

Nithurshen (2):
  fsck.erofs: introduce multi-threaded decompression with static
    batching
  fsck.erofs: implement algorithm-aware pcluster batching

 fsck/main.c              | 239 ++++++++++++++++++---------------------
 include/erofs/internal.h |  19 +++-
 include/erofs/lock.h     |  22 ++++
 lib/data.c               | 210 ++++++++++++++++++++++++----------
 4 files changed, 301 insertions(+), 189 deletions(-)

--
2.52.0

* [PATCH v2 1/2] fsck.erofs: introduce multi-threaded decompression with static batching
2026-06-08 5:07 ` [PATCH v2 0/2] fsck.erofs: add multi-threaded decompression Nithurshen
@ 2026-06-08 5:07 ` Nithurshen
2026-06-08 6:25 ` Gao Xiang
2026-06-08 5:07 ` [PATCH v2 2/2] fsck.erofs: implement algorithm-aware pcluster batching Nithurshen
1 sibling, 1 reply; 9+ messages in thread
From: Nithurshen @ 2026-06-08 5:07 UTC (permalink / raw)
To: nithurshen.dev; +Cc: hsiangkao, linux-erofs, xiang

Currently, fsck.erofs extracts files synchronously. When decompressing
heavily packed images (like LZ4HC with 4K pclusters), the main thread
spends the majority of its time blocked on a combination of synchronous
vfs_write() syscalls and LZ4_decompress_safe(), bottlenecking overall
extraction speed.

This patch introduces a scalable, multi-threaded decompression framework
using the existing erofs_workqueue infrastructure to decouple compute
from the main thread's I/O.

To prevent massive scheduling overhead (futex contention) where worker
threads spend more CPU time waking up than actually decompressing small
4KB clusters, this implementation introduces a batching context. The
main thread collects an array of sequential pclusters (temporarily
hard-capped at Z_EROFS_PCLUSTER_BATCH_SIZE = 32) before submitting a
single erofs_work unit.

Key details of this implementation:
- The worker pool is dynamically sized based on available system CPUs.
- Decompression tasks take strict ownership of the raw and output
  buffers (safely tracking memory via a `free_out` flag) to prevent
  data races and memory leaks.
- Output buffers are explicitly zero-initialized via calloc() to
  prevent trailing garbage bytes from leaking into extracted files.
- Tail-end packed fragments are processed synchronously by the main
  thread, as their minimal overhead does not benefit from asynchronous
  offloading.

Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
---
 fsck/main.c              | 239 ++++++++++++++++++---------------------
 include/erofs/internal.h |  19 +++-
 include/erofs/lock.h     |  22 ++++
 lib/data.c               | 207 +++++++++++++++++++++++----------
 4 files changed, 298 insertions(+), 189 deletions(-)

diff --git a/fsck/main.c b/fsck/main.c
index 16cc627..de6ab4d 100644
--- a/fsck/main.c
+++ b/fsck/main.c
@@ -8,14 +8,18 @@
 #include <time.h>
 #include <utime.h>
 #include <unistd.h>
+#include "erofs/lock.h"
 #include <sys/stat.h>
 #include "erofs/print.h"
 #include "erofs/decompress.h"
 #include "erofs/dir.h"
 #include "erofs/xattr.h"
+#include "erofs/workqueue.h"
 #include "../lib/compressor.h"
 #include "../lib/liberofs_compress.h"
 
+extern struct erofs_workqueue erofs_wq;
+
 static int erofsfsck_check_inode(erofs_nid_t pnid, erofs_nid_t nid);
 
 struct erofsfsck_dirstack {
@@ -505,135 +509,95 @@ out:
 
 static int erofs_verify_inode_data(struct erofs_inode *inode, int outfd)
 {
-	struct erofs_map_blocks map = {
-		.buf = __EROFS_BUF_INITIALIZER,
-	};
-	bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
-	int ret = 0;
-	bool compressed;
-	erofs_off_t pos = 0;
-	u64 pchunk_len = 0;
-	unsigned int raw_size = 0, buffer_size = 0;
-	char *raw = NULL, *buffer = NULL;
-
-	erofs_dbg("verify data chunk of nid(%llu): type(%d)",
-		  inode->nid | 0ULL, inode->datalayout);
-
-	compressed = erofs_inode_is_data_compressed(inode->datalayout);
-	while (pos < inode->i_size) {
-		unsigned int alloc_rawsize;
-
-		map.m_la = pos;
-		ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
-		if (ret)
-			goto out;
-
-		if (!compressed && map.m_llen != map.m_plen) {
-			erofs_err("broken chunk length m_la %" PRIu64 " m_llen %" PRIu64 " m_plen %" PRIu64,
-				  map.m_la, map.m_llen, map.m_plen);
-			ret = -EFSCORRUPTED;
-			goto out;
-		}
-
-		/* the last lcluster can be divided into 3 parts */
-		if (map.m_la + map.m_llen > inode->i_size)
-			map.m_llen = inode->i_size - map.m_la;
-
-		pchunk_len += map.m_plen;
-		pos += map.m_llen;
-
-		/* should skip decomp? */
-		if (map.m_la >= inode->i_size || !needdecode)
-			continue;
-
-		if (outfd >= 0 && !(map.m_flags & EROFS_MAP_MAPPED)) {
-			ret = lseek(outfd, map.m_llen, SEEK_CUR);
-			if (ret < 0) {
-				ret = -errno;
-				goto out;
-			}
-			continue;
-		}
-
-		if (map.m_plen > Z_EROFS_PCLUSTER_MAX_SIZE) {
-			if (compressed && !(map.m_flags & __EROFS_MAP_FRAGMENT)) {
-				erofs_err("invalid pcluster size %" PRIu64 " @ offset %" PRIu64 " of nid %" PRIu64,
-					  map.m_plen, map.m_la,
-					  inode->nid | 0ULL);
-				ret = -EFSCORRUPTED;
-				goto out;
-			}
-			alloc_rawsize = Z_EROFS_PCLUSTER_MAX_SIZE;
-		} else {
-			alloc_rawsize = map.m_plen;
-		}
-
-		if (alloc_rawsize > raw_size) {
-			char *newraw = realloc(raw, alloc_rawsize);
+    struct erofs_map_blocks map = { .buf = __EROFS_BUF_INITIALIZER };
+    bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
+    int ret = 0;
+    bool compressed = erofs_inode_is_data_compressed(inode->datalayout);
+    erofs_off_t pos = 0;
+    u64 pchunk_len = 0;
+
+    struct z_erofs_read_ctx ctx = {
+        .pending_tasks = 0,
+        .final_err = 0,
+        .outfd = outfd,
+        .free_out = true,
+        .current_task = NULL
+    };
+    erofs_mutex_init(&ctx.lock);
+    erofs_cond_init(&ctx.cond);
+
+    erofs_dbg("verify data chunk of nid(%llu): type(%d)", inode->nid | 0ULL, inode->datalayout);
+
+    while (pos < inode->i_size) {
+        map.m_la = pos;
+        ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
+        if (ret) goto out;
+
+        if (map.m_la + map.m_llen > inode->i_size)
+            map.m_llen = inode->i_size - map.m_la;
+
+        pchunk_len += map.m_plen;
+        pos += map.m_llen;
+
+        if (map.m_la >= inode->i_size || !needdecode)
+            continue;
+
+        if (outfd >= 0 && !(map.m_flags & EROFS_MAP_MAPPED)) {
+            ret = lseek(outfd, map.m_llen, SEEK_CUR);
+            if (ret < 0) {
+                ret = -errno;
+                goto out;
+            }
+            continue;
+        }
+
+        if (compressed) {
+            char *raw = malloc(map.m_plen);
+            size_t buffer_size = map.m_llen > erofs_blksiz(inode->sbi) ? map.m_llen : erofs_blksiz(inode->sbi);
+            char *buffer = calloc(1, buffer_size);
+
+            if (!raw || !buffer) {
+                free(raw); free(buffer);
+                ret = -ENOMEM;
+                goto out;
+            }
+
+            ret = z_erofs_read_one_data(inode, &map, raw, buffer, 0, map.m_llen, false, map.m_la, &ctx);
+            if (ret) {
+                /* DO NOT free(raw) or free(buffer) here. z_erofs_read_one_data took ownership! */
+                goto out;
+            }
+        } else {
+            char *raw = calloc(1, map.m_llen);
+            ret = erofs_read_one_data(inode, &map, raw, 0, map.m_llen);
+            if (ret >= 0 && outfd >= 0)
+                pwrite(outfd, raw, map.m_llen, map.m_la);
+            free(raw);
+            if (ret) goto out;
+        }
+    }
+    z_erofs_read_ctx_enqueue(&ctx);
 
-			if (!newraw) {
-				ret = -ENOMEM;
-				goto out;
-			}
-			raw = newraw;
-			raw_size = alloc_rawsize;
-		}
-
-		if (compressed) {
-			if (map.m_llen > buffer_size) {
-				char *newbuffer;
-
-				buffer_size = map.m_llen;
-				newbuffer = realloc(buffer, buffer_size);
-				if (!newbuffer) {
-					ret = -ENOMEM;
-					goto out;
-				}
-				buffer = newbuffer;
-			}
-			ret = z_erofs_read_one_data(inode, &map, raw, buffer,
-						    0, map.m_llen, false);
-			if (ret)
-				goto out;
-
-			if (outfd >= 0 && write(outfd, buffer, map.m_llen) < 0)
-				goto fail_eio;
-		} else {
-			u64 p = 0;
-
-			do {
-				u64 count = min_t(u64, alloc_rawsize,
-						  map.m_llen);
-
-				ret = erofs_read_one_data(inode, &map, raw, p, count);
-				if (ret)
-					goto out;
-
-				if (outfd >= 0 && write(outfd, raw, count) < 0)
-					goto fail_eio;
-				map.m_llen -= count;
-				p += count;
-			} while (map.m_llen);
-		}
-	}
-
-	if (fsckcfg.print_comp_ratio) {
-		if (!erofs_is_packed_inode(inode))
-			fsckcfg.logical_blocks += BLK_ROUND_UP(inode->sbi, inode->i_size);
-		fsckcfg.physical_blocks += BLK_ROUND_UP(inode->sbi, pchunk_len);
-	}
 out:
-	if (raw)
-		free(raw);
-	if (buffer)
-		free(buffer);
-	return ret < 0 ? ret : 0;
-
-fail_eio:
-	erofs_err("I/O error occurred when verifying data chunk @ nid %llu",
-		  inode->nid | 0ULL);
-	ret = -EIO;
-	goto out;
+    erofs_mutex_lock(&ctx.lock);
+    while (ctx.pending_tasks > 0)
+        erofs_cond_wait(&ctx.cond, &ctx.lock);
+    if (ctx.final_err < 0 && ret >= 0)
+        ret = ctx.final_err;
+    erofs_mutex_unlock(&ctx.lock);
+
+    if (fsckcfg.print_comp_ratio) {
+        if (!erofs_is_packed_inode(inode))
+            fsckcfg.logical_blocks += BLK_ROUND_UP(inode->sbi, inode->i_size);
+        fsckcfg.physical_blocks += BLK_ROUND_UP(inode->sbi, pchunk_len);
+    }
+
+    if (outfd >= 0 && ret >= 0)
+        ftruncate(outfd, inode->i_size);
+
+    erofs_mutex_destroy(&ctx.lock);
+    erofs_cond_destroy(&ctx.cond);
+    return ret < 0 ? ret : 0;
 }
 
 static inline int erofs_extract_dir(struct erofs_inode *inode)
@@ -1043,9 +1007,19 @@ int erofsfsck_fuzz_one(int argc, char *argv[])
 int main(int argc, char *argv[])
 #endif
 {
+
 	int err;
+#ifdef EROFS_MT_ENABLED
+    int workers;
+#endif
 
-	erofs_init_configure();
+    erofs_init_configure();
+
+#ifdef EROFS_MT_ENABLED
+    workers = sysconf(_SC_NPROCESSORS_ONLN);
+    if (workers < 1) workers = 1;
+    erofs_alloc_workqueue(&erofs_wq, workers, 256, NULL, NULL);
+#endif
 
 	fsckcfg.physical_blocks = 0;
 	fsckcfg.logical_blocks = 0;
@@ -1179,9 +1153,12 @@ exit_put_super:
 exit_dev_close:
 	erofs_dev_close(&g_sbi);
 exit:
-	erofs_blob_closeall(&g_sbi);
-	erofs_exit_configure();
-	return err ? 1 : 0;
+    erofs_blob_closeall(&g_sbi);
+    erofs_exit_configure();
+#ifdef EROFS_MT_ENABLED
+    erofs_destroy_workqueue(&erofs_wq);
+#endif
+    return err ? 1 : 0;
 }
 
 #ifdef FUZZING
diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 671880f..63fd3bf 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -25,6 +25,7 @@ typedef unsigned short umode_t;
 #ifdef HAVE_PTHREAD_H
 #include <pthread.h>
 #endif
+#include <erofs/lock.h>
 #include <stdlib.h>
 #include <string.h>
 #include "atomic.h"
@@ -62,6 +63,7 @@ struct erofs_buf {
 #define erofs_pos(sbi, nr)	((erofs_off_t)(nr) << (sbi)->blkszbits)
 #define BLK_ROUND_UP(sbi, addr)	\
 	(roundup(addr, erofs_blksiz(sbi)) >> (sbi)->blkszbits)
+#define Z_EROFS_PCLUSTER_BATCH_SIZE 32
 
 struct erofs_buffer_head;
 struct erofs_bufmgr;
@@ -442,6 +444,20 @@ struct z_erofs_paramset {
 	char *extraopts;
 };
 
+struct z_erofs_decompress_task;
+
+struct z_erofs_read_ctx {
+	erofs_mutex_t lock;
+	erofs_cond_t cond;
+	int pending_tasks;
+	int final_err;
+	int outfd;
+	bool free_out;
+	struct z_erofs_decompress_task *current_task;
+};
+
+void z_erofs_read_ctx_enqueue(struct z_erofs_read_ctx *ctx);
+
 int liberofs_global_init(void);
 void liberofs_global_exit(void);
 
@@ -478,7 +494,8 @@ int erofs_read_one_data(struct erofs_inode *inode, struct erofs_map_blocks *map,
 			char *buffer, u64 offset, size_t len);
 int z_erofs_read_one_data(struct erofs_inode *inode,
 			struct erofs_map_blocks *map, char *raw, char *buffer,
-			erofs_off_t skip, erofs_off_t length, bool trimmed);
+			erofs_off_t skip, erofs_off_t length, bool trimmed,
+			erofs_off_t out_offset, struct z_erofs_read_ctx *ctx);
 void *erofs_read_metadata(struct erofs_sb_info *sbi, erofs_nid_t nid,
 			  erofs_off_t *offset, int *lengthp);
 int z_erofs_parse_cfgs(struct erofs_sb_info *sbi, struct erofs_super_block *dsb);
diff --git a/include/erofs/lock.h b/include/erofs/lock.h
index c6e3093..a2e1b60 100644
--- a/include/erofs/lock.h
+++ b/include/erofs/lock.h
@@ -15,6 +15,7 @@ static inline void erofs_mutex_init(erofs_mutex_t *lock)
 }
 #define erofs_mutex_lock	pthread_mutex_lock
 #define erofs_mutex_unlock	pthread_mutex_unlock
+#define erofs_mutex_destroy	pthread_mutex_destroy
 
 #define EROFS_DEFINE_MUTEX(lock)	\
 	erofs_mutex_t lock = PTHREAD_MUTEX_INITIALIZER
@@ -29,12 +30,25 @@ static inline void erofs_init_rwsem(erofs_rwsem_t *lock)
 #define erofs_down_write	pthread_rwlock_wrlock
 #define erofs_up_read		pthread_rwlock_unlock
 #define erofs_up_write		pthread_rwlock_unlock
+
+typedef pthread_cond_t erofs_cond_t;
+
+static inline void erofs_cond_init(erofs_cond_t *cond)
+{
+	pthread_cond_init(cond, NULL);
+}
+#define erofs_cond_wait		pthread_cond_wait
+#define erofs_cond_signal	pthread_cond_signal
+#define erofs_cond_broadcast	pthread_cond_broadcast
+#define erofs_cond_destroy	pthread_cond_destroy
+
 #else
 typedef struct {} erofs_mutex_t;
 
 static inline void erofs_mutex_init(erofs_mutex_t *lock) {}
 static inline void erofs_mutex_lock(erofs_mutex_t *lock) {}
 static inline void erofs_mutex_unlock(erofs_mutex_t *lock) {}
+static inline void erofs_mutex_destroy(erofs_mutex_t *lock) {}
 
 #define EROFS_DEFINE_MUTEX(lock)	\
 	erofs_mutex_t lock = {}
@@ -46,5 +60,13 @@ static inline void erofs_down_write(erofs_rwsem_t *lock) {}
 static inline void erofs_up_read(erofs_rwsem_t *lock) {}
 static inline void erofs_up_write(erofs_rwsem_t *lock) {}
 
+typedef struct {} erofs_cond_t;
+
+static inline void erofs_cond_init(erofs_cond_t *cond) {}
+static inline int erofs_cond_wait(erofs_cond_t *cond, erofs_mutex_t *mutex) { return 0; }
+static inline int erofs_cond_signal(erofs_cond_t *cond) { return 0; }
+static inline int erofs_cond_broadcast(erofs_cond_t *cond) { return 0; }
+static inline int erofs_cond_destroy(erofs_cond_t *cond) { return 0; }
+
 #endif
 #endif
diff --git a/lib/data.c b/lib/data.c
index 6fd1389..26fdb43 100644
--- a/lib/data.c
+++ b/lib/data.c
@@ -9,6 +9,68 @@
 #include "erofs/trace.h"
 #include "erofs/decompress.h"
 #include "liberofs_fragments.h"
+#include "erofs/workqueue.h"
+#include "erofs/lock.h"
+
+struct erofs_workqueue erofs_wq;
+
+struct z_erofs_decompress_task {
+	struct erofs_work work;
+	struct z_erofs_read_ctx *ctx;
+	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	char *raw_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	char *out_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	unsigned int out_lengths[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	unsigned int nr_reqs;
+};
+
+static void z_erofs_decompress_worker(struct erofs_work *work, void *tlsp)
+{
+	struct z_erofs_decompress_task *task = (struct z_erofs_decompress_task *)work;
+	struct z_erofs_read_ctx *ctx = task->ctx;
+	int i, ret = 0, first_err = 0;
+
+	for (i = 0; i < task->nr_reqs; ++i) {
+		ret = z_erofs_decompress(&task->reqs[i]);
+
+		if (ret >= 0 && ctx && ctx->outfd >= 0) {
+			if (pwrite(ctx->outfd, task->out_bufs[i],
+				   task->out_lengths[i], task->out_offsets[i]) < 0)
+				ret = -errno;
+		}
+
+		if (ret < 0 && first_err == 0)
+			first_err = ret;
+
+		free(task->raw_bufs[i]);
+		if (ctx && ctx->free_out)
+			free(task->out_bufs[i]);
+	}
+
+	if (ctx) {
+		erofs_mutex_lock(&ctx->lock);
+		if (first_err < 0 && ctx->final_err == 0)
+			ctx->final_err = first_err;
+		ctx->pending_tasks--;
+		if (ctx->pending_tasks == 0)
+			erofs_cond_signal(&ctx->cond);
+		erofs_mutex_unlock(&ctx->lock);
+	}
+	free(task);
+}
+
+void z_erofs_read_ctx_enqueue(struct z_erofs_read_ctx *ctx)
+{
+	if (ctx && ctx->current_task) {
+#ifdef EROFS_MT_ENABLED
+		erofs_queue_work(&erofs_wq, &ctx->current_task->work);
+#else
+		z_erofs_decompress_worker(&ctx->current_task->work, NULL);
+#endif
+		ctx->current_task = NULL;
+	}
+}
 
 void *erofs_bread(struct erofs_buf *buf, erofs_off_t offset, bool need_kmap)
 {
@@ -277,7 +339,8 @@ static int erofs_read_raw_data(struct erofs_inode *inode, char *buffer,
 
 int z_erofs_read_one_data(struct erofs_inode *inode,
 			struct erofs_map_blocks *map, char *raw, char *buffer,
-			erofs_off_t skip, erofs_off_t length, bool trimmed)
+			erofs_off_t skip, erofs_off_t length, bool trimmed,
+			erofs_off_t out_offset, struct z_erofs_read_ctx *ctx)
 {
 	struct erofs_sb_info *sbi = inode->sbi;
 	struct erofs_map_dev mdev;
@@ -285,77 +348,101 @@ int z_erofs_read_one_data(struct erofs_inode *inode,
 
 	if (map->m_flags & __EROFS_MAP_FRAGMENT) {
 		if (__erofs_unlikely(inode->nid == sbi->packed_nid)) {
-			erofs_err("fragment should not exist in the packed inode %llu",
-				  sbi->packed_nid | 0ULL);
-			return -EFSCORRUPTED;
+			ret = -EFSCORRUPTED;
+			goto err_out;
 		}
-		return erofs_packedfile_read(sbi, buffer, length - skip,
-					     inode->fragmentoff + skip);
+		ret = erofs_packedfile_read(sbi, buffer, length - skip,
+					    inode->fragmentoff + skip);
+
+		if (ret >= 0 && ctx && ctx->outfd >= 0) {
+			if (pwrite(ctx->outfd, buffer, length - skip, out_offset) < 0)
+				ret = -errno;
+		}
+		goto err_out;
 	}
 
-	/* no device id here, thus it will always succeed */
-	mdev = (struct erofs_map_dev) {
-		.m_pa = map->m_pa,
-	};
+	mdev = (struct erofs_map_dev) { .m_pa = map->m_pa };
 	ret = erofs_map_dev(sbi, &mdev);
-	if (ret) {
-		DBG_BUGON(1);
-		return ret;
-	}
+	if (ret) goto err_out;
 
 	ret = erofs_dev_read(sbi, mdev.m_deviceid, raw, mdev.m_pa, map->m_plen);
-	if (ret < 0)
-		return ret;
+	if (ret < 0) goto err_out;
+
+	struct z_erofs_decompress_task *task = ctx->current_task;
+	if (!task) {
+		task = calloc(1, sizeof(*task));
+		task->ctx = ctx;
+		task->work.fn = z_erofs_decompress_worker;
+		ctx->current_task = task;
+
+		erofs_mutex_lock(&ctx->lock);
+		ctx->pending_tasks++;
+		erofs_mutex_unlock(&ctx->lock);
+	}
 
-	ret = z_erofs_decompress(&(struct z_erofs_decompress_req) {
-			.sbi = sbi,
-			.in = raw,
-			.out = buffer,
-			.decodedskip = skip,
-			.interlaced_offset =
-				map->m_algorithmformat == Z_EROFS_COMPRESSION_INTERLACED ?
-					erofs_blkoff(sbi, map->m_la) : 0,
-			.inputsize = map->m_plen,
-			.decodedlength = length,
-			.alg = map->m_algorithmformat,
-			.partial_decoding = trimmed ? true :
-				!(map->m_flags & EROFS_MAP_FULL_MAPPED) ||
-					(map->m_flags & EROFS_MAP_PARTIAL_REF),
-		});
-	if (ret < 0)
-		return ret;
+	int idx = task->nr_reqs++;
+	task->reqs[idx] = (struct z_erofs_decompress_req) {
+		.sbi = sbi,
+		.in = raw,
+		.out = buffer,
+		.decodedskip = skip,
+		.interlaced_offset =
+			map->m_algorithmformat == Z_EROFS_COMPRESSION_INTERLACED ?
+				erofs_blkoff(sbi, map->m_la) : 0,
+		.inputsize = map->m_plen,
+		.decodedlength = length,
+		.alg = map->m_algorithmformat,
+		.partial_decoding = trimmed ? true :
+			!(map->m_flags & EROFS_MAP_FULL_MAPPED) ||
+				(map->m_flags & EROFS_MAP_PARTIAL_REF),
+	};
+	task->raw_bufs[idx] = raw;
+	task->out_bufs[idx] = buffer;
+	task->out_offsets[idx] = out_offset;
+	task->out_lengths[idx] = length;
+
+	if (task->nr_reqs == Z_EROFS_PCLUSTER_BATCH_SIZE) {
+		z_erofs_read_ctx_enqueue(ctx);
+	}
 	return 0;
+
+err_out:
+	if (ctx && ctx->free_out) free(buffer);
+	free(raw);
+	return ret;
 }
 
 static int z_erofs_read_data(struct erofs_inode *inode, char *buffer,
-			     erofs_off_t size, erofs_off_t offset)
+				erofs_off_t size, erofs_off_t offset)
 {
 	erofs_off_t end, length, skip;
 	struct erofs_map_blocks map = {
 		.buf = __EROFS_BUF_INITIALIZER,
 	};
 	bool trimmed;
-	unsigned int bufsize = 0;
-	char *raw = NULL;
 	int ret = 0;
 
+	struct z_erofs_read_ctx ctx = {
+		.pending_tasks = 0,
+		.final_err = 0,
+		.outfd = -1,
+		.free_out = false,
+		.current_task = NULL
+	};
+	erofs_mutex_init(&ctx.lock);
+	erofs_cond_init(&ctx.cond);
+
 	end = offset + size;
 	while (end > offset) {
 		map.m_la = end - 1;
 
 		ret = z_erofs_map_blocks_iter(inode, &map, 0);
-		if (ret)
-			break;
+		if (ret) break;
 
-		/*
-		 * trim to the needed size if the returned extent is quite
-		 * larger than requested, and set up partial flag as well.
-		 */
 		if (end < map.m_la + map.m_llen) {
 			length = end - map.m_la;
 			trimmed = true;
 		} else {
-			DBG_BUGON(end != map.m_la + map.m_llen);
 			length = map.m_llen;
 			trimmed = false;
 		}
@@ -374,25 +461,31 @@ static int z_erofs_read_data(struct erofs_inode *inode, char *buffer,
 			continue;
 		}
 
-		if (map.m_plen > bufsize) {
-			char *newraw;
-
-			bufsize = map.m_plen;
-			newraw = realloc(raw, bufsize);
-			if (!newraw) {
-				ret = -ENOMEM;
-				break;
-			}
-			raw = newraw;
+		char *raw = malloc(map.m_plen);
+		if (!raw) {
+			ret = -ENOMEM;
+			break;
 		}
 
 		ret = z_erofs_read_one_data(inode, &map, raw,
-				buffer + end - offset, skip, length, trimmed);
-		if (ret < 0)
+				buffer + end - offset, skip, length, trimmed, 0, &ctx);
+		if (ret < 0) {
 			break;
+		}
 	}
-	if (raw)
-		free(raw);
+	z_erofs_read_ctx_enqueue(&ctx);
+
+	erofs_mutex_lock(&ctx.lock);
+	while (ctx.pending_tasks > 0)
+		erofs_cond_wait(&ctx.cond, &ctx.lock);
+
+	if (ctx.final_err < 0 && ret == 0)
+		ret = ctx.final_err;
+
+	erofs_mutex_unlock(&ctx.lock);
+	erofs_mutex_destroy(&ctx.lock);
+	erofs_cond_destroy(&ctx.cond);
+
 	return ret < 0 ? ret : 0;
 }
 
--
2.52.0

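For readers following the batching scheme described in the commit message, here is a minimal sketch of the accumulate-then-flush flow. It is an illustration only, not the patch's code: `BATCH_SIZE`, `struct batcher`, `batcher_add()`, `batcher_flush()` and `batcher_run()` are hypothetical names, the "submit" step only bumps counters instead of queueing an erofs_work, and plain ints stand in for pcluster requests.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical names for illustration; not the erofs-utils API. */
#define BATCH_SIZE 32	/* mirrors Z_EROFS_PCLUSTER_BATCH_SIZE in the patch */

struct batch {
	int items[BATCH_SIZE];	/* stand-in for per-pcluster requests */
	unsigned int nr;	/* number of queued items */
};

struct batcher {
	struct batch cur;	/* batch currently being filled by the producer */
	unsigned int submitted;	/* work units handed off so far */
	unsigned int processed;	/* items handed off so far */
};

/* Hand the current batch off as one work unit; a no-op when it is empty. */
static void batcher_flush(struct batcher *b)
{
	if (!b->cur.nr)
		return;
	b->submitted++;
	b->processed += b->cur.nr;
	b->cur.nr = 0;
}

/* Queue one item and submit automatically once the batch is full. */
static void batcher_add(struct batcher *b, int item)
{
	b->cur.items[b->cur.nr++] = item;
	if (b->cur.nr == BATCH_SIZE)
		batcher_flush(b);
}

/* Feed n items, flush the partial tail, and return the work units submitted. */
static unsigned int batcher_run(unsigned int n)
{
	struct batcher b = { .submitted = 0 };
	unsigned int i;

	for (i = 0; i < n; i++)
		batcher_add(&b, (int)i);
	batcher_flush(&b);	/* the final partial batch must not be lost */
	assert(b.processed == n);
	return b.submitted;
}
```

The trailing `batcher_flush()` plays the role of the unconditional z_erofs_read_ctx_enqueue() call after the read loop in the patch: without it, a file whose pcluster count is not a multiple of the batch size would leave its last requests unsubmitted.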
* Re: [PATCH v2 1/2] fsck.erofs: introduce multi-threaded decompression with static batching
2026-06-08 5:07 ` [PATCH v2 1/2] fsck.erofs: introduce multi-threaded decompression with static batching Nithurshen
@ 2026-06-08 6:25 ` Gao Xiang
0 siblings, 0 replies; 9+ messages in thread
From: Gao Xiang @ 2026-06-08 6:25 UTC (permalink / raw)
To: Nithurshen; +Cc: linux-erofs, xiang

On 2026/6/8 13:07, Nithurshen wrote:
> Currently, fsck.erofs extracts files synchronously. When decompressing
> heavily packed images (like LZ4HC with 4K pclusters), the main thread
> spends the majority of its time blocked on a combination of synchronous
> vfs_write() syscalls and LZ4_decompress_safe(), bottlenecking overall
> extraction speed.
>
> This patch introduces a scalable, multi-threaded decompression framework
> using the existing erofs_workqueue infrastructure to decouple compute
> from the main thread's I/O.
>
> To prevent massive scheduling overhead (futex contention) where worker
> threads spend more CPU time waking up than actually decompressing small
> 4KB clusters, this implementation introduces a batching context. The
> main thread collects an array of sequential pclusters (temporarily
> hard-capped at Z_EROFS_PCLUSTER_BATCH_SIZE = 32) before submitting a
> single erofs_work unit.
>
> Key details of this implementation:
> - The worker pool is dynamically sized based on available system CPUs.
> - Decompression tasks take strict ownership of the raw and output
>   buffers (safely tracking memory via a `free_out` flag) to prevent
>   data races and memory leaks.
> - Output buffers are explicitly zero-initialized via calloc() to
>   prevent trailing garbage bytes from leaking into extracted files.
> - Tail-end packed fragments are processed synchronously by the main
>   thread, as their minimal overhead does not benefit from asynchronous
>   offloading.
>
> Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
> ---
>  fsck/main.c              | 239 ++++++++++++++++++---------------------
>  include/erofs/internal.h |  19 +++-
>  include/erofs/lock.h     |  22 ++++
>  lib/data.c               | 207 +++++++++++++++++++++++----------
>  4 files changed, 298 insertions(+), 189 deletions(-)
>
> diff --git a/fsck/main.c b/fsck/main.c
> index 16cc627..de6ab4d 100644
> --- a/fsck/main.c
> +++ b/fsck/main.c
> @@ -8,14 +8,18 @@
>  #include <time.h>
>  #include <utime.h>
>  #include <unistd.h>
> +#include "erofs/lock.h"
>  #include <sys/stat.h>
>  #include "erofs/print.h"
>  #include "erofs/decompress.h"
>  #include "erofs/dir.h"
>  #include "erofs/xattr.h"
> +#include "erofs/workqueue.h"
>  #include "../lib/compressor.h"
>  #include "../lib/liberofs_compress.h"
>
> +extern struct erofs_workqueue erofs_wq;
> +
>  static int erofsfsck_check_inode(erofs_nid_t pnid, erofs_nid_t nid);
>
>  struct erofsfsck_dirstack {
> @@ -505,135 +509,95 @@ out:
>
>  static int erofs_verify_inode_data(struct erofs_inode *inode, int outfd)
>  {
> -	struct erofs_map_blocks map = {
> -		.buf = __EROFS_BUF_INITIALIZER,
> -	};
> -	bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
> -	int ret = 0;
> -	bool compressed;
> -	erofs_off_t pos = 0;
> -	u64 pchunk_len = 0;
> -	unsigned int raw_size = 0, buffer_size = 0;
> -	char *raw = NULL, *buffer = NULL;
> -
> -	erofs_dbg("verify data chunk of nid(%llu): type(%d)",
> -		  inode->nid | 0ULL, inode->datalayout);
> -
> -	compressed = erofs_inode_is_data_compressed(inode->datalayout);
> -	while (pos < inode->i_size) {
> -		unsigned int alloc_rawsize;
> -
> -		map.m_la = pos;
> -		ret = erofs_map_blocks(inode, &map, EROFS_GET_BLOCKS_FIEMAP);
> -		if (ret)
> -			goto out;
> -
> -		if (!compressed && map.m_llen != map.m_plen) {
> -			erofs_err("broken chunk length m_la %" PRIu64 " m_llen %" PRIu64 " m_plen %" PRIu64,
> -				  map.m_la, map.m_llen, map.m_plen);
> -			ret = -EFSCORRUPTED;
> -			goto out;
> -		}
> -
> -		/* the last lcluster can be divided into 3 parts */
> -		if (map.m_la + map.m_llen > inode->i_size)
> -			map.m_llen = inode->i_size - map.m_la;
> -
> -		pchunk_len += map.m_plen;
> -		pos += map.m_llen;
> -
> -		/* should skip decomp? */
> -		if (map.m_la >= inode->i_size || !needdecode)
> -			continue;
> -
> -		if (outfd >= 0 && !(map.m_flags & EROFS_MAP_MAPPED)) {
> -			ret = lseek(outfd, map.m_llen, SEEK_CUR);
> -			if (ret < 0) {
> -				ret = -errno;
> -				goto out;
> -			}
> -			continue;
> -		}
> -
> -		if (map.m_plen > Z_EROFS_PCLUSTER_MAX_SIZE) {
> -			if (compressed && !(map.m_flags & __EROFS_MAP_FRAGMENT)) {
> -				erofs_err("invalid pcluster size %" PRIu64 " @ offset %" PRIu64 " of nid %" PRIu64,
> -					  map.m_plen, map.m_la,
> -					  inode->nid | 0ULL);
> -				ret = -EFSCORRUPTED;
> -				goto out;
> -			}
> -			alloc_rawsize = Z_EROFS_PCLUSTER_MAX_SIZE;
> -		} else {
> -			alloc_rawsize = map.m_plen;
> -		}
> -
> -		if (alloc_rawsize > raw_size) {
> -			char *newraw = realloc(raw, alloc_rawsize);
> +    struct erofs_map_blocks map = { .buf = __EROFS_BUF_INITIALIZER };

Please use tabs instead of spaces.

> +    bool needdecode = fsckcfg.check_decomp && !erofs_is_packed_inode(inode);
> +    int ret = 0;
> +    bool compressed = erofs_inode_is_data_compressed(inode->datalayout);
> +    erofs_off_t pos = 0;
> +    u64 pchunk_len = 0;
> +
> +    struct z_erofs_read_ctx ctx = {
> +        .pending_tasks = 0,
> +        .final_err = 0,
> +        .outfd = outfd,
> +        .free_out = true,
> +        .current_task = NULL
> +    };

Honestly, I don't like the `z_erofs_read_ctx` naming, but I don't have
a better suggestion.

> +    erofs_mutex_init(&ctx.lock);
> +    erofs_cond_init(&ctx.cond);
> +

...

> +
> +#ifdef EROFS_MT_ENABLED
> +    workers = sysconf(_SC_NPROCESSORS_ONLN);

Why doesn't erofs_get_available_processors() work here?

> +    if (workers < 1) workers = 1;

Please put the single statement on the next line.

> +    erofs_alloc_workqueue(&erofs_wq, workers, 256, NULL, NULL);
> +#endif
>
>  	fsckcfg.physical_blocks = 0;
>  	fsckcfg.logical_blocks = 0;
> @@ -1179,9 +1153,12 @@ exit_put_super:

...

> diff --git a/include/erofs/lock.h b/include/erofs/lock.h
> index c6e3093..a2e1b60 100644
> --- a/include/erofs/lock.h
> +++ b/include/erofs/lock.h
> @@ -15,6 +15,7 @@ static inline void erofs_mutex_init(erofs_mutex_t *lock)
>  }
>  #define erofs_mutex_lock	pthread_mutex_lock
>  #define erofs_mutex_unlock	pthread_mutex_unlock
> +#define erofs_mutex_destroy	pthread_mutex_destroy
>
>  #define EROFS_DEFINE_MUTEX(lock)	\
>  	erofs_mutex_t lock = PTHREAD_MUTEX_INITIALIZER
> @@ -29,12 +30,25 @@ static inline void erofs_init_rwsem(erofs_rwsem_t *lock)
>  #define erofs_down_write	pthread_rwlock_wrlock
>  #define erofs_up_read		pthread_rwlock_unlock
>  #define erofs_up_write		pthread_rwlock_unlock
> +
> +typedef pthread_cond_t erofs_cond_t;

I think erofs_cond_t should be in a separate .h file.

> +
> +static inline void erofs_cond_init(erofs_cond_t *cond)
> +{
> +	pthread_cond_init(cond, NULL);
> +}
> +#define erofs_cond_wait		pthread_cond_wait
> +#define erofs_cond_signal	pthread_cond_signal
> +#define erofs_cond_broadcast	pthread_cond_broadcast
> +#define erofs_cond_destroy	pthread_cond_destroy
> +
>  #else
>  typedef struct {} erofs_mutex_t;
>
>  static inline void erofs_mutex_init(erofs_mutex_t *lock) {}
>  static inline void erofs_mutex_lock(erofs_mutex_t *lock) {}
>  static inline void erofs_mutex_unlock(erofs_mutex_t *lock) {}
> +static inline void erofs_mutex_destroy(erofs_mutex_t *lock) {}
>
>  #define EROFS_DEFINE_MUTEX(lock)	\
>  	erofs_mutex_t lock = {}
> @@ -46,5 +60,13 @@ static inline void erofs_down_write(erofs_rwsem_t *lock) {}
>  static inline void erofs_up_read(erofs_rwsem_t *lock) {}
>  static inline void erofs_up_write(erofs_rwsem_t *lock) {}
>
> +typedef struct {} erofs_cond_t;
> +
> +static inline void erofs_cond_init(erofs_cond_t *cond) {}
> +static inline int erofs_cond_wait(erofs_cond_t *cond, erofs_mutex_t *mutex) { return 0; }
> +static inline int erofs_cond_signal(erofs_cond_t *cond) { return 0; }
> +static inline int erofs_cond_broadcast(erofs_cond_t *cond) { return 0; }
> +static inline int erofs_cond_destroy(erofs_cond_t *cond) { return 0; }
> +
>  #endif
>  #endif
> diff --git a/lib/data.c b/lib/data.c
> index 6fd1389..26fdb43 100644
> --- a/lib/data.c
> +++ b/lib/data.c
> @@ -9,6 +9,68 @@
>  #include "erofs/trace.h"
>  #include "erofs/decompress.h"
>  #include "liberofs_fragments.h"
> +#include "erofs/workqueue.h"
> +#include "erofs/lock.h"
> +
> +struct erofs_workqueue erofs_wq;
> +
> +struct z_erofs_decompress_task {
> +	struct erofs_work work;
> +	struct z_erofs_read_ctx *ctx;
> +	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_BATCH_SIZE];
> +	char *raw_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
> +	char *out_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
> +	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_BATCH_SIZE];
> +	unsigned int out_lengths[Z_EROFS_PCLUSTER_BATCH_SIZE];
> +	unsigned int nr_reqs;
> +};

Why not add a `struct z_erofs_decompress_task_item` and put
req/raw_buf/out_buf/... in it?

> +
> +static void z_erofs_decompress_worker(struct erofs_work *work, void *tlsp)
> +{
> +	struct z_erofs_decompress_task *task = (struct z_erofs_decompress_task *)work;
> +	struct z_erofs_read_ctx *ctx = task->ctx;
> +	int i, ret = 0, first_err = 0;
> +
> +	for (i = 0; i < task->nr_reqs; ++i) {
> +		ret = z_erofs_decompress(&task->reqs[i]);
> +
> +		if (ret >= 0 && ctx && ctx->outfd >= 0) {
> +			if (pwrite(ctx->outfd, task->out_bufs[i],
> +				   task->out_lengths[i], task->out_offsets[i]) < 0)
> +				ret = -errno;
> +		}
> +
> +		if (ret < 0 && first_err == 0)
> +			first_err = ret;
> +
> +		free(task->raw_bufs[i]);
> +		if (ctx && ctx->free_out)
> +			free(task->out_bufs[i]);
> +	}
> +
> +	if (ctx) {
> +		erofs_mutex_lock(&ctx->lock);
> +		if (first_err < 0 && ctx->final_err == 0)
> +			ctx->final_err = first_err;
> +		ctx->pending_tasks--;
> +		if (ctx->pending_tasks == 0)
> +			erofs_cond_signal(&ctx->cond);
> +		erofs_mutex_unlock(&ctx->lock);
> +	}
> +	free(task);
> +}
> +
> +void z_erofs_read_ctx_enqueue(struct z_erofs_read_ctx *ctx)
> +{
> +	if (ctx && ctx->current_task) {
> +#ifdef EROFS_MT_ENABLED
> +		erofs_queue_work(&erofs_wq, &ctx->current_task->work);
> +#else
> +		z_erofs_decompress_worker(&ctx->current_task->work, NULL);
> +#endif
> +		ctx->current_task = NULL;
> +	}
> +}
>
>  void *erofs_bread(struct erofs_buf *buf, erofs_off_t offset, bool need_kmap)
>  {
> @@ -277,7 +339,8 @@ static int erofs_read_raw_data(struct erofs_inode *inode, char *buffer,
>
>  int z_erofs_read_one_data(struct erofs_inode *inode,
>  			struct erofs_map_blocks *map, char *raw, char *buffer,
> -			erofs_off_t skip, erofs_off_t length, bool trimmed)
> +			erofs_off_t skip, erofs_off_t length, bool trimmed,
> +			erofs_off_t out_offset, struct z_erofs_read_ctx *ctx)
>  {
>  	struct erofs_sb_info *sbi = inode->sbi;
>  	struct erofs_map_dev mdev;
> @@ -285,77 +348,101 @@ int z_erofs_read_one_data(struct erofs_inode *inode,
>
>  	if (map->m_flags & __EROFS_MAP_FRAGMENT) {
>  		if (__erofs_unlikely(inode->nid == sbi->packed_nid)) {
> -			erofs_err("fragment should not exist in the packed inode %llu",
> -				  sbi->packed_nid | 0ULL);
> -			return -EFSCORRUPTED;
> +			ret = -EFSCORRUPTED;
> +			goto err_out;
>  		}
> -		return erofs_packedfile_read(sbi, buffer, length - skip,
> -					     inode->fragmentoff + skip);
> +		ret = erofs_packedfile_read(sbi, buffer, length - skip,
> +					    inode->fragmentoff + skip);
> +
> +		if (ret >= 0 && ctx && ctx->outfd >= 0) {
> +			if (pwrite(ctx->outfd, buffer, length - skip, out_offset) < 0)
> +				ret = -errno;
> +		}
> +		goto err_out;
>  	}
>
> -	/* no device id here, thus it will always succeed */
> -	mdev = (struct erofs_map_dev) {
> -		.m_pa = map->m_pa,
> -	};
> +	mdev = (struct erofs_map_dev) { .m_pa = map->m_pa };
>  	ret = erofs_map_dev(sbi, &mdev);
> -	if (ret) {
> -		DBG_BUGON(1);
> -		return ret;
> -	}
> +	if (ret) goto err_out;

Bad style here too.

>
>  	ret = erofs_dev_read(sbi, mdev.m_deviceid, raw, mdev.m_pa, map->m_plen);
> -	if (ret < 0)
> -		return ret;
> +	if (ret < 0) goto err_out;

Ditto.

Thanks,
Gao Xiang

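One possible reading of the review suggestion above (grouping the per-pcluster fields into a single item type instead of five parallel arrays) is sketched below. This is a guess at the intended shape, not code from the thread: `struct demo_req` is a simplified placeholder for struct z_erofs_decompress_req, and `demo_task_item`, `demo_task`, `demo_task_add()` and `demo_task_bytes()` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_BATCH 32	/* mirrors Z_EROFS_PCLUSTER_BATCH_SIZE in the patch */

/* Simplified placeholder for struct z_erofs_decompress_req (assumption). */
struct demo_req {
	const char *in;
	char *out;
	unsigned int inputsize, decodedlength;
};

/* Everything that belongs to one pcluster lives in one item. */
struct demo_task_item {
	struct demo_req req;
	char *raw_buf;
	char *out_buf;
	unsigned long long out_offset;
	unsigned int out_length;
};

struct demo_task {
	struct demo_task_item items[MAX_BATCH];
	unsigned int nr_items;
};

/* Append one item; returns its index, or -1 when the batch is full. */
static int demo_task_add(struct demo_task *t, char *raw, char *out,
			 unsigned long long off, unsigned int len)
{
	struct demo_task_item *it;

	if (t->nr_items >= MAX_BATCH)
		return -1;
	it = &t->items[t->nr_items];
	*it = (struct demo_task_item) {
		.req = { .in = raw, .out = out, .decodedlength = len },
		.raw_buf = raw,
		.out_buf = out,
		.out_offset = off,
		.out_length = len,
	};
	return (int)t->nr_items++;
}

/* Total output bytes queued in the task, walking one array only. */
static unsigned long long demo_task_bytes(const struct demo_task *t)
{
	unsigned long long sum = 0;
	unsigned int i;

	for (i = 0; i < t->nr_items; i++)
		sum += t->items[i].out_length;
	return sum;
}
```

With this layout the worker loop touches `task->items[i]` alone, so a field can be added or removed without keeping several arrays and their sizes in sync.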
* [PATCH v2 2/2] fsck.erofs: implement algorithm-aware pcluster batching
2026-06-08 5:07 ` [PATCH v2 0/2] fsck.erofs: add multi-threaded decompression Nithurshen
2026-06-08 5:07 ` [PATCH v2 1/2] fsck.erofs: introduce multi-threaded decompression with static batching Nithurshen
@ 2026-06-08 5:07 ` Nithurshen
1 sibling, 0 replies; 9+ messages in thread
From: Nithurshen @ 2026-06-08 5:07 UTC (permalink / raw)
To: nithurshen.dev; +Cc: hsiangkao, linux-erofs, xiang

While static batching successfully overlaps I/O and compute, different
compression algorithms exhibit vastly different scheduling thresholds.

Extremely fast algorithms like LZ4 require large batches (e.g., 32
pclusters) to effectively hide the synchronization overhead of the
thread pool. Conversely, applying this large batch size to compute-heavy
algorithms like LZMA or ZSTD causes memory bloat and thread starvation,
as the main thread spends too much time reading and accumulating memory
before waking up the background workers.

This patch modifies the workqueue submission logic in
z_erofs_read_one_data to scale the batch size based on the algorithm
format. LZ4 is permitted to utilize the Z_EROFS_PCLUSTER_MAX_BATCH_SIZE,
while other heavier algorithms trigger workqueue submission at a much
lower threshold (8 pclusters) to ensure a steady pipeline of work and
a bounded memory footprint.

Signed-off-by: Nithurshen <nithurshen.dev@gmail.com>
---
 include/erofs/internal.h |  2 +-
 lib/data.c               | 15 +++++++++------
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/include/erofs/internal.h b/include/erofs/internal.h
index 63fd3bf..ffb0adb 100644
--- a/include/erofs/internal.h
+++ b/include/erofs/internal.h
@@ -63,7 +63,7 @@ struct erofs_buf {
 #define erofs_pos(sbi, nr)	((erofs_off_t)(nr) << (sbi)->blkszbits)
 #define BLK_ROUND_UP(sbi, addr)	\
 	(roundup(addr, erofs_blksiz(sbi)) >> (sbi)->blkszbits)
-#define Z_EROFS_PCLUSTER_BATCH_SIZE 32
+#define Z_EROFS_PCLUSTER_MAX_BATCH_SIZE 32
 
 struct erofs_buffer_head;
 struct erofs_bufmgr;
diff --git a/lib/data.c b/lib/data.c
index 26fdb43..cb882c5 100644
--- a/lib/data.c
+++ b/lib/data.c
@@ -17,11 +17,11 @@ struct erofs_workqueue erofs_wq;
 struct z_erofs_decompress_task {
 	struct erofs_work work;
 	struct z_erofs_read_ctx *ctx;
-	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	char *raw_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	char *out_bufs[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_BATCH_SIZE];
-	unsigned int out_lengths[Z_EROFS_PCLUSTER_BATCH_SIZE];
+	struct z_erofs_decompress_req reqs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	char *raw_bufs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	char *out_bufs[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	erofs_off_t out_offsets[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
+	unsigned int out_lengths[Z_EROFS_PCLUSTER_MAX_BATCH_SIZE];
 	unsigned int nr_reqs;
 };
 
@@ -401,7 +401,10 @@ int z_erofs_read_one_data(struct erofs_inode *inode,
 	task->out_offsets[idx] = out_offset;
 	task->out_lengths[idx] = length;
 
-	if (task->nr_reqs == Z_EROFS_PCLUSTER_BATCH_SIZE) {
+	int batch_limit = (map->m_algorithmformat == Z_EROFS_COMPRESSION_LZ4) ?
+				Z_EROFS_PCLUSTER_MAX_BATCH_SIZE : 8;
+
+	if (task->nr_reqs >= batch_limit) {
 		z_erofs_read_ctx_enqueue(ctx);
 	}
 	return 0;
-- 
2.52.0

^ permalink raw reply related	[flat|nested] 9+ messages in thread
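Editor's sketch of the threshold selection described in the commit message of this patch, pulled out into a standalone helper so the rule can be read and tested in isolation. It is an illustration only and not part of the series: the enum `sketch_algo`, the `SKETCH_*` constants and both function names are hypothetical stand-ins for `map->m_algorithmformat`, `Z_EROFS_PCLUSTER_MAX_BATCH_SIZE` and the literal `8` used in the diff.

```c
#include <assert.h>

/* Hypothetical stand-ins for the Z_EROFS_COMPRESSION_* algorithm ids. */
enum sketch_algo { SKETCH_LZ4, SKETCH_LZMA, SKETCH_DEFLATE, SKETCH_ZSTD };

#define SKETCH_MAX_BATCH	32	/* mirrors Z_EROFS_PCLUSTER_MAX_BATCH_SIZE */
#define SKETCH_HEAVY_BATCH	8	/* the literal 8 in the patch, given a name */

/* LZ4 may fill the whole batch; every other algorithm submits earlier. */
static unsigned int batch_limit(enum sketch_algo algo)
{
	return algo == SKETCH_LZ4 ? SKETCH_MAX_BATCH : SKETCH_HEAVY_BATCH;
}

/* Returns nonzero when the pending batch should be handed to the workqueue. */
static int batch_should_submit(unsigned int nr_reqs, enum sketch_algo algo)
{
	assert(nr_reqs <= SKETCH_MAX_BATCH);
	return nr_reqs >= batch_limit(algo);
}
```

The comparison is `>=` rather than `==`, as in the diff. If consecutive pclusters can carry different algorithm formats (an assumption here), a batch that grew past 8 entries under the LZ4 limit would never hit an exact-match test once a heavier algorithm appears, whereas `>=` submits it on the next request.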
end of thread, other threads:[~2026-06-08 6:25 UTC | newest]

Thread overview: 9+ messages
2026-05-23 0:37 [PATCH 0/2] fsck.erofs: introduce multi-threaded decompression Nithurshen
2026-05-23 0:37 ` [PATCH 1/2] fsck.erofs: introduce multi-threaded decompression with static batching Nithurshen
2026-06-07 1:50   ` Gao Xiang
2026-05-23 0:37 ` [PATCH 2/2] fsck.erofs: implement dynamic pcluster batching based on algorithm complexity Nithurshen
2026-06-07 1:52   ` Gao Xiang
2026-06-08 5:07 ` [PATCH v2 0/2] fsck.erofs: add multi-threaded decompression Nithurshen
2026-06-08 5:07   ` [PATCH v2 1/2] fsck.erofs: introduce multi-threaded decompression with static batching Nithurshen
2026-06-08 6:25     ` Gao Xiang
2026-06-08 5:07   ` [PATCH v2 2/2] fsck.erofs: implement algorithm-aware pcluster batching Nithurshen