From: Chuck Lever <chuck.lever@oracle.com>
To: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org
Subject: [PATCH v1 06/20] xprtrdma: Refactor MR recovery work queues
Date: Tue, 07 Jun 2016 15:47:08 -0400 [thread overview]
Message-ID: <20160607194708.18401.79092.stgit@manet.1015granger.net> (raw)
In-Reply-To: <20160607194001.18401.88592.stgit@manet.1015granger.net>
I found that commit ead3f26e359e ("xprtrdma: Add ro_unmap_safe
memreg method"), which introduces ro_unmap_safe, never wired up the
FMR recovery worker.
The FMR and FRWR recovery work queues both do the same thing.
Instead of setting up separate individual work queues for this,
schedule a delayed worker to deal with them, since recovering MRs is
not performance-critical.
Fixes: ead3f26e359e ("xprtrdma: Add ro_unmap_safe memreg method")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
net/sunrpc/xprtrdma/fmr_ops.c | 85 +++++++++++----------------------------
net/sunrpc/xprtrdma/frwr_ops.c | 68 ++++++-------------------------
net/sunrpc/xprtrdma/transport.c | 11 -----
net/sunrpc/xprtrdma/verbs.c | 42 +++++++++++++++++++
net/sunrpc/xprtrdma/xprt_rdma.h | 11 +++--
5 files changed, 87 insertions(+), 130 deletions(-)
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index b581a28..45800c9 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -35,30 +35,6 @@
/* Maximum scatter/gather per FMR */
#define RPCRDMA_MAX_FMR_SGES (64)
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND)
-
-int
-fmr_alloc_recovery_wq(void)
-{
- fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
- return !fmr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-fmr_destroy_recovery_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (!fmr_recovery_wq)
- return;
-
- wq = fmr_recovery_wq;
- fmr_recovery_wq = NULL;
- destroy_workqueue(wq);
-}
-
static int
__fmr_unmap(struct rpcrdma_mw *mw)
{
@@ -68,54 +44,30 @@ __fmr_unmap(struct rpcrdma_mw *mw)
return ib_unmap_fmr(&l);
}
-static void
-__fmr_dma_unmap(struct rpcrdma_mw *mw)
-{
- struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
-
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mw->mw_sg, mw->mw_nents, mw->mw_dir);
- rpcrdma_put_mw(r_xprt, mw);
-}
-
+/* Reset of a single FMR.
+ *
+ * There's no recovery if this fails. The FMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
static void
__fmr_reset_and_unmap(struct rpcrdma_mw *mw)
{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
int rc;
/* ORDER */
rc = __fmr_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
if (rc) {
pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
rc, mw);
return;
}
- __fmr_dma_unmap(mw);
+ rpcrdma_put_mw(r_xprt, mw);
}
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
- */
-static void
-__fmr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
- mw_work);
-
- __fmr_reset_and_unmap(mw);
- return;
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
-{
- INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
- queue_work(fmr_recovery_wq, &mw->mw_work);
-}
-
static void
__fmr_release(struct rpcrdma_mw *r)
{
@@ -142,6 +94,12 @@ fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
return 0;
}
+static void
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
+{
+ __fmr_reset_and_unmap(mw);
+}
+
/* FMR mode conveys up to 64 pages of payload per chunk segment.
*/
static size_t
@@ -287,7 +245,9 @@ out_maperr:
pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
len, (unsigned long long)dma_pages[0],
pageoff, mw->mw_nents, rc);
- __fmr_dma_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
return rc;
}
@@ -331,7 +291,9 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;
- __fmr_dma_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -359,7 +321,7 @@ fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (sync)
__fmr_reset_and_unmap(mw);
else
- __fmr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -384,6 +346,7 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_map = fmr_op_map,
.ro_unmap_sync = fmr_op_unmap_sync,
.ro_unmap_safe = fmr_op_unmap_safe,
+ .ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init = fmr_op_init,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index a476fa6..19bddfe 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -73,31 +73,6 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM)
-
-int
-frwr_alloc_recovery_wq(void)
-{
- frwr_recovery_wq = alloc_workqueue("frwr_recovery",
- FRWR_RECOVERY_WQ_FLAGS, 0);
- return !frwr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-frwr_destroy_recovery_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (!frwr_recovery_wq)
- return;
-
- wq = frwr_recovery_wq;
- frwr_recovery_wq = NULL;
- destroy_workqueue(wq);
-}
-
static int
__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
{
@@ -124,6 +99,12 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r)
return 0;
}
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
static void
__frwr_reset_and_unmap(struct rpcrdma_mw *mw)
{
@@ -138,30 +119,10 @@ __frwr_reset_and_unmap(struct rpcrdma_mw *mw)
rpcrdma_put_mw(r_xprt, mw);
}
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
- */
-static void
-__frwr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
- mw_work);
-
- __frwr_reset_and_unmap(r);
-}
-
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
{
- INIT_WORK(&r->mw_work, __frwr_recovery_worker);
- queue_work(frwr_recovery_wq, &r->mw_work);
+ __frwr_reset_and_unmap(mw);
}
static int
@@ -399,7 +360,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
seg1->rl_mw = NULL;
do {
if (mw)
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
@@ -481,12 +442,11 @@ out_mapmr_err:
pr_err("rpcrdma: failed to register mr %p (%u/%u)\n",
frmr->fr_mr, n, mw->mw_nents);
rc = n < 0 ? n : -EIO;
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
out_senderr:
- pr_err("rpcrdma: ib_post_send status %i\n", rc);
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
}
@@ -627,7 +587,7 @@ frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
if (sync)
__frwr_reset_and_unmap(mw);
else
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
@@ -640,9 +600,6 @@ frwr_op_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
- /* Ensure stale MWs for "buf" are no longer in flight */
- flush_workqueue(frwr_recovery_wq);
-
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
@@ -655,6 +612,7 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_map = frwr_op_map,
.ro_unmap_sync = frwr_op_unmap_sync,
.ro_unmap_safe = frwr_op_unmap_safe,
+ .ro_recover_mr = frwr_op_recover_mr,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init = frwr_op_init,
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 99d2e5b..29c91bf 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -741,7 +741,6 @@ void xprt_rdma_cleanup(void)
__func__, rc);
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
rc = xprt_unregister_transport(&xprt_rdma_bc);
if (rc)
@@ -753,20 +752,13 @@ int xprt_rdma_init(void)
{
int rc;
- rc = frwr_alloc_recovery_wq();
- if (rc)
- return rc;
-
rc = rpcrdma_alloc_wq();
- if (rc) {
- frwr_destroy_recovery_wq();
+ if (rc)
return rc;
- }
rc = xprt_register_transport(&xprt_rdma);
if (rc) {
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
return rc;
}
@@ -774,7 +766,6 @@ int xprt_rdma_init(void)
if (rc) {
xprt_unregister_transport(&xprt_rdma);
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
return rc;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4f7e1ba..e9f7f727 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -762,6 +762,41 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ib_drain_qp(ia->ri_id->qp);
}
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+ struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+ rb_recovery_worker.work);
+ struct rpcrdma_mw *mw;
+
+ spin_lock(&buf->rb_recovery_lock);
+ while (!list_empty(&buf->rb_stale_mrs)) {
+ mw = list_first_entry(&buf->rb_stale_mrs,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
+ mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+ spin_lock(&buf->rb_recovery_lock);
+ };
+ spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ spin_lock(&buf->rb_recovery_lock);
+ list_add(&mw->mw_list, &buf->rb_stale_mrs);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
@@ -863,6 +898,11 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
list_add(&rep->rr_list, &buf->rb_recv_bufs);
}
+ INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+ rpcrdma_mr_recovery_worker);
+ spin_lock_init(&buf->rb_recovery_lock);
+ INIT_LIST_HEAD(&buf->rb_stale_mrs);
+
return 0;
out:
rpcrdma_buffer_destroy(buf);
@@ -911,6 +951,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 3472844..8eb02ab 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -244,7 +244,6 @@ struct rpcrdma_mw {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
};
- struct work_struct mw_work;
struct rpcrdma_xprt *mw_xprt;
struct list_head mw_all;
};
@@ -336,6 +335,10 @@ struct rpcrdma_buffer {
struct list_head rb_allreqs;
u32 rb_bc_max_requests;
+
+ spinlock_t rb_recovery_lock;
+ struct list_head rb_stale_mrs;
+ struct delayed_work rb_recovery_worker;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -395,6 +398,7 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_req *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *,
struct rpcrdma_req *, bool);
+ void (*ro_recover_mr)(struct rpcrdma_mw *);
int (*ro_open)(struct rpcrdma_ia *,
struct rpcrdma_ep *,
struct rpcrdma_create_data_internal *);
@@ -471,6 +475,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
size_t, gfp_t);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
@@ -478,9 +484,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
int rpcrdma_alloc_wq(void);
void rpcrdma_destroy_wq(void);
next prev parent reply other threads:[~2016-06-07 19:47 UTC|newest]
Thread overview: 33+ messages / expand[flat|nested] mbox.gz Atom feed top
2016-06-07 19:46 [PATCH v1 00/20] NFS/RDMA client patches proposed for v4.8 Chuck Lever
2016-06-07 19:46 ` [PATCH v1 01/20] xprtrdma: Remove ALLPHYSICAL memory registration mode Chuck Lever
2016-06-07 19:46 ` [PATCH v1 02/20] xprtrdma: Refactor ->ro_init Chuck Lever
2016-06-08 17:48 ` Anna Schumaker
2016-06-07 19:46 ` [PATCH v1 03/20] xprtrdma: Create common scatterlist fields in rpcrdma_mw Chuck Lever
2016-06-07 19:46 ` [PATCH v1 04/20] xprtrdma: Use scatterlist for DMA mapping and unmapping under FMR Chuck Lever
2016-06-07 19:46 ` [PATCH v1 05/20] xprtrdma: Remove rpcrdma_map_one() and friends Chuck Lever
2016-06-07 19:47 ` Chuck Lever [this message]
2016-06-07 19:47 ` [PATCH v1 07/20] xprtrdma: Place registered MWs on a per-req list Chuck Lever
2016-06-07 19:47 ` [PATCH v1 08/20] xprtrdma: Reply buffer exhaustion can be catastrophic Chuck Lever
2016-06-07 19:47 ` [PATCH v1 09/20] xprtrdma: Limit the number of rpcrdma_mws Chuck Lever
2016-06-07 20:49 ` Jason Gunthorpe
2016-06-07 21:09 ` Chuck Lever
2016-06-07 21:28 ` Jason Gunthorpe
2016-06-07 21:51 ` Chuck Lever
2016-06-07 22:01 ` Jason Gunthorpe
2016-06-08 14:54 ` Tom Talpey
2016-06-08 15:06 ` Trond Myklebust
2016-06-08 17:40 ` Jason Gunthorpe
2016-06-08 17:50 ` Trond Myklebust
2016-06-08 17:53 ` Chuck Lever
2016-06-08 18:45 ` Tom Talpey
2016-06-07 19:47 ` [PATCH v1 10/20] xprtrdma: Chunk list encoders no longer share one rl_segments array Chuck Lever
2016-06-07 19:47 ` [PATCH v1 11/20] xprtrdma: rpcrdma_inline_fixup() overruns the receive page list Chuck Lever
2016-06-07 19:47 ` [PATCH v1 12/20] xprtrdma: Do not update {head, tail}.iov_len in rpcrdma_inline_fixup() Chuck Lever
2016-06-07 19:48 ` [PATCH v1 13/20] xprtrdma: Update only specific fields in private receive buffer Chuck Lever
2016-06-07 19:48 ` [PATCH v1 14/20] xprtrdma: Clean up fixup_copy_count accounting Chuck Lever
2016-06-07 19:48 ` [PATCH v1 15/20] xprtrdma: No direct data placement with krb5i and krb5p Chuck Lever
2016-06-07 19:48 ` [PATCH v1 16/20] svc: Avoid garbage replies when pc_func() returns rpc_drop_reply Chuck Lever
2016-06-07 19:48 ` [PATCH v1 17/20] NFS: Don't drop CB requests with invalid principals Chuck Lever
2016-06-07 19:48 ` [PATCH v1 18/20] xprtrdma: Eliminate rpcrdma_receive_worker() Chuck Lever
2016-06-07 19:48 ` [PATCH v1 19/20] xprtrdma: Eliminate INLINE_THRESHOLD macros Chuck Lever
2016-06-07 19:49 ` [PATCH v1 20/20] xprtrdma: Relocate connection helper functions Chuck Lever
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20160607194708.18401.79092.stgit@manet.1015granger.net \
--to=chuck.lever@oracle.com \
--cc=linux-nfs@vger.kernel.org \
--cc=linux-rdma@vger.kernel.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).