public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
* [PATCH] unified SysV and POSIX mqueues - complete rewrite
@ 2002-11-10 23:44 Peter Waechtler
  0 siblings, 0 replies; 11+ messages in thread
From: Peter Waechtler @ 2002-11-10 23:44 UTC (permalink / raw)
  To: linux-kernel; +Cc: torvalds

[-- Attachment #1: Type: text/plain, Size: 959 bytes --]

I completely rewrote the unified SysV and Posix mqueue patch
from Jakub Jelinek.

It adds a new msgfs filesystem that's also mountable
to lookup the names and to unlink() them.
The size of the fifo shows the number of msgs in the queue.
The interface boils down to 7 new syscalls (for now just i386):
- sys_mq_open
- sys_mq_unlink
- sys_mq_timedsend
- sys_mq_timedreceive
- sys_mq_notify
- sys_mq_getattr
- sys_mq_setattr

mq_close is a simple sys_close - the FD is pollable (enhancement)

The change to ipc/msg.c is minimal - just make
- load_msg
- store_msg
- free_msg
accessible (not static).

Now that I've missed the feature freeze - I want to argue that it's
not a new feature - it's a new interface just using already
available features :-)

userspace lib and test progs are on
http://homepage.mac.com/pwaechtler/linux/mqueue/

I'll pipe some code to Ulrich Drepper for inclusion to glibc.
(Linux interface and userspace impl. for all other platforms).

[-- Attachment #2: ipcmsg-for-posixmsg.diff --]
[-- Type: text/plain, Size: 784 bytes --]

diff -Nur -X dontdiff vanilla-2.5.46/ipc/msg.c linux-2.5.46/ipc/msg.c
--- vanilla-2.5.46/ipc/msg.c	2002-11-08 15:10:06.000000000 +0100
+++ linux-2.5.46/ipc/msg.c	2002-11-10 23:38:32.000000000 +0100
@@ -127,7 +127,7 @@
 	return msg_buildid(id,msq->q_perm.seq);
 }
 
-static void free_msg(struct msg_msg* msg)
+void free_msg(struct msg_msg* msg)
 {
 	struct msg_msgseg* seg;
 	seg = msg->next;
@@ -139,7 +139,7 @@
 	}
 }
 
-static struct msg_msg* load_msg(void* src, int len)
+struct msg_msg* load_msg(void* src, int len)
 {
 	struct msg_msg* msg;
 	struct msg_msgseg** pseg;
@@ -191,7 +191,7 @@
 	return ERR_PTR(err);
 }
 
-static int store_msg(void* dest, struct msg_msg* msg, int len)
+int store_msg(void* dest, struct msg_msg* msg, int len)
 {
 	int alen;
 	struct msg_msgseg *seg;

[-- Attachment #3: posix-mq2.txt --]
[-- Type: text/plain, Size: 26981 bytes --]

diff -Nur -X dontdiff vanilla-2.5.46/arch/i386/kernel/entry.S linux-2.5.46/arch/i386/kernel/entry.S
--- vanilla-2.5.46/arch/i386/kernel/entry.S	2002-11-08 15:09:43.000000000 +0100
+++ linux-2.5.46/arch/i386/kernel/entry.S	2002-11-08 15:53:34.000000000 +0100
@@ -740,7 +740,14 @@
 	.long sys_epoll_create
 	.long sys_epoll_ctl	/* 255 */
 	.long sys_epoll_wait
- 	.long sys_remap_file_pages
+	.long sys_remap_file_pages
+	.long sys_mq_open
+	.long sys_mq_unlink
+	.long sys_mq_timedsend	/* 260 */
+	.long sys_mq_timedreceive
+	.long sys_mq_notify
+	.long sys_mq_getattr
+	.long sys_mq_setattr
 
 
 	.rept NR_syscalls-(.-sys_call_table)/4
diff -Nur -X dontdiff vanilla-2.5.46/include/asm-i386/unistd.h linux-2.5.46/include/asm-i386/unistd.h
--- vanilla-2.5.46/include/asm-i386/unistd.h	2002-11-08 15:10:01.000000000 +0100
+++ linux-2.5.46/include/asm-i386/unistd.h	2002-11-08 15:50:24.000000000 +0100
@@ -261,8 +261,15 @@
 #define __NR_sys_epoll_create	254
 #define __NR_sys_epoll_ctl	255
 #define __NR_sys_epoll_wait	256
-#define __NR_remap_file_pages	257
-
+#define __NR_remap_file_pages 257
+#define __NR_mq_open	258
+#define __NR_mq_unlink	259
+#define __NR_mq_timedsend	260
+#define __NR_mq_timedreceive	261
+#define __NR_mq_notify	262
+#define __NR_mq_getattr	263
+#define __NR_mq_setattr	264
+  
 
 /* user-visible error numbers are in the range -1 - -124: see <asm-i386/errno.h> */
 
diff -Nur -X dontdiff vanilla-2.5.46/include/linux/mqueue.h linux-2.5.46/include/linux/mqueue.h
--- vanilla-2.5.46/include/linux/mqueue.h	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.46/include/linux/mqueue.h	2002-11-10 23:14:42.000000000 +0100
@@ -0,0 +1,46 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+#define MQ_MAXMSG 	40	/* max number of messages in each queue */
+#define MQ_MAXSYSSIZE	1048576	/* max size that all m.q. can have together */
+#define MQ_PRIO_MAX 	10000	/* max priority */
+
+typedef int mqd_t;		/* message queue descriptor */
+
+struct mq_attr {
+	long mq_flags;		/* message queue flags */
+	long mq_maxmsg;		/* maximum number of messages */
+	long mq_msgsize;	/* maximum message size */
+	long mq_curmsgs;	/* number of messages currently queued */
+};
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr);
+asmlinkage int sys_mq_close(mqd_t mqdes);
+asmlinkage int sys_mq_unlink(const char *u_name);
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime);
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime);
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification);
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat);
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat);
+
+#ifdef __KERNEL__
+
+struct mqueue_ds {		/* queue */
+	struct mq_attr attr;
+	struct msg_queue queue;	/* ipc/msg */
+
+	spinlock_t lock;
+	wait_queue_head_t wait_recv;
+	wait_queue_head_t wait_send;
+
+	pid_t notify_pid;	/* who we have to notify (or 0) */
+	struct sigevent notify;	/* notification */
+};
+#endif /* __KERNEL__ */
+
+#endif
diff -Nur -X dontdiff vanilla-2.5.46/include/linux/sys.h linux-2.5.46/include/linux/sys.h
--- vanilla-2.5.46/include/linux/sys.h	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.46/include/linux/sys.h	2002-11-01 16:36:48.000000000 +0100
@@ -4,7 +4,7 @@
 /*
  * system call entry points ... but not all are defined
  */
