ipc/msg: batch queue sender wakeups
[cascardo/linux.git] / ipc / msg.c
index b1fb06a..d320024 100644 (file)
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -166,14 +166,15 @@ static inline void ss_del(struct msg_sender *mss)
                list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h, int kill)
+static void ss_wakeup(struct list_head *h,
+                     struct wake_q_head *wake_q, int kill)
 {
        struct msg_sender *mss, *t;
 
        list_for_each_entry_safe(mss, t, h, list) {
                if (kill)
                        mss->list.next = NULL;
-               wake_up_process(mss->tsk);
+               wake_q_add(wake_q, mss->tsk);
        }
 }
 
@@ -203,7 +204,7 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
        WAKE_Q(wake_q);
 
        expunge_all(msq, -EIDRM, &wake_q);
-       ss_wakeup(&msq->q_senders, 1);
+       ss_wakeup(&msq->q_senders, &wake_q, 1);
        msg_rmid(ns, msq);
        ipc_unlock_object(&msq->q_perm);
        wake_up_q(&wake_q);
@@ -331,7 +332,6 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
        struct kern_ipc_perm *ipcp;
        struct msqid64_ds uninitialized_var(msqid64);
        struct msg_queue *msq;
-       WAKE_Q(wake_q);
        int err;
 
        if (cmd == IPC_SET) {
@@ -362,6 +362,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
                freeque(ns, ipcp);
                goto out_up;
        case IPC_SET:
+       {
+               WAKE_Q(wake_q);
+
                if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
                    !capable(CAP_SYS_RESOURCE)) {
                        err = -EPERM;
@@ -376,15 +379,21 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
                msq->q_qbytes = msqid64.msg_qbytes;
 
                msq->q_ctime = get_seconds();
-               /* sleeping receivers might be excluded by
+               /*
+                * Sleeping receivers might be excluded by
                 * stricter permissions.
                 */
                expunge_all(msq, -EAGAIN, &wake_q);
-               /* sleeping senders might be able to send
+               /*
+                * Sleeping senders might be able to send
                 * due to a larger queue size.
                 */
-               ss_wakeup(&msq->q_senders, 0);
-               break;
+               ss_wakeup(&msq->q_senders, &wake_q, 0);
+               ipc_unlock_object(&msq->q_perm);
+               wake_up_q(&wake_q);
+
+               goto out_unlock1;
+       }
        default:
                err = -EINVAL;
                goto out_unlock1;
@@ -392,7 +401,6 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
        ipc_unlock_object(&msq->q_perm);
-       wake_up_q(&wake_q);
 out_unlock1:
        rcu_read_unlock();
 out_up:
@@ -809,6 +817,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
        struct msg_queue *msq;
        struct ipc_namespace *ns;
        struct msg_msg *msg, *copy = NULL;
+       WAKE_Q(wake_q);
 
        ns = current->nsproxy->ipc_ns;
 
@@ -873,7 +882,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
                        msq->q_cbytes -= msg->m_ts;
                        atomic_sub(msg->m_ts, &ns->msg_bytes);
                        atomic_dec(&ns->msg_hdrs);
-                       ss_wakeup(&msq->q_senders, 0);
+                       ss_wakeup(&msq->q_senders, &wake_q, 0);
 
                        goto out_unlock0;
                }
@@ -945,6 +954,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 
 out_unlock0:
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
 out_unlock1:
        rcu_read_unlock();
        if (IS_ERR(msg)) {