From mboxrd@z Thu Jan 1 00:00:00 1970
Subject: Re: [PATCH UEK5-{Master,U2,U1},UEK4-QU7] rds: Add per peer RDS socket send buffer
To: netdev@vger.kernel.org
Cc: santosh.shilimkar@oracle.com
References: <1551695569-3796-1-git-send-email-ka-cheong.poon@oracle.com>
From: Ka-Cheong Poon
Organization: Oracle Corporation
Date: Mon, 4 Mar 2019 18:35:55 +0800
In-Reply-To: <1551695569-3796-1-git-send-email-ka-cheong.poon@oracle.com>
List-ID: <netdev.vger.kernel.org>

Please ignore this review request. It was sent to the wrong alias.
I'm sorry about this.


On 3/4/19 6:32 PM, Ka-Cheong Poon wrote:
> Currently, an RDS socket's send buffer is shared by all the peers this
> socket communicates with. When one or more peers are slow to receive,
> their unacknowledged data can consume a large portion of the socket's
> buffer. This affects the socket's communication with its other peers.
>
> To resolve this issue, an RDS socket's send buffer is now per peer.
> This works like opening multiple TCP sockets to different peers, where
> each peer has its own send buffer (one per socket). In RDS, each peer
> instead has its own buffer space within the single socket. With this
> per peer send buffer, when one or more peers are slow, their data will
> not interfere with data sent to other peers.
>
> A sysctl parameter, sock_max_peers, is added to limit the number of
> peers a socket can communicate with. The default is 8192. Its valid
> range is 128 to 65536.
>
> Orabug: 28314151
>
> Tested-by: Rose Wang
> Signed-off-by: Ka-Cheong Poon
> ---
>  net/rds/af_rds.c | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  net/rds/rds.h    |  49 +++++++++++++++++----
>  net/rds/recv.c   |   6 +--
>  net/rds/send.c   |  90 +++++++++++++++++++++++++-------------
>  net/rds/sysctl.c |  15 ++++++-
>  5 files changed, 244 insertions(+), 45 deletions(-)
>
> diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
> index 0b86de5..ad51714 100644
> --- a/net/rds/af_rds.c
> +++ b/net/rds/af_rds.c
> @@ -1,5 +1,5 @@
>  /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>   *
>   * This software is available to you under a choice of one of two
>   * licenses. You may choose to be licensed under the terms of the GNU
> @@ -80,6 +80,17 @@ char *rds_str_array(char **array, size_t elements, size_t index)
>  static LIST_HEAD(rds_sock_list);
>  DECLARE_WAIT_QUEUE_HEAD(rds_poll_waitq);
>
> +/* kmem cache slab for struct rs_buf_info */
> +static struct kmem_cache *rds_rs_buf_info_slab;
> +
> +/* Helper function to be passed to rhashtable_free_and_destroy() to free a
> + * struct rs_buf_info.
> + */
> +static void rds_buf_info_free(void *rsbi, void *arg)
> +{
> +        kmem_cache_free(rds_rs_buf_info_slab, rsbi);
> +}
> +
>  /*
>   * This is called as the final descriptor referencing this socket is closed.
>   * We have to unbind the socket so that another socket can be bound to the
> @@ -112,6 +123,9 @@ static int rds_release(struct socket *sock)
>          rds_rdma_drop_keys(rs);
>          rds_notify_queue_get(rs, NULL);
>
> +        rhashtable_free_and_destroy(&rs->rs_buf_info_tbl, rds_buf_info_free,
> +                                    NULL);
> +
>          spin_lock_bh(&rds_sock_lock);
>          list_del_init(&rs->rs_item);
>          rds_sock_count--;
> @@ -272,10 +286,18 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
>          if (!list_empty(&rs->rs_recv_queue)
>              || !list_empty(&rs->rs_notify_queue))
>                  mask |= (POLLIN | POLLRDNORM);
> -        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
> -                mask |= (POLLOUT | POLLWRNORM);
>          read_unlock_irqrestore(&rs->rs_recv_lock, flags);
>
> +        /* Use the number of destinations this socket has to estimate the
> +         * send buffer size. When there is no peer yet, return the default
> +         * send buffer size.
> +         */
> +        spin_lock_irqsave(&rs->rs_snd_lock, flags);
> +        if (rs->rs_snd_bytes < max(rs->rs_buf_info_dest_cnt, (u32)1) *
> +            rds_sk_sndbuf(rs))
> +                mask |= (POLLOUT | POLLWRNORM);
> +        spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +
>          /* clear state any time we wake a seen-congested socket */
>          if (mask)
>                  rs->rs_seen_congestion = 0;
> @@ -712,6 +734,80 @@ static int rds_getsockopt(struct socket *sock, int level, int optname,
>
>  }
>
> +/* Check if there is an rs_buf_info associated with the given address. If not,
> + * add one to the rds_sock. The found or added rs_buf_info is returned. If
> + * there is no rs_buf_info found and a new rs_buf_info cannot be allocated,
> + * NULL is returned and ret is set to the error. Once an address' rs_buf_info
> + * is added, it will not be removed until the rds_sock is closed.
> + */
> +struct rs_buf_info *rds_add_buf_info(struct rds_sock *rs, struct in6_addr *addr,
> +                                     int *ret, gfp_t gfp)
> +{
> +        struct rs_buf_info *info, *tmp_info;
> +        unsigned long flags;
> +
> +        /* Normal path, peer is expected to be found most of the time. */
> +        rcu_read_lock();
> +        info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> +                                      rs_buf_info_params);
> +        if (info) {
> +                rcu_read_unlock();
> +                *ret = 0;
> +                return info;
> +        }
> +        rcu_read_unlock();
> +
> +        /* Allocate the buffer outside of the lock first. */
> +        tmp_info = kmem_cache_alloc(rds_rs_buf_info_slab, gfp);
> +        if (!tmp_info) {
> +                *ret = -ENOMEM;
> +                return NULL;
> +        }
> +
> +        spin_lock_irqsave(&rs->rs_snd_lock, flags);
> +
> +        /* Cannot add more peers. */
> +        if (rs->rs_buf_info_dest_cnt + 1 > rds_sock_max_peers) {
> +                spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +                *ret = -ENFILE;
> +                return NULL;
> +        }
> +
> +        tmp_info->rsbi_key = *addr;
> +        tmp_info->rsbi_snd_bytes = 0;
> +        *ret = rhashtable_insert_fast(&rs->rs_buf_info_tbl,
> +                                      &tmp_info->rsbi_link, rs_buf_info_params);
> +        if (!*ret) {
> +                rs->rs_buf_info_dest_cnt++;
> +                spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +                return tmp_info;
> +        } else if (*ret != -EEXIST) {
> +                spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +                kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
> +                /* Very unlikely to happen... */
> +                printk(KERN_ERR "%s: cannot add rs_buf_info for %pI6c: %d\n",
> +                       __func__, addr, *ret);
> +                return NULL;
> +        }
> +
> +        /* Another thread beat us to adding the rs_buf_info.... */
> +        info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> +                                      rs_buf_info_params);
> +        spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +        kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
> +
> +        if (info) {
> +                *ret = 0;
> +                return info;
> +        }
> +
> +        /* Should not happen...
> +         */
> +        printk(KERN_ERR "%s: cannot find rs_buf_info for %pI6c\n", __func__,
> +               addr);
> +        *ret = -EINVAL;
> +        return NULL;
> +}
> +
>  static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
>                         int addr_len, int flags)
>  {
> @@ -800,6 +896,12 @@ static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
>                  break;
>          }
>
> +        if (!ret &&
> +            !rds_add_buf_info(rs, &rs->rs_conn_addr, &ret, GFP_KERNEL)) {
> +                /* Need to clear the connected info in case of error. */
> +                rs->rs_conn_addr = in6addr_any;
> +                rs->rs_conn_port = 0;
> +        }
>          release_sock(sk);
>          return ret;
>  }
> @@ -842,6 +944,7 @@ static void rds_sock_destruct(struct sock *sk)
>  static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
>  {
>          struct rds_sock *rs;
> +        int ret;
>
>          sock_init_data(sock, sk);
>          sock->ops = &rds_proto_ops;
> @@ -863,6 +966,11 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
>          rs->rs_netfilter_enabled = 0;
>          rs->rs_rx_traces = 0;
>
> +        spin_lock_init(&rs->rs_snd_lock);
> +        ret = rhashtable_init(&rs->rs_buf_info_tbl, &rs_buf_info_params);
> +        if (ret)
> +                return ret;
> +
>          if (!ipv6_addr_any(&rs->rs_bound_addr)) {
>                  printk(KERN_CRIT "bound addr %pI6c at create\n",
>                         &rs->rs_bound_addr);
> @@ -879,6 +987,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
>  static int rds_create(struct net *net, struct socket *sock, int protocol, int kern)
>  {
>          struct sock *sk;
> +        int ret;
>
>          if (sock->type != SOCK_SEQPACKET ||
>              (protocol && IPPROTO_OKA != protocol))
> @@ -888,7 +997,10 @@ static int rds_create(struct net *net, struct socket *sock, int protocol, int kern)
>          if (!sk)
>                  return -ENOMEM;
>
> -        return __rds_create(sock, sk, protocol);
> +        ret = __rds_create(sock, sk, protocol);
> +        if (ret)
> +                sk_free(sk);
> +        return ret;
>  }
>
>  void debug_sock_hold(struct sock *sk)
> @@ -1194,6 +1306,7 @@ static void __exit rds_exit(void)
>          rds_info_deregister_func(RDS6_INFO_SOCKETS, rds6_sock_info);
>          rds_info_deregister_func(RDS6_INFO_RECV_MESSAGES, rds6_sock_inc_info);
>  #endif
> +        kmem_cache_destroy(rds_rs_buf_info_slab);
>  }
>
>  module_exit(rds_exit);
> @@ -1204,6 +1317,14 @@ static int __init rds_init(void)
>  {
>          int ret;
>
> +        rds_rs_buf_info_slab = kmem_cache_create("rds_rs_buf_info",
> +                                                 sizeof(struct rs_buf_info),
> +                                                 0, SLAB_HWCACHE_ALIGN, NULL);
> +        if (!rds_rs_buf_info_slab) {
> +                ret = -ENOMEM;
> +                goto out;
> +        }
> +
>          net_get_random_once(&rds_gen_num, sizeof(rds_gen_num));
>
>          rds_bind_lock_init();
> diff --git a/net/rds/rds.h b/net/rds/rds.h
> index 31ab639..435caa5 100644
> --- a/net/rds/rds.h
> +++ b/net/rds/rds.h
> @@ -12,6 +12,7 @@
>  #include
>  #include
>  #include
> +#include
>
>  #include "info.h"
>
> @@ -725,6 +726,13 @@ struct rds_transport {
>                                          struct rdma_cm_event *event);
>  };
>
> +/* Used to store per peer socket buffer info. */
> +struct rs_buf_info {
> +        struct in6_addr rsbi_key;
> +        struct rhash_head rsbi_link;
> +        u32 rsbi_snd_bytes;
> +};
> +
>  struct rds_sock {
>          struct sock rs_sk;
>
> @@ -757,28 +765,34 @@ struct rds_sock {
>          /* seen congestion (ENOBUFS) when sending? */
>          int rs_seen_congestion;
>
> -        /* rs_lock protects all these adjacent members before the newline */
> -        spinlock_t rs_lock;
> -        struct list_head rs_send_queue;
> -        u32 rs_snd_bytes;
> -        int rs_rcv_bytes;
> -        struct list_head rs_notify_queue; /* currently used for failed RDMAs */
> -
> -        /* Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
> +        /* rs_lock protects all these adjacent members before the newline.
> +         *
> +         * Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
>           * to decide whether the application should be woken up.
>           * If not set, we use rs_cong_track to find out whether a cong map
>           * update arrived.
>           */
> +        spinlock_t rs_lock;
>          uint64_t rs_cong_mask;
>          uint64_t rs_cong_notify;
>          struct list_head rs_cong_list;
>          unsigned long rs_cong_track;
> +        struct list_head rs_notify_queue; /* currently used for failed RDMAs */
> +
> +        /* rs_snd_lock protects all these adjacent members before the
> +         * newline */
> +        spinlock_t rs_snd_lock;
> +        struct list_head rs_send_queue;
> +        u32 rs_snd_bytes; /* Total bytes to all peers */
> +        u32 rs_buf_info_dest_cnt;
> +        struct rhashtable rs_buf_info_tbl;
>
>          /*
>           * rs_recv_lock protects the receive queue, and is
>           * used to serialize with rds_release.
>           */
>          rwlock_t rs_recv_lock;
> +        int rs_rcv_bytes;
>          struct list_head rs_recv_queue;
>
>          /* just for stats reporting */
> @@ -867,6 +881,25 @@ struct rds_statistics {
>  };
>
>  /* af_rds.c */
> +#define RDS_SOCK_BUF_INFO_HTBL_SIZE 512
> +static const struct rhashtable_params rs_buf_info_params = {
> +        .nelem_hint = RDS_SOCK_BUF_INFO_HTBL_SIZE,
> +        .key_len = sizeof(struct in6_addr),
> +        .key_offset = offsetof(struct rs_buf_info, rsbi_key),
> +        .head_offset = offsetof(struct rs_buf_info, rsbi_link),
> +};
> +
> +/* Maximum number of peers a socket can communicate with */
> +extern unsigned int rds_sock_max_peers;
> +
> +struct rs_buf_info *rds_add_buf_info(struct rds_sock *, struct in6_addr *,
> +                                     int *, gfp_t);
> +static inline struct rs_buf_info *rds_get_buf_info(struct rds_sock *rs,
> +                                                   struct in6_addr *addr)
> +{
> +        return rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> +                                      rs_buf_info_params);
> +}
>  char *rds_str_array(char **array, size_t elements, size_t index);
>  void rds_sock_addref(struct rds_sock *rs);
>  void rds_sock_put(struct rds_sock *rs);
> diff --git a/net/rds/recv.c b/net/rds/recv.c
> index 7138f75..4402b02 100644
> --- a/net/rds/recv.c
> +++ b/net/rds/recv.c
> @@ -1,5 +1,5 @@
>  /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>   *
>   * This software is available to you under a choice of one of two
>   * licenses. You may choose to be licensed under the terms of the GNU
> @@ -900,9 +900,9 @@ static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
>          if (err)
>                  return err;
>
> -        spin_lock_irqsave(&rs->rs_lock, flags);
> +        spin_lock_irqsave(&rs->rs_snd_lock, flags);
>          rs->rs_cong_notify &= ~notify;
> -        spin_unlock_irqrestore(&rs->rs_lock, flags);
> +        spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>
>          return 0;
>  }
> diff --git a/net/rds/send.c b/net/rds/send.c
> index 676f38b..1f1a54c 100644
> --- a/net/rds/send.c
> +++ b/net/rds/send.c
> @@ -1,5 +1,5 @@
>  /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>   *
>   * This software is available to you under a choice of one of two
>   * licenses. You may choose to be licensed under the terms of the GNU
> @@ -548,10 +548,16 @@ int rds_send_xmit(struct rds_conn_path *cp)
>  static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
>  {
>          u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
> +        struct rs_buf_info *bufi;
>
> -        assert_spin_locked(&rs->rs_lock);
> +        assert_spin_locked(&rs->rs_snd_lock);
>
> +        bufi = rds_get_buf_info(rs, &rm->m_daddr);
> +        /* bufi cannot be NULL as an address's rs_buf_info is never deleted. */
> +        BUG_ON(!bufi);
> +        BUG_ON(bufi->rsbi_snd_bytes < len);
>          BUG_ON(rs->rs_snd_bytes < len);
> +        bufi->rsbi_snd_bytes -= len;
>          rs->rs_snd_bytes -= len;
>
>          if (rs->rs_snd_bytes == 0)
> @@ -764,11 +770,12 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>                          rs = rm->m_rs;
>                          debug_sock_hold(rds_rs_to_sk(rs));
>                  }
> -                spin_lock(&rs->rs_lock);
>
> +                spin_lock(&rs->rs_snd_lock);
>                  if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
>                          list_del_init(&rm->m_sock_item);
>                          rds_send_sndbuf_remove(rs, rm);
> +                        spin_unlock(&rs->rs_snd_lock);
>
>                          if (rm->rdma.op_active && rm->rdma.op_notifier) {
>                                  struct rm_rdma_op *ro = &rm->rdma;
>
>                                  if (ro->op_notify || status) {
>                                          notifier = ro->op_notifier;
> +                                        spin_lock(&rs->rs_lock);
>                                          list_add_tail(&notifier->n_list,
>                                                        &rs->rs_notify_queue);
> +                                        spin_unlock(&rs->rs_lock);
>                                          if (!notifier->n_status)
>                                                  notifier->n_status = status;
>                                  } else
> @@ -789,8 +798,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
>                                  if (ao->op_notify || status) {
>                                          notifier = ao->op_notifier;
> +                                        spin_lock(&rs->rs_lock);
>                                          list_add_tail(&notifier->n_list,
>                                                        &rs->rs_notify_queue);
> +                                        spin_unlock(&rs->rs_lock);
>                                          if (!notifier->n_status)
>                                                  notifier->n_status = status;
>                                  } else
> @@ -802,8 +813,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
>                                  if (so->op_notify || status) {
>                                          notifier = so->op_notifier;
> +                                        spin_lock(&rs->rs_lock);
>                                          list_add_tail(&notifier->n_list,
>                                                        &rs->rs_notify_queue);
> +                                        spin_unlock(&rs->rs_lock);
>                                          if (!notifier->n_status)
>                                                  notifier->n_status = status;
>                                  } else
> @@ -813,8 +826,8 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
>                          was_on_sock = 1;
>                          rm->m_rs = NULL;
> -                }
> -                spin_unlock(&rs->rs_lock);
> +                } else
> +                        spin_unlock(&rs->rs_snd_lock);
>
>  unlock_and_drop:
>                  spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -885,8 +898,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>          LIST_HEAD(list);
>          int conn_dropped = 0;
>
> -        /* get all the messages we're dropping under the rs lock */
> -        spin_lock_irqsave(&rs->rs_lock, flags);
> +        /* get all the messages we're dropping under the rs_snd_lock */
> +        spin_lock_irqsave(&rs->rs_snd_lock, flags);
>
>          list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
>                  if (dest &&
> @@ -899,10 +912,10 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>                  clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
>          }
>
> -        /* order flag updates with the rs lock */
> +        /* order flag updates with the rs_snd_lock */
>          smp_mb__after_atomic();
>
> -        spin_unlock_irqrestore(&rs->rs_lock, flags);
> +        spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>
>          if (list_empty(&list))
>                  return;
> @@ -935,9 +948,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>           */
>          spin_lock_irqsave(&rm->m_rs_lock, flags);
>
> -        spin_lock(&rs->rs_lock);
> +        spin_lock(&rs->rs_snd_lock);
>          __rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
> -        spin_unlock(&rs->rs_lock);
> +        spin_unlock(&rs->rs_snd_lock);
>
>          rm->m_rs = NULL;
>          spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -970,9 +983,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>           */
>          spin_lock_irqsave(&rm->m_rs_lock, flags);
>
> -        spin_lock(&rs->rs_lock);
> +        spin_lock(&rs->rs_snd_lock);
>          __rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
> -        spin_unlock(&rs->rs_lock);
> +        spin_unlock(&rs->rs_snd_lock);
>
>          rm->m_rs = NULL;
>          spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -989,7 +1002,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>  static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>                               struct rds_conn_path *cp,
>                               struct rds_message *rm, __be16 sport,
> -                             __be16 dport, int *queued)
> +                             __be16 dport, int *queued,
> +                             struct rs_buf_info *bufi)
>  {
>          unsigned long flags;
>          u32 len;
> @@ -999,9 +1013,9 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>
>          len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
>
> -        /* this is the only place which holds both the socket's rs_lock
> +        /* this is the only place which holds both the socket's rs_snd_lock
>           * and the connection's c_lock */
> -        spin_lock_irqsave(&rs->rs_lock, flags);
> +        spin_lock_irqsave(&rs->rs_snd_lock, flags);
>
>          /*
>           * If there is a little space in sndbuf, we don't queue anything,
> @@ -1011,7 +1025,10 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>           * rs_snd_bytes here to allow the last msg to exceed the buffer,
>           * and poll() now knows no more data can be sent.
>           */
> -        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
> +        if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs)) {
> +                bufi->rsbi_snd_bytes += len;
> +
> +                /* Record the total number of snd_bytes of all peers. */
>                  rs->rs_snd_bytes += len;
>
>                  /* let recv side know we are close to send space exhaustion.
>                   * This is probably not the optimal way to do it, as this
>                   * means we set the flag on *all* messages as soon as our
>                   * throughput hits a certain threshold.
> */ > - if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2) > + if (bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs) / 2) > set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags); > > list_add_tail(&rm->m_sock_item, &rs->rs_send_queue); > @@ -1037,7 +1054,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, > spin_lock(&cp->cp_lock); > if (cp->cp_pending_flush) { > spin_unlock(&cp->cp_lock); > - spin_unlock_irqrestore(&rs->rs_lock, flags); > + spin_unlock_irqrestore(&rs->rs_snd_lock, flags); > goto out; > } > rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++); > @@ -1046,14 +1063,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, > > spin_unlock(&cp->cp_lock); > > - rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n", > - rm, len, rs, rs->rs_snd_bytes, > + rdsdebug("queued msg %p len %d, rs %p bytes %u (%u) seq %llu\n", > + rm, len, rs, rs->rs_snd_bytes, bufi->rsbi_snd_bytes, > (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence)); > > *queued = 1; > } > > - spin_unlock_irqrestore(&rs->rs_lock, flags); > + spin_unlock_irqrestore(&rs->rs_snd_lock, flags); > out: > return *queued; > } > @@ -1257,6 +1274,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) > long timeo = sock_sndtimeo(sk, nonblock); > size_t total_payload_len = payload_len, rdma_payload_len = 0; > struct rds_conn_path *cpath; > + struct rs_buf_info *bufi; > struct in6_addr daddr; > __u32 scope_id = 0; > int namelen; > @@ -1395,19 +1413,25 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) > goto out; > } > > + > + bufi = rds_add_buf_info(rs, &daddr, &ret, GFP_KERNEL); > + if (!bufi) > + goto out; > + > /* > * Avoid copying the message from user-space if we already > * know there's no space in the send buffer. > * The check is a negated version of the condition used inside > - * function "rds_send_queue_rm": "if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))", > + * function "rds_send_queue_rm": > + * "if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs))", > * which needs some reconsideration, as it unexpectedly checks > * if half of the send-buffer space is available, instead of > * checking if the given message would fit. 
> */ > if (nonblock) { > - spin_lock_irqsave(&rs->rs_lock, flags); > - no_space = rs->rs_snd_bytes >= rds_sk_sndbuf(rs); > - spin_unlock_irqrestore(&rs->rs_lock, flags); > + spin_lock_irqsave(&rs->rs_snd_lock, flags); > + no_space = bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs); > + spin_unlock_irqrestore(&rs->rs_snd_lock, flags); > if (no_space) { > ret = -EAGAIN; > goto out; > @@ -1525,7 +1549,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) > } > > while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port, > - dport, &queued)) { > + dport, &queued, bufi)) { > rds_stats_inc(s_send_queue_full); > > if (nonblock) { > @@ -1541,7 +1565,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) > rds_send_queue_rm(rs, conn, cpath, rm, > rs->rs_bound_port, > dport, > - &queued), > + &queued, bufi), > timeo); > rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo); > if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT) > @@ -1591,6 +1615,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) > int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, > struct sk_buff *skb, gfp_t gfp) > { > + struct rs_buf_info *bufi; > struct rds_nf_hdr *dst; > struct rds_message *rm = NULL; > struct scatterlist *sg; > @@ -1609,6 +1634,13 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, > if (ret < 0) > goto out; > > + bufi = rds_add_buf_info(rs, &dst->daddr, &ret, gfp); > + if (!bufi) { > + rds_rtd(RDS_RTD_ERR, "failed to allocate rs %p sbuf to %pI6c", > + rs, &dst->daddr); > + goto out; > + } > + > /* create ourselves a new message to send out the data */ > rm = rds_message_alloc(ret, gfp); > if (!rm) { > @@ -1677,7 +1709,7 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs, > > /* only take a single pass */ > if (!rds_send_queue_rm(rs, conn, &conn->c_path[0], rm, > - rs->rs_bound_port, dst->dport, &queued)) { > + rs->rs_bound_port, dst->dport, &queued, bufi)) { > rds_rtd(RDS_RTD_SND, "cannot block on internal send rs %p", rs); > rds_stats_inc(s_send_queue_full); > > diff --git a/net/rds/sysctl.c b/net/rds/sysctl.c > index b22e8b8..21bc056e 100644 > --- a/net/rds/sysctl.c > +++ b/net/rds/sysctl.c > @@ -1,5 +1,5 @@ > /* > - * Copyright (c) 2006 Oracle. All rights reserved. > + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved. > * > * This software is available to you under a choice of one of two > * licenses. You may choose to be licensed under the terms of the GNU > @@ -52,6 +52,10 @@ > unsigned int rds_sysctl_shutdown_trace_start_time; > unsigned int rds_sysctl_shutdown_trace_end_time; > > +unsigned int rds_sock_max_peers_min = 128; > +unsigned int rds_sock_max_peers_max = 65536; > +unsigned int rds_sock_max_peers = 8192; > + > /* > * We have official values, but must maintain the sysctl interface for existing > * software that expects to find these values here. > @@ -127,6 +131,15 @@ > .mode = 0644, > .proc_handler = &proc_dointvec, > }, > + { > + .procname = "sock_max_peers", > + .data = &rds_sock_max_peers, > + .maxlen = sizeof(int), > + .mode = 0644, > + .proc_handler = &proc_dointvec_minmax, > + .extra1 = &rds_sock_max_peers_min, > + .extra2 = &rds_sock_max_peers_max > + }, > { } > }; > > -- K. Poon ka-cheong.poon@oracle.com