RDS: Rewrite rds_send_drop_to() for clarity
authorAndy Grover <andy.grover@oracle.com>
Sat, 20 Feb 2010 02:01:41 +0000 (18:01 -0800)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:07:32 +0000 (18:07 -0700)
This function has been the source of numerous bugs; it's just
too complicated. Simplified to nest spinlocks cleanly within
the second loop body, and kick out early if there are no
rms to drop.

This will be a little slower because conn lock is grabbed for
each entry instead of "caching" the lock across rms, but this
should be entirely irrelevant to fastpath performance.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
net/rds/send.c

index 9c1c6bc..aee58f9 100644 (file)
@@ -619,9 +619,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 {
        struct rds_message *rm, *tmp;
        struct rds_connection *conn;
-       unsigned long flags, flags2;
+       unsigned long flags;
        LIST_HEAD(list);
-       int wake = 0;
 
        /* get all the messages we're dropping under the rs lock */
        spin_lock_irqsave(&rs->rs_lock, flags);
@@ -631,59 +630,54 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
                             dest->sin_port != rm->m_inc.i_hdr.h_dport))
                        continue;
 
-               wake = 1;
                list_move(&rm->m_sock_item, &list);
                rds_send_sndbuf_remove(rs, rm);
                clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
        }
 
        /* order flag updates with the rs lock */
-       if (wake)
-               smp_mb__after_clear_bit();
+       smp_mb__after_clear_bit();
 
        spin_unlock_irqrestore(&rs->rs_lock, flags);
 
-       conn = NULL;
+       if (list_empty(&list))
+               return;
 
-       /* now remove the messages from the conn list as needed */
+       /* Remove the messages from the conn */
        list_for_each_entry(rm, &list, m_sock_item) {
-               /* We do this here rather than in the loop above, so that
-                * we don't have to nest m_rs_lock under rs->rs_lock */
-               spin_lock_irqsave(&rm->m_rs_lock, flags2);
-               /* If this is a RDMA operation, notify the app. */
-               spin_lock(&rs->rs_lock);
-               __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
-               spin_unlock(&rs->rs_lock);
-               rm->m_rs = NULL;
-               spin_unlock_irqrestore(&rm->m_rs_lock, flags2);
+
+               conn = rm->m_inc.i_conn;
+               spin_lock_irqsave(&conn->c_lock, flags);
 
                /*
-                * If we see this flag cleared then we're *sure* that someone
-                * else beat us to removing it from the conn.  If we race
-                * with their flag update we'll get the lock and then really
-                * see that the flag has been cleared.
+                * Maybe someone else beat us to removing rm from the conn.
+                * If we race with their flag update we'll get the lock and
+                * then really see that the flag has been cleared.
                 */
-               if (!test_bit(RDS_MSG_ON_CONN, &rm->m_flags))
+               if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
+                       spin_unlock_irqrestore(&conn->c_lock, flags);
                        continue;
-
-               if (conn != rm->m_inc.i_conn) {
-                       if (conn)
-                               spin_unlock_irqrestore(&conn->c_lock, flags);
-                       conn = rm->m_inc.i_conn;
-                       spin_lock_irqsave(&conn->c_lock, flags);
                }
 
-               if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
-                       list_del_init(&rm->m_conn_item);
-                       rds_message_put(rm);
-               }
-       }
+               /*
+                * Couldn't grab m_rs_lock in top loop (lock ordering),
+                * but we can now.
+                */
+               spin_lock(&rm->m_rs_lock);
 
-       if (conn)
+               spin_lock(&rs->rs_lock);
+               __rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
+               spin_unlock(&rs->rs_lock);
+
+               rm->m_rs = NULL;
+               spin_unlock(&rm->m_rs_lock);
+
+               list_del_init(&rm->m_conn_item);
+               rds_message_put(rm);
                spin_unlock_irqrestore(&conn->c_lock, flags);
+       }
 
-       if (wake)
-               rds_wake_sk_sleep(rs);
+       rds_wake_sk_sleep(rs);
 
        while (!list_empty(&list)) {
                rm = list_entry(list.next, struct rds_message, m_sock_item);