All of lore.kernel.org
 help / color / mirror / Atom feed
From: Stephen Hemminger <stephen@networkplumber.org>
To: kys@microsoft.com, haiyangz@microsoft.com, sthemmin@microsoft.com
Cc: devel@linuxdriverproject.org, netdev@vger.kernel.org
Subject: [PATCH net-next 3/5] netvsc: optimize receive completions
Date: Tue, 25 Jul 2017 13:04:20 -0700	[thread overview]
Message-ID: <20170725200422.13795-4-sthemmin@microsoft.com> (raw)
In-Reply-To: <20170725200422.13795-1-sthemmin@microsoft.com>

Optimize how receive completion rings are managed.
   * Allocate only as many slots as needed for all buffers from host
   * Allocate before setting up sub channel for better error detection
   * Don't need to keep copy of initial receive section message
   * Only need to keep the transaction id; status doesn't matter
   * Precompute the watermark for when receive flushing is needed
   * Replace division with conditional test
   * Replace atomic per-device variable with per-channel check.
   * Handle corner case where receive completion send
     fails if ring buffer to host is full.

Signed-off-by: Stephen Hemminger <sthemmin@microsoft.com>
---
 drivers/net/hyperv/hyperv_net.h   |  19 +--
 drivers/net/hyperv/netvsc.c       | 270 +++++++++++++++-----------------------
 drivers/net/hyperv/rndis_filter.c |  20 +--
 3 files changed, 125 insertions(+), 184 deletions(-)

diff --git a/drivers/net/hyperv/hyperv_net.h b/drivers/net/hyperv/hyperv_net.h
index fb62ea632914..b0259b12c5ee 100644
--- a/drivers/net/hyperv/hyperv_net.h
+++ b/drivers/net/hyperv/hyperv_net.h
@@ -186,6 +186,7 @@ struct net_device_context;
 
 struct netvsc_device *netvsc_device_add(struct hv_device *device,
 					const struct netvsc_device_info *info);
