From: "Frederich, Jens" <Jens.Frederich@vector.com>
To: "io-uring@vger.kernel.org" <io-uring@vger.kernel.org>
Subject: Question about the optimal receiving TCP streams via io_uring
Date: Tue, 27 Oct 2020 11:01:25 +0000 [thread overview]
Message-ID: <73bd83cd579246acb1f15bb38f5dc90e@vector.com> (raw)
Hello,
I would like to receive n 10 Gbps TCP or UDP streams (jumbo frames) as fast as possible and write each socket stream to a file on fast XFS storage. How can I implement this optimally with io_uring? I want to use io_uring for both network and file I/O, and the CPU load should stay low. I would like to know your opinions. My first naive implementation looks like this, but I can't get more than 1 Gbps through a single TCP stream:
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <unistd.h>
#include "liburing.h"
/*
 * Compile-time limits.
 * NOTE(review): MAX_CONNECTIONS is not enforced anywhere in the code shown;
 * it only sizes the receive-buffer pool through BUFFERS_COUNT.
 */
#define MAX_CONNECTIONS 4096
/* Backlog argument passed to listen(). */
#define BACKLOG 512
/*
 * Size of each provided receive buffer, and the length requested per recv.
 * Presumably chosen to match a 9000-byte jumbo-frame MTU. For TCP a recv is
 * not tied to frame boundaries, so this only caps the bytes delivered per CQE.
 */
#define MAX_MESSAGE_LEN 9000
/* Number of receive buffers registered with the kernel (one per connection slot). */
#define BUFFERS_COUNT MAX_CONNECTIONS
/* State of one streaming server: a listening socket plus the io_uring instance that drives it. */
struct Stream_Server {
/* TCP port to listen on, in host byte order (converted with htons at bind time). */
int port;
/* Parameters handed to io_uring_queue_init_params(). */
struct io_uring_params ring_params;
struct io_uring ring;
int socket_listen_fd;
/* Peer address written by the currently outstanding ACCEPT request. */
struct sockaddr_in next_client_address;
/* Value-result length argument for that ACCEPT request. */
socklen_t next_client_address_size;
/* Running number of CQEs processed by main_loop_process_cqes(). */
uint64_t total_cqe_count;
/* NOTE(review): not referenced anywhere in the code shown. */
struct Data_Analyzer *data_analyzer;
};
/*
 * Request kinds stored in conn_info.type, so that each CQE can be routed back
 * to the code that issued it. WRITE is used both for socket sends and for
 * file writes; the completion handling treats the two identically.
 */
enum {
ACCEPT,
READ,
WRITE,
PROVIDE_BUFFERS,
};
/*
 * Per-request bookkeeping packed into the 64-bit io_uring user_data field,
 * so that no per-request allocation is needed.
 *   fd   - socket the request belongs to (the listening socket for ACCEPT)
 *   type - one of ACCEPT / READ / WRITE / PROVIDE_BUFFERS
 *   bid  - provided-buffer id used by the request (0 when not applicable)
 */
typedef struct conn_info {
    __u32 fd;
    __u16 type;
    __u16 bid;
} conn_info;

/* Packing into user_data only works if the struct fills exactly 64 bits. */
_Static_assert(sizeof(conn_info) == sizeof(__u64),
               "conn_info must fit exactly into io_uring user_data");
/*
 * Backing storage for all receive buffers; buffer id `bid` refers to
 * bufs[bid]. The whole array is handed to the kernel with PROVIDE_BUFFERS at
 * startup, and single entries are handed back after they have been written out.
 */
char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
/* Buffer group id under which the receive buffers are registered. */
int socket_buffers_group_id = 1337;
/* NOTE(review): not referenced anywhere in the code shown. */
int mdf_file_buffers_group_id = 1338;
/*
 * Queue an ACCEPT request on the listening socket `fd`.
 *
 * The peer address and its length are written to `client_address` and
 * `client_address_size` when the request completes. `flags` are IOSQE_*
 * flags applied to the SQE. The request is tagged {fd, ACCEPT, 0} in
 * user_data.
 *
 * Despite the name, this only queues the SQE. Submission happens in the
 * caller's next io_uring_submit*() call, except when the SQ ring is full:
 * then pending SQEs are flushed here so that a slot frees up.
 */
