All of lore.kernel.org
 help / color / mirror / Atom feed
From: Benny Halevy <bhalevy@panasas.com>
To: Trond Myklebust <trond.myklebust@fys.uio.no>
Cc: Fred Isaman <iisaman@netapp.com>, linux-nfs@vger.kernel.org
Subject: Re: [PATCH 16/22] pnfs-submit: rewrite of layout state handling and cb_layoutrecall
Date: Sun, 14 Nov 2010 13:44:13 +0200	[thread overview]
Message-ID: <4CDFCB8D.7040209@panasas.com> (raw)
In-Reply-To: <1289639517.3669.9.camel@heimdal.trondhjem.org>

On 2010-11-13 11:11, Trond Myklebust wrote:
> On Fri, 2010-11-12 at 03:48 -0500, Fred Isaman wrote:
>> Remove NFS_LAYOUT_STATEID_SET in favor of just checking list_empty(lo->segs).
>>
>> LAYOUTGETs with openstateid are serialized.  Waiting on the condition
>> (list_empty(lo->segs) && plh_outstanding>0) both drains outstanding RPCs once
>> the stateid is invalidated and allows only a single LAYOUTGET(openstateid)
>> through at a time.
>>
>> Before sending a LAYOUTRETURN, plh_block_lgets is incremented.  It is
>> decremented in the rpc_release function.  While set, LAYOUTGETs are
>> paused in their rpc_prepare function, and any responses are
>> forgotten.
>>
>> Callbacks are handled by blocking any matching LAYOUTGETS while processing and
>> initiating drain of IO.  A notification system is set up so that when
>> all relevant IO is finished, the state manager thread is invoked, which
>> synchronously sends the final matching LAYOUTRETURN before unblocking
>> LAYOUTGETS.
>>
>> Signed-off-by: Fred Isaman <iisaman@netapp.com>
>> ---
>>  fs/nfs/callback.h         |    7 +
>>  fs/nfs/callback_proc.c    |  466 +++++++++++++++++++++++----------------------
>>  fs/nfs/client.c           |    3 +
>>  fs/nfs/nfs4proc.c         |   81 ++++++--
>>  fs/nfs/nfs4state.c        |    4 +
>>  fs/nfs/nfs4xdr.c          |   16 ++-
>>  fs/nfs/pnfs.c             |  177 +++++++++++++-----
>>  fs/nfs/pnfs.h             |   41 +++-
>>  include/linux/nfs_fs_sb.h |    4 +
>>  9 files changed, 497 insertions(+), 302 deletions(-)
>>
>> diff --git a/fs/nfs/callback.h b/fs/nfs/callback.h
>> index cea58cc..4a9905b 100644
>> --- a/fs/nfs/callback.h
>> +++ b/fs/nfs/callback.h
>> @@ -163,6 +163,9 @@ struct cb_layoutrecallargs {
>>  extern unsigned nfs4_callback_layoutrecall(
>>  	struct cb_layoutrecallargs *args,
>>  	void *dummy, struct cb_process_state *cps);
>> +extern bool matches_outstanding_recall(struct inode *ino,
>> +				       struct pnfs_layout_range *range);
>> +extern void nfs_client_return_layouts(struct nfs_client *clp);
>>  
>>  static inline void put_session_client(struct nfs4_session *session)
>>  {
>> @@ -178,6 +181,10 @@ find_client_from_cps(struct cb_process_state *cps, struct sockaddr *addr)
>>  
>>  #else
>>  
>> +static inline void nfs_client_return_layouts(struct nfs_client *clp)
>> +{
>> +}
>> +
>>  static inline struct nfs_client *
>>  find_client_from_cps(struct cb_process_state *cps, struct sockaddr *addr)
>>  {
>> diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
>> index 6e0fc40..af405cf 100644
>> --- a/fs/nfs/callback_proc.c
>> +++ b/fs/nfs/callback_proc.c
>> @@ -124,265 +124,283 @@ int nfs4_validate_delegation_stateid(struct nfs_delegation *delegation, const nf
>>  #if defined(CONFIG_NFS_V4_1)
>>  
>>  static bool
>> -pnfs_is_next_layout_stateid(const struct pnfs_layout_hdr *lo,
>> -			    const nfs4_stateid stateid)
>> +_recall_matches_lget(struct pnfs_cb_lrecall_info *cb_info,
>> +		     struct inode *ino, struct pnfs_layout_range *range)
>>  {
>> -	bool res;
>> -	u32 oldseqid, newseqid;
>> -
>> -	spin_lock(&lo->inode->i_lock);
>> -	{
>> -		oldseqid = be32_to_cpu(lo->stateid.stateid.seqid);
>> -		newseqid = be32_to_cpu(stateid.stateid.seqid);
>> -		res = !memcmp(lo->stateid.stateid.other,
>> -			      stateid.stateid.other,
>> -			      NFS4_STATEID_OTHER_SIZE);
>> -		if (res) { /* comparing layout stateids */
>> -			if (oldseqid == ~0)
>> -				res = (newseqid == 1);
>> -			else
>> -				res = (newseqid == oldseqid + 1);
>> -		} else { /* open stateid */
>> -			res = !memcmp(lo->stateid.data,
>> -				      &zero_stateid,
>> -				      NFS4_STATEID_SIZE);
>> -			if (res)
>> -				res = (newseqid == 1);
>> -		}
>> -	}
>> -	spin_unlock(&lo->inode->i_lock);
>> +	struct cb_layoutrecallargs *cb_args = &cb_info->pcl_args;
>>  
>> -	return res;
>> +	switch (cb_args->cbl_recall_type) {
>> +	case RETURN_ALL:
>> +		return true;
>> +	case RETURN_FSID:
>> +		return !memcmp(&NFS_SERVER(ino)->fsid, &cb_args->cbl_fsid,
>> +			       sizeof(struct nfs_fsid));
>> +	case RETURN_FILE:
>> +		return (ino == cb_info->pcl_ino) &&
>> +			should_free_lseg(range, &cb_args->cbl_range);
>> +	default:
>> +		BUG();
> 
> Why should we BUG() just because the server is screwed up? That's not a
> client bug.
> 

Agreed.  This should be handled earlier in nfs4_callback_layoutrecall
or do_callback_layoutrecall so that we can return NFS4ERR_INVALID.


>> +	}
>>  }
>>  
>> -/*
>> - * Retrieve an inode based on layout recall parameters
>> - *
>> - * Note: caller must iput(inode) to dereference the inode.
>> - */
>> -static struct inode *
>> -nfs_layoutrecall_find_inode(struct nfs_client *clp,
>> -			    const struct cb_layoutrecallargs *args)
>> +bool
>> +matches_outstanding_recall(struct inode *ino, struct pnfs_layout_range *range)
>>  {
>> -	struct nfs_inode *nfsi;
>> -	struct pnfs_layout_hdr *lo;
>> -	struct nfs_server *server;
>> -	struct inode *ino = NULL;
>> -
>> -	dprintk("%s: Begin recall_type=%d clp %p\n",
>> -		__func__, args->cbl_recall_type, clp);
>> -
>> -	spin_lock(&clp->cl_lock);
>> -	list_for_each_entry(lo, &clp->cl_layouts, layouts) {
>> -		nfsi = NFS_I(lo->inode);
>> -		if (!nfsi)
>> -			continue;
>> -
>> -		dprintk("%s: Searching inode=%lu\n",
>> -			__func__, nfsi->vfs_inode.i_ino);
>> -
>> -		if (args->cbl_recall_type == RETURN_FILE) {
>> -		    if (nfs_compare_fh(&args->cbl_fh, &nfsi->fh))
>> -			continue;
>> -		} else if (args->cbl_recall_type == RETURN_FSID) {
>> -			server = NFS_SERVER(&nfsi->vfs_inode);
>> -			if (server->fsid.major != args->cbl_fsid.major ||
>> -			    server->fsid.minor != args->cbl_fsid.minor)
>> -				continue;
>> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>> +	struct pnfs_cb_lrecall_info *cb_info;
>> +	bool rv = false;
>> +
>> +	assert_spin_locked(&clp->cl_lock);
> 
> Can we please go easy on the asserts? There is way too much asserting
> going on in the NFSv4.1 code. This isn't a publicly visible interface,
> so just get it right in the debugging process before the merge, and then
> kill these asserts...
> 

OK. We can keep them in a DEVONLY patch only in the development tree
(they come in handy when any changes are made on these code paths).

>> +	list_for_each_entry(cb_info, &clp->cl_layoutrecalls, pcl_list) {
>> +		if (_recall_matches_lget(cb_info, ino, range)) {
>> +			rv = true;
>> +			break;
>>  		}
>> -
>> -		/* Make sure client didn't clean up layout without
>> -		 * telling the server */
>> -		if (!has_layout(nfsi))
>> -			continue;
>> -
>> -		ino = igrab(&nfsi->vfs_inode);
>> -		dprintk("%s: Found inode=%p\n", __func__, ino);
>> -		break;
>>  	}
>> -	spin_unlock(&clp->cl_lock);
>> -	return ino;
>> +	return rv;
>>  }
>>  
>> -struct recall_layout_threadargs {
>> -	struct inode *inode;
>> -	struct nfs_client *clp;
>> -	struct completion started;
>> -	struct cb_layoutrecallargs *rl;
>> -	int result;
>> -};
>> -
>> -static int pnfs_recall_layout(void *data)
>> +/* Send a synchronous LAYOUTRETURN.  By the time this is called, we know
>> + * all IO has been drained, any matching lsegs deleted, and that no
>> + * overlapping LAYOUTGETs will be sent or processed for the duration
>> + * of this call.
>> + * Note that it is possible that when this is called, the stateid has
>> + * been invalidated.  But will not be cleared, so can still use.
>> + */
>> +static int
>> +pnfs_send_layoutreturn(struct nfs_client *clp,
>> +		       struct pnfs_cb_lrecall_info *cb_info)
>>  {
>> -	struct inode *inode, *ino;
>> -	struct nfs_client *clp;
>> -	struct cb_layoutrecallargs rl;
>> +	struct cb_layoutrecallargs *args = &cb_info->pcl_args;
>>  	struct nfs4_layoutreturn *lrp;
>> -	struct recall_layout_threadargs *args =
>> -		(struct recall_layout_threadargs *)data;
>> -	int status = 0;
>> -
>> -	daemonize("nfsv4-layoutreturn");
>> -
>> -	dprintk("%s: recall_type=%d fsid 0x%llx-0x%llx start\n",
>> -		__func__, args->rl->cbl_recall_type,
>> -		args->rl->cbl_fsid.major, args->rl->cbl_fsid.minor);
>> -
>> -	clp = args->clp;
>> -	inode = args->inode;
>> -	rl = *args->rl;
>> -
>> -	/* support whole file layouts only */
>> -	rl.cbl_range.offset = 0;
>> -	rl.cbl_range.length = NFS4_MAX_UINT64;
>> -
>> -	if (rl.cbl_recall_type == RETURN_FILE) {
>> -		if (pnfs_is_next_layout_stateid(NFS_I(inode)->layout,
>> -						rl.cbl_stateid))
>> -			status = pnfs_return_layout(inode, &rl.cbl_range,
>> -						    &rl.cbl_stateid, RETURN_FILE,
>> -						    false);
>> -		else
>> -			status = cpu_to_be32(NFS4ERR_DELAY);
>> -		if (status)
>> -			dprintk("%s RETURN_FILE error: %d\n", __func__, status);
>> -		else
>> -			status =  cpu_to_be32(NFS4ERR_NOMATCHING_LAYOUT);
>> -		args->result = status;
>> -		complete(&args->started);
>> -		goto out;
>> -	}
>> -
>> -	status = cpu_to_be32(NFS4_OK);
>> -	args->result = status;
>> -	complete(&args->started);
>> -	args = NULL;
>> -
>> -	/* IMPROVEME: This loop is inefficient, running in O(|s_inodes|^2) */
>> -	while ((ino = nfs_layoutrecall_find_inode(clp, &rl)) != NULL) {
>> -		/* FIXME: need to check status on pnfs_return_layout */
>> -		pnfs_return_layout(ino, &rl.cbl_range, NULL, RETURN_FILE, false);
>> -		iput(ino);
>> -	}
>>  
>>  	lrp = kzalloc(sizeof(*lrp), GFP_KERNEL);
>> -	if (!lrp) {
>> -		dprintk("%s: allocation failed. Cannot send last LAYOUTRETURN\n",
>> -			__func__);
>> -		goto out;
>> -	}
>> -
>> -	/* send final layoutreturn */
>> +	if (!lrp)
>> +		return -ENOMEM;
>>  	lrp->args.reclaim = 0;
>> -	lrp->args.layout_type = rl.cbl_layout_type;
>> -	lrp->args.return_type = rl.cbl_recall_type;
>> +	lrp->args.layout_type = args->cbl_layout_type;
>> +	lrp->args.return_type = args->cbl_recall_type;
>>  	lrp->clp = clp;
>> -	lrp->args.range = rl.cbl_range;
>> -	lrp->args.inode = inode;
>> -	nfs4_proc_layoutreturn(lrp, true);
>> -
>> -out:
>> -	clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state);
>> -	nfs_put_client(clp);
>> -	module_put_and_exit(0);
>> -	dprintk("%s: exit status %d\n", __func__, 0);
>> -	return 0;
>> +	if (args->cbl_recall_type == RETURN_FILE) {
>> +		lrp->args.range = args->cbl_range;
>> +		lrp->args.inode = cb_info->pcl_ino;
>> +	} else {
>> +		lrp->args.range.iomode = IOMODE_ANY;
>> +		lrp->args.inode = NULL;
>> +	}
>> +	return nfs4_proc_layoutreturn(lrp, true);
>>  }
>>  
>> -/*
>> - * Asynchronous layout recall!
>> +/* Called by state manager to finish CB_LAYOUTRECALLS initiated by
>> + * nfs4_callback_layoutrecall().
>>   */
>> -static int pnfs_async_return_layout(struct nfs_client *clp, struct inode *inode,
>> -				    struct cb_layoutrecallargs *rl)
>> +void nfs_client_return_layouts(struct nfs_client *clp)
>>  {
>> -	struct recall_layout_threadargs data = {
>> -		.clp = clp,
>> -		.inode = inode,
>> -		.rl = rl,
>> -	};
>> -	struct task_struct *t;
>> -	int status = -EAGAIN;
>> +	struct pnfs_cb_lrecall_info *cb_info;
>>  
>> -	dprintk("%s: -->\n", __func__);
>> +	spin_lock(&clp->cl_lock);
>> +	while (true) {
>> +		if (list_empty(&clp->cl_layoutrecalls)) {
>> +			spin_unlock(&clp->cl_lock);
>> +			break;
>> +		}
>> +		cb_info = list_first_entry(&clp->cl_layoutrecalls,
>> +					   struct pnfs_cb_lrecall_info,
>> +					   pcl_list);
>> +		spin_unlock(&clp->cl_lock);
>> +		if (atomic_read(&cb_info->pcl_count) != 0)
>> +			break;
>> +		/* What do on error return?  These layoutreturns are
>> +		 * required by the protocol.  So if do not get
>> +		 * successful reply, probably have to do something
>> +		 * more drastic.
>> +		 */
>> +		pnfs_send_layoutreturn(clp, cb_info);
>> +		spin_lock(&clp->cl_lock);
>> +		/* Removing from the list unblocks LAYOUTGETs */
>> +		list_del(&cb_info->pcl_list);
>> +		clp->cl_cb_lrecall_count--;
>> +		rpc_wake_up(&clp->cl_rpcwaitq_recall);
>> +		kfree(cb_info);
>> +	}
>> +}
>>  
>> -	/* FIXME: do not allow two concurrent layout recalls */
>> -	if (test_and_set_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state))
>> -		return status;
>> -
>> -	init_completion(&data.started);
>> -	__module_get(THIS_MODULE);
>> -	atomic_inc(&clp->cl_count);
>> -
>> -	t = kthread_run(pnfs_recall_layout, &data, "%s", "pnfs_recall_layout");
>> -	if (IS_ERR(t)) {
>> -		printk(KERN_INFO "NFS: Layout recall callback thread failed "
>> -			"for client (clientid %08x/%08x)\n",
>> -			(unsigned)(clp->cl_clientid >> 32),
>> -			(unsigned)(clp->cl_clientid));
>> -		status = PTR_ERR(t);
>> -		goto out_module_put;
>> +void notify_drained(struct pnfs_cb_lrecall_info *d)
>> +{
>> +	if (d && atomic_dec_and_test(&d->pcl_count)) {
>> +		set_bit(NFS4CLNT_LAYOUT_RECALL, &d->pcl_clp->cl_state);
>> +		nfs4_schedule_state_manager(d->pcl_clp);
>>  	}
>> -	wait_for_completion(&data.started);
>> -	return data.result;
>> -out_module_put:
>> -	nfs_put_client(clp);
>> -	clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state);
>> -	module_put(THIS_MODULE);
>> -	return status;
>>  }
>>  
>> -static int pnfs_recall_all_layouts(struct nfs_client *clp)
>> +static int initiate_layout_draining(struct pnfs_cb_lrecall_info *cb_info)
>>  {
>> -	struct cb_layoutrecallargs rl;
>> -	struct inode *inode;
>> -	int status = 0;
>> -
>> -	rl.cbl_recall_type = RETURN_ALL;
>> -	rl.cbl_range.iomode = IOMODE_ANY;
>> -	rl.cbl_range.offset = 0;
>> -	rl.cbl_range.length = NFS4_MAX_UINT64;
>> -
>> -	/* we need the inode to get the nfs_server struct */
>> -	inode = nfs_layoutrecall_find_inode(clp, &rl);
>> -	if (!inode)
>> -		return status;
>> -	status = pnfs_async_return_layout(clp, inode, &rl);
>> -	iput(inode);
>> +	struct nfs_client *clp = cb_info->pcl_clp;
>> +	struct pnfs_layout_hdr *lo;
>> +	int rv = NFS4ERR_NOMATCHING_LAYOUT;
>> +	struct cb_layoutrecallargs *args = &cb_info->pcl_args;
>> +
>> +	if (args->cbl_recall_type == RETURN_FILE) {
>> +		LIST_HEAD(free_me_list);
>> +
>> +		spin_lock(&clp->cl_lock);
>> +		list_for_each_entry(lo, &clp->cl_layouts, layouts) {
>> +			if (nfs_compare_fh(&args->cbl_fh,
>> +					   &NFS_I(lo->inode)->fh))
>> +				continue;
>> +			if (test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags))
>> +				rv = NFS4ERR_DELAY;
>> +			else {
>> +				/* FIXME I need to better understand igrab and
>> +				 * does having a layout ref keep ino around?
>> +				 *  It should.
>> +				 */
>> +				/* We need to hold the reference until any
>> +				 * potential LAYOUTRETURN is finished.
>> +				 */
>> +				get_layout_hdr(lo);
>> +				cb_info->pcl_ino = lo->inode;
>> +				rv = NFS4_OK;
>> +			}
>> +			break;
>> +		}
>> +		spin_unlock(&clp->cl_lock);
>> +
>> +		spin_lock(&lo->inode->i_lock);
>> +		if (rv == NFS4_OK) {
>> +			lo->plh_block_lgets++;
>> +			nfs4_asynch_forget_layouts(lo, &args->cbl_range,
>> +						   cb_info, &free_me_list);
>> +		}
>> +		pnfs_set_layout_stateid(lo, &args->cbl_stateid, true);
>> +		spin_unlock(&lo->inode->i_lock);
>> +		pnfs_free_lseg_list(&free_me_list);
>> +	} else {
>> +		struct pnfs_layout_hdr *tmp;
>> +		LIST_HEAD(recall_list);
>> +		LIST_HEAD(free_me_list);
>> +		struct pnfs_layout_range range = {
>> +			.iomode = IOMODE_ANY,
>> +			.offset = 0,
>> +			.length = NFS4_MAX_UINT64,
>> +		};
>> +
>> +		spin_lock(&clp->cl_lock);
>> +		/* Per RFC 5661, 12.5.5.2.1.5, bulk recall must be serialized */
>> +		if (!list_is_singular(&clp->cl_layoutrecalls)) {
>> +			spin_unlock(&clp->cl_lock);
>> +			return NFS4ERR_DELAY;
>> +		}
>> +		list_for_each_entry(lo, &clp->cl_layouts, layouts) {
>> +			if ((args->cbl_recall_type == RETURN_FSID) &&
>> +			    memcmp(&NFS_SERVER(lo->inode)->fsid,
>> +				   &args->cbl_fsid, sizeof(struct nfs_fsid)))
>> +				continue;
>> +			get_layout_hdr(lo);
>> +			/* We could list_del(&lo->layouts) here */
>> +			BUG_ON(!list_empty(&lo->plh_bulk_recall));
>> +			list_add(&lo->plh_bulk_recall, &recall_list);
>> +		}
>> +		spin_unlock(&clp->cl_lock);
>> +		list_for_each_entry_safe(lo, tmp,
>> +					 &recall_list, plh_bulk_recall) {
>> +			spin_lock(&lo->inode->i_lock);
>> +			set_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
>> +			nfs4_asynch_forget_layouts(lo, &range, cb_info,
>> +						   &free_me_list);
>> +			list_del_init(&lo->plh_bulk_recall);
>> +			spin_unlock(&lo->inode->i_lock);
>> +			put_layout_hdr(lo->inode);
>> +			rv = NFS4_OK;
>> +		}
>> +		pnfs_free_lseg_list(&free_me_list);
>> +	}
>> +	return rv;
>> +}
>> +
>> +static u32 do_callback_layoutrecall(struct nfs_client *clp,
>> +				    struct cb_layoutrecallargs *args)
>> +{
>> +	struct pnfs_cb_lrecall_info *new;
>> +	u32 res;
>> +
>> +	dprintk("%s enter, type=%i\n", __func__, args->cbl_recall_type);
>> +	new = kmalloc(sizeof(*new), GFP_KERNEL);
>> +	if (!new) {
>> +		res = NFS4ERR_RESOURCE;
>> +		goto out;
>> +	}
>> +	memcpy(&new->pcl_args, args, sizeof(*args));
>> +	atomic_set(&new->pcl_count, 1);
>> +	new->pcl_clp = clp;
>> +	new->pcl_ino = NULL;
>> +	spin_lock(&clp->cl_lock);
>> +	if (clp->cl_cb_lrecall_count >= PNFS_MAX_CB_LRECALLS) {
>> +		kfree(new);
>> +		res = NFS4ERR_DELAY;
>> +		spin_unlock(&clp->cl_lock);
>> +		goto out;
>> +	}
>> +	clp->cl_cb_lrecall_count++;
>> +	/* Adding to the list will block conflicting LGET activity */
>> +	list_add_tail(&new->pcl_list, &clp->cl_layoutrecalls);
>> +	spin_unlock(&clp->cl_lock);
>> +	res = initiate_layout_draining(new);
>> +	if (res || atomic_dec_and_test(&new->pcl_count)) {
>> +		spin_lock(&clp->cl_lock);
>> +		list_del(&new->pcl_list);
>> +		clp->cl_cb_lrecall_count--;
>> +		rpc_wake_up(&clp->cl_rpcwaitq_recall);
>> +		spin_unlock(&clp->cl_lock);
>> +		if (res == NFS4_OK) {
>> +			if (args->cbl_recall_type == RETURN_FILE) {
>> +				struct pnfs_layout_hdr *lo;
>> +
>> +				lo = NFS_I(new->pcl_ino)->layout;
>> +				spin_lock(&lo->inode->i_lock);
>> +				lo->plh_block_lgets--;
>> +				if (!pnfs_layoutgets_blocked(lo, NULL))
>> +					rpc_wake_up(&NFS_I(lo->inode)->lo_rpcwaitq_stateid);
>> +				spin_unlock(&lo->inode->i_lock);
>> +				put_layout_hdr(new->pcl_ino);
>> +			}
>> +			res = NFS4ERR_NOMATCHING_LAYOUT;
>> +		}
>> +		kfree(new);
>> +	}
>> +out:
>> +	dprintk("%s returning %i\n", __func__, res);
>> +	return res;
>>  
>> -	return status;
>>  }
>>  
>>  __be32 nfs4_callback_layoutrecall(struct cb_layoutrecallargs *args,
>>  				  void *dummy, struct cb_process_state *cps)
>>  {
>>  	struct nfs_client *clp;
>> -	struct inode *inode = NULL;
>> -	__be32 res;
>> -	int status;
>> +	u32 res;
>>  
>>  	dprintk("%s: -->\n", __func__);
>>  
>> -	res = cpu_to_be32(NFS4ERR_OP_NOT_IN_SESSION);
>> -	if (cps->session) /* set in cb_sequence */
>> +	if (cps->session) { /* set in cb_sequence */
>>  		clp = cps->session->clp;
>> -	else
>> -		goto out;
>> +		res = do_callback_layoutrecall(clp, args);
>> +	} else
>> +		res = NFS4ERR_OP_NOT_IN_SESSION;
>>  
>> -	res = cpu_to_be32(NFS4ERR_NOMATCHING_LAYOUT);
>> -	/*
>> -	 * In the _ALL or _FSID case, we need the inode to get
>> -	 * the nfs_server struct.
>> -	 */
>> -	inode = nfs_layoutrecall_find_inode(clp, args);
>> -	if (!inode)
>> -		goto out;
>> -	status = pnfs_async_return_layout(clp, inode, args);
>> -	if (status)
>> -		res = cpu_to_be32(NFS4ERR_DELAY);
>> -	iput(inode);
>> -out:
>> -	dprintk("%s: exit with status = %d\n", __func__, ntohl(res));
>> -	return res;
>> +	dprintk("%s: exit with status = %d\n", __func__, res);
>> +	return cpu_to_be32(res);
>> +}
>> +
>> +static void pnfs_recall_all_layouts(struct nfs_client *clp)
>> +{
>> +	struct cb_layoutrecallargs args;
>> +
>> +	/* Pretend we got a CB_LAYOUTRECALL(ALL) */
>> +	memset(&args, 0, sizeof(args));
>> +	args.cbl_recall_type = RETURN_ALL;
>> +	/* FIXME we ignore errors, what should we do? */
> 
> We're a forgetful client: we don't care...
> 

Well, CB_RECALL_ANY is generated in order to trim the server's state down
by allowing the client to *return* state it needs less, or no longer needs.
Just forgetting this state doesn't help the server at all with this job!
There's no equivalent error to NFS4ERR_NOMATCHING_LAYOUT for CB_RECALL_ANY.

>> +	do_callback_layoutrecall(clp, &args);
>>  }
> 
> 
> 
>>  
>>  int nfs41_validate_delegation_stateid(struct nfs_delegation *delegation, const nfs4_stateid *stateid)
>> @@ -665,9 +683,7 @@ __be32 nfs4_callback_recallany(struct cb_recallanyargs *args, void *dummy,
>>  		flags |= FMODE_WRITE;
>>  	if (test_bit(RCA4_TYPE_MASK_FILE_LAYOUT, (const unsigned long *)
>>  		     &args->craa_type_mask))
>> -		if (pnfs_recall_all_layouts(clp) == -EAGAIN)
>> -			status = cpu_to_be32(NFS4ERR_DELAY);
>> -
>> +		pnfs_recall_all_layouts(clp);
>>  	if (flags)
>>  		nfs_expire_all_delegation_types(clp, flags);
>>  out:
>> diff --git a/fs/nfs/client.c b/fs/nfs/client.c
>> index 3c8c841..dbf43e7 100644
>> --- a/fs/nfs/client.c
>> +++ b/fs/nfs/client.c
>> @@ -158,6 +158,9 @@ static struct nfs_client *nfs_alloc_client(const struct nfs_client_initdata *cl_
>>  		clp->cl_machine_cred = cred;
>>  #if defined(CONFIG_NFS_V4_1)
>>  	INIT_LIST_HEAD(&clp->cl_layouts);
>> +	INIT_LIST_HEAD(&clp->cl_layoutrecalls);
>> +	rpc_init_wait_queue(&clp->cl_rpcwaitq_recall,
>> +			    "NFS client CB_LAYOUTRECALLS");
>>  #endif
>>  	nfs_fscache_get_client_cookie(clp);
>>  
>> diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
>> index fe79872..6223c6a 100644
>> --- a/fs/nfs/nfs4proc.c
>> +++ b/fs/nfs/nfs4proc.c
>> @@ -5346,31 +5346,58 @@ nfs4_layoutget_prepare(struct rpc_task *task, void *calldata)
>>  	struct inode *ino = lgp->args.inode;
>>  	struct nfs_inode *nfsi = NFS_I(ino);
>>  	struct nfs_server *server = NFS_SERVER(ino);
>> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>>  
>>  	dprintk("--> %s\n", __func__);
>> +	spin_lock(&clp->cl_lock);
>> +	if (matches_outstanding_recall(ino, &lgp->args.range)) {
>> +		rpc_sleep_on(&clp->cl_rpcwaitq_recall, task, NULL);
>> +		spin_unlock(&clp->cl_lock);
>> +		return;
>> +	}
>> +	spin_unlock(&clp->cl_lock);
>> +	/* Note the is a race here, where a CB_LAYOUTRECALL can come in
>> +	 * right now covering the LAYOUTGET we are about to send.
>> +	 * However, that is not so catastrophic, and there seems
>> +	 * to be no way to prevent it completely.
>> +	 */
>>  	spin_lock(&ino->i_lock);
>> -	if (pnfs_layoutgets_blocked(nfsi->layout)) {
>> +	if (pnfs_layoutgets_blocked(nfsi->layout, NULL)) {
>>  		rpc_sleep_on(&nfsi->lo_rpcwaitq_stateid, task, NULL);
>>  		spin_unlock(&ino->i_lock);
>>  		return;
>>  	}
>> +	/* This needs after above check but atomic with it in order to properly
>> +	 * serialize openstateid LAYOUTGETs.
>> +	 */
>> +	nfsi->layout->plh_outstanding++;
>>  	spin_unlock(&ino->i_lock);
>> +
>>  	if (nfs4_setup_sequence(server, NULL, &lgp->args.seq_args,
>> -				&lgp->res.seq_res, 0, task))
>> +				&lgp->res.seq_res, 0, task)) {
>> +		spin_lock(&ino->i_lock);
>> +		nfsi->layout->plh_outstanding--;
>> +		spin_unlock(&ino->i_lock);
>>  		return;
>> +	}
>>  	rpc_call_start(task);
>>  }
>>  
>>  static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
>>  {
>>  	struct nfs4_layoutget *lgp = calldata;
>> -	struct nfs_server *server = NFS_SERVER(lgp->args.inode);
>> +	struct inode *ino = lgp->args.inode;
>>  
>>  	dprintk("--> %s\n", __func__);
>>  
>> -	if (!nfs4_sequence_done(task, &lgp->res.seq_res))
>> +	if (!nfs4_sequence_done(task, &lgp->res.seq_res)) {
>> +		/* layout code relies on fact that in this case
>> +		 * code falls back to tk_action=call_start, but not
>> +		 * back to rpc_prepare_task, to keep plh_outstanding
>> +		 * correct.
>> +		 */
>>  		return;
>> -
>> +	}
>>  	switch (task->tk_status) {
>>  	case 0:
>>  		break;
>> @@ -5379,7 +5406,11 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata)
>>  		task->tk_status = -NFS4ERR_DELAY;
>>  		/* Fall through */
>>  	default:
>> -		if (nfs4_async_handle_error(task, server, NULL, NULL) == -EAGAIN) {
>> +		if (nfs4_async_handle_error(task, NFS_SERVER(ino),
>> +					    NULL, NULL) == -EAGAIN) {
>> +			spin_lock(&ino->i_lock);
>> +			NFS_I(ino)->layout->plh_outstanding--;
>> +			spin_unlock(&ino->i_lock);
>>  			rpc_restart_call_prepare(task);
>>  			return;
>>  		}
>> @@ -5437,13 +5468,20 @@ int nfs4_proc_layoutget(struct nfs4_layoutget *lgp)
>>  	if (IS_ERR(task))
>>  		return PTR_ERR(task);
>>  	status = nfs4_wait_for_completion_rpc_task(task);
>> -	if (status != 0)
>> -		goto out;
>> -	status = task->tk_status;
>> -	if (status != 0)
>> -		goto out;
>> -	status = pnfs_layout_process(lgp);
>> -out:
>> +	if (status == 0)
>> +		status = task->tk_status;
>> +	if (status == 0)
>> +		status = pnfs_layout_process(lgp);
>> +	else {
>> +		struct inode *ino = lgp->args.inode;
>> +		struct pnfs_layout_hdr *lo = NFS_I(ino)->layout;
>> +
>> +		spin_lock(&ino->i_lock);
>> +		lo->plh_outstanding--;
>> +		if (!pnfs_layoutgets_blocked(lo, NULL))
>> +			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
>> +		spin_unlock(&ino->i_lock);
>> +	}
>>  	rpc_put_task(task);
>>  	dprintk("<-- %s status=%d\n", __func__, status);
>>  	return status;
>> @@ -5587,9 +5625,9 @@ static void nfs4_layoutreturn_done(struct rpc_task *task, void *calldata)
>>  
>>  		spin_lock(&lo->inode->i_lock);
>>  		if (lrp->res.lrs_present)
>> -			pnfs_set_layout_stateid(lo, &lrp->res.stateid);
>> +			pnfs_set_layout_stateid(lo, &lrp->res.stateid, true);
>>  		else
>> -			pnfs_invalidate_layout_stateid(lo);
>> +			BUG_ON(!list_empty(&lo->segs));
>>  		spin_unlock(&lo->inode->i_lock);
>>  	}
>>  	dprintk("<-- %s\n", __func__);
>> @@ -5606,10 +5644,11 @@ static void nfs4_layoutreturn_release(void *calldata)
>>  
>>  		spin_lock(&ino->i_lock);
>>  		lo->plh_block_lgets--;
>> -		if (!pnfs_layoutgets_blocked(lo))
>> +		lo->plh_outstanding--;
>> +		if (!pnfs_layoutgets_blocked(lo, NULL))
>>  			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
>>  		spin_unlock(&ino->i_lock);
>> -		put_layout_hdr(lrp->args.inode);
>> +		put_layout_hdr(ino);
>>  	}
>>  	kfree(calldata);
>>  	dprintk("<-- %s\n", __func__);
>> @@ -5639,6 +5678,14 @@ int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool issync)
>>  	int status = 0;
>>  
>>  	dprintk("--> %s\n", __func__);
>> +	if (lrp->args.return_type == RETURN_FILE) {
>> +		struct pnfs_layout_hdr *lo = NFS_I(lrp->args.inode)->layout;
>> +		/* FIXME we should test for BULK here */
>> +		spin_lock(&lo->inode->i_lock);
>> +		BUG_ON(lo->plh_block_lgets == 0);
>> +		lo->plh_outstanding++;
>> +		spin_unlock(&lo->inode->i_lock);
>> +	}
>>  	task = rpc_run_task(&task_setup_data);
>>  	if (IS_ERR(task))
>>  		return PTR_ERR(task);
>> diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
>> index 00632f6..ceb0d66 100644
>> --- a/fs/nfs/nfs4state.c
>> +++ b/fs/nfs/nfs4state.c
>> @@ -1560,6 +1560,10 @@ static void nfs4_state_manager(struct nfs_client *clp)
>>  			nfs_client_return_marked_delegations(clp);
>>  			continue;
>>  		}
>> +		if (test_and_clear_bit(NFS4CLNT_LAYOUT_RECALL, &clp->cl_state)) {
>> +			nfs_client_return_layouts(clp);
>> +			continue;
>> +		}
>>  		/* Recall session slots */
>>  		if (test_and_clear_bit(NFS4CLNT_RECALL_SLOT, &clp->cl_state)
>>  		   && nfs4_has_session(clp)) {
>> diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
>> index 328cca5..f530c7e 100644
>> --- a/fs/nfs/nfs4xdr.c
>> +++ b/fs/nfs/nfs4xdr.c
>> @@ -1827,13 +1827,14 @@ encode_getdeviceinfo(struct xdr_stream *xdr,
>>  	hdr->replen += decode_getdeviceinfo_maxsz;
>>  }
>>  
>> -static void
>> +static int
>>  encode_layoutget(struct xdr_stream *xdr,
>>  		      const struct nfs4_layoutget_args *args,
>>  		      struct compound_hdr *hdr)
>>  {
>>  	nfs4_stateid stateid;
>>  	__be32 *p;
>> +	int status;
>>  
>>  	p = reserve_space(xdr, 44 + NFS4_STATEID_SIZE);
>>  	*p++ = cpu_to_be32(OP_LAYOUTGET);
>> @@ -1843,8 +1844,11 @@ encode_layoutget(struct xdr_stream *xdr,
>>  	p = xdr_encode_hyper(p, args->range.offset);
>>  	p = xdr_encode_hyper(p, args->range.length);
>>  	p = xdr_encode_hyper(p, args->minlength);
>> -	pnfs_get_layout_stateid(&stateid, NFS_I(args->inode)->layout,
>> -				args->ctx->state);
>> +	status = pnfs_choose_layoutget_stateid(&stateid,
>> +					       NFS_I(args->inode)->layout,
>> +					       args->ctx->state);
>> +	if (status)
>> +		return status;
>>  	p = xdr_encode_opaque_fixed(p, &stateid.data, NFS4_STATEID_SIZE);
>>  	*p = cpu_to_be32(args->maxcount);
>>  
>> @@ -1857,6 +1861,7 @@ encode_layoutget(struct xdr_stream *xdr,
>>  		args->maxcount);
>>  	hdr->nops++;
>>  	hdr->replen += decode_layoutget_maxsz;
>> +	return 0;
>>  }
>>  
>>  static int
>> @@ -2782,12 +2787,15 @@ static int nfs4_xdr_enc_layoutget(struct rpc_rqst *req, uint32_t *p,
>>  	struct compound_hdr hdr = {
>>  		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
>>  	};
>> +	int status;
>>  
>>  	xdr_init_encode(&xdr, &req->rq_snd_buf, p);
>>  	encode_compound_hdr(&xdr, req, &hdr);
>>  	encode_sequence(&xdr, &args->seq_args, &hdr);
>>  	encode_putfh(&xdr, NFS_FH(args->inode), &hdr);
>> -	encode_layoutget(&xdr, args, &hdr);
>> +	status = encode_layoutget(&xdr, args, &hdr);
>> +	if (status)
>> +		return status;
>>  	encode_nops(&hdr);
>>  	return 0;
>>  }
>> diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
>> index 07b04e8..2d817be 100644
>> --- a/fs/nfs/pnfs.c
>> +++ b/fs/nfs/pnfs.c
>> @@ -233,7 +233,7 @@ EXPORT_SYMBOL_GPL(pnfs_unregister_layoutdriver);
>>   */
>>  
>>  /* Need to hold i_lock if caller does not already hold reference */
>> -static void
>> +void
>>  get_layout_hdr(struct pnfs_layout_hdr *lo)
>>  {
>>  	atomic_inc(&lo->plh_refcount);
>> @@ -278,24 +278,29 @@ init_lseg(struct pnfs_layout_hdr *lo, struct pnfs_layout_segment *lseg)
>>  	smp_mb();
>>  	lseg->valid = true;
>>  	lseg->layout = lo;
>> +	lseg->drain_notification = NULL;
>>  }
>>  
>>  static void
>>  _put_lseg_common(struct pnfs_layout_segment *lseg)
>>  {
>> +	struct inode *ino = lseg->layout->inode;
>> +
>>  	BUG_ON(lseg->valid == true);
>>  	list_del(&lseg->fi_list);
>>  	if (list_empty(&lseg->layout->segs)) {
>>  		struct nfs_client *clp;
>>  
>> -		clp = NFS_SERVER(lseg->layout->inode)->nfs_client;
>> +		clp = NFS_SERVER(ino)->nfs_client;
>>  		spin_lock(&clp->cl_lock);
>>  		/* List does not take a reference, so no need for put here */
>>  		list_del_init(&lseg->layout->layouts);
>>  		spin_unlock(&clp->cl_lock);
>> -		pnfs_invalidate_layout_stateid(lseg->layout);
>> +		clear_bit(NFS_LAYOUT_BULK_RECALL, &lseg->layout->plh_flags);
>> +		if (!pnfs_layoutgets_blocked(lseg->layout, NULL))
>> +			rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
>>  	}
>> -	rpc_wake_up(&NFS_I(lseg->layout->inode)->lo_rpcwaitq);
>> +	rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq);
>>  }
>>  
>>  /* The use of tmp_list is necessary because pnfs_curr_ld->free_lseg
>> @@ -325,9 +330,12 @@ put_lseg(struct pnfs_layout_segment *lseg)
>>  		atomic_read(&lseg->pls_refcount), lseg->valid);
>>  	ino = lseg->layout->inode;
>>  	if (atomic_dec_and_lock(&lseg->pls_refcount, &ino->i_lock)) {
>> +		struct pnfs_cb_lrecall_info *drain_info = lseg->drain_notification;
>> +
>>  		_put_lseg_common(lseg);
>>  		spin_unlock(&ino->i_lock);
>>  		NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
>> +		notify_drained(drain_info);
>>  		/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
>>  		put_layout_hdr(ino);
>>  	}
>> @@ -345,7 +353,7 @@ EXPORT_SYMBOL_GPL(put_lseg);
>>   * READ		READ	true
>>   * READ		RW	false
>>   */
>> -static int
>> +bool
>>  should_free_lseg(struct pnfs_layout_range *lseg_range,
>>  		 struct pnfs_layout_range *recall_range)
>>  {
>> @@ -388,16 +396,19 @@ pnfs_clear_lseg_list(struct pnfs_layout_hdr *lo, struct list_head *tmp_list,
>>  	dprintk("%s:Return\n", __func__);
>>  }
>>  
>> -static void
>> +void
>>  pnfs_free_lseg_list(struct list_head *free_me)
>>  {
>>  	struct pnfs_layout_segment *lseg, *tmp;
>>  	struct inode *ino;
>> +	struct pnfs_cb_lrecall_info *drain_info;
>>  
>>  	list_for_each_entry_safe(lseg, tmp, free_me, fi_list) {
>>  		BUG_ON(atomic_read(&lseg->pls_refcount) != 0);
>>  		ino = lseg->layout->inode;
>> +		drain_info = lseg->drain_notification;
>>  		NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
>> +		notify_drained(drain_info);
>>  		/* Matched by get_layout_hdr_locked in pnfs_insert_layout */
>>  		put_layout_hdr(ino);
>>  	}
>> @@ -453,40 +464,49 @@ pnfs_destroy_all_layouts(struct nfs_client *clp)
>>  	}
>>  }
>>  
>> -/* update lo->stateid with new if is more recent
>> - *
>> - * lo->stateid could be the open stateid, in which case we just use what given.
>> - */
>> +/* update lo->stateid with new if is more recent */
>>  void
>> -pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
>> -			const nfs4_stateid *new)
>> +pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new,
>> +			bool update_barrier)
>>  {
>> -	nfs4_stateid *old = &lo->stateid;
>> -	bool overwrite = false;
>> +	u32 oldseq, newseq;
>>  
>>  	assert_spin_locked(&lo->inode->i_lock);
>> -	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags) ||
>> -	    memcmp(old->stateid.other, new->stateid.other, sizeof(new->stateid.other)))
>> -		overwrite = true;
>> -	else {
>> -		u32 oldseq, newseq;
>> -
>> -		oldseq = be32_to_cpu(old->stateid.seqid);
>> -		newseq = be32_to_cpu(new->stateid.seqid);
>> -		if ((int)(newseq - oldseq) > 0)
>> -			overwrite = true;
>> +	oldseq = be32_to_cpu(lo->stateid.stateid.seqid);
>> +	newseq = be32_to_cpu(new->stateid.seqid);
>> +	if ((int)(newseq - oldseq) > 0) {
>> +		memcpy(&lo->stateid, &new->stateid, sizeof(new->stateid));
>> +		if (update_barrier)
>> +			lo->plh_barrier = be32_to_cpu(new->stateid.seqid);
>> +		else {
>> +			/* Because of wraparound, we want to keep the barrier
>> +			 * "close" to the current seqids.  It needs to be
>> +			 * within 2**31 to count as "behind", so if it
>> +			 * gets too near that limit, give us a little leeway
>> +			 * and bring it to within 2**30.
>> +			 * NOTE - and yes, this is all unsigned arithmetic.
>> +			 */
>> +			if (unlikely((newseq - lo->plh_barrier) > (3 << 29)))
>> +				lo->plh_barrier = newseq - (1 << 30);
>> +		}
>>  	}
>> -	if (overwrite)
>> -		memcpy(&old->stateid, &new->stateid, sizeof(new->stateid));
>>  }
>>  
>> -void
>> -pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
>> -			struct nfs4_state *open_state)
>> +int
>> +pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
>> +			      struct nfs4_state *open_state)
>>  {
>> +	int status = 0;
>> +
>>  	dprintk("--> %s\n", __func__);
>>  	spin_lock(&lo->inode->i_lock);
>> -	if (!test_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags)) {
>> +	if (lo->plh_block_lgets ||
>> +	    test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) {
>> +		/* We avoid -EAGAIN, as that has special meaning to
>> +		 * some callers.
>> +		 */
>> +		status = -NFS4ERR_LAYOUTTRYLATER;
>> +	} else if (list_empty(&lo->segs)) {
>>  		int seq;
>>  
>>  		do {
>> @@ -494,12 +514,11 @@ pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
>>  			memcpy(dst->data, open_state->stateid.data,
>>  			       sizeof(open_state->stateid.data));
>>  		} while (read_seqretry(&open_state->seqlock, seq));
>> -		set_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags);
>>  	} else
>> -		memcpy(dst->data, lo->stateid.data,
>> -		       sizeof(lo->stateid.data));
>> +		memcpy(dst->data, lo->stateid.data, sizeof(lo->stateid.data));
>>  	spin_unlock(&lo->inode->i_lock);
>>  	dprintk("<-- %s\n", __func__);
>> +	return status;
>>  }
>>  
>>  /*
>> @@ -566,6 +585,28 @@ has_layout_to_return(struct pnfs_layout_hdr *lo,
>>  	return out;
>>  }
>>  
>> +void nfs4_asynch_forget_layouts(struct pnfs_layout_hdr *lo,
>> +				struct pnfs_layout_range *range,
>> +				struct pnfs_cb_lrecall_info *drain_info,
>> +				struct list_head *tmp_list)
>> +{
>> +	struct pnfs_layout_segment *lseg, *tmp;
>> +
>> +	assert_spin_locked(&lo->inode->i_lock);
> 
> Poor practice. If you want to ensure the caller holds the inode->i_lock,
> then just call the function '*_locked'. That is a lot more helpful than
> these damned asserts.
> 

That makes sense.

Benny

>> +	list_for_each_entry_safe(lseg, tmp, &lo->segs, fi_list)
>> +		if (should_free_lseg(&lseg->range, range)) {
>> +			/* FIXME - need to change to something like a
>> +			 * notification bitmap to remove the restriction
>> +			 * of only being able to process a single
>> +			 * CB_LAYOUTRECALL at a time.
>> +			 */
>> +			BUG_ON(lseg->drain_notification);
>> +			lseg->drain_notification = drain_info;
>> +			atomic_inc(&drain_info->pcl_count);
>> +			mark_lseg_invalid(lseg, tmp_list);
>> +		}
>> +}
>> +
>>  /* Return true if there is layout based io in progress in the given range.
>>   * Assumes range has already been marked invalid, and layout marked to
>>   * prevent any new lseg from being inserted.
>> @@ -711,14 +752,6 @@ pnfs_insert_layout(struct pnfs_layout_hdr *lo,
>>  	dprintk("%s:Begin\n", __func__);
>>  
>>  	assert_spin_locked(&lo->inode->i_lock);
>> -	if (list_empty(&lo->segs)) {
>> -		struct nfs_client *clp = NFS_SERVER(lo->inode)->nfs_client;
>> -
>> -		spin_lock(&clp->cl_lock);
>> -		BUG_ON(!list_empty(&lo->layouts));
>> -		list_add_tail(&lo->layouts, &clp->cl_layouts);
>> -		spin_unlock(&clp->cl_lock);
>> -	}
>>  	list_for_each_entry(lp, &lo->segs, fi_list) {
>>  		if (cmp_layout(&lp->range, &lseg->range) > 0)
>>  			continue;
>> @@ -735,6 +768,9 @@ pnfs_insert_layout(struct pnfs_layout_hdr *lo,
>>  	}
>>  	if (!found) {
>>  		list_add_tail(&lseg->fi_list, &lo->segs);
>> +		if (list_is_singular(&lo->segs) &&
>> +		    !pnfs_layoutgets_blocked(lo, NULL))
>> +			rpc_wake_up(&NFS_I(lo->inode)->lo_rpcwaitq_stateid);
>>  		dprintk("%s: inserted lseg %p "
>>  			"iomode %d offset %llu length %llu at tail\n",
>>  			__func__, lseg, lseg->range.iomode,
>> @@ -756,6 +792,7 @@ alloc_init_layout_hdr(struct inode *ino)
>>  	atomic_set(&lo->plh_refcount, 1);
>>  	INIT_LIST_HEAD(&lo->layouts);
>>  	INIT_LIST_HEAD(&lo->segs);
>> +	INIT_LIST_HEAD(&lo->plh_bulk_recall);
>>  	lo->inode = ino;
>>  	return lo;
>>  }
>> @@ -843,6 +880,7 @@ pnfs_update_layout(struct inode *ino,
>>  		.length = NFS4_MAX_UINT64,
>>  	};
>>  	struct nfs_inode *nfsi = NFS_I(ino);
>> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>>  	struct pnfs_layout_hdr *lo;
>>  	struct pnfs_layout_segment *lseg = NULL;
>>  
>> @@ -878,9 +916,28 @@ pnfs_update_layout(struct inode *ino,
>>  		goto out_unlock;
>>  
>>  	get_layout_hdr(lo); /* Matched in pnfs_layoutget_release */
>> +	if (list_empty(&lo->segs)) {
>> +		/* The lo must be on the clp list if there is any
>> +		 * chance of a CB_LAYOUTRECALL(FILE) coming in.
>> +		 */
>> +		spin_lock(&clp->cl_lock);
>> +		BUG_ON(!list_empty(&lo->layouts));
>> +		list_add_tail(&lo->layouts, &clp->cl_layouts);
>> +		spin_unlock(&clp->cl_lock);
>> +	}
>>  	spin_unlock(&ino->i_lock);
>>  
>>  	lseg = send_layoutget(lo, ctx, &arg);
>> +	if (!lseg) {
>> +		spin_lock(&ino->i_lock);
>> +		if (list_empty(&lo->segs)) {
>> +			spin_lock(&clp->cl_lock);
>> +			list_del_init(&lo->layouts);
>> +			spin_unlock(&clp->cl_lock);
>> +			clear_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
>> +		}
>> +		spin_unlock(&ino->i_lock);
>> +	}
>>  out:
>>  	dprintk("%s end, state 0x%lx lseg %p\n", __func__,
>>  		nfsi->layout->plh_flags, lseg);
>> @@ -891,10 +948,15 @@ out_unlock:
>>  }
>>  
>>  bool
>> -pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo)
>> +pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid)
>>  {
>>  	assert_spin_locked(&lo->inode->i_lock);
>> -	return lo->plh_block_lgets;
>> +	if ((stateid) &&
>> +	    (int)(lo->plh_barrier - be32_to_cpu(stateid->stateid.seqid)) >= 0)
>> +		return true;
>> +	return lo->plh_block_lgets ||
>> +		test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags) ||
>> +		(list_empty(&lo->segs) && lo->plh_outstanding);
>>  }
>>  
>>  int
>> @@ -904,6 +966,7 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
>>  	struct nfs4_layoutget_res *res = &lgp->res;
>>  	struct pnfs_layout_segment *lseg;
>>  	struct inode *ino = lo->inode;
>> +	struct nfs_client *clp = NFS_SERVER(ino)->nfs_client;
>>  	int status = 0;
>>  
>>  	/* Inject layout blob into I/O device driver */
>> @@ -915,10 +978,25 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
>>  			status = PTR_ERR(lseg);
>>  		dprintk("%s: Could not allocate layout: error %d\n",
>>  		       __func__, status);
>> +		spin_lock(&ino->i_lock);
>>  		goto out;
>>  	}
>>  
>>  	spin_lock(&ino->i_lock);
>> +	/* decrement needs to be done before call to pnfs_layoutget_blocked */
>> +	lo->plh_outstanding--;
>> +	spin_lock(&clp->cl_lock);
>> +	if (matches_outstanding_recall(ino, &res->range)) {
>> +		spin_unlock(&clp->cl_lock);
>> +		dprintk("%s forget reply due to recall\n", __func__);
>> +		goto out_forget_reply;
>> +	}
>> +	spin_unlock(&clp->cl_lock);
>> +
>> +	if (pnfs_layoutgets_blocked(lo, &res->stateid)) {
>> +		dprintk("%s forget reply due to state\n", __func__);
>> +		goto out_forget_reply;
>> +	}
>>  	init_lseg(lo, lseg);
>>  	lseg->range = res->range;
>>  	get_lseg(lseg);
>> @@ -934,10 +1012,19 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
>>  	}
>>  
>>  	/* Done processing layoutget. Set the layout stateid */
>> -	pnfs_set_layout_stateid(lo, &res->stateid);
>> -	spin_unlock(&ino->i_lock);
>> +	pnfs_set_layout_stateid(lo, &res->stateid, false);
>>  out:
>> +	if (!pnfs_layoutgets_blocked(lo, NULL))
>> +		rpc_wake_up(&NFS_I(ino)->lo_rpcwaitq_stateid);
>> +	spin_unlock(&ino->i_lock);
>>  	return status;
>> +
>> +out_forget_reply:
>> +	spin_unlock(&ino->i_lock);
>> +	lseg->layout = lo;
>> +	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
>> +	spin_lock(&ino->i_lock);
>> +	goto out;
>>  }
>>  
>>  void
>> diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
>> index 891aeab..7ea121f 100644
>> --- a/fs/nfs/pnfs.h
>> +++ b/fs/nfs/pnfs.h
>> @@ -31,6 +31,7 @@
>>  #define FS_NFS_PNFS_H
>>  
>>  #include <linux/nfs_page.h>
>> +#include "callback.h" /* for cb_layoutrecallargs */
>>  
>>  struct pnfs_layout_segment {
>>  	struct list_head fi_list;
>> @@ -38,6 +39,7 @@ struct pnfs_layout_segment {
>>  	atomic_t pls_refcount;
>>  	bool valid;
>>  	struct pnfs_layout_hdr *layout;
>> +	struct pnfs_cb_lrecall_info *drain_notification;
>>  };
>>  
>>  enum pnfs_try_status {
>> @@ -52,7 +54,7 @@ enum pnfs_try_status {
>>  enum {
>>  	NFS_LAYOUT_RO_FAILED = 0,	/* get ro layout failed stop trying */
>>  	NFS_LAYOUT_RW_FAILED,		/* get rw layout failed stop trying */
>> -	NFS_LAYOUT_STATEID_SET,		/* have a valid layout stateid */
>> +	NFS_LAYOUT_BULK_RECALL,		/* bulk recall affecting layout */
>>  	NFS_LAYOUT_NEED_LCOMMIT,	/* LAYOUTCOMMIT needed */
>>  };
>>  
>> @@ -94,10 +96,13 @@ struct pnfs_layoutdriver_type {
>>  struct pnfs_layout_hdr {
>>  	atomic_t		plh_refcount;
>>  	struct list_head	layouts;   /* other client layouts */
>> +	struct list_head	plh_bulk_recall; /* clnt list of bulk recalls */
>>  	struct list_head	segs;      /* layout segments list */
>>  	int			roc_iomode;/* return on close iomode, 0=none */
>>  	nfs4_stateid		stateid;
>> +	unsigned long		plh_outstanding; /* number of RPCs out */
>>  	unsigned long		plh_block_lgets; /* block LAYOUTGET if >0 */
>> +	u32			plh_barrier; /* ignore lower seqids */
>>  	unsigned long		plh_flags;
>>  	struct rpc_cred		*cred;     /* layoutcommit credential */
>>  	/* DH: These vars keep track of the maximum write range
>> @@ -118,6 +123,14 @@ struct pnfs_device {
>>  	unsigned int  pglen;
>>  };
>>  
>> +struct pnfs_cb_lrecall_info {
>> +	struct list_head	pcl_list; /* hook into cl_layoutrecalls list */
>> +	atomic_t		pcl_count;
>> +	struct nfs_client	*pcl_clp;
>> +	struct inode		*pcl_ino;
>> +	struct cb_layoutrecallargs pcl_args;
>> +};
>> +
>>  /*
>>   * Device ID RCU cache. A device ID is unique per client ID and layout type.
>>   */
>> @@ -176,7 +189,10 @@ extern int nfs4_proc_layoutcommit(struct nfs4_layoutcommit_data *data,
>>  extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool wait);
>>  
>>  /* pnfs.c */
>> +void get_layout_hdr(struct pnfs_layout_hdr *lo);
>>  void put_lseg(struct pnfs_layout_segment *lseg);
>> +bool should_free_lseg(struct pnfs_layout_range *lseg_range,
>> +		      struct pnfs_layout_range *recall_range);
>>  struct pnfs_layout_segment *
>>  pnfs_has_layout(struct pnfs_layout_hdr *lo, struct pnfs_layout_range *range);
>>  struct pnfs_layout_segment *
>> @@ -201,15 +217,24 @@ enum pnfs_try_status pnfs_try_to_commit(struct nfs_write_data *,
>>  void pnfs_pageio_init_read(struct nfs_pageio_descriptor *, struct inode *,
>>  			   struct nfs_open_context *, struct list_head *);
>>  void pnfs_pageio_init_write(struct nfs_pageio_descriptor *, struct inode *);
>> -bool pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo);
>> +bool pnfs_layoutgets_blocked(struct pnfs_layout_hdr *lo, nfs4_stateid *stateid);
>>  int pnfs_layout_process(struct nfs4_layoutget *lgp);
>> +void pnfs_free_lseg_list(struct list_head *tmp_list);
>>  void pnfs_destroy_layout(struct nfs_inode *);
>>  void pnfs_destroy_all_layouts(struct nfs_client *);
>>  void put_layout_hdr(struct inode *inode);
>>  void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
>> -			     const nfs4_stateid *new);
>> -void pnfs_get_layout_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo,
>> -			     struct nfs4_state *open_state);
>> +			     const nfs4_stateid *new,
>> +			     bool update_barrier);
>> +int pnfs_choose_layoutget_stateid(nfs4_stateid *dst,
>> +				  struct pnfs_layout_hdr *lo,
>> +				  struct nfs4_state *open_state);
>> +void nfs4_asynch_forget_layouts(struct pnfs_layout_hdr *lo,
>> +				struct pnfs_layout_range *range,
>> +				struct pnfs_cb_lrecall_info *drain_info,
>> +				struct list_head *tmp_list);
>> +/* FIXME - this should be in callback.h, but pnfs_cb_lrecall_info needs to be there too */
>> +extern void notify_drained(struct pnfs_cb_lrecall_info *d);
>>  
>>  static inline bool
>>  has_layout(struct nfs_inode *nfsi)
>> @@ -223,12 +248,6 @@ static inline int lo_fail_bit(u32 iomode)
>>  			 NFS_LAYOUT_RW_FAILED : NFS_LAYOUT_RO_FAILED;
>>  }
>>  
>> -static inline void pnfs_invalidate_layout_stateid(struct pnfs_layout_hdr *lo)
>> -{
>> -	assert_spin_locked(&lo->inode->i_lock);
>> -	clear_bit(NFS_LAYOUT_STATEID_SET, &lo->plh_flags);
>> -}
>> -
>>  static inline void get_lseg(struct pnfs_layout_segment *lseg)
>>  {
>>  	atomic_inc(&lseg->pls_refcount);
>> diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
>> index 3cae408..80dcc00 100644
>> --- a/include/linux/nfs_fs_sb.h
>> +++ b/include/linux/nfs_fs_sb.h
>> @@ -83,6 +83,10 @@ struct nfs_client {
>>  	u32			cl_exchange_flags;
>>  	struct nfs4_session	*cl_session; 	/* shared session */
>>  	struct list_head	cl_layouts;
>> +	struct list_head	cl_layoutrecalls;
>> +	unsigned long		cl_cb_lrecall_count;
>> +#define PNFS_MAX_CB_LRECALLS (1)
>> +	struct rpc_wait_queue	cl_rpcwaitq_recall;
>>  	struct pnfs_deviceid_cache *cl_devid_cache; /* pNFS deviceid cache */
>>  #endif /* CONFIG_NFS_V4_1 */
>>  
> 
> 
> 
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html

  reply	other threads:[~2010-11-14 11:44 UTC|newest]

Thread overview: 46+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2010-11-12  8:48 [PATCH 00/22] rewrite of CB_LAYOUTRECALL and layoutstate code, try 2 Fred Isaman
2010-11-12  8:48 ` [PATCH 01/22] pnfs-submit: remove RPC_ASSASSINATED(task) checks Fred Isaman
2010-11-12  8:48 ` [PATCH 02/22] pnfs-submit: remove unnecessary field lgp->status Fred Isaman
2010-11-12  8:48 ` [PATCH 03/22] pnfs-submit: layoutreturn's rpc_call_op functions need to handle bulk returns Fred Isaman
2010-11-12  8:48 ` [PATCH 04/22] pnfs-submit: argument to should_free_lseg changed from lseg to range Fred Isaman
2010-11-12  8:48 ` [PATCH 05/22] pnfs-submit: change layout state seqlock to a spinlock Fred Isaman
2010-11-12  8:48 ` [PATCH 06/22] NFSv4.1: Callback share session between ops Fred Isaman
2010-11-12  8:48 ` [PATCH 07/22] SQUASHME: pnfs-submit: fixups for nfsv4.1 callbacks Fred Isaman
2010-11-12  8:48 ` [PATCH 08/22] SQUASHME: allow cb_sequence changes to compile without v4.1 Fred Isaman
2010-11-14 12:05   ` Benny Halevy
2010-11-15 15:07     ` Fred Isaman
2010-11-12  8:48 ` [PATCH 09/22] pnfs-submit: change pnfs_layout_segment refcounting from kref to atomic_t Fred Isaman
2010-11-12  8:48 ` [PATCH 10/22] pnfs-submit: Have LAYOUTGETS wait when lo->plh_block_lgets is set Fred Isaman
2010-11-12  8:48 ` [PATCH 11/22] pnfs-submit: remove _pnfs_can_return_lseg call from pnfs_clear_lseg_list Fred Isaman
2010-11-12  8:48 ` [PATCH 12/22] pnfs_submit: nfs4_layoutreturn_release should not reference results Fred Isaman
2010-11-12  8:48 ` [PATCH 13/22] pnfs-submit: reorganize struct cb_layoutrecallargs Fred Isaman
2010-11-12  8:48 ` [PATCH 14/22] pnfs-submit: rename lo->state to lo->plh_flags Fred Isaman
2010-11-12  8:48 ` [PATCH 15/22] pnfs-submit: change pnfs_layout_hdr refcount to atomic_t Fred Isaman
2010-11-12  8:48 ` [PATCH 16/22] pnfs-submit: rewrite of layout state handling and cb_layoutrecall Fred Isaman
2010-11-13  9:11   ` Trond Myklebust
2010-11-14 11:44     ` Benny Halevy [this message]
2010-11-14 11:50       ` Benny Halevy
2010-11-15 14:28         ` Fred Isaman
2010-11-14 15:43   ` Benny Halevy
2010-11-15 14:51     ` Fred Isaman
2010-11-15 16:17       ` Benny Halevy
2010-11-15 17:53         ` [nfsv4] " Fred Isaman
2010-11-15 19:19           ` Boaz Harrosh
2010-11-15 20:40             ` Fred Isaman
2010-11-16  9:54               ` Boaz Harrosh
2010-11-16 11:12                 ` Boaz Harrosh
2010-11-17 17:53           ` Benny Halevy
2010-11-12  8:48 ` [PATCH 17/22] pnfs-submit: increase number of outstanding CB_LAYOUTRECALLS we can handle Fred Isaman
2010-11-12  8:48 ` [PATCH 18/22] pnfs-submit: roc add layoutreturn op to close compound Fred Isaman
2010-11-12 16:31   ` Benny Halevy
2010-11-12 16:56     ` Fred Isaman
2010-11-14 10:54       ` Benny Halevy
2010-11-14 14:21       ` [PATCH] SQUASHME: pnfs-submit: encode layoutreturn on close before close Benny Halevy
2010-11-14 18:12         ` [PATCH 2/2] pnfs-submit: handle NFS4ERR_DELEG_REVOKED for LAYOUTRETURN Benny Halevy
2010-11-15 12:54           ` [PATCH 2/2 v2] " Benny Halevy
2010-11-15 15:02         ` [PATCH] SQUASHME: pnfs-submit: encode layoutreturn on close before close Fred Isaman
2010-11-15 15:34           ` Benny Halevy
2010-11-12  8:48 ` [PATCH 19/22] pnfs-submit refactor layoutcommit xdr structures Fred Isaman
2010-11-12  8:48 ` [PATCH 20/22] pnfs-submit refactor pnfs_layoutcommit_setup Fred Isaman
2010-11-12  8:48 ` [PATCH 21/22] pnfs_submit: roc add layoutcommit op to close compound Fred Isaman
2010-11-12  8:48 ` [PATCH 22/22] SQUASHME: make roc patches compile without v4.1 Fred Isaman

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=4CDFCB8D.7040209@panasas.com \
    --to=bhalevy@panasas.com \
    --cc=iisaman@netapp.com \
    --cc=linux-nfs@vger.kernel.org \
    --cc=trond.myklebust@fys.uio.no \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is an external index of several public inboxes,
see mirroring instructions on how to clone and mirror
all data and code used by this external index.