diff --git a/ipc/msg.c b/ipc/msg.c
index c6521c2..e12307d 100644
--- a/ipc/msg.c
+++ b/ipc/msg.c
@@ -51,19 +51,14 @@ struct msg_receiver {
        long                    r_msgtype;
        long                    r_maxsize;
 
-       /*
-        * Mark r_msg volatile so that the compiler
-        * does not try to get smart and optimize
-        * it. We rely on this for the lockless
-        * receive algorithm.
-        */
-       struct msg_msg          *volatile r_msg;
+       struct msg_msg          *r_msg;
 };
 
 /* one msg_sender for each sleeping sender */
 struct msg_sender {
        struct list_head        list;
        struct task_struct      *tsk;
+       size_t                  msgsz;
 };
 
 #define SEARCH_ANY             1
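
The first hunk drops the volatile qualifier from r_msg: the lockless algorithm now goes through READ_ONCE()/WRITE_ONCE() at every access site (see the later hunks), which apply volatile semantics per access instead of per field, and msg_sender gains a msgsz field so a waker can check whether a blocked send now fits in the queue. Below is a minimal userspace sketch of the accessor idea; the macro bodies mirror the kernel's compiler.h approach but are demo definitions, not the real headers.

#include <stdio.h>

/* Demo definitions only; the kernel's READ_ONCE/WRITE_ONCE live in
 * <linux/compiler.h> and are more elaborate, but the core trick is
 * the same: the volatile qualifier sits in the accessor's cast. */
#define WRITE_ONCE(x, val) (*(volatile __typeof__(x) *)&(x) = (val))
#define READ_ONCE(x)       (*(volatile __typeof__(x) *)&(x))

struct msg_msg;                         /* opaque, as in ipc/msg.c */

struct msg_receiver_demo {
        struct msg_msg *r_msg;          /* no volatile on the field itself */
};

int main(void)
{
        struct msg_receiver_demo msr = { .r_msg = NULL };

        /* Waker side: publish the pointer with a single, non-tearing store. */
        WRITE_ONCE(msr.r_msg, (struct msg_msg *)0x1);

        /* Receiver side: one forced load the compiler cannot elide or fuse. */
        printf("r_msg = %p\n", (void *)READ_ONCE(msr.r_msg));
        return 0;
}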
@@ -159,45 +154,72 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
        return msq->q_perm.id;
 }
 
-static inline void ss_add(struct msg_queue *msq, struct msg_sender *mss)
+static inline bool msg_fits_inqueue(struct msg_queue *msq, size_t msgsz)
+{
+       return msgsz + msq->q_cbytes <= msq->q_qbytes &&
+               1 + msq->q_qnum <= msq->q_qbytes;
+}
+
+static inline void ss_add(struct msg_queue *msq,
+                         struct msg_sender *mss, size_t msgsz)
 {
        mss->tsk = current;
+       mss->msgsz = msgsz;
        __set_current_state(TASK_INTERRUPTIBLE);
        list_add_tail(&mss->list, &msq->q_senders);
 }
 
 static inline void ss_del(struct msg_sender *mss)
 {
-       if (mss->list.next != NULL)
+       if (mss->list.next)
                list_del(&mss->list);
 }
 
-static void ss_wakeup(struct list_head *h, int kill)
+static void ss_wakeup(struct msg_queue *msq,
+                     struct wake_q_head *wake_q, bool kill)
 {
        struct msg_sender *mss, *t;
+       struct task_struct *stop_tsk = NULL;
+       struct list_head *h = &msq->q_senders;
 
        list_for_each_entry_safe(mss, t, h, list) {
                if (kill)
                        mss->list.next = NULL;
-               wake_up_process(mss->tsk);
+
+               /*
+                * Stop at the first task we don't wake up;
+                * we've already iterated the original
+                * sender queue.
+                */
+               else if (stop_tsk == mss->tsk)
+                       break;
+               /*
+                * We are not in an EIDRM scenario here; therefore
+                * verify that we really need to wake up the task.
+                * To maintain current semantics and wakeup order,
+                * move the sender to the tail on behalf of the
+                * blocked task.
+                */
+               else if (!msg_fits_inqueue(msq, mss->msgsz)) {
+                       if (!stop_tsk)
+                               stop_tsk = mss->tsk;
+
+                       list_move_tail(&mss->list, &msq->q_senders);
+                       continue;
+               }
+
+               wake_q_add(wake_q, mss->tsk);
        }
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+                       struct wake_q_head *wake_q)
 {
        struct msg_receiver *msr, *t;
 
        list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-               msr->r_msg = NULL; /* initialize expunge ordering */
-               wake_up_process(msr->r_tsk);
-               /*
-                * Ensure that the wakeup is visible before setting r_msg as
-                * the receiving end depends on it: either spinning on a nil,
-                * or dealing with -EAGAIN cases. See lockless receive part 1
-                * and 2 in do_msgrcv().
-                */
-               smp_wmb(); /* barrier (B) */
-               msr->r_msg = ERR_PTR(res);
+               wake_q_add(wake_q, msr->r_tsk);
+               WRITE_ONCE(msr->r_msg, ERR_PTR(res));
        }
 }
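
The reworked ss_wakeup() and expunge_all() no longer call wake_up_process() with the ipc object lock held: they only collect tasks on a wake_q_head, and every caller issues wake_up_q() after ipc_unlock_object(), as the freeque(), IPC_SET, do_msgsnd() and do_msgrcv() hunks below show. Here is a rough userspace analogy of that deferred-wakeup shape, with pthreads standing in for the scheduler API; wake_list and friends are invented demo names, not the kernel's wake_q.

#include <pthread.h>
#include <stdio.h>

#define MAX_WAITERS 8

struct wake_list {                      /* stand-in for struct wake_q_head */
        pthread_cond_t *pending[MAX_WAITERS];
        int n;
};

static void wake_list_add(struct wake_list *wl, pthread_cond_t *c)
{
        if (wl->n < MAX_WAITERS)        /* record only; no wakeup yet */
                wl->pending[wl->n++] = c;
}

static void wake_up_list(struct wake_list *wl)
{
        for (int i = 0; i < wl->n; i++) /* lock is already dropped here */
                pthread_cond_signal(wl->pending[i]);
        wl->n = 0;
}

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t waiter = PTHREAD_COND_INITIALIZER;

int main(void)
{
        struct wake_list wl = { .n = 0 };

        pthread_mutex_lock(&queue_lock);        /* like ipc_lock_object() */
        /* ... update queue state, decide who may proceed ... */
        wake_list_add(&wl, &waiter);            /* like wake_q_add() */
        pthread_mutex_unlock(&queue_lock);      /* like ipc_unlock_object() */
        wake_up_list(&wl);                      /* like wake_up_q() */

        puts("woken after the lock was dropped");
        return 0;
}

The payoff is a shorter critical section, and in the kernel wake_q_add() also takes a reference on the task, so the target cannot exit between being queued and the eventual wake_up_q().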
 
@@ -213,11 +235,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
        struct msg_msg *msg, *t;
        struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+       WAKE_Q(wake_q);
 
-       expunge_all(msq, -EIDRM);
-       ss_wakeup(&msq->q_senders, 1);
+       expunge_all(msq, -EIDRM, &wake_q);
+       ss_wakeup(msq, &wake_q, true);
        msg_rmid(ns, msq);
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
        rcu_read_unlock();
 
        list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -372,6 +396,9 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
                freeque(ns, ipcp);
                goto out_up;
        case IPC_SET:
+       {
+               WAKE_Q(wake_q);
+
                if (msqid64.msg_qbytes > ns->msg_ctlmnb &&
                    !capable(CAP_SYS_RESOURCE)) {
                        err = -EPERM;
@@ -386,15 +413,21 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
                msq->q_qbytes = msqid64.msg_qbytes;
 
                msq->q_ctime = get_seconds();
-               /* sleeping receivers might be excluded by
+               /*
+                * Sleeping receivers might be excluded by
                 * stricter permissions.
                 */
-               expunge_all(msq, -EAGAIN);
-               /* sleeping senders might be able to send
+               expunge_all(msq, -EAGAIN, &wake_q);
+               /*
+                * Sleeping senders might be able to send
                 * due to a larger queue size.
                 */
-               ss_wakeup(&msq->q_senders, 0);
-               break;
+               ss_wakeup(msq, &wake_q, false);
+               ipc_unlock_object(&msq->q_perm);
+               wake_up_q(&wake_q);
+
+               goto out_unlock1;
+       }
        default:
                err = -EINVAL;
                goto out_unlock1;
@@ -566,7 +599,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
        return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+                                struct wake_q_head *wake_q)
 {
        struct msg_receiver *msr, *t;
 
@@ -577,27 +611,14 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
 
                        list_del(&msr->r_list);
                        if (msr->r_maxsize < msg->m_ts) {
-                               /* initialize pipelined send ordering */
-                               msr->r_msg = NULL;
-                               wake_up_process(msr->r_tsk);
-                               /* barrier (B) see barrier comment below */
-                               smp_wmb();
-                               msr->r_msg = ERR_PTR(-E2BIG);
+                               wake_q_add(wake_q, msr->r_tsk);
+                               WRITE_ONCE(msr->r_msg, ERR_PTR(-E2BIG));
                        } else {
-                               msr->r_msg = NULL;
                                msq->q_lrpid = task_pid_vnr(msr->r_tsk);
                                msq->q_rtime = get_seconds();
-                               wake_up_process(msr->r_tsk);
-                               /*
-                                * Ensure that the wakeup is visible before
-                                * setting r_msg, as the receiving can otherwise
-                                * exit - once r_msg is set, the receiver can
-                                * continue. See lockless receive part 1 and 2
-                                * in do_msgrcv(). Barrier (B).
-                                */
-                               smp_wmb();
-                               msr->r_msg = msg;
 
+                               wake_q_add(wake_q, msr->r_tsk);
+                               WRITE_ONCE(msr->r_msg, msg);
                                return 1;
                        }
                }
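
In pipelined_send() and expunge_all(), r_msg does double duty: it either points at a real struct msg_msg or carries a negative errno packed into pointer range via ERR_PTR(), which do_msgrcv() unpacks with IS_ERR()/PTR_ERR(). The following self-contained re-creation of those err.h helpers is for illustration; MAX_ERRNO matches the kernel's bound, but the helpers here are demo copies, not the kernel headers.

#include <errno.h>
#include <stdio.h>

#define MAX_ERRNO 4095  /* the kernel reserves the top 4095 addresses */

static inline void *ERR_PTR(long error) { return (void *)error; }
static inline long PTR_ERR(const void *ptr) { return (long)ptr; }
static inline int IS_ERR(const void *ptr)
{
        return (unsigned long)ptr >= (unsigned long)-MAX_ERRNO;
}

int main(void)
{
        /* What the E2BIG branch above stores into a receiver's r_msg: */
        void *msg = ERR_PTR(-E2BIG);

        if (IS_ERR(msg))
                printf("r_msg holds errno %ld, not a message\n", -PTR_ERR(msg));
        return 0;
}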
@@ -613,6 +634,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
        struct msg_msg *msg;
        int err;
        struct ipc_namespace *ns;
+       WAKE_Q(wake_q);
 
        ns = current->nsproxy->ipc_ns;
 
@@ -654,10 +676,8 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
                if (err)
                        goto out_unlock0;
 
-               if (msgsz + msq->q_cbytes <= msq->q_qbytes &&
-                               1 + msq->q_qnum <= msq->q_qbytes) {
+               if (msg_fits_inqueue(msq, msgsz))
                        break;
-               }
 
                /* queue full, wait: */
                if (msgflg & IPC_NOWAIT) {
@@ -666,7 +686,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
                }
 
                /* enqueue the sender and prepare to block */
-               ss_add(msq, &s);
+               ss_add(msq, &s, msgsz);
 
                if (!ipc_rcu_getref(msq)) {
                        err = -EIDRM;
@@ -686,7 +706,6 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
                        err = -EIDRM;
                        goto out_unlock0;
                }
-
                ss_del(&s);
 
                if (signal_pending(current)) {
@@ -695,10 +714,11 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
                }
 
        }
+
        msq->q_lspid = task_tgid_vnr(current);
        msq->q_stime = get_seconds();
 
-       if (!pipelined_send(msq, msg)) {
+       if (!pipelined_send(msq, msg, &wake_q)) {
                /* no one is waiting for this message, enqueue it */
                list_add_tail(&msg->m_list, &msq->q_messages);
                msq->q_cbytes += msgsz;
@@ -712,6 +732,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
 out_unlock1:
        rcu_read_unlock();
        if (msg != NULL)
@@ -829,6 +850,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
        struct msg_queue *msq;
        struct ipc_namespace *ns;
        struct msg_msg *msg, *copy = NULL;
+       WAKE_Q(wake_q);
 
        ns = current->nsproxy->ipc_ns;
 
@@ -893,7 +915,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
                        msq->q_cbytes -= msg->m_ts;
                        atomic_sub(msg->m_ts, &ns->msg_bytes);
                        atomic_dec(&ns->msg_hdrs);
-                       ss_wakeup(&msq->q_senders, 0);
+                       ss_wakeup(msq, &wake_q, false);
 
                        goto out_unlock0;
                }
@@ -919,71 +941,38 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
                rcu_read_unlock();
                schedule();
 
-               /* Lockless receive, part 1:
-                * Disable preemption.  We don't hold a reference to the queue
-                * and getting a reference would defeat the idea of a lockless
-                * operation, thus the code relies on rcu to guarantee the
-                * existence of msq:
+               /*
+                * Lockless receive, part 1:
+                * We don't hold a reference to the queue and getting a
+                * reference would defeat the idea of a lockless operation,
+                * thus the code relies on rcu to guarantee the existence of
+                * msq:
                 * Prior to destruction, expunge_all(-EIDRM) changes r_msg.
                 * Thus if r_msg is -EAGAIN, then the queue is not yet destroyed.
-                * rcu_read_lock() prevents preemption between reading r_msg
-                * and acquiring the q_perm.lock in ipc_lock_object().
                 */
                rcu_read_lock();
 
-               /* Lockless receive, part 2:
-                * Wait until pipelined_send or expunge_all are outside of
-                * wake_up_process(). There is a race with exit(), see
-                * ipc/mqueue.c for the details. The correct serialization
-                * ensures that a receiver cannot continue without the wakeup
-                * being visibible _before_ setting r_msg:
-                *
-                * CPU 0                             CPU 1
-                * <loop receiver>
-                *   smp_rmb(); (A) <-- pair -.      <waker thread>
-                *   <load ->r_msg>           |        msr->r_msg = NULL;
-                *                            |        wake_up_process();
-                * <continue>                 `------> smp_wmb(); (B)
-                *                                     msr->r_msg = msg;
+               /*
+                * Lockless receive, part 2:
+                * The work in pipelined_send() and expunge_all():
+                * - Set pointer to message
+                * - Queue the receiver task for later wakeup
+                * - Wake up the process after the lock is dropped.
                 *
-                * Where (A) orders the message value read and where (B) orders
-                * the write to the r_msg -- done in both pipelined_send and
-                * expunge_all.
-                */
-               for (;;) {
-                       /*
-                        * Pairs with writer barrier in pipelined_send
-                        * or expunge_all.
-                        */
-                       smp_rmb(); /* barrier (A) */
-                       msg = (struct msg_msg *)msr_d.r_msg;
-                       if (msg)
-                               break;
-
-                       /*
-                        * The cpu_relax() call is a compiler barrier
-                        * which forces everything in this loop to be
-                        * re-loaded.
-                        */
-                       cpu_relax();
-               }
-
-               /* Lockless receive, part 3:
-                * If there is a message or an error then accept it without
-                * locking.
+                * Should the process wake up before this wakeup (due to a
+                * signal) it will either see the message and continue ...
                 */
+               msg = READ_ONCE(msr_d.r_msg);
                if (msg != ERR_PTR(-EAGAIN))
                        goto out_unlock1;
 
-               /* Lockless receive, part 3:
-                * Acquire the queue spinlock.
-                */
+               /*
+                * ... or see -EAGAIN, acquire the lock to check the message
+                * again.
+                */
                ipc_lock_object(&msq->q_perm);
 
-               /* Lockless receive, part 4:
-                * Repeat test after acquiring the spinlock.
-                */
-               msg = (struct msg_msg *)msr_d.r_msg;
+               msg = msr_d.r_msg;
                if (msg != ERR_PTR(-EAGAIN))
                        goto out_unlock0;
 
@@ -998,6 +987,7 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
 
 out_unlock0:
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
 out_unlock1:
        rcu_read_unlock();
        if (IS_ERR(msg)) {