-#define NR_syscalls 260
+#define NR_syscalls 270
 
 /*
  * These are system calls that will be removed at some time
diff -Nur -X dontdiff vanilla-2.5.46/init/Kconfig linux-2.5.46/init/Kconfig
--- vanilla-2.5.46/init/Kconfig	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.46/init/Kconfig	2002-11-04 16:58:14.000000000 +0100
@@ -69,6 +69,13 @@
 	  section 6.4 of the Linux Programmer's Guide, available from
 	  <http://www.linuxdoc.org/docs.html#guide>.
 
+config POSIXMSG
+	bool "POSIX message queues"
+	depends on SYSVIPC
+	---help---
+	  This gives you POSIX compliant interfaces for message queues.
+
+
 config BSD_PROCESS_ACCT
 	bool "BSD Process Accounting"
 	help
diff -Nur -X dontdiff vanilla-2.5.46/ipc/Makefile linux-2.5.46/ipc/Makefile
--- vanilla-2.5.46/ipc/Makefile	2002-09-23 15:54:57.000000000 +0200
+++ linux-2.5.46/ipc/Makefile	2002-11-04 17:50:24.000000000 +0100
@@ -6,4 +6,8 @@
 
 obj-$(CONFIG_SYSVIPC) += msg.o sem.o shm.o
 
+ifeq ($(CONFIG_POSIXMSG),y)
+obj-$(CONFIG_SYSVIPC) += posixmsg.o
+endif
+
 include $(TOPDIR)/Rules.make
diff -Nur -X dontdiff vanilla-2.5.46/ipc/posixmsg.c linux-2.5.46/ipc/posixmsg.c
--- vanilla-2.5.46/ipc/posixmsg.c	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.46/ipc/posixmsg.c	2002-11-10 23:26:26.000000000 +0100
@@ -0,0 +1,814 @@
+/*
+ *  linux/ipc/posixmsg.c
+ *
+ *  Copyright 2002 Peter Wächtler <pwaechtler@mac.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * root can mount the filesystem to see the names of the currently
+ * used mqueues. The i_size shows the number of msgs in the queue.
+ * The only shell operation on the "fifos" is an unlink() via rm(1)
+ * The queues are accessed through syscalls - poll is supported
+ *
+ * TODO:
+ *	check for more sysv msg limits (or add some new)?
+ *  implement SIGEV_THREAD
+ */
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/namei.h>
+#include <linux/pagemap.h>	/* PAGE_CACHE_SIZE */
+#include <linux/poll.h>
+
+#include <linux/mqueue.h>
+#include <linux/msg.h>
+#include <asm/uaccess.h>
+
+/* functions used from ipc/msg.c */
+extern void free_msg(struct msg_msg *msg);
+extern int store_msg(void *dest, struct msg_msg *msg, int len);
+extern struct msg_msg *load_msg(void *src, int len);
+
+extern int msg_ctlmnb;		/* default max size of a message queue (all msgs sum up) */
+extern int msg_ctlmni;		/* max # of msg queue identifiers */
+extern int msg_ctlmax;		/* max size of one message (bytes) */
+
+static int mqueue_release(struct inode *inode, struct dentry *dentry);
+static int mqueue_close(struct inode *inode, struct file *filp);
+static unsigned int mqueue_poll(struct file *, struct poll_table_struct *);
+
+static atomic_t msg_bytes = ATOMIC_INIT(0);
+static struct vfsmount *msg_mnt;
+
+static struct file_operations msg_fops = {
+	.llseek = no_llseek,
+	.release = mqueue_close,
+	.poll = mqueue_poll,
+};
+static struct inode_operations msg_dir_inode_operations = {
+	.lookup = simple_lookup,
+	.unlink = mqueue_release,
+};
+
+static struct msg_queue *
+init_queue(struct msg_queue *queue)
+{
+	int retval;
+
+	queue->q_perm.mode = 0;
+	queue->q_perm.key = 0;
+	queue->q_perm.security = NULL;
+	retval = security_ops->msg_queue_alloc_security(queue);
+	if (retval)
+		return ERR_PTR(retval);;
+
+	queue->q_stime = queue->q_rtime = 0;
+	queue->q_ctime = CURRENT_TIME;
+	queue->q_qbytes = queue->q_cbytes = queue->q_qnum = 0;
+	queue->q_lspid = queue->q_lrpid = 0;
+	INIT_LIST_HEAD(&queue->q_messages);
+	INIT_LIST_HEAD(&queue->q_receivers);	/* unused */
+	INIT_LIST_HEAD(&queue->q_senders);	/* unused */
+
+	return queue;
+}
+
+static struct inode *
+get_msg_inode(struct super_block *sb, mode_t mode)
+{
+	struct mqueue_ds *q;
+	struct inode *inode = new_inode(sb);
+
+	if (inode) {
+		inode->i_mode = mode;
+		inode->i_uid = current->fsuid;
+		inode->i_gid = current->fsgid;
+		inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+		inode->i_blksize = 1024;
+		switch (mode & S_IFMT) {
+		default:
+		case S_IFIFO:
+		case S_IFREG:
+			inode->i_fop = &msg_fops;
+			if ((q = kmalloc(sizeof (*q), GFP_KERNEL))) {
+				if (!init_queue(&q->queue)) {
+					kfree(q);
+					iput(inode);
+					inode = ERR_PTR(-EACCES);
+				}
+				inode->u.generic_ip = q;
+			} else {
+				iput(inode);
+				inode = ERR_PTR(-ENOSPC);
+			}
+			break;
+		case S_IFDIR:
+			inode->i_op = &msg_dir_inode_operations;
+			inode->i_fop = &simple_dir_operations;
+			/* directory inodes start off with i_nlink == 2 (for "." entry) */
+			inode->i_nlink++;
+			break;
+		case S_IFLNK:
+			break;
+		}
+	} else
+		inode = ERR_PTR(-ENOSPC);
+
+	return inode;
+}
+
+#define get_mqueue(filp)\
+	((filp) ? filp->f_dentry->d_inode->u.generic_ip: filp)
+
+/* don't use fget() to avoid the fput() for no good reason */
+static struct file *
+mqueue_lookup(mqd_t fd)
+{
+	struct files_struct *files = current->files;
+
+	read_lock(&files->file_lock);
+	if (fd >= files->max_fds || fd < 0)
+		goto out_unlock;
+	read_unlock(&files->file_lock);
+	return files->fd[fd];
+
+      out_unlock:
+	read_unlock(&files->file_lock);
+	return NULL;
+}
+
+static inline int
+freespace(struct mqueue_ds *q, size_t msg_len)
+{
+	int rc;
+	spin_lock(&q->lock);
+
+	rc = msg_len + q->queue.q_cbytes <= q->attr.mq_msgsize &&
+	    q->queue.q_qnum < q->attr.mq_maxmsg;
+
+	spin_unlock(&q->lock);
+	return rc;
+}
+
+/* cleans up after close() or exit() */
+static int
+mqueue_close(struct inode *inode, struct file *filp)
+{
+	struct mqueue_ds *q = inode->u.generic_ip;
+
+	if (!q || !filp)
+		return -EBADFD;
+
+	spin_lock(&q->lock);
+	/* remove possible notification;
+	 * sys_getpid() returns the tgid if multithreaded */
+	if (q->notify_pid == current->pid) {
+		q->notify_pid = 0;
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	}
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+/* removes a queue - inode is the inode of the directory */
+static int
+mqueue_release(struct inode *inode, struct dentry *dentry)
+{
+	struct mqueue_ds *q = dentry->d_inode->u.generic_ip;
+	struct msg_queue *queue;
+	struct list_head *tmp;
+
+	if (!q)
+		return -EBADFD;
+
+	queue = &q->queue;
+
+	tmp = queue->q_messages.next;
+	while (tmp != &queue->q_messages) {
+		struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);
+		tmp = tmp->next;
+		pr_info("mqueue_release: freeing msg:%p\n", msg);
+		free_msg(msg);
+	}
+	atomic_sub(queue->q_cbytes, &msg_bytes);
+	security_ops->msg_queue_free_security(queue);
+	dentry->d_inode->u.generic_ip = NULL;
+
+	kfree(q);
+	return 0;
+}
+
+static unsigned int
+mqueue_poll(struct file *filp, struct poll_table_struct *wait)
+{
+	struct mqueue_ds *q = get_mqueue(filp);
+	int ret = 0;
+
+	poll_wait(filp, &q->wait_recv, wait);
+	poll_wait(filp, &q->wait_send, wait);
+
+	if (q->queue.q_qnum)
+		ret = POLLIN | POLLRDNORM;
+
+	if (q->queue.q_qnum < q->attr.mq_maxmsg)
+		ret |= POLLOUT | POLLWRNORM;
+
+	return ret;
+}
+
+static int
+create_queue(struct dentry *dir, struct qstr *qname,
+	     int oflag, mode_t mode, struct mq_attr *u_attr)
+{
+	int ret, fd;
+	struct file *filp;
+	struct mqueue_ds *q;
+	struct inode *inode;
+	struct dentry *dentry;
+
+	inode = get_msg_inode(msg_mnt->mnt_sb, S_IFIFO | (mode & S_IRWXUGO));
+	if (IS_ERR(inode)) {
+		ret = PTR_ERR(inode);
+		goto out_ret;
+	}
+	q = inode->u.generic_ip;
+
+	if (u_attr != NULL) {
+		if (copy_from_user(&q->attr, u_attr, sizeof (struct mq_attr))) {
+			ret = -EFAULT;
+			goto out_inode;
+		}
+		if (q->attr.mq_maxmsg <= 0
+		    || q->attr.mq_msgsize <= 0
+		    || q->attr.mq_maxmsg > MQ_MAXMSG
+		    || q->attr.mq_msgsize > msg_ctlmax) {
+			ret = -EINVAL;
+			goto out_inode;
+		}
+	} else {		/* implementation defined */
+		q->attr.mq_maxmsg = MQ_MAXMSG;
+		q->attr.mq_msgsize = 1024 /*msg_ctlmax */ ;
+	}
+	q->attr.mq_flags = oflag & O_ACCMODE;
+	q->notify_pid = 0;
+	q->notify.sigev_signo = 0;
+	q->notify.sigev_notify = 0;
+	init_waitqueue_head(&q->wait_send);
+	init_waitqueue_head(&q->wait_recv);
+
+	ret = -ENFILE;
+	if ((fd = get_unused_fd()) < 0)
+		goto out_inode;
+	if (!(filp = get_empty_filp()))
+		goto out_fd;
+
+	qname->hash = full_name_hash(qname->name, qname->len);
+	dentry = d_alloc(dir, qname);
+	if (!dentry) {
+		ret = -ENOMEM;
+		goto out_filp;
+	}
+	d_add(dentry, inode);
+	ret = get_write_access(inode);
+	if (ret)
+		goto out_filp;
+
+	filp->f_vfsmnt = mntget(msg_mnt);
+	filp->f_dentry = dget(dentry);	/* leave it active */
+	filp->f_op = &msg_fops;
+	filp->f_mode = (q->attr.mq_flags + 1) & O_ACCMODE;
+	filp->f_flags = oflag;
+
+	/* Now we map fd to filp, so userspace can access it */
+	fd_install(fd, filp);
+	ret = fd;
+	goto out_ret;
+
+      out_filp:
+	put_filp(filp);
+      out_fd:
+	put_unused_fd(fd);
+      out_inode:
+	kfree(q);
+	iput(inode);
+      out_ret:
+	return ret;
+}
+
+/**
+ *	sys_mq_open	-	opens a message queue associated with @u_name 
+ *	@mqdes: descriptor of mqueue
+ *	@oflag: flags like O_CREAT, O_EXCL, O_RDWR
+ *	@mode: when O_CREAT is specified, the permission bits
+ *	@u_attr: pointer to the attributes, like max msgsize, when creating
+ *
+ *	returns a descriptor to the opened queue or negative value on error
+ */
+asmlinkage mqd_t
+sys_mq_open(const char *u_name, int oflag, mode_t mode, struct mq_attr * u_attr)
+{
+	struct file *filp;
+	struct dentry *dentry;
+	struct qstr this;
+	static int oflag2acc[O_ACCMODE] =
+	    { MAY_READ, MAY_WRITE, MAY_READ | MAY_WRITE };
+	int fd, ret;
+
+	if (IS_ERR(this.name = getname(u_name)))
+		return -ENOMEM;
+	this.len = strlen(this.name);
+	dentry = lookup_one_len(this.name, msg_mnt->mnt_root, this.len);
+
+	if (IS_ERR(dentry)) {
+		ret = PTR_ERR(dentry);
+		goto out_ret;
+	}
+	if (oflag & O_CREAT) {
+		if (dentry->d_inode) {
+			/* entry exists already */
+			if (oflag & O_EXCL) {
+				ret = -EEXIST;
+			} else {
+				goto open_existing;
+			}
+			goto out_dput;
+		} else {
+			ret =
+			    create_queue(msg_mnt->mnt_root, &this, oflag, mode,
+					 u_attr);
+		}
+
+	} else {		/* O_CREAT */
+		if (!dentry->d_inode) {
+			ret = -ENOENT;
+		} else {
+		      open_existing:
+			if (permission
+			    (dentry->d_inode, oflag2acc[oflag & O_ACCMODE])) {
+				ret = -EACCES;
+			} else {
+				fd = get_unused_fd();
+				if (fd >= 0) {
+					mntget(msg_mnt);
+					filp =
+					    dentry_open(dentry, msg_mnt, oflag);
+					filp->f_op = &msg_fops;
+					if (IS_ERR(filp)) {
+						ret = PTR_ERR(filp);
+						put_unused_fd(fd);
+						goto out_dput;
+					}
+					dget(dentry);
+					fd_install(fd, filp);
+				}
+				ret = fd;
+			}
+		}
+	}
+      out_dput:
+	dput(dentry);
+      out_ret:
+	putname(this.name);
+
+	return ret;
+}
+
+/**
+ *	sys_mq_unlink	-	removes a message queue from the namespace 
+ *	
+ *	@u_name: pointer to the name
+ */
+asmlinkage int
+sys_mq_unlink(const char *u_name)
+{
+	int err;
+	struct dentry *dentry, *dir;
+	char *name = getname(u_name);
+
+	if (IS_ERR(name))
+		return PTR_ERR(name);
+
+	dentry = lookup_one_len(name, msg_mnt->mnt_root, strlen(name));
+	putname(name);
+	if (IS_ERR(dentry))
+		return PTR_ERR(dentry);
+
+	if (!dentry->d_inode)
+		return -ENOENT;
+
+	err = -EACCES;
+	dir = dentry->d_parent;
+	if (dir) {
+		err = vfs_unlink(dir->d_inode, dentry);
+		if (!err)
+			d_delete(dentry);
+	}
+	dput(dentry);
+	return err;
+}
+
+/**
+ *	sys_mq_timedsend	-	send a message to the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer holding the message
+ *	@msg_len: length of the message
+ *	@msg_prio: the priority of the message
+ *	@utime: if !NULL the function will only block for specified time
+ */
+asmlinkage int
+sys_mq_timedsend(mqd_t mqdes,
+		 const char *msg_ptr, size_t msg_len,
+		 unsigned int msg_prio, struct timespec *utime)
+{
+	struct siginfo sig_i;
+	struct msg_msg *msg;
+	struct list_head *p;
+	struct msg_queue *queue;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	if ((filp->f_mode & O_ACCMODE) == O_RDONLY)
+		return -EBADF;
+	if ((unsigned int) msg_prio > (unsigned int) MQ_PRIO_MAX)
+		return -EINVAL;
+	if (msg_len > q->attr.mq_msgsize)
+		return -EMSGSIZE;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && !freespace(q, msg_len))
+		return -EAGAIN;
+
+	/* check if this message will exceed overall limit for messages */
+	if (atomic_read(&msg_bytes) + msg_len > MQ_MAXSYSSIZE)
+		return -ENOMEM;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if (ts.tv_nsec >= 1000000000L || ts.tv_nsec < 0
+		    || ts.tv_sec < 0)
+			return -EINVAL;
+		timeout = (timespec_to_jiffies(&ts)
+			   + (ts.tv_sec || ts.tv_nsec));
+	} else
+		timeout = 0L;
+
+	msg = load_msg((char *) msg_ptr, msg_len);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+
+	msg->m_type = msg_prio;
+	msg->m_ts = msg_len;
+
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_send,
+					       freespace(q, msg_len), timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	err = 0;
+	/* if we lose the race for the lock, we could overflow the limits */
+	spin_lock(&q->lock);
+	/* enqueue message in prio order */
+	p = queue->q_messages.next;	/* used as flag if msg was queued */
+	if (msg_prio > 0 && !list_empty(&queue->q_messages)) {
+
+		list_for_each(p, &queue->q_messages) {
+			struct msg_msg *tmp =
+			    list_entry(p, struct msg_msg, m_list);
+			if (tmp->m_type < msg_prio) {
+				list_add_tail(&msg->m_list, p);
+				p = NULL;
+				break;
+			}
+		}
+	}
+	if (p)			/* ok, put it at the end */
+		list_add_tail(&msg->m_list, &queue->q_messages);
+
+	queue->q_lspid = current->pid;
+	queue->q_stime = CURRENT_TIME;
+	queue->q_cbytes += msg_len;
+	atomic_add(msg_len, &msg_bytes);
+	queue->q_qnum++;
+	filp->f_dentry->d_inode->i_size = queue->q_qnum;
+
+	if (waitqueue_active(&q->wait_recv)) {
+		wake_up_interruptible(&q->wait_recv);
+	} else {
+		/* since there was no synchronously waiting process for message
+		 * we notify it when the state of queue changed from
+		 * empty to not empty */
+		if (q->notify_pid != 0 && queue->q_qnum == 1) {
+			/* TODO: Add support for sigev_notify==SIGEV_THREAD
+			 *    should we really create a thread? I think so.
+			 */
+			if (q->notify.sigev_notify == SIGEV_THREAD) {
+				err = -ENOSYS;
+				pr_info
+				    ("mq_*send: SIGEV_THREAD not supported\n");
+			}
+			/* sends signal */
+			if (q->notify.sigev_notify == SIGEV_SIGNAL) {
+				sig_i.si_signo = q->notify.sigev_signo;
+				sig_i.si_errno = 0;
+				sig_i.si_code = SI_MESGQ;
+				sig_i.si_pid = current->pid;
+				sig_i.si_uid = current->uid;
+				kill_proc_info(q->notify.sigev_signo,
+					       &sig_i, q->notify_pid);
+			}
+			/* after notification unregisters process */
+			q->notify_pid = 0;
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_timedreceive	-	receive a message from the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer to hold the message
+ *	@msg_len: length of the userspace buffer
+ *	@msg_prio: will hold the priority if a message was received
+ *	@utime: if !NULL the function will only block for specified time
+ */
+asmlinkage ssize_t
+sys_mq_timedreceive(mqd_t mqdes,
+		    char *msg_ptr, size_t msg_len,
+		    unsigned int *msg_prio, struct timespec * utime)
+{
+	struct msg_queue *queue;
+	struct msg_msg *msg;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (!(filp->f_mode & FMODE_READ))
+		return -EBADF;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && queue->q_qnum == 0)
+		return -EAGAIN;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if (ts.tv_nsec >= 1000000000L || ts.tv_nsec < 0
+		    || ts.tv_sec < 0)
+			return -EINVAL;
+		timeout = (timespec_to_jiffies(&ts)
+			   + (ts.tv_sec || ts.tv_nsec));
+	} else
+		timeout = 0L;
+      wait_on_msg:
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_recv,
+					       queue->q_qnum > 0, timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	err = 0;
+	spin_lock(&q->lock);
+	if (!list_empty(&queue->q_messages)) {
+
+		msg =
+		    list_entry(queue->q_messages.next, struct msg_msg, m_list);
+		if (msg_len < msg->m_ts) {
+			err = -EMSGSIZE;
+			goto out_unlock;
+		}
+		list_del(&msg->m_list);
+		queue->q_rtime = CURRENT_TIME;
+		queue->q_lrpid = current->pid;
+		queue->q_cbytes -= msg->m_ts;
+		atomic_sub(msg->m_ts, &msg_bytes);
+		queue->q_qnum--;
+		filp->f_dentry->d_inode->i_size = queue->q_qnum;
+
+		wake_up_interruptible(&q->wait_send);
+
+		msg_len = (msg_len > msg->m_ts) ? msg->m_ts : msg_len;
+
+		spin_unlock(&q->lock);
+		if ((err = store_msg(msg_ptr, msg, msg_len)) ||
+		    put_user(msg->m_type, msg_prio)) {
+			msg_len = -EFAULT;	/* hmh, now the msg is lost */
+		}
+		free_msg(msg);
+		return msg_len;
+	} else {
+		spin_unlock(&q->lock);
+		goto wait_on_msg;
+	}
+      out_unlock:
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_notify	-	set or remove a notification on the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_notification: pointer to struct sigevent 
+ *
+ */
+asmlinkage int
+sys_mq_notify(mqd_t mqdes, const struct sigevent *u_notification)
+{
+	struct sigevent notify;
+	struct inode *inode;
+	struct file *filp = mqueue_lookup(mqdes);
+	struct mqueue_ds *q = get_mqueue(filp);
+	int err = 0;
+
+	if (!q)
+		return -EBADF;
+	if (u_notification != NULL)
+		if (copy_from_user
+		    (&notify, u_notification, sizeof (struct sigevent)))
+			return -EFAULT;
+
+	inode = filp->f_dentry->d_inode;
+	spin_lock(&q->lock);
+	if (q->notify_pid == current->pid
+	    && (u_notification == NULL || notify.sigev_notify == SIGEV_NONE)) {
+		q->notify_pid = 0;	/* remove notification */
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	} else if (q->notify_pid > 0) {
+		err = -EBUSY;
+	} else if (u_notification != NULL) {
+		if (notify.sigev_notify == SIGEV_SIGNAL) {
+			/* add notification */
+			q->notify_pid = current->pid;
+			q->notify.sigev_signo = notify.sigev_signo;
+			q->notify.sigev_notify = notify.sigev_notify;
+		} else if (notify.sigev_notify == SIGEV_THREAD) {
+			err = -ENOSYS;
+			pr_info("mq_*send: SIGEV_THREAD not supported yet\n");
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_getattr	-	get the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *
+ */
+asmlinkage int
+sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	int err = 0;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	spin_lock(&q->lock);
+	q->attr.mq_flags = (filp->f_mode - 1) & O_ACCMODE;
+	q->attr.mq_flags = (filp->f_flags) & O_NONBLOCK;
+	q->attr.mq_curmsgs = q->queue.q_qnum;
+	if (copy_to_user(u_mqstat, &q->attr, sizeof (struct mq_attr)))
+		err = -EFAULT;
+	spin_lock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_setattr	-	set the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *	@u_omqstat: pointer to store the original attributes (if !NULL)
+ *
+ */
+asmlinkage int
+sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	       struct mq_attr *u_omqstat)
+{
+	struct mq_attr mqstat;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	spin_lock(&q->lock);
+	if (u_omqstat != NULL) {
+		q->attr.mq_flags = (filp->f_mode - 1) & O_ACCMODE;
+		q->attr.mq_flags = (filp->f_flags) & O_NONBLOCK;
+		q->attr.mq_curmsgs = q->queue.q_qnum;
+		if (copy_to_user(u_omqstat, &q->attr, sizeof (struct mq_attr))) {
+			spin_unlock(&q->lock);
+			return -EFAULT;
+		}
+	}
+	if (copy_from_user(&mqstat, u_mqstat, sizeof (struct mq_attr))) {
+		spin_unlock(&q->lock);
+		return -EFAULT;
+	}
+	if (mqstat.mq_flags & O_NONBLOCK)
+		filp->f_flags |= O_NONBLOCK;
+	else
+		filp->f_flags &= ~O_NONBLOCK;
+
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+static struct super_operations msg_s_ops = {
+	.statfs = simple_statfs,
+	.drop_inode = generic_delete_inode,
+};
+
+static int
+msg_fill_super(struct super_block *sb, void *data, int silent)
+{
+	struct inode *root;
+	struct dentry *root_dentry;
+
+	sb->s_blocksize = PAGE_CACHE_SIZE;
+	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
+	sb->s_magic = 0x12121212;
+	sb->s_op = &msg_s_ops;
+
+	root = get_msg_inode(sb, S_IFDIR | S_IRWXUGO | S_ISVTX);
+	if (!root)
+		goto out;
+	root_dentry = d_alloc_root(root);
+	if (!root_dentry)
+		goto out_iput;
+	sb->s_root = root_dentry;
+	return 0;
+
+      out_iput:
+	iput(root);
+      out:
+	return -ENOMEM;
+}
+
+static struct super_block *
+msgfs_get_sb(struct file_system_type *fs_type,
+	     int flags, char *dev_name, void *data)
+{
+	return get_sb_single(fs_type, flags, data, msg_fill_super);
+}
+
+static struct file_system_type msg_fs_type = {
+	.name = "msgfs",
+	.get_sb = msgfs_get_sb,
+	.kill_sb = kill_anon_super,
+};
+
+static int __init
+mqueue_init(void)
+{
+	register_filesystem(&msg_fs_type);
+	if (IS_ERR(msg_mnt = kern_mount(&msg_fs_type)))
+		return PTR_ERR(msg_mnt);
+
+	return 0;
+}
+
+static void __exit
+mqueue_exit(void)
+{
+	unregister_filesystem(&msg_fs_type);
+}
+
+__initcall(mqueue_init);
diff -Nur -X dontdiff vanilla-2.5.46/ipc/util.c linux-2.5.46/ipc/util.c
--- vanilla-2.5.46/ipc/util.c	2002-11-08 15:10:06.000000000 +0100
+++ linux-2.5.46/ipc/util.c	2002-11-08 15:48:00.000000000 +0100
@@ -24,6 +24,7 @@
 #include <linux/security.h>
 #include <linux/rcupdate.h>
 #include <linux/workqueue.h>
+#include <linux/mqueue.h>
 
 #if defined(CONFIG_SYSVIPC)
 
@@ -579,3 +580,57 @@
 }
 
 #endif /* CONFIG_SYSVIPC */
+
+#if defined(CONFIG_POSIXMSG)
+/* nothing yet */
+#else
+/*
+ * Dummy functions when POSIXMSG isn't configured
+ */
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr)
+{
+	return (mqd_t) -ENOSYS;
+}
+
+asmlinkage int sys_mq_close(mqd_t mqdes)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_unlink(const char *u_name)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat)
+{
+	return -ENOSYS;
+}
+
+#endif				/* CONFIG_POSIXMSG */

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
       [not found] <EDC461A30AC4D511ADE10002A5072CAD04C70992@orsmsx119.jf.intel.com>
@ 2002-11-13 15:10 ` Michal Wronski
  2002-11-13 22:25   ` Peter Waechtler
  2002-11-24 11:55   ` Michal Wronski
  0 siblings, 2 replies; 11+ messages in thread
From: Michal Wronski @ 2002-11-13 15:10 UTC (permalink / raw)
  To: linux-kernel
  Cc: Peter Waechtler, Krzysztof Benedyczak, Gustafson, Geoffrey R,
	Abbas, Mohamed


> The interface boils down to 7 new syscalls (for now just i386):
> - sys_mq_open
> - sys_mq_unlink
> - sys_mq_timedsend
> - sys_mq_timedreceive
> ...

Why add a new syscalls?? It's better to do this via ioctl's

> The change to ipc/msg.c is minimal - just make
> - load_msg
> - store_msg
> - free_msg
> accessible (not static).

I suggest doing this independently to SysV IPC

> userspace lib and test progs are on
> http://homepage.mac.com/pwaechtler/linux/mqueue/

"We're sorry, but we can't find the HomePage you've requested."

> +#ifndef _LINUX_MQUEUE_H
> +#define _LINUX_MQUEUE_H
> +
> +#define MQ_MAXMSG 40 /* max number of messages in each queue */
> +#define MQ_MAXSYSSIZE 1048576 /* max size that all m.q. can have 
> together 
> */
> +#define MQ_PRIO_MAX 10000 /* max priority */

I see that you've read our sources....

We (K. Benedyczak with me) are currently working on new implementation of 
mqueues. It's very similar to yours (filesystem, without new syscalls) and 
it's almost done. Maybe we should collaborate??

Michal Wronski


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
  2002-11-13 15:10 ` [PATCH] unified SysV and POSIX mqueues - complete rewrite Michal Wronski
@ 2002-11-13 22:25   ` Peter Waechtler
  2002-11-19 11:28     ` Krzysztof Benedyczak
  2002-11-24 11:55   ` Michal Wronski
  1 sibling, 1 reply; 11+ messages in thread
From: Peter Waechtler @ 2002-11-13 22:25 UTC (permalink / raw)
  To: Michal Wronski
  Cc: linux-kernel, Krzysztof Benedyczak, Gustafson, Geoffrey R,
	Abbas, Mohamed

Michal Wronski schrieb:
> 
> > The interface boils down to 7 new syscalls (for now just i386):
> > - sys_mq_open
> > - sys_mq_unlink
> > - sys_mq_timedsend
> > - sys_mq_timedreceive
> > ...
> 
> Why add a new syscalls?? It's better to do this via ioctl's
> 

Hmh, there was some discussion concluding that ioctl is
"bad taste from hell". Syscalls are considered cleaner
and less error prone.

> I suggest doing this independently to SysV IPC

I just reused the message storage code.

> 
> > userspace lib and test progs are on
> > http://homepage.mac.com/pwaechtler/linux/mqueue/
> 
> "We're sorry, but we can't find the HomePage you've requested."
> 

Yes, I'm also sorry ;-) 

a wget http://homepage.mac.com/pwaechtler/linux/mqueue.tgz
should work. (i'm not comfortable with my own homepage, shame on me :)

> > +#ifndef _LINUX_MQUEUE_H
> > +#define _LINUX_MQUEUE_H
> > +
> > +#define MQ_MAXMSG 40 /* max number of messages in each queue */
> > +#define MQ_MAXSYSSIZE 1048576 /* max size that all m.q. can have
> > together
> > */
> > +#define MQ_PRIO_MAX 10000 /* max priority */
> 
> I see that you've read our sources....
> 

Yes, indeed. I also evaluated your implementation and the one from Jakub.
First I thought: hey cool, they use binary trees for the messages - but
no: you use trees to keep track which process uses which queue.
Your code carries a lot of baggage.. the VFS can do all this for you/me

> We (K. Benedyczak with me) are currently working on new implementation of
> mqueues. It's very similar to yours (filesystem, without new syscalls) and
> it's almost done. Maybe we should collaborate??
> 

Yes, why not. But to be honest my implementation is already "full featured".
Perhaps one could handle the message lists more effectively .. 

I'm currently trying to do some performance measurements with NPTL and the
userspace implementation I have.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
  2002-11-13 22:25   ` Peter Waechtler
@ 2002-11-19 11:28     ` Krzysztof Benedyczak
  2002-11-19 16:42       ` Peter Waechtler
  0 siblings, 1 reply; 11+ messages in thread
From: Krzysztof Benedyczak @ 2002-11-19 11:28 UTC (permalink / raw)
  To: Peter Waechtler
  Cc: Michal Wronski, linux-kernel, Gustafson, Geoffrey R,
	Abbas, Mohamed

Hello,

After some looking into your code, I think there is a bug.
Please correct me if I'm wrong.

The problem occur when awake processes which wait for message (or free
space). I think that your code will wake them up in random order. POSIX
says:

> If more than one thread is waiting to send when space becomes
> available in the message queue and the Priority Scheduling option is
> supported, then the thread of the highest priority that has been
> waiting the longest shall be unblocked to send its message

I've written a test and it shows that my suspects are rather true?

BTW: I've had some problems with your patch when linking kernel - in your
main file were used static functions from msg.c?? Maybe my patch (taken
from lkml - post date: XI 10) was incomplete? If there is more recent
version could you inform me? Thanks.

Regards

K. Benedzyczak




^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
  2002-11-19 11:28     ` Krzysztof Benedyczak
@ 2002-11-19 16:42       ` Peter Waechtler
  0 siblings, 0 replies; 11+ messages in thread
From: Peter Waechtler @ 2002-11-19 16:42 UTC (permalink / raw)
  To: Krzysztof Benedyczak
  Cc: Michal Wronski, linux-kernel, Gustafson, Geoffrey R,
	Abbas, Mohamed

Krzysztof Benedyczak schrieb:
> 
> Hello,
> 
> After some looking into your code, I think there is a bug.
> Please correct me if I'm wrong.
> 
> The problem occur when awake processes which wait for message (or free
> space). I think that your code will wake them up in random order. POSIX
> says:
> 
> > If more than one thread is waiting to send when space becomes
> > available in the message queue and the Priority Scheduling option is
> > supported, then the thread of the highest priority that has been
> > waiting the longest shall be unblocked to send its message
> 
> I've written a test and it shows that my suspects are rather true?
> 
> BTW: I've had some problems with your patch when linking kernel - in your
> main file were used static functions from msg.c?? Maybe my patch (taken
> from lkml - post date: XI 10) was incomplete? If there is more recent
> version could you inform me? Thanks.
> 

The patch included 2 attachments - one was making the 3 function non static.

SuSv3 - mq_receive:
	If more than one thread is waiting to receive a message when a message
arrives at an empty queue and the Priority Scheduling option is supported, then
the thread of highest priority that has been waiting the longest shall be
selected to receive the message. Otherwise, it is unspecified which waiting
thread receives the message.

--- snip ---

We could implement some kind of priority sorted waitqueue.
I read about some prio aware waitqueue patch (from Ingo Molnar) used
by a NGPT developer for the futexes (I.Gonzalez?)

I thought about some waitqueue in the kernel, that at least put
realtime processes at the front of the waitqueue.

It would be nice to have some generic mechanism in the kernel since
only prio aware message read feature does not buy you much, IMHO

I will look in Ingos folder and also for the patch for the futexes.

^ permalink raw reply	[flat|nested] 11+ messages in thread

* RE: [PATCH] unified SysV and POSIX mqueues - complete rewrite
@ 2002-11-19 19:12 Perez-Gonzalez, Inaky
  0 siblings, 0 replies; 11+ messages in thread
From: Perez-Gonzalez, Inaky @ 2002-11-19 19:12 UTC (permalink / raw)
  To: 'Peter Waechtler', Krzysztof Benedyczak
  Cc: Michal Wronski, linux-kernel, Gustafson, Geoffrey R,
	Abbas, Mohamed

[-- Attachment #1: Type: text/plain, Size: 1283 bytes --]


> We could implement some kind of priority sorted waitqueue.
> I read about some prio aware waitqueue patch (from Ingo Molnar) used
> by a NGPT developer for the futexes (I.Gonzalez?)

I am just a contributor to NGPT ... what I did was to apply the
concept in Ingo's O(1) scheduling to the selection of tasks waiting
for the futex event.

> I thought about some waitqueue in the kernel, that at least put
> realtime processes at the front of the waitqueue.
> 
> It would be nice to have some generic mechanism in the kernel since
> only prio aware message read feature does not buy you much, IMHO

I did something like that also for the futexes [priority based futexes].
Check the attached patch vs. 2.5.45 [sorry for not inlining it, but I
am w/ Outlook now and it breaks it].

The nice next step would be to go ahead and create a breed of wait queues
that supported priority-based wake up, but there seemed to be not too
much interest; in any case, we'd need to come out with a sollution that
would not waste 2*sizeof(void*) * NUM_PRIO bytes per wait-queue, but
still being O(1) [and now that we are at asking, if somebody knows where
to buy a brand new Mercedes Benz C320 at 25%, pls email me]

Inaky Perez-Gonzalez -- Not speaking for Intel - opinions are my own [or my
fault]


[-- Attachment #2: prioarray-2.5.45.patch --]
[-- Type: application/octet-stream, Size: 3380 bytes --]

diff -u /dev/null include/linux/prioarray.h:1.1.2.1
--- /dev/null	Tue Nov 19 11:01:07 2002
+++ include/linux/prioarray.h	Tue Oct 15 15:17:29 2002
@@ -0,0 +1,57 @@
+/*
+ * O(1) priority arrays
+ *
+ * Modified from code (C) 2002 Ingo Molnar <mingo@redhat.com> in
+ * sched.c by Iñaky Pérez-González <inaky.perez-gonzalez@intel.com> so
+ * that other parts of the kernel can use the same constructs.
+ */
+
+#ifndef _LINUX_PRIOARRAY_
+#define _LINUX_PRIOARRAY_
+
+        /* This inclusion is kind of recursive ... hmmm */
+
+#include <linux/sched.h>
+
+struct prio_array {
+	int nr_active;
+	unsigned long bitmap[BITMAP_SIZE];
+	struct list_head queue[MAX_PRIO];
+};
+
+typedef struct prio_array prio_array_t;
+
+static inline
+void pa_init (prio_array_t *array)
+{
+        unsigned cnt;
+	array->nr_active = 0;
+        memset (array->bitmap, 0, sizeof (array->bitmap));
+        for (cnt = 0; cnt < MAX_PRIO; cnt++)
+                INIT_LIST_HEAD (&array->queue[cnt]);
+}
+
+/*
+ * Adding/removing a node to/from a priority array:
+ */
+
+static inline
+void pa_dequeue (struct list_head *p, unsigned prio, prio_array_t *array)
+{
+	array->nr_active--;
+	list_del(p);
+	if (list_empty(array->queue + prio))
+		__clear_bit(prio, array->bitmap);
+}
+
+static inline
+void pa_enqueue (struct list_head *p, unsigned prio, prio_array_t *array)
+{
+	list_add_tail(p, array->queue + prio);
+	__set_bit(prio, array->bitmap);
+	array->nr_active++;
+}
+
+
+
+#endif /* #ifndef _LINUX_PRIOARRAY_ */

diff -u include/linux/sched.h:1.1.1.4 include/linux/sched.h:1.1.1.1.2.4
--- include/linux/sched.h:1.1.1.4	Thu Oct 31 15:28:29 2002
+++ include/linux/sched.h	Thu Oct 31 15:36:14 2002
@@ -241,6 +241,9 @@
 #define MAX_RT_PRIO		MAX_USER_RT_PRIO
 
 #define MAX_PRIO		(MAX_RT_PRIO + 40)
+#define BITMAP_SIZE ((((MAX_PRIO+1+7)/8)+sizeof(long)-1)/sizeof(long))
+
+#include <linux/prioarray.h> /* Okay, this is ugly, but needs MAX_PRIO */
  
 /*
  * Some day this will be a full-fledged user tracking system..
@@ -265,7 +268,6 @@
 extern struct user_struct root_user;
 #define INIT_USER (&root_user)
 
-typedef struct prio_array prio_array_t;
 struct backing_dev_info;
 
 struct task_struct {

diff -u kernel/sched.c:1.1.1.3 kernel/sched.c:1.1.1.1.2.2
--- kernel/sched.c:1.1.1.3	Thu Oct 17 13:08:31 2002
+++ kernel/sched.c	Thu Oct 17 13:51:57 2002
@@ -129,15 +129,8 @@
  * These are the runqueue data structures:
  */
 
-#define BITMAP_SIZE ((((MAX_PRIO+1+7)/8)+sizeof(long)-1)/sizeof(long))
-
 typedef struct runqueue runqueue_t;
 
-struct prio_array {
-	int nr_active;
-	unsigned long bitmap[BITMAP_SIZE];
-	struct list_head queue[MAX_PRIO];
-};
 
 /*
  * This is the main, per-CPU runqueue data structure.
@@ -225,17 +218,12 @@
  */
 static inline void dequeue_task(struct task_struct *p, prio_array_t *array)
 {
-	array->nr_active--;
-	list_del(&p->run_list);
-	if (list_empty(array->queue + p->prio))
-		__clear_bit(p->prio, array->bitmap);
+        pa_dequeue (&p->run_list, p->prio, array);
 }
 
 static inline void enqueue_task(struct task_struct *p, prio_array_t *array)
 {
-	list_add_tail(&p->run_list, array->queue + p->prio);
-	__set_bit(p->prio, array->bitmap);
-	array->nr_active++;
+        pa_enqueue (&p->run_list, p->prio, array);
 	p->array = array;
 }
 

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
  2002-11-13 15:10 ` [PATCH] unified SysV and POSIX mqueues - complete rewrite Michal Wronski
  2002-11-13 22:25   ` Peter Waechtler
@ 2002-11-24 11:55   ` Michal Wronski
  2002-11-26  9:12     ` Michal Wronski
  1 sibling, 1 reply; 11+ messages in thread
From: Michal Wronski @ 2002-11-24 11:55 UTC (permalink / raw)
  To: linux-kernel
  Cc: Peter Waechtler, Krzysztof Benedyczak, Gustafson, Geoffrey R,
	Abbas, Mohamed


I found a "bug" in your implementation:

+ if (!(q = get_mqueue(filp)))
+ return -EBADF;
+ if ((filp->f_mode & O_ACCMODE) == O_RDONLY)
+ return -EBADF;
+ if ((unsigned int) msg_prio > (unsigned int) MQ_PRIO_MAX)
+ return -EINVAL;
+ if (msg_len > q->attr.mq_msgsize)
+ return -EMSGSIZE;

POSIX says: "The value of msg_prio shall be less than {MQ_PRIO_MAX}."

Michal Wronski


^ permalink raw reply	[flat|nested] 11+ messages in thread

* [PATCH] unified SysV and POSIX mqueues - complete rewrite
@ 2002-11-24 23:05 Peter Waechtler
       [not found] ` <3DE15F9E.A585E26F@digeo.com>
  0 siblings, 1 reply; 11+ messages in thread
From: Peter Waechtler @ 2002-11-24 23:05 UTC (permalink / raw)
  To: linux-kernel

[-- Attachment #1: Type: text/plain, Size: 1150 bytes --]

[part to lkml]
Alan, feel invited to add it to your -ac tree :-)

There are at least 3 attempts to provide POSIX mqueues for Linux:

a) early attempt of Jakub Jelinek based on 2.4-test based on a
filesystem with read/write/ioctl interface - very intrusive to ipc/msg.c

b) http://www.mat.uni.torun.pl/~wrona/posix_ipc
complicated standalone version (against recent 2.4), with 
multiplexed syscall - and great effort to keep track of resources
which is far easier accomplished by vfs

c) based on a filesystem with its own syscalls and clean separation
(but dependence ) on SysV ipc/msg.c
why separate syscalls? you cannot specify the priority of a sent
message without ioctl/fcntl, also problems with timed versions - and
yes there is an outstanding issue with priority aware waitqueues

interface stub and userspace implementation is on
http://homepage.mac.com/pwaechtler/linux/mqueue.tgz

The userspace implementation is not complete. There we have the problems
with the locks on crashing apps (we could use flock), signal safety but what
about the timed versions? Performance is not as good as kernel version!?

following patch is against 2.5.49


[-- Warning: decoded text below may be mangled, UTF-8 assumed --]
[-- Attachment #2: kernel.patch --]
[-- Type: text/x-diff; charset="us-ascii"; name="kernel.patch", Size: 28523 bytes --]

diff -X dontdiff -Nur vanilla-2.5.49/arch/i386/kernel/entry.S linux-2.5.49/arch/i386/kernel/entry.S
--- vanilla-2.5.49/arch/i386/kernel/entry.S	2002-11-23 17:04:27.000000000 +0100
+++ linux-2.5.49/arch/i386/kernel/entry.S	2002-11-23 17:13:38.000000000 +0100
@@ -741,8 +741,19 @@
 	.long sys_epoll_create
 	.long sys_epoll_ctl	/* 255 */
 	.long sys_epoll_wait
- 	.long sys_remap_file_pages
- 	.long sys_set_tid_address
+	.long sys_remap_file_pages
+	.long sys_set_tid_address
+	.long sys_ni_syscall
+	.long sys_ni_syscall	/* 260 */
+	.long sys_ni_syscall
+	.long sys_ni_syscall
+	.long sys_mq_open
+	.long sys_mq_unlink
+	.long sys_mq_timedsend	/* 265 */
+	.long sys_mq_timedreceive
+	.long sys_mq_notify
+	.long sys_mq_getattr
+	.long sys_mq_setattr
 
 
 	.rept NR_syscalls-(.-sys_call_table)/4
diff -X dontdiff -Nur vanilla-2.5.49/include/asm-i386/unistd.h linux-2.5.49/include/asm-i386/unistd.h
--- vanilla-2.5.49/include/asm-i386/unistd.h	2002-11-21 00:18:34.000000000 +0100
+++ linux-2.5.49/include/asm-i386/unistd.h	2002-11-24 01:49:32.000000000 +0100
@@ -261,9 +261,16 @@
 #define __NR_sys_epoll_create	254
 #define __NR_sys_epoll_ctl	255
 #define __NR_sys_epoll_wait	256
-#define __NR_remap_file_pages	257
+#define __NR_remap_file_pages 257
 #define __NR_set_tid_address	258
-
+#define __NR_sys_mq_open	263
+#define __NR_sys_mq_unlink	264
+#define __NR_mq_timedsend	265
+#define __NR_mq_timedreceive	266
+#define __NR_mq_notify	267
+#define __NR_mq_getattr	268
+#define __NR_mq_setattr	269
+  
 
 /* user-visible error numbers are in the range -1 - -124: see <asm-i386/errno.h> */
 
diff -X dontdiff -Nur vanilla-2.5.49/include/linux/mqueue.h linux-2.5.49/include/linux/mqueue.h
--- vanilla-2.5.49/include/linux/mqueue.h	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.49/include/linux/mqueue.h	2002-11-24 17:20:20.000000000 +0100
@@ -0,0 +1,49 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+#define MQ_MAXMSG 	40	/* max number of messages in each queue */
+#define MQ_MAXSYSSIZE	1048576	/* max size that all m.q. can have together */
+#define MQ_PRIO_MAX 	100000	/* max priority */
+
+#define MSGFS_MAGIC 0x4D455347
+#define  _POSIX_MESSAGE_PASSING 1
+
+typedef int mqd_t;		/* message queue descriptor */
+
+struct mq_attr {
+	long mq_flags;		/* message queue flags */
+	long mq_maxmsg;		/* maximum number of messages */
+	long mq_msgsize;	/* maximum message size */
+	long mq_curmsgs;	/* number of messages currently queued */
+};
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr);
+asmlinkage int sys_mq_close(mqd_t mqdes);
+asmlinkage int sys_mq_unlink(const char *u_name);
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime);
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime);
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification);
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat);
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat);
+
+#ifdef __KERNEL__
+
+struct mqueue_ds {		/* queue */
+	struct mq_attr attr;
+	struct msg_queue queue;	/* ipc/msg */
+
+	spinlock_t lock;
+	wait_queue_head_t wait_recv;
+	wait_queue_head_t wait_send;
+
+	pid_t notify_pid;	/* who we have to notify (or 0) */
+	struct sigevent notify;	/* notification */
+};
+#endif /* __KERNEL__ */
+
+#endif
diff -X dontdiff -Nur vanilla-2.5.49/include/linux/sys.h linux-2.5.49/include/linux/sys.h
--- vanilla-2.5.49/include/linux/sys.h	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.49/include/linux/sys.h	2002-11-01 16:36:48.000000000 +0100
@@ -4,7 +4,7 @@
 /*
  * system call entry points ... but not all are defined
  */
-#define NR_syscalls 260
+#define NR_syscalls 270
 
 /*
  * These are system calls that will be removed at some time
diff -X dontdiff -Nur vanilla-2.5.49/init/Kconfig linux-2.5.49/init/Kconfig
--- vanilla-2.5.49/init/Kconfig	2002-11-21 00:18:37.000000000 +0100
+++ linux-2.5.49/init/Kconfig	2002-11-21 00:26:40.000000000 +0100
@@ -69,6 +69,13 @@
 	  section 6.4 of the Linux Programmer's Guide, available from
 	  <http://www.linuxdoc.org/docs.html#guide>.
 
+config POSIXMSG
+	bool "POSIX message queues"
+	depends on SYSVIPC
+	---help---
+	  This gives you POSIX compliant interfaces for message queues.
+
+
 config BSD_PROCESS_ACCT
 	bool "BSD Process Accounting"
 	help
diff -X dontdiff -Nur vanilla-2.5.49/ipc/Makefile linux-2.5.49/ipc/Makefile
--- vanilla-2.5.49/ipc/Makefile	2002-09-23 15:54:57.000000000 +0200
+++ linux-2.5.49/ipc/Makefile	2002-11-04 17:50:24.000000000 +0100
@@ -6,4 +6,8 @@
 
 obj-$(CONFIG_SYSVIPC) += msg.o sem.o shm.o
 
+ifeq ($(CONFIG_POSIXMSG),y)
+obj-$(CONFIG_SYSVIPC) += posixmsg.o
+endif
+
 include $(TOPDIR)/Rules.make
diff -X dontdiff -Nur vanilla-2.5.49/ipc/msg.c linux-2.5.49/ipc/msg.c
--- vanilla-2.5.49/ipc/msg.c	2002-11-21 00:18:37.000000000 +0100
+++ linux-2.5.49/ipc/msg.c	2002-11-21 00:26:40.000000000 +0100
@@ -127,7 +127,7 @@
 	return msg_buildid(id,msq->q_perm.seq);
 }
 
-static void free_msg(struct msg_msg* msg)
+void free_msg(struct msg_msg* msg)
 {
 	struct msg_msgseg* seg;
 	seg = msg->next;
@@ -139,7 +139,7 @@
 	}
 }
 
-static struct msg_msg* load_msg(void* src, int len)
+struct msg_msg* load_msg(void* src, int len)
 {
 	struct msg_msg* msg;
 	struct msg_msgseg** pseg;
@@ -191,7 +191,7 @@
 	return ERR_PTR(err);
 }
 
-static int store_msg(void* dest, struct msg_msg* msg, int len)
+int store_msg(void* dest, struct msg_msg* msg, int len)
 {
 	int alen;
 	struct msg_msgseg *seg;
diff -X dontdiff -Nur vanilla-2.5.49/ipc/posixmsg.c linux-2.5.49/ipc/posixmsg.c
--- vanilla-2.5.49/ipc/posixmsg.c	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.49/ipc/posixmsg.c	2002-11-24 17:27:42.000000000 +0100
@@ -0,0 +1,829 @@
+/*
+ *  linux/ipc/posixmsg.c
+ *
+ *  Copyright 2002 Peter Wächtler <pwaechtler@mac.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * root can mount the filesystem to see the names of the currently
+ * used mqueues. The i_size shows the number of msgs in the queue.
+ * The only shell operation on the "fifos" is an unlink() via rm(1)
+ * The queues are accessed through syscalls - poll is supported
+ *
+ * put MSGFS_MAGIC into mqueue.h
+ * update inode->m_time on send (yes, no posix semantics on this)
+ * update inode->a_time on recv
+ * translate absolute timeouts in kernel to prevent too long sleeps
+ *
+ * TODO:
+ *	check for more sysv msg limits (or add some new, e.g. MQ_PRIO_MAX)?
+ *  implement SIGEV_THREAD with clone_startup(hey, where is it?)
+ *  think about prio based waitqueues: simply enqueue them in order?
+ *   bits/posix_opt.h  claims to support _POSIX_PRIORITY_SCHEDULING
+ */
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/fs.h>
+#include <linux/mount.h>
+#include <linux/file.h>
+#include <linux/namei.h>
+#include <linux/pagemap.h>	/* PAGE_CACHE_SIZE */
+#include <linux/poll.h>
+
+#include <linux/mqueue.h>
+#include <linux/msg.h>
+#include <asm/uaccess.h>
+
+/* functions used from ipc/msg.c */
+extern void free_msg(struct msg_msg *msg);
+extern int store_msg(void *dest, struct msg_msg *msg, int len);
+extern struct msg_msg *load_msg(void *src, int len);
+
+extern int msg_ctlmnb;		/* default max size of a message queue (all msgs sum up) */
+extern int msg_ctlmni;		/* max # of msg queue identifiers */
+extern int msg_ctlmax;		/* max size of one message (bytes) */
+
+static int mqueue_release(struct inode *inode, struct dentry *dentry);
+static int mqueue_close(struct inode *inode, struct file *filp);
+static unsigned int mqueue_poll(struct file *, struct poll_table_struct *);
+
+static atomic_t msg_bytes = ATOMIC_INIT(0);
+static struct vfsmount *msg_mnt;
+
+static struct file_operations msg_fops = {
+	.llseek = no_llseek,
+	.release = mqueue_close,
+	.poll = mqueue_poll,
+};
+static struct inode_operations msg_dir_inode_operations = {
+	.lookup = simple_lookup,
+	.unlink = mqueue_release,
+};
+
+static struct msg_queue *
+init_queue(struct msg_queue *queue)
+{
+	int retval;
+
+	queue->q_perm.mode = 0;
+	queue->q_perm.key = 0;
+	queue->q_perm.security = NULL;
+	retval = security_ops->msg_queue_alloc_security(queue);
+	if (retval)
+		return ERR_PTR(retval);;
+
+	queue->q_stime = queue->q_rtime = 0;
+	queue->q_qbytes = queue->q_cbytes = queue->q_qnum = 0;
+	queue->q_lspid = queue->q_lrpid = 0;
+	INIT_LIST_HEAD(&queue->q_messages);
+	INIT_LIST_HEAD(&queue->q_receivers);	/* unused */
+	INIT_LIST_HEAD(&queue->q_senders);	/* unused */
+
+	return queue;
+}
+
+static struct inode *
+get_msg_inode(struct super_block *sb, mode_t mode)
+{
+	struct mqueue_ds *q;
+	struct inode *inode = new_inode(sb);
+
+	if (inode) {
+		inode->i_mode = mode;
+		inode->i_uid = current->fsuid;
+		inode->i_gid = current->fsgid;
+		inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+		inode->i_blksize = 1024;
+		switch (mode & S_IFMT) {
+		default:
+		case S_IFIFO:
+		case S_IFREG:
+			inode->i_fop = &msg_fops;
+			if ((q = kmalloc(sizeof (*q), GFP_KERNEL))) {
+				if (!init_queue(&q->queue)) {
+					kfree(q);
+					iput(inode);
+					inode = ERR_PTR(-EACCES);
+				}
+				inode->u.generic_ip = q;
+			} else {
+				iput(inode);
+				inode = ERR_PTR(-ENOSPC);
+			}
+			break;
+		case S_IFDIR:
+			inode->i_op = &msg_dir_inode_operations;
+			inode->i_fop = &simple_dir_operations;
+			/* directory inodes start off with i_nlink == 2 (for "." entry) */
+			inode->i_nlink++;
+			break;
+		case S_IFLNK:
+			break;
+		}
+	} else
+		inode = ERR_PTR(-ENOSPC);
+
+	return inode;
+}
+
+#define get_mqueue(filp)\
+	((filp) ? filp->f_dentry->d_inode->u.generic_ip: filp)
+
+/* don't use fget() to avoid the fput() for no good reason */
+static struct file *
+mqueue_lookup(mqd_t fd)
+{
+	struct files_struct *files = current->files;
+
+	read_lock(&files->file_lock);
+	if (fd >= files->max_fds || fd < 0)
+		goto out_unlock;
+	read_unlock(&files->file_lock);
+	return files->fd[fd];
+
+      out_unlock:
+	read_unlock(&files->file_lock);
+	return NULL;
+}
+
+static inline int
+freespace(struct mqueue_ds *q, size_t msg_len)
+{
+	int rc;
+	spin_lock(&q->lock);
+
+	rc = msg_len + q->queue.q_cbytes <= q->attr.mq_msgsize &&
+	    q->queue.q_qnum < q->attr.mq_maxmsg;
+
+	spin_unlock(&q->lock);
+	return rc;
+}
+
+/* cleans up after close() or exit() */
+static int
+mqueue_close(struct inode *inode, struct file *filp)
+{
+	struct mqueue_ds *q = inode->u.generic_ip;
+
+	if (!q || !filp)
+		return -EBADFD;
+
+	spin_lock(&q->lock);
+	/* remove possible notification;
+	 * sys_getpid() returns the tgid if multithreaded */
+	if (q->notify_pid == current->pid) {
+		q->notify_pid = 0;
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	}
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+/* removes a queue - inode is the inode of the directory */
+static int
+mqueue_release(struct inode *inode, struct dentry *dentry)
+{
+	struct mqueue_ds *q = dentry->d_inode->u.generic_ip;
+	struct msg_queue *queue;
+	struct list_head *tmp;
+
+	if (!q)
+		return -EBADFD;
+
+	queue = &q->queue;
+
+	tmp = queue->q_messages.next;
+	while (tmp != &queue->q_messages) {
+		struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);
+		tmp = tmp->next;
+		free_msg(msg);
+	}
+	atomic_sub(queue->q_cbytes, &msg_bytes);
+	security_ops->msg_queue_free_security(queue);
+	dentry->d_inode->u.generic_ip = NULL;
+
+	kfree(q);
+	return 0;
+}
+
+static unsigned int
+mqueue_poll(struct file *filp, struct poll_table_struct *wait)
+{
+	struct mqueue_ds *q = get_mqueue(filp);
+	int ret = 0;
+
+	poll_wait(filp, &q->wait_recv, wait);
+	poll_wait(filp, &q->wait_send, wait);
+
+	if (q->queue.q_qnum)
+		ret = POLLIN | POLLRDNORM;
+
+	if (q->queue.q_qnum < q->attr.mq_maxmsg)
+		ret |= POLLOUT | POLLWRNORM;
+
+	return ret;
+}
+
+static int
+create_queue(struct dentry *dir, struct qstr *qname,
+	     int oflag, mode_t mode, struct mq_attr *u_attr)
+{
+	int ret, fd;
+	struct file *filp;
+	struct mqueue_ds *q;
+	struct inode *inode;
+	struct dentry *dentry;
+
+	inode = get_msg_inode(msg_mnt->mnt_sb, S_IFIFO | (mode & S_IRWXUGO));
+	if (IS_ERR(inode)) {
+		ret = PTR_ERR(inode);
+		goto out_ret;
+	}
+	q = inode->u.generic_ip;
+
+	if (u_attr != NULL) {
+		if (copy_from_user(&q->attr, u_attr, sizeof (struct mq_attr))) {
+			ret = -EFAULT;
+			goto out_inode;
+		}
+		if (q->attr.mq_maxmsg <= 0
+		    || q->attr.mq_msgsize <= 0
+		    || q->attr.mq_maxmsg > MQ_MAXMSG
+		    || q->attr.mq_msgsize > msg_ctlmax) {
+			ret = -EINVAL;
+			goto out_inode;
+		}
+	} else {		/* implementation defined */
+		q->attr.mq_maxmsg = MQ_MAXMSG;
+		q->attr.mq_msgsize = 1024 /*msg_ctlmax */ ;
+	}
+	q->attr.mq_flags = oflag & O_ACCMODE;
+	q->notify_pid = 0;
+	q->notify.sigev_signo = 0;
+	q->notify.sigev_notify = 0;
+	init_waitqueue_head(&q->wait_send);
+	init_waitqueue_head(&q->wait_recv);
+
+	ret = -ENFILE;
+	if ((fd = get_unused_fd()) < 0)
+		goto out_inode;
+	if (!(filp = get_empty_filp()))
+		goto out_fd;
+
+	qname->hash = full_name_hash(qname->name, qname->len);
+	dentry = d_alloc(dir, qname);
+	if (!dentry) {
+		ret = -ENOMEM;
+		goto out_filp;
+	}
+	d_add(dentry, inode);
+	ret = get_write_access(inode);
+	if (ret)
+		goto out_filp;
+
+	filp->f_vfsmnt = mntget(msg_mnt);
+	filp->f_dentry = dget(dentry);	/* leave it active */
+	filp->f_op = &msg_fops;
+	filp->f_mode = (q->attr.mq_flags + 1) & O_ACCMODE;
+	filp->f_flags = oflag;
+
+	/* Now we map fd to filp, so userspace can access it */
+	fd_install(fd, filp);
+	ret = fd;
+	goto out_ret;
+
+      out_filp:
+	put_filp(filp);
+      out_fd:
+	put_unused_fd(fd);
+      out_inode:
+	kfree(q);
+	iput(inode);
+      out_ret:
+	return ret;
+}
+
+/**
+ *	sys_mq_open	-	opens a message queue associated with @u_name 
+ *	@mqdes: descriptor of mqueue
+ *	@oflag: flags like O_CREAT, O_EXCL, O_RDWR
+ *	@mode: when O_CREAT is specified, the permission bits
+ *	@u_attr: pointer to the attributes, like max msgsize, when creating
+ *
+ *	returns a descriptor to the opened queue or negative value on error
+ */
+asmlinkage mqd_t
+sys_mq_open(const char *u_name, int oflag, mode_t mode, struct mq_attr * u_attr)
+{
+	struct file *filp;
+	struct dentry *dentry;
+	struct qstr this;
+	static int oflag2acc[O_ACCMODE] =
+	    { MAY_READ, MAY_WRITE, MAY_READ | MAY_WRITE };
+	int fd, ret;
+
+	if (IS_ERR(this.name = getname(u_name)))
+		return -ENOMEM;
+	this.len = strlen(this.name);
+	dentry = lookup_one_len(this.name, msg_mnt->mnt_root, this.len);
+
+	if (IS_ERR(dentry)) {
+		ret = PTR_ERR(dentry);
+		goto out_ret;
+	}
+	if (oflag & O_CREAT) {
+		if (dentry->d_inode) {
+			/* entry exists already */
+			if (oflag & O_EXCL) {
+				ret = -EEXIST;
+			} else {
+				goto open_existing;
+			}
+			goto out_dput;
+		} else {
+			ret =
+			    create_queue(msg_mnt->mnt_root, &this, oflag, mode,
+					 u_attr);
+		}
+
+	} else {		/* O_CREAT */
+		if (!dentry->d_inode) {
+			ret = -ENOENT;
+		} else {
+open_existing:
+			if (permission
+			    (dentry->d_inode, oflag2acc[oflag & O_ACCMODE])) {
+				ret = -EACCES;
+			} else {
+				fd = get_unused_fd();
+				if (fd >= 0) {
+					mntget(msg_mnt);
+					filp =
+					    dentry_open(dentry, msg_mnt, oflag);
+					filp->f_op = &msg_fops;
+					if (IS_ERR(filp)) {
+						ret = PTR_ERR(filp);
+						put_unused_fd(fd);
+						goto out_dput;
+					}
+					dget(dentry);
+					fd_install(fd, filp);
+				}
+				ret = fd;
+			}
+		}
+	}
+out_dput:
+	dput(dentry);
+      out_ret:
+	putname(this.name);
+
+	return ret;
+}
+
+/**
+ *	sys_mq_unlink	-	removes a message queue from the namespace 
+ *	
+ *	@u_name: pointer to the name
+ */
+asmlinkage int
+sys_mq_unlink(const char *u_name)
+{
+	int err;
+	struct dentry *dentry, *dir;
+	char *name = getname(u_name);
+
+	if (IS_ERR(name))
+		return PTR_ERR(name);
+
+	dentry = lookup_one_len(name, msg_mnt->mnt_root, strlen(name));
+	putname(name);
+	if (IS_ERR(dentry))
+		return PTR_ERR(dentry);
+
+	if (!dentry->d_inode)
+		return -ENOENT;
+
+	err = -EACCES;
+	dir = dentry->d_parent;
+	if (dir) {
+		err = vfs_unlink(dir->d_inode, dentry);
+		if (!err)
+			d_delete(dentry);
+	}
+	dput(dentry);
+	return err;
+}
+
+static inline long get_timeout( struct timespec *abs)
+{
+	struct timespec t;
+
+	if (abs->tv_nsec >= 1000000000L || abs->tv_nsec < 0 || abs->tv_sec < 0)
+		return -EINVAL;
+	t=current_kernel_time();
+	if (t.tv_sec > abs->tv_sec || 
+	(t.tv_sec == abs->tv_sec && t.tv_nsec > abs->tv_nsec))
+		return -ETIMEDOUT;
+
+	t.tv_sec = abs->tv_sec - t.tv_sec;
+	t.tv_nsec = abs->tv_nsec - t.tv_nsec;
+	if (t.tv_nsec < 0){
+		t.tv_sec--;
+		t.tv_nsec+= 1000000000;
+	}
+	return timespec_to_jiffies(&t) + 1;
+}
+
+/**
+ *	sys_mq_timedsend	-	send a message to the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer holding the message
+ *	@msg_len: length of the message
+ *	@msg_prio: the priority of the message
+ *	@utime: if !NULL the function will only block for specified time
+ */
+asmlinkage int
+sys_mq_timedsend(mqd_t mqdes,
+		 const char *msg_ptr, size_t msg_len,
+		 unsigned int msg_prio, struct timespec *utime)
+{
+	struct siginfo sig_i;
+	struct msg_msg *msg;
+	struct list_head *p;
+	struct msg_queue *queue;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	if ((filp->f_mode & O_ACCMODE) == O_RDONLY)
+		return -EBADF;
+	if ((unsigned int) msg_prio >= (unsigned int) MQ_PRIO_MAX)
+		return -EINVAL;
+	if (msg_len > q->attr.mq_msgsize)
+		return -EMSGSIZE;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && !freespace(q, msg_len))
+		return -EAGAIN;
+
+	/* check if this message will exceed overall limit for messages */
+	if (atomic_read(&msg_bytes) + msg_len > MQ_MAXSYSSIZE)
+		return -ENOMEM;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if ((timeout = get_timeout(&ts))<0)
+			return timeout;
+	} else
+		timeout = 0L;
+
+	msg = load_msg((char *) msg_ptr, msg_len);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+
+	msg->m_type = msg_prio;
+	msg->m_ts = msg_len;
+
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_send,
+					       freespace(q, msg_len), timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	err = 0;
+	/* if we lose the race for the lock, we can overflow the limits */
+	spin_lock(&q->lock);
+	/* enqueue message in prio order */
+	p = queue->q_messages.next;	/* used as flag if msg was queued */
+	if (msg_prio > 0 && !list_empty(&queue->q_messages)) {
+
+		list_for_each(p, &queue->q_messages) {
+			struct msg_msg *tmp =
+			    list_entry(p, struct msg_msg, m_list);
+			if (tmp->m_type < msg_prio) {
+				list_add_tail(&msg->m_list, p);
+				p = NULL;
+				break;
+			}
+		}
+	}
+	if (p)			/* ok, put it at the end */
+		list_add_tail(&msg->m_list, &queue->q_messages);
+
+	queue->q_lspid = current->pid;
+	queue->q_cbytes += msg_len;
+	atomic_add(msg_len, &msg_bytes);
+	queue->q_qnum++;
+	filp->f_dentry->d_inode->i_size = queue->q_qnum;
+	filp->f_dentry->d_inode->i_mtime = CURRENT_TIME;
+
+	if (waitqueue_active(&q->wait_recv)) {
+		wake_up_interruptible(&q->wait_recv);
+	} else {
+		/* since there was no synchronously waiting process for message
+		 * we notify it when the state of queue changed from
+		 * empty to not empty */
+		if (q->notify_pid != 0 && queue->q_qnum == 1) {
+			/* TODO: Add support for sigev_notify==SIGEV_THREAD
+			 *    should we really create a thread? I think so.
+			 */
+			if (q->notify.sigev_notify == SIGEV_THREAD) {
+				
+				err = -ENOSYS;
+				pr_info
+				    ("mq_*send: SIGEV_THREAD not supported\n");
+			}
+			/* sends signal */
+			if (q->notify.sigev_notify == SIGEV_SIGNAL) {
+				sig_i.si_signo = q->notify.sigev_signo;
+				sig_i.si_errno = 0;
+				sig_i.si_code = SI_MESGQ;
+				sig_i.si_pid = current->pid;
+				sig_i.si_uid = current->uid;
+				kill_proc_info(q->notify.sigev_signo,
+					       &sig_i, q->notify_pid);
+			}
+			/* after notification unregisters process */
+			q->notify_pid = 0;
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_timedreceive	-	receive a message from the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer to hold the message
+ *	@msg_len: length of the userspace buffer
+ *	@msg_prio: will hold the priority if a message was received
+ *	@utime: if !NULL the function will only block for specified time
+ */
+asmlinkage ssize_t
+sys_mq_timedreceive(mqd_t mqdes,
+		    char *msg_ptr, size_t msg_len,
+		    unsigned int *msg_prio, struct timespec * utime)
+{
+	struct msg_queue *queue;
+	struct msg_msg *msg;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (!(filp->f_mode & FMODE_READ))
+		return -EBADF;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && queue->q_qnum == 0)
+		return -EAGAIN;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if ((timeout = get_timeout(&ts))<0)
+			return timeout;
+	} else
+		timeout = 0L;
+wait_on_msg:
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_recv,
+					       queue->q_qnum > 0, timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	err = 0;
+	spin_lock(&q->lock);
+	if (!list_empty(&queue->q_messages)) {
+
+		msg =
+		    list_entry(queue->q_messages.next, struct msg_msg, m_list);
+		if (msg_len < msg->m_ts) {
+			err = -EMSGSIZE;
+			goto out_unlock;
+		}
+		list_del(&msg->m_list);
+		queue->q_lrpid = current->pid;
+		queue->q_cbytes -= msg->m_ts;
+		atomic_sub(msg->m_ts, &msg_bytes);
+		queue->q_qnum--;
+		filp->f_dentry->d_inode->i_size = queue->q_qnum;
+		filp->f_dentry->d_inode->i_atime = CURRENT_TIME;
+
+		wake_up_interruptible(&q->wait_send);
+
+		msg_len = (msg_len > msg->m_ts) ? msg->m_ts : msg_len;
+
+		spin_unlock(&q->lock);
+		if ((err = store_msg(msg_ptr, msg, msg_len)) ||
+		    put_user(msg->m_type, msg_prio)) {
+			msg_len = -EFAULT;	/* hmh, now the msg is lost */
+		}
+		free_msg(msg);
+		return msg_len;
+	} else {
+		spin_unlock(&q->lock);
+		goto wait_on_msg;
+	}
+      out_unlock:
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_notify	-	set or remove a notification on the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_notification: pointer to struct sigevent 
+ *
+ */
+asmlinkage int
+sys_mq_notify(mqd_t mqdes, const struct sigevent *u_notification)
+{
+	struct sigevent notify;
+	struct inode *inode;
+	struct file *filp = mqueue_lookup(mqdes);
+	struct mqueue_ds *q = get_mqueue(filp);
+	int err = 0;
+
+	if (!q)
+		return -EBADF;
+	if (u_notification != NULL)
+		if (copy_from_user
+		    (&notify, u_notification, sizeof (struct sigevent)))
+			return -EFAULT;
+
+	inode = filp->f_dentry->d_inode;
+	spin_lock(&q->lock);
+	if (q->notify_pid == current->pid
+	    && (u_notification == NULL || notify.sigev_notify == SIGEV_NONE)) {
+		q->notify_pid = 0;	/* remove notification */
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	} else if (q->notify_pid > 0) {
+		err = -EBUSY;
+	} else if (u_notification != NULL) {
+		if (notify.sigev_notify == SIGEV_SIGNAL) {
+			/* add notification */
+			q->notify_pid = current->pid;
+			q->notify.sigev_signo = notify.sigev_signo;
+			q->notify.sigev_notify = notify.sigev_notify;
+		} else if (notify.sigev_notify == SIGEV_THREAD) {
+			err = -ENOSYS;
+			pr_info("mq_*send: SIGEV_THREAD not supported yet\n");
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_getattr	-	get the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *
+ */
+asmlinkage int
+sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	int err = 0;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	spin_lock(&q->lock);
+	q->attr.mq_flags = (filp->f_mode - 1) & O_ACCMODE;
+	q->attr.mq_flags = (filp->f_flags) & O_NONBLOCK;
+	q->attr.mq_curmsgs = q->queue.q_qnum;
+	if (copy_to_user(u_mqstat, &q->attr, sizeof (struct mq_attr)))
+		err = -EFAULT;
+	spin_lock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_setattr	-	set the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *	@u_omqstat: pointer to store the original attributes (if !NULL)
+ *
+ */
+asmlinkage int
+sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	       struct mq_attr *u_omqstat)
+{
+	struct mq_attr mqstat;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	spin_lock(&q->lock);
+	if (u_omqstat != NULL) {
+		q->attr.mq_flags = (filp->f_mode - 1) & O_ACCMODE;
+		q->attr.mq_flags = (filp->f_flags) & O_NONBLOCK;
+		q->attr.mq_curmsgs = q->queue.q_qnum;
+		if (copy_to_user(u_omqstat, &q->attr, sizeof (struct mq_attr))) {
+			spin_unlock(&q->lock);
+			return -EFAULT;
+		}
+	}
+	if (copy_from_user(&mqstat, u_mqstat, sizeof (struct mq_attr))) {
+		spin_unlock(&q->lock);
+		return -EFAULT;
+	}
+	if (mqstat.mq_flags & O_NONBLOCK)
+		filp->f_flags |= O_NONBLOCK;
+	else
+		filp->f_flags &= ~O_NONBLOCK;
+
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+static struct super_operations msg_s_ops = {
+	.statfs = simple_statfs,
+	.drop_inode = generic_delete_inode,
+};
+
+static int
+msg_fill_super(struct super_block *sb, void *data, int silent)
+{
+	struct inode *root;
+	struct dentry *root_dentry;
+
+	sb->s_blocksize = PAGE_CACHE_SIZE;
+	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
+	sb->s_magic = MSGFS_MAGIC;
+	sb->s_op = &msg_s_ops;
+
+	root = get_msg_inode(sb, S_IFDIR | S_IRWXUGO | S_ISVTX);
+	if (!root)
+		goto out;
+	root_dentry = d_alloc_root(root);
+	if (!root_dentry)
+		goto out_iput;
+	sb->s_root = root_dentry;
+	return 0;
+
+      out_iput:
+	iput(root);
+      out:
+	return -ENOMEM;
+}
+
+static struct super_block *
+msgfs_get_sb(struct file_system_type *fs_type,
+	     int flags, char *dev_name, void *data)
+{
+	return get_sb_single(fs_type, flags, data, msg_fill_super);
+}
+
+static struct file_system_type msg_fs_type = {
+	.name = "msgfs",
+	.get_sb = msgfs_get_sb,
+	.kill_sb = kill_anon_super,
+};
+
+static int __init
+mqueue_init(void)
+{
+	register_filesystem(&msg_fs_type);
+	if (IS_ERR(msg_mnt = kern_mount(&msg_fs_type)))
+		return PTR_ERR(msg_mnt);
+
+	return 0;
+}
+
+__initcall(mqueue_init);
diff -X dontdiff -Nur vanilla-2.5.49/ipc/util.c linux-2.5.49/ipc/util.c
--- vanilla-2.5.49/ipc/util.c	2002-11-23 17:04:49.000000000 +0100
+++ linux-2.5.49/ipc/util.c	2002-11-23 17:14:05.000000000 +0100
@@ -24,6 +24,7 @@
 #include <linux/security.h>
 #include <linux/rcupdate.h>
 #include <linux/workqueue.h>
+#include <linux/mqueue.h>
 
 #if defined(CONFIG_SYSVIPC)
 
@@ -585,3 +586,52 @@
 }
 
 #endif /* CONFIG_SYSVIPC */
+
+#if defined(CONFIG_POSIXMSG)
+/* nothing yet */
+#else
+/*
+ * Dummy functions when POSIXMSG isn't configured
+ */
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr)
+{
+	return (mqd_t) -ENOSYS;
+}
+
+asmlinkage int sys_mq_unlink(const char *u_name)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat)
+{
+	return -ENOSYS;
+}
+
+#endif				/* CONFIG_POSIXMSG */

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
       [not found] ` <3DE15F9E.A585E26F@digeo.com>
@ 2002-11-25  1:02   ` Peter Waechtler
  0 siblings, 0 replies; 11+ messages in thread
From: Peter Waechtler @ 2002-11-25  1:02 UTC (permalink / raw)
  To: Andrew Morton; +Cc: linux-kernel

[-- Attachment #1: Type: text/plain, Size: 342 bytes --]

Am Montag, 25. November 2002 00:24 schrieb Andrew Morton:
> Peter Waechtler wrote:
> > ...
> >    kernel.patch    Type: text/x-diff
>
> It's not legal to call copy_*_user inside spinlock.  You
> sem to dothin in several places.

Yes, I know.

here a fixed version - there was also an error in putting the mqflags
together in setattr/getattr


[-- Attachment #2: kernel.patch --]
[-- Type: text/x-diff, Size: 28463 bytes --]

diff -X dontdiff -Nur vanilla-2.5.49/arch/i386/kernel/entry.S linux-2.5.49/arch/i386/kernel/entry.S
--- vanilla-2.5.49/arch/i386/kernel/entry.S	2002-11-23 17:04:27.000000000 +0100
+++ linux-2.5.49/arch/i386/kernel/entry.S	2002-11-23 17:13:38.000000000 +0100
@@ -741,8 +741,19 @@
 	.long sys_epoll_create
 	.long sys_epoll_ctl	/* 255 */
 	.long sys_epoll_wait
- 	.long sys_remap_file_pages
- 	.long sys_set_tid_address
+	.long sys_remap_file_pages
+	.long sys_set_tid_address
+	.long sys_ni_syscall
+	.long sys_ni_syscall	/* 260 */
+	.long sys_ni_syscall
+	.long sys_ni_syscall
+	.long sys_mq_open
+	.long sys_mq_unlink
+	.long sys_mq_timedsend	/* 265 */
+	.long sys_mq_timedreceive
+	.long sys_mq_notify
+	.long sys_mq_getattr
+	.long sys_mq_setattr
 
 
 	.rept NR_syscalls-(.-sys_call_table)/4
diff -X dontdiff -Nur vanilla-2.5.49/include/asm-i386/unistd.h linux-2.5.49/include/asm-i386/unistd.h
--- vanilla-2.5.49/include/asm-i386/unistd.h	2002-11-21 00:18:34.000000000 +0100
+++ linux-2.5.49/include/asm-i386/unistd.h	2002-11-24 22:31:00.000000000 +0100
@@ -261,9 +261,16 @@
 #define __NR_sys_epoll_create	254
 #define __NR_sys_epoll_ctl	255
 #define __NR_sys_epoll_wait	256
-#define __NR_remap_file_pages	257
+#define __NR_remap_file_pages 257
 #define __NR_set_tid_address	258
-
+#define __NR_sys_mq_open	263
+#define __NR_sys_mq_unlink	264
+#define __NR_mq_timedsend	265
+#define __NR_mq_timedreceive	266
+#define __NR_mq_notify	267
+#define __NR_mq_getattr	268
+#define __NR_mq_setattr	269
+  
 
 /* user-visible error numbers are in the range -1 - -124: see <asm-i386/errno.h> */
 
diff -X dontdiff -Nur vanilla-2.5.49/include/linux/mqueue.h linux-2.5.49/include/linux/mqueue.h
--- vanilla-2.5.49/include/linux/mqueue.h	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.49/include/linux/mqueue.h	2002-11-24 17:20:20.000000000 +0100
@@ -0,0 +1,49 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+#define MQ_MAXMSG 	40	/* max number of messages in each queue */
+#define MQ_MAXSYSSIZE	1048576	/* max size that all m.q. can have together */
+#define MQ_PRIO_MAX 	100000	/* max priority */
+
+#define MSGFS_MAGIC 0x4D455347
+#define  _POSIX_MESSAGE_PASSING 1
+
+typedef int mqd_t;		/* message queue descriptor */
+
+struct mq_attr {
+	long mq_flags;		/* message queue flags */
+	long mq_maxmsg;		/* maximum number of messages */
+	long mq_msgsize;	/* maximum message size */
+	long mq_curmsgs;	/* number of messages currently queued */
+};
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr);
+asmlinkage int sys_mq_close(mqd_t mqdes);
+asmlinkage int sys_mq_unlink(const char *u_name);
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime);
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime);
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification);
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat);
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat);
+
+#ifdef __KERNEL__
+
+struct mqueue_ds {		/* queue */
+	struct mq_attr attr;
+	struct msg_queue queue;	/* ipc/msg */
+
+	spinlock_t lock;
+	wait_queue_head_t wait_recv;
+	wait_queue_head_t wait_send;
+
+	pid_t notify_pid;	/* who we have to notify (or 0) */
+	struct sigevent notify;	/* notification */
+};
+#endif /* __KERNEL__ */
+
+#endif
diff -X dontdiff -Nur vanilla-2.5.49/include/linux/sys.h linux-2.5.49/include/linux/sys.h
--- vanilla-2.5.49/include/linux/sys.h	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.49/include/linux/sys.h	2002-11-01 16:36:48.000000000 +0100
@@ -4,7 +4,7 @@
 /*
  * system call entry points ... but not all are defined
  */
-#define NR_syscalls 260
+#define NR_syscalls 270
 
 /*
  * These are system calls that will be removed at some time
diff -X dontdiff -Nur vanilla-2.5.49/init/Kconfig linux-2.5.49/init/Kconfig
--- vanilla-2.5.49/init/Kconfig	2002-11-21 00:18:37.000000000 +0100
+++ linux-2.5.49/init/Kconfig	2002-11-21 00:26:40.000000000 +0100
@@ -69,6 +69,13 @@
 	  section 6.4 of the Linux Programmer's Guide, available from
 	  <http://www.linuxdoc.org/docs.html#guide>.
 
+config POSIXMSG
+	bool "POSIX message queues"
+	depends on SYSVIPC
+	---help---
+	  This gives you POSIX compliant interfaces for message queues.
+
+
 config BSD_PROCESS_ACCT
 	bool "BSD Process Accounting"
 	help
diff -X dontdiff -Nur vanilla-2.5.49/ipc/Makefile linux-2.5.49/ipc/Makefile
--- vanilla-2.5.49/ipc/Makefile	2002-09-23 15:54:57.000000000 +0200
+++ linux-2.5.49/ipc/Makefile	2002-11-04 17:50:24.000000000 +0100
@@ -6,4 +6,8 @@
 
 obj-$(CONFIG_SYSVIPC) += msg.o sem.o shm.o
 
+ifeq ($(CONFIG_POSIXMSG),y)
+obj-$(CONFIG_SYSVIPC) += posixmsg.o
+endif
+
 include $(TOPDIR)/Rules.make
diff -X dontdiff -Nur vanilla-2.5.49/ipc/msg.c linux-2.5.49/ipc/msg.c
--- vanilla-2.5.49/ipc/msg.c	2002-11-21 00:18:37.000000000 +0100
+++ linux-2.5.49/ipc/msg.c	2002-11-21 00:26:40.000000000 +0100
@@ -127,7 +127,7 @@
 	return msg_buildid(id,msq->q_perm.seq);
 }
 
-static void free_msg(struct msg_msg* msg)
+void free_msg(struct msg_msg* msg)
 {
 	struct msg_msgseg* seg;
 	seg = msg->next;
@@ -139,7 +139,7 @@
 	}
 }
 
-static struct msg_msg* load_msg(void* src, int len)
+struct msg_msg* load_msg(void* src, int len)
 {
 	struct msg_msg* msg;
 	struct msg_msgseg** pseg;
@@ -191,7 +191,7 @@
 	return ERR_PTR(err);
 }
 
-static int store_msg(void* dest, struct msg_msg* msg, int len)
+int store_msg(void* dest, struct msg_msg* msg, int len)
 {
 	int alen;
 	struct msg_msgseg *seg;
diff -X dontdiff -Nur vanilla-2.5.49/ipc/posixmsg.c linux-2.5.49/ipc/posixmsg.c
--- vanilla-2.5.49/ipc/posixmsg.c	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.49/ipc/posixmsg.c	2002-11-25 01:18:27.000000000 +0100
@@ -0,0 +1,828 @@
+/*
+ *  linux/ipc/posixmsg.c
+ *
+ *  Copyright 2002 Peter Wächtler <pwaechtler@mac.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * root can mount the filesystem to see the names of the currently
+ * used mqueues. The i_size shows the number of msgs in the queue.
+ * The only shell operation on the "fifos" is an unlink() via rm(1)
+ * The queues are accessed through syscalls - poll is supported
+ *
+ * put MSGFS_MAGIC into mqueue.h
+ * update inode->m_time on send (yes, no posix semantics on this)
+ * update inode->a_time on recv
+ * translate absolute timeouts in kernel to prevent too long sleeps
+ * corrected spinlocks + [gs]etattr/mq_flags
+ *
+ * TODO:
+ *	check for more sysv msg limits (or add some new, e.g. MQ_PRIO_MAX)?
+ *  implement SIGEV_THREAD with clone_startup(hey, where is it?)
+ *  think about prio based waitqueues: simply enqueue them in order?
+ *   bits/posix_opt.h  claims to support _POSIX_PRIORITY_SCHEDULING
+ */
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/fs.h>
+#include <linux/mount.h>
+#include <linux/file.h>
+#include <linux/namei.h>
+#include <linux/pagemap.h>	/* PAGE_CACHE_SIZE */
+#include <linux/poll.h>
+
+#include <linux/mqueue.h>
+#include <linux/msg.h>
+#include <asm/uaccess.h>
+
+/* functions used from ipc/msg.c */
+extern void free_msg(struct msg_msg *msg);
+extern int store_msg(void *dest, struct msg_msg *msg, int len);
+extern struct msg_msg *load_msg(void *src, int len);
+
+extern int msg_ctlmnb;		/* default max size of a message queue (all msgs sum up) */
+extern int msg_ctlmni;		/* max # of msg queue identifiers */
+extern int msg_ctlmax;		/* max size of one message (bytes) */
+
+static int mqueue_release(struct inode *inode, struct dentry *dentry);
+static int mqueue_close(struct inode *inode, struct file *filp);
+static unsigned int mqueue_poll(struct file *, struct poll_table_struct *);
+
+static atomic_t msg_bytes = ATOMIC_INIT(0);
+static struct vfsmount *msg_mnt;
+
+static struct file_operations msg_fops = {
+	.llseek = no_llseek,
+	.release = mqueue_close,
+	.poll = mqueue_poll,
+};
+static struct inode_operations msg_dir_inode_operations = {
+	.lookup = simple_lookup,
+	.unlink = mqueue_release,
+};
+
+static struct msg_queue *
+init_queue(struct msg_queue *queue)
+{
+	int retval;
+
+	queue->q_perm.mode = 0;
+	queue->q_perm.key = 0;
+	queue->q_perm.security = NULL;
+	retval = security_ops->msg_queue_alloc_security(queue);
+	if (retval)
+		return ERR_PTR(retval);;
+
+	queue->q_stime = queue->q_rtime = 0;
+	queue->q_qbytes = queue->q_cbytes = queue->q_qnum = 0;
+	queue->q_lspid = queue->q_lrpid = 0;
+	INIT_LIST_HEAD(&queue->q_messages);
+	INIT_LIST_HEAD(&queue->q_receivers);	/* unused */
+	INIT_LIST_HEAD(&queue->q_senders);	/* unused */
+
+	return queue;
+}
+
+static struct inode *
+get_msg_inode(struct super_block *sb, mode_t mode)
+{
+	struct mqueue_ds *q;
+	struct inode *inode = new_inode(sb);
+
+	if (inode) {
+		inode->i_mode = mode;
+		inode->i_uid = current->fsuid;
+		inode->i_gid = current->fsgid;
+		inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+		inode->i_blksize = 1024;
+		switch (mode & S_IFMT) {
+		default:
+		case S_IFIFO:
+		case S_IFREG:
+			inode->i_fop = &msg_fops;
+			if ((q = kmalloc(sizeof (*q), GFP_KERNEL))) {
+				if (!init_queue(&q->queue)) {
+					kfree(q);
+					iput(inode);
+					inode = ERR_PTR(-EACCES);
+				}
+				inode->u.generic_ip = q;
+			} else {
+				iput(inode);
+				inode = ERR_PTR(-ENOSPC);
+			}
+			break;
+		case S_IFDIR:
+			inode->i_op = &msg_dir_inode_operations;
+			inode->i_fop = &simple_dir_operations;
+			/* directory inodes start off with i_nlink == 2 (for "." entry) */
+			inode->i_nlink++;
+			break;
+		case S_IFLNK:
+			break;
+		}
+	} else
+		inode = ERR_PTR(-ENOSPC);
+
+	return inode;
+}
+
+#define get_mqueue(filp)\
+	((filp) ? filp->f_dentry->d_inode->u.generic_ip: filp)
+
+/* don't use fget() to avoid the fput() for no good reason */
+static struct file *
+mqueue_lookup(mqd_t fd)
+{
+	struct files_struct *files = current->files;
+
+	read_lock(&files->file_lock);
+	if (fd >= files->max_fds || fd < 0)
+		goto out_unlock;
+	read_unlock(&files->file_lock);
+	return files->fd[fd];
+
+      out_unlock:
+	read_unlock(&files->file_lock);
+	return NULL;
+}
+
+static inline int
+freespace(struct mqueue_ds *q, size_t msg_len)
+{
+	int rc;
+	spin_lock(&q->lock);
+
+	rc = msg_len + q->queue.q_cbytes <= q->attr.mq_msgsize &&
+	    q->queue.q_qnum < q->attr.mq_maxmsg;
+
+	spin_unlock(&q->lock);
+	return rc;
+}
+
+/* cleans up after close() or exit() */
+static int
+mqueue_close(struct inode *inode, struct file *filp)
+{
+	struct mqueue_ds *q = inode->u.generic_ip;
+
+	if (!q || !filp)
+		return -EBADFD;
+
+	spin_lock(&q->lock);
+	/* remove possible notification;
+	 * sys_getpid() returns the tgid if multithreaded */
+	if (q->notify_pid == current->pid) {
+		q->notify_pid = 0;
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	}
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+/* removes a queue - inode is the inode of the directory */
+static int
+mqueue_release(struct inode *inode, struct dentry *dentry)
+{
+	struct mqueue_ds *q = dentry->d_inode->u.generic_ip;
+	struct msg_queue *queue;
+	struct list_head *tmp;
+
+	if (!q)
+		return -EBADFD;
+
+	queue = &q->queue;
+
+	tmp = queue->q_messages.next;
+	while (tmp != &queue->q_messages) {
+		struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);
+		tmp = tmp->next;
+		free_msg(msg);
+	}
+	atomic_sub(queue->q_cbytes, &msg_bytes);
+	security_ops->msg_queue_free_security(queue);
+	dentry->d_inode->u.generic_ip = NULL;
+
+	kfree(q);
+	return 0;
+}
+
+static unsigned int
+mqueue_poll(struct file *filp, struct poll_table_struct *wait)
+{
+	struct mqueue_ds *q = get_mqueue(filp);
+	int ret = 0;
+
+	poll_wait(filp, &q->wait_recv, wait);
+	poll_wait(filp, &q->wait_send, wait);
+
+	if (q->queue.q_qnum)
+		ret = POLLIN | POLLRDNORM;
+
+	if (q->queue.q_qnum < q->attr.mq_maxmsg)
+		ret |= POLLOUT | POLLWRNORM;
+
+	return ret;
+}
+
+static int
+create_queue(struct dentry *dir, struct qstr *qname,
+	     int oflag, mode_t mode, struct mq_attr *u_attr)
+{
+	int ret, fd;
+	struct file *filp;
+	struct mqueue_ds *q;
+	struct inode *inode;
+	struct dentry *dentry;
+
+	inode = get_msg_inode(msg_mnt->mnt_sb, S_IFIFO | (mode & S_IRWXUGO));
+	if (IS_ERR(inode)) {
+		ret = PTR_ERR(inode);
+		goto out_ret;
+	}
+	q = inode->u.generic_ip;
+
+	if (u_attr != NULL) {
+		if (copy_from_user(&q->attr, u_attr, sizeof (struct mq_attr))) {
+			ret = -EFAULT;
+			goto out_inode;
+		}
+		if (q->attr.mq_maxmsg <= 0
+		    || q->attr.mq_msgsize <= 0
+		    || q->attr.mq_maxmsg > MQ_MAXMSG
+		    || q->attr.mq_msgsize > msg_ctlmax) {
+			ret = -EINVAL;
+			goto out_inode;
+		}
+	} else {		/* implementation defined */
+		q->attr.mq_maxmsg = MQ_MAXMSG;
+		q->attr.mq_msgsize = 1024 /*msg_ctlmax */ ;
+	}
+	q->attr.mq_flags = oflag & O_ACCMODE;
+	q->notify_pid = 0;
+	q->notify.sigev_signo = 0;
+	q->notify.sigev_notify = 0;
+	init_waitqueue_head(&q->wait_send);
+	init_waitqueue_head(&q->wait_recv);
+
+	ret = -ENFILE;
+	if ((fd = get_unused_fd()) < 0)
+		goto out_inode;
+	if (!(filp = get_empty_filp()))
+		goto out_fd;
+
+	qname->hash = full_name_hash(qname->name, qname->len);
+	dentry = d_alloc(dir, qname);
+	if (!dentry) {
+		ret = -ENOMEM;
+		goto out_filp;
+	}
+	d_add(dentry, inode);
+	ret = get_write_access(inode);
+	if (ret)
+		goto out_filp;
+
+	filp->f_vfsmnt = mntget(msg_mnt);
+	filp->f_dentry = dget(dentry);	/* leave it active */
+	filp->f_op = &msg_fops;
+	filp->f_mode = (q->attr.mq_flags + 1) & O_ACCMODE;
+	filp->f_flags = oflag;
+
+	/* Now we map fd to filp, so userspace can access it */
+	fd_install(fd, filp);
+	ret = fd;
+	goto out_ret;
+
+      out_filp:
+	put_filp(filp);
+      out_fd:
+	put_unused_fd(fd);
+      out_inode:
+	kfree(q);
+	iput(inode);
+      out_ret:
+	return ret;
+}
+
+/**
+ *	sys_mq_open	-	opens a message queue associated with @u_name 
+ *	@mqdes: descriptor of mqueue
+ *	@oflag: flags like O_CREAT, O_EXCL, O_RDWR
+ *	@mode: when O_CREAT is specified, the permission bits
+ *	@u_attr: pointer to the attributes, like max msgsize, when creating
+ *
+ *	returns a descriptor to the opened queue or negative value on error
+ */
+asmlinkage mqd_t
+sys_mq_open(const char *u_name, int oflag, mode_t mode, struct mq_attr * u_attr)
+{
+	struct file *filp;
+	struct dentry *dentry;
+	struct qstr this;
+	static int oflag2acc[O_ACCMODE] =
+	    { MAY_READ, MAY_WRITE, MAY_READ | MAY_WRITE };
+	int fd, ret;
+
+	if (IS_ERR(this.name = getname(u_name)))
+		return -ENOMEM;
+	this.len = strlen(this.name);
+	dentry = lookup_one_len(this.name, msg_mnt->mnt_root, this.len);
+
+	if (IS_ERR(dentry)) {
+		ret = PTR_ERR(dentry);
+		goto out_ret;
+	}
+	if (oflag & O_CREAT) {
+		if (dentry->d_inode) {
+			/* entry exists already */
+			if (oflag & O_EXCL) {
+				ret = -EEXIST;
+			} else {
+				goto open_existing;
+			}
+			goto out_dput;
+		} else {
+			ret =
+			    create_queue(msg_mnt->mnt_root, &this, oflag, mode,
+					 u_attr);
+		}
+
+	} else {		/* O_CREAT */
+		if (!dentry->d_inode) {
+			ret = -ENOENT;
+		} else {
+open_existing:
+			if (permission
+			    (dentry->d_inode, oflag2acc[oflag & O_ACCMODE])) {
+				ret = -EACCES;
+			} else {
+				fd = get_unused_fd();
+				if (fd >= 0) {
+					mntget(msg_mnt);
+					filp =
+					    dentry_open(dentry, msg_mnt, oflag);
+					filp->f_op = &msg_fops;
+					if (IS_ERR(filp)) {
+						ret = PTR_ERR(filp);
+						put_unused_fd(fd);
+						goto out_dput;
+					}
+					dget(dentry);
+					fd_install(fd, filp);
+				}
+				ret = fd;
+			}
+		}
+	}
+out_dput:
+	dput(dentry);
+      out_ret:
+	putname(this.name);
+
+	return ret;
+}
+
+/**
+ *	sys_mq_unlink	-	removes a message queue from the namespace 
+ *	
+ *	@u_name: pointer to the name
+ */
+asmlinkage int
+sys_mq_unlink(const char *u_name)
+{
+	int err;
+	struct dentry *dentry, *dir;
+	char *name = getname(u_name);
+
+	if (IS_ERR(name))
+		return PTR_ERR(name);
+
+	dentry = lookup_one_len(name, msg_mnt->mnt_root, strlen(name));
+	putname(name);
+	if (IS_ERR(dentry))
+		return PTR_ERR(dentry);
+
+	if (!dentry->d_inode)
+		return -ENOENT;
+
+	err = -EACCES;
+	dir = dentry->d_parent;
+	if (dir) {
+		err = vfs_unlink(dir->d_inode, dentry);
+		if (!err)
+			d_delete(dentry);
+	}
+	dput(dentry);
+	return err;
+}
+
+static inline long get_timeout( struct timespec *abs)
+{
+	struct timespec t;
+
+	if (abs->tv_nsec >= 1000000000L || abs->tv_nsec < 0 || abs->tv_sec < 0)
+		return -EINVAL;
+	t=current_kernel_time();
+	if (t.tv_sec > abs->tv_sec || 
+	(t.tv_sec == abs->tv_sec && t.tv_nsec > abs->tv_nsec))
+		return -ETIMEDOUT;
+
+	t.tv_sec = abs->tv_sec - t.tv_sec;
+	t.tv_nsec = abs->tv_nsec - t.tv_nsec;
+	if (t.tv_nsec < 0){
+		t.tv_sec--;
+		t.tv_nsec+= 1000000000;
+	}
+	return timespec_to_jiffies(&t) + 1;
+}
+
+/**
+ *	sys_mq_timedsend	-	send a message to the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer holding the message
+ *	@msg_len: length of the message
+ *	@msg_prio: the priority of the message
+ *	@utime: if !NULL the function will only block for specified time
+ */
+asmlinkage int
+sys_mq_timedsend(mqd_t mqdes,
+		 const char *msg_ptr, size_t msg_len,
+		 unsigned int msg_prio, struct timespec *utime)
+{
+	struct siginfo sig_i;
+	struct msg_msg *msg;
+	struct list_head *p;
+	struct msg_queue *queue;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	if ((filp->f_mode & O_ACCMODE) == O_RDONLY)
+		return -EBADF;
+	if ((unsigned int) msg_prio >= (unsigned int) MQ_PRIO_MAX)
+		return -EINVAL;
+	if (msg_len > q->attr.mq_msgsize)
+		return -EMSGSIZE;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && !freespace(q, msg_len))
+		return -EAGAIN;
+
+	/* check if this message will exceed overall limit for messages */
+	if (atomic_read(&msg_bytes) + msg_len > MQ_MAXSYSSIZE)
+		return -ENOMEM;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if ((timeout = get_timeout(&ts))<0)
+			return timeout;
+	} else
+		timeout = 0L;
+
+	msg = load_msg((char *) msg_ptr, msg_len);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+
+	msg->m_type = msg_prio;
+	msg->m_ts = msg_len;
+
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_send,
+					       freespace(q, msg_len), timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	err = 0;
+	/* if we lose the race for the lock, we can overflow the limits */
+	spin_lock(&q->lock);
+	/* enqueue message in prio order */
+	p = queue->q_messages.next;	/* used as flag if msg was queued */
+	if (msg_prio > 0 && !list_empty(&queue->q_messages)) {
+
+		list_for_each(p, &queue->q_messages) {
+			struct msg_msg *tmp =
+			    list_entry(p, struct msg_msg, m_list);
+			if (tmp->m_type < msg_prio) {
+				list_add_tail(&msg->m_list, p);
+				p = NULL;
+				break;
+			}
+		}
+	}
+	if (p)			/* ok, put it at the end */
+		list_add_tail(&msg->m_list, &queue->q_messages);
+
+	queue->q_lspid = current->pid;
+	queue->q_cbytes += msg_len;
+	atomic_add(msg_len, &msg_bytes);
+	queue->q_qnum++;
+	filp->f_dentry->d_inode->i_size = queue->q_qnum;
+	filp->f_dentry->d_inode->i_mtime = CURRENT_TIME;
+
+	if (waitqueue_active(&q->wait_recv)) {
+		wake_up_interruptible(&q->wait_recv);
+	} else {
+		/* since there was no synchronously waiting process for message
+		 * we notify it when the state of queue changed from
+		 * empty to not empty */
+		if (q->notify_pid != 0 && queue->q_qnum == 1) {
+			/* TODO: Add support for sigev_notify==SIGEV_THREAD
+			 *    should we really create a thread? I think so.
+			 */
+			if (q->notify.sigev_notify == SIGEV_THREAD) {
+				
+				err = -ENOSYS;
+				pr_info("mq_*send: SIGEV_THREAD not supported\n");
+			}
+			/* sends signal */
+			if (q->notify.sigev_notify == SIGEV_SIGNAL) {
+				sig_i.si_signo = q->notify.sigev_signo;
+				sig_i.si_errno = 0;
+				sig_i.si_code = SI_MESGQ;
+				sig_i.si_pid = current->pid;
+				sig_i.si_uid = current->uid;
+				kill_proc_info(q->notify.sigev_signo,
+					       &sig_i, q->notify_pid);
+			}
+			/* after notification unregisters process */
+			q->notify_pid = 0;
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_timedreceive	-	receive a message from the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer to hold the message
+ *	@msg_len: length of the userspace buffer
+ *	@msg_prio: will hold the priority if a message was received
+ *	@utime: if !NULL the function will only block for specified time
+ */
+asmlinkage ssize_t
+sys_mq_timedreceive(mqd_t mqdes,
+		    char *msg_ptr, size_t msg_len,
+		    unsigned int *msg_prio, struct timespec * utime)
+{
+	struct msg_queue *queue;
+	struct msg_msg *msg;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (!(filp->f_mode & FMODE_READ))
+		return -EBADF;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && queue->q_qnum == 0)
+		return -EAGAIN;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if ((timeout = get_timeout(&ts))<0)
+			return timeout;
+	} else
+		timeout = 0L;
+wait_on_msg:
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_recv,
+					       queue->q_qnum > 0, timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	err = 0;
+	spin_lock(&q->lock);
+	if (!list_empty(&queue->q_messages)) {
+
+		msg =
+		    list_entry(queue->q_messages.next, struct msg_msg, m_list);
+		if (msg_len < msg->m_ts) {
+			err = -EMSGSIZE;
+			goto out_unlock;
+		}
+		list_del(&msg->m_list);
+		queue->q_lrpid = current->pid;
+		queue->q_cbytes -= msg->m_ts;
+		atomic_sub(msg->m_ts, &msg_bytes);
+		queue->q_qnum--;
+		filp->f_dentry->d_inode->i_size = queue->q_qnum;
+		filp->f_dentry->d_inode->i_atime = CURRENT_TIME;
+		/* msg is removed from list, we're already safe */
+		spin_unlock(&q->lock);
+
+		wake_up_interruptible(&q->wait_send);
+
+		msg_len = (msg_len > msg->m_ts) ? msg->m_ts : msg_len;
+
+		if ((err = store_msg(msg_ptr, msg, msg_len)) ||
+		    put_user(msg->m_type, msg_prio)) {
+			msg_len = -EFAULT;	/* hmh, now the msg is lost */
+		}
+		free_msg(msg);
+		return msg_len;
+	} else {
+		spin_unlock(&q->lock);
+		goto wait_on_msg;
+	}
+out_unlock:
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_notify	-	set or remove a notification on the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_notification: pointer to struct sigevent 
+ *
+ */
+asmlinkage int
+sys_mq_notify(mqd_t mqdes, const struct sigevent *u_notification)
+{
+	struct sigevent notify;
+	struct inode *inode;
+	struct file *filp = mqueue_lookup(mqdes);
+	struct mqueue_ds *q = get_mqueue(filp);
+	int err = 0;
+
+	if (!q)
+		return -EBADF;
+	if (u_notification != NULL)
+		if (copy_from_user
+		    (&notify, u_notification, sizeof (struct sigevent)))
+			return -EFAULT;
+
+	inode = filp->f_dentry->d_inode;
+	spin_lock(&q->lock);
+	if (q->notify_pid == current->pid
+	    && (u_notification == NULL || notify.sigev_notify == SIGEV_NONE)) {
+		q->notify_pid = 0;	/* remove notification */
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	} else if (q->notify_pid > 0) {
+		err = -EBUSY;
+	} else if (u_notification != NULL) {
+		if (notify.sigev_notify == SIGEV_SIGNAL) {
+			/* add notification */
+			q->notify_pid = current->pid;
+			q->notify.sigev_signo = notify.sigev_signo;
+			q->notify.sigev_notify = notify.sigev_notify;
+		} else if (notify.sigev_notify == SIGEV_THREAD) {
+			err = -ENOSYS;
+			pr_info("mq_*send: SIGEV_THREAD not supported yet\n");
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+static inline void 
+fill_flags(struct mqueue_ds *q, struct file *filp)
+{
+	spin_lock(&q->lock);
+	q->attr.mq_flags = (filp->f_mode - 1) & O_ACCMODE;
+	q->attr.mq_flags |= (filp->f_flags) & O_NONBLOCK;
+	q->attr.mq_curmsgs = q->queue.q_qnum;
+	spin_unlock(&q->lock);
+}
+
+/**
+ *	sys_mq_getattr	-	get the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *
+ */
+asmlinkage int
+sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	int err = 0;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	fill_flags(q,filp);
+	if (copy_to_user(u_mqstat, &q->attr, sizeof (struct mq_attr)))
+		err = -EFAULT;
+	return err;
+}
+
+/**
+ *	sys_mq_setattr	-	set the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *	@u_omqstat: pointer to store the original attributes (if !NULL)
+ *
+ */
+asmlinkage int
+sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	       struct mq_attr *u_omqstat)
+{
+	struct mq_attr mqstat;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (u_omqstat != NULL) {
+		fill_flags(q,filp);
+		if (copy_to_user(u_omqstat, &q->attr, sizeof (struct mq_attr)))
+			return -EFAULT;
+	}
+	if (copy_from_user(&mqstat, u_mqstat, sizeof (struct mq_attr)))
+		return -EFAULT;
+	if (mqstat.mq_flags & O_NONBLOCK)
+		filp->f_flags |= O_NONBLOCK;
+	else
+		filp->f_flags &= ~O_NONBLOCK;
+
+	return 0;
+}
+
+static struct super_operations msg_s_ops = {
+	.statfs = simple_statfs,
+	.drop_inode = generic_delete_inode,
+};
+
+static int
+msg_fill_super(struct super_block *sb, void *data, int silent)
+{
+	struct inode *root;
+	struct dentry *root_dentry;
+
+	sb->s_blocksize = PAGE_CACHE_SIZE;
+	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
+	sb->s_magic = MSGFS_MAGIC;
+	sb->s_op = &msg_s_ops;
+
+	root = get_msg_inode(sb, S_IFDIR | S_IRWXUGO | S_ISVTX);
+	if (!root)
+		goto out;
+	root_dentry = d_alloc_root(root);
+	if (!root_dentry)
+		goto out_iput;
+	sb->s_root = root_dentry;
+	return 0;
+
+out_iput:
+	iput(root);
+out:
+	return -ENOMEM;
+}
+
+static struct super_block *
+msgfs_get_sb(struct file_system_type *fs_type,
+	     int flags, char *dev_name, void *data)
+{
+	return get_sb_single(fs_type, flags, data, msg_fill_super);
+}
+
+static struct file_system_type msg_fs_type = {
+	.name = "msgfs",
+	.get_sb = msgfs_get_sb,
+	.kill_sb = kill_anon_super,
+};
+
+static int __init
+mqueue_init(void)
+{
+	register_filesystem(&msg_fs_type);
+	if (IS_ERR(msg_mnt = kern_mount(&msg_fs_type)))
+		return PTR_ERR(msg_mnt);
+
+	return 0;
+}
+
+__initcall(mqueue_init);
diff -X dontdiff -Nur vanilla-2.5.49/ipc/util.c linux-2.5.49/ipc/util.c
--- vanilla-2.5.49/ipc/util.c	2002-11-23 17:04:49.000000000 +0100
+++ linux-2.5.49/ipc/util.c	2002-11-23 17:14:05.000000000 +0100
@@ -24,6 +24,7 @@
 #include <linux/security.h>
 #include <linux/rcupdate.h>
 #include <linux/workqueue.h>
+#include <linux/mqueue.h>
 
 #if defined(CONFIG_SYSVIPC)
 
@@ -585,3 +586,52 @@
 }
 
 #endif /* CONFIG_SYSVIPC */
+
+#if defined(CONFIG_POSIXMSG)
+/* nothing yet */
+#else
+/*
+ * Dummy functions when POSIXMSG isn't configured
+ */
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr)
+{
+	return (mqd_t) -ENOSYS;
+}
+
+asmlinkage int sys_mq_unlink(const char *u_name)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat)
+{
+	return -ENOSYS;
+}
+
+#endif				/* CONFIG_POSIXMSG */

^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
  2002-11-24 11:55   ` Michal Wronski
@ 2002-11-26  9:12     ` Michal Wronski
  2002-11-26 11:16       ` Peter Waechtler
  0 siblings, 1 reply; 11+ messages in thread
From: Michal Wronski @ 2002-11-26  9:12 UTC (permalink / raw)
  To: linux-kernel; +Cc: Peter Waechtler



I have a few remarks/questions:

1. I can't find unregister_filesystem in your patch
2. You have different MQ_PRIO_MAX in library and patch.
3. Does mq_unlink work in a proper way?

Michal Wronski


^ permalink raw reply	[flat|nested] 11+ messages in thread

* Re: [PATCH] unified SysV and POSIX mqueues - complete rewrite
  2002-11-26  9:12     ` Michal Wronski
@ 2002-11-26 11:16       ` Peter Waechtler
  0 siblings, 0 replies; 11+ messages in thread
From: Peter Waechtler @ 2002-11-26 11:16 UTC (permalink / raw)
  To: Michal Wronski, linux-kernel

Am Dienstag, 26. November 2002 10:12 schrieb Michal Wronski:
> I have a few remarks/questions:
>
> 1. I can't find unregister_filesystem in your patch

I removed the unused code. I can't have a module
with it's own syscalls - posixmsg can't be a module for that reason.

> 2. You have different MQ_PRIO_MAX in library and patch.

Umh, the value is really arbitrary. Could be something like MAX_INT -1

> 3. Does mq_unlink work in a proper way?

I do think so. Did you test it and found a bug?
The vfs keeps track of a reference count. Only when the usage count
of the inode drops to zero, the mqueue_release is called. 
I tested it and it worked.
Even when there is a process in mq_receive() waiting, only the name
is removed as you expect it with unix filesystem semantics. SuSv3
explicitly allows that:

"Calls to mq_open() to recreate the message queue may fail until the message 
queue is actually removed. However, the mq_unlink() call need not block until 
all references have been closed; it may return immediately."



^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2002-11-26 11:04 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
     [not found] <EDC461A30AC4D511ADE10002A5072CAD04C70992@orsmsx119.jf.intel.com>
2002-11-13 15:10 ` [PATCH] unified SysV and POSIX mqueues - complete rewrite Michal Wronski
2002-11-13 22:25   ` Peter Waechtler
2002-11-19 11:28     ` Krzysztof Benedyczak
2002-11-19 16:42       ` Peter Waechtler
2002-11-24 11:55   ` Michal Wronski
2002-11-26  9:12     ` Michal Wronski
2002-11-26 11:16       ` Peter Waechtler
2002-11-24 23:05 Peter Waechtler
     [not found] ` <3DE15F9E.A585E26F@digeo.com>
2002-11-25  1:02   ` Peter Waechtler
  -- strict thread matches above, loose matches on Subject: below --
2002-11-19 19:12 Perez-Gonzalez, Inaky
2002-11-10 23:44 Peter Waechtler

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox