From mboxrd@z Thu Jan 1 00:00:00 1970 From: Alexander Aring Date: Wed, 23 Jun 2021 11:14:49 -0400 Subject: [Cluster-devel] [RFC dlm/next 10/15] fs: dlm: introduce reconnect work In-Reply-To: <20210623151454.176649-1-aahringo@redhat.com> References: <20210623151454.176649-1-aahringo@redhat.com> Message-ID: <20210623151454.176649-11-aahringo@redhat.com> List-Id: To: cluster-devel.redhat.com MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit This patch will add another work to close the sockets which we cannot do inside the lowcomms_error_report() handler. This patch will also close the "othercon" sock if present. Signed-off-by: Alexander Aring --- fs/dlm/lowcomms.c | 163 +++++++++++++++++++++++++--------------------- 1 file changed, 88 insertions(+), 75 deletions(-) diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index d2febefe1d0d..a54ed3cf0b45 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -73,10 +73,8 @@ struct connection { #define CF_APP_LIMITED 3 #define CF_SHUTDOWN 4 #define CF_CONNECTED 5 -#define CF_RECONNECT 6 -#define CF_DELAY_CONNECT 7 -#define CF_EOF 8 -#define CF_STOP 9 +#define CF_EOF 6 +#define CF_STOP 7 struct list_head writequeue; /* List of outgoing writequeue_entries */ spinlock_t writequeue_lock; atomic_t writequeue_cnt; @@ -89,7 +87,9 @@ struct connection { struct mutex rwork_lock; struct work_struct rwork; struct mutex swork_lock; - struct work_struct swork; + struct delayed_work swork; + struct work_struct cwork; + int sk_err; wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ unsigned char *rx_buf; int rx_buflen; @@ -184,6 +184,7 @@ DEFINE_STATIC_SRCU(connections_srcu); static const struct dlm_proto_ops *dlm_proto_ops; +static void process_close_sockets(struct work_struct *work); static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); @@ -208,6 +209,16 @@ static inline void dlm_io_queue(struct connection *con, queue_work(io_workqueue, work); } +static inline void dlm_io_delayed_queue(struct connection *con, + struct delayed_work *dwork, + unsigned long delay) +{ + if (test_bit(CF_STOP, &con->flags)) + return; + + queue_delayed_work(io_workqueue, dwork, delay); +} + static inline struct connection *dlm_sendcon(struct connection *con) { if (test_bit(CF_IS_OTHERCON, &con->flags)) @@ -260,8 +271,9 @@ static int dlm_con_init(struct connection *con, int nodeid) INIT_LIST_HEAD(&con->writequeue); spin_lock_init(&con->writequeue_lock); atomic_set(&con->writequeue_cnt, 0); - INIT_WORK(&con->swork, process_send_sockets); + INIT_DELAYED_WORK(&con->swork, process_send_sockets); INIT_WORK(&con->rwork, process_recv_sockets); + INIT_WORK(&con->cwork, process_close_sockets); init_waitqueue_head(&con->shutdown_wait); return 0; @@ -551,19 +563,10 @@ static void lowcomms_write_space(struct sock *sk) } out: - dlm_io_queue(con, &con->swork); + dlm_io_delayed_queue(con, &con->swork, 0); read_unlock_bh(&sk->sk_callback_lock); } -static inline void lowcomms_connect_sock(struct connection *con) -{ - if (test_bit(CF_CLOSE, &con->flags)) - return; - - dlm_io_queue(con, &con->swork); - cond_resched(); -} - static void lowcomms_state_change(struct sock *sk) { /* SCTP layer is not calling sk_data_ready when the connection @@ -579,27 +582,6 @@ static void lowcomms_state_change(struct sock *sk) } } -int dlm_lowcomms_connect_node(int nodeid) -{ - struct connection *con; - int idx; - - if (nodeid == dlm_our_nodeid()) - return 0; - - idx = srcu_read_lock(&connections_srcu); - con = nodeid2con(nodeid, GFP_NOFS); - if (!con) { - srcu_read_unlock(&connections_srcu, idx); - return -ENOMEM; - } - - lowcomms_connect_sock(con); - srcu_read_unlock(&connections_srcu, idx); - - return 0; -} - int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark) { struct dlm_node_addr *na; @@ -659,20 +641,8 @@ static void lowcomms_error_report(struct sock *sk) sk->sk_err_soft); } - /* below sendcon only handling */ - if (test_bit(CF_IS_OTHERCON, &con->flags)) - con = con->sendcon; - - switch (sk->sk_err) { - case ECONNREFUSED: - set_bit(CF_DELAY_CONNECT, &con->flags); - break; - default: - break; - } - - if (!test_and_set_bit(CF_RECONNECT, &con->flags)) - dlm_io_queue(con, &con->swork); + con->sk_err = sk->sk_err; + dlm_io_queue(dlm_sendcon(con), &con->cwork); out: read_unlock_bh(&sk->sk_callback_lock); @@ -837,8 +807,6 @@ static void close_connection(struct connection *con, bool and_other) con->retries = 0; clear_bit(CF_APP_LIMITED, &con->flags); clear_bit(CF_CONNECTED, &con->flags); - clear_bit(CF_DELAY_CONNECT, &con->flags); - clear_bit(CF_RECONNECT, &con->flags); clear_bit(CF_EOF, &con->flags); /* handling for tcp shutdown */ @@ -851,10 +819,13 @@ static void cancel_io_work(struct connection *con, bool and_other) struct connection *sendcon = dlm_sendcon(con); set_bit(CF_STOP, &sendcon->flags); - cancel_work_sync(&sendcon->swork); cancel_work_sync(&sendcon->rwork); - if (sendcon->othercon && and_other) + cancel_work_sync(&sendcon->cwork); + cancel_delayed_work_sync(&sendcon->swork); + if (sendcon->othercon && and_other) { cancel_work_sync(&sendcon->othercon->rwork); + cancel_work_sync(&sendcon->othercon->cwork); + } dlm_con_lock(sendcon); close_connection(con, and_other); @@ -911,7 +882,7 @@ static void dlm_tcp_shutdown(struct connection *con) } /* flush all send */ - flush_work(&con->swork); + flush_delayed_work(&con->swork); shutdown_connection(con, con); } @@ -1002,9 +973,10 @@ static void receive_from_sock(struct connection *con, return; out_close: - if (!test_and_set_bit(CF_RECONNECT, &sendcon->flags)) - dlm_io_queue(sendcon, &sendcon->swork); - + dlm_con_lock(sendcon); + close_connection(con, false); + dlm_con_unlock(sendcon); + dlm_io_delayed_queue(sendcon, &sendcon->swork, 0); return; out_eof: @@ -1376,7 +1348,7 @@ static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg) e->len = DLM_WQ_LENGTH_BYTES(e); spin_unlock(&con->writequeue_lock); - dlm_io_queue(con, &con->swork); + dlm_io_delayed_queue(con, &con->swork, 0); return; out: @@ -1430,7 +1402,7 @@ static void send_to_sock(struct connection *con) mutex_lock(&con->swork_lock); if (con->sock == NULL) { - dlm_io_queue(con, &con->swork); + dlm_io_delayed_queue(con, &con->swork, 0); goto out; } @@ -1611,33 +1583,74 @@ static int dlm_connect(struct connection *con) result != -EPROTONOSUPPORT) { log_print("connect %d try %d error %d", con->nodeid, con->retries, result); - msleep(1000); - lowcomms_connect_sock(con); + dlm_io_delayed_queue(con, &con->swork, + msecs_to_jiffies(1000)); } return result; } -/* Send workqueue function */ -static void process_send_sockets(struct work_struct *work) +int dlm_lowcomms_connect_node(int nodeid) { - struct connection *con = container_of(work, struct connection, swork); + struct connection *con; + int idx; - WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags)); + if (nodeid == dlm_our_nodeid()) + return 0; - if (test_and_clear_bit(CF_RECONNECT, &con->flags)) { - dlm_con_lock(con); - close_connection(con, false); - dlm_midcomms_unack_msg_resend(con->nodeid); - dlm_con_unlock(con); + idx = srcu_read_lock(&connections_srcu); + con = nodeid2con(nodeid, GFP_NOFS); + if (!con) { + srcu_read_unlock(&connections_srcu, idx); + return -ENOMEM; } - if (con->sock == NULL) { /* not mutex protected so check it inside too */ - if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags)) - msleep(1000); + if (test_bit(CF_CLOSE, &con->flags)) { + srcu_read_unlock(&connections_srcu, idx); + return 0; + } + dlm_con_lock(con); + dlm_connect(con); + dlm_con_unlock(con); + srcu_read_unlock(&connections_srcu, idx); + + cond_resched(); + return 0; +} + +static void process_close_sockets(struct work_struct *work) +{ + struct connection *con = container_of(work, struct connection, cwork); + struct connection *sendcon = dlm_sendcon(con); + unsigned int delay = 0; + + dlm_con_lock(sendcon); + close_connection(con, false); + dlm_con_unlock(sendcon); + + switch (con->sk_err) { + case ECONNREFUSED: + delay = msecs_to_jiffies(1000); + break; + default: + break; + } + + dlm_io_delayed_queue(sendcon, &sendcon->swork, delay); +} + +/* Send workqueue function */ +static void process_send_sockets(struct work_struct *work) +{ + struct connection *con = container_of(work, struct connection, + swork.work); + + /* be used to connect socket */ + if (con->sock == NULL) { dlm_con_lock(con); dlm_connect(con); + dlm_midcomms_unack_msg_resend(con->nodeid); dlm_con_unlock(con); } -- 2.26.3