public inbox for linux-kernel@vger.kernel.org
 help / color / mirror / Atom feed
* [PATCH] unified SysV and POSIX mqueues - complete rewrite
@ 2002-11-10 23:44 Peter Waechtler
  0 siblings, 0 replies; 11+ messages in thread
From: Peter Waechtler @ 2002-11-10 23:44 UTC (permalink / raw)
  To: linux-kernel; +Cc: torvalds

[-- Attachment #1: Type: text/plain, Size: 959 bytes --]

I completely rewrote the unified SysV and Posix mqueue patch
from Jakub Jelinek.

It adds a new msgfs filesystem that can also be mounted
to look up the queue names and to unlink() them.
The size of the fifo shows the number of msgs in the queue.
The interface boils down to 7 new syscalls (for now just i386):
- sys_mq_open
- sys_mq_unlink
- sys_mq_timedsend
- sys_mq_timedreceive
- sys_mq_notify
- sys_mq_getattr
- sys_mq_setattr

mq_close is a simple sys_close - the FD is pollable (enhancement)

The change to ipc/msg.c is minimal - just make
- load_msg
- store_msg
- free_msg
accessible (not static).

Now that I've missed the feature freeze - I want to argue that it's
not a new feature - it's a new interface just using already
available features :-)

userspace lib and test progs are on
http://homepage.mac.com/pwaechtler/linux/mqueue/

I'll pipe some code to Ulrich Drepper for inclusion to glibc.
(Linux interface and userspace impl. for all other platforms).

[-- Attachment #2: ipcmsg-for-posixmsg.diff --]
[-- Type: text/plain, Size: 784 bytes --]

diff -Nur -X dontdiff vanilla-2.5.46/ipc/msg.c linux-2.5.46/ipc/msg.c
--- vanilla-2.5.46/ipc/msg.c	2002-11-08 15:10:06.000000000 +0100
+++ linux-2.5.46/ipc/msg.c	2002-11-10 23:38:32.000000000 +0100
@@ -127,7 +127,7 @@
 	return msg_buildid(id,msq->q_perm.seq);
 }
 
-static void free_msg(struct msg_msg* msg)
+void free_msg(struct msg_msg* msg)
 {
 	struct msg_msgseg* seg;
 	seg = msg->next;
@@ -139,7 +139,7 @@
 	}
 }
 
-static struct msg_msg* load_msg(void* src, int len)
+struct msg_msg* load_msg(void* src, int len)
 {
 	struct msg_msg* msg;
 	struct msg_msgseg** pseg;
@@ -191,7 +191,7 @@
 	return ERR_PTR(err);
 }
 
-static int store_msg(void* dest, struct msg_msg* msg, int len)
+int store_msg(void* dest, struct msg_msg* msg, int len)
 {
 	int alen;
 	struct msg_msgseg *seg;

[-- Attachment #3: posix-mq2.txt --]
[-- Type: text/plain, Size: 26981 bytes --]

diff -Nur -X dontdiff vanilla-2.5.46/arch/i386/kernel/entry.S linux-2.5.46/arch/i386/kernel/entry.S
--- vanilla-2.5.46/arch/i386/kernel/entry.S	2002-11-08 15:09:43.000000000 +0100
+++ linux-2.5.46/arch/i386/kernel/entry.S	2002-11-08 15:53:34.000000000 +0100
@@ -740,7 +740,14 @@
 	.long sys_epoll_create
 	.long sys_epoll_ctl	/* 255 */
 	.long sys_epoll_wait
- 	.long sys_remap_file_pages
+	.long sys_remap_file_pages
+	.long sys_mq_open
+	.long sys_mq_unlink
+	.long sys_mq_timedsend	/* 260 */
+	.long sys_mq_timedreceive
+	.long sys_mq_notify
+	.long sys_mq_getattr
+	.long sys_mq_setattr
 
 
 	.rept NR_syscalls-(.-sys_call_table)/4
diff -Nur -X dontdiff vanilla-2.5.46/include/asm-i386/unistd.h linux-2.5.46/include/asm-i386/unistd.h
--- vanilla-2.5.46/include/asm-i386/unistd.h	2002-11-08 15:10:01.000000000 +0100
+++ linux-2.5.46/include/asm-i386/unistd.h	2002-11-08 15:50:24.000000000 +0100
@@ -261,8 +261,15 @@
 #define __NR_sys_epoll_create	254
 #define __NR_sys_epoll_ctl	255
 #define __NR_sys_epoll_wait	256
-#define __NR_remap_file_pages	257
-
+#define __NR_remap_file_pages 257
+#define __NR_mq_open	258
+#define __NR_mq_unlink	259
+#define __NR_mq_timedsend	260
+#define __NR_mq_timedreceive	261
+#define __NR_mq_notify	262
+#define __NR_mq_getattr	263
+#define __NR_mq_setattr	264
+  
 
 /* user-visible error numbers are in the range -1 - -124: see <asm-i386/errno.h> */
 
diff -Nur -X dontdiff vanilla-2.5.46/include/linux/mqueue.h linux-2.5.46/include/linux/mqueue.h
--- vanilla-2.5.46/include/linux/mqueue.h	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.46/include/linux/mqueue.h	2002-11-10 23:14:42.000000000 +0100
@@ -0,0 +1,46 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+/*
+ * POSIX message queue definitions.  The constants, mqd_t and struct
+ * mq_attr are shared with user space; the syscall prototypes and the
+ * in-kernel queue structure are only visible under __KERNEL__.
+ */
+
+#define MQ_MAXMSG 	40	/* max number of messages in each queue */
+#define MQ_MAXSYSSIZE	1048576	/* max size that all m.q. can have together */
+#define MQ_PRIO_MAX 	10000	/* max priority */
+
+typedef int mqd_t;		/* message queue descriptor */
+
+struct mq_attr {
+	long mq_flags;		/* message queue flags */
+	long mq_maxmsg;		/* maximum number of messages */
+	long mq_msgsize;	/* maximum message size */
+	long mq_curmsgs;	/* number of messages currently queued */
+};
+
+#ifdef __KERNEL__
+
+#include <linux/linkage.h>	/* asmlinkage */
+#include <linux/types.h>	/* size_t, ssize_t, mode_t, pid_t */
+#include <linux/time.h>		/* struct timespec */
+#include <linux/signal.h>	/* struct sigevent */
+#include <linux/spinlock.h>	/* spinlock_t */
+#include <linux/wait.h>		/* wait_queue_head_t */
+#include <linux/msg.h>		/* struct msg_queue, embedded by value below */
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr);
+asmlinkage int sys_mq_close(mqd_t mqdes);
+asmlinkage int sys_mq_unlink(const char *u_name);
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr,
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime);
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr,
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime);
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification);
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat);
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat);
+
+struct mqueue_ds {		/* queue */
+	struct mq_attr attr;
+	struct msg_queue queue;	/* ipc/msg */
+
+	spinlock_t lock;	/* guards queue contents and notify_* */
+	wait_queue_head_t wait_recv;	/* receivers waiting for a message */
+	wait_queue_head_t wait_send;	/* senders waiting for room */
+
+	pid_t notify_pid;	/* who we have to notify (or 0) */
+	struct sigevent notify;	/* notification */
+};
+
+#endif /* __KERNEL__ */
+
+#endif /* _LINUX_MQUEUE_H */
diff -Nur -X dontdiff vanilla-2.5.46/include/linux/sys.h linux-2.5.46/include/linux/sys.h
--- vanilla-2.5.46/include/linux/sys.h	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.46/include/linux/sys.h	2002-11-01 16:36:48.000000000 +0100
@@ -4,7 +4,7 @@
 /*
  * system call entry points ... but not all are defined
  */