void make_accept_sqe_and_submit(struct io_uring *ring, int fd, struct sockaddr *client_address, socklen_t *client_address_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: push queued SQEs to the kernel and try once more. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "io_uring submission queue full\n");
            exit(1);
        }
    }
    io_uring_prep_accept(sqe, fd, client_address, client_address_size, 0);
    io_uring_sqe_set_flags(sqe, flags);

    /* Tag after prep (prep may reset user_data); memcpy avoids a strict-aliasing violation. */
    conn_info conn = { .fd = (__u32)fd, .type = ACCEPT, .bid = 0 };
    memcpy(&sqe->user_data, &conn, sizeof conn);
}
/*
 * Queue a recv of up to `message_size` bytes on socket `fd`.
 *
 * No buffer is passed. With IOSQE_BUFFER_SELECT in `flags`, the kernel picks
 * one from buffer group `gid` and reports its id in the upper bits of
 * cqe->flags. The request is tagged {fd, READ, 0} in user_data.
 *
 * Despite the name, this only queues the SQE. If the SQ ring is full,
 * pending SQEs are flushed here to make room.
 */
void make_socket_read_sqe_and_submit(struct io_uring *ring, int fd, unsigned gid, size_t message_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: push queued SQEs to the kernel and try once more. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "io_uring submission queue full\n");
            exit(1);
        }
    }
    io_uring_prep_recv(sqe, fd, NULL, message_size, 0);
    io_uring_sqe_set_flags(sqe, flags);
    /* Must be set after prep, which initialises the SQE fields. */
    sqe->buf_group = gid;

    /* memcpy rather than a pointer cast avoids a strict-aliasing violation. */
    conn_info conn = { .fd = (__u32)fd, .type = READ, .bid = 0 };
    memcpy(&sqe->user_data, &conn, sizeof conn);
}
/*
 * Queue a send of `message_size` bytes from bufs[bid] to socket `fd`
 * (i.e. echo a received buffer back).
 *
 * The request is tagged {fd, WRITE, bid}, so its completion is handled like
 * a file write: the buffer is re-provided and a new recv is armed.
 * NOTE(review): not called anywhere in the code shown.
 *
 * Despite the name, this only queues the SQE. If the SQ ring is full,
 * pending SQEs are flushed here to make room.
 */
void make_socket_write_sqe_and_submit(struct io_uring *ring, int fd, __u16 bid, size_t message_size, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: push queued SQEs to the kernel and try once more. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "io_uring submission queue full\n");
            exit(1);
        }
    }
    io_uring_prep_send(sqe, fd, &bufs[bid], message_size, 0);
    io_uring_sqe_set_flags(sqe, flags);

    /* memcpy rather than a pointer cast avoids a strict-aliasing violation. */
    conn_info conn = { .fd = (__u32)fd, .type = WRITE, .bid = bid };
    memcpy(&sqe->user_data, &conn, sizeof conn);
}
// @Temporary: support n file streams
/* Output file shared by every connection; (re)opened whenever a connection is accepted. */
int outfd = -1;
/* Accumulated bytes_read over all received chunks; used as the offset of the file writes. */
off_t file_offset = 0;
/* Count of received chunks. NOTE(review): incremented but never read in the code shown. */
int file_index = 0;
/*
 * Queue a write of `message_size` bytes from bufs[bid] to the global output
 * file `outfd` at offset `file_offset`.
 *
 * The `file_offset` parameter shadows the global of the same name; only the
 * parameter is used here. The request is tagged {socket_fd, WRITE, bid}, so
 * that its completion can recycle the buffer and re-arm a recv on the
 * originating socket.
 *
 * Despite the name, this only queues the SQE. If the SQ ring is full,
 * pending SQEs are flushed here to make room.
 */