+int netvsc_alloc_recv_comp_ring(struct netvsc_device *net_device, u32 q_idx);
 void netvsc_device_remove(struct hv_device *device);
 int netvsc_send(struct net_device_context *ndc,
 		struct hv_netvsc_packet *packet,
@@ -652,18 +653,10 @@ struct multi_send_data {
 	u32 count; /* counter of batched packets */
 };
 
-struct recv_comp_data {
-	u64 tid; /* transaction id */
-	u32 status;
-};
-
-/* Netvsc Receive Slots Max */
-#define NETVSC_RECVSLOT_MAX (NETVSC_RECEIVE_BUFFER_SIZE / ETH_DATA_LEN + 1)
-
 struct multi_recv_comp {
-	void *buf; /* queued receive completions */
-	u32 first; /* first data entry */
-	u32 next; /* next entry for writing */
+	u64 *slots;	/* pending completion transactions */
+	u32 first;	/* first data entry */
+	u32 next;	/* next entry for writing */
 };
 
 struct netvsc_stats {
@@ -750,7 +743,7 @@ struct netvsc_device {
 	u32 recv_buf_size;
 	u32 recv_buf_gpadl_handle;
 	u32 recv_section_cnt;
-	struct nvsp_1_receive_buffer_section *recv_section;
+	u32 recv_completion_cnt;
 
 	/* Send buffer allocated by us */
 	void *send_buf;
@@ -778,8 +771,6 @@ struct netvsc_device {
 	u32 max_pkt; /* max number of pkt in one send, e.g. 8 */
 	u32 pkt_align; /* alignment bytes, e.g. 8 */
 
-	atomic_t num_outstanding_recvs;
-
 	atomic_t open_cnt;
 
 	struct netvsc_channel chan_table[VRSS_CHANNEL_MAX];
diff --git a/drivers/net/hyperv/netvsc.c b/drivers/net/hyperv/netvsc.c
index 94c00acac58a..03c1fec762c7 100644
--- a/drivers/net/hyperv/netvsc.c
+++ b/drivers/net/hyperv/netvsc.c
@@ -72,9 +72,6 @@ static struct netvsc_device *alloc_net_device(void)
 	if (!net_device)
 		return NULL;
 
-	net_device->chan_table[0].mrc.buf
-		= vzalloc(NETVSC_RECVSLOT_MAX * sizeof(struct recv_comp_data));
-
 	init_waitqueue_head(&net_device->wait_drain);
 	net_device->destroy = false;
 	atomic_set(&net_device->open_cnt, 0);
@@ -92,7 +89,7 @@ static void free_netvsc_device(struct rcu_head *head)
 	int i;
 
 	for (i = 0; i < VRSS_CHANNEL_MAX; i++)
-		vfree(nvdev->chan_table[i].mrc.buf);
+		vfree(nvdev->chan_table[i].mrc.slots);
 
 	kfree(nvdev);
 }
@@ -171,12 +168,6 @@ static void netvsc_destroy_buf(struct hv_device *device)
 		net_device->recv_buf = NULL;
 	}
 
-	if (net_device->recv_section) {
-		net_device->recv_section_cnt = 0;
-		kfree(net_device->recv_section);
-		net_device->recv_section = NULL;
-	}
-
 	/* Deal with the send buffer we may have setup.
 	 * If we got a  send section size, it means we received a
 	 * NVSP_MSG1_TYPE_SEND_SEND_BUF_COMPLETE msg (ie sent
@@ -239,11 +230,26 @@ static void netvsc_destroy_buf(struct hv_device *device)
 	kfree(net_device->send_section_map);
 }
 
+int netvsc_alloc_recv_comp_ring(struct netvsc_device *net_device, u32 q_idx)
+{
+	struct netvsc_channel *nvchan = &net_device->chan_table[q_idx];
+	int node = cpu_to_node(nvchan->channel->target_cpu);
+	size_t size;
+
+	size = net_device->recv_completion_cnt * sizeof(u64);
+	nvchan->mrc.slots = vzalloc_node(size, node);
+	if (!nvchan->mrc.slots)
+		nvchan->mrc.slots = vzalloc(size);
+
+	return nvchan->mrc.slots ? 0 : -ENOMEM;
+}
+
 static int netvsc_init_buf(struct hv_device *device,
 			   struct netvsc_device *net_device)
 {
 	int ret = 0;
 	struct nvsp_message *init_packet;
+	struct nvsp_1_message_send_receive_buffer_complete *resp;
 	struct net_device *ndev;
 	size_t map_words;
 	int node;
@@ -300,43 +306,41 @@ static int netvsc_init_buf(struct hv_device *device,
 	wait_for_completion(&net_device->channel_init_wait);
 
 	/* Check the response */
-	if (init_packet->msg.v1_msg.
-	    send_recv_buf_complete.status != NVSP_STAT_SUCCESS) {
-		netdev_err(ndev, "Unable to complete receive buffer "
-			   "initialization with NetVsp - status %d\n",
-			   init_packet->msg.v1_msg.
-			   send_recv_buf_complete.status);
+	resp = &init_packet->msg.v1_msg.send_recv_buf_complete;
+	if (resp->status != NVSP_STAT_SUCCESS) {
+		netdev_err(ndev,
+			   "Unable to complete receive buffer initialization with NetVsp - status %d\n",
+			   resp->status);
 		ret = -EINVAL;
 		goto cleanup;
 	}
 
 	/* Parse the response */
+	netdev_dbg(ndev, "Receive sections: %u sub_allocs: size %u count: %u\n",
+		   resp->num_sections, resp->sections[0].sub_alloc_size,
+		   resp->sections[0].num_sub_allocs);
 
-	net_device->recv_section_cnt = init_packet->msg.
-		v1_msg.send_recv_buf_complete.num_sections;
-
-	net_device->recv_section = kmemdup(
-		init_packet->msg.v1_msg.send_recv_buf_complete.sections,
-		net_device->recv_section_cnt *
-		sizeof(struct nvsp_1_receive_buffer_section),
-		GFP_KERNEL);
-	if (net_device->recv_section == NULL) {
-		ret = -EINVAL;
-		goto cleanup;
-	}
+	net_device->recv_section_cnt = resp->num_sections;
 
 	/*
 	 * For 1st release, there should only be 1 section that represents the
 	 * entire receive buffer
 	 */
 	if (net_device->recv_section_cnt != 1 ||
-	    net_device->recv_section->offset != 0) {
+	    resp->sections[0].offset != 0) {
 		ret = -EINVAL;
 		goto cleanup;
 	}
 
-	/* Now setup the send buffer.
-	 */
+	/* Setup receive completion ring */
+	net_device->recv_completion_cnt
+		= round_up(resp->sections[0].num_sub_allocs + 1,
+			   PAGE_SIZE / sizeof(u64));
+	ret = netvsc_alloc_recv_comp_ring(net_device, 0);
+	if (ret)
+		goto cleanup;
+
+	/* Now setup the send buffer. */
 	net_device->send_buf = vzalloc_node(net_device->send_buf_size, node);
 	if (!net_device->send_buf)
 		net_device->send_buf = vzalloc(net_device->send_buf_size);
@@ -950,139 +954,96 @@ int netvsc_send(struct net_device_context *ndev_ctx,
 	return ret;
 }
 
-static int netvsc_send_recv_completion(struct vmbus_channel *channel,
-				       u64 transaction_id, u32 status)
-{
-	struct nvsp_message recvcompMessage;
-	int ret;
-
-	recvcompMessage.hdr.msg_type =
-				NVSP_MSG1_TYPE_SEND_RNDIS_PKT_COMPLETE;
-
-	recvcompMessage.msg.v1_msg.send_rndis_pkt_complete.status = status;
-
-	/* Send the completion */
-	ret = vmbus_sendpacket(channel, &recvcompMessage,
-			       sizeof(struct nvsp_message_header) + sizeof(u32),
-			       transaction_id, VM_PKT_COMP, 0);
-
-	return ret;
-}
-
-static inline void count_recv_comp_slot(struct netvsc_device *nvdev, u16 q_idx,
-					u32 *filled, u32 *avail)
+/* Send pending recv completions */
+static int send_recv_completions(struct netvsc_device *nvdev,
+				 struct vmbus_channel *channel, u16 q_idx)
 {
 	struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-	u32 first = mrc->first;
-	u32 next = mrc->next;
+	struct recv_comp_msg {
+		struct nvsp_message_header hdr;
+		u32 status;
+	}  __packed;
+	struct recv_comp_msg msg = {
+		.hdr.msg_type = NVSP_MSG1_TYPE_SEND_RNDIS_PKT_COMPLETE,
+		.status = NVSP_STAT_SUCCESS,
+	};
+	int ret;
 
-	*filled = (first > next) ? NETVSC_RECVSLOT_MAX - first + next :
-		  next - first;
+	while (mrc->first != mrc->next) {
+		u64 tid = mrc->slots[mrc->first];
 
-	*avail = NETVSC_RECVSLOT_MAX - *filled - 1;
-}
+		ret = vmbus_sendpacket(channel, &msg, sizeof(msg),
+				       tid, VM_PKT_COMP, 0);
+		if (unlikely(ret))
+			return ret;
 
-/* Read the first filled slot, no change to index */
-static inline struct recv_comp_data *read_recv_comp_slot(struct netvsc_device
-							 *nvdev, u16 q_idx)
-{
-	struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-	u32 filled, avail;
-
-	if (unlikely(!mrc->buf))
-		return NULL;
+		if (++mrc->first == nvdev->recv_completion_cnt)
+			mrc->first = 0;
+	}
 
-	count_recv_comp_slot(nvdev, q_idx, &filled, &avail);
-	if (!filled)
-		return NULL;
+	/* receive completion ring has been emptied */
+	if (unlikely(nvdev->destroy))
+		wake_up(&nvdev->wait_drain);
 
-	return mrc->buf + mrc->first * sizeof(struct recv_comp_data);
+	return 0;
 }
 
-/* Put the first filled slot back to available pool */
-static inline void put_recv_comp_slot(struct netvsc_device *nvdev, u16 q_idx)
+/* Count how many receive completions are outstanding */
+static void recv_comp_slot_avail(const struct netvsc_device *nvdev,
+				 const struct multi_recv_comp *mrc,
+				 u32 *filled, u32 *avail)
 {
-	struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-	int num_recv;
+	u32 count = nvdev->recv_completion_cnt;
 
-	mrc->first = (mrc->first + 1) % NETVSC_RECVSLOT_MAX;
-
-	num_recv = atomic_dec_return(&nvdev->num_outstanding_recvs);
+	if (mrc->next >= mrc->first)
+		*filled = mrc->next - mrc->first;
+	else
+		*filled = (count - mrc->first) + mrc->next;
 
-	if (nvdev->destroy && num_recv == 0)
-		wake_up(&nvdev->wait_drain);
+	*avail = count - *filled - 1;
 }
 
-/* Check and send pending recv completions */
-static void netvsc_chk_recv_comp(struct netvsc_device *nvdev,
-				 struct vmbus_channel *channel, u16 q_idx)
+/* Add receive complete to ring to send to host. */
+static void enq_receive_complete(struct net_device *ndev,
+				 struct netvsc_device *nvdev, u16 q_idx,
+				 u64 tid)
 {
-	struct recv_comp_data *rcd;
-	int ret;
-
-	while (true) {
-		rcd = read_recv_comp_slot(nvdev, q_idx);
-		if (!rcd)
-			break;
+	struct netvsc_channel *nvchan = &nvdev->chan_table[q_idx];
+	struct multi_recv_comp *mrc = &nvchan->mrc;
+	u32 filled, avail;
 
-		ret = netvsc_send_recv_completion(channel, rcd->tid,
-						  rcd->status);
-		if (ret)
-			break;
+	recv_comp_slot_avail(nvdev, mrc, &filled, &avail);
 
-		put_recv_comp_slot(nvdev, q_idx);
+	if (unlikely(filled > NAPI_POLL_WEIGHT)) {
+		send_recv_completions(nvdev, nvchan->channel, q_idx);
+		recv_comp_slot_avail(nvdev, mrc, &filled, &avail);
 	}
-}
-
-#define NETVSC_RCD_WATERMARK 80
-
-/* Get next available slot */
-static inline struct recv_comp_data *get_recv_comp_slot(
-	struct netvsc_device *nvdev, struct vmbus_channel *channel, u16 q_idx)
-{
-	struct multi_recv_comp *mrc = &nvdev->chan_table[q_idx].mrc;
-	u32 filled, avail, next;
-	struct recv_comp_data *rcd;
-
-	if (unlikely(!nvdev->recv_section))
-		return NULL;
-
-	if (unlikely(!mrc->buf))
-		return NULL;
-
-	if (atomic_read(&nvdev->num_outstanding_recvs) >
-	    nvdev->recv_section->num_sub_allocs * NETVSC_RCD_WATERMARK / 100)
-		netvsc_chk_recv_comp(nvdev, channel, q_idx);
-
-	count_recv_comp_slot(nvdev, q_idx, &filled, &avail);
-	if (!avail)
-		return NULL;
-
-	next = mrc->next;
-	rcd = mrc->buf + next * sizeof(struct recv_comp_data);
-	mrc->next = (next + 1) % NETVSC_RECVSLOT_MAX;
 
-	atomic_inc(&nvdev->num_outstanding_recvs);
+	if (unlikely(!avail)) {
+		netdev_err(ndev, "Recv_comp full buf q:%hd, tid:%llx\n",
+			   q_idx, tid);
+		return;
+	}
 
-	return rcd;
+	mrc->slots[mrc->next] = tid;
+	if (++mrc->next == nvdev->recv_completion_cnt)
+		mrc->next = 0;
 }
 
 static int netvsc_receive(struct net_device *ndev,
-		   struct netvsc_device *net_device,
-		   struct net_device_context *net_device_ctx,
-		   struct hv_device *device,
-		   struct vmbus_channel *channel,
-		   const struct vmpacket_descriptor *desc,
-		   struct nvsp_message *nvsp)
+			  struct netvsc_device *net_device,
+			  struct net_device_context *net_device_ctx,
+			  struct hv_device *device,
+			  struct vmbus_channel *channel,
+			  const struct vmpacket_descriptor *desc,
+			  struct nvsp_message *nvsp)
 {
 	const struct vmtransfer_page_packet_header *vmxferpage_packet
 		= container_of(desc, const struct vmtransfer_page_packet_header, d);
 	u16 q_idx = channel->offermsg.offer.sub_channel_index;
 	char *recv_buf = net_device->recv_buf;
-	u32 status = NVSP_STAT_SUCCESS;
 	int i;
 	int count = 0;
-	int ret;
 
 	/* Make sure this is a valid nvsp packet */
 	if (unlikely(nvsp->hdr.msg_type != NVSP_MSG1_TYPE_SEND_RNDIS_PKT)) {
@@ -1109,29 +1070,13 @@ static int netvsc_receive(struct net_device *ndev,
 		u32 buflen = vmxferpage_packet->ranges[i].byte_count;
 
 		/* Pass it to the upper layer */
-		status = rndis_filter_receive(ndev, net_device, device,
-					      channel, data, buflen);
+		rndis_filter_receive(ndev, net_device, device,
+				     channel, data, buflen);
 	}
 
-	if (net_device->chan_table[q_idx].mrc.buf) {
-		struct recv_comp_data *rcd;
+	enq_receive_complete(ndev, net_device, q_idx,
+			     vmxferpage_packet->d.trans_id);
 
-		rcd = get_recv_comp_slot(net_device, channel, q_idx);
-		if (rcd) {
-			rcd->tid = vmxferpage_packet->d.trans_id;
-			rcd->status = status;
-		} else {
-			netdev_err(ndev, "Recv_comp full buf q:%hd, tid:%llx\n",
-				   q_idx, vmxferpage_packet->d.trans_id);
-		}
-	} else {
-		ret = netvsc_send_recv_completion(channel,
-						  vmxferpage_packet->d.trans_id,
-						  status);
-		if (ret)
-			netdev_err(ndev, "Recv_comp q:%hd, tid:%llx, err:%d\n",
-				   q_idx, vmxferpage_packet->d.trans_id, ret);
-	}
 	return count;
 }
 
@@ -1244,17 +1189,18 @@ int netvsc_poll(struct napi_struct *napi, int budget)
 		nvchan->desc = hv_pkt_iter_next(channel, nvchan->desc);
 	}
 
-	/* If receive ring was exhausted
-	 * and not doing busy poll
-	 * then re-enable host interrupts
-	 *  and reschedule if ring is not empty.
+	/* If send of pending receive completions succeeded
+	 *   and did not exhaust NAPI budget
+	 *   and not doing busy poll
+	 * then reschedule if more data has arrived from host
 	 */
-	if (work_done < budget &&
+	if (send_recv_completions(net_device, channel, q_idx) == 0 &&
+	    work_done < budget &&
 	    napi_complete_done(napi, work_done) &&
-	    hv_end_read(&channel->inbound) != 0)
+	    hv_end_read(&channel->inbound)) {
+		hv_begin_read(&channel->inbound);
 		napi_reschedule(napi);
-
-	netvsc_chk_recv_comp(net_device, channel, q_idx);
+	}
 
 	/* Driver may overshoot since multiple packets per descriptor */
 	return min(work_done, budget);
diff --git a/drivers/net/hyperv/rndis_filter.c b/drivers/net/hyperv/rndis_filter.c
index bf21ea92c743..db9bac31dd09 100644
--- a/drivers/net/hyperv/rndis_filter.c
+++ b/drivers/net/hyperv/rndis_filter.c
@@ -928,12 +928,12 @@ static bool netvsc_device_idle(const struct netvsc_device *nvdev)
 {
 	int i;
 
-	if (atomic_read(&nvdev->num_outstanding_recvs) > 0)
-		return false;
-
 	for (i = 0; i < nvdev->num_chn; i++) {
 		const struct netvsc_channel *nvchan = &nvdev->chan_table[i];
 
+		if (nvchan->mrc.first != nvchan->mrc.next)
+			return false;
+
 		if (atomic_read(&nvchan->queue_sends) > 0)
 			return false;
 	}
@@ -1031,11 +1031,6 @@ static void netvsc_sc_open(struct vmbus_channel *new_sc)
 		return;
 
 	nvchan = nvscdev->chan_table + chn_index;
-	nvchan->mrc.buf
-		= vzalloc(NETVSC_RECVSLOT_MAX * sizeof(struct recv_comp_data));
-
-	if (!nvchan->mrc.buf)
-		return;
 
 	/* Because the device uses NAPI, all the interrupt batching and
 	 * control is done via Net softirq, not the channel handling
@@ -1225,6 +1220,15 @@ struct netvsc_device *rndis_filter_device_add(struct hv_device *dev,
 	if (num_rss_qs == 0)
 		return net_device;
 
+	for (i = 1; i < net_device->num_chn; i++) {
+		ret = netvsc_alloc_recv_comp_ring(net_device, i);
+		if (ret) {
+			while (--i != 0)
+				vfree(net_device->chan_table[i].mrc.slots);
+			goto out;
+		}
+	}
+
 	refcount_set(&net_device->sc_offered, num_rss_qs);
 	vmbus_set_sc_create_callback(dev->channel, netvsc_sc_open);
 
-- 
2.11.0

  parent reply	other threads:[~2017-07-25 20:04 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-07-25 20:04 [PATCH net-next 0/5] netvsc: fixes and performance related changes Stephen Hemminger
2017-07-25 20:04 ` [PATCH net-next 1/5] netvsc: fix return value for set_channels Stephen Hemminger
2017-07-25 20:04 ` [PATCH net-next 2/5] netvsc: fix warnings reported by lockdep Stephen Hemminger
2017-07-25 20:04 ` Stephen Hemminger [this message]
2017-07-25 21:07   ` [PATCH net-next 3/5] netvsc: optimize receive completions Stephen Hemminger
2017-07-25 20:04 ` [PATCH net-next 4/5] netvsc: signal host if receive ring is emptied Stephen Hemminger
2017-07-25 20:04 ` [PATCH net-next 5/5] netvsc: allow smaller send/recv buffer size Stephen Hemminger

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20170725200422.13795-4-sthemmin@microsoft.com \
    --to=stephen@networkplumber.org \
    --cc=devel@linuxdriverproject.org \
    --cc=haiyangz@microsoft.com \
    --cc=kys@microsoft.com \
    --cc=netdev@vger.kernel.org \
    --cc=sthemmin@microsoft.com \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.