From: Pavel Begunkov <asml.silence@gmail.com>
To: Jens Axboe <axboe@kernel.dk>, io-uring@vger.kernel.org
Subject: Re: [PATCH 4/4] io_uring: optimise compl locking for non-shared rings
Date: Fri, 18 Mar 2022 15:32:15 +0000
Message-ID: <aa3c0800-feed-50f9-e8bb-d4b861c4265c@gmail.com>
In-Reply-To: <ce480edd-c82b-c094-39cd-d45d6b76e5a3@kernel.dk>

[-- Attachment #1: Type: text/plain, Size: 4067 bytes --]

On 3/18/22 15:21, Jens Axboe wrote:
> On 3/18/22 9:13 AM, Pavel Begunkov wrote:
>> On 3/18/22 14:54, Jens Axboe wrote:
>>> On 3/18/22 7:52 AM, Pavel Begunkov wrote:
>>>> When only one task submits requests, most CQEs are expected to be
>>>> filled from that task context so we have natural serialisation. That
>>>> would mean that in those cases we don't need spinlocking around CQE
>>>> posting. One downside is that it also means that io-wq workers can't emit
>>>> CQEs directly but should do it through the original task context using
>>>> task_works. That may hurt latency and performance and might matter much
>>>> to some workloads, but it's not a huge deal in general as io-wq is a
>>>> slow path and there is some additional merit from tw completion
>>>> batching.
>>>
>>> Not too worried about io-wq task_work for cq filling, it is the slower
>>> path after all. And I think we can get away with doing notifications as
>>> it's just for CQ filling. If the task is currently waiting in
>>> cqring_wait, then it'll get woken anyway and it will process task work.
>>> If it's in userspace, it doesn't need a notification. That should make
>>> it somewhat lighter than requiring using TIF_NOTIFY_SIGNAL for that.
>>>
>>>> The feature should be opted-in by the userspace by setting a new
>>>> IORING_SETUP_PRIVATE_CQ flag. It doesn't work with IOPOLL, and also for
>>>> now only the task that created a ring can submit requests to it.
>>>
>>> I know this is a WIP, but why do we need CQ_PRIVATE? And this needs to
>>
>> One reason is because of the io-wq -> tw punting, which is not optimal
>> for e.g. active users of IOSQE_ASYNC. The second is because the
>> fundamental requirement is that only one task should be submitting
>> requests. Was thinking about automating it, e.g. when we register
>> a second tctx we go through a slow path waiting for all current tw
>> to complete and then removing an internal and not userspace visible
>> CQ_PRIVATE flag.
> 
> Was thinking something along those lines too. The alternative is setting
> up the ring with SETUP_SINGLE_ISSUER or something like that, having the
> application tell us that it is a single issuer and no submits are
> shared across threads. Serves the same kind of purpose as CQ_PRIVATE,
> but enables us to simply fail things if the task violates those
> constraints. Would also be a better name I believe as it might enable
> further optimizations in the future, like for example the mutex
> reduction for submits.

That's exactly what it is, including the failing part. And I like
your name better, will take it
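
Roughly, the semantics agreed on here could be modelled in userspace
like this. It is only a sketch and not the kernel implementation:
struct model_ring, model_ring_init() and model_submit() are made-up
names, task ids are plain integers, and the -EEXIST return value is an
arbitrary choice for the model.

```c
#include <errno.h>
#include <stdbool.h>

/* Hypothetical userspace model of a "single issuer" ring, not kernel code. */
struct model_ring {
	int owner_tid;		/* task that created the ring */
	bool single_issuer;	/* models the proposed setup flag */
	unsigned submitted;	/* number of accepted submissions */
};

static void model_ring_init(struct model_ring *r, int creator_tid,
			    bool single_issuer)
{
	r->owner_tid = creator_tid;
	r->single_issuer = single_issuer;
	r->submitted = 0;
}

/*
 * Accept a submission from task "tid". A single-issuer ring simply fails
 * submissions from any task other than its creator; the error code is an
 * assumption made for this sketch.
 */
static int model_submit(struct model_ring *r, int tid)
{
	if (r->single_issuer && tid != r->owner_tid)
		return -EEXIST;
	r->submitted++;
	return 0;
}
```

With a ring created by task 1 and the flag set, model_submit(&r, 1)
succeeds while model_submit(&r, 2) is rejected; without the flag both
are accepted.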

>> Also, as SQPOLL task is by definition the only one submitting SQEs,
>> was thinking about enabling it by default for them, but didn't do
>> because of the io-wq / IOSQE_ASYNC.
> 
> Gotcha.
> 
>>> work with registered files (and ring fd) as that is probably a bigger
>>> win than skipping the completion_lock if you're not shared anyway.
>>
>> It does work with fixed/registered files and registered io_uring fds.
> 
> t/io_uring fails for me with registered files or rings, getting EINVAL.
> Might be user error, but that's simply just setting CQ_PRIVATE for
> setup.

One thing I changed in the tool is that the ring should be created
by the submitter task, so move setup_ring into the submitter thread.
Plan to get rid of this restriction though.

Weird that it only works for you without reg files/rings, will
take a look.

Attached is the io_uring.c that I used. It's based on an old version,
so do_nop can't be set via argv and has to be toggled in the source code.
IORING_ENTER_REGISTERED_RING is always enabled.

>> With regard to "a bigger win", probably in many cases, but if you submit
>> a good batch at once, and completion tw batching doesn't kick in (e.g.
>> direct bdev read of not too high intensity), it might save
>> N spinlock/unlock when registered ring fd would kill only one pair of
>> fdget/fdput.
> 
> Definitely, various cases where one would be a bigger win than the
> other, agree on that. But let's just ensure that both work together :-)
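
As a back-of-the-envelope illustration of the trade-off quoted above,
here is a toy cost model. It counts lock/unlock and fdget/fdput pairs
only, implies no measurements, and cq_lock_pairs() and
fd_pairs_per_enter() are hypothetical helper names.

```c
#include <stdbool.h>

/*
 * Toy model: lock/unlock pairs taken while posting n_cqes completions.
 * "batch" is how many CQEs are posted per locked section; 1 models the
 * case where tw completion batching does not kick in. A private CQ is
 * assumed to take no completion lock at all.
 */
static unsigned cq_lock_pairs(unsigned n_cqes, unsigned batch, bool private_cq)
{
	if (private_cq || !n_cqes)
		return 0;
	if (!batch)
		batch = 1;
	return (n_cqes + batch - 1) / batch;	/* round up */
}

/* fdget/fdput pairs per enter call: one, unless the ring fd is registered. */
static unsigned fd_pairs_per_enter(bool registered_ring)
{
	return registered_ring ? 0 : 1;
}
```

For a batch of 32 requests submitted in one enter call with no tw
batching, the model gives 32 lock pairs avoided by the private CQ
against 1 fdget/fdput pair avoided by the registered ring fd.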

-- 
Pavel Begunkov

[-- Attachment #2: io_uring.c --]
[-- Type: text/x-csrc, Size: 15194 bytes --]

#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <signal.h>
#include <inttypes.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/resource.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <linux/fs.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <sched.h>

#include "../arch/arch.h"
#include "../lib/types.h"
#include "../os/linux/io_uring.h"

#define min(a, b)		((a) < (b) ? (a) : (b))

struct io_sq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	unsigned *flags;
	unsigned *array;
};


#define IORING_SETUP_PRIVATE_CQ	(1U << 8)

enum {
	IORING_REGISTER_RING_FDS		= 20,
};

#define IORING_ENTER_REGISTERED_RING	(1U << 4)

struct io_cq_ring {
	unsigned *head;
	unsigned *tail;
	unsigned *ring_mask;
	unsigned *ring_entries;
	struct io_uring_cqe *cqes;
};

#define DEPTH			128
#define BATCH_SUBMIT		32
#define BATCH_COMPLETE		32
#define BS			4096

#define MAX_FDS			16

static unsigned sq_ring_mask, cq_ring_mask;

struct file {
	unsigned long max_blocks;
	unsigned pending_ios;
	int real_fd;
	int fixed_fd;
};

struct submitter {
	pthread_t thread;
	int ring_fd;
	struct io_sq_ring sq_ring;
	struct io_uring_sqe *sqes;
	struct io_cq_ring cq_ring;
	int inflight;
	unsigned long reaps;
	unsigned long done;
	unsigned long calls;
	volatile int finish;

	__s32 *fds;

	struct file files[MAX_FDS];
	unsigned nr_files;
	unsigned cur_file;
	struct iovec iovecs[];
};

static struct submitter *submitter;
static volatile int finish;

static int depth = DEPTH;
static int batch_submit = BATCH_SUBMIT;
static int batch_complete = BATCH_COMPLETE;
static int bs = BS;
static int polled = 1;		/* use IO polling */
static int fixedbufs = 1;	/* use fixed user buffers */
static int register_files = 1;	/* use fixed files */
static int buffered = 0;	/* use buffered IO, not O_DIRECT */
static int sq_thread_poll = 0;	/* use kernel submission/poller thread */
static int sq_thread_cpu = -1;	/* pin above thread to this CPU */
static int do_nop = 1;		/* no-op SQ ring commands */

struct io_uring_rsrc_update {
	__u32 offset;
	__u32 resv;
	__aligned_u64 data;
};

static int vectored = 1;

static int setup_ring(struct submitter *s);

static int io_uring_register_buffers(struct submitter *s)
{
	if (do_nop)
		return 0;

	return syscall(__NR_io_uring_register, s->ring_fd,
			IORING_REGISTER_BUFFERS, s->iovecs, depth);
}

static int io_uring_register_io_uring_fd(struct submitter *s)
{
	struct io_uring_rsrc_update up = {};

	up.offset = 0;
	up.data = s->ring_fd;

	return syscall(__NR_io_uring_register, s->ring_fd,
			IORING_REGISTER_RING_FDS, &up, 1);
}

static int io_uring_register_files(struct submitter *s)
{
	int i;

	if (do_nop)
		return 0;

	s->fds = calloc(s->nr_files, sizeof(__s32));
	for (i = 0; i < s->nr_files; i++) {
		s->fds[i] = s->files[i].real_fd;
		s->files[i].fixed_fd = i;
	}

	return syscall(__NR_io_uring_register, s->ring_fd,
			IORING_REGISTER_FILES, s->fds, s->nr_files);
}

static int io_uring_setup(unsigned entries, struct io_uring_params *p)
{
	return syscall(__NR_io_uring_setup, entries, p);
}

static void io_uring_probe(int fd)
{
	struct io_uring_probe *p;
	int ret;

	p = malloc(sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
	if (!p)
		return;

	memset(p, 0, sizeof(*p) + 256 * sizeof(struct io_uring_probe_op));
	ret = syscall(__NR_io_uring_register, fd, IORING_REGISTER_PROBE, p, 256);
	if (ret < 0)
		goto out;

	if (IORING_OP_READ >= p->ops_len)
		goto out;

	if ((p->ops[IORING_OP_READ].flags & IO_URING_OP_SUPPORTED))
		vectored = 0;
out:
	free(p);
}

static int io_uring_enter(struct submitter *s, unsigned int to_submit,
			  unsigned int min_complete, unsigned int flags)
{
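	/*
	 * With IORING_ENTER_REGISTERED_RING the fd argument is an index into
	 * the registered ring fds; 0 is the slot used at registration time.
	 */
	return syscall(__NR_io_uring_enter, 0, to_submit, min_complete,
			flags | IORING_ENTER_REGISTERED_RING, NULL, 0);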
}

#ifndef CONFIG_HAVE_GETTID
static int gettid(void)
{
	return syscall(__NR_gettid);
}
#endif

static unsigned file_depth(struct submitter *s)
{
	return (depth + s->nr_files - 1) / s->nr_files;
}

static void init_io(struct submitter *s, unsigned index)
{
	struct io_uring_sqe *sqe = &s->sqes[index];
	unsigned long offset;
	struct file *f;
	long r;

	if (do_nop) {
		sqe->opcode = IORING_OP_NOP;
		return;
	}

	if (s->nr_files == 1) {
		f = &s->files[0];
	} else {
		f = &s->files[s->cur_file];
		if (f->pending_ios >= file_depth(s)) {
			s->cur_file++;
			if (s->cur_file == s->nr_files)
				s->cur_file = 0;
			f = &s->files[s->cur_file];
		}
	}
	f->pending_ios++;

	r = lrand48();
	offset = (r % (f->max_blocks - 1)) * bs;

	if (register_files) {
		sqe->flags = IOSQE_FIXED_FILE;
		sqe->fd = f->fixed_fd;
	} else {
		sqe->flags = 0;
		sqe->fd = f->real_fd;
	}
	if (fixedbufs) {
		sqe->opcode = IORING_OP_READ_FIXED;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = index;
	} else if (!vectored) {
		sqe->opcode = IORING_OP_READ;
		sqe->addr = (unsigned long) s->iovecs[index].iov_base;
		sqe->len = bs;
		sqe->buf_index = 0;
	} else {
		sqe->opcode = IORING_OP_READV;
		sqe->addr = (unsigned long) &s->iovecs[index];
		sqe->len = 1;
		sqe->buf_index = 0;
	}
	sqe->ioprio = 0;
	sqe->off = offset;
	sqe->user_data = (unsigned long) f;
}

static int prep_more_ios(struct submitter *s, int max_ios)
{
	struct io_sq_ring *ring = &s->sq_ring;
	unsigned index, tail, next_tail, prepped = 0;

	next_tail = tail = *ring->tail;
	do {
		next_tail++;
		if (next_tail == atomic_load_acquire(ring->head))
			break;

		index = tail & sq_ring_mask;
		init_io(s, index);
		ring->array[index] = index;
		prepped++;
		tail = next_tail;
	} while (prepped < max_ios);

	if (prepped)
		atomic_store_release(ring->tail, tail);
	return prepped;
}

static int get_file_size(struct file *f)
{
	struct stat st;

	if (fstat(f->real_fd, &st) < 0)
		return -1;
	if (S_ISBLK(st.st_mode)) {
		unsigned long long bytes;

		if (ioctl(f->real_fd, BLKGETSIZE64, &bytes) != 0)
			return -1;

		f->max_blocks = bytes / bs;
		return 0;
	} else if (S_ISREG(st.st_mode)) {
		f->max_blocks = st.st_size / bs;
		return 0;
	}

	return -1;
}

static int reap_events(struct submitter *s)
{
	struct io_cq_ring *ring = &s->cq_ring;
	struct io_uring_cqe *cqe;
	unsigned head, reaped = 0;

	head = *ring->head;
	do {
		struct file *f;

		read_barrier();
		if (head == atomic_load_acquire(ring->tail))
			break;
		cqe = &ring->cqes[head & cq_ring_mask];
		if (!do_nop) {
			f = (struct file *) (uintptr_t) cqe->user_data;
			f->pending_ios--;
			if (cqe->res != bs) {
				printf("io: unexpected ret=%d\n", cqe->res);
				if (polled && cqe->res == -EOPNOTSUPP)
					printf("Your filesystem/driver/kernel doesn't support polled IO\n");
				return -1;
			}
		}
		reaped++;
		head++;
	} while (1);

	if (reaped) {
		s->inflight -= reaped;
		atomic_store_release(ring->head, head);
	}
	return reaped;
}

static void *submitter_fn(void *data)
{
	struct submitter *s = data;
	struct io_sq_ring *ring = &s->sq_ring;
	int ret, prepped;

	ret = setup_ring(s);
	if (ret) {
		printf("ring setup failed: %s, %d\n", strerror(errno), ret);
		return NULL;
	}

	printf("submitter=%d\n", gettid());

	srand48(pthread_self());

	prepped = 0;
	do {
		int to_wait, to_submit, this_reap, to_prep;
		unsigned ring_flags = 0;

		if (!prepped && s->inflight < depth) {
			to_prep = min(depth - s->inflight, batch_submit);
			prepped = prep_more_ios(s, to_prep);
		}
		s->inflight += prepped;
submit_more:
		to_submit = prepped;
submit:
		if (to_submit && (s->inflight + to_submit <= depth))
			to_wait = 0;
		else
			to_wait = min(s->inflight + to_submit, batch_complete);

		/*
		 * Only need to call io_uring_enter if we're not using SQ thread
		 * poll, or if IORING_SQ_NEED_WAKEUP is set.
		 */
		if (sq_thread_poll)
			ring_flags = atomic_load_acquire(ring->flags);
		if (!sq_thread_poll || ring_flags & IORING_SQ_NEED_WAKEUP) {
			unsigned flags = 0;

			if (to_wait)
				flags = IORING_ENTER_GETEVENTS;
			if (ring_flags & IORING_SQ_NEED_WAKEUP)
				flags |= IORING_ENTER_SQ_WAKEUP;
			ret = io_uring_enter(s, to_submit, to_wait, flags);
			s->calls++;
		} else {
			/* for SQPOLL, we submitted it all effectively */
			ret = to_submit;
		}

		/*
		 * For non SQ thread poll, we already got the events we needed
		 * through the io_uring_enter() above. For SQ thread poll, we
		 * need to loop here until we find enough events.
		 */
		this_reap = 0;
		do {
			int r;
			r = reap_events(s);
			if (r == -1) {
				s->finish = 1;
				break;
			} else if (r > 0)
				this_reap += r;
		} while (sq_thread_poll && this_reap < to_wait);
		s->reaps += this_reap;

		if (ret >= 0) {
			if (!ret) {
				to_submit = 0;
				if (s->inflight)
					goto submit;
				continue;
			} else if (ret < to_submit) {
				int diff = to_submit - ret;

				s->done += ret;
				prepped -= diff;
				goto submit_more;
			}
			s->done += ret;
			prepped = 0;
			continue;
		} else if (ret < 0) {
			if (errno == EAGAIN) {
				if (s->finish)
					break;
				if (this_reap)
					goto submit;
				to_submit = 0;
				goto submit;
			}
			printf("io_submit: %s\n", strerror(errno));
			break;
		}
	} while (!s->finish);

	finish = 1;
	return NULL;
}

static void sig_int(int sig)
{
	printf("Exiting on signal %d\n", sig);
	submitter->finish = 1;
	finish = 1;
}

static void arm_sig_int(void)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));
	act.sa_handler = sig_int;
	act.sa_flags = SA_RESTART;
	sigaction(SIGINT, &act, NULL);
}

static int setup_ring(struct submitter *s)
{
	struct io_sq_ring *sring = &s->sq_ring;
	struct io_cq_ring *cring = &s->cq_ring;
	struct io_uring_params p;
	int ret, fd;
	void *ptr;

	memset(&p, 0, sizeof(p));

	if (polled && !do_nop)
		p.flags |= IORING_SETUP_IOPOLL;
	if (sq_thread_poll) {
		p.flags |= IORING_SETUP_SQPOLL;
		if (sq_thread_cpu != -1) {
			p.flags |= IORING_SETUP_SQ_AFF;
			p.sq_thread_cpu = sq_thread_cpu;
		}
	}
	p.flags |= IORING_SETUP_PRIVATE_CQ;

	fd = io_uring_setup(depth, &p);
	if (fd < 0) {
		perror("io_uring_setup");
		return 1;
	}
	s->ring_fd = fd;

	io_uring_probe(fd);

	ret = io_uring_register_io_uring_fd(s);
	if (ret < 0) {
		perror("io_uring_register_io_uring_fd");
		return 1;
	}

	if (fixedbufs) {
		ret = io_uring_register_buffers(s);
		if (ret < 0) {
			perror("io_uring_register_buffers");
			return 1;
		}
	}

	if (register_files) {
		ret = io_uring_register_files(s);
		if (ret < 0) {
			perror("io_uring_register_files");
			return 1;
		}
	}

	ptr = mmap(0, p.sq_off.array + p.sq_entries * sizeof(__u32),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
			IORING_OFF_SQ_RING);
	printf("sq_ring ptr = 0x%p\n", ptr);
	sring->head = ptr + p.sq_off.head;
	sring->tail = ptr + p.sq_off.tail;
	sring->ring_mask = ptr + p.sq_off.ring_mask;
	sring->ring_entries = ptr + p.sq_off.ring_entries;
	sring->flags = ptr + p.sq_off.flags;
	sring->array = ptr + p.sq_off.array;
	sq_ring_mask = *sring->ring_mask;

	s->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
			IORING_OFF_SQES);
	printf("sqes ptr    = 0x%p\n", s->sqes);

	ptr = mmap(0, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
			PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd,
			IORING_OFF_CQ_RING);
	printf("cq_ring ptr = 0x%p\n", ptr);
	cring->head = ptr + p.cq_off.head;
	cring->tail = ptr + p.cq_off.tail;
	cring->ring_mask = ptr + p.cq_off.ring_mask;
	cring->ring_entries = ptr + p.cq_off.ring_entries;
	cring->cqes = ptr + p.cq_off.cqes;
	cq_ring_mask = *cring->ring_mask;
	return 0;
}

static void file_depths(char *buf)
{
	struct submitter *s = submitter;
	char *p;
	int i;

	buf[0] = '\0';
	p = buf;
	for (i = 0; i < s->nr_files; i++) {
		struct file *f = &s->files[i];

		if (i + 1 == s->nr_files)
			p += sprintf(p, "%u", f->pending_ios);
		else
			p += sprintf(p, "%u, ", f->pending_ios);
	}
}

static void usage(char *argv)
{
	printf("%s [options] -- [filenames]\n"
		" -d <int> : IO Depth, default %d\n"
		" -s <int> : Batch submit, default %d\n"
		" -c <int> : Batch complete, default %d\n"
		" -b <int> : Block size, default %d\n"
		" -p <bool> : Polled IO, default %d\n"
		" -B <bool> : Fixed buffers, default %d\n"
		" -F <bool> : Register files, default %d\n",
		argv, DEPTH, BATCH_SUBMIT, BATCH_COMPLETE, BS, polled,
		fixedbufs, register_files);
	exit(0);
}

int main(int argc, char *argv[])
{
	struct submitter *s;
	unsigned long done, calls, reap;
	int i, flags, fd, opt;
	char *fdepths;
	void *ret;

	if (!do_nop && argc < 2) {
		printf("%s: filename [options]\n", argv[0]);
		return 1;
	}

	while ((opt = getopt(argc, argv, "d:s:c:b:p:B:F:h?")) != -1) {
		switch (opt) {
		case 'd':
			depth = atoi(optarg);
			break;
		case 's':
			batch_submit = atoi(optarg);
			break;
		case 'c':
			batch_complete = atoi(optarg);
			break;
		case 'b':
			bs = atoi(optarg);
			break;
		case 'p':
			polled = !!atoi(optarg);
			break;
		case 'B':
			fixedbufs = !!atoi(optarg);
			break;
		case 'F':
			register_files = !!atoi(optarg);
			break;
		case 'h':
		case '?':
		default:
			usage(argv[0]);
			break;
		}
	}

	submitter = calloc(1, sizeof(*submitter) + depth * sizeof(struct iovec));
	if (!submitter) {
		perror("calloc");
		return 1;
	}
	s = submitter;

	flags = O_RDONLY | O_NOATIME;
	if (!buffered)
		flags |= O_DIRECT;

	i = optind;
	while (!do_nop && i < argc) {
		struct file *f;

		if (s->nr_files == MAX_FDS) {
			printf("Max number of files (%d) reached\n", MAX_FDS);
			break;
		}
		fd = open(argv[i], flags);
		if (fd < 0) {
			perror("open");
			return 1;
		}

		f = &s->files[s->nr_files];
		f->real_fd = fd;
		if (get_file_size(f)) {
			printf("failed getting size of device/file\n");
			return 1;
		}
		if (f->max_blocks <= 1) {
			printf("Zero file/device size?\n");
			return 1;
		}
		f->max_blocks--;

		printf("Added file %s\n", argv[i]);
		s->nr_files++;
		i++;
	}

	if (fixedbufs) {
		struct rlimit rlim;

		rlim.rlim_cur = RLIM_INFINITY;
		rlim.rlim_max = RLIM_INFINITY;
		if (setrlimit(RLIMIT_MEMLOCK, &rlim) < 0) {
			perror("setrlimit");
			return 1;
		}
	}

	arm_sig_int();

	for (i = 0; i < depth; i++) {
		void *buf;

		if (posix_memalign(&buf, bs, bs)) {
			printf("failed alloc\n");
			return 1;
		}
		s->iovecs[i].iov_base = buf;
		s->iovecs[i].iov_len = bs;
	}

	printf("polled=%d, fixedbufs=%d, register_files=%d, buffered=%d", polled, fixedbufs, register_files, buffered);
	printf(" QD=%d, sq_ring=%d, cq_ring=%d\n", depth, 0, 0);

	pthread_create(&s->thread, NULL, submitter_fn, s);

	/* +1 keeps room for the NUL even when nr_files == 0 (do_nop) */
	fdepths = malloc(8 * s->nr_files + 1);
	reap = calls = done = 0;
	do {
		unsigned long this_done = 0;
		unsigned long this_reap = 0;
		unsigned long this_call = 0;
		unsigned long rpc = 0, ipc = 0;

		sleep(1);
		this_done += s->done;
		this_call += s->calls;
		this_reap += s->reaps;
		if (this_call - calls) {
			rpc = (this_done - done) / (this_call - calls);
			ipc = (this_reap - reap) / (this_call - calls);
		} else
			rpc = ipc = -1;
		file_depths(fdepths);
		printf("IOPS=%lu, IOS/call=%ld/%ld, inflight=%u (%s)\n",
				this_done - done, rpc, ipc, s->inflight,
				fdepths);
		done = this_done;
		calls = this_call;
		reap = this_reap;
	} while (!finish);

	pthread_join(s->thread, &ret);
	close(s->ring_fd);
	free(fdepths);
	return 0;
}

Thread overview: 15+ messages
2022-03-18 13:52 [RFC 0/4] completion locking optimisation feature Pavel Begunkov
2022-03-18 13:52 ` [PATCH 1/4] io_uring: get rid of raw fill cqe in kill_timeout Pavel Begunkov
2022-03-18 13:52 ` [PATCH 2/4] io_uring: get rid of raw fill_cqe in io_fail_links Pavel Begunkov
2022-03-18 13:52 ` [PATCH 3/4] io_uring: remove raw fill_cqe from linked timeout Pavel Begunkov
2022-03-18 13:52 ` [PATCH 4/4] io_uring: optimise compl locking for non-shared rings Pavel Begunkov
2022-03-18 14:54   ` Jens Axboe
2022-03-18 15:13     ` Pavel Begunkov
2022-03-18 15:21       ` Jens Axboe
2022-03-18 15:32         ` Pavel Begunkov [this message]
2022-03-18 16:06           ` Jens Axboe
2022-03-18 14:42 ` [RFC 0/4] completion locking optimisation feature Pavel Begunkov
2022-03-18 14:52   ` Jens Axboe
2022-03-18 15:00     ` Pavel Begunkov
2022-03-18 15:22       ` Jens Axboe
2022-03-18 15:34         ` Pavel Begunkov
