From: Alexander Aring <aahringo@redhat.com>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] [PATCH/RFC dlm/next 6/6] fs: dlm: use a non-static queue for callbacks
Date: Tue, 20 Sep 2022 14:36:23 -0400
Message-ID: <CAK-6q+gEirxOAeded1HGLd54n5UrXfcCxOmLiDFidDLX0581xQ@mail.gmail.com>
In-Reply-To: <20220916184309.3179451-6-aahringo@redhat.com>
Hi,
On Fri, Sep 16, 2022 at 2:43 PM Alexander Aring <aahringo@redhat.com> wrote:
>
> This patch introduces a queue implementation for callbacks using
> Linux lists. The current callback queue handling is implemented with a
> static array of 6 entries, see DLM_CALLBACKS_SIZE. The sequence number
> inside the callback structure was used to tell whether an entry in the
> static array is valid or not. With a dynamic data structure that grows
> and shrinks during runtime we don't need any sequence numbers anymore
> to offer such functionality.
>
> We assume that every callback, once queued, will be delivered to the
> DLM user. Therefore the callback flag DLM_CB_SKIP was dropped and the
> check for skipping a bast was moved before the worker handling, instead
> of skipping while the callback worker executes. This reduces
> unnecessary queueing of the callback worker.
>
> The saved last callbacks are now pointers and don't need to be copied
> over. A reference counter on the callback structures takes care of
> freeing them at the right time once they are no longer referenced.
>
> Signed-off-by: Alexander Aring <aahringo@redhat.com>
...
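To make the change easier to review, here is roughly the shape of the
new callback entry (a simplified sketch with assumed field names, not
the exact patch code): each callback becomes a small kref-counted
object linked into a per-lkb list instead of occupying a slot in the
static array.

struct dlm_callback {
	uint32_t		flags;		/* DLM_CB_CAST or DLM_CB_BAST */
	int			mode;
	int			sb_status;
	uint32_t		sb_flags;

	struct kref		ref;		/* freed when nobody references it */
	struct list_head	list;		/* linked into lkb->lkb_callbacks */
};

static void dlm_release_callback(struct kref *ref)
{
	struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref);

	kfree(cb);
}

static inline void dlm_callback_put(struct dlm_callback *cb)
{
	kref_put(&cb->ref, dlm_release_callback);
}

The "last callback" saves then just take an extra reference on such an
object instead of memcpy()ing a struct around.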
>
> - for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
> - if (!lkb->lkb_callbacks[i].seq)
> - break;
> - memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
> - sizeof(struct dlm_callback));
> - memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
> - (*resid)++;
> - }
> -
> - /* if cb is a bast, it should be skipped if the blocking mode is
> - compatible with the last granted mode */
> -
> - if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
> - if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
> - cb->flags |= DLM_CB_SKIP;
> -
> - log_debug(ls, "skip %x bast %llu mode %d "
> - "for cast %llu mode %d",
> - lkb->lkb_id,
> - (unsigned long long)cb->seq,
> - cb->mode,
> - (unsigned long long)lkb->lkb_last_cast.seq,
> - lkb->lkb_last_cast.mode);
> - rv = 0;
> - goto out;
> - }
> - }
> + if (flags & DLM_CB_BAST)
> + dlm_callback_set_last_ptr(&lkb->lkb_last_bast, cb);
I will change this to an int lkb->lkb_last_bast_mode. The last bast is
only used for debugfs, and we would need to hold a lock to get at this
information. However, we only need the mode, and only for debugging
purposes... so we just copy the mode value.
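Roughly what I have in mind (just a sketch, the final field name may
still change):

	/* sketch: keep only the last bast mode for debugfs, no cb pointer */
	if (flags & DLM_CB_BAST)
		lkb->lkb_last_bast_mode = mode;

and debugfs would print lkb->lkb_last_bast_mode directly, without
having to hold a reference on a callback structure.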
>
> - if (cb->flags & DLM_CB_CAST)
> - memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
> + dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
>
> - if (cb->flags & DLM_CB_BAST)
> - memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
> - rv = 0;
> out:
> return rv;
> }
>
> +int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
> +{
> + /* oldest undelivered cb is callbacks first entry */
> + *cb = list_first_entry_or_null(&lkb->lkb_callbacks,
> + struct dlm_callback, list);
> + if (!*cb)
> + return DLM_DEQUEUE_CALLBACK_FAILURE;
> +
> + /* remove it from callbacks so shift others down */
> + list_del(&(*cb)->list);
> + if (atomic_dec_and_test(&lkb->lkb_callbacks_count))
> + return DLM_DEQUEUE_CALLBACK_LAST;
> +
> + return DLM_DEQUEUE_CALLBACK_SUCCESS;
> +}
> +
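The intended contract for the consumer looks like this (simplified
sketch; dlm_callback_work() below is the real user, and the helper and
field names come from this patch):

static void drain_lkb_callbacks_sketch(struct dlm_lkb *lkb)
{
	struct dlm_callback *cb;
	int rv;

	for (;;) {
		spin_lock(&lkb->lkb_cb_lock);
		rv = dlm_dequeue_lkb_callback(lkb, &cb);
		spin_unlock(&lkb->lkb_cb_lock);

		/* FAILURE: nothing queued, should not happen for a
		 * scheduled worker */
		if (rv == DLM_DEQUEUE_CALLBACK_FAILURE)
			break;

		/* ... deliver *cb to the user, then drop its reference ... */

		/* LAST: queue is empty now, the worker can drop the
		 * lkb reference taken in dlm_add_cb() */
		if (rv == DLM_DEQUEUE_CALLBACK_LAST)
			break;
	}
}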
> void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
> uint32_t sbflags)
> {
> struct dlm_ls *ls = lkb->lkb_resource->res_ls;
> - uint64_t new_seq, prev_seq;
> int rv;
>
> - spin_lock(&dlm_cb_seq_spin);
> - new_seq = ++dlm_cb_seq;
> - if (!dlm_cb_seq)
> - new_seq = ++dlm_cb_seq;
> - spin_unlock(&dlm_cb_seq_spin);
> -
> if (lkb->lkb_flags & DLM_IFL_USER) {
> - dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
> + dlm_user_add_ast(lkb, flags, mode, status, sbflags);
> return;
> }
>
> spin_lock(&lkb->lkb_cb_lock);
> - prev_seq = lkb->lkb_callbacks[0].seq;
> -
> - rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
> - if (rv < 0)
> - goto out;
> -
> - if (!prev_seq) {
> + rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
> + spin_unlock(&lkb->lkb_cb_lock);
> + switch (rv) {
> + case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
> kref_get(&lkb->lkb_ref);
>
> read_lock(&ls->ls_cb_lock);
> @@ -203,9 +159,16 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
> queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
> }
> read_unlock(&ls->ls_cb_lock);
> + break;
> + case DLM_ENQUEUE_CALLBACK_FAILURE:
> + WARN_ON(1);
> + break;
> + case DLM_ENQUEUE_CALLBACK_SUCCESS:
> + break;
> + default:
> + WARN_ON(1);
> + break;
> }
> - out:
> - spin_unlock(&lkb->lkb_cb_lock);
> }
>
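For completeness, the enqueue side boils down to something like this
(simplified sketch, not the exact hunk; the real helper also allocates
the callback and does the bast-skip check): the first entry on an empty
queue returns NEED_SCHED so dlm_add_cb() schedules the worker exactly
once.

static int dlm_enqueue_lkb_callback_sketch(struct dlm_lkb *lkb,
					   struct dlm_callback *cb)
{
	/* caller holds lkb->lkb_cb_lock */
	list_add_tail(&cb->list, &lkb->lkb_callbacks);

	/* first entry means no callback worker is queued for this lkb yet */
	if (atomic_inc_return(&lkb->lkb_callbacks_count) == 1)
		return DLM_ENQUEUE_CALLBACK_NEED_SCHED;

	return DLM_ENQUEUE_CALLBACK_SUCCESS;
}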
> void dlm_callback_work(struct work_struct *work)
> @@ -214,54 +177,36 @@ void dlm_callback_work(struct work_struct *work)
> struct dlm_ls *ls = lkb->lkb_resource->res_ls;
> void (*castfn) (void *astparam);
> void (*bastfn) (void *astparam, int mode);
> - struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
> - int i, rv, resid;
> + struct dlm_callback *cb;
> + int rv;
>
> - memset(&callbacks, 0, sizeof(callbacks));
> + WARN_ON(!atomic_read(&lkb->lkb_callbacks_count));
>
this WARN_ON() can be removed; it is mostly covered by the WARN_ON()
when dequeue() fails. We see it as a failure when a dequeue() is
attempted and there was nothing to dequeue, which indicates an issue
with the whole callback queue workflow.
> - spin_lock(&lkb->lkb_cb_lock);
> - if (!lkb->lkb_callbacks[0].seq) {
> - /* no callback work exists, shouldn't happen */
> - log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
> - dlm_print_lkb(lkb);
> - dlm_dump_lkb_callbacks(lkb);
> - }
> + do {
> + spin_lock(&lkb->lkb_cb_lock);
> + rv = dlm_dequeue_lkb_callback(lkb, &cb);
> + spin_unlock(&lkb->lkb_cb_lock);
>
> - for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
> - rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
> - if (rv < 0)
> + if (WARN_ON(rv == DLM_DEQUEUE_CALLBACK_FAILURE))
Here is the WARN_ON() that covers it.
- Alex