ipc/sem.c: synchronize the proc interface
[cascardo/linux.git] / ipc / sem.c
index 69b6a21..cd6a733 100644 (file)
--- a/ipc/sem.c
+++ b/ipc/sem.c
@@ -243,71 +243,122 @@ static void merge_queues(struct sem_array *sma)
        }
 }
 
+static void sem_rcu_free(struct rcu_head *head)
+{
+       struct ipc_rcu *p = container_of(head, struct ipc_rcu, rcu);
+       struct sem_array *sma = ipc_rcu_to_struct(p);
+
+       security_sem_free(sma);
+       ipc_rcu_free(head);
+}
+
+/*
+ * Wait until all currently ongoing simple ops have completed.
+ * Caller must own sem_perm.lock.
+ * New simple ops cannot start, because simple ops first check
+ * that sem_perm.lock is free.
+ * that a) sem_perm.lock is free and b) complex_count is 0.
+ */
+static void sem_wait_array(struct sem_array *sma)
+{
+       int i;
+       struct sem *sem;
+
+       if (sma->complex_count)  {
+               /* The thread that increased sma->complex_count waited on
+                * all sem->lock locks. Thus we don't need to wait again.
+                */
+               return;
+       }
+
+       for (i = 0; i < sma->sem_nsems; i++) {
+               sem = sma->sem_base + i;
+               spin_unlock_wait(&sem->lock);
+       }
+}
+
 /*
  * If the request contains only one semaphore operation, and there are
  * no complex transactions pending, lock only the semaphore involved.
  * Otherwise, lock the entire semaphore array, since we either have
  * multiple semaphores in our own semops, or we need to look at
  * semaphores from other pending complex operations.
- *
- * Carefully guard against sma->complex_count changing between zero
- * and non-zero while we are spinning for the lock. The value of
- * sma->complex_count cannot change while we are holding the lock,
- * so sem_unlock should be fine.
- *
- * The global lock path checks that all the local locks have been released,
- * checking each local lock once. This means that the local lock paths
- * cannot start their critical sections while the global lock is held.
  */
 static inline int sem_lock(struct sem_array *sma, struct sembuf *sops,
                              int nsops)
 {
-       int locknum;
- again:
-       if (nsops == 1 && !sma->complex_count) {
-               struct sem *sem = sma->sem_base + sops->sem_num;
+       struct sem *sem;
 
-               /* Lock just the semaphore we are interested in. */
-               spin_lock(&sem->lock);
+       if (nsops != 1) {
+               /* Complex operation - acquire a full lock */
+               ipc_lock_object(&sma->sem_perm);
 
-               /*
-                * If sma->complex_count was set while we were spinning,
-                * we may need to look at things we did not lock here.
+               /* And wait until all simple ops that are processed
+                * right now have dropped their locks.
                 */
-               if (unlikely(sma->complex_count)) {
-                       spin_unlock(&sem->lock);
-                       goto lock_array;
-               }
+               sem_wait_array(sma);
+               return -1;
+       }
+
+       /*
+        * Only one semaphore affected - try to optimize locking.
+        * The rules are:
+        * - optimized locking is possible if no complex operation
+        *   is either enqueued or processed right now.
+        * - The test for enqueued complex ops is simple:
+        *      sma->complex_count != 0
+        * - Testing for complex ops that are processed right now is
+        *   a bit more difficult. Complex ops acquire the full lock
+        *   and first wait that the running simple ops have completed.
+        *   (see above)
+        *   Thus: If we own a simple lock and the global lock is free
+        *      and complex_count is now 0, then it will stay 0 and
+        *      thus just locking sem->lock is sufficient.
+        */
+       sem = sma->sem_base + sops->sem_num;
 
+       if (sma->complex_count == 0) {
                /*
-                * Another process is holding the global lock on the
-                * sem_array; we cannot enter our critical section,
-                * but have to wait for the global lock to be released.
+                * It appears that no complex operation is around.
+                * Acquire the per-semaphore lock.
                 */
-               if (unlikely(spin_is_locked(&sma->sem_perm.lock))) {
-                       spin_unlock(&sem->lock);
-                       spin_unlock_wait(&sma->sem_perm.lock);
-                       goto again;
+               spin_lock(&sem->lock);
+
+               /* Then check that the global lock is free */
+               if (!spin_is_locked(&sma->sem_perm.lock)) {
+                       /* spin_is_locked() is not a memory barrier */
+                       smp_mb();
+
+                       /* Now repeat the test of complex_count:
+                        * It can't change anymore until we drop sem->lock.
+                        * Thus: if is now 0, then it will stay 0.
+                        */
+                       if (sma->complex_count == 0) {
+                               /* fast path successful! */
+                               return sops->sem_num;
+                       }
                }
+               spin_unlock(&sem->lock);
+       }
 
-               locknum = sops->sem_num;
+       /* slow path: acquire the full lock */
+       ipc_lock_object(&sma->sem_perm);
+
+       if (sma->complex_count == 0) {
+               /* False alarm:
+                * There is no complex operation, thus we can switch
+                * back to the fast path.
+                */
+               spin_lock(&sem->lock);
+               ipc_unlock_object(&sma->sem_perm);
+               return sops->sem_num;
        } else {
-               int i;
-               /*
-                * Lock the semaphore array, and wait for all of the
-                * individual semaphore locks to go away.  The code
-                * above ensures no new single-lock holders will enter
-                * their critical section while the array lock is held.
+               /* Not a false alarm, thus complete the sequence for a
+                * full lock.
                 */
- lock_array:
-               ipc_lock_object(&sma->sem_perm);
-               for (i = 0; i < sma->sem_nsems; i++) {
-                       struct sem *sem = sma->sem_base + i;
-                       spin_unlock_wait(&sem->lock);
-               }
-               locknum = -1;
+               sem_wait_array(sma);
+               return -1;
        }
-       return locknum;
 }
 
 static inline void sem_unlock(struct sem_array *sma, int locknum)
@@ -374,12 +425,7 @@ static inline struct sem_array *sem_obtain_object_check(struct ipc_namespace *ns
 static inline void sem_lock_and_putref(struct sem_array *sma)
 {
        sem_lock(sma, NULL, -1);
-       ipc_rcu_putref(sma);
-}
-
-static inline void sem_putref(struct sem_array *sma)
-{
-       ipc_rcu_putref(sma);
+       ipc_rcu_putref(sma, ipc_rcu_free);
 }
 
 static inline void sem_rmid(struct ipc_namespace *ns, struct sem_array *s)
@@ -458,14 +504,13 @@ static int newary(struct ipc_namespace *ns, struct ipc_params *params)
        sma->sem_perm.security = NULL;
        retval = security_sem_alloc(sma);
        if (retval) {
-               ipc_rcu_putref(sma);
+               ipc_rcu_putref(sma, ipc_rcu_free);
                return retval;
        }
 
        id = ipc_addid(&sem_ids(ns), &sma->sem_perm, ns->sc_semmni);
        if (id < 0) {
-               security_sem_free(sma);
-               ipc_rcu_putref(sma);
+               ipc_rcu_putref(sma, sem_rcu_free);
                return id;
        }
        ns->used_sems += nsems;
@@ -1047,8 +1092,7 @@ static void freeary(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 
        wake_up_sem_queue_do(&tasks);
        ns->used_sems -= sma->sem_nsems;
-       security_sem_free(sma);
-       ipc_rcu_putref(sma);
+       ipc_rcu_putref(sma, sem_rcu_free);
 }
 
 static unsigned long copy_semid_to_user(void __user *buf, struct semid64_ds *in, int version)
@@ -1292,7 +1336,7 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                        rcu_read_unlock();
                        sem_io = ipc_alloc(sizeof(ushort)*nsems);
                        if(sem_io == NULL) {
-                               sem_putref(sma);
+                               ipc_rcu_putref(sma, ipc_rcu_free);
                                return -ENOMEM;
                        }
 
@@ -1328,20 +1372,20 @@ static int semctl_main(struct ipc_namespace *ns, int semid, int semnum,
                if(nsems > SEMMSL_FAST) {
                        sem_io = ipc_alloc(sizeof(ushort)*nsems);
                        if(sem_io == NULL) {
-                               sem_putref(sma);
+                               ipc_rcu_putref(sma, ipc_rcu_free);
                                return -ENOMEM;
                        }
                }
 
                if (copy_from_user (sem_io, p, nsems*sizeof(ushort))) {
-                       sem_putref(sma);
+                       ipc_rcu_putref(sma, ipc_rcu_free);
                        err = -EFAULT;
                        goto out_free;
                }
 
                for (i = 0; i < nsems; i++) {
                        if (sem_io[i] > SEMVMX) {
-                               sem_putref(sma);
+                               ipc_rcu_putref(sma, ipc_rcu_free);
                                err = -ERANGE;
                                goto out_free;
                        }
@@ -1629,7 +1673,7 @@ static struct sem_undo *find_alloc_undo(struct ipc_namespace *ns, int semid)
        /* step 2: allocate new undo structure */
        new = kzalloc(sizeof(struct sem_undo) + sizeof(short)*nsems, GFP_KERNEL);
        if (!new) {
-               sem_putref(sma);
+               ipc_rcu_putref(sma, ipc_rcu_free);
                return ERR_PTR(-ENOMEM);
        }
 
@@ -2059,6 +2103,14 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it)
        struct sem_array *sma = it;
        time_t sem_otime;
 
+       /*
+        * The proc interface isn't aware of sem_lock(), it calls
+        * ipc_lock_object() directly (in sysvipc_find_ipc).
+        * In order to stay compatible with sem_lock(), we must wait until
+        * all simple semop() calls have left their critical regions.
+        */
+       sem_wait_array(sma);
+
        sem_otime = get_semotime(sma);
 
        return seq_printf(s,