-#define NR_syscalls 260
+#define NR_syscalls 270
 
 /*
  * These are system calls that will be removed at some time
diff -Nur -X dontdiff vanilla-2.5.46/init/Kconfig linux-2.5.46/init/Kconfig
--- vanilla-2.5.46/init/Kconfig	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.46/init/Kconfig	2002-11-04 16:58:14.000000000 +0100
@@ -69,6 +69,13 @@
 	  section 6.4 of the Linux Programmer's Guide, available from
 	  <http://www.linuxdoc.org/docs.html#guide>.
 
+config POSIXMSG
+	bool "POSIX message queues"
+	depends on SYSVIPC
+	---help---
+	  This gives you POSIX compliant interfaces for message queues
+	  (mq_open(), mq_timedsend(), mq_timedreceive(), mq_notify(), ...).
+	  The queues live in a small internal filesystem, msgfs, which can
+	  also be mounted to list the queue names and to remove queues.
+
 config BSD_PROCESS_ACCT
 	bool "BSD Process Accounting"
 	help
diff -Nur -X dontdiff vanilla-2.5.46/ipc/Makefile linux-2.5.46/ipc/Makefile
--- vanilla-2.5.46/ipc/Makefile	2002-09-23 15:54:57.000000000 +0200
+++ linux-2.5.46/ipc/Makefile	2002-11-04 17:50:24.000000000 +0100
@@ -6,4 +6,8 @@
 
 obj-$(CONFIG_SYSVIPC) += msg.o sem.o shm.o
 
+# POSIXMSG depends on SYSVIPC (see init/Kconfig), so it can select the object directly
+obj-$(CONFIG_POSIXMSG) += posixmsg.o
+
 include $(TOPDIR)/Rules.make
diff -Nur -X dontdiff vanilla-2.5.46/ipc/posixmsg.c linux-2.5.46/ipc/posixmsg.c
--- vanilla-2.5.46/ipc/posixmsg.c	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.46/ipc/posixmsg.c	2002-11-10 23:26:26.000000000 +0100
@@ -0,0 +1,814 @@
+/*
+ *  linux/ipc/posixmsg.c
+ *
+ *  Copyright 2002 Peter Wächtler <pwaechtler@mac.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * root can mount the filesystem to see the names of the currently
+ * used mqueues. The i_size shows the number of msgs in the queue.
+ * The only shell operation on the "fifos" is an unlink() via rm(1)
+ * The queues are accessed through syscalls - poll is supported
+ *
+ * TODO:
+ *	check for more sysv msg limits (or add some new)?
+ *  implement SIGEV_THREAD
+ */
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/sched.h>
+#include <linux/string.h>
+#include <linux/fs.h>
+#include <linux/fs_struct.h>	/* current->fs->umask */
+#include <linux/file.h>
+#include <linux/namei.h>
+#include <linux/pagemap.h>	/* PAGE_CACHE_SIZE */
+#include <linux/poll.h>
+#include <linux/security.h>
+
+#include <linux/msg.h>
+#include <linux/mqueue.h>
+#include <asm/uaccess.h>
+
+/* functions used from ipc/msg.c */
+extern void free_msg(struct msg_msg *msg);
+extern int store_msg(void *dest, struct msg_msg *msg, int len);
+extern struct msg_msg *load_msg(void *src, int len);
+
+/* SysV msg tunables owned by ipc/msg.c; of these, only msg_ctlmax is
+ * consulted in this file (create_queue() bounds mq_msgsize with it). */
+extern int msg_ctlmnb;		/* default max size of a message queue (all msgs sum up) */
+extern int msg_ctlmni;		/* max # of msg queue identifiers */
+extern int msg_ctlmax;		/* max size of one message (bytes) */
+
+static int mqueue_release(struct inode *inode, struct dentry *dentry);
+static int mqueue_close(struct inode *inode, struct file *filp);
+static unsigned int mqueue_poll(struct file *, struct poll_table_struct *);
+
+/* bytes currently queued in all POSIX queues together; compared with
+ * MQ_MAXSYSSIZE before a send */
+static atomic_t msg_bytes = ATOMIC_INIT(0);
+/* internal msgfs mount made by mqueue_init(); queue names are looked up
+ * directly under its root */
+static struct vfsmount *msg_mnt;
+
+/* file operations of an open queue descriptor (set on every queue inode
+ * and on every struct file handed out by mq_open) */
+static struct file_operations msg_fops = {
+	.llseek = no_llseek,
+	.release = mqueue_close,
+	.poll = mqueue_poll,
+};
+/* msgfs root directory: names are resolved with simple_lookup(), and
+ * ->unlink (reached from unlink(2) on a mounted msgfs as well as from
+ * sys_mq_unlink()) destroys the queue */
+static struct inode_operations msg_dir_inode_operations = {
+	.lookup = simple_lookup,
+	.unlink = mqueue_release,
+};
+
+/*
+ * init_queue - initialise the SysV msg_queue embedded in a POSIX queue
+ * @queue: the structure to initialise
+ *
+ * Only q_messages and the statistics fields are used by the POSIX code;
+ * the receiver/sender lists are initialised but stay unused.
+ * Returns @queue, or an ERR_PTR() if the security hook refuses.
+ */
+static struct msg_queue *
+init_queue(struct msg_queue *queue)
+{
+	int retval;
+
+	queue->q_perm.mode = 0;
+	queue->q_perm.key = 0;
+	queue->q_perm.security = NULL;
+	retval = security_ops->msg_queue_alloc_security(queue);
+	if (retval)
+		return ERR_PTR(retval);
+
+	queue->q_stime = queue->q_rtime = 0;
+	queue->q_ctime = CURRENT_TIME;
+	queue->q_qbytes = queue->q_cbytes = queue->q_qnum = 0;
+	queue->q_lspid = queue->q_lrpid = 0;
+	INIT_LIST_HEAD(&queue->q_messages);
+	INIT_LIST_HEAD(&queue->q_receivers);	/* unused */
+	INIT_LIST_HEAD(&queue->q_senders);	/* unused */
+
+	return queue;
+}
+
+/*
+ * get_msg_inode - allocate a new msgfs inode
+ * @sb: the msgfs super block
+ * @mode: file type and permission bits
+ *
+ * Directories get the simple_* directory operations and symlinks get
+ * nothing.  Every other type is a queue: a zeroed struct mqueue_ds is
+ * allocated, its lock, wait queues and embedded msg_queue are
+ * initialised, and it is hung off inode->u.generic_ip.
+ *
+ * Returns the inode or an ERR_PTR(); never NULL.
+ */
+static struct inode *
+get_msg_inode(struct super_block *sb, mode_t mode)
+{
+	struct mqueue_ds *q;
+	struct msg_queue *queue;
+	struct inode *inode = new_inode(sb);
+
+	if (!inode)
+		return ERR_PTR(-ENOSPC);
+
+	inode->i_mode = mode;
+	inode->i_uid = current->fsuid;
+	inode->i_gid = current->fsgid;
+	inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+	inode->i_blksize = 1024;
+
+	switch (mode & S_IFMT) {
+	case S_IFDIR:
+		inode->i_op = &msg_dir_inode_operations;
+		inode->i_fop = &simple_dir_operations;
+		/* directory inodes start off with i_nlink == 2 (for "." entry) */
+		inode->i_nlink++;
+		break;
+	case S_IFLNK:
+		break;
+	default:		/* S_IFIFO, S_IFREG and anything else: a queue */
+		inode->i_fop = &msg_fops;
+		q = kmalloc(sizeof (*q), GFP_KERNEL);
+		if (!q) {
+			iput(inode);
+			return ERR_PTR(-ENOMEM);
+		}
+		memset(q, 0, sizeof (*q));
+		spin_lock_init(&q->lock);
+		init_waitqueue_head(&q->wait_recv);
+		init_waitqueue_head(&q->wait_send);
+
+		/* init_queue() reports failure with ERR_PTR(), never NULL */
+		queue = init_queue(&q->queue);
+		if (IS_ERR(queue)) {
+			kfree(q);
+			iput(inode);
+			return ERR_PTR(PTR_ERR(queue));
+		}
+		inode->u.generic_ip = q;
+		break;
+	}
+	return inode;
+}
+
+/*
+ * get_mqueue - the queue behind an open descriptor, or NULL
+ *
+ * NULL is returned for a NULL @filp, for files that are not message
+ * queue descriptors (so an arbitrary fd can never be mistaken for a
+ * queue), and for queues that were already unlinked, because
+ * mqueue_release() clears u.generic_ip.
+ */
+static inline struct mqueue_ds *
+get_mqueue(struct file *filp)
+{
+	if (!filp || filp->f_op != &msg_fops)
+		return NULL;
+	return filp->f_dentry->d_inode->u.generic_ip;
+}
+
+/*
+ * mqueue_lookup - map a descriptor number to its struct file
+ *
+ * Don't use fget() to avoid the fput().  The fd slot is read while
+ * files->file_lock is held, because the fd array can be reallocated
+ * while the lock is not held.  No reference is taken, however.
+ * NOTE(review): a thread sharing this files_struct may close the file
+ * while the caller still uses it; moving callers to fget()/fput() would
+ * close that window.
+ *
+ * Returns NULL for out-of-range or unused descriptors.
+ */
+static struct file *
+mqueue_lookup(mqd_t fd)
+{
+	struct files_struct *files = current->files;
+	struct file *filp = NULL;
+
+	read_lock(&files->file_lock);
+	if (fd >= 0 && fd < files->max_fds)
+		filp = files->fd[fd];
+	read_unlock(&files->file_lock);
+	return filp;
+}
+
+/*
+ * freespace - could a message of @msg_len bytes be queued on @q now?
+ *
+ * mq_msgsize limits a single message, so the queue may hold up to
+ * mq_maxmsg messages of that size.  Takes q->lock itself; the caller
+ * must not hold it, and the answer can be stale once it returns.
+ */
+static inline int
+freespace(struct mqueue_ds *q, size_t msg_len)
+{
+	int rc;
+
+	spin_lock(&q->lock);
+	rc = q->queue.q_qnum < q->attr.mq_maxmsg &&
+	    q->queue.q_cbytes + msg_len <=
+	    (unsigned long) q->attr.mq_maxmsg * q->attr.mq_msgsize;
+	spin_unlock(&q->lock);
+	return rc;
+}
+
+/*
+ * mqueue_close - ->release of a queue descriptor; cleans up after
+ * close() or exit().
+ *
+ * If the calling task holds the queue's notification registration, the
+ * registration is dropped.  The comparison uses current->pid, the same
+ * value sys_mq_notify() records.  Returns -EBADFD when the inode has no
+ * queue any more or @filp is NULL, 0 otherwise.
+ */
+static int
+mqueue_close(struct inode *inode, struct file *filp)
+{
+	struct mqueue_ds *q = inode->u.generic_ip;
+
+	if (q == NULL || filp == NULL)
+		return -EBADFD;
+
+	spin_lock(&q->lock);
+	if (q->notify_pid != current->pid)
+		goto out;
+	/* the caller owned the notification: forget it */
+	q->notify.sigev_notify = 0;
+	q->notify.sigev_signo = 0;
+	q->notify_pid = 0;
+out:
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+/*
+ * mqueue_release - ->unlink of the msgfs root directory: destroy a queue
+ * @inode: the directory inode
+ * @dentry: the queue being removed
+ *
+ * Frees every queued message, returns their bytes to msg_bytes, releases
+ * the security blob and the queue itself.  It then does what
+ * simple_unlink() does: drops the link count and the dentry reference
+ * that create_queue() left pinned when it created the queue.
+ *
+ * NOTE(review): descriptors that are still open keep pointing at this
+ * inode.  get_mqueue() returns NULL for them afterwards, but a task that
+ * is already asleep on q->wait_send or q->wait_recv would touch freed
+ * memory.  Freeing the queue should move to inode teardown.
+ */
+static int
+mqueue_release(struct inode *inode, struct dentry *dentry)
+{
+	struct inode *qinode = dentry->d_inode;
+	struct mqueue_ds *q = qinode->u.generic_ip;
+	struct msg_queue *queue;
+	struct list_head *pos, *next;
+
+	if (!q)
+		return -EBADFD;
+
+	queue = &q->queue;
+	list_for_each_safe(pos, next, &queue->q_messages) {
+		struct msg_msg *msg = list_entry(pos, struct msg_msg, m_list);
+
+		pr_debug("mqueue_release: freeing msg:%p\n", msg);
+		free_msg(msg);
+	}
+	atomic_sub(queue->q_cbytes, &msg_bytes);
+	security_ops->msg_queue_free_security(queue);
+	qinode->u.generic_ip = NULL;
+	kfree(q);
+
+	/* same bookkeeping as simple_unlink(): the name is gone, and the pin
+	 * taken at creation time is released */
+	qinode->i_nlink--;
+	dput(dentry);
+	return 0;
+}
+
+/*
+ * mqueue_poll - poll/select support for a queue descriptor
+ *
+ * The descriptor is readable while at least one message is queued and
+ * writable while fewer than mq_maxmsg messages are queued.  The counters
+ * are read without q->lock, so the result is only a hint.
+ */
+static unsigned int
+mqueue_poll(struct file *filp, struct poll_table_struct *wait)
+{
+	struct mqueue_ds *q = get_mqueue(filp);
+	unsigned int mask = 0;
+
+	/* the queue was unlinked (and freed) while this descriptor was open */
+	if (!q)
+		return POLLERR;
+
+	poll_wait(filp, &q->wait_recv, wait);
+	poll_wait(filp, &q->wait_send, wait);
+
+	if (q->queue.q_qnum)
+		mask |= POLLIN | POLLRDNORM;
+	if (q->queue.q_qnum < q->attr.mq_maxmsg)
+		mask |= POLLOUT | POLLWRNORM;
+
+	return mask;
+}
+
+/*
+ * create_queue - create a new queue named @qname in @dir and open it
+ * @dir: msgfs root dentry
+ * @qname: queue name (its hash is computed here)
+ * @oflag: mq_open() flags; the access mode becomes the file's f_mode
+ * @mode: permission bits, filtered through the caller's umask
+ * @u_attr: optional user-supplied attributes (NULL selects defaults)
+ *
+ * The name is made visible (d_add) only after every allocation has
+ * succeeded, so the error paths never free an inode that a hashed
+ * dentry still points to.  The reference returned by d_alloc() is
+ * deliberately kept; it pins the queue in msgfs until it is unlinked.
+ *
+ * Returns the new descriptor or a negative errno.
+ */
+static int
+create_queue(struct dentry *dir, struct qstr *qname,
+	     int oflag, mode_t mode, struct mq_attr *u_attr)
+{
+	int ret, fd;
+	struct file *filp;
+	struct mqueue_ds *q;
+	struct inode *inode;
+	struct dentry *dentry;
+
+	/* POSIX: permission bits set in the process umask are cleared */
+	mode &= S_IRWXUGO & ~current->fs->umask;
+	inode = get_msg_inode(msg_mnt->mnt_sb, S_IFIFO | mode);
+	if (IS_ERR(inode)) {
+		ret = PTR_ERR(inode);
+		goto out_ret;
+	}
+	q = inode->u.generic_ip;
+
+	if (u_attr != NULL) {
+		if (copy_from_user(&q->attr, u_attr, sizeof (struct mq_attr))) {
+			ret = -EFAULT;
+			goto out_inode;
+		}
+		if (q->attr.mq_maxmsg <= 0
+		    || q->attr.mq_msgsize <= 0
+		    || q->attr.mq_maxmsg > MQ_MAXMSG
+		    || q->attr.mq_msgsize > msg_ctlmax) {
+			ret = -EINVAL;
+			goto out_inode;
+		}
+	} else {		/* implementation defined */
+		q->attr.mq_maxmsg = MQ_MAXMSG;
+		q->attr.mq_msgsize = 1024 /*msg_ctlmax */ ;
+	}
+	q->attr.mq_flags = oflag & O_ACCMODE;
+	q->attr.mq_curmsgs = 0;
+	q->notify_pid = 0;
+	q->notify.sigev_signo = 0;
+	q->notify.sigev_notify = 0;
+	spin_lock_init(&q->lock);
+	init_waitqueue_head(&q->wait_send);
+	init_waitqueue_head(&q->wait_recv);
+
+	fd = get_unused_fd();
+	if (fd < 0) {
+		ret = fd;
+		goto out_inode;
+	}
+	filp = get_empty_filp();
+	if (!filp) {
+		ret = -ENFILE;
+		goto out_fd;
+	}
+	/* O_RDONLY/O_WRONLY/O_RDWR -> FMODE_READ/FMODE_WRITE/both */
+	filp->f_mode = (q->attr.mq_flags + 1) & O_ACCMODE;
+
+	qname->hash = full_name_hash(qname->name, qname->len);
+	dentry = d_alloc(dir, qname);
+	if (!dentry) {
+		ret = -ENOMEM;
+		goto out_filp;
+	}
+
+	/* take write access only for writable opens, as dentry_open() does;
+	 * the final fput() is expected to drop it for FMODE_WRITE files */
+	if (filp->f_mode & FMODE_WRITE) {
+		ret = get_write_access(inode);
+		if (ret)
+			goto out_dentry;
+	}
+
+	filp->f_vfsmnt = mntget(msg_mnt);
+	filp->f_dentry = dget(dentry);	/* the d_alloc() reference stays as pin */
+	filp->f_op = &msg_fops;
+	filp->f_flags = oflag;
+
+	/* publish: first the name, then the descriptor for userspace */
+	d_add(dentry, inode);
+	fd_install(fd, filp);
+	return fd;
+
+      out_dentry:
+	dput(dentry);		/* never instantiated: simply freed */
+      out_filp:
+	put_filp(filp);
+      out_fd:
+	put_unused_fd(fd);
+      out_inode:
+	security_ops->msg_queue_free_security(&q->queue);
+	inode->u.generic_ip = NULL;
+	kfree(q);
+	iput(inode);
+      out_ret:
+	return ret;
+}
+
+/**
+ *	sys_mq_open	-	opens a message queue associated with @u_name
+ *	@u_name: user-space name, looked up with lookup_one_len() directly
+ *		 under the msgfs root
+ *	@oflag: flags like O_CREAT, O_EXCL, O_RDWR
+ *	@mode: when O_CREAT is specified, the permission bits
+ *	@u_attr: pointer to the attributes, like max msgsize, when creating
+ *
+ *	The root directory's i_sem is held across lookup and creation, as
+ *	the VFS requires for lookup_one_len() and to make O_CREAT|O_EXCL
+ *	atomic against concurrent openers.
+ *
+ *	returns a descriptor to the opened queue or negative value on error
+ */
+asmlinkage mqd_t
+sys_mq_open(const char *u_name, int oflag, mode_t mode, struct mq_attr * u_attr)
+{
+	/* indexed by oflag & O_ACCMODE; the value O_ACCMODE is rejected below */
+	static const int oflag2acc[O_ACCMODE] =
+	    { MAY_READ, MAY_WRITE, MAY_READ | MAY_WRITE };
+	struct dentry *root = msg_mnt->mnt_root;
+	struct dentry *dentry;
+	struct file *filp;
+	struct qstr this;
+	char *name;
+	int fd, ret;
+
+	/* (oflag & O_ACCMODE) == O_ACCMODE is no valid access mode and would
+	 * index past the end of oflag2acc[] */
+	if ((oflag & O_ACCMODE) == O_ACCMODE)
+		return -EINVAL;
+
+	name = getname(u_name);
+	if (IS_ERR(name))
+		return PTR_ERR(name);
+	this.name = (const unsigned char *) name;
+	this.len = strlen(name);
+
+	down(&root->d_inode->i_sem);
+	dentry = lookup_one_len(name, root, this.len);
+	if (IS_ERR(dentry)) {
+		ret = PTR_ERR(dentry);
+		goto out_up;
+	}
+
+	if (!dentry->d_inode) {
+		if (oflag & O_CREAT) {
+			/* create_queue() allocates its own dentry; unhash the
+			 * negative one so the two never coexist in the dcache */
+			d_drop(dentry);
+			ret = create_queue(root, &this, oflag, mode, u_attr);
+		} else {
+			ret = -ENOENT;
+		}
+		goto out_dput;
+	}
+
+	/* the entry exists already */
+	if ((oflag & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL)) {
+		ret = -EEXIST;
+		goto out_dput;
+	}
+	if (permission(dentry->d_inode, oflag2acc[oflag & O_ACCMODE])) {
+		ret = -EACCES;
+		goto out_dput;
+	}
+
+	fd = get_unused_fd();
+	if (fd < 0) {
+		ret = fd;
+		goto out_dput;
+	}
+	/* dentry_open() consumes one dentry and one vfsmount reference, also
+	 * when it fails; hand it its own so our lookup reference stays ours */
+	dget(dentry);
+	mntget(msg_mnt);
+	filp = dentry_open(dentry, msg_mnt, oflag);
+	if (IS_ERR(filp)) {
+		put_unused_fd(fd);
+		ret = PTR_ERR(filp);
+		goto out_dput;
+	}
+	filp->f_op = &msg_fops;
+	fd_install(fd, filp);
+	ret = fd;
+
+      out_dput:
+	dput(dentry);
+      out_up:
+	up(&root->d_inode->i_sem);
+	putname(name);
+	return ret;
+}
+
+/**
+ *	sys_mq_unlink	-	removes a message queue from the namespace
+ *	@u_name: pointer to the name
+ *
+ *	The queue is destroyed through the msgfs directory's ->unlink.
+ *	vfs_unlink() already d_delete()s the dentry on success, so that is
+ *	not repeated here.  The root's i_sem is held around lookup and
+ *	unlink, as for unlink(2).
+ *
+ *	returns 0 or a negative errno (-ENOENT if no such queue exists)
+ */
+asmlinkage int
+sys_mq_unlink(const char *u_name)
+{
+	struct dentry *root = msg_mnt->mnt_root;
+	struct dentry *dentry;
+	char *name;
+	int err;
+
+	name = getname(u_name);
+	if (IS_ERR(name))
+		return PTR_ERR(name);
+
+	down(&root->d_inode->i_sem);
+	dentry = lookup_one_len(name, root, strlen(name));
+	if (IS_ERR(dentry)) {
+		err = PTR_ERR(dentry);
+		goto out_up;
+	}
+	if (dentry->d_inode)
+		err = vfs_unlink(root->d_inode, dentry);
+	else
+		err = -ENOENT;
+	dput(dentry);		/* also on -ENOENT: the lookup took a reference */
+      out_up:
+	up(&root->d_inode->i_sem);
+	putname(name);
+	return err;
+}
+
+/**
+ *	sys_mq_timedsend	-	send a message to the queue associated
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer holding the message
+ *	@msg_len: length of the message
+ *	@msg_prio: the priority of the message, 0 .. MQ_PRIO_MAX - 1
+ *	@utime: if !NULL, the relative time the call may block; an all-zero
+ *		timespec means "do not block"
+ *
+ *	Messages are kept sorted by descending priority, FIFO among equal
+ *	priorities.  Returns 0 once the message is queued, or a negative
+ *	errno; on every error path the copied-in message is freed again.
+ */
+asmlinkage int
+sys_mq_timedsend(mqd_t mqdes,
+		 const char *msg_ptr, size_t msg_len,
+		 unsigned int msg_prio, struct timespec *utime)
+{
+	struct siginfo sig_i;
+	struct msg_msg *msg;
+	struct list_head *p;
+	struct msg_queue *queue;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+	long timeout = MAX_SCHEDULE_TIMEOUT;
+	long left;
+	int err;
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	/* f_mode holds FMODE_* bits, not an O_* access mode */
+	if (!(filp->f_mode & FMODE_WRITE))
+		return -EBADF;
+	if (msg_prio >= MQ_PRIO_MAX)
+		return -EINVAL;
+	if (msg_len > q->attr.mq_msgsize)
+		return -EMSGSIZE;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && !freespace(q, msg_len))
+		return -EAGAIN;
+
+	/* check if this message will exceed overall limit for messages */
+	if (atomic_read(&msg_bytes) + msg_len > MQ_MAXSYSSIZE)
+		return -ENOMEM;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if (ts.tv_nsec >= 1000000000L || ts.tv_nsec < 0
+		    || ts.tv_sec < 0)
+			return -EINVAL;
+		/* round a non-zero request up by one tick */
+		timeout = (timespec_to_jiffies(&ts)
+			   + (ts.tv_sec || ts.tv_nsec));
+	}
+
+	msg = load_msg((char *) msg_ptr, msg_len);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+
+	msg->m_type = msg_prio;
+	msg->m_ts = msg_len;
+
+	/*
+	 * Wait for room, then re-check under q->lock: another sender may
+	 * have taken the slot between the wake-up and the lock.  A loser of
+	 * that race waits again for the remaining time only.
+	 */
+	for (;;) {
+		if (freespace(q, msg_len)) {
+			spin_lock(&q->lock);
+			if (queue->q_qnum < q->attr.mq_maxmsg)
+				break;	/* q->lock stays held */
+			spin_unlock(&q->lock);
+		}
+		if (filp->f_flags & O_NONBLOCK) {
+			err = -EAGAIN;
+			goto out_free;
+		}
+		if (!timeout) {
+			err = -ETIMEDOUT;
+			goto out_free;
+		}
+		left = wait_event_interruptible_timeout(q->wait_send,
+							freespace(q, msg_len),
+							timeout);
+		if (left < 0) {
+			err = -EINTR;
+			goto out_free;
+		}
+		timeout = left;
+	}
+
+	/* insert before the first message with a lower priority, or at the
+	 * tail if there is none (p is then the list head) */
+	list_for_each(p, &queue->q_messages) {
+		struct msg_msg *tmp = list_entry(p, struct msg_msg, m_list);
+
+		if (tmp->m_type < msg_prio)
+			break;
+	}
+	list_add_tail(&msg->m_list, p);
+
+	queue->q_lspid = current->pid;
+	queue->q_stime = CURRENT_TIME;
+	queue->q_cbytes += msg_len;
+	atomic_add(msg_len, &msg_bytes);
+	queue->q_qnum++;
+	filp->f_dentry->d_inode->i_size = queue->q_qnum;
+
+	if (waitqueue_active(&q->wait_recv)) {
+		wake_up_interruptible(&q->wait_recv);
+	} else if (q->notify_pid != 0 && queue->q_qnum == 1) {
+		/* since there was no synchronously waiting process for message
+		 * we notify it when the state of queue changed from
+		 * empty to not empty */
+		if (q->notify.sigev_notify == SIGEV_THREAD) {
+			/* TODO: Add support for sigev_notify==SIGEV_THREAD.
+			 * The message is already queued, so this is not
+			 * reported as a send failure. */
+			pr_info
+			    ("mq_*send: SIGEV_THREAD not supported\n");
+		}
+		/* sends signal */
+		if (q->notify.sigev_notify == SIGEV_SIGNAL) {
+			/* clear the whole siginfo so no stack bytes reach
+			 * the receiving process */
+			memset(&sig_i, 0, sizeof (sig_i));
+			sig_i.si_signo = q->notify.sigev_signo;
+			sig_i.si_errno = 0;
+			sig_i.si_code = SI_MESGQ;
+			sig_i.si_pid = current->pid;
+			sig_i.si_uid = current->uid;
+			kill_proc_info(q->notify.sigev_signo,
+				       &sig_i, q->notify_pid);
+		}
+		/* after notification unregisters process */
+		q->notify_pid = 0;
+	}
+	spin_unlock(&q->lock);
+	return 0;
+
+      out_free:
+	free_msg(msg);
+	return err;
+}
+
+/**
+ *	sys_mq_timedreceive	-	receive a message from the queue associated
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer to hold the message
+ *	@msg_len: length of the userspace buffer
+ *	@msg_prio: if !NULL, will hold the priority of the received message
+ *	@utime: if !NULL, the relative time the call may block; an all-zero
+ *		timespec means "do not block"
+ *
+ *	The highest-priority (list head) message is dequeued under q->lock
+ *	and copied out after the lock is dropped, because user copies may
+ *	sleep.  Returns the message length or a negative errno.
+ */
+asmlinkage ssize_t
+sys_mq_timedreceive(mqd_t mqdes,
+		    char *msg_ptr, size_t msg_len,
+		    unsigned int *msg_prio, struct timespec * utime)
+{
+	struct msg_queue *queue;
+	struct msg_msg *msg;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+	long timeout = MAX_SCHEDULE_TIMEOUT;
+	long left;
+	ssize_t ret;
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (!(filp->f_mode & FMODE_READ))
+		return -EBADF;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && queue->q_qnum == 0)
+		return -EAGAIN;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if (ts.tv_nsec >= 1000000000L || ts.tv_nsec < 0
+		    || ts.tv_sec < 0)
+			return -EINVAL;
+		/* round a non-zero request up by one tick */
+		timeout = (timespec_to_jiffies(&ts)
+			   + (ts.tv_sec || ts.tv_nsec));
+	}
+
+	/* another receiver may empty the queue between our wake-up and the
+	 * lock; in that case wait again, for the remaining time only, and
+	 * honour O_NONBLOCK on every round */
+	for (;;) {
+		spin_lock(&q->lock);
+		if (!list_empty(&queue->q_messages))
+			break;	/* q->lock stays held */
+		spin_unlock(&q->lock);
+		if (filp->f_flags & O_NONBLOCK)
+			return -EAGAIN;
+		if (!timeout)
+			return -ETIMEDOUT;
+		left = wait_event_interruptible_timeout(q->wait_recv,
+							queue->q_qnum > 0,
+							timeout);
+		if (left < 0)
+			return -EINTR;
+		timeout = left;
+	}
+
+	msg = list_entry(queue->q_messages.next, struct msg_msg, m_list);
+	if (msg_len < msg->m_ts) {
+		spin_unlock(&q->lock);
+		return -EMSGSIZE;
+	}
+	list_del(&msg->m_list);
+	queue->q_rtime = CURRENT_TIME;
+	queue->q_lrpid = current->pid;
+	queue->q_cbytes -= msg->m_ts;
+	atomic_sub(msg->m_ts, &msg_bytes);
+	queue->q_qnum--;
+	filp->f_dentry->d_inode->i_size = queue->q_qnum;
+	wake_up_interruptible(&q->wait_send);
+	spin_unlock(&q->lock);
+
+	ret = msg->m_ts;
+	/* a NULL msg_prio is allowed: the caller does not want the priority */
+	if (store_msg(msg_ptr, msg, msg->m_ts) ||
+	    (msg_prio != NULL && put_user(msg->m_type, msg_prio)))
+		ret = -EFAULT;	/* hmh, now the msg is lost */
+	free_msg(msg);
+	return ret;
+}
+
+/**
+ *	sys_mq_notify	-	set or remove a notification on the queue associated
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_notification: pointer to struct sigevent, or NULL to unregister
+ *
+ *	A task that is registered unregisters with NULL or SIGEV_NONE.  Any
+ *	other request fails with -EBUSY while a task is registered.
+ *	Otherwise SIGEV_SIGNAL with a valid signal number registers the
+ *	caller, SIGEV_NONE does nothing, SIGEV_THREAD is not implemented
+ *	(-ENOSYS), and any other sigev_notify value gives -EINVAL.
+ */
+asmlinkage int
+sys_mq_notify(mqd_t mqdes, const struct sigevent *u_notification)
+{
+	struct sigevent notify;
+	struct file *filp = mqueue_lookup(mqdes);
+	struct mqueue_ds *q = get_mqueue(filp);
+	int err = 0;
+
+	if (!q)
+		return -EBADF;
+	if (u_notification != NULL)
+		if (copy_from_user
+		    (&notify, u_notification, sizeof (struct sigevent)))
+			return -EFAULT;
+
+	spin_lock(&q->lock);
+	if (q->notify_pid == current->pid
+	    && (u_notification == NULL || notify.sigev_notify == SIGEV_NONE)) {
+		q->notify_pid = 0;	/* remove notification */
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	} else if (q->notify_pid > 0) {
+		err = -EBUSY;
+	} else if (u_notification != NULL) {
+		switch (notify.sigev_notify) {
+		case SIGEV_SIGNAL:
+			/* reject now rather than have kill_proc_info() fail
+			 * silently when the queue becomes non-empty */
+			if (notify.sigev_signo <= 0
+			    || notify.sigev_signo > _NSIG) {
+				err = -EINVAL;
+				break;
+			}
+			/* add notification */
+			q->notify_pid = current->pid;
+			q->notify.sigev_signo = notify.sigev_signo;
+			q->notify.sigev_notify = notify.sigev_notify;
+			break;
+		case SIGEV_NONE:
+			break;
+		case SIGEV_THREAD:
+			err = -ENOSYS;
+			pr_info("mq_notify: SIGEV_THREAD not supported yet\n");
+			break;
+		default:
+			err = -EINVAL;
+			break;
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_getattr	-	get the attributes of the queue associated
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: user buffer that receives the attributes
+ *
+ *	mq_flags reports this descriptor's O_NONBLOCK bit and mq_curmsgs the
+ *	current number of queued messages.  The values are snapshotted into
+ *	a local copy under q->lock.  The shared q->attr is not modified,
+ *	because the flags are per descriptor.  The copy to user space runs
+ *	after the lock is dropped, since it may sleep.
+ */
+asmlinkage int
+sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	struct mq_attr attr;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	spin_lock(&q->lock);
+	attr = q->attr;
+	attr.mq_flags = filp->f_flags & O_NONBLOCK;
+	attr.mq_curmsgs = q->queue.q_qnum;
+	spin_unlock(&q->lock);
+
+	if (copy_to_user(u_mqstat, &attr, sizeof (struct mq_attr)))
+		return -EFAULT;
+	return 0;
+}
+
+/**
+ *	sys_mq_setattr	-	set the attributes of the queue associated
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values (only the
+ *		   O_NONBLOCK bit of mq_flags is used)
+ *	@u_omqstat: pointer to store the original attributes (if !NULL)
+ *
+ *	All user copies happen outside q->lock, because they may sleep.  If
+ *	an error is returned, the descriptor's flags are left unchanged.
+ */
+asmlinkage int
+sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	       struct mq_attr *u_omqstat)
+{
+	struct mq_attr mqstat, omqstat;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (copy_from_user(&mqstat, u_mqstat, sizeof (struct mq_attr)))
+		return -EFAULT;
+
+	if (u_omqstat != NULL) {
+		spin_lock(&q->lock);
+		omqstat = q->attr;
+		omqstat.mq_flags = filp->f_flags & O_NONBLOCK;
+		omqstat.mq_curmsgs = q->queue.q_qnum;
+		spin_unlock(&q->lock);
+		if (copy_to_user(u_omqstat, &omqstat, sizeof (struct mq_attr)))
+			return -EFAULT;
+	}
+
+	spin_lock(&q->lock);
+	if (mqstat.mq_flags & O_NONBLOCK)
+		filp->f_flags |= O_NONBLOCK;
+	else
+		filp->f_flags &= ~O_NONBLOCK;
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+/* msgfs super block operations: generic statfs, and inodes are deleted
+ * (not cached) once their last reference is dropped */
+static struct super_operations msg_s_ops = {
+	.statfs = simple_statfs,
+	.drop_inode = generic_delete_inode,
+};
+
+/*
+ * msg_fill_super - set up the msgfs super block
+ *
+ * The root is a sticky, world-accessible directory (S_ISVTX | S_IRWXUGO)
+ * that holds one entry per queue.  Returns 0 or a negative errno.
+ */
+static int
+msg_fill_super(struct super_block *sb, void *data, int silent)
+{
+	struct inode *root;
+	struct dentry *root_dentry;
+
+	sb->s_blocksize = PAGE_CACHE_SIZE;
+	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
+	sb->s_magic = 0x12121212;
+	sb->s_op = &msg_s_ops;
+
+	root = get_msg_inode(sb, S_IFDIR | S_IRWXUGO | S_ISVTX);
+	/* get_msg_inode() reports failure with ERR_PTR(), never NULL */
+	if (IS_ERR(root))
+		return PTR_ERR(root);
+
+	root_dentry = d_alloc_root(root);
+	if (!root_dentry) {
+		iput(root);
+		return -ENOMEM;
+	}
+	sb->s_root = root_dentry;
+	return 0;
+}
+
+/*
+ * msgfs_get_sb - ->get_sb for msgfs
+ *
+ * Uses get_sb_single(), so every mount shares one super block, filled
+ * in by msg_fill_super(); @dev_name is not used.
+ */
+static struct super_block *
+msgfs_get_sb(struct file_system_type *fs_type,
+	     int flags, char *dev_name, void *data)
+{
+	struct super_block *sb;
+
+	sb = get_sb_single(fs_type, flags, data, msg_fill_super);
+	return sb;
+}
+
+/* "msgfs": registered and mounted internally by mqueue_init(); the same
+ * name can be given to mount(8) to inspect the queues */
+static struct file_system_type msg_fs_type = {
+	.name = "msgfs",
+	.get_sb = msgfs_get_sb,
+	.kill_sb = kill_anon_super,
+};
+
+/*
+ * mqueue_init - register msgfs and create the internal mount msg_mnt
+ *
+ * The filesystem is unregistered again if the mount fails.
+ * NOTE(review): the mq_* syscalls dereference msg_mnt unconditionally,
+ * so they are not usable if this fails.
+ */
+static int __init
+mqueue_init(void)
+{
+	int err;
+
+	err = register_filesystem(&msg_fs_type);
+	if (err)
+		return err;
+
+	msg_mnt = kern_mount(&msg_fs_type);
+	if (IS_ERR(msg_mnt)) {
+		err = PTR_ERR(msg_mnt);
+		unregister_filesystem(&msg_fs_type);
+		return err;
+	}
+	return 0;
+}
+
+/*
+ * mqueue_exit - undo mqueue_init(): unregister msgfs and drop the
+ * reference that kern_mount() returned for msg_mnt
+ */
+static void __exit
+mqueue_exit(void)
+{
+	unregister_filesystem(&msg_fs_type);
+	mntput(msg_mnt);
+}
+
+__initcall(mqueue_init);
+__exitcall(mqueue_exit);
diff -Nur -X dontdiff vanilla-2.5.46/ipc/util.c linux-2.5.46/ipc/util.c
--- vanilla-2.5.46/ipc/util.c	2002-11-08 15:10:06.000000000 +0100
+++ linux-2.5.46/ipc/util.c	2002-11-08 15:48:00.000000000 +0100
@@ -24,6 +24,7 @@
 #include <linux/security.h>
 #include <linux/rcupdate.h>
 #include <linux/workqueue.h>
+#include <linux/mqueue.h>
 
 #if defined(CONFIG_SYSVIPC)
 
@@ -579,3 +580,57 @@
 }
 
 #endif /* CONFIG_SYSVIPC */
+
+#ifndef CONFIG_POSIXMSG
+/*
+ * POSIX message queues are configured out.  The i386 syscall table
+ * (arch/i386/kernel/entry.S) refers to the sys_mq_* entry points
+ * unconditionally, so provide stand-ins that fail with -ENOSYS.  With
+ * CONFIG_POSIXMSG=y the real versions come from ipc/posixmsg.c.
+ */
+
+asmlinkage mqd_t
+sys_mq_open(const char *u_path, int oflag, mode_t mode, struct mq_attr *u_attr)
+{
+	return (mqd_t) -ENOSYS;
+}
+
+/* not in the syscall table: mq_close() is a plain close() */
+asmlinkage int
+sys_mq_close(mqd_t mqdes)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int
+sys_mq_unlink(const char *u_name)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int
+sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
+		 unsigned int msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage ssize_t
+sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
+		    unsigned int *msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int
+sys_mq_notify(mqd_t mqdes, const struct sigevent *u_notification)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int
+sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int
+sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	       struct mq_attr *u_omqstat)
+{
+	return -ENOSYS;
+}
+
+#endif /* !CONFIG_POSIXMSG */

