cluster-devel.redhat.com archive mirror
From: Alexander Aring <aahringo@redhat.com>
To: cluster-devel.redhat.com
Subject: [Cluster-devel] [PATCH/RFC dlm/next 6/6] fs: dlm: use a non-static queue for callbacks
Date: Tue, 20 Sep 2022 14:36:23 -0400
Message-ID: <CAK-6q+gEirxOAeded1HGLd54n5UrXfcCxOmLiDFidDLX0581xQ@mail.gmail.com>
In-Reply-To: <20220916184309.3179451-6-aahringo@redhat.com>

Hi,

On Fri, Sep 16, 2022 at 2:43 PM Alexander Aring <aahringo@redhat.com> wrote:
>
> This patch will introduce a queue implementation for callbacks by using
> the Linux lists. The current callback queue handling is implemented by a
> static limit of 6 entries, see DLM_CALLBACKS_SIZE. The sequence number
> inside the callback structure was used to see if the entries inside the
> static array are valid or not. We don't need any sequence numbers anymore
> with a dynamic data structure which grows and shrinks during runtime to
> offer such functionality.
>
> We assume that every callback will be delivered to the DLM user once it
> is queued. Therefore the callback flag DLM_CB_SKIP was dropped and the
> check for skipping a bast was moved before the worker handling, instead
> of skipping while the callback worker executes. This will reduce
> unnecessary queueing of the callback worker.
>
> All last callback saves are pointers now and don't need to be copied over.
> There is a reference counter for callback structures which takes care
> of freeing the callback structures at the right time when they are no
> longer referenced.
>
> Signed-off-by: Alexander Aring <aahringo@redhat.com>
...
>
> -       for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
> -               if (!lkb->lkb_callbacks[i].seq)
> -                       break;
> -               memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
> -                      sizeof(struct dlm_callback));
> -               memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
> -               (*resid)++;
> -       }
> -
> -       /* if cb is a bast, it should be skipped if the blocking mode is
> -          compatible with the last granted mode */
> -
> -       if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
> -               if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
> -                       cb->flags |= DLM_CB_SKIP;
> -
> -                       log_debug(ls, "skip %x bast %llu mode %d "
> -                                 "for cast %llu mode %d",
> -                                 lkb->lkb_id,
> -                                 (unsigned long long)cb->seq,
> -                                 cb->mode,
> -                                 (unsigned long long)lkb->lkb_last_cast.seq,
> -                                 lkb->lkb_last_cast.mode);
> -                       rv = 0;
> -                       goto out;
> -               }
> -       }
> +       if (flags & DLM_CB_BAST)
> +               dlm_callback_set_last_ptr(&lkb->lkb_last_bast, cb);

I will change this to an int lkb->lkb_last_bast_mode. It is only used
for debugfs, and we need to hold a lock to get this information.
However, we only need the mode, and only for debugging output, so we
just copy the mode value.
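
A minimal userspace sketch of that idea, not the actual patch: the
struct, the function names, the CB_BAST value and the -1 "no bast yet"
marker are all hypothetical stand-ins, and a pthread mutex stands in for
lkb_cb_lock. It only illustrates recording the plain mode value instead
of keeping a pointer to the callback.

```c
#include <assert.h>
#include <pthread.h>

/* Illustrative stand-in for DLM_CB_BAST; the value is an assumption. */
#define CB_BAST 0x2

/* Hypothetical, reduced stand-in for the relevant part of struct dlm_lkb. */
struct lkb_sketch {
	pthread_mutex_t cb_lock;  /* stands in for lkb_cb_lock */
	int last_bast_mode;       /* the proposed lkb_last_bast_mode */
};

static void lkb_sketch_init(struct lkb_sketch *lkb)
{
	assert(lkb);
	pthread_mutex_init(&lkb->cb_lock, NULL);
	lkb->last_bast_mode = -1; /* assumption: -1 means "no bast seen yet" */
}

/* Enqueue path: remember only the mode, no pointer and no reference. */
static void lkb_sketch_note_cb(struct lkb_sketch *lkb, unsigned int flags,
			       int mode)
{
	pthread_mutex_lock(&lkb->cb_lock);
	if (flags & CB_BAST)
		lkb->last_bast_mode = mode;
	pthread_mutex_unlock(&lkb->cb_lock);
}

/* Debug reader: take the lock, copy the int, drop the lock. */
static int lkb_sketch_last_bast_mode(struct lkb_sketch *lkb)
{
	int mode;

	pthread_mutex_lock(&lkb->cb_lock);
	mode = lkb->last_bast_mode;
	pthread_mutex_unlock(&lkb->cb_lock);
	return mode;
}
```

Because only an int is copied, the debug reader never touches a callback
structure and does not depend on its lifetime.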

>
> -       if (cb->flags & DLM_CB_CAST)
> -               memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
> +       dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
>
> -       if (cb->flags & DLM_CB_BAST)
> -               memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
> -       rv = 0;
>   out:
>         return rv;
>  }
>
> +int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
> +{
> +       /* oldest undelivered cb is callbacks first entry */
> +       *cb = list_first_entry_or_null(&lkb->lkb_callbacks,
> +                                      struct dlm_callback, list);
> +       if (!*cb)
> +               return DLM_DEQUEUE_CALLBACK_FAILURE;
> +
> +       /* remove it from callbacks so shift others down */
> +       list_del(&(*cb)->list);
> +       if (atomic_dec_and_test(&lkb->lkb_callbacks_count))
> +               return DLM_DEQUEUE_CALLBACK_LAST;
> +
> +       return DLM_DEQUEUE_CALLBACK_SUCCESS;
> +}
> +
>  void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
>                 uint32_t sbflags)
>  {
>         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
> -       uint64_t new_seq, prev_seq;
>         int rv;
>
> -       spin_lock(&dlm_cb_seq_spin);
> -       new_seq = ++dlm_cb_seq;
> -       if (!dlm_cb_seq)
> -               new_seq = ++dlm_cb_seq;
> -       spin_unlock(&dlm_cb_seq_spin);
> -
>         if (lkb->lkb_flags & DLM_IFL_USER) {
> -               dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
> +               dlm_user_add_ast(lkb, flags, mode, status, sbflags);
>                 return;
>         }
>
>         spin_lock(&lkb->lkb_cb_lock);
> -       prev_seq = lkb->lkb_callbacks[0].seq;
> -
> -       rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
> -       if (rv < 0)
> -               goto out;
> -
> -       if (!prev_seq) {
> +       rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
> +       spin_unlock(&lkb->lkb_cb_lock);
> +       switch (rv) {
> +       case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
>                 kref_get(&lkb->lkb_ref);
>
>                 read_lock(&ls->ls_cb_lock);
> @@ -203,9 +159,16 @@ void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
>                         queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
>                 }
>                 read_unlock(&ls->ls_cb_lock);
> +               break;
> +       case DLM_ENQUEUE_CALLBACK_FAILURE:
> +               WARN_ON(1);
> +               break;
> +       case DLM_ENQUEUE_CALLBACK_SUCCESS:
> +               break;
> +       default:
> +               WARN_ON(1);
> +               break;
>         }
> - out:
> -       spin_unlock(&lkb->lkb_cb_lock);
>  }
>
>  void dlm_callback_work(struct work_struct *work)
> @@ -214,54 +177,36 @@ void dlm_callback_work(struct work_struct *work)
>         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
>         void (*castfn) (void *astparam);
>         void (*bastfn) (void *astparam, int mode);
> -       struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
> -       int i, rv, resid;
> +       struct dlm_callback *cb;
> +       int rv;
>
> -       memset(&callbacks, 0, sizeof(callbacks));
> +       WARN_ON(!atomic_read(&lkb->lkb_callbacks_count));
>

This WARN_ON() can be removed; it is mostly covered by the WARN_ON()
when dequeue() fails. We treat it as a failure when dequeue() is called
and there is nothing to dequeue, which indicates an issue with the whole
callback queue workflow.

> -       spin_lock(&lkb->lkb_cb_lock);
> -       if (!lkb->lkb_callbacks[0].seq) {
> -               /* no callback work exists, shouldn't happen */
> -               log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
> -               dlm_print_lkb(lkb);
> -               dlm_dump_lkb_callbacks(lkb);
> -       }
> +       do {
> +               spin_lock(&lkb->lkb_cb_lock);
> +               rv = dlm_dequeue_lkb_callback(lkb, &cb);
> +               spin_unlock(&lkb->lkb_cb_lock);
>
> -       for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
> -               rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
> -               if (rv < 0)
> +               if (WARN_ON(rv == DLM_DEQUEUE_CALLBACK_FAILURE))

here.
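
To illustrate why the dequeue failure check also covers the empty-queue
case, here is a minimal userspace sketch. It is not the kernel code: the
queue is a hand-rolled singly linked list instead of struct list_head,
locking is omitted, all names are hypothetical, and the loop condition
(stop after the LAST result) is an assumption, since the rest of the
worker loop is not shown above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical mirror of the three DLM_DEQUEUE_CALLBACK_* results. */
enum dequeue_rv { DEQ_FAILURE, DEQ_SUCCESS, DEQ_LAST };

struct cb_sketch {
	struct cb_sketch *next;
	int mode;
};

struct cb_queue {
	struct cb_sketch *head;
	struct cb_sketch *tail;
	int count;
};

/* Append a callback at the tail so the oldest entry stays at the head. */
static void cb_enqueue(struct cb_queue *q, struct cb_sketch *cb)
{
	cb->next = NULL;
	if (q->tail)
		q->tail->next = cb;
	else
		q->head = cb;
	q->tail = cb;
	q->count++;
}

/* Remove the oldest callback; report whether it was the last one. */
static enum dequeue_rv cb_dequeue(struct cb_queue *q, struct cb_sketch **cb)
{
	assert(q && cb);
	*cb = q->head;
	if (!*cb)
		return DEQ_FAILURE;

	q->head = (*cb)->next;
	if (!q->head)
		q->tail = NULL;
	if (--q->count == 0)
		return DEQ_LAST;
	return DEQ_SUCCESS;
}

/*
 * Worker loop: returns the number of callbacks taken off the queue, or -1
 * if it ran with nothing queued. Modes of the first `max` callbacks are
 * stored in `modes`.
 */
static int cb_worker(struct cb_queue *q, int *modes, int max)
{
	struct cb_sketch *cb;
	enum dequeue_rv rv;
	int n = 0;

	do {
		rv = cb_dequeue(q, &cb);
		if (rv == DEQ_FAILURE)
			return -1; /* the kernel code would WARN_ON() here */
		if (n < max)
			modes[n] = cb->mode;
		n++;
	} while (rv != DEQ_LAST);

	return n;
}
```

A worker that is scheduled with an empty queue hits the failure branch on
its first dequeue, so a separate count check before the loop adds nothing.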

- Alex