void make_file_write_sqe_and_submit(struct io_uring *ring, int socket_fd, __u16 bid, size_t message_size, off_t file_offset, __u8 flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: push queued SQEs to the kernel and try once more. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "io_uring submission queue full\n");
            exit(1);
        }
    }
    io_uring_prep_write(sqe, outfd, &bufs[bid], message_size, file_offset);
    io_uring_sqe_set_flags(sqe, flags);

    /* memcpy rather than a pointer cast avoids a strict-aliasing violation. */
    conn_info conn = { .fd = (__u32)socket_fd, .type = WRITE, .bid = bid };
    memcpy(&sqe->user_data, &conn, sizeof conn);
}
/*
 * Queue a PROVIDE_BUFFERS request that hands the single buffer bufs[bid]
 * (MAX_MESSAGE_LEN bytes) back to buffer group `gid` under id `bid`, so that
 * a later buffer-select recv can use it again.
 *
 * The request is tagged {0, PROVIDE_BUFFERS, 0}. Despite the name, this only
 * queues the SQE. If the SQ ring is full, pending SQEs are flushed here to
 * make room.
 */
void make_provide_buffers_sqe_and_submit(struct io_uring *ring, __u16 bid, unsigned gid) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    if (!sqe) {
        /* SQ ring full: push queued SQEs to the kernel and try once more. */
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
        if (!sqe) {
            fprintf(stderr, "io_uring submission queue full\n");
            exit(1);
        }
    }
    io_uring_prep_provide_buffers(sqe, bufs[bid], MAX_MESSAGE_LEN, 1, gid, bid);

    /* memcpy rather than a pointer cast avoids a strict-aliasing violation. */
    conn_info conn = { .fd = 0, .type = PROVIDE_BUFFERS, .bid = 0 };
    memcpy(&sqe->user_data, &conn, sizeof conn);
}
/* Server instance driven by main_loop_process_cqes(); assigned in stream_server_proc(). */
struct Stream_Server *stream_server = NULL;
void main_loop_process_cqes() {
struct io_uring_cqe *cqe;
unsigned head;
stream_server->total_cqe_count = 0;
while (1) {
uint64_t cqe_count = 0;
io_uring_submit_and_wait(&stream_server->ring, 1);
//io_uring_submit(&stream_server->ring);
io_uring_for_each_cqe(&stream_server->ring, head, cqe) {
cqe_count += 1;
conn_info *conn_i = (conn_info *)&cqe->user_data;
if (cqe->res == -ENOBUFS) {
fprintf(stdout, "bufs in automatic buffer selection empty, this should not happen...\n");
fflush(stdout);
exit(1);
} else if (conn_i->type == PROVIDE_BUFFERS) {
if (cqe->res < 0) {
printf("cqe->res = %d\n", cqe->res);
exit(1);
}
} else if (conn_i->type == ACCEPT) {
int sock_conn_fd = cqe->res;
if (sock_conn_fd >= 0) {
outfd = open("/brick_storage/test_io_file.out", O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (outfd < 0) {
perror("open outfile");
exit(1);
}
make_socket_read_sqe_and_submit(&stream_server->ring, sock_conn_fd, socket_buffers_group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
}
// new connected client; read data from socket and re-add accept to monitor for new connections
make_accept_sqe_and_submit(&stream_server->ring, stream_server->socket_listen_fd, (struct sockaddr *)&stream_server->next_client_address, &stream_server->next_client_address_size, 0);
} else if (conn_i->type == READ) {
int bytes_read = cqe->res;
if (cqe->res <= 0) {
// connection closed or error
shutdown(conn_i->fd, SHUT_RDWR);
} else {
// bytes have been read into bufs, now add write to socket sqe
int bid = cqe->flags >> 16;
/*
int *data = (int *)&bufs[bid];
int *count = (int *) data;
int *id = (int *) data + 1;
printf("read cqe: bid %d, fd %d, count %d, id %d, bytes_read %d\n", bid, conn_i->fd, *count, *id, bytes_read);
*/
file_index += 1;
file_offset += bytes_read;
make_file_write_sqe_and_submit(&stream_server->ring, conn_i->fd, bid, bytes_read, file_offset, 0);
}
} else if (conn_i->type == WRITE) {
// write has been completed, first re-add the buffer
make_provide_buffers_sqe_and_submit(&stream_server->ring, conn_i->bid, socket_buffers_group_id);
// @Speed: Too late? What's the optimal way to keep receiving socket data as fast as possible?
make_socket_read_sqe_and_submit(&stream_server->ring, conn_i->fd, socket_buffers_group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
}
}
io_uring_cq_advance(&stream_server->ring, cqe_count);
stream_server->total_cqe_count += cqe_count;
}
}
/*
 * Set up and run the streaming server described by `_stream_server`.
 *
 * Steps:
 *  - bind and listen on INADDR_ANY:port;
 *  - create a 2048-entry io_uring and check PROVIDE_BUFFERS support;
 *  - register the whole `bufs` pool as buffer group socket_buffers_group_id;
 *  - arm the first ACCEPT and enter the event loop.
 *
 * Does not return in practice: the event loop runs forever, and every setup
 * failure terminates the process with exit(1).
 */
int stream_server_proc(struct Stream_Server *_stream_server) {
    stream_server = _stream_server;
    stream_server->next_client_address_size = sizeof(stream_server->next_client_address);

    stream_server->socket_listen_fd = socket(AF_INET, SOCK_STREAM /* | SOCK_NONBLOCK */, 0);
    if (stream_server->socket_listen_fd < 0) {
        perror("Error creating socket");
        exit(1);
    }

    const int val = 1;
    /* Best effort: failing only affects quick restarts, so warn and continue. */
    if (setsockopt(stream_server->socket_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
        perror("setsockopt(SO_REUSEADDR)");
    }

    struct sockaddr_in serv_addr = { 0 };
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(stream_server->port);
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    /* perror() appends ": <reason>\n" itself, so the messages carry no newline. */
    if (bind(stream_server->socket_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("Error binding socket");
        exit(1);
    }
    if (listen(stream_server->socket_listen_fd, BACKLOG) < 0) {
        perror("Error listening on socket");
        exit(1);
    }
    printf("listening for connections on port: %d\n", stream_server->port);

    memset(&stream_server->ring_params, 0, sizeof(stream_server->ring_params));
    int ret = io_uring_queue_init_params(2048, &stream_server->ring, &stream_server->ring_params);
    if (ret < 0) {
        /* liburing reports failure as -errno; errno itself may be stale. */
        fprintf(stderr, "io_uring_init_failed...: %s\n", strerror(-ret));
        exit(1);
    }

    struct io_uring_probe *probe = io_uring_get_probe_ring(&stream_server->ring);
    if (!probe || !io_uring_opcode_supported(probe, IORING_OP_PROVIDE_BUFFERS)) {
        /* The server cannot work without buffer selection: report failure, not success. */
        fprintf(stderr, "Buffer select not supported, skipping...\n");
        exit(1);
    }
    free(probe);

    /* First time: register the whole buffer pool for buffer selection and wait for the result. */
    {
        struct io_uring_cqe *cqe;
        struct io_uring_sqe *sqe = io_uring_get_sqe(&stream_server->ring);
        io_uring_prep_provide_buffers(sqe, bufs, MAX_MESSAGE_LEN, BUFFERS_COUNT, socket_buffers_group_id, 0);
        io_uring_submit(&stream_server->ring);
        ret = io_uring_wait_cqe(&stream_server->ring, &cqe);
        if (ret < 0) {
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            exit(1);
        }
        if (cqe->res < 0) {
            printf("cqe->res = %d\n", cqe->res);
            exit(1);
        }
        io_uring_cqe_seen(&stream_server->ring, cqe);
    }

    /* Arm the first ACCEPT; the event loop re-arms it after each completion. */
    make_accept_sqe_and_submit(&stream_server->ring, stream_server->socket_listen_fd, (struct sockaddr *)&stream_server->next_client_address, &stream_server->next_client_address_size, 0);
    main_loop_process_cqes();
    return 0; /* not reached: the event loop never returns */
}
Grüße / Regards
Jens Frederich
reply other threads:[~2020-10-27 11:08 UTC|newest]
Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=73bd83cd579246acb1f15bb38f5dc90e@vector.com \
--to=jens.frederich@vector.com \
--cc=io-uring@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.