^ permalink raw reply	[flat|nested] 11+ messages in thread
[parent not found: <EDC461A30AC4D511ADE10002A5072CAD04C70992@orsmsx119.jf.intel.com>]
* RE: [PATCH] unified SysV and POSIX mqueues - complete rewrite
@ 2002-11-19 19:12 Perez-Gonzalez, Inaky
  0 siblings, 0 replies; 11+ messages in thread
From: Perez-Gonzalez, Inaky @ 2002-11-19 19:12 UTC (permalink / raw)
  To: 'Peter Waechtler', Krzysztof Benedyczak
  Cc: Michal Wronski, linux-kernel, Gustafson, Geoffrey R,
	Abbas, Mohamed

[-- Attachment #1: Type: text/plain, Size: 1283 bytes --]


> We could implement some kind of priority sorted waitqueue.
> I read about some prio aware waitqueue patch (from Ingo Molnar) used
> by a NGPT developer for the futexes (I.Gonzalez?)

I am just a contributor to NGPT ... what I did was to apply the
concept in Ingo's O(1) scheduling to the selection of tasks waiting
for the futex event.

> I thought about some waitqueue in the kernel, that at least put
> realtime processes at the front of the waitqueue.
> 
> It would be nice to have some generic mechanism in the kernel since
> only prio aware message read feature does not buy you much, IMHO

I did something like that also for the futexes [priority based futexes].
Check the attached patch vs. 2.5.45 [sorry for not inlining it, but I
am w/ Outlook now and it breaks it].

The nice next step would be to go ahead and create a breed of wait queues
that supported priority-based wake up, but there seemed to be not too
much interest; in any case, we'd need to come up with a solution that
would not waste 2*sizeof(void*) * NUM_PRIO bytes per wait-queue, but
still being O(1) [and now that we are at asking, if somebody knows where
to buy a brand new Mercedes Benz C320 at 25%, pls email me]

Inaky Perez-Gonzalez -- Not speaking for Intel - opinions are my own [or my
fault]


[-- Attachment #2: prioarray-2.5.45.patch --]
[-- Type: application/octet-stream, Size: 3380 bytes --]

diff -u /dev/null include/linux/prioarray.h:1.1.2.1
--- /dev/null	Tue Nov 19 11:01:07 2002
+++ include/linux/prioarray.h	Tue Oct 15 15:17:29 2002
@@ -0,0 +1,57 @@
+/*
+ * O(1) priority arrays
+ *
+ * Modified from code (C) 2002 Ingo Molnar <mingo@redhat.com> in
+ * sched.c by Iñaky Pérez-González <inaky.perez-gonzalez@intel.com> so
+ * that other parts of the kernel can use the same constructs.
+ */
+
+#ifndef _LINUX_PRIOARRAY_
+#define _LINUX_PRIOARRAY_
+
+        /* This inclusion is kind of recursive ... hmmm */
+
+#include <linux/sched.h>
+
+struct prio_array {
+	int nr_active;
+	unsigned long bitmap[BITMAP_SIZE];
+	struct list_head queue[MAX_PRIO];
+};
+
+typedef struct prio_array prio_array_t;
+
+static inline
+void pa_init (prio_array_t *array)
+{
+        unsigned cnt;
+	array->nr_active = 0;
+        memset (array->bitmap, 0, sizeof (array->bitmap));
+        for (cnt = 0; cnt < MAX_PRIO; cnt++)
+                INIT_LIST_HEAD (&array->queue[cnt]);
+}
+
+/*
+ * Adding/removing a node to/from a priority array:
+ */
+
+static inline
+void pa_dequeue (struct list_head *p, unsigned prio, prio_array_t *array)
+{
+	array->nr_active--;
+	list_del(p);
+	if (list_empty(array->queue + prio))
+		__clear_bit(prio, array->bitmap);
+}
+
+static inline
+void pa_enqueue (struct list_head *p, unsigned prio, prio_array_t *array)
+{
+	list_add_tail(p, array->queue + prio);
+	__set_bit(prio, array->bitmap);
+	array->nr_active++;
+}
+
+
+
+#endif /* #ifndef _LINUX_PRIOARRAY_ */

diff -u include/linux/sched.h:1.1.1.4 include/linux/sched.h:1.1.1.1.2.4
--- include/linux/sched.h:1.1.1.4	Thu Oct 31 15:28:29 2002
+++ include/linux/sched.h	Thu Oct 31 15:36:14 2002
@@ -241,6 +241,9 @@
 #define MAX_RT_PRIO		MAX_USER_RT_PRIO
 
 #define MAX_PRIO		(MAX_RT_PRIO + 40)
+#define BITMAP_SIZE ((((MAX_PRIO+1+7)/8)+sizeof(long)-1)/sizeof(long))
+
+#include <linux/prioarray.h> /* Okay, this is ugly, but needs MAX_PRIO */
  
 /*
  * Some day this will be a full-fledged user tracking system..
@@ -265,7 +268,6 @@
 extern struct user_struct root_user;
 #define INIT_USER (&root_user)
 
-typedef struct prio_array prio_array_t;
 struct backing_dev_info;
 
 struct task_struct {

diff -u kernel/sched.c:1.1.1.3 kernel/sched.c:1.1.1.1.2.2
--- kernel/sched.c:1.1.1.3	Thu Oct 17 13:08:31 2002
+++ kernel/sched.c	Thu Oct 17 13:51:57 2002
@@ -129,15 +129,8 @@
  * These are the runqueue data structures:
  */
 
-#define BITMAP_SIZE ((((MAX_PRIO+1+7)/8)+sizeof(long)-1)/sizeof(long))
-
 typedef struct runqueue runqueue_t;
 
-struct prio_array {
-	int nr_active;
-	unsigned long bitmap[BITMAP_SIZE];
-	struct list_head queue[MAX_PRIO];
-};
 
 /*
  * This is the main, per-CPU runqueue data structure.
@@ -225,17 +218,12 @@
  */
 static inline void dequeue_task(struct task_struct *p, prio_array_t *array)
 {
-	array->nr_active--;
-	list_del(&p->run_list);
-	if (list_empty(array->queue + p->prio))
-		__clear_bit(p->prio, array->bitmap);
+        pa_dequeue (&p->run_list, p->prio, array);
 }
 
 static inline void enqueue_task(struct task_struct *p, prio_array_t *array)
 {
-	list_add_tail(&p->run_list, array->queue + p->prio);
-	__set_bit(p->prio, array->bitmap);
-	array->nr_active++;
+        pa_enqueue (&p->run_list, p->prio, array);
 	p->array = array;
 }
 

^ permalink raw reply	[flat|nested] 11+ messages in thread
* [PATCH] unified SysV and POSIX mqueues - complete rewrite
@ 2002-11-24 23:05 Peter Waechtler
       [not found] ` <3DE15F9E.A585E26F@digeo.com>
  0 siblings, 1 reply; 11+ messages in thread
From: Peter Waechtler @ 2002-11-24 23:05 UTC (permalink / raw)
  To: linux-kernel

[-- Attachment #1: Type: text/plain, Size: 1150 bytes --]

[part to lkml]
Alan, feel invited to add it to your -ac tree :-)

There are at least 3 attempts to provide POSIX mqueues for Linux:

a) early attempt of Jakub Jelinek based on 2.4-test based on a
filesystem with read/write/ioctl interface - very intrusive to ipc/msg.c

b) http://www.mat.uni.torun.pl/~wrona/posix_ipc
complicated standalone version (against recent 2.4), with 
multiplexed syscall - and great effort to keep track of resources
which is far easier accomplished by vfs

c) based on a filesystem with its own syscalls and a clean separation
from (but still a dependence on) SysV ipc/msg.c
why separate syscalls? you cannot specify the priority of a sent
message without ioctl/fcntl, also problems with timed versions - and
yes there is an outstanding issue with priority aware waitqueues

interface stub and userspace implementation is on
http://homepage.mac.com/pwaechtler/linux/mqueue.tgz

The userspace implementation is not complete. It has problems with locks
held by crashing apps (we could use flock) and with signal safety, and it is
unclear how to do the timed versions. Its performance is worse than the kernel version's!?

following patch is against 2.5.49


[-- Warning: decoded text below may be mangled, UTF-8 assumed --]
[-- Attachment #2: kernel.patch --]
[-- Type: text/x-diff; charset="us-ascii"; name="kernel.patch", Size: 28523 bytes --]

diff -X dontdiff -Nur vanilla-2.5.49/arch/i386/kernel/entry.S linux-2.5.49/arch/i386/kernel/entry.S
--- vanilla-2.5.49/arch/i386/kernel/entry.S	2002-11-23 17:04:27.000000000 +0100
+++ linux-2.5.49/arch/i386/kernel/entry.S	2002-11-23 17:13:38.000000000 +0100
@@ -741,8 +741,19 @@
 	.long sys_epoll_create
 	.long sys_epoll_ctl	/* 255 */
 	.long sys_epoll_wait
- 	.long sys_remap_file_pages
- 	.long sys_set_tid_address
+	.long sys_remap_file_pages
+	.long sys_set_tid_address
+	.long sys_ni_syscall
+	.long sys_ni_syscall	/* 260 */
+	.long sys_ni_syscall
+	.long sys_ni_syscall
+	.long sys_mq_open
+	.long sys_mq_unlink
+	.long sys_mq_timedsend	/* 265 */
+	.long sys_mq_timedreceive
+	.long sys_mq_notify
+	.long sys_mq_getattr
+	.long sys_mq_setattr
 
 
 	.rept NR_syscalls-(.-sys_call_table)/4
diff -X dontdiff -Nur vanilla-2.5.49/include/asm-i386/unistd.h linux-2.5.49/include/asm-i386/unistd.h
--- vanilla-2.5.49/include/asm-i386/unistd.h	2002-11-21 00:18:34.000000000 +0100
+++ linux-2.5.49/include/asm-i386/unistd.h	2002-11-24 01:49:32.000000000 +0100
@@ -261,9 +261,16 @@
 #define __NR_sys_epoll_create	254
 #define __NR_sys_epoll_ctl	255
 #define __NR_sys_epoll_wait	256
-#define __NR_remap_file_pages	257
+#define __NR_remap_file_pages 257
 #define __NR_set_tid_address	258
-
+#define __NR_sys_mq_open	263
+#define __NR_sys_mq_unlink	264
+#define __NR_mq_timedsend	265
+#define __NR_mq_timedreceive	266
+#define __NR_mq_notify	267
+#define __NR_mq_getattr	268
+#define __NR_mq_setattr	269
+  
 
 /* user-visible error numbers are in the range -1 - -124: see <asm-i386/errno.h> */
 
diff -X dontdiff -Nur vanilla-2.5.49/include/linux/mqueue.h linux-2.5.49/include/linux/mqueue.h
--- vanilla-2.5.49/include/linux/mqueue.h	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.49/include/linux/mqueue.h	2002-11-24 17:20:20.000000000 +0100
@@ -0,0 +1,49 @@
+#ifndef _LINUX_MQUEUE_H
+#define _LINUX_MQUEUE_H
+
+#define MQ_MAXMSG 	40	/* max number of messages in each queue */
+#define MQ_MAXSYSSIZE	1048576	/* max size that all m.q. can have together */
+#define MQ_PRIO_MAX 	100000	/* max priority */
+
+#define MSGFS_MAGIC 0x4D455347
+#define  _POSIX_MESSAGE_PASSING 1
+
+typedef int mqd_t;		/* message queue descriptor */
+
+struct mq_attr {
+	long mq_flags;		/* message queue flags */
+	long mq_maxmsg;		/* maximum number of messages */
+	long mq_msgsize;	/* maximum message size */
+	long mq_curmsgs;	/* number of messages currently queued */
+};
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr);
+asmlinkage int sys_mq_close(mqd_t mqdes);
+asmlinkage int sys_mq_unlink(const char *u_name);
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, 
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime);
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, 
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime);
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification);
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat);
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat);
+
+#ifdef __KERNEL__
+
+struct mqueue_ds {		/* queue */
+	struct mq_attr attr;
+	struct msg_queue queue;	/* ipc/msg */
+
+	spinlock_t lock;
+	wait_queue_head_t wait_recv;
+	wait_queue_head_t wait_send;
+
+	pid_t notify_pid;	/* who we have to notify (or 0) */
+	struct sigevent notify;	/* notification */
+};
+#endif /* __KERNEL__ */
+
+#endif
diff -X dontdiff -Nur vanilla-2.5.49/include/linux/sys.h linux-2.5.49/include/linux/sys.h
--- vanilla-2.5.49/include/linux/sys.h	2002-11-01 01:15:04.000000000 +0100
+++ linux-2.5.49/include/linux/sys.h	2002-11-01 16:36:48.000000000 +0100
@@ -4,7 +4,7 @@
 /*
  * system call entry points ... but not all are defined
  */
-#define NR_syscalls 260
+#define NR_syscalls 270
 
 /*
  * These are system calls that will be removed at some time
diff -X dontdiff -Nur vanilla-2.5.49/init/Kconfig linux-2.5.49/init/Kconfig
--- vanilla-2.5.49/init/Kconfig	2002-11-21 00:18:37.000000000 +0100
+++ linux-2.5.49/init/Kconfig	2002-11-21 00:26:40.000000000 +0100
@@ -69,6 +69,13 @@
 	  section 6.4 of the Linux Programmer's Guide, available from
 	  <http://www.linuxdoc.org/docs.html#guide>.
 
+config POSIXMSG
+	bool "POSIX message queues"
+	depends on SYSVIPC
+	---help---
+	  This gives you POSIX compliant interfaces for message queues.
+
+
 config BSD_PROCESS_ACCT
 	bool "BSD Process Accounting"
 	help
diff -X dontdiff -Nur vanilla-2.5.49/ipc/Makefile linux-2.5.49/ipc/Makefile
--- vanilla-2.5.49/ipc/Makefile	2002-09-23 15:54:57.000000000 +0200
+++ linux-2.5.49/ipc/Makefile	2002-11-04 17:50:24.000000000 +0100
@@ -6,4 +6,8 @@
 
 obj-$(CONFIG_SYSVIPC) += msg.o sem.o shm.o
 
+ifeq ($(CONFIG_POSIXMSG),y)
+obj-$(CONFIG_SYSVIPC) += posixmsg.o
+endif
+
 include $(TOPDIR)/Rules.make
diff -X dontdiff -Nur vanilla-2.5.49/ipc/msg.c linux-2.5.49/ipc/msg.c
--- vanilla-2.5.49/ipc/msg.c	2002-11-21 00:18:37.000000000 +0100
+++ linux-2.5.49/ipc/msg.c	2002-11-21 00:26:40.000000000 +0100
@@ -127,7 +127,7 @@
 	return msg_buildid(id,msq->q_perm.seq);
 }
 
-static void free_msg(struct msg_msg* msg)
+void free_msg(struct msg_msg* msg)
 {
 	struct msg_msgseg* seg;
 	seg = msg->next;
@@ -139,7 +139,7 @@
 	}
 }
 
-static struct msg_msg* load_msg(void* src, int len)
+struct msg_msg* load_msg(void* src, int len)
 {
 	struct msg_msg* msg;
 	struct msg_msgseg** pseg;
@@ -191,7 +191,7 @@
 	return ERR_PTR(err);
 }
 
-static int store_msg(void* dest, struct msg_msg* msg, int len)
+int store_msg(void* dest, struct msg_msg* msg, int len)
 {
 	int alen;
 	struct msg_msgseg *seg;
diff -X dontdiff -Nur vanilla-2.5.49/ipc/posixmsg.c linux-2.5.49/ipc/posixmsg.c
--- vanilla-2.5.49/ipc/posixmsg.c	1970-01-01 01:00:00.000000000 +0100
+++ linux-2.5.49/ipc/posixmsg.c	2002-11-24 17:27:42.000000000 +0100
@@ -0,0 +1,829 @@
+/*
+ *  linux/ipc/posixmsg.c
+ *
+ *  Copyright 2002 Peter Wächtler <pwaechtler@mac.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * root can mount the filesystem to see the names of the currently
+ * used mqueues. The i_size shows the number of msgs in the queue.
+ * The only shell operation on the "fifos" is an unlink() via rm(1)
+ * The queues are accessed through syscalls - poll is supported
+ *
+ * put MSGFS_MAGIC into mqueue.h
+ * update inode->m_time on send (yes, no posix semantics on this)
+ * update inode->a_time on recv
+ * translate absolute timeouts in kernel to prevent too long sleeps
+ *
+ * TODO:
+ *	check for more sysv msg limits (or add some new, e.g. MQ_PRIO_MAX)?
+ *  implement SIGEV_THREAD with clone_startup(hey, where is it?)
+ *  think about prio based waitqueues: simply enqueue them in order?
+ *   bits/posix_opt.h  claims to support _POSIX_PRIORITY_SCHEDULING
+ */
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/fs.h>
+#include <linux/mount.h>
+#include <linux/file.h>
+#include <linux/namei.h>
+#include <linux/pagemap.h>	/* PAGE_CACHE_SIZE */
+#include <linux/poll.h>
+
+#include <linux/mqueue.h>
+#include <linux/msg.h>
+#include <asm/uaccess.h>
+
+/* functions used from ipc/msg.c */
+extern void free_msg(struct msg_msg *msg);
+extern int store_msg(void *dest, struct msg_msg *msg, int len);
+extern struct msg_msg *load_msg(void *src, int len);
+
+extern int msg_ctlmnb;		/* default max size of a message queue (all msgs sum up) */
+extern int msg_ctlmni;		/* max # of msg queue identifiers */
+extern int msg_ctlmax;		/* max size of one message (bytes) */
+
+static int mqueue_release(struct inode *inode, struct dentry *dentry);
+static int mqueue_close(struct inode *inode, struct file *filp);
+static unsigned int mqueue_poll(struct file *, struct poll_table_struct *);
+
+static atomic_t msg_bytes = ATOMIC_INIT(0);
+static struct vfsmount *msg_mnt;
+
+static struct file_operations msg_fops = {
+	.llseek = no_llseek,
+	.release = mqueue_close,
+	.poll = mqueue_poll,
+};
+static struct inode_operations msg_dir_inode_operations = {
+	.lookup = simple_lookup,
+	.unlink = mqueue_release,
+};
+
+static struct msg_queue *
+init_queue(struct msg_queue *queue)
+{
+	int retval;
+
+	queue->q_perm.mode = 0;
+	queue->q_perm.key = 0;
+	queue->q_perm.security = NULL;
+	retval = security_ops->msg_queue_alloc_security(queue);
+	if (retval)
+		return ERR_PTR(retval);;
+
+	queue->q_stime = queue->q_rtime = 0;
+	queue->q_qbytes = queue->q_cbytes = queue->q_qnum = 0;
+	queue->q_lspid = queue->q_lrpid = 0;
+	INIT_LIST_HEAD(&queue->q_messages);
+	INIT_LIST_HEAD(&queue->q_receivers);	/* unused */
+	INIT_LIST_HEAD(&queue->q_senders);	/* unused */
+
+	return queue;
+}
+
+/* Allocate an msgfs inode of type @mode.  FIFO/regular inodes get a new,
+ * initialized mqueue_ds in u.generic_ip.  Returns ERR_PTR() on failure. */
+static struct inode *
+get_msg_inode(struct super_block *sb, mode_t mode)
+{
+	struct mqueue_ds *q;
+	struct inode *inode = new_inode(sb);
+	if (!inode)
+		return ERR_PTR(-ENOSPC);
+	inode->i_mode = mode;
+	inode->i_uid = current->fsuid;
+	inode->i_gid = current->fsgid;
+	inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+	inode->i_blksize = 1024;
+	switch (mode & S_IFMT) {
+	default:
+	case S_IFIFO:
+	case S_IFREG:
+		inode->i_fop = &msg_fops;
+		q = kmalloc(sizeof (*q), GFP_KERNEL);
+		if (!q) {
+			iput(inode);
+			return ERR_PTR(-ENOSPC);
+		}
+		if (IS_ERR(init_queue(&q->queue))) {	/* never NULL */
+			kfree(q);
+			iput(inode);	/* inode must not be touched after this */
+			return ERR_PTR(-EACCES);
+		}
+		spin_lock_init(&q->lock);	/* guards the queue state */
+		inode->u.generic_ip = q;
+		break;
+	case S_IFDIR:
+		inode->i_op = &msg_dir_inode_operations;
+		inode->i_fop = &simple_dir_operations;
+		/* directory inodes start off with i_nlink == 2 (for "." entry) */
+		inode->i_nlink++;
+		break;
+	case S_IFLNK:
+		break;
+	}
+	return inode;
+}
+
+#define get_mqueue(filp)	/* NULL unless filp is an open mqueue */ \
+	(((filp) && (filp)->f_op == &msg_fops) ? (filp)->f_dentry->d_inode->u.generic_ip : NULL)
+
+/* don't use fget() to avoid the fput() for no good reason */
+static struct file *
+mqueue_lookup(mqd_t fd)
+{
+	struct files_struct *files = current->files;
+	struct file *filp = NULL;
+	/* fetch the slot while file_lock is held: the fd array may be
+	 * replaced when the table grows */
+	read_lock(&files->file_lock);
+	if (fd >= 0 && fd < files->max_fds)
+		filp = files->fd[fd];
+	read_unlock(&files->file_lock);
+	/* NOTE(review): no reference is taken; a close() in another thread
+	 * may still free filp - fget()/fput() would close that window */
+	return filp;
+}
+
+static inline int
+freespace(struct mqueue_ds *q, size_t msg_len)
+{
+	int rc;
+
+	/* The queue is full once it holds mq_maxmsg messages, matching what
+	 * mqueue_poll() reports; msg_len <= mq_msgsize is checked by callers */
+	spin_lock(&q->lock);
+	rc = q->queue.q_qnum < q->attr.mq_maxmsg;
+	spin_unlock(&q->lock);
+	return rc;
+}
+
+/* cleans up after close() or exit() */
+static int
+mqueue_close(struct inode *inode, struct file *filp)
+{
+	struct mqueue_ds *q = inode->u.generic_ip;
+
+	if (!q || !filp)
+		return -EBADFD;
+
+	spin_lock(&q->lock);
+	/* remove possible notification;
+	 * sys_getpid() returns the tgid if multithreaded */
+	if (q->notify_pid == current->pid) {
+		q->notify_pid = 0;
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	}
+	spin_unlock(&q->lock);
+	return 0;
+}
+
+/* removes a queue - inode is the inode of the directory */
+static int
+mqueue_release(struct inode *inode, struct dentry *dentry)
+{
+	struct mqueue_ds *q = dentry->d_inode->u.generic_ip;
+	struct msg_queue *queue;
+	struct list_head *tmp;
+
+	if (!q)
+		return -EBADFD;
+
+	queue = &q->queue;
+
+	tmp = queue->q_messages.next;
+	while (tmp != &queue->q_messages) {
+		struct msg_msg *msg = list_entry(tmp, struct msg_msg, m_list);
+		tmp = tmp->next;
+		free_msg(msg);
+	}
+	atomic_sub(queue->q_cbytes, &msg_bytes);
+	security_ops->msg_queue_free_security(queue);
+	dentry->d_inode->u.generic_ip = NULL;
+
+	kfree(q);
+	return 0;
+}
+
+static unsigned int
+mqueue_poll(struct file *filp, struct poll_table_struct *wait)
+{
+	struct mqueue_ds *q = get_mqueue(filp);
+	int ret = 0;
+
+	poll_wait(filp, &q->wait_recv, wait);
+	poll_wait(filp, &q->wait_send, wait);
+
+	if (q->queue.q_qnum)
+		ret = POLLIN | POLLRDNORM;
+
+	if (q->queue.q_qnum < q->attr.mq_maxmsg)
+		ret |= POLLOUT | POLLWRNORM;
+
+	return ret;
+}
+
+static int
+create_queue(struct dentry *dir, struct qstr *qname,
+	     int oflag, mode_t mode, struct mq_attr *u_attr)
+{
+	int ret, fd;
+	struct file *filp;
+	struct mqueue_ds *q;
+	struct inode *inode;
+	struct dentry *dentry;
+
+	inode = get_msg_inode(msg_mnt->mnt_sb, S_IFIFO | (mode & S_IRWXUGO));
+	if (IS_ERR(inode)) {
+		ret = PTR_ERR(inode);
+		goto out_ret;
+	}
+	q = inode->u.generic_ip;
+
+	if (u_attr != NULL) {
+		if (copy_from_user(&q->attr, u_attr, sizeof (struct mq_attr))) {
+			ret = -EFAULT;
+			goto out_inode;
+		}
+		if (q->attr.mq_maxmsg <= 0
+		    || q->attr.mq_msgsize <= 0
+		    || q->attr.mq_maxmsg > MQ_MAXMSG
+		    || q->attr.mq_msgsize > msg_ctlmax) {
+			ret = -EINVAL;
+			goto out_inode;
+		}
+	} else {		/* implementation defined */
+		q->attr.mq_maxmsg = MQ_MAXMSG;
+		q->attr.mq_msgsize = 1024 /*msg_ctlmax */ ;
+	}
+	q->attr.mq_flags = oflag & O_ACCMODE;
+	q->notify_pid = 0;
+	q->notify.sigev_signo = 0;
+	q->notify.sigev_notify = 0;
+	init_waitqueue_head(&q->wait_send);
+	init_waitqueue_head(&q->wait_recv);
+
+	ret = -ENFILE;
+	if ((fd = get_unused_fd()) < 0)
+		goto out_inode;
+	if (!(filp = get_empty_filp()))
+		goto out_fd;
+
+	qname->hash = full_name_hash(qname->name, qname->len);
+	dentry = d_alloc(dir, qname);
+	if (!dentry) {
+		ret = -ENOMEM;
+		goto out_filp;
+	}
+	d_add(dentry, inode);
+	ret = get_write_access(inode);
+	if (ret)
+		goto out_filp;
+
+	filp->f_vfsmnt = mntget(msg_mnt);
+	filp->f_dentry = dget(dentry);	/* leave it active */
+	filp->f_op = &msg_fops;
+	filp->f_mode = (q->attr.mq_flags + 1) & O_ACCMODE;
+	filp->f_flags = oflag;
+
+	/* Now we map fd to filp, so userspace can access it */
+	fd_install(fd, filp);
+	ret = fd;
+	goto out_ret;
+
+      out_filp:
+	put_filp(filp);
+      out_fd:
+	put_unused_fd(fd);
+      out_inode:
+	kfree(q);
+	iput(inode);
+      out_ret:
+	return ret;
+}
+
+/**
+ *	sys_mq_open	-	opens a message queue associated with @u_name 
+ *	@u_name: name of the queue, looked up in the msgfs root directory
+ *	@oflag: flags like O_CREAT, O_EXCL, O_RDWR
+ *	@mode: when O_CREAT is specified, the permission bits
+ *	@u_attr: pointer to the attributes, like max msgsize, when creating
+ *
+ *	returns a descriptor to the opened queue or negative value on error
+ */
+asmlinkage mqd_t
+sys_mq_open(const char *u_name, int oflag, mode_t mode, struct mq_attr * u_attr)
+{
+	struct file *filp;
+	struct dentry *dentry;
+	struct qstr this;
+	/* indexed by (oflag & O_ACCMODE); O_ACCMODE itself is rejected below */
+	static const int oflag2acc[O_ACCMODE + 1] =
+	    { MAY_READ, MAY_WRITE, MAY_READ | MAY_WRITE, 0 };
+	int fd, ret;
+
+	if ((oflag & O_ACCMODE) == O_ACCMODE)
+		return -EINVAL;
+	this.name = getname(u_name);
+	if (IS_ERR(this.name))
+		return PTR_ERR(this.name);
+	this.len = strlen(this.name);
+	dentry = lookup_one_len(this.name, msg_mnt->mnt_root, this.len);
+	if (IS_ERR(dentry)) {
+		ret = PTR_ERR(dentry);
+		goto out_ret;
+	}
+
+	if (!dentry->d_inode) {
+		/* no such queue: create it only if asked to */
+		if (oflag & O_CREAT)
+			ret = create_queue(msg_mnt->mnt_root, &this, oflag,
+					   mode, u_attr);
+		else
+			ret = -ENOENT;
+		goto out_dput;
+	}
+	if ((oflag & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL)) {
+		ret = -EEXIST;
+		goto out_dput;
+	}
+
+	/* open an existing queue */
+	if (permission(dentry->d_inode, oflag2acc[oflag & O_ACCMODE])) {
+		ret = -EACCES;
+		goto out_dput;
+	}
+
+	fd = get_unused_fd();
+	if (fd < 0) {
+		ret = fd;
+		goto out_dput;
+	}
+
+	/* dentry_open() consumes a dentry and a vfsmount reference even
+	 * when it fails, so take both right before the call */
+	filp = dentry_open(dget(dentry), mntget(msg_mnt), oflag);
+	if (IS_ERR(filp)) {
+		put_unused_fd(fd);
+		ret = PTR_ERR(filp);
+		goto out_dput;
+	}
+	filp->f_op = &msg_fops;	/* only valid after the IS_ERR() check */
+	fd_install(fd, filp);
+	ret = fd;
+
+out_dput:
+	dput(dentry);
+out_ret:
+	putname(this.name);
+
+	return ret;
+}
+
+/**
+ *	sys_mq_unlink	-	removes a message queue from the namespace
+ *	@u_name: pointer to the name
+ *	returns 0 on success or a negative error code
+ */
+asmlinkage int
+sys_mq_unlink(const char *u_name)
+{
+	int err;
+	struct dentry *dentry;
+	struct inode *dir = msg_mnt->mnt_root->d_inode;
+	char *name = getname(u_name);
+
+	if (IS_ERR(name))
+		return PTR_ERR(name);
+	/* lookup_one_len() and vfs_unlink() expect the parent's i_sem held */
+	down(&dir->i_sem);
+	dentry = lookup_one_len(name, msg_mnt->mnt_root, strlen(name));
+	putname(name);
+	err = PTR_ERR(dentry);
+	if (IS_ERR(dentry))
+		goto out_unlock;
+	err = -ENOENT;
+	if (dentry->d_inode) {
+		err = vfs_unlink(dir, dentry);
+		if (!err)
+			d_delete(dentry);
+	}
+	dput(dentry);	/* drop the lookup reference on every path */
+out_unlock:
+	up(&dir->i_sem);
+	return err;
+}
+
+static inline long get_timeout( struct timespec *abs)
+{
+	struct timespec t;
+
+	if (abs->tv_nsec >= 1000000000L || abs->tv_nsec < 0 || abs->tv_sec < 0)
+		return -EINVAL;
+	t=current_kernel_time();
+	if (t.tv_sec > abs->tv_sec || 
+	(t.tv_sec == abs->tv_sec && t.tv_nsec > abs->tv_nsec))
+		return -ETIMEDOUT;
+
+	t.tv_sec = abs->tv_sec - t.tv_sec;
+	t.tv_nsec = abs->tv_nsec - t.tv_nsec;
+	if (t.tv_nsec < 0){
+		t.tv_sec--;
+		t.tv_nsec+= 1000000000;
+	}
+	return timespec_to_jiffies(&t) + 1;
+}
+
+/**
+ *	sys_mq_timedsend	-	send a message to the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer holding the message
+ *	@msg_len: length of the message
+ *	@msg_prio: the priority of the message
+ *	@utime: if !NULL, absolute deadline after which -ETIMEDOUT is returned
+ */
+asmlinkage int
+sys_mq_timedsend(mqd_t mqdes,
+		 const char *msg_ptr, size_t msg_len,
+		 unsigned int msg_prio, struct timespec *utime)
+{
+	struct siginfo sig_i;
+	struct msg_msg *msg;
+	struct list_head *p;
+	struct msg_queue *queue;
+	int err;
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	if (!(filp->f_mode & FMODE_WRITE))	/* FMODE_* bits, not O_* flags */
+		return -EBADF;
+	if ((unsigned int) msg_prio >= (unsigned int) MQ_PRIO_MAX)
+		return -EINVAL;
+	if (msg_len > q->attr.mq_msgsize)
+		return -EMSGSIZE;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && !freespace(q, msg_len))
+		return -EAGAIN;
+
+	/* check if this message will exceed overall limit for messages */
+	if (atomic_read(&msg_bytes) + msg_len > MQ_MAXSYSSIZE)
+		return -ENOMEM;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if ((timeout = get_timeout(&ts))<0)
+			return timeout;
+	} else
+		timeout = 0L;
+
+	msg = load_msg((char *) msg_ptr, msg_len);
+	if (IS_ERR(msg))
+		return PTR_ERR(msg);
+
+	msg->m_type = msg_prio;
+	msg->m_ts = msg_len;
+
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_send,
+					       freespace(q, msg_len), timeout);
+
+	if (err == -ERESTARTSYS || err == 0) {
+		free_msg(msg);	/* never queued: don't leak it */
+		return err ? -EINTR : -ETIMEDOUT;
+	}
+
+	err = 0;
+	/* if we lose the race for the lock, we can overflow the limits */
+	spin_lock(&q->lock);
+	/* enqueue message in prio order */
+	p = queue->q_messages.next;	/* used as flag if msg was queued */
+	if (msg_prio > 0 && !list_empty(&queue->q_messages)) {
+
+		list_for_each(p, &queue->q_messages) {
+			struct msg_msg *tmp =
+			    list_entry(p, struct msg_msg, m_list);
+			if (tmp->m_type < msg_prio) {
+				list_add_tail(&msg->m_list, p);
+				p = NULL;
+				break;
+			}
+		}
+	}
+	if (p)			/* ok, put it at the end */
+		list_add_tail(&msg->m_list, &queue->q_messages);
+
+	queue->q_lspid = current->pid;
+	queue->q_cbytes += msg_len;
+	atomic_add(msg_len, &msg_bytes);
+	queue->q_qnum++;
+	filp->f_dentry->d_inode->i_size = queue->q_qnum;
+	filp->f_dentry->d_inode->i_mtime = CURRENT_TIME;
+
+	if (waitqueue_active(&q->wait_recv)) {
+		wake_up_interruptible(&q->wait_recv);
+	} else {
+		/* since there was no synchronously waiting process for message
+		 * we notify it when the state of queue changed from
+		 * empty to not empty */
+		if (q->notify_pid != 0 && queue->q_qnum == 1) {
+			/* TODO: Add support for sigev_notify==SIGEV_THREAD
+			 *    should we really create a thread? I think so.
+			 */
+			if (q->notify.sigev_notify == SIGEV_THREAD) {
+				err = -ENOSYS;
+				pr_info
+				    ("mq_*send: SIGEV_THREAD not supported\n");
+			}
+			/* sends signal */
+			if (q->notify.sigev_notify == SIGEV_SIGNAL) {
+				memset(&sig_i, 0, sizeof (sig_i));	/* reaches user space */
+				sig_i.si_signo = q->notify.sigev_signo;
+				sig_i.si_errno = 0;
+				sig_i.si_code = SI_MESGQ;
+				sig_i.si_pid = current->pid;
+				sig_i.si_uid = current->uid;
+				kill_proc_info(q->notify.sigev_signo,
+					       &sig_i, q->notify_pid);
+			}
+			/* after notification unregisters process */
+			q->notify_pid = 0;
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_timedreceive	-	receive a message from the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@msg_ptr: pointer to buffer to hold the message
+ *	@msg_len: length of the userspace buffer
+ *	@msg_prio: if !NULL, will hold the priority of the received message
+ *	@utime: if !NULL, absolute deadline after which -ETIMEDOUT is returned
+ */
+asmlinkage ssize_t
+sys_mq_timedreceive(mqd_t mqdes,
+		    char *msg_ptr, size_t msg_len,
+		    unsigned int *msg_prio, struct timespec * utime)
+{
+	struct msg_queue *queue;
+	struct msg_msg *msg;
+	long err;	/* wide enough for the jiffies left by the wait */
+	long timeout;
+	struct timespec ts;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+
+	if (!(filp->f_mode & FMODE_READ))
+		return -EBADF;
+
+	queue = &q->queue;
+	if ((filp->f_flags & O_NONBLOCK) && queue->q_qnum == 0)
+		return -EAGAIN;
+
+	if (utime) {
+		if (copy_from_user(&ts, utime, sizeof (ts)))
+			return -EFAULT;
+		if ((timeout = get_timeout(&ts))<0)
+			return timeout;
+	} else
+		timeout = 0L;
+wait_on_msg:
+	if (!timeout)
+		timeout = MAX_SCHEDULE_TIMEOUT;
+
+	err = wait_event_interruptible_timeout(q->wait_recv,
+					       queue->q_qnum > 0, timeout);
+
+	if (err == -ERESTARTSYS)
+		return -EINTR;
+	if (err == 0)
+		return -ETIMEDOUT;
+
+	timeout = err;	/* time left, in case another reader wins the race */
+	spin_lock(&q->lock);
+	if (!list_empty(&queue->q_messages)) {
+
+		msg =
+		    list_entry(queue->q_messages.next, struct msg_msg, m_list);
+		if (msg_len < msg->m_ts) {
+			err = -EMSGSIZE;
+			goto out_unlock;
+		}
+		list_del(&msg->m_list);
+		queue->q_lrpid = current->pid;
+		queue->q_cbytes -= msg->m_ts;
+		atomic_sub(msg->m_ts, &msg_bytes);
+		queue->q_qnum--;
+		filp->f_dentry->d_inode->i_size = queue->q_qnum;
+		filp->f_dentry->d_inode->i_atime = CURRENT_TIME;
+
+		wake_up_interruptible(&q->wait_send);
+
+		msg_len = (msg_len > msg->m_ts) ? msg->m_ts : msg_len;
+
+		spin_unlock(&q->lock);
+		if ((err = store_msg(msg_ptr, msg, msg_len)) ||
+		    (msg_prio && put_user(msg->m_type, msg_prio))) {
+			msg_len = -EFAULT;	/* hmh, now the msg is lost */
+		}
+		free_msg(msg);
+		return msg_len;
+	} else {
+		spin_unlock(&q->lock);
+		goto wait_on_msg;
+	}
+      out_unlock:
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_notify	-	set or remove a notification on the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_notification: pointer to struct sigevent 
+ *
+ */
+asmlinkage int
+sys_mq_notify(mqd_t mqdes, const struct sigevent *u_notification)
+{
+	struct sigevent notify;
+	struct inode *inode;
+	struct file *filp = mqueue_lookup(mqdes);
+	struct mqueue_ds *q = get_mqueue(filp);
+	int err = 0;
+
+	if (!q)
+		return -EBADF;
+	if (u_notification != NULL)
+		if (copy_from_user
+		    (&notify, u_notification, sizeof (struct sigevent)))
+			return -EFAULT;
+
+	inode = filp->f_dentry->d_inode;
+	spin_lock(&q->lock);
+	if (q->notify_pid == current->pid
+	    && (u_notification == NULL || notify.sigev_notify == SIGEV_NONE)) {
+		q->notify_pid = 0;	/* remove notification */
+		q->notify.sigev_signo = 0;
+		q->notify.sigev_notify = 0;
+	} else if (q->notify_pid > 0) {
+		err = -EBUSY;
+	} else if (u_notification != NULL) {
+		if (notify.sigev_notify == SIGEV_SIGNAL) {
+			/* add notification */
+			q->notify_pid = current->pid;
+			q->notify.sigev_signo = notify.sigev_signo;
+			q->notify.sigev_notify = notify.sigev_notify;
+		} else if (notify.sigev_notify == SIGEV_THREAD) {
+			err = -ENOSYS;
+			pr_info("mq_*send: SIGEV_THREAD not supported yet\n");
+		}
+	}
+	spin_unlock(&q->lock);
+	return err;
+}
+
+/**
+ *	sys_mq_getattr	-	get the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: user buffer that receives the current attributes
+ *
+ */
+asmlinkage int
+sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	struct mq_attr attr;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	spin_lock(&q->lock);
+	attr = q->attr;
+	attr.mq_flags = filp->f_flags & O_NONBLOCK;	/* per-descriptor */
+	attr.mq_curmsgs = q->queue.q_qnum;
+	spin_unlock(&q->lock);
+	/* copy_to_user() may fault and sleep, so only after unlocking */
+	if (copy_to_user(u_mqstat, &attr, sizeof (struct mq_attr)))
+		return -EFAULT;
+	return 0;
+}
+
+/**
+ *	sys_mq_setattr	-	set the attributes of the queue associated 
+ *	with the descriptor mqdes
+ *	@mqdes: descriptor of mqueue
+ *	@u_mqstat: pointer to struct holding the new values
+ *	@u_omqstat: pointer to store the original attributes (if !NULL)
+ *
+ *	Only O_NONBLOCK in mq_flags is applied; it is kept per descriptor
+ *	in filp->f_flags.  The other fields of @u_mqstat are ignored.
+ *	User memory is accessed only outside q->lock, since it may sleep.
+ */
+asmlinkage int
+sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	       struct mq_attr *u_omqstat)
+{
+	struct mq_attr mqstat, omqstat;
+	struct mqueue_ds *q;
+	struct file *filp = mqueue_lookup(mqdes);
+
+	if (!(q = get_mqueue(filp)))
+		return -EBADF;
+	/* read the new values first, so u_mqstat == u_omqstat also works */
+	if (copy_from_user(&mqstat, u_mqstat, sizeof (struct mq_attr)))
+		return -EFAULT;
+
+	spin_lock(&q->lock);
+	omqstat = q->attr;
+	omqstat.mq_flags = filp->f_flags & O_NONBLOCK;
+	omqstat.mq_curmsgs = q->queue.q_qnum;
+	/* the only attribute that can be changed */
+	if (mqstat.mq_flags & O_NONBLOCK)
+		filp->f_flags |= O_NONBLOCK;
+	else
+		filp->f_flags &= ~O_NONBLOCK;
+	spin_unlock(&q->lock);
+
+	if (u_omqstat != NULL &&
+	    copy_to_user(u_omqstat, &omqstat, sizeof (struct mq_attr)))
+		return -EFAULT;
+	return 0;
+}
+
+static struct super_operations msg_s_ops = {
+	.statfs = simple_statfs,
+	.drop_inode = generic_delete_inode,
+};
+
+/* Set up the single msgfs superblock: its root is a sticky, world-writable
+ * directory that holds one inode per named queue. */
+static int
+msg_fill_super(struct super_block *sb, void *data, int silent)
+{
+	struct inode *root;
+	struct dentry *root_dentry;
+
+	sb->s_blocksize = PAGE_CACHE_SIZE;
+	sb->s_blocksize_bits = PAGE_CACHE_SHIFT;
+	sb->s_magic = MSGFS_MAGIC;
+	sb->s_op = &msg_s_ops;
+
+	/* get_msg_inode() returns ERR_PTR() on failure, never NULL */
+	root = get_msg_inode(sb, S_IFDIR | S_IRWXUGO | S_ISVTX);
+	if (IS_ERR(root))
+		return PTR_ERR(root);
+	root_dentry = d_alloc_root(root);
+	if (!root_dentry) {
+		iput(root);
+		return -ENOMEM;
+	}
+	sb->s_root = root_dentry;
+	return 0;
+}
+
+static struct super_block *
+msgfs_get_sb(struct file_system_type *fs_type,
+	     int flags, char *dev_name, void *data)
+{
+	return get_sb_single(fs_type, flags, data, msg_fill_super);
+}
+
+static struct file_system_type msg_fs_type = {
+	.name = "msgfs",
+	.get_sb = msgfs_get_sb,
+	.kill_sb = kill_anon_super,
+};
+
+static int __init
+mqueue_init(void)
+{
+	register_filesystem(&msg_fs_type);
+	if (IS_ERR(msg_mnt = kern_mount(&msg_fs_type)))
+		return PTR_ERR(msg_mnt);
+
+	return 0;
+}
+
+__initcall(mqueue_init);
diff -X dontdiff -Nur vanilla-2.5.49/ipc/util.c linux-2.5.49/ipc/util.c
--- vanilla-2.5.49/ipc/util.c	2002-11-23 17:04:49.000000000 +0100
+++ linux-2.5.49/ipc/util.c	2002-11-23 17:14:05.000000000 +0100
@@ -24,6 +24,7 @@
 #include <linux/security.h>
 #include <linux/rcupdate.h>
 #include <linux/workqueue.h>
+#include <linux/mqueue.h>
 
 #if defined(CONFIG_SYSVIPC)
 
@@ -585,3 +586,52 @@
 }
 
 #endif /* CONFIG_SYSVIPC */
+
+#ifndef CONFIG_POSIXMSG
+/*
+ * Stub mq_* syscalls used when CONFIG_POSIXMSG is not set: each one
+ * ignores its arguments and returns -ENOSYS. Keep these signatures in
+ * sync with the CONFIG_POSIXMSG=y implementations.
+ */
+
+asmlinkage mqd_t sys_mq_open(const char *u_path, int oflag, mode_t mode,
+	struct mq_attr *u_attr)
+{
+	return (mqd_t) -ENOSYS;
+}
+
+asmlinkage int sys_mq_unlink(const char *u_name)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr,
+	size_t msg_len, unsigned int msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage ssize_t sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr,
+	size_t msg_len, unsigned int *msg_prio, struct timespec *utime)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_notify(mqd_t mqdes,
+	const struct sigevent *u_notification)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_getattr(mqd_t mqdes, struct mq_attr *u_mqstat)
+{
+	return -ENOSYS;
+}
+
+asmlinkage int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *u_mqstat,
+	struct mq_attr *u_omqstat)
+{
+	return -ENOSYS;
+}
+
+#endif				/* !CONFIG_POSIXMSG */

^ permalink raw reply	[flat|nested] 11+ messages in thread

end of thread, other threads:[~2002-11-26 11:04 UTC | newest]

Thread overview: 11+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2002-11-10 23:44 [PATCH] unified SysV and POSIX mqueues - complete rewrite Peter Waechtler
     [not found] <EDC461A30AC4D511ADE10002A5072CAD04C70992@orsmsx119.jf.intel.com>
2002-11-13 15:10 ` Michal Wronski
2002-11-13 22:25   ` Peter Waechtler
2002-11-19 11:28     ` Krzysztof Benedyczak
2002-11-19 16:42       ` Peter Waechtler
2002-11-24 11:55   ` Michal Wronski
2002-11-26  9:12     ` Michal Wronski
2002-11-26 11:16       ` Peter Waechtler
  -- strict thread matches above, loose matches on Subject: below --
2002-11-19 19:12 Perez-Gonzalez, Inaky
2002-11-24 23:05 Peter Waechtler
     [not found] ` <3DE15F9E.A585E26F@digeo.com>
2002-11-25  1:02   ` Peter Waechtler

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox