From: Jon Maloy <jon.maloy@ericsson.com>
To: <davem@davemloft.net>, <netdev@vger.kernel.org>
Cc: <mohan.krishna.ghanta.krishnamurthy@ericsson.com>,
<tung.q.nguyen@dektech.com.au>, <hoang.h.le@dektech.com.au>,
<jon.maloy@ericsson.com>, <canh.d.luu@dektech.com.au>,
<ying.xue@windriver.com>, <tipc-discussion@lists.sourceforge.net>
Subject: [net-next 04/10] tipc: simplify interaction between subscription and topology connection
Date: Thu, 15 Feb 2018 10:40:45 +0100 [thread overview]
Message-ID: <1518687651-26561-5-git-send-email-jon.maloy@ericsson.com> (raw)
In-Reply-To: <1518687651-26561-1-git-send-email-jon.maloy@ericsson.com>
The message transmission and reception in the topology server is more
generic than is currently necessary. By basing the functionality on the
fact that we only send items of type struct tipc_event and always
receive items of struct tipc_subscr we can make several simplifications,
and also get rid of some unnecessary dynamic memory allocations.
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
---
net/tipc/name_table.c | 10 +--
net/tipc/server.c | 170 +++++++++++++++++---------------------------------
net/tipc/server.h | 12 +---
net/tipc/subscr.c | 40 ++++++------
net/tipc/subscr.h | 5 +-
5 files changed, 88 insertions(+), 149 deletions(-)
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index ed0457c..c0ca7be 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -810,14 +810,15 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
*/
void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
{
- struct tipc_net *tn = net_generic(s->net, tipc_net_id);
+ struct tipc_server *srv = s->server;
+ struct tipc_net *tn = tipc_net(srv->net);
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
int index = hash(type);
struct name_seq *seq;
struct tipc_name_seq ns;
spin_lock_bh(&tn->nametbl_lock);
- seq = nametbl_find_seq(s->net, type);
+ seq = nametbl_find_seq(srv->net, type);
if (!seq)
seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
if (seq) {
@@ -837,12 +838,13 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
*/
void tipc_nametbl_unsubscribe(struct tipc_subscription *s)
{
- struct tipc_net *tn = net_generic(s->net, tipc_net_id);
+ struct tipc_server *srv = s->server;
+ struct tipc_net *tn = tipc_net(srv->net);
struct name_seq *seq;
u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
spin_lock_bh(&tn->nametbl_lock);
- seq = nametbl_find_seq(s->net, type);
+ seq = nametbl_find_seq(srv->net, type);
if (seq != NULL) {
spin_lock_bh(&seq->lock);
list_del_init(&s->nameseq_list);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index b8268c0..7933fb9 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -84,9 +84,9 @@ struct tipc_conn {
/* An entry waiting to be sent */
struct outqueue_entry {
- u32 evt;
+ bool inactive;
+ struct tipc_event evt;
struct list_head list;
- struct kvec iov;
};
static void tipc_recv_work(struct work_struct *work);
@@ -154,6 +154,9 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
return con;
}
+/* sock_data_ready - interrupt callback indicating the socket has data to read
+ * The queued job is launched in tipc_recv_from_sock()
+ */
static void sock_data_ready(struct sock *sk)
{
struct tipc_conn *con;
@@ -168,6 +171,10 @@ static void sock_data_ready(struct sock *sk)
read_unlock_bh(&sk->sk_callback_lock);
}
+/* sock_write_space - interrupt callback after a sendmsg EAGAIN
+ * Indicates that there now is more space in the send buffer
+ * The queued job is launched in tipc_send_to_sock()
+ */
static void sock_write_space(struct sock *sk)
{
struct tipc_conn *con;
@@ -273,10 +280,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
return con;
}
-int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
- void *buf, size_t len)
+static int tipc_con_rcv_sub(struct tipc_server *srv,
+ struct tipc_conn *con,
+ struct tipc_subscr *s)
{
- struct tipc_subscr *s = (struct tipc_subscr *)buf;
struct tipc_subscription *sub;
bool status;
int swap;
@@ -292,7 +299,7 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
return 0;
}
status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
- sub = tipc_subscrp_subscribe(net, s, conid, swap, status);
+ sub = tipc_subscrp_subscribe(srv, s, con->conid, swap, status);
if (!sub)
return -1;
@@ -304,43 +311,27 @@ int tipc_con_rcv_sub(struct net *net, int conid, struct tipc_conn *con,
static int tipc_receive_from_sock(struct tipc_conn *con)
{
- struct tipc_server *s = con->server;
+ struct tipc_server *srv = con->server;
struct sock *sk = con->sock->sk;
struct msghdr msg = {};
+ struct tipc_subscr s;
struct kvec iov;
- void *buf;
int ret;
- buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
- if (!buf) {
- ret = -ENOMEM;
- goto out_close;
- }
-
- iov.iov_base = buf;
- iov.iov_len = s->max_rcvbuf_size;
+ iov.iov_base = &s;
+ iov.iov_len = sizeof(s);
msg.msg_name = NULL;
iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, iov.iov_len);
ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT);
- if (ret <= 0) {
- kmem_cache_free(s->rcvbuf_cache, buf);
- goto out_close;
+ if (ret == -EWOULDBLOCK)
+ return -EWOULDBLOCK;
+ if (ret > 0) {
+ read_lock_bh(&sk->sk_callback_lock);
+ ret = tipc_con_rcv_sub(srv, con, &s);
+ read_unlock_bh(&sk->sk_callback_lock);
}
-
- read_lock_bh(&sk->sk_callback_lock);
- ret = tipc_con_rcv_sub(s->net, con->conid, con, buf, ret);
- read_unlock_bh(&sk->sk_callback_lock);
- kmem_cache_free(s->rcvbuf_cache, buf);
if (ret < 0)
- tipc_conn_terminate(s, con->conid);
- return ret;
-
-out_close:
- if (ret != -EWOULDBLOCK)
tipc_close_conn(con);
- else if (ret == 0)
- /* Don't return success if we really got EOF */
- ret = -EAGAIN;
return ret;
}
@@ -442,33 +433,6 @@ static int tipc_open_listening_sock(struct tipc_server *s)
return 0;
}
-static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
-{
- struct outqueue_entry *entry;
- void *buf;
-
- entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
- if (!entry)
- return NULL;
-
- buf = kmemdup(data, len, GFP_ATOMIC);
- if (!buf) {
- kfree(entry);
- return NULL;
- }
-
- entry->iov.iov_base = buf;
- entry->iov.iov_len = len;
-
- return entry;
-}
-
-static void tipc_free_entry(struct outqueue_entry *e)
-{
- kfree(e->iov.iov_base);
- kfree(e);
-}
-
static void tipc_clean_outqueues(struct tipc_conn *con)
{
struct outqueue_entry *e, *safe;
@@ -476,50 +440,40 @@ static void tipc_clean_outqueues(struct tipc_conn *con)
spin_lock_bh(&con->outqueue_lock);
list_for_each_entry_safe(e, safe, &con->outqueue, list) {
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
}
-int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- u32 evt, void *data, size_t len)
+/* tipc_conn_queue_evt - interrupt level call from a subscription instance
+ * The queued job is launched in tipc_send_to_sock()
+ */
+void tipc_conn_queue_evt(struct tipc_server *s, int conid,
+ u32 event, struct tipc_event *evt)
{
struct outqueue_entry *e;
struct tipc_conn *con;
con = tipc_conn_lookup(s, conid);
if (!con)
- return -EINVAL;
+ return;
- if (!connected(con)) {
- conn_put(con);
- return 0;
- }
+ if (!connected(con))
+ goto err;
- e = tipc_alloc_entry(data, len);
- if (!e) {
- conn_put(con);
- return -ENOMEM;
- }
- e->evt = evt;
+ e = kmalloc(sizeof(*e), GFP_ATOMIC);
+ if (!e)
+ goto err;
+ e->inactive = (event == TIPC_SUBSCR_TIMEOUT);
+ memcpy(&e->evt, evt, sizeof(*evt));
spin_lock_bh(&con->outqueue_lock);
list_add_tail(&e->list, &con->outqueue);
spin_unlock_bh(&con->outqueue_lock);
- if (!queue_work(s->send_wq, &con->swork))
- conn_put(con);
- return 0;
-}
-
-void tipc_conn_terminate(struct tipc_server *s, int conid)
-{
- struct tipc_conn *con;
-
- con = tipc_conn_lookup(s, conid);
- if (con) {
- tipc_close_conn(con);
- conn_put(con);
- }
+ if (queue_work(s->send_wq, &con->swork))
+ return;
+err:
+ conn_put(con);
}
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
@@ -542,7 +496,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
*conid = con->conid;
con->sock = NULL;
- rc = tipc_con_rcv_sub(net, *conid, con, &sub, sizeof(sub));
+ rc = tipc_con_rcv_sub(tipc_topsrv(net), con, &sub);
if (rc < 0)
tipc_close_conn(con);
return !rc;
@@ -587,6 +541,7 @@ static void tipc_send_to_sock(struct tipc_conn *con)
struct outqueue_entry *e;
struct tipc_event *evt;
struct msghdr msg;
+ struct kvec iov;
int count = 0;
int ret;
@@ -594,27 +549,28 @@ static void tipc_send_to_sock(struct tipc_conn *con)
while (!list_empty(queue)) {
e = list_first_entry(queue, struct outqueue_entry, list);
-
+ evt = &e->evt;
spin_unlock_bh(&con->outqueue_lock);
- if (e->evt == TIPC_SUBSCR_TIMEOUT) {
- evt = (struct tipc_event *)e->iov.iov_base;
+ if (e->inactive)
tipc_con_delete_sub(con, &evt->s);
- }
+
memset(&msg, 0, sizeof(msg));
msg.msg_flags = MSG_DONTWAIT;
+ iov.iov_base = evt;
+ iov.iov_len = sizeof(*evt);
+ msg.msg_name = NULL;
if (con->sock) {
- ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
- e->iov.iov_len);
+ ret = kernel_sendmsg(con->sock, &msg, &iov,
+ 1, sizeof(*evt));
if (ret == -EWOULDBLOCK || ret == 0) {
cond_resched();
goto out;
} else if (ret < 0) {
- goto send_err;
+ goto err;
}
} else {
- evt = e->iov.iov_base;
tipc_send_kern_top_evt(srv->net, evt);
}
@@ -625,13 +581,12 @@ static void tipc_send_to_sock(struct tipc_conn *con)
}
spin_lock_bh(&con->outqueue_lock);
list_del(&e->list);
- tipc_free_entry(e);
+ kfree(e);
}
spin_unlock_bh(&con->outqueue_lock);
out:
return;
-
-send_err:
+err:
tipc_close_conn(con);
}
@@ -695,22 +650,14 @@ int tipc_server_start(struct tipc_server *s)
idr_init(&s->conn_idr);
s->idr_in_use = 0;
- s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
- 0, SLAB_HWCACHE_ALIGN, NULL);
- if (!s->rcvbuf_cache)
- return -ENOMEM;
-
ret = tipc_work_start(s);
- if (ret < 0) {
- kmem_cache_destroy(s->rcvbuf_cache);
+ if (ret < 0)
return ret;
- }
+
ret = tipc_open_listening_sock(s);
- if (ret < 0) {
+ if (ret < 0)
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
- return ret;
- }
+
return ret;
}
@@ -731,6 +678,5 @@ void tipc_server_stop(struct tipc_server *s)
spin_unlock_bh(&s->idr_lock);
tipc_work_stop(s);
- kmem_cache_destroy(s->rcvbuf_cache);
idr_destroy(&s->conn_idr);
}
diff --git a/net/tipc/server.h b/net/tipc/server.h
index fcc7232..2de8709 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -36,6 +36,7 @@
#ifndef _TIPC_SERVER_H
#define _TIPC_SERVER_H
+#include "core.h"
#include <linux/idr.h>
#include <linux/tipc.h>
#include <net/net_namespace.h>
@@ -68,7 +69,6 @@ struct tipc_server {
spinlock_t idr_lock;
int idr_in_use;
struct net *net;
- struct kmem_cache *rcvbuf_cache;
struct workqueue_struct *rcv_wq;
struct workqueue_struct *send_wq;
int max_rcvbuf_size;
@@ -76,19 +76,13 @@ struct tipc_server {
char name[TIPC_SERVER_NAME_LEN];
};
-int tipc_conn_sendmsg(struct tipc_server *s, int conid,
- u32 evt, void *data, size_t len);
+void tipc_conn_queue_evt(struct tipc_server *s, int conid,
+ u32 event, struct tipc_event *evt);
bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
u32 upper, u32 filter, int *conid);
void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
-/**
- * tipc_conn_terminate - terminate connection with server
- *
- * Note: Must call it in process context since it might sleep
- */
-void tipc_conn_terminate(struct tipc_server *s, int conid);
int tipc_server_start(struct tipc_server *s);
void tipc_server_stop(struct tipc_server *s);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index c6de145..c3e0b924 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -52,22 +52,19 @@ static u32 htohl(u32 in, int swap)
static void tipc_subscrp_send_event(struct tipc_subscription *sub,
u32 found_lower, u32 found_upper,
- u32 event, u32 port_ref, u32 node)
+ u32 event, u32 port, u32 node)
{
- struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
- struct kvec msg_sect;
+ struct tipc_event *evt = &sub->evt;
+ bool swap = sub->swap;
if (sub->inactive)
return;
- msg_sect.iov_base = (void *)&sub->evt;
- msg_sect.iov_len = sizeof(struct tipc_event);
- sub->evt.event = htohl(event, sub->swap);
- sub->evt.found_lower = htohl(found_lower, sub->swap);
- sub->evt.found_upper = htohl(found_upper, sub->swap);
- sub->evt.port.ref = htohl(port_ref, sub->swap);
- sub->evt.port.node = htohl(node, sub->swap);
- tipc_conn_sendmsg(tn->topsrv, sub->conid, event,
- msg_sect.iov_base, msg_sect.iov_len);
+ evt->event = htohl(event, swap);
+ evt->found_lower = htohl(found_lower, swap);
+ evt->found_upper = htohl(found_upper, swap);
+ evt->port.ref = htohl(port, swap);
+ evt->port.node = htohl(node, swap);
+ tipc_conn_queue_evt(sub->server, sub->conid, event, evt);
}
/**
@@ -137,10 +134,11 @@ static void tipc_subscrp_timeout(struct timer_list *t)
static void tipc_subscrp_kref_release(struct kref *kref)
{
- struct tipc_subscription *sub = container_of(kref,
- struct tipc_subscription,
- kref);
- struct tipc_net *tn = net_generic(sub->net, tipc_net_id);
+ struct tipc_subscription *sub;
+ struct tipc_net *tn;
+
+ sub = container_of(kref, struct tipc_subscription, kref);
+ tn = tipc_net(sub->server->net);
atomic_dec(&tn->subscription_count);
kfree(sub);
@@ -156,11 +154,11 @@ void tipc_subscrp_get(struct tipc_subscription *subscription)
kref_get(&subscription->kref);
}
-static struct tipc_subscription *tipc_subscrp_create(struct net *net,
+static struct tipc_subscription *tipc_subscrp_create(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap)
{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_net *tn = tipc_net(srv->net);
struct tipc_subscription *sub;
u32 filter = htohl(s->filter, swap);
@@ -179,7 +177,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
}
/* Initialize subscription object */
- sub->net = net;
+ sub->server = srv;
sub->conid = conid;
sub->inactive = false;
if (((filter & TIPC_SUB_PORTS) && (filter & TIPC_SUB_SERVICE)) ||
@@ -197,7 +195,7 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
return sub;
}
-struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status)
@@ -205,7 +203,7 @@ struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
struct tipc_subscription *sub = NULL;
u32 timeout;
- sub = tipc_subscrp_create(net, s, conid, swap);
+ sub = tipc_subscrp_create(srv, s, conid, swap);
if (!sub)
return NULL;
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index cfd0cb3..64ffd32 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -49,7 +49,6 @@ struct tipc_conn;
* struct tipc_subscription - TIPC network topology subscription object
* @subscriber: pointer to its subscriber
* @seq: name sequence associated with subscription
- * @net: point to network namespace
* @timer: timer governing subscription duration (optional)
* @nameseq_list: adjacent subscriptions in name sequence's subscription list
* @subscrp_list: adjacent subscriptions in subscriber's subscription list
@@ -58,7 +57,7 @@ struct tipc_conn;
*/
struct tipc_subscription {
struct kref kref;
- struct net *net;
+ struct tipc_server *server;
struct timer_list timer;
struct list_head nameseq_list;
struct list_head subscrp_list;
@@ -69,7 +68,7 @@ struct tipc_subscription {
spinlock_t lock; /* serialize up/down and timer events */
};
-struct tipc_subscription *tipc_subscrp_subscribe(struct net *net,
+struct tipc_subscription *tipc_subscrp_subscribe(struct tipc_server *srv,
struct tipc_subscr *s,
int conid, bool swap,
bool status);
--
2.1.4
next prev parent reply other threads:[~2018-02-15 9:42 UTC|newest]
Thread overview: 13+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-02-15 9:40 [net-next 00/10] tipc: de-generealize topology server Jon Maloy
2018-02-15 9:40 ` [net-next 01/10] tipc: remove redundant code in " Jon Maloy
2018-02-15 9:40 ` [net-next 02/10] tipc: remove unnecessary function pointers Jon Maloy
2018-02-15 9:40 ` [net-next 03/10] tipc: eliminate struct tipc_subscriber Jon Maloy
2018-02-15 9:40 ` Jon Maloy [this message]
2018-02-15 9:40 ` [net-next 05/10] tipc: simplify endianness handling in topology subscriber Jon Maloy
2018-02-15 9:40 ` [net-next 06/10] tipc: collapse subscription creation functions Jon Maloy
2018-02-15 9:40 ` [net-next 07/10] tipc: some prefix changes Jon Maloy
2018-02-15 9:40 ` [net-next 08/10] tipc: make struct tipc_server private for server.c Jon Maloy
2018-02-15 9:40 ` [net-next 09/10] tipc: separate topology server listener socket from subcsriber sockets Jon Maloy
2018-02-17 18:35 ` [tipc] 5fb6af56fa: BUG:sleeping_function_called_from_invalid_context_at_net/core/sock.c kernel test robot
2018-02-15 9:40 ` [net-next 10/10] tipc: rename tipc_server to tipc_topsrv Jon Maloy
2018-02-16 20:29 ` [net-next 00/10] tipc: de-generealize topology server David Miller
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1518687651-26561-5-git-send-email-jon.maloy@ericsson.com \
--to=jon.maloy@ericsson.com \
--cc=canh.d.luu@dektech.com.au \
--cc=davem@davemloft.net \
--cc=hoang.h.le@dektech.com.au \
--cc=mohan.krishna.ghanta.krishnamurthy@ericsson.com \
--cc=netdev@vger.kernel.org \
--cc=tipc-discussion@lists.sourceforge.net \
--cc=tung.q.nguyen@dektech.com.au \
--cc=ying.xue@windriver.com